refactor: type tau rpc stream events

This commit is contained in:
Peter Steinberger
2025-12-09 01:37:51 +01:00
parent 27a545f79d
commit e44ed2681f

View File

@@ -13,6 +13,7 @@ import { splitMediaFromOutput } from "../media/parse.js";
import { enqueueCommand } from "../process/command-queue.js"; import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js"; import type { runCommandWithTimeout } from "../process/exec.js";
import { runPiRpc } from "../process/tau-rpc.js"; import { runPiRpc } from "../process/tau-rpc.js";
import type { AgentEvent, AssistantMessage, Message } from "@mariozechner/pi-ai";
import { applyTemplate, type TemplateContext } from "./templating.js"; import { applyTemplate, type TemplateContext } from "./templating.js";
import { import {
formatToolAggregate, formatToolAggregate,
@@ -533,14 +534,10 @@ export async function runCommandReply(
} }
}; };
let lastStreamedAssistant: string | undefined; let lastStreamedAssistant: string | undefined;
const streamAssistantFinal = (msg?: { const streamAssistantFinal = (msg?: AssistantMessage) => {
role?: string;
content?: unknown | unknown[];
}) => {
if (!onPartialReply || msg?.role !== "assistant") return; if (!onPartialReply || msg?.role !== "assistant") return;
const content = msg.content; const textBlocks = Array.isArray(msg.content)
const textBlocks = Array.isArray(content) ? (msg.content as Array<{ type?: string; text?: string }>)
? (content as Array<{ type?: string; text?: string }>)
.filter((c) => c?.type === "text" && typeof c.text === "string") .filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => (c.text ?? "").trim()) .map((c) => (c.text ?? "").trim())
.filter(Boolean) .filter(Boolean)
@@ -575,19 +572,11 @@ export async function runCommandReply(
} }
return copy; return copy;
})(); })();
type RpcStreamEvent = { type RpcStreamEvent =
type?: string; | AgentEvent
message?: { // Tau sometimes emits a bare "message" frame; treat it like message_end for parsing.
role?: string; | { type: "message"; message: Message }
content?: unknown; | { type: "message_end"; message: Message };
toolCallId?: string;
tool_call_id?: string;
} & Record<string, unknown>;
toolName?: string;
toolCallId?: string;
args?: unknown;
[key: string]: unknown;
};
const rpcResult = await runPiRpc({ const rpcResult = await runPiRpc({
argv: rpcArgvForRun, argv: rpcArgvForRun,
@@ -625,14 +614,20 @@ export async function runCommandReply(
} }
if ( if (
(ev.type === "message" || ev.type === "message_end") && ("message" in ev && ev.message) &&
ev.message?.role === "tool_result" && (ev.type === "message" || ev.type === "message_end")
Array.isArray(ev.message.content)
) { ) {
const toolName = inferToolName(ev.message); const msg = ev.message as Message;
const toolCallId = ev.message.toolCallId ?? ev.message.tool_call_id; const role = (msg as any).role;
const isToolResult = role === "toolResult" || role === "tool_result";
if (!isToolResult || !Array.isArray(msg.content)) {
// not a tool result message we care about
} else {
const toolName = inferToolName(msg);
const toolCallId =
(msg as any).toolCallId ?? (msg as any).tool_call_id;
const meta = const meta =
inferToolMeta(ev.message) ?? inferToolMeta(msg) ??
(toolCallId ? toolMetaById.get(toolCallId) : undefined); (toolCallId ? toolMetaById.get(toolCallId) : undefined);
emitAgentEvent({ emitAgentEvent({
@@ -672,10 +667,11 @@ export async function runCommandReply(
flushPendingTool, flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS, TOOL_RESULT_DEBOUNCE_MS,
); );
}
} }
if (ev.type === "message_end") { if (ev.type === "message_end" && "message" in ev && ev.message && ev.message.role === "assistant") {
streamAssistantFinal(ev.message); streamAssistantFinal(ev.message as AssistantMessage);
const text = extractRpcAssistantText(line); const text = extractRpcAssistantText(line);
if (text) { if (text) {
params.onAgentEvent?.({ params.onAgentEvent?.({
@@ -686,10 +682,10 @@ export async function runCommandReply(
} }
// Preserve existing partial reply hook when provided. // Preserve existing partial reply hook when provided.
if (onPartialReply && ev.message?.role === "assistant") { if (onPartialReply && "message" in ev && ev.message?.role === "assistant") {
// Let the existing logic reuse the already-parsed message. // Let the existing logic reuse the already-parsed message.
try { try {
streamAssistantFinal(ev.message); streamAssistantFinal(ev.message as AssistantMessage);
} catch { } catch {
/* ignore */ /* ignore */
} }