diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 038aa273a..1502c5822 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -402,6 +402,68 @@ describe("subscribeEmbeddedPiSession", () => { expect(subscription.assistantTexts).toEqual(["Final answer"]); }); + it("suppresses partial replies when reasoning is enabled and block replies are disabled", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onPartialReply = vi.fn(); + + const subscription = subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run", + reasoningMode: "on", + onPartialReply, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "Draft ", + }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "reply", + }, + }); + + expect(onPartialReply).not.toHaveBeenCalled(); + + const assistantMessage = { + role: "assistant", + content: [ + { type: "thinking", thinking: "Because it helps" }, + { type: "text", text: "Final answer" }, + ], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessage }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_end", + content: "Draft reply", + }, + }); + + expect(onPartialReply).not.toHaveBeenCalled(); + expect(subscription.assistantTexts).toEqual(["Final answer"]); + }); + it("emits block replies on text_end and does not duplicate on message_end", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 4b582adab..e3b59d12b 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -223,6 +223,7 @@ export function subscribeEmbeddedPiSession(params: { const blockReplyBreak = params.blockReplyBreak ?? "text_end"; const reasoningMode = params.reasoningMode ?? "off"; const includeReasoning = reasoningMode === "on"; + const shouldEmitPartialReplies = !(includeReasoning && !params.onBlockReply); const streamReasoning = reasoningMode === "stream" && typeof params.onReasoningStream === "function"; @@ -241,6 +242,50 @@ export function subscribeEmbeddedPiSession(params: { let compactionRetryPromise: Promise | null = null; let lastReasoningSent: string | undefined; + const resetAssistantMessageState = (nextAssistantTextBaseline: number) => { + deltaBuffer = ""; + blockBuffer = ""; + blockChunker?.reset(); + blockThinkingActive = false; + lastStreamedAssistant = undefined; + lastBlockReplyText = undefined; + lastStreamedReasoning = undefined; + lastReasoningSent = undefined; + suppressBlockChunks = false; + assistantTextBaseline = nextAssistantTextBaseline; + }; + + const finalizeAssistantTexts = (args: { + text: string; + addedDuringMessage: boolean; + chunkerHasBuffered: boolean; + }) => { + const { text, addedDuringMessage, chunkerHasBuffered } = args; + + // If we're not streaming block replies, ensure the final payload includes + // the final text even when interim streaming was enabled. + if (includeReasoning && text && !params.onBlockReply) { + if (assistantTexts.length > assistantTextBaseline) { + assistantTexts.splice( + assistantTextBaseline, + assistantTexts.length - assistantTextBaseline, + text, + ); + } else { + const last = assistantTexts.at(-1); + if (!last || last !== text) assistantTexts.push(text); + } + suppressBlockChunks = true; + } else if (!addedDuringMessage && !chunkerHasBuffered && text) { + // Non-streaming models (no text_delta): ensure assistantTexts gets the final + // text when the chunker has nothing buffered to drain. + const last = assistantTexts.at(-1); + if (!last || last !== text) assistantTexts.push(text); + } + + assistantTextBaseline = assistantTexts.length; + }; + // ── Messaging tool duplicate detection ────────────────────────────────────── // Track texts sent via messaging tools to suppress duplicate block replies. // Only committed (successful) texts are checked - pending texts are tracked @@ -426,15 +471,7 @@ export function subscribeEmbeddedPiSession(params: { messagingToolSentTargets.length = 0; pendingMessagingTexts.clear(); pendingMessagingTargets.clear(); - deltaBuffer = ""; - blockBuffer = ""; - blockChunker?.reset(); - blockThinkingActive = false; - lastStreamedAssistant = undefined; - lastStreamedReasoning = undefined; - lastBlockReplyText = undefined; - suppressBlockChunks = false; - assistantTextBaseline = 0; + resetAssistantMessageState(0); }; const unsubscribe = params.session.subscribe( @@ -447,16 +484,7 @@ export function subscribeEmbeddedPiSession(params: { // Start-of-message is a safer reset point than message_end: some providers // may deliver late text_end updates after message_end, which would // otherwise re-trigger block replies. - deltaBuffer = ""; - blockBuffer = ""; - blockChunker?.reset(); - blockThinkingActive = false; - lastStreamedAssistant = undefined; - lastBlockReplyText = undefined; - lastStreamedReasoning = undefined; - lastReasoningSent = undefined; - suppressBlockChunks = false; - assistantTextBaseline = assistantTexts.length; + resetAssistantMessageState(assistantTexts.length); } } @@ -717,7 +745,7 @@ export function subscribeEmbeddedPiSession(params: { mediaUrls: mediaUrls?.length ? mediaUrls : undefined, }, }); - if (params.onPartialReply) { + if (params.onPartialReply && shouldEmitPartialReplies) { void params.onPartialReply({ text: cleanedText, mediaUrls: mediaUrls?.length ? mediaUrls : undefined, @@ -780,27 +808,11 @@ export function subscribeEmbeddedPiSession(params: { const addedDuringMessage = assistantTexts.length > assistantTextBaseline; const chunkerHasBuffered = blockChunker?.hasBuffered() ?? false; - // If we're not streaming block replies, ensure the final payload - // includes the final text even when deltas already populated assistantTexts. - if (includeReasoning && text && !params.onBlockReply) { - if (assistantTexts.length > assistantTextBaseline) { - assistantTexts.splice( - assistantTextBaseline, - assistantTexts.length - assistantTextBaseline, - text, - ); - } else { - const last = assistantTexts.at(-1); - if (!last || last !== text) assistantTexts.push(text); - } - suppressBlockChunks = true; - } else if (!addedDuringMessage && !chunkerHasBuffered && text) { - // Non-streaming models (no text_delta): ensure assistantTexts gets the - // final text when the chunker has nothing buffered to drain. - const last = assistantTexts.at(-1); - if (!last || last !== text) assistantTexts.push(text); - } - assistantTextBaseline = assistantTexts.length; + finalizeAssistantTexts({ + text, + addedDuringMessage, + chunkerHasBuffered, + }); const onBlockReply = params.onBlockReply; const shouldEmitReasoning = Boolean(