diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index c238b2b8b..2b34439f7 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -347,6 +347,53 @@ describe("subscribeEmbeddedPiSession", () => { expect(combined).toBe("Final answer"); }); + it("keeps assistantTexts to the final answer when block replies are disabled", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const subscription = subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + reasoningMode: "on", + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "Final answer", + }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_end", + }, + }); + + const assistantMessage = { + role: "assistant", + content: [ + { type: "thinking", thinking: "Because it helps" }, + { type: "text", text: "Final answer" }, + ], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + + expect(subscription.assistantTexts).toEqual(["Final answer"]); + }); + it("emits block replies on text_end and does not duplicate on message_end", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { @@ -580,6 +627,37 @@ describe("subscribeEmbeddedPiSession", () => { expect(subscription.assistantTexts).toEqual(["Hello world"]); }); + it("does not duplicate assistantTexts when message_end repeats with reasoning blocks", () => { + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const subscription = subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + reasoningMode: "on", + }); + + const assistantMessage = { + role: "assistant", + content: [ + { type: "thinking", thinking: "Because" }, + { type: "text", text: "Hello world" }, + ], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + handler?.({ type: "message_end", message: assistantMessage }); + + expect(subscription.assistantTexts).toEqual(["Hello world"]); + }); + it("populates assistantTexts for non-streaming models with chunking enabled", () => { // Non-streaming models (e.g. zai/glm-4.7): no text_delta events; message_end // must still populate assistantTexts so providers can deliver a final reply. diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 1f2350e11..10a084520 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -294,6 +294,7 @@ export function subscribeEmbeddedPiSession(params: { let lastStreamedReasoning: string | undefined; let lastBlockReplyText: string | undefined; let assistantTextBaseline = 0; + let suppressBlockChunks = false; // Avoid late chunk inserts after final text merge. let compactionInFlight = false; let pendingCompactionRetry = 0; let compactionRetryResolve: (() => void) | undefined; @@ -419,6 +420,7 @@ export function subscribeEmbeddedPiSession(params: { }; const emitBlockChunk = (text: string) => { + if (suppressBlockChunks) return; // Strip blocks across chunk boundaries to avoid leaking reasoning. const strippedText = stripBlockThinkingSegments(text); const chunk = strippedText.trimEnd(); @@ -476,6 +478,7 @@ export function subscribeEmbeddedPiSession(params: { lastStreamedAssistant = undefined; lastStreamedReasoning = undefined; lastBlockReplyText = undefined; + suppressBlockChunks = false; assistantTextBaseline = 0; }; @@ -497,6 +500,7 @@ export function subscribeEmbeddedPiSession(params: { lastBlockReplyText = undefined; lastStreamedReasoning = undefined; lastReasoningSent = undefined; + suppressBlockChunks = false; assistantTextBaseline = assistantTexts.length; } } @@ -818,9 +822,23 @@ export function subscribeEmbeddedPiSession(params: { const addedDuringMessage = assistantTexts.length > assistantTextBaseline; const chunkerHasBuffered = blockChunker?.hasBuffered() ?? false; - // Non-streaming models (no text_delta): ensure assistantTexts gets the - // final text when the chunker has nothing buffered to drain. - if (!addedDuringMessage && !chunkerHasBuffered && text) { + // If we're not streaming block replies, ensure the final payload + // includes the final text even when deltas already populated assistantTexts. + if (includeReasoning && text && !params.onBlockReply) { + if (assistantTexts.length > assistantTextBaseline) { + assistantTexts.splice( + assistantTextBaseline, + assistantTexts.length - assistantTextBaseline, + text, + ); + } else { + const last = assistantTexts.at(-1); + if (!last || last !== text) assistantTexts.push(text); + } + suppressBlockChunks = true; + } else if (!addedDuringMessage && !chunkerHasBuffered && text) { + // Non-streaming models (no text_delta): ensure assistantTexts gets the + // final text when the chunker has nothing buffered to drain. const last = assistantTexts.at(-1); if (!last || last !== text) assistantTexts.push(text); }