diff --git a/CHANGELOG.md b/CHANGELOG.md index c7aaac5a7..9bc163f04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Fixes - Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj. +- Cron: `wakeMode: "now"` waits for heartbeat completion (and retries when the main lane is busy). (#666) — thanks @roshanasingh4. - Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”). - Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage. - Auth: throttle external CLI credential syncs (Claude/Codex), reduce Keychain reads, and skip sync when cached credentials are still fresh. diff --git a/src/cron/service.test.ts b/src/cron/service.test.ts index 34762abd8..03364e9cd 100644 --- a/src/cron/service.test.ts +++ b/src/cron/service.test.ts @@ -4,6 +4,7 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -78,6 +79,68 @@ describe("CronService", () => { await store.cleanup(); }); + it("wakeMode now waits for heartbeat completion when available", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + let now = 0; + const nowMs = () => { + now += 10; + return now; + }; + + let resolveHeartbeat: ((res: HeartbeatRunResult) => void) | null = null; + const runHeartbeatOnce = vi.fn( + async () => + await new Promise((resolve) => { + resolveHeartbeat = resolve; + }), + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + nowMs, + enqueueSystemEvent, + requestHeartbeatNow, + runHeartbeatOnce, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const job = await cron.add({ + name: "wakeMode now waits", + enabled: true, + schedule: { kind: "at", atMs: 1 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "hello" }, + }); + + const runPromise = cron.run(job.id, "force"); + for (let i = 0; i < 10; i++) { + if (runHeartbeatOnce.mock.calls.length > 0) break; + // Let the locked() chain progress. + await Promise.resolve(); + } + + expect(runHeartbeatOnce).toHaveBeenCalledTimes(1); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); + expect(enqueueSystemEvent).toHaveBeenCalledWith("hello"); + expect(job.state.runningAtMs).toBeTypeOf("number"); + + resolveHeartbeat?.({ status: "ran", durationMs: 123 }); + await runPromise; + + expect(job.state.lastStatus).toBe("ok"); + expect(job.state.lastDurationMs).toBeGreaterThan(0); + + cron.stop(); + await store.cleanup(); + }); + it("runs an isolated job and posts summary to main", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); diff --git a/src/cron/service.ts b/src/cron/service.ts index 620ebd16f..870e56675 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -1,5 +1,6 @@ import crypto from "node:crypto"; +import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import { truncateUtf16Safe } from "../utils.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; import { computeNextRunAtMs } from "./schedule.js"; @@ -37,11 +38,9 @@ export type CronServiceDeps = { cronEnabled: boolean; enqueueSystemEvent: (text: string) => void; requestHeartbeatNow: (opts?: { reason?: string }) => void; - runHeartbeatOnce?: (opts?: { reason?: string }) => Promise<{ - status: "ran" | "skipped" | "failed"; - durationMs: number; + runHeartbeatOnce?: (opts?: { reason?: string; - }>; + }) => Promise; runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{ status: "ok" | "error" | "skipped"; summary?: string; @@ -50,6 +49,10 @@ export type CronServiceDeps = { onEvent?: (evt: CronEvent) => void; }; +type CronServiceDepsInternal = Omit & { + nowMs: () => number; +}; + const STUCK_RUN_MS = 2 * 60 * 60 * 1000; const MAX_TIMEOUT_MS = 2 ** 31 - 1; @@ -104,8 +107,7 @@ function normalizePayloadToSystemText(payload: CronPayload) { } export class CronService { - private readonly deps: Required> & - Pick; + private readonly deps: CronServiceDepsInternal; private store: CronStoreFile | null = null; private timer: NodeJS.Timeout | null = null; private running = false; @@ -116,7 +118,6 @@ export class CronService { this.deps = { ...deps, nowMs: deps.nowMs ?? (() => Date.now()), - onEvent: deps.onEvent, }; } @@ -520,25 +521,36 @@ export class CronService { } this.deps.enqueueSystemEvent(text); if (job.wakeMode === "now" && this.deps.runHeartbeatOnce) { - const heartbeatResult = await this.deps.runHeartbeatOnce({ - reason: `cron:${job.id}`, - }); - // Map heartbeat status to cron status + const reason = `cron:${job.id}`; + const delay = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + const maxWaitMs = 2 * 60_000; + const waitStartedAt = this.deps.nowMs(); + + let heartbeatResult: HeartbeatRunResult; + for (;;) { + heartbeatResult = await this.deps.runHeartbeatOnce({ reason }); + if ( + heartbeatResult.status !== "skipped" || + heartbeatResult.reason !== "requests-in-flight" + ) { + break; + } + if (this.deps.nowMs() - waitStartedAt > maxWaitMs) { + heartbeatResult = { + status: "skipped", + reason: "timeout waiting for main lane to become idle", + }; + break; + } + await delay(250); + } + if (heartbeatResult.status === "ran") { await finish("ok", undefined, text); - } else if (heartbeatResult.status === "skipped") { - await finish( - "skipped", - heartbeatResult.reason ?? "heartbeat skipped", - text, - ); - } else if (heartbeatResult.status === "failed") { - await finish( - "error", - heartbeatResult.reason ?? "heartbeat failed", - text, - ); - } + } else if (heartbeatResult.status === "skipped") + await finish("skipped", heartbeatResult.reason, text); + else await finish("error", heartbeatResult.reason, text); } else { // wakeMode is "next-heartbeat" or runHeartbeatOnce not available this.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index caabc1517..a56f07605 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -723,7 +723,7 @@ export async function startGatewayServer( return await runHeartbeatOnce({ cfg: runtimeConfig, reason: opts?.reason, - deps: { runtime }, + deps: { ...deps, runtime: defaultRuntime }, }); }, runIsolatedAgentJob: async ({ job, message }) => {