import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import { CronService } from "./service.js"; const noopLogger = { debug: vi.fn(), info: vi.fn(), warn: vi.fn(), error: vi.fn(), }; async function makeStorePath() { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-cron-")); return { storePath: path.join(dir, "cron", "jobs.json"), cleanup: async () => { await fs.rm(dir, { recursive: true, force: true }); }, }; } describe("CronService", () => { beforeEach(() => { vi.useFakeTimers(); vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z")); noopLogger.debug.mockClear(); noopLogger.info.mockClear(); noopLogger.warn.mockClear(); noopLogger.error.mockClear(); }); afterEach(() => { vi.useRealTimers(); }); it("runs a one-shot main job and disables it after success", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); await cron.start(); const atMs = Date.parse("2025-12-13T00:00:02.000Z"); const job = await cron.add({ name: "one-shot hello", enabled: true, schedule: { kind: "at", atMs }, sessionTarget: "main", wakeMode: "now", payload: { kind: "systemEvent", text: "hello" }, }); expect(job.state.nextRunAtMs).toBe(atMs); vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); const jobs = await cron.list({ includeDisabled: true }); const updated = jobs.find((j) => j.id === job.id); expect(updated?.enabled).toBe(false); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { agentId: undefined, }); expect(requestHeartbeatNow).toHaveBeenCalled(); await cron.list({ includeDisabled: true }); cron.stop(); await store.cleanup(); }); it("runs a one-shot job and deletes it after success when requested", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); await cron.start(); const atMs = Date.parse("2025-12-13T00:00:02.000Z"); const job = await cron.add({ name: "one-shot delete", enabled: true, deleteAfterRun: true, schedule: { kind: "at", atMs }, sessionTarget: "main", wakeMode: "now", payload: { kind: "systemEvent", text: "hello" }, }); vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); const jobs = await cron.list({ includeDisabled: true }); expect(jobs.find((j) => j.id === job.id)).toBeUndefined(); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { agentId: undefined, }); expect(requestHeartbeatNow).toHaveBeenCalled(); cron.stop(); await store.cleanup(); }); it("wakeMode now waits for heartbeat completion when available", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); let now = 0; const nowMs = () => { now += 10; return now; }; let resolveHeartbeat: ((res: HeartbeatRunResult) => void) | null = null; const runHeartbeatOnce = vi.fn( async () => await new Promise((resolve) => { resolveHeartbeat = resolve; }), ); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, nowMs, enqueueSystemEvent, requestHeartbeatNow, runHeartbeatOnce, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); await cron.start(); const job = await cron.add({ name: "wakeMode now waits", enabled: true, schedule: { kind: "at", atMs: 1 }, sessionTarget: "main", wakeMode: "now", payload: { kind: "systemEvent", text: "hello" }, }); const runPromise = cron.run(job.id, "force"); for (let i = 0; i < 10; i++) { if (runHeartbeatOnce.mock.calls.length > 0) break; // Let the locked() chain progress. await Promise.resolve(); } expect(runHeartbeatOnce).toHaveBeenCalledTimes(1); expect(requestHeartbeatNow).not.toHaveBeenCalled(); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { agentId: undefined, }); expect(job.state.runningAtMs).toBeTypeOf("number"); resolveHeartbeat?.({ status: "ran", durationMs: 123 }); await runPromise; expect(job.state.lastStatus).toBe("ok"); expect(job.state.lastDurationMs).toBeGreaterThan(0); cron.stop(); await store.cleanup(); }); it("runs an isolated job and posts summary to main", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" as const, summary: "done", })); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob, }); await cron.start(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); await cron.add({ enabled: true, name: "weekly", schedule: { kind: "at", atMs }, sessionTarget: "isolated", wakeMode: "now", payload: { kind: "agentTurn", message: "do it", deliver: false }, }); vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); await cron.list({ includeDisabled: true }); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", { agentId: undefined, }); expect(requestHeartbeatNow).toHaveBeenCalled(); cron.stop(); await store.cleanup(); }); it("migrates legacy payload.provider to payload.channel on load", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); const rawJob = { id: "legacy-1", name: "legacy", enabled: true, createdAtMs: Date.now(), updatedAtMs: Date.now(), schedule: { kind: "cron", expr: "* * * * *" }, sessionTarget: "isolated", wakeMode: "now", payload: { kind: "agentTurn", message: "hi", deliver: true, provider: " TeLeGrAm ", to: "7200373102", }, state: {}, }; await fs.mkdir(path.dirname(store.storePath), { recursive: true }); await fs.writeFile( store.storePath, JSON.stringify({ version: 1, jobs: [rawJob] }, null, 2), "utf-8", ); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); await cron.start(); const jobs = await cron.list({ includeDisabled: true }); const job = jobs.find((j) => j.id === rawJob.id); const payload = job?.payload as unknown as Record; expect(payload.channel).toBe("telegram"); expect("provider" in payload).toBe(false); cron.stop(); await store.cleanup(); }); it("canonicalizes payload.channel casing on load", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); const rawJob = { id: "legacy-2", name: "legacy", enabled: true, createdAtMs: Date.now(), updatedAtMs: Date.now(), schedule: { kind: "cron", expr: "* * * * *" }, sessionTarget: "isolated", wakeMode: "now", payload: { kind: "agentTurn", message: "hi", deliver: true, channel: "Telegram", to: "7200373102", }, state: {}, }; await fs.mkdir(path.dirname(store.storePath), { recursive: true }); await fs.writeFile( store.storePath, JSON.stringify({ version: 1, jobs: [rawJob] }, null, 2), "utf-8", ); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); await cron.start(); const jobs = await cron.list({ includeDisabled: true }); const job = jobs.find((j) => j.id === rawJob.id); const payload = job?.payload as unknown as Record; expect(payload.channel).toBe("telegram"); cron.stop(); await store.cleanup(); }); it("posts last output to main even when isolated job errors", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); const runIsolatedAgentJob = vi.fn(async () => ({ status: "error" as const, summary: "last output", error: "boom", })); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob, }); await cron.start(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); await cron.add({ name: "isolated error test", enabled: true, schedule: { kind: "at", atMs }, sessionTarget: "isolated", wakeMode: "now", payload: { kind: "agentTurn", message: "do it", deliver: false }, }); vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); await cron.list({ includeDisabled: true }); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron (error): last output", { agentId: undefined, }); expect(requestHeartbeatNow).toHaveBeenCalled(); cron.stop(); await store.cleanup(); }); it("rejects unsupported session/payload combinations", async () => { const store = await makeStorePath(); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent: vi.fn(), requestHeartbeatNow: vi.fn(), runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); await cron.start(); await expect( cron.add({ name: "bad combo (main/agentTurn)", enabled: true, schedule: { kind: "every", everyMs: 1000 }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { kind: "agentTurn", message: "nope" }, }), ).rejects.toThrow(/main cron jobs require/); await expect( cron.add({ name: "bad combo (isolated/systemEvent)", enabled: true, schedule: { kind: "every", everyMs: 1000 }, sessionTarget: "isolated", wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "nope" }, }), ).rejects.toThrow(/isolated cron jobs require/); cron.stop(); await store.cleanup(); }); it("skips invalid main jobs with agentTurn payloads from disk", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); const requestHeartbeatNow = vi.fn(); const atMs = Date.parse("2025-12-13T00:00:01.000Z"); await fs.mkdir(path.dirname(store.storePath), { recursive: true }); await fs.writeFile( store.storePath, JSON.stringify({ version: 1, jobs: [ { id: "job-1", enabled: true, createdAtMs: Date.parse("2025-12-13T00:00:00.000Z"), updatedAtMs: Date.parse("2025-12-13T00:00:00.000Z"), schedule: { kind: "at", atMs }, sessionTarget: "main", wakeMode: "now", payload: { kind: "agentTurn", message: "bad" }, state: {}, }, ], }), ); const cron = new CronService({ storePath: store.storePath, cronEnabled: true, log: noopLogger, enqueueSystemEvent, requestHeartbeatNow, runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), }); await cron.start(); vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); const jobs = await cron.list({ includeDisabled: true }); expect(jobs[0]?.state.lastStatus).toBe("skipped"); expect(jobs[0]?.state.lastError).toMatch(/main job requires/i); cron.stop(); await store.cleanup(); }); });