diff --git a/src/auto-reply/claude.ts b/src/auto-reply/claude.ts new file mode 100644 index 000000000..83a8aaa95 --- /dev/null +++ b/src/auto-reply/claude.ts @@ -0,0 +1,60 @@ +// Helpers specific to Claude CLI output/argv handling. + +export const CLAUDE_BIN = "claude"; + +function extractClaudeText(payload: unknown): string | undefined { + // Best-effort walker to find the primary text field in Claude JSON outputs. + if (payload == null) return undefined; + if (typeof payload === "string") return payload; + if (Array.isArray(payload)) { + for (const item of payload) { + const found = extractClaudeText(item); + if (found) return found; + } + return undefined; + } + if (typeof payload === "object") { + const obj = payload as Record; + if (typeof obj.text === "string") return obj.text; + if (typeof obj.completion === "string") return obj.completion; + if (typeof obj.output === "string") return obj.output; + if (obj.message) { + const inner = extractClaudeText(obj.message); + if (inner) return inner; + } + if (Array.isArray(obj.messages)) { + const inner = extractClaudeText(obj.messages); + if (inner) return inner; + } + if (Array.isArray(obj.content)) { + for (const block of obj.content) { + if ( + block && + typeof block === "object" && + (block as { type?: string }).type === "text" && + typeof (block as { text?: unknown }).text === "string" + ) { + return (block as { text: string }).text; + } + const inner = extractClaudeText(block); + if (inner) return inner; + } + } + } + return undefined; +} + +export function parseClaudeJsonText(raw: string): string | undefined { + // Handle a single JSON blob or newline-delimited JSON; return the first extracted text. + const candidates = [raw, ...raw.split(/\n+/).map((s) => s.trim()).filter(Boolean)]; + for (const candidate of candidates) { + try { + const parsed = JSON.parse(candidate); + const text = extractClaudeText(parsed); + if (text) return text; + } catch { + // ignore parse errors; try next candidate + } + } + return undefined; +} diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts new file mode 100644 index 000000000..3f72f99a6 --- /dev/null +++ b/src/auto-reply/reply.ts @@ -0,0 +1,357 @@ +import crypto from "node:crypto"; +import path from "node:path"; + +import { CLAUDE_BIN, parseClaudeJsonText } from "./claude.js"; +import { + applyTemplate, + type MsgContext, + type TemplateContext, +} from "./templating.js"; +import { + DEFAULT_IDLE_MINUTES, + DEFAULT_RESET_TRIGGER, + deriveSessionKey, + loadSessionStore, + resolveStorePath, + saveSessionStore, +} from "../config/sessions.js"; +import { + type WarelayConfig, + loadConfig, +} from "../config/config.js"; +import { + danger, + info, + isVerbose, + logVerbose, + warn, +} from "../globals.js"; +import { normalizeE164, withWhatsAppPrefix } from "../utils.js"; +import { + runCommandWithTimeout, + type SpawnResult, +} from "../process/exec.js"; +import { sendTypingIndicator } from "../twilio/typing.js"; +import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js"; + +type GetReplyOptions = { + onReplyStart?: () => Promise | void; +}; + +export async function getReplyFromConfig( + ctx: MsgContext, + opts?: GetReplyOptions, + configOverride?: WarelayConfig, + commandRunner: typeof runCommandWithTimeout = runCommandWithTimeout, +): Promise { + // Choose reply from config: static text or external command stdout. + const cfg = configOverride ?? loadConfig(); + const reply = cfg.inbound?.reply; + const timeoutSeconds = Math.max(reply?.timeoutSeconds ?? 600, 1); + const timeoutMs = timeoutSeconds * 1000; + let started = false; + const onReplyStart = async () => { + if (started) return; + started = true; + await opts?.onReplyStart?.(); + }; + + // Optional session handling (conversation reuse + /new resets) + const sessionCfg = reply?.session; + const resetTriggers = sessionCfg?.resetTriggers?.length + ? sessionCfg.resetTriggers + : [DEFAULT_RESET_TRIGGER]; + const idleMinutes = Math.max( + sessionCfg?.idleMinutes ?? DEFAULT_IDLE_MINUTES, + 1, + ); + const sessionScope = sessionCfg?.scope ?? "per-sender"; + const storePath = resolveStorePath(sessionCfg?.store); + + let sessionId: string | undefined; + let isNewSession = false; + let bodyStripped: string | undefined; + + if (sessionCfg) { + const trimmedBody = (ctx.Body ?? "").trim(); + for (const trigger of resetTriggers) { + if (!trigger) continue; + if (trimmedBody === trigger) { + isNewSession = true; + bodyStripped = ""; + break; + } + const triggerPrefix = `${trigger} `; + if (trimmedBody.startsWith(triggerPrefix)) { + isNewSession = true; + bodyStripped = trimmedBody.slice(trigger.length).trimStart(); + break; + } + } + + const sessionKey = deriveSessionKey(sessionScope, ctx); + const store = loadSessionStore(storePath); + const entry = store[sessionKey]; + const idleMs = idleMinutes * 60_000; + const freshEntry = entry && Date.now() - entry.updatedAt <= idleMs; + + if (!isNewSession && freshEntry) { + sessionId = entry.sessionId; + } else { + sessionId = crypto.randomUUID(); + isNewSession = true; + } + + store[sessionKey] = { sessionId, updatedAt: Date.now() }; + await saveSessionStore(storePath, store); + } + + const sessionCtx: TemplateContext = { + ...ctx, + BodyStripped: bodyStripped ?? ctx.Body, + SessionId: sessionId, + IsNewSession: isNewSession ? "true" : "false", + }; + + // Optional prefix injected before Body for templating/command prompts. + const bodyPrefix = reply?.bodyPrefix + ? applyTemplate(reply.bodyPrefix, sessionCtx) + : ""; + const prefixedBody = bodyPrefix + ? `${bodyPrefix}${sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""}` + : (sessionCtx.BodyStripped ?? sessionCtx.Body); + const templatingCtx: TemplateContext = { + ...sessionCtx, + Body: prefixedBody, + BodyStripped: prefixedBody, + }; + + // Optional allowlist by origin number (E.164 without whatsapp: prefix) + const allowFrom = cfg.inbound?.allowFrom; + if (Array.isArray(allowFrom) && allowFrom.length > 0) { + const from = (ctx.From ?? "").replace(/^whatsapp:/, ""); + if (!allowFrom.includes(from)) { + logVerbose( + `Skipping auto-reply: sender ${from || ""} not in allowFrom list`, + ); + return undefined; + } + } + if (!reply) { + logVerbose("No inbound.reply configured; skipping auto-reply"); + return undefined; + } + + if (reply.mode === "text" && reply.text) { + await onReplyStart(); + logVerbose("Using text auto-reply from config"); + return applyTemplate(reply.text, templatingCtx); + } + + if (reply.mode === "command" && reply.command?.length) { + await onReplyStart(); + let argv = reply.command.map((part) => applyTemplate(part, templatingCtx)); + const templatePrefix = reply.template + ? applyTemplate(reply.template, templatingCtx) + : ""; + if (templatePrefix && argv.length > 0) { + argv = [argv[0], templatePrefix, ...argv.slice(1)]; + } + + // Ensure Claude commands can emit plain text by forcing --output-format when configured. + // We inject the flags only when the user points at the `claude` binary and has opted in via config, + // so existing custom argv or non-Claude commands remain untouched. + if ( + reply.claudeOutputFormat && + argv.length > 0 && + path.basename(argv[0]) === CLAUDE_BIN + ) { + const hasOutputFormat = argv.some( + (part) => + part === "--output-format" || part.startsWith("--output-format="), + ); + // Keep the final argument as the prompt/body; insert options just before it. + const insertBeforeBody = Math.max(argv.length - 1, 0); + if (!hasOutputFormat) { + argv = [ + ...argv.slice(0, insertBeforeBody), + "--output-format", + reply.claudeOutputFormat, + ...argv.slice(insertBeforeBody), + ]; + } + const hasPrintFlag = argv.some( + (part) => part === "-p" || part === "--print", + ); + if (!hasPrintFlag) { + const insertIdx = Math.max(argv.length - 1, 0); + argv = [...argv.slice(0, insertIdx), "-p", ...argv.slice(insertIdx)]; + } + } + + // Inject session args if configured (use resume for existing, session-id for new) + if (reply.session) { + const sessionArgList = ( + isNewSession + ? (reply.session.sessionArgNew ?? ["--session-id", "{{SessionId}}"]) + : (reply.session.sessionArgResume ?? ["--resume", "{{SessionId}}"]) + ).map((part) => applyTemplate(part, templatingCtx)); + if (sessionArgList.length) { + const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true; + const insertAt = + insertBeforeBody && argv.length > 1 ? argv.length - 1 : argv.length; + argv = [ + ...argv.slice(0, insertAt), + ...sessionArgList, + ...argv.slice(insertAt), + ]; + } + } + const finalArgv = argv; + logVerbose(`Running command auto-reply: ${finalArgv.join(" ")}`); + const started = Date.now(); + try { + const { stdout, stderr, code, signal, killed } = await commandRunner( + finalArgv, + timeoutMs, + ); + let trimmed = stdout.trim(); + if (stderr?.trim()) { + logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); + } + if (reply.claudeOutputFormat === "json" && trimmed) { + // Claude JSON mode: extract the human text for both logging and reply. + const extracted = parseClaudeJsonText(trimmed); + if (extracted) { + logVerbose( + `Claude JSON parsed -> ${extracted.slice(0, 120)}${extracted.length > 120 ? "…" : ""}`, + ); + trimmed = extracted.trim(); + } else { + logVerbose("Claude JSON parse failed; returning raw stdout"); + } + } + logVerbose( + `Command auto-reply stdout (trimmed): ${trimmed || ""}`, + ); + logVerbose(`Command auto-reply finished in ${Date.now() - started}ms`); + if ((code ?? 0) !== 0) { + console.error( + `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, + ); + return undefined; + } + if (killed && !signal) { + console.error( + `Command auto-reply process killed before completion (exit code ${code ?? "unknown"})`, + ); + return undefined; + } + return trimmed || undefined; + } catch (err) { + const elapsed = Date.now() - started; + const anyErr = err as { killed?: boolean; signal?: string }; + const timeoutHit = anyErr.killed === true || anyErr.signal === "SIGKILL"; + const errorObj = err as { + stdout?: string; + stderr?: string; + }; + if (errorObj.stderr?.trim()) { + logVerbose(`Command auto-reply stderr: ${errorObj.stderr.trim()}`); + } + if (timeoutHit) { + console.error( + `Command auto-reply timed out after ${elapsed}ms (limit ${timeoutMs}ms)`, + ); + } else { + console.error(`Command auto-reply failed after ${elapsed}ms`, err); + } + return undefined; + } + } + + return undefined; +} + +type TwilioLikeClient = { + messages: { + create: (opts: { from?: string; to?: string; body: string }) => Promise; + }; +}; + +export async function autoReplyIfConfigured( + client: TwilioLikeClient, + message: MessageInstance, + configOverride?: WarelayConfig, + runtime: RuntimeEnv = defaultRuntime, +): Promise { + // Fire a config-driven reply (text or command) for the inbound message, if configured. + const ctx: MsgContext = { + Body: message.body ?? undefined, + From: message.from ?? undefined, + To: message.to ?? undefined, + MessageSid: message.sid, + }; + + const replyText = await getReplyFromConfig( + ctx, + { + onReplyStart: () => sendTypingIndicator(client, message.sid, runtime), + }, + configOverride, + ); + if (!replyText) return; + + const replyFrom = message.to; + const replyTo = message.from; + if (!replyFrom || !replyTo) { + if (isVerbose()) + console.error( + "Skipping auto-reply: missing to/from on inbound message", + ctx, + ); + return; + } + + logVerbose( + `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyText.length}`, + ); + + try { + await client.messages.create({ + from: replyFrom, + to: replyTo, + body: replyText, + }); + if (isVerbose()) { + console.log( + info( + `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"})`, + ), + ); + } + } catch (err) { + const anyErr = err as { + code?: string | number; + message?: unknown; + moreInfo?: unknown; + status?: string | number; + response?: { body?: unknown }; + }; + const { code, status } = anyErr; + const msg = + typeof anyErr?.message === "string" + ? anyErr.message + : (anyErr?.message ?? err); + runtime.error( + `❌ Twilio send failed${code ? ` (code ${code})` : ""}${status ? ` status ${status}` : ""}: ${msg}`, + ); + if (anyErr?.moreInfo) runtime.error(`More info: ${anyErr.moreInfo}`); + const responseBody = anyErr?.response?.body; + if (responseBody) { + runtime.error("Response body:"); + runtime.error(JSON.stringify(responseBody, null, 2)); + } + } +} diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts new file mode 100644 index 000000000..b4494332d --- /dev/null +++ b/src/auto-reply/templating.ts @@ -0,0 +1,20 @@ +export type MsgContext = { + Body?: string; + From?: string; + To?: string; + MessageSid?: string; +}; + +export type TemplateContext = MsgContext & { + BodyStripped?: string; + SessionId?: string; + IsNewSession?: string; +}; + +export function applyTemplate(str: string, ctx: TemplateContext) { + // Simple {{Placeholder}} interpolation using inbound message context. + return str.replace(/{{\s*(\w+)\s*}}/g, (_, key) => { + const value = (ctx as Record)[key]; + return value == null ? "" : String(value); + }); +} diff --git a/src/config/config.ts b/src/config/config.ts new file mode 100644 index 000000000..08ed74e57 --- /dev/null +++ b/src/config/config.ts @@ -0,0 +1,51 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import JSON5 from "json5"; + +export type ReplyMode = "text" | "command"; +export type ClaudeOutputFormat = "text" | "json" | "stream-json"; +export type SessionScope = "per-sender" | "global"; + +export type SessionConfig = { + scope?: SessionScope; + resetTriggers?: string[]; + idleMinutes?: number; + store?: string; + sessionArgNew?: string[]; + sessionArgResume?: string[]; + sessionArgBeforeBody?: boolean; +}; + +export type WarelayConfig = { + inbound?: { + allowFrom?: string[]; // E.164 numbers allowed to trigger auto-reply (without whatsapp:) + reply?: { + mode: ReplyMode; + text?: string; // for mode=text, can contain {{Body}} + command?: string[]; // for mode=command, argv with templates + template?: string; // prepend template string when building command/prompt + timeoutSeconds?: number; // optional command timeout; defaults to 600s + bodyPrefix?: string; // optional string prepended to Body before templating + session?: SessionConfig; + claudeOutputFormat?: ClaudeOutputFormat; // when command starts with `claude`, force an output format + }; + }; +}; + +export const CONFIG_PATH = path.join(os.homedir(), ".warelay", "warelay.json"); + +export function loadConfig(): WarelayConfig { + // Read ~/.warelay/warelay.json (JSON5) if present. + try { + if (!fs.existsSync(CONFIG_PATH)) return {}; + const raw = fs.readFileSync(CONFIG_PATH, "utf-8"); + const parsed = JSON5.parse(raw); + if (typeof parsed !== "object" || parsed === null) return {}; + return parsed as WarelayConfig; + } catch (err) { + console.error(`Failed to read config at ${CONFIG_PATH}`, err); + return {}; + } +} diff --git a/src/config/sessions.ts b/src/config/sessions.ts new file mode 100644 index 000000000..70ca22a52 --- /dev/null +++ b/src/config/sessions.ts @@ -0,0 +1,54 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import JSON5 from "json5"; + +import { CONFIG_DIR, normalizeE164 } from "../utils.js"; +import type { MsgContext } from "../auto-reply/templating.js"; + +export type SessionScope = "per-sender" | "global"; + +export type SessionEntry = { sessionId: string; updatedAt: number }; + +export const SESSION_STORE_DEFAULT = path.join(CONFIG_DIR, "sessions.json"); +export const DEFAULT_RESET_TRIGGER = "/new"; +export const DEFAULT_IDLE_MINUTES = 60; + +export function resolveStorePath(store?: string) { + if (!store) return SESSION_STORE_DEFAULT; + if (store.startsWith("~")) + return path.resolve(store.replace("~", os.homedir())); + return path.resolve(store); +} + +export function loadSessionStore(storePath: string): Record { + try { + const raw = fs.readFileSync(storePath, "utf-8"); + const parsed = JSON5.parse(raw); + if (parsed && typeof parsed === "object") { + return parsed as Record; + } + } catch { + // ignore missing/invalid store; we'll recreate it + } + return {}; +} + +export async function saveSessionStore( + storePath: string, + store: Record, +) { + await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); + await fs.promises.writeFile( + storePath, + JSON.stringify(store, null, 2), + "utf-8", + ); +} + +export function deriveSessionKey(scope: SessionScope, ctx: MsgContext) { + if (scope === "global") return "global"; + const from = ctx.From ? normalizeE164(ctx.From) : ""; + return from || "unknown"; +} diff --git a/src/index.ts b/src/index.ts index a6af05896..3790bfeb7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,4 @@ #!/usr/bin/env node -import { execFile, spawn } from "node:child_process"; import crypto from "node:crypto"; import fs from "node:fs"; import net from "node:net"; @@ -8,7 +7,6 @@ import path from "node:path"; import process, { stdin as input, stdout as output } from "node:process"; import readline from "node:readline/promises"; import { fileURLToPath } from "node:url"; -import { promisify } from "node:util"; import bodyParser from "body-parser"; import chalk from "chalk"; @@ -19,6 +17,32 @@ import JSON5 from "json5"; import Twilio from "twilio"; import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js"; import { z } from "zod"; +import { + runCommandWithTimeout, + runExec, + type SpawnResult, +} from "./process/exec.js"; +import { defaultRuntime, type RuntimeEnv } from "./runtime.js"; +import { sendTypingIndicator } from "./twilio/typing.js"; +import { + autoReplyIfConfigured, + getReplyFromConfig, +} from "./auto-reply/reply.js"; +import { CLAUDE_BIN, parseClaudeJsonText } from "./auto-reply/claude.js"; +import { + applyTemplate, + type MsgContext, + type TemplateContext, +} from "./auto-reply/templating.js"; +import { + CONFIG_PATH, + type WarelayConfig, + type SessionConfig, + type SessionScope, + type ReplyMode, + type ClaudeOutputFormat, + loadConfig, +} from "./config/config.js"; import { sendCommand } from "./commands/send.js"; import { statusCommand } from "./commands/status.js"; import { upCommand } from "./commands/up.js"; @@ -52,6 +76,15 @@ import { toWhatsappJid, withWhatsAppPrefix, } from "./utils.js"; +import { + DEFAULT_IDLE_MINUTES, + DEFAULT_RESET_TRIGGER, + deriveSessionKey, + loadSessionStore, + resolveStorePath, + saveSessionStore, + SESSION_STORE_DEFAULT, +} from "./config/sessions.js"; dotenv.config({ quiet: true }); @@ -192,21 +225,6 @@ type EnvConfig = { auth: AuthMode; }; -type RuntimeEnv = { - log: typeof console.log; - error: typeof console.error; - exit: (code: number) => never; -}; - -const defaultRuntime: RuntimeEnv = { - log: console.log, - error: console.error, - exit: (code) => { - process.exit(code); - throw new Error("unreachable"); // satisfies tests when mocked - }, -}; - const EnvSchema = z .object({ TWILIO_ACCOUNT_SID: z.string().min(1, "TWILIO_ACCOUNT_SID required"), @@ -280,84 +298,6 @@ function readEnv(runtime: RuntimeEnv = defaultRuntime): EnvConfig { }; } -const execFileAsync = promisify(execFile); - -type ExecResult = { stdout: string; stderr: string }; - -type ExecOptions = { maxBuffer?: number; timeoutMs?: number }; - -async function runExec( - command: string, - args: string[], - { maxBuffer = 2_000_000, timeoutMs }: ExecOptions = {}, -): Promise { - // Thin wrapper around execFile with utf8 output. - if (isVerbose()) { - console.log(`$ ${command} ${args.join(" ")}`); - } - try { - const { stdout, stderr } = await execFileAsync(command, args, { - maxBuffer, - encoding: "utf8", - timeout: timeoutMs, - }); - if (isVerbose()) { - if (stdout.trim()) console.log(stdout.trim()); - if (stderr.trim()) console.error(stderr.trim()); - } - return { stdout, stderr }; - } catch (err) { - if (isVerbose()) { - console.error(danger(`Command failed: ${command} ${args.join(" ")}`)); - } - throw err; - } -} - -type SpawnResult = { - stdout: string; - stderr: string; - code: number | null; - signal: NodeJS.Signals | null; - killed: boolean; -}; - -async function runCommandWithTimeout( - argv: string[], - timeoutMs: number, -): Promise { - // Spawn with inherited stdin (TTY) so tools like `claude` don't hang. - return await new Promise((resolve, reject) => { - const child = spawn(argv[0], argv.slice(1), { - stdio: ["inherit", "pipe", "pipe"], - }); - let stdout = ""; - let stderr = ""; - let settled = false; - const timer = setTimeout(() => { - child.kill("SIGKILL"); - }, timeoutMs); - - child.stdout?.on("data", (d) => { - stdout += d.toString(); - }); - child.stderr?.on("data", (d) => { - stderr += d.toString(); - }); - child.on("error", (err) => { - if (settled) return; - settled = true; - clearTimeout(timer); - reject(err); - }); - child.on("close", (code, signal) => { - if (settled) return; - settled = true; - clearTimeout(timer); - resolve({ stdout, stderr, code, signal, killed: child.killed }); - }); - }); -} class PortInUseError extends Error { port: number; @@ -479,472 +419,6 @@ async function promptYesNo( return answer.startsWith("y"); } -const CONFIG_PATH = path.join(os.homedir(), ".warelay", "warelay.json"); - -type ReplyMode = "text" | "command"; - -type ClaudeOutputFormat = "text" | "json" | "stream-json"; - -type WarelayConfig = { - inbound?: { - allowFrom?: string[]; // E.164 numbers allowed to trigger auto-reply (without whatsapp:) - reply?: { - mode: ReplyMode; - text?: string; // for mode=text, can contain {{Body}} - command?: string[]; // for mode=command, argv with templates - template?: string; // prepend template string when building command/prompt - timeoutSeconds?: number; // optional command timeout; defaults to 600s - bodyPrefix?: string; // optional string prepended to Body before templating - session?: SessionConfig; - claudeOutputFormat?: ClaudeOutputFormat; // when command starts with `claude`, force an output format - }; - }; -}; - -type SessionScope = "per-sender" | "global"; - -type SessionConfig = { - scope?: SessionScope; - resetTriggers?: string[]; - idleMinutes?: number; - store?: string; - sessionArgNew?: string[]; - sessionArgResume?: string[]; - sessionArgBeforeBody?: boolean; -}; - -function loadConfig(): WarelayConfig { - // Read ~/.warelay/warelay.json (JSON5) if present. - try { - if (!fs.existsSync(CONFIG_PATH)) return {}; - const raw = fs.readFileSync(CONFIG_PATH, "utf-8"); - const parsed = JSON5.parse(raw); - if (typeof parsed !== "object" || parsed === null) return {}; - return parsed as WarelayConfig; - } catch (err) { - console.error(`Failed to read config at ${CONFIG_PATH}`, err); - return {}; - } -} - -type MsgContext = { - Body?: string; - From?: string; - To?: string; - MessageSid?: string; -}; - -type GetReplyOptions = { - onReplyStart?: () => Promise | void; -}; - -function applyTemplate(str: string, ctx: TemplateContext) { - // Simple {{Placeholder}} interpolation using inbound message context. - return str.replace(/{{\s*(\w+)\s*}}/g, (_, key) => { - const value = (ctx as Record)[key]; - return value == null ? "" : String(value); - }); -} - -type TemplateContext = MsgContext & { - BodyStripped?: string; - SessionId?: string; - IsNewSession?: string; -}; - -function extractClaudeText(payload: unknown): string | undefined { - // Best-effort walker to find the primary text field in Claude JSON outputs. - if (payload == null) return undefined; - if (typeof payload === "string") return payload; - if (Array.isArray(payload)) { - for (const item of payload) { - const found = extractClaudeText(item); - if (found) return found; - } - return undefined; - } - if (typeof payload === "object") { - const obj = payload as Record; - if (typeof obj.text === "string") return obj.text; - if (typeof obj.completion === "string") return obj.completion; - if (typeof obj.output === "string") return obj.output; - if (obj.message) { - const inner = extractClaudeText(obj.message); - if (inner) return inner; - } - if (Array.isArray(obj.messages)) { - const inner = extractClaudeText(obj.messages); - if (inner) return inner; - } - if (Array.isArray(obj.content)) { - for (const block of obj.content) { - if ( - block && - typeof block === "object" && - (block as { type?: string }).type === "text" && - typeof (block as { text?: unknown }).text === "string" - ) { - return (block as { text: string }).text; - } - const inner = extractClaudeText(block); - if (inner) return inner; - } - } - } - return undefined; -} - -function parseClaudeJsonText(raw: string): string | undefined { - // Handle a single JSON blob or newline-delimited JSON; return the first extracted text. - const candidates = [raw, ...raw.split(/\n+/).map((s) => s.trim()).filter(Boolean)]; - for (const candidate of candidates) { - try { - const parsed = JSON.parse(candidate); - const text = extractClaudeText(parsed); - if (text) return text; - } catch { - // ignore parse errors; try next candidate - } - } - return undefined; -} - -type SessionEntry = { sessionId: string; updatedAt: number }; - -const SESSION_STORE_DEFAULT = path.join(CONFIG_DIR, "sessions.json"); -const DEFAULT_RESET_TRIGGER = "/new"; -const DEFAULT_IDLE_MINUTES = 60; -const CLAUDE_BIN = "claude"; - -function resolveStorePath(store?: string) { - if (!store) return SESSION_STORE_DEFAULT; - if (store.startsWith("~")) - return path.resolve(store.replace("~", os.homedir())); - return path.resolve(store); -} - -function loadSessionStore(storePath: string): Record { - try { - const raw = fs.readFileSync(storePath, "utf-8"); - const parsed = JSON5.parse(raw); - if (parsed && typeof parsed === "object") { - return parsed as Record; - } - } catch { - // ignore missing/invalid store; we'll recreate it - } - return {}; -} - -async function saveSessionStore( - storePath: string, - store: Record, -) { - await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); - await fs.promises.writeFile( - storePath, - JSON.stringify(store, null, 2), - "utf-8", - ); -} - -function deriveSessionKey(scope: SessionScope, ctx: MsgContext) { - if (scope === "global") return "global"; - const from = ctx.From ? normalizeE164(ctx.From) : ""; - return from || "unknown"; -} - -async function getReplyFromConfig( - ctx: MsgContext, - opts?: GetReplyOptions, - configOverride?: WarelayConfig, - commandRunner: typeof runCommandWithTimeout = runCommandWithTimeout, -): Promise { - // Choose reply from config: static text or external command stdout. - const cfg = configOverride ?? loadConfig(); - const reply = cfg.inbound?.reply; - const timeoutSeconds = Math.max(reply?.timeoutSeconds ?? 600, 1); - const timeoutMs = timeoutSeconds * 1000; - let started = false; - const onReplyStart = async () => { - if (started) return; - started = true; - await opts?.onReplyStart?.(); - }; - - // Optional session handling (conversation reuse + /new resets) - const sessionCfg = reply?.session; - const resetTriggers = sessionCfg?.resetTriggers?.length - ? sessionCfg.resetTriggers - : [DEFAULT_RESET_TRIGGER]; - const idleMinutes = Math.max( - sessionCfg?.idleMinutes ?? DEFAULT_IDLE_MINUTES, - 1, - ); - const sessionScope = sessionCfg?.scope ?? "per-sender"; - const storePath = resolveStorePath(sessionCfg?.store); - - let sessionId: string | undefined; - let isNewSession = false; - let bodyStripped: string | undefined; - - if (sessionCfg) { - const trimmedBody = (ctx.Body ?? "").trim(); - for (const trigger of resetTriggers) { - if (!trigger) continue; - if (trimmedBody === trigger) { - isNewSession = true; - bodyStripped = ""; - break; - } - const triggerPrefix = `${trigger} `; - if (trimmedBody.startsWith(triggerPrefix)) { - isNewSession = true; - bodyStripped = trimmedBody.slice(trigger.length).trimStart(); - break; - } - } - - const sessionKey = deriveSessionKey(sessionScope, ctx); - const store = loadSessionStore(storePath); - const entry = store[sessionKey]; - const idleMs = idleMinutes * 60_000; - const freshEntry = entry && Date.now() - entry.updatedAt <= idleMs; - - if (!isNewSession && freshEntry) { - sessionId = entry.sessionId; - } else { - sessionId = crypto.randomUUID(); - isNewSession = true; - } - - store[sessionKey] = { sessionId, updatedAt: Date.now() }; - await saveSessionStore(storePath, store); - } - - const sessionCtx: TemplateContext = { - ...ctx, - BodyStripped: bodyStripped ?? ctx.Body, - SessionId: sessionId, - IsNewSession: isNewSession ? "true" : "false", - }; - - // Optional prefix injected before Body for templating/command prompts. - const bodyPrefix = reply?.bodyPrefix - ? applyTemplate(reply.bodyPrefix, sessionCtx) - : ""; - const prefixedBody = bodyPrefix - ? `${bodyPrefix}${sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""}` - : (sessionCtx.BodyStripped ?? sessionCtx.Body); - const templatingCtx: TemplateContext = { - ...sessionCtx, - Body: prefixedBody, - BodyStripped: prefixedBody, - }; - - // Optional allowlist by origin number (E.164 without whatsapp: prefix) - const allowFrom = cfg.inbound?.allowFrom; - if (Array.isArray(allowFrom) && allowFrom.length > 0) { - const from = (ctx.From ?? "").replace(/^whatsapp:/, ""); - if (!allowFrom.includes(from)) { - logVerbose( - `Skipping auto-reply: sender ${from || ""} not in allowFrom list`, - ); - return undefined; - } - } - if (!reply) { - logVerbose("No inbound.reply configured; skipping auto-reply"); - return undefined; - } - - if (reply.mode === "text" && reply.text) { - await onReplyStart(); - logVerbose("Using text auto-reply from config"); - return applyTemplate(reply.text, templatingCtx); - } - - if (reply.mode === "command" && reply.command?.length) { - await onReplyStart(); - let argv = reply.command.map((part) => applyTemplate(part, templatingCtx)); - const templatePrefix = reply.template - ? applyTemplate(reply.template, templatingCtx) - : ""; - if (templatePrefix && argv.length > 0) { - argv = [argv[0], templatePrefix, ...argv.slice(1)]; - } - - // Ensure Claude commands can emit plain text by forcing --output-format when configured. - // We inject the flags only when the user points at the `claude` binary and has opted in via config, - // so existing custom argv or non-Claude commands remain untouched. - if ( - reply.claudeOutputFormat && - argv.length > 0 && - path.basename(argv[0]) === CLAUDE_BIN - ) { - const hasOutputFormat = argv.some( - (part) => - part === "--output-format" || part.startsWith("--output-format="), - ); - // Keep the final argument as the prompt/body; insert options just before it. - const insertBeforeBody = Math.max(argv.length - 1, 0); - if (!hasOutputFormat) { - argv = [ - ...argv.slice(0, insertBeforeBody), - "--output-format", - reply.claudeOutputFormat, - ...argv.slice(insertBeforeBody), - ]; - } - const hasPrintFlag = argv.some( - (part) => part === "-p" || part === "--print", - ); - if (!hasPrintFlag) { - const insertIdx = Math.max(argv.length - 1, 0); - argv = [...argv.slice(0, insertIdx), "-p", ...argv.slice(insertIdx)]; - } - } - - // Inject session args if configured (use resume for existing, session-id for new) - if (reply.session) { - const sessionArgList = ( - isNewSession - ? (reply.session.sessionArgNew ?? ["--session-id", "{{SessionId}}"]) - : (reply.session.sessionArgResume ?? ["--resume", "{{SessionId}}"]) - ).map((part) => applyTemplate(part, templatingCtx)); - if (sessionArgList.length) { - const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true; - const insertAt = - insertBeforeBody && argv.length > 1 ? argv.length - 1 : argv.length; - argv = [ - ...argv.slice(0, insertAt), - ...sessionArgList, - ...argv.slice(insertAt), - ]; - } - } - const finalArgv = argv; - logVerbose(`Running command auto-reply: ${finalArgv.join(" ")}`); - const started = Date.now(); - try { - const { stdout, stderr, code, signal, killed } = await commandRunner( - finalArgv, - timeoutMs, - ); - let trimmed = stdout.trim(); - if (stderr?.trim()) { - logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); - } - if (reply.claudeOutputFormat === "json" && trimmed) { - // Claude JSON mode: extract the human text for both logging and reply. - const extracted = parseClaudeJsonText(trimmed); - if (extracted) { - logVerbose( - `Claude JSON parsed -> ${extracted.slice(0, 120)}${extracted.length > 120 ? "…" : ""}`, - ); - trimmed = extracted.trim(); - } else { - logVerbose("Claude JSON parse failed; returning raw stdout"); - } - } - logVerbose( - `Command auto-reply stdout (trimmed): ${trimmed || ""}`, - ); - logVerbose(`Command auto-reply finished in ${Date.now() - started}ms`); - if ((code ?? 0) !== 0) { - console.error( - `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, - ); - return undefined; - } - if (killed && !signal) { - console.error( - `Command auto-reply process killed before completion (exit code ${code ?? "unknown"})`, - ); - return undefined; - } - return trimmed || undefined; - } catch (err) { - const elapsed = Date.now() - started; - const anyErr = err as { killed?: boolean; signal?: string }; - const timeoutHit = anyErr.killed === true || anyErr.signal === "SIGKILL"; - const errorObj = err as { - stdout?: string; - stderr?: string; - }; - if (errorObj.stderr?.trim()) { - logVerbose(`Command auto-reply stderr: ${errorObj.stderr.trim()}`); - } - if (timeoutHit) { - console.error( - `Command auto-reply timed out after ${elapsed}ms (limit ${timeoutMs}ms)`, - ); - } else { - console.error(`Command auto-reply failed after ${elapsed}ms`, err); - } - return undefined; - } - } - - return undefined; -} - -async function autoReplyIfConfigured( - client: ReturnType, - message: MessageInstance, - configOverride?: WarelayConfig, - runtime: RuntimeEnv = defaultRuntime, -): Promise { - // Fire a config-driven reply (text or command) for the inbound message, if configured. - const ctx: MsgContext = { - Body: message.body ?? undefined, - From: message.from ?? undefined, - To: message.to ?? undefined, - MessageSid: message.sid, - }; - - const replyText = await getReplyFromConfig( - ctx, - { - onReplyStart: () => sendTypingIndicator(client, message.sid, runtime), - }, - configOverride, - ); - if (!replyText) return; - - const replyFrom = message.to; - const replyTo = message.from; - if (!replyFrom || !replyTo) { - if (isVerbose()) - console.error( - "Skipping auto-reply: missing to/from on inbound message", - ctx, - ); - return; - } - - logVerbose( - `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyText.length}`, - ); - - try { - await client.messages.create({ - from: replyFrom, - to: replyTo, - body: replyText, - }); - if (isVerbose()) { - console.log( - success( - `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"})`, - ), - ); - } - } catch (err) { - logTwilioSendError(err, replyTo ?? undefined, runtime); - } -} - function createClient(env: EnvConfig) { // Twilio client using either auth token or API key/secret. if ("authToken" in env.auth) { @@ -957,35 +431,6 @@ function createClient(env: EnvConfig) { }); } -async function sendTypingIndicator( - client: ReturnType, - messageSid?: string, - runtime: RuntimeEnv = defaultRuntime, -) { - // Best-effort WhatsApp typing indicator (public beta as of Nov 2025). - if (!messageSid) { - logVerbose("Skipping typing indicator: missing MessageSid"); - return; - } - try { - const requester = client as unknown as TwilioRequester; - await requester.request({ - method: "post", - uri: "https://messaging.twilio.com/v2/Indicators/Typing.json", - form: { - messageId: messageSid, - channel: "whatsapp", - }, - }); - logVerbose(`Sent typing indicator for inbound ${messageSid}`); - } catch (err) { - if (isVerbose()) { - runtime.error(warn("Typing indicator failed (continuing without it)")); - runtime.error(err as Error); - } - } -} - async function sendMessage( to: string, body: string, diff --git a/src/process/exec.ts b/src/process/exec.ts new file mode 100644 index 000000000..afccb6b8a --- /dev/null +++ b/src/process/exec.ts @@ -0,0 +1,71 @@ +import { execFile, spawn } from "node:child_process"; + +import { danger, isVerbose } from "../globals.js"; + +export async function runExec( + command: string, + args: string[], + timeoutMs = 10_000, +): Promise<{ stdout: string; stderr: string }> { + // Simple promise-wrapped execFile with optional verbosity logging. + try { + const { stdout, stderr } = await execFile(command, args, { + timeout: timeoutMs, + }); + if (isVerbose()) { + if (stdout.trim()) console.log(stdout.trim()); + if (stderr.trim()) console.error(stderr.trim()); + } + return { stdout, stderr }; + } catch (err) { + if (isVerbose()) { + console.error(danger(`Command failed: ${command} ${args.join(" ")}`)); + } + throw err; + } +} + +export type SpawnResult = { + stdout: string; + stderr: string; + code: number | null; + signal: NodeJS.Signals | null; + killed: boolean; +}; + +export async function runCommandWithTimeout( + argv: string[], + timeoutMs: number, +): Promise { + // Spawn with inherited stdin (TTY) so tools like `claude` don't hang. + return await new Promise((resolve, reject) => { + const child = spawn(argv[0], argv.slice(1), { + stdio: ["inherit", "pipe", "pipe"], + }); + let stdout = ""; + let stderr = ""; + let settled = false; + const timer = setTimeout(() => { + child.kill("SIGKILL"); + }, timeoutMs); + + child.stdout?.on("data", (d) => { + stdout += d.toString(); + }); + child.stderr?.on("data", (d) => { + stderr += d.toString(); + }); + child.on("error", (err) => { + if (settled) return; + settled = true; + clearTimeout(timer); + reject(err); + }); + child.on("close", (code, signal) => { + if (settled) return; + settled = true; + clearTimeout(timer); + resolve({ stdout, stderr, code, signal, killed: child.killed }); + }); + }); +} diff --git a/src/runtime.ts b/src/runtime.ts new file mode 100644 index 000000000..56e2d138a --- /dev/null +++ b/src/runtime.ts @@ -0,0 +1,14 @@ +export type RuntimeEnv = { + log: typeof console.log; + error: typeof console.error; + exit: (code: number) => never; +}; + +export const defaultRuntime: RuntimeEnv = { + log: console.log, + error: console.error, + exit: (code) => { + process.exit(code); + throw new Error("unreachable"); // satisfies tests when mocked + }, +}; diff --git a/src/twilio/typing.ts b/src/twilio/typing.ts new file mode 100644 index 000000000..7f353af2e --- /dev/null +++ b/src/twilio/typing.ts @@ -0,0 +1,43 @@ +import { warn, isVerbose, logVerbose } from "../globals.js"; +import type { RuntimeEnv } from "../runtime.js"; + +type TwilioRequestOptions = { + method: "get" | "post"; + uri: string; + params?: Record; + form?: Record; + body?: unknown; + contentType?: string; +}; + +type TwilioRequester = { + request: (options: TwilioRequestOptions) => Promise; +}; + +export async function sendTypingIndicator( + client: TwilioRequester, + messageSid?: string, + runtime: RuntimeEnv, +) { + // Best-effort WhatsApp typing indicator (public beta as of Nov 2025). + if (!messageSid) { + logVerbose("Skipping typing indicator: missing MessageSid"); + return; + } + try { + await client.request({ + method: "post", + uri: "https://messaging.twilio.com/v2/Indicators/Typing.json", + form: { + messageId: messageSid, + channel: "whatsapp", + }, + }); + logVerbose(`Sent typing indicator for inbound ${messageSid}`); + } catch (err) { + if (isVerbose()) { + runtime.error(warn("Typing indicator failed (continuing without it)")); + runtime.error(err as Error); + } + } +}