fix(agents): suppress partial replies with reasoning

This commit is contained in:
Peter Steinberger
2026-01-10 16:02:57 +01:00
parent 44564df028
commit 43b530ca1c
2 changed files with 115 additions and 41 deletions

View File

@@ -402,6 +402,68 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["Final answer"]);
});
it("suppresses partial replies when reasoning is enabled and block replies are disabled", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onPartialReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run",
reasoningMode: "on",
onPartialReply,
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Draft ",
},
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "reply",
},
});
expect(onPartialReply).not.toHaveBeenCalled();
const assistantMessage = {
role: "assistant",
content: [
{ type: "thinking", thinking: "Because it helps" },
{ type: "text", text: "Final answer" },
],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: "Draft reply",
},
});
expect(onPartialReply).not.toHaveBeenCalled();
expect(subscription.assistantTexts).toEqual(["Final answer"]);
});
it("emits block replies on text_end and does not duplicate on message_end", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {

View File

@@ -223,6 +223,7 @@ export function subscribeEmbeddedPiSession(params: {
const blockReplyBreak = params.blockReplyBreak ?? "text_end";
const reasoningMode = params.reasoningMode ?? "off";
const includeReasoning = reasoningMode === "on";
const shouldEmitPartialReplies = !(includeReasoning && !params.onBlockReply);
const streamReasoning =
reasoningMode === "stream" &&
typeof params.onReasoningStream === "function";
@@ -241,6 +242,50 @@ export function subscribeEmbeddedPiSession(params: {
let compactionRetryPromise: Promise<void> | null = null;
let lastReasoningSent: string | undefined;
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
deltaBuffer = "";
blockBuffer = "";
blockChunker?.reset();
blockThinkingActive = false;
lastStreamedAssistant = undefined;
lastBlockReplyText = undefined;
lastStreamedReasoning = undefined;
lastReasoningSent = undefined;
suppressBlockChunks = false;
assistantTextBaseline = nextAssistantTextBaseline;
};
const finalizeAssistantTexts = (args: {
text: string;
addedDuringMessage: boolean;
chunkerHasBuffered: boolean;
}) => {
const { text, addedDuringMessage, chunkerHasBuffered } = args;
// If we're not streaming block replies, ensure the final payload includes
// the final text even when interim streaming was enabled.
if (includeReasoning && text && !params.onBlockReply) {
if (assistantTexts.length > assistantTextBaseline) {
assistantTexts.splice(
assistantTextBaseline,
assistantTexts.length - assistantTextBaseline,
text,
);
} else {
const last = assistantTexts.at(-1);
if (!last || last !== text) assistantTexts.push(text);
}
suppressBlockChunks = true;
} else if (!addedDuringMessage && !chunkerHasBuffered && text) {
// Non-streaming models (no text_delta): ensure assistantTexts gets the final
// text when the chunker has nothing buffered to drain.
const last = assistantTexts.at(-1);
if (!last || last !== text) assistantTexts.push(text);
}
assistantTextBaseline = assistantTexts.length;
};
// ── Messaging tool duplicate detection ──────────────────────────────────────
// Track texts sent via messaging tools to suppress duplicate block replies.
// Only committed (successful) texts are checked - pending texts are tracked
@@ -426,15 +471,7 @@ export function subscribeEmbeddedPiSession(params: {
messagingToolSentTargets.length = 0;
pendingMessagingTexts.clear();
pendingMessagingTargets.clear();
deltaBuffer = "";
blockBuffer = "";
blockChunker?.reset();
blockThinkingActive = false;
lastStreamedAssistant = undefined;
lastStreamedReasoning = undefined;
lastBlockReplyText = undefined;
suppressBlockChunks = false;
assistantTextBaseline = 0;
resetAssistantMessageState(0);
};
const unsubscribe = params.session.subscribe(
@@ -447,16 +484,7 @@ export function subscribeEmbeddedPiSession(params: {
// Start-of-message is a safer reset point than message_end: some providers
// may deliver late text_end updates after message_end, which would
// otherwise re-trigger block replies.
deltaBuffer = "";
blockBuffer = "";
blockChunker?.reset();
blockThinkingActive = false;
lastStreamedAssistant = undefined;
lastBlockReplyText = undefined;
lastStreamedReasoning = undefined;
lastReasoningSent = undefined;
suppressBlockChunks = false;
assistantTextBaseline = assistantTexts.length;
resetAssistantMessageState(assistantTexts.length);
}
}
@@ -717,7 +745,7 @@ export function subscribeEmbeddedPiSession(params: {
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (params.onPartialReply) {
if (params.onPartialReply && shouldEmitPartialReplies) {
void params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
@@ -780,27 +808,11 @@ export function subscribeEmbeddedPiSession(params: {
const addedDuringMessage =
assistantTexts.length > assistantTextBaseline;
const chunkerHasBuffered = blockChunker?.hasBuffered() ?? false;
// If we're not streaming block replies, ensure the final payload
// includes the final text even when deltas already populated assistantTexts.
if (includeReasoning && text && !params.onBlockReply) {
if (assistantTexts.length > assistantTextBaseline) {
assistantTexts.splice(
assistantTextBaseline,
assistantTexts.length - assistantTextBaseline,
text,
);
} else {
const last = assistantTexts.at(-1);
if (!last || last !== text) assistantTexts.push(text);
}
suppressBlockChunks = true;
} else if (!addedDuringMessage && !chunkerHasBuffered && text) {
// Non-streaming models (no text_delta): ensure assistantTexts gets the
// final text when the chunker has nothing buffered to drain.
const last = assistantTexts.at(-1);
if (!last || last !== text) assistantTexts.push(text);
}
assistantTextBaseline = assistantTexts.length;
finalizeAssistantTexts({
text,
addedDuringMessage,
chunkerHasBuffered,
});
const onBlockReply = params.onBlockReply;
const shouldEmitReasoning = Boolean(