fix: cron wakeMode now waits for heartbeat (#666) (thanks @roshanasingh4)

This commit is contained in:
Peter Steinberger
2026-01-10 18:05:23 +01:00
parent 91c870a0c4
commit b383fbeed3
4 changed files with 101 additions and 25 deletions

View File

@@ -10,6 +10,7 @@
### Fixes ### Fixes
- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj. - Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj.
- Cron: `wakeMode: "now"` waits for heartbeat completion (and retries when the main lane is busy). (#666) — thanks @roshanasingh4.
- Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”). - Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”).
- Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage. - Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage.
- Auth: throttle external CLI credential syncs (Claude/Codex), reduce Keychain reads, and skip sync when cached credentials are still fresh. - Auth: throttle external CLI credential syncs (Claude/Codex), reduce Keychain reads, and skip sync when cached credentials are still fresh.

View File

@@ -4,6 +4,7 @@ import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
import { CronService } from "./service.js"; import { CronService } from "./service.js";
const noopLogger = { const noopLogger = {
@@ -78,6 +79,68 @@ describe("CronService", () => {
await store.cleanup(); await store.cleanup();
}); });
it("wakeMode now waits for heartbeat completion when available", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
let now = 0;
const nowMs = () => {
now += 10;
return now;
};
let resolveHeartbeat: ((res: HeartbeatRunResult) => void) | null = null;
const runHeartbeatOnce = vi.fn(
async () =>
await new Promise<HeartbeatRunResult>((resolve) => {
resolveHeartbeat = resolve;
}),
);
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
nowMs,
enqueueSystemEvent,
requestHeartbeatNow,
runHeartbeatOnce,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const job = await cron.add({
name: "wakeMode now waits",
enabled: true,
schedule: { kind: "at", atMs: 1 },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
const runPromise = cron.run(job.id, "force");
for (let i = 0; i < 10; i++) {
if (runHeartbeatOnce.mock.calls.length > 0) break;
// Let the locked() chain progress.
await Promise.resolve();
}
expect(runHeartbeatOnce).toHaveBeenCalledTimes(1);
expect(requestHeartbeatNow).not.toHaveBeenCalled();
expect(enqueueSystemEvent).toHaveBeenCalledWith("hello");
expect(job.state.runningAtMs).toBeTypeOf("number");
resolveHeartbeat?.({ status: "ran", durationMs: 123 });
await runPromise;
expect(job.state.lastStatus).toBe("ok");
expect(job.state.lastDurationMs).toBeGreaterThan(0);
cron.stop();
await store.cleanup();
});
it("runs an isolated job and posts summary to main", async () => { it("runs an isolated job and posts summary to main", async () => {
const store = await makeStorePath(); const store = await makeStorePath();
const enqueueSystemEvent = vi.fn(); const enqueueSystemEvent = vi.fn();

View File

@@ -1,5 +1,6 @@
import crypto from "node:crypto"; import crypto from "node:crypto";
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
import { truncateUtf16Safe } from "../utils.js"; import { truncateUtf16Safe } from "../utils.js";
import { migrateLegacyCronPayload } from "./payload-migration.js"; import { migrateLegacyCronPayload } from "./payload-migration.js";
import { computeNextRunAtMs } from "./schedule.js"; import { computeNextRunAtMs } from "./schedule.js";
@@ -37,11 +38,9 @@ export type CronServiceDeps = {
cronEnabled: boolean; cronEnabled: boolean;
enqueueSystemEvent: (text: string) => void; enqueueSystemEvent: (text: string) => void;
requestHeartbeatNow: (opts?: { reason?: string }) => void; requestHeartbeatNow: (opts?: { reason?: string }) => void;
runHeartbeatOnce?: (opts?: { reason?: string }) => Promise<{ runHeartbeatOnce?: (opts?: {
status: "ran" | "skipped" | "failed";
durationMs: number;
reason?: string; reason?: string;
}>; }) => Promise<HeartbeatRunResult>;
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{ runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{
status: "ok" | "error" | "skipped"; status: "ok" | "error" | "skipped";
summary?: string; summary?: string;
@@ -50,6 +49,10 @@ export type CronServiceDeps = {
onEvent?: (evt: CronEvent) => void; onEvent?: (evt: CronEvent) => void;
}; };
type CronServiceDepsInternal = Omit<CronServiceDeps, "nowMs"> & {
nowMs: () => number;
};
const STUCK_RUN_MS = 2 * 60 * 60 * 1000; const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
const MAX_TIMEOUT_MS = 2 ** 31 - 1; const MAX_TIMEOUT_MS = 2 ** 31 - 1;
@@ -104,8 +107,7 @@ function normalizePayloadToSystemText(payload: CronPayload) {
} }
export class CronService { export class CronService {
private readonly deps: Required<Omit<CronServiceDeps, "onEvent">> & private readonly deps: CronServiceDepsInternal;
Pick<CronServiceDeps, "onEvent">;
private store: CronStoreFile | null = null; private store: CronStoreFile | null = null;
private timer: NodeJS.Timeout | null = null; private timer: NodeJS.Timeout | null = null;
private running = false; private running = false;
@@ -116,7 +118,6 @@ export class CronService {
this.deps = { this.deps = {
...deps, ...deps,
nowMs: deps.nowMs ?? (() => Date.now()), nowMs: deps.nowMs ?? (() => Date.now()),
onEvent: deps.onEvent,
}; };
} }
@@ -520,25 +521,36 @@ export class CronService {
} }
this.deps.enqueueSystemEvent(text); this.deps.enqueueSystemEvent(text);
if (job.wakeMode === "now" && this.deps.runHeartbeatOnce) { if (job.wakeMode === "now" && this.deps.runHeartbeatOnce) {
const heartbeatResult = await this.deps.runHeartbeatOnce({ const reason = `cron:${job.id}`;
reason: `cron:${job.id}`, const delay = (ms: number) =>
}); new Promise<void>((resolve) => setTimeout(resolve, ms));
// Map heartbeat status to cron status const maxWaitMs = 2 * 60_000;
const waitStartedAt = this.deps.nowMs();
let heartbeatResult: HeartbeatRunResult;
for (;;) {
heartbeatResult = await this.deps.runHeartbeatOnce({ reason });
if (
heartbeatResult.status !== "skipped" ||
heartbeatResult.reason !== "requests-in-flight"
) {
break;
}
if (this.deps.nowMs() - waitStartedAt > maxWaitMs) {
heartbeatResult = {
status: "skipped",
reason: "timeout waiting for main lane to become idle",
};
break;
}
await delay(250);
}
if (heartbeatResult.status === "ran") { if (heartbeatResult.status === "ran") {
await finish("ok", undefined, text); await finish("ok", undefined, text);
} else if (heartbeatResult.status === "skipped") { } else if (heartbeatResult.status === "skipped")
await finish( await finish("skipped", heartbeatResult.reason, text);
"skipped", else await finish("error", heartbeatResult.reason, text);
heartbeatResult.reason ?? "heartbeat skipped",
text,
);
} else if (heartbeatResult.status === "failed") {
await finish(
"error",
heartbeatResult.reason ?? "heartbeat failed",
text,
);
}
} else { } else {
// wakeMode is "next-heartbeat" or runHeartbeatOnce not available // wakeMode is "next-heartbeat" or runHeartbeatOnce not available
this.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); this.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });

View File

@@ -723,7 +723,7 @@ export async function startGatewayServer(
return await runHeartbeatOnce({ return await runHeartbeatOnce({
cfg: runtimeConfig, cfg: runtimeConfig,
reason: opts?.reason, reason: opts?.reason,
deps: { runtime }, deps: { ...deps, runtime: defaultRuntime },
}); });
}, },
runIsolatedAgentJob: async ({ job, message }) => { runIsolatedAgentJob: async ({ job, message }) => {