Add an optional onBlockReplyFlush callback so pending block replies can be
flushed before a tool starts executing.

Summary of the hunks below:
  * src/agents/pi-embedded-runner.ts
      - declares `onBlockReplyFlush?: () => void | Promise<void>` on the
        runEmbeddedPiAgent params, next to blockReplyBreak;
      - passes `params.onBlockReplyFlush` through in the options object that
        already carries onBlockReply / blockReplyBreak.
  * src/agents/pi-embedded-subscribe.ts
      - declares the same optional callback on subscribeEmbeddedPiSession;
      - at the top of the `tool_execution_start` branch, calls the callback
        when it is provided. The call is synchronous; its result is discarded
        with `void` and is not awaited.
  * src/agents/pi-embedded-subscribe.test.ts
      - asserts the callback is not called for message_start/message_update,
        and is called once per tool_execution_start event (1, then 2);
      - asserts tool_execution_start does not throw when the callback is
        omitted.
  * src/auto-reply/reply/agent-runner.ts
      - when `blockStreamingEnabled && blockReplyPipeline` is truthy, supplies
        an async callback that awaits `blockReplyPipeline.flush({ force: true })`;
        otherwise passes `undefined`.

NOTE(review): because the subscriber discards the returned promise with
`void`, a rejection from blockReplyPipeline.flush() has no handler attached
and tool execution does not wait for the flush to finish. Confirm whether a
`.catch(...)` with logging (and/or awaiting) is wanted here.

NOTE(review): leading indentation of the context and added lines is
reconstructed; if a hunk does not match the working tree, retry with
whitespace-insensitive matching (e.g. `git apply --ignore-whitespace`).

diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts
index f8acdd185..83678ac4e 100644
--- a/src/agents/pi-embedded-runner.ts
+++ b/src/agents/pi-embedded-runner.ts
@@ -1313,6 +1313,8 @@ export async function runEmbeddedPiAgent(params: {
     mediaUrls?: string[];
     audioAsVoice?: boolean;
   }) => void | Promise<void>;
+  /** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */
+  onBlockReplyFlush?: () => void | Promise<void>;
   blockReplyBreak?: "text_end" | "message_end";
   blockReplyChunking?: BlockReplyChunking;
   onReasoningStream?: (payload: {
@@ -1669,6 +1671,7 @@ export async function runEmbeddedPiAgent(params: {
           onToolResult: params.onToolResult,
           onReasoningStream: params.onReasoningStream,
           onBlockReply: params.onBlockReply,
+          onBlockReplyFlush: params.onBlockReplyFlush,
           blockReplyBreak: params.blockReplyBreak,
           blockReplyChunking: params.blockReplyChunking,
           onPartialReply: params.onPartialReply,
diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts
index d07691e90..974faea4d 100644
--- a/src/agents/pi-embedded-subscribe.test.ts
+++ b/src/agents/pi-embedded-subscribe.test.ts
@@ -1754,4 +1754,96 @@ describe("subscribeEmbeddedPiSession", () => {
 
     expect(onToolResult).toHaveBeenCalledTimes(1);
   });
+
+  it("calls onBlockReplyFlush before tool_execution_start to preserve message boundaries", () => {
+    let handler: SessionEventHandler | undefined;
+    const session: StubSession = {
+      subscribe: (fn) => {
+        handler = fn;
+        return () => {};
+      },
+    };
+
+    const onBlockReplyFlush = vi.fn();
+    const onBlockReply = vi.fn();
+
+    subscribeEmbeddedPiSession({
+      session: session as unknown as Parameters<
+        typeof subscribeEmbeddedPiSession
+      >[0]["session"],
+      runId: "run-flush-test",
+      onBlockReply,
+      onBlockReplyFlush,
+      blockReplyBreak: "text_end",
+    });
+
+    // Simulate text arriving before tool
+    handler?.({
+      type: "message_start",
+      message: { role: "assistant" },
+    });
+
+    handler?.({
+      type: "message_update",
+      message: { role: "assistant" },
+      assistantMessageEvent: {
+        type: "text_delta",
+        delta: "First message before tool.",
+      },
+    });
+
+    expect(onBlockReplyFlush).not.toHaveBeenCalled();
+
+    // Tool execution starts - should trigger flush
+    handler?.({
+      type: "tool_execution_start",
+      toolName: "bash",
+      toolCallId: "tool-flush-1",
+      args: { command: "echo hello" },
+    });
+
+    expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
+
+    // Another tool - should flush again
+    handler?.({
+      type: "tool_execution_start",
+      toolName: "read",
+      toolCallId: "tool-flush-2",
+      args: { path: "/tmp/test.txt" },
+    });
+
+    expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
+  });
+
+  it("does not call onBlockReplyFlush when callback is not provided", () => {
+    let handler: SessionEventHandler | undefined;
+    const session: StubSession = {
+      subscribe: (fn) => {
+        handler = fn;
+        return () => {};
+      },
+    };
+
+    const onBlockReply = vi.fn();
+
+    // No onBlockReplyFlush provided
+    subscribeEmbeddedPiSession({
+      session: session as unknown as Parameters<
+        typeof subscribeEmbeddedPiSession
+      >[0]["session"],
+      runId: "run-no-flush",
+      onBlockReply,
+      blockReplyBreak: "text_end",
+    });
+
+    // This should not throw even without onBlockReplyFlush
+    expect(() => {
+      handler?.({
+        type: "tool_execution_start",
+        toolName: "bash",
+        toolCallId: "tool-no-flush",
+        args: { command: "echo test" },
+      });
+    }).not.toThrow();
+  });
 });
diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts
index 9d3d325e9..f69209dd0 100644
--- a/src/agents/pi-embedded-subscribe.ts
+++ b/src/agents/pi-embedded-subscribe.ts
@@ -198,6 +198,8 @@ export function subscribeEmbeddedPiSession(params: {
     mediaUrls?: string[];
     audioAsVoice?: boolean;
   }) => void | Promise<void>;
+  /** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */
+  onBlockReplyFlush?: () => void | Promise<void>;
   blockReplyBreak?: "text_end" | "message_end";
   blockReplyChunking?: BlockReplyChunking;
   onPartialReply?: (payload: {
@@ -483,6 +485,11 @@ export function subscribeEmbeddedPiSession(params: {
     }
 
     if (evt.type === "tool_execution_start") {
+      // Flush pending block replies to preserve message boundaries before tool execution
+      if (params.onBlockReplyFlush) {
+        void params.onBlockReplyFlush();
+      }
+
       const toolName = String(
         (evt as AgentEvent & { toolName: string }).toolName,
       );
diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts
index 6b3229286..23f7f8986 100644
--- a/src/auto-reply/reply/agent-runner.ts
+++ b/src/auto-reply/reply/agent-runner.ts
@@ -643,6 +643,12 @@ export async function runReplyAgent(params: {
                   blockReplyPipeline?.enqueue(blockPayload);
                 }
               : undefined,
+          onBlockReplyFlush:
+            blockStreamingEnabled && blockReplyPipeline
+              ? async () => {
+                  await blockReplyPipeline.flush({ force: true });
+                }
+              : undefined,
           shouldEmitToolResult,
           onToolResult: opts?.onToolResult
             ? (payload) => {