From c3792db0e5b451828907e53754e4fa8c1caefb95 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 3 Dec 2025 09:21:31 +0000 Subject: [PATCH] Auto-reply: stream verbose tool results via tau rpc --- src/auto-reply/command-reply.ts | 44 +++++++++- src/auto-reply/reply.ts | 139 +++++++++++++++++--------------- src/auto-reply/types.ts | 1 + src/process/tau-rpc.ts | 13 ++- 4 files changed, 127 insertions(+), 70 deletions(-) diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index e0b6c14c1..e1397219e 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -35,6 +35,7 @@ type CommandReplyParams = { enqueue?: EnqueueRunner; thinkLevel?: ThinkLevel; verboseLevel?: "off" | "on"; + onPartialReply?: (payload: ReplyPayload) => Promise | void; }; export type CommandReplyMeta = { @@ -143,6 +144,7 @@ export async function runCommandReply( enqueue = enqueueCommand, thinkLevel, verboseLevel, + onPartialReply, } = params; if (!reply.command?.length) { @@ -274,6 +276,42 @@ export async function runCommandReply( cwd: reply.cwd, prompt: body, timeoutMs, + onEvent: + verboseLevel === "on" && onPartialReply + ? (line: string) => { + try { + const ev = JSON.parse(line) as { + type?: string; + message?: { role?: string; content?: unknown[] }; + }; + if ( + (ev.type === "message" || ev.type === "message_end") && + ev.message?.role === "tool_result" && + Array.isArray(ev.message.content) + ) { + const text = ( + ev.message.content as Array<{ text?: string }> + ) + .map((c) => c.text) + .filter((t): t is string => !!t) + .join("\n") + .trim(); + if (text) { + const { text: cleanedText, mediaUrls: mediaFound } = + splitMediaFromOutput(`🛠️ ${text}`); + void onPartialReply({ + text: cleanedText, + mediaUrls: mediaFound?.length + ? mediaFound + : undefined, + } as ReplyPayload); + } + } + } catch { + // ignore malformed lines + } + } + : undefined, }); } return await commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd }); @@ -309,8 +347,10 @@ export async function runCommandReply( type ReplyItem = { text: string; media?: string[] }; const replyItems: ReplyItem[] = []; - // When verbose is on, surface tool results first (before assistant summary) to mirror chat ordering. - if (verboseLevel === "on") { + const includeToolResultsInline = + verboseLevel === "on" && !onPartialReply && parsedToolResults.length > 0; + + if (includeToolResultsInline) { for (const tr of parsedToolResults) { const prefixed = `🛠️ ${tr}`; const { text: cleanedText, mediaUrls: mediaFound } = diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 5fa728987..706a03bce 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -529,6 +529,7 @@ export async function getReplyFromConfig( commandRunner, thinkLevel: resolvedThinkLevel, verboseLevel: resolvedVerboseLevel, + onPartialReply: opts?.onPartialReply, }); const payloadArray = runResult.payloads ?? []; const meta = runResult.meta; @@ -611,6 +612,16 @@ export async function autoReplyIfConfigured( To: message.to ?? undefined, MessageSid: message.sid, }; + const replyFrom = message.to; + const replyTo = message.from; + if (!replyFrom || !replyTo) { + if (isVerbose()) + console.error( + "Skipping auto-reply: missing to/from on inbound message", + ctx, + ); + return; + } const cfg = configOverride ?? loadConfig(); // Attach media hints for transcription/templates if present on Twilio payloads. const mediaUrl = (message as { mediaUrl?: string }).mediaUrl; @@ -632,10 +643,72 @@ export async function autoReplyIfConfigured( } } + const sendTwilio = async (body: string, media?: string) => { + let resolvedMedia = media; + if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) { + const hosted = await ensureMediaHosted(resolvedMedia); + resolvedMedia = hosted.url; + } + await client.messages.create({ + from: replyFrom, + to: replyTo, + body, + ...(resolvedMedia ? { mediaUrl: [resolvedMedia] } : {}), + }); + }; + + const sendPayload = async (replyPayload: ReplyPayload) => { + const mediaList = replyPayload.mediaUrls?.length + ? replyPayload.mediaUrls + : replyPayload.mediaUrl + ? [replyPayload.mediaUrl] + : []; + + const text = replyPayload.text ?? ""; + const chunks = chunkText(text, TWILIO_TEXT_LIMIT); + if (chunks.length === 0) chunks.push(""); + + for (let i = 0; i < chunks.length; i++) { + const body = chunks[i]; + const attachMedia = i === 0 ? mediaList[0] : undefined; + + if (body) { + logVerbose( + `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${body.length}`, + ); + } else if (attachMedia) { + logVerbose( + `Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media only)`, + ); + } + + await sendTwilio(body, attachMedia); + + if (i === 0 && mediaList.length > 1) { + for (const extra of mediaList.slice(1)) { + await sendTwilio("", extra); + } + } + + if (isVerbose()) { + console.log( + info( + `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${attachMedia ? ", media" : ""})`, + ), + ); + } + } + }; + + const partialSender = async (payload: ReplyPayload) => { + await sendPayload(payload); + }; + const replyResult = await getReplyFromConfig( ctx, { onReplyStart: () => sendTypingIndicator(client, runtime, message.sid), + onPartialReply: partialSender, }, cfg, ); @@ -646,73 +719,9 @@ export async function autoReplyIfConfigured( : []; if (replies.length === 0) return; - const replyFrom = message.to; - const replyTo = message.from; - if (!replyFrom || !replyTo) { - if (isVerbose()) - console.error( - "Skipping auto-reply: missing to/from on inbound message", - ctx, - ); - return; - } - try { - const sendTwilio = async (body: string, media?: string) => { - let resolvedMedia = media; - if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) { - const hosted = await ensureMediaHosted(resolvedMedia); - resolvedMedia = hosted.url; - } - await client.messages.create({ - from: replyFrom, - to: replyTo, - body, - ...(resolvedMedia ? { mediaUrl: [resolvedMedia] } : {}), - }); - }; - for (const replyPayload of replies) { - const mediaList = replyPayload.mediaUrls?.length - ? replyPayload.mediaUrls - : replyPayload.mediaUrl - ? [replyPayload.mediaUrl] - : []; - - const text = replyPayload.text ?? ""; - const chunks = chunkText(text, TWILIO_TEXT_LIMIT); - if (chunks.length === 0) chunks.push(""); - - for (let i = 0; i < chunks.length; i++) { - const body = chunks[i]; - const attachMedia = i === 0 ? mediaList[0] : undefined; - - if (body) { - logVerbose( - `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${body.length}`, - ); - } else if (attachMedia) { - logVerbose( - `Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media only)`, - ); - } - - await sendTwilio(body, attachMedia); - - if (i === 0 && mediaList.length > 1) { - for (const extra of mediaList.slice(1)) { - await sendTwilio("", extra); - } - } - - if (isVerbose()) { - console.log( - info( - `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${attachMedia ? ", media" : ""})`, - ), - ); - } - } + await sendPayload(replyPayload); } } catch (err) { const anyErr = err as { diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 211d1f932..38916c685 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -1,6 +1,7 @@ export type GetReplyOptions = { onReplyStart?: () => Promise | void; isHeartbeat?: boolean; + onPartialReply?: (payload: ReplyPayload) => Promise | void; }; export type ReplyPayload = { diff --git a/src/process/tau-rpc.ts b/src/process/tau-rpc.ts index 83b44c416..adaddeb57 100644 --- a/src/process/tau-rpc.ts +++ b/src/process/tau-rpc.ts @@ -7,6 +7,7 @@ type TauRpcOptions = { argv: string[]; cwd?: string; timeoutMs: number; + onEvent?: (line: string) => void; }; type TauRpcResult = { @@ -30,6 +31,7 @@ class TauRpcClient { resolve: (r: TauRpcResult) => void; reject: (err: unknown) => void; timer: NodeJS.Timeout; + onEvent?: (line: string) => void; } | undefined; @@ -64,6 +66,7 @@ class TauRpcClient { private handleLine(line: string) { if (!this.pending) return; this.buffer.push(line); + this.pending?.onEvent?.(line); // Streamed JSON arrives line-by-line; mark when an assistant message finishes // and resolve after a short idle to capture any follow-up events (e.g. tools) // that belong to the same turn. @@ -95,7 +98,11 @@ class TauRpcClient { } } - async prompt(prompt: string, timeoutMs: number): Promise { + async prompt( + prompt: string, + timeoutMs: number, + onEvent?: (line: string) => void, + ): Promise { this.ensureChild(); if (this.pending) { throw new Error("tau rpc already handling a request"); @@ -118,7 +125,7 @@ class TauRpcClient { reject(new Error(`tau rpc timed out after ${timeoutMs}ms`)); child.kill("SIGKILL"); }, timeoutMs); - this.pending = { resolve, reject, timer }; + this.pending = { resolve, reject, timer, onEvent }; }); } @@ -144,7 +151,7 @@ export async function runPiRpc( singleton?.client.dispose(); singleton = { key, client: new TauRpcClient(opts.argv, opts.cwd) }; } - return singleton.client.prompt(opts.prompt, opts.timeoutMs); + return singleton.client.prompt(opts.prompt, opts.timeoutMs, opts.onEvent); } export function resetPiRpc() {