fix: flush block reply buffers on tool boundaries (#750) (thanks @sebslight)
This commit is contained in:
@@ -34,6 +34,7 @@
|
|||||||
- Plugins: treat `plugins.load.paths` directory entries as package roots when they contain `package.json` + `clawdbot.extensions`.
|
- Plugins: treat `plugins.load.paths` directory entries as package roots when they contain `package.json` + `clawdbot.extensions`.
|
||||||
- Config: expand `~` in `CLAWDBOT_CONFIG_PATH` and common path-like config fields (including `plugins.load.paths`).
|
- Config: expand `~` in `CLAWDBOT_CONFIG_PATH` and common path-like config fields (including `plugins.load.paths`).
|
||||||
- Auto-reply: align `/think` default display with model reasoning defaults. (#751) — thanks @gabriel-trigo.
|
- Auto-reply: align `/think` default display with model reasoning defaults. (#751) — thanks @gabriel-trigo.
|
||||||
|
- Auto-reply: flush block reply buffers on tool boundaries. (#750) — thanks @sebslight.
|
||||||
- Docker: tolerate unset optional env vars in docker-setup.sh under strict mode. (#725) — thanks @petradonka.
|
- Docker: tolerate unset optional env vars in docker-setup.sh under strict mode. (#725) — thanks @petradonka.
|
||||||
- CLI/Update: preserve base environment when passing overrides to update subprocesses. (#713) — thanks @danielz1z.
|
- CLI/Update: preserve base environment when passing overrides to update subprocesses. (#713) — thanks @danielz1z.
|
||||||
- Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44.
|
- Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44.
|
||||||
|
|||||||
@@ -1815,6 +1815,60 @@ describe("subscribeEmbeddedPiSession", () => {
|
|||||||
expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
|
expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("flushes buffered block chunks before tool execution", () => {
|
||||||
|
let handler: SessionEventHandler | undefined;
|
||||||
|
const session: StubSession = {
|
||||||
|
subscribe: (fn) => {
|
||||||
|
handler = fn;
|
||||||
|
return () => {};
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const onBlockReply = vi.fn();
|
||||||
|
const onBlockReplyFlush = vi.fn();
|
||||||
|
|
||||||
|
subscribeEmbeddedPiSession({
|
||||||
|
session: session as unknown as Parameters<
|
||||||
|
typeof subscribeEmbeddedPiSession
|
||||||
|
>[0]["session"],
|
||||||
|
runId: "run-flush-buffer",
|
||||||
|
onBlockReply,
|
||||||
|
onBlockReplyFlush,
|
||||||
|
blockReplyBreak: "text_end",
|
||||||
|
blockReplyChunking: { minChars: 50, maxChars: 200 },
|
||||||
|
});
|
||||||
|
|
||||||
|
handler?.({
|
||||||
|
type: "message_start",
|
||||||
|
message: { role: "assistant" },
|
||||||
|
});
|
||||||
|
|
||||||
|
handler?.({
|
||||||
|
type: "message_update",
|
||||||
|
message: { role: "assistant" },
|
||||||
|
assistantMessageEvent: {
|
||||||
|
type: "text_delta",
|
||||||
|
delta: "Short chunk.",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(onBlockReply).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
handler?.({
|
||||||
|
type: "tool_execution_start",
|
||||||
|
toolName: "bash",
|
||||||
|
toolCallId: "tool-flush-buffer-1",
|
||||||
|
args: { command: "echo flush" },
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
||||||
|
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Short chunk.");
|
||||||
|
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
|
||||||
|
expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan(
|
||||||
|
onBlockReplyFlush.mock.invocationCallOrder[0],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
it("does not call onBlockReplyFlush when callback is not provided", () => {
|
it("does not call onBlockReplyFlush when callback is not provided", () => {
|
||||||
let handler: SessionEventHandler | undefined;
|
let handler: SessionEventHandler | undefined;
|
||||||
const session: StubSession = {
|
const session: StubSession = {
|
||||||
|
|||||||
@@ -446,6 +446,19 @@ export function subscribeEmbeddedPiSession(params: {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const flushBlockReplyBuffer = () => {
|
||||||
|
if (!params.onBlockReply) return;
|
||||||
|
if (blockChunker?.hasBuffered()) {
|
||||||
|
blockChunker.drain({ force: true, emit: emitBlockChunk });
|
||||||
|
blockChunker.reset();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (blockBuffer.length > 0) {
|
||||||
|
emitBlockChunk(blockBuffer);
|
||||||
|
blockBuffer = "";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const emitReasoningStream = (text: string) => {
|
const emitReasoningStream = (text: string) => {
|
||||||
if (!streamReasoning || !params.onReasoningStream) return;
|
if (!streamReasoning || !params.onReasoningStream) return;
|
||||||
const formatted = formatReasoningMessage(text);
|
const formatted = formatReasoningMessage(text);
|
||||||
@@ -485,7 +498,8 @@ export function subscribeEmbeddedPiSession(params: {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (evt.type === "tool_execution_start") {
|
if (evt.type === "tool_execution_start") {
|
||||||
// Flush pending block replies to preserve message boundaries before tool execution
|
// Flush pending block replies to preserve message boundaries before tool execution.
|
||||||
|
flushBlockReplyBuffer();
|
||||||
if (params.onBlockReplyFlush) {
|
if (params.onBlockReplyFlush) {
|
||||||
void params.onBlockReplyFlush();
|
void params.onBlockReplyFlush();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user