pi: parse turn_end streams

This commit is contained in:
Peter Steinberger
2025-12-10 11:31:21 +00:00
parent cce65e19e1
commit 81385cf820
2 changed files with 69 additions and 38 deletions

View File

@@ -14,6 +14,7 @@ type PiAssistantMessage = {
model?: string;
provider?: string;
stopReason?: string;
errorMessage?: string;
name?: string;
toolName?: string;
tool_call_id?: string;
@@ -68,53 +69,80 @@ function parsePiJson(raw: string): AgentParseResult {
let lastAssistant: PiAssistantMessage | undefined;
let lastPushed: string | undefined;
const pickText = (msg?: PiAssistantMessage) =>
msg?.content
?.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
.trim();
const handleAssistant = (msg?: PiAssistantMessage) => {
if (!msg) return;
lastAssistant = msg;
const text = pickText(msg);
const fallbackError =
!text && typeof msg.errorMessage === "string"
? `Warning: ${msg.errorMessage}`
: undefined;
const chosen = (text || fallbackError)?.trim();
if (chosen && chosen !== lastPushed) {
texts.push(chosen);
lastPushed = chosen;
}
};
const handleToolResult = (msg?: PiAssistantMessage) => {
if (!msg || !msg.content) return;
const toolText = pickText(msg);
if (!toolText) return;
toolResults.push({
text: toolText,
toolName: inferToolName(msg),
meta: deriveToolMeta(msg),
});
};
for (const line of lines) {
try {
const ev = JSON.parse(line) as {
type?: string;
message?: PiAssistantMessage;
toolResults?: PiAssistantMessage[];
messages?: PiAssistantMessage[];
};
const isToolResult =
(ev.type === "message" || ev.type === "message_end") &&
ev.message?.role &&
typeof ev.message.role === "string" &&
ev.message.role.toLowerCase().includes("tool");
const isAssistantMessage =
(ev.type === "message" || ev.type === "message_end") &&
ev.message?.role === "assistant" &&
Array.isArray(ev.message.content);
if (!isAssistantMessage && !isToolResult) continue;
const msg = ev.message as PiAssistantMessage;
const msgText = msg.content
?.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
.trim();
if (isAssistantMessage) {
// Always remember the last assistant message so we keep usage/model meta even when no text.
lastAssistant = msg;
if (msgText && msgText !== lastPushed) {
texts.push(msgText);
lastPushed = msgText;
}
} else if (isToolResult && msg.content) {
const toolText = msg.content
?.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
.trim();
if (toolText) {
toolResults.push({
text: toolText,
toolName: inferToolName(msg),
meta: deriveToolMeta(msg),
});
// Turn-level assistant + tool results
if (ev.type === "turn_end") {
handleAssistant(ev.message);
if (Array.isArray(ev.toolResults)) {
for (const tr of ev.toolResults) handleToolResult(tr);
}
}
// Agent-level summary of all messages
if (ev.type === "agent_end" && Array.isArray(ev.messages)) {
for (const msg of ev.messages) {
const role = msg?.role ?? "";
if (role === "assistant") handleAssistant(msg);
else if (role.toLowerCase().includes("tool")) handleToolResult(msg);
}
}
const role = ev.message?.role ?? "";
const isAssistantMessage =
(ev.type === "message" ||
ev.type === "message_end" ||
ev.type === "message_start") &&
role === "assistant";
const isToolResult =
(ev.type === "message" ||
ev.type === "message_end" ||
ev.type === "message_start") &&
typeof role === "string" &&
role.toLowerCase().includes("tool");
if (isAssistantMessage) handleAssistant(ev.message);
if (isToolResult) handleToolResult(ev.message);
} catch {
// ignore malformed lines
}

View File

@@ -46,6 +46,9 @@ function stripRpcNoise(raw: string): string {
const msgType = msg?.type;
const role = msg?.role;
// Drop early lifecycle frames; we only want final assistant/tool outputs.
if (type === "message_start") continue;
// RPC streaming emits one message_update per delta; skip them to avoid flooding fallbacks.
if (type === "message_update") continue;