From 2d0ca67c210968d8fe9bc93e9a061aee16d9306f Mon Sep 17 00:00:00 2001 From: Zach Knickerbocker Date: Fri, 9 Jan 2026 17:25:10 -0500 Subject: [PATCH] fix: prevent leakage in block streaming --- src/agents/pi-embedded-subscribe.test.ts | 83 ++++++++++++++++++++++-- src/agents/pi-embedded-subscribe.ts | 33 +++++++++- 2 files changed, 110 insertions(+), 6 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 40949d799..b85b3a028 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -278,6 +278,74 @@ describe("subscribeEmbeddedPiSession", () => { ]); }); + it.each([ + { tag: "think", open: "", close: "" }, + { tag: "thinking", open: "", close: "" }, + ])("suppresses <%s> blocks across chunk boundaries", ({ open, close }) => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + blockReplyChunking: { + minChars: 5, + maxChars: 50, + breakPreference: "newline", + }, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: `${open}Reasoning chunk that should not leak`, + }, + }); + + expect(onBlockReply).not.toHaveBeenCalled(); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: `${close}\n\nFinal answer`, + }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_end" }, + }); + + const payloadTexts = onBlockReply.mock.calls + .map((call) => call[0]?.text) + .filter((value): value is string => typeof value === "string"); + expect(payloadTexts.length).toBeGreaterThan(0); + for (const text of payloadTexts) { + expect(text).not.toContain("Reasoning"); + expect(text).not.toContain(open); + } + const combined = payloadTexts.join(" ").replace(/\s+/g, " ").trim(); + expect(combined).toBe("Final answer"); + }); + it("emits block replies on text_end and does not duplicate on message_end", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { @@ -1058,10 +1126,17 @@ describe("subscribeEmbeddedPiSession", () => { handler?.({ type: "message_end", message: assistantMessage }); - expect(onBlockReply).toHaveBeenCalledTimes(3); - expect(onBlockReply.mock.calls[1][0].text).toBe( - "````md\nline1\nline2\n````", - ); + const payloadTexts = onBlockReply.mock.calls + .map((call) => call[0]?.text) + .filter((value): value is string => typeof value === "string"); + expect(payloadTexts.length).toBeGreaterThan(0); + const combined = payloadTexts.join(" ").replace(/\s+/g, " ").trim(); + expect(combined).toContain("````md"); + expect(combined).toContain("line1"); + expect(combined).toContain("line2"); + expect(combined).toContain("````"); + expect(combined).toContain("Intro"); + expect(combined).toContain("Outro"); }); it("splits long single-line fenced blocks with reopen/close", () => { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 62be6d416..96ac61128 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -376,6 +376,8 @@ export function subscribeEmbeddedPiSession(params: { typeof params.onReasoningStream === "function"; let deltaBuffer = ""; let blockBuffer = ""; + // Track if a streamed chunk opened a block (stateful across chunks). + let blockThinkingActive = false; let lastStreamedAssistant: string | undefined; let lastStreamedReasoning: string | undefined; let lastBlockReplyText: string | undefined; @@ -481,9 +483,32 @@ export function subscribeEmbeddedPiSession(params: { } }; + const stripBlockThinkingSegments = (text: string): string => { + if (!text) return text; + if (!blockThinkingActive && !THINKING_TAG_SCAN_RE.test(text)) return text; + THINKING_TAG_SCAN_RE.lastIndex = 0; + let result = ""; + let lastIndex = 0; + let inThinking = blockThinkingActive; + for (const match of text.matchAll(THINKING_TAG_SCAN_RE)) { + const idx = match.index ?? 0; + if (!inThinking) { + result += text.slice(lastIndex, idx); + } + const isClose = match[1] === "/"; + inThinking = !isClose; + lastIndex = idx + match[0].length; + } + if (!inThinking) { + result += text.slice(lastIndex); + } + blockThinkingActive = inThinking; + return result; + }; + const emitBlockChunk = (text: string) => { - // Strip any tags that may have leaked into the output (e.g., from Gemini mimicking history) - const strippedText = stripThinkingSegments(stripUnpairedThinkingTags(text)); + // Strip blocks across chunk boundaries to avoid leaking reasoning. + const strippedText = stripBlockThinkingSegments(text); const chunk = strippedText.trimEnd(); if (!chunk) return; if (chunk === lastBlockReplyText) return; @@ -571,6 +596,7 @@ export function subscribeEmbeddedPiSession(params: { deltaBuffer = ""; blockBuffer = ""; blockChunker?.reset(); + blockThinkingActive = false; lastStreamedAssistant = undefined; lastStreamedReasoning = undefined; lastBlockReplyText = undefined; @@ -590,6 +616,7 @@ export function subscribeEmbeddedPiSession(params: { deltaBuffer = ""; blockBuffer = ""; blockChunker?.reset(); + blockThinkingActive = false; lastStreamedAssistant = undefined; lastBlockReplyText = undefined; lastStreamedReasoning = undefined; @@ -973,6 +1000,7 @@ export function subscribeEmbeddedPiSession(params: { deltaBuffer = ""; blockBuffer = ""; blockChunker?.reset(); + blockThinkingActive = false; lastStreamedAssistant = undefined; } } @@ -1054,6 +1082,7 @@ export function subscribeEmbeddedPiSession(params: { blockBuffer = ""; } } + blockThinkingActive = false; if (pendingCompactionRetry > 0) { resolveCompactionRetry(); } else {