Cron: normalize cron.add inputs + align channels (#256)

* fix: harden cron add and align channels

* fix: keep cron tool id params

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Marcus Neves
2026-01-05 23:09:48 -03:00
committed by GitHub
parent 00061b2fd3
commit 67e1452f4a
21 changed files with 457 additions and 48 deletions

View File

@@ -12,7 +12,7 @@
"element",
"node",
"nodeId",
"jobId",
"id",
"requestId",
"to",
"channelId",
@@ -136,10 +136,10 @@
"label": "add",
"detailKeys": ["job.name", "job.id", "job.schedule", "job.cron"]
},
"update": { "label": "update", "detailKeys": ["jobId"] },
"remove": { "label": "remove", "detailKeys": ["jobId"] },
"run": { "label": "run", "detailKeys": ["jobId"] },
"runs": { "label": "runs", "detailKeys": ["jobId"] },
"update": { "label": "update", "detailKeys": ["id"] },
"remove": { "label": "remove", "detailKeys": ["id"] },
"run": { "label": "run", "detailKeys": ["id"] },
"runs": { "label": "runs", "detailKeys": ["id"] },
"wake": { "label": "wake", "detailKeys": ["text", "mode"] }
}
},
@@ -229,4 +229,3 @@
}
}
}

View File

@@ -35,14 +35,31 @@ describe("cron tool", () => {
expect(call.params).toEqual(expectedParams);
});
it("rejects jobId params", async () => {
it("normalizes cron.add job payloads", async () => {
const tool = createCronTool();
await expect(
tool.execute("call2", {
action: "update",
jobId: "job-1",
patch: { foo: "bar" },
}),
).rejects.toThrow("id required");
await tool.execute("call2", {
action: "add",
job: {
data: {
name: "wake-up",
schedule: { atMs: 123 },
payload: { text: "hello" },
},
},
});
expect(callGatewayMock).toHaveBeenCalledTimes(1);
const call = callGatewayMock.mock.calls[0]?.[0] as {
method?: string;
params?: unknown;
};
expect(call.method).toBe("cron.add");
expect(call.params).toEqual({
name: "wake-up",
schedule: { kind: "at", atMs: 123 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
});
});
});

View File

@@ -2,6 +2,13 @@ import { Type } from "@sinclair/typebox";
import { type AnyAgentTool, jsonResult, readStringParam } from "./common.js";
import { callGatewayTool, type GatewayCallOptions } from "./gateway.js";
import { CronAddParamsSchema } from "../../gateway/protocol/schema.js";
import {
normalizeCronJobCreate,
normalizeCronJobPatch,
} from "../../cron/normalize.js";
const CronJobPatchSchema = Type.Partial(CronAddParamsSchema);
const CronToolSchema = Type.Union([
Type.Object({
@@ -22,7 +29,7 @@ const CronToolSchema = Type.Union([
gatewayUrl: Type.Optional(Type.String()),
gatewayToken: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Number()),
job: Type.Object({}, { additionalProperties: true }),
job: CronAddParamsSchema,
}),
Type.Object({
action: Type.Literal("update"),
@@ -30,7 +37,7 @@ const CronToolSchema = Type.Union([
gatewayToken: Type.Optional(Type.String()),
timeoutMs: Type.Optional(Type.Number()),
id: Type.String(),
patch: Type.Object({}, { additionalProperties: true }),
patch: CronJobPatchSchema,
}),
Type.Object({
action: Type.Literal("remove"),
@@ -97,8 +104,9 @@ export function createCronTool(): AnyAgentTool {
if (!params.job || typeof params.job !== "object") {
throw new Error("job required");
}
const job = normalizeCronJobCreate(params.job) ?? params.job;
return jsonResult(
await callGatewayTool("cron.add", gatewayOpts, params.job),
await callGatewayTool("cron.add", gatewayOpts, job),
);
}
case "update": {
@@ -106,10 +114,11 @@ export function createCronTool(): AnyAgentTool {
if (!params.patch || typeof params.patch !== "object") {
throw new Error("patch required");
}
const patch = normalizeCronJobPatch(params.patch) ?? params.patch;
return jsonResult(
await callGatewayTool("cron.update", gatewayOpts, {
id,
patch: params.patch,
patch,
}),
);
}

View File

@@ -4,6 +4,7 @@ import { defaultRuntime } from "../runtime.js";
import type { GatewayRpcOpts } from "./gateway-rpc.js";
import { addGatewayClientOptions, callGatewayFromCli } from "./gateway-rpc.js";
async function warnIfCronSchedulerDisabled(opts: GatewayRpcOpts) {
try {
const res = (await callGatewayFromCli("cron.status", opts, {})) as {
@@ -155,7 +156,7 @@ export function registerCronCli(program: Command) {
.option("--deliver", "Deliver agent output", false)
.option(
"--channel <channel>",
"Delivery channel (last|whatsapp|telegram|discord|signal|imessage)",
"Delivery channel (last|whatsapp|telegram|discord|slack|signal|imessage)",
"last",
)
.option(
@@ -414,7 +415,7 @@ export function registerCronCli(program: Command) {
.option("--deliver", "Deliver agent output", false)
.option(
"--channel <channel>",
"Delivery channel (last|whatsapp|telegram|discord|signal|imessage)",
"Delivery channel (last|whatsapp|telegram|discord|slack|signal|imessage)",
)
.option(
"--to <dest>",

View File

@@ -0,0 +1,85 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { CronPayloadSchema } from "../gateway/protocol/schema.js";
type SchemaLike = {
anyOf?: Array<{ properties?: Record<string, unknown> }>;
properties?: Record<string, unknown>;
const?: unknown;
};
type ChannelSchema = {
anyOf?: Array<{ const?: unknown }>;
};
function extractCronChannels(schema: SchemaLike): string[] {
const union = schema.anyOf ?? [];
const payloadWithChannel = union.find((entry) =>
Boolean(entry?.properties && "channel" in entry.properties),
);
const channelSchema = payloadWithChannel?.properties
? (payloadWithChannel.properties.channel as ChannelSchema)
: undefined;
const channels = (channelSchema?.anyOf ?? [])
.map((entry) => entry?.const)
.filter((value): value is string => typeof value === "string");
return channels;
}
const UI_FILES = [
"ui/src/ui/types.ts",
"ui/src/ui/ui-types.ts",
"ui/src/ui/views/cron.ts",
];
const SWIFT_FILES = [
"apps/macos/Sources/Clawdbot/GatewayConnection.swift",
];
describe("cron protocol conformance", () => {
it("ui + swift include all cron channels from gateway schema", async () => {
const channels = extractCronChannels(CronPayloadSchema as SchemaLike);
expect(channels.length).toBeGreaterThan(0);
const cwd = process.cwd();
for (const relPath of UI_FILES) {
const content = await fs.readFile(path.join(cwd, relPath), "utf-8");
for (const channel of channels) {
expect(
content.includes(`"${channel}"`),
`${relPath} missing ${channel}`,
).toBe(true);
}
}
for (const relPath of SWIFT_FILES) {
const content = await fs.readFile(path.join(cwd, relPath), "utf-8");
for (const channel of channels) {
const pattern = new RegExp(`\\bcase\\s+${channel}\\b`);
expect(
pattern.test(content),
`${relPath} missing case ${channel}`,
).toBe(true);
}
}
});
it("cron status shape matches gateway fields in UI + Swift", async () => {
const cwd = process.cwd();
const uiTypes = await fs.readFile(
path.join(cwd, "ui/src/ui/types.ts"),
"utf-8",
);
expect(uiTypes.includes("export type CronStatus")).toBe(true);
expect(uiTypes.includes("jobs:")).toBe(true);
expect(uiTypes.includes("jobCount")).toBe(false);
const swift = await fs.readFile(
path.join(cwd, "apps/macos/Sources/Clawdbot/GatewayConnection.swift"),
"utf-8",
);
expect(swift.includes("struct CronSchedulerStatus")).toBe(true);
expect(swift.includes("let jobs:")).toBe(true);
});
});

88
src/cron/normalize.ts Normal file
View File

@@ -0,0 +1,88 @@
import type { CronJobCreate, CronJobPatch } from "./types.js";
type UnknownRecord = Record<string, unknown>;
type NormalizeOptions = {
applyDefaults?: boolean;
};
const DEFAULT_OPTIONS: NormalizeOptions = {
applyDefaults: false,
};
function isRecord(value: unknown): value is UnknownRecord {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function coerceSchedule(schedule: UnknownRecord) {
const next: UnknownRecord = { ...schedule };
const kind = typeof schedule.kind === "string" ? schedule.kind : undefined;
if (!kind) {
if (typeof schedule.atMs === "number") next.kind = "at";
else if (typeof schedule.everyMs === "number") next.kind = "every";
else if (typeof schedule.expr === "string") next.kind = "cron";
}
return next;
}
function coercePayload(payload: UnknownRecord) {
const next: UnknownRecord = { ...payload };
const kind = typeof payload.kind === "string" ? payload.kind : undefined;
if (!kind) {
if (typeof payload.text === "string") next.kind = "systemEvent";
else if (typeof payload.message === "string") next.kind = "agentTurn";
}
return next;
}
function unwrapJob(raw: UnknownRecord) {
if (isRecord(raw.data)) return raw.data;
if (isRecord(raw.job)) return raw.job;
return raw;
}
export function normalizeCronJobInput(
raw: unknown,
options: NormalizeOptions = DEFAULT_OPTIONS,
): UnknownRecord | null {
if (!isRecord(raw)) return null;
const base = unwrapJob(raw);
const next: UnknownRecord = { ...base };
if (isRecord(base.schedule)) {
next.schedule = coerceSchedule(base.schedule);
}
if (isRecord(base.payload)) {
next.payload = coercePayload(base.payload);
}
if (options.applyDefaults) {
if (!next.wakeMode) next.wakeMode = "next-heartbeat";
if (!next.sessionTarget && isRecord(next.payload)) {
const kind = typeof next.payload.kind === "string" ? next.payload.kind : "";
if (kind === "systemEvent") next.sessionTarget = "main";
if (kind === "agentTurn") next.sessionTarget = "isolated";
}
}
return next;
}
export function normalizeCronJobCreate(
raw: unknown,
options?: NormalizeOptions,
): CronJobCreate | null {
return normalizeCronJobInput(raw, { applyDefaults: true, ...options }) as
| CronJobCreate
| null;
}
export function normalizeCronJobPatch(
raw: unknown,
options?: NormalizeOptions,
): CronJobPatch | null {
return normalizeCronJobInput(raw, { applyDefaults: false, ...options }) as
| CronJobPatch
| null;
}

View File

@@ -635,6 +635,8 @@ export const CronPayloadSchema = Type.Union([
Type.Literal("telegram"),
Type.Literal("discord"),
Type.Literal("slack"),
Type.Literal("signal"),
Type.Literal("imessage"),
]),
),
to: Type.Optional(Type.String()),

View File

@@ -17,6 +17,10 @@ import {
validateWakeParams,
} from "../protocol/index.js";
import type { GatewayRequestHandlers } from "./types.js";
import {
normalizeCronJobCreate,
normalizeCronJobPatch,
} from "../../cron/normalize.js";
export const cronHandlers: GatewayRequestHandlers = {
wake: ({ params, respond, context }) => {
@@ -72,7 +76,8 @@ export const cronHandlers: GatewayRequestHandlers = {
respond(true, status, undefined);
},
"cron.add": async ({ params, respond, context }) => {
if (!validateCronAddParams(params)) {
const normalized = normalizeCronJobCreate(params) ?? params;
if (!validateCronAddParams(normalized)) {
respond(
false,
undefined,
@@ -83,11 +88,20 @@ export const cronHandlers: GatewayRequestHandlers = {
);
return;
}
const job = await context.cron.add(params as unknown as CronJobCreate);
const job = await context.cron.add(
normalized as unknown as CronJobCreate,
);
respond(true, job, undefined);
},
"cron.update": async ({ params, respond, context }) => {
if (!validateCronUpdateParams(params)) {
const normalizedPatch = normalizeCronJobPatch(
(params as { patch?: unknown } | null)?.patch,
);
const candidate =
normalizedPatch && typeof params === "object" && params !== null
? { ...(params as Record<string, unknown>), patch: normalizedPatch }
: params;
if (!validateCronUpdateParams(candidate)) {
respond(
false,
undefined,
@@ -98,7 +112,7 @@ export const cronHandlers: GatewayRequestHandlers = {
);
return;
}
const p = params as {
const p = candidate as {
id: string;
patch: Record<string, unknown>;
};

View File

@@ -68,6 +68,88 @@ describe("gateway server cron", () => {
testState.cronStorePath = undefined;
});
test("normalizes wrapped cron.add payloads", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
await fs.writeFile(
testState.cronStorePath,
JSON.stringify({ version: 1, jobs: [] }),
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const atMs = Date.now() + 1000;
const addRes = await rpcReq(ws, "cron.add", {
data: {
name: "wrapped",
schedule: { atMs },
payload: { text: "hello" },
},
});
expect(addRes.ok).toBe(true);
const payload = addRes.payload as
| { schedule?: unknown; sessionTarget?: unknown; wakeMode?: unknown }
| undefined;
expect(payload?.sessionTarget).toBe("main");
expect(payload?.wakeMode).toBe("next-heartbeat");
expect((payload?.schedule as { kind?: unknown } | undefined)?.kind).toBe(
"at",
);
ws.close();
await server.close();
await fs.rm(dir, { recursive: true, force: true });
testState.cronStorePath = undefined;
});
test("normalizes cron.update patch payloads", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
await fs.writeFile(
testState.cronStorePath,
JSON.stringify({ version: 1, jobs: [] }),
);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const addRes = await rpcReq(ws, "cron.add", {
name: "patch test",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "hello" },
});
expect(addRes.ok).toBe(true);
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
expect(jobId.length > 0).toBe(true);
const atMs = Date.now() + 1_000;
const updateRes = await rpcReq(ws, "cron.update", {
id: jobId,
patch: {
schedule: { atMs },
payload: { text: "updated" },
},
});
expect(updateRes.ok).toBe(true);
const updated = updateRes.payload as
| { schedule?: { kind?: unknown }; payload?: { kind?: unknown } }
| undefined;
expect(updated?.schedule?.kind).toBe("at");
expect(updated?.payload?.kind).toBe("systemEvent");
ws.close();
await server.close();
await fs.rm(dir, { recursive: true, force: true });
testState.cronStorePath = undefined;
});
test("writes cron run history to runs/<jobId>.jsonl", async () => {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdbot-gw-cron-log-"),