diff --git a/src/agents/pi.ts b/src/agents/pi.ts index 027b21ae9..9e2bbd9ac 100644 --- a/src/agents/pi.ts +++ b/src/agents/pi.ts @@ -14,6 +14,7 @@ type PiAssistantMessage = { model?: string; provider?: string; stopReason?: string; + errorMessage?: string; name?: string; toolName?: string; tool_call_id?: string; @@ -68,53 +69,80 @@ function parsePiJson(raw: string): AgentParseResult { let lastAssistant: PiAssistantMessage | undefined; let lastPushed: string | undefined; + const pickText = (msg?: PiAssistantMessage) => + msg?.content + ?.filter((c) => c?.type === "text" && typeof c.text === "string") + .map((c) => c.text) + .join("\n") + .trim(); + + const handleAssistant = (msg?: PiAssistantMessage) => { + if (!msg) return; + lastAssistant = msg; + const text = pickText(msg); + const fallbackError = + !text && typeof msg.errorMessage === "string" + ? `Warning: ${msg.errorMessage}` + : undefined; + const chosen = (text || fallbackError)?.trim(); + if (chosen && chosen !== lastPushed) { + texts.push(chosen); + lastPushed = chosen; + } + }; + + const handleToolResult = (msg?: PiAssistantMessage) => { + if (!msg || !msg.content) return; + const toolText = pickText(msg); + if (!toolText) return; + toolResults.push({ + text: toolText, + toolName: inferToolName(msg), + meta: deriveToolMeta(msg), + }); + }; + for (const line of lines) { try { const ev = JSON.parse(line) as { type?: string; message?: PiAssistantMessage; + toolResults?: PiAssistantMessage[]; + messages?: PiAssistantMessage[]; }; - const isToolResult = - (ev.type === "message" || ev.type === "message_end") && - ev.message?.role && - typeof ev.message.role === "string" && - ev.message.role.toLowerCase().includes("tool"); - const isAssistantMessage = - (ev.type === "message" || ev.type === "message_end") && - ev.message?.role === "assistant" && - Array.isArray(ev.message.content); - - if (!isAssistantMessage && !isToolResult) continue; - - const msg = ev.message as PiAssistantMessage; - const msgText = msg.content - ?.filter((c) => c?.type === "text" && typeof c.text === "string") - .map((c) => c.text) - .join("\n") - .trim(); - - if (isAssistantMessage) { - // Always remember the last assistant message so we keep usage/model meta even when no text. - lastAssistant = msg; - if (msgText && msgText !== lastPushed) { - texts.push(msgText); - lastPushed = msgText; - } - } else if (isToolResult && msg.content) { - const toolText = msg.content - ?.filter((c) => c?.type === "text" && typeof c.text === "string") - .map((c) => c.text) - .join("\n") - .trim(); - if (toolText) { - toolResults.push({ - text: toolText, - toolName: inferToolName(msg), - meta: deriveToolMeta(msg), - }); + // Turn-level assistant + tool results + if (ev.type === "turn_end") { + handleAssistant(ev.message); + if (Array.isArray(ev.toolResults)) { + for (const tr of ev.toolResults) handleToolResult(tr); } } + + // Agent-level summary of all messages + if (ev.type === "agent_end" && Array.isArray(ev.messages)) { + for (const msg of ev.messages) { + const role = msg?.role ?? ""; + if (role === "assistant") handleAssistant(msg); + else if (role.toLowerCase().includes("tool")) handleToolResult(msg); + } + } + + const role = ev.message?.role ?? ""; + const isAssistantMessage = + (ev.type === "message" || + ev.type === "message_end" || + ev.type === "message_start") && + role === "assistant"; + const isToolResult = + (ev.type === "message" || + ev.type === "message_end" || + ev.type === "message_start") && + typeof role === "string" && + role.toLowerCase().includes("tool"); + + if (isAssistantMessage) handleAssistant(ev.message); + if (isToolResult) handleToolResult(ev.message); } catch { // ignore malformed lines } diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 17d24df71..f8cf8d9b5 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -46,6 +46,9 @@ function stripRpcNoise(raw: string): string { const msgType = msg?.type; const role = msg?.role; + // Drop early lifecycle frames; we only want final assistant/tool outputs. + if (type === "message_start") continue; + // RPC streaming emits one message_update per delta; skip them to avoid flooding fallbacks. if (type === "message_update") continue;