feat(cron): post isolated summaries

This commit is contained in:
Peter Steinberger
2025-12-13 12:09:15 +00:00
parent 32cd1175fb
commit c02613e15f
12 changed files with 462 additions and 52 deletions

View File

@@ -0,0 +1,197 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { CliDeps } from "../cli/deps.js";
import type { ClawdisConfig } from "../config/config.js";
import type { CronJob } from "./types.js";
vi.mock("../auto-reply/command-reply.js", () => ({
runCommandReply: vi.fn(),
}));
import { runCommandReply } from "../auto-reply/command-reply.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
async function makeSessionStorePath() {
const dir = await fs.mkdtemp(
path.join(os.tmpdir(), "clawdis-cron-sessions-"),
);
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(
storePath,
JSON.stringify(
{
main: {
sessionId: "main-session",
updatedAt: Date.now(),
lastChannel: "webchat",
lastTo: "",
},
},
null,
2,
),
);
return {
storePath,
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
function makeCfg(storePath: string): ClawdisConfig {
return {
inbound: {
reply: {
mode: "command",
command: ["echo", "ok"],
session: {
store: storePath,
mainKey: "main",
},
},
},
} as ClawdisConfig;
}
function makeJob(payload: CronJob["payload"]): CronJob {
const now = Date.now();
return {
id: "job-1",
enabled: true,
createdAtMs: now,
updatedAtMs: now,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "now",
payload,
state: {},
isolation: { postToMainPrefix: "Cron" },
};
}
describe("runCronIsolatedAgentTurn", () => {
beforeEach(() => {
vi.mocked(runCommandReply).mockReset();
});
it("uses last non-empty agent text as summary", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: "first" }, { text: " " }, { text: " last " }],
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(res.summary).toBe("last");
await sessions.cleanup();
});
it("truncates long summaries", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
const long = "a".repeat(2001);
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: long }],
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({ kind: "agentTurn", message: "do it", deliver: false }),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("ok");
expect(String(res.summary ?? "")).toMatch(/…$/);
await sessions.cleanup();
});
it("fails delivery without a WhatsApp recipient when bestEffortDeliver=false", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: "hello" }],
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({
kind: "agentTurn",
message: "do it",
deliver: true,
channel: "whatsapp",
bestEffortDeliver: false,
}),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("error");
expect(res.summary).toBe("hello");
expect(String(res.error ?? "")).toMatch(/requires a recipient/i);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
await sessions.cleanup();
});
it("skips delivery without a WhatsApp recipient when bestEffortDeliver=true", async () => {
const sessions = await makeSessionStorePath();
const deps: CliDeps = {
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn(),
};
vi.mocked(runCommandReply).mockResolvedValue({
payloads: [{ text: "hello" }],
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(sessions.storePath),
deps,
job: makeJob({
kind: "agentTurn",
message: "do it",
deliver: true,
channel: "whatsapp",
bestEffortDeliver: true,
}),
message: "do it",
sessionKey: "cron:job-1",
lane: "cron",
});
expect(res.status).toBe("skipped");
expect(String(res.summary ?? "")).toMatch(/delivery skipped/i);
expect(deps.sendMessageWhatsApp).not.toHaveBeenCalled();
await sessions.cleanup();
});
});

View File

@@ -23,6 +23,7 @@ import type { CronJob } from "./types.js";
export type RunCronAgentTurnResult = {
status: "ok" | "error" | "skipped";
summary?: string;
error?: string;
};
function assertCommandReplyConfig(cfg: ClawdisConfig) {
@@ -241,19 +242,24 @@ export async function runCronIsolatedAgentTurn(params: {
const lane = params.lane?.trim() || "cron";
const runResult = await runCommandReply({
reply: { ...replyCfg, mode: "command" },
templatingCtx,
sendSystemOnce,
isNewSession: cronSession.isNewSession,
isFirstTurnInSession,
systemSent: cronSession.sessionEntry.systemSent ?? false,
timeoutMs,
timeoutSeconds,
thinkLevel,
enqueue: (task, opts) => enqueueCommandInLane(lane, task, opts),
runId: cronSession.sessionEntry.sessionId,
});
let runResult: Awaited<ReturnType<typeof runCommandReply>>;
try {
runResult = await runCommandReply({
reply: { ...replyCfg, mode: "command" },
templatingCtx,
sendSystemOnce,
isNewSession: cronSession.isNewSession,
isFirstTurnInSession,
systemSent: cronSession.sessionEntry.systemSent ?? false,
timeoutMs,
timeoutSeconds,
thinkLevel,
enqueue: (task, opts) => enqueueCommandInLane(lane, task, opts),
runId: cronSession.sessionEntry.sessionId,
});
} catch (err) {
return { status: "error", error: String(err) };
}
const payloads = runResult.payloads ?? [];
const firstText = payloads[0]?.text ?? "";
@@ -263,12 +269,12 @@ export async function runCronIsolatedAgentTurn(params: {
if (delivery) {
if (resolvedDelivery.channel === "whatsapp") {
if (!resolvedDelivery.to) {
if (!bestEffortDeliver) {
if (!bestEffortDeliver)
return {
status: "error",
summary: "Cron delivery to WhatsApp requires a recipient.",
summary,
error: "Cron delivery to WhatsApp requires a recipient.",
};
}
return {
status: "skipped",
summary: "Delivery skipped (no WhatsApp recipient).",
@@ -292,22 +298,18 @@ export async function runCronIsolatedAgentTurn(params: {
}
}
} catch (err) {
if (!bestEffortDeliver) throw err;
return {
status: "ok",
summary: summary
? `${summary} (delivery failed)`
: "completed (delivery failed)",
};
if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) };
return { status: "ok", summary };
}
} else if (resolvedDelivery.channel === "telegram") {
if (!resolvedDelivery.to) {
if (!bestEffortDeliver) {
if (!bestEffortDeliver)
return {
status: "error",
summary: "Cron delivery to Telegram requires a chatId.",
summary,
error: "Cron delivery to Telegram requires a chatId.",
};
}
return {
status: "skipped",
summary: "Delivery skipped (no Telegram chatId).",
@@ -337,13 +339,9 @@ export async function runCronIsolatedAgentTurn(params: {
}
}
} catch (err) {
if (!bestEffortDeliver) throw err;
return {
status: "ok",
summary: summary
? `${summary} (delivery failed)`
: "completed (delivery failed)",
};
if (!bestEffortDeliver)
return { status: "error", summary, error: String(err) };
return { status: "ok", summary };
}
}
}

View File

@@ -73,6 +73,7 @@ describe("cron run log", () => {
action: "finished",
status: "error",
error: "nope",
summary: "oops",
});
await appendCronRunLog(logPath, {
ts: 3,
@@ -93,6 +94,12 @@ describe("cron run log", () => {
const lastOne = await readCronRunLogEntries(logPath, { limit: 1 });
expect(lastOne.map((e) => e.ts)).toEqual([3]);
const onlyB = await readCronRunLogEntries(logPath, {
limit: 10,
jobId: "b",
});
expect(onlyB[0]?.summary).toBe("oops");
await fs.rm(dir, { recursive: true, force: true });
});
});

View File

@@ -7,6 +7,7 @@ export type CronRunLogEntry = {
action: "finished";
status?: "ok" | "error" | "skipped";
error?: string;
summary?: string;
runAtMs?: number;
durationMs?: number;
nextRunAtMs?: number;

View File

@@ -117,6 +117,173 @@ describe("CronService", () => {
await store.cleanup();
});
it("posts last output to main even when isolated job errors", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestReplyHeartbeatNow = vi.fn();
const runIsolatedAgentJob = vi.fn(async () => ({
status: "error" as const,
summary: "last output",
error: "boom",
}));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestReplyHeartbeatNow,
runIsolatedAgentJob,
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cron.add({
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "isolated",
wakeMode: "now",
payload: { kind: "agentTurn", message: "do it", deliver: false },
});
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Cron (error): last output",
);
expect(requestReplyHeartbeatNow).toHaveBeenCalled();
cron.stop();
await store.cleanup();
});
it("rejects unsupported session/payload combinations", async () => {
const store = await makeStorePath();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestReplyHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
await expect(
cron.add({
enabled: true,
schedule: { kind: "every", everyMs: 1000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "nope" },
}),
).rejects.toThrow(/main cron jobs require/);
await expect(
cron.add({
enabled: true,
schedule: { kind: "every", everyMs: 1000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "nope" },
}),
).rejects.toThrow(/isolated cron jobs require/);
cron.stop();
await store.cleanup();
});
it("skips invalid main jobs with agentTurn payloads from disk", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestReplyHeartbeatNow = vi.fn();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await fs.writeFile(
store.storePath,
JSON.stringify({
version: 1,
jobs: [
{
id: "job-1",
enabled: true,
createdAtMs: Date.parse("2025-12-13T00:00:00.000Z"),
updatedAtMs: Date.parse("2025-12-13T00:00:00.000Z"),
schedule: { kind: "at", atMs },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "agentTurn", message: "bad" },
state: {},
},
],
}),
);
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestReplyHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestReplyHeartbeatNow).not.toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
expect(jobs[0]?.state.lastStatus).toBe("skipped");
expect(jobs[0]?.state.lastError).toMatch(/main job requires/i);
cron.stop();
await store.cleanup();
});
it("skips main jobs with empty systemEvent text", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestReplyHeartbeatNow = vi.fn();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestReplyHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:01.000Z");
await cron.add({
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "main",
wakeMode: "now",
payload: { kind: "systemEvent", text: " " },
});
vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(enqueueSystemEvent).not.toHaveBeenCalled();
expect(requestReplyHeartbeatNow).not.toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
expect(jobs[0]?.state.lastStatus).toBe("skipped");
expect(jobs[0]?.state.lastError).toMatch(/non-empty/i);
cron.stop();
await store.cleanup();
});
it("does not schedule timers when cron is disabled", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();

View File

@@ -17,6 +17,7 @@ export type CronEvent = {
durationMs?: number;
status?: "ok" | "error" | "skipped";
error?: string;
summary?: string;
nextRunAtMs?: number;
};
@@ -34,10 +35,11 @@ export type CronServiceDeps = {
cronEnabled: boolean;
enqueueSystemEvent: (text: string) => void;
requestReplyHeartbeatNow: (opts?: { reason?: string }) => void;
runIsolatedAgentJob: (params: {
job: CronJob;
message: string;
}) => Promise<{ status: "ok" | "error" | "skipped"; summary?: string }>;
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{
status: "ok" | "error" | "skipped";
summary?: string;
error?: string;
}>;
onEvent?: (evt: CronEvent) => void;
};
@@ -142,6 +144,7 @@ export class CronService {
...input.state,
},
};
this.assertSupportedJobSpec(job);
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now);
this.store?.jobs.push(job);
await this.persist();
@@ -173,6 +176,7 @@ export class CronService {
if (patch.state) job.state = { ...job.state, ...patch.state };
job.updatedAtMs = now;
this.assertSupportedJobSpec(job);
if (job.enabled) {
job.state.nextRunAtMs = this.computeJobNextRunAtMs(job, now);
} else {
@@ -397,14 +401,17 @@ export class CronService {
action: "finished",
status,
error: err,
summary,
runAtMs: startedAt,
durationMs: job.state.lastDurationMs,
nextRunAtMs: job.state.nextRunAtMs,
});
if (summary && job.sessionTarget === "isolated") {
if (job.sessionTarget === "isolated") {
const prefix = job.isolation?.postToMainPrefix?.trim() || "Cron";
this.deps.enqueueSystemEvent(`${prefix}: ${summary}`);
const body = (summary ?? err ?? status).trim();
const statusPrefix = status === "ok" ? prefix : `${prefix} (${status})`;
this.deps.enqueueSystemEvent(`${statusPrefix}: ${body}`);
if (job.wakeMode === "now") {
this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}:post` });
}
@@ -413,12 +420,26 @@ export class CronService {
try {
if (job.sessionTarget === "main") {
if (job.payload.kind !== "systemEvent") {
await finish(
"skipped",
'main job requires payload.kind="systemEvent"',
);
return;
}
const text = normalizePayloadToSystemText(job.payload);
if (!text) {
await finish(
"skipped",
"main job requires non-empty systemEvent text",
);
return;
}
this.deps.enqueueSystemEvent(text);
if (job.wakeMode === "now") {
this.deps.requestReplyHeartbeatNow({ reason: `cron:${job.id}` });
}
await finish("ok");
await finish("ok", undefined, text);
return;
}
@@ -434,7 +455,7 @@ export class CronService {
if (res.status === "ok") await finish("ok", undefined, res.summary);
else if (res.status === "skipped")
await finish("skipped", undefined, res.summary);
else await finish("error", res.summary ?? "cron job failed");
else await finish("error", res.error ?? "cron job failed", res.summary);
} catch (err) {
await finish("error", String(err));
} finally {
@@ -456,4 +477,15 @@ export class CronService {
/* ignore */
}
}
private assertSupportedJobSpec(
job: Pick<CronJob, "sessionTarget" | "payload">,
) {
if (job.sessionTarget === "main" && job.payload.kind !== "systemEvent") {
throw new Error('main cron jobs require payload.kind="systemEvent"');
}
if (job.sessionTarget === "isolated" && job.payload.kind !== "agentTurn") {
throw new Error('isolated cron jobs require payload.kind="agentTurn"');
}
}
}

View File

@@ -20,7 +20,6 @@ export type CronPayload =
};
export type CronIsolation = {
postToMain?: boolean;
postToMainPrefix?: string;
};

View File

@@ -299,7 +299,6 @@ export const CronPayloadSchema = Type.Union([
export const CronIsolationSchema = Type.Object(
{
postToMain: Type.Optional(Type.Boolean()),
postToMainPrefix: Type.Optional(Type.String()),
},
{ additionalProperties: false },
@@ -411,6 +410,7 @@ export const CronRunLogEntrySchema = Type.Object(
]),
),
error: Type.Optional(Type.String()),
summary: Type.Optional(Type.String()),
runAtMs: Type.Optional(Type.Integer({ minimum: 0 })),
durationMs: Type.Optional(Type.Integer({ minimum: 0 })),
nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })),

View File

@@ -551,10 +551,12 @@ describe("gateway server", () => {
jobId?: unknown;
action?: unknown;
status?: unknown;
summary?: unknown;
};
expect(last.action).toBe("finished");
expect(last.jobId).toBe(jobId);
expect(last.status).toBe("ok");
expect(last.summary).toBe("hello");
ws.send(
JSON.stringify({
@@ -573,6 +575,9 @@ describe("gateway server", () => {
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
expect(Array.isArray(entries)).toBe(true);
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe(
"hello",
);
ws.close();
await server.close();
@@ -654,9 +659,11 @@ describe("gateway server", () => {
const last = JSON.parse(line ?? "{}") as {
jobId?: unknown;
action?: unknown;
summary?: unknown;
};
expect(last.action).toBe("finished");
expect(last.jobId).toBe(jobId);
expect(last.summary).toBe("hello");
ws.send(
JSON.stringify({
@@ -675,6 +682,9 @@ describe("gateway server", () => {
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
expect(Array.isArray(entries)).toBe(true);
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe(
"hello",
);
ws.close();
await server.close();

View File

@@ -418,6 +418,7 @@ export async function startGatewayServer(
action: "finished",
status: evt.status,
error: evt.error,
summary: evt.summary,
runAtMs: evt.runAtMs,
durationMs: evt.durationMs,
nextRunAtMs: evt.nextRunAtMs,