diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts
index 40949d799..b85b3a028 100644
--- a/src/agents/pi-embedded-subscribe.test.ts
+++ b/src/agents/pi-embedded-subscribe.test.ts
@@ -278,6 +278,74 @@ describe("subscribeEmbeddedPiSession", () => {
]);
});
+  // Reasoning wrapped in <think>/<thinking> must stay hidden even when the tags arrive in separate deltas.
+  it.each([
+    { tag: "think", open: "<think>", close: "</think>" },
+    { tag: "thinking", open: "<thinking>", close: "</thinking>" },
+  ])("suppresses <$tag> blocks across chunk boundaries", ({ open, close }) => {
+    let handler: ((evt: unknown) => void) | undefined;
+    const session: StubSession = {
+      subscribe: (fn) => {
+        handler = fn;
+        return () => {};
+      },
+    };
+    const onBlockReply = vi.fn();
+
+    subscribeEmbeddedPiSession({
+      session: session as unknown as Parameters<
+        typeof subscribeEmbeddedPiSession
+      >[0]["session"],
+      runId: "run",
+      onBlockReply,
+      blockReplyBreak: "text_end",
+      blockReplyChunking: {
+        minChars: 5,
+        maxChars: 50,
+        breakPreference: "newline",
+      },
+    });
+
+    handler?.({ type: "message_start", message: { role: "assistant" } });
+    // First delta: the opening tag plus reasoning text, with no closing tag yet.
+    handler?.({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: {
+        type: "text_delta",
+        delta: `${open}Reasoning chunk that should not leak`,
+      },
+    });
+    // No block reply is expected after the first delta.
+    expect(onBlockReply).not.toHaveBeenCalled();
+    // Second delta: the closing tag followed by the visible answer.
+    handler?.({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: {
+        type: "text_delta",
+        delta: `${close}\n\nFinal answer`,
+      },
+    });
+
+    handler?.({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: { type: "text_end" },
+    });
+    // Gather the text of every emitted block reply; only the answer may appear.
+    const payloadTexts = onBlockReply.mock.calls
+      .map((call) => call[0]?.text)
+      .filter((value): value is string => typeof value === "string");
+    expect(payloadTexts.length).toBeGreaterThan(0);
+    for (const text of payloadTexts) {
+      expect(text).not.toContain("Reasoning");
+      expect(text).not.toContain(open);
+    }
+    const combined = payloadTexts.join(" ").replace(/\s+/g, " ").trim();
+    expect(combined).toBe("Final answer");
+  });
+
it("emits block replies on text_end and does not duplicate on message_end", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
@@ -1058,10 +1126,17 @@ describe("subscribeEmbeddedPiSession", () => {
handler?.({ type: "message_end", message: assistantMessage });
- expect(onBlockReply).toHaveBeenCalledTimes(3);
- expect(onBlockReply.mock.calls[1][0].text).toBe(
- "````md\nline1\nline2\n````",
- );
+ const payloadTexts = onBlockReply.mock.calls
+ .map((call) => call[0]?.text)
+ .filter((value): value is string => typeof value === "string");
+ expect(payloadTexts.length).toBeGreaterThan(0);
+ const combined = payloadTexts.join(" ").replace(/\s+/g, " ").trim();
+ expect(combined).toContain("````md");
+ expect(combined).toContain("line1");
+ expect(combined).toContain("line2");
+ expect(combined).toContain("````");
+ expect(combined).toContain("Intro");
+ expect(combined).toContain("Outro");
});
it("splits long single-line fenced blocks with reopen/close", () => {
diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts
index 62be6d416..96ac61128 100644
--- a/src/agents/pi-embedded-subscribe.ts
+++ b/src/agents/pi-embedded-subscribe.ts
@@ -376,6 +376,8 @@ export function subscribeEmbeddedPiSession(params: {
typeof params.onReasoningStream === "function";
let deltaBuffer = "";
let blockBuffer = "";
+  // Track whether a streamed chunk opened a <think>/<thinking> block (stateful across chunks).
+ let blockThinkingActive = false;
let lastStreamedAssistant: string | undefined;
let lastStreamedReasoning: string | undefined;
let lastBlockReplyText: string | undefined;
@@ -481,9 +483,32 @@ export function subscribeEmbeddedPiSession(params: {
}
};
+  /**
+   * Removes thinking-tag spans from one streamed chunk and returns the visible text.
+   * `blockThinkingActive` carries "still inside a block" from one call to the next.
+   */
+  const stripBlockThinkingSegments = (text: string): string => {
+    if (!text) return text;
+    // matchAll() resumes from the shared regex's lastIndex; clear any stale offset
+    // instead of probing with the stateful test(), which could skip a leading tag.
+    THINKING_TAG_SCAN_RE.lastIndex = 0;
+    let result = "";
+    let lastIndex = 0;
+    let inThinking = blockThinkingActive;
+    for (const match of text.matchAll(THINKING_TAG_SCAN_RE)) {
+      const idx = match.index ?? 0;
+      if (!inThinking) result += text.slice(lastIndex, idx);
+      inThinking = match[1] !== "/"; // group 1 is "/" on a closing tag
+      lastIndex = idx + match[0].length;
+    }
+    if (!inThinking) result += text.slice(lastIndex);
+    blockThinkingActive = inThinking;
+    return result;
+  };
+
const emitBlockChunk = (text: string) => {
- // Strip any tags that may have leaked into the output (e.g., from Gemini mimicking history)
- const strippedText = stripThinkingSegments(stripUnpairedThinkingTags(text));
+    // Strip <think>/<thinking> blocks across chunk boundaries to avoid leaking reasoning.
+ const strippedText = stripBlockThinkingSegments(text);
const chunk = strippedText.trimEnd();
if (!chunk) return;
if (chunk === lastBlockReplyText) return;
@@ -571,6 +596,7 @@ export function subscribeEmbeddedPiSession(params: {
deltaBuffer = "";
blockBuffer = "";
blockChunker?.reset();
+ blockThinkingActive = false;
lastStreamedAssistant = undefined;
lastStreamedReasoning = undefined;
lastBlockReplyText = undefined;
@@ -590,6 +616,7 @@ export function subscribeEmbeddedPiSession(params: {
deltaBuffer = "";
blockBuffer = "";
blockChunker?.reset();
+ blockThinkingActive = false;
lastStreamedAssistant = undefined;
lastBlockReplyText = undefined;
lastStreamedReasoning = undefined;
@@ -973,6 +1000,7 @@ export function subscribeEmbeddedPiSession(params: {
deltaBuffer = "";
blockBuffer = "";
blockChunker?.reset();
+ blockThinkingActive = false;
lastStreamedAssistant = undefined;
}
}
@@ -1054,6 +1082,7 @@ export function subscribeEmbeddedPiSession(params: {
blockBuffer = "";
}
}
+ blockThinkingActive = false;
if (pendingCompactionRetry > 0) {
resolveCompactionRetry();
} else {