From f9409cbe43d242770a60cd84f7251b78c7522a89 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 13 Dec 2025 02:34:11 +0000 Subject: [PATCH] Cron: add scheduler, wakeups, and run history --- docs/configuration.md | 24 ++ docs/cron.md | 364 +++++++++++++++++++++++++++ docs/queue.md | 3 +- package.json | 1 + src/cli/cron-cli.ts | 414 ++++++++++++++++++++++++++++++ src/cli/gateway-cli.ts | 275 ++++++++++++++++++++ src/cli/gateway-rpc.ts | 35 +++ src/cli/program.ts | 267 +------------------- src/config/config.ts | 14 ++ src/cron/isolated-agent.ts | 341 +++++++++++++++++++++++++ src/cron/run-log.test.ts | 98 ++++++++ src/cron/run-log.ts | 101 ++++++++ src/cron/schedule.test.ts | 26 ++ src/cron/schedule.ts | 29 +++ src/cron/service.test.ts | 120 +++++++++ src/cron/service.ts | 431 ++++++++++++++++++++++++++++++++ src/cron/store.ts | 52 ++++ src/cron/types.ts | 64 +++++ src/gateway/protocol/index.ts | 49 ++++ src/gateway/protocol/schema.ts | 197 +++++++++++++++ src/gateway/server.test.ts | 291 ++++++++++++++++++++- src/gateway/server.ts | 241 +++++++++++++++++- src/process/command-queue.ts | 132 +++++++--- src/web/auto-reply.test.ts | 12 +- src/web/auto-reply.ts | 85 +++++-- src/web/reply-heartbeat-wake.ts | 77 ++++++ 26 files changed, 3401 insertions(+), 342 deletions(-) create mode 100644 docs/cron.md create mode 100644 src/cli/cron-cli.ts create mode 100644 src/cli/gateway-cli.ts create mode 100644 src/cli/gateway-rpc.ts create mode 100644 src/cron/isolated-agent.ts create mode 100644 src/cron/run-log.test.ts create mode 100644 src/cron/run-log.ts create mode 100644 src/cron/schedule.test.ts create mode 100644 src/cron/schedule.ts create mode 100644 src/cron/service.test.ts create mode 100644 src/cron/service.ts create mode 100644 src/cron/store.ts create mode 100644 src/cron/types.ts create mode 100644 src/web/reply-heartbeat-wake.ts diff --git a/docs/configuration.md b/docs/configuration.md index 977a87d86..2dd747cab 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -112,6 +112,30 @@ Array of E.164 phone numbers allowed to trigger the AI. Use `["*"]` to allow eve > Quick start: If you omit `inbound.reply`, CLAWDIS falls back to the bundled `@mariozechner/pi-coding-agent` with `--mode rpc`, per-sender sessions, and a 200k-token window. No extra install or config needed to get a reply. +### `cron` + +Cron is a Gateway-owned scheduler for wakeups and scheduled jobs. See `docs/cron.md` for the full RFC and CLI examples. + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `enabled` | boolean | `false` | Enable the cron scheduler inside the Gateway | +| `store` | string | *(auto)* | Override the cron job store path (defaults to `~/.clawdis/cron/jobs.json` if present, otherwise `~/.clawdis/cron.json`) | +| `maxConcurrentRuns` | number | `1` | Max concurrent isolated cron runs (command-queue lane `"cron"`) | + +Run history: +- The Gateway appends a JSONL run ledger on each job completion (see `docs/cron.md`). Location is derived from `cron.store` / the resolved store path. + +Example: + +```json5 +{ + cron: { + enabled: true, + maxConcurrentRuns: 2 + } +} +``` + ### Template Variables Use these in your command: diff --git a/docs/cron.md b/docs/cron.md new file mode 100644 index 000000000..3806d61bc --- /dev/null +++ b/docs/cron.md @@ -0,0 +1,364 @@ +--- +summary: "RFC: Cron jobs + wakeups for Clawd/Clawdis (main vs isolated sessions)" +read_when: + - Designing scheduled jobs, alarms, or wakeups + - Adding Gateway methods or CLI commands for automation + - Adjusting heartbeat behavior or session routing +--- + +# RFC: Cron jobs + wakeups for Clawd + +Status: Draft +Last updated: 2025-12-13 + +## Context + +Clawdis already has: +- A **periodic reply heartbeat** that runs the agent with `HEARTBEAT /think:high` and suppresses `HEARTBEAT_OK` (`src/web/auto-reply.ts`). +- A lightweight, in-memory **system event queue** (`enqueueSystemEvent`) that is injected into the next **main session** turn (`drainSystemEvents` in `src/auto-reply/reply.ts`). +- A WebSocket **Gateway** daemon that is intended to be always-on (`docs/gateway.md`). + +This RFC adds a small “cron job system” so Clawd can schedule future work and reliably wake itself up: +- **Delayed**: run on the *next* normal heartbeat tick +- **Immediate**: run *now* (trigger a heartbeat immediately) +- **Isolated jobs**: optionally run in their own session that does not pollute the main session and can run concurrently (within configured limits). + +## Goals + +- Provide a **persistent job store** and an **in-process scheduler** owned by the Gateway. +- Allow each job to target either: + - `sessionTarget: "main"`: inject as `System:` lines and rely on the main heartbeat (or trigger it immediately). + - `sessionTarget: "isolated"`: run an agent turn in a dedicated session key (job session), optionally delivering a message and/or posting a summary back to main. +- Expose a stable control surface: + - **Gateway methods** (`cron.*`, `wake`) for programmatic usage (mac app, CLI, agents). + - **CLI commands** (`clawdis cron ...`) to add/remove/edit/list and to debug `run`. +- Produce clear, structured **logs** for job lifecycle and execution outcomes. + +## Non-goals (v1) + +- Multi-host distributed scheduling. +- Exactly-once semantics across crashes (we aim for “at-least-once with idempotency hooks”). +- A full Unix-cron parser as the only schedule format (we can support it, but v1 should not require complex cron features to be useful). + +## Terminology + +- **Wake**: a request to ensure the agent gets a turn soon (either right now or next heartbeat). +- **Main session**: the canonical session bucket (default key `"main"`) that receives `System:` events. +- **Isolated session**: a per-job session key (e.g. `cron:`) with its own session id / session file. + +## User stories + +- “Remind me in 20 minutes” → add a one-shot job that triggers an immediate heartbeat at T+20m. +- “Every weekday at 7:30, wake me up and start music” → recurring job, isolated session, deliver to WhatsApp. +- “Every hour, check battery; only interrupt me if < 20%” → isolated job that decides whether to deliver; may also post a brief status to main. +- “Next heartbeat, please check calendar” → delayed wake targeting main session. + +## Job model + +### Storage schema (v1) + +Each job is a JSON object with stable keys (unknown keys ignored for forward compatibility): + +- `id: string` (UUID) +- `name?: string` +- `enabled: boolean` +- `createdAtMs: number` +- `updatedAtMs: number` +- `schedule` (one of) + - `{"kind":"at","atMs":number}` (one-shot) + - `{"kind":"every","everyMs":number,"anchorMs"?:number}` (simple interval) + - `{"kind":"cron","expr":string,"tz"?:string}` (optional; see “Schedule parsing”) +- `sessionTarget: "main" | "isolated"` +- `wakeMode: "next-heartbeat" | "now"` + - For `sessionTarget:"isolated"`, `wakeMode:"now"` means “run immediately when due”. + - For `sessionTarget:"main"`, `wakeMode` controls whether we trigger the heartbeat immediately or just enqueue and wait. +- `payload` (one of) + - `{"kind":"systemEvent","text":string}` (enqueue as `System:`) + - `{"kind":"agentTurn","message":string,"deliver"?:boolean,"channel"?: "last"|"whatsapp"|"telegram","to"?:string,"timeoutSeconds"?:number}` +- `isolation` (optional; only meaningful for isolated jobs) + - `{"postToMain": boolean, "postToMainPrefix"?: string}` +- `runtime` (optional) + - `{"maxAttempts"?:number,"retryBackoffMs"?:number}` (best-effort retries; defaults off) +- `state` (runtime-maintained) + - `{"nextRunAtMs":number,"lastRunAtMs"?:number,"lastStatus"?: "ok"|"error"|"skipped","lastError"?:string,"lastDurationMs"?:number}` + +### Key behavior + +- `sessionTarget:"main"` jobs always enqueue `payload.kind:"systemEvent"` (directly or derived from `agentTurn` results; see below). +- `sessionTarget:"isolated"` jobs create/use a stable session key: `cron:`. + +## Storage location + +We can store this directly under `~/.clawdis` without a subfolder, but a folder gives us room for future artifacts (per-job state, migration backups, run history). + +Current behavior (v1): +- Default store: `~/.clawdis/cron.json` +- If `~/.clawdis/cron/jobs.json` exists, it is preferred (and is a good location for future per-cron artifacts). +- Any path can be forced via `cron.store` in config. + +The scheduler should never require additional configuration for the base directory (Clawdis already treats `~/.clawdis` as fixed). + +## Enabling + +Cron execution should be opt-in via config: + +```json5 +{ + cron: { + enabled: true, + // optional: + store: "~/.clawdis/cron.json", + maxConcurrentRuns: 1 + } +} +``` + +## Scheduler design + +### Ownership + +The Gateway owns: +- the scheduler timer, +- job store reads/writes, +- job execution (enqueue system events and/or agent turns). + +This keeps scheduling unified with the always-on process and prevents “two schedulers” when multiple CLIs run. + +### Timer strategy + +- Maintain an in-memory heap/array of enabled jobs keyed by `state.nextRunAtMs`. +- Use a **single `setTimeout`** to wake at the earliest next run. +- On wake: + - compute all due jobs (now >= nextRunAtMs), + - mark them “in flight” (in memory), + - persist updated `state` (at least bump `nextRunAtMs` / `lastRunAtMs`) before starting execution to minimize duplicate runs on crash, + - execute jobs (with concurrency limits), + - persist final `lastStatus/lastError/lastDurationMs`, + - re-arm timer for the next earliest run. + +### Schedule parsing + +V1 can ship with `at` + `every` without extra deps. + +If we add `"kind":"cron"`: +- Use a well-maintained parser (we use `croner`) and support: + - 5-field cron (`min hour dom mon dow`) at minimum + - optional `tz` +- Store `nextRunAtMs` computed by the parser; re-compute after each run. + +## Execution semantics + +### Main session jobs + +Main session jobs do not run the agent directly by default. + +When due: +1) `enqueueSystemEvent(job.payload.text)` (or a derived message) +2) If `wakeMode:"now"`, trigger an immediate heartbeat run (see “Heartbeat wake hook”). +3) Otherwise do nothing else (the next scheduled heartbeat will pick up the system event). + +Why: This keeps the main session’s “proactive” behavior centralized in the heartbeat rules and avoids ad-hoc agent turns that might fight with inbound message processing. + +### Isolated session jobs + +Isolated jobs run an agent turn in a dedicated session key, intended to be separate from main. + +When due: +- Build a message body that includes schedule metadata, e.g.: + - `"[cron:] : "` +- Execute via the same agent runner path as other command-mode runs, but pinned to: + - `sessionKey = cron:` + - `sessionId = store[sessionKey].sessionId` (create if missing) +- Optionally deliver output (`payload.deliver === true`) to the configured channel/to. +- If `isolation.postToMain` is true, enqueue a summary system event to main, e.g.: + - `System: Cron "" completed: <1-line summary>` + +### “Run in parallel to main” + +Clawdis currently serializes command execution through a global in-process queue (`src/process/command-queue.ts`) to avoid collisions. + +To support isolated cron jobs running “in parallel”, we should introduce **lanes** (keyed queues) plus a global concurrency cap: +- Lane `"main"`: inbound auto-replies + main heartbeat. +- Lane `"cron"` (or `cron:`): isolated jobs. +- Configurable `cron.maxConcurrentRuns` (default 1 or 2). + +This yields: +- isolated jobs can overlap with the main lane (up to cap), +- each lane still preserves ordering for its own work (optional), +- we retain safety knobs to prevent runaway resource contention. + +## Heartbeat wake hook (immediate vs next heartbeat) + +We need a way for the Gateway (or the scheduler) to request an immediate heartbeat without duplicating heartbeat logic. + +Design: +- `monitorWebProvider` owns the real `runReplyHeartbeat()` function (it already has all the local state needed). +- Add a small global hook module: + - `setReplyHeartbeatWakeHandler(fn | null)` installed by `monitorWebProvider` + - `requestReplyHeartbeatNow({ reason, coalesceMs? })` +- If the handler is absent (provider not connected), the request is stored as “pending”; the next time the handler is installed, it runs once. +- Coalesce rapid calls and respect the existing “skip when queue busy” behavior (prefer retrying soon vs dropping). + +## Run history log (JSONL) + +In addition to normal structured logs, the Gateway writes an append-only run history “ledger” (JSONL) whenever a job finishes. This is intended for quick debugging (“did the job run, when, and what happened?”). + +Path rules: +- If the cron store path basename is `jobs.json` (e.g. `~/.clawdis/cron/jobs.json`), logs go to `.../runs/.jsonl` (e.g. `~/.clawdis/cron/runs/.jsonl`). +- Otherwise logs go to `.runs.jsonl` in the same directory (e.g. `~/.clawdis/cron.json` → `~/.clawdis/cron.runs.jsonl`). + +Retention: +- Best-effort pruning when the file grows beyond ~2MB; keep the newest ~2000 lines. + +## Gateway API + +New methods (names can be bikeshed; `cron.*` is suggested): + +- `wake` + - params: `{ mode: "now" | "next-heartbeat", text: string }` + - effect: `enqueueSystemEvent(text)`, plus optional immediate heartbeat trigger + +- `cron.list` + - params: optional `{ includeDisabled?: boolean }` + - returns: `{ jobs: CronJob[] }` + +- `cron.add` + - params: job payload without `id/state` (server generates and returns created job) + +- `cron.update` + - params: `{ id: string, patch: Partial }` + +- `cron.remove` + - params: `{ id: string }` + +- `cron.run` + - params: `{ id: string, mode?: "due" | "force" }` (debugging; does not change schedule unless `force` requires it) + +- `cron.runs` + - params: `{ id?: string, limit?: number }` + - returns: `{ entries: CronRunLogEntry[] }` + - note: if the store layout is `.../jobs.json`, `id` is required (runs are stored per-job). + +The Gateway should broadcast a `cron` event for UI/debug: +- event: `cron` + - payload: `{ jobId, action: "added"|"updated"|"removed"|"started"|"finished", status?, error?, nextRunAtMs? }` + +## CLI surface + +Add a `cron` command group (all commands should also support `--json` where sensible): + +- `clawdis cron list [--json] [--all]` +- `clawdis cron add ...` + - schedule flags: + - `--at ` (one-shot) + - `--every ` (e.g. `10m`, `1h`) + - `--cron "" [--tz ""]` + - target flags: + - `--session main|isolated` + - `--wake now|next` + - payload flags (choose one): + - `--system-event ""` + - `--message "" [--deliver] [--channel last|whatsapp|telegram] [--to ]` + +- `clawdis cron edit ...` (patch-by-flags, non-interactive) +- `clawdis cron rm ` +- `clawdis cron enable ` / `clawdis cron disable ` +- `clawdis cron run [--force]` (debug) + +Additionally: +- `clawdis wake --mode now|next --text ""` as a thin wrapper around `wake` for agents to call. + +## Examples + +### Run once at a specific time + +One-shot reminder that targets the main session and triggers a heartbeat immediately at the scheduled time: + +```bash +clawdis cron add \ + --at "2025-12-14T07:00:00-08:00" \ + --session main \ + --wake now \ + --system-event "Alarm: wake up (meeting in 30 minutes)." +``` + +### Run daily (calendar-accurate) + +Daily at 07:00 in a specific timezone (preferred over “every 24h” to avoid DST drift): + +```bash +clawdis cron add \ + --cron "0 7 * * *" \ + --tz "America/Los_Angeles" \ + --session isolated \ + --wake now \ + --message "Daily check: scan calendar + inbox; deliver only if urgent." \ + --deliver \ + --channel last +``` + +### Run weekly (every Wednesday) + +Every Wednesday at 09:00: + +```bash +clawdis cron add \ + --cron "0 9 * * 3" \ + --tz "America/Los_Angeles" \ + --session isolated \ + --wake now \ + --message "Weekly: summarize status and remind me of goals." \ + --deliver \ + --channel last +``` + +### “Next heartbeat” + +Enqueue a note for the main session but let the existing heartbeat cadence pick it up: + +```bash +clawdis wake --mode next --text "Next heartbeat: check battery + upcoming meetings." +``` + +## Logging & observability + +Logging requirements: +- Use `getChildLogger({ module: "cron", jobId, runId, name })` for every run. +- Log lifecycle: + - store load/save (debug; include job count) + - schedule recompute (debug; include nextRunAt) + - job start/end (info) + - job skipped (info; include reason) + - job error (warn; include error + stack where available) +- Emit a concise user-facing line to stdout when running in CLI mode (similar to heartbeat logs). + +Suggested log events: +- `cron: scheduler started` (jobCount, nextWakeAt) +- `cron: job started` (jobId, scheduleKind, sessionTarget, wakeMode) +- `cron: job finished` (status, durationMs, nextRunAtMs) + +## Safety & security + +- Respect existing allowlists/routing rules: delivery defaults should not send to arbitrary destinations unless explicitly configured. +- Provide a global “kill switch”: + - `cron.enabled: boolean` config default true (or false until enabled). + - `gateway method set-heartbeats` already exists; cron should have similar. +- Avoid persistence of sensitive payloads unless requested; job text may contain private content. + +## Testing plan (v1) + +- Unit tests: + - schedule computation for `at` and `every` + - job store read/write + migration behavior + - lane concurrency: main vs cron overlap is bounded + - “wake now” coalescing and pending behavior when provider not ready +- Integration tests: + - start Gateway with `CLAWDIS_SKIP_PROVIDERS=1`, add jobs, list/edit/remove + - simulate due jobs and assert `enqueueSystemEvent` called + cron events broadcast + +## Rollout plan + +1) Add the `wake` primitive + heartbeat wake hook (no persistent jobs yet). +2) Add `cron.*` API and CLI wrappers with `at` + `every`. +3) Add optional cron expression parsing (`kind:"cron"`) if needed. +4) Add UI surfacing in WebChat/macOS app (optional). diff --git a/docs/queue.md b/docs/queue.md index a3db8fed3..56874cf2b 100644 --- a/docs/queue.md +++ b/docs/queue.md @@ -19,7 +19,8 @@ We now serialize all command-based auto-replies (WhatsApp Web listener) through ## Scope and guarantees - Applies only to config-driven command replies; plain text replies are unaffected. -- Queue is process-wide, so the web inbox listener (and any future entrypoints) all respect the same lock. +- Default lane (`main`) is process-wide for inbound + main heartbeats to keep the primary workflow serialized. +- Additional lanes may exist (e.g. `cron`) so background jobs can run in parallel without blocking inbound replies. - No external dependencies or background worker threads; pure TypeScript + promises. ## Troubleshooting diff --git a/package.json b/package.json index 13efbd407..794e67976 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "body-parser": "^2.2.1", "chalk": "^5.6.2", "commander": "^14.0.2", + "croner": "^9.1.0", "detect-libc": "^2.1.2", "dotenv": "^17.2.3", "express": "^5.2.1", diff --git a/src/cli/cron-cli.ts b/src/cli/cron-cli.ts new file mode 100644 index 000000000..adbb39b0f --- /dev/null +++ b/src/cli/cron-cli.ts @@ -0,0 +1,414 @@ +import type { Command } from "commander"; +import { danger } from "../globals.js"; +import { defaultRuntime } from "../runtime.js"; +import { addGatewayClientOptions, callGatewayFromCli } from "./gateway-rpc.js"; + +function parseDurationMs(input: string): number | null { + const raw = input.trim(); + if (!raw) return null; + const match = raw.match(/^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/i); + if (!match) return null; + const n = Number.parseFloat(match[1] ?? ""); + if (!Number.isFinite(n) || n <= 0) return null; + const unit = (match[2] ?? "").toLowerCase(); + const factor = + unit === "ms" + ? 1 + : unit === "s" + ? 1000 + : unit === "m" + ? 60_000 + : unit === "h" + ? 3_600_000 + : 86_400_000; + return Math.floor(n * factor); +} + +function parseAtMs(input: string): number | null { + const raw = input.trim(); + if (!raw) return null; + const asNum = Number(raw); + if (Number.isFinite(asNum) && asNum > 0) return Math.floor(asNum); + const parsed = Date.parse(raw); + if (Number.isFinite(parsed)) return parsed; + const dur = parseDurationMs(raw); + if (dur) return Date.now() + dur; + return null; +} + +export function registerCronCli(program: Command) { + addGatewayClientOptions( + program + .command("wake") + .description( + "Enqueue a system event and optionally trigger an immediate heartbeat", + ) + .requiredOption("--text ", "System event text") + .option( + "--mode ", + "Wake mode (now|next-heartbeat)", + "next-heartbeat", + ) + .option("--json", "Output JSON", false), + ).action(async (opts) => { + try { + const result = await callGatewayFromCli( + "wake", + opts, + { mode: opts.mode, text: opts.text }, + { expectFinal: false }, + ); + if (opts.json) defaultRuntime.log(JSON.stringify(result, null, 2)); + else defaultRuntime.log("ok"); + } catch (err) { + defaultRuntime.error(danger(String(err))); + defaultRuntime.exit(1); + } + }); + + const cron = program + .command("cron") + .description("Manage cron jobs (via Gateway)"); + + addGatewayClientOptions( + cron + .command("list") + .description("List cron jobs") + .option("--all", "Include disabled jobs", false) + .option("--json", "Output JSON", false) + .action(async (opts) => { + try { + const res = await callGatewayFromCli("cron.list", opts, { + includeDisabled: Boolean(opts.all), + }); + defaultRuntime.log(JSON.stringify(res, null, 2)); + } catch (err) { + defaultRuntime.error(danger(String(err))); + defaultRuntime.exit(1); + } + }), + ); + + addGatewayClientOptions( + cron + .command("add") + .description("Add a cron job") + .option("--name ", "Optional name") + .option("--disabled", "Create job disabled", false) + .option("--session ", "Session target (main|isolated)", "main") + .option( + "--wake ", + "Wake mode (now|next-heartbeat)", + "next-heartbeat", + ) + .option("--at ", "Run once at time (ISO) or +duration (e.g. 20m)") + .option("--every ", "Run every duration (e.g. 10m, 1h)") + .option("--cron ", "Cron expression (5-field)") + .option("--tz ", "Timezone for cron expressions (IANA)", "") + .option("--system-event ", "System event payload (main session)") + .option("--message ", "Agent message payload") + .option( + "--thinking ", + "Thinking level for agent jobs (off|minimal|low|medium|high)", + ) + .option("--timeout-seconds ", "Timeout seconds for agent jobs") + .option("--deliver", "Deliver agent output", false) + .option( + "--channel ", + "Delivery channel (last|whatsapp|telegram)", + "last", + ) + .option("--to ", "Delivery destination (E.164 or Telegram chatId)") + .option( + "--best-effort-deliver", + "Do not fail the job if delivery fails", + false, + ) + .option("--post-to-main", "Post a 1-line summary to main session", false) + .option( + "--post-prefix ", + "Prefix for summary system event", + "Cron", + ) + .option("--json", "Output JSON", false) + .action(async (opts) => { + try { + const schedule = (() => { + const at = typeof opts.at === "string" ? opts.at : ""; + const every = typeof opts.every === "string" ? opts.every : ""; + const cronExpr = typeof opts.cron === "string" ? opts.cron : ""; + const chosen = [ + Boolean(at), + Boolean(every), + Boolean(cronExpr), + ].filter(Boolean).length; + if (chosen !== 1) { + throw new Error( + "Choose exactly one schedule: --at, --every, or --cron", + ); + } + if (at) { + const atMs = parseAtMs(at); + if (!atMs) + throw new Error( + "Invalid --at; use ISO time or duration like 20m", + ); + return { kind: "at" as const, atMs }; + } + if (every) { + const everyMs = parseDurationMs(every); + if (!everyMs) + throw new Error("Invalid --every; use e.g. 10m, 1h, 1d"); + return { kind: "every" as const, everyMs }; + } + return { + kind: "cron" as const, + expr: cronExpr, + tz: + typeof opts.tz === "string" && opts.tz.trim() + ? opts.tz.trim() + : undefined, + }; + })(); + + const sessionTarget = String(opts.session ?? "main"); + if (sessionTarget !== "main" && sessionTarget !== "isolated") { + throw new Error("--session must be main or isolated"); + } + + const wakeMode = String(opts.wake ?? "next-heartbeat"); + if (wakeMode !== "now" && wakeMode !== "next-heartbeat") { + throw new Error("--wake must be now or next-heartbeat"); + } + + const payload = (() => { + const systemEvent = + typeof opts.systemEvent === "string" + ? opts.systemEvent.trim() + : ""; + const message = + typeof opts.message === "string" ? opts.message.trim() : ""; + const chosen = [Boolean(systemEvent), Boolean(message)].filter( + Boolean, + ).length; + if (chosen !== 1) { + throw new Error( + "Choose exactly one payload: --system-event or --message", + ); + } + if (systemEvent) + return { kind: "systemEvent" as const, text: systemEvent }; + const timeoutSeconds = opts.timeoutSeconds + ? Number.parseInt(String(opts.timeoutSeconds), 10) + : undefined; + return { + kind: "agentTurn" as const, + message, + thinking: + typeof opts.thinking === "string" && opts.thinking.trim() + ? opts.thinking.trim() + : undefined, + timeoutSeconds: + timeoutSeconds && Number.isFinite(timeoutSeconds) + ? timeoutSeconds + : undefined, + deliver: Boolean(opts.deliver), + channel: typeof opts.channel === "string" ? opts.channel : "last", + to: + typeof opts.to === "string" && opts.to.trim() + ? opts.to.trim() + : undefined, + bestEffortDeliver: Boolean(opts.bestEffortDeliver), + }; + })(); + + if (sessionTarget === "isolated" && payload.kind !== "agentTurn") { + throw new Error( + "Isolated jobs require --message (agentTurn payload).", + ); + } + + const isolation = opts.postToMain + ? { + postToMain: true, + postToMainPrefix: String(opts.postPrefix ?? "Cron"), + } + : undefined; + + const params = { + name: + typeof opts.name === "string" && opts.name.trim() + ? opts.name.trim() + : undefined, + enabled: !opts.disabled, + schedule, + sessionTarget, + wakeMode, + payload, + isolation, + }; + + const res = await callGatewayFromCli("cron.add", opts, params); + defaultRuntime.log(JSON.stringify(res, null, 2)); + } catch (err) { + defaultRuntime.error(danger(String(err))); + defaultRuntime.exit(1); + } + }), + ); + + addGatewayClientOptions( + cron + .command("rm") + .description("Remove a cron job") + .argument("", "Job id") + .option("--json", "Output JSON", false) + .action(async (id, opts) => { + try { + const res = await callGatewayFromCli("cron.remove", opts, { id }); + defaultRuntime.log(JSON.stringify(res, null, 2)); + } catch (err) { + defaultRuntime.error(danger(String(err))); + defaultRuntime.exit(1); + } + }), + ); + + addGatewayClientOptions( + cron + .command("edit") + .description("Edit a cron job (patch fields)") + .argument("", "Job id") + .option("--name ", "Set name") + .option("--enable", "Enable job", false) + .option("--disable", "Disable job", false) + .option("--session ", "Session target (main|isolated)") + .option("--wake ", "Wake mode (now|next-heartbeat)") + .option("--at ", "Set one-shot time (ISO) or duration like 20m") + .option("--every ", "Set interval duration like 10m") + .option("--cron ", "Set cron expression") + .option("--tz ", "Timezone for cron expressions (IANA)") + .option("--system-event ", "Set systemEvent payload") + .option("--message ", "Set agentTurn payload message") + .option("--thinking ", "Thinking level for agent jobs") + .option("--timeout-seconds ", "Timeout seconds for agent jobs") + .option("--deliver", "Deliver agent output", false) + .option( + "--channel ", + "Delivery channel (last|whatsapp|telegram)", + ) + .option("--to ", "Delivery destination") + .option( + "--best-effort-deliver", + "Do not fail job if delivery fails", + false, + ) + .option("--post-to-main", "Post a 1-line summary to main session", false) + .option("--post-prefix ", "Prefix for summary system event") + .action(async (id, opts) => { + try { + const patch: Record = {}; + if (typeof opts.name === "string") patch.name = opts.name; + if (opts.enable && opts.disable) + throw new Error("Choose --enable or --disable, not both"); + if (opts.enable) patch.enabled = true; + if (opts.disable) patch.enabled = false; + if (typeof opts.session === "string") + patch.sessionTarget = opts.session; + if (typeof opts.wake === "string") patch.wakeMode = opts.wake; + + const scheduleChosen = [opts.at, opts.every, opts.cron].filter( + Boolean, + ).length; + if (scheduleChosen > 1) + throw new Error("Choose at most one schedule change"); + if (opts.at) { + const atMs = parseAtMs(String(opts.at)); + if (!atMs) throw new Error("Invalid --at"); + patch.schedule = { kind: "at", atMs }; + } else if (opts.every) { + const everyMs = parseDurationMs(String(opts.every)); + if (!everyMs) throw new Error("Invalid --every"); + patch.schedule = { kind: "every", everyMs }; + } else if (opts.cron) { + patch.schedule = { + kind: "cron", + expr: String(opts.cron), + tz: + typeof opts.tz === "string" && opts.tz.trim() + ? opts.tz.trim() + : undefined, + }; + } + + const payloadChosen = [opts.systemEvent, opts.message].filter( + Boolean, + ).length; + if (payloadChosen > 1) + throw new Error("Choose at most one payload change"); + if (opts.systemEvent) { + patch.payload = { + kind: "systemEvent", + text: String(opts.systemEvent), + }; + } else if (opts.message) { + const timeoutSeconds = opts.timeoutSeconds + ? Number.parseInt(String(opts.timeoutSeconds), 10) + : undefined; + patch.payload = { + kind: "agentTurn", + message: String(opts.message), + thinking: + typeof opts.thinking === "string" ? opts.thinking : undefined, + timeoutSeconds: + timeoutSeconds && Number.isFinite(timeoutSeconds) + ? timeoutSeconds + : undefined, + deliver: Boolean(opts.deliver), + channel: + typeof opts.channel === "string" ? opts.channel : undefined, + to: typeof opts.to === "string" ? opts.to : undefined, + bestEffortDeliver: Boolean(opts.bestEffortDeliver), + }; + } + + if (opts.postToMain) { + patch.isolation = { + postToMain: true, + postToMainPrefix: + typeof opts.postPrefix === "string" ? opts.postPrefix : "Cron", + }; + } + + const res = await callGatewayFromCli("cron.update", opts, { + id, + patch, + }); + defaultRuntime.log(JSON.stringify(res, null, 2)); + } catch (err) { + defaultRuntime.error(danger(String(err))); + defaultRuntime.exit(1); + } + }), + ); + + addGatewayClientOptions( + cron + .command("run") + .description("Run a cron job now (debug)") + .argument("", "Job id") + .option("--force", "Run even if not due", false) + .action(async (id, opts) => { + try { + const res = await callGatewayFromCli("cron.run", opts, { + id, + mode: opts.force ? "force" : "due", + }); + defaultRuntime.log(JSON.stringify(res, null, 2)); + } catch (err) { + defaultRuntime.error(danger(String(err))); + defaultRuntime.exit(1); + } + }), + ); +} diff --git a/src/cli/gateway-cli.ts b/src/cli/gateway-cli.ts new file mode 100644 index 000000000..973e7b39c --- /dev/null +++ b/src/cli/gateway-cli.ts @@ -0,0 +1,275 @@ +import type { Command } from "commander"; +import { callGateway, randomIdempotencyKey } from "../gateway/call.js"; +import { startGatewayServer } from "../gateway/server.js"; +import { info, setVerbose } from "../globals.js"; +import { GatewayLockError } from "../infra/gateway-lock.js"; +import { defaultRuntime } from "../runtime.js"; +import { createDefaultDeps } from "./deps.js"; +import { forceFreePort } from "./ports.js"; + +type GatewayRpcOpts = { + url?: string; + token?: string; + timeout?: string; + expectFinal?: boolean; +}; + +const gatewayCallOpts = (cmd: Command) => + cmd + .option("--url ", "Gateway WebSocket URL", "ws://127.0.0.1:18789") + .option("--token ", "Gateway token (if required)") + .option("--timeout ", "Timeout in ms", "10000") + .option("--expect-final", "Wait for final response (agent)", false); + +const callGatewayCli = async ( + method: string, + opts: GatewayRpcOpts, + params?: unknown, +) => + callGateway({ + url: opts.url, + token: opts.token, + method, + params, + expectFinal: Boolean(opts.expectFinal), + timeoutMs: Number(opts.timeout ?? 10_000), + clientName: "cli", + mode: "cli", + }); + +export function registerGatewayCli(program: Command) { + const gateway = program + .command("gateway") + .description("Run the WebSocket Gateway") + .option("--port ", "Port for the gateway WebSocket", "18789") + .option( + "--webchat-port ", + "Port for the loopback WebChat HTTP server (default 18788)", + ) + .option( + "--token ", + "Shared token required in connect.params.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)", + ) + .option( + "--force", + "Kill any existing listener on the target port before starting", + false, + ) + .option("--verbose", "Verbose logging to stdout/stderr", false) + .action(async (opts) => { + setVerbose(Boolean(opts.verbose)); + const port = Number.parseInt(String(opts.port ?? "18789"), 10); + if (Number.isNaN(port) || port <= 0) { + defaultRuntime.error("Invalid port"); + defaultRuntime.exit(1); + } + const webchatPort = opts.webchatPort + ? Number.parseInt(String(opts.webchatPort), 10) + : undefined; + if ( + webchatPort !== undefined && + (Number.isNaN(webchatPort) || webchatPort <= 0) + ) { + defaultRuntime.error("Invalid webchat port"); + defaultRuntime.exit(1); + } + if (opts.force) { + try { + const killed = forceFreePort(port); + if (killed.length === 0) { + defaultRuntime.log(info(`Force: no listeners on port ${port}`)); + } else { + for (const proc of killed) { + defaultRuntime.log( + info( + `Force: killed pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""} on port ${port}`, + ), + ); + } + await new Promise((resolve) => setTimeout(resolve, 200)); + } + } catch (err) { + defaultRuntime.error(`Force: ${String(err)}`); + defaultRuntime.exit(1); + return; + } + } + if (opts.token) { + process.env.CLAWDIS_GATEWAY_TOKEN = String(opts.token); + } + + let server: Awaited> | null = null; + let shuttingDown = false; + let forceExitTimer: ReturnType | null = null; + + const onSigterm = () => shutdown("SIGTERM"); + const onSigint = () => shutdown("SIGINT"); + + const shutdown = (signal: string) => { + // Ensure we don't leak listeners across restarts/tests. + process.removeListener("SIGTERM", onSigterm); + process.removeListener("SIGINT", onSigint); + + if (shuttingDown) { + defaultRuntime.log( + info(`gateway: received ${signal} during shutdown; exiting now`), + ); + defaultRuntime.exit(0); + } + shuttingDown = true; + defaultRuntime.log(info(`gateway: received ${signal}; shutting down`)); + + // Avoid hanging forever if a provider task ignores abort. + forceExitTimer = setTimeout(() => { + defaultRuntime.error( + "gateway: shutdown timed out; exiting without full cleanup", + ); + defaultRuntime.exit(0); + }, 5000); + + void (async () => { + try { + await server?.close(); + } catch (err) { + defaultRuntime.error(`gateway: shutdown error: ${String(err)}`); + } finally { + if (forceExitTimer) clearTimeout(forceExitTimer); + defaultRuntime.exit(0); + } + })(); + }; + + process.once("SIGTERM", onSigterm); + process.once("SIGINT", onSigint); + + try { + server = await startGatewayServer(port, { webchatPort }); + } catch (err) { + if (err instanceof GatewayLockError) { + defaultRuntime.error(`Gateway failed to start: ${err.message}`); + defaultRuntime.exit(1); + return; + } + defaultRuntime.error(`Gateway failed to start: ${String(err)}`); + defaultRuntime.exit(1); + } + // Keep process alive + await new Promise(() => {}); + }); + + gatewayCallOpts( + gateway + .command("call") + .description("Call a Gateway method and print JSON") + .argument( + "", + "Method name (health/status/system-presence/send/agent/cron.*)", + ) + .option("--params ", "JSON object string for params", "{}") + .action(async (method, opts) => { + try { + const params = JSON.parse(String(opts.params ?? "{}")); + const result = await callGatewayCli(method, opts, params); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(`Gateway call failed: ${String(err)}`); + defaultRuntime.exit(1); + } + }), + ); + + gatewayCallOpts( + gateway + .command("health") + .description("Fetch Gateway health") + .action(async (opts) => { + try { + const result = await callGatewayCli("health", opts); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(String(err)); + defaultRuntime.exit(1); + } + }), + ); + + gatewayCallOpts( + gateway + .command("status") + .description("Fetch Gateway status") + .action(async (opts) => { + try { + const result = await callGatewayCli("status", opts); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(String(err)); + defaultRuntime.exit(1); + } + }), + ); + + gatewayCallOpts( + gateway + .command("send") + .description("Send a message via the Gateway") + .requiredOption("--to ", "Destination (E.164 or jid)") + .requiredOption("--message ", "Message text") + .option("--media-url ", "Optional media URL") + .option("--idempotency-key ", "Idempotency key") + .action(async (opts) => { + try { + const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); + const result = await callGatewayCli("send", opts, { + to: opts.to, + message: opts.message, + mediaUrl: opts.mediaUrl, + idempotencyKey, + }); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(String(err)); + defaultRuntime.exit(1); + } + }), + ); + + gatewayCallOpts( + gateway + .command("agent") + .description("Run an agent turn via the Gateway (waits for final)") + .requiredOption("--message ", "User message") + .option("--to ", "Destination") + .option("--session-id ", "Session id") + .option("--thinking ", "Thinking level") + .option("--deliver", "Deliver response", false) + .option("--timeout-seconds ", "Agent timeout seconds") + .option("--idempotency-key ", "Idempotency key") + .action(async (opts) => { + try { + const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); + const result = await callGatewayCli( + "agent", + { ...opts, expectFinal: true }, + { + message: opts.message, + to: opts.to, + sessionId: opts.sessionId, + thinking: opts.thinking, + deliver: Boolean(opts.deliver), + timeout: opts.timeoutSeconds + ? Number.parseInt(String(opts.timeoutSeconds), 10) + : undefined, + idempotencyKey, + }, + ); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(String(err)); + defaultRuntime.exit(1); + } + }), + ); + + // Build default deps (keeps parity with other commands; future-proofing). + void createDefaultDeps(); +} diff --git a/src/cli/gateway-rpc.ts b/src/cli/gateway-rpc.ts new file mode 100644 index 000000000..293b97b6c --- /dev/null +++ b/src/cli/gateway-rpc.ts @@ -0,0 +1,35 @@ +import type { Command } from "commander"; +import { callGateway } from "../gateway/call.js"; + +export type GatewayRpcOpts = { + url?: string; + token?: string; + timeout?: string; + expectFinal?: boolean; +}; + +export function addGatewayClientOptions(cmd: Command) { + return cmd + .option("--url ", "Gateway WebSocket URL", "ws://127.0.0.1:18789") + .option("--token ", "Gateway token (if required)") + .option("--timeout ", "Timeout in ms", "10000") + .option("--expect-final", "Wait for final response (agent)", false); +} + +export async function callGatewayFromCli( + method: string, + opts: GatewayRpcOpts, + params?: unknown, + extra?: { expectFinal?: boolean }, +) { + return await callGateway({ + url: opts.url, + token: opts.token, + method, + params, + expectFinal: extra?.expectFinal ?? Boolean(opts.expectFinal), + timeoutMs: Number(opts.timeout ?? 10_000), + clientName: "cli", + mode: "cli", + }); +} diff --git a/src/cli/program.ts b/src/cli/program.ts index f3dee2ca3..83e55f3be 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -5,15 +5,14 @@ import { healthCommand } from "../commands/health.js"; import { sendCommand } from "../commands/send.js"; import { sessionsCommand } from "../commands/sessions.js"; import { statusCommand } from "../commands/status.js"; -import { callGateway, randomIdempotencyKey } from "../gateway/call.js"; -import { startGatewayServer } from "../gateway/server.js"; import { danger, info, setVerbose } from "../globals.js"; -import { GatewayLockError } from "../infra/gateway-lock.js"; import { loginWeb, logoutWeb } from "../provider-web.js"; import { defaultRuntime } from "../runtime.js"; import { VERSION } from "../version.js"; import { startWebChatServer } from "../webchat/server.js"; +import { registerCronCli } from "./cron-cli.js"; import { createDefaultDeps } from "./deps.js"; +import { registerGatewayCli } from "./gateway-cli.js"; import { forceFreePort } from "./ports.js"; export { forceFreePort }; @@ -209,266 +208,8 @@ Examples: } }); - program; - const gateway = program - .command("gateway") - .description("Run the WebSocket Gateway") - .option("--port ", "Port for the gateway WebSocket", "18789") - .option( - "--webchat-port ", - "Port for the loopback WebChat HTTP server (default 18788)", - ) - .option( - "--token ", - "Shared token required in connect.params.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)", - ) - .option( - "--force", - "Kill any existing listener on the target port before starting", - false, - ) - .option("--verbose", "Verbose logging to stdout/stderr", false) - .action(async (opts) => { - setVerbose(Boolean(opts.verbose)); - const port = Number.parseInt(String(opts.port ?? "18789"), 10); - if (Number.isNaN(port) || port <= 0) { - defaultRuntime.error("Invalid port"); - defaultRuntime.exit(1); - } - const webchatPort = opts.webchatPort - ? Number.parseInt(String(opts.webchatPort), 10) - : undefined; - if ( - webchatPort !== undefined && - (Number.isNaN(webchatPort) || webchatPort <= 0) - ) { - defaultRuntime.error("Invalid webchat port"); - defaultRuntime.exit(1); - } - if (opts.force) { - try { - const killed = forceFreePort(port); - if (killed.length === 0) { - defaultRuntime.log(info(`Force: no listeners on port ${port}`)); - } else { - for (const proc of killed) { - defaultRuntime.log( - info( - `Force: killed pid ${proc.pid}${proc.command ? ` (${proc.command})` : ""} on port ${port}`, - ), - ); - } - await new Promise((resolve) => setTimeout(resolve, 200)); - } - } catch (err) { - defaultRuntime.error(`Force: ${String(err)}`); - defaultRuntime.exit(1); - return; - } - } - if (opts.token) { - process.env.CLAWDIS_GATEWAY_TOKEN = String(opts.token); - } - - let server: Awaited> | null = null; - let shuttingDown = false; - let forceExitTimer: ReturnType | null = null; - - const onSigterm = () => shutdown("SIGTERM"); - const onSigint = () => shutdown("SIGINT"); - - const shutdown = (signal: string) => { - // Ensure we don't leak listeners across restarts/tests. - process.removeListener("SIGTERM", onSigterm); - process.removeListener("SIGINT", onSigint); - - if (shuttingDown) { - defaultRuntime.log( - info(`gateway: received ${signal} during shutdown; exiting now`), - ); - defaultRuntime.exit(0); - } - shuttingDown = true; - defaultRuntime.log(info(`gateway: received ${signal}; shutting down`)); - - // Avoid hanging forever if a provider task ignores abort. - forceExitTimer = setTimeout(() => { - defaultRuntime.error( - "gateway: shutdown timed out; exiting without full cleanup", - ); - defaultRuntime.exit(0); - }, 5000); - - void (async () => { - try { - await server?.close(); - } catch (err) { - defaultRuntime.error(`gateway: shutdown error: ${String(err)}`); - } finally { - if (forceExitTimer) clearTimeout(forceExitTimer); - defaultRuntime.exit(0); - } - })(); - }; - - process.once("SIGTERM", onSigterm); - process.once("SIGINT", onSigint); - - try { - server = await startGatewayServer(port, { webchatPort }); - } catch (err) { - if (err instanceof GatewayLockError) { - defaultRuntime.error(`Gateway failed to start: ${err.message}`); - defaultRuntime.exit(1); - return; - } - defaultRuntime.error(`Gateway failed to start: ${String(err)}`); - defaultRuntime.exit(1); - } - // Keep process alive - await new Promise(() => {}); - }); - - const gatewayCallOpts = (cmd: Command) => - cmd - .option("--url ", "Gateway WebSocket URL", "ws://127.0.0.1:18789") - .option("--token ", "Gateway token (if required)") - .option("--timeout ", "Timeout in ms", "10000") - .option("--expect-final", "Wait for final response (agent)", false); - - const callGatewayCli = async ( - method: string, - opts: { - url?: string; - token?: string; - timeout?: string; - expectFinal?: boolean; - }, - params?: unknown, - ) => - callGateway({ - url: opts.url, - token: opts.token, - method, - params, - expectFinal: Boolean(opts.expectFinal), - timeoutMs: Number(opts.timeout ?? 10_000), - clientName: "cli", - mode: "cli", - }); - - gatewayCallOpts( - gateway - .command("call") - .description("Call a Gateway method and print JSON") - .argument( - "", - "Method name (health/status/system-presence/send/agent)", - ) - .option("--params ", "JSON object string for params", "{}") - .action(async (method, opts) => { - try { - const params = JSON.parse(String(opts.params ?? "{}")); - const result = await callGatewayCli(method, opts, params); - defaultRuntime.log(JSON.stringify(result, null, 2)); - } catch (err) { - defaultRuntime.error(`Gateway call failed: ${String(err)}`); - defaultRuntime.exit(1); - } - }), - ); - - gatewayCallOpts( - gateway - .command("health") - .description("Fetch Gateway health") - .action(async (opts) => { - try { - const result = await callGatewayCli("health", opts); - defaultRuntime.log(JSON.stringify(result, null, 2)); - } catch (err) { - defaultRuntime.error(String(err)); - defaultRuntime.exit(1); - } - }), - ); - - gatewayCallOpts( - gateway - .command("status") - .description("Fetch Gateway status") - .action(async (opts) => { - try { - const result = await callGatewayCli("status", opts); - defaultRuntime.log(JSON.stringify(result, null, 2)); - } catch (err) { - defaultRuntime.error(String(err)); - defaultRuntime.exit(1); - } - }), - ); - - gatewayCallOpts( - gateway - .command("send") - .description("Send a message via the Gateway") - .requiredOption("--to ", "Destination (E.164 or jid)") - .requiredOption("--message ", "Message text") - .option("--media-url ", "Optional media URL") - .option("--idempotency-key ", "Idempotency key") - .action(async (opts) => { - try { - const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); - const result = await callGatewayCli("send", opts, { - to: opts.to, - message: opts.message, - mediaUrl: opts.mediaUrl, - idempotencyKey, - }); - defaultRuntime.log(JSON.stringify(result, null, 2)); - } catch (err) { - defaultRuntime.error(String(err)); - defaultRuntime.exit(1); - } - }), - ); - - gatewayCallOpts( - gateway - .command("agent") - .description("Run an agent turn via the Gateway (waits for final)") - .requiredOption("--message ", "User message") - .option("--to ", "Destination") - .option("--session-id ", "Session id") - .option("--thinking ", "Thinking level") - .option("--deliver", "Deliver response", false) - .option("--timeout-seconds ", "Agent timeout seconds") - .option("--idempotency-key ", "Idempotency key") - .action(async (opts) => { - try { - const idempotencyKey = opts.idempotencyKey ?? randomIdempotencyKey(); - const result = await callGatewayCli( - "agent", - { ...opts, expectFinal: true }, - { - message: opts.message, - to: opts.to, - sessionId: opts.sessionId, - thinking: opts.thinking, - deliver: Boolean(opts.deliver), - timeout: opts.timeoutSeconds - ? Number.parseInt(String(opts.timeoutSeconds), 10) - : undefined, - idempotencyKey, - }, - ); - defaultRuntime.log(JSON.stringify(result, null, 2)); - } catch (err) { - defaultRuntime.error(String(err)); - defaultRuntime.exit(1); - } - }), - ); + registerGatewayCli(program); + registerCronCli(program); program .command("status") .description("Show web session health and recent session recipients") diff --git a/src/config/config.ts b/src/config/config.ts index 2f2719a78..1a7b05348 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -49,6 +49,12 @@ export type WebChatConfig = { port?: number; }; +export type CronConfig = { + enabled?: boolean; + store?: string; + maxConcurrentRuns?: number; +}; + export type TelegramConfig = { botToken?: string; requireMention?: boolean; @@ -107,6 +113,7 @@ export type ClawdisConfig = { web?: WebConfig; telegram?: TelegramConfig; webchat?: WebChatConfig; + cron?: CronConfig; }; // New branding path (preferred) @@ -218,6 +225,13 @@ const ClawdisSchema = z.object({ reply: ReplySchema.optional(), }) .optional(), + cron: z + .object({ + enabled: z.boolean().optional(), + store: z.string().optional(), + maxConcurrentRuns: z.number().int().positive().optional(), + }) + .optional(), web: z .object({ heartbeatSeconds: z.number().int().positive().optional(), diff --git a/src/cron/isolated-agent.ts b/src/cron/isolated-agent.ts new file mode 100644 index 000000000..f3816df95 --- /dev/null +++ b/src/cron/isolated-agent.ts @@ -0,0 +1,341 @@ +import crypto from "node:crypto"; + +import { chunkText } from "../auto-reply/chunk.js"; +import { runCommandReply } from "../auto-reply/command-reply.js"; +import { + applyTemplate, + type TemplateContext, +} from "../auto-reply/templating.js"; +import { normalizeThinkLevel } from "../auto-reply/thinking.js"; +import type { CliDeps } from "../cli/deps.js"; +import type { ClawdisConfig } from "../config/config.js"; +import { + DEFAULT_IDLE_MINUTES, + loadSessionStore, + resolveStorePath, + type SessionEntry, + saveSessionStore, +} from "../config/sessions.js"; +import { enqueueCommandInLane } from "../process/command-queue.js"; +import { normalizeE164 } from "../utils.js"; +import type { CronJob } from "./types.js"; + +export type RunCronAgentTurnResult = { + status: "ok" | "error" | "skipped"; + summary?: string; +}; + +function assertCommandReplyConfig(cfg: ClawdisConfig) { + const reply = cfg.inbound?.reply; + if (!reply || reply.mode !== "command" || !reply.command?.length) { + throw new Error( + "Configure inbound.reply.mode=command with reply.command before using cron agent jobs.", + ); + } + return reply as NonNullable< + NonNullable["reply"] + > & { + mode: "command"; + command: string[]; + }; +} + +function pickSummaryFromOutput(text: string | undefined) { + const clean = (text ?? "").trim(); + if (!clean) return undefined; + const oneLine = clean.replace(/\s+/g, " "); + return oneLine.length > 200 ? `${oneLine.slice(0, 200)}…` : oneLine; +} + +function resolveDeliveryTarget( + cfg: ClawdisConfig, + jobPayload: { + channel?: "last" | "whatsapp" | "telegram"; + to?: string; + }, +) { + const requestedChannel = + typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; + const explicitTo = + typeof jobPayload.to === "string" && jobPayload.to.trim() + ? jobPayload.to.trim() + : undefined; + + const sessionCfg = cfg.inbound?.reply?.session; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const storePath = resolveStorePath(sessionCfg?.store); + const store = loadSessionStore(storePath); + const main = store[mainKey]; + const lastChannel = + main?.lastChannel && main.lastChannel !== "webchat" + ? main.lastChannel + : undefined; + const lastTo = typeof main?.lastTo === "string" ? main.lastTo.trim() : ""; + + const channel = (() => { + if (requestedChannel === "whatsapp" || requestedChannel === "telegram") { + return requestedChannel; + } + return lastChannel ?? "whatsapp"; + })(); + + const to = (() => { + if (explicitTo) return explicitTo; + return lastTo || undefined; + })(); + + const sanitizedWhatsappTo = (() => { + if (channel !== "whatsapp") return to; + const rawAllow = cfg.inbound?.allowFrom ?? []; + if (rawAllow.includes("*")) return to; + const allowFrom = rawAllow + .map((val) => normalizeE164(val)) + .filter((val) => val.length > 1); + if (allowFrom.length === 0) return to; + if (!to) return allowFrom[0]; + const normalized = normalizeE164(to); + if (allowFrom.includes(normalized)) return normalized; + return allowFrom[0]; + })(); + + return { + channel, + to: channel === "whatsapp" ? sanitizedWhatsappTo : to, + }; +} + +function resolveCronSession(params: { + cfg: ClawdisConfig; + sessionKey: string; + nowMs: number; +}) { + const sessionCfg = params.cfg.inbound?.reply?.session; + const idleMinutes = Math.max( + sessionCfg?.idleMinutes ?? DEFAULT_IDLE_MINUTES, + 1, + ); + const idleMs = idleMinutes * 60_000; + const storePath = resolveStorePath(sessionCfg?.store); + const store = loadSessionStore(storePath); + const entry = store[params.sessionKey]; + const fresh = entry && params.nowMs - entry.updatedAt <= idleMs; + const sessionId = fresh ? entry.sessionId : crypto.randomUUID(); + const systemSent = fresh ? Boolean(entry.systemSent) : false; + const sessionEntry: SessionEntry = { + sessionId, + updatedAt: params.nowMs, + systemSent, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + model: entry?.model, + contextTokens: entry?.contextTokens, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + syncing: entry?.syncing, + }; + return { storePath, store, sessionEntry, systemSent, isNewSession: !fresh }; +} + +export async function runCronIsolatedAgentTurn(params: { + cfg: ClawdisConfig; + deps: CliDeps; + job: CronJob; + message: string; + sessionKey: string; + lane?: string; +}): Promise { + const replyCfg = assertCommandReplyConfig(params.cfg); + const now = Date.now(); + const cronSession = resolveCronSession({ + cfg: params.cfg, + sessionKey: params.sessionKey, + nowMs: now, + }); + const sendSystemOnce = replyCfg.session?.sendSystemOnce === true; + const isFirstTurnInSession = + cronSession.isNewSession || !cronSession.systemSent; + const sessionIntro = replyCfg.session?.sessionIntro + ? applyTemplate(replyCfg.session.sessionIntro, { + SessionId: cronSession.sessionEntry.sessionId, + }) + : ""; + const bodyPrefix = replyCfg.bodyPrefix + ? applyTemplate(replyCfg.bodyPrefix, { + SessionId: cronSession.sessionEntry.sessionId, + }) + : ""; + + const thinkOverride = normalizeThinkLevel(replyCfg.thinkingDefault); + const jobThink = normalizeThinkLevel( + (params.job.payload.kind === "agentTurn" + ? params.job.payload.thinking + : undefined) ?? undefined, + ); + const thinkLevel = jobThink ?? thinkOverride; + + const timeoutSecondsRaw = + params.job.payload.kind === "agentTurn" && params.job.payload.timeoutSeconds + ? params.job.payload.timeoutSeconds + : (replyCfg.timeoutSeconds ?? 600); + const timeoutSeconds = Math.max(Math.floor(timeoutSecondsRaw), 1); + const timeoutMs = timeoutSeconds * 1000; + + const delivery = + params.job.payload.kind === "agentTurn" && + params.job.payload.deliver === true; + const bestEffortDeliver = + params.job.payload.kind === "agentTurn" && + params.job.payload.bestEffortDeliver === true; + + const resolvedDelivery = resolveDeliveryTarget(params.cfg, { + channel: + params.job.payload.kind === "agentTurn" + ? params.job.payload.channel + : "last", + to: + params.job.payload.kind === "agentTurn" + ? params.job.payload.to + : undefined, + }); + + const base = + `[cron:${params.job.id}${params.job.name ? ` ${params.job.name}` : ""}] ${params.message}`.trim(); + + let commandBody = base; + if (!sendSystemOnce || isFirstTurnInSession) { + commandBody = bodyPrefix ? `${bodyPrefix}${commandBody}` : commandBody; + } + if (sessionIntro) { + commandBody = `${sessionIntro}\n\n${commandBody}`; + } + + const templatingCtx: TemplateContext = { + Body: commandBody, + BodyStripped: commandBody, + SessionId: cronSession.sessionEntry.sessionId, + From: resolvedDelivery.to ?? "", + To: resolvedDelivery.to ?? "", + Surface: "Cron", + IsNewSession: cronSession.isNewSession ? "true" : "false", + }; + + // Persist systemSent before the run, mirroring the inbound auto-reply behavior. + if (sendSystemOnce && isFirstTurnInSession) { + cronSession.sessionEntry.systemSent = true; + cronSession.store[params.sessionKey] = cronSession.sessionEntry; + await saveSessionStore(cronSession.storePath, cronSession.store); + } else { + cronSession.store[params.sessionKey] = cronSession.sessionEntry; + await saveSessionStore(cronSession.storePath, cronSession.store); + } + + const lane = params.lane?.trim() || "cron"; + + const runResult = await runCommandReply({ + reply: { ...replyCfg, mode: "command" }, + templatingCtx, + sendSystemOnce, + isNewSession: cronSession.isNewSession, + isFirstTurnInSession, + systemSent: cronSession.sessionEntry.systemSent ?? false, + timeoutMs, + timeoutSeconds, + thinkLevel, + enqueue: (task, opts) => enqueueCommandInLane(lane, task, opts), + runId: cronSession.sessionEntry.sessionId, + }); + + const payloads = runResult.payloads ?? []; + const firstText = payloads[0]?.text ?? ""; + const summary = pickSummaryFromOutput(firstText); + + if (delivery) { + if (resolvedDelivery.channel === "whatsapp") { + if (!resolvedDelivery.to) { + if (!bestEffortDeliver) { + return { + status: "error", + summary: "Cron delivery to WhatsApp requires a recipient.", + }; + } + return { + status: "skipped", + summary: "Delivery skipped (no WhatsApp recipient).", + }; + } + const to = normalizeE164(resolvedDelivery.to); + try { + for (const payload of payloads) { + const mediaList = + payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); + const primaryMedia = mediaList[0]; + await params.deps.sendMessageWhatsApp(to, payload.text ?? "", { + verbose: false, + mediaUrl: primaryMedia, + }); + for (const extra of mediaList.slice(1)) { + await params.deps.sendMessageWhatsApp(to, "", { + verbose: false, + mediaUrl: extra, + }); + } + } + } catch (err) { + if (!bestEffortDeliver) throw err; + return { + status: "ok", + summary: summary + ? `${summary} (delivery failed)` + : "completed (delivery failed)", + }; + } + } else if (resolvedDelivery.channel === "telegram") { + if (!resolvedDelivery.to) { + if (!bestEffortDeliver) { + return { + status: "error", + summary: "Cron delivery to Telegram requires a chatId.", + }; + } + return { + status: "skipped", + summary: "Delivery skipped (no Telegram chatId).", + }; + } + const chatId = resolvedDelivery.to; + try { + for (const payload of payloads) { + const mediaList = + payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); + if (mediaList.length === 0) { + for (const chunk of chunkText(payload.text ?? "", 4000)) { + await params.deps.sendMessageTelegram(chatId, chunk, { + verbose: false, + }); + } + } else { + let first = true; + for (const url of mediaList) { + const caption = first ? (payload.text ?? "") : ""; + first = false; + await params.deps.sendMessageTelegram(chatId, caption, { + verbose: false, + mediaUrl: url, + }); + } + } + } + } catch (err) { + if (!bestEffortDeliver) throw err; + return { + status: "ok", + summary: summary + ? `${summary} (delivery failed)` + : "completed (delivery failed)", + }; + } + } + } + + return { status: "ok", summary }; +} diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts new file mode 100644 index 000000000..c2492ac88 --- /dev/null +++ b/src/cron/run-log.test.ts @@ -0,0 +1,98 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import { + appendCronRunLog, + readCronRunLogEntries, + resolveCronRunLogPath, +} from "./run-log.js"; + +describe("cron run log", () => { + it("resolves a flat store path to cron.runs.jsonl", () => { + const storePath = path.join(os.tmpdir(), "cron.json"); + const p = resolveCronRunLogPath({ storePath, jobId: "job-1" }); + expect(p.endsWith(path.join(os.tmpdir(), "cron.runs.jsonl"))).toBe(true); + }); + + it("resolves jobs.json to per-job runs/.jsonl", () => { + const storePath = path.join(os.tmpdir(), "cron", "jobs.json"); + const p = resolveCronRunLogPath({ storePath, jobId: "job-1" }); + expect( + p.endsWith(path.join(os.tmpdir(), "cron", "runs", "job-1.jsonl")), + ).toBe(true); + }); + + it("appends JSONL and prunes by line count", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-log-")); + const logPath = path.join(dir, "cron.runs.jsonl"); + + for (let i = 0; i < 10; i++) { + await appendCronRunLog( + logPath, + { + ts: 1000 + i, + jobId: "job-1", + action: "finished", + status: "ok", + durationMs: i, + }, + { maxBytes: 1, keepLines: 3 }, + ); + } + + const raw = await fs.readFile(logPath, "utf-8"); + const lines = raw + .split("\n") + .map((l) => l.trim()) + .filter(Boolean); + expect(lines.length).toBe(3); + const last = JSON.parse(lines[2] ?? "{}") as { ts?: number }; + expect(last.ts).toBe(1009); + + await fs.rm(dir, { recursive: true, force: true }); + }); + + it("reads newest entries and filters by jobId", async () => { + const dir = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdis-cron-log-read-"), + ); + const logPath = path.join(dir, "cron.runs.jsonl"); + + await appendCronRunLog(logPath, { + ts: 1, + jobId: "a", + action: "finished", + status: "ok", + }); + await appendCronRunLog(logPath, { + ts: 2, + jobId: "b", + action: "finished", + status: "error", + error: "nope", + }); + await appendCronRunLog(logPath, { + ts: 3, + jobId: "a", + action: "finished", + status: "skipped", + }); + + const all = await readCronRunLogEntries(logPath, { limit: 10 }); + expect(all.map((e) => e.jobId)).toEqual(["a", "b", "a"]); + + const onlyA = await readCronRunLogEntries(logPath, { + limit: 10, + jobId: "a", + }); + expect(onlyA.map((e) => e.ts)).toEqual([1, 3]); + + const lastOne = await readCronRunLogEntries(logPath, { limit: 1 }); + expect(lastOne.map((e) => e.ts)).toEqual([3]); + + await fs.rm(dir, { recursive: true, force: true }); + }); +}); diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts new file mode 100644 index 000000000..4dc247325 --- /dev/null +++ b/src/cron/run-log.ts @@ -0,0 +1,101 @@ +import fs from "node:fs/promises"; +import path from "node:path"; + +export type CronRunLogEntry = { + ts: number; + jobId: string; + action: "finished"; + status?: "ok" | "error" | "skipped"; + error?: string; + runAtMs?: number; + durationMs?: number; + nextRunAtMs?: number; +}; + +export function resolveCronRunLogPath(params: { + storePath: string; + jobId: string; +}) { + const storePath = path.resolve(params.storePath); + const dir = path.dirname(storePath); + const base = path.basename(storePath); + if (base === "jobs.json") { + return path.join(dir, "runs", `${params.jobId}.jsonl`); + } + + const ext = path.extname(base); + const baseNoExt = ext ? base.slice(0, -ext.length) : base; + return path.join(dir, `${baseNoExt}.runs.jsonl`); +} + +const writesByPath = new Map>(); + +async function pruneIfNeeded( + filePath: string, + opts: { maxBytes: number; keepLines: number }, +) { + const stat = await fs.stat(filePath).catch(() => null); + if (!stat || stat.size <= opts.maxBytes) return; + + const raw = await fs.readFile(filePath, "utf-8").catch(() => ""); + const lines = raw + .split("\n") + .map((l) => l.trim()) + .filter(Boolean); + const kept = lines.slice(Math.max(0, lines.length - opts.keepLines)); + const tmp = `${filePath}.${process.pid}.${Math.random().toString(16).slice(2)}.tmp`; + await fs.writeFile(tmp, `${kept.join("\n")}\n`, "utf-8"); + await fs.rename(tmp, filePath); +} + +export async function appendCronRunLog( + filePath: string, + entry: CronRunLogEntry, + opts?: { maxBytes?: number; keepLines?: number }, +) { + const resolved = path.resolve(filePath); + const prev = writesByPath.get(resolved) ?? Promise.resolve(); + const next = prev + .catch(() => undefined) + .then(async () => { + await fs.mkdir(path.dirname(resolved), { recursive: true }); + await fs.appendFile(resolved, `${JSON.stringify(entry)}\n`, "utf-8"); + await pruneIfNeeded(resolved, { + maxBytes: opts?.maxBytes ?? 2_000_000, + keepLines: opts?.keepLines ?? 2_000, + }); + }); + writesByPath.set(resolved, next); + await next; +} + +export async function readCronRunLogEntries( + filePath: string, + opts?: { limit?: number; jobId?: string }, +): Promise { + const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200))); + const jobId = opts?.jobId?.trim() || undefined; + const raw = await fs + .readFile(path.resolve(filePath), "utf-8") + .catch(() => ""); + if (!raw.trim()) return []; + const parsed: CronRunLogEntry[] = []; + const lines = raw.split("\n"); + for (let i = lines.length - 1; i >= 0 && parsed.length < limit; i--) { + const line = lines[i]?.trim(); + if (!line) continue; + try { + const obj = JSON.parse(line) as Partial | null; + if (!obj || typeof obj !== "object") continue; + if (obj.action !== "finished") continue; + if (typeof obj.jobId !== "string" || obj.jobId.trim().length === 0) + continue; + if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) continue; + if (jobId && obj.jobId !== jobId) continue; + parsed.push(obj as CronRunLogEntry); + } catch { + // ignore invalid lines + } + } + return parsed.reverse(); +} diff --git a/src/cron/schedule.test.ts b/src/cron/schedule.test.ts new file mode 100644 index 000000000..210e2860a --- /dev/null +++ b/src/cron/schedule.test.ts @@ -0,0 +1,26 @@ +import { describe, expect, it } from "vitest"; + +import { computeNextRunAtMs } from "./schedule.js"; + +describe("cron schedule", () => { + it("computes next run for cron expression with timezone", () => { + // Saturday, Dec 13 2025 00:00:00Z + const nowMs = Date.parse("2025-12-13T00:00:00.000Z"); + const next = computeNextRunAtMs( + { kind: "cron", expr: "0 9 * * 3", tz: "America/Los_Angeles" }, + nowMs, + ); + // Next Wednesday at 09:00 PST -> 17:00Z + expect(next).toBe(Date.parse("2025-12-17T17:00:00.000Z")); + }); + + it("computes next run for every schedule", () => { + const anchor = Date.parse("2025-12-13T00:00:00.000Z"); + const now = anchor + 10_000; + const next = computeNextRunAtMs( + { kind: "every", everyMs: 30_000, anchorMs: anchor }, + now, + ); + expect(next).toBe(anchor + 30_000); + }); +}); diff --git a/src/cron/schedule.ts b/src/cron/schedule.ts new file mode 100644 index 000000000..4c4308da8 --- /dev/null +++ b/src/cron/schedule.ts @@ -0,0 +1,29 @@ +import { Cron } from "croner"; +import type { CronSchedule } from "./types.js"; + +export function computeNextRunAtMs( + schedule: CronSchedule, + nowMs: number, +): number | undefined { + if (schedule.kind === "at") { + return schedule.atMs > nowMs ? schedule.atMs : undefined; + } + + if (schedule.kind === "every") { + const everyMs = Math.max(1, Math.floor(schedule.everyMs)); + const anchor = Math.max(0, Math.floor(schedule.anchorMs ?? nowMs)); + if (nowMs <= anchor) return anchor; + const elapsed = nowMs - anchor; + const steps = Math.floor((elapsed + everyMs - 1) / everyMs); + return anchor + steps * everyMs; + } + + const expr = schedule.expr.trim(); + if (!expr) return undefined; + const cron = new Cron(expr, { + timezone: schedule.tz?.trim() || undefined, + catch: false, + }); + const next = cron.nextRun(new Date(nowMs)); + return next ? next.getTime() : undefined; +} diff --git a/src/cron/service.test.ts b/src/cron/service.test.ts new file mode 100644 index 000000000..368b4dc25 --- /dev/null +++ b/src/cron/service.test.ts @@ -0,0 +1,120 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-")); + return { + storePath: path.join(dir, "cron.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("runs a one-shot main job and disables it after success", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestReplyHeartbeatNow = vi.fn(); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestReplyHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const atMs = Date.parse("2025-12-13T00:00:02.000Z"); + const job = await cron.add({ + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "hello" }, + }); + + expect(job.state.nextRunAtMs).toBe(atMs); + + vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); + await vi.runOnlyPendingTimersAsync(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((j) => j.id === job.id); + expect(updated?.enabled).toBe(false); + expect(enqueueSystemEvent).toHaveBeenCalledWith("hello"); + expect(requestReplyHeartbeatNow).toHaveBeenCalled(); + + await cron.list({ includeDisabled: true }); + cron.stop(); + await store.cleanup(); + }); + + it("runs an isolated job and posts summary to main", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestReplyHeartbeatNow = vi.fn(); + const runIsolatedAgentJob = vi.fn(async () => ({ + status: "ok" as const, + summary: "done", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestReplyHeartbeatNow, + runIsolatedAgentJob, + }); + + await cron.start(); + const atMs = Date.parse("2025-12-13T00:00:01.000Z"); + await cron.add({ + enabled: true, + name: "weekly", + schedule: { kind: "at", atMs }, + sessionTarget: "isolated", + wakeMode: "now", + payload: { kind: "agentTurn", message: "do it", deliver: false }, + isolation: { postToMain: true, postToMainPrefix: "Cron" }, + }); + + vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); + await vi.runOnlyPendingTimersAsync(); + + await cron.list({ includeDisabled: true }); + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done"); + expect(requestReplyHeartbeatNow).toHaveBeenCalled(); + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.ts b/src/cron/service.ts new file mode 100644 index 000000000..267f7b59c --- /dev/null +++ b/src/cron/service.ts @@ -0,0 +1,431 @@ +import crypto from "node:crypto"; + +import { computeNextRunAtMs } from "./schedule.js"; +import { loadCronStore, saveCronStore } from "./store.js"; +import type { + CronJob, + CronJobCreate, + CronJobPatch, + CronPayload, + CronStoreFile, +} from "./types.js"; + +export type CronEvent = { + jobId: string; + action: "added" | "updated" | "removed" | "started" | "finished"; + runAtMs?: number; + durationMs?: number; + status?: "ok" | "error" | "skipped"; + error?: string; + nextRunAtMs?: number; +}; + +type Logger = { + debug: (obj: unknown, msg?: string) => void; + info: (obj: unknown, msg?: string) => void; + warn: (obj: unknown, msg?: string) => void; + error: (obj: unknown, msg?: string) => void; +}; + +export type CronServiceDeps = { + nowMs?: () => number; + log: Logger; + storePath: string; + cronEnabled: boolean; + enqueueSystemEvent: (text: string) => void; + requestReplyHeartbeatNow: (opts?: { reason?: string }) => void; + runIsolatedAgentJob: (params: { + job: CronJob; + message: string; + }) => Promise<{ status: "ok" | "error" | "skipped"; summary?: string }>; + onEvent?: (evt: CronEvent) => void; +}; + +const STUCK_RUN_MS = 2 * 60 * 60 * 1000; + +function isNonEmptyString(value: unknown): value is string { + return typeof value === "string" && value.trim().length > 0; +} + +function normalizePayloadToSystemText(payload: CronPayload) { + if (payload.kind === "systemEvent") return payload.text.trim(); + return payload.message.trim(); +} + +export class CronService { + private readonly deps: Required> & + Pick; + private store: CronStoreFile | null = null; + private timer: NodeJS.Timeout | null = null; + private running = false; + private op: Promise = Promise.resolve(); + + constructor(deps: CronServiceDeps) { + this.deps = { + ...deps, + nowMs: deps.nowMs ?? (() => Date.now()), + onEvent: deps.onEvent, + }; + } + + async start() { + await this.locked(async () => { + if (!this.deps.cronEnabled) { + this.deps.log.info({ enabled: false }, "cron: disabled"); + return; + } + await this.ensureLoaded(); + this.recomputeNextRuns(); + await this.persist(); + this.armTimer(); + this.deps.log.info( + { + enabled: true, + jobs: this.store?.jobs.length ?? 0, + nextWakeAtMs: this.nextWakeAtMs() ?? null, + }, + "cron: started", + ); + }); + } + + stop() { + if (this.timer) clearTimeout(this.timer); + this.timer = null; + } + + async list(opts?: { includeDisabled?: boolean }) { + return await this.locked(async () => { + await this.ensureLoaded(); + const includeDisabled = opts?.includeDisabled === true; + const jobs = (this.store?.jobs ?? []).filter( + (j) => includeDisabled || j.enabled, + ); + return jobs.sort( + (a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0), + ); + }); + } + + async add(input: CronJobCreate) { + return await this.locked(async () => { + await this.ensureLoaded(); + const now = this.deps.nowMs(); + const id = crypto.randomUUID(); + const job: CronJob = { + id, + name: input.name?.trim() || undefined, + enabled: input.enabled !== false, + createdAtMs: now, + updatedAtMs: now, + schedule: input.schedule, + sessionTarget: input.sessionTarget, + wakeMode: input.wakeMode, + payload: input.payload, + isolation: input.isolation, + state: { + ...input.state, + }, + }; + job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now); + this.store?.jobs.push(job); + await this.persist(); + this.armTimer(); + this.emit({ + jobId: id, + action: "added", + nextRunAtMs: job.state.nextRunAtMs, + }); + return job; + }); + } + + async update(id: string, patch: CronJobPatch) { + return await this.locked(async () => { + await this.ensureLoaded(); + const job = this.findJobOrThrow(id); + const now = this.deps.nowMs(); + + if (isNonEmptyString(patch.name)) job.name = patch.name.trim(); + if (patch.name === null || patch.name === "") job.name = undefined; + if (typeof patch.enabled === "boolean") job.enabled = patch.enabled; + if (patch.schedule) job.schedule = patch.schedule; + if (patch.sessionTarget) job.sessionTarget = patch.sessionTarget; + if (patch.wakeMode) job.wakeMode = patch.wakeMode; + if (patch.payload) job.payload = patch.payload; + if (patch.isolation) job.isolation = patch.isolation; + if (patch.state) job.state = { ...job.state, ...patch.state }; + + job.updatedAtMs = now; + if (job.enabled) { + job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now); + } else { + job.state.nextRunAtMs = undefined; + job.state.runningAtMs = undefined; + } + await this.persist(); + this.armTimer(); + this.emit({ + jobId: id, + action: "updated", + nextRunAtMs: job.state.nextRunAtMs, + }); + return job; + }); + } + + async remove(id: string) { + return await this.locked(async () => { + await this.ensureLoaded(); + const before = this.store?.jobs.length ?? 0; + if (!this.store) return { ok: false, removed: false }; + this.store.jobs = this.store.jobs.filter((j) => j.id !== id); + const removed = (this.store.jobs.length ?? 0) !== before; + await this.persist(); + this.armTimer(); + if (removed) this.emit({ jobId: id, action: "removed" }); + return { ok: true, removed }; + }); + } + + async run(id: string, mode?: "due" | "force") { + return await this.locked(async () => { + await this.ensureLoaded(); + const job = this.findJobOrThrow(id); + const now = this.deps.nowMs(); + const due = + mode === "force" || + (job.enabled && + typeof job.state.nextRunAtMs === "number" && + now >= job.state.nextRunAtMs); + if (!due) return { ok: true, ran: false, reason: "not-due" as const }; + await this.executeJob(job, now, { forced: mode === "force" }); + await this.persist(); + this.armTimer(); + return { ok: true, ran: true }; + }); + } + + wake(opts: { mode: "now" | "next-heartbeat"; text: string }) { + const text = opts.text.trim(); + if (!text) return { ok: false }; + this.deps.enqueueSystemEvent(text); + if (opts.mode === "now") { + this.deps.requestReplyHeartbeatNow({ reason: "wake" }); + } + return { ok: true }; + } + + private async locked(fn: () => Promise): Promise { + const next = this.op.then(fn, fn); + // Keep the chain alive even when the operation fails. + this.op = next.then( + () => undefined, + () => undefined, + ); + return (await next) as T; + } + + private async ensureLoaded() { + if (this.store) return; + const loaded = await loadCronStore(this.deps.storePath); + this.store = { version: 1, jobs: loaded.jobs ?? [] }; + } + + private async persist() { + if (!this.store) return; + await saveCronStore(this.deps.storePath, this.store); + } + + private findJobOrThrow(id: string) { + const job = this.store?.jobs.find((j) => j.id === id); + if (!job) throw new Error(`unknown cron job id: ${id}`); + return job; + } + + private computeJobNextRunAtMs(job: CronJob, nowMs: number) { + if (!job.enabled) return undefined; + if (job.schedule.kind === "at") { + // One-shot jobs stay due until they successfully finish. + if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) + return undefined; + return job.schedule.atMs; + } + return computeNextRunAtMs(job.schedule, nowMs); + } + + private recomputeNextRuns() { + if (!this.store) return; + const now = this.deps.nowMs(); + for (const job of this.store.jobs) { + if (!job.state) job.state = {}; + if (!job.enabled) { + job.state.nextRunAtMs = undefined; + job.state.runningAtMs = undefined; + continue; + } + const runningAt = job.state.runningAtMs; + if (typeof runningAt === "number" && now - runningAt > STUCK_RUN_MS) { + this.deps.log.warn( + { jobId: job.id, runningAtMs: runningAt }, + "cron: clearing stuck running marker", + ); + job.state.runningAtMs = undefined; + } + job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now); + } + } + + private nextWakeAtMs() { + const jobs = this.store?.jobs ?? []; + const enabled = jobs.filter( + (j) => j.enabled && typeof j.state.nextRunAtMs === "number", + ); + if (enabled.length === 0) return undefined; + return enabled.reduce( + (min, j) => Math.min(min, j.state.nextRunAtMs as number), + enabled[0].state.nextRunAtMs as number, + ); + } + + private armTimer() { + if (this.timer) clearTimeout(this.timer); + this.timer = null; + if (!this.deps.cronEnabled) return; + const nextAt = this.nextWakeAtMs(); + if (!nextAt) return; + const delay = Math.max(nextAt - this.deps.nowMs(), 0); + this.timer = setTimeout(() => { + void this.onTimer().catch((err) => { + this.deps.log.error({ err: String(err) }, "cron: timer tick failed"); + }); + }, delay); + this.timer.unref?.(); + } + + private async onTimer() { + if (this.running) return; + this.running = true; + try { + await this.locked(async () => { + await this.ensureLoaded(); + await this.runDueJobs(); + await this.persist(); + this.armTimer(); + }); + } finally { + this.running = false; + } + } + + private async runDueJobs() { + if (!this.store) return; + const now = this.deps.nowMs(); + const due = this.store.jobs.filter((j) => { + if (!j.enabled) return false; + if (typeof j.state.runningAtMs === "number") return false; + const next = j.state.nextRunAtMs; + return typeof next === "number" && now >= next; + }); + for (const job of due) { + await this.executeJob(job, now, { forced: false }); + } + } + + private async executeJob( + job: CronJob, + nowMs: number, + opts: { forced: boolean }, + ) { + const startedAt = this.deps.nowMs(); + job.state.runningAtMs = startedAt; + job.state.lastError = undefined; + this.emit({ jobId: job.id, action: "started", runAtMs: startedAt }); + + const finish = async ( + status: "ok" | "error" | "skipped", + err?: string, + summary?: string, + ) => { + const endedAt = this.deps.nowMs(); + job.state.runningAtMs = undefined; + job.state.lastRunAtMs = startedAt; + job.state.lastStatus = status; + job.state.lastDurationMs = Math.max(0, endedAt - startedAt); + job.state.lastError = err; + + if (job.schedule.kind === "at" && status === "ok") { + // One-shot job completed successfully; disable it. + job.enabled = false; + job.state.nextRunAtMs = undefined; + } else if (job.enabled) { + job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, endedAt); + } else { + job.state.nextRunAtMs = undefined; + } + + this.emit({ + jobId: job.id, + action: "finished", + status, + error: err, + runAtMs: startedAt, + durationMs: job.state.lastDurationMs, + nextRunAtMs: job.state.nextRunAtMs, + }); + + if (summary && job.isolation?.postToMain) { + const prefix = job.isolation.postToMainPrefix?.trim() || "Cron"; + this.deps.enqueueSystemEvent(`${prefix}: ${summary}`); + if (job.wakeMode === "now") { + this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}:post` }); + } + } + }; + + try { + if (job.sessionTarget === "main") { + const text = normalizePayloadToSystemText(job.payload); + this.deps.enqueueSystemEvent(text); + if (job.wakeMode === "now") { + this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}` }); + } + await finish("ok"); + return; + } + + if (job.payload.kind !== "agentTurn") { + await finish("skipped", "isolated job requires payload.kind=agentTurn"); + return; + } + + const res = await this.deps.runIsolatedAgentJob({ + job, + message: job.payload.message, + }); + if (res.status === "ok") await finish("ok", undefined, res.summary); + else if (res.status === "skipped") + await finish("skipped", undefined, res.summary); + else await finish("error", res.summary ?? "cron job failed"); + } catch (err) { + await finish("error", String(err)); + } finally { + job.updatedAtMs = nowMs; + if (!opts.forced && job.enabled) { + // Keep nextRunAtMs in sync in case the schedule advanced during a long run. + job.state.nextRunAtMs = this.computeJobNextRunAtMs( + job, + this.deps.nowMs(), + ); + } + } + } + + private emit(evt: CronEvent) { + try { + this.deps.onEvent?.(evt); + } catch { + /* ignore */ + } + } +} diff --git a/src/cron/store.ts b/src/cron/store.ts new file mode 100644 index 000000000..eda8dd886 --- /dev/null +++ b/src/cron/store.ts @@ -0,0 +1,52 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import JSON5 from "json5"; +import { CONFIG_DIR } from "../utils.js"; +import type { CronStoreFile } from "./types.js"; + +export const LEGACY_CRON_STORE_PATH = path.join( + CONFIG_DIR, + "cron", + "jobs.json", +); +export const DEFAULT_CRON_STORE_PATH = path.join(CONFIG_DIR, "cron.json"); + +export function resolveCronStorePath(storePath?: string) { + if (storePath?.trim()) { + const raw = storePath.trim(); + if (raw.startsWith("~")) + return path.resolve(raw.replace("~", os.homedir())); + return path.resolve(raw); + } + if (fs.existsSync(LEGACY_CRON_STORE_PATH)) return LEGACY_CRON_STORE_PATH; + return DEFAULT_CRON_STORE_PATH; +} + +export async function loadCronStore(storePath: string): Promise { + try { + const raw = await fs.promises.readFile(storePath, "utf-8"); + const parsed = JSON5.parse(raw) as Partial | null; + const jobs = Array.isArray(parsed?.jobs) ? (parsed?.jobs as never[]) : []; + return { + version: 1, + jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"], + }; + } catch { + return { version: 1, jobs: [] }; + } +} + +export async function saveCronStore(storePath: string, store: CronStoreFile) { + await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); + const tmp = `${storePath}.${process.pid}.${Math.random().toString(16).slice(2)}.tmp`; + const json = JSON.stringify(store, null, 2); + await fs.promises.writeFile(tmp, json, "utf-8"); + await fs.promises.rename(tmp, storePath); + try { + await fs.promises.copyFile(storePath, `${storePath}.bak`); + } catch { + // best-effort + } +} diff --git a/src/cron/types.ts b/src/cron/types.ts new file mode 100644 index 000000000..4786c7ba1 --- /dev/null +++ b/src/cron/types.ts @@ -0,0 +1,64 @@ +export type CronSchedule = + | { kind: "at"; atMs: number } + | { kind: "every"; everyMs: number; anchorMs?: number } + | { kind: "cron"; expr: string; tz?: string }; + +export type CronSessionTarget = "main" | "isolated"; +export type CronWakeMode = "next-heartbeat" | "now"; + +export type CronPayload = + | { kind: "systemEvent"; text: string } + | { + kind: "agentTurn"; + message: string; + thinking?: string; + timeoutSeconds?: number; + deliver?: boolean; + channel?: "last" | "whatsapp" | "telegram"; + to?: string; + bestEffortDeliver?: boolean; + }; + +export type CronIsolation = { + postToMain?: boolean; + postToMainPrefix?: string; +}; + +export type CronJobState = { + nextRunAtMs?: number; + runningAtMs?: number; + lastRunAtMs?: number; + lastStatus?: "ok" | "error" | "skipped"; + lastError?: string; + lastDurationMs?: number; +}; + +export type CronJob = { + id: string; + name?: string; + enabled: boolean; + createdAtMs: number; + updatedAtMs: number; + schedule: CronSchedule; + sessionTarget: CronSessionTarget; + wakeMode: CronWakeMode; + payload: CronPayload; + isolation?: CronIsolation; + state: CronJobState; +}; + +export type CronStoreFile = { + version: 1; + jobs: CronJob[]; +}; + +export type CronJobCreate = Omit< + CronJob, + "id" | "createdAtMs" | "updatedAtMs" | "state" +> & { + state?: Partial; +}; + +export type CronJobPatch = Partial< + Omit & { state: CronJobState } +>; diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 23d9e8092..0477855a0 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -9,6 +9,21 @@ import { ChatSendParamsSchema, type ConnectParams, ConnectParamsSchema, + type CronAddParams, + CronAddParamsSchema, + type CronJob, + CronJobSchema, + type CronListParams, + CronListParamsSchema, + type CronRemoveParams, + CronRemoveParamsSchema, + type CronRunLogEntry, + type CronRunParams, + CronRunParamsSchema, + type CronRunsParams, + CronRunsParamsSchema, + type CronUpdateParams, + CronUpdateParamsSchema, ErrorCodes, type ErrorShape, ErrorShapeSchema, @@ -36,6 +51,8 @@ import { StateVersionSchema, type TickEvent, TickEventSchema, + type WakeParams, + WakeParamsSchema, } from "./schema.js"; const ajv = new ( @@ -54,6 +71,21 @@ export const validateRequestFrame = ajv.compile(RequestFrameSchema); export const validateSendParams = ajv.compile(SendParamsSchema); export const validateAgentParams = ajv.compile(AgentParamsSchema); +export const validateWakeParams = ajv.compile(WakeParamsSchema); +export const validateCronListParams = + ajv.compile(CronListParamsSchema); +export const validateCronAddParams = + ajv.compile(CronAddParamsSchema); +export const validateCronUpdateParams = ajv.compile( + CronUpdateParamsSchema, +); +export const validateCronRemoveParams = ajv.compile( + CronRemoveParamsSchema, +); +export const validateCronRunParams = + ajv.compile(CronRunParamsSchema); +export const validateCronRunsParams = + ajv.compile(CronRunsParamsSchema); export const validateChatHistoryParams = ajv.compile(ChatHistoryParamsSchema); export const validateChatSendParams = ajv.compile(ChatSendParamsSchema); export const validateChatEvent = ajv.compile(ChatEventSchema); @@ -80,6 +112,14 @@ export { ChatEventSchema, SendParamsSchema, AgentParamsSchema, + WakeParamsSchema, + CronJobSchema, + CronListParamsSchema, + CronAddParamsSchema, + CronUpdateParamsSchema, + CronRemoveParamsSchema, + CronRunParamsSchema, + CronRunsParamsSchema, ChatHistoryParamsSchema, ChatSendParamsSchema, TickEventSchema, @@ -105,4 +145,13 @@ export type { ChatEvent, TickEvent, ShutdownEvent, + WakeParams, + CronJob, + CronListParams, + CronAddParams, + CronUpdateParams, + CronRemoveParams, + CronRunParams, + CronRunsParams, + CronRunLogEntry, }; diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index fc4bf018f..a6d8a55c2 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -203,6 +203,185 @@ export const AgentParamsSchema = Type.Object( { additionalProperties: false }, ); +export const WakeParamsSchema = Type.Object( + { + mode: Type.Union([Type.Literal("now"), Type.Literal("next-heartbeat")]), + text: NonEmptyString, + }, + { additionalProperties: false }, +); + +export const CronScheduleSchema = Type.Union([ + Type.Object( + { + kind: Type.Literal("at"), + atMs: Type.Integer({ minimum: 0 }), + }, + { additionalProperties: false }, + ), + Type.Object( + { + kind: Type.Literal("every"), + everyMs: Type.Integer({ minimum: 1 }), + anchorMs: Type.Optional(Type.Integer({ minimum: 0 })), + }, + { additionalProperties: false }, + ), + Type.Object( + { + kind: Type.Literal("cron"), + expr: NonEmptyString, + tz: Type.Optional(Type.String()), + }, + { additionalProperties: false }, + ), +]); + +export const CronPayloadSchema = Type.Union([ + Type.Object( + { + kind: Type.Literal("systemEvent"), + text: NonEmptyString, + }, + { additionalProperties: false }, + ), + Type.Object( + { + kind: Type.Literal("agentTurn"), + message: NonEmptyString, + thinking: Type.Optional(Type.String()), + timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })), + deliver: Type.Optional(Type.Boolean()), + channel: Type.Optional( + Type.Union([ + Type.Literal("last"), + Type.Literal("whatsapp"), + Type.Literal("telegram"), + ]), + ), + to: Type.Optional(Type.String()), + bestEffortDeliver: Type.Optional(Type.Boolean()), + }, + { additionalProperties: false }, + ), +]); + +export const CronIsolationSchema = Type.Object( + { + postToMain: Type.Optional(Type.Boolean()), + postToMainPrefix: Type.Optional(Type.String()), + }, + { additionalProperties: false }, +); + +export const CronJobStateSchema = Type.Object( + { + nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), + runningAtMs: Type.Optional(Type.Integer({ minimum: 0 })), + lastRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), + lastStatus: Type.Optional( + Type.Union([ + Type.Literal("ok"), + Type.Literal("error"), + Type.Literal("skipped"), + ]), + ), + lastError: Type.Optional(Type.String()), + lastDurationMs: Type.Optional(Type.Integer({ minimum: 0 })), + }, + { additionalProperties: false }, +); + +export const CronJobSchema = Type.Object( + { + id: NonEmptyString, + name: Type.Optional(Type.String()), + enabled: Type.Boolean(), + createdAtMs: Type.Integer({ minimum: 0 }), + updatedAtMs: Type.Integer({ minimum: 0 }), + schedule: CronScheduleSchema, + sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]), + wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]), + payload: CronPayloadSchema, + isolation: Type.Optional(CronIsolationSchema), + state: CronJobStateSchema, + }, + { additionalProperties: false }, +); + +export const CronListParamsSchema = Type.Object( + { + includeDisabled: Type.Optional(Type.Boolean()), + }, + { additionalProperties: false }, +); + +export const CronAddParamsSchema = Type.Object( + { + name: Type.Optional(Type.String()), + enabled: Type.Optional(Type.Boolean()), + schedule: CronScheduleSchema, + sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]), + wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]), + payload: CronPayloadSchema, + isolation: Type.Optional(CronIsolationSchema), + }, + { additionalProperties: false }, +); + +export const CronUpdateParamsSchema = Type.Object( + { + id: NonEmptyString, + patch: Type.Partial(CronAddParamsSchema), + }, + { additionalProperties: false }, +); + +export const CronRemoveParamsSchema = Type.Object( + { + id: NonEmptyString, + }, + { additionalProperties: false }, +); + +export const CronRunParamsSchema = Type.Object( + { + id: NonEmptyString, + mode: Type.Optional( + Type.Union([Type.Literal("due"), Type.Literal("force")]), + ), + }, + { additionalProperties: false }, +); + +export const CronRunsParamsSchema = Type.Object( + { + id: Type.Optional(NonEmptyString), + limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 5000 })), + }, + { additionalProperties: false }, +); + +export const CronRunLogEntrySchema = Type.Object( + { + ts: Type.Integer({ minimum: 0 }), + jobId: NonEmptyString, + action: Type.Literal("finished"), + status: Type.Optional( + Type.Union([ + Type.Literal("ok"), + Type.Literal("error"), + Type.Literal("skipped"), + ]), + ), + error: Type.Optional(Type.String()), + runAtMs: Type.Optional(Type.Integer({ minimum: 0 })), + durationMs: Type.Optional(Type.Integer({ minimum: 0 })), + nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), + }, + { additionalProperties: false }, +); + // WebChat/WebSocket-native chat methods export const ChatHistoryParamsSchema = Type.Object( { @@ -256,6 +435,15 @@ export const ProtocolSchemas: Record = { AgentEvent: AgentEventSchema, SendParams: SendParamsSchema, AgentParams: AgentParamsSchema, + WakeParams: WakeParamsSchema, + CronJob: CronJobSchema, + CronListParams: CronListParamsSchema, + CronAddParams: CronAddParamsSchema, + CronUpdateParams: CronUpdateParamsSchema, + CronRemoveParams: CronRemoveParamsSchema, + CronRunParams: CronRunParamsSchema, + CronRunsParams: CronRunsParamsSchema, + CronRunLogEntry: CronRunLogEntrySchema, ChatHistoryParams: ChatHistoryParamsSchema, ChatSendParams: ChatSendParamsSchema, ChatEvent: ChatEventSchema, @@ -276,6 +464,15 @@ export type PresenceEntry = Static; export type ErrorShape = Static; export type StateVersion = Static; export type AgentEvent = Static; +export type WakeParams = Static; +export type CronJob = Static; +export type CronListParams = Static; +export type CronAddParams = Static; +export type CronUpdateParams = Static; +export type CronRemoveParams = Static; +export type CronRunParams = Static; +export type CronRunsParams = Static; +export type CronRunLogEntry = Static; export type ChatEvent = Static; export type TickEvent = Static; export type ShutdownEvent = Static; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 1c04ea864..a1eb09668 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -14,6 +14,7 @@ import { startGatewayServer } from "./server.js"; let testSessionStorePath: string | undefined; let testAllowFrom: string[] | undefined; +let testCronStorePath: string | undefined; vi.mock("../config/config.js", () => ({ loadConfig: () => ({ inbound: { @@ -24,6 +25,7 @@ vi.mock("../config/config.js", () => ({ session: { mainKey: "main", store: testSessionStorePath }, }, }, + cron: { enabled: false, store: testCronStorePath }, }), })); @@ -173,6 +175,273 @@ async function connectOk( } describe("gateway server", () => { + test("supports cron.add and cron.list", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-cron-")); + testCronStorePath = path.join(dir, "cron.json"); + await fs.writeFile( + testCronStorePath, + JSON.stringify({ version: 1, jobs: [] }), + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + ws.send( + JSON.stringify({ + type: "req", + id: "cron-add-1", + method: "cron.add", + params: { + name: "daily", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "hello" }, + }, + }), + ); + const addRes = await onceMessage<{ + type: "res"; + ok: boolean; + payload?: unknown; + }>(ws, (o) => o.type === "res" && o.id === "cron-add-1"); + expect(addRes.ok).toBe(true); + expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe( + "string", + ); + + ws.send( + JSON.stringify({ + type: "req", + id: "cron-list-1", + method: "cron.list", + params: { includeDisabled: true }, + }), + ); + const listRes = await onceMessage<{ + type: "res"; + ok: boolean; + payload?: unknown; + }>(ws, (o) => o.type === "res" && o.id === "cron-list-1"); + expect(listRes.ok).toBe(true); + const jobs = (listRes.payload as { jobs?: unknown } | null)?.jobs; + expect(Array.isArray(jobs)).toBe(true); + expect((jobs as unknown[]).length).toBe(1); + expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe( + "daily", + ); + + ws.close(); + await server.close(); + await fs.rm(dir, { recursive: true, force: true }); + testCronStorePath = undefined; + }); + + test("writes cron run history for flat store paths", async () => { + const dir = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdis-gw-cron-log-"), + ); + testCronStorePath = path.join(dir, "cron.json"); + await fs.writeFile( + testCronStorePath, + JSON.stringify({ version: 1, jobs: [] }), + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const atMs = Date.now() - 1; + ws.send( + JSON.stringify({ + type: "req", + id: "cron-add-log-1", + method: "cron.add", + params: { + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "hello" }, + }, + }), + ); + + const addRes = await onceMessage<{ + type: "res"; + ok: boolean; + payload?: unknown; + }>(ws, (o) => o.type === "res" && o.id === "cron-add-log-1"); + expect(addRes.ok).toBe(true); + const jobId = String((addRes.payload as { id?: unknown } | null)?.id ?? ""); + expect(jobId.length > 0).toBe(true); + + ws.send( + JSON.stringify({ + type: "req", + id: "cron-run-log-1", + method: "cron.run", + params: { id: jobId, mode: "force" }, + }), + ); + const runRes = await onceMessage<{ type: "res"; ok: boolean }>( + ws, + (o) => o.type === "res" && o.id === "cron-run-log-1", + 8000, + ); + expect(runRes.ok).toBe(true); + + const logPath = path.join(dir, "cron.runs.jsonl"); + const waitForLog = async () => { + for (let i = 0; i < 200; i++) { + const raw = await fs.readFile(logPath, "utf-8").catch(() => ""); + if (raw.trim().length > 0) return raw; + await new Promise((r) => setTimeout(r, 10)); + } + throw new Error("timeout waiting for cron run log"); + }; + + const raw = await waitForLog(); + const lines = raw + .split("\n") + .map((l) => l.trim()) + .filter(Boolean); + expect(lines.length).toBeGreaterThan(0); + const last = JSON.parse(lines.at(-1) ?? "{}") as { + jobId?: unknown; + action?: unknown; + status?: unknown; + }; + expect(last.action).toBe("finished"); + expect(last.jobId).toBe(jobId); + expect(last.status).toBe("ok"); + + ws.send( + JSON.stringify({ + type: "req", + id: "cron-runs-1", + method: "cron.runs", + params: { id: jobId, limit: 50 }, + }), + ); + const runsRes = await onceMessage<{ + type: "res"; + ok: boolean; + payload?: unknown; + }>(ws, (o) => o.type === "res" && o.id === "cron-runs-1", 8000); + expect(runsRes.ok).toBe(true); + const entries = (runsRes.payload as { entries?: unknown } | null)?.entries; + expect(Array.isArray(entries)).toBe(true); + expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId); + + ws.close(); + await server.close(); + await fs.rm(dir, { recursive: true, force: true }); + testCronStorePath = undefined; + }); + + test("writes cron run history to per-job runs/ when store is jobs.json", async () => { + const dir = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdis-gw-cron-log-jobs-"), + ); + const cronDir = path.join(dir, "cron"); + testCronStorePath = path.join(cronDir, "jobs.json"); + await fs.mkdir(cronDir, { recursive: true }); + await fs.writeFile( + testCronStorePath, + JSON.stringify({ version: 1, jobs: [] }), + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const atMs = Date.now() - 1; + ws.send( + JSON.stringify({ + type: "req", + id: "cron-add-log-2", + method: "cron.add", + params: { + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "hello" }, + }, + }), + ); + + const addRes = await onceMessage<{ + type: "res"; + ok: boolean; + payload?: unknown; + }>(ws, (o) => o.type === "res" && o.id === "cron-add-log-2"); + expect(addRes.ok).toBe(true); + const jobId = String((addRes.payload as { id?: unknown } | null)?.id ?? ""); + expect(jobId.length > 0).toBe(true); + + ws.send( + JSON.stringify({ + type: "req", + id: "cron-run-log-2", + method: "cron.run", + params: { id: jobId, mode: "force" }, + }), + ); + const runRes = await onceMessage<{ type: "res"; ok: boolean }>( + ws, + (o) => o.type === "res" && o.id === "cron-run-log-2", + 8000, + ); + expect(runRes.ok).toBe(true); + + const logPath = path.join(cronDir, "runs", `${jobId}.jsonl`); + const waitForLog = async () => { + for (let i = 0; i < 200; i++) { + const raw = await fs.readFile(logPath, "utf-8").catch(() => ""); + if (raw.trim().length > 0) return raw; + await new Promise((r) => setTimeout(r, 10)); + } + throw new Error("timeout waiting for per-job cron run log"); + }; + + const raw = await waitForLog(); + const line = raw + .split("\n") + .map((l) => l.trim()) + .filter(Boolean) + .at(-1); + const last = JSON.parse(line ?? "{}") as { + jobId?: unknown; + action?: unknown; + }; + expect(last.action).toBe("finished"); + expect(last.jobId).toBe(jobId); + + ws.send( + JSON.stringify({ + type: "req", + id: "cron-runs-2", + method: "cron.runs", + params: { id: jobId, limit: 20 }, + }), + ); + const runsRes = await onceMessage<{ + type: "res"; + ok: boolean; + payload?: unknown; + }>(ws, (o) => o.type === "res" && o.id === "cron-runs-2", 8000); + expect(runsRes.ok).toBe(true); + const entries = (runsRes.payload as { entries?: unknown } | null)?.entries; + expect(Array.isArray(entries)).toBe(true); + expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId); + + ws.close(); + await server.close(); + await fs.rm(dir, { recursive: true, force: true }); + testCronStorePath = undefined; + }); + test("broadcasts heartbeat events and serves last-heartbeat", async () => { type HeartbeatPayload = { ts: number; @@ -196,16 +465,7 @@ describe("gateway server", () => { }; const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const waitHeartbeat = onceMessage( ws, @@ -631,13 +891,18 @@ describe("gateway server", () => { await connectOk(ws); // Emit a fake agent event directly through the shared emitter. + const runId = randomUUID(); const evtPromise = onceMessage( ws, - (o) => o.type === "event" && o.event === "agent", + (o) => + o.type === "event" && + o.event === "agent" && + o.payload?.runId === runId && + o.payload?.stream === "job", ); - emitAgentEvent({ runId: "run-1", stream: "job", data: { msg: "hi" } }); + emitAgentEvent({ runId, stream: "job", data: { msg: "hi" } }); const evt = await evtPromise; - expect(evt.payload.runId).toBe("run-1"); + expect(evt.payload.runId).toBe(runId); expect(typeof evt.seq).toBe("number"); expect(evt.payload.data.msg).toBe("hi"); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index e603b7ac0..4c60d1ccc 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -19,6 +19,15 @@ import { type SessionEntry, saveSessionStore, } from "../config/sessions.js"; +import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; +import { + appendCronRunLog, + readCronRunLogEntries, + resolveCronRunLogPath, +} from "../cron/run-log.js"; +import { CronService } from "../cron/service.js"; +import { resolveCronStorePath } from "../cron/store.js"; +import type { CronJobCreate, CronJobPatch } from "../cron/types.js"; import { isVerbose } from "../globals.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; @@ -33,7 +42,12 @@ import { upsertPresence, } from "../infra/system-presence.js"; import { logError, logInfo, logWarn } from "../logger.js"; -import { getLogger, getResolvedLoggerSettings } from "../logging.js"; +import { + getChildLogger, + getLogger, + getResolvedLoggerSettings, +} from "../logging.js"; +import { setCommandLaneConcurrency } from "../process/command-queue.js"; import { monitorWebProvider, webAuthExists } from "../providers/web/index.js"; import { defaultRuntime } from "../runtime.js"; import { monitorTelegramProvider } from "../telegram/monitor.js"; @@ -41,6 +55,7 @@ import { sendMessageTelegram } from "../telegram/send.js"; import { normalizeE164 } from "../utils.js"; import { setHeartbeatsEnabled } from "../web/auto-reply.js"; import { sendMessageWhatsApp } from "../web/outbound.js"; +import { requestReplyHeartbeatNow } from "../web/reply-heartbeat-wake.js"; import { ensureWebChatServerFromConfig } from "../webchat/server.js"; import { buildMessageWithAttachments } from "./chat-attachments.js"; import { @@ -56,8 +71,15 @@ import { validateChatHistoryParams, validateChatSendParams, validateConnectParams, + validateCronAddParams, + validateCronListParams, + validateCronRemoveParams, + validateCronRunParams, + validateCronRunsParams, + validateCronUpdateParams, validateRequestFrame, validateSendParams, + validateWakeParams, } from "./protocol/index.js"; type Client = { @@ -72,6 +94,13 @@ const METHODS = [ "status", "last-heartbeat", "set-heartbeats", + "wake", + "cron.list", + "cron.add", + "cron.update", + "cron.remove", + "cron.run", + "cron.runs", "system-presence", "system-event", "send", @@ -89,6 +118,7 @@ const EVENTS = [ "shutdown", "health", "heartbeat", + "cron", ]; export type GatewayServer = { @@ -322,6 +352,59 @@ export async function startGatewayServer( const providerAbort = new AbortController(); const providerTasks: Array> = []; const clients = new Set(); + const cfgAtStart = loadConfig(); + setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1); + + const cronStorePath = resolveCronStorePath(cfgAtStart.cron?.store); + const cronLogger = getChildLogger({ + module: "cron", + storePath: cronStorePath, + }); + const cronDeps = createDefaultDeps(); + const cronEnabled = + process.env.CLAWDIS_SKIP_CRON !== "1" && cfgAtStart.cron?.enabled === true; + const cron = new CronService({ + storePath: cronStorePath, + cronEnabled, + enqueueSystemEvent, + requestReplyHeartbeatNow, + runIsolatedAgentJob: async ({ job, message }) => { + const cfg = loadConfig(); + return await runCronIsolatedAgentTurn({ + cfg, + deps: cronDeps, + job, + message, + sessionKey: `cron:${job.id}`, + lane: "cron", + }); + }, + log: cronLogger, + onEvent: (evt) => { + broadcast("cron", evt, { dropIfSlow: true }); + if (evt.action === "finished") { + const logPath = resolveCronRunLogPath({ + storePath: cronStorePath, + jobId: evt.jobId, + }); + void appendCronRunLog(logPath, { + ts: Date.now(), + jobId: evt.jobId, + action: "finished", + status: evt.status, + error: evt.error, + runAtMs: evt.runAtMs, + durationMs: evt.durationMs, + nextRunAtMs: evt.nextRunAtMs, + }).catch((err) => { + cronLogger.warn( + { err: String(err), logPath }, + "cron: run log append failed", + ); + }); + } + }, + }); const startProviders = async () => { const cfg = loadConfig(); @@ -513,6 +596,10 @@ export async function startGatewayServer( broadcast("heartbeat", evt, { dropIfSlow: true }); }); + void cron + .start() + .catch((err) => logError(`cron failed to start: ${String(err)}`)); + wss.on("connection", (socket) => { let client: Client | null = null; let closed = false; @@ -988,6 +1075,157 @@ export async function startGatewayServer( } break; } + case "wake": { + const params = (req.params ?? {}) as Record; + if (!validateWakeParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid wake params: ${formatValidationErrors(validateWakeParams.errors)}`, + ), + ); + break; + } + const p = params as { + mode: "now" | "next-heartbeat"; + text: string; + }; + const result = cron.wake({ mode: p.mode, text: p.text }); + respond(true, result, undefined); + break; + } + case "cron.list": { + const params = (req.params ?? {}) as Record; + if (!validateCronListParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid cron.list params: ${formatValidationErrors(validateCronListParams.errors)}`, + ), + ); + break; + } + const p = params as { includeDisabled?: boolean }; + const jobs = await cron.list({ + includeDisabled: p.includeDisabled, + }); + respond(true, { jobs }, undefined); + break; + } + case "cron.add": { + const params = (req.params ?? {}) as Record; + if (!validateCronAddParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid cron.add params: ${formatValidationErrors(validateCronAddParams.errors)}`, + ), + ); + break; + } + const job = await cron.add(params as unknown as CronJobCreate); + respond(true, job, undefined); + break; + } + case "cron.update": { + const params = (req.params ?? {}) as Record; + if (!validateCronUpdateParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid cron.update params: ${formatValidationErrors(validateCronUpdateParams.errors)}`, + ), + ); + break; + } + const p = params as { id: string; patch: Record }; + const job = await cron.update( + p.id, + p.patch as unknown as CronJobPatch, + ); + respond(true, job, undefined); + break; + } + case "cron.remove": { + const params = (req.params ?? {}) as Record; + if (!validateCronRemoveParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid cron.remove params: ${formatValidationErrors(validateCronRemoveParams.errors)}`, + ), + ); + break; + } + const p = params as { id: string }; + const result = await cron.remove(p.id); + respond(true, result, undefined); + break; + } + case "cron.run": { + const params = (req.params ?? {}) as Record; + if (!validateCronRunParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid cron.run params: ${formatValidationErrors(validateCronRunParams.errors)}`, + ), + ); + break; + } + const p = params as { id: string; mode?: "due" | "force" }; + const result = await cron.run(p.id, p.mode); + respond(true, result, undefined); + break; + } + case "cron.runs": { + const params = (req.params ?? {}) as Record; + if (!validateCronRunsParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid cron.runs params: ${formatValidationErrors(validateCronRunsParams.errors)}`, + ), + ); + break; + } + const p = params as { id?: string; limit?: number }; + if (!p.id && cronStorePath.endsWith(`${path.sep}jobs.json`)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "cron.runs requires id when using jobs.json store layout", + ), + ); + break; + } + const logPath = resolveCronRunLogPath({ + storePath: cronStorePath, + jobId: p.id ?? "all", + }); + const entries = await readCronRunLogEntries(logPath, { + limit: p.limit, + jobId: p.id, + }); + respond(true, { entries }, undefined); + break; + } case "status": { const status = await getStatusSummary(); respond(true, status, undefined); @@ -1426,6 +1664,7 @@ export async function startGatewayServer( return { close: async () => { providerAbort.abort(); + cron.stop(); broadcast("shutdown", { reason: "gateway stopping", restartExpectedMs: null, diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 4119d6e3e..81066c01e 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -1,5 +1,7 @@ // Minimal in-process queue to serialize command executions. -// Ensures only one command runs at a time across webhook, poller, and web inbox flows. +// Default lane ("main") preserves the existing behavior. Additional lanes allow +// low-risk parallelism (e.g. cron jobs) without interleaving stdin / logs for +// the main auto-reply workflow. type QueueEntry = { task: () => Promise; @@ -10,26 +12,91 @@ type QueueEntry = { onWait?: (waitMs: number, queuedAhead: number) => void; }; -const queue: QueueEntry[] = []; -let draining = false; +type LaneState = { + lane: string; + queue: QueueEntry[]; + active: number; + maxConcurrent: number; + draining: boolean; +}; -async function drainQueue() { - if (draining) return; - draining = true; - while (queue.length) { - const entry = queue.shift() as QueueEntry; - const waitedMs = Date.now() - entry.enqueuedAt; - if (waitedMs >= entry.warnAfterMs) { - entry.onWait?.(waitedMs, queue.length); +const lanes = new Map(); + +function getLaneState(lane: string): LaneState { + const existing = lanes.get(lane); + if (existing) return existing; + const created: LaneState = { + lane, + queue: [], + active: 0, + maxConcurrent: 1, + draining: false, + }; + lanes.set(lane, created); + return created; +} + +function drainLane(lane: string) { + const state = getLaneState(lane); + if (state.draining) return; + state.draining = true; + + const pump = () => { + while (state.active < state.maxConcurrent && state.queue.length > 0) { + const entry = state.queue.shift() as QueueEntry; + const waitedMs = Date.now() - entry.enqueuedAt; + if (waitedMs >= entry.warnAfterMs) { + entry.onWait?.(waitedMs, state.queue.length); + } + state.active += 1; + void (async () => { + try { + const result = await entry.task(); + state.active -= 1; + pump(); + entry.resolve(result); + } catch (err) { + state.active -= 1; + pump(); + entry.reject(err); + } + })(); } - try { - const result = await entry.task(); - entry.resolve(result); - } catch (err) { - entry.reject(err); - } - } - draining = false; + state.draining = false; + }; + + pump(); +} + +export function setCommandLaneConcurrency(lane: string, maxConcurrent: number) { + const cleaned = lane.trim() || "main"; + const state = getLaneState(cleaned); + state.maxConcurrent = Math.max(1, Math.floor(maxConcurrent)); + drainLane(cleaned); +} + +export function enqueueCommandInLane( + lane: string, + task: () => Promise, + opts?: { + warnAfterMs?: number; + onWait?: (waitMs: number, queuedAhead: number) => void; + }, +): Promise { + const cleaned = lane.trim() || "main"; + const warnAfterMs = opts?.warnAfterMs ?? 2_000; + const state = getLaneState(cleaned); + return new Promise((resolve, reject) => { + state.queue.push({ + task: () => task(), + resolve: (value) => resolve(value as T), + reject, + enqueuedAt: Date.now(), + warnAfterMs, + onWait: opts?.onWait, + }); + drainLane(cleaned); + }); } export function enqueueCommand( @@ -39,20 +106,19 @@ export function enqueueCommand( onWait?: (waitMs: number, queuedAhead: number) => void; }, ): Promise { - const warnAfterMs = opts?.warnAfterMs ?? 2_000; - return new Promise((resolve, reject) => { - queue.push({ - task: () => task(), - resolve: (value) => resolve(value as T), - reject, - enqueuedAt: Date.now(), - warnAfterMs, - onWait: opts?.onWait, - }); - void drainQueue(); - }); + return enqueueCommandInLane("main", task, opts); } -export function getQueueSize() { - return queue.length + (draining ? 1 : 0); +export function getQueueSize(lane = "main") { + const state = lanes.get(lane); + if (!state) return 0; + return state.queue.length + state.active; +} + +export function getTotalQueueSize() { + let total = 0; + for (const s of lanes.values()) { + total += s.queue.length + s.active; + } + return total; } diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 10b8f6186..baf50f6c8 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -115,7 +115,9 @@ describe("heartbeat helpers", () => { describe("resolveHeartbeatRecipients", () => { it("returns the sole session recipient", async () => { const now = Date.now(); - const store = await makeSessionStore({ "+1000": { updatedAt: now } }); + const store = await makeSessionStore({ + main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" }, + }); const cfg: ClawdisConfig = { inbound: { allowFrom: ["+1999"], @@ -131,8 +133,8 @@ describe("resolveHeartbeatRecipients", () => { it("surfaces ambiguity when multiple sessions exist", async () => { const now = Date.now(); const store = await makeSessionStore({ - "+1000": { updatedAt: now }, - "+2000": { updatedAt: now - 10 }, + main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" }, + alt: { updatedAt: now - 10, lastChannel: "whatsapp", lastTo: "+2000" }, }); const cfg: ClawdisConfig = { inbound: { @@ -162,7 +164,9 @@ describe("resolveHeartbeatRecipients", () => { it("merges sessions and allowFrom when --all is set", async () => { const now = Date.now(); - const store = await makeSessionStore({ "+1000": { updatedAt: now } }); + const store = await makeSessionStore({ + main: { updatedAt: now, lastChannel: "whatsapp", lastTo: "+1000" }, + }); const cfg: ClawdisConfig = { inbound: { allowFrom: ["+1999"], diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 4d34a0db8..dbdb9f1d0 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -32,6 +32,8 @@ import { resolveReconnectPolicy, sleepWithAbort, } from "./reconnect.js"; +import type { ReplyHeartbeatWakeResult } from "./reply-heartbeat-wake.js"; +import { setReplyHeartbeatWakeHandler } from "./reply-heartbeat-wake.js"; import { formatError, getWebAuthAgeMs, readWebSelfId } from "./session.js"; const WEB_TEXT_LIMIT = 4000; @@ -379,21 +381,24 @@ export async function runWebHeartbeatOnce(opts: { } function getFallbackRecipient(cfg: ReturnType) { - const storePath = resolveStorePath(cfg.inbound?.reply?.session?.store); + const sessionCfg = cfg.inbound?.reply?.session; + const storePath = resolveStorePath(sessionCfg?.store); const store = loadSessionStore(storePath); - const candidates = Object.entries(store).filter(([key]) => key !== "global"); - if (candidates.length === 0) { - const allowFrom = - Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0 - ? cfg.inbound.allowFrom.filter((v) => v !== "*") - : []; - if (allowFrom.length === 0) return null; - return allowFrom[0] ? normalizeE164(allowFrom[0]) : null; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const main = store[mainKey]; + const lastTo = typeof main?.lastTo === "string" ? main.lastTo.trim() : ""; + const lastChannel = main?.lastChannel; + + if (lastChannel === "whatsapp" && lastTo) { + return normalizeE164(lastTo); } - const mostRecent = candidates.sort( - (a, b) => (b[1]?.updatedAt ?? 0) - (a[1]?.updatedAt ?? 0), - )[0]; - return mostRecent ? normalizeE164(mostRecent[0]) : null; + + const allowFrom = + Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0 + ? cfg.inbound.allowFrom.filter((v) => v !== "*") + : []; + if (allowFrom.length === 0) return null; + return allowFrom[0] ? normalizeE164(allowFrom[0]) : null; } function getSessionRecipients(cfg: ReturnType) { @@ -402,14 +407,30 @@ function getSessionRecipients(cfg: ReturnType) { if (scope === "global") return []; const storePath = resolveStorePath(cfg.inbound?.reply?.session?.store); const store = loadSessionStore(storePath); - return Object.entries(store) + const isGroupKey = (key: string) => + key.startsWith("group:") || key.includes("@g.us"); + const isCronKey = (key: string) => key.startsWith("cron:"); + + const recipients = Object.entries(store) .filter(([key]) => key !== "global" && key !== "unknown") - .map(([key, entry]) => ({ - to: normalizeE164(key), + .filter(([key]) => !isGroupKey(key) && !isCronKey(key)) + .map(([_, entry]) => ({ + to: + entry?.lastChannel === "whatsapp" && entry?.lastTo + ? normalizeE164(entry.lastTo) + : "", updatedAt: entry?.updatedAt ?? 0, })) - .filter(({ to }) => Boolean(to)) + .filter(({ to }) => to.length > 1) .sort((a, b) => b.updatedAt - a.updatedAt); + + // Dedupe while preserving recency ordering. + const seen = new Set(); + return recipients.filter((r) => { + if (seen.has(r.to)) return false; + seen.add(r.to); + return true; + }); } export function resolveHeartbeatRecipients( @@ -1055,6 +1076,7 @@ export async function monitorWebProvider( const closeListener = async () => { setActiveWebListener(null); + setReplyHeartbeatWakeHandler(null); if (heartbeat) clearInterval(heartbeat); if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer); if (watchdogTimer) clearInterval(watchdogTimer); @@ -1126,8 +1148,11 @@ export async function monitorWebProvider( }, WATCHDOG_CHECK_MS); } - const runReplyHeartbeat = async () => { - if (!heartbeatsEnabled) return; + const runReplyHeartbeat = async (): Promise => { + const started = Date.now(); + if (!heartbeatsEnabled) { + return { status: "skipped", reason: "disabled" }; + } const queued = getQueueSize(); if (queued > 0) { heartbeatLogger.info( @@ -1135,16 +1160,18 @@ export async function monitorWebProvider( "reply heartbeat skipped", ); console.log(success("heartbeat: skipped (requests in flight)")); - return; + return { status: "skipped", reason: "requests-in-flight" }; + } + if (!replyHeartbeatMinutes) { + return { status: "skipped", reason: "disabled" }; } - if (!replyHeartbeatMinutes) return; if (lastInboundMsg?.chatType === "group") { heartbeatLogger.info( { connectionId, reason: "last-inbound-group" }, "reply heartbeat skipped", ); console.log(success("heartbeat: skipped (group chat)")); - return; + return { status: "skipped", reason: "group-chat" }; } const tickStart = Date.now(); if (!lastInboundMsg) { @@ -1159,7 +1186,7 @@ export async function monitorWebProvider( "reply heartbeat skipped", ); console.log(success("heartbeat: skipped (no recent inbound)")); - return; + return { status: "skipped", reason: "no-recent-inbound" }; } const snapshot = getSessionSnapshot(cfg, fallbackTo, true); if (!snapshot.entry) { @@ -1168,7 +1195,7 @@ export async function monitorWebProvider( "reply heartbeat skipped", ); console.log(success("heartbeat: skipped (no session to resume)")); - return; + return { status: "skipped", reason: "no-session-for-fallback" }; } if (isVerbose()) { heartbeatLogger.info( @@ -1199,7 +1226,7 @@ export async function monitorWebProvider( }, "reply heartbeat sent (fallback session)", ); - return; + return { status: "ran", durationMs: Date.now() - started }; } try { @@ -1252,7 +1279,7 @@ export async function monitorWebProvider( "reply heartbeat skipped", ); console.log(success("heartbeat: ok (empty reply)")); - return; + return { status: "ran", durationMs: Date.now() - started }; } const stripped = stripHeartbeatToken(replyPayload.text); @@ -1270,7 +1297,7 @@ export async function monitorWebProvider( "reply heartbeat skipped", ); console.log(success("heartbeat: ok (HEARTBEAT_OK)")); - return; + return { status: "ran", durationMs: Date.now() - started }; } // Apply response prefix if configured (same as regular messages) @@ -1310,6 +1337,7 @@ export async function monitorWebProvider( }, "reply heartbeat sent", ); + return { status: "ran", durationMs: Date.now() - started }; } catch (err) { const durationMs = Date.now() - tickStart; heartbeatLogger.warn( @@ -1323,9 +1351,12 @@ export async function monitorWebProvider( console.log( danger(`heartbeat: failed (${formatDuration(durationMs)})`), ); + return { status: "failed", reason: String(err) }; } }; + setReplyHeartbeatWakeHandler(async () => runReplyHeartbeat()); + if (replyHeartbeatMinutes && !replyHeartbeatTimer) { const intervalMs = replyHeartbeatMinutes * 60_000; replyHeartbeatTimer = setInterval(() => { diff --git a/src/web/reply-heartbeat-wake.ts b/src/web/reply-heartbeat-wake.ts new file mode 100644 index 000000000..1cb4f269f --- /dev/null +++ b/src/web/reply-heartbeat-wake.ts @@ -0,0 +1,77 @@ +export type ReplyHeartbeatWakeResult = + | { status: "ran"; durationMs: number } + | { status: "skipped"; reason: string } + | { status: "failed"; reason: string }; + +export type ReplyHeartbeatWakeHandler = (opts: { + reason?: string; +}) => Promise; + +let handler: ReplyHeartbeatWakeHandler | null = null; +let pendingReason: string | null = null; +let scheduled = false; +let running = false; +let timer: NodeJS.Timeout | null = null; + +const DEFAULT_COALESCE_MS = 250; +const DEFAULT_RETRY_MS = 1_000; + +function schedule(coalesceMs: number) { + if (timer) return; + timer = setTimeout(async () => { + timer = null; + scheduled = false; + const active = handler; + if (!active) return; + if (running) { + scheduled = true; + schedule(coalesceMs); + return; + } + + const reason = pendingReason; + pendingReason = null; + running = true; + try { + const res = await active({ reason: reason ?? undefined }); + if (res.status === "skipped" && res.reason === "requests-in-flight") { + // The main lane is busy; retry soon. + pendingReason = reason ?? "retry"; + schedule(DEFAULT_RETRY_MS); + } + } catch (err) { + pendingReason = reason ?? "retry"; + schedule(DEFAULT_RETRY_MS); + throw err; + } finally { + running = false; + if (pendingReason || scheduled) schedule(coalesceMs); + } + }, coalesceMs); + timer.unref?.(); +} + +export function setReplyHeartbeatWakeHandler( + next: ReplyHeartbeatWakeHandler | null, +) { + handler = next; + if (handler && pendingReason) { + schedule(DEFAULT_COALESCE_MS); + } +} + +export function requestReplyHeartbeatNow(opts?: { + reason?: string; + coalesceMs?: number; +}) { + pendingReason = opts?.reason ?? pendingReason ?? "requested"; + schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS); +} + +export function hasReplyHeartbeatWakeHandler() { + return handler !== null; +} + +export function hasPendingReplyHeartbeatWake() { + return pendingReason !== null || Boolean(timer) || scheduled; +}