From 2d0ca67c210968d8fe9bc93e9a061aee16d9306f Mon Sep 17 00:00:00 2001 From: Zach Knickerbocker Date: Fri, 9 Jan 2026 17:25:10 -0500 Subject: [PATCH 1/2] fix: prevent leakage in block streaming --- src/agents/pi-embedded-subscribe.test.ts | 83 ++++++++++++++++++++++-- src/agents/pi-embedded-subscribe.ts | 33 +++++++++- 2 files changed, 110 insertions(+), 6 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 40949d799..b85b3a028 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -278,6 +278,74 @@ describe("subscribeEmbeddedPiSession", () => { ]); }); + it.each([ + { tag: "think", open: "", close: "" }, + { tag: "thinking", open: "", close: "" }, + ])("suppresses <%s> blocks across chunk boundaries", ({ open, close }) => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + blockReplyChunking: { + minChars: 5, + maxChars: 50, + breakPreference: "newline", + }, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: `${open}Reasoning chunk that should not leak`, + }, + }); + + expect(onBlockReply).not.toHaveBeenCalled(); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: `${close}\n\nFinal answer`, + }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_end" }, + }); + + const payloadTexts = onBlockReply.mock.calls + .map((call) => call[0]?.text) + .filter((value): value is string => typeof value === "string"); + expect(payloadTexts.length).toBeGreaterThan(0); + for (const text of payloadTexts) { + expect(text).not.toContain("Reasoning"); + expect(text).not.toContain(open); + } + const combined = payloadTexts.join(" ").replace(/\s+/g, " ").trim(); + expect(combined).toBe("Final answer"); + }); + it("emits block replies on text_end and does not duplicate on message_end", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { @@ -1058,10 +1126,17 @@ describe("subscribeEmbeddedPiSession", () => { handler?.({ type: "message_end", message: assistantMessage }); - expect(onBlockReply).toHaveBeenCalledTimes(3); - expect(onBlockReply.mock.calls[1][0].text).toBe( - "````md\nline1\nline2\n````", - ); + const payloadTexts = onBlockReply.mock.calls + .map((call) => call[0]?.text) + .filter((value): value is string => typeof value === "string"); + expect(payloadTexts.length).toBeGreaterThan(0); + const combined = payloadTexts.join(" ").replace(/\s+/g, " ").trim(); + expect(combined).toContain("````md"); + expect(combined).toContain("line1"); + expect(combined).toContain("line2"); + expect(combined).toContain("````"); + expect(combined).toContain("Intro"); + expect(combined).toContain("Outro"); }); it("splits long single-line fenced blocks with reopen/close", () => { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 62be6d416..96ac61128 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -376,6 +376,8 @@ export function subscribeEmbeddedPiSession(params: { typeof params.onReasoningStream === "function"; let deltaBuffer = ""; let blockBuffer = ""; + // Track if a streamed chunk opened a block (stateful across chunks). + let blockThinkingActive = false; let lastStreamedAssistant: string | undefined; let lastStreamedReasoning: string | undefined; let lastBlockReplyText: string | undefined; @@ -481,9 +483,32 @@ export function subscribeEmbeddedPiSession(params: { } }; + const stripBlockThinkingSegments = (text: string): string => { + if (!text) return text; + if (!blockThinkingActive && !THINKING_TAG_SCAN_RE.test(text)) return text; + THINKING_TAG_SCAN_RE.lastIndex = 0; + let result = ""; + let lastIndex = 0; + let inThinking = blockThinkingActive; + for (const match of text.matchAll(THINKING_TAG_SCAN_RE)) { + const idx = match.index ?? 0; + if (!inThinking) { + result += text.slice(lastIndex, idx); + } + const isClose = match[1] === "/"; + inThinking = !isClose; + lastIndex = idx + match[0].length; + } + if (!inThinking) { + result += text.slice(lastIndex); + } + blockThinkingActive = inThinking; + return result; + }; + const emitBlockChunk = (text: string) => { - // Strip any tags that may have leaked into the output (e.g., from Gemini mimicking history) - const strippedText = stripThinkingSegments(stripUnpairedThinkingTags(text)); + // Strip blocks across chunk boundaries to avoid leaking reasoning. + const strippedText = stripBlockThinkingSegments(text); const chunk = strippedText.trimEnd(); if (!chunk) return; if (chunk === lastBlockReplyText) return; @@ -571,6 +596,7 @@ export function subscribeEmbeddedPiSession(params: { deltaBuffer = ""; blockBuffer = ""; blockChunker?.reset(); + blockThinkingActive = false; lastStreamedAssistant = undefined; lastStreamedReasoning = undefined; lastBlockReplyText = undefined; @@ -590,6 +616,7 @@ export function subscribeEmbeddedPiSession(params: { deltaBuffer = ""; blockBuffer = ""; blockChunker?.reset(); + blockThinkingActive = false; lastStreamedAssistant = undefined; lastBlockReplyText = undefined; lastStreamedReasoning = undefined; @@ -973,6 +1000,7 @@ export function subscribeEmbeddedPiSession(params: { deltaBuffer = ""; blockBuffer = ""; blockChunker?.reset(); + blockThinkingActive = false; lastStreamedAssistant = undefined; } } @@ -1054,6 +1082,7 @@ export function subscribeEmbeddedPiSession(params: { blockBuffer = ""; } } + blockThinkingActive = false; if (pendingCompactionRetry > 0) { resolveCompactionRetry(); } else { From 51ec578cec987fe33b33b8b8e02669f770f47758 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 10 Jan 2026 00:02:13 +0100 Subject: [PATCH 2/2] fix: suppress leakage + split reasoning output (#614) (thanks @zknicker) --- CHANGELOG.md | 1 + docs/tools/thinking.md | 4 +-- src/agents/pi-embedded-runner.ts | 28 ++++++++--------- src/agents/pi-embedded-subscribe.test.ts | 15 ++++----- src/agents/pi-embedded-subscribe.ts | 40 ++++++++++++++---------- src/agents/pi-embedded-utils.ts | 7 +---- 6 files changed, 49 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8062d27d5..beb7e0660 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ - Auto-reply: preserve spacing when stripping inline directives. (#539) — thanks @joshp123 - Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj - Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj +- Auto-reply: suppress `` leakage in block streaming and emit `/reasoning` as a separate `Reasoning:` message. (#614) — thanks @zknicker - Auto-reply: default block streaming off for non-Telegram providers unless explicitly enabled, and avoid splitting on forced flushes below max. - Auto-reply: raise default coalesce minChars for Signal/Slack/Discord and clarify streaming vs draft streaming in docs. - Auto-reply: default block streaming coalesce idle to 1s to reduce tiny chunks. — thanks @steipete diff --git a/docs/tools/thinking.md b/docs/tools/thinking.md index b5a396085..5f407015e 100644 --- a/docs/tools/thinking.md +++ b/docs/tools/thinking.md @@ -38,8 +38,8 @@ read_when: ## Reasoning visibility (/reasoning) - Levels: `on|off|stream`. -- Directive-only message toggles whether thinking blocks are shown as italic text in replies. -- When enabled, any model-provided reasoning content is appended as a separate italic block. +- Directive-only message toggles whether thinking blocks are shown in replies. +- When enabled, reasoning is sent as a **separate message** prefixed with `Reasoning:`. - `stream` (Telegram only): streams reasoning into the Telegram draft bubble while the reply is generating, then sends the final answer without reasoning. - Alias: `/reason`. - Send `/reasoning` (or `/reasoning:`) with no argument to see the current reasoning level. diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 0123d26bd..6d23a45c7 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -1604,23 +1604,21 @@ export async function runEmbeddedPiAgent(params: { } } - const fallbackText = lastAssistant - ? (() => { - const base = extractAssistantText(lastAssistant); - if (params.reasoningLevel !== "on") return base; - const thinking = extractAssistantThinking(lastAssistant); - const formatted = thinking - ? formatReasoningMarkdown(thinking) - : ""; - if (!formatted) return base; - return base ? `${formatted}\n\n${base}` : formatted; - })() + const reasoningText = + lastAssistant && params.reasoningLevel === "on" + ? formatReasoningMarkdown(extractAssistantThinking(lastAssistant)) + : ""; + if (reasoningText) replyItems.push({ text: reasoningText }); + + const fallbackAnswerText = lastAssistant + ? extractAssistantText(lastAssistant) : ""; - for (const text of assistantTexts.length + const answerTexts = assistantTexts.length ? assistantTexts - : fallbackText - ? [fallbackText] - : []) { + : fallbackAnswerText + ? [fallbackAnswerText] + : []; + for (const text of answerTexts) { const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue; diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index b85b3a028..c238b2b8b 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -129,7 +129,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(payload.text).toBe("Hello block"); }); - it("prepends reasoning before text when enabled", () => { + it("emits reasoning as a separate message when enabled", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { subscribe: (fn) => { @@ -160,11 +160,11 @@ describe("subscribeEmbeddedPiSession", () => { handler?.({ type: "message_end", message: assistantMessage }); - expect(onBlockReply).toHaveBeenCalledTimes(1); - const payload = onBlockReply.mock.calls[0][0]; - expect(payload.text).toBe( - "_Reasoning:_\n_Because it helps_\n\nFinal answer", + expect(onBlockReply).toHaveBeenCalledTimes(2); + expect(onBlockReply.mock.calls[0][0].text).toBe( + "Reasoning:\nBecause it helps", ); + expect(onBlockReply.mock.calls[1][0].text).toBe("Final answer"); }); it("promotes tags to thinking blocks at write-time", () => { @@ -200,10 +200,11 @@ describe("subscribeEmbeddedPiSession", () => { handler?.({ type: "message_end", message: assistantMessage }); - expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply).toHaveBeenCalledTimes(2); expect(onBlockReply.mock.calls[0][0].text).toBe( - "_Reasoning:_\n_Because it helps_\n\nFinal answer", + "Reasoning:\nBecause it helps", ); + expect(onBlockReply.mock.calls[1][0].text).toBe("Final answer"); expect(assistantMessage.content).toEqual([ { type: "thinking", thinking: "Because it helps" }, diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 96ac61128..82943a2fe 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -936,11 +936,7 @@ export function subscribeEmbeddedPiSession(params: { const formattedReasoning = rawThinking ? formatReasoningMarkdown(rawThinking) : ""; - const text = includeReasoning - ? baseText && formattedReasoning - ? `${formattedReasoning}\n\n${baseText}` - : formattedReasoning || baseText - : baseText; + const text = baseText; const addedDuringMessage = assistantTexts.length > assistantTextBaseline; @@ -953,13 +949,28 @@ export function subscribeEmbeddedPiSession(params: { } assistantTextBaseline = assistantTexts.length; + const onBlockReply = params.onBlockReply; + const shouldEmitReasoning = + includeReasoning && + Boolean(formattedReasoning) && + Boolean(onBlockReply) && + formattedReasoning !== lastReasoningSent; + const shouldEmitReasoningBeforeAnswer = + shouldEmitReasoning && + blockReplyBreak === "message_end" && + !addedDuringMessage; + if (shouldEmitReasoningBeforeAnswer && formattedReasoning) { + lastReasoningSent = formattedReasoning; + void onBlockReply?.({ text: formattedReasoning }); + } + if ( (blockReplyBreak === "message_end" || (blockChunker ? blockChunker.hasBuffered() : blockBuffer.length > 0)) && text && - params.onBlockReply + onBlockReply ) { if (blockChunker?.hasBuffered()) { blockChunker.drain({ force: true, emit: emitBlockChunk }); @@ -975,7 +986,7 @@ export function subscribeEmbeddedPiSession(params: { const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text); if (cleanedText || (mediaUrls && mediaUrls.length > 0)) { - void params.onBlockReply({ + void onBlockReply({ text: cleanedText, mediaUrls: mediaUrls?.length ? mediaUrls : undefined, }); @@ -983,16 +994,13 @@ export function subscribeEmbeddedPiSession(params: { } } } - const onBlockReply = params.onBlockReply; - const shouldEmitReasoningBlock = - includeReasoning && - Boolean(formattedReasoning) && - Boolean(onBlockReply) && - formattedReasoning !== lastReasoningSent && - (blockReplyBreak === "text_end" || Boolean(blockChunker)); - if (shouldEmitReasoningBlock && formattedReasoning && onBlockReply) { + if ( + shouldEmitReasoning && + !shouldEmitReasoningBeforeAnswer && + formattedReasoning + ) { lastReasoningSent = formattedReasoning; - void onBlockReply({ text: formattedReasoning }); + void onBlockReply?.({ text: formattedReasoning }); } if (streamReasoning && rawThinking) { emitReasoningStream(rawThinking); diff --git a/src/agents/pi-embedded-utils.ts b/src/agents/pi-embedded-utils.ts index c7698682b..96b2931ad 100644 --- a/src/agents/pi-embedded-utils.ts +++ b/src/agents/pi-embedded-utils.ts @@ -37,12 +37,7 @@ export function extractAssistantThinking(msg: AssistantMessage): string { export function formatReasoningMarkdown(text: string): string { const trimmed = text.trim(); if (!trimmed) return ""; - const lines = trimmed.split(/\r?\n/); - const wrapped = lines - .map((line) => line.trim()) - .map((line) => (line ? `_${line}_` : "")) - .filter((line) => line.length > 0); - return wrapped.length > 0 ? [`_Reasoning:_`, ...wrapped].join("\n") : ""; + return `Reasoning:\n${trimmed}`; } export function inferToolMetaFromArgs(