From 47cf28f6b67ad4515349dc4bf89388a19b705c42 Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Tue, 20 Jan 2026 10:35:54 +0000
Subject: [PATCH] fix: prevent duplicate cron runs across hot reloads

---
 CHANGELOG.md                                  |  1 +
 .../service.prevents-duplicate-timers.test.ts | 89 +++++++++++++++++++
 src/cron/service/locked.ts                    | 19 +++-
 src/cron/service/store.ts                     |  8 ++
 4 files changed, 113 insertions(+), 4 deletions(-)
 create mode 100644 src/cron/service.prevents-duplicate-timers.test.ts

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b5ec756b..2eb26e50a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ Docs: https://docs.clawd.bot
 - Config: centralize default agent concurrency limits.
 
 ### Fixes
+- Cron: serialize scheduler operations per store path to prevent duplicate runs across hot reloads. (#1216) — thanks @carlulsoe.
 - Web search: infer Perplexity base URL from API key source (direct vs OpenRouter).
 - Agents: treat OAuth refresh failures as auth errors to trigger model fallback. (#1261) — thanks @zknicker.
 - TUI: keep thinking blocks ordered before content during streaming and isolate per-run assembly. (#1202) — thanks @aaronveklabs.
diff --git a/src/cron/service.prevents-duplicate-timers.test.ts b/src/cron/service.prevents-duplicate-timers.test.ts
new file mode 100644
index 000000000..228d84c87
--- /dev/null
+++ b/src/cron/service.prevents-duplicate-timers.test.ts
@@ -0,0 +1,89 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+
+import { CronService } from "./service.js";
+
+const noopLogger = {
+  debug: vi.fn(),
+  info: vi.fn(),
+  warn: vi.fn(),
+  error: vi.fn(),
+};
+
+async function makeStorePath() {
+  const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-cron-"));
+  return {
+    storePath: path.join(dir, "cron", "jobs.json"),
+    cleanup: async () => {
+      await fs.rm(dir, { recursive: true, force: true });
+    },
+  };
+}
+
+describe("CronService", () => {
+  beforeEach(() => {
+    vi.useFakeTimers();
+    vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
+    noopLogger.debug.mockClear();
+    noopLogger.info.mockClear();
+    noopLogger.warn.mockClear();
+    noopLogger.error.mockClear();
+  });
+
+  afterEach(() => {
+    vi.useRealTimers();
+  });
+
+  it("avoids duplicate runs when two services share a store", async () => {
+    const store = await makeStorePath();
+    const enqueueSystemEvent = vi.fn();
+    const requestHeartbeatNow = vi.fn();
+    const runIsolatedAgentJob = vi.fn(async () => ({ status: "ok" }));
+
+    const cronA = new CronService({
+      storePath: store.storePath,
+      cronEnabled: true,
+      log: noopLogger,
+      enqueueSystemEvent,
+      requestHeartbeatNow,
+      runIsolatedAgentJob,
+    });
+
+    await cronA.start();
+    const atMs = Date.parse("2025-12-13T00:00:01.000Z");
+    await cronA.add({
+      name: "shared store job",
+      enabled: true,
+      schedule: { kind: "at", atMs },
+      sessionTarget: "main",
+      wakeMode: "next-heartbeat",
+      payload: { kind: "systemEvent", text: "hello" },
+    });
+
+    const cronB = new CronService({
+      storePath: store.storePath,
+      cronEnabled: true,
+      log: noopLogger,
+      enqueueSystemEvent,
+      requestHeartbeatNow,
+      runIsolatedAgentJob,
+    });
+
+    await cronB.start();
+
+    vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
+    await vi.runOnlyPendingTimersAsync();
+    await cronA.status();
+    await cronB.status();
+
+    expect(enqueueSystemEvent).toHaveBeenCalledTimes(1);
+    expect(requestHeartbeatNow).toHaveBeenCalledTimes(1);
+
+    cronA.stop();
+    cronB.stop();
+    await store.cleanup();
+  });
+});
diff --git a/src/cron/service/locked.ts b/src/cron/service/locked.ts
index d93a2072b..b73096678 100644
--- a/src/cron/service/locked.ts
+++ b/src/cron/service/locked.ts
@@ -1,11 +1,22 @@
 import type { CronServiceState } from "./state.js";
 
-export async function locked<T>(state: CronServiceState, fn: () => Promise<T>): Promise<T> {
-  const next = state.op.then(fn, fn);
-  // Keep the chain alive even when the operation fails.
-  state.op = next.then(
+const storeLocks = new Map<string, Promise<void>>();
+
+const resolveChain = (promise: Promise<unknown>) =>
+  promise.then(
     () => undefined,
     () => undefined,
   );
+
+export async function locked<T>(state: CronServiceState, fn: () => Promise<T>): Promise<T> {
+  const storePath = state.deps.storePath;
+  const storeOp = storeLocks.get(storePath) ?? Promise.resolve();
+  const next = Promise.all([resolveChain(state.op), resolveChain(storeOp)]).then(fn);
+
+  // Keep the chain alive even when the operation fails.
+  const keepAlive = resolveChain(next);
+  state.op = keepAlive;
+  storeLocks.set(storePath, keepAlive);
+
   return (await next) as T;
 }
diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts
index 125ce7456..814786ac6 100644
--- a/src/cron/service/store.ts
+++ b/src/cron/service/store.ts
@@ -4,8 +4,15 @@ import type { CronJob } from "../types.js";
 import { inferLegacyName, normalizeOptionalText } from "./normalize.js";
 import type { CronServiceState } from "./state.js";
 
+const storeCache = new Map();
+
 export async function ensureLoaded(state: CronServiceState) {
   if (state.store) return;
+  const cached = storeCache.get(state.deps.storePath);
+  if (cached) {
+    state.store = cached;
+    return;
+  }
   const loaded = await loadCronStore(state.deps.storePath);
   const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
   let mutated = false;
@@ -35,6 +42,7 @@ export async function ensureLoaded(state: CronServiceState) {
     }
   }
   state.store = { version: 1, jobs: jobs as unknown as CronJob[] };
+  storeCache.set(state.deps.storePath, state.store);
   if (mutated) await persist(state);
 }
 