From e3a44b10bc9bbf8bdff2522a9fd0f1854c46c990 Mon Sep 17 00:00:00 2001 From: SocialNerd42069 <118244303+SocialNerd42069@users.noreply.github.com> Date: Tue, 20 Jan 2026 23:40:38 -0600 Subject: [PATCH] fix: prevent duplicate assistant texts from whitespace differences - Add per-message dedup tracking in subscribeEmbeddedPiSession - Compare both trimmed and normalized text to catch near-duplicates - Reset dedup state on each new assistant message - Add test for trailing whitespace edge case Fixes duplicate Slack message delivery when the same text appears with minor whitespace differences (e.g., trailing newline). --- .../pi-embedded-subscribe.handlers.types.ts | 4 ++ ...-emit-duplicate-block-replies-text.test.ts | 29 +++++++++++++ src/agents/pi-embedded-subscribe.ts | 41 +++++++++++++++++-- 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 1832b2bf1..94a107961 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -39,6 +39,10 @@ export type EmbeddedPiSubscribeState = { lastStreamedAssistant?: string; lastStreamedReasoning?: string; lastBlockReplyText?: string; + assistantMessageIndex: number; + lastAssistantTextMessageIndex: number; + lastAssistantTextNormalized?: string; + lastAssistantTextTrimmed?: string; assistantTextBaseline: number; suppressBlockChunks: boolean; lastReasoningSent?: string; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.test.ts index 86963f08c..7d592caad 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.test.ts @@ -86,6 +86,35 @@ describe("subscribeEmbeddedPiSession", () => { expect(subscription.assistantTexts).toEqual(["Hello world"]); }); + it("does not duplicate assistantTexts when message_end repeats with trailing whitespace changes", () => { + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const subscription = subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + }); + + const assistantMessageWithNewline = { + role: "assistant", + content: [{ type: "text", text: "Hello world\n" }], + } as AssistantMessage; + + const assistantMessageTrimmed = { + role: "assistant", + content: [{ type: "text", text: "Hello world" }], + } as AssistantMessage; + + handler?.({ type: "message_end", message: assistantMessageWithNewline }); + handler?.({ type: "message_end", message: assistantMessageTrimmed }); + + expect(subscription.assistantTexts).toEqual(["Hello world\n"]); + }); it("does not duplicate assistantTexts when message_end repeats with reasoning blocks", () => { let handler: SessionEventHandler | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 1ed83100f..cb6d81be6 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -48,6 +48,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar lastStreamedAssistant: undefined, lastStreamedReasoning: undefined, lastBlockReplyText: undefined, + assistantMessageIndex: 0, + lastAssistantTextMessageIndex: -1, + lastAssistantTextNormalized: undefined, + lastAssistantTextTrimmed: undefined, assistantTextBaseline: 0, suppressBlockChunks: false, // Avoid late chunk inserts after final text merge. lastReasoningSent: undefined, @@ -84,9 +88,36 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.lastStreamedReasoning = undefined; state.lastReasoningSent = undefined; state.suppressBlockChunks = false; + state.assistantMessageIndex += 1; + state.lastAssistantTextMessageIndex = -1; + state.lastAssistantTextNormalized = undefined; + state.lastAssistantTextTrimmed = undefined; state.assistantTextBaseline = nextAssistantTextBaseline; }; + const rememberAssistantText = (text: string) => { + state.lastAssistantTextMessageIndex = state.assistantMessageIndex; + state.lastAssistantTextTrimmed = text.trimEnd(); + const normalized = normalizeTextForComparison(text); + state.lastAssistantTextNormalized = normalized.length > 0 ? normalized : undefined; + }; + + const shouldSkipAssistantText = (text: string) => { + if (state.lastAssistantTextMessageIndex !== state.assistantMessageIndex) return false; + const trimmed = text.trimEnd(); + if (trimmed && trimmed === state.lastAssistantTextTrimmed) return true; + const normalized = normalizeTextForComparison(text); + if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) return true; + return false; + }; + + const pushAssistantText = (text: string) => { + if (!text) return; + if (shouldSkipAssistantText(text)) return; + assistantTexts.push(text); + rememberAssistantText(text); + }; + const finalizeAssistantTexts = (args: { text: string; addedDuringMessage: boolean; @@ -103,16 +134,15 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar assistantTexts.length - state.assistantTextBaseline, text, ); + rememberAssistantText(text); } else { - const last = assistantTexts.at(-1); - if (!last || last !== text) assistantTexts.push(text); + pushAssistantText(text); } state.suppressBlockChunks = true; } else if (!addedDuringMessage && !chunkerHasBuffered && text) { // Non-streaming models (no text_delta): ensure assistantTexts gets the final // text when the chunker has nothing buffered to drain. - const last = assistantTexts.at(-1); - if (!last || last !== text) assistantTexts.push(text); + pushAssistantText(text); } state.assistantTextBaseline = assistantTexts.length; @@ -338,8 +368,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar return; } + if (shouldSkipAssistantText(chunk)) return; + state.lastBlockReplyText = chunk; assistantTexts.push(chunk); + rememberAssistantText(chunk); if (!params.onBlockReply) return; const splitResult = parseReplyDirectives(chunk); const {