refactor(cron): move store into ~/.clawdis/cron

This commit is contained in:
Peter Steinberger
2025-12-13 12:38:08 +00:00
parent 3863fe6412
commit 3e2e4be680
10 changed files with 90 additions and 63 deletions

View File

@@ -632,11 +632,11 @@ public struct CronRunParams: Codable {
} }
public struct CronRunsParams: Codable { public struct CronRunsParams: Codable {
public let id: String? public let id: String
public let limit: Int? public let limit: Int?
public init( public init(
id: String?, id: String,
limit: Int? limit: Int?
) { ) {
self.id = id self.id = id

View File

@@ -1601,7 +1601,10 @@
"maximum": 5000, "maximum": 5000,
"type": "integer" "type": "integer"
} }
} },
"required": [
"id"
]
}, },
"CronRunLogEntry": { "CronRunLogEntry": {
"additionalProperties": false, "additionalProperties": false,

View File

@@ -362,15 +362,14 @@ export function registerCronCli(program: Command) {
cron cron
.command("runs") .command("runs")
.description("Show cron run history (JSONL-backed)") .description("Show cron run history (JSONL-backed)")
.option("--id <id>", "Job id (required when store is jobs.json)") .requiredOption("--id <id>", "Job id")
.option("--limit <n>", "Max entries (default 50)", "50") .option("--limit <n>", "Max entries (default 50)", "50")
.action(async (opts) => { .action(async (opts) => {
try { try {
const limitRaw = Number.parseInt(String(opts.limit ?? "50"), 10); const limitRaw = Number.parseInt(String(opts.limit ?? "50"), 10);
const limit = const limit =
Number.isFinite(limitRaw) && limitRaw > 0 ? limitRaw : 50; Number.isFinite(limitRaw) && limitRaw > 0 ? limitRaw : 50;
const id = const id = String(opts.id);
typeof opts.id === "string" && opts.id.trim() ? opts.id : undefined;
const res = await callGatewayFromCli("cron.runs", opts, { const res = await callGatewayFromCli("cron.runs", opts, {
id, id,
limit, limit,

View File

@@ -11,13 +11,7 @@ import {
} from "./run-log.js"; } from "./run-log.js";
describe("cron run log", () => { describe("cron run log", () => {
it("resolves a flat store path to cron.runs.jsonl", () => { it("resolves store path to per-job runs/<jobId>.jsonl", () => {
const storePath = path.join(os.tmpdir(), "cron.json");
const p = resolveCronRunLogPath({ storePath, jobId: "job-1" });
expect(p.endsWith(path.join(os.tmpdir(), "cron.runs.jsonl"))).toBe(true);
});
it("resolves jobs.json to per-job runs/<jobId>.jsonl", () => {
const storePath = path.join(os.tmpdir(), "cron", "jobs.json"); const storePath = path.join(os.tmpdir(), "cron", "jobs.json");
const p = resolveCronRunLogPath({ storePath, jobId: "job-1" }); const p = resolveCronRunLogPath({ storePath, jobId: "job-1" });
expect( expect(
@@ -27,7 +21,7 @@ describe("cron run log", () => {
it("appends JSONL and prunes by line count", async () => { it("appends JSONL and prunes by line count", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-log-")); const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-log-"));
const logPath = path.join(dir, "cron.runs.jsonl"); const logPath = path.join(dir, "runs", "job-1.jsonl");
for (let i = 0; i < 10; i++) { for (let i = 0; i < 10; i++) {
await appendCronRunLog( await appendCronRunLog(
@@ -59,15 +53,16 @@ describe("cron run log", () => {
const dir = await fs.mkdtemp( const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-cron-log-read-"), path.join(os.tmpdir(), "clawdis-cron-log-read-"),
); );
const logPath = path.join(dir, "cron.runs.jsonl"); const logPathA = path.join(dir, "runs", "a.jsonl");
const logPathB = path.join(dir, "runs", "b.jsonl");
await appendCronRunLog(logPath, { await appendCronRunLog(logPathA, {
ts: 1, ts: 1,
jobId: "a", jobId: "a",
action: "finished", action: "finished",
status: "ok", status: "ok",
}); });
await appendCronRunLog(logPath, { await appendCronRunLog(logPathB, {
ts: 2, ts: 2,
jobId: "b", jobId: "b",
action: "finished", action: "finished",
@@ -75,31 +70,37 @@ describe("cron run log", () => {
error: "nope", error: "nope",
summary: "oops", summary: "oops",
}); });
await appendCronRunLog(logPath, { await appendCronRunLog(logPathA, {
ts: 3, ts: 3,
jobId: "a", jobId: "a",
action: "finished", action: "finished",
status: "skipped", status: "skipped",
}); });
const all = await readCronRunLogEntries(logPath, { limit: 10 }); const allA = await readCronRunLogEntries(logPathA, { limit: 10 });
expect(all.map((e) => e.jobId)).toEqual(["a", "b", "a"]); expect(allA.map((e) => e.jobId)).toEqual(["a", "a"]);
const onlyA = await readCronRunLogEntries(logPath, { const onlyA = await readCronRunLogEntries(logPathA, {
limit: 10, limit: 10,
jobId: "a", jobId: "a",
}); });
expect(onlyA.map((e) => e.ts)).toEqual([1, 3]); expect(onlyA.map((e) => e.ts)).toEqual([1, 3]);
const lastOne = await readCronRunLogEntries(logPath, { limit: 1 }); const lastOne = await readCronRunLogEntries(logPathA, { limit: 1 });
expect(lastOne.map((e) => e.ts)).toEqual([3]); expect(lastOne.map((e) => e.ts)).toEqual([3]);
const onlyB = await readCronRunLogEntries(logPath, { const onlyB = await readCronRunLogEntries(logPathB, {
limit: 10, limit: 10,
jobId: "b", jobId: "b",
}); });
expect(onlyB[0]?.summary).toBe("oops"); expect(onlyB[0]?.summary).toBe("oops");
const wrongFilter = await readCronRunLogEntries(logPathA, {
limit: 10,
jobId: "b",
});
expect(wrongFilter).toEqual([]);
await fs.rm(dir, { recursive: true, force: true }); await fs.rm(dir, { recursive: true, force: true });
}); });
}); });

View File

@@ -19,14 +19,7 @@ export function resolveCronRunLogPath(params: {
}) { }) {
const storePath = path.resolve(params.storePath); const storePath = path.resolve(params.storePath);
const dir = path.dirname(storePath); const dir = path.dirname(storePath);
const base = path.basename(storePath); return path.join(dir, "runs", `${params.jobId}.jsonl`);
if (base === "jobs.json") {
return path.join(dir, "runs", `${params.jobId}.jsonl`);
}
const ext = path.extname(base);
const baseNoExt = ext ? base.slice(0, -ext.length) : base;
return path.join(dir, `${baseNoExt}.runs.jsonl`);
} }
const writesByPath = new Map<string, Promise<void>>(); const writesByPath = new Map<string, Promise<void>>();

View File

@@ -16,7 +16,7 @@ const noopLogger = {
async function makeStorePath() { async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-")); const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-cron-"));
return { return {
storePath: path.join(dir, "cron.json"), storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => { cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true }); await fs.rm(dir, { recursive: true, force: true });
}, },
@@ -201,6 +201,7 @@ describe("CronService", () => {
const requestReplyHeartbeatNow = vi.fn(); const requestReplyHeartbeatNow = vi.fn();
const atMs = Date.parse("2025-12-13T00:00:01.000Z"); const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile( await fs.writeFile(
store.storePath, store.storePath,
JSON.stringify({ JSON.stringify({

View File

@@ -6,12 +6,9 @@ import JSON5 from "json5";
import { CONFIG_DIR } from "../utils.js"; import { CONFIG_DIR } from "../utils.js";
import type { CronStoreFile } from "./types.js"; import type { CronStoreFile } from "./types.js";
export const LEGACY_CRON_STORE_PATH = path.join( export const DEFAULT_CRON_DIR = path.join(CONFIG_DIR, "cron");
CONFIG_DIR, export const DEFAULT_CRON_STORE_PATH = path.join(DEFAULT_CRON_DIR, "jobs.json");
"cron", export const LEGACY_FLAT_CRON_STORE_PATH = path.join(CONFIG_DIR, "cron.json");
"jobs.json",
);
export const DEFAULT_CRON_STORE_PATH = path.join(CONFIG_DIR, "cron.json");
export function resolveCronStorePath(storePath?: string) { export function resolveCronStorePath(storePath?: string) {
if (storePath?.trim()) { if (storePath?.trim()) {
@@ -20,11 +17,52 @@ export function resolveCronStorePath(storePath?: string) {
return path.resolve(raw.replace("~", os.homedir())); return path.resolve(raw.replace("~", os.homedir()));
return path.resolve(raw); return path.resolve(raw);
} }
if (fs.existsSync(LEGACY_CRON_STORE_PATH)) return LEGACY_CRON_STORE_PATH;
return DEFAULT_CRON_STORE_PATH; return DEFAULT_CRON_STORE_PATH;
} }
async function maybeMigrateLegacyFlatStore(storePath: string) {
const resolved = path.resolve(storePath);
const resolvedDefault = path.resolve(DEFAULT_CRON_STORE_PATH);
if (resolved !== resolvedDefault) return;
if (fs.existsSync(resolved)) return;
if (!fs.existsSync(LEGACY_FLAT_CRON_STORE_PATH)) return;
try {
const raw = await fs.promises.readFile(
LEGACY_FLAT_CRON_STORE_PATH,
"utf-8",
);
const parsed = JSON5.parse(raw) as Partial<CronStoreFile> | null;
const jobs = Array.isArray(parsed?.jobs) ? (parsed?.jobs as never[]) : [];
const store: CronStoreFile = {
version: 1,
jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"],
};
await saveCronStore(storePath, store);
await fs.promises.mkdir(DEFAULT_CRON_DIR, { recursive: true });
const destBase = path.join(DEFAULT_CRON_DIR, "cron.json.migrated");
const dest = fs.existsSync(destBase)
? path.join(
DEFAULT_CRON_DIR,
`cron.json.migrated.${process.pid}.${Math.random().toString(16).slice(2)}`,
)
: destBase;
try {
await fs.promises.rename(LEGACY_FLAT_CRON_STORE_PATH, dest);
} catch {
await fs.promises.copyFile(LEGACY_FLAT_CRON_STORE_PATH, dest);
await fs.promises.unlink(LEGACY_FLAT_CRON_STORE_PATH).catch(() => {
/* ignore */
});
}
} catch {
// Best-effort; keep legacy store if anything fails.
}
}
export async function loadCronStore(storePath: string): Promise<CronStoreFile> { export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
await maybeMigrateLegacyFlatStore(storePath);
try { try {
const raw = await fs.promises.readFile(storePath, "utf-8"); const raw = await fs.promises.readFile(storePath, "utf-8");
const parsed = JSON5.parse(raw) as Partial<CronStoreFile> | null; const parsed = JSON5.parse(raw) as Partial<CronStoreFile> | null;

View File

@@ -391,7 +391,7 @@ export const CronRunParamsSchema = Type.Object(
export const CronRunsParamsSchema = Type.Object( export const CronRunsParamsSchema = Type.Object(
{ {
id: Type.Optional(NonEmptyString), id: NonEmptyString,
limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 5000 })), limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 5000 })),
}, },
{ additionalProperties: false }, { additionalProperties: false },

View File

@@ -417,7 +417,8 @@ describe("gateway server", () => {
test("supports cron.add and cron.list", async () => { test("supports cron.add and cron.list", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-cron-")); const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-cron-"));
testCronStorePath = path.join(dir, "cron.json"); testCronStorePath = path.join(dir, "cron", "jobs.json");
await fs.mkdir(path.dirname(testCronStorePath), { recursive: true });
await fs.writeFile( await fs.writeFile(
testCronStorePath, testCronStorePath,
JSON.stringify({ version: 1, jobs: [] }), JSON.stringify({ version: 1, jobs: [] }),
@@ -478,11 +479,12 @@ describe("gateway server", () => {
testCronStorePath = undefined; testCronStorePath = undefined;
}); });
test("writes cron run history for flat store paths", async () => { test("writes cron run history to runs/<jobId>.jsonl", async () => {
const dir = await fs.mkdtemp( const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-gw-cron-log-"), path.join(os.tmpdir(), "clawdis-gw-cron-log-"),
); );
testCronStorePath = path.join(dir, "cron.json"); testCronStorePath = path.join(dir, "cron", "jobs.json");
await fs.mkdir(path.dirname(testCronStorePath), { recursive: true });
await fs.writeFile( await fs.writeFile(
testCronStorePath, testCronStorePath,
JSON.stringify({ version: 1, jobs: [] }), JSON.stringify({ version: 1, jobs: [] }),
@@ -531,7 +533,7 @@ describe("gateway server", () => {
); );
expect(runRes.ok).toBe(true); expect(runRes.ok).toBe(true);
const logPath = path.join(dir, "cron.runs.jsonl"); const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`);
const waitForLog = async () => { const waitForLog = async () => {
for (let i = 0; i < 200; i++) { for (let i = 0; i < 200; i++) {
const raw = await fs.readFile(logPath, "utf-8").catch(() => ""); const raw = await fs.readFile(logPath, "utf-8").catch(() => "");
@@ -542,12 +544,12 @@ describe("gateway server", () => {
}; };
const raw = await waitForLog(); const raw = await waitForLog();
const lines = raw const line = raw
.split("\n") .split("\n")
.map((l) => l.trim()) .map((l) => l.trim())
.filter(Boolean); .filter(Boolean)
expect(lines.length).toBeGreaterThan(0); .at(-1);
const last = JSON.parse(lines.at(-1) ?? "{}") as { const last = JSON.parse(line ?? "{}") as {
jobId?: unknown; jobId?: unknown;
action?: unknown; action?: unknown;
status?: unknown; status?: unknown;
@@ -696,10 +698,11 @@ describe("gateway server", () => {
const dir = await fs.mkdtemp( const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-gw-cron-default-on-"), path.join(os.tmpdir(), "clawdis-gw-cron-default-on-"),
); );
testCronStorePath = path.join(dir, "cron.json"); testCronStorePath = path.join(dir, "cron", "jobs.json");
testCronEnabled = undefined; // omitted config => enabled by default testCronEnabled = undefined; // omitted config => enabled by default
try { try {
await fs.mkdir(path.dirname(testCronStorePath), { recursive: true });
await fs.writeFile( await fs.writeFile(
testCronStorePath, testCronStorePath,
JSON.stringify({ version: 1, jobs: [] }), JSON.stringify({ version: 1, jobs: [] }),
@@ -727,7 +730,7 @@ describe("gateway server", () => {
| { enabled?: unknown; storePath?: unknown } | { enabled?: unknown; storePath?: unknown }
| undefined; | undefined;
expect(statusPayload?.enabled).toBe(true); expect(statusPayload?.enabled).toBe(true);
expect(String(statusPayload?.storePath ?? "")).toContain("cron.json"); expect(String(statusPayload?.storePath ?? "")).toContain("jobs.json");
const atMs = Date.now() + 80; const atMs = Date.now() + 80;
ws.send( ws.send(

View File

@@ -1487,21 +1487,10 @@ export async function startGatewayServer(
); );
break; break;
} }
const p = params as { id?: string; limit?: number }; const p = params as { id: string; limit?: number };
if (!p.id && cronStorePath.endsWith(`${path.sep}jobs.json`)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"cron.runs requires id when using jobs.json store layout",
),
);
break;
}
const logPath = resolveCronRunLogPath({ const logPath = resolveCronRunLogPath({
storePath: cronStorePath, storePath: cronStorePath,
jobId: p.id ?? "all", jobId: p.id,
}); });
const entries = await readCronRunLogEntries(logPath, { const entries = await readCronRunLogEntries(logPath, {
limit: p.limit, limit: p.limit,