From fd15704c775eb503f562c533efedfb5bce312dc3 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 18:19:55 +0000 Subject: [PATCH] fix(auto-reply): coalesce block replies and document streaming toggles (#536) (thanks @mcinteerj) --- AGENTS.md | 2 +- CHANGELOG.md | 1 + docs/concepts/agent.md | 2 + docs/concepts/streaming.md | 16 ++ docs/gateway/configuration-examples.md | 3 + docs/gateway/configuration.md | 7 + src/agents/pi-embedded-subscribe.ts | 9 + src/auto-reply/reply.block-streaming.test.ts | 2 +- src/auto-reply/reply.ts | 6 +- .../agent-runner.block-streaming.test.ts | 132 +++++++++++++ src/auto-reply/reply/agent-runner.ts | 122 +++--------- .../reply/block-reply-coalescer.test.ts | 71 +++++++ src/auto-reply/reply/block-reply-coalescer.ts | 125 +++++++++++++ src/auto-reply/reply/block-reply-pipeline.ts | 173 ++++++++++++++++++ src/auto-reply/reply/block-streaming.ts | 98 +++++++++- src/config/types.ts | 27 +++ src/config/zod-schema.ts | 15 ++ src/telegram/bot.ts | 2 +- 18 files changed, 714 insertions(+), 99 deletions(-) create mode 100644 src/auto-reply/reply/agent-runner.block-streaming.test.ts create mode 100644 src/auto-reply/reply/block-reply-coalescer.test.ts create mode 100644 src/auto-reply/reply/block-reply-coalescer.ts create mode 100644 src/auto-reply/reply/block-reply-pipeline.ts diff --git a/AGENTS.md b/AGENTS.md index 3b66890c7..c09b361e3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -87,7 +87,7 @@ - **Multi-agent safety:** focus reports on your edits; avoid guard-rail disclaimers unless truly blocked; when multiple agents touch the same file, continue if safe; end with a brief “other files present” note only if relevant. - Bug investigations: read source code of relevant npm dependencies and all related local code before concluding; aim for high-confidence root cause. - Code style: add brief comments for tricky logic; keep files under ~500 LOC when feasible (split/refactor as needed). -- When asked to open a “session” file, open the Pi session logs under `~/.clawdbot/sessions/*.jsonl` (newest unless a specific ID is given), not the default `sessions.json`. If logs are needed from another machine, SSH via Tailscale and read the same path there. +- When asked to open a “session” file, open the Pi session logs under `~/.clawdbot/agents/main/sessions/*.jsonl` (newest unless a specific ID is given), not the default `sessions.json`. If logs are needed from another machine, SSH via Tailscale and read the same path there. - Menubar dimming + restart flow mirrors Trimmy: use `scripts/restart-mac.sh` (kills all Clawdbot variants, runs `swift build`, packages, relaunches). Icon dimming depends on MenuBarExtraAccess wiring in AppMain; keep `appearsDisabled` updates intact when touching the status item. - Do not rebuild the macOS app over SSH; rebuilds must be run directly on the Mac. - Never send streaming/partial replies to external messaging surfaces (WhatsApp, Telegram); only final replies should be delivered there. Streaming/tool events may still go to internal UIs/control channel. diff --git a/CHANGELOG.md b/CHANGELOG.md index d094ef960..90128e027 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ - Auto-reply: avoid splitting outbound chunks inside parentheses. (#499) — thanks @philipp-spiess - Auto-reply: preserve spacing when stripping inline directives. (#539) — thanks @joshp123 - Auto-reply: relax reply tag parsing to allow whitespace. 
(#560) — thanks @mcinteerj +- Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj - Auto-reply: fix /status usage summary filtering for the active provider. - Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj - Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth). diff --git a/docs/concepts/agent.md b/docs/concepts/agent.md index 13dcd75bc..33774740d 100644 --- a/docs/concepts/agent.md +++ b/docs/concepts/agent.md @@ -88,6 +88,8 @@ via `agents.defaults.blockStreamingDefault: "off"` if you only want the final re Tune the boundary via `agents.defaults.blockStreamingBreak` (`text_end` vs `message_end`; defaults to text_end). Control soft block chunking with `agents.defaults.blockStreamingChunk` (defaults to 800–1200 chars; prefers paragraph breaks, then newlines; sentences last). +Coalesce streamed chunks with `agents.defaults.blockStreamingCoalesce` to reduce +single-line spam (idle-based merging before send). Verbose tool summaries are emitted at tool start (no debounce); Control UI streams tool output via agent events when available. More details: [Streaming + chunking](/concepts/streaming). diff --git a/docs/concepts/streaming.md b/docs/concepts/streaming.md index 9d8f9e6c6..ff6a385c2 100644 --- a/docs/concepts/streaming.md +++ b/docs/concepts/streaming.md @@ -33,8 +33,10 @@ Legend: **Controls:** - `agents.defaults.blockStreamingDefault`: `"on"`/`"off"` (default on). +- Provider overrides: `*.blockStreaming` (and per-account variants) to force `"on"`/`"off"` per provider. - `agents.defaults.blockStreamingBreak`: `"text_end"` or `"message_end"`. - `agents.defaults.blockStreamingChunk`: `{ minChars, maxChars, breakPreference? }`. +- `agents.defaults.blockStreamingCoalesce`: `{ minChars?, maxChars?, idleMs? }` (merge streamed blocks before send). - Provider hard cap: `*.textChunkLimit` (e.g., `whatsapp.textChunkLimit`). - Discord soft cap: `discord.maxLinesPerMessage` (default 17) splits tall replies to avoid UI clipping. @@ -54,6 +56,20 @@ Block chunking is implemented by `EmbeddedBlockChunker`: `maxChars` is clamped to the provider `textChunkLimit`, so you can’t exceed per-provider caps. +## Coalescing (merge streamed blocks) + +When block streaming is enabled, Clawdbot can **merge consecutive block chunks** +before sending them out. This reduces “single-line spam” while still providing +progressive output. + +- Coalescing waits for **idle gaps** (`idleMs`) before flushing. +- Buffers are capped by `maxChars` and will flush if they exceed it. +- `minChars` prevents tiny fragments from sending until enough text accumulates + (final flush always sends remaining text). +- Joiner is derived from `blockStreamingChunk.breakPreference` + (`paragraph` → `\n\n`, `newline` → `\n`, `sentence` → space). +- Provider overrides are available via `*.blockStreamingCoalesce` (including per-account configs). + ## “Stream chunks or everything” This maps to: diff --git a/docs/gateway/configuration-examples.md b/docs/gateway/configuration-examples.md index 3bb47b681..e0ce44079 100644 --- a/docs/gateway/configuration-examples.md +++ b/docs/gateway/configuration-examples.md @@ -215,6 +215,9 @@ Save to `~/.clawdbot/clawdbot.json` and you can DM the bot from that number. 
maxChars: 1200, breakPreference: "paragraph" }, + blockStreamingCoalesce: { + idleMs: 400 + }, timeoutSeconds: 600, mediaMaxMb: 5, typingIntervalSeconds: 5, diff --git a/docs/gateway/configuration.md b/docs/gateway/configuration.md index 78ff0ea10..f3f5e8ee5 100644 --- a/docs/gateway/configuration.md +++ b/docs/gateway/configuration.md @@ -1142,6 +1142,7 @@ See [/concepts/session-pruning](/concepts/session-pruning) for behavior details. Block streaming: - `agents.defaults.blockStreamingDefault`: `"on"`/`"off"` (default on). +- Provider overrides: `*.blockStreaming` (and per-account variants) to force block streaming on/off. - `agents.defaults.blockStreamingBreak`: `"text_end"` or `"message_end"` (default: text_end). - `agents.defaults.blockStreamingChunk`: soft chunking for streamed blocks. Defaults to 800–1200 chars, prefers paragraph breaks (`\n\n`), then newlines, then sentences. @@ -1151,6 +1152,12 @@ Block streaming: agents: { defaults: { blockStreamingChunk: { minChars: 800, maxChars: 1200 } } } } ``` +- `agents.defaults.blockStreamingCoalesce`: merge streamed blocks before sending. + Defaults to `{ idleMs: 400 }` and inherits `minChars` from `blockStreamingChunk` + with `maxChars` capped to the provider text limit. + Provider overrides: `whatsapp.blockStreamingCoalesce`, `telegram.blockStreamingCoalesce`, + `discord.blockStreamingCoalesce`, `slack.blockStreamingCoalesce`, `signal.blockStreamingCoalesce`, + `imessage.blockStreamingCoalesce`, `msteams.blockStreamingCoalesce` (and per-account variants). See [/concepts/streaming](/concepts/streaming) for behavior + chunking details. Typing indicators: diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 3f57c0288..62be6d416 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -1045,6 +1045,15 @@ export function subscribeEmbeddedPiSession(params: { stream: "lifecycle", data: { phase: "end" }, }); + if (params.onBlockReply) { + if (blockChunker?.hasBuffered()) { + blockChunker.drain({ force: true, emit: emitBlockChunk }); + blockChunker.reset(); + } else if (blockBuffer.length > 0) { + emitBlockChunk(blockBuffer); + blockBuffer = ""; + } + } if (pendingCompactionRetry > 0) { resolveCompactionRetry(); } else { diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index fd6aded66..1aa142470 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -149,7 +149,7 @@ describe("block streaming", () => { const res = await replyPromise; expect(res).toBeUndefined(); - expect(seen).toEqual(["first", "second"]); + expect(seen).toEqual(["first\n\nsecond"]); }); }); diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 4fb4491e8..a8e7de8fa 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -454,7 +454,11 @@ export async function getReplyFromConfig( const blockStreamingEnabled = resolvedBlockStreaming === "on" && opts?.disableBlockStreaming !== true; const blockReplyChunking = blockStreamingEnabled - ? resolveBlockStreamingChunking(cfg, sessionCtx.Provider) + ? 
resolveBlockStreamingChunking( + cfg, + sessionCtx.Provider, + sessionCtx.AccountId, + ) : undefined; const modelState = await createModelSelectionState({ diff --git a/src/auto-reply/reply/agent-runner.block-streaming.test.ts b/src/auto-reply/reply/agent-runner.block-streaming.test.ts new file mode 100644 index 000000000..91c665682 --- /dev/null +++ b/src/auto-reply/reply/agent-runner.block-streaming.test.ts @@ -0,0 +1,132 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { TemplateContext } from "../templating.js"; +import type { FollowupRun, QueueSettings } from "./queue.js"; +import { createMockTypingController } from "./test-helpers.js"; + +const runEmbeddedPiAgentMock = vi.fn(); + +vi.mock("../../agents/model-fallback.js", () => ({ + runWithModelFallback: async ({ + provider, + model, + run, + }: { + provider: string; + model: string; + run: (provider: string, model: string) => Promise; + }) => ({ + result: await run(provider, model), + provider, + model, + }), +})); + +vi.mock("../../agents/pi-embedded.js", () => ({ + queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), +})); + +vi.mock("./queue.js", async () => { + const actual = + await vi.importActual("./queue.js"); + return { + ...actual, + enqueueFollowupRun: vi.fn(), + scheduleFollowupDrain: vi.fn(), + }; +}); + +import { runReplyAgent } from "./agent-runner.js"; + +describe("runReplyAgent block streaming", () => { + it("coalesces duplicate text_end block replies", async () => { + const onBlockReply = vi.fn(); + runEmbeddedPiAgentMock.mockImplementationOnce(async (params) => { + const block = params.onBlockReply as + | ((payload: { text?: string }) => void) + | undefined; + block?.({ text: "Hello" }); + block?.({ text: "Hello" }); + return { + payloads: [{ text: "Final message" }], + meta: {}, + }; + }); + + const typing = createMockTypingController(); + const sessionCtx = { + Provider: "discord", + OriginatingTo: "channel:C1", + AccountId: "primary", + MessageSid: "msg", + } as unknown as TemplateContext; + const resolvedQueue = { mode: "interrupt" } as unknown as QueueSettings; + const followupRun = { + prompt: "hello", + summaryLine: "hello", + enqueuedAt: Date.now(), + run: { + sessionId: "session", + sessionKey: "main", + messageProvider: "discord", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + config: { + agents: { + defaults: { + blockStreamingCoalesce: { + minChars: 1, + maxChars: 200, + idleMs: 0, + }, + }, + }, + }, + skillsSnapshot: {}, + provider: "anthropic", + model: "claude", + thinkLevel: "low", + verboseLevel: "off", + elevatedLevel: "off", + bashElevated: { + enabled: false, + allowed: false, + defaultLevel: "off", + }, + timeoutMs: 1_000, + blockReplyBreak: "text_end", + }, + } as unknown as FollowupRun; + + const result = await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + opts: { onBlockReply }, + typing, + sessionCtx, + defaultModel: "anthropic/claude-opus-4-5", + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: true, + blockReplyChunking: { + minChars: 1, + maxChars: 200, + breakPreference: "paragraph", + }, + resolvedBlockStreamingBreak: "text_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0][0].text).toBe("Hello"); + 
expect(result).toBeUndefined(); + }); +}); diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 8d209d413..137720b14 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -35,6 +35,8 @@ import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { extractAudioTag } from "./audio-tags.js"; +import { createBlockReplyPipeline } from "./block-reply-pipeline.js"; +import { resolveBlockStreamingCoalescing } from "./block-streaming.js"; import { createFollowupRunner } from "./followup-runner.js"; import { enqueueFollowupRun, @@ -132,23 +134,6 @@ const appendUsageLine = ( return updated; }; -const withTimeout = async ( - promise: Promise, - timeoutMs: number, - timeoutError: Error, -): Promise => { - if (!timeoutMs || timeoutMs <= 0) return promise; - let timer: NodeJS.Timeout | undefined; - const timeoutPromise = new Promise((_, reject) => { - timer = setTimeout(() => reject(timeoutError), timeoutMs); - }); - try { - return await Promise.race([promise, timeoutPromise]); - } finally { - if (timer) clearTimeout(timer); - } -}; - export async function runReplyAgent(params: { commandBody: string; followupRun: FollowupRun; @@ -228,29 +213,9 @@ export async function runReplyAgent(params: { return resolvedVerboseLevel === "on"; }; - const streamedPayloadKeys = new Set(); - const pendingStreamedPayloadKeys = new Set(); - const pendingBlockTasks = new Set>(); const pendingToolTasks = new Set>(); - let blockReplyChain: Promise = Promise.resolve(); - let blockReplyAborted = false; - let didLogBlockReplyAbort = false; - let didStreamBlockReply = false; const blockReplyTimeoutMs = opts?.blockReplyTimeoutMs ?? BLOCK_REPLY_SEND_TIMEOUT_MS; - const buildPayloadKey = (payload: ReplyPayload) => { - const text = payload.text?.trim() ?? ""; - const mediaList = payload.mediaUrls?.length - ? payload.mediaUrls - : payload.mediaUrl - ? [payload.mediaUrl] - : []; - return JSON.stringify({ - text, - mediaList, - replyToId: payload.replyToId ?? null, - }); - }; const replyToChannel = sessionCtx.OriginatingChannel ?? ((sessionCtx.Surface ?? sessionCtx.Provider)?.toLowerCase() as @@ -265,6 +230,23 @@ export async function runReplyAgent(params: { replyToChannel, ); const cfg = followupRun.run.config; + const blockReplyCoalescing = + blockStreamingEnabled && opts?.onBlockReply + ? resolveBlockStreamingCoalescing( + cfg, + sessionCtx.Provider, + sessionCtx.AccountId, + blockReplyChunking, + ) + : undefined; + const blockReplyPipeline = + blockStreamingEnabled && opts?.onBlockReply + ? 
createBlockReplyPipeline({ + onBlockReply: opts.onBlockReply, + timeoutMs: blockReplyTimeoutMs, + coalescing: blockReplyCoalescing, + }) + : null; if (shouldSteer && isStreaming) { const steered = queueEmbeddedPiMessage( @@ -511,15 +493,6 @@ export async function runReplyAgent(params: { text: cleaned, audioAsVoice: audioTagResult.audioAsVoice, }); - const payloadKey = buildPayloadKey(blockPayload); - if ( - streamedPayloadKeys.has(payloadKey) || - pendingStreamedPayloadKeys.has(payloadKey) - ) { - return; - } - if (blockReplyAborted) return; - pendingStreamedPayloadKeys.add(payloadKey); void typingSignals .signalTextDelta(taggedPayload.text) .catch((err) => { @@ -527,50 +500,7 @@ export async function runReplyAgent(params: { `block reply typing signal failed: ${String(err)}`, ); }); - const timeoutError = new Error( - `block reply delivery timed out after ${blockReplyTimeoutMs}ms`, - ); - const abortController = new AbortController(); - blockReplyChain = blockReplyChain - .then(async () => { - if (blockReplyAborted) return false; - await withTimeout( - opts.onBlockReply?.(blockPayload, { - abortSignal: abortController.signal, - timeoutMs: blockReplyTimeoutMs, - }) ?? Promise.resolve(), - blockReplyTimeoutMs, - timeoutError, - ); - return true; - }) - .then((didSend) => { - if (!didSend) return; - streamedPayloadKeys.add(payloadKey); - didStreamBlockReply = true; - }) - .catch((err) => { - if (err === timeoutError) { - abortController.abort(); - blockReplyAborted = true; - if (!didLogBlockReplyAbort) { - didLogBlockReplyAbort = true; - logVerbose( - `block reply delivery timed out after ${blockReplyTimeoutMs}ms; skipping remaining block replies to preserve ordering`, - ); - } - return; - } - logVerbose( - `block reply delivery failed: ${String(err)}`, - ); - }) - .finally(() => { - pendingStreamedPayloadKeys.delete(payloadKey); - }); - const task = blockReplyChain; - pendingBlockTasks.add(task); - void task.finally(() => pendingBlockTasks.delete(task)); + blockReplyPipeline?.enqueue(blockPayload); } : undefined, shouldEmitToolResult, @@ -684,8 +614,9 @@ export async function runReplyAgent(params: { } const payloadArray = runResult.payloads ?? []; - if (pendingBlockTasks.size > 0) { - await Promise.allSettled(pendingBlockTasks); + if (blockReplyPipeline) { + await blockReplyPipeline.flush({ force: true }); + blockReplyPipeline.stop(); } if (pendingToolTasks.size > 0) { await Promise.allSettled(pendingToolTasks); @@ -736,7 +667,9 @@ export async function runReplyAgent(params: { // Drop final payloads only when block streaming succeeded end-to-end. // If streaming aborted (e.g., timeout), fall back to final payloads. const shouldDropFinalPayloads = - blockStreamingEnabled && didStreamBlockReply && !blockReplyAborted; + blockStreamingEnabled && + Boolean(blockReplyPipeline?.didStream()) && + !blockReplyPipeline?.isAborted(); const messagingToolSentTexts = runResult.messagingToolSentTexts ?? []; const messagingToolSentTargets = runResult.messagingToolSentTargets ?? []; const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ @@ -753,7 +686,7 @@ export async function runReplyAgent(params: { ? [] : blockStreamingEnabled ? dedupedPayloads.filter( - (payload) => !streamedPayloadKeys.has(buildPayloadKey(payload)), + (payload) => !blockReplyPipeline?.hasSentPayload(payload), ) : dedupedPayloads; const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads; @@ -886,6 +819,7 @@ export async function runReplyAgent(params: { finalPayloads.length === 1 ? 
finalPayloads[0] : finalPayloads, ); } finally { + blockReplyPipeline?.stop(); typing.markRunComplete(); } } diff --git a/src/auto-reply/reply/block-reply-coalescer.test.ts b/src/auto-reply/reply/block-reply-coalescer.test.ts new file mode 100644 index 000000000..06f7e42cc --- /dev/null +++ b/src/auto-reply/reply/block-reply-coalescer.test.ts @@ -0,0 +1,71 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createBlockReplyCoalescer } from "./block-reply-coalescer.js"; + +describe("block reply coalescer", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("coalesces chunks within the idle window", async () => { + vi.useFakeTimers(); + const flushes: string[] = []; + const coalescer = createBlockReplyCoalescer({ + config: { minChars: 1, maxChars: 200, idleMs: 100, joiner: " " }, + shouldAbort: () => false, + onFlush: (payload) => { + flushes.push(payload.text ?? ""); + }, + }); + + coalescer.enqueue({ text: "Hello" }); + coalescer.enqueue({ text: "world" }); + + await vi.advanceTimersByTimeAsync(100); + expect(flushes).toEqual(["Hello world"]); + coalescer.stop(); + }); + + it("waits until minChars before idle flush", async () => { + vi.useFakeTimers(); + const flushes: string[] = []; + const coalescer = createBlockReplyCoalescer({ + config: { minChars: 10, maxChars: 200, idleMs: 50, joiner: " " }, + shouldAbort: () => false, + onFlush: (payload) => { + flushes.push(payload.text ?? ""); + }, + }); + + coalescer.enqueue({ text: "short" }); + await vi.advanceTimersByTimeAsync(50); + expect(flushes).toEqual([]); + + coalescer.enqueue({ text: "message" }); + await vi.advanceTimersByTimeAsync(50); + expect(flushes).toEqual(["short message"]); + coalescer.stop(); + }); + + it("flushes buffered text before media payloads", () => { + const flushes: Array<{ text?: string; mediaUrls?: string[] }> = []; + const coalescer = createBlockReplyCoalescer({ + config: { minChars: 1, maxChars: 200, idleMs: 0, joiner: " " }, + shouldAbort: () => false, + onFlush: (payload) => { + flushes.push({ + text: payload.text, + mediaUrls: payload.mediaUrls, + }); + }, + }); + + coalescer.enqueue({ text: "Hello" }); + coalescer.enqueue({ text: "world" }); + coalescer.enqueue({ mediaUrls: ["https://example.com/a.png"] }); + void coalescer.flush({ force: true }); + + expect(flushes[0].text).toBe("Hello world"); + expect(flushes[1].mediaUrls).toEqual(["https://example.com/a.png"]); + coalescer.stop(); + }); +}); diff --git a/src/auto-reply/reply/block-reply-coalescer.ts b/src/auto-reply/reply/block-reply-coalescer.ts new file mode 100644 index 000000000..97535e942 --- /dev/null +++ b/src/auto-reply/reply/block-reply-coalescer.ts @@ -0,0 +1,125 @@ +import type { ReplyPayload } from "../types.js"; +import type { BlockStreamingCoalescing } from "./block-streaming.js"; + +export type BlockReplyCoalescer = { + enqueue: (payload: ReplyPayload) => void; + flush: (options?: { force?: boolean }) => Promise; + hasBuffered: () => boolean; + stop: () => void; +}; + +export function createBlockReplyCoalescer(params: { + config: BlockStreamingCoalescing; + shouldAbort: () => boolean; + onFlush: (payload: ReplyPayload) => Promise | void; +}): BlockReplyCoalescer { + const { config, shouldAbort, onFlush } = params; + const minChars = Math.max(1, Math.floor(config.minChars)); + const maxChars = Math.max(minChars, Math.floor(config.maxChars)); + const idleMs = Math.max(0, Math.floor(config.idleMs)); + const joiner = config.joiner ?? 
""; + + let bufferText = ""; + let bufferReplyToId: ReplyPayload["replyToId"]; + let bufferAudioAsVoice: ReplyPayload["audioAsVoice"]; + let idleTimer: NodeJS.Timeout | undefined; + + const clearIdleTimer = () => { + if (!idleTimer) return; + clearTimeout(idleTimer); + idleTimer = undefined; + }; + + const resetBuffer = () => { + bufferText = ""; + bufferReplyToId = undefined; + bufferAudioAsVoice = undefined; + }; + + const scheduleIdleFlush = () => { + if (idleMs <= 0) return; + clearIdleTimer(); + idleTimer = setTimeout(() => { + void flush({ force: false }); + }, idleMs); + }; + + const flush = async (options?: { force?: boolean }) => { + clearIdleTimer(); + if (shouldAbort()) { + resetBuffer(); + return; + } + if (!bufferText) return; + if (!options?.force && bufferText.length < minChars) { + scheduleIdleFlush(); + return; + } + const payload: ReplyPayload = { + text: bufferText, + replyToId: bufferReplyToId, + audioAsVoice: bufferAudioAsVoice, + }; + resetBuffer(); + await onFlush(payload); + }; + + const enqueue = (payload: ReplyPayload) => { + if (shouldAbort()) return; + const hasMedia = + Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + const text = payload.text ?? ""; + const hasText = text.trim().length > 0; + if (hasMedia) { + void flush({ force: true }); + void onFlush(payload); + return; + } + if (!hasText) return; + + if ( + bufferText && + (bufferReplyToId !== payload.replyToId || + bufferAudioAsVoice !== payload.audioAsVoice) + ) { + void flush({ force: true }); + } + + if (!bufferText) { + bufferReplyToId = payload.replyToId; + bufferAudioAsVoice = payload.audioAsVoice; + } + + const nextText = bufferText ? `${bufferText}${joiner}${text}` : text; + if (nextText.length > maxChars) { + if (bufferText) { + void flush({ force: true }); + bufferReplyToId = payload.replyToId; + bufferAudioAsVoice = payload.audioAsVoice; + if (text.length >= maxChars) { + void onFlush(payload); + return; + } + bufferText = text; + scheduleIdleFlush(); + return; + } + void onFlush(payload); + return; + } + + bufferText = nextText; + if (bufferText.length >= maxChars) { + void flush({ force: true }); + return; + } + scheduleIdleFlush(); + }; + + return { + enqueue, + flush, + hasBuffered: () => Boolean(bufferText), + stop: () => clearIdleTimer(), + }; +} diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts new file mode 100644 index 000000000..a6ff7141d --- /dev/null +++ b/src/auto-reply/reply/block-reply-pipeline.ts @@ -0,0 +1,173 @@ +import { logVerbose } from "../../globals.js"; +import type { ReplyPayload } from "../types.js"; +import { createBlockReplyCoalescer } from "./block-reply-coalescer.js"; +import type { BlockStreamingCoalescing } from "./block-streaming.js"; + +export type BlockReplyPipeline = { + enqueue: (payload: ReplyPayload) => void; + flush: (options?: { force?: boolean }) => Promise; + stop: () => void; + hasBuffered: () => boolean; + didStream: () => boolean; + isAborted: () => boolean; + hasSentPayload: (payload: ReplyPayload) => boolean; +}; + +export function createBlockReplyPayloadKey(payload: ReplyPayload): string { + const text = payload.text?.trim() ?? ""; + const mediaList = payload.mediaUrls?.length + ? payload.mediaUrls + : payload.mediaUrl + ? [payload.mediaUrl] + : []; + return JSON.stringify({ + text, + mediaList, + replyToId: payload.replyToId ?? 
null, + }); +} + +const withTimeout = async ( + promise: Promise, + timeoutMs: number, + timeoutError: Error, +): Promise => { + if (!timeoutMs || timeoutMs <= 0) return promise; + let timer: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => reject(timeoutError), timeoutMs); + }); + try { + return await Promise.race([promise, timeoutPromise]); + } finally { + if (timer) clearTimeout(timer); + } +}; + +export function createBlockReplyPipeline(params: { + onBlockReply: ( + payload: ReplyPayload, + options?: { abortSignal?: AbortSignal; timeoutMs?: number }, + ) => Promise | void; + timeoutMs: number; + coalescing?: BlockStreamingCoalescing; +}): BlockReplyPipeline { + const { onBlockReply, timeoutMs, coalescing } = params; + const sentKeys = new Set(); + const pendingKeys = new Set(); + const seenKeys = new Set(); + const bufferedKeys = new Set(); + let sendChain: Promise = Promise.resolve(); + let aborted = false; + let didStream = false; + let didLogTimeout = false; + + const sendPayload = (payload: ReplyPayload, skipSeen?: boolean) => { + if (aborted) return; + const payloadKey = createBlockReplyPayloadKey(payload); + if (!skipSeen) { + if (seenKeys.has(payloadKey)) return; + seenKeys.add(payloadKey); + } + if (sentKeys.has(payloadKey) || pendingKeys.has(payloadKey)) return; + pendingKeys.add(payloadKey); + + const timeoutError = new Error( + `block reply delivery timed out after ${timeoutMs}ms`, + ); + const abortController = new AbortController(); + sendChain = sendChain + .then(async () => { + if (aborted) return false; + await withTimeout( + onBlockReply(payload, { + abortSignal: abortController.signal, + timeoutMs, + }) ?? Promise.resolve(), + timeoutMs, + timeoutError, + ); + return true; + }) + .then((didSend) => { + if (!didSend) return; + sentKeys.add(payloadKey); + didStream = true; + }) + .catch((err) => { + if (err === timeoutError) { + abortController.abort(); + aborted = true; + if (!didLogTimeout) { + didLogTimeout = true; + logVerbose( + `block reply delivery timed out after ${timeoutMs}ms; skipping remaining block replies to preserve ordering`, + ); + } + return; + } + logVerbose(`block reply delivery failed: ${String(err)}`); + }) + .finally(() => { + pendingKeys.delete(payloadKey); + }); + }; + + const coalescer = coalescing + ? createBlockReplyCoalescer({ + config: coalescing, + shouldAbort: () => aborted, + onFlush: (payload) => { + bufferedKeys.clear(); + sendPayload(payload); + }, + }) + : null; + + const enqueue = (payload: ReplyPayload) => { + if (aborted) return; + const hasMedia = + Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 
0) > 0; + if (hasMedia) { + void coalescer?.flush({ force: true }); + sendPayload(payload); + return; + } + if (coalescer) { + const payloadKey = createBlockReplyPayloadKey(payload); + if ( + seenKeys.has(payloadKey) || + pendingKeys.has(payloadKey) || + bufferedKeys.has(payloadKey) + ) { + return; + } + bufferedKeys.add(payloadKey); + coalescer.enqueue(payload); + return; + } + sendPayload(payload); + }; + + const flush = async (options?: { force?: boolean }) => { + await coalescer?.flush(options); + await sendChain; + }; + + const stop = () => { + coalescer?.stop(); + }; + + return { + enqueue, + flush, + stop, + hasBuffered: () => Boolean(coalescer?.hasBuffered()), + didStream: () => didStream, + isAborted: () => aborted, + hasSentPayload: (payload) => { + const payloadKey = createBlockReplyPayloadKey(payload); + return sentKeys.has(payloadKey); + }, + }; +} diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 2a51369a3..5d4d9a476 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -1,8 +1,10 @@ import type { ClawdbotConfig } from "../../config/config.js"; +import { normalizeAccountId } from "../../routing/session-key.js"; import { resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; const DEFAULT_BLOCK_STREAM_MIN = 800; const DEFAULT_BLOCK_STREAM_MAX = 1200; +const DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS = 400; const BLOCK_CHUNK_PROVIDERS = new Set([ "whatsapp", @@ -12,6 +14,7 @@ const BLOCK_CHUNK_PROVIDERS = new Set([ "signal", "imessage", "webchat", + "msteams", ]); function normalizeChunkProvider( @@ -24,16 +27,24 @@ function normalizeChunkProvider( : undefined; } +export type BlockStreamingCoalescing = { + minChars: number; + maxChars: number; + idleMs: number; + joiner: string; +}; + export function resolveBlockStreamingChunking( cfg: ClawdbotConfig | undefined, provider?: string, + accountId?: string | null, ): { minChars: number; maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; } { const providerKey = normalizeChunkProvider(provider); - const textLimit = resolveTextChunkLimit(cfg, providerKey); + const textLimit = resolveTextChunkLimit(cfg, providerKey, accountId); const chunkCfg = cfg?.agents?.defaults?.blockStreamingChunk; const maxRequested = Math.max( 1, @@ -52,3 +63,88 @@ export function resolveBlockStreamingChunking( : "paragraph"; return { minChars, maxChars, breakPreference }; } + +export function resolveBlockStreamingCoalescing( + cfg: ClawdbotConfig | undefined, + provider?: string, + accountId?: string | null, + chunking?: { + minChars: number; + maxChars: number; + breakPreference: "paragraph" | "newline" | "sentence"; + }, +): BlockStreamingCoalescing { + const providerKey = normalizeChunkProvider(provider); + const textLimit = resolveTextChunkLimit(cfg, providerKey, accountId); + const normalizedAccountId = normalizeAccountId(accountId); + const providerCfg = (() => { + if (!cfg || !providerKey) return undefined; + if (providerKey === "whatsapp") { + return ( + cfg.whatsapp?.accounts?.[normalizedAccountId]?.blockStreamingCoalesce ?? + cfg.whatsapp?.blockStreamingCoalesce + ); + } + if (providerKey === "telegram") { + return ( + cfg.telegram?.accounts?.[normalizedAccountId]?.blockStreamingCoalesce ?? + cfg.telegram?.blockStreamingCoalesce + ); + } + if (providerKey === "discord") { + return ( + cfg.discord?.accounts?.[normalizedAccountId]?.blockStreamingCoalesce ?? 
+ cfg.discord?.blockStreamingCoalesce + ); + } + if (providerKey === "slack") { + return ( + cfg.slack?.accounts?.[normalizedAccountId]?.blockStreamingCoalesce ?? + cfg.slack?.blockStreamingCoalesce + ); + } + if (providerKey === "signal") { + return ( + cfg.signal?.accounts?.[normalizedAccountId]?.blockStreamingCoalesce ?? + cfg.signal?.blockStreamingCoalesce + ); + } + if (providerKey === "imessage") { + return ( + cfg.imessage?.accounts?.[normalizedAccountId]?.blockStreamingCoalesce ?? + cfg.imessage?.blockStreamingCoalesce + ); + } + if (providerKey === "msteams") { + return cfg.msteams?.blockStreamingCoalesce; + } + return undefined; + })(); + const coalesceCfg = + providerCfg ?? cfg?.agents?.defaults?.blockStreamingCoalesce; + const minRequested = Math.max( + 1, + Math.floor( + coalesceCfg?.minChars ?? chunking?.minChars ?? DEFAULT_BLOCK_STREAM_MIN, + ), + ); + const maxRequested = Math.max( + 1, + Math.floor(coalesceCfg?.maxChars ?? textLimit), + ); + const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); + const minChars = Math.min(minRequested, maxChars); + const idleMs = Math.max( + 0, + Math.floor(coalesceCfg?.idleMs ?? DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS), + ); + const preference = chunking?.breakPreference ?? "paragraph"; + const joiner = + preference === "sentence" ? " " : preference === "newline" ? "\n" : "\n\n"; + return { + minChars, + maxChars, + idleMs, + joiner, + }; +} diff --git a/src/config/types.ts b/src/config/types.ts index 7cf3775e3..8cfdae2e6 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -16,6 +16,12 @@ export type OutboundRetryConfig = { jitter?: number; }; +export type BlockStreamingCoalesceConfig = { + minChars?: number; + maxChars?: number; + idleMs?: number; +}; + export type SessionSendPolicyAction = "allow" | "deny"; export type SessionSendPolicyMatch = { provider?: string; @@ -127,6 +133,8 @@ export type WhatsAppConfig = { textChunkLimit?: number; /** Disable block streaming for this account. */ blockStreaming?: boolean; + /** Merge streamed block replies before sending. */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; /** Per-action tool gating (default: true for all). */ actions?: WhatsAppActionConfig; groups?: Record< @@ -153,6 +161,8 @@ export type WhatsAppAccountConfig = { groupPolicy?: GroupPolicy; textChunkLimit?: number; blockStreaming?: boolean; + /** Merge streamed block replies before sending. */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; groups?: Record< string, { @@ -306,6 +316,8 @@ export type TelegramAccountConfig = { textChunkLimit?: number; /** Disable block streaming for this account. */ blockStreaming?: boolean; + /** Merge streamed block replies before sending. */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; /** Draft streaming mode for Telegram (off|partial|block). Default: partial. */ streamMode?: "off" | "partial" | "block"; mediaMaxMb?: number; @@ -429,6 +441,8 @@ export type DiscordAccountConfig = { textChunkLimit?: number; /** Disable block streaming for this account. */ blockStreaming?: boolean; + /** Merge streamed block replies before sending. */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; /** * Soft max line count per Discord message. * Discord clients can clip/collapse very tall messages; splitting by lines @@ -525,6 +539,8 @@ export type SlackAccountConfig = { groupPolicy?: GroupPolicy; textChunkLimit?: number; blockStreaming?: boolean; + /** Merge streamed block replies before sending. 
*/ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; mediaMaxMb?: number; /** Reaction notification mode (off|own|all|allowlist). Default: own. */ reactionNotifications?: SlackReactionNotificationMode; @@ -579,6 +595,8 @@ export type SignalAccountConfig = { /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; blockStreaming?: boolean; + /** Merge streamed block replies before sending. */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; mediaMaxMb?: number; }; @@ -632,6 +650,8 @@ export type MSTeamsConfig = { allowFrom?: Array; /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; + /** Merge streamed block replies before sending. */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; /** * Allowed host suffixes for inbound attachment downloads. * Use ["*"] to allow any host (not recommended). @@ -678,6 +698,8 @@ export type IMessageAccountConfig = { /** Outbound text chunk size (chars). Default: 4000. */ textChunkLimit?: number; blockStreaming?: boolean; + /** Merge streamed block replies before sending. */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; groups?: Record< string, { @@ -1201,6 +1223,11 @@ export type AgentDefaultsConfig = { maxChars?: number; breakPreference?: "paragraph" | "newline" | "sentence"; }; + /** + * Block reply coalescing (merge streamed chunks before send). + * idleMs: wait time before flushing when idle. + */ + blockStreamingCoalesce?: BlockStreamingCoalesceConfig; timeoutSeconds?: number; /** Max inbound media size in MB for agent-visible attachments (text note or future image attach). */ mediaMaxMb?: number; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 3e1990166..ae655eb41 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -97,6 +97,12 @@ const GroupPolicySchema = z.enum(["open", "disabled", "allowlist"]); const DmPolicySchema = z.enum(["pairing", "allowlist", "open", "disabled"]); +const BlockStreamingCoalesceSchema = z.object({ + minChars: z.number().int().positive().optional(), + maxChars: z.number().int().positive().optional(), + idleMs: z.number().int().nonnegative().optional(), +}); + const normalizeAllowFrom = (values?: Array): string[] => (values ?? 
[]).map((v) => String(v).trim()).filter(Boolean); @@ -192,6 +198,7 @@ const TelegramAccountSchemaBase = z.object({ groupPolicy: GroupPolicySchema.optional().default("open"), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), streamMode: z.enum(["off", "partial", "block"]).optional().default("partial"), mediaMaxMb: z.number().positive().optional(), retry: RetryConfigSchema, @@ -277,6 +284,7 @@ const DiscordAccountSchema = z.object({ groupPolicy: GroupPolicySchema.optional().default("open"), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), maxLinesPerMessage: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), historyLimit: z.number().int().min(0).optional(), @@ -347,6 +355,7 @@ const SlackAccountSchema = z.object({ groupPolicy: GroupPolicySchema.optional().default("open"), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), mediaMaxMb: z.number().positive().optional(), reactionNotifications: z.enum(["off", "own", "all", "allowlist"]).optional(), reactionAllowlist: z.array(z.union([z.string(), z.number()])).optional(), @@ -398,6 +407,7 @@ const SignalAccountSchemaBase = z.object({ groupPolicy: GroupPolicySchema.optional().default("open"), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), mediaMaxMb: z.number().int().positive().optional(), }); @@ -443,6 +453,7 @@ const IMessageAccountSchemaBase = z.object({ mediaMaxMb: z.number().int().positive().optional(), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), groups: z .record( z.string(), @@ -507,6 +518,7 @@ const MSTeamsConfigSchema = z dmPolicy: DmPolicySchema.optional().default("pairing"), allowFrom: z.array(z.string()).optional(), textChunkLimit: z.number().int().positive().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), mediaAllowHosts: z.array(z.string()).optional(), requireMention: z.boolean().optional(), replyStyle: MSTeamsReplyStyleSchema.optional(), @@ -994,6 +1006,7 @@ const AgentDefaultsSchema = z .optional(), }) .optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), timeoutSeconds: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), typingIntervalSeconds: z.number().int().positive().optional(), @@ -1215,6 +1228,7 @@ export const ClawdbotSchema = z.object({ groupPolicy: GroupPolicySchema.optional().default("open"), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), groups: z .record( z.string(), @@ -1249,6 +1263,7 @@ export const ClawdbotSchema = z.object({ groupPolicy: GroupPolicySchema.optional().default("open"), textChunkLimit: z.number().int().positive().optional(), blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), actions: z .object({ reactions: z.boolean().optional(), diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index c0226a4c8..dd3fbcba0 100644 --- a/src/telegram/bot.ts +++ 
b/src/telegram/bot.ts @@ -673,7 +673,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { : undefined; const draftChunking = draftStream && streamMode === "block" - ? resolveBlockStreamingChunking(cfg, "telegram") + ? resolveBlockStreamingChunking(cfg, "telegram", route.accountId) : undefined; const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking)
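
Config sketch: the new toggles end-to-end (illustrative values for `~/.clawdbot/clawdbot.json`; all field names are from this patch, and the same keys exist on per-account configs):

```
{
  agents: {
    defaults: {
      blockStreamingDefault: "on",
      blockStreamingBreak: "text_end",
      blockStreamingChunk: { minChars: 800, maxChars: 1200, breakPreference: "paragraph" },
      // Merge streamed blocks before send; minChars inherits from
      // blockStreamingChunk, maxChars is capped to the provider text limit.
      blockStreamingCoalesce: { idleMs: 400 }
    }
  },
  // Per-provider overrides:
  telegram: { blockStreaming: false },
  whatsapp: { blockStreamingCoalesce: { minChars: 200, maxChars: 1500, idleMs: 600 } }
}
```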
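Resolution order, as implemented by `resolveBlockStreamingCoalescing`: a provider/account `blockStreamingCoalesce` wins over `agents.defaults.blockStreamingCoalesce`, missing fields fall back to the chunking config and built-in defaults, and the result is clamped to the provider `textChunkLimit`. A worked example with hypothetical limits:

```ts
// Assume: no provider coalesce override, textChunkLimit = 1000, and
// blockStreamingChunk = { minChars: 800, maxChars: 1200, breakPreference: "newline" }.
const coalescing = resolveBlockStreamingCoalescing(cfg, "whatsapp", "primary", chunking);
// → { minChars: 800, maxChars: 1000, idleMs: 400, joiner: "\n" }
//   maxChars: no override, so it starts at textChunkLimit (1000) and stays capped there.
//   minChars: inherited from chunking (800), then clamped to maxChars.
//   idleMs: DEFAULT_BLOCK_STREAM_COALESCE_IDLE_MS (400).
//   joiner: breakPreference "newline" → "\n" ("paragraph" → "\n\n", "sentence" → " ").
```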
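Runtime behavior of the coalescer, mirroring the new tests: a minimal sketch (import path, timings, and the logging callback are illustrative):

```ts
import { createBlockReplyCoalescer } from "./block-reply-coalescer.js";

const coalescer = createBlockReplyCoalescer({
  config: { minChars: 1, maxChars: 1200, idleMs: 400, joiner: "\n\n" },
  shouldAbort: () => false,
  onFlush: (payload) => {
    console.log("send:", payload.text); // stand-in for onBlockReply delivery
  },
});

coalescer.enqueue({ text: "First streamed block" });
coalescer.enqueue({ text: "Second streamed block" });
// After 400ms with no new chunks, onFlush fires once with
// "First streamed block\n\nSecond streamed block".
// A media payload force-flushes buffered text first, then passes through as-is;
// a final flush({ force: true }) always sends whatever text remains.
```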