Previously, when block streaming was disabled (the default), text generated between tool calls would only appear after all tools completed. This was because onBlockReply wasn't passed to the subscription when block streaming was off, so flushBlockReplyBuffer() before tool execution did nothing. Now onBlockReply is always passed, and when block streaming is disabled, block replies are sent directly during tool flush. Directly sent payloads are tracked to avoid duplicates in final payloads. Also fixes a race condition where tool summaries could be emitted before the typing indicator started by awaiting onAgentEvent in tool handlers.
189 lines
5.9 KiB
TypeScript
189 lines
5.9 KiB
TypeScript
import type { AgentEvent } from "@mariozechner/pi-agent-core";
|
|
|
|
import { emitAgentEvent } from "../infra/agent-events.js";
|
|
import { normalizeTextForComparison } from "./pi-embedded-helpers.js";
|
|
import { isMessagingTool, isMessagingToolSendAction } from "./pi-embedded-messaging.js";
|
|
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
|
|
import {
|
|
extractMessagingToolSend,
|
|
isToolResultError,
|
|
sanitizeToolResult,
|
|
} from "./pi-embedded-subscribe.tools.js";
|
|
import { inferToolMetaFromArgs } from "./pi-embedded-utils.js";
|
|
|
|
/**
 * Handles the start of a tool execution for an embedded run.
 *
 * Steps, in order:
 * 1. Flushes the buffered block reply (and notifies `onBlockReplyFlush`, if set) so
 *    text produced before the tool call keeps its own message boundary.
 * 2. Stores the metadata inferred from the tool arguments under `toolCallId`.
 * 3. Emits the "start" tool event through `emitAgentEvent` and awaits `onAgentEvent`.
 * 4. Emits the tool summary at most once per `toolCallId` when tool results are surfaced.
 * 5. For messaging-tool send actions, records the target and text as pending; they
 *    stay pending until the matching tool_execution_end event is handled.
 *
 * @param ctx - Subscription context providing run params, mutable state and the logger.
 * @param evt - Start event carrying the tool name, the tool call id and the raw arguments.
 * @returns A promise that settles after `onAgentEvent` was awaited and tracking was recorded.
 */
export async function handleToolExecutionStart(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown },
) {
  // Flush pending block replies to preserve message boundaries before tool execution.
  ctx.flushBlockReplyBuffer();
  if (ctx.params.onBlockReplyFlush) {
    // Fire-and-forget: the flush callback is intentionally not awaited here.
    void ctx.params.onBlockReplyFlush();
  }

  const toolName = String(evt.toolName);
  const toolCallId = String(evt.toolCallId);
  const args = evt.args;
  // Object view of the arguments; anything that is not an object becomes an empty record.
  const argsRecord = args && typeof args === "object" ? (args as Record<string, unknown>) : {};

  if (toolName === "read") {
    const filePath = typeof argsRecord.path === "string" ? argsRecord.path.trim() : "";
    if (!filePath) {
      // Only string arguments are previewed, capped at 200 characters.
      const argsPreview = typeof args === "string" ? args.slice(0, 200) : undefined;
      ctx.log.warn(
        `read tool called without path: toolCallId=${toolCallId} argsType=${typeof args}${argsPreview ? ` argsPreview=${argsPreview}` : ""}`,
      );
    }
  }

  const meta = inferToolMetaFromArgs(toolName, args);
  ctx.state.toolMetaById.set(toolCallId, meta);
  ctx.log.debug(
    `embedded run tool start: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,
  );

  // Evaluated before the events go out, matching the original ordering.
  const shouldEmitToolEvents = ctx.shouldEmitToolResult();
  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "tool",
    data: {
      phase: "start",
      name: toolName,
      toolCallId,
      args: args as Record<string, unknown>,
    },
  });
  // Await onAgentEvent to ensure typing indicator starts before tool summaries are emitted.
  await ctx.params.onAgentEvent?.({
    stream: "tool",
    data: { phase: "start", name: toolName, toolCallId },
  });

  if (
    ctx.params.onToolResult &&
    shouldEmitToolEvents &&
    !ctx.state.toolSummaryById.has(toolCallId)
  ) {
    // Mark first so the summary for this call id is emitted only once.
    ctx.state.toolSummaryById.add(toolCallId);
    ctx.emitToolSummary(toolName, meta);
  }

  // Track messaging tool sends (pending until confirmed in tool_execution_end).
  if (isMessagingTool(toolName)) {
    const isMessagingSend = isMessagingToolSendAction(toolName, argsRecord);
    if (isMessagingSend) {
      const sendTarget = extractMessagingToolSend(toolName, argsRecord);
      if (sendTarget) {
        ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget);
      }
      // Field names vary by tool: Discord/Slack use "content", sessions_send uses "message".
      // Pick the first field that really holds a string: a blind `as string` cast combined
      // with `??` would let a non-string `content` hide a usable `message`.
      const text = [argsRecord.content, argsRecord.message].find(
        (value): value is string => typeof value === "string",
      );
      if (text) {
        ctx.state.pendingMessagingTexts.set(toolCallId, text);
        ctx.log.debug(`Tracking pending messaging text: tool=${toolName} len=${text.length}`);
      }
    }
  }
}
|
|
|
|
/**
 * Forwards an in-progress tool result for an embedded run.
 *
 * The sanitized partial result is published through `emitAgentEvent`; the
 * `onAgentEvent` callback (if any) is notified of the update without the
 * partial payload and without being awaited.
 *
 * @param ctx - Subscription context providing the run params.
 * @param evt - Update event carrying the tool name, call id and optional partial result.
 */
export function handleToolExecutionUpdate(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & {
    toolName: string;
    toolCallId: string;
    partialResult?: unknown;
  },
) {
  const name = String(evt.toolName);
  const toolCallId = String(evt.toolCallId);
  // Sanitize up front so only the cleaned payload is ever published.
  const partialResult = sanitizeToolResult(evt.partialResult);

  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "tool",
    data: { phase: "update", name, toolCallId, partialResult },
  });

  // Fire-and-forget notification; the partial result is deliberately left out here.
  void ctx.params.onAgentEvent?.({
    stream: "tool",
    data: { phase: "update", name, toolCallId },
  });
}
|
|
|
|
/**
 * Finalizes a tool execution for an embedded run.
 *
 * Moves the call's metadata from the per-call map into the run's `toolMetas` list,
 * clears the per-call summary marker, commits (on success) or drops (on error) any
 * pending messaging text/target recorded for this call, and then publishes the
 * "result" tool event through `emitAgentEvent` and `onAgentEvent`.
 *
 * @param ctx - Subscription context providing run params, mutable state and the logger.
 * @param evt - End event carrying the tool name, call id, error flag and optional result.
 */
export function handleToolExecutionEnd(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & {
    toolName: string;
    toolCallId: string;
    isError: boolean;
    result?: unknown;
  },
) {
  const { state, params, log } = ctx;
  const name = String(evt.toolName);
  const toolCallId = String(evt.toolCallId);
  const rawResult = evt.result;
  // The result payload is only inspected when the event itself does not flag an error.
  const failed = Boolean(evt.isError) || isToolResultError(rawResult);
  const succeeded = !failed;
  const result = sanitizeToolResult(rawResult);

  // Move the metadata from the per-call map into the run-wide list.
  const meta = state.toolMetaById.get(toolCallId);
  state.toolMetas.push({ toolName: name, meta });
  state.toolMetaById.delete(toolCallId);
  state.toolSummaryById.delete(toolCallId);

  // Pending messaging data is always removed; it is only committed when the tool succeeded.
  const queuedText = state.pendingMessagingTexts.get(toolCallId);
  const queuedTarget = state.pendingMessagingTargets.get(toolCallId);

  if (queuedText) {
    state.pendingMessagingTexts.delete(toolCallId);
    if (succeeded) {
      state.messagingToolSentTexts.push(queuedText);
      state.messagingToolSentTextsNormalized.push(normalizeTextForComparison(queuedText));
      log.debug(`Committed messaging text: tool=${name} len=${queuedText.length}`);
      ctx.trimMessagingToolSent();
    }
  }

  if (queuedTarget) {
    state.pendingMessagingTargets.delete(toolCallId);
    if (succeeded) {
      state.messagingToolSentTargets.push(queuedTarget);
      ctx.trimMessagingToolSent();
    }
  }

  emitAgentEvent({
    runId: params.runId,
    stream: "tool",
    data: { phase: "result", name, toolCallId, meta, isError: failed, result },
  });

  // Fire-and-forget notification; the sanitized result itself is not forwarded here.
  void params.onAgentEvent?.({
    stream: "tool",
    data: { phase: "result", name, toolCallId, meta, isError: failed },
  });

  log.debug(`embedded run tool end: runId=${params.runId} tool=${name} toolCallId=${toolCallId}`);
}
|