310 lines
10 KiB
TypeScript
310 lines
10 KiB
TypeScript
import fs from "node:fs/promises";
|
|
import os from "node:os";
|
|
import path from "node:path";
|
|
import { describe, expect, test } from "vitest";
|
|
import {
|
|
connectOk,
|
|
installGatewayTestHooks,
|
|
rpcReq,
|
|
startServerWithClient,
|
|
testState,
|
|
} from "./test-helpers.js";
|
|
|
|
const decodeWsData = (data: unknown): string => {
|
|
if (typeof data === "string") return data;
|
|
if (Buffer.isBuffer(data)) return data.toString("utf-8");
|
|
if (Array.isArray(data)) return Buffer.concat(data).toString("utf-8");
|
|
if (data instanceof ArrayBuffer) return Buffer.from(data).toString("utf-8");
|
|
if (ArrayBuffer.isView(data)) {
|
|
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString(
|
|
"utf-8",
|
|
);
|
|
}
|
|
return "";
|
|
};
|
|
|
|
installGatewayTestHooks();
|
|
|
|
describe("gateway server cron", () => {
|
|
test("supports cron.add and cron.list", async () => {
|
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-cron-"));
|
|
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
|
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
|
await fs.writeFile(
|
|
testState.cronStorePath,
|
|
JSON.stringify({ version: 1, jobs: [] }),
|
|
);
|
|
|
|
const { server, ws } = await startServerWithClient();
|
|
await connectOk(ws);
|
|
|
|
const addRes = await rpcReq(ws, "cron.add", {
|
|
name: "daily",
|
|
enabled: true,
|
|
schedule: { kind: "every", everyMs: 60_000 },
|
|
sessionTarget: "main",
|
|
wakeMode: "next-heartbeat",
|
|
payload: { kind: "systemEvent", text: "hello" },
|
|
});
|
|
expect(addRes.ok).toBe(true);
|
|
expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe(
|
|
"string",
|
|
);
|
|
|
|
const listRes = await rpcReq(ws, "cron.list", {
|
|
includeDisabled: true,
|
|
});
|
|
expect(listRes.ok).toBe(true);
|
|
const jobs = (listRes.payload as { jobs?: unknown } | null)?.jobs;
|
|
expect(Array.isArray(jobs)).toBe(true);
|
|
expect((jobs as unknown[]).length).toBe(1);
|
|
expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe(
|
|
"daily",
|
|
);
|
|
|
|
ws.close();
|
|
await server.close();
|
|
await fs.rm(dir, { recursive: true, force: true });
|
|
testState.cronStorePath = undefined;
|
|
});
|
|
|
|
test("writes cron run history to runs/<jobId>.jsonl", async () => {
|
|
const dir = await fs.mkdtemp(
|
|
path.join(os.tmpdir(), "clawdis-gw-cron-log-"),
|
|
);
|
|
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
|
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
|
await fs.writeFile(
|
|
testState.cronStorePath,
|
|
JSON.stringify({ version: 1, jobs: [] }),
|
|
);
|
|
|
|
const { server, ws } = await startServerWithClient();
|
|
await connectOk(ws);
|
|
|
|
const atMs = Date.now() - 1;
|
|
const addRes = await rpcReq(ws, "cron.add", {
|
|
name: "log test",
|
|
enabled: true,
|
|
schedule: { kind: "at", atMs },
|
|
sessionTarget: "main",
|
|
wakeMode: "next-heartbeat",
|
|
payload: { kind: "systemEvent", text: "hello" },
|
|
});
|
|
expect(addRes.ok).toBe(true);
|
|
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
|
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
|
expect(jobId.length > 0).toBe(true);
|
|
|
|
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" });
|
|
expect(runRes.ok).toBe(true);
|
|
|
|
const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`);
|
|
const waitForLog = async () => {
|
|
for (let i = 0; i < 200; i += 1) {
|
|
const raw = await fs.readFile(logPath, "utf-8").catch(() => "");
|
|
if (raw.trim().length > 0) return raw;
|
|
await new Promise((r) => setTimeout(r, 10));
|
|
}
|
|
throw new Error("timeout waiting for cron run log");
|
|
};
|
|
|
|
const raw = await waitForLog();
|
|
const line = raw
|
|
.split("\n")
|
|
.map((l) => l.trim())
|
|
.filter(Boolean)
|
|
.at(-1);
|
|
const last = JSON.parse(line ?? "{}") as {
|
|
jobId?: unknown;
|
|
action?: unknown;
|
|
status?: unknown;
|
|
summary?: unknown;
|
|
};
|
|
expect(last.action).toBe("finished");
|
|
expect(last.jobId).toBe(jobId);
|
|
expect(last.status).toBe("ok");
|
|
expect(last.summary).toBe("hello");
|
|
|
|
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 });
|
|
expect(runsRes.ok).toBe(true);
|
|
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
|
|
expect(Array.isArray(entries)).toBe(true);
|
|
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
|
|
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe(
|
|
"hello",
|
|
);
|
|
|
|
ws.close();
|
|
await server.close();
|
|
await fs.rm(dir, { recursive: true, force: true });
|
|
testState.cronStorePath = undefined;
|
|
});
|
|
|
|
test("writes cron run history to per-job runs/ when store is jobs.json", async () => {
|
|
const dir = await fs.mkdtemp(
|
|
path.join(os.tmpdir(), "clawdis-gw-cron-log-jobs-"),
|
|
);
|
|
const cronDir = path.join(dir, "cron");
|
|
testState.cronStorePath = path.join(cronDir, "jobs.json");
|
|
await fs.mkdir(cronDir, { recursive: true });
|
|
await fs.writeFile(
|
|
testState.cronStorePath,
|
|
JSON.stringify({ version: 1, jobs: [] }),
|
|
);
|
|
|
|
const { server, ws } = await startServerWithClient();
|
|
await connectOk(ws);
|
|
|
|
const atMs = Date.now() - 1;
|
|
const addRes = await rpcReq(ws, "cron.add", {
|
|
name: "log test (jobs.json)",
|
|
enabled: true,
|
|
schedule: { kind: "at", atMs },
|
|
sessionTarget: "main",
|
|
wakeMode: "next-heartbeat",
|
|
payload: { kind: "systemEvent", text: "hello" },
|
|
});
|
|
|
|
expect(addRes.ok).toBe(true);
|
|
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
|
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
|
expect(jobId.length > 0).toBe(true);
|
|
|
|
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" });
|
|
expect(runRes.ok).toBe(true);
|
|
|
|
const logPath = path.join(cronDir, "runs", `${jobId}.jsonl`);
|
|
const waitForLog = async () => {
|
|
for (let i = 0; i < 200; i += 1) {
|
|
const raw = await fs.readFile(logPath, "utf-8").catch(() => "");
|
|
if (raw.trim().length > 0) return raw;
|
|
await new Promise((r) => setTimeout(r, 10));
|
|
}
|
|
throw new Error("timeout waiting for per-job cron run log");
|
|
};
|
|
|
|
const raw = await waitForLog();
|
|
const line = raw
|
|
.split("\n")
|
|
.map((l) => l.trim())
|
|
.filter(Boolean)
|
|
.at(-1);
|
|
const last = JSON.parse(line ?? "{}") as {
|
|
jobId?: unknown;
|
|
action?: unknown;
|
|
summary?: unknown;
|
|
};
|
|
expect(last.action).toBe("finished");
|
|
expect(last.jobId).toBe(jobId);
|
|
expect(last.summary).toBe("hello");
|
|
|
|
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 20 });
|
|
expect(runsRes.ok).toBe(true);
|
|
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
|
|
expect(Array.isArray(entries)).toBe(true);
|
|
expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId);
|
|
expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe(
|
|
"hello",
|
|
);
|
|
|
|
ws.close();
|
|
await server.close();
|
|
await fs.rm(dir, { recursive: true, force: true });
|
|
testState.cronStorePath = undefined;
|
|
});
|
|
|
|
test("enables cron scheduler by default and runs due jobs automatically", async () => {
|
|
const dir = await fs.mkdtemp(
|
|
path.join(os.tmpdir(), "clawdis-gw-cron-default-on-"),
|
|
);
|
|
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
|
testState.cronEnabled = undefined;
|
|
|
|
try {
|
|
await fs.mkdir(path.dirname(testState.cronStorePath), {
|
|
recursive: true,
|
|
});
|
|
await fs.writeFile(
|
|
testState.cronStorePath,
|
|
JSON.stringify({ version: 1, jobs: [] }),
|
|
);
|
|
|
|
const { server, ws } = await startServerWithClient();
|
|
await connectOk(ws);
|
|
|
|
const statusRes = await rpcReq(ws, "cron.status", {});
|
|
expect(statusRes.ok).toBe(true);
|
|
const statusPayload = statusRes.payload as
|
|
| { enabled?: unknown; storePath?: unknown }
|
|
| undefined;
|
|
expect(statusPayload?.enabled).toBe(true);
|
|
const storePath =
|
|
typeof statusPayload?.storePath === "string"
|
|
? statusPayload.storePath
|
|
: "";
|
|
expect(storePath).toContain("jobs.json");
|
|
|
|
const atMs = Date.now() + 80;
|
|
const addRes = await rpcReq(ws, "cron.add", {
|
|
name: "auto run test",
|
|
enabled: true,
|
|
schedule: { kind: "at", atMs },
|
|
sessionTarget: "main",
|
|
wakeMode: "next-heartbeat",
|
|
payload: { kind: "systemEvent", text: "auto" },
|
|
});
|
|
expect(addRes.ok).toBe(true);
|
|
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
|
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
|
expect(jobId.length > 0).toBe(true);
|
|
|
|
const finishedEvt = await new Promise<{
|
|
type: "event";
|
|
event: string;
|
|
payload?: { jobId?: string; action?: string; status?: string } | null;
|
|
}>((resolve) => {
|
|
const timeout = setTimeout(() => resolve(null as never), 8000);
|
|
ws.on("message", (data) => {
|
|
const obj = JSON.parse(decodeWsData(data));
|
|
if (
|
|
obj.type === "event" &&
|
|
obj.event === "cron" &&
|
|
obj.payload?.jobId === jobId &&
|
|
obj.payload?.action === "finished"
|
|
) {
|
|
clearTimeout(timeout);
|
|
resolve(obj);
|
|
}
|
|
});
|
|
});
|
|
expect(finishedEvt.payload?.status).toBe("ok");
|
|
|
|
const waitForRuns = async () => {
|
|
for (let i = 0; i < 200; i += 1) {
|
|
const runsRes = await rpcReq(ws, "cron.runs", {
|
|
id: jobId,
|
|
limit: 10,
|
|
});
|
|
expect(runsRes.ok).toBe(true);
|
|
const entries = (runsRes.payload as { entries?: unknown } | null)
|
|
?.entries;
|
|
if (Array.isArray(entries) && entries.length > 0) return entries;
|
|
await new Promise((r) => setTimeout(r, 10));
|
|
}
|
|
throw new Error("timeout waiting for cron.runs entries");
|
|
};
|
|
|
|
const entries = (await waitForRuns()) as Array<{ jobId?: unknown }>;
|
|
expect(entries.at(-1)?.jobId).toBe(jobId);
|
|
|
|
ws.close();
|
|
await server.close();
|
|
} finally {
|
|
testState.cronEnabled = false;
|
|
testState.cronStorePath = undefined;
|
|
await fs.rm(dir, { recursive: true, force: true });
|
|
}
|
|
});
|
|
});
|