fix: require explicit system event session keys
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
import { resolveMainSessionKey } from "../../config/sessions.js";
|
||||
import { getLastHeartbeatEvent } from "../../infra/heartbeat-events.js";
|
||||
import { setHeartbeatsEnabled } from "../../infra/heartbeat-runner.js";
|
||||
import {
|
||||
@@ -45,6 +47,7 @@ export const systemHandlers: GatewayRequestHandlers = {
|
||||
);
|
||||
return;
|
||||
}
|
||||
const sessionKey = resolveMainSessionKey(loadConfig());
|
||||
const instanceId =
|
||||
typeof params.instanceId === "string" ? params.instanceId : undefined;
|
||||
const host = typeof params.host === "string" ? params.host : undefined;
|
||||
@@ -107,7 +110,10 @@ export const systemHandlers: GatewayRequestHandlers = {
|
||||
modeChanged ||
|
||||
reasonChanged;
|
||||
if (hasChanges) {
|
||||
const contextChanged = isSystemEventContextChanged(presenceUpdate.key);
|
||||
const contextChanged = isSystemEventContextChanged(
|
||||
sessionKey,
|
||||
presenceUpdate.key,
|
||||
);
|
||||
const parts: string[] = [];
|
||||
if (contextChanged || hostChanged || ipChanged) {
|
||||
const hostLabel = next.host?.trim() || "Unknown";
|
||||
@@ -126,12 +132,13 @@ export const systemHandlers: GatewayRequestHandlers = {
|
||||
const deltaText = parts.join(" · ");
|
||||
if (deltaText) {
|
||||
enqueueSystemEvent(deltaText, {
|
||||
sessionKey,
|
||||
contextKey: presenceUpdate.key,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
enqueueSystemEvent(text);
|
||||
enqueueSystemEvent(text, { sessionKey });
|
||||
}
|
||||
const nextPresenceVersion = context.incrementPresenceVersion();
|
||||
context.broadcast(
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
rpcReq,
|
||||
startServerWithClient,
|
||||
testState,
|
||||
waitForSystemEvent,
|
||||
} from "./test-helpers.js";
|
||||
|
||||
installGatewayTestHooks();
|
||||
@@ -55,6 +56,48 @@ describe("gateway server cron", () => {
|
||||
testState.cronStorePath = undefined;
|
||||
});
|
||||
|
||||
test("enqueues main cron system events to the resolved main session key", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
testState.sessionConfig = { mainKey: "primary" };
|
||||
await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true });
|
||||
await fs.writeFile(
|
||||
testState.cronStorePath,
|
||||
JSON.stringify({ version: 1, jobs: [] }),
|
||||
);
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
|
||||
const atMs = Date.now() - 1;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
name: "route test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", atMs },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "cron route check" },
|
||||
});
|
||||
expect(addRes.ok).toBe(true);
|
||||
const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id;
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" });
|
||||
expect(runRes.ok).toBe(true);
|
||||
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((event) => event.includes("cron route check"))).toBe(
|
||||
true,
|
||||
);
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
testState.cronStorePath = undefined;
|
||||
testState.sessionConfig = undefined;
|
||||
});
|
||||
|
||||
test("normalizes wrapped cron.add payloads", async () => {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-"));
|
||||
testState.cronStorePath = path.join(dir, "cron", "jobs.json");
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import { describe, expect, test } from "vitest";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { resolveMainSessionKey } from "../config/sessions.js";
|
||||
import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
|
||||
import {
|
||||
cronIsolatedRun,
|
||||
@@ -11,6 +13,8 @@ import {
|
||||
|
||||
installGatewayTestHooks();
|
||||
|
||||
const resolveMainKey = () => resolveMainSessionKey(loadConfig());
|
||||
|
||||
describe("gateway server hooks", () => {
|
||||
test("hooks wake requires auth", async () => {
|
||||
testState.hooksConfig = { enabled: true, token: "hook-secret" };
|
||||
@@ -40,7 +44,7 @@ describe("gateway server hooks", () => {
|
||||
expect(res.status).toBe(200);
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((e) => e.includes("Ping"))).toBe(true);
|
||||
drainSystemEvents();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
await server.close();
|
||||
});
|
||||
|
||||
@@ -63,7 +67,7 @@ describe("gateway server hooks", () => {
|
||||
expect(res.status).toBe(202);
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((e) => e.includes("Hook Email: done"))).toBe(true);
|
||||
drainSystemEvents();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
await server.close();
|
||||
});
|
||||
|
||||
@@ -94,7 +98,7 @@ describe("gateway server hooks", () => {
|
||||
job?: { payload?: { model?: string } };
|
||||
};
|
||||
expect(call?.job?.payload?.model).toBe("openai/gpt-4.1-mini");
|
||||
drainSystemEvents();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
await server.close();
|
||||
});
|
||||
|
||||
@@ -113,7 +117,7 @@ describe("gateway server hooks", () => {
|
||||
expect(res.status).toBe(200);
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((e) => e.includes("Query auth"))).toBe(true);
|
||||
drainSystemEvents();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
await server.close();
|
||||
});
|
||||
|
||||
@@ -130,7 +134,7 @@ describe("gateway server hooks", () => {
|
||||
body: JSON.stringify({ message: "Nope", provider: "sms" }),
|
||||
});
|
||||
expect(res.status).toBe(400);
|
||||
expect(peekSystemEvents().length).toBe(0);
|
||||
expect(peekSystemEvents(resolveMainKey()).length).toBe(0);
|
||||
await server.close();
|
||||
});
|
||||
|
||||
@@ -149,7 +153,7 @@ describe("gateway server hooks", () => {
|
||||
expect(res.status).toBe(200);
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((e) => e.includes("Header auth"))).toBe(true);
|
||||
drainSystemEvents();
|
||||
drainSystemEvents(resolveMainKey());
|
||||
await server.close();
|
||||
});
|
||||
|
||||
|
||||
@@ -488,7 +488,8 @@ export async function startGatewayServer(
|
||||
text: string;
|
||||
mode: "now" | "next-heartbeat";
|
||||
}) => {
|
||||
enqueueSystemEvent(value.text);
|
||||
const sessionKey = resolveMainSessionKey(loadConfig());
|
||||
enqueueSystemEvent(value.text, { sessionKey });
|
||||
if (value.mode === "now") {
|
||||
requestHeartbeatNow({ reason: "hook:wake" });
|
||||
}
|
||||
@@ -509,6 +510,7 @@ export async function startGatewayServer(
|
||||
const sessionKey = value.sessionKey.trim()
|
||||
? value.sessionKey.trim()
|
||||
: `hook:${randomUUID()}`;
|
||||
const mainSessionKey = resolveMainSessionKey(loadConfig());
|
||||
const jobId = randomUUID();
|
||||
const now = Date.now();
|
||||
const job: CronJob = {
|
||||
@@ -551,13 +553,17 @@ export async function startGatewayServer(
|
||||
result.status === "ok"
|
||||
? `Hook ${value.name}`
|
||||
: `Hook ${value.name} (${result.status})`;
|
||||
enqueueSystemEvent(`${prefix}: ${summary}`.trim());
|
||||
enqueueSystemEvent(`${prefix}: ${summary}`.trim(), {
|
||||
sessionKey: mainSessionKey,
|
||||
});
|
||||
if (value.wakeMode === "now") {
|
||||
requestHeartbeatNow({ reason: `hook:${jobId}` });
|
||||
}
|
||||
} catch (err) {
|
||||
logHooks.warn(`hook agent failed: ${String(err)}`);
|
||||
enqueueSystemEvent(`Hook ${value.name} (error): ${String(err)}`);
|
||||
enqueueSystemEvent(`Hook ${value.name} (error): ${String(err)}`, {
|
||||
sessionKey: mainSessionKey,
|
||||
});
|
||||
if (value.wakeMode === "now") {
|
||||
requestHeartbeatNow({ reason: `hook:${jobId}:error` });
|
||||
}
|
||||
@@ -1822,7 +1828,8 @@ export async function startGatewayServer(
|
||||
const summary = summarizeRestartSentinel(payload);
|
||||
|
||||
if (!sessionKey) {
|
||||
enqueueSystemEvent(message);
|
||||
const mainSessionKey = resolveMainSessionKey(loadConfig());
|
||||
enqueueSystemEvent(message, { sessionKey: mainSessionKey });
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1836,7 +1843,7 @@ export async function startGatewayServer(
|
||||
const provider = lastProvider ?? parsedTarget?.provider;
|
||||
const to = lastTo || parsedTarget?.to;
|
||||
if (!provider || !to) {
|
||||
enqueueSystemEvent(message);
|
||||
enqueueSystemEvent(message, { sessionKey });
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1853,7 +1860,7 @@ export async function startGatewayServer(
|
||||
allowFrom: cfg.whatsapp?.allowFrom ?? [],
|
||||
});
|
||||
if (!resolved.ok) {
|
||||
enqueueSystemEvent(message);
|
||||
enqueueSystemEvent(message, { sessionKey });
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1872,7 +1879,7 @@ export async function startGatewayServer(
|
||||
deps,
|
||||
);
|
||||
} catch (err) {
|
||||
enqueueSystemEvent(`${summary}\n${String(err)}`);
|
||||
enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey });
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, expect, vi } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { resolveMainSessionKey } from "../config/sessions.js";
|
||||
import { resetAgentRunContextForTest } from "../infra/agent-events.js";
|
||||
import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
|
||||
import { rawDataToString } from "../infra/ws.js";
|
||||
@@ -373,7 +375,7 @@ export function installGatewayTestHooks() {
|
||||
embeddedRunMock.abortCalls = [];
|
||||
embeddedRunMock.waitCalls = [];
|
||||
embeddedRunMock.waitResults.clear();
|
||||
drainSystemEvents();
|
||||
drainSystemEvents(resolveMainSessionKey(loadConfig()));
|
||||
resetAgentRunContextForTest();
|
||||
const mod = await import("./server.js");
|
||||
mod.__resetModelCatalogCacheForTest();
|
||||
@@ -553,9 +555,10 @@ export async function rpcReq<T = unknown>(
|
||||
}
|
||||
|
||||
export async function waitForSystemEvent(timeoutMs = 2000) {
|
||||
const sessionKey = resolveMainSessionKey(loadConfig());
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() < deadline) {
|
||||
const events = peekSystemEvents();
|
||||
const events = peekSystemEvents(sessionKey);
|
||||
if (events.length > 0) return events;
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user