From b4fbf2fe0d6f960b1ae20d353dd942ac0e18c632 Mon Sep 17 00:00:00 2001 From: Ruby Date: Fri, 9 Jan 2026 16:22:44 -0600 Subject: [PATCH] fix: enable block streaming for Telegram when streamMode is 'block' - Fix disableBlockStreaming logic in telegram/bot.ts to properly enable block streaming when telegram.streamMode is 'block' regardless of blockStreamingDefault setting - Set minChars default to 1 for Telegram block mode so chunks send immediately on newlines/sentences instead of waiting for 800 chars - Skip coalescing for Telegram block mode when not explicitly configured to reduce chunk batching delays - Fix newline preference to wait for actual newlines instead of breaking on any whitespace when buffer is under maxChars Fixes issue where all Telegram messages were batched into one message at the end instead of streaming as separate messages during generation. --- src/agents/pi-embedded-block-chunker.ts | 4 ++++ src/auto-reply/reply/block-streaming.ts | 14 ++++++++++++-- src/telegram/bot.ts | 18 +++++++++++++----- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/agents/pi-embedded-block-chunker.ts b/src/agents/pi-embedded-block-chunker.ts index 7254bbd9f..3bf95c14b 100644 --- a/src/agents/pi-embedded-block-chunker.ts +++ b/src/agents/pi-embedded-block-chunker.ts @@ -221,6 +221,10 @@ export class EmbeddedBlockChunker { if (sentenceIdx >= minChars) return { index: sentenceIdx }; } + if (preference === "newline" && buffer.length < maxChars) { + return { index: -1 }; + } + for (let i = window.length - 1; i >= minChars; i--) { if (/\s/.test(window[i]) && isSafeFenceBreak(fenceSpans, i)) { return { index: i }; diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 17f2cda43..23b3db24a 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -58,9 +58,12 @@ export function resolveBlockStreamingChunking( Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX), ); const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); + const telegramBlockStreaming = + providerKey === "telegram" && cfg?.telegram?.streamMode === "block"; + const minFallback = telegramBlockStreaming ? 1 : DEFAULT_BLOCK_STREAM_MIN; const minRequested = Math.max( 1, - Math.floor(chunkCfg?.minChars ?? DEFAULT_BLOCK_STREAM_MIN), + Math.floor(chunkCfg?.minChars ?? minFallback), ); const minChars = Math.min(minRequested, maxChars); const breakPreference = @@ -80,7 +83,7 @@ export function resolveBlockStreamingCoalescing( maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; }, -): BlockStreamingCoalescing { +): BlockStreamingCoalescing | undefined { const providerKey = normalizeChunkProvider(provider); const textLimit = resolveTextChunkLimit(cfg, providerKey, accountId); const normalizedAccountId = normalizeAccountId(accountId); @@ -132,6 +135,13 @@ export function resolveBlockStreamingCoalescing( })(); const coalesceCfg = providerCfg ?? cfg?.agents?.defaults?.blockStreamingCoalesce; + if ( + providerKey === "telegram" && + cfg?.telegram?.streamMode === "block" && + !coalesceCfg + ) { + return undefined; + } const minRequested = Math.max( 1, Math.floor( diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index c5979ec76..7776b240c 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -826,6 +826,16 @@ export function createTelegramBot(opts: TelegramBotOptions) { onReplyStart: sendTyping, }); + const blockStreamingDisabledByConfig = + typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : false; + const forceBlockStreaming = Boolean( + streamMode === "block" && !draftStream && !blockStreamingDisabledByConfig, + ); + const disableBlockStreaming = + Boolean(draftStream) || blockStreamingDisabledByConfig ? true : undefined; + const { queuedFinal } = await dispatchReplyFromConfig({ ctx: ctxPayload, cfg, @@ -841,11 +851,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { if (payload.text) draftStream.update(payload.text); } : undefined, - disableBlockStreaming: - Boolean(draftStream) || - (typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : undefined), + disableBlockStreaming: forceBlockStreaming + ? false + : disableBlockStreaming, }, }); markDispatchIdle();