fix: prevent duplicate assistant texts from whitespace differences

- Add per-message dedup tracking in subscribeEmbeddedPiSession
- Compare both trimmed and normalized text to catch near-duplicates
- Reset dedup state on each new assistant message
- Add test for trailing whitespace edge case

Fixes duplicate Slack message delivery when the same text appears
with minor whitespace differences (e.g., trailing newline).
Author: SocialNerd42069
Date: 2026-01-20 23:40:38 -06:00
Committer: Peter Steinberger
Parent: 5b8007784b
Commit: e3a44b10bc
3 changed files with 70 additions and 4 deletions
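
For reference, a minimal standalone sketch of the per-message dedup check this commit describes. normalizeTextForComparison is referenced but not defined in this diff, so the whitespace-collapsing, case-folding version below is an assumption, not the project's actual helper.

// Sketch only: mirrors the dedup check added by this commit.
// Assumed normalizer: collapse whitespace and fold case; the real
// normalizeTextForComparison in the codebase may differ.
const normalizeTextForComparison = (text: string): string =>
  text.replace(/\s+/g, " ").trim().toLowerCase();

type AssistantTextDedupState = {
  messageIndex: number;           // incremented for each new assistant message
  lastTextMessageIndex: number;   // message the remembered text belongs to
  lastTextTrimmed?: string;
  lastTextNormalized?: string;
};

const shouldSkipAssistantText = (
  state: AssistantTextDedupState,
  text: string,
): boolean => {
  // Only dedup against text already recorded for the current assistant message.
  if (state.lastTextMessageIndex !== state.messageIndex) return false;
  const trimmed = text.trimEnd();
  if (trimmed && trimmed === state.lastTextTrimmed) return true;
  const normalized = normalizeTextForComparison(text);
  return normalized.length > 0 && normalized === state.lastTextNormalized;
};

// e.g. once "Hello world\n" has been recorded for a message, a repeated
// "Hello world" (trailing newline stripped) is skipped as a near-duplicate.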

@@ -39,6 +39,10 @@ export type EmbeddedPiSubscribeState = {
lastStreamedAssistant?: string;
lastStreamedReasoning?: string;
lastBlockReplyText?: string;
assistantMessageIndex: number;
lastAssistantTextMessageIndex: number;
lastAssistantTextNormalized?: string;
lastAssistantTextTrimmed?: string;
assistantTextBaseline: number;
suppressBlockChunks: boolean;
lastReasoningSent?: string;

@@ -86,6 +86,35 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["Hello world"]);
});
it("does not duplicate assistantTexts when message_end repeats with trailing whitespace changes", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
});
const assistantMessageWithNewline = {
role: "assistant",
content: [{ type: "text", text: "Hello world\n" }],
} as AssistantMessage;
const assistantMessageTrimmed = {
role: "assistant",
content: [{ type: "text", text: "Hello world" }],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessageWithNewline });
handler?.({ type: "message_end", message: assistantMessageTrimmed });
expect(subscription.assistantTexts).toEqual(["Hello world\n"]);
});
it("does not duplicate assistantTexts when message_end repeats with reasoning blocks", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {

@@ -48,6 +48,10 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
lastStreamedAssistant: undefined,
lastStreamedReasoning: undefined,
lastBlockReplyText: undefined,
assistantMessageIndex: 0,
lastAssistantTextMessageIndex: -1,
lastAssistantTextNormalized: undefined,
lastAssistantTextTrimmed: undefined,
assistantTextBaseline: 0,
suppressBlockChunks: false, // Avoid late chunk inserts after final text merge.
lastReasoningSent: undefined,
@@ -84,9 +88,36 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
state.lastStreamedReasoning = undefined;
state.lastReasoningSent = undefined;
state.suppressBlockChunks = false;
state.assistantMessageIndex += 1;
state.lastAssistantTextMessageIndex = -1;
state.lastAssistantTextNormalized = undefined;
state.lastAssistantTextTrimmed = undefined;
state.assistantTextBaseline = nextAssistantTextBaseline;
};
const rememberAssistantText = (text: string) => {
state.lastAssistantTextMessageIndex = state.assistantMessageIndex;
state.lastAssistantTextTrimmed = text.trimEnd();
const normalized = normalizeTextForComparison(text);
state.lastAssistantTextNormalized = normalized.length > 0 ? normalized : undefined;
};
const shouldSkipAssistantText = (text: string) => {
if (state.lastAssistantTextMessageIndex !== state.assistantMessageIndex) return false;
const trimmed = text.trimEnd();
if (trimmed && trimmed === state.lastAssistantTextTrimmed) return true;
const normalized = normalizeTextForComparison(text);
if (normalized.length > 0 && normalized === state.lastAssistantTextNormalized) return true;
return false;
};
const pushAssistantText = (text: string) => {
if (!text) return;
if (shouldSkipAssistantText(text)) return;
assistantTexts.push(text);
rememberAssistantText(text);
};
const finalizeAssistantTexts = (args: {
text: string;
addedDuringMessage: boolean;
@@ -103,16 +134,15 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
assistantTexts.length - state.assistantTextBaseline,
text,
);
rememberAssistantText(text);
} else {
const last = assistantTexts.at(-1);
if (!last || last !== text) assistantTexts.push(text);
pushAssistantText(text);
}
state.suppressBlockChunks = true;
} else if (!addedDuringMessage && !chunkerHasBuffered && text) {
// Non-streaming models (no text_delta): ensure assistantTexts gets the final
// text when the chunker has nothing buffered to drain.
const last = assistantTexts.at(-1);
if (!last || last !== text) assistantTexts.push(text);
pushAssistantText(text);
}
state.assistantTextBaseline = assistantTexts.length;
@@ -338,8 +368,11 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
return;
}
if (shouldSkipAssistantText(chunk)) return;
state.lastBlockReplyText = chunk;
assistantTexts.push(chunk);
rememberAssistantText(chunk);
if (!params.onBlockReply) return;
const splitResult = parseReplyDirectives(chunk);
const {