From c75b2a70673d9c50db19fa0de5cc40a8bbbc5a26 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 5 Jan 2026 19:43:54 +0100 Subject: [PATCH] refactor: unify reply dispatch across providers --- CHANGELOG.md | 1 + README.md | 171 +++++------ docs/configuration.md | 3 + docs/heartbeat.md | 7 + src/auto-reply/reply/reply-dispatcher.test.ts | 82 +++++ src/auto-reply/reply/reply-dispatcher.ts | 99 +++++++ src/discord/monitor.tool-result.test.ts | 151 ++++++++++ src/discord/monitor.ts | 72 +++-- src/imessage/monitor.test.ts | 35 +++ src/imessage/monitor.ts | 69 ++--- src/signal/monitor.tool-result.test.ts | 96 ++++++ src/signal/monitor.ts | 72 +++-- src/slack/monitor.tool-result.test.ts | 125 ++++++++ src/slack/monitor.ts | 65 ++-- src/telegram/bot.test.ts | 32 ++ src/telegram/bot.ts | 69 ++--- src/web/auto-reply.ts | 280 ++++++------------ 17 files changed, 953 insertions(+), 476 deletions(-) create mode 100644 src/auto-reply/reply/reply-dispatcher.test.ts create mode 100644 src/auto-reply/reply/reply-dispatcher.ts create mode 100644 src/discord/monitor.tool-result.test.ts create mode 100644 src/signal/monitor.tool-result.test.ts create mode 100644 src/slack/monitor.tool-result.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index eb5d3d6cc..14a727498 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ - Block streaming: avoid splitting Markdown fenced blocks and reopen fences when forced to split. - Block streaming: preserve leading indentation in block replies (lists, indented fences). - Docs: document systemd lingering and logged-in session requirements on macOS/Windows. +- Auto-reply: unify tool/block/final delivery across providers and apply consistent heartbeat/prefix handling. Thanks @MSch for PR #225 (superseded commit 92c953d0749143eb2a3f31f3cd6ad0e8eabf48c3). ### Maintenance - Deps: bump pi-* stack, Slack SDK, discord-api-types, file-type, zod, and Biome. diff --git a/README.md b/README.md index e1d100b8b..a0cd141a0 100644 --- a/README.md +++ b/README.md @@ -16,15 +16,15 @@

**Clawdbot** is a *personal AI assistant* you run on your own devices. -It answers you on the surfaces you already use (WhatsApp, Telegram, Discord, iMessage, WebChat), can speak and listen on macOS/iOS, and can render a live Canvas you control. The Gateway is just the control plane — the product is the assistant. +It answers you on the surfaces you already use (WhatsApp, Telegram, Slack, Discord, iMessage, WebChat), can speak and listen on macOS/iOS, and can render a live Canvas you control. The Gateway is just the control plane — the product is the assistant. If you want a personal, single-user assistant that feels local, fast, and always-on, this is it. -Website: https://clawd.me · Docs: https://docs.clawdbot.com/ · FAQ: [`docs/faq.md`](docs/faq.md) · Wizard: [`docs/wizard.md`](docs/wizard.md) · Nix: [nix-clawdbot](https://github.com/clawdbot/nix-clawdbot) · Docker: [`docs/docker.md`](docs/docker.md) · Discord: https://discord.gg/clawd +Website: [clawdbot.com](https://clawdbot.com) · Docs: [docs.clawdbot.com](https://docs.clawdbot.com/) · FAQ: [FAQ](docs/faq.md) · Wizard: [Wizard](docs/wizard.md) · Nix: [nix-clawdbot](https://github.com/clawdbot/nix-clawdbot) · Docker: [Docker](docs/docker.md) · Discord: [discord.gg/clawd](https://discord.gg/clawd) Preferred setup: run the onboarding wizard (`clawdbot onboard`). It walks through gateway, workspace, providers, and skills. The CLI wizard is the recommended path and works on **macOS, Windows, and Linux**. -Subscriptions: **Anthropic (Claude Pro/Max)** and **OpenAI (ChatGPT/Codex)** are supported via OAuth. See `docs/onboarding.md`. +Subscriptions: **Anthropic (Claude Pro/Max)** and **OpenAI (ChatGPT/Codex)** are supported via OAuth. See [Onboarding](docs/onboarding.md). ## Recommended setup (from source) @@ -41,97 +41,6 @@ pnpm ui:build pnpm clawdbot onboard ``` -## Highlights - -- **Local-first Gateway** — single control plane for sessions, providers, tools, and events. -- **Multi-surface inbox** — WhatsApp, Telegram, Discord, iMessage, WebChat, macOS, iOS/Android. -- **Voice Wake + Talk Mode** — always-on speech for macOS/iOS/Android with ElevenLabs. -- **Live Canvas** — agent-driven visual workspace with A2UI. -- **First-class tools** — browser, canvas, nodes, cron, sessions, and Discord actions. -- **Companion apps** — macOS menu bar app + iOS/Android nodes. -- **Onboarding + skills** — wizard-driven setup with bundled/managed/workspace skills. - -## Everything we built so far - -### Core platform -- Gateway WS control plane with sessions, presence, config, cron, webhooks, control UI, and Canvas host. -- CLI surface: gateway, agent, send, wizard, doctor/update, and TUI. -- Pi agent runtime in RPC mode with tool streaming and block streaming. -- Session model: `main` for direct chats, group isolation, activation modes, queue modes, reply-back. -- Media pipeline: images/audio/video, transcription hooks, size caps, temp file lifecycle. - -### Surfaces + providers -- WhatsApp (Baileys), Telegram (grammY), Discord (discord.js), Signal (signal-cli), iMessage (imsg), WebChat. -- Group mention gating, reply tags, per-surface chunking and routing. - -### Apps + nodes -- macOS app: menu bar control plane, Voice Wake/PTT, Talk Mode overlay, WebChat, Debug tools, SSH remote gateway control. -- iOS node: Canvas, Voice Wake, Talk Mode, camera, screen recording, Bonjour pairing. -- Android node: Canvas, Talk Mode, camera, screen recording, optional SMS. -- macOS node mode: system.run/notify + canvas/camera exposure. - -### Tools + automation -- Browser control: dedicated clawd Chrome/Chromium, snapshots, actions, uploads, profiles. -- Canvas: A2UI push/reset, eval, snapshot. -- Nodes: camera snap/clip, screen record, location.get, notifications. -- Cron + wakeups; webhooks; Gmail Pub/Sub triggers. -- Skills platform: bundled, managed, and workspace skills with install gating + UI. - -### Ops + packaging -- Control UI + WebChat served directly from the Gateway. -- Tailscale Serve/Funnel or SSH tunnels with token/password auth. -- Nix mode for declarative config; Docker-based installs. -- Health, doctor migrations, structured logging, release tooling. - -## Changes since 2026.1.4 (2026-01-04) - -### Highlights -- Project rename completed: CLIs, paths, bundle IDs, env vars, and docs unified on Clawdbot. -- Agent-to-agent relay: `sessions_send` ping‑pong with `REPLY_SKIP` plus announce step with `ANNOUNCE_SKIP`. -- Gateway config hot reload, configurable port, and Control UI base-path support. -- Sandbox options: per-session Docker sandbox with hardened limits + optional sandboxed Chromium. -- New node capability: `location.get` across macOS/iOS/Android (CLI + tools). - -### Fixes -- Presence beacons keep node lists fresh; Instances view stays accurate. -- Block streaming + chunking reliability (Telegram/Discord ordering, fewer duplicates). -- WhatsApp GIF playback for MP4-based GIFs. -- Onboarding/Control UI basePath handling fixes + UI polish. -- Cleaner logging + clearer tool summaries. - -### Breaking -- Tool names drop the `clawdbot_` prefix (`browser`, `canvas`, `nodes`, `cron`, `gateway`). -- Bash tool removed `stdinMode: "pty"` support (use tmux for real TTYs). -- Primary session key is fixed to `main` (or `global` for global scope). - -## Project rename + changelog format - -Clawdis → Clawdbot. The rename touched every surface, path, and bundle ID. To make that transition explicit, releases now use **date-based versions** (`YYYY.M.D`), and the changelog is compressed into milestone summaries instead of long semver trains. Full detail still lives in git history and the docs. - -## How it works (short) - -``` -Your surfaces - │ - ▼ -┌───────────────────────────────┐ -│ Gateway │ ws://127.0.0.1:18789 -│ (control plane) │ tcp://0.0.0.0:18790 (optional Bridge) -└──────────────┬────────────────┘ - │ - ├─ Pi agent (RPC) - ├─ CLI (clawdbot …) - ├─ WebChat (browser) - ├─ macOS app (Clawdbot.app) - └─ iOS node (Canvas + voice) -``` - -## Skills registry (ClawdHub) - -ClawdHub is a minimal skill registry. With ClawdHub enabled, the agent can search for skills automatically and pull in new ones as needed. - -https://clawdhub.com - ## Quick start (from source) Runtime: **Node ≥22** + **pnpm**. @@ -156,15 +65,81 @@ pnpm gateway:watch # Send a message pnpm clawdbot send --to +1234567890 --message "Hello from Clawdbot" -# Talk to the assistant (optionally deliver back to WhatsApp/Telegram/Discord) +# Talk to the assistant (optionally deliver back to WhatsApp/Telegram/Slack/Discord) pnpm clawdbot agent --message "Ship checklist" --thinking high ``` If you run from source, prefer `pnpm clawdbot …` (not global `clawdbot`). +## Highlights + +- **Local-first Gateway** — single control plane for sessions, providers, tools, and events. +- **Multi-surface inbox** — WhatsApp, Telegram, Slack, Discord, iMessage, WebChat, macOS, iOS/Android. +- **Voice Wake + Talk Mode** — always-on speech for macOS/iOS/Android with ElevenLabs. +- **Live Canvas** — agent-driven visual workspace with A2UI. +- **First-class tools** — browser, canvas, nodes, cron, sessions, and Discord/Slack actions. +- **Companion apps** — macOS menu bar app + iOS/Android nodes. +- **Onboarding + skills** — wizard-driven setup with bundled/managed/workspace skills. + +## Everything we built so far + +### Core platform +- Gateway WS control plane with sessions, presence, config, cron, webhooks, control UI, and Canvas host. +- CLI surface: gateway, agent, send, wizard, doctor/update, and TUI. +- Pi agent runtime in RPC mode with tool streaming and block streaming. +- Session model: `main` for direct chats, group isolation, activation modes, queue modes, reply-back. +- Media pipeline: images/audio/video, transcription hooks, size caps, temp file lifecycle. + +### Surfaces + providers +- WhatsApp (Baileys), Telegram (grammY), Slack (Bolt), Discord (discord.js), Signal (signal-cli), iMessage (imsg), WebChat. +- Group mention gating, reply tags, per-surface chunking and routing. + +### Apps + nodes +- macOS app: menu bar control plane, Voice Wake/PTT, Talk Mode overlay, WebChat, Debug tools, SSH remote gateway control. +- iOS node: Canvas, Voice Wake, Talk Mode, camera, screen recording, Bonjour pairing. +- Android node: Canvas, Talk Mode, camera, screen recording, optional SMS. +- macOS node mode: system.run/notify + canvas/camera exposure. + +### Tools + automation +- Browser control: dedicated clawd Chrome/Chromium, snapshots, actions, uploads, profiles. +- Canvas: A2UI push/reset, eval, snapshot. +- Nodes: camera snap/clip, screen record, location.get, notifications. +- Cron + wakeups; webhooks; Gmail Pub/Sub triggers. +- Skills platform: bundled, managed, and workspace skills with install gating + UI. + +### Ops + packaging +- Control UI + WebChat served directly from the Gateway. +- Tailscale Serve/Funnel or SSH tunnels with token/password auth. +- Nix mode for declarative config; Docker-based installs. +- Health, doctor migrations, structured logging, release tooling. + +## How it works (short) + +``` +Your surfaces + │ + ▼ +┌───────────────────────────────┐ +│ Gateway │ ws://127.0.0.1:18789 +│ (control plane) │ tcp://0.0.0.0:18790 (optional Bridge) +└──────────────┬────────────────┘ + │ + ├─ Pi agent (RPC) + ├─ CLI (clawdbot …) + ├─ WebChat (browser) + ├─ macOS app (Clawdbot.app) + └─ iOS node (Canvas + voice) +``` + +## Skills registry (ClawdHub) + +ClawdHub is a minimal skill registry. With ClawdHub enabled, the agent can search for skills automatically and pull in new ones as needed. + +https://clawdhub.com + ## Chat commands -Send these in WhatsApp/Telegram/WebChat (group commands are owner-only): +Send these in WhatsApp/Telegram/Slack/WebChat (group commands are owner-only): - `/status` — health + session info (group shows activation mode) - `/new` or `/reset` — reset the session @@ -209,13 +184,13 @@ Build/run: `./scripts/restart-mac.sh` (packages + launches). - Voice trigger forwarding + Canvas surface. - Controlled via `clawdbot nodes …`. -Runbook: `docs/ios/connect.md`. +Runbook: [iOS connect](docs/ios/connect.md). ### Android node (internal) - Pairs via the same Bridge + pairing flow as iOS. - Exposes Canvas, Camera, and Screen capture commands. -- Runbook: `docs/android/connect.md`. +- Runbook: [Android connect](docs/android/connect.md). ## Agent workspace + skills diff --git a/docs/configuration.md b/docs/configuration.md index 006814237..6d1ad7563 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -446,6 +446,9 @@ Controls inbound/outbound prefixes and timestamps. } ``` +`responsePrefix` is applied to **all outbound replies** (tool summaries, block +streaming, final replies) across providers unless already present. + ### `talk` Defaults for Talk mode (macOS/iOS/Android). Voice IDs fall back to `ELEVENLABS_VOICE_ID` or `SAG_VOICE_ID` when unset. diff --git a/docs/heartbeat.md b/docs/heartbeat.md index 0cdb2ea70..4a2ab923b 100644 --- a/docs/heartbeat.md +++ b/docs/heartbeat.md @@ -22,6 +22,13 @@ If the model accidentally includes `HEARTBEAT_OK` at the start or end of a normal (non-heartbeat) reply, Clawdbot strips the token and logs a verbose message. If the reply is only `HEARTBEAT_OK`, it is dropped. +### Outbound normalization (all providers) +For **all providers** (WhatsApp/Web, Telegram, Slack, Discord, Signal, iMessage), +Clawdbot applies the same filtering to tool summaries, streaming block replies, +and final replies: +- drop payloads that are only `HEARTBEAT_OK` with no media +- strip `HEARTBEAT_OK` at the edges when mixed with other text + ## Config ```json5 diff --git a/src/auto-reply/reply/reply-dispatcher.test.ts b/src/auto-reply/reply/reply-dispatcher.test.ts new file mode 100644 index 000000000..d97822fe3 --- /dev/null +++ b/src/auto-reply/reply/reply-dispatcher.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, it, vi } from "vitest"; +import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../tokens.js"; +import { createReplyDispatcher } from "./reply-dispatcher.js"; + +describe("createReplyDispatcher", () => { + it("drops empty payloads and silent tokens without media", async () => { + const deliver = vi.fn().mockResolvedValue(undefined); + const dispatcher = createReplyDispatcher({ deliver }); + + expect(dispatcher.sendFinalReply({})).toBe(false); + expect(dispatcher.sendFinalReply({ text: " " })).toBe(false); + expect(dispatcher.sendFinalReply({ text: SILENT_REPLY_TOKEN })).toBe(false); + + await dispatcher.waitForIdle(); + expect(deliver).not.toHaveBeenCalled(); + }); + + it("strips heartbeat tokens and applies responsePrefix", async () => { + const deliver = vi.fn().mockResolvedValue(undefined); + const onHeartbeatStrip = vi.fn(); + const dispatcher = createReplyDispatcher({ + deliver, + responsePrefix: "PFX", + onHeartbeatStrip, + }); + + expect(dispatcher.sendFinalReply({ text: HEARTBEAT_TOKEN })).toBe(false); + expect( + dispatcher.sendToolResult({ text: `${HEARTBEAT_TOKEN} hello` }), + ).toBe(true); + await dispatcher.waitForIdle(); + + expect(deliver).toHaveBeenCalledTimes(1); + expect(deliver.mock.calls[0][0].text).toBe("PFX hello"); + expect(onHeartbeatStrip).toHaveBeenCalledTimes(2); + }); + + it("avoids double-prefixing and keeps media when heartbeat is the only text", async () => { + const deliver = vi.fn().mockResolvedValue(undefined); + const dispatcher = createReplyDispatcher({ + deliver, + responsePrefix: "PFX", + }); + + expect( + dispatcher.sendFinalReply({ + text: "PFX already", + mediaUrl: "file:///tmp/photo.jpg", + }), + ).toBe(true); + expect( + dispatcher.sendFinalReply({ + text: HEARTBEAT_TOKEN, + mediaUrl: "file:///tmp/photo.jpg", + }), + ).toBe(true); + + await dispatcher.waitForIdle(); + + expect(deliver).toHaveBeenCalledTimes(2); + expect(deliver.mock.calls[0][0].text).toBe("PFX already"); + expect(deliver.mock.calls[1][0].text).toBe(""); + }); + + it("preserves ordering across tool, block, and final replies", async () => { + const delivered: string[] = []; + const deliver = vi.fn(async (_payload, info) => { + delivered.push(info.kind); + if (info.kind === "tool") { + await new Promise((resolve) => setTimeout(resolve, 5)); + } + }); + const dispatcher = createReplyDispatcher({ deliver }); + + dispatcher.sendToolResult({ text: "tool" }); + dispatcher.sendBlockReply({ text: "block" }); + dispatcher.sendFinalReply({ text: "final" }); + + await dispatcher.waitForIdle(); + expect(delivered).toEqual(["tool", "block", "final"]); + }); +}); diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts new file mode 100644 index 000000000..070cc7a65 --- /dev/null +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -0,0 +1,99 @@ +import { stripHeartbeatToken } from "../heartbeat.js"; +import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../tokens.js"; +import type { ReplyPayload } from "../types.js"; + +export type ReplyDispatchKind = "tool" | "block" | "final"; + +type ReplyDispatchErrorHandler = ( + err: unknown, + info: { kind: ReplyDispatchKind }, +) => void; + +type ReplyDispatchDeliverer = ( + payload: ReplyPayload, + info: { kind: ReplyDispatchKind }, +) => Promise; + +export type ReplyDispatcherOptions = { + deliver: ReplyDispatchDeliverer; + responsePrefix?: string; + onHeartbeatStrip?: () => void; + onError?: ReplyDispatchErrorHandler; +}; + +type ReplyDispatcher = { + sendToolResult: (payload: ReplyPayload) => boolean; + sendBlockReply: (payload: ReplyPayload) => boolean; + sendFinalReply: (payload: ReplyPayload) => boolean; + waitForIdle: () => Promise; + getQueuedCounts: () => Record; +}; + +function normalizeReplyPayload( + payload: ReplyPayload, + opts: Pick, +): ReplyPayload | null { + const hasMedia = Boolean( + payload.mediaUrl || (payload.mediaUrls?.length ?? 0) > 0, + ); + const trimmed = payload.text?.trim() ?? ""; + if (!trimmed && !hasMedia) return null; + + // Avoid sending the explicit silent token when no media is attached. + if (trimmed === SILENT_REPLY_TOKEN && !hasMedia) return null; + + let text = payload.text ?? undefined; + if (text && !trimmed) { + // Keep empty text when media exists so media-only replies still send. + text = ""; + } + if (text?.includes(HEARTBEAT_TOKEN)) { + const stripped = stripHeartbeatToken(text, { mode: "message" }); + if (stripped.didStrip) opts.onHeartbeatStrip?.(); + if (stripped.shouldSkip && !hasMedia) return null; + text = stripped.text; + } + + if ( + opts.responsePrefix && + text && + text.trim() !== HEARTBEAT_TOKEN && + !text.startsWith(opts.responsePrefix) + ) { + text = `${opts.responsePrefix} ${text}`; + } + + return { ...payload, text }; +} + +export function createReplyDispatcher( + options: ReplyDispatcherOptions, +): ReplyDispatcher { + let sendChain: Promise = Promise.resolve(); + // Serialize outbound replies to preserve tool/block/final order. + const queuedCounts: Record = { + tool: 0, + block: 0, + final: 0, + }; + + const enqueue = (kind: ReplyDispatchKind, payload: ReplyPayload) => { + const normalized = normalizeReplyPayload(payload, options); + if (!normalized) return false; + queuedCounts[kind] += 1; + sendChain = sendChain + .then(() => options.deliver(normalized, { kind })) + .catch((err) => { + options.onError?.(err, { kind }); + }); + return true; + }; + + return { + sendToolResult: (payload) => enqueue("tool", payload), + sendBlockReply: (payload) => enqueue("block", payload), + sendFinalReply: (payload) => enqueue("final", payload), + waitForIdle: () => sendChain, + getQueuedCounts: () => ({ ...queuedCounts }), + }; +} diff --git a/src/discord/monitor.tool-result.test.ts b/src/discord/monitor.tool-result.test.ts new file mode 100644 index 000000000..3b95db93b --- /dev/null +++ b/src/discord/monitor.tool-result.test.ts @@ -0,0 +1,151 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { monitorDiscordProvider } from "./monitor.js"; + +const sendMock = vi.fn(); +const replyMock = vi.fn(); +const updateLastRouteMock = vi.fn(); +let config: Record = {}; + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => config, + }; +}); + +vi.mock("../auto-reply/reply.js", () => ({ + getReplyFromConfig: (...args: unknown[]) => replyMock(...args), +})); + +vi.mock("./send.js", () => ({ + sendMessageDiscord: (...args: unknown[]) => sendMock(...args), +})); + +vi.mock("../config/sessions.js", () => ({ + resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), + updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + resolveSessionKey: vi.fn(), +})); + +vi.mock("discord.js", () => { + const handlers = new Map void>>(); + let lastClient: Client | null = null; + + class Client { + user = { id: "bot-id", tag: "bot#1" }; + constructor() { + lastClient = this; + } + on(event: string, handler: (...args: unknown[]) => void) { + if (!handlers.has(event)) handlers.set(event, new Set()); + handlers.get(event)?.add(handler); + } + once(event: string, handler: (...args: unknown[]) => void) { + this.on(event, handler); + } + off(event: string, handler: (...args: unknown[]) => void) { + handlers.get(event)?.delete(handler); + } + emit(event: string, ...args: unknown[]) { + for (const handler of handlers.get(event) ?? []) { + void handler(...args); + } + } + login = vi.fn().mockResolvedValue(undefined); + destroy = vi.fn().mockResolvedValue(undefined); + } + + return { + Client, + __getLastClient: () => lastClient, + Events: { + ClientReady: "ready", + Error: "error", + MessageCreate: "messageCreate", + MessageReactionAdd: "reactionAdd", + MessageReactionRemove: "reactionRemove", + }, + ChannelType: { + DM: "dm", + GroupDM: "group_dm", + GuildText: "guild_text", + }, + MessageType: { + Default: "default", + ChatInputCommand: "chat_command", + ContextMenuCommand: "context_command", + }, + GatewayIntentBits: {}, + Partials: {}, + }; +}); + +const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); + +async function waitForClient() { + const discord = (await import("discord.js")) as unknown as { + __getLastClient: () => { emit: (...args: unknown[]) => void } | null; + }; + for (let i = 0; i < 10; i += 1) { + const client = discord.__getLastClient(); + if (client) return client; + await flush(); + } + return null; +} + +beforeEach(() => { + config = { + messages: { responsePrefix: "PFX" }, + discord: { dm: { enabled: true } }, + routing: { allowFrom: [] }, + }; + sendMock.mockReset().mockResolvedValue(undefined); + replyMock.mockReset(); + updateLastRouteMock.mockReset(); +}); + +describe("monitorDiscordProvider tool results", () => { + it("sends tool summaries with responsePrefix", async () => { + replyMock.mockImplementation(async (_ctx, opts) => { + await opts?.onToolResult?.({ text: "tool update" }); + return { text: "final reply" }; + }); + + const controller = new AbortController(); + const run = monitorDiscordProvider({ + token: "token", + abortSignal: controller.signal, + }); + + const discord = await import("discord.js"); + const client = await waitForClient(); + if (!client) throw new Error("Discord client not created"); + + client.emit(discord.Events.MessageCreate, { + id: "m1", + content: "hello", + author: { id: "u1", bot: false, username: "Ada" }, + channelId: "c1", + channel: { + type: discord.ChannelType.DM, + isSendable: () => false, + }, + guild: undefined, + mentions: { has: () => false }, + attachments: { first: () => undefined }, + type: discord.MessageType.Default, + createdTimestamp: Date.now(), + }); + + await flush(); + controller.abort(); + await run; + + expect(sendMock).toHaveBeenCalledTimes(2); + expect(sendMock.mock.calls[0][1]).toBe("PFX tool update"); + expect(sendMock.mock.calls[1][1]).toBe("PFX final reply"); + }); +}); diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 4b7f6c94b..35bebbab5 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -18,6 +18,7 @@ import { import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { hasControlCommand } from "../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { @@ -532,39 +533,36 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { } let didSendReply = false; - let blockSendChain: Promise = Promise.resolve(); - const sendBlockReply = (payload: ReplyPayload) => { - if ( - !payload?.text && - !payload?.mediaUrl && - !(payload?.mediaUrls?.length ?? 0) - ) { - return; - } - blockSendChain = blockSendChain - .then(async () => { - await deliverReplies({ - replies: [payload], - target: replyTarget, - token, - runtime, - replyToMode, - textLimit, - }); - didSendReply = true; - }) - .catch((err) => { - runtime.error?.( - danger(`discord block reply failed: ${String(err)}`), - ); + const dispatcher = createReplyDispatcher({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + target: replyTarget, + token, + runtime, + replyToMode, + textLimit, }); - }; + didSendReply = true; + }, + onError: (err, info) => { + runtime.error?.( + danger(`discord ${info.kind} reply failed: ${String(err)}`), + ); + }, + }); const replyResult = await getReplyFromConfig( ctxPayload, { onReplyStart: () => sendTyping(message), - onBlockReply: sendBlockReply, + onToolResult: (payload) => { + dispatcher.sendToolResult(payload); + }, + onBlockReply: (payload) => { + dispatcher.sendBlockReply(payload); + }, }, cfg, ); @@ -573,8 +571,12 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { ? replyResult : [replyResult] : []; - await blockSendChain; - if (replies.length === 0) { + let queuedFinal = false; + for (const reply of replies) { + queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; + } + await dispatcher.waitForIdle(); + if (!queuedFinal) { if ( isGuildMessage && shouldClearHistory && @@ -585,19 +587,11 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { } return; } - - await deliverReplies({ - replies, - target: replyTarget, - token, - runtime, - replyToMode, - textLimit, - }); didSendReply = true; if (shouldLogVerbose()) { + const finalCount = dispatcher.getQueuedCounts().final; logVerbose( - `discord: delivered ${replies.length} reply${replies.length === 1 ? "" : "ies"} to ${replyTarget}`, + `discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, ); } if ( diff --git a/src/imessage/monitor.test.ts b/src/imessage/monitor.test.ts index 070afc4ca..6b9dac986 100644 --- a/src/imessage/monitor.test.ts +++ b/src/imessage/monitor.test.ts @@ -139,6 +139,41 @@ describe("monitorIMessageProvider", () => { expect(replyMock).toHaveBeenCalled(); }); + it("prefixes tool and final replies with responsePrefix", async () => { + config = { + ...config, + messages: { responsePrefix: "PFX" }, + }; + replyMock.mockImplementation(async (_ctx, opts) => { + await opts?.onToolResult?.({ text: "tool update" }); + return { text: "final reply" }; + }); + const run = monitorIMessageProvider(); + await waitForSubscribe(); + + notificationHandler?.({ + method: "message", + params: { + message: { + id: 7, + chat_id: 77, + sender: "+15550001111", + is_from_me: false, + text: "hello", + is_group: false, + }, + }, + }); + + await flush(); + closeResolve?.(); + await run; + + expect(sendMock).toHaveBeenCalledTimes(2); + expect(sendMock.mock.calls[0][1]).toBe("PFX tool update"); + expect(sendMock.mock.calls[1][1]).toBe("PFX final reply"); + }); + it("delivers group replies when mentioned", async () => { replyMock.mockResolvedValueOnce({ text: "yo" }); const run = monitorIMessageProvider(); diff --git a/src/imessage/monitor.ts b/src/imessage/monitor.ts index cd47df9b0..300d27ed2 100644 --- a/src/imessage/monitor.ts +++ b/src/imessage/monitor.ts @@ -1,6 +1,7 @@ import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { hasControlCommand } from "../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { loadConfig } from "../config/config.js"; @@ -267,36 +268,35 @@ export async function monitorIMessageProvider( ); } - let blockSendChain: Promise = Promise.resolve(); - const sendBlockReply = (payload: ReplyPayload) => { - if ( - !payload?.text && - !payload?.mediaUrl && - !(payload?.mediaUrls?.length ?? 0) - ) { - return; - } - blockSendChain = blockSendChain - .then(async () => { - await deliverReplies({ - replies: [payload], - target: ctxPayload.To, - client, - runtime, - maxBytes: mediaMaxBytes, - textLimit, - }); - }) - .catch((err) => { - runtime.error?.( - danger(`imessage block reply failed: ${String(err)}`), - ); + const dispatcher = createReplyDispatcher({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + target: ctxPayload.To, + client, + runtime, + maxBytes: mediaMaxBytes, + textLimit, }); - }; + }, + onError: (err, info) => { + runtime.error?.( + danger(`imessage ${info.kind} reply failed: ${String(err)}`), + ); + }, + }); const replyResult = await getReplyFromConfig( ctxPayload, - { onBlockReply: sendBlockReply }, + { + onToolResult: (payload) => { + dispatcher.sendToolResult(payload); + }, + onBlockReply: (payload) => { + dispatcher.sendBlockReply(payload); + }, + }, cfg, ); const replies = replyResult @@ -304,17 +304,12 @@ export async function monitorIMessageProvider( ? replyResult : [replyResult] : []; - await blockSendChain; - if (replies.length === 0) return; - - await deliverReplies({ - replies, - target: ctxPayload.To, - client, - runtime, - maxBytes: mediaMaxBytes, - textLimit, - }); + let queuedFinal = false; + for (const reply of replies) { + queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; + } + await dispatcher.waitForIdle(); + if (!queuedFinal) return; }; const client = await createIMessageRpcClient({ diff --git a/src/signal/monitor.tool-result.test.ts b/src/signal/monitor.tool-result.test.ts new file mode 100644 index 000000000..5bf354a53 --- /dev/null +++ b/src/signal/monitor.tool-result.test.ts @@ -0,0 +1,96 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { monitorSignalProvider } from "./monitor.js"; + +const sendMock = vi.fn(); +const replyMock = vi.fn(); +const updateLastRouteMock = vi.fn(); +let config: Record = {}; + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => config, + }; +}); + +vi.mock("../auto-reply/reply.js", () => ({ + getReplyFromConfig: (...args: unknown[]) => replyMock(...args), +})); + +vi.mock("./send.js", () => ({ + sendMessageSignal: (...args: unknown[]) => sendMock(...args), +})); + +vi.mock("../config/sessions.js", () => ({ + resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), + updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), +})); + +const streamMock = vi.fn(); +const signalCheckMock = vi.fn(); +const signalRpcRequestMock = vi.fn(); + +vi.mock("./client.js", () => ({ + streamSignalEvents: (...args: unknown[]) => streamMock(...args), + signalCheck: (...args: unknown[]) => signalCheckMock(...args), + signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args), +})); + +vi.mock("./daemon.js", () => ({ + spawnSignalDaemon: vi.fn(() => ({ stop: vi.fn() })), +})); + +const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); + +beforeEach(() => { + config = { + messages: { responsePrefix: "PFX" }, + signal: { autoStart: false }, + routing: { allowFrom: [] }, + }; + sendMock.mockReset().mockResolvedValue(undefined); + replyMock.mockReset(); + updateLastRouteMock.mockReset(); + streamMock.mockReset(); + signalCheckMock.mockReset().mockResolvedValue({}); + signalRpcRequestMock.mockReset().mockResolvedValue({}); +}); + +describe("monitorSignalProvider tool results", () => { + it("sends tool summaries with responsePrefix", async () => { + replyMock.mockImplementation(async (_ctx, opts) => { + await opts?.onToolResult?.({ text: "tool update" }); + return { text: "final reply" }; + }); + + streamMock.mockImplementation(async ({ onEvent }) => { + const payload = { + envelope: { + sourceNumber: "+15550001111", + sourceName: "Ada", + timestamp: 1, + dataMessage: { + message: "hello", + }, + }, + }; + await onEvent({ + event: "receive", + data: JSON.stringify(payload), + }); + }); + + await monitorSignalProvider({ + autoStart: false, + baseUrl: "http://127.0.0.1:8080", + }); + + await flush(); + + expect(sendMock).toHaveBeenCalledTimes(2); + expect(sendMock.mock.calls[0][1]).toBe("PFX tool update"); + expect(sendMock.mock.calls[1][1]).toBe("PFX final reply"); + }); +}); diff --git a/src/signal/monitor.ts b/src/signal/monitor.ts index 9513b05de..3cf92a6bd 100644 --- a/src/signal/monitor.ts +++ b/src/signal/monitor.ts @@ -1,5 +1,6 @@ import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { loadConfig } from "../config/config.js"; @@ -379,37 +380,36 @@ export async function monitorSignalProvider( ); } - let blockSendChain: Promise = Promise.resolve(); - const sendBlockReply = (payload: ReplyPayload) => { - if ( - !payload?.text && - !payload?.mediaUrl && - !(payload?.mediaUrls?.length ?? 0) - ) { - return; - } - blockSendChain = blockSendChain - .then(async () => { - await deliverReplies({ - replies: [payload], - target: ctxPayload.To, - baseUrl, - account, - runtime, - maxBytes: mediaMaxBytes, - textLimit, - }); - }) - .catch((err) => { - runtime.error?.( - danger(`signal block reply failed: ${String(err)}`), - ); + const dispatcher = createReplyDispatcher({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + target: ctxPayload.To, + baseUrl, + account, + runtime, + maxBytes: mediaMaxBytes, + textLimit, }); - }; + }, + onError: (err, info) => { + runtime.error?.( + danger(`signal ${info.kind} reply failed: ${String(err)}`), + ); + }, + }); const replyResult = await getReplyFromConfig( ctxPayload, - { onBlockReply: sendBlockReply }, + { + onToolResult: (payload) => { + dispatcher.sendToolResult(payload); + }, + onBlockReply: (payload) => { + dispatcher.sendBlockReply(payload); + }, + }, cfg, ); const replies = replyResult @@ -417,18 +417,12 @@ export async function monitorSignalProvider( ? replyResult : [replyResult] : []; - await blockSendChain; - if (replies.length === 0) return; - - await deliverReplies({ - replies, - target: ctxPayload.To, - baseUrl, - account, - runtime, - maxBytes: mediaMaxBytes, - textLimit, - }); + let queuedFinal = false; + for (const reply of replies) { + queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; + } + await dispatcher.waitForIdle(); + if (!queuedFinal) return; }; await streamSignalEvents({ diff --git a/src/slack/monitor.tool-result.test.ts b/src/slack/monitor.tool-result.test.ts new file mode 100644 index 000000000..ade080f2b --- /dev/null +++ b/src/slack/monitor.tool-result.test.ts @@ -0,0 +1,125 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { monitorSlackProvider } from "./monitor.js"; + +const sendMock = vi.fn(); +const replyMock = vi.fn(); +const updateLastRouteMock = vi.fn(); +let config: Record = {}; +const getSlackHandlers = () => + ( + globalThis as { + __slackHandlers?: Map Promise>; + } + ).__slackHandlers; + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => config, + }; +}); + +vi.mock("../auto-reply/reply.js", () => ({ + getReplyFromConfig: (...args: unknown[]) => replyMock(...args), +})); + +vi.mock("./send.js", () => ({ + sendMessageSlack: (...args: unknown[]) => sendMock(...args), +})); + +vi.mock("../config/sessions.js", () => ({ + resolveStorePath: vi.fn(() => "/tmp/clawdbot-sessions.json"), + updateLastRoute: (...args: unknown[]) => updateLastRouteMock(...args), + resolveSessionKey: vi.fn(), +})); + +vi.mock("@slack/bolt", () => { + const handlers = new Map Promise>(); + (globalThis as { __slackHandlers?: typeof handlers }).__slackHandlers = + handlers; + class App { + client = { + auth: { test: vi.fn().mockResolvedValue({ user_id: "bot-user" }) }, + conversations: { + info: vi.fn().mockResolvedValue({ + channel: { name: "dm", is_im: true }, + }), + }, + users: { + info: vi.fn().mockResolvedValue({ + user: { profile: { display_name: "Ada" } }, + }), + }, + }; + event(name: string, handler: (args: unknown) => Promise) { + handlers.set(name, handler); + } + command() { + /* no-op */ + } + start = vi.fn().mockResolvedValue(undefined); + stop = vi.fn().mockResolvedValue(undefined); + } + return { default: { App } }; +}); + +const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); + +async function waitForEvent(name: string) { + for (let i = 0; i < 10; i += 1) { + if (getSlackHandlers()?.has(name)) return; + await flush(); + } +} + +beforeEach(() => { + config = { + messages: { responsePrefix: "PFX" }, + slack: { dm: { enabled: true }, groupDm: { enabled: false } }, + routing: { allowFrom: [] }, + }; + sendMock.mockReset().mockResolvedValue(undefined); + replyMock.mockReset(); + updateLastRouteMock.mockReset(); +}); + +describe("monitorSlackProvider tool results", () => { + it("sends tool summaries with responsePrefix", async () => { + replyMock.mockImplementation(async (_ctx, opts) => { + await opts?.onToolResult?.({ text: "tool update" }); + return { text: "final reply" }; + }); + + const controller = new AbortController(); + const run = monitorSlackProvider({ + botToken: "bot-token", + appToken: "app-token", + abortSignal: controller.signal, + }); + + await waitForEvent("message"); + const handler = getSlackHandlers()?.get("message"); + if (!handler) throw new Error("Slack message handler not registered"); + + await handler({ + event: { + type: "message", + user: "U1", + text: "hello", + ts: "123", + channel: "C1", + channel_type: "im", + }, + }); + + await flush(); + controller.abort(); + await run; + + expect(sendMock).toHaveBeenCalledTimes(2); + expect(sendMock.mock.calls[0][1]).toBe("PFX tool update"); + expect(sendMock.mock.calls[1][1]).toBe("PFX final reply"); + }); +}); diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index ab8c03b48..4e26961d8 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -6,6 +6,7 @@ import bolt from "@slack/bolt"; import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { hasControlCommand } from "../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; @@ -699,34 +700,33 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { ); } - let blockSendChain: Promise = Promise.resolve(); - const sendBlockReply = (payload: ReplyPayload) => { - if ( - !payload?.text && - !payload?.mediaUrl && - !(payload?.mediaUrls?.length ?? 0) - ) { - return; - } - blockSendChain = blockSendChain - .then(async () => { - await deliverReplies({ - replies: [payload], - target: replyTarget, - token: botToken, - runtime, - textLimit, - }); - }) - .catch((err) => { - runtime.error?.(danger(`slack block reply failed: ${String(err)}`)); + const dispatcher = createReplyDispatcher({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + target: replyTarget, + token: botToken, + runtime, + textLimit, }); - }; + }, + onError: (err, info) => { + runtime.error?.( + danger(`slack ${info.kind} reply failed: ${String(err)}`), + ); + }, + }); const replyResult = await getReplyFromConfig( ctxPayload, { - onBlockReply: sendBlockReply, + onToolResult: (payload) => { + dispatcher.sendToolResult(payload); + }, + onBlockReply: (payload) => { + dispatcher.sendBlockReply(payload); + }, }, cfg, ); @@ -735,19 +735,16 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { ? replyResult : [replyResult] : []; - await blockSendChain; - if (replies.length === 0) return; - - await deliverReplies({ - replies, - target: replyTarget, - token: botToken, - runtime, - textLimit, - }); + let queuedFinal = false; + for (const reply of replies) { + queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; + } + await dispatcher.waitForIdle(); + if (!queuedFinal) return; if (shouldLogVerbose()) { + const finalCount = dispatcher.getQueuedCounts().final; logVerbose( - `slack: delivered ${replies.length} reply${replies.length === 1 ? "" : "ies"} to ${replyTarget}`, + `slack: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, ); } }; diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index 364c90188..26f82f659 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -228,6 +228,38 @@ describe("createTelegramBot", () => { } }); + it("prefixes tool and final replies with responsePrefix", async () => { + onSpy.mockReset(); + sendMessageSpy.mockReset(); + const replySpy = replyModule.__replySpy as unknown as ReturnType< + typeof vi.fn + >; + replySpy.mockReset(); + replySpy.mockImplementation(async (_ctx, opts) => { + await opts?.onToolResult?.({ text: "tool result" }); + return { text: "final reply" }; + }); + loadConfig.mockReturnValue({ messages: { responsePrefix: "PFX" } }); + + createTelegramBot({ token: "tok" }); + const handler = onSpy.mock.calls[0][1] as ( + ctx: Record, + ) => Promise; + await handler({ + message: { + chat: { id: 5, type: "private" }, + text: "hi", + date: 1736380800, + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({ download: async () => new Uint8Array() }), + }); + + expect(sendMessageSpy).toHaveBeenCalledTimes(2); + expect(sendMessageSpy.mock.calls[0][1]).toBe("PFX tool result"); + expect(sendMessageSpy.mock.calls[1][1]).toBe("PFX final reply"); + }); + it("honors replyToMode=all for threaded replies", async () => { onSpy.mockReset(); sendMessageSpy.mockReset(); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index cfe76d0bc..ff7722c17 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -7,6 +7,7 @@ import { Bot, InputFile, webhookCallback } from "grammy"; import { chunkText, resolveTextChunkLimit } from "../auto-reply/chunk.js"; import { hasControlCommand } from "../auto-reply/command-detection.js"; import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { ReplyToMode } from "../config/config.js"; @@ -228,37 +229,33 @@ export function createTelegramBot(opts: TelegramBotOptions) { ); } - let blockSendChain: Promise = Promise.resolve(); - const sendBlockReply = (payload: ReplyPayload) => { - if ( - !payload?.text && - !payload?.mediaUrl && - !(payload?.mediaUrls?.length ?? 0) - ) { - return; - } - blockSendChain = blockSendChain - .then(async () => { - await deliverReplies({ - replies: [payload], - chatId: String(chatId), - token: opts.token, - runtime, - bot, - replyToMode, - textLimit, - }); - }) - .catch((err) => { - runtime.error?.( - danger(`telegram block reply failed: ${String(err)}`), - ); + const dispatcher = createReplyDispatcher({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + await deliverReplies({ + replies: [payload], + chatId: String(chatId), + token: opts.token, + runtime, + bot, + replyToMode, + textLimit, }); - }; + }, + onError: (err, info) => { + runtime.error?.( + danger(`telegram ${info.kind} reply failed: ${String(err)}`), + ); + }, + }); const replyResult = await getReplyFromConfig( ctxPayload, - { onReplyStart: sendTyping, onBlockReply: sendBlockReply }, + { + onReplyStart: sendTyping, + onToolResult: dispatcher.sendToolResult, + onBlockReply: dispatcher.sendBlockReply, + }, cfg, ); const replies = replyResult @@ -266,18 +263,12 @@ export function createTelegramBot(opts: TelegramBotOptions) { ? replyResult : [replyResult] : []; - await blockSendChain; - if (replies.length === 0) return; - - await deliverReplies({ - replies, - chatId: String(chatId), - token: opts.token, - runtime, - bot, - replyToMode, - textLimit, - }); + let queuedFinal = false; + for (const reply of replies) { + queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; + } + await dispatcher.waitForIdle(); + if (!queuedFinal) return; } catch (err) { runtime.error?.(danger(`handler failed: ${String(err)}`)); } diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index bbca52d41..4600d6682 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -8,6 +8,7 @@ import { HEARTBEAT_PROMPT, stripHeartbeatToken, } from "../auto-reply/heartbeat.js"; +import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; @@ -222,14 +223,6 @@ function debugMention( export { stripHeartbeatToken }; -function isSilentReply(payload?: ReplyPayload): boolean { - if (!payload) return false; - const text = payload.text?.trim(); - if (!text || text !== SILENT_REPLY_TOKEN) return false; - if (payload.mediaUrl || payload.mediaUrls?.length) return false; - return true; -} - function resolveHeartbeatReplyPayload( replyResult: ReplyPayload | ReplyPayload[] | undefined, ): ReplyPayload | undefined { @@ -952,6 +945,25 @@ export async function monitorWebProvider( // Track recently sent messages to prevent echo loops const recentlySent = new Set(); const MAX_RECENT_MESSAGES = 100; + const rememberSentText = ( + text: string | undefined, + opts: { combinedBody: string; logVerboseMessage?: boolean }, + ) => { + if (!text) return; + recentlySent.add(text); + if (opts.combinedBody) { + recentlySent.add(opts.combinedBody); + } + if (opts.logVerboseMessage) { + logVerbose( + `Added to echo detection set (size now: ${recentlySent.size}): ${text.substring(0, 50)}...`, + ); + } + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } + }; while (true) { if (stopRequested()) break; @@ -1103,114 +1115,71 @@ export async function monitorWebProvider( } } - const responsePrefix = cfg.messages?.responsePrefix; const textLimit = resolveTextChunkLimit(cfg, "whatsapp"); let didLogHeartbeatStrip = false; let didSendReply = false; - let toolSendChain: Promise = Promise.resolve(); - const sendToolResult = (payload: ReplyPayload) => { - if ( - !payload?.text && - !payload?.mediaUrl && - !(payload?.mediaUrls?.length ?? 0) - ) { - return; - } - if (isSilentReply(payload)) return; - const toolPayload: ReplyPayload = { ...payload }; - if (toolPayload.text?.includes(HEARTBEAT_TOKEN)) { - const stripped = stripHeartbeatToken(toolPayload.text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { + const dispatcher = createReplyDispatcher({ + responsePrefix: cfg.messages?.responsePrefix, + onHeartbeatStrip: () => { + if (!didLogHeartbeatStrip) { didLogHeartbeatStrip = true; logVerbose("Stripped stray HEARTBEAT_OK token from web reply"); } - const hasMedia = Boolean( - toolPayload.mediaUrl || (toolPayload.mediaUrls?.length ?? 0) > 0, + }, + deliver: async (payload, info) => { + await deliverWebReply({ + replyResult: payload, + msg, + maxMediaBytes, + textLimit, + replyLogger, + connectionId, + // Tool + block updates are noisy; skip their log lines. + skipLog: info.kind !== "final", + }); + didSendReply = true; + if (info.kind === "tool") { + rememberSentText(payload.text, { combinedBody: "" }); + return; + } + const shouldLog = + info.kind === "final" && payload.text ? true : undefined; + rememberSentText(payload.text, { + combinedBody, + logVerboseMessage: shouldLog, + }); + if (info.kind === "final") { + const fromDisplay = + msg.chatType === "group" + ? conversationId + : (msg.from ?? "unknown"); + const hasMedia = Boolean( + payload.mediaUrl || payload.mediaUrls?.length, + ); + whatsappOutboundLog.info( + `Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`, + ); + if (shouldLogVerbose()) { + const preview = + payload.text != null ? elide(payload.text, 400) : ""; + whatsappOutboundLog.debug( + `Reply body: ${preview}${hasMedia ? " (media)" : ""}`, + ); + } + } + }, + onError: (err, info) => { + const label = + info.kind === "tool" + ? "tool update" + : info.kind === "block" + ? "block update" + : "auto-reply"; + whatsappOutboundLog.error( + `Failed sending web ${label} to ${msg.from ?? conversationId}: ${formatError(err)}`, ); - if (stripped.shouldSkip && !hasMedia) return; - toolPayload.text = stripped.text; - } - if ( - responsePrefix && - toolPayload.text && - toolPayload.text.trim() !== HEARTBEAT_TOKEN && - !toolPayload.text.startsWith(responsePrefix) - ) { - toolPayload.text = `${responsePrefix} ${toolPayload.text}`; - } - toolSendChain = toolSendChain - .then(async () => { - await deliverWebReply({ - replyResult: toolPayload, - msg, - maxMediaBytes, - textLimit, - replyLogger, - connectionId, - skipLog: true, - }); - didSendReply = true; - if (toolPayload.text) { - recentlySent.add(toolPayload.text); - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); - } - } - }) - .catch((err) => { - whatsappOutboundLog.error( - `Failed sending web tool update to ${msg.from ?? conversationId}: ${formatError(err)}`, - ); - }); - }; - const sendBlockReply = (payload: ReplyPayload) => { - if ( - !payload?.text && - !payload?.mediaUrl && - !(payload?.mediaUrls?.length ?? 0) - ) { - return; - } - if (isSilentReply(payload)) return; - const blockPayload: ReplyPayload = { ...payload }; - if ( - responsePrefix && - blockPayload.text && - blockPayload.text.trim() !== HEARTBEAT_TOKEN && - !blockPayload.text.startsWith(responsePrefix) - ) { - blockPayload.text = `${responsePrefix} ${blockPayload.text}`; - } - toolSendChain = toolSendChain - .then(async () => { - await deliverWebReply({ - replyResult: blockPayload, - msg, - maxMediaBytes, - textLimit, - replyLogger, - connectionId, - skipLog: true, - }); - didSendReply = true; - if (blockPayload.text) { - recentlySent.add(blockPayload.text); - recentlySent.add(combinedBody); - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); - } - } - }) - .catch((err) => { - whatsappOutboundLog.error( - `Failed sending web block update to ${msg.from ?? conversationId}: ${formatError(err)}`, - ); - }); - }; + }, + }); const replyResult = await (replyResolver ?? getReplyFromConfig)( { @@ -1238,8 +1207,12 @@ export async function monitorWebProvider( }, { onReplyStart: msg.sendComposing, - onToolResult: sendToolResult, - onBlockReply: sendBlockReply, + onToolResult: (payload) => { + dispatcher.sendToolResult(payload); + }, + onBlockReply: (payload) => { + dispatcher.sendBlockReply(payload); + }, }, ); @@ -1249,12 +1222,12 @@ export async function monitorWebProvider( : [replyResult] : []; - const sendableReplies = replyList.filter( - (payload) => !isSilentReply(payload), - ); - - if (sendableReplies.length === 0) { - await toolSendChain; + let queuedFinal = false; + for (const replyPayload of replyList) { + queuedFinal = dispatcher.sendFinalReply(replyPayload) || queuedFinal; + } + await dispatcher.waitForIdle(); + if (!queuedFinal) { if (shouldClearGroupHistory && didSendReply) { groupHistories.set(conversationId, []); } @@ -1264,79 +1237,6 @@ export async function monitorWebProvider( return; } - await toolSendChain; - - for (const replyPayload of sendableReplies) { - if (replyPayload.text?.includes(HEARTBEAT_TOKEN)) { - const stripped = stripHeartbeatToken(replyPayload.text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose("Stripped stray HEARTBEAT_OK token from web reply"); - } - const hasMedia = Boolean( - replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0, - ); - if (stripped.shouldSkip && !hasMedia) continue; - replyPayload.text = stripped.text; - } - if ( - responsePrefix && - replyPayload.text && - replyPayload.text.trim() !== HEARTBEAT_TOKEN && - !replyPayload.text.startsWith(responsePrefix) - ) { - replyPayload.text = `${responsePrefix} ${replyPayload.text}`; - } - - try { - await deliverWebReply({ - replyResult: replyPayload, - msg, - maxMediaBytes, - textLimit, - replyLogger, - connectionId, - }); - didSendReply = true; - - if (replyPayload.text) { - recentlySent.add(replyPayload.text); - recentlySent.add(combinedBody); // Prevent echo on the combined text itself - logVerbose( - `Added to echo detection set (size now: ${recentlySent.size}): ${replyPayload.text.substring(0, 50)}...`, - ); - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); - } - } - - const fromDisplay = - msg.chatType === "group" ? conversationId : (msg.from ?? "unknown"); - const hasMedia = Boolean( - replyPayload.mediaUrl || replyPayload.mediaUrls?.length, - ); - whatsappOutboundLog.info( - `Auto-replied to ${fromDisplay}${hasMedia ? " (media)" : ""}`, - ); - if (shouldLogVerbose()) { - const preview = - replyPayload.text != null - ? elide(replyPayload.text, 400) - : ""; - whatsappOutboundLog.debug( - `Reply body: ${preview}${hasMedia ? " (media)" : ""}`, - ); - } - } catch (err) { - whatsappOutboundLog.error( - `Failed sending web auto-reply to ${msg.from ?? conversationId}: ${formatError(err)}`, - ); - } - } - if (shouldClearGroupHistory && didSendReply) { groupHistories.set(conversationId, []); }