diff --git a/CHANGELOG.md b/CHANGELOG.md index c12fe6502..f54cbd323 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,36 +1,9 @@ # Changelog -# 2026.1.12-1 +## 2026.1.12 ### Changes -- Heartbeat: raise default `ackMaxChars` to 300 so any `HEARTBEAT_OK` replies with short padding stay internal (fewer noisy heartbeat posts on providers). - -## 2026.1.11-5 - -### Fixes -- Auto-reply: prevent duplicate /status replies (including /usage alias) and add tests for inline + standalone cases. - -## 2026.1.11-4 - -### Fixes -- CLI: read the git commit hash from the package root so npm installs show it. - -## 2026.1.11-3 - -### Fixes -- CLI: avoid top-level await warnings in the entrypoint on fresh installs. -- CLI: show a commit hash in the banner for npm installs (package.json gitHead fallback). - -## 2026.1.11-2 - -### Fixes -- Installer: ensure the CLI entrypoint is executable after npm installs. -- Packaging: include `dist/plugins/` in the npm package to avoid missing module errors. - -## 2026.1.11-1 - -### Fixes -- Installer: include `patches/` in the npm package so postinstall patching works for npm/bun installs. +- Cron: add optional `agentId` binding (CLI `--agent` / `--clear-agent`), route cron runs + summaries to the chosen agent, and document/test the fallback to the default agent. (#770) ## 2026.1.11 diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 6b36c4505..b942793a4 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -27,6 +27,8 @@ A cron job is a stored record with: - a **schedule** (when it should run), - a **payload** (what it should do), - optional **delivery** (where output should be sent). +- optional **agent binding** (`agentId`): run the job under a specific agent; if + missing or unknown, the gateway falls back to the default agent. Jobs are identified by a stable `jobId` (used by CLI/Gateway APIs). In agent tool calls, `jobId` is canonical; legacy `id` is accepted for compatibility. @@ -190,6 +192,16 @@ clawdbot cron add \ --deliver \ --provider whatsapp \ --to "+15551234567" + +Agent selection (multi-agent setups): +```bash +# Pin a job to agent "ops" (falls back to default if that agent is missing) +clawdbot cron add --name "Ops sweep" --cron "0 6 * * *" --session isolated --message "Check ops queue" --agent ops + +# Switch or clear the agent on an existing job +clawdbot cron edit --agent ops +clawdbot cron edit --clear-agent +``` ``` Manual run (debug): diff --git a/src/cli/cron-cli.test.ts b/src/cli/cron-cli.test.ts index b01c9d7f1..0fb1578ba 100644 --- a/src/cli/cron-cli.test.ts +++ b/src/cli/cron-cli.test.ts @@ -74,6 +74,39 @@ describe("cron cli", () => { expect(params?.payload?.thinking).toBe("low"); }); + it("sends agent id on cron add", async () => { + callGatewayFromCli.mockClear(); + + const { registerCronCli } = await import("./cron-cli.js"); + const program = new Command(); + program.exitOverride(); + registerCronCli(program); + + await program.parseAsync( + [ + "cron", + "add", + "--name", + "Agent pinned", + "--cron", + "* * * * *", + "--session", + "isolated", + "--message", + "hi", + "--agent", + "ops", + ], + { from: "user" }, + ); + + const addCall = callGatewayFromCli.mock.calls.find( + (call) => call[0] === "cron.add", + ); + const params = addCall?.[2] as { agentId?: string }; + expect(params?.agentId).toBe("ops"); + }); + it("omits empty model and thinking on cron edit", async () => { callGatewayFromCli.mockClear(); @@ -142,6 +175,36 @@ describe("cron cli", () => { expect(patch?.patch?.payload?.thinking).toBe("high"); }); + it("sets and clears agent id on cron edit", async () => { + callGatewayFromCli.mockClear(); + + const { registerCronCli } = await import("./cron-cli.js"); + const program = new Command(); + program.exitOverride(); + registerCronCli(program); + + await program.parseAsync( + ["cron", "edit", "job-1", "--agent", " Ops ", "--message", "hello"], + { from: "user" }, + ); + + const updateCall = callGatewayFromCli.mock.calls.find( + (call) => call[0] === "cron.update", + ); + const patch = updateCall?.[2] as { patch?: { agentId?: unknown } }; + expect(patch?.patch?.agentId).toBe("Ops"); + + callGatewayFromCli.mockClear(); + await program.parseAsync(["cron", "edit", "job-2", "--clear-agent"], { + from: "user", + }); + const clearCall = callGatewayFromCli.mock.calls.find( + (call) => call[0] === "cron.update", + ); + const clearPatch = clearCall?.[2] as { patch?: { agentId?: unknown } }; + expect(clearPatch?.patch?.agentId).toBeNull(); + }); + it("does not include model/thinking when no payload change is requested", async () => { callGatewayFromCli.mockClear(); diff --git a/src/cli/cron-cli.ts b/src/cli/cron-cli.ts index 14c0b279c..dfac7cca4 100644 --- a/src/cli/cron-cli.ts +++ b/src/cli/cron-cli.ts @@ -2,6 +2,7 @@ import type { Command } from "commander"; import type { CronJob, CronSchedule } from "../cron/types.js"; import { danger } from "../globals.js"; import { PROVIDER_IDS } from "../providers/registry.js"; +import { normalizeAgentId } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; import { formatDocsLink } from "../terminal/links.js"; import { colorize, isRich, theme } from "../terminal/theme.js"; @@ -72,6 +73,7 @@ const CRON_NEXT_PAD = 10; const CRON_LAST_PAD = 10; const CRON_STATUS_PAD = 9; const CRON_TARGET_PAD = 9; +const CRON_AGENT_PAD = 10; const pad = (value: string, width: number) => value.padEnd(width); @@ -139,6 +141,7 @@ function printCronList(jobs: CronJob[], runtime = defaultRuntime) { pad("Last", CRON_LAST_PAD), pad("Status", CRON_STATUS_PAD), pad("Target", CRON_TARGET_PAD), + pad("Agent", CRON_AGENT_PAD), ].join(" "); runtime.log(rich ? theme.heading(header) : header); @@ -162,6 +165,10 @@ function printCronList(jobs: CronJob[], runtime = defaultRuntime) { const statusRaw = formatStatus(job); const statusLabel = pad(statusRaw, CRON_STATUS_PAD); const targetLabel = pad(job.sessionTarget, CRON_TARGET_PAD); + const agentLabel = pad( + truncate(job.agentId ?? "default", CRON_AGENT_PAD), + CRON_AGENT_PAD, + ); const coloredStatus = (() => { if (statusRaw === "ok") return colorize(rich, theme.success, statusLabel); @@ -178,6 +185,9 @@ function printCronList(jobs: CronJob[], runtime = defaultRuntime) { job.sessionTarget === "isolated" ? colorize(rich, theme.accentBright, targetLabel) : colorize(rich, theme.accent, targetLabel); + const coloredAgent = job.agentId + ? colorize(rich, theme.info, agentLabel) + : colorize(rich, theme.muted, agentLabel); const line = [ colorize(rich, theme.accent, idLabel), @@ -187,6 +197,7 @@ function printCronList(jobs: CronJob[], runtime = defaultRuntime) { colorize(rich, theme.muted, lastLabel), coloredStatus, coloredTarget, + coloredAgent, ].join(" "); runtime.log(line.trimEnd()); @@ -283,6 +294,7 @@ export function registerCronCli(program: Command) { .requiredOption("--name ", "Job name") .option("--description ", "Optional description") .option("--disabled", "Create job disabled", false) + .option("--agent ", "Agent id for this job") .option("--session ", "Session target (main|isolated)", "main") .option( "--wake ", @@ -375,6 +387,11 @@ export function registerCronCli(program: Command) { throw new Error("--wake must be now or next-heartbeat"); } + const agentId = + typeof opts.agent === "string" && opts.agent.trim() + ? normalizeAgentId(opts.agent) + : undefined; + const payload = (() => { const systemEvent = typeof opts.systemEvent === "string" @@ -451,6 +468,7 @@ export function registerCronCli(program: Command) { name, description, enabled: !opts.disabled, + agentId, schedule, sessionTarget, wakeMode, @@ -561,6 +579,8 @@ export function registerCronCli(program: Command) { .option("--enable", "Enable job", false) .option("--disable", "Disable job", false) .option("--session ", "Session target (main|isolated)") + .option("--agent ", "Set agent id") + .option("--clear-agent", "Unset agent and use default", false) .option("--wake ", "Wake mode (now|next-heartbeat)") .option("--at ", "Set one-shot time (ISO) or duration like 20m") .option("--every ", "Set interval duration like 10m") @@ -613,6 +633,15 @@ export function registerCronCli(program: Command) { if (typeof opts.session === "string") patch.sessionTarget = opts.session; if (typeof opts.wake === "string") patch.wakeMode = opts.wake; + if (opts.agent && opts.clearAgent) { + throw new Error("Use --agent or --clear-agent, not both"); + } + if (typeof opts.agent === "string" && opts.agent.trim()) { + patch.agentId = normalizeAgentId(opts.agent); + } + if (opts.clearAgent) { + patch.agentId = null; + } const scheduleChosen = [opts.at, opts.every, opts.cron].filter( Boolean, diff --git a/src/cron/isolated-agent.test.ts b/src/cron/isolated-agent.test.ts index 967c14c4d..a93633539 100644 --- a/src/cron/isolated-agent.test.ts +++ b/src/cron/isolated-agent.test.ts @@ -120,6 +120,75 @@ describe("runCronIsolatedAgentTurn", () => { }); }); + it("uses agentId for workspace, session key, and store paths", async () => { + await withTempHome(async (home) => { + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + const opsWorkspace = path.join(home, "ops-workspace"); + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const cfg = makeCfg( + home, + path.join( + home, + ".clawdbot", + "agents", + "{agentId}", + "sessions", + "sessions.json", + ), + { + agents: { + defaults: { workspace: path.join(home, "default-workspace") }, + list: [ + { id: "main", default: true }, + { id: "ops", workspace: opsWorkspace }, + ], + }, + }, + ); + + const res = await runCronIsolatedAgentTurn({ + cfg, + deps, + job: { + ...makeJob({ + kind: "agentTurn", + message: "do it", + deliver: false, + provider: "last", + }), + agentId: "ops", + }, + message: "do it", + sessionKey: "cron:job-ops", + agentId: "ops", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + const call = vi.mocked(runEmbeddedPiAgent).mock.calls.at(-1)?.[0] as { + sessionKey?: string; + workspaceDir?: string; + sessionFile?: string; + }; + expect(call?.sessionKey).toBe("agent:ops:cron:job-ops"); + expect(call?.workspaceDir).toBe(opsWorkspace); + expect(call?.sessionFile).toContain(path.join("agents", "ops")); + }); + }); + it("uses model override when provided", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); diff --git a/src/cron/isolated-agent.ts b/src/cron/isolated-agent.ts index cdee2cfc8..64e095c46 100644 --- a/src/cron/isolated-agent.ts +++ b/src/cron/isolated-agent.ts @@ -1,4 +1,9 @@ import crypto from "node:crypto"; +import { + resolveAgentConfig, + resolveAgentWorkspaceDir, + resolveDefaultAgentId, +} from "../agents/agent-scope.js"; import { runCliAgent } from "../agents/cli-runner.js"; import { getCliSessionId, setCliSessionId } from "../agents/cli-session.js"; import { lookupContextTokens } from "../agents/context.js"; @@ -21,10 +26,7 @@ import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { buildWorkspaceSkillSnapshot } from "../agents/skills.js"; import { resolveAgentTimeoutMs } from "../agents/timeout.js"; import { hasNonzeroUsage } from "../agents/usage.js"; -import { - DEFAULT_AGENT_WORKSPACE_DIR, - ensureAgentWorkspace, -} from "../agents/workspace.js"; +import { ensureAgentWorkspace } from "../agents/workspace.js"; import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS, stripHeartbeatToken, @@ -35,13 +37,13 @@ import type { ClawdbotConfig } from "../config/config.js"; import { DEFAULT_IDLE_MINUTES, loadSessionStore, - resolveAgentIdFromSessionKey, - resolveMainSessionKey, + resolveAgentMainSessionKey, resolveSessionTranscriptPath, resolveStorePath, type SessionEntry, saveSessionStore, } from "../config/sessions.js"; +import type { AgentDefaultsConfig } from "../config/types.js"; import { registerAgentRunContext } from "../infra/agent-events.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { resolveMessageProviderSelection } from "../infra/outbound/provider-selection.js"; @@ -52,6 +54,10 @@ import { import { normalizeProviderId } from "../providers/plugins/index.js"; import type { ProviderId } from "../providers/plugins/types.js"; import { DEFAULT_CHAT_PROVIDER } from "../providers/registry.js"; +import { + buildAgentMainSessionKey, + normalizeAgentId, +} from "../routing/session-key.js"; import { INTERNAL_MESSAGE_PROVIDER, normalizeMessageProvider, @@ -113,6 +119,7 @@ function isHeartbeatOnlyResponse( async function resolveDeliveryTarget( cfg: ClawdbotConfig, + agentId: string, jobPayload: { provider?: "last" | ProviderId; to?: string; @@ -134,8 +141,7 @@ async function resolveDeliveryTarget( : undefined; const sessionCfg = cfg.session; - const mainSessionKey = resolveMainSessionKey(cfg); - const agentId = resolveAgentIdFromSessionKey(mainSessionKey); + const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); const storePath = resolveStorePath(sessionCfg?.store, { agentId }); const store = loadSessionStore(storePath); const main = store[mainSessionKey]; @@ -187,6 +193,7 @@ function resolveCronSession(params: { cfg: ClawdbotConfig; sessionKey: string; nowMs: number; + agentId: string; }) { const sessionCfg = params.cfg.session; const idleMinutes = Math.max( @@ -194,7 +201,9 @@ function resolveCronSession(params: { 1, ); const idleMs = idleMinutes * 60_000; - const storePath = resolveStorePath(sessionCfg?.store); + const storePath = resolveStorePath(sessionCfg?.store, { + agentId: params.agentId, + }); const store = loadSessionStore(storePath); const entry = store[params.sessionKey]; const fresh = entry && params.nowMs - entry.updatedAt <= idleMs; @@ -221,10 +230,50 @@ export async function runCronIsolatedAgentTurn(params: { job: CronJob; message: string; sessionKey: string; + agentId?: string; lane?: string; }): Promise { - const agentCfg = params.cfg.agents?.defaults; - const workspaceDirRaw = agentCfg?.workspace ?? DEFAULT_AGENT_WORKSPACE_DIR; + const defaultAgentId = resolveDefaultAgentId(params.cfg); + const requestedAgentId = + typeof params.agentId === "string" && params.agentId.trim() + ? params.agentId + : typeof params.job.agentId === "string" && params.job.agentId.trim() + ? params.job.agentId + : undefined; + const normalizedRequested = requestedAgentId + ? normalizeAgentId(requestedAgentId) + : undefined; + const agentConfigOverride = normalizedRequested + ? resolveAgentConfig(params.cfg, normalizedRequested) + : undefined; + const { model: overrideModel, ...agentOverrideRest } = + agentConfigOverride ?? {}; + const agentId = agentConfigOverride + ? (normalizedRequested ?? defaultAgentId) + : defaultAgentId; + const agentCfg: AgentDefaultsConfig = { + ...(params.cfg.agents?.defaults ?? {}), + ...(agentOverrideRest as Partial), + }; + if (typeof overrideModel === "string") { + agentCfg.model = { primary: overrideModel }; + } else if (overrideModel) { + agentCfg.model = overrideModel; + } + const cfgWithAgentDefaults: ClawdbotConfig = { + ...params.cfg, + agents: { ...(params.cfg.agents ?? {}), defaults: agentCfg }, + }; + + const baseSessionKey = ( + params.sessionKey?.trim() || `cron:${params.job.id}` + ).trim(); + const agentSessionKey = buildAgentMainSessionKey({ + agentId, + mainKey: baseSessionKey, + }); + + const workspaceDirRaw = resolveAgentWorkspaceDir(params.cfg, agentId); const workspace = await ensureAgentWorkspace({ dir: workspaceDirRaw, ensureBootstrapFiles: !agentCfg?.skipBootstrap, @@ -232,7 +281,7 @@ export async function runCronIsolatedAgentTurn(params: { const workspaceDir = workspace.dir; const resolvedDefault = resolveConfiguredModelRef({ - cfg: params.cfg, + cfg: cfgWithAgentDefaults, defaultProvider: DEFAULT_PROVIDER, defaultModel: DEFAULT_MODEL, }); @@ -241,12 +290,12 @@ export async function runCronIsolatedAgentTurn(params: { let catalog: Awaited> | undefined; const loadCatalog = async () => { if (!catalog) { - catalog = await loadModelCatalog({ config: params.cfg }); + catalog = await loadModelCatalog({ config: cfgWithAgentDefaults }); } return catalog; }; // Resolve model - prefer hooks.gmail.model for Gmail hooks. - const isGmailHook = params.sessionKey.startsWith("hook:gmail:"); + const isGmailHook = baseSessionKey.startsWith("hook:gmail:"); const hooksGmailModelRef = isGmailHook ? resolveHooksGmailModel({ cfg: params.cfg, @@ -275,7 +324,7 @@ export async function runCronIsolatedAgentTurn(params: { return { status: "error", error: "invalid model: expected string" }; } const resolvedOverride = resolveAllowedModelRef({ - cfg: params.cfg, + cfg: cfgWithAgentDefaults, catalog: await loadCatalog(), raw: modelOverrideRaw, defaultProvider: resolvedDefault.provider, @@ -290,7 +339,8 @@ export async function runCronIsolatedAgentTurn(params: { const now = Date.now(); const cronSession = resolveCronSession({ cfg: params.cfg, - sessionKey: params.sessionKey, + sessionKey: agentSessionKey, + agentId, nowMs: now, }); const isFirstTurnInSession = @@ -309,7 +359,7 @@ export async function runCronIsolatedAgentTurn(params: { let thinkLevel = jobThink ?? hooksGmailThinking ?? thinkOverride; if (!thinkLevel) { thinkLevel = resolveThinkingDefault({ - cfg: params.cfg, + cfg: cfgWithAgentDefaults, provider, model, catalog: await loadCatalog(), @@ -317,7 +367,7 @@ export async function runCronIsolatedAgentTurn(params: { } const timeoutMs = resolveAgentTimeoutMs({ - cfg: params.cfg, + cfg: cfgWithAgentDefaults, overrideSeconds: params.job.payload.kind === "agentTurn" ? params.job.payload.timeoutSeconds @@ -331,16 +381,20 @@ export async function runCronIsolatedAgentTurn(params: { params.job.payload.kind === "agentTurn" && params.job.payload.bestEffortDeliver === true; - const resolvedDelivery = await resolveDeliveryTarget(params.cfg, { - provider: - params.job.payload.kind === "agentTurn" - ? params.job.payload.provider - : "last", - to: - params.job.payload.kind === "agentTurn" - ? params.job.payload.to - : undefined, - }); + const resolvedDelivery = await resolveDeliveryTarget( + cfgWithAgentDefaults, + agentId, + { + provider: + params.job.payload.kind === "agentTurn" + ? params.job.payload.provider + : "last", + to: + params.job.payload.kind === "agentTurn" + ? params.job.payload.to + : undefined, + }, + ); const base = `[cron:${params.job.id} ${params.job.name}] ${params.message}`.trim(); @@ -350,7 +404,9 @@ export async function runCronIsolatedAgentTurn(params: { const needsSkillsSnapshot = cronSession.isNewSession || !cronSession.sessionEntry.skillsSnapshot; const skillsSnapshot = needsSkillsSnapshot - ? buildWorkspaceSkillSnapshot(workspaceDir, { config: params.cfg }) + ? buildWorkspaceSkillSnapshot(workspaceDir, { + config: cfgWithAgentDefaults, + }) : cronSession.sessionEntry.skillsSnapshot; if (needsSkillsSnapshot && skillsSnapshot) { cronSession.sessionEntry = { @@ -358,17 +414,17 @@ export async function runCronIsolatedAgentTurn(params: { updatedAt: Date.now(), skillsSnapshot, }; - cronSession.store[params.sessionKey] = cronSession.sessionEntry; + cronSession.store[agentSessionKey] = cronSession.sessionEntry; await saveSessionStore(cronSession.storePath, cronSession.store); } // Persist systemSent before the run, mirroring the inbound auto-reply behavior. if (isFirstTurnInSession) { cronSession.sessionEntry.systemSent = true; - cronSession.store[params.sessionKey] = cronSession.sessionEntry; + cronSession.store[agentSessionKey] = cronSession.sessionEntry; await saveSessionStore(cronSession.storePath, cronSession.store); } else { - cronSession.store[params.sessionKey] = cronSession.sessionEntry; + cronSession.store[agentSessionKey] = cronSession.sessionEntry; await saveSessionStore(cronSession.storePath, cronSession.store); } @@ -378,31 +434,32 @@ export async function runCronIsolatedAgentTurn(params: { try { const sessionFile = resolveSessionTranscriptPath( cronSession.sessionEntry.sessionId, + agentId, ); const resolvedVerboseLevel = (cronSession.sessionEntry.verboseLevel as "on" | "off" | undefined) ?? (agentCfg?.verboseDefault as "on" | "off" | undefined); registerAgentRunContext(cronSession.sessionEntry.sessionId, { - sessionKey: params.sessionKey, + sessionKey: agentSessionKey, verboseLevel: resolvedVerboseLevel, }); const messageProvider = resolvedDelivery.provider; const fallbackResult = await runWithModelFallback({ - cfg: params.cfg, + cfg: cfgWithAgentDefaults, provider, model, run: (providerOverride, modelOverride) => { - if (isCliProvider(providerOverride, params.cfg)) { + if (isCliProvider(providerOverride, cfgWithAgentDefaults)) { const cliSessionId = getCliSessionId( cronSession.sessionEntry, providerOverride, ); return runCliAgent({ sessionId: cronSession.sessionEntry.sessionId, - sessionKey: params.sessionKey, + sessionKey: agentSessionKey, sessionFile, workspaceDir, - config: params.cfg, + config: cfgWithAgentDefaults, prompt: commandBody, provider: providerOverride, model: modelOverride, @@ -414,11 +471,11 @@ export async function runCronIsolatedAgentTurn(params: { } return runEmbeddedPiAgent({ sessionId: cronSession.sessionEntry.sessionId, - sessionKey: params.sessionKey, + sessionKey: agentSessionKey, messageProvider, sessionFile, workspaceDir, - config: params.cfg, + config: cfgWithAgentDefaults, skillsSnapshot, prompt: commandBody, lane: params.lane ?? "cron", @@ -454,7 +511,7 @@ export async function runCronIsolatedAgentTurn(params: { cronSession.sessionEntry.modelProvider = providerUsed; cronSession.sessionEntry.model = modelUsed; cronSession.sessionEntry.contextTokens = contextTokens; - if (isCliProvider(providerUsed, params.cfg)) { + if (isCliProvider(providerUsed, cfgWithAgentDefaults)) { const cliSessionId = runResult.meta.agentMeta?.sessionId?.trim(); if (cliSessionId) { setCliSessionId(cronSession.sessionEntry, providerUsed, cliSessionId); @@ -470,7 +527,7 @@ export async function runCronIsolatedAgentTurn(params: { cronSession.sessionEntry.totalTokens = promptTokens > 0 ? promptTokens : (usage.total ?? input); } - cronSession.store[params.sessionKey] = cronSession.sessionEntry; + cronSession.store[agentSessionKey] = cronSession.sessionEntry; await saveSessionStore(cronSession.storePath, cronSession.store); } const firstText = payloads[0]?.text ?? ""; @@ -481,8 +538,7 @@ export async function runCronIsolatedAgentTurn(params: { // This allows cron jobs to silently ack when nothing to report but still deliver // actual content when there is something to say. const ackMaxChars = - params.cfg.agents?.defaults?.heartbeat?.ackMaxChars ?? - DEFAULT_HEARTBEAT_ACK_MAX_CHARS; + agentCfg?.heartbeat?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS; const skipHeartbeatDelivery = delivery && isHeartbeatOnlyResponse(payloads, Math.max(0, ackMaxChars)); @@ -505,7 +561,7 @@ export async function runCronIsolatedAgentTurn(params: { } try { await deliverOutboundPayloads({ - cfg: params.cfg, + cfg: cfgWithAgentDefaults, provider: resolvedDelivery.provider as Exclude< OutboundProvider, "none" diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index 529db68b3..b33f13e84 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -24,6 +24,38 @@ describe("normalizeCronJobCreate", () => { expect("channel" in payload).toBe(false); }); + it("normalizes agentId and drops null", () => { + const normalized = normalizeCronJobCreate({ + name: "agent-set", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "isolated", + wakeMode: "now", + agentId: " Ops ", + payload: { + kind: "agentTurn", + message: "hi", + }, + }) as unknown as Record; + + expect(normalized.agentId).toBe("Ops"); + + const cleared = normalizeCronJobCreate({ + name: "agent-clear", + enabled: true, + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: "isolated", + wakeMode: "now", + agentId: null, + payload: { + kind: "agentTurn", + message: "hi", + }, + }) as unknown as Record; + + expect(cleared.agentId).toBeNull(); + }); + it("canonicalizes payload.provider casing", () => { const normalized = normalizeCronJobCreate({ name: "legacy provider", diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 6e29bf277..07c4611b8 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -1,3 +1,4 @@ +import { normalizeAgentId } from "../routing/session-key.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; import type { CronJobCreate, CronJobPatch } from "./types.js"; @@ -53,6 +54,17 @@ export function normalizeCronJobInput( const base = unwrapJob(raw); const next: UnknownRecord = { ...base }; + if ("agentId" in base) { + const agentId = (base as UnknownRecord).agentId; + if (agentId === null) { + next.agentId = null; + } else if (typeof agentId === "string") { + const trimmed = agentId.trim(); + if (trimmed) next.agentId = normalizeAgentId(trimmed); + else delete next.agentId; + } + } + if (isRecord(base.schedule)) { next.schedule = coerceSchedule(base.schedule); } diff --git a/src/cron/service.test.ts b/src/cron/service.test.ts index 03364e9cd..168c9faaa 100644 --- a/src/cron/service.test.ts +++ b/src/cron/service.test.ts @@ -71,7 +71,9 @@ describe("CronService", () => { const jobs = await cron.list({ includeDisabled: true }); const updated = jobs.find((j) => j.id === job.id); expect(updated?.enabled).toBe(false); - expect(enqueueSystemEvent).toHaveBeenCalledWith("hello"); + expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { + agentId: undefined, + }); expect(requestHeartbeatNow).toHaveBeenCalled(); await cron.list({ includeDisabled: true }); @@ -128,7 +130,9 @@ describe("CronService", () => { expect(runHeartbeatOnce).toHaveBeenCalledTimes(1); expect(requestHeartbeatNow).not.toHaveBeenCalled(); - expect(enqueueSystemEvent).toHaveBeenCalledWith("hello"); + expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { + agentId: undefined, + }); expect(job.state.runningAtMs).toBeTypeOf("number"); resolveHeartbeat?.({ status: "ran", durationMs: 123 }); @@ -175,7 +179,9 @@ describe("CronService", () => { await cron.list({ includeDisabled: true }); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); - expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done"); + expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", { + agentId: undefined, + }); expect(requestHeartbeatNow).toHaveBeenCalled(); cron.stop(); await store.cleanup(); @@ -318,6 +324,7 @@ describe("CronService", () => { expect(enqueueSystemEvent).toHaveBeenCalledWith( "Cron (error): last output", + { agentId: undefined }, ); expect(requestHeartbeatNow).toHaveBeenCalled(); cron.stop(); diff --git a/src/cron/service.ts b/src/cron/service.ts index 870e56675..77dd9095d 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -1,6 +1,7 @@ import crypto from "node:crypto"; import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; +import { normalizeAgentId } from "../routing/session-key.js"; import { truncateUtf16Safe } from "../utils.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; import { computeNextRunAtMs } from "./schedule.js"; @@ -36,7 +37,7 @@ export type CronServiceDeps = { log: Logger; storePath: string; cronEnabled: boolean; - enqueueSystemEvent: (text: string) => void; + enqueueSystemEvent: (text: string, opts?: { agentId?: string }) => void; requestHeartbeatNow: (opts?: { reason?: string }) => void; runHeartbeatOnce?: (opts?: { reason?: string; @@ -74,6 +75,13 @@ function truncateText(input: string, maxLen: number) { return `${truncateUtf16Safe(input, Math.max(0, maxLen - 1)).trimEnd()}…`; } +function normalizeOptionalAgentId(raw: unknown) { + if (typeof raw !== "string") return undefined; + const trimmed = raw.trim(); + if (!trimmed) return undefined; + return normalizeAgentId(trimmed); +} + function inferLegacyName(job: { schedule?: { kind?: unknown; everyMs?: unknown; expr?: unknown }; payload?: { kind?: unknown; text?: unknown; message?: unknown }; @@ -181,6 +189,7 @@ export class CronService { const id = crypto.randomUUID(); const job: CronJob = { id, + agentId: normalizeOptionalAgentId(input.agentId), name: normalizeRequiredName(input.name), description: normalizeOptionalText(input.description), enabled: input.enabled !== false, @@ -226,6 +235,11 @@ export class CronService { if (patch.payload) job.payload = patch.payload; if (patch.isolation) job.isolation = patch.isolation; if (patch.state) job.state = { ...job.state, ...patch.state }; + if ("agentId" in patch) { + job.agentId = normalizeOptionalAgentId( + (patch as { agentId?: unknown }).agentId, + ); + } job.updatedAtMs = now; this.assertSupportedJobSpec(job); @@ -495,7 +509,9 @@ export class CronService { const prefix = job.isolation?.postToMainPrefix?.trim() || "Cron"; const body = (summary ?? err ?? status).trim(); const statusPrefix = status === "ok" ? prefix : `${prefix} (${status})`; - this.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`); + this.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`, { + agentId: job.agentId, + }); if (job.wakeMode === "now") { this.deps.requestHeartbeatNow({ reason: `cron:${job.id}:post` }); } @@ -519,7 +535,7 @@ export class CronService { ); return; } - this.deps.enqueueSystemEvent(text); + this.deps.enqueueSystemEvent(text, { agentId: job.agentId }); if (job.wakeMode === "now" && this.deps.runHeartbeatOnce) { const reason = `cron:${job.id}`; const delay = (ms: number) => diff --git a/src/cron/types.ts b/src/cron/types.ts index 92b70dd4f..c01225cd9 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -40,6 +40,7 @@ export type CronJobState = { export type CronJob = { id: string; + agentId?: string; name: string; description?: string; enabled: boolean; diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index 1925c810c..8b8251bb5 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -826,6 +826,7 @@ export const CronJobStateSchema = Type.Object( export const CronJobSchema = Type.Object( { id: NonEmptyString, + agentId: Type.Optional(NonEmptyString), name: NonEmptyString, description: Type.Optional(Type.String()), enabled: Type.Boolean(), @@ -856,6 +857,7 @@ export const CronStatusParamsSchema = Type.Object( export const CronAddParamsSchema = Type.Object( { name: NonEmptyString, + agentId: Type.Optional(Type.Union([NonEmptyString, Type.Null()])), description: Type.Optional(Type.String()), enabled: Type.Optional(Type.Boolean()), schedule: CronScheduleSchema, diff --git a/src/gateway/server.ts b/src/gateway/server.ts index f7d2ae137..1897eb090 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -45,7 +45,7 @@ import { } from "../config/port-defaults.js"; import { loadSessionStore, - resolveMainSessionKey, + resolveAgentMainSessionKey, resolveMainSessionKeyFromConfig, resolveStorePath, } from "../config/sessions.js"; @@ -119,6 +119,7 @@ import { normalizeProviderId, type ProviderId, } from "../providers/plugins/index.js"; +import { normalizeAgentId } from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { isGatewayCliClient, @@ -782,11 +783,36 @@ export async function startGatewayServer( const storePath = resolveCronStorePath(cfg.cron?.store); const cronEnabled = process.env.CLAWDBOT_SKIP_CRON !== "1" && cfg.cron?.enabled !== false; + const resolveCronAgent = (requested?: string | null) => { + const runtimeConfig = loadConfig(); + const normalized = + typeof requested === "string" && requested.trim() + ? normalizeAgentId(requested) + : undefined; + const hasAgent = + normalized !== undefined && + Array.isArray(runtimeConfig.agents?.list) && + runtimeConfig.agents.list.some( + (entry) => + entry && + typeof entry.id === "string" && + normalizeAgentId(entry.id) === normalized, + ); + const agentId = hasAgent + ? normalized + : resolveDefaultAgentId(runtimeConfig); + return { agentId, cfg: runtimeConfig }; + }; const cron = new CronService({ storePath, cronEnabled, - enqueueSystemEvent: (text) => { - enqueueSystemEvent(text, { sessionKey: resolveMainSessionKey(cfg) }); + enqueueSystemEvent: (text, opts) => { + const { agentId, cfg: runtimeConfig } = resolveCronAgent(opts?.agentId); + const sessionKey = resolveAgentMainSessionKey({ + cfg: runtimeConfig, + agentId, + }); + enqueueSystemEvent(text, { sessionKey }); }, requestHeartbeatNow, runHeartbeatOnce: async (opts) => { @@ -798,12 +824,13 @@ export async function startGatewayServer( }); }, runIsolatedAgentJob: async ({ job, message }) => { - const runtimeConfig = loadConfig(); + const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId); return await runCronIsolatedAgentTurn({ cfg: runtimeConfig, deps, job, message, + agentId, sessionKey: `cron:${job.id}`, lane: "cron", }); diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index b2d0263eb..c6b729720 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -350,6 +350,7 @@ export type CronJobState = { export type CronJob = { id: string; + agentId?: string; name: string; description?: string; enabled: boolean;