Auto-reply: stream verbose tool results via tau rpc

This commit is contained in:
Peter Steinberger
2025-12-03 09:21:31 +00:00
parent 16e42e6d6d
commit c3792db0e5
4 changed files with 127 additions and 70 deletions

View File

@@ -35,6 +35,7 @@ type CommandReplyParams = {
enqueue?: EnqueueRunner;
thinkLevel?: ThinkLevel;
verboseLevel?: "off" | "on";
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
};
export type CommandReplyMeta = {
@@ -143,6 +144,7 @@ export async function runCommandReply(
enqueue = enqueueCommand,
thinkLevel,
verboseLevel,
onPartialReply,
} = params;
if (!reply.command?.length) {
@@ -274,6 +276,42 @@ export async function runCommandReply(
cwd: reply.cwd,
prompt: body,
timeoutMs,
onEvent:
verboseLevel === "on" && onPartialReply
? (line: string) => {
try {
const ev = JSON.parse(line) as {
type?: string;
message?: { role?: string; content?: unknown[] };
};
if (
(ev.type === "message" || ev.type === "message_end") &&
ev.message?.role === "tool_result" &&
Array.isArray(ev.message.content)
) {
const text = (
ev.message.content as Array<{ text?: string }>
)
.map((c) => c.text)
.filter((t): t is string => !!t)
.join("\n")
.trim();
if (text) {
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(`🛠️ ${text}`);
void onPartialReply({
text: cleanedText,
mediaUrls: mediaFound?.length
? mediaFound
: undefined,
} as ReplyPayload);
}
}
} catch {
// ignore malformed lines
}
}
: undefined,
});
}
return await commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd });
@@ -309,8 +347,10 @@ export async function runCommandReply(
type ReplyItem = { text: string; media?: string[] };
const replyItems: ReplyItem[] = [];
// When verbose is on, surface tool results first (before assistant summary) to mirror chat ordering.
if (verboseLevel === "on") {
const includeToolResultsInline =
verboseLevel === "on" && !onPartialReply && parsedToolResults.length > 0;
if (includeToolResultsInline) {
for (const tr of parsedToolResults) {
const prefixed = `🛠️ ${tr}`;
const { text: cleanedText, mediaUrls: mediaFound } =

View File

@@ -529,6 +529,7 @@ export async function getReplyFromConfig(
commandRunner,
thinkLevel: resolvedThinkLevel,
verboseLevel: resolvedVerboseLevel,
onPartialReply: opts?.onPartialReply,
});
const payloadArray = runResult.payloads ?? [];
const meta = runResult.meta;
@@ -611,6 +612,16 @@ export async function autoReplyIfConfigured(
To: message.to ?? undefined,
MessageSid: message.sid,
};
const replyFrom = message.to;
const replyTo = message.from;
if (!replyFrom || !replyTo) {
if (isVerbose())
console.error(
"Skipping auto-reply: missing to/from on inbound message",
ctx,
);
return;
}
const cfg = configOverride ?? loadConfig();
// Attach media hints for transcription/templates if present on Twilio payloads.
const mediaUrl = (message as { mediaUrl?: string }).mediaUrl;
@@ -632,10 +643,72 @@ export async function autoReplyIfConfigured(
}
}
const sendTwilio = async (body: string, media?: string) => {
let resolvedMedia = media;
if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) {
const hosted = await ensureMediaHosted(resolvedMedia);
resolvedMedia = hosted.url;
}
await client.messages.create({
from: replyFrom,
to: replyTo,
body,
...(resolvedMedia ? { mediaUrl: [resolvedMedia] } : {}),
});
};
const sendPayload = async (replyPayload: ReplyPayload) => {
const mediaList = replyPayload.mediaUrls?.length
? replyPayload.mediaUrls
: replyPayload.mediaUrl
? [replyPayload.mediaUrl]
: [];
const text = replyPayload.text ?? "";
const chunks = chunkText(text, TWILIO_TEXT_LIMIT);
if (chunks.length === 0) chunks.push("");
for (let i = 0; i < chunks.length; i++) {
const body = chunks[i];
const attachMedia = i === 0 ? mediaList[0] : undefined;
if (body) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${body.length}`,
);
} else if (attachMedia) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media only)`,
);
}
await sendTwilio(body, attachMedia);
if (i === 0 && mediaList.length > 1) {
for (const extra of mediaList.slice(1)) {
await sendTwilio("", extra);
}
}
if (isVerbose()) {
console.log(
info(
`↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${attachMedia ? ", media" : ""})`,
),
);
}
}
};
const partialSender = async (payload: ReplyPayload) => {
await sendPayload(payload);
};
const replyResult = await getReplyFromConfig(
ctx,
{
onReplyStart: () => sendTypingIndicator(client, runtime, message.sid),
onPartialReply: partialSender,
},
cfg,
);
@@ -646,73 +719,9 @@ export async function autoReplyIfConfigured(
: [];
if (replies.length === 0) return;
const replyFrom = message.to;
const replyTo = message.from;
if (!replyFrom || !replyTo) {
if (isVerbose())
console.error(
"Skipping auto-reply: missing to/from on inbound message",
ctx,
);
return;
}
try {
const sendTwilio = async (body: string, media?: string) => {
let resolvedMedia = media;
if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) {
const hosted = await ensureMediaHosted(resolvedMedia);
resolvedMedia = hosted.url;
}
await client.messages.create({
from: replyFrom,
to: replyTo,
body,
...(resolvedMedia ? { mediaUrl: [resolvedMedia] } : {}),
});
};
for (const replyPayload of replies) {
const mediaList = replyPayload.mediaUrls?.length
? replyPayload.mediaUrls
: replyPayload.mediaUrl
? [replyPayload.mediaUrl]
: [];
const text = replyPayload.text ?? "";
const chunks = chunkText(text, TWILIO_TEXT_LIMIT);
if (chunks.length === 0) chunks.push("");
for (let i = 0; i < chunks.length; i++) {
const body = chunks[i];
const attachMedia = i === 0 ? mediaList[0] : undefined;
if (body) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${body.length}`,
);
} else if (attachMedia) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media only)`,
);
}
await sendTwilio(body, attachMedia);
if (i === 0 && mediaList.length > 1) {
for (const extra of mediaList.slice(1)) {
await sendTwilio("", extra);
}
}
if (isVerbose()) {
console.log(
info(
`↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${attachMedia ? ", media" : ""})`,
),
);
}
}
await sendPayload(replyPayload);
}
} catch (err) {
const anyErr = err as {

View File

@@ -1,6 +1,7 @@
export type GetReplyOptions = {
onReplyStart?: () => Promise<void> | void;
isHeartbeat?: boolean;
onPartialReply?: (payload: ReplyPayload) => Promise<void> | void;
};
export type ReplyPayload = {

View File

@@ -7,6 +7,7 @@ type TauRpcOptions = {
argv: string[];
cwd?: string;
timeoutMs: number;
onEvent?: (line: string) => void;
};
type TauRpcResult = {
@@ -30,6 +31,7 @@ class TauRpcClient {
resolve: (r: TauRpcResult) => void;
reject: (err: unknown) => void;
timer: NodeJS.Timeout;
onEvent?: (line: string) => void;
}
| undefined;
@@ -64,6 +66,7 @@ class TauRpcClient {
private handleLine(line: string) {
if (!this.pending) return;
this.buffer.push(line);
this.pending?.onEvent?.(line);
// Streamed JSON arrives line-by-line; mark when an assistant message finishes
// and resolve after a short idle to capture any follow-up events (e.g. tools)
// that belong to the same turn.
@@ -95,7 +98,11 @@ class TauRpcClient {
}
}
async prompt(prompt: string, timeoutMs: number): Promise<TauRpcResult> {
async prompt(
prompt: string,
timeoutMs: number,
onEvent?: (line: string) => void,
): Promise<TauRpcResult> {
this.ensureChild();
if (this.pending) {
throw new Error("tau rpc already handling a request");
@@ -118,7 +125,7 @@ class TauRpcClient {
reject(new Error(`tau rpc timed out after ${timeoutMs}ms`));
child.kill("SIGKILL");
}, timeoutMs);
this.pending = { resolve, reject, timer };
this.pending = { resolve, reject, timer, onEvent };
});
}
@@ -144,7 +151,7 @@ export async function runPiRpc(
singleton?.client.dispose();
singleton = { key, client: new TauRpcClient(opts.argv, opts.cwd) };
}
return singleton.client.prompt(opts.prompt, opts.timeoutMs);
return singleton.client.prompt(opts.prompt, opts.timeoutMs, opts.onEvent);
}
export function resetPiRpc() {