auto-reply: support multi-text RPC outputs

This commit is contained in:
Peter Steinberger
2025-12-02 23:03:55 +00:00
parent 0f6157a49d
commit cfaec9d608
9 changed files with 179 additions and 126 deletions

View File

@@ -52,6 +52,10 @@
## Unreleased ## Unreleased
### Fixed
- Support multiple assistant text replies when using Tau RPC: agents now emit `texts` arrays and command auto-replies deliver each message separately without leaking raw JSON.
- Normalized agent parsers (pi/claude/opencode/codex/gemini) to the new plural output shape.
### Changes ### Changes
- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests. - **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests.
- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs. - **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs.

View File

@@ -69,7 +69,7 @@ export const claudeSpec: AgentSpec = {
const parsed = parseClaudeJson(rawStdout); const parsed = parseClaudeJson(rawStdout);
const text = parsed?.text ?? rawStdout.trim(); const text = parsed?.text ?? rawStdout.trim();
return { return {
text: text?.trim(), texts: text ? [text.trim()] : undefined,
meta: toMeta(parsed), meta: toMeta(parsed),
}; };
}, },

View File

@@ -4,7 +4,7 @@ import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
function parseCodexJson(raw: string): AgentParseResult { function parseCodexJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let text: string | undefined; const texts: string[] = [];
let meta: AgentMeta | undefined; let meta: AgentMeta | undefined;
for (const line of lines) { for (const line of lines) {
@@ -21,7 +21,7 @@ function parseCodexJson(raw: string): AgentParseResult {
ev.item?.type === "agent_message" && ev.item?.type === "agent_message" &&
typeof ev.item.text === "string" typeof ev.item.text === "string"
) { ) {
text = ev.item.text; texts.push(ev.item.text);
} }
if ( if (
ev.type === "turn.completed" && ev.type === "turn.completed" &&
@@ -50,7 +50,8 @@ function parseCodexJson(raw: string): AgentParseResult {
} }
} }
return { text: text?.trim(), meta }; const finalTexts = texts.length ? texts.map((t) => t.trim()) : undefined;
return { texts: finalTexts, meta };
} }
export const codexSpec: AgentSpec = { export const codexSpec: AgentSpec = {

View File

@@ -10,7 +10,8 @@ export const GEMINI_IDENTITY_PREFIX =
// keep parsing minimal and let MEDIA token stripping happen later in the pipeline. // keep parsing minimal and let MEDIA token stripping happen later in the pipeline.
function parseGeminiOutput(raw: string): { text?: string; meta?: AgentMeta } { function parseGeminiOutput(raw: string): { text?: string; meta?: AgentMeta } {
const trimmed = raw.trim(); const trimmed = raw.trim();
return { text: trimmed || undefined, meta: undefined }; const text = trimmed || undefined;
return { texts: text ? [text] : undefined, meta: undefined };
} }
export const geminiSpec: AgentSpec = { export const geminiSpec: AgentSpec = {

View File

@@ -55,7 +55,7 @@ export const opencodeSpec: AgentSpec = {
const parsed = parseOpencodeJson(rawStdout); const parsed = parseOpencodeJson(rawStdout);
const text = parsed.text ?? rawStdout.trim(); const text = parsed.text ?? rawStdout.trim();
return { return {
text: text?.trim(), texts: text ? [text.trim()] : undefined,
meta: toMeta(parsed), meta: toMeta(parsed),
}; };
}, },

View File

@@ -13,36 +13,50 @@ type PiAssistantMessage = {
function parsePiJson(raw: string): AgentParseResult { function parsePiJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let lastMessage: PiAssistantMessage | undefined;
// Collect every assistant message we see; Tau in RPC mode can emit multiple
// assistant payloads in one run (e.g., queued turns, heartbeats). Text blocks
// within a single message are joined with newlines, and each message becomes
// its own entry in `texts`, so users see everything instead of only the last
// message_end.
const texts: string[] = [];
let lastAssistant: PiAssistantMessage | undefined;
for (const line of lines) { for (const line of lines) {
try { try {
const ev = JSON.parse(line) as { const ev = JSON.parse(line) as {
type?: string; type?: string;
message?: PiAssistantMessage; message?: PiAssistantMessage;
}; };
// Pi emits a stream; we only care about the terminal assistant message_end. const msg = ev.message;
if (ev.type === "message_end" && ev.message?.role === "assistant") { if (msg?.role === "assistant" && Array.isArray(msg.content)) {
lastMessage = ev.message; const msgText = msg.content
.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
.trim();
if (msgText) texts.push(msgText);
// keep meta from the most recent assistant message
lastAssistant = msg;
} }
} catch { } catch {
// ignore // ignore malformed lines
} }
} }
const text =
lastMessage?.content // Combine all assistant text messages (ignore tool calls/partials). This keeps
?.filter((c) => c?.type === "text" && typeof c.text === "string") // multi-message replies intact while dropping non-text events.
.map((c) => c.text) const text = texts.length ? texts.join("\n\n").trim() : undefined;
.join("\n")
?.trim() ?? undefined; const meta: AgentMeta | undefined =
const meta: AgentMeta | undefined = lastMessage text && lastAssistant
? { ? {
model: lastMessage.model, model: lastAssistant.model,
provider: lastMessage.provider, provider: lastAssistant.provider,
stopReason: lastMessage.stopReason, stopReason: lastAssistant.stopReason,
usage: lastMessage.usage, usage: lastAssistant.usage,
} }
: undefined; : undefined;
return { text, meta };
return { texts, meta };
} }
export const piSpec: AgentSpec = { export const piSpec: AgentSpec = {

View File

@@ -16,7 +16,7 @@ export type AgentMeta = {
}; };
export type AgentParseResult = { export type AgentParseResult = {
text?: string; texts?: string[];
mediaUrls?: string[]; mediaUrls?: string[];
meta?: AgentMeta; meta?: AgentMeta;
}; };

View File

@@ -44,7 +44,7 @@ export type CommandReplyMeta = {
}; };
export type CommandReplyResult = { export type CommandReplyResult = {
payload?: ReplyPayload; payloads?: ReplyPayload[];
meta: CommandReplyMeta; meta: CommandReplyMeta;
}; };
@@ -189,7 +189,7 @@ export async function runCommandReply(
systemSent, systemSent,
identityPrefix: agentCfg.identityPrefix, identityPrefix: agentCfg.identityPrefix,
format: agentCfg.format, format: agentCfg.format,
}) })
: argv; : argv;
logVerbose( logVerbose(
@@ -249,33 +249,54 @@ export async function runCommandReply(
}); });
const rawStdout = stdout.trim(); const rawStdout = stdout.trim();
let mediaFromCommand: string[] | undefined; let mediaFromCommand: string[] | undefined;
let trimmed = rawStdout; const trimmed = rawStdout;
if (stderr?.trim()) { if (stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
} }
const parsed = trimmed ? agent.parseOutput(trimmed) : undefined; const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
// Treat empty string as "no content" so we can fall back to the friendly
// "(command produced no output)" message instead of echoing raw JSON. // Collect one message per assistant text from parseOutput (tau RPC can emit many).
if (parsed && parsed.text !== undefined) { const parsedTexts =
trimmed = parsed.text.trim(); parsed?.texts?.map((t) => t.trim()).filter(Boolean) ??
(parsed?.text ? [parsed.text.trim()] : []);
type ReplyItem = { text: string; media?: string[] };
const replyItems: ReplyItem[] = [];
for (const t of parsedTexts) {
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(t);
replyItems.push({
text: cleanedText,
media: mediaFound?.length ? mediaFound : undefined,
});
} }
const { text: cleanedText, mediaUrls: mediaFound } = // If parser gave nothing, fall back to raw stdout as a single message.
splitMediaFromOutput(trimmed); if (replyItems.length === 0 && trimmed) {
trimmed = cleanedText; const { text: cleanedText, mediaUrls: mediaFound } =
if (mediaFound?.length) { splitMediaFromOutput(trimmed);
mediaFromCommand = mediaFound; if (cleanedText || mediaFound?.length) {
verboseLog(`MEDIA token extracted: ${mediaFound}`); replyItems.push({
} else { text: cleanedText,
verboseLog("No MEDIA token extracted from final text"); media: mediaFound?.length ? mediaFound : undefined,
});
}
} }
if (!trimmed && !mediaFromCommand) {
// No content at all → fallback notice.
if (replyItems.length === 0) {
const meta = parsed?.meta?.extra?.summary ?? undefined; const meta = parsed?.meta?.extra?.summary ?? undefined;
trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`; replyItems.push({
text: `(command produced no output${meta ? `; ${meta}` : ""})`,
});
verboseLog("No text/media produced; injecting fallback notice to user"); verboseLog("No text/media produced; injecting fallback notice to user");
} }
verboseLog(`Command auto-reply stdout (trimmed): ${trimmed || "<empty>"}`);
verboseLog(
`Command auto-reply stdout produced ${replyItems.length} message(s)`,
);
const elapsed = Date.now() - started; const elapsed = Date.now() - started;
verboseLog(`Command auto-reply finished in ${elapsed}ms`); verboseLog(`Command auto-reply finished in ${elapsed}ms`);
logger.info( logger.info(
@@ -292,7 +313,7 @@ export async function runCommandReply(
: ""; : "";
const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`; const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`;
return { return {
payload: { text: errorText }, payloads: [{ text: errorText }],
meta: { meta: {
durationMs: Date.now() - started, durationMs: Date.now() - started,
queuedMs, queuedMs,
@@ -310,7 +331,7 @@ export async function runCommandReply(
); );
const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`; const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`;
return { return {
payload: { text: errorText }, payloads: [{ text: errorText }],
meta: { meta: {
durationMs: Date.now() - started, durationMs: Date.now() - started,
queuedMs, queuedMs,
@@ -322,43 +343,6 @@ export async function runCommandReply(
}, },
}; };
} }
let mediaUrls =
mediaFromCommand ?? (reply.mediaUrl ? [reply.mediaUrl] : undefined);
// If mediaMaxMb is set, skip local media paths larger than the cap.
if (mediaUrls?.length && reply.mediaMaxMb) {
const maxBytes = reply.mediaMaxMb * 1024 * 1024;
const filtered: string[] = [];
for (const url of mediaUrls) {
if (/^https?:\/\//i.test(url)) {
filtered.push(url);
continue;
}
const abs = path.isAbsolute(url) ? url : path.resolve(url);
try {
const stats = await fs.stat(abs);
if (stats.size <= maxBytes) {
filtered.push(url);
} else if (isVerbose()) {
logVerbose(
`Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`,
);
}
} catch {
filtered.push(url);
}
}
mediaUrls = filtered;
}
const payload =
trimmed || mediaUrls?.length
? {
text: trimmed || undefined,
mediaUrl: mediaUrls?.[0],
mediaUrls,
}
: undefined;
const meta: CommandReplyMeta = { const meta: CommandReplyMeta = {
durationMs: Date.now() - started, durationMs: Date.now() - started,
queuedMs, queuedMs,
@@ -368,8 +352,56 @@ export async function runCommandReply(
killed, killed,
agentMeta: parsed?.meta, agentMeta: parsed?.meta,
}; };
const payloads: ReplyPayload[] = [];
// Build each reply item sequentially (delivery handled by caller).
for (const item of replyItems) {
let mediaUrls =
item.media ??
mediaFromCommand ??
(reply.mediaUrl ? [reply.mediaUrl] : undefined);
// If mediaMaxMb is set, skip local media paths larger than the cap.
if (mediaUrls?.length && reply.mediaMaxMb) {
const maxBytes = reply.mediaMaxMb * 1024 * 1024;
const filtered: string[] = [];
for (const url of mediaUrls) {
if (/^https?:\/\//i.test(url)) {
filtered.push(url);
continue;
}
const abs = path.isAbsolute(url) ? url : path.resolve(url);
try {
const stats = await fs.stat(abs);
if (stats.size <= maxBytes) {
filtered.push(url);
} else if (isVerbose()) {
logVerbose(
`Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`,
);
}
} catch {
filtered.push(url);
}
}
mediaUrls = filtered;
}
const payload =
item.text || mediaUrls?.length
? {
text: item.text || undefined,
mediaUrl: mediaUrls?.[0],
mediaUrls,
}
: undefined;
if (payload) payloads.push(payload);
}
verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`); verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`);
return { payload, meta }; return { payloads, meta };
} catch (err) { } catch (err) {
const elapsed = Date.now() - started; const elapsed = Date.now() - started;
logger.info( logger.info(
@@ -414,7 +446,7 @@ export async function runCommandReply(
const errMsg = err instanceof Error ? err.message : String(err); const errMsg = err instanceof Error ? err.message : String(err);
const errorText = `⚠️ Command failed: ${errMsg}`; const errorText = `⚠️ Command failed: ${errMsg}`;
return { return {
payload: { text: errorText }, payloads: [{ text: errorText }],
meta: { meta: {
durationMs: elapsed, durationMs: elapsed,
queuedMs, queuedMs,

View File

@@ -305,7 +305,7 @@ export async function getReplyFromConfig(
mediaUrl: reply.mediaUrl, mediaUrl: reply.mediaUrl,
}; };
cleanupTyping(); cleanupTyping();
return result; return [result];
} }
if (reply && reply.mode === "command" && reply.command?.length) { if (reply && reply.mode === "command" && reply.command?.length) {
@@ -316,7 +316,7 @@ export async function getReplyFromConfig(
mode: "command" as const, mode: "command" as const,
}; };
try { try {
const { payload, meta } = await runCommandReply({ const { payloads, meta } = await runCommandReply({
reply: commandReply, reply: commandReply,
templatingCtx, templatingCtx,
sendSystemOnce, sendSystemOnce,
@@ -355,7 +355,7 @@ export async function getReplyFromConfig(
if (meta.agentMeta && isVerbose()) { if (meta.agentMeta && isVerbose()) {
logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`); logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
} }
return payload; return payloads;
} finally { } finally {
cleanupTyping(); cleanupTyping();
} }
@@ -416,13 +416,12 @@ export async function autoReplyIfConfigured(
}, },
cfg, cfg,
); );
if ( const replies = replyResult
!replyResult || ? Array.isArray(replyResult)
(!replyResult.text && ? replyResult
!replyResult.mediaUrl && : [replyResult]
!replyResult.mediaUrls?.length) : [];
) if (replies.length === 0) return;
return;
const replyFrom = message.to; const replyFrom = message.to;
const replyTo = message.from; const replyTo = message.from;
@@ -435,23 +434,7 @@ export async function autoReplyIfConfigured(
return; return;
} }
if (replyResult.text) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyResult.text.length}`,
);
} else {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media)`,
);
}
try { try {
const mediaList = replyResult.mediaUrls?.length
? replyResult.mediaUrls
: replyResult.mediaUrl
? [replyResult.mediaUrl]
: [];
const sendTwilio = async (body: string, media?: string) => { const sendTwilio = async (body: string, media?: string) => {
let resolvedMedia = media; let resolvedMedia = media;
if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) { if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) {
@@ -466,21 +449,39 @@ export async function autoReplyIfConfigured(
}); });
}; };
if (mediaList.length === 0) { for (const replyPayload of replies) {
await sendTwilio(replyResult.text ?? ""); if (replyPayload.text) {
} else { logVerbose(
// First media with body (if any), then remaining as separate media-only sends. `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyPayload.text.length}`,
await sendTwilio(replyResult.text ?? "", mediaList[0]); );
for (const extra of mediaList.slice(1)) { } else {
await sendTwilio("", extra); logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media)`,
);
}
const mediaList = replyPayload.mediaUrls?.length
? replyPayload.mediaUrls
: replyPayload.mediaUrl
? [replyPayload.mediaUrl]
: [];
if (mediaList.length === 0) {
await sendTwilio(replyPayload.text ?? "");
} else {
await sendTwilio(replyPayload.text ?? "", mediaList[0]);
for (const extra of mediaList.slice(1)) {
await sendTwilio("", extra);
}
}
if (isVerbose()) {
console.log(
info(
`↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${replyPayload.mediaUrl ? ", media" : ""})`,
),
);
} }
}
if (isVerbose()) {
console.log(
info(
`↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${replyResult.mediaUrl ? ", media" : ""})`,
),
);
} }
} catch (err) { } catch (err) {
const anyErr = err as { const anyErr = err as {