import type { Command } from "commander"; import { danger } from "../globals.js"; import { defaultRuntime } from "../runtime.js"; import type { GatewayRpcOpts } from "./gateway-rpc.js"; import { addGatewayClientOptions, callGatewayFromCli } from "./gateway-rpc.js"; async function warnIfCronSchedulerDisabled(opts: GatewayRpcOpts) { try { const res = (await callGatewayFromCli("cron.status", opts, {})) as { enabled?: boolean; storePath?: string; }; if (res?.enabled === true) return; const store = typeof res?.storePath === "string" ? res.storePath : ""; defaultRuntime.error( [ "warning: cron scheduler is disabled in the Gateway; jobs are saved but will not run automatically.", "Re-enable with `cron.enabled: true` (or remove `cron.enabled: false`) and restart the Gateway.", store ? `store: ${store}` : "", ] .filter(Boolean) .join("\n"), ); } catch { // Ignore status failures (older gateway, offline, etc.) } } function parseDurationMs(input: string): number | null { const raw = input.trim(); if (!raw) return null; const match = raw.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/i); if (!match) return null; const n = Number.parseFloat(match[1] ?? ""); if (!Number.isFinite(n) || n <= 0) return null; const unit = (match[2] ?? "").toLowerCase(); const factor = unit === "ms" ? 1 : unit === "s" ? 1000 : unit === "m" ? 60_000 : unit === "h" ? 3_600_000 : 86_400_000; return Math.floor(n * factor); } function parseAtMs(input: string): number | null { const raw = input.trim(); if (!raw) return null; const asNum = Number(raw); if (Number.isFinite(asNum) && asNum > 0) return Math.floor(asNum); const parsed = Date.parse(raw); if (Number.isFinite(parsed)) return parsed; const dur = parseDurationMs(raw); if (dur) return Date.now() + dur; return null; } export function registerCronCli(program: Command) { addGatewayClientOptions( program .command("wake") .description( "Enqueue a system event and optionally trigger an immediate heartbeat", ) .requiredOption("--text ", "System event text") .option( "--mode ", "Wake mode (now|next-heartbeat)", "next-heartbeat", ) .option("--json", "Output JSON", false), ).action(async (opts) => { try { const result = await callGatewayFromCli( "wake", opts, { mode: opts.mode, text: opts.text }, { expectFinal: false }, ); if (opts.json) defaultRuntime.log(JSON.stringify(result, null, 2)); else defaultRuntime.log("ok"); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }); const cron = program .command("cron") .description("Manage cron jobs (via Gateway)"); addGatewayClientOptions( cron .command("status") .description("Show cron scheduler status") .option("--json", "Output JSON", false) .action(async (opts) => { try { const res = await callGatewayFromCli("cron.status", opts, {}); defaultRuntime.log(JSON.stringify(res, null, 2)); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("list") .description("List cron jobs") .option("--all", "Include disabled jobs", false) .option("--json", "Output JSON", false) .action(async (opts) => { try { const res = await callGatewayFromCli("cron.list", opts, { includeDisabled: Boolean(opts.all), }); defaultRuntime.log(JSON.stringify(res, null, 2)); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("add") .description("Add a cron job") .option("--name ", "Optional name") .option("--disabled", "Create job disabled", false) .option("--session ", "Session target (main|isolated)", "main") .option( "--wake ", "Wake mode (now|next-heartbeat)", "next-heartbeat", ) .option("--at ", "Run once at time (ISO) or +duration (e.g. 20m)") .option("--every ", "Run every duration (e.g. 10m, 1h)") .option("--cron ", "Cron expression (5-field)") .option("--tz ", "Timezone for cron expressions (IANA)", "") .option("--system-event ", "System event payload (main session)") .option("--message ", "Agent message payload") .option( "--thinking ", "Thinking level for agent jobs (off|minimal|low|medium|high)", ) .option("--timeout-seconds ", "Timeout seconds for agent jobs") .option("--deliver", "Deliver agent output", false) .option( "--channel ", "Delivery channel (last|whatsapp|telegram)", "last", ) .option("--to ", "Delivery destination (E.164 or Telegram chatId)") .option( "--best-effort-deliver", "Do not fail the job if delivery fails", false, ) .option( "--post-prefix ", "Prefix for summary system event", "Cron", ) .option("--json", "Output JSON", false) .action(async (opts) => { try { const schedule = (() => { const at = typeof opts.at === "string" ? opts.at : ""; const every = typeof opts.every === "string" ? opts.every : ""; const cronExpr = typeof opts.cron === "string" ? opts.cron : ""; const chosen = [ Boolean(at), Boolean(every), Boolean(cronExpr), ].filter(Boolean).length; if (chosen !== 1) { throw new Error( "Choose exactly one schedule: --at, --every, or --cron", ); } if (at) { const atMs = parseAtMs(at); if (!atMs) throw new Error( "Invalid --at; use ISO time or duration like 20m", ); return { kind: "at" as const, atMs }; } if (every) { const everyMs = parseDurationMs(every); if (!everyMs) throw new Error("Invalid --every; use e.g. 10m, 1h, 1d"); return { kind: "every" as const, everyMs }; } return { kind: "cron" as const, expr: cronExpr, tz: typeof opts.tz === "string" && opts.tz.trim() ? opts.tz.trim() : undefined, }; })(); const sessionTarget = String(opts.session ?? "main"); if (sessionTarget !== "main" && sessionTarget !== "isolated") { throw new Error("--session must be main or isolated"); } const wakeMode = String(opts.wake ?? "next-heartbeat"); if (wakeMode !== "now" && wakeMode !== "next-heartbeat") { throw new Error("--wake must be now or next-heartbeat"); } const payload = (() => { const systemEvent = typeof opts.systemEvent === "string" ? opts.systemEvent.trim() : ""; const message = typeof opts.message === "string" ? opts.message.trim() : ""; const chosen = [Boolean(systemEvent), Boolean(message)].filter( Boolean, ).length; if (chosen !== 1) { throw new Error( "Choose exactly one payload: --system-event or --message", ); } if (systemEvent) return { kind: "systemEvent" as const, text: systemEvent }; const timeoutSeconds = opts.timeoutSeconds ? Number.parseInt(String(opts.timeoutSeconds), 10) : undefined; return { kind: "agentTurn" as const, message, thinking: typeof opts.thinking === "string" && opts.thinking.trim() ? opts.thinking.trim() : undefined, timeoutSeconds: timeoutSeconds && Number.isFinite(timeoutSeconds) ? timeoutSeconds : undefined, deliver: Boolean(opts.deliver), channel: typeof opts.channel === "string" ? opts.channel : "last", to: typeof opts.to === "string" && opts.to.trim() ? opts.to.trim() : undefined, bestEffortDeliver: Boolean(opts.bestEffortDeliver), }; })(); if (sessionTarget === "main" && payload.kind !== "systemEvent") { throw new Error("Main jobs require --system-event (systemEvent)."); } if (sessionTarget === "isolated" && payload.kind !== "agentTurn") { throw new Error("Isolated jobs require --message (agentTurn)."); } const isolation = sessionTarget === "isolated" ? { postToMainPrefix: typeof opts.postPrefix === "string" && opts.postPrefix.trim() ? opts.postPrefix.trim() : "Cron", } : undefined; const params = { name: typeof opts.name === "string" && opts.name.trim() ? opts.name.trim() : undefined, enabled: !opts.disabled, schedule, sessionTarget, wakeMode, payload, isolation, }; const res = await callGatewayFromCli("cron.add", opts, params); defaultRuntime.log(JSON.stringify(res, null, 2)); await warnIfCronSchedulerDisabled(opts); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("rm") .description("Remove a cron job") .argument("", "Job id") .option("--json", "Output JSON", false) .action(async (id, opts) => { try { const res = await callGatewayFromCli("cron.remove", opts, { id }); defaultRuntime.log(JSON.stringify(res, null, 2)); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("enable") .description("Enable a cron job") .argument("", "Job id") .action(async (id, opts) => { try { const res = await callGatewayFromCli("cron.update", opts, { id, patch: { enabled: true }, }); defaultRuntime.log(JSON.stringify(res, null, 2)); await warnIfCronSchedulerDisabled(opts); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("disable") .description("Disable a cron job") .argument("", "Job id") .action(async (id, opts) => { try { const res = await callGatewayFromCli("cron.update", opts, { id, patch: { enabled: false }, }); defaultRuntime.log(JSON.stringify(res, null, 2)); await warnIfCronSchedulerDisabled(opts); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("runs") .description("Show cron run history (JSONL-backed)") .requiredOption("--id ", "Job id") .option("--limit ", "Max entries (default 50)", "50") .action(async (opts) => { try { const limitRaw = Number.parseInt(String(opts.limit ?? "50"), 10); const limit = Number.isFinite(limitRaw) && limitRaw > 0 ? limitRaw : 50; const id = String(opts.id); const res = await callGatewayFromCli("cron.runs", opts, { id, limit, }); defaultRuntime.log(JSON.stringify(res, null, 2)); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("edit") .description("Edit a cron job (patch fields)") .argument("", "Job id") .option("--name ", "Set name") .option("--enable", "Enable job", false) .option("--disable", "Disable job", false) .option("--session ", "Session target (main|isolated)") .option("--wake ", "Wake mode (now|next-heartbeat)") .option("--at ", "Set one-shot time (ISO) or duration like 20m") .option("--every ", "Set interval duration like 10m") .option("--cron ", "Set cron expression") .option("--tz ", "Timezone for cron expressions (IANA)") .option("--system-event ", "Set systemEvent payload") .option("--message ", "Set agentTurn payload message") .option("--thinking ", "Thinking level for agent jobs") .option("--timeout-seconds ", "Timeout seconds for agent jobs") .option("--deliver", "Deliver agent output", false) .option( "--channel ", "Delivery channel (last|whatsapp|telegram)", ) .option("--to ", "Delivery destination") .option( "--best-effort-deliver", "Do not fail job if delivery fails", false, ) .option("--post-prefix ", "Prefix for summary system event") .action(async (id, opts) => { try { if (opts.session === "main" && opts.message) { throw new Error( "Main jobs cannot use --message; use --system-event or --session isolated.", ); } if (opts.session === "isolated" && opts.systemEvent) { throw new Error( "Isolated jobs cannot use --system-event; use --message or --session main.", ); } if (opts.session === "main" && typeof opts.postPrefix === "string") { throw new Error("--post-prefix only applies to isolated jobs."); } const patch: Record = {}; if (typeof opts.name === "string") patch.name = opts.name; if (opts.enable && opts.disable) throw new Error("Choose --enable or --disable, not both"); if (opts.enable) patch.enabled = true; if (opts.disable) patch.enabled = false; if (typeof opts.session === "string") patch.sessionTarget = opts.session; if (typeof opts.wake === "string") patch.wakeMode = opts.wake; const scheduleChosen = [opts.at, opts.every, opts.cron].filter( Boolean, ).length; if (scheduleChosen > 1) throw new Error("Choose at most one schedule change"); if (opts.at) { const atMs = parseAtMs(String(opts.at)); if (!atMs) throw new Error("Invalid --at"); patch.schedule = { kind: "at", atMs }; } else if (opts.every) { const everyMs = parseDurationMs(String(opts.every)); if (!everyMs) throw new Error("Invalid --every"); patch.schedule = { kind: "every", everyMs }; } else if (opts.cron) { patch.schedule = { kind: "cron", expr: String(opts.cron), tz: typeof opts.tz === "string" && opts.tz.trim() ? opts.tz.trim() : undefined, }; } const payloadChosen = [opts.systemEvent, opts.message].filter( Boolean, ).length; if (payloadChosen > 1) throw new Error("Choose at most one payload change"); if (opts.systemEvent) { patch.payload = { kind: "systemEvent", text: String(opts.systemEvent), }; } else if (opts.message) { const timeoutSeconds = opts.timeoutSeconds ? Number.parseInt(String(opts.timeoutSeconds), 10) : undefined; patch.payload = { kind: "agentTurn", message: String(opts.message), thinking: typeof opts.thinking === "string" ? opts.thinking : undefined, timeoutSeconds: timeoutSeconds && Number.isFinite(timeoutSeconds) ? timeoutSeconds : undefined, deliver: Boolean(opts.deliver), channel: typeof opts.channel === "string" ? opts.channel : undefined, to: typeof opts.to === "string" ? opts.to : undefined, bestEffortDeliver: Boolean(opts.bestEffortDeliver), }; } if (typeof opts.postPrefix === "string") { patch.isolation = { postToMainPrefix: opts.postPrefix.trim() ? opts.postPrefix : "Cron", }; } const res = await callGatewayFromCli("cron.update", opts, { id, patch, }); defaultRuntime.log(JSON.stringify(res, null, 2)); await warnIfCronSchedulerDisabled(opts); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); addGatewayClientOptions( cron .command("run") .description("Run a cron job now (debug)") .argument("", "Job id") .option("--force", "Run even if not due", false) .action(async (id, opts) => { try { const res = await callGatewayFromCli("cron.run", opts, { id, mode: opts.force ? "force" : "due", }); defaultRuntime.log(JSON.stringify(res, null, 2)); } catch (err) { defaultRuntime.error(danger(String(err))); defaultRuntime.exit(1); } }), ); }