fix: flush block reply coalescer on tool boundaries

When block streaming is enabled with verbose=off, tool blocks are hidden
but their boundary information was lost. Text segments before and after
tool execution would get coalesced into a single message because the
coalescer had no signal that a tool had executed between them.

This adds an onBlockReplyFlush callback that fires on tool_execution_start,
allowing the block reply pipeline to flush pending text before the tool
runs. This preserves natural message boundaries even when tools are hidden.

Fixes the issue where the sequence:
  text → [hidden tool] → text
was rendered as one merged message.

The same sequence now correctly renders as two separate messages.

Co-diagnosed-by: Krill (Discord assistant)
This commit is contained in:
The Admiral
2026-01-11 21:19:50 -05:00
committed by Peter Steinberger
parent d4d15c8a71
commit c64bcd047b
4 changed files with 108 additions and 0 deletions

View File

@@ -1313,6 +1313,8 @@ export async function runEmbeddedPiAgent(params: {
mediaUrls?: string[];
audioAsVoice?: boolean;
}) => void | Promise<void>;
/** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */
onBlockReplyFlush?: () => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
blockReplyChunking?: BlockReplyChunking;
onReasoningStream?: (payload: {
@@ -1669,6 +1671,7 @@ export async function runEmbeddedPiAgent(params: {
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onBlockReply: params.onBlockReply,
onBlockReplyFlush: params.onBlockReplyFlush,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,

View File

@@ -1754,4 +1754,96 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onToolResult).toHaveBeenCalledTimes(1);
});
it("calls onBlockReplyFlush before tool_execution_start to preserve message boundaries", () => {
  // The stub session hands us whatever listener the subscription registers,
  // so the test can feed events to it directly.
  let dispatch: SessionEventHandler | undefined;
  const stubSession: StubSession = {
    subscribe: (listener) => {
      dispatch = listener;
      return () => {};
    },
  };

  type SubscribeParams = Parameters<typeof subscribeEmbeddedPiSession>[0];
  const onBlockReply = vi.fn();
  const onBlockReplyFlush = vi.fn();

  subscribeEmbeddedPiSession({
    session: stubSession as unknown as SubscribeParams["session"],
    runId: "run-flush-test",
    onBlockReply,
    onBlockReplyFlush,
    blockReplyBreak: "text_end",
  });

  // Assistant text streams in ahead of any tool activity.
  dispatch?.({
    type: "message_start",
    message: { role: "assistant" },
  });
  dispatch?.({
    type: "message_update",
    message: { role: "assistant" },
    assistantMessageEvent: {
      type: "text_delta",
      delta: "First message before tool.",
    },
  });

  // Plain text events alone must not request a flush.
  expect(onBlockReplyFlush).not.toHaveBeenCalled();

  // First tool boundary: exactly one flush request.
  dispatch?.({
    type: "tool_execution_start",
    toolName: "bash",
    toolCallId: "tool-flush-1",
    args: { command: "echo hello" },
  });
  expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);

  // Second tool boundary: the flush is requested again.
  dispatch?.({
    type: "tool_execution_start",
    toolName: "read",
    toolCallId: "tool-flush-2",
    args: { path: "/tmp/test.txt" },
  });
  expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
});
it("does not throw on tool_execution_start when onBlockReplyFlush is not provided", () => {
  // Capture the listener registered on the stub session so events can be
  // pushed through it by hand.
  let handler: SessionEventHandler | undefined;
  const session: StubSession = {
    subscribe: (fn) => {
      handler = fn;
      return () => {};
    },
  };
  const onBlockReply = vi.fn();

  // onBlockReplyFlush is deliberately omitted: the callback is optional.
  subscribeEmbeddedPiSession({
    session: session as unknown as Parameters<
      typeof subscribeEmbeddedPiSession
    >[0]["session"],
    runId: "run-no-flush",
    onBlockReply,
    blockReplyBreak: "text_end",
  });

  // Without this check the test could pass vacuously: if no listener was
  // registered, `handler?.(...)` below would be a no-op that never throws.
  expect(handler).toBeDefined();

  // A tool boundary must be handled without the optional flush callback.
  expect(() => {
    handler?.({
      type: "tool_execution_start",
      toolName: "bash",
      toolCallId: "tool-no-flush",
      args: { command: "echo test" },
    });
  }).not.toThrow();
});
});

View File

@@ -198,6 +198,8 @@ export function subscribeEmbeddedPiSession(params: {
mediaUrls?: string[];
audioAsVoice?: boolean;
}) => void | Promise<void>;
/** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */
onBlockReplyFlush?: () => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
blockReplyChunking?: BlockReplyChunking;
onPartialReply?: (payload: {
@@ -483,6 +485,11 @@ export function subscribeEmbeddedPiSession(params: {
}
if (evt.type === "tool_execution_start") {
// Flush pending block replies to preserve message boundaries before tool execution
if (params.onBlockReplyFlush) {
void params.onBlockReplyFlush();
}
const toolName = String(
(evt as AgentEvent & { toolName: string }).toolName,
);

View File

@@ -643,6 +643,12 @@ export async function runReplyAgent(params: {
blockReplyPipeline?.enqueue(blockPayload);
}
: undefined,
onBlockReplyFlush:
blockStreamingEnabled && blockReplyPipeline
? async () => {
await blockReplyPipeline.flush({ force: true });
}
: undefined,
shouldEmitToolResult,
onToolResult: opts?.onToolResult
? (payload) => {