From b4fbf2fe0d6f960b1ae20d353dd942ac0e18c632 Mon Sep 17 00:00:00 2001 From: Ruby Date: Fri, 9 Jan 2026 16:22:44 -0600 Subject: [PATCH 1/2] fix: enable block streaming for Telegram when streamMode is 'block' - Fix disableBlockStreaming logic in telegram/bot.ts to properly enable block streaming when telegram.streamMode is 'block' regardless of blockStreamingDefault setting - Set minChars default to 1 for Telegram block mode so chunks send immediately on newlines/sentences instead of waiting for 800 chars - Skip coalescing for Telegram block mode when not explicitly configured to reduce chunk batching delays - Fix newline preference to wait for actual newlines instead of breaking on any whitespace when buffer is under maxChars Fixes issue where all Telegram messages were batched into one message at the end instead of streaming as separate messages during generation. --- src/agents/pi-embedded-block-chunker.ts | 4 ++++ src/auto-reply/reply/block-streaming.ts | 14 ++++++++++++-- src/telegram/bot.ts | 18 +++++++++++++----- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/agents/pi-embedded-block-chunker.ts b/src/agents/pi-embedded-block-chunker.ts index 7254bbd9f..3bf95c14b 100644 --- a/src/agents/pi-embedded-block-chunker.ts +++ b/src/agents/pi-embedded-block-chunker.ts @@ -221,6 +221,10 @@ export class EmbeddedBlockChunker { if (sentenceIdx >= minChars) return { index: sentenceIdx }; } + if (preference === "newline" && buffer.length < maxChars) { + return { index: -1 }; + } + for (let i = window.length - 1; i >= minChars; i--) { if (/\s/.test(window[i]) && isSafeFenceBreak(fenceSpans, i)) { return { index: i }; diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 17f2cda43..23b3db24a 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -58,9 +58,12 @@ export function resolveBlockStreamingChunking( Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX), ); const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); + const telegramBlockStreaming = + providerKey === "telegram" && cfg?.telegram?.streamMode === "block"; + const minFallback = telegramBlockStreaming ? 1 : DEFAULT_BLOCK_STREAM_MIN; const minRequested = Math.max( 1, - Math.floor(chunkCfg?.minChars ?? DEFAULT_BLOCK_STREAM_MIN), + Math.floor(chunkCfg?.minChars ?? minFallback), ); const minChars = Math.min(minRequested, maxChars); const breakPreference = @@ -80,7 +83,7 @@ export function resolveBlockStreamingCoalescing( maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; }, -): BlockStreamingCoalescing { +): BlockStreamingCoalescing | undefined { const providerKey = normalizeChunkProvider(provider); const textLimit = resolveTextChunkLimit(cfg, providerKey, accountId); const normalizedAccountId = normalizeAccountId(accountId); @@ -132,6 +135,13 @@ export function resolveBlockStreamingCoalescing( })(); const coalesceCfg = providerCfg ?? cfg?.agents?.defaults?.blockStreamingCoalesce; + if ( + providerKey === "telegram" && + cfg?.telegram?.streamMode === "block" && + !coalesceCfg + ) { + return undefined; + } const minRequested = Math.max( 1, Math.floor( diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index c5979ec76..7776b240c 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -826,6 +826,16 @@ export function createTelegramBot(opts: TelegramBotOptions) { onReplyStart: sendTyping, }); + const blockStreamingDisabledByConfig = + typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : false; + const forceBlockStreaming = Boolean( + streamMode === "block" && !draftStream && !blockStreamingDisabledByConfig, + ); + const disableBlockStreaming = + Boolean(draftStream) || blockStreamingDisabledByConfig ? true : undefined; + const { queuedFinal } = await dispatchReplyFromConfig({ ctx: ctxPayload, cfg, @@ -841,11 +851,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { if (payload.text) draftStream.update(payload.text); } : undefined, - disableBlockStreaming: - Boolean(draftStream) || - (typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : undefined), + disableBlockStreaming: forceBlockStreaming + ? false + : disableBlockStreaming, }, }); markDispatchIdle(); From 1fd7a6e310d7b6551f3a9464e43eb5dc65b003bc Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 10 Jan 2026 01:14:40 +0100 Subject: [PATCH 2/2] fix: keep telegram streamMode draft-only (#619) (thanks @rubyrunsstuff) --- CHANGELOG.md | 1 + src/agents/pi-embedded-runner.ts | 1 - src/auto-reply/reply.block-streaming.test.ts | 41 ++++++++++++++++++++ src/auto-reply/reply/block-streaming.ts | 11 +----- src/telegram/bot.ts | 16 +++----- 5 files changed, 48 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 787d6fa62..a9df3d1ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ - Auth: default billing disable backoff to 5h (doubling, 24h cap) and surface disabled/cooldown profiles in `models list` + doctor. (#486) — thanks @steipete - Commands: harden slash command registry and list text-only commands in `/commands`. - Models/Auth: show per-agent auth candidates in `/model status`, and add `clawdbot models auth order {get,set,clear}` (per-agent auth rotation overrides). — thanks @steipete +- Telegram: keep streamMode draft-only; avoid forcing block streaming. (#619) — thanks @rubyrunsstuff - Debugging: add raw model stream logging flags and document gateway watch mode. - Gateway: decode dns-sd escaped UTF-8 in discovery output and show scan progress immediately. — thanks @steipete - Agent: add claude-cli/opus-4.5 runner via Claude CLI with resume support (tools disabled). diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index a95d5f38d..0a23ae9d4 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -109,7 +109,6 @@ import { // Optional features can be implemented as Pi extensions that run in the same Node process. - /** * Resolve provider-specific extraParams from model config. * Auto-enables thinking mode for GLM-4.x models unless explicitly disabled. diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index 8cb8ab767..9a13e65cc 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -273,4 +273,45 @@ describe("block streaming", () => { expect(sawAbort).toBe(true); }); }); + + it("does not enable block streaming for telegram streamMode block", async () => { + await withTempHome(async (home) => { + const onBlockReply = vi.fn().mockResolvedValue(undefined); + + const impl = async () => ({ + payloads: [{ text: "final" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl); + + const res = await getReplyFromConfig( + { + Body: "ping", + From: "+1004", + To: "+2000", + MessageSid: "msg-126", + Provider: "telegram", + }, + { + onBlockReply, + }, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + }, + telegram: { allowFrom: ["*"], streamMode: "block" }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + expect(res?.text).toBe("final"); + expect(onBlockReply).not.toHaveBeenCalled(); + }); + }); }); diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 23b3db24a..da60e1ce9 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -58,9 +58,7 @@ export function resolveBlockStreamingChunking( Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX), ); const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); - const telegramBlockStreaming = - providerKey === "telegram" && cfg?.telegram?.streamMode === "block"; - const minFallback = telegramBlockStreaming ? 1 : DEFAULT_BLOCK_STREAM_MIN; + const minFallback = DEFAULT_BLOCK_STREAM_MIN; const minRequested = Math.max( 1, Math.floor(chunkCfg?.minChars ?? minFallback), @@ -135,13 +133,6 @@ export function resolveBlockStreamingCoalescing( })(); const coalesceCfg = providerCfg ?? cfg?.agents?.defaults?.blockStreamingCoalesce; - if ( - providerKey === "telegram" && - cfg?.telegram?.streamMode === "block" && - !coalesceCfg - ) { - return undefined; - } const minRequested = Math.max( 1, Math.floor( diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 7776b240c..aaba18307 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -826,15 +826,11 @@ export function createTelegramBot(opts: TelegramBotOptions) { onReplyStart: sendTyping, }); - const blockStreamingDisabledByConfig = - typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : false; - const forceBlockStreaming = Boolean( - streamMode === "block" && !draftStream && !blockStreamingDisabledByConfig, - ); const disableBlockStreaming = - Boolean(draftStream) || blockStreamingDisabledByConfig ? true : undefined; + Boolean(draftStream) || + (typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : undefined); const { queuedFinal } = await dispatchReplyFromConfig({ ctx: ctxPayload, @@ -851,9 +847,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { if (payload.text) draftStream.update(payload.text); } : undefined, - disableBlockStreaming: forceBlockStreaming - ? false - : disableBlockStreaming, + disableBlockStreaming, }, }); markDispatchIdle();