feat: cron ISO at + delete-after-run

This commit is contained in:
Peter Steinberger
2026-01-13 04:55:39 +00:00
parent 8f105288d2
commit 75a7855223
17 changed files with 221 additions and 17 deletions

View File

@@ -75,4 +75,40 @@ describe("normalizeCronJobCreate", () => {
const payload = normalized.payload as Record<string, unknown>;
expect(payload.provider).toBe("telegram");
});
it("coerces ISO schedule.at to atMs (UTC)", () => {
const normalized = normalizeCronJobCreate({
name: "iso at",
enabled: true,
schedule: { at: "2026-01-12T18:00:00" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: {
kind: "systemEvent",
text: "hi",
},
}) as unknown as Record<string, unknown>;
const schedule = normalized.schedule as Record<string, unknown>;
expect(schedule.kind).toBe("at");
expect(schedule.atMs).toBe(Date.parse("2026-01-12T18:00:00Z"));
});
it("coerces ISO schedule.atMs string to atMs (UTC)", () => {
const normalized = normalizeCronJobCreate({
name: "iso atMs",
enabled: true,
schedule: { kind: "at", atMs: "2026-01-12T18:00:00" },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: {
kind: "systemEvent",
text: "hi",
},
}) as unknown as Record<string, unknown>;
const schedule = normalized.schedule as Record<string, unknown>;
expect(schedule.kind).toBe("at");
expect(schedule.atMs).toBe(Date.parse("2026-01-12T18:00:00Z"));
});
});

View File

@@ -1,4 +1,5 @@
import { normalizeAgentId } from "../routing/session-key.js";
import { parseAbsoluteTimeMs } from "./parse.js";
import { migrateLegacyCronPayload } from "./payload-migration.js";
import type { CronJobCreate, CronJobPatch } from "./types.js";
@@ -19,11 +20,32 @@ function isRecord(value: unknown): value is UnknownRecord {
function coerceSchedule(schedule: UnknownRecord) {
const next: UnknownRecord = { ...schedule };
const kind = typeof schedule.kind === "string" ? schedule.kind : undefined;
const atMsRaw = schedule.atMs;
const atRaw = schedule.at;
const parsedAtMs =
typeof atMsRaw === "string"
? parseAbsoluteTimeMs(atMsRaw)
: typeof atRaw === "string"
? parseAbsoluteTimeMs(atRaw)
: null;
if (!kind) {
if (typeof schedule.atMs === "number") next.kind = "at";
if (
typeof schedule.atMs === "number" ||
typeof schedule.at === "string" ||
typeof schedule.atMs === "string"
)
next.kind = "at";
else if (typeof schedule.everyMs === "number") next.kind = "every";
else if (typeof schedule.expr === "string") next.kind = "cron";
}
if (typeof schedule.atMs !== "number" && parsedAtMs !== null) {
next.atMs = parsedAtMs;
}
if ("at" in next) delete next.at;
return next;
}

21
src/cron/parse.ts Normal file
View File

@@ -0,0 +1,21 @@
const ISO_TZ_RE = /(Z|[+-]\d{2}:?\d{2})$/i;
const ISO_DATE_RE = /^\d{4}-\d{2}-\d{2}$/;
const ISO_DATE_TIME_RE = /^\d{4}-\d{2}-\d{2}T/;
function normalizeUtcIso(raw: string) {
if (ISO_TZ_RE.test(raw)) return raw;
if (ISO_DATE_RE.test(raw)) return `${raw}T00:00:00Z`;
if (ISO_DATE_TIME_RE.test(raw)) return `${raw}Z`;
return raw;
}
export function parseAbsoluteTimeMs(input: string): number | null {
const raw = input.trim();
if (!raw) return null;
if (/^\d+$/.test(raw)) {
const n = Number(raw);
if (Number.isFinite(n) && n > 0) return Math.floor(n);
}
const parsed = Date.parse(normalizeUtcIso(raw));
return Number.isFinite(parsed) ? parsed : null;
}

View File

@@ -81,6 +81,46 @@ describe("CronService", () => {
await store.cleanup();
});
it("runs a one-shot job and deletes it after success when requested", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
const job = await cron.add({
name: "one-shot delete",
enabled: true,
deleteAfterRun: true,
schedule: { kind: "at", atMs },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: "hello" },
});
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
await vi.runOnlyPendingTimersAsync();
const jobs = await cron.list({ includeDisabled: true });
expect(jobs.find((j) => j.id === job.id)).toBeUndefined();
expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", {
agentId: undefined,
});
expect(requestHeartbeatNow).toHaveBeenCalled();
cron.stop();
await store.cleanup();
});
it("wakeMode now waits for heartbeat completion when available", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();

View File

@@ -193,6 +193,7 @@ export class CronService {
name: normalizeRequiredName(input.name),
description: normalizeOptionalText(input.description),
enabled: input.enabled !== false,
deleteAfterRun: input.deleteAfterRun,
createdAtMs: now,
updatedAtMs: now,
schedule: input.schedule,
@@ -229,6 +230,8 @@ export class CronService {
if ("description" in patch)
job.description = normalizeOptionalText(patch.description);
if (typeof patch.enabled === "boolean") job.enabled = patch.enabled;
if (typeof patch.deleteAfterRun === "boolean")
job.deleteAfterRun = patch.deleteAfterRun;
if (patch.schedule) job.schedule = patch.schedule;
if (patch.sessionTarget) job.sessionTarget = patch.sessionTarget;
if (patch.wakeMode) job.wakeMode = patch.wakeMode;
@@ -472,6 +475,8 @@ export class CronService {
job.state.lastError = undefined;
this.emit({ jobId: job.id, action: "started", runAtMs: startedAt });
let deleted = false;
const finish = async (
status: "ok" | "error" | "skipped",
err?: string,
@@ -484,14 +489,21 @@ export class CronService {
job.state.lastDurationMs = Math.max(0, endedAt - startedAt);
job.state.lastError = err;
if (job.schedule.kind === "at" && status === "ok") {
// One-shot job completed successfully; disable it.
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (job.enabled) {
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, endedAt);
} else {
job.state.nextRunAtMs = undefined;
const shouldDelete =
job.schedule.kind === "at" &&
status === "ok" &&
job.deleteAfterRun === true;
if (!shouldDelete) {
if (job.schedule.kind === "at" && status === "ok") {
// One-shot job completed successfully; disable it.
job.enabled = false;
job.state.nextRunAtMs = undefined;
} else if (job.enabled) {
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, endedAt);
} else {
job.state.nextRunAtMs = undefined;
}
}
this.emit({
@@ -505,6 +517,12 @@ export class CronService {
nextRunAtMs: job.state.nextRunAtMs,
});
if (shouldDelete && this.store) {
this.store.jobs = this.store.jobs.filter((j) => j.id !== job.id);
deleted = true;
this.emit({ jobId: job.id, action: "removed" });
}
if (job.sessionTarget === "isolated") {
const prefix = job.isolation?.postToMainPrefix?.trim() || "Cron";
const body = (summary ?? err ?? status).trim();
@@ -592,7 +610,7 @@ export class CronService {
await finish("error", String(err));
} finally {
job.updatedAtMs = nowMs;
if (!opts.forced && job.enabled) {
if (!opts.forced && job.enabled && !deleted) {
// Keep nextRunAtMs in sync in case the schedule advanced during a long run.
job.state.nextRunAtMs = this.computeJobNextRunAtMs(
job,

View File

@@ -44,6 +44,7 @@ export type CronJob = {
name: string;
description?: string;
enabled: boolean;
deleteAfterRun?: boolean;
createdAtMs: number;
updatedAtMs: number;
schedule: CronSchedule;