From 3bbdcaf87f8ceffd633fda7c2316a6f1314f43d1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 3 Jan 2026 17:10:47 +0100 Subject: [PATCH] fix: avoid duplicate block streaming --- CHANGELOG.md | 1 + docs/agent.md | 2 +- docs/configuration.md | 2 +- src/auto-reply/reply.ts | 14 ++++++++++++-- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efd2e6011..0df9cad07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ ### Fixes - Telegram: chunk block-stream replies to avoid “message is too long” errors (#124) — thanks @mukhtharcm. +- Block streaming: default to text_end and suppress duplicate block sends while in-flight. - Gmail hooks: resolve gcloud Python to a real executable when PATH uses mise shims — thanks @joargp. - Control UI: generate UUIDs when `crypto.randomUUID()` is unavailable over HTTP — thanks @ratulsarna. - Agent: add soft block-stream chunking (800–1200 chars default) with paragraph/newline preference. diff --git a/docs/agent.md b/docs/agent.md index f464fc32e..32a1a7733 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -82,7 +82,7 @@ current turn ends, then a new agent turn starts with the queued payloads. See Block streaming sends completed assistant blocks as soon as they finish; disable via `agent.blockStreamingDefault: "off"` if you only want the final response. -Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`). +Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`; defaults to text_end). Control soft block chunking with `agent.blockStreamingChunk` (defaults to 800–1200 chars; prefers paragraph breaks, then newlines; sentences last). diff --git a/docs/configuration.md b/docs/configuration.md index 427fe6dc0..ada50170f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -393,7 +393,7 @@ Controls the embedded agent runtime (model/thinking/verbose/timeouts). 
Block streaming: - `agent.blockStreamingDefault`: `"on"`/`"off"` (default on). -- `agent.blockStreamingBreak`: `"text_end"` or `"message_end"`. +- `agent.blockStreamingBreak`: `"text_end"` or `"message_end"` (default: text_end). - `agent.blockStreamingChunk`: soft chunking for streamed blocks. Defaults to 800–1200 chars, prefers paragraph breaks (`\n\n`), then newlines, then sentences. Example: diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 36e4b0d5c..af07632de 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -1124,14 +1124,14 @@ export async function getReplyFromConfig( (agentCfg?.verboseDefault as VerboseLevel | undefined); const resolvedBlockStreaming = agentCfg?.blockStreamingDefault === "off" ? "off" : "on"; - // TODO(steipete): Default to message_end for now; figure out why text_end breaks and whether we can revert. const resolvedBlockStreamingBreak = - agentCfg?.blockStreamingBreak === "text_end" ? "text_end" : "message_end"; + agentCfg?.blockStreamingBreak === "message_end" ? "message_end" : "text_end"; const blockStreamingEnabled = resolvedBlockStreaming === "on"; const blockReplyChunking = blockStreamingEnabled ? resolveBlockStreamingChunking(cfg, sessionCtx.Surface) : undefined; const streamedPayloadKeys = new Set(); + const pendingStreamedPayloadKeys = new Set(); const pendingBlockTasks = new Set<Promise<void>>(); const buildPayloadKey = (payload: ReplyPayload) => { const text = payload.text?.trim() ?? 
""; @@ -2232,6 +2232,13 @@ export async function getReplyFromConfig( replyToId: tagResult.replyToId, }; const payloadKey = buildPayloadKey(blockPayload); + if ( + streamedPayloadKeys.has(payloadKey) || + pendingStreamedPayloadKeys.has(payloadKey) + ) { + return; + } + pendingStreamedPayloadKeys.add(payloadKey); const task = (async () => { await startTypingOnText(cleaned); await opts.onBlockReply?.(blockPayload); @@ -2241,6 +2248,9 @@ export async function getReplyFromConfig( }) .catch((err) => { logVerbose(`block reply delivery failed: ${String(err)}`); + }) + .finally(() => { + pendingStreamedPayloadKeys.delete(payloadKey); }); pendingBlockTasks.add(task); void task.finally(() => pendingBlockTasks.delete(task));