From cfaec9d6088cedad84e4865ec1fe2d22529976f4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 23:03:55 +0000 Subject: [PATCH] auto-reply: support multi-text RPC outputs --- CHANGELOG.md | 4 + src/agents/claude.ts | 2 +- src/agents/codex.ts | 7 +- src/agents/gemini.ts | 3 +- src/agents/opencode.ts | 2 +- src/agents/pi.ts | 54 +++++++----- src/agents/types.ts | 2 +- src/auto-reply/command-reply.ts | 150 +++++++++++++++++++------------- src/auto-reply/reply.ts | 81 ++++++++--------- 9 files changed, 179 insertions(+), 126 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd7136a56..dce8595e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,10 @@ ## Unreleased +### Fixed +- Support multiple assistant text replies when using Tau RPC: agents now emit `texts` arrays and command auto-replies deliver each message separately without leaking raw JSON. +- Normalized agent parsers (pi/claude/opencode/codex/gemini) to the new plural output shape. + ### Changes - **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests. - **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs. diff --git a/src/agents/claude.ts b/src/agents/claude.ts index e656acb5c..22f8e7ed6 100644 --- a/src/agents/claude.ts +++ b/src/agents/claude.ts @@ -69,7 +69,7 @@ export const claudeSpec: AgentSpec = { const parsed = parseClaudeJson(rawStdout); const text = parsed?.text ?? rawStdout.trim(); return { - text: text?.trim(), + texts: text ? [text.trim()] : undefined, meta: toMeta(parsed), }; }, diff --git a/src/agents/codex.ts b/src/agents/codex.ts index da1cd29a2..94b17dd11 100644 --- a/src/agents/codex.ts +++ b/src/agents/codex.ts @@ -4,7 +4,7 @@ import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js"; function parseCodexJson(raw: string): AgentParseResult { const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); - let text: string | undefined; + const texts: string[] = []; let meta: AgentMeta | undefined; for (const line of lines) { @@ -21,7 +21,7 @@ function parseCodexJson(raw: string): AgentParseResult { ev.item?.type === "agent_message" && typeof ev.item.text === "string" ) { - text = ev.item.text; + texts.push(ev.item.text); } if ( ev.type === "turn.completed" && @@ -50,7 +50,8 @@ function parseCodexJson(raw: string): AgentParseResult { } } - return { text: text?.trim(), meta }; + const finalTexts = texts.length ? texts.map((t) => t.trim()) : undefined; + return { texts: finalTexts, meta }; } export const codexSpec: AgentSpec = { diff --git a/src/agents/gemini.ts b/src/agents/gemini.ts index 91e2bf50a..c95285ebd 100644 --- a/src/agents/gemini.ts +++ b/src/agents/gemini.ts @@ -10,7 +10,8 @@ export const GEMINI_IDENTITY_PREFIX = // keep parsing minimal and let MEDIA token stripping happen later in the pipeline. function parseGeminiOutput(raw: string): { text?: string; meta?: AgentMeta } { const trimmed = raw.trim(); - return { text: trimmed || undefined, meta: undefined }; + const text = trimmed || undefined; + return { texts: text ? [text] : undefined, meta: undefined }; } export const geminiSpec: AgentSpec = { diff --git a/src/agents/opencode.ts b/src/agents/opencode.ts index c458d94c1..836b7c4e0 100644 --- a/src/agents/opencode.ts +++ b/src/agents/opencode.ts @@ -55,7 +55,7 @@ export const opencodeSpec: AgentSpec = { const parsed = parseOpencodeJson(rawStdout); const text = parsed.text ?? rawStdout.trim(); return { - text: text?.trim(), + texts: text ? [text.trim()] : undefined, meta: toMeta(parsed), }; }, diff --git a/src/agents/pi.ts b/src/agents/pi.ts index 948afd837..f1e632a77 100644 --- a/src/agents/pi.ts +++ b/src/agents/pi.ts @@ -13,36 +13,50 @@ type PiAssistantMessage = { function parsePiJson(raw: string): AgentParseResult { const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); - let lastMessage: PiAssistantMessage | undefined; + + // Collect every assistant message we see; Tau in RPC mode can emit multiple + // assistant payloads in one run (e.g., queued turns, heartbeats). We concatenate + // all text blocks so users see everything instead of only the last message_end. + const texts: string[] = []; + let lastAssistant: PiAssistantMessage | undefined; + for (const line of lines) { try { const ev = JSON.parse(line) as { type?: string; message?: PiAssistantMessage; }; - // Pi emits a stream; we only care about the terminal assistant message_end. - if (ev.type === "message_end" && ev.message?.role === "assistant") { - lastMessage = ev.message; + const msg = ev.message; + if (msg?.role === "assistant" && Array.isArray(msg.content)) { + const msgText = msg.content + .filter((c) => c?.type === "text" && typeof c.text === "string") + .map((c) => c.text) + .join("\n") + .trim(); + if (msgText) texts.push(msgText); + // keep meta from the most recent assistant message + lastAssistant = msg; } } catch { - // ignore + // ignore malformed lines } } - const text = - lastMessage?.content - ?.filter((c) => c?.type === "text" && typeof c.text === "string") - .map((c) => c.text) - .join("\n") - ?.trim() ?? undefined; - const meta: AgentMeta | undefined = lastMessage - ? { - model: lastMessage.model, - provider: lastMessage.provider, - stopReason: lastMessage.stopReason, - usage: lastMessage.usage, - } - : undefined; - return { text, meta }; + + // Combine all assistant text messages (ignore tool calls/partials). This keeps + // multi-message replies intact while dropping non-text events. + const text = texts.length ? texts.join("\n\n").trim() : undefined; + + const meta: AgentMeta | undefined = + text && lastAssistant + ? { + model: lastAssistant.model, + provider: lastAssistant.provider, + stopReason: lastAssistant.stopReason, + usage: lastAssistant.usage, + } + : undefined; + + return { texts, meta }; } export const piSpec: AgentSpec = { diff --git a/src/agents/types.ts b/src/agents/types.ts index e704cb029..5fccd7053 100644 --- a/src/agents/types.ts +++ b/src/agents/types.ts @@ -16,7 +16,7 @@ export type AgentMeta = { }; export type AgentParseResult = { - text?: string; + texts?: string[]; mediaUrls?: string[]; meta?: AgentMeta; }; diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 3a49275be..cf914ed6a 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -44,7 +44,7 @@ export type CommandReplyMeta = { }; export type CommandReplyResult = { - payload?: ReplyPayload; + payloads?: ReplyPayload[]; meta: CommandReplyMeta; }; @@ -189,7 +189,7 @@ export async function runCommandReply( systemSent, identityPrefix: agentCfg.identityPrefix, format: agentCfg.format, - }) + }) : argv; logVerbose( @@ -249,33 +249,54 @@ export async function runCommandReply( }); const rawStdout = stdout.trim(); let mediaFromCommand: string[] | undefined; - let trimmed = rawStdout; + const trimmed = rawStdout; if (stderr?.trim()) { logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); } const parsed = trimmed ? agent.parseOutput(trimmed) : undefined; - // Treat empty string as "no content" so we can fall back to the friendly - // "(command produced no output)" message instead of echoing raw JSON. - if (parsed && parsed.text !== undefined) { - trimmed = parsed.text.trim(); + + // Collect one message per assistant text from parseOutput (tau RPC can emit many). + const parsedTexts = + parsed?.texts?.map((t) => t.trim()).filter(Boolean) ?? + (parsed?.text ? [parsed.text.trim()] : []); + + type ReplyItem = { text: string; media?: string[] }; + const replyItems: ReplyItem[] = []; + + for (const t of parsedTexts) { + const { text: cleanedText, mediaUrls: mediaFound } = + splitMediaFromOutput(t); + replyItems.push({ + text: cleanedText, + media: mediaFound?.length ? mediaFound : undefined, + }); } - const { text: cleanedText, mediaUrls: mediaFound } = - splitMediaFromOutput(trimmed); - trimmed = cleanedText; - if (mediaFound?.length) { - mediaFromCommand = mediaFound; - verboseLog(`MEDIA token extracted: ${mediaFound}`); - } else { - verboseLog("No MEDIA token extracted from final text"); + // If parser gave nothing, fall back to raw stdout as a single message. + if (replyItems.length === 0 && trimmed) { + const { text: cleanedText, mediaUrls: mediaFound } = + splitMediaFromOutput(trimmed); + if (cleanedText || mediaFound?.length) { + replyItems.push({ + text: cleanedText, + media: mediaFound?.length ? mediaFound : undefined, + }); + } } - if (!trimmed && !mediaFromCommand) { + + // No content at all → fallback notice. + if (replyItems.length === 0) { const meta = parsed?.meta?.extra?.summary ?? undefined; - trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`; + replyItems.push({ + text: `(command produced no output${meta ? `; ${meta}` : ""})`, + }); verboseLog("No text/media produced; injecting fallback notice to user"); } - verboseLog(`Command auto-reply stdout (trimmed): ${trimmed || ""}`); + + verboseLog( + `Command auto-reply stdout produced ${replyItems.length} message(s)`, + ); const elapsed = Date.now() - started; verboseLog(`Command auto-reply finished in ${elapsed}ms`); logger.info( @@ -292,7 +313,7 @@ export async function runCommandReply( : ""; const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`; return { - payload: { text: errorText }, + payloads: [{ text: errorText }], meta: { durationMs: Date.now() - started, queuedMs, @@ -310,7 +331,7 @@ export async function runCommandReply( ); const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`; return { - payload: { text: errorText }, + payloads: [{ text: errorText }], meta: { durationMs: Date.now() - started, queuedMs, @@ -322,43 +343,6 @@ export async function runCommandReply( }, }; } - let mediaUrls = - mediaFromCommand ?? (reply.mediaUrl ? [reply.mediaUrl] : undefined); - - // If mediaMaxMb is set, skip local media paths larger than the cap. - if (mediaUrls?.length && reply.mediaMaxMb) { - const maxBytes = reply.mediaMaxMb * 1024 * 1024; - const filtered: string[] = []; - for (const url of mediaUrls) { - if (/^https?:\/\//i.test(url)) { - filtered.push(url); - continue; - } - const abs = path.isAbsolute(url) ? url : path.resolve(url); - try { - const stats = await fs.stat(abs); - if (stats.size <= maxBytes) { - filtered.push(url); - } else if (isVerbose()) { - logVerbose( - `Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`, - ); - } - } catch { - filtered.push(url); - } - } - mediaUrls = filtered; - } - - const payload = - trimmed || mediaUrls?.length - ? { - text: trimmed || undefined, - mediaUrl: mediaUrls?.[0], - mediaUrls, - } - : undefined; const meta: CommandReplyMeta = { durationMs: Date.now() - started, queuedMs, @@ -368,8 +352,56 @@ export async function runCommandReply( killed, agentMeta: parsed?.meta, }; + + const payloads: ReplyPayload[] = []; + + // Build each reply item sequentially (delivery handled by caller). + for (const item of replyItems) { + let mediaUrls = + item.media ?? + mediaFromCommand ?? + (reply.mediaUrl ? [reply.mediaUrl] : undefined); + + // If mediaMaxMb is set, skip local media paths larger than the cap. + if (mediaUrls?.length && reply.mediaMaxMb) { + const maxBytes = reply.mediaMaxMb * 1024 * 1024; + const filtered: string[] = []; + for (const url of mediaUrls) { + if (/^https?:\/\//i.test(url)) { + filtered.push(url); + continue; + } + const abs = path.isAbsolute(url) ? url : path.resolve(url); + try { + const stats = await fs.stat(abs); + if (stats.size <= maxBytes) { + filtered.push(url); + } else if (isVerbose()) { + logVerbose( + `Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`, + ); + } + } catch { + filtered.push(url); + } + } + mediaUrls = filtered; + } + + const payload = + item.text || mediaUrls?.length + ? { + text: item.text || undefined, + mediaUrl: mediaUrls?.[0], + mediaUrls, + } + : undefined; + + if (payload) payloads.push(payload); + } + verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`); - return { payload, meta }; + return { payloads, meta }; } catch (err) { const elapsed = Date.now() - started; logger.info( @@ -414,7 +446,7 @@ export async function runCommandReply( const errMsg = err instanceof Error ? err.message : String(err); const errorText = `⚠️ Command failed: ${errMsg}`; return { - payload: { text: errorText }, + payloads: [{ text: errorText }], meta: { durationMs: elapsed, queuedMs, diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index d1d930885..06dfa2728 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -305,7 +305,7 @@ export async function getReplyFromConfig( mediaUrl: reply.mediaUrl, }; cleanupTyping(); - return result; + return [result]; } if (reply && reply.mode === "command" && reply.command?.length) { @@ -316,7 +316,7 @@ export async function getReplyFromConfig( mode: "command" as const, }; try { - const { payload, meta } = await runCommandReply({ + const { payloads, meta } = await runCommandReply({ reply: commandReply, templatingCtx, sendSystemOnce, @@ -355,7 +355,7 @@ export async function getReplyFromConfig( if (meta.agentMeta && isVerbose()) { logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`); } - return payload; + return payloads; } finally { cleanupTyping(); } @@ -416,13 +416,12 @@ export async function autoReplyIfConfigured( }, cfg, ); - if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) - ) - return; + const replies = replyResult + ? Array.isArray(replyResult) + ? replyResult + : [replyResult] + : []; + if (replies.length === 0) return; const replyFrom = message.to; const replyTo = message.from; @@ -435,23 +434,7 @@ export async function autoReplyIfConfigured( return; } - if (replyResult.text) { - logVerbose( - `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyResult.text.length}`, - ); - } else { - logVerbose( - `Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media)`, - ); - } - try { - const mediaList = replyResult.mediaUrls?.length - ? replyResult.mediaUrls - : replyResult.mediaUrl - ? [replyResult.mediaUrl] - : []; - const sendTwilio = async (body: string, media?: string) => { let resolvedMedia = media; if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) { @@ -466,21 +449,39 @@ export async function autoReplyIfConfigured( }); }; - if (mediaList.length === 0) { - await sendTwilio(replyResult.text ?? ""); - } else { - // First media with body (if any), then remaining as separate media-only sends. - await sendTwilio(replyResult.text ?? "", mediaList[0]); - for (const extra of mediaList.slice(1)) { - await sendTwilio("", extra); + for (const replyPayload of replies) { + if (replyPayload.text) { + logVerbose( + `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyPayload.text.length}`, + ); + } else { + logVerbose( + `Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media)`, + ); + } + + const mediaList = replyPayload.mediaUrls?.length + ? replyPayload.mediaUrls + : replyPayload.mediaUrl + ? [replyPayload.mediaUrl] + : []; + + if (mediaList.length === 0) { + await sendTwilio(replyPayload.text ?? ""); + } else { + await sendTwilio(replyPayload.text ?? "", mediaList[0]); + for (const extra of mediaList.slice(1)) { + await sendTwilio("", extra); + } + } + + if (isVerbose()) { + console.log( + info( + `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${replyPayload.mediaUrl ? ", media" : ""})`, + ), + ); } - } - if (isVerbose()) { - console.log( - info( - `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${replyResult.mediaUrl ? ", media" : ""})`, - ), - ); } } catch (err) { const anyErr = err as {