Merge pull request #750 from sebslight/fix/block-streaming-tool-boundary-flush
fix: flush block reply coalescer on tool boundaries
This commit is contained in:
@@ -34,6 +34,7 @@
|
||||
- Plugins: treat `plugins.load.paths` directory entries as package roots when they contain `package.json` + `clawdbot.extensions`.
|
||||
- Config: expand `~` in `CLAWDBOT_CONFIG_PATH` and common path-like config fields (including `plugins.load.paths`).
|
||||
- Auto-reply: align `/think` default display with model reasoning defaults. (#751) — thanks @gabriel-trigo.
|
||||
- Auto-reply: flush block reply buffers on tool boundaries. (#750) — thanks @sebslight.
|
||||
- Docker: tolerate unset optional env vars in docker-setup.sh under strict mode. (#725) — thanks @petradonka.
|
||||
- CLI/Update: preserve base environment when passing overrides to update subprocesses. (#713) — thanks @danielz1z.
|
||||
- Agents: treat message tool errors as failures so fallback replies still send; require `to` + `message` for `action=send`. (#717) — thanks @theglove44.
|
||||
|
||||
@@ -1313,6 +1313,8 @@ export async function runEmbeddedPiAgent(params: {
|
||||
mediaUrls?: string[];
|
||||
audioAsVoice?: boolean;
|
||||
}) => void | Promise<void>;
|
||||
/** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */
|
||||
onBlockReplyFlush?: () => void | Promise<void>;
|
||||
blockReplyBreak?: "text_end" | "message_end";
|
||||
blockReplyChunking?: BlockReplyChunking;
|
||||
onReasoningStream?: (payload: {
|
||||
@@ -1669,6 +1671,7 @@ export async function runEmbeddedPiAgent(params: {
|
||||
onToolResult: params.onToolResult,
|
||||
onReasoningStream: params.onReasoningStream,
|
||||
onBlockReply: params.onBlockReply,
|
||||
onBlockReplyFlush: params.onBlockReplyFlush,
|
||||
blockReplyBreak: params.blockReplyBreak,
|
||||
blockReplyChunking: params.blockReplyChunking,
|
||||
onPartialReply: params.onPartialReply,
|
||||
|
||||
@@ -1754,4 +1754,150 @@ describe("subscribeEmbeddedPiSession", () => {
|
||||
|
||||
expect(onToolResult).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("calls onBlockReplyFlush before tool_execution_start to preserve message boundaries", () => {
|
||||
let handler: SessionEventHandler | undefined;
|
||||
const session: StubSession = {
|
||||
subscribe: (fn) => {
|
||||
handler = fn;
|
||||
return () => {};
|
||||
},
|
||||
};
|
||||
|
||||
const onBlockReplyFlush = vi.fn();
|
||||
const onBlockReply = vi.fn();
|
||||
|
||||
subscribeEmbeddedPiSession({
|
||||
session: session as unknown as Parameters<
|
||||
typeof subscribeEmbeddedPiSession
|
||||
>[0]["session"],
|
||||
runId: "run-flush-test",
|
||||
onBlockReply,
|
||||
onBlockReplyFlush,
|
||||
blockReplyBreak: "text_end",
|
||||
});
|
||||
|
||||
// Simulate text arriving before tool
|
||||
handler?.({
|
||||
type: "message_start",
|
||||
message: { role: "assistant" },
|
||||
});
|
||||
|
||||
handler?.({
|
||||
type: "message_update",
|
||||
message: { role: "assistant" },
|
||||
assistantMessageEvent: {
|
||||
type: "text_delta",
|
||||
delta: "First message before tool.",
|
||||
},
|
||||
});
|
||||
|
||||
expect(onBlockReplyFlush).not.toHaveBeenCalled();
|
||||
|
||||
// Tool execution starts - should trigger flush
|
||||
handler?.({
|
||||
type: "tool_execution_start",
|
||||
toolName: "bash",
|
||||
toolCallId: "tool-flush-1",
|
||||
args: { command: "echo hello" },
|
||||
});
|
||||
|
||||
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
|
||||
|
||||
// Another tool - should flush again
|
||||
handler?.({
|
||||
type: "tool_execution_start",
|
||||
toolName: "read",
|
||||
toolCallId: "tool-flush-2",
|
||||
args: { path: "/tmp/test.txt" },
|
||||
});
|
||||
|
||||
expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("flushes buffered block chunks before tool execution", () => {
|
||||
let handler: SessionEventHandler | undefined;
|
||||
const session: StubSession = {
|
||||
subscribe: (fn) => {
|
||||
handler = fn;
|
||||
return () => {};
|
||||
},
|
||||
};
|
||||
|
||||
const onBlockReply = vi.fn();
|
||||
const onBlockReplyFlush = vi.fn();
|
||||
|
||||
subscribeEmbeddedPiSession({
|
||||
session: session as unknown as Parameters<
|
||||
typeof subscribeEmbeddedPiSession
|
||||
>[0]["session"],
|
||||
runId: "run-flush-buffer",
|
||||
onBlockReply,
|
||||
onBlockReplyFlush,
|
||||
blockReplyBreak: "text_end",
|
||||
blockReplyChunking: { minChars: 50, maxChars: 200 },
|
||||
});
|
||||
|
||||
handler?.({
|
||||
type: "message_start",
|
||||
message: { role: "assistant" },
|
||||
});
|
||||
|
||||
handler?.({
|
||||
type: "message_update",
|
||||
message: { role: "assistant" },
|
||||
assistantMessageEvent: {
|
||||
type: "text_delta",
|
||||
delta: "Short chunk.",
|
||||
},
|
||||
});
|
||||
|
||||
expect(onBlockReply).not.toHaveBeenCalled();
|
||||
|
||||
handler?.({
|
||||
type: "tool_execution_start",
|
||||
toolName: "bash",
|
||||
toolCallId: "tool-flush-buffer-1",
|
||||
args: { command: "echo flush" },
|
||||
});
|
||||
|
||||
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
||||
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Short chunk.");
|
||||
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
|
||||
expect(onBlockReply.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
onBlockReplyFlush.mock.invocationCallOrder[0],
|
||||
);
|
||||
});
|
||||
|
||||
it("does not call onBlockReplyFlush when callback is not provided", () => {
|
||||
let handler: SessionEventHandler | undefined;
|
||||
const session: StubSession = {
|
||||
subscribe: (fn) => {
|
||||
handler = fn;
|
||||
return () => {};
|
||||
},
|
||||
};
|
||||
|
||||
const onBlockReply = vi.fn();
|
||||
|
||||
// No onBlockReplyFlush provided
|
||||
subscribeEmbeddedPiSession({
|
||||
session: session as unknown as Parameters<
|
||||
typeof subscribeEmbeddedPiSession
|
||||
>[0]["session"],
|
||||
runId: "run-no-flush",
|
||||
onBlockReply,
|
||||
blockReplyBreak: "text_end",
|
||||
});
|
||||
|
||||
// This should not throw even without onBlockReplyFlush
|
||||
expect(() => {
|
||||
handler?.({
|
||||
type: "tool_execution_start",
|
||||
toolName: "bash",
|
||||
toolCallId: "tool-no-flush",
|
||||
args: { command: "echo test" },
|
||||
});
|
||||
}).not.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -198,6 +198,8 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
mediaUrls?: string[];
|
||||
audioAsVoice?: boolean;
|
||||
}) => void | Promise<void>;
|
||||
/** Flush pending block replies (e.g., before tool execution to preserve message boundaries). */
|
||||
onBlockReplyFlush?: () => void | Promise<void>;
|
||||
blockReplyBreak?: "text_end" | "message_end";
|
||||
blockReplyChunking?: BlockReplyChunking;
|
||||
onPartialReply?: (payload: {
|
||||
@@ -444,6 +446,19 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
});
|
||||
};
|
||||
|
||||
const flushBlockReplyBuffer = () => {
|
||||
if (!params.onBlockReply) return;
|
||||
if (blockChunker?.hasBuffered()) {
|
||||
blockChunker.drain({ force: true, emit: emitBlockChunk });
|
||||
blockChunker.reset();
|
||||
return;
|
||||
}
|
||||
if (blockBuffer.length > 0) {
|
||||
emitBlockChunk(blockBuffer);
|
||||
blockBuffer = "";
|
||||
}
|
||||
};
|
||||
|
||||
const emitReasoningStream = (text: string) => {
|
||||
if (!streamReasoning || !params.onReasoningStream) return;
|
||||
const formatted = formatReasoningMessage(text);
|
||||
@@ -483,6 +498,12 @@ export function subscribeEmbeddedPiSession(params: {
|
||||
}
|
||||
|
||||
if (evt.type === "tool_execution_start") {
|
||||
// Flush pending block replies to preserve message boundaries before tool execution.
|
||||
flushBlockReplyBuffer();
|
||||
if (params.onBlockReplyFlush) {
|
||||
void params.onBlockReplyFlush();
|
||||
}
|
||||
|
||||
const toolName = String(
|
||||
(evt as AgentEvent & { toolName: string }).toolName,
|
||||
);
|
||||
|
||||
@@ -643,6 +643,12 @@ export async function runReplyAgent(params: {
|
||||
blockReplyPipeline?.enqueue(blockPayload);
|
||||
}
|
||||
: undefined,
|
||||
onBlockReplyFlush:
|
||||
blockStreamingEnabled && blockReplyPipeline
|
||||
? async () => {
|
||||
await blockReplyPipeline.flush({ force: true });
|
||||
}
|
||||
: undefined,
|
||||
shouldEmitToolResult,
|
||||
onToolResult: opts?.onToolResult
|
||||
? (payload) => {
|
||||
|
||||
Reference in New Issue
Block a user