From 1fa7a587d6332572e5811bd93027ef81dc2bde45 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 12 Jan 2026 02:54:57 +0000 Subject: [PATCH] fix: flush block reply buffers on tool boundaries (#750) (thanks @sebslight) --- CHANGELOG.md | 1 + src/agents/pi-embedded-subscribe.test.ts | 54 ++++++++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 16 ++++++- 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5930eac9c..f1a6b23ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ - Plugins: treat `plugins.load.paths` directory entries as package roots when they contain `package.json` + `clawdbot.extensions`. - Config: expand `~` in `CLAWDBOT_CONFIG_PATH` and common path-like config fields (including `plugins.load.paths`). - Auto-reply: align `/think` default display with model reasoning defaults. (#751) — thanks @gabriel-trigo. +- Auto-reply: flush block reply buffers on tool boundaries. (#750) — thanks @sebslight. - Docker: tolerate unset optional env vars in docker-setup.sh under strict mode. (#725) — thanks @petradonka. - CLI/Update: preserve base environment when passing overrides to update subprocesses. (#713) — thanks @danielz1z. - Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44. diff --git a/src/agents/pi-embedded-subscribe.test.ts b/src/agents/pi-embedded-subscribe.test.ts index 974faea4d..2b8afa6fb 100644 --- a/src/agents/pi-embedded-subscribe.test.ts +++ b/src/agents/pi-embedded-subscribe.test.ts @@ -1815,6 +1815,60 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReplyFlush).toHaveBeenCalledTimes(2); }); + it("flushes buffered block chunks before tool execution", () => { + let handler: SessionEventHandler | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onBlockReply = vi.fn(); + const onBlockReplyFlush = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters< + typeof subscribeEmbeddedPiSession + >[0]["session"], + runId: "run-flush-buffer", + onBlockReply, + onBlockReplyFlush, + blockReplyBreak: "text_end", + blockReplyChunking: { minChars: 50, maxChars: 200 }, + }); + + handler?.({ + type: "message_start", + message: { role: "assistant" }, + }); + + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_delta", + delta: "Short chunk.", + }, + }); + + expect(onBlockReply).not.toHaveBeenCalled(); + + handler?.({ + type: "tool_execution_start", + toolName: "bash", + toolCallId: "tool-flush-buffer-1", + args: { command: "echo flush" }, + }); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Short chunk."); + expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); + expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan( + onBlockReplyFlush.mock.invocationCallOrder[0], + ); + }); + it("does not call onBlockReplyFlush when callback is not provided", () => { let handler: SessionEventHandler | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index f69209dd0..9b9b04f68 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -446,6 +446,19 @@ export function subscribeEmbeddedPiSession(params: { }); }; + const flushBlockReplyBuffer = () => { + if (!params.onBlockReply) return; + if (blockChunker?.hasBuffered()) { + blockChunker.drain({ force: true, emit: emitBlockChunk }); + blockChunker.reset(); + return; + } + if (blockBuffer.length > 0) { + emitBlockChunk(blockBuffer); + blockBuffer = ""; + } + }; + const emitReasoningStream = (text: string) => { if (!streamReasoning || !params.onReasoningStream) return; const formatted = formatReasoningMessage(text); @@ -485,7 +498,8 @@ export function subscribeEmbeddedPiSession(params: { } if (evt.type === "tool_execution_start") { - // Flush pending block replies to preserve message boundaries before tool execution + // Flush pending block replies to preserve message boundaries before tool execution. + flushBlockReplyBuffer(); if (params.onBlockReplyFlush) { void params.onBlockReplyFlush(); }