From 27d940f5b691c4e4c549a09c1b18ca801efdd941 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 12 Jan 2026 22:27:46 +0000 Subject: [PATCH] refactor: reuse streaming text normalizer across callbacks --- src/auto-reply/reply/agent-runner.ts | 53 +++++++--------------------- 1 file changed, 12 insertions(+), 41 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 833f40245..26d981416 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -549,8 +549,8 @@ export async function runReplyAgent(params: { ); const normalizeStreamingText = ( payload: ReplyPayload, - ): string | undefined => { - if (!allowPartialStream) return undefined; + ): { text?: string; skip: boolean } => { + if (!allowPartialStream) return { skip: true }; let text = payload.text; if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { const stripped = stripHeartbeatToken(text, { @@ -561,18 +561,18 @@ export async function runReplyAgent(params: { logVerbose("Stripped stray HEARTBEAT_OK token from reply"); } if (stripped.shouldSkip && (payload.mediaUrls?.length ?? 0) === 0) { - return undefined; + return { skip: true }; } text = stripped.text; } - if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return undefined; - return text; + if (isSilentReplyText(text, SILENT_REPLY_TOKEN)) return { skip: true }; + return { text, skip: false }; }; const handlePartialForTyping = async ( payload: ReplyPayload, ): Promise => { - const text = normalizeStreamingText(payload); - if (!text) return undefined; + const { text, skip } = normalizeStreamingText(payload); + if (skip || !text) return undefined; await typingSignals.signalTextDelta(text); return text; }; @@ -712,21 +712,9 @@ export async function runReplyAgent(params: { onBlockReply: blockStreamingEnabled && opts?.onBlockReply ? async (payload) => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); - } - const hasMedia = (payload.mediaUrls?.length ?? 0) > 0; - if (stripped.shouldSkip && !hasMedia) return; - text = stripped.text; - } + const { text, skip } = normalizeStreamingText(payload); + const hasMedia = (payload.mediaUrls?.length ?? 0) > 0; + if (skip && !hasMedia) return; const taggedPayload = applyReplyTagsToPayload( { text, @@ -799,25 +787,8 @@ export async function runReplyAgent(params: { // If a tool callback starts typing after the run finalized, we can end up with // a typing loop that never sees a matching markRunComplete(). Track and drain. const task = (async () => { - let text = payload.text; - if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { - const stripped = stripHeartbeatToken(text, { - mode: "message", - }); - if (stripped.didStrip && !didLogHeartbeatStrip) { - didLogHeartbeatStrip = true; - logVerbose( - "Stripped stray HEARTBEAT_OK token from reply", - ); - } - if ( - stripped.shouldSkip && - (payload.mediaUrls?.length ?? 0) === 0 - ) { - return; - } - text = stripped.text; - } + const { text, skip } = normalizeStreamingText(payload); + if (skip) return; await typingSignals.signalTextDelta(text); await opts.onToolResult?.({ text,