Cron: accept jobId in gateway cron params

Closes #252
This commit is contained in:
Shadow
2026-01-12 21:35:43 -06:00
parent ab993904d7
commit 25297ce3f5
4 changed files with 166 additions and 38 deletions

View File

@@ -869,38 +869,75 @@ export const CronAddParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const CronUpdateParamsSchema = Type.Object(
{
id: NonEmptyString,
patch: Type.Partial(CronAddParamsSchema),
},
{ additionalProperties: false },
);
export const CronUpdateParamsSchema = Type.Union([
Type.Object(
{
id: NonEmptyString,
patch: Type.Partial(CronAddParamsSchema),
},
{ additionalProperties: false },
),
Type.Object(
{
jobId: NonEmptyString,
patch: Type.Partial(CronAddParamsSchema),
},
{ additionalProperties: false },
),
]);
export const CronRemoveParamsSchema = Type.Object(
{
id: NonEmptyString,
},
{ additionalProperties: false },
);
export const CronRemoveParamsSchema = Type.Union([
Type.Object(
{
id: NonEmptyString,
},
{ additionalProperties: false },
),
Type.Object(
{
jobId: NonEmptyString,
},
{ additionalProperties: false },
),
]);
export const CronRunParamsSchema = Type.Object(
{
id: NonEmptyString,
mode: Type.Optional(
Type.Union([Type.Literal("due"), Type.Literal("force")]),
),
},
{ additionalProperties: false },
);
export const CronRunParamsSchema = Type.Union([
Type.Object(
{
id: NonEmptyString,
mode: Type.Optional(
Type.Union([Type.Literal("due"), Type.Literal("force")]),
),
},
{ additionalProperties: false },
),
Type.Object(
{
jobId: NonEmptyString,
mode: Type.Optional(
Type.Union([Type.Literal("due"), Type.Literal("force")]),
),
},
{ additionalProperties: false },
),
]);
export const CronRunsParamsSchema = Type.Object(
{
id: NonEmptyString,
limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 5000 })),
},
{ additionalProperties: false },
);
export const CronRunsParamsSchema = Type.Union([
Type.Object(
{
id: NonEmptyString,
limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 5000 })),
},
{ additionalProperties: false },
),
Type.Object(
{
jobId: NonEmptyString,
limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 5000 })),
},
{ additionalProperties: false },
),
]);
export const CronRunLogEntrySchema = Type.Object(
{

View File

@@ -111,11 +111,24 @@ export const cronHandlers: GatewayRequestHandlers = {
return;
}
const p = candidate as {
id: string;
id?: string;
jobId?: string;
patch: Record<string, unknown>;
};
const jobId = p.id ?? p.jobId;
if (!jobId) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid cron.update params: missing id",
),
);
return;
}
const job = await context.cron.update(
p.id,
jobId,
p.patch as unknown as CronJobPatch,
);
respond(true, job, undefined);
@@ -132,8 +145,20 @@ export const cronHandlers: GatewayRequestHandlers = {
);
return;
}
const p = params as { id: string };
const result = await context.cron.remove(p.id);
const p = params as { id?: string; jobId?: string };
const jobId = p.id ?? p.jobId;
if (!jobId) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid cron.remove params: missing id",
),
);
return;
}
const result = await context.cron.remove(jobId);
respond(true, result, undefined);
},
"cron.run": async ({ params, respond, context }) => {
@@ -148,8 +173,20 @@ export const cronHandlers: GatewayRequestHandlers = {
);
return;
}
const p = params as { id: string; mode?: "due" | "force" };
const result = await context.cron.run(p.id, p.mode);
const p = params as { id?: string; jobId?: string; mode?: "due" | "force" };
const jobId = p.id ?? p.jobId;
if (!jobId) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid cron.run params: missing id",
),
);
return;
}
const result = await context.cron.run(jobId, p.mode);
respond(true, result, undefined);
},
"cron.runs": async ({ params, respond, context }) => {
@@ -164,14 +201,26 @@ export const cronHandlers: GatewayRequestHandlers = {
);
return;
}
const p = params as { id: string; limit?: number };
const p = params as { id?: string; jobId?: string; limit?: number };
const jobId = p.id ?? p.jobId;
if (!jobId) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"invalid cron.runs params: missing id",
),
);
return;
}
const logPath = resolveCronRunLogPath({
storePath: context.cronStorePath,
jobId: p.id,
jobId,
});
const entries = await readCronRunLogEntries(logPath, {
limit: p.limit,
jobId: p.id,
jobId,
});
respond(true, { entries }, undefined);
},

View File

@@ -180,6 +180,47 @@ describe("gateway server cron", () => {
testState.cronStorePath = undefined;
});
test("accepts jobId for cron.update", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
await fs.writeFile(
testState.cronStorePath,
JSON.stringify({ version: 1, jobs: [] }),
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const addRes = await rpcReq(ws, "cron.add", {
name: "jobId test",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
});
expect(addRes.ok).toBe(true);
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
expect(jobId.length > 0).toBe(true);
const atMs = Date.now() + 2_000;
const updateRes = await rpcReq(ws, "cron.update", {
jobId,
patch: {
schedule: { atMs },
payload: { text: "updated" },
},
});
expect(updateRes.ok).toBe(true);
ws.close();
await server.close();
await fs.rm(dir, { recursive: true, force: true });
testState.cronStorePath = undefined;
});
test("writes cron run history to runs/<jobId>.jsonl", async () => {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdbot-gw-cron-log-"),