perf(pi): reuse tau rpc for command auto-replies

This commit is contained in:
Peter Steinberger
2025-12-02 20:09:51 +00:00
parent a34271adf9
commit b172b538fc
17 changed files with 695 additions and 93 deletions

View File

@@ -1,6 +1,6 @@
# Agent Abstraction Refactor Plan
Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact.
Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode, Gemini) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact.
## Overview
- Introduce a pluggable agent layer (`src/agents/*`), selected by config.
@@ -15,7 +15,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without
reply: {
mode: "command",
agent: {
kind: "claude" | "opencode" | "pi" | "codex",
kind: "claude" | "opencode" | "pi" | "codex" | "gemini",
format?: "text" | "json",
identityPrefix?: string
},
@@ -42,6 +42,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without
- `src/agents/opencode.ts` reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix.
- `src/agents/pi.ts` parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags.
- `src/agents/codex.ts` parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only.
- `src/agents/gemini.ts` minimal parsing (plain text), identity prepend, honors `--output-format` when `format` is set, and defaults to `--resume {{SessionId}}` for session resume (new sessions need no flag). Override `sessionArgNew/sessionArgResume` if you use a different session strategy.
- Shared MEDIA extraction stays in `media/parse.ts`.
## Command runner changes

View File

@@ -4,6 +4,7 @@ import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js";
import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js";
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { GEMINI_IDENTITY_PREFIX, geminiSpec } from "./gemini.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
@@ -115,4 +116,33 @@ describe("agent buildArgs + parseOutput helpers", () => {
expect(built).toContain("--skip-git-repo-check");
expect(built).toContain("read-only");
});
it("geminiSpec prepends identity unless already sent", () => {
const argv = ["gemini", "hi"];
const built = geminiSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built.at(-1)).toContain(GEMINI_IDENTITY_PREFIX);
const builtOnce = geminiSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: false,
sessionId: "sess",
sendSystemOnce: true,
systemSent: true,
identityPrefix: undefined,
format: "json",
});
expect(builtOnce.at(-1)).toBe("hi");
expect(builtOnce).toContain("--output-format");
expect(builtOnce).toContain("json");
});
});

View File

@@ -12,7 +12,16 @@ import type { AgentMeta, AgentSpec } from "./types.js";
function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined {
if (!parsed?.parsed) return undefined;
const summary = summarizeClaudeMetadata(parsed.parsed);
return summary ? { extra: { summary } } : undefined;
const sessionId =
parsed.parsed &&
typeof parsed.parsed === "object" &&
typeof (parsed.parsed as { session_id?: unknown }).session_id === "string"
? (parsed.parsed as { session_id: string }).session_id
: undefined;
const meta: AgentMeta = {};
if (sessionId) meta.sessionId = sessionId;
if (summary) meta.extra = { summary };
return Object.keys(meta).length ? meta : undefined;
}
export const claudeSpec: AgentSpec = {

50
src/agents/gemini.ts Normal file
View File

@@ -0,0 +1,50 @@
import path from "node:path";
import type { AgentMeta, AgentSpec } from "./types.js";
/** Executable basename that identifies a Gemini CLI invocation. */
const GEMINI_BIN = "gemini";

/** Identity preamble prepended to the prompt when the config supplies none. */
export const GEMINI_IDENTITY_PREFIX =
  "You are Gemini responding for warelay. Keep WhatsApp replies concise (<1500 chars). If the prompt contains media paths or a Transcript block, use them. If this was a heartbeat probe and nothing needs attention, reply with exactly HEARTBEAT_OK.";

/** CLI flag used to request a specific output format from the Gemini CLI. */
const GEMINI_OUTPUT_FORMAT_FLAG = "--output-format";

/** True when `arg` is the output-format flag in either `--flag value` or `--flag=value` form. */
function isGeminiOutputFormatArg(arg: string): boolean {
  return (
    arg === GEMINI_OUTPUT_FORMAT_FLAG ||
    arg.startsWith(`${GEMINI_OUTPUT_FORMAT_FLAG}=`)
  );
}

// The Gemini CLI prints plain text by default and its `--output-format json` mode has been
// flaky across versions, so parsing stays minimal: trim the raw output and hand it on.
// MEDIA token stripping happens later in the pipeline.
function parseGeminiOutput(raw: string): { text?: string; meta?: AgentMeta } {
  const text = raw.trim();
  return { text: text === "" ? undefined : text, meta: undefined };
}

/**
 * Agent spec for the Gemini CLI.
 *
 * - `isInvocation` matches when the basename of argv[0] is `gemini`.
 * - `buildArgs` injects `--output-format <format>` (only if a format is configured and the
 *   flag is not already present) immediately before the prompt body, and prepends the
 *   identity prefix to a non-empty body unless it was already sent for a send-once session.
 * - `parseOutput` returns the trimmed stdout as text with no metadata.
 */
export const geminiSpec: AgentSpec = {
  kind: "gemini",
  isInvocation: (argv) => {
    if (argv.length === 0) return false;
    return path.basename(argv[0]) === GEMINI_BIN;
  },
  buildArgs: (ctx) => {
    const args = [...ctx.argv];
    const head = args.slice(0, ctx.bodyIndex);
    const tail = args.slice(ctx.bodyIndex + 1);
    const rawBody = args[ctx.bodyIndex] ?? "";

    if (ctx.format) {
      const alreadyRequested = [...head, ...tail].some(isGeminiOutputFormatArg);
      if (!alreadyRequested) {
        head.push(GEMINI_OUTPUT_FORMAT_FLAG, ctx.format);
      }
    }

    // Skip the identity only when it is configured to be sent once and has already gone out.
    const identityAlreadySent = ctx.sendSystemOnce && ctx.systemSent;
    let prompt = rawBody;
    if (!identityAlreadySent && rawBody) {
      const identity = ctx.identityPrefix ?? GEMINI_IDENTITY_PREFIX;
      // An explicitly empty prefix means "no identity": leave the body untouched.
      prompt = identity ? `${identity}\n\n${rawBody}` : rawBody;
    }

    return [...head, prompt, ...tail];
  },
  parseOutput: parseGeminiOutput,
};

View File

@@ -1,5 +1,6 @@
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { geminiSpec } from "./gemini.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
import type { AgentKind, AgentSpec } from "./types.js";
@@ -7,6 +8,7 @@ import type { AgentKind, AgentSpec } from "./types.js";
const specs: Record<AgentKind, AgentSpec> = {
claude: claudeSpec,
codex: codexSpec,
gemini: geminiSpec,
opencode: opencodeSpec,
pi: piSpec,
};

View File

@@ -47,7 +47,11 @@ function parsePiJson(raw: string): AgentParseResult {
export const piSpec: AgentSpec = {
kind: "pi",
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi",
isInvocation: (argv) => {
if (argv.length === 0) return false;
const base = path.basename(argv[0]).replace(/\.(m?js)$/i, "");
return base === "pi" || base === "tau";
},
buildArgs: (ctx) => {
const argv = [...ctx.argv];
// Non-interactive print + JSON

View File

@@ -1,9 +1,10 @@
export type AgentKind = "claude" | "opencode" | "pi" | "codex";
export type AgentKind = "claude" | "opencode" | "pi" | "codex" | "gemini";
export type AgentMeta = {
model?: string;
provider?: string;
stopReason?: string;
sessionId?: string;
usage?: {
input?: number;
output?: number;

View File

@@ -6,9 +6,11 @@ import type { AgentMeta } from "../agents/types.js";
import type { WarelayConfig } from "../config/config.js";
import { isVerbose, logVerbose } from "../globals.js";
import { logError } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { splitMediaFromOutput } from "../media/parse.js";
import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js";
import { runPiRpc } from "../process/tau-rpc.js";
import { applyTemplate, type TemplateContext } from "./templating.js";
import type { ReplyPayload } from "./types.js";
@@ -99,6 +101,12 @@ export function summarizeClaudeMetadata(payload: unknown): string | undefined {
export async function runCommandReply(
params: CommandReplyParams,
): Promise<CommandReplyResult> {
const logger = getChildLogger({ module: "command-reply" });
const verboseLog = (msg: string) => {
logger.debug(msg);
if (isVerbose()) logVerbose(msg);
};
const {
reply,
templatingCtx,
@@ -133,14 +141,25 @@ export async function runCommandReply(
// Session args prepared (templated) and injected generically
if (reply.session) {
const defaultNew =
agentCfg.kind === "claude"
? ["--session-id", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const defaultResume =
agentCfg.kind === "claude"
? ["--resume", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const defaultSessionArgs = (() => {
switch (agentCfg.kind) {
case "claude":
return {
newArgs: ["--session-id", "{{SessionId}}"],
resumeArgs: ["--resume", "{{SessionId}}"],
};
case "gemini":
// Gemini CLI supports --resume <id>; starting a new session needs no flag.
return { newArgs: [], resumeArgs: ["--resume", "{{SessionId}}"] };
default:
return {
newArgs: ["--session", "{{SessionId}}"],
resumeArgs: ["--session", "{{SessionId}}"],
};
}
})();
const defaultNew = defaultSessionArgs.newArgs;
const defaultResume = defaultSessionArgs.resumeArgs;
const sessionArgList = (
isNewSession
? (reply.session.sessionArgNew ?? defaultNew)
@@ -170,7 +189,7 @@ export async function runCommandReply(
systemSent,
identityPrefix: agentCfg.identityPrefix,
format: agentCfg.format,
})
})
: argv;
logVerbose(
@@ -181,20 +200,41 @@ export async function runCommandReply(
let queuedMs: number | undefined;
let queuedAhead: number | undefined;
try {
const { stdout, stderr, code, signal, killed } = await enqueue(
() => commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd }),
{
onWait: (waitMs, ahead) => {
queuedMs = waitMs;
queuedAhead = ahead;
if (isVerbose()) {
logVerbose(
`Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`,
);
const run = async () => {
// Prefer long-lived tau RPC for pi agent to avoid cold starts.
if (agentKind === "pi") {
const body = finalArgv[bodyIndex] ?? "";
// Build rpc args without the prompt body; force --mode rpc.
const rpcArgv = (() => {
const copy = [...finalArgv];
copy.splice(bodyIndex, 1);
const modeIdx = copy.findIndex((a) => a === "--mode");
if (modeIdx >= 0 && copy[modeIdx + 1]) {
copy.splice(modeIdx, 2, "--mode", "rpc");
} else if (!copy.includes("--mode")) {
copy.splice(copy.length - 1, 0, "--mode", "rpc");
}
},
return copy;
})();
return await runPiRpc({
argv: rpcArgv,
cwd: reply.cwd,
prompt: body,
timeoutMs,
});
}
return await commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd });
};
const { stdout, stderr, code, signal, killed } = await enqueue(run, {
onWait: (waitMs, ahead) => {
queuedMs = waitMs;
queuedAhead = ahead;
if (isVerbose()) {
logVerbose(`Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`);
}
},
);
});
const rawStdout = stdout.trim();
let mediaFromCommand: string[] | undefined;
let trimmed = rawStdout;
@@ -214,17 +254,19 @@ export async function runCommandReply(
trimmed = cleanedText;
if (mediaFound?.length) {
mediaFromCommand = mediaFound;
if (isVerbose()) logVerbose(`MEDIA token extracted: ${mediaFound}`);
} else if (isVerbose()) {
logVerbose("No MEDIA token extracted from final text");
verboseLog(`MEDIA token extracted: ${mediaFound}`);
} else {
verboseLog("No MEDIA token extracted from final text");
}
if (!trimmed && !mediaFromCommand) {
const meta = parsed?.meta?.extra?.summary ?? undefined;
trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`;
logVerbose("No text/media produced; injecting fallback notice to user");
verboseLog("No text/media produced; injecting fallback notice to user");
}
logVerbose(`Command auto-reply stdout (trimmed): ${trimmed || "<empty>"}`);
logVerbose(`Command auto-reply finished in ${Date.now() - started}ms`);
verboseLog(`Command auto-reply stdout (trimmed): ${trimmed || "<empty>"}`);
const elapsed = Date.now() - started;
verboseLog(`Command auto-reply finished in ${elapsed}ms`);
logger.info({ durationMs: elapsed, agent: agentKind, cwd: reply.cwd }, "command auto-reply finished");
if ((code ?? 0) !== 0) {
console.error(
`Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`,
@@ -311,17 +353,16 @@ export async function runCommandReply(
killed,
agentMeta: parsed?.meta,
};
if (isVerbose()) {
logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`);
}
verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`);
return { payload, meta };
} catch (err) {
const elapsed = Date.now() - started;
logger.info({ durationMs: elapsed, agent: agentKind, cwd: reply.cwd }, "command auto-reply failed");
const anyErr = err as { killed?: boolean; signal?: string };
const timeoutHit = anyErr.killed === true || anyErr.signal === "SIGKILL";
const errorObj = err as { stdout?: string; stderr?: string };
if (errorObj.stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${errorObj.stderr.trim()}`);
verboseLog(`Command auto-reply stderr: ${errorObj.stderr.trim()}`);
}
if (timeoutHit) {
console.error(

View File

@@ -8,6 +8,7 @@ import {
deriveSessionKey,
loadSessionStore,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../config/sessions.js";
import { info, isVerbose, logVerbose } from "../globals.js";
@@ -27,6 +28,15 @@ import type { GetReplyOptions, ReplyPayload } from "./types.js";
export type { GetReplyOptions, ReplyPayload } from "./types.js";
/** Exact (case-insensitive, trimmed) message bodies that are treated as an abort request. */
const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
/** In-process abort flags keyed by an abort key, for conversations without a stored session entry. */
const ABORT_MEMORY = new Map<string, boolean>();

/**
 * Reports whether `text`, once trimmed and lower-cased, is exactly one of the abort words.
 * Missing or empty text is never an abort trigger.
 */
function isAbortTrigger(text?: string): boolean {
  return text ? ABORT_TRIGGERS.has(text.trim().toLowerCase()) : false;
}
export async function getReplyFromConfig(
ctx: MsgContext,
opts?: GetReplyOptions,
@@ -95,11 +105,13 @@ export async function getReplyFromConfig(
const storePath = resolveStorePath(sessionCfg?.store);
let sessionStore: ReturnType<typeof loadSessionStore> | undefined;
let sessionKey: string | undefined;
let sessionEntry: SessionEntry | undefined;
let sessionId: string | undefined;
let isNewSession = false;
let bodyStripped: string | undefined;
let systemSent = false;
let abortedLastRun = false;
if (sessionCfg) {
const trimmedBody = (ctx.Body ?? "").trim();
@@ -127,13 +139,21 @@ export async function getReplyFromConfig(
if (!isNewSession && freshEntry) {
sessionId = entry.sessionId;
systemSent = entry.systemSent ?? false;
abortedLastRun = entry.abortedLastRun ?? false;
} else {
sessionId = crypto.randomUUID();
isNewSession = true;
systemSent = false;
abortedLastRun = false;
}
sessionStore[sessionKey] = { sessionId, updatedAt: Date.now(), systemSent };
sessionEntry = {
sessionId,
updatedAt: Date.now(),
systemSent,
abortedLastRun,
};
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
}
@@ -149,6 +169,11 @@ export async function getReplyFromConfig(
const from = (ctx.From ?? "").replace(/^whatsapp:/, "");
const to = (ctx.To ?? "").replace(/^whatsapp:/, "");
const isSamePhone = from && to && from === to;
const abortKey = sessionKey ?? (from || undefined) ?? (to || undefined);
if (!sessionEntry && abortKey) {
abortedLastRun = ABORT_MEMORY.get(abortKey) ?? false;
}
// Same-phone mode (self-messaging) is always allowed
if (isSamePhone) {
@@ -164,6 +189,23 @@ export async function getReplyFromConfig(
}
}
const abortRequested =
reply?.mode === "command" &&
isAbortTrigger((sessionCtx.BodyStripped ?? sessionCtx.Body ?? "").trim());
if (abortRequested) {
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.abortedLastRun = true;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
} else if (abortKey) {
ABORT_MEMORY.set(abortKey, true);
}
cleanupTyping();
return { text: "Agent was aborted." };
}
await startTypingLoop();
// Optional prefix injected before Body for templating/command prompts.
@@ -177,16 +219,30 @@ export async function getReplyFromConfig(
? applyTemplate(reply.bodyPrefix, sessionCtx)
: "";
const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
const prefixedBodyBase = (() => {
let body = baseBody;
if (!sendSystemOnce || isFirstTurnInSession) {
body = bodyPrefix ? `${bodyPrefix}${body}` : body;
const abortedHint =
reply?.mode === "command" && abortedLastRun
? "Note: The previous agent run was aborted by the user. Resume carefully or ask for clarification."
: "";
let prefixedBodyBase = baseBody;
if (!sendSystemOnce || isFirstTurnInSession) {
prefixedBodyBase = bodyPrefix
? `${bodyPrefix}${prefixedBodyBase}`
: prefixedBodyBase;
}
if (sessionIntro) {
prefixedBodyBase = `${sessionIntro}\n\n${prefixedBodyBase}`;
}
if (abortedHint) {
prefixedBodyBase = `${abortedHint}\n\n${prefixedBodyBase}`;
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.abortedLastRun = false;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
} else if (abortKey) {
ABORT_MEMORY.set(abortKey, false);
}
if (sessionIntro) {
body = `${sessionIntro}\n\n${body}`;
}
return body;
})();
}
if (
sessionCfg &&
sendSystemOnce &&
@@ -194,12 +250,18 @@ export async function getReplyFromConfig(
sessionStore &&
sessionKey
) {
sessionStore[sessionKey] = {
...(sessionStore[sessionKey] ?? {}),
sessionId: sessionId ?? crypto.randomUUID(),
const current = sessionEntry ??
sessionStore[sessionKey] ?? {
sessionId: sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
};
sessionEntry = {
...current,
sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
systemSent: true,
};
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
systemSent = true;
}
@@ -265,6 +327,31 @@ export async function getReplyFromConfig(
timeoutSeconds,
commandRunner,
});
if (sessionCfg && sessionStore && sessionKey) {
const returnedSessionId = meta.agentMeta?.sessionId;
if (returnedSessionId && returnedSessionId !== sessionId) {
const entry = sessionEntry ??
sessionStore[sessionKey] ?? {
sessionId: returnedSessionId,
updatedAt: Date.now(),
systemSent,
abortedLastRun,
};
sessionEntry = {
...entry,
sessionId: returnedSessionId,
updatedAt: Date.now(),
};
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
sessionId = returnedSessionId;
if (isVerbose()) {
logVerbose(
`Session id updated from agent meta: ${returnedSessionId} (store: ${storePath})`,
);
}
}
}
if (meta.agentMeta && isVerbose()) {
logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
}

View File

@@ -117,6 +117,7 @@ const ReplySchema = z
z.literal("opencode"),
z.literal("pi"),
z.literal("codex"),
z.literal("gemini"),
]),
format: z.union([z.literal("text"), z.literal("json")]).optional(),
identityPrefix: z.string().optional(),

View File

@@ -12,6 +12,7 @@ export type SessionEntry = {
sessionId: string;
updatedAt: number;
systemSent?: boolean;
abortedLastRun?: boolean;
};
export const SESSION_STORE_DEFAULT = path.join(CONFIG_DIR, "sessions.json");

View File

@@ -671,6 +671,124 @@ describe("config and templating", () => {
expect(secondArgv[secondArgv.length - 1]).toBe("[sys] next");
});
it("stores session id returned by agent meta when it differs", async () => {
const tmpStore = path.join(
os.tmpdir(),
`warelay-store-${Date.now()}-sessionid.json`,
);
vi.spyOn(crypto, "randomUUID").mockReturnValue("initial-sid");
const runSpy = vi.spyOn(index, "runCommandWithTimeout").mockResolvedValue({
stdout: '{"text":"hi","session_id":"agent-sid-123"}\n',
stderr: "",
code: 0,
signal: null,
killed: false,
});
const cfg = {
inbound: {
reply: {
mode: "command" as const,
command: ["claude", "{{Body}}"],
agent: { kind: "claude", format: "json" as const },
session: { store: tmpStore },
},
},
};
await index.getReplyFromConfig(
{ Body: "/new hi", From: "+1", To: "+2" },
undefined,
cfg,
runSpy,
);
const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8"));
const entry = Object.values(persisted)[0] as { sessionId?: string };
expect(entry.sessionId).toBe("agent-sid-123");
});
it("aborts command when stop word is received and skips command runner", async () => {
const tmpStore = path.join(
os.tmpdir(),
`warelay-store-${Date.now()}-abort.json`,
);
const runSpy = vi.fn().mockResolvedValue({
stdout: "should-not-run",
stderr: "",
code: 0,
signal: null,
killed: false,
});
const cfg = {
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "{{Body}}"],
session: { store: tmpStore },
},
},
};
const result = await index.getReplyFromConfig(
{ Body: "stop", From: "+1", To: "+2" },
undefined,
cfg,
runSpy,
);
expect(result?.text).toMatch(/aborted/i);
expect(runSpy).not.toHaveBeenCalled();
const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8"));
const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean };
expect(entry.abortedLastRun).toBe(true);
});
it("adds an abort hint to the next prompt and then clears the flag", async () => {
const tmpStore = path.join(
os.tmpdir(),
`warelay-store-${Date.now()}-aborthint.json`,
);
const runSpy = vi.fn().mockResolvedValue({
stdout: "ok\n",
stderr: "",
code: 0,
signal: null,
killed: false,
});
const cfg = {
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "{{Body}}"],
session: { store: tmpStore },
},
},
};
await index.getReplyFromConfig(
{ Body: "abort", From: "+1555", To: "+2666" },
undefined,
cfg,
runSpy,
);
const result = await index.getReplyFromConfig(
{ Body: "continue", From: "+1555", To: "+2666" },
undefined,
cfg,
runSpy,
);
const argv = runSpy.mock.calls[0][0];
const prompt = argv.at(-1) as string;
expect(prompt).toMatch(/previous agent run was aborted/i);
expect(prompt).toMatch(/continue/);
const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8"));
const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean };
expect(entry.abortedLastRun).toBe(false);
expect(result?.text).toBe("ok");
});
it("refreshes typing indicator while command runs", async () => {
const onReplyStart = vi.fn();
const runSpy = vi.spyOn(index, "runCommandWithTimeout").mockImplementation(

116
src/process/tau-rpc.ts Normal file
View File

@@ -0,0 +1,116 @@
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
import readline from "node:readline";
// Options describing the tau RPC process to talk to and how long to wait for a reply.
type TauRpcOptions = {
// Full command line: argv[0] is the executable to spawn, the rest are its arguments.
argv: string[];
// Working directory for the spawned process (optional).
cwd?: string;
// Maximum time to wait for the assistant's reply before the request is rejected.
timeoutMs: number;
};
// Result of one RPC prompt, shaped like a one-shot command result:
// stdout holds the newline-joined lines received for the request, stderr the text captured
// from the child's stderr stream, and code the exit-style status (0 for a completed reply).
type TauRpcResult = { stdout: string; stderr: string; code: number };
/**
 * Long-lived client around a single `tau`/`pi` child process running in RPC mode.
 *
 * The child is spawned lazily on the first prompt and reused afterwards to avoid cold
 * starts. Exactly one request may be in flight at a time: `prompt()` writes one JSON
 * line to the child's stdin and resolves once an assistant `message_end` line is seen on
 * stdout, returning every stdout line received for that request.
 *
 * Failure handling:
 * - spawn/stdin errors and unexpected exits reject the in-flight request instead of
 *   surfacing as unhandled `"error"` events;
 * - a timeout kills the child, drops it so the next prompt spawns a fresh process, and
 *   rejects with an error carrying `killed: true` / `signal: "SIGKILL"` so callers that
 *   detect timeouts via those fields (as the command runner does) recognise it.
 */
class TauRpcClient {
  private child: ChildProcessWithoutNullStreams | null = null;
  private rl: readline.Interface | null = null;
  /** stderr captured since the last completed request (or since spawn). */
  private stderr = "";
  /** stdout lines collected for the in-flight request. */
  private buffer: string[] = [];
  /** The single in-flight request, tagged with the child process it was sent to. */
  private pending:
    | {
        resolve: (r: TauRpcResult) => void;
        reject: (err: unknown) => void;
        timer: NodeJS.Timeout;
        owner: ChildProcessWithoutNullStreams;
      }
    | undefined;

  /**
   * @param argv Full command line; argv[0] is the executable, the rest are its arguments.
   * @param cwd Working directory for the child, or undefined to inherit.
   */
  constructor(
    private readonly argv: string[],
    private readonly cwd: string | undefined,
  ) {}

  /** Returns the live child process, spawning and wiring it up on first use. */
  private ensureChild(): ChildProcessWithoutNullStreams {
    if (this.child) return this.child;
    const child = spawn(this.argv[0], this.argv.slice(1), {
      cwd: this.cwd,
      stdio: ["pipe", "pipe", "pipe"],
    });
    const rl = readline.createInterface({ input: child.stdout });
    this.child = child;
    this.rl = rl;

    // Every handler checks that `child` is still the current process so that late events
    // from a replaced/killed child cannot corrupt the state of its successor.
    rl.on("line", (line) => {
      if (this.child === child) this.handleLine(line);
    });
    child.stderr.on("data", (d) => {
      if (this.child === child) this.stderr += d.toString();
    });
    // Spawn failures (e.g. ENOENT) are reported via "error", not "exit"; without a
    // listener Node would throw them as uncaught exceptions.
    child.on("error", (err) => {
      this.failPending(child, err);
      if (this.child === child) this.dispose();
    });
    // Writes to a dead child surface as stream errors (EPIPE); route them to the request.
    child.stdin.on("error", (err) => {
      this.failPending(child, err);
    });
    child.on("exit", (code, signal) => {
      const stderr = this.child === child ? this.stderr : "";
      this.failPending(
        child,
        Object.assign(
          new Error(`tau rpc exited (code=${code}, signal=${signal})`),
          { stderr },
        ),
      );
      if (this.child === child) this.dispose();
    });
    return child;
  }

  /** Rejects the in-flight request if (and only if) it was sent to `owner`. */
  private failPending(owner: ChildProcessWithoutNullStreams, err: unknown) {
    const pending = this.pending;
    if (!pending || pending.owner !== owner) return;
    this.pending = undefined;
    clearTimeout(pending.timer);
    pending.reject(err);
  }

  /** Collects one stdout line; completes the request on the assistant `message_end` event. */
  private handleLine(line: string) {
    if (!this.pending) return;
    this.buffer.push(line);
    // Finish on assistant message_end event to mirror parse logic in piSpec
    if (line.includes('"type":"message_end"') && line.includes('"role":"assistant"')) {
      const out = this.buffer.join("\n");
      const stderr = this.stderr;
      const pending = this.pending;
      clearTimeout(pending.timer);
      this.pending = undefined;
      this.buffer = [];
      // Start the next request with a clean slate so stderr does not grow without bound.
      this.stderr = "";
      pending.resolve({ stdout: out, stderr, code: 0 });
    }
  }

  /**
   * Sends `prompt` to the RPC process and waits for the assistant's reply.
   *
   * @param prompt User text to send.
   * @param timeoutMs Maximum time to wait for the reply.
   * @returns The collected stdout lines (newline-joined), captured stderr and code 0.
   * @throws If another request is already in flight, the process cannot be started or
   *   written to, it exits before replying, or the timeout elapses.
   */
  async prompt(prompt: string, timeoutMs: number): Promise<TauRpcResult> {
    if (this.pending) {
      throw new Error("tau rpc already handling a request");
    }
    const child = this.ensureChild();
    const payload =
      JSON.stringify({
        type: "prompt",
        message: { role: "user", content: [{ type: "text", text: prompt }] },
      }) + "\n";

    return await new Promise<TauRpcResult>((resolve, reject) => {
      const timer = setTimeout(() => {
        const stderr = this.stderr;
        this.pending = undefined;
        // Drop the hung child right away so the next prompt gets a fresh process
        // instead of writing to one that is being killed.
        if (this.child === child) {
          this.dispose();
        } else if (!child.killed) {
          child.kill("SIGKILL");
        }
        reject(
          Object.assign(new Error(`tau rpc timed out after ${timeoutMs}ms`), {
            killed: true,
            signal: "SIGKILL",
            stderr,
          }),
        );
      }, timeoutMs);
      // Register the request BEFORE writing so that concurrent callers are refused and
      // no response line can arrive while nothing is listening for it.
      this.pending = { resolve, reject, timer, owner: child };
      child.stdin.write(payload, (err) => {
        if (err) this.failPending(child, err);
      });
    });
  }

  /**
   * Stops the child process (if any) and resets captured output. A request still in
   * flight is rejected when the killed child reports its exit.
   */
  dispose() {
    this.rl?.close();
    this.rl = null;
    if (this.child && !this.child.killed) {
      this.child.kill("SIGKILL");
    }
    this.child = null;
    this.buffer = [];
    this.stderr = "";
  }
}
// Process-wide cached RPC client together with the cwd/argv key it was created for.
let singleton: { key: string; client: TauRpcClient } | undefined;

/**
 * Sends one prompt through the shared tau RPC client.
 *
 * The client is cached per combination of working directory and argv; when either
 * changes, the previous client is disposed and a new one is created before prompting.
 *
 * @param opts Command line, working directory, timeout and the prompt text to send.
 * @returns The result of the client's `prompt` call.
 */
export async function runPiRpc(
  opts: TauRpcOptions & { prompt: string },
): Promise<TauRpcResult> {
  const { argv, cwd, prompt, timeoutMs } = opts;
  const key = `${cwd ?? ""}|${argv.join(" ")}`;

  let entry = singleton;
  if (entry === undefined || entry.key !== key) {
    // Configuration changed (or first use): retire the old client and start a new one.
    entry?.client.dispose();
    entry = { key, client: new TauRpcClient(argv, cwd) };
    singleton = entry;
  }
  return entry.client.prompt(prompt, timeoutMs);
}
/** Disposes the cached tau RPC client, if one exists, and forgets it. */
export function resetPiRpc() {
  if (singleton) {
    singleton.client.dispose();
  }
  singleton = undefined;
}

View File

@@ -12,7 +12,7 @@ import {
import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { enqueueCommand, getQueueSize } from "../process/command-queue.js";
import { getQueueSize } from "../process/command-queue.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
@@ -621,21 +621,19 @@ export async function monitorWebProvider(
: new Date().toISOString();
console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`);
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{
Body: combinedBody,
From: latest.from,
To: latest.to,
MessageSid: latest.id,
MediaPath: latest.mediaPath,
MediaUrl: latest.mediaUrl,
MediaType: latest.mediaType,
},
{
onReplyStart: latest.sendComposing,
},
),
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: combinedBody,
From: latest.from,
To: latest.to,
MessageSid: latest.id,
MediaPath: latest.mediaPath,
MediaUrl: latest.mediaUrl,
MediaType: latest.mediaType,
},
{
onReplyStart: latest.sendComposing,
},
);
if (
@@ -931,24 +929,19 @@ export async function monitorWebProvider(
"reply heartbeat start",
);
}
const hbFrom = lastInboundMsg.from;
const hbTo = lastInboundMsg.to;
const hbComposing = lastInboundMsg.sendComposing;
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{
Body: HEARTBEAT_PROMPT,
From: hbFrom,
To: hbTo,
MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: hbComposing,
},
),
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: HEARTBEAT_PROMPT,
From: lastInboundMsg.from,
To: lastInboundMsg.to,
MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: lastInboundMsg.sendComposing,
},
);
if (

View File

@@ -1,4 +1,5 @@
import fs from "node:fs/promises";
import path from "node:path";
import sharp from "sharp";
import { isVerbose, logVerbose } from "../globals.js";
@@ -12,7 +13,12 @@ import { detectMime } from "../media/mime.js";
export async function loadWebMedia(
mediaUrl: string,
maxBytes?: number,
): Promise<{ buffer: Buffer; contentType?: string; kind: MediaKind }> {
): Promise<{
buffer: Buffer;
contentType?: string;
kind: MediaKind;
fileName?: string;
}> {
if (mediaUrl.startsWith("file://")) {
mediaUrl = mediaUrl.replace("file://", "");
}
@@ -40,6 +46,14 @@ export async function loadWebMedia(
};
if (/^https?:\/\//i.test(mediaUrl)) {
let fileName: string | undefined;
try {
const url = new URL(mediaUrl);
const base = path.basename(url.pathname);
fileName = base || undefined;
} catch {
// ignore parse errors; leave undefined
}
const res = await fetch(mediaUrl);
if (!res.ok || !res.body) {
throw new Error(`Failed to fetch media: HTTP ${res.status}`);
@@ -56,7 +70,7 @@ export async function loadWebMedia(
maxBytesForKind(kind),
);
if (kind === "image") {
return optimizeAndClampImage(array, cap);
return { ...(await optimizeAndClampImage(array, cap)), fileName };
}
if (array.length > cap) {
throw new Error(
@@ -65,19 +79,25 @@ export async function loadWebMedia(
).toFixed(2)}MB)`,
);
}
return { buffer: array, contentType: contentType ?? undefined, kind };
return {
buffer: array,
contentType: contentType ?? undefined,
kind,
fileName,
};
}
// Local path
const data = await fs.readFile(mediaUrl);
const mime = detectMime({ buffer: data, filePath: mediaUrl });
const kind = mediaKindFromMime(mime);
const fileName = path.basename(mediaUrl) || undefined;
const cap = Math.min(
maxBytes ?? maxBytesForKind(kind),
maxBytesForKind(kind),
);
if (kind === "image") {
return optimizeAndClampImage(data, cap);
return { ...(await optimizeAndClampImage(data, cap)), fileName };
}
if (data.length > cap) {
throw new Error(
@@ -86,7 +106,7 @@ export async function loadWebMedia(
).toFixed(2)}MB)`,
);
}
return { buffer: data, contentType: mime, kind };
return { buffer: data, contentType: mime, kind, fileName };
}
export async function optimizeImageToJpeg(

View File

@@ -1,3 +1,4 @@
import type { AnyMessageContent } from "@whiskeysockets/baileys";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetLogger, setLoggerOverride } from "../logging.js";
@@ -17,6 +18,11 @@ vi.mock("./session.js", () => {
};
});
const loadWebMediaMock = vi.fn();
vi.mock("./media.js", () => ({
loadWebMedia: (...args: unknown[]) => loadWebMediaMock(...args),
}));
import { sendMessageWeb } from "./outbound.js";
const { createWaSocket } = await import("./session.js");
@@ -37,4 +43,98 @@ describe("web outbound", () => {
expect(sock.sendMessage).toHaveBeenCalled();
expect(sock.ws.close).toHaveBeenCalled();
});
it("maps audio to PTT with opus mime when ogg", async () => {
const buf = Buffer.from("audio");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "audio/ogg",
kind: "audio",
});
await sendMessageWeb("+1555", "voice note", {
verbose: false,
mediaUrl: "/tmp/voice.ogg",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
audio: buf,
ptt: true,
mimetype: "audio/ogg; codecs=opus",
});
});
it("maps video with caption", async () => {
const buf = Buffer.from("video");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "video/mp4",
kind: "video",
});
await sendMessageWeb("+1555", "clip", {
verbose: false,
mediaUrl: "/tmp/video.mp4",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
video: buf,
caption: "clip",
mimetype: "video/mp4",
});
});
it("maps image with caption", async () => {
const buf = Buffer.from("img");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "image/jpeg",
kind: "image",
});
await sendMessageWeb("+1555", "pic", {
verbose: false,
mediaUrl: "/tmp/pic.jpg",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
image: buf,
caption: "pic",
mimetype: "image/jpeg",
});
});
it("maps other kinds to document with filename", async () => {
const buf = Buffer.from("pdf");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "application/pdf",
kind: "document",
fileName: "file.pdf",
});
await sendMessageWeb("+1555", "doc", {
verbose: false,
mediaUrl: "/tmp/file.pdf",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
document: buf,
fileName: "file.pdf",
caption: "doc",
mimetype: "application/pdf",
});
});
});

View File

@@ -35,11 +35,39 @@ export async function sendMessageWeb(
let payload: AnyMessageContent = { text: body };
if (options.mediaUrl) {
const media = await loadWebMedia(options.mediaUrl);
payload = {
image: media.buffer,
caption: body || undefined,
mimetype: media.contentType,
};
const caption = body || undefined;
if (media.kind === "audio") {
// WhatsApp expects explicit opus codec for PTT voice notes.
const mimetype =
media.contentType === "audio/ogg"
? "audio/ogg; codecs=opus"
: media.contentType ?? "application/octet-stream";
payload = { audio: media.buffer, ptt: true, mimetype };
} else if (media.kind === "video") {
const mimetype = media.contentType ?? "application/octet-stream";
payload = {
video: media.buffer,
caption,
mimetype,
};
} else if (media.kind === "image") {
const mimetype = media.contentType ?? "application/octet-stream";
payload = {
image: media.buffer,
caption,
mimetype,
};
} else {
// Fallback to document for anything else (pdf, etc.).
const fileName = media.fileName ?? "file";
const mimetype = media.contentType ?? "application/octet-stream";
payload = {
document: media.buffer,
fileName,
caption,
mimetype,
};
}
}
logInfo(
`📤 Sending via web session -> ${jid}${options.mediaUrl ? " (media)" : ""}`,