diff --git a/docs/agent.md b/docs/agent.md index eacd853b5..c0eaec718 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -1,6 +1,6 @@ # Agent Abstraction Refactor Plan -Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact. +Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode, Gemini) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact. ## Overview - Introduce a pluggable agent layer (`src/agents/*`), selected by config. @@ -15,7 +15,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without reply: { mode: "command", agent: { - kind: "claude" | "opencode" | "pi" | "codex", + kind: "claude" | "opencode" | "pi" | "codex" | "gemini", format?: "text" | "json", identityPrefix?: string }, @@ -42,6 +42,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without - `src/agents/opencode.ts` – reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix. - `src/agents/pi.ts` – parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags. - `src/agents/codex.ts` – parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only. +- `src/agents/gemini.ts` – minimal parsing (plain text), identity prepend, honors `--output-format` when `format` is set, and defaults to `--resume {{SessionId}}` for session resume (new sessions need no flag). Override `sessionArgNew/sessionArgResume` if you use a different session strategy. - Shared MEDIA extraction stays in `media/parse.ts`. ## Command runner changes diff --git a/src/agents/agents.test.ts b/src/agents/agents.test.ts index 06e64037e..6f4605eb8 100644 --- a/src/agents/agents.test.ts +++ b/src/agents/agents.test.ts @@ -4,6 +4,7 @@ import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js"; import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js"; import { claudeSpec } from "./claude.js"; import { codexSpec } from "./codex.js"; +import { GEMINI_IDENTITY_PREFIX, geminiSpec } from "./gemini.js"; import { opencodeSpec } from "./opencode.js"; import { piSpec } from "./pi.js"; @@ -115,4 +116,33 @@ describe("agent buildArgs + parseOutput helpers", () => { expect(built).toContain("--skip-git-repo-check"); expect(built).toContain("read-only"); }); + + it("geminiSpec prepends identity unless already sent", () => { + const argv = ["gemini", "hi"]; + const built = geminiSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: true, + sessionId: "sess", + sendSystemOnce: false, + systemSent: false, + identityPrefix: undefined, + format: "json", + }); + expect(built.at(-1)).toContain(GEMINI_IDENTITY_PREFIX); + + const builtOnce = geminiSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: false, + sessionId: "sess", + sendSystemOnce: true, + systemSent: true, + identityPrefix: undefined, + format: "json", + }); + expect(builtOnce.at(-1)).toBe("hi"); + expect(builtOnce).toContain("--output-format"); + expect(builtOnce).toContain("json"); + }); }); diff --git a/src/agents/claude.ts b/src/agents/claude.ts index 261dfe4ce..e656acb5c 100644 --- a/src/agents/claude.ts +++ b/src/agents/claude.ts @@ -12,7 +12,16 @@ import type { AgentMeta, AgentSpec } from "./types.js"; function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined { if (!parsed?.parsed) return undefined; const summary = summarizeClaudeMetadata(parsed.parsed); - return summary ? { extra: { summary } } : undefined; + const sessionId = + parsed.parsed && + typeof parsed.parsed === "object" && + typeof (parsed.parsed as { session_id?: unknown }).session_id === "string" + ? (parsed.parsed as { session_id: string }).session_id + : undefined; + const meta: AgentMeta = {}; + if (sessionId) meta.sessionId = sessionId; + if (summary) meta.extra = { summary }; + return Object.keys(meta).length ? meta : undefined; } export const claudeSpec: AgentSpec = { diff --git a/src/agents/gemini.ts b/src/agents/gemini.ts new file mode 100644 index 000000000..91e2bf50a --- /dev/null +++ b/src/agents/gemini.ts @@ -0,0 +1,50 @@ +import path from "node:path"; + +import type { AgentMeta, AgentSpec } from "./types.js"; + +const GEMINI_BIN = "gemini"; +export const GEMINI_IDENTITY_PREFIX = + "You are Gemini responding for warelay. Keep WhatsApp replies concise (<1500 chars). If the prompt contains media paths or a Transcript block, use them. If this was a heartbeat probe and nothing needs attention, reply with exactly HEARTBEAT_OK."; + +// Gemini CLI currently prints plain text; --output json is flaky across versions, so we +// keep parsing minimal and let MEDIA token stripping happen later in the pipeline. +function parseGeminiOutput(raw: string): { text?: string; meta?: AgentMeta } { + const trimmed = raw.trim(); + return { text: trimmed || undefined, meta: undefined }; +} + +export const geminiSpec: AgentSpec = { + kind: "gemini", + isInvocation: (argv) => + argv.length > 0 && path.basename(argv[0]) === GEMINI_BIN, + buildArgs: (ctx) => { + const argv = [...ctx.argv]; + const body = argv[ctx.bodyIndex] ?? ""; + const beforeBody = argv.slice(0, ctx.bodyIndex); + const afterBody = argv.slice(ctx.bodyIndex + 1); + + if (ctx.format) { + const hasOutput = + beforeBody.some( + (p) => p === "--output-format" || p.startsWith("--output-format="), + ) || + afterBody.some( + (p) => p === "--output-format" || p.startsWith("--output-format="), + ); + if (!hasOutput) { + beforeBody.push("--output-format", ctx.format); + } + } + + const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent); + const bodyWithIdentity = + shouldPrependIdentity && body + ? [ctx.identityPrefix ?? GEMINI_IDENTITY_PREFIX, body] + .filter(Boolean) + .join("\n\n") + : body; + + return [...beforeBody, bodyWithIdentity, ...afterBody]; + }, + parseOutput: parseGeminiOutput, +}; diff --git a/src/agents/index.ts b/src/agents/index.ts index 231d8a3eb..e2b5a35e9 100644 --- a/src/agents/index.ts +++ b/src/agents/index.ts @@ -1,5 +1,6 @@ import { claudeSpec } from "./claude.js"; import { codexSpec } from "./codex.js"; +import { geminiSpec } from "./gemini.js"; import { opencodeSpec } from "./opencode.js"; import { piSpec } from "./pi.js"; import type { AgentKind, AgentSpec } from "./types.js"; @@ -7,6 +8,7 @@ import type { AgentKind, AgentSpec } from "./types.js"; const specs: Record = { claude: claudeSpec, codex: codexSpec, + gemini: geminiSpec, opencode: opencodeSpec, pi: piSpec, }; diff --git a/src/agents/pi.ts b/src/agents/pi.ts index c7359b98e..948afd837 100644 --- a/src/agents/pi.ts +++ b/src/agents/pi.ts @@ -47,7 +47,11 @@ function parsePiJson(raw: string): AgentParseResult { export const piSpec: AgentSpec = { kind: "pi", - isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi", + isInvocation: (argv) => { + if (argv.length === 0) return false; + const base = path.basename(argv[0]).replace(/\.(m?js)$/i, ""); + return base === "pi" || base === "tau"; + }, buildArgs: (ctx) => { const argv = [...ctx.argv]; // Non-interactive print + JSON diff --git a/src/agents/types.ts b/src/agents/types.ts index d430fb296..e704cb029 100644 --- a/src/agents/types.ts +++ b/src/agents/types.ts @@ -1,9 +1,10 @@ -export type AgentKind = "claude" | "opencode" | "pi" | "codex"; +export type AgentKind = "claude" | "opencode" | "pi" | "codex" | "gemini"; export type AgentMeta = { model?: string; provider?: string; stopReason?: string; + sessionId?: string; usage?: { input?: number; output?: number; diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index eb1838be0..f79ff45e2 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -6,9 +6,11 @@ import type { AgentMeta } from "../agents/types.js"; import type { WarelayConfig } from "../config/config.js"; import { isVerbose, logVerbose } from "../globals.js"; import { logError } from "../logger.js"; +import { getChildLogger } from "../logging.js"; import { splitMediaFromOutput } from "../media/parse.js"; import { enqueueCommand } from "../process/command-queue.js"; import type { runCommandWithTimeout } from "../process/exec.js"; +import { runPiRpc } from "../process/tau-rpc.js"; import { applyTemplate, type TemplateContext } from "./templating.js"; import type { ReplyPayload } from "./types.js"; @@ -99,6 +101,12 @@ export function summarizeClaudeMetadata(payload: unknown): string | undefined { export async function runCommandReply( params: CommandReplyParams, ): Promise { + const logger = getChildLogger({ module: "command-reply" }); + const verboseLog = (msg: string) => { + logger.debug(msg); + if (isVerbose()) logVerbose(msg); + }; + const { reply, templatingCtx, @@ -133,14 +141,25 @@ export async function runCommandReply( // Session args prepared (templated) and injected generically if (reply.session) { - const defaultNew = - agentCfg.kind === "claude" - ? ["--session-id", "{{SessionId}}"] - : ["--session", "{{SessionId}}"]; - const defaultResume = - agentCfg.kind === "claude" - ? ["--resume", "{{SessionId}}"] - : ["--session", "{{SessionId}}"]; + const defaultSessionArgs = (() => { + switch (agentCfg.kind) { + case "claude": + return { + newArgs: ["--session-id", "{{SessionId}}"], + resumeArgs: ["--resume", "{{SessionId}}"], + }; + case "gemini": + // Gemini CLI supports --resume ; starting a new session needs no flag. + return { newArgs: [], resumeArgs: ["--resume", "{{SessionId}}"] }; + default: + return { + newArgs: ["--session", "{{SessionId}}"], + resumeArgs: ["--session", "{{SessionId}}"], + }; + } + })(); + const defaultNew = defaultSessionArgs.newArgs; + const defaultResume = defaultSessionArgs.resumeArgs; const sessionArgList = ( isNewSession ? (reply.session.sessionArgNew ?? defaultNew) @@ -170,7 +189,7 @@ export async function runCommandReply( systemSent, identityPrefix: agentCfg.identityPrefix, format: agentCfg.format, - }) + }) : argv; logVerbose( @@ -181,20 +200,41 @@ export async function runCommandReply( let queuedMs: number | undefined; let queuedAhead: number | undefined; try { - const { stdout, stderr, code, signal, killed } = await enqueue( - () => commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd }), - { - onWait: (waitMs, ahead) => { - queuedMs = waitMs; - queuedAhead = ahead; - if (isVerbose()) { - logVerbose( - `Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`, - ); + const run = async () => { + // Prefer long-lived tau RPC for pi agent to avoid cold starts. + if (agentKind === "pi") { + const body = finalArgv[bodyIndex] ?? ""; + // Build rpc args without the prompt body; force --mode rpc. + const rpcArgv = (() => { + const copy = [...finalArgv]; + copy.splice(bodyIndex, 1); + const modeIdx = copy.findIndex((a) => a === "--mode"); + if (modeIdx >= 0 && copy[modeIdx + 1]) { + copy.splice(modeIdx, 2, "--mode", "rpc"); + } else if (!copy.includes("--mode")) { + copy.splice(copy.length - 1, 0, "--mode", "rpc"); } - }, + return copy; + })(); + return await runPiRpc({ + argv: rpcArgv, + cwd: reply.cwd, + prompt: body, + timeoutMs, + }); + } + return await commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd }); + }; + + const { stdout, stderr, code, signal, killed } = await enqueue(run, { + onWait: (waitMs, ahead) => { + queuedMs = waitMs; + queuedAhead = ahead; + if (isVerbose()) { + logVerbose(`Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`); + } }, - ); + }); const rawStdout = stdout.trim(); let mediaFromCommand: string[] | undefined; let trimmed = rawStdout; @@ -214,17 +254,19 @@ export async function runCommandReply( trimmed = cleanedText; if (mediaFound?.length) { mediaFromCommand = mediaFound; - if (isVerbose()) logVerbose(`MEDIA token extracted: ${mediaFound}`); - } else if (isVerbose()) { - logVerbose("No MEDIA token extracted from final text"); + verboseLog(`MEDIA token extracted: ${mediaFound}`); + } else { + verboseLog("No MEDIA token extracted from final text"); } if (!trimmed && !mediaFromCommand) { const meta = parsed?.meta?.extra?.summary ?? undefined; trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`; - logVerbose("No text/media produced; injecting fallback notice to user"); + verboseLog("No text/media produced; injecting fallback notice to user"); } - logVerbose(`Command auto-reply stdout (trimmed): ${trimmed || ""}`); - logVerbose(`Command auto-reply finished in ${Date.now() - started}ms`); + verboseLog(`Command auto-reply stdout (trimmed): ${trimmed || ""}`); + const elapsed = Date.now() - started; + verboseLog(`Command auto-reply finished in ${elapsed}ms`); + logger.info({ durationMs: elapsed, agent: agentKind, cwd: reply.cwd }, "command auto-reply finished"); if ((code ?? 0) !== 0) { console.error( `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, @@ -311,17 +353,16 @@ export async function runCommandReply( killed, agentMeta: parsed?.meta, }; - if (isVerbose()) { - logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`); - } + verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`); return { payload, meta }; } catch (err) { const elapsed = Date.now() - started; + logger.info({ durationMs: elapsed, agent: agentKind, cwd: reply.cwd }, "command auto-reply failed"); const anyErr = err as { killed?: boolean; signal?: string }; const timeoutHit = anyErr.killed === true || anyErr.signal === "SIGKILL"; const errorObj = err as { stdout?: string; stderr?: string }; if (errorObj.stderr?.trim()) { - logVerbose(`Command auto-reply stderr: ${errorObj.stderr.trim()}`); + verboseLog(`Command auto-reply stderr: ${errorObj.stderr.trim()}`); } if (timeoutHit) { console.error( diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index e59f2c683..d1d930885 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -8,6 +8,7 @@ import { deriveSessionKey, loadSessionStore, resolveStorePath, + type SessionEntry, saveSessionStore, } from "../config/sessions.js"; import { info, isVerbose, logVerbose } from "../globals.js"; @@ -27,6 +28,15 @@ import type { GetReplyOptions, ReplyPayload } from "./types.js"; export type { GetReplyOptions, ReplyPayload } from "./types.js"; +const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]); +const ABORT_MEMORY = new Map(); + +function isAbortTrigger(text?: string): boolean { + if (!text) return false; + const normalized = text.trim().toLowerCase(); + return ABORT_TRIGGERS.has(normalized); +} + export async function getReplyFromConfig( ctx: MsgContext, opts?: GetReplyOptions, @@ -95,11 +105,13 @@ export async function getReplyFromConfig( const storePath = resolveStorePath(sessionCfg?.store); let sessionStore: ReturnType | undefined; let sessionKey: string | undefined; + let sessionEntry: SessionEntry | undefined; let sessionId: string | undefined; let isNewSession = false; let bodyStripped: string | undefined; let systemSent = false; + let abortedLastRun = false; if (sessionCfg) { const trimmedBody = (ctx.Body ?? "").trim(); @@ -127,13 +139,21 @@ export async function getReplyFromConfig( if (!isNewSession && freshEntry) { sessionId = entry.sessionId; systemSent = entry.systemSent ?? false; + abortedLastRun = entry.abortedLastRun ?? false; } else { sessionId = crypto.randomUUID(); isNewSession = true; systemSent = false; + abortedLastRun = false; } - sessionStore[sessionKey] = { sessionId, updatedAt: Date.now(), systemSent }; + sessionEntry = { + sessionId, + updatedAt: Date.now(), + systemSent, + abortedLastRun, + }; + sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); } @@ -149,6 +169,11 @@ export async function getReplyFromConfig( const from = (ctx.From ?? "").replace(/^whatsapp:/, ""); const to = (ctx.To ?? "").replace(/^whatsapp:/, ""); const isSamePhone = from && to && from === to; + const abortKey = sessionKey ?? (from || undefined) ?? (to || undefined); + + if (!sessionEntry && abortKey) { + abortedLastRun = ABORT_MEMORY.get(abortKey) ?? false; + } // Same-phone mode (self-messaging) is always allowed if (isSamePhone) { @@ -164,6 +189,23 @@ export async function getReplyFromConfig( } } + const abortRequested = + reply?.mode === "command" && + isAbortTrigger((sessionCtx.BodyStripped ?? sessionCtx.Body ?? "").trim()); + + if (abortRequested) { + if (sessionEntry && sessionStore && sessionKey) { + sessionEntry.abortedLastRun = true; + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + } else if (abortKey) { + ABORT_MEMORY.set(abortKey, true); + } + cleanupTyping(); + return { text: "Agent was aborted." }; + } + await startTypingLoop(); // Optional prefix injected before Body for templating/command prompts. @@ -177,16 +219,30 @@ export async function getReplyFromConfig( ? applyTemplate(reply.bodyPrefix, sessionCtx) : ""; const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; - const prefixedBodyBase = (() => { - let body = baseBody; - if (!sendSystemOnce || isFirstTurnInSession) { - body = bodyPrefix ? `${bodyPrefix}${body}` : body; + const abortedHint = + reply?.mode === "command" && abortedLastRun + ? "Note: The previous agent run was aborted by the user. Resume carefully or ask for clarification." + : ""; + let prefixedBodyBase = baseBody; + if (!sendSystemOnce || isFirstTurnInSession) { + prefixedBodyBase = bodyPrefix + ? `${bodyPrefix}${prefixedBodyBase}` + : prefixedBodyBase; + } + if (sessionIntro) { + prefixedBodyBase = `${sessionIntro}\n\n${prefixedBodyBase}`; + } + if (abortedHint) { + prefixedBodyBase = `${abortedHint}\n\n${prefixedBodyBase}`; + if (sessionEntry && sessionStore && sessionKey) { + sessionEntry.abortedLastRun = false; + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + } else if (abortKey) { + ABORT_MEMORY.set(abortKey, false); } - if (sessionIntro) { - body = `${sessionIntro}\n\n${body}`; - } - return body; - })(); + } if ( sessionCfg && sendSystemOnce && @@ -194,12 +250,18 @@ export async function getReplyFromConfig( sessionStore && sessionKey ) { - sessionStore[sessionKey] = { - ...(sessionStore[sessionKey] ?? {}), - sessionId: sessionId ?? crypto.randomUUID(), + const current = sessionEntry ?? + sessionStore[sessionKey] ?? { + sessionId: sessionId ?? crypto.randomUUID(), + updatedAt: Date.now(), + }; + sessionEntry = { + ...current, + sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(), updatedAt: Date.now(), systemSent: true, }; + sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); systemSent = true; } @@ -265,6 +327,31 @@ export async function getReplyFromConfig( timeoutSeconds, commandRunner, }); + if (sessionCfg && sessionStore && sessionKey) { + const returnedSessionId = meta.agentMeta?.sessionId; + if (returnedSessionId && returnedSessionId !== sessionId) { + const entry = sessionEntry ?? + sessionStore[sessionKey] ?? { + sessionId: returnedSessionId, + updatedAt: Date.now(), + systemSent, + abortedLastRun, + }; + sessionEntry = { + ...entry, + sessionId: returnedSessionId, + updatedAt: Date.now(), + }; + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + sessionId = returnedSessionId; + if (isVerbose()) { + logVerbose( + `Session id updated from agent meta: ${returnedSessionId} (store: ${storePath})`, + ); + } + } + } if (meta.agentMeta && isVerbose()) { logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`); } diff --git a/src/config/config.ts b/src/config/config.ts index ccb7c06b1..74c6efa6d 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -117,6 +117,7 @@ const ReplySchema = z z.literal("opencode"), z.literal("pi"), z.literal("codex"), + z.literal("gemini"), ]), format: z.union([z.literal("text"), z.literal("json")]).optional(), identityPrefix: z.string().optional(), diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 9c5d10096..eba690e69 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -12,6 +12,7 @@ export type SessionEntry = { sessionId: string; updatedAt: number; systemSent?: boolean; + abortedLastRun?: boolean; }; export const SESSION_STORE_DEFAULT = path.join(CONFIG_DIR, "sessions.json"); diff --git a/src/index.core.test.ts b/src/index.core.test.ts index f213719d4..8e56d6309 100644 --- a/src/index.core.test.ts +++ b/src/index.core.test.ts @@ -671,6 +671,124 @@ describe("config and templating", () => { expect(secondArgv[secondArgv.length - 1]).toBe("[sys] next"); }); + it("stores session id returned by agent meta when it differs", async () => { + const tmpStore = path.join( + os.tmpdir(), + `warelay-store-${Date.now()}-sessionid.json`, + ); + vi.spyOn(crypto, "randomUUID").mockReturnValue("initial-sid"); + const runSpy = vi.spyOn(index, "runCommandWithTimeout").mockResolvedValue({ + stdout: '{"text":"hi","session_id":"agent-sid-123"}\n', + stderr: "", + code: 0, + signal: null, + killed: false, + }); + const cfg = { + inbound: { + reply: { + mode: "command" as const, + command: ["claude", "{{Body}}"], + agent: { kind: "claude", format: "json" as const }, + session: { store: tmpStore }, + }, + }, + }; + + await index.getReplyFromConfig( + { Body: "/new hi", From: "+1", To: "+2" }, + undefined, + cfg, + runSpy, + ); + + const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8")); + const entry = Object.values(persisted)[0] as { sessionId?: string }; + expect(entry.sessionId).toBe("agent-sid-123"); + }); + + it("aborts command when stop word is received and skips command runner", async () => { + const tmpStore = path.join( + os.tmpdir(), + `warelay-store-${Date.now()}-abort.json`, + ); + const runSpy = vi.fn().mockResolvedValue({ + stdout: "should-not-run", + stderr: "", + code: 0, + signal: null, + killed: false, + }); + const cfg = { + inbound: { + reply: { + mode: "command" as const, + command: ["echo", "{{Body}}"], + session: { store: tmpStore }, + }, + }, + }; + + const result = await index.getReplyFromConfig( + { Body: "stop", From: "+1", To: "+2" }, + undefined, + cfg, + runSpy, + ); + + expect(result?.text).toMatch(/aborted/i); + expect(runSpy).not.toHaveBeenCalled(); + const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8")); + const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean }; + expect(entry.abortedLastRun).toBe(true); + }); + + it("adds an abort hint to the next prompt and then clears the flag", async () => { + const tmpStore = path.join( + os.tmpdir(), + `warelay-store-${Date.now()}-aborthint.json`, + ); + const runSpy = vi.fn().mockResolvedValue({ + stdout: "ok\n", + stderr: "", + code: 0, + signal: null, + killed: false, + }); + const cfg = { + inbound: { + reply: { + mode: "command" as const, + command: ["echo", "{{Body}}"], + session: { store: tmpStore }, + }, + }, + }; + + await index.getReplyFromConfig( + { Body: "abort", From: "+1555", To: "+2666" }, + undefined, + cfg, + runSpy, + ); + + const result = await index.getReplyFromConfig( + { Body: "continue", From: "+1555", To: "+2666" }, + undefined, + cfg, + runSpy, + ); + + const argv = runSpy.mock.calls[0][0]; + const prompt = argv.at(-1) as string; + expect(prompt).toMatch(/previous agent run was aborted/i); + expect(prompt).toMatch(/continue/); + const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8")); + const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean }; + expect(entry.abortedLastRun).toBe(false); + expect(result?.text).toBe("ok"); + }); + it("refreshes typing indicator while command runs", async () => { const onReplyStart = vi.fn(); const runSpy = vi.spyOn(index, "runCommandWithTimeout").mockImplementation( diff --git a/src/process/tau-rpc.ts b/src/process/tau-rpc.ts new file mode 100644 index 000000000..9d28e7ed5 --- /dev/null +++ b/src/process/tau-rpc.ts @@ -0,0 +1,116 @@ +import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; +import readline from "node:readline"; + +type TauRpcOptions = { + argv: string[]; + cwd?: string; + timeoutMs: number; +}; + +type TauRpcResult = { stdout: string; stderr: string; code: number }; + +class TauRpcClient { + private child: ChildProcessWithoutNullStreams | null = null; + private rl: readline.Interface | null = null; + private stderr = ""; + private buffer: string[] = []; + private pending: + | { + resolve: (r: TauRpcResult) => void; + reject: (err: unknown) => void; + timer: NodeJS.Timeout; + } + | undefined; + + constructor(private readonly argv: string[], private readonly cwd: string | undefined) {} + + private ensureChild() { + if (this.child) return; + this.child = spawn(this.argv[0], this.argv.slice(1), { + cwd: this.cwd, + stdio: ["pipe", "pipe", "pipe"], + }); + this.rl = readline.createInterface({ input: this.child.stdout }); + this.rl.on("line", (line) => this.handleLine(line)); + this.child.stderr.on("data", (d) => { + this.stderr += d.toString(); + }); + this.child.on("exit", (code, signal) => { + if (this.pending) { + this.pending.reject(new Error(`tau rpc exited (code=${code}, signal=${signal})`)); + clearTimeout(this.pending.timer); + this.pending = undefined; + } + this.dispose(); + }); + } + + private handleLine(line: string) { + if (!this.pending) return; + this.buffer.push(line); + // Finish on assistant message_end event to mirror parse logic in piSpec + if (line.includes('"type":"message_end"') && line.includes('"role":"assistant"')) { + const out = this.buffer.join("\n"); + clearTimeout(this.pending.timer); + const pending = this.pending; + this.pending = undefined; + this.buffer = []; + pending.resolve({ stdout: out, stderr: this.stderr, code: 0 }); + } + } + + async prompt(prompt: string, timeoutMs: number): Promise { + this.ensureChild(); + if (this.pending) { + throw new Error("tau rpc already handling a request"); + } + const child = this.child!; + await new Promise((resolve, reject) => { + const ok = child.stdin.write( + JSON.stringify({ + type: "prompt", + message: { role: "user", content: [{ type: "text", text: prompt }] }, + }) + "\n", + (err) => (err ? reject(err) : resolve()), + ); + if (!ok) child.stdin.once("drain", () => resolve()); + }); + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending = undefined; + reject(new Error(`tau rpc timed out after ${timeoutMs}ms`)); + child.kill("SIGKILL"); + }, timeoutMs); + this.pending = { resolve, reject, timer }; + }); + } + + dispose() { + this.rl?.close(); + this.rl = null; + if (this.child && !this.child.killed) { + this.child.kill("SIGKILL"); + } + this.child = null; + this.buffer = []; + this.stderr = ""; + } +} + +let singleton: { key: string; client: TauRpcClient } | undefined; + +export async function runPiRpc( + opts: TauRpcOptions & { prompt: string }, +): Promise { + const key = `${opts.cwd ?? ""}|${opts.argv.join(" ")}`; + if (!singleton || singleton.key !== key) { + singleton?.client.dispose(); + singleton = { key, client: new TauRpcClient(opts.argv, opts.cwd) }; + } + return singleton.client.prompt(opts.prompt, opts.timeoutMs); +} + +export function resetPiRpc() { + singleton?.client.dispose(); + singleton = undefined; +} diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 9b6e9e917..2cf9722c8 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -12,7 +12,7 @@ import { import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; -import { enqueueCommand, getQueueSize } from "../process/command-queue.js"; +import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; @@ -621,21 +621,19 @@ export async function monitorWebProvider( : new Date().toISOString(); console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`); - const replyResult = await enqueueCommand(() => - (replyResolver ?? getReplyFromConfig)( - { - Body: combinedBody, - From: latest.from, - To: latest.to, - MessageSid: latest.id, - MediaPath: latest.mediaPath, - MediaUrl: latest.mediaUrl, - MediaType: latest.mediaType, - }, - { - onReplyStart: latest.sendComposing, - }, - ), + const replyResult = await (replyResolver ?? getReplyFromConfig)( + { + Body: combinedBody, + From: latest.from, + To: latest.to, + MessageSid: latest.id, + MediaPath: latest.mediaPath, + MediaUrl: latest.mediaUrl, + MediaType: latest.mediaType, + }, + { + onReplyStart: latest.sendComposing, + }, ); if ( @@ -931,24 +929,19 @@ export async function monitorWebProvider( "reply heartbeat start", ); } - const hbFrom = lastInboundMsg.from; - const hbTo = lastInboundMsg.to; - const hbComposing = lastInboundMsg.sendComposing; - const replyResult = await enqueueCommand(() => - (replyResolver ?? getReplyFromConfig)( - { - Body: HEARTBEAT_PROMPT, - From: hbFrom, - To: hbTo, - MessageSid: snapshot.entry?.sessionId, - MediaPath: undefined, - MediaUrl: undefined, - MediaType: undefined, - }, - { - onReplyStart: hbComposing, - }, - ), + const replyResult = await (replyResolver ?? getReplyFromConfig)( + { + Body: HEARTBEAT_PROMPT, + From: lastInboundMsg.from, + To: lastInboundMsg.to, + MessageSid: snapshot.entry?.sessionId, + MediaPath: undefined, + MediaUrl: undefined, + MediaType: undefined, + }, + { + onReplyStart: lastInboundMsg.sendComposing, + }, ); if ( diff --git a/src/web/media.ts b/src/web/media.ts index 76c0e8605..83515704d 100644 --- a/src/web/media.ts +++ b/src/web/media.ts @@ -1,4 +1,5 @@ import fs from "node:fs/promises"; +import path from "node:path"; import sharp from "sharp"; import { isVerbose, logVerbose } from "../globals.js"; @@ -12,7 +13,12 @@ import { detectMime } from "../media/mime.js"; export async function loadWebMedia( mediaUrl: string, maxBytes?: number, -): Promise<{ buffer: Buffer; contentType?: string; kind: MediaKind }> { +): Promise<{ + buffer: Buffer; + contentType?: string; + kind: MediaKind; + fileName?: string; +}> { if (mediaUrl.startsWith("file://")) { mediaUrl = mediaUrl.replace("file://", ""); } @@ -40,6 +46,14 @@ export async function loadWebMedia( }; if (/^https?:\/\//i.test(mediaUrl)) { + let fileName: string | undefined; + try { + const url = new URL(mediaUrl); + const base = path.basename(url.pathname); + fileName = base || undefined; + } catch { + // ignore parse errors; leave undefined + } const res = await fetch(mediaUrl); if (!res.ok || !res.body) { throw new Error(`Failed to fetch media: HTTP ${res.status}`); @@ -56,7 +70,7 @@ export async function loadWebMedia( maxBytesForKind(kind), ); if (kind === "image") { - return optimizeAndClampImage(array, cap); + return { ...(await optimizeAndClampImage(array, cap)), fileName }; } if (array.length > cap) { throw new Error( @@ -65,19 +79,25 @@ export async function loadWebMedia( ).toFixed(2)}MB)`, ); } - return { buffer: array, contentType: contentType ?? undefined, kind }; + return { + buffer: array, + contentType: contentType ?? undefined, + kind, + fileName, + }; } // Local path const data = await fs.readFile(mediaUrl); const mime = detectMime({ buffer: data, filePath: mediaUrl }); const kind = mediaKindFromMime(mime); + const fileName = path.basename(mediaUrl) || undefined; const cap = Math.min( maxBytes ?? maxBytesForKind(kind), maxBytesForKind(kind), ); if (kind === "image") { - return optimizeAndClampImage(data, cap); + return { ...(await optimizeAndClampImage(data, cap)), fileName }; } if (data.length > cap) { throw new Error( @@ -86,7 +106,7 @@ export async function loadWebMedia( ).toFixed(2)}MB)`, ); } - return { buffer: data, contentType: mime, kind }; + return { buffer: data, contentType: mime, kind, fileName }; } export async function optimizeImageToJpeg( diff --git a/src/web/outbound.test.ts b/src/web/outbound.test.ts index 2e4abecd2..6b8132358 100644 --- a/src/web/outbound.test.ts +++ b/src/web/outbound.test.ts @@ -1,3 +1,4 @@ +import type { AnyMessageContent } from "@whiskeysockets/baileys"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { resetLogger, setLoggerOverride } from "../logging.js"; @@ -17,6 +18,11 @@ vi.mock("./session.js", () => { }; }); +const loadWebMediaMock = vi.fn(); +vi.mock("./media.js", () => ({ + loadWebMedia: (...args: unknown[]) => loadWebMediaMock(...args), +})); + import { sendMessageWeb } from "./outbound.js"; const { createWaSocket } = await import("./session.js"); @@ -37,4 +43,98 @@ describe("web outbound", () => { expect(sock.sendMessage).toHaveBeenCalled(); expect(sock.ws.close).toHaveBeenCalled(); }); + + it("maps audio to PTT with opus mime when ogg", async () => { + const buf = Buffer.from("audio"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "audio/ogg", + kind: "audio", + }); + await sendMessageWeb("+1555", "voice note", { + verbose: false, + mediaUrl: "/tmp/voice.ogg", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + audio: buf, + ptt: true, + mimetype: "audio/ogg; codecs=opus", + }); + }); + + it("maps video with caption", async () => { + const buf = Buffer.from("video"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "video/mp4", + kind: "video", + }); + await sendMessageWeb("+1555", "clip", { + verbose: false, + mediaUrl: "/tmp/video.mp4", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + video: buf, + caption: "clip", + mimetype: "video/mp4", + }); + }); + + it("maps image with caption", async () => { + const buf = Buffer.from("img"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "image/jpeg", + kind: "image", + }); + await sendMessageWeb("+1555", "pic", { + verbose: false, + mediaUrl: "/tmp/pic.jpg", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + image: buf, + caption: "pic", + mimetype: "image/jpeg", + }); + }); + + it("maps other kinds to document with filename", async () => { + const buf = Buffer.from("pdf"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "application/pdf", + kind: "document", + fileName: "file.pdf", + }); + await sendMessageWeb("+1555", "doc", { + verbose: false, + mediaUrl: "/tmp/file.pdf", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + document: buf, + fileName: "file.pdf", + caption: "doc", + mimetype: "application/pdf", + }); + }); }); diff --git a/src/web/outbound.ts b/src/web/outbound.ts index ac17ea5b9..f1d0af851 100644 --- a/src/web/outbound.ts +++ b/src/web/outbound.ts @@ -35,11 +35,39 @@ export async function sendMessageWeb( let payload: AnyMessageContent = { text: body }; if (options.mediaUrl) { const media = await loadWebMedia(options.mediaUrl); - payload = { - image: media.buffer, - caption: body || undefined, - mimetype: media.contentType, - }; + const caption = body || undefined; + if (media.kind === "audio") { + // WhatsApp expects explicit opus codec for PTT voice notes. + const mimetype = + media.contentType === "audio/ogg" + ? "audio/ogg; codecs=opus" + : media.contentType ?? "application/octet-stream"; + payload = { audio: media.buffer, ptt: true, mimetype }; + } else if (media.kind === "video") { + const mimetype = media.contentType ?? "application/octet-stream"; + payload = { + video: media.buffer, + caption, + mimetype, + }; + } else if (media.kind === "image") { + const mimetype = media.contentType ?? "application/octet-stream"; + payload = { + image: media.buffer, + caption, + mimetype, + }; + } else { + // Fallback to document for anything else (pdf, etc.). + const fileName = media.fileName ?? "file"; + const mimetype = media.contentType ?? "application/octet-stream"; + payload = { + document: media.buffer, + fileName, + caption, + mimetype, + }; + } } logInfo( `📤 Sending via web session -> ${jid}${options.mediaUrl ? " (media)" : ""}`,