Tau RPC: resolve on agent_end or exit

This commit is contained in:
Peter Steinberger
2025-12-03 11:34:00 +00:00
parent cc596ef011
commit 3958450223
2 changed files with 13 additions and 66 deletions

View File

@@ -10,7 +10,7 @@
- **Pi/Tau stability:** RPC replies buffered until the assistant turn finishes; parsers return consistent `texts[]`; web auto-replies keep a warm Tau RPC process to avoid cold starts.
- **Claude prompt flow:** One-time `sessionIntro` with per-message `/think:high` bodyPrefix; system prompt always sent on first turn even with `sendSystemOnce`.
- **Heartbeat UX:** Backpressure skips reply heartbeats while other commands run; skips dont refresh session `updatedAt`; web/Twilio heartbeats normalize array payloads and optional `heartbeatCommand`.
- **Tau completion signal:** RPC now resolves on Taus `agent_end` event so late assistant messages (e.g., camera snap + weather) arent truncated.
- **Tau completion signal:** RPC now resolves on Taus `agent_end` (or process exit) so late assistant messages arent truncated; 5-minute hard cap only as a failsafe.
### Reliability & UX
- Outbound chunking prefers newlines/word boundaries and enforces caps (1600 WhatsApp/Twilio, 4000 web).

View File

@@ -24,9 +24,6 @@ class TauRpcClient {
private stderr = "";
private buffer: string[] = [];
private idleTimer: NodeJS.Timeout | null = null;
private sawToolActivity = false;
private seenAssistantEnd = false;
private seenAgentEnd = false;
private readonly idleMs = 120;
private pending:
| {
@@ -54,12 +51,14 @@ class TauRpcClient {
this.stderr += d.toString();
});
this.child.on("exit", (code, signal) => {
if (this.idleTimer) clearTimeout(this.idleTimer);
if (this.pending) {
this.pending.reject(
new Error(`tau rpc exited (code=${code}, signal=${signal})`),
);
clearTimeout(this.pending.timer);
const pending = this.pending;
this.pending = undefined;
const out = this.buffer.join("\n");
clearTimeout(pending.timer);
// Treat process exit as completion with whatever output we captured.
pending.resolve({ stdout: out, stderr: this.stderr, code: code ?? 0, signal });
}
this.dispose();
});
@@ -70,31 +69,16 @@ class TauRpcClient {
this.buffer.push(line);
this.pending?.onEvent?.(line);
// Parse the line once to track agent/tool lifecycle signals.
// Parse the line once to track agent lifecycle signals.
try {
const evt = JSON.parse(line) as { type?: string; message?: unknown };
// Any tool activity (calls or execution events) means we should wait for agent_end,
// not the first assistant message_end, to avoid truncating follow-up replies.
if (
evt?.type === "tool_execution_start" ||
evt?.type === "tool_execution_end" ||
(evt?.type === "message" &&
evt.message &&
JSON.stringify(evt.message).includes('"toolCall"'))
) {
this.sawToolActivity = true;
}
if (evt?.type === "agent_end") {
this.seenAgentEnd = true;
if (this.idleTimer) clearTimeout(this.idleTimer);
// Tau signals the end of the prompt/response cycle; resolve with all buffered output.
const pending = this.pending;
this.pending = undefined;
const out = this.buffer.join("\n");
this.buffer = [];
this.sawToolActivity = false;
this.seenAssistantEnd = false;
clearTimeout(pending.timer);
pending.resolve({ stdout: out, stderr: this.stderr, code: 0 });
return;
@@ -102,42 +86,6 @@ class TauRpcClient {
} catch {
// ignore malformed/non-JSON lines
}
// Streamed JSON arrives line-by-line; mark when an assistant message finishes
// and resolve after a short idle to capture any follow-up events (e.g. tools)
// that belong to the same turn.
if (
line.includes('"type":"message_end"') &&
line.includes('"role":"assistant"')
) {
this.seenAssistantEnd = true;
}
if (this.seenAssistantEnd) {
if (this.idleTimer) clearTimeout(this.idleTimer);
this.idleTimer = setTimeout(() => {
if (!this.pending) return;
const out = this.buffer.join("\n");
// If tools are in-flight, prefer waiting for agent_end to avoid dropping the
// post-tool assistant turn. The outer timeout still prevents hangs.
if (this.sawToolActivity && !this.seenAgentEnd) {
return;
}
// Only resolve once we have at least one assistant text payload; otherwise keep waiting.
const parsed = piSpec.parseOutput(out);
if (parsed.texts && parsed.texts.length > 0) {
const pending = this.pending;
this.pending = undefined;
this.buffer = [];
this.sawToolActivity = false;
this.seenAssistantEnd = false;
clearTimeout(pending.timer);
pending.resolve({ stdout: out, stderr: this.stderr, code: 0 });
return;
}
// No assistant text yet; wait for more lines.
}, this.idleMs); // small idle window to group streaming blocks
}
}
async prompt(
@@ -162,14 +110,13 @@ class TauRpcClient {
if (!ok) child.stdin.once("drain", () => resolve());
});
return await new Promise<TauRpcResult>((resolve, reject) => {
// Hard cap to avoid stuck relays; agent_end or process exit should usually resolve first.
const capMs = Math.min(timeoutMs, 5 * 60 * 1000);
const timer = setTimeout(() => {
this.pending = undefined;
this.sawToolActivity = false;
this.seenAssistantEnd = false;
this.seenAgentEnd = false;
reject(new Error(`tau rpc timed out after ${timeoutMs}ms`));
reject(new Error(`tau rpc timed out after ${capMs}ms`));
child.kill("SIGKILL");
}, timeoutMs);
}, capMs);
this.pending = { resolve, reject, timer, onEvent };
});
}