diff --git a/CHANGELOG.md b/CHANGELOG.md index 775d89437..7e42b159a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ - **Pi/Tau stability:** RPC replies buffered until the assistant turn finishes; parsers return consistent `texts[]`; web auto-replies keep a warm Tau RPC process to avoid cold starts. - **Claude prompt flow:** One-time `sessionIntro` with per-message `/think:high` bodyPrefix; system prompt always sent on first turn even with `sendSystemOnce`. - **Heartbeat UX:** Backpressure skips reply heartbeats while other commands run; skips don’t refresh session `updatedAt`; web/Twilio heartbeats normalize array payloads and optional `heartbeatCommand`. -- **Tau completion signal:** RPC now resolves on Tau’s `agent_end` event so late assistant messages (e.g., camera snap + weather) aren’t truncated. +- **Tau completion signal:** RPC now resolves on Tau’s `agent_end` (or process exit) so late assistant messages aren’t truncated; 5-minute hard cap only as a failsafe. ### Reliability & UX - Outbound chunking prefers newlines/word boundaries and enforces caps (1600 WhatsApp/Twilio, 4000 web). diff --git a/src/process/tau-rpc.ts b/src/process/tau-rpc.ts index ed996d836..50d577a65 100644 --- a/src/process/tau-rpc.ts +++ b/src/process/tau-rpc.ts @@ -24,9 +24,6 @@ class TauRpcClient { private stderr = ""; private buffer: string[] = []; private idleTimer: NodeJS.Timeout | null = null; - private sawToolActivity = false; - private seenAssistantEnd = false; - private seenAgentEnd = false; private readonly idleMs = 120; private pending: | { @@ -54,12 +51,14 @@ class TauRpcClient { this.stderr += d.toString(); }); this.child.on("exit", (code, signal) => { + if (this.idleTimer) clearTimeout(this.idleTimer); if (this.pending) { - this.pending.reject( - new Error(`tau rpc exited (code=${code}, signal=${signal})`), - ); - clearTimeout(this.pending.timer); + const pending = this.pending; this.pending = undefined; + const out = this.buffer.join("\n"); + clearTimeout(pending.timer); + // Treat process exit as completion with whatever output we captured. + pending.resolve({ stdout: out, stderr: this.stderr, code: code ?? 0, signal }); } this.dispose(); }); @@ -70,31 +69,16 @@ class TauRpcClient { this.buffer.push(line); this.pending?.onEvent?.(line); - // Parse the line once to track agent/tool lifecycle signals. + // Parse the line once to track agent lifecycle signals. try { const evt = JSON.parse(line) as { type?: string; message?: unknown }; - // Any tool activity (calls or execution events) means we should wait for agent_end, - // not the first assistant message_end, to avoid truncating follow-up replies. - if ( - evt?.type === "tool_execution_start" || - evt?.type === "tool_execution_end" || - (evt?.type === "message" && - evt.message && - JSON.stringify(evt.message).includes('"toolCall"')) - ) { - this.sawToolActivity = true; - } - if (evt?.type === "agent_end") { - this.seenAgentEnd = true; - if (this.idleTimer) clearTimeout(this.idleTimer); + // Tau signals the end of the prompt/response cycle; resolve with all buffered output. const pending = this.pending; this.pending = undefined; const out = this.buffer.join("\n"); this.buffer = []; - this.sawToolActivity = false; - this.seenAssistantEnd = false; clearTimeout(pending.timer); pending.resolve({ stdout: out, stderr: this.stderr, code: 0 }); return; @@ -102,42 +86,6 @@ class TauRpcClient { } catch { // ignore malformed/non-JSON lines } - - // Streamed JSON arrives line-by-line; mark when an assistant message finishes - // and resolve after a short idle to capture any follow-up events (e.g. tools) - // that belong to the same turn. - if ( - line.includes('"type":"message_end"') && - line.includes('"role":"assistant"') - ) { - this.seenAssistantEnd = true; - } - - if (this.seenAssistantEnd) { - if (this.idleTimer) clearTimeout(this.idleTimer); - this.idleTimer = setTimeout(() => { - if (!this.pending) return; - const out = this.buffer.join("\n"); - // If tools are in-flight, prefer waiting for agent_end to avoid dropping the - // post-tool assistant turn. The outer timeout still prevents hangs. - if (this.sawToolActivity && !this.seenAgentEnd) { - return; - } - // Only resolve once we have at least one assistant text payload; otherwise keep waiting. - const parsed = piSpec.parseOutput(out); - if (parsed.texts && parsed.texts.length > 0) { - const pending = this.pending; - this.pending = undefined; - this.buffer = []; - this.sawToolActivity = false; - this.seenAssistantEnd = false; - clearTimeout(pending.timer); - pending.resolve({ stdout: out, stderr: this.stderr, code: 0 }); - return; - } - // No assistant text yet; wait for more lines. - }, this.idleMs); // small idle window to group streaming blocks - } } async prompt( @@ -162,14 +110,13 @@ class TauRpcClient { if (!ok) child.stdin.once("drain", () => resolve()); }); return await new Promise((resolve, reject) => { + // Hard cap to avoid stuck relays; agent_end or process exit should usually resolve first. + const capMs = Math.min(timeoutMs, 5 * 60 * 1000); const timer = setTimeout(() => { this.pending = undefined; - this.sawToolActivity = false; - this.seenAssistantEnd = false; - this.seenAgentEnd = false; - reject(new Error(`tau rpc timed out after ${timeoutMs}ms`)); + reject(new Error(`tau rpc timed out after ${capMs}ms`)); child.kill("SIGKILL"); - }, timeoutMs); + }, capMs); this.pending = { resolve, reject, timer, onEvent }; }); }