import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { emitAgentEvent } from "../infra/agent-events.js";
import { normalizeTextForComparison } from "./pi-embedded-helpers.js";
import { isMessagingTool, isMessagingToolSendAction } from "./pi-embedded-messaging.js";
import type { EmbeddedPiSubscribeContext } from "./pi-embedded-subscribe.handlers.types.js";
import {
  extractToolErrorMessage,
  extractToolResultText,
  extractMessagingToolSend,
  isToolResultError,
  sanitizeToolResult,
} from "./pi-embedded-subscribe.tools.js";
import { inferToolMetaFromArgs } from "./pi-embedded-utils.js";
import { normalizeToolName } from "./tool-policy.js";

/**
 * Views raw tool arguments as a string-keyed record.
 *
 * @param args - Tool arguments exactly as received on the event.
 * @returns `args` itself when it is a non-null object, otherwise an empty record,
 *   so callers can probe individual fields without repeating the guard.
 */
function toArgsRecord(args: unknown): Record<string, unknown> {
  return args && typeof args === "object" ? (args as Record<string, unknown>) : {};
}

/**
 * Appends execution flags to the meta string of `exec` / `bash` tool calls.
 *
 * @param toolName - Tool name; compared after trimming and lower-casing.
 * @param args - Raw tool arguments; only `pty === true` and `elevated === true` are read.
 * @param meta - Meta string computed so far, if any.
 * @returns `meta` unchanged for other tools or when neither flag is set; otherwise the
 *   flags joined with " · " and appended to `meta` (or returned alone when `meta` is empty).
 */
function extendExecMeta(toolName: string, args: unknown, meta?: string): string | undefined {
  const normalized = toolName.trim().toLowerCase();
  if (normalized !== "exec" && normalized !== "bash") return meta;

  const record = toArgsRecord(args);
  const flags: string[] = [];
  if (record.pty === true) flags.push("pty");
  if (record.elevated === true) flags.push("elevated");
  if (flags.length === 0) return meta;

  const suffix = flags.join(" · ");
  return meta ? `${meta} · ${suffix}` : suffix;
}

/**
 * Handles the start of a tool execution.
 *
 * Flushes buffered block replies, stores the call's meta string under its
 * `toolCallId`, emits the "start" tool event, emits a tool summary at most once
 * per call, and records pending messaging-tool sends so they can be committed or
 * discarded when the call ends.
 *
 * @param ctx - Subscription context carrying state, params, logging and emit helpers.
 * @param evt - Start event with the tool name, call id and raw arguments.
 */
export async function handleToolExecutionStart(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & { toolName: string; toolCallId: string; args: unknown },
): Promise<void> {
  // Flush pending block replies to preserve message boundaries before tool execution.
  ctx.flushBlockReplyBuffer();
  if (ctx.params.onBlockReplyFlush) {
    void ctx.params.onBlockReplyFlush();
  }

  const rawToolName = String(evt.toolName);
  const toolName = normalizeToolName(rawToolName);
  const toolCallId = String(evt.toolCallId);
  const args = evt.args;

  // Diagnostic only: a `read` call without a usable path is logged, not rejected.
  if (toolName === "read") {
    const record = toArgsRecord(args);
    const filePath = typeof record.path === "string" ? record.path.trim() : "";
    if (!filePath) {
      const argsPreview = typeof args === "string" ? args.slice(0, 200) : undefined;
      ctx.log.warn(
        `read tool called without path: toolCallId=${toolCallId} argsType=${typeof args}${argsPreview ? ` argsPreview=${argsPreview}` : ""}`,
      );
    }
  }

  const meta = extendExecMeta(toolName, args, inferToolMetaFromArgs(toolName, args));
  ctx.state.toolMetaById.set(toolCallId, meta);
  ctx.log.debug(
    `embedded run tool start: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,
  );

  const shouldEmitToolEvents = ctx.shouldEmitToolResult();
  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "tool",
    data: {
      phase: "start",
      name: toolName,
      toolCallId,
      // Forwarded as received; the cast only satisfies the event payload type.
      args: args as Record<string, unknown>,
    },
  });
  // Best-effort typing signal; do not block tool summaries on slow emitters.
  void ctx.params.onAgentEvent?.({
    stream: "tool",
    data: { phase: "start", name: toolName, toolCallId },
  });

  if (
    ctx.params.onToolResult &&
    shouldEmitToolEvents &&
    !ctx.state.toolSummaryById.has(toolCallId)
  ) {
    ctx.state.toolSummaryById.add(toolCallId);
    ctx.emitToolSummary(toolName, meta);
  }

  // Track messaging tool sends (pending until confirmed in tool_execution_end).
  if (isMessagingTool(toolName)) {
    const argsRecord = toArgsRecord(args);
    const isMessagingSend = isMessagingToolSendAction(toolName, argsRecord);
    if (isMessagingSend) {
      const sendTarget = extractMessagingToolSend(toolName, argsRecord);
      if (sendTarget) {
        ctx.state.pendingMessagingTargets.set(toolCallId, sendTarget);
      }
      // Field names vary by tool: Discord/Slack use "content", sessions_send uses "message"
      const text = argsRecord.content ?? argsRecord.message;
      // Only non-empty strings are tracked; any other value is ignored.
      if (typeof text === "string" && text) {
        ctx.state.pendingMessagingTexts.set(toolCallId, text);
        ctx.log.debug(`Tracking pending messaging text: tool=${toolName} len=${text.length}`);
      }
    }
  }
}

/**
 * Handles a partial-result update from a running tool.
 *
 * Emits an "update" tool event carrying the sanitized partial result, then
 * notifies `onAgentEvent` with the same phase but without the partial result.
 *
 * @param ctx - Subscription context carrying params and emit helpers.
 * @param evt - Update event with the tool name, call id and optional partial result.
 */
export function handleToolExecutionUpdate(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & {
    toolName: string;
    toolCallId: string;
    partialResult?: unknown;
  },
): void {
  const toolName = normalizeToolName(String(evt.toolName));
  const toolCallId = String(evt.toolCallId);
  const partial = evt.partialResult;
  const sanitized = sanitizeToolResult(partial);

  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "tool",
    data: {
      phase: "update",
      name: toolName,
      toolCallId,
      partialResult: sanitized,
    },
  });
  void ctx.params.onAgentEvent?.({
    stream: "tool",
    data: {
      phase: "update",
      name: toolName,
      toolCallId,
    },
  });
}

/**
 * Handles the end of a tool execution.
 *
 * Appends the call's meta to `toolMetas`, clears the per-call entries created at
 * start, records the last tool error when the call failed, commits or discards
 * pending messaging text/target, emits the "result" tool event, and forwards the
 * tool's output text when output emission is enabled.
 *
 * @param ctx - Subscription context carrying state, params, logging and emit helpers.
 * @param evt - End event with the tool name, call id, error flag and optional result.
 */
export function handleToolExecutionEnd(
  ctx: EmbeddedPiSubscribeContext,
  evt: AgentEvent & {
    toolName: string;
    toolCallId: string;
    isError: boolean;
    result?: unknown;
  },
): void {
  const toolName = normalizeToolName(String(evt.toolName));
  const toolCallId = String(evt.toolCallId);
  const isError = Boolean(evt.isError);
  const result = evt.result;
  // A call counts as failed if the event says so or the result itself looks like an error.
  const isToolError = isError || isToolResultError(result);
  const sanitizedResult = sanitizeToolResult(result);

  const meta = ctx.state.toolMetaById.get(toolCallId);
  ctx.state.toolMetas.push({ toolName, meta });
  ctx.state.toolMetaById.delete(toolCallId);
  ctx.state.toolSummaryById.delete(toolCallId);

  if (isToolError) {
    const errorMessage = extractToolErrorMessage(sanitizedResult);
    ctx.state.lastToolError = {
      toolName,
      meta,
      error: errorMessage,
    };
  }

  // Commit messaging tool text on success, discard on error.
  const pendingText = ctx.state.pendingMessagingTexts.get(toolCallId);
  const pendingTarget = ctx.state.pendingMessagingTargets.get(toolCallId);
  if (pendingText) {
    ctx.state.pendingMessagingTexts.delete(toolCallId);
    if (!isToolError) {
      ctx.state.messagingToolSentTexts.push(pendingText);
      ctx.state.messagingToolSentTextsNormalized.push(normalizeTextForComparison(pendingText));
      ctx.log.debug(`Committed messaging text: tool=${toolName} len=${pendingText.length}`);
      ctx.trimMessagingToolSent();
    }
  }
  if (pendingTarget) {
    ctx.state.pendingMessagingTargets.delete(toolCallId);
    if (!isToolError) {
      ctx.state.messagingToolSentTargets.push(pendingTarget);
      ctx.trimMessagingToolSent();
    }
  }

  emitAgentEvent({
    runId: ctx.params.runId,
    stream: "tool",
    data: {
      phase: "result",
      name: toolName,
      toolCallId,
      meta,
      isError: isToolError,
      result: sanitizedResult,
    },
  });
  // The callback gets the same result notification, minus the result payload.
  void ctx.params.onAgentEvent?.({
    stream: "tool",
    data: {
      phase: "result",
      name: toolName,
      toolCallId,
      meta,
      isError: isToolError,
    },
  });

  ctx.log.debug(
    `embedded run tool end: runId=${ctx.params.runId} tool=${toolName} toolCallId=${toolCallId}`,
  );

  if (ctx.params.onToolResult && ctx.shouldEmitToolOutput()) {
    const outputText = extractToolResultText(sanitizedResult);
    if (outputText) {
      ctx.emitToolOutput(toolName, meta, outputText);
    }
  }
}