fix: stream tool summaries early and tool output

This commit is contained in:
Peter Steinberger
2026-01-03 20:37:57 +01:00
parent 03c1599544
commit a15cffb7de
15 changed files with 379 additions and 122 deletions

View File

@@ -48,6 +48,7 @@ import {
} from "./pi-embedded-subscribe.js";
import { extractAssistantText } from "./pi-embedded-utils.js";
import { createClawdisCodingTools } from "./pi-tools.js";
import { resolveSandboxContext } from "./sandbox.js";
import {
applySkillEnvOverrides,
applySkillEnvOverridesFromSnapshot,
@@ -362,7 +363,13 @@ export async function runEmbeddedPiAgent(params: {
return enqueueCommandInLane(sessionLane, () =>
enqueueGlobal(async () => {
const started = Date.now();
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const sandbox = await resolveSandboxContext({
config: params.config,
sessionKey: params.sessionKey,
workspaceDir: params.workspaceDir,
});
const workspaceDir = sandbox?.workspaceDir ?? params.workspaceDir;
const resolvedWorkspace = resolveUserPath(workspaceDir);
const prevCwd = process.cwd();
const provider =
@@ -425,6 +432,7 @@ export async function runEmbeddedPiAgent(params: {
const tools = createClawdisCodingTools({
bash: params.config?.agent?.bash,
surface: params.surface,
sandbox,
});
const machineName = await getMachineDisplayName();
const runtimeInfo = {
@@ -497,7 +505,6 @@ export async function runEmbeddedPiAgent(params: {
assistantTexts,
toolMetas,
unsubscribe,
flush: flushToolDebouncer,
waitForCompactionRetry,
} = subscribeEmbeddedPiSession({
session,
@@ -571,7 +578,6 @@ export async function runEmbeddedPiAgent(params: {
abortWarnTimer = undefined;
}
unsubscribe();
flushToolDebouncer();
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
}

View File

@@ -630,4 +630,107 @@ describe("subscribeEmbeddedPiSession", () => {
await waitPromise;
expect(resolved).toBe(true);
});
it("emits tool summaries at tool start when verbose is on", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onToolResult = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run-tool",
verboseLevel: "on",
onToolResult,
});
handler?.({
type: "tool_execution_start",
toolName: "read",
toolCallId: "tool-1",
args: { path: "/tmp/a.txt" },
});
expect(onToolResult).toHaveBeenCalledTimes(1);
const payload = onToolResult.mock.calls[0][0];
expect(payload.text).toContain("/tmp/a.txt");
handler?.({
type: "tool_execution_end",
toolName: "read",
toolCallId: "tool-1",
isError: false,
result: "ok",
});
expect(onToolResult).toHaveBeenCalledTimes(1);
});
it("skips tool summaries when shouldEmitToolResult is false", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onToolResult = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run-tool-off",
shouldEmitToolResult: () => false,
onToolResult,
});
handler?.({
type: "tool_execution_start",
toolName: "read",
toolCallId: "tool-2",
args: { path: "/tmp/b.txt" },
});
expect(onToolResult).not.toHaveBeenCalled();
});
it("emits tool summaries when shouldEmitToolResult overrides verbose", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onToolResult = vi.fn();
subscribeEmbeddedPiSession({
session: session as unknown as Parameters<
typeof subscribeEmbeddedPiSession
>[0]["session"],
runId: "run-tool-override",
verboseLevel: "off",
shouldEmitToolResult: () => true,
onToolResult,
});
handler?.({
type: "tool_execution_start",
toolName: "read",
toolCallId: "tool-3",
args: { path: "/tmp/c.txt" },
});
expect(onToolResult).toHaveBeenCalledTimes(1);
});
});

View File

@@ -2,10 +2,7 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import type { AgentSession } from "@mariozechner/pi-coding-agent";
import {
createToolDebouncer,
formatToolAggregate,
} from "../auto-reply/tool-meta.js";
import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { splitMediaFromOutput } from "../media/parse.js";
import { defaultRuntime } from "../runtime.js";
@@ -113,6 +110,7 @@ export function subscribeEmbeddedPiSession(params: {
const assistantTexts: string[] = [];
const toolMetas: Array<{ toolName?: string; meta?: string }> = [];
const toolMetaById = new Map<string, string | undefined>();
const toolSummaryById = new Set<string>();
const blockReplyBreak = params.blockReplyBreak ?? "text_end";
let deltaBuffer = "";
let blockBuffer = "";
@@ -176,17 +174,25 @@ export function subscribeEmbeddedPiSession(params: {
return afterStart.slice(0, endIndex);
};
const toolDebouncer = createToolDebouncer((toolName, metas) => {
if (!params.onPartialReply) return;
const text = formatToolAggregate(toolName, metas);
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
void params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
});
const blockChunking = params.blockReplyChunking;
const shouldEmitToolResult = () =>
typeof params.shouldEmitToolResult === "function"
? params.shouldEmitToolResult()
: params.verboseLevel === "on";
const emitToolSummary = (toolName?: string, meta?: string) => {
if (!params.onToolResult) return;
const agg = formatToolAggregate(toolName, meta ? [meta] : undefined);
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg);
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) return;
try {
void params.onToolResult({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
} catch {
// ignore tool result delivery failures
}
};
const findSentenceBreak = (window: string, minChars: number): number => {
if (!window) return -1;
@@ -298,12 +304,12 @@ export function subscribeEmbeddedPiSession(params: {
assistantTexts.length = 0;
toolMetas.length = 0;
toolMetaById.clear();
toolSummaryById.clear();
deltaBuffer = "";
blockBuffer = "";
lastStreamedAssistant = undefined;
lastBlockReplyText = undefined;
assistantTextBaseline = 0;
toolDebouncer.flush();
};
const unsubscribe = params.session.subscribe(
@@ -336,6 +342,15 @@ export function subscribeEmbeddedPiSession(params: {
stream: "tool",
data: { phase: "start", name: toolName, toolCallId },
});
if (
params.onToolResult &&
shouldEmitToolResult() &&
!toolSummaryById.has(toolCallId)
) {
toolSummaryById.add(toolCallId);
emitToolSummary(toolName, meta);
}
}
if (evt.type === "tool_execution_update") {
@@ -382,7 +397,8 @@ export function subscribeEmbeddedPiSession(params: {
const sanitizedResult = sanitizeToolResult(result);
const meta = toolMetaById.get(toolCallId);
toolMetas.push({ toolName, meta });
toolDebouncer.push(toolName, meta);
toolMetaById.delete(toolCallId);
toolSummaryById.delete(toolCallId);
emitAgentEvent({
runId: params.runId,
@@ -406,25 +422,6 @@ export function subscribeEmbeddedPiSession(params: {
isError,
},
});
const emitToolResult =
typeof params.shouldEmitToolResult === "function"
? params.shouldEmitToolResult()
: params.verboseLevel === "on";
if (emitToolResult && params.onToolResult) {
const agg = formatToolAggregate(toolName, meta ? [meta] : undefined);
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg);
if (cleanedText || (mediaUrls && mediaUrls.length > 0)) {
try {
void params.onToolResult({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
} catch {
// ignore tool result delivery failures
}
}
}
}
if (evt.type === "message_update") {
@@ -626,7 +623,6 @@ export function subscribeEmbeddedPiSession(params: {
if (evt.type === "agent_end") {
defaultRuntime.log?.(`embedded run agent end: runId=${params.runId}`);
toolDebouncer.flush();
if (pendingCompactionRetry > 0) {
resolveCompactionRetry();
} else {
@@ -640,7 +636,6 @@ export function subscribeEmbeddedPiSession(params: {
assistantTexts,
toolMetas,
unsubscribe,
flush: () => toolDebouncer.flush(),
waitForCompactionRetry: () => {
if (compactionInFlight || pendingCompactionRetry > 0) {
ensureCompactionPromise();

View File

@@ -1,7 +1,6 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
createToolDebouncer,
formatToolAggregate,
formatToolPrefix,
shortenMeta,
@@ -48,37 +47,3 @@ describe("tool meta formatting", () => {
expect(formatToolPrefix("x", "/Users/test/a.txt")).toBe("🧩 x: ~/a.txt");
});
});
describe("tool meta debouncer", () => {
it("flushes on timer and when tool changes", () => {
vi.useFakeTimers();
try {
const calls: Array<{ tool: string | undefined; metas: string[] }> = [];
const d = createToolDebouncer((tool, metas) => {
calls.push({ tool, metas });
}, 50);
d.push("a", "/tmp/1");
d.push("a", "/tmp/2");
expect(calls).toHaveLength(0);
vi.advanceTimersByTime(60);
expect(calls).toHaveLength(1);
expect(calls[0]).toMatchObject({
tool: "a",
metas: ["/tmp/1", "/tmp/2"],
});
d.push("a", "x");
d.push("b", "y"); // tool change flushes immediately
expect(calls).toHaveLength(2);
expect(calls[1]).toMatchObject({ tool: "a", metas: ["x"] });
vi.advanceTimersByTime(60);
expect(calls).toHaveLength(3);
expect(calls[2]).toMatchObject({ tool: "b", metas: ["y"] });
} finally {
vi.useRealTimers();
}
});
});

View File

@@ -4,9 +4,6 @@ import {
} from "../agents/tool-display.js";
import { shortenHomeInString, shortenHomePath } from "../utils.js";
export const TOOL_RESULT_DEBOUNCE_MS = 500;
export const TOOL_RESULT_FLUSH_COUNT = 5;
export function shortenPath(p: string): string {
return shortenHomePath(p);
}
@@ -77,33 +74,3 @@ function isPathLike(value: string): boolean {
if (value.includes("&&") || value.includes("||")) return false;
return /^~?(\/[^\s]+)+$/.test(value);
}
export function createToolDebouncer(
onFlush: (toolName: string | undefined, metas: string[]) => void,
windowMs = TOOL_RESULT_DEBOUNCE_MS,
) {
let pendingTool: string | undefined;
let pendingMetas: string[] = [];
let timer: NodeJS.Timeout | null = null;
const flush = () => {
if (!pendingTool && pendingMetas.length === 0) return;
onFlush(pendingTool, pendingMetas);
pendingTool = undefined;
pendingMetas = [];
if (timer) {
clearTimeout(timer);
timer = null;
}
};
const push = (toolName?: string, meta?: string) => {
if (pendingTool && toolName && pendingTool !== toolName) flush();
if (!pendingTool) pendingTool = toolName;
if (meta) pendingMetas.push(meta);
if (timer) clearTimeout(timer);
timer = setTimeout(flush, windowMs);
};
return { push, flush };
}

View File

@@ -186,12 +186,18 @@ export function createAgentEventHandler({
};
return (evt: AgentEventPayload) => {
const chatLink = chatRunState.registry.peek(evt.runId);
const sessionKey =
chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId);
// Include sessionKey so Control UI can filter tool streams per session.
const agentPayload = sessionKey ? { ...evt, sessionKey } : evt;
const last = agentRunSeq.get(evt.runId) ?? 0;
if (evt.seq !== last + 1) {
broadcast("agent", {
runId: evt.runId,
stream: "error",
ts: Date.now(),
sessionKey,
data: {
reason: "seq gap",
expected: last + 1,
@@ -200,18 +206,15 @@ export function createAgentEventHandler({
});
}
agentRunSeq.set(evt.runId, evt.seq);
broadcast("agent", evt);
broadcast("agent", agentPayload);
const chatLink = chatRunState.registry.peek(evt.runId);
const sessionKey =
chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId);
const jobState =
evt.stream === "job" && typeof evt.data?.state === "string"
? (evt.data.state as "done" | "error" | string)
: null;
if (sessionKey) {
bridgeSendToSession(sessionKey, "agent", evt);
bridgeSendToSession(sessionKey, "agent", agentPayload);
if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
const clientRunId = chatLink?.clientRunId ?? evt.runId;
emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text);

View File

@@ -478,4 +478,43 @@ describe("gateway server agent", () => {
ws.close();
await server.close();
});
test("agent events include sessionKey in agent payloads", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws, {
client: {
name: "webchat",
version: "1.0.0",
platform: "test",
mode: "webchat",
},
});
registerAgentRunContext("run-tool-1", { sessionKey: "main" });
const agentEvtP = onceMessage(
ws,
(o) =>
o.type === "event" &&
o.event === "agent" &&
o.payload?.runId === "run-tool-1",
8000,
);
emitAgentEvent({
runId: "run-tool-1",
stream: "tool",
data: { phase: "start", name: "read", toolCallId: "tool-1" },
});
const evt = await agentEvtP;
const payload =
evt.payload && typeof evt.payload === "object"
? (evt.payload as Record<string, unknown>)
: {};
expect(payload.sessionKey).toBe("main");
ws.close();
await server.close();
});
});

View File

@@ -11,6 +11,7 @@ export type AgentEventPayload = {
stream: AgentEventStream;
ts: number;
data: Record<string, unknown>;
sessionKey?: string;
};
export type AgentRunContext = {