feat: stream turn completions and tighten rpc timeout

This commit is contained in:
Peter Steinberger
2025-12-05 21:13:17 +00:00
parent 29dfe89137
commit 5492845659
5 changed files with 412 additions and 176 deletions

View File

@@ -173,6 +173,105 @@ describe("runCommandReply (pi)", () => {
expect(meta.killed).toBe(true); expect(meta.killed).toBe(true);
}); });
it("collapses rpc deltas instead of emitting raw JSON spam", async () => {
mockPiRpc({
stdout: [
'{"type":"message_update","assistantMessageEvent":{"type":"text_delta","delta":"Hello"}}',
'{"type":"message_update","assistantMessageEvent":{"type":"text_delta","delta":" world"}}',
].join("\n"),
stderr: "",
code: 0,
});
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["pi", "{{Body}}"],
agent: { kind: "pi" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: vi.fn(),
enqueue: enqueueImmediate,
});
expect(payloads?.[0]?.text).toBe("Hello world");
});
it("falls back to assistant text when parseOutput yields nothing", async () => {
mockPiRpc({
stdout: [
'{"type":"agent_start"}',
'{"type":"turn_start"}',
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Acknowledged."}]}}',
].join("\n"),
stderr: "",
code: 0,
});
// Force parser to return nothing so we exercise fallback.
const parseSpy = vi
.spyOn((await import("../agents/pi.js")).piSpec, "parseOutput")
.mockReturnValue({ texts: [], toolResults: [], meta: undefined });
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["pi", "{{Body}}"],
agent: { kind: "pi" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: vi.fn(),
enqueue: enqueueImmediate,
});
parseSpy.mockRestore();
expect(payloads?.[0]?.text).toBe("Acknowledged.");
});
it("does not stream tool results when verbose is off", async () => {
const onPartial = vi.fn();
mockPiRpc({
stdout: [
'{"type":"tool_execution_start","toolName":"bash","args":{"command":"ls"}}',
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"done"}]}}',
].join("\n"),
stderr: "",
code: 0,
});
await runCommandReply({
reply: {
mode: "command",
command: ["pi", "{{Body}}"],
agent: { kind: "pi" },
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: vi.fn(),
enqueue: enqueueImmediate,
onPartialReply: onPartial,
verboseLevel: "off",
});
expect(onPartial).not.toHaveBeenCalled();
});
it("parses MEDIA tokens and respects mediaMaxMb for local files", async () => { it("parses MEDIA tokens and respects mediaMaxMb for local files", async () => {
const tmp = path.join(os.tmpdir(), `warelay-test-${Date.now()}.bin`); const tmp = path.join(os.tmpdir(), `warelay-test-${Date.now()}.bin`);
const bigBuffer = Buffer.alloc(2 * 1024 * 1024, 1); const bigBuffer = Buffer.alloc(2 * 1024 * 1024, 1);

View File

@@ -2,7 +2,7 @@ import fs from "node:fs/promises";
import os from "node:os"; import os from "node:os";
import path from "node:path"; import path from "node:path";
import { type AgentKind, getAgentSpec } from "../agents/index.js"; import { piSpec } from "../agents/pi.js";
import type { AgentMeta, AgentToolResult } from "../agents/types.js"; import type { AgentMeta, AgentToolResult } from "../agents/types.js";
import type { WarelayConfig } from "../config/config.js"; import type { WarelayConfig } from "../config/config.js";
import { isVerbose, logVerbose } from "../globals.js"; import { isVerbose, logVerbose } from "../globals.js";
@@ -33,6 +33,9 @@ function stripRpcNoise(raw: string): string {
const msg = evt?.message ?? evt?.assistantMessageEvent; const msg = evt?.message ?? evt?.assistantMessageEvent;
const msgType = msg?.type; const msgType = msg?.type;
// RPC streaming emits one message_update per delta; skip them to avoid flooding fallbacks.
if (type === "message_update") continue;
// Ignore toolcall delta chatter and input buffer append events. // Ignore toolcall delta chatter and input buffer append events.
if (type === "message_update" && msgType === "toolcall_delta") continue; if (type === "message_update" && msgType === "toolcall_delta") continue;
if (type === "input_audio_buffer.append") continue; if (type === "input_audio_buffer.append") continue;
@@ -52,6 +55,66 @@ function stripRpcNoise(raw: string): string {
return kept.join("\n"); return kept.join("\n");
} }
function extractRpcAssistantText(raw: string): string | undefined {
if (!raw.trim()) return undefined;
let deltaBuffer = "";
let lastAssistant: string | undefined;
for (const line of raw.split(/\n+/)) {
try {
const evt = JSON.parse(line) as {
type?: string;
message?: { role?: string; content?: Array<{ type?: string; text?: string }> };
assistantMessageEvent?: { type?: string; delta?: string; content?: string };
};
if (
evt.type === "message_end" &&
evt.message?.role === "assistant" &&
Array.isArray(evt.message.content)
) {
const text = evt.message.content
.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text as string)
.join("\n")
.trim();
if (text) {
lastAssistant = text;
deltaBuffer = "";
}
}
if (evt.type === "message_update" && evt.assistantMessageEvent) {
const evtType = evt.assistantMessageEvent.type;
if (
evtType === "text_delta" ||
evtType === "text_end" ||
evtType === "text_start"
) {
const chunk =
typeof evt.assistantMessageEvent.delta === "string"
? evt.assistantMessageEvent.delta
: typeof evt.assistantMessageEvent.content === "string"
? evt.assistantMessageEvent.content
: "";
if (chunk) {
deltaBuffer += chunk;
lastAssistant = deltaBuffer;
}
}
}
} catch {
// ignore malformed/non-JSON lines
}
}
return lastAssistant?.trim() || undefined;
}
function extractAssistantTextLoosely(raw: string): string | undefined {
// Fallback: grab the last "text":"..." occurrence from a JSON-ish blob.
const matches = [...raw.matchAll(/"text"\s*:\s*"([^"]+?)"/g)];
if (!matches.length) return undefined;
const last = matches.at(-1)?.[1];
return last ? last.replace(/\\n/g, "\n").trim() : undefined;
}
type CommandReplyConfig = NonNullable<WarelayConfig["inbound"]>["reply"] & { type CommandReplyConfig = NonNullable<WarelayConfig["inbound"]>["reply"] & {
mode: "command"; mode: "command";
}; };
@@ -263,28 +326,13 @@ export async function runCommandReply(
throw new Error("reply.command is required for mode=command"); throw new Error("reply.command is required for mode=command");
} }
const agentCfg = reply.agent ?? { kind: "pi" }; const agentCfg = reply.agent ?? { kind: "pi" };
const agentKind: AgentKind = agentCfg.kind ?? "pi"; const agent = piSpec;
const agent = getAgentSpec(agentKind); const agentKind = "pi";
const rawCommand = reply.command; const rawCommand = reply.command;
const hasBodyTemplate = rawCommand.some((part) => const hasBodyTemplate = rawCommand.some((part) =>
/\{\{Body(Stripped)?\}\}/.test(part), /\{\{Body(Stripped)?\}\}/.test(part),
); );
let argv = rawCommand.map((part) => applyTemplate(part, templatingCtx)); let argv = rawCommand.map((part) => applyTemplate(part, templatingCtx));
// Pi is the only supported agent; treat commands as Pi when the binary path looks like pi/tau or the path contains pi.
const isAgentInvocation =
agentKind === "pi" &&
(agent.isInvocation(argv) ||
argv.some((part) => {
if (typeof part !== "string") return false;
const lower = part.toLowerCase();
const base = path.basename(part).toLowerCase();
return (
base === "pi" ||
base === "tau" ||
lower.includes("pi-coding-agent") ||
lower.includes("/pi/")
);
}));
const templatePrefix = const templatePrefix =
reply.template && (!sendSystemOnce || isFirstTurnInSession || !systemSent) reply.template && (!sendSystemOnce || isFirstTurnInSession || !systemSent)
? applyTemplate(reply.template, templatingCtx) ? applyTemplate(reply.template, templatingCtx)
@@ -349,12 +397,7 @@ export async function runCommandReply(
// Tau (pi agent) needs --continue to reload prior messages when resuming. // Tau (pi agent) needs --continue to reload prior messages when resuming.
// Without it, pi starts from a blank state even though we pass the session file path. // Without it, pi starts from a blank state even though we pass the session file path.
if ( if (!isNewSession && !sessionArgList.includes("--continue")) {
agentKind === "pi" &&
isAgentInvocation &&
!isNewSession &&
!sessionArgList.includes("--continue")
) {
sessionArgList.push("--continue"); sessionArgList.push("--continue");
} }
@@ -372,9 +415,7 @@ export async function runCommandReply(
argv = [...argv, ...sessionArgList]; argv = [...argv, ...sessionArgList];
} }
const shouldApplyAgent = isAgentInvocation; if (thinkLevel && thinkLevel !== "off") {
if (shouldApplyAgent && thinkLevel && thinkLevel !== "off") {
const hasThinkingFlag = argv.some( const hasThinkingFlag = argv.some(
(p, i) => (p, i) =>
p === "--thinking" || p === "--thinking" ||
@@ -386,18 +427,16 @@ export async function runCommandReply(
bodyIndex += 2; bodyIndex += 2;
} }
} }
const builtArgv = shouldApplyAgent const builtArgv = agent.buildArgs({
? agent.buildArgs({ argv,
argv, bodyIndex,
bodyIndex, isNewSession,
isNewSession, sessionId: templatingCtx.SessionId,
sessionId: templatingCtx.SessionId, sendSystemOnce,
sendSystemOnce, systemSent,
systemSent, identityPrefix: agentCfg.identityPrefix,
identityPrefix: agentCfg.identityPrefix, format: agentCfg.format,
format: agentCfg.format, });
})
: argv;
const promptIndex = builtArgv.findIndex( const promptIndex = builtArgv.findIndex(
(arg) => typeof arg === "string" && arg.includes(bodyMarker), (arg) => typeof arg === "string" && arg.includes(bodyMarker),
@@ -412,24 +451,22 @@ export async function runCommandReply(
return typeof arg === "string" ? arg.replace(bodyMarker, "") : arg; return typeof arg === "string" ? arg.replace(bodyMarker, "") : arg;
}); });
// For pi/tau agents: drive the agent via RPC stdin so auto-compaction and streaming run server-side. // Drive pi via RPC stdin so auto-compaction and streaming run server-side.
let rpcInput: string | undefined; let rpcInput: string | undefined;
let rpcArgv = finalArgv; let rpcArgv = finalArgv;
if (agentKind === "pi") { rpcInput = `${JSON.stringify({ type: "prompt", message: promptArg })}\n`;
rpcInput = `${JSON.stringify({ type: "prompt", message: promptArg })}\n`; const bodyIdx =
const bodyIdx = promptIndex >= 0 ? promptIndex : Math.max(finalArgv.length - 1, 0);
promptIndex >= 0 ? promptIndex : Math.max(finalArgv.length - 1, 0); rpcArgv = finalArgv.filter((_, idx) => idx !== bodyIdx);
rpcArgv = finalArgv.filter((_, idx) => idx !== bodyIdx); const modeIdx = rpcArgv.indexOf("--mode");
const modeIdx = rpcArgv.indexOf("--mode"); if (modeIdx >= 0 && rpcArgv[modeIdx + 1]) {
if (modeIdx >= 0 && rpcArgv[modeIdx + 1]) { rpcArgv[modeIdx + 1] = "rpc";
rpcArgv[modeIdx + 1] = "rpc"; } else {
} else { rpcArgv.push("--mode", "rpc");
rpcArgv.push("--mode", "rpc");
}
} }
logVerbose( logVerbose(
`Running command auto-reply: ${(agentKind === "pi" ? rpcArgv : finalArgv).join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`, `Running command auto-reply: ${rpcArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`,
); );
logger.info( logger.info(
{ {
@@ -437,7 +474,7 @@ export async function runCommandReply(
sessionId: templatingCtx.SessionId, sessionId: templatingCtx.SessionId,
newSession: isNewSession, newSession: isNewSession,
cwd: reply.cwd, cwd: reply.cwd,
command: (agentKind === "pi" ? rpcArgv : finalArgv).slice(0, -1), // omit body to reduce noise command: rpcArgv.slice(0, -1), // omit body to reduce noise
}, },
"command auto-reply start", "command auto-reply start",
); );
@@ -449,9 +486,11 @@ export async function runCommandReply(
let pendingToolName: string | undefined; let pendingToolName: string | undefined;
let pendingMetas: string[] = []; let pendingMetas: string[] = [];
let pendingTimer: NodeJS.Timeout | null = null; let pendingTimer: NodeJS.Timeout | null = null;
let streamedAny = false;
const enableToolStreaming = verboseLevel === "on";
const toolMetaById = new Map<string, string | undefined>(); const toolMetaById = new Map<string, string | undefined>();
const flushPendingTool = () => { const flushPendingTool = () => {
if (!onPartialReply) return; if (!onPartialReply || !enableToolStreaming) return;
if (!pendingToolName && pendingMetas.length === 0) return; if (!pendingToolName && pendingMetas.length === 0) return;
const text = formatToolAggregate(pendingToolName, pendingMetas); const text = formatToolAggregate(pendingToolName, pendingMetas);
const { text: cleanedText, mediaUrls: mediaFound } = const { text: cleanedText, mediaUrls: mediaFound } =
@@ -460,6 +499,7 @@ export async function runCommandReply(
text: cleanedText, text: cleanedText,
mediaUrls: mediaFound?.length ? mediaFound : undefined, mediaUrls: mediaFound?.length ? mediaFound : undefined,
} as ReplyPayload); } as ReplyPayload);
streamedAny = true;
pendingToolName = undefined; pendingToolName = undefined;
pendingMetas = []; pendingMetas = [];
if (pendingTimer) { if (pendingTimer) {
@@ -468,7 +508,7 @@ export async function runCommandReply(
} }
}; };
let lastStreamedAssistant: string | undefined; let lastStreamedAssistant: string | undefined;
const streamAssistant = (msg?: { role?: string; content?: unknown[] }) => { const streamAssistantFinal = (msg?: { role?: string; content?: unknown[] }) => {
if (!onPartialReply || msg?.role !== "assistant") return; if (!onPartialReply || msg?.role !== "assistant") return;
const textBlocks = Array.isArray(msg.content) const textBlocks = Array.isArray(msg.content)
? (msg.content as Array<{ type?: string; text?: string }>) ? (msg.content as Array<{ type?: string; text?: string }>)
@@ -486,96 +526,62 @@ export async function runCommandReply(
text: cleanedText, text: cleanedText,
mediaUrls: mediaFound?.length ? mediaFound : undefined, mediaUrls: mediaFound?.length ? mediaFound : undefined,
} as ReplyPayload); } as ReplyPayload);
streamedAny = true;
}; };
const run = async () => { const run = async () => {
// Prefer long-lived tau RPC for pi agent to avoid cold starts. const rpcPromptIndex =
if (agentKind === "pi" && shouldApplyAgent) { promptIndex >= 0 ? promptIndex : finalArgv.length - 1;
const rpcPromptIndex = const body = promptArg ?? "";
promptIndex >= 0 ? promptIndex : finalArgv.length - 1; // Build rpc args without the prompt body; force --mode rpc.
const body = promptArg ?? ""; const rpcArgvForRun = (() => {
// Build rpc args without the prompt body; force --mode rpc. const copy = [...finalArgv];
const rpcArgv = (() => { copy.splice(rpcPromptIndex, 1);
const copy = [...finalArgv]; const modeIdx = copy.indexOf("--mode");
copy.splice(rpcPromptIndex, 1); if (modeIdx >= 0 && copy[modeIdx + 1]) {
const modeIdx = copy.indexOf("--mode"); copy.splice(modeIdx, 2, "--mode", "rpc");
if (modeIdx >= 0 && copy[modeIdx + 1]) { } else if (!copy.includes("--mode")) {
copy.splice(modeIdx, 2, "--mode", "rpc"); copy.splice(copy.length - 1, 0, "--mode", "rpc");
} else if (!copy.includes("--mode")) { }
copy.splice(copy.length - 1, 0, "--mode", "rpc"); return copy;
} })();
return copy; const rpcResult = await runPiRpc({
})(); argv: rpcArgvForRun,
const rpcResult = await runPiRpc({ cwd: reply.cwd,
argv: rpcArgv, prompt: body,
cwd: reply.cwd, timeoutMs,
prompt: body, onEvent: onPartialReply
timeoutMs, ? (line: string) => {
onEvent: onPartialReply try {
? (line: string) => { const ev = JSON.parse(line) as {
try { type?: string;
const ev = JSON.parse(line) as { message?: {
type?: string; role?: string;
message?: { content?: unknown[];
role?: string; details?: Record<string, unknown>;
content?: unknown[]; arguments?: Record<string, unknown>;
details?: Record<string, unknown>;
arguments?: Record<string, unknown>;
toolCallId?: string;
tool_call_id?: string;
toolName?: string;
name?: string;
};
toolCallId?: string; toolCallId?: string;
tool_call_id?: string;
toolName?: string; toolName?: string;
args?: Record<string, unknown>; name?: string;
}; };
// Capture metadata as soon as the tool starts (from args). toolCallId?: string;
if (ev.type === "tool_execution_start") { toolName?: string;
const toolName = ev.toolName; args?: Record<string, unknown>;
const meta = inferToolMeta({ };
toolName, if (!enableToolStreaming) return;
name: ev.toolName, // Capture metadata as soon as the tool starts (from args).
arguments: ev.args, if (ev.type === "tool_execution_start") {
}); const toolName = ev.toolName;
if (ev.toolCallId) { const meta = inferToolMeta({
toolMetaById.set(ev.toolCallId, meta); toolName,
} name: ev.toolName,
if (meta) { arguments: ev.args,
if ( });
pendingToolName && if (ev.toolCallId) {
toolName && toolMetaById.set(ev.toolCallId, meta);
toolName !== pendingToolName
) {
flushPendingTool();
}
if (!pendingToolName) pendingToolName = toolName;
pendingMetas.push(meta);
if (
TOOL_RESULT_FLUSH_COUNT > 0 &&
pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT
) {
flushPendingTool();
} else {
if (pendingTimer) clearTimeout(pendingTimer);
pendingTimer = setTimeout(
flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS,
);
}
}
} }
if ( if (meta) {
(ev.type === "message" || ev.type === "message_end") &&
ev.message?.role === "tool_result" &&
Array.isArray(ev.message.content)
) {
const toolName = inferToolName(ev.message);
const toolCallId =
ev.message.toolCallId ?? ev.message.tool_call_id;
const meta =
inferToolMeta(ev.message) ??
(toolCallId ? toolMetaById.get(toolCallId) : undefined);
if ( if (
pendingToolName && pendingToolName &&
toolName && toolName &&
@@ -584,41 +590,66 @@ export async function runCommandReply(
flushPendingTool(); flushPendingTool();
} }
if (!pendingToolName) pendingToolName = toolName; if (!pendingToolName) pendingToolName = toolName;
if (meta) pendingMetas.push(meta); pendingMetas.push(meta);
if ( if (
TOOL_RESULT_FLUSH_COUNT > 0 && TOOL_RESULT_FLUSH_COUNT > 0 &&
pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT
) { ) {
flushPendingTool(); flushPendingTool();
return; } else {
if (pendingTimer) clearTimeout(pendingTimer);
pendingTimer = setTimeout(
flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS,
);
} }
if (pendingTimer) clearTimeout(pendingTimer);
pendingTimer = setTimeout(
flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS,
);
} }
if (
ev.type === "message_end" ||
ev.type === "message_update" ||
ev.type === "message"
) {
streamAssistant(ev.message);
}
} catch {
// ignore malformed lines
} }
if (
enableToolStreaming &&
(ev.type === "message" || ev.type === "message_end") &&
ev.message?.role === "tool_result" &&
Array.isArray(ev.message.content)
) {
const toolName = inferToolName(ev.message);
const toolCallId =
ev.message.toolCallId ?? ev.message.tool_call_id;
const meta =
inferToolMeta(ev.message) ??
(toolCallId ? toolMetaById.get(toolCallId) : undefined);
if (
pendingToolName &&
toolName &&
toolName !== pendingToolName
) {
flushPendingTool();
}
if (!pendingToolName) pendingToolName = toolName;
if (meta) pendingMetas.push(meta);
if (
TOOL_RESULT_FLUSH_COUNT > 0 &&
pendingMetas.length >= TOOL_RESULT_FLUSH_COUNT
) {
flushPendingTool();
return;
}
if (pendingTimer) clearTimeout(pendingTimer);
pendingTimer = setTimeout(
flushPendingTool,
TOOL_RESULT_DEBOUNCE_MS,
);
}
if (ev.type === "message_end") {
streamAssistantFinal(ev.message);
}
} catch {
// ignore malformed lines
} }
: undefined, }
}); : undefined,
flushPendingTool();
return rpcResult;
}
return await commandRunner(agentKind === "pi" ? rpcArgv : finalArgv, {
timeoutMs,
cwd: reply.cwd,
input: rpcInput,
}); });
flushPendingTool();
return rpcResult;
}; };
const { stdout, stderr, code, signal, killed } = await enqueue(run, { const { stdout, stderr, code, signal, killed } = await enqueue(run, {
@@ -633,6 +664,7 @@ export async function runCommandReply(
}, },
}); });
const rawStdout = stdout.trim(); const rawStdout = stdout.trim();
const rpcAssistantText = extractRpcAssistantText(stdout);
let mediaFromCommand: string[] | undefined; let mediaFromCommand: string[] | undefined;
const trimmed = stripRpcNoise(rawStdout); const trimmed = stripRpcNoise(rawStdout);
if (stderr?.trim()) { if (stderr?.trim()) {
@@ -656,9 +688,7 @@ export async function runCommandReply(
); );
}; };
const parsed = const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
shouldApplyAgent && trimmed ? agent.parseOutput(trimmed) : undefined;
const _parserProvided = shouldApplyAgent && !!parsed;
// Collect assistant texts and tool results from parseOutput (tau RPC can emit many). // Collect assistant texts and tool results from parseOutput (tau RPC can emit many).
const parsedTexts = const parsedTexts =
@@ -734,10 +764,15 @@ export async function runCommandReply(
}); });
} }
// If parser gave nothing, fall back to raw stdout as a single message. // If parser gave nothing, fall back to best-effort assistant text (prefers RPC deltas).
if (replyItems.length === 0 && trimmed && !hasParsedContent) { const fallbackText =
rpcAssistantText ??
extractRpcAssistantText(trimmed) ??
extractAssistantTextLoosely(trimmed) ??
trimmed;
if (replyItems.length === 0 && fallbackText && !hasParsedContent) {
const { text: cleanedText, mediaUrls: mediaFound } = const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(trimmed); splitMediaFromOutput(fallbackText);
if (cleanedText || mediaFound?.length) { if (cleanedText || mediaFound?.length) {
replyItems.push({ replyItems.push({
text: cleanedText, text: cleanedText,
@@ -771,8 +806,9 @@ export async function runCommandReply(
`Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`,
); );
// Include any partial output or stderr in error message // Include any partial output or stderr in error message
const partialOut = trimmed const summarySource = rpcAssistantText ?? trimmed;
? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}` const partialOut = summarySource
? `\n\nOutput: ${summarySource.slice(0, 500)}${summarySource.length > 500 ? "..." : ""}`
: ""; : "";
const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`; const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`;
return { return {
@@ -864,7 +900,7 @@ export async function runCommandReply(
} }
verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`); verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`);
return { payloads, meta }; return { payloads: streamedAny && onPartialReply ? [] : payloads, meta };
} catch (err) { } catch (err) {
const elapsed = Date.now() - started; const elapsed = Date.now() - started;
logger.info( logger.info(
@@ -884,7 +920,10 @@ export async function runCommandReply(
const baseMsg = const baseMsg =
"Command timed out after " + "Command timed out after " +
`${timeoutSeconds}s${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}. Try a shorter prompt or split the request.`; `${timeoutSeconds}s${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}. Try a shorter prompt or split the request.`;
const partial = errorObj.stdout?.trim(); const partial =
extractRpcAssistantText(errorObj.stdout ?? "") ||
extractAssistantTextLoosely(errorObj.stdout ?? "") ||
stripRpcNoise(errorObj.stdout ?? "");
const partialSnippet = const partialSnippet =
partial && partial.length > 800 partial && partial.length > 800
? `${partial.slice(0, 800)}...` ? `${partial.slice(0, 800)}...`

View File

@@ -402,6 +402,62 @@ export async function getReplyFromConfig(
return { text: ack }; return { text: ack };
} }
// If any directive (think/verbose) is present anywhere, acknowledge immediately and skip agent execution.
if (hasThinkDirective || hasVerboseDirective) {
if (sessionEntry && sessionStore && sessionKey) {
if (hasThinkDirective && inlineThink) {
if (inlineThink === "off") {
delete sessionEntry.thinkingLevel;
} else {
sessionEntry.thinkingLevel = inlineThink;
}
sessionEntry.updatedAt = Date.now();
}
if (hasVerboseDirective && inlineVerbose) {
if (inlineVerbose === "off") {
delete sessionEntry.verboseLevel;
} else {
sessionEntry.verboseLevel = inlineVerbose;
}
sessionEntry.updatedAt = Date.now();
}
if (sessionEntry.updatedAt) {
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
}
}
const parts: string[] = [];
if (hasThinkDirective) {
if (!inlineThink) {
parts.push(
`Unrecognized thinking level "${rawThinkLevel ?? ""}". Valid levels: off, minimal, low, medium, high.`,
);
} else {
parts.push(
inlineThink === "off"
? "Thinking disabled."
: `Thinking level set to ${inlineThink}.`,
);
}
}
if (hasVerboseDirective) {
if (!inlineVerbose) {
parts.push(
`Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`,
);
} else {
parts.push(
inlineVerbose === "off"
? "Verbose logging disabled."
: "Verbose logging enabled.",
);
}
}
const ack = parts.join(" ");
cleanupTyping();
return { text: ack };
}
// Optional allowlist by origin number (E.164 without whatsapp: prefix) // Optional allowlist by origin number (E.164 without whatsapp: prefix)
const allowFrom = cfg.inbound?.allowFrom; const allowFrom = cfg.inbound?.allowFrom;
const from = (ctx.From ?? "").replace(/^whatsapp:/, ""); const from = (ctx.From ?? "").replace(/^whatsapp:/, "");

View File

@@ -28,6 +28,7 @@ class TauRpcClient {
reject: (err: unknown) => void; reject: (err: unknown) => void;
timer: NodeJS.Timeout; timer: NodeJS.Timeout;
onEvent?: (line: string) => void; onEvent?: (line: string) => void;
capMs: number;
} }
| undefined; | undefined;
@@ -67,6 +68,10 @@ class TauRpcClient {
} }
private handleLine(line: string) { private handleLine(line: string) {
// Any line = activity; refresh timeout watchdog.
if (this.pending) {
this.resetTimeout();
}
if (!this.pending) return; if (!this.pending) return;
this.buffer.push(line); this.buffer.push(line);
this.pending?.onEvent?.(line); this.pending?.onEvent?.(line);
@@ -90,6 +95,18 @@ class TauRpcClient {
} }
} }
private resetTimeout() {
if (!this.pending) return;
const capMs = this.pending.capMs;
if (this.pending.timer) clearTimeout(this.pending.timer);
this.pending.timer = setTimeout(() => {
const pending = this.pending;
this.pending = undefined;
pending?.reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`));
this.child?.kill("SIGKILL");
}, capMs);
}
async prompt( async prompt(
prompt: string, prompt: string,
timeoutMs: number, timeoutMs: number,
@@ -112,14 +129,14 @@ class TauRpcClient {
if (!ok) child.stdin.once("drain", () => resolve()); if (!ok) child.stdin.once("drain", () => resolve());
}); });
return await new Promise<TauRpcResult>((resolve, reject) => { return await new Promise<TauRpcResult>((resolve, reject) => {
// Hard cap to avoid stuck relays; agent_end or process exit should usually resolve first. // Hard cap to avoid stuck relays; resets on every line received.
const capMs = Math.min(timeoutMs, 5 * 60 * 1000); const capMs = Math.min(timeoutMs, 5 * 60 * 1000);
const timer = setTimeout(() => { const timer = setTimeout(() => {
this.pending = undefined; this.pending = undefined;
reject(new Error(`tau rpc timed out after ${capMs}ms`)); reject(new Error(`tau rpc timed out after ${Math.round(capMs / 1000)}s`));
child.kill("SIGKILL"); child.kill("SIGKILL");
}, capMs); }, capMs);
this.pending = { resolve, reject, timer, onEvent }; this.pending = { resolve, reject, timer, onEvent, capMs };
}); });
} }

View File

@@ -793,6 +793,31 @@ export async function monitorWebProvider(
}, },
{ {
onReplyStart: latest.sendComposing, onReplyStart: latest.sendComposing,
onPartialReply: async (partial) => {
try {
await deliverWebReply({
replyResult: partial,
msg: latest,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
if (partial.text) {
recentlySent.add(partial.text);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
} catch (err) {
console.error(
danger(
`Failed sending partial web auto-reply to ${latest.from ?? conversationId}: ${String(err)}`,
),
);
}
},
}, },
); );