From e44ed2681fa19364c91bb5ff2d436227ab8e5274 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 9 Dec 2025 01:37:51 +0100 Subject: [PATCH] refactor: type tau rpc stream events --- src/auto-reply/command-reply.ts | 56 +++++++++++++++------------------ 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 7946c4c8b..11cb2b7a3 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -13,6 +13,7 @@ import { splitMediaFromOutput } from "../media/parse.js"; import { enqueueCommand } from "../process/command-queue.js"; import type { runCommandWithTimeout } from "../process/exec.js"; import { runPiRpc } from "../process/tau-rpc.js"; +import type { AgentEvent, AssistantMessage, Message } from "@mariozechner/pi-ai"; import { applyTemplate, type TemplateContext } from "./templating.js"; import { formatToolAggregate, @@ -533,14 +534,10 @@ export async function runCommandReply( } }; let lastStreamedAssistant: string | undefined; - const streamAssistantFinal = (msg?: { - role?: string; - content?: unknown | unknown[]; - }) => { + const streamAssistantFinal = (msg?: AssistantMessage) => { if (!onPartialReply || msg?.role !== "assistant") return; - const content = msg.content; - const textBlocks = Array.isArray(content) - ? (content as Array<{ type?: string; text?: string }>) + const textBlocks = Array.isArray(msg.content) + ? (msg.content as Array<{ type?: string; text?: string }>) .filter((c) => c?.type === "text" && typeof c.text === "string") .map((c) => (c.text ?? "").trim()) .filter(Boolean) @@ -575,19 +572,11 @@ export async function runCommandReply( } return copy; })(); - type RpcStreamEvent = { - type?: string; - message?: { - role?: string; - content?: unknown; - toolCallId?: string; - tool_call_id?: string; - } & Record; - toolName?: string; - toolCallId?: string; - args?: unknown; - [key: string]: unknown; - }; + type RpcStreamEvent = + | AgentEvent + // Tau sometimes emits a bare "message" frame; treat it like message_end for parsing. + | { type: "message"; message: Message } + | { type: "message_end"; message: Message }; const rpcResult = await runPiRpc({ argv: rpcArgvForRun, @@ -625,14 +614,20 @@ export async function runCommandReply( } if ( - (ev.type === "message" || ev.type === "message_end") && - ev.message?.role === "tool_result" && - Array.isArray(ev.message.content) + ("message" in ev && ev.message) && + (ev.type === "message" || ev.type === "message_end") ) { - const toolName = inferToolName(ev.message); - const toolCallId = ev.message.toolCallId ?? ev.message.tool_call_id; + const msg = ev.message as Message; + const role = (msg as any).role; + const isToolResult = role === "toolResult" || role === "tool_result"; + if (!isToolResult || !Array.isArray(msg.content)) { + // not a tool result message we care about + } else { + const toolName = inferToolName(msg); + const toolCallId = + (msg as any).toolCallId ?? (msg as any).tool_call_id; const meta = - inferToolMeta(ev.message) ?? + inferToolMeta(msg) ?? (toolCallId ? toolMetaById.get(toolCallId) : undefined); emitAgentEvent({ @@ -672,10 +667,11 @@ export async function runCommandReply( flushPendingTool, TOOL_RESULT_DEBOUNCE_MS, ); + } } - if (ev.type === "message_end") { - streamAssistantFinal(ev.message); + if (ev.type === "message_end" && "message" in ev && ev.message && ev.message.role === "assistant") { + streamAssistantFinal(ev.message as AssistantMessage); const text = extractRpcAssistantText(line); if (text) { params.onAgentEvent?.({ @@ -686,10 +682,10 @@ export async function runCommandReply( } // Preserve existing partial reply hook when provided. - if (onPartialReply && ev.message?.role === "assistant") { + if (onPartialReply && "message" in ev && ev.message?.role === "assistant") { // Let the existing logic reuse the already-parsed message. try { - streamAssistantFinal(ev.message); + streamAssistantFinal(ev.message as AssistantMessage); } catch { /* ignore */ }