From e7713a28aeecf671b158fd1391101b0d61f85ae8 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 16 Dec 2025 10:28:57 +0100 Subject: [PATCH] fix(auto-reply): parse agent_end and avoid rpc JSON leaks --- src/auto-reply/command-reply.test.ts | 68 ++++++++++++++++++++++++++++ src/auto-reply/command-reply.ts | 42 +++++++++++------ 2 files changed, 95 insertions(+), 15 deletions(-) diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index c7310a000..2864c645b 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -378,6 +378,74 @@ describe("runCommandReply (pi)", () => { expect(payloads?.[0]?.text).toBe("Acknowledged."); }); + it("parses assistant text from agent_end messages", async () => { + mockPiRpc({ + stdout: JSON.stringify({ + type: "agent_end", + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "from agent_end" }], + model: "pi-1", + provider: "inflection", + usage: { input: 1, output: 1, cacheRead: 0, cacheWrite: 0, total: 2 }, + stopReason: "stop", + }, + ], + }), + stderr: "", + code: 0, + }); + + const { payloads } = await runCommandReply({ + reply: { + mode: "command", + command: ["pi", "{{Body}}"], + agent: { kind: "pi" }, + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + enqueue: enqueueImmediate, + }); + + expect(payloads?.[0]?.text).toBe("from agent_end"); + }); + + it("does not leak JSON protocol frames when assistant emits no text", async () => { + mockPiRpc({ + stdout: [ + '{"type":"message_end","message":{"role":"assistant","content":[{"type":"thinking","thinking":"hmm"}],"usage":{"input":10,"output":5}}}', + ].join("\n"), + stderr: "", + code: 0, + }); + + const { payloads } = await runCommandReply({ + reply: { + mode: "command", + command: ["pi", "{{Body}}"], + agent: { kind: "pi" }, + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + enqueue: enqueueImmediate, + }); + + expect(payloads?.[0]?.text).toMatch(/produced no output/i); + expect(payloads?.[0]?.text).not.toContain("message_end"); + expect(payloads?.[0]?.text).not.toContain("\"type\""); + }); + it("does not stream tool results when verbose is off", async () => { const onPartial = vi.fn(); mockPiRpc({ diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 5d20aa030..00cf5193f 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -54,9 +54,15 @@ function stripRpcNoise(raw: string): string { if (type === "message_update") continue; // Ignore toolcall delta chatter and input buffer append events. - if (type === "message_update" && msgType === "toolcall_delta") continue; + if (msgType === "toolcall_delta") continue; if (type === "input_audio_buffer.append") continue; + // Preserve agent_end so piSpec.parseOutput can extract the final message set. + if (type === "agent_end") { + kept.push(line); + continue; + } + // Keep only assistant/tool messages; drop agent_start/turn_start/user/etc. const isAssistant = role === "assistant"; const isToolRole = @@ -140,12 +146,21 @@ function extractRpcAssistantText(raw: string): string | undefined { return lastAssistant?.trim() || undefined; } -function extractAssistantTextLoosely(raw: string): string | undefined { - // Fallback: grab the last "text":"..." occurrence from a JSON-ish blob. - const matches = [...raw.matchAll(/"text"\s*:\s*"([^"]+?)"/g)]; - if (!matches.length) return undefined; - const last = matches.at(-1)?.[1]; - return last ? last.replace(/\\n/g, "\n").trim() : undefined; +function extractNonJsonText(raw: string): string | undefined { + const kept: string[] = []; + for (const line of raw.split(/\n+/)) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + JSON.parse(trimmed); + // JSON protocol frame → never surface directly. + continue; + } catch { + kept.push(line); + } + } + const text = kept.join("\n").trim(); + return text ? text : undefined; } type CommandReplyConfig = NonNullable["reply"] & { @@ -859,12 +874,10 @@ export async function runCommandReply( }); } - // If parser gave nothing, fall back to best-effort assistant text (prefers RPC deltas). - const fallbackText = - rpcAssistantText ?? - extractRpcAssistantText(trimmed) ?? - extractAssistantTextLoosely(trimmed) ?? - trimmed; + // If parser gave nothing, fall back to best-effort assistant text (from RPC deltas), + // or any non-JSON stdout the child may have emitted (e.g. MEDIA tokens). + // Never fall back to raw stdout JSON protocol frames. + const fallbackText = rpcAssistantText ?? extractNonJsonText(rawStdout); const normalize = (s?: string) => stripStructuralPrefixes((s ?? "").trim()).toLowerCase(); const bodyNorm = normalize( @@ -1030,8 +1043,7 @@ export async function runCommandReply( `${timeoutSeconds}s${resolvedCwd ? ` (cwd: ${resolvedCwd})` : ""}. Try a shorter prompt or split the request.`; const partial = extractRpcAssistantText(errorObj.stdout ?? "") || - extractAssistantTextLoosely(errorObj.stdout ?? "") || - stripRpcNoise(errorObj.stdout ?? ""); + extractNonJsonText(errorObj.stdout ?? ""); const partialSnippet = partial && partial.length > 800 ? `${partial.slice(0, 800)}...`