diff --git a/docs/cli/cron.md b/docs/cli/cron.md index 36185a9cb..5980d16d5 100644 --- a/docs/cli/cron.md +++ b/docs/cli/cron.md @@ -14,3 +14,16 @@ Related: Tip: run `clawdbot cron --help` for the full command surface. +## Common edits + +Update delivery settings without changing the message: + +```bash +clawdbot cron edit --deliver --channel telegram --to "123456789" +``` + +Disable delivery for an isolated job: + +```bash +clawdbot cron edit --no-deliver +``` diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index b162ce41a..6fa452e08 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -65,7 +65,7 @@ describe("cron tool", () => { data: { name: "wake-up", schedule: { atMs: 123 }, - payload: { text: "hello" }, + payload: { kind: "systemEvent", text: "hello" }, }, }, }); @@ -105,7 +105,7 @@ describe("cron tool", () => { job: { name: "reminder", schedule: { atMs: 123 }, - payload: { text: "Reminder: the thing." }, + payload: { kind: "systemEvent", text: "Reminder: the thing." }, }, }); diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index 63fba4279..2bb5d9fc6 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -173,7 +173,7 @@ describe("cron cli", () => { expect(clearPatch?.patch?.agentId).toBeNull(); }); - it("does not include model/thinking when no payload change is requested", async () => { + it("allows model/thinking updates without --message", async () => { callGatewayFromCli.mockClear(); const { registerCronCli } = await import("./cron-cli.js"); @@ -186,8 +186,64 @@ describe("cron cli", () => { }); const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); - const patch = updateCall?.[2] as { patch?: { payload?: unknown } }; + const patch = updateCall?.[2] as { + patch?: { payload?: { kind?: string; model?: string; thinking?: string } }; + }; - expect(patch?.patch?.payload).toBeUndefined(); + expect(patch?.patch?.payload?.kind).toBe("agentTurn"); + expect(patch?.patch?.payload?.model).toBe("opus"); + expect(patch?.patch?.payload?.thinking).toBe("low"); + }); + + it("updates delivery settings without requiring --message", async () => { + callGatewayFromCli.mockClear(); + + const { registerCronCli } = await import("./cron-cli.js"); + const program = new Command(); + program.exitOverride(); + registerCronCli(program); + + await program.parseAsync( + ["cron", "edit", "job-1", "--deliver", "--channel", "telegram", "--to", "19098680"], + { from: "user" }, + ); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { + patch?: { + payload?: { + kind?: string; + message?: string; + deliver?: boolean; + channel?: string; + to?: string; + }; + }; + }; + + expect(patch?.patch?.payload?.kind).toBe("agentTurn"); + expect(patch?.patch?.payload?.deliver).toBe(true); + expect(patch?.patch?.payload?.channel).toBe("telegram"); + expect(patch?.patch?.payload?.to).toBe("19098680"); + expect(patch?.patch?.payload?.message).toBeUndefined(); + }); + + it("supports --no-deliver on cron edit", async () => { + callGatewayFromCli.mockClear(); + + const { registerCronCli } = await import("./cron-cli.js"); + const program = new Command(); + program.exitOverride(); + registerCronCli(program); + + await program.parseAsync(["cron", "edit", "job-1", "--no-deliver"], { from: "user" }); + + const updateCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.update"); + const patch = updateCall?.[2] as { + patch?: { payload?: { kind?: string; deliver?: boolean } }; + }; + + expect(patch?.patch?.payload?.kind).toBe("agentTurn"); + expect(patch?.patch?.payload?.deliver).toBe(false); }); }); diff --git a/src/cli/cron-cli/register.cron-edit.ts b/src/cli/cron-cli/register.cron-edit.ts index d24a4282a..d051ff2e8 100644 --- a/src/cli/cron-cli/register.cron-edit.ts +++ b/src/cli/cron-cli/register.cron-edit.ts @@ -38,14 +38,15 @@ export function registerCronEditCommand(cron: Command) { .option( "--deliver", "Deliver agent output (required when using last-route delivery without --to)", - false, ) + .option("--no-deliver", "Disable delivery") .option("--channel ", `Delivery channel (${getCronChannelOptions()})`) .option( "--to ", "Delivery destination (E.164, Telegram chatId, or Discord channel/user)", ) - .option("--best-effort-deliver", "Do not fail job if delivery fails", false) + .option("--best-effort-deliver", "Do not fail job if delivery fails") + .option("--no-best-effort-deliver", "Fail job when delivery fails") .option("--post-prefix ", "Prefix for summary system event") .action(async (id, opts) => { try { @@ -105,35 +106,49 @@ export function registerCronEditCommand(cron: Command) { }; } - const payloadChosen = [opts.systemEvent, opts.message].filter(Boolean).length; - if (payloadChosen > 1) throw new Error("Choose at most one payload change"); - if (opts.systemEvent) { + const hasSystemEventPatch = typeof opts.systemEvent === "string"; + const model = + typeof opts.model === "string" && opts.model.trim() ? opts.model.trim() : undefined; + const thinking = + typeof opts.thinking === "string" && opts.thinking.trim() + ? opts.thinking.trim() + : undefined; + const timeoutSeconds = opts.timeoutSeconds + ? Number.parseInt(String(opts.timeoutSeconds), 10) + : undefined; + const hasTimeoutSeconds = Boolean(timeoutSeconds && Number.isFinite(timeoutSeconds)); + const hasAgentTurnPatch = + typeof opts.message === "string" || + Boolean(model) || + Boolean(thinking) || + hasTimeoutSeconds || + typeof opts.deliver === "boolean" || + typeof opts.channel === "string" || + typeof opts.to === "string" || + typeof opts.bestEffortDeliver === "boolean"; + if (hasSystemEventPatch && hasAgentTurnPatch) { + throw new Error("Choose at most one payload change"); + } + if (hasSystemEventPatch) { patch.payload = { kind: "systemEvent", text: String(opts.systemEvent), }; - } else if (opts.message) { - const model = - typeof opts.model === "string" && opts.model.trim() ? opts.model.trim() : undefined; - const thinking = - typeof opts.thinking === "string" && opts.thinking.trim() - ? opts.thinking.trim() - : undefined; - const timeoutSeconds = opts.timeoutSeconds - ? Number.parseInt(String(opts.timeoutSeconds), 10) - : undefined; - patch.payload = { - kind: "agentTurn", - message: String(opts.message), - model, - thinking, - timeoutSeconds: - timeoutSeconds && Number.isFinite(timeoutSeconds) ? timeoutSeconds : undefined, - deliver: opts.deliver ? true : undefined, - channel: typeof opts.channel === "string" ? opts.channel : undefined, - to: typeof opts.to === "string" ? opts.to : undefined, - bestEffortDeliver: opts.bestEffortDeliver ? true : undefined, - }; + } else if (hasAgentTurnPatch) { + const payload: Record = { kind: "agentTurn" }; + if (typeof opts.message === "string") payload.message = String(opts.message); + if (model) payload.model = model; + if (thinking) payload.thinking = thinking; + if (hasTimeoutSeconds) { + payload.timeoutSeconds = timeoutSeconds; + } + if (typeof opts.deliver === "boolean") payload.deliver = opts.deliver; + if (typeof opts.channel === "string") payload.channel = opts.channel; + if (typeof opts.to === "string") payload.to = opts.to; + if (typeof opts.bestEffortDeliver === "boolean") { + payload.bestEffortDeliver = opts.bestEffortDeliver; + } + patch.payload = payload; } if (typeof opts.postPrefix === "string") { diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index f5124d42e..25304edb4 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -51,12 +51,6 @@ function coerceSchedule(schedule: UnknownRecord) { function coercePayload(payload: UnknownRecord) { const next: UnknownRecord = { ...payload }; - const kind = typeof payload.kind === "string" ? payload.kind : undefined; - if (!kind) { - if (typeof payload.text === "string") next.kind = "systemEvent"; - else if (typeof payload.message === "string") next.kind = "agentTurn"; - } - // Back-compat: older configs used `provider` for delivery channel. migrateLegacyCronPayload(next); return next; diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 3119a39b2..b9667bd29 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -1,7 +1,13 @@ import crypto from "node:crypto"; import { computeNextRunAtMs } from "../schedule.js"; -import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js"; +import type { + CronJob, + CronJobCreate, + CronJobPatch, + CronPayload, + CronPayloadPatch, +} from "../types.js"; import { normalizeOptionalAgentId, normalizeOptionalText, @@ -103,7 +109,7 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) { if (patch.schedule) job.schedule = patch.schedule; if (patch.sessionTarget) job.sessionTarget = patch.sessionTarget; if (patch.wakeMode) job.wakeMode = patch.wakeMode; - if (patch.payload) job.payload = patch.payload; + if (patch.payload) job.payload = mergeCronPayload(job.payload, patch.payload); if (patch.isolation) job.isolation = patch.isolation; if (patch.state) job.state = { ...job.state, ...patch.state }; if ("agentId" in patch) { @@ -112,6 +118,55 @@ export function applyJobPatch(job: CronJob, patch: CronJobPatch) { assertSupportedJobSpec(job); } +function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronPayload { + if (patch.kind !== existing.kind) { + return buildPayloadFromPatch(patch); + } + + if (patch.kind === "systemEvent") { + const text = typeof patch.text === "string" ? patch.text : existing.text; + return { kind: "systemEvent", text }; + } + + const next: CronPayload = { ...existing }; + if (typeof patch.message === "string") next.message = patch.message; + if (typeof patch.model === "string") next.model = patch.model; + if (typeof patch.thinking === "string") next.thinking = patch.thinking; + if (typeof patch.timeoutSeconds === "number") next.timeoutSeconds = patch.timeoutSeconds; + if (typeof patch.deliver === "boolean") next.deliver = patch.deliver; + if (typeof patch.channel === "string") next.channel = patch.channel; + if (typeof patch.to === "string") next.to = patch.to; + if (typeof patch.bestEffortDeliver === "boolean") { + next.bestEffortDeliver = patch.bestEffortDeliver; + } + return next; +} + +function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload { + if (patch.kind === "systemEvent") { + if (typeof patch.text !== "string" || patch.text.length === 0) { + throw new Error('cron.update payload.kind="systemEvent" requires text'); + } + return { kind: "systemEvent", text: patch.text }; + } + + if (typeof patch.message !== "string" || patch.message.length === 0) { + throw new Error('cron.update payload.kind="agentTurn" requires message'); + } + + return { + kind: "agentTurn", + message: patch.message, + model: patch.model, + thinking: patch.thinking, + timeoutSeconds: patch.timeoutSeconds, + deliver: patch.deliver, + channel: patch.channel, + to: patch.to, + bestEffortDeliver: patch.bestEffortDeliver, + }; +} + export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) { if (opts.forced) return true; return job.enabled && typeof job.state.nextRunAtMs === "number" && nowMs >= job.state.nextRunAtMs; diff --git a/src/cron/types.ts b/src/cron/types.ts index 88d5f9d3d..9fc64588f 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -25,6 +25,20 @@ export type CronPayload = bestEffortDeliver?: boolean; }; +export type CronPayloadPatch = + | { kind: "systemEvent"; text?: string } + | { + kind: "agentTurn"; + message?: string; + model?: string; + thinking?: string; + timeoutSeconds?: number; + deliver?: boolean; + channel?: CronMessageChannel; + to?: string; + bestEffortDeliver?: boolean; + }; + export type CronIsolation = { postToMainPrefix?: string; /** @@ -72,6 +86,7 @@ export type CronJobCreate = Omit; }; -export type CronJobPatch = Partial< - Omit & { state: CronJobState } ->; +export type CronJobPatch = Partial> & { + payload?: CronPayloadPatch; + state?: Partial; +}; diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index b3f95cfe3..63ed0c209 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -52,6 +52,30 @@ export const CronPayloadSchema = Type.Union([ ), ]); +export const CronPayloadPatchSchema = Type.Union([ + Type.Object( + { + kind: Type.Literal("systemEvent"), + text: Type.Optional(NonEmptyString), + }, + { additionalProperties: false }, + ), + Type.Object( + { + kind: Type.Literal("agentTurn"), + message: Type.Optional(NonEmptyString), + model: Type.Optional(Type.String()), + thinking: Type.Optional(Type.String()), + timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })), + deliver: Type.Optional(Type.Boolean()), + channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + to: Type.Optional(Type.String()), + bestEffortDeliver: Type.Optional(Type.Boolean()), + }, + { additionalProperties: false }, + ), +]); + export const CronIsolationSchema = Type.Object( { postToMainPrefix: Type.Optional(Type.String()), @@ -120,18 +144,35 @@ export const CronAddParamsSchema = Type.Object( { additionalProperties: false }, ); +export const CronJobPatchSchema = Type.Object( + { + name: Type.Optional(NonEmptyString), + agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), + description: Type.Optional(Type.String()), + enabled: Type.Optional(Type.Boolean()), + deleteAfterRun: Type.Optional(Type.Boolean()), + schedule: Type.Optional(CronScheduleSchema), + sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated")])), + wakeMode: Type.Optional(Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")])), + payload: Type.Optional(CronPayloadPatchSchema), + isolation: Type.Optional(CronIsolationSchema), + state: Type.Optional(Type.Partial(CronJobStateSchema)), + }, + { additionalProperties: false }, +); + export const CronUpdateParamsSchema = Type.Union([ Type.Object( { id: NonEmptyString, - patch: Type.Partial(CronAddParamsSchema), + patch: CronJobPatchSchema, }, { additionalProperties: false }, ), Type.Object( { jobId: NonEmptyString, - patch: Type.Partial(CronAddParamsSchema), + patch: CronJobPatchSchema, }, { additionalProperties: false }, ), diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 48269cca7..bec6091de 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -122,7 +122,7 @@ describe("gateway server cron", () => { data: { name: "wrapped", schedule: { atMs }, - payload: { text: "hello" }, + payload: { kind: "systemEvent", text: "hello" }, }, }); expect(addRes.ok).toBe(true); @@ -166,7 +166,7 @@ describe("gateway server cron", () => { id: jobId, patch: { schedule: { atMs }, - payload: { text: "updated" }, + payload: { kind: "systemEvent", text: "updated" }, }, }); expect(updateRes.ok).toBe(true); @@ -182,6 +182,96 @@ describe("gateway server cron", () => { testState.cronStorePath = undefined; }); + test("merges agentTurn payload patches", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-")); + testState.cronStorePath = path.join(dir, "cron", "jobs.json"); + await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true }); + await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] })); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const addRes = await rpcReq(ws, "cron.add", { + name: "patch merge", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello", model: "opus" }, + }); + expect(addRes.ok).toBe(true); + const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id; + const jobId = typeof jobIdValue === "string" ? jobIdValue : ""; + expect(jobId.length > 0).toBe(true); + + const updateRes = await rpcReq(ws, "cron.update", { + id: jobId, + patch: { + payload: { kind: "agentTurn", deliver: true, channel: "telegram", to: "19098680" }, + }, + }); + expect(updateRes.ok).toBe(true); + const updated = updateRes.payload as + | { + payload?: { + kind?: unknown; + message?: unknown; + model?: unknown; + deliver?: unknown; + channel?: unknown; + to?: unknown; + }; + } + | undefined; + expect(updated?.payload?.kind).toBe("agentTurn"); + expect(updated?.payload?.message).toBe("hello"); + expect(updated?.payload?.model).toBe("opus"); + expect(updated?.payload?.deliver).toBe(true); + expect(updated?.payload?.channel).toBe("telegram"); + expect(updated?.payload?.to).toBe("19098680"); + + ws.close(); + await server.close(); + await rmTempDir(dir); + testState.cronStorePath = undefined; + }); + + test("rejects payload kind changes without required fields", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-")); + testState.cronStorePath = path.join(dir, "cron", "jobs.json"); + await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true }); + await fs.writeFile(testState.cronStorePath, JSON.stringify({ version: 1, jobs: [] })); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const addRes = await rpcReq(ws, "cron.add", { + name: "patch reject", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "hello" }, + }); + expect(addRes.ok).toBe(true); + const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id; + const jobId = typeof jobIdValue === "string" ? jobIdValue : ""; + expect(jobId.length > 0).toBe(true); + + const updateRes = await rpcReq(ws, "cron.update", { + id: jobId, + patch: { + payload: { kind: "agentTurn", deliver: true }, + }, + }); + expect(updateRes.ok).toBe(false); + + ws.close(); + await server.close(); + await rmTempDir(dir); + testState.cronStorePath = undefined; + }); + test("accepts jobId for cron.update", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-")); testState.cronStorePath = path.join(dir, "cron", "jobs.json"); @@ -210,7 +300,7 @@ describe("gateway server cron", () => { jobId, patch: { schedule: { atMs }, - payload: { text: "updated" }, + payload: { kind: "systemEvent", text: "updated" }, }, }); expect(updateRes.ok).toBe(true);