From 42a0089b3b353918e00f2aae2c0def018a5af83d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 21:58:55 +0100 Subject: [PATCH] fix: require explicit system event session keys --- CHANGELOG.md | 1 + src/auto-reply/reply.directive.test.ts | 6 ++- src/auto-reply/reply/commands.ts | 6 +-- src/auto-reply/reply/directive-handling.ts | 4 +- src/commands/status.test.ts | 2 + src/commands/status.ts | 4 +- src/gateway/server-methods/system.ts | 11 +++++- src/gateway/server.cron.test.ts | 43 ++++++++++++++++++++++ src/gateway/server.hooks.test.ts | 16 +++++--- src/gateway/server.ts | 21 +++++++---- src/gateway/test-helpers.ts | 7 +++- src/infra/system-events.test.ts | 21 ++++------- src/infra/system-events.ts | 33 +++++++++-------- src/web/auto-reply.ts | 1 + 14 files changed, 123 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b05a71cf..d51c38094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ - WhatsApp: make inbound media size cap configurable (default 50 MB). (#505) — thanks @koala73 - Doctor: run legacy state migrations in non-interactive mode without prompts. - Cron: parse Telegram topic targets for isolated delivery. (#478) — thanks @nachoiacovino +- Cron: enqueue main-session system events under the resolved main session key. (#510) - Outbound: default Telegram account selection for config-only tokens; remove heartbeat-specific accountId handling. (follow-up #516) — thanks @YuriNachos - Cron: allow Telegram delivery targets with topic/thread IDs (e.g. `-100…:topic:123`). (#474) — thanks @mitschabaude-bot - Heartbeat: resolve Telegram account IDs from config-only tokens; cron tool accepts canonical `jobId` and legacy `id` for job actions. (#516) — thanks @YuriNachos diff --git a/src/auto-reply/reply.directive.test.ts b/src/auto-reply/reply.directive.test.ts index 737c897e6..ea5df3030 100644 --- a/src/auto-reply/reply.directive.test.ts +++ b/src/auto-reply/reply.directive.test.ts @@ -14,6 +14,8 @@ import { import { drainSystemEvents } from "../infra/system-events.js"; import { getReplyFromConfig } from "./reply.js"; +const MAIN_SESSION_KEY = "agent:main:main"; + vi.mock("../agents/pi-embedded.js", () => ({ abortEmbeddedPiRun: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: vi.fn(), @@ -1390,7 +1392,7 @@ describe("directive behavior", () => { it("queues a system event when switching models", async () => { await withTempHome(async (home) => { - drainSystemEvents(); + drainSystemEvents(MAIN_SESSION_KEY); vi.mocked(runEmbeddedPiAgent).mockReset(); const storePath = path.join(home, "sessions.json"); @@ -1412,7 +1414,7 @@ describe("directive behavior", () => { }, ); - const events = drainSystemEvents(); + const events = drainSystemEvents(MAIN_SESSION_KEY); expect(events).toContain( "Model switched to Opus (anthropic/claude-opus-4-5).", ); diff --git a/src/auto-reply/reply/commands.ts b/src/auto-reply/reply/commands.ts index c5ffd26a2..fb2f01aeb 100644 --- a/src/auto-reply/reply/commands.ts +++ b/src/auto-reply/reply/commands.ts @@ -111,7 +111,7 @@ export async function buildStatusReply(params: { cfg: ClawdbotConfig; command: CommandContext; sessionEntry?: SessionEntry; - sessionKey?: string; + sessionKey: string; sessionScope?: SessionScope; provider: string; model: string; @@ -390,7 +390,7 @@ export async function handleCommands(params: { directives: InlineDirectives; sessionEntry?: SessionEntry; sessionStore?: Record; - sessionKey?: string; + sessionKey: string; storePath?: string; sessionScope?: SessionScope; workspaceDir: string; @@ -815,7 +815,7 @@ export async function handleCommands(params: { const line = reason ? `${compactLabel}: ${reason} • ${contextSummary}` : `${compactLabel} • ${contextSummary}`; - enqueueSystemEvent(line); + enqueueSystemEvent(line, { sessionKey }); return { shouldContinue: false, reply: { text: `⚙️ ${line}` } }; } diff --git a/src/auto-reply/reply/directive-handling.ts b/src/auto-reply/reply/directive-handling.ts index 1fb3dde97..d06491509 100644 --- a/src/auto-reply/reply/directive-handling.ts +++ b/src/auto-reply/reply/directive-handling.ts @@ -447,7 +447,7 @@ export async function handleDirectiveOnly(params: { directives: InlineDirectives; sessionEntry?: SessionEntry; sessionStore?: Record; - sessionKey?: string; + sessionKey: string; storePath?: string; elevatedEnabled: boolean; elevatedAllowed: boolean; @@ -836,6 +836,7 @@ export async function handleDirectiveOnly(params: { enqueueSystemEvent( formatModelSwitchEvent(nextLabel, modelSelection.alias), { + sessionKey, contextKey: `model:${nextLabel}`, }, ); @@ -1103,6 +1104,7 @@ export async function persistInlineDirectives(params: { enqueueSystemEvent( formatModelSwitchEvent(nextLabel, resolved.alias), { + sessionKey, contextKey: `model:${nextLabel}`, }, ); diff --git a/src/commands/status.test.ts b/src/commands/status.test.ts index 2906581fa..977e085cd 100644 --- a/src/commands/status.test.ts +++ b/src/commands/status.test.ts @@ -14,6 +14,7 @@ const mocks = vi.hoisted(() => ({ systemSent: true, }, }), + resolveMainSessionKey: vi.fn().mockReturnValue("agent:main:main"), resolveStorePath: vi.fn().mockReturnValue("/tmp/sessions.json"), webAuthExists: vi.fn().mockResolvedValue(true), getWebAuthAgeMs: vi.fn().mockReturnValue(5000), @@ -23,6 +24,7 @@ const mocks = vi.hoisted(() => ({ vi.mock("../config/sessions.js", () => ({ loadSessionStore: mocks.loadSessionStore, + resolveMainSessionKey: mocks.resolveMainSessionKey, resolveStorePath: mocks.resolveStorePath, })); vi.mock("../web/session.js", () => ({ diff --git a/src/commands/status.ts b/src/commands/status.ts index 1343ae206..3aa94c451 100644 --- a/src/commands/status.ts +++ b/src/commands/status.ts @@ -9,6 +9,7 @@ import { withProgress } from "../cli/progress.js"; import { loadConfig, resolveGatewayPort } from "../config/config.js"; import { loadSessionStore, + resolveMainSessionKey, resolveStorePath, type SessionEntry, } from "../config/sessions.js"; @@ -77,7 +78,8 @@ export async function getStatusSummary(): Promise { colorize: true, includeAllowFrom: true, }); - const queuedSystemEvents = peekSystemEvents(); + const mainSessionKey = resolveMainSessionKey(cfg); + const queuedSystemEvents = peekSystemEvents(mainSessionKey); const resolved = resolveConfiguredModelRef({ cfg, diff --git a/src/gateway/server-methods/system.ts b/src/gateway/server-methods/system.ts index db9e42774..36e0290c9 100644 --- a/src/gateway/server-methods/system.ts +++ b/src/gateway/server-methods/system.ts @@ -1,3 +1,5 @@ +import { loadConfig } from "../../config/config.js"; +import { resolveMainSessionKey } from "../../config/sessions.js"; import { getLastHeartbeatEvent } from "../../infra/heartbeat-events.js"; import { setHeartbeatsEnabled } from "../../infra/heartbeat-runner.js"; import { @@ -45,6 +47,7 @@ export const systemHandlers: GatewayRequestHandlers = { ); return; } + const sessionKey = resolveMainSessionKey(loadConfig()); const instanceId = typeof params.instanceId === "string" ? params.instanceId : undefined; const host = typeof params.host === "string" ? params.host : undefined; @@ -107,7 +110,10 @@ export const systemHandlers: GatewayRequestHandlers = { modeChanged || reasonChanged; if (hasChanges) { - const contextChanged = isSystemEventContextChanged(presenceUpdate.key); + const contextChanged = isSystemEventContextChanged( + sessionKey, + presenceUpdate.key, + ); const parts: string[] = []; if (contextChanged || hostChanged || ipChanged) { const hostLabel = next.host?.trim() || "Unknown"; @@ -126,12 +132,13 @@ export const systemHandlers: GatewayRequestHandlers = { const deltaText = parts.join(" · "); if (deltaText) { enqueueSystemEvent(deltaText, { + sessionKey, contextKey: presenceUpdate.key, }); } } } else { - enqueueSystemEvent(text); + enqueueSystemEvent(text, { sessionKey }); } const nextPresenceVersion = context.incrementPresenceVersion(); context.broadcast( diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 32e6a1793..a0de6ad45 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -8,6 +8,7 @@ import { rpcReq, startServerWithClient, testState, + waitForSystemEvent, } from "./test-helpers.js"; installGatewayTestHooks(); @@ -55,6 +56,48 @@ describe("gateway server cron", () => { testState.cronStorePath = undefined; }); + test("enqueues main cron system events to the resolved main session key", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-")); + testState.cronStorePath = path.join(dir, "cron", "jobs.json"); + testState.sessionConfig = { mainKey: "primary" }; + await fs.mkdir(path.dirname(testState.cronStorePath), { recursive: true }); + await fs.writeFile( + testState.cronStorePath, + JSON.stringify({ version: 1, jobs: [] }), + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const atMs = Date.now() - 1; + const addRes = await rpcReq(ws, "cron.add", { + name: "route test", + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "cron route check" }, + }); + expect(addRes.ok).toBe(true); + const jobIdValue = (addRes.payload as { id?: unknown } | null)?.id; + const jobId = typeof jobIdValue === "string" ? jobIdValue : ""; + expect(jobId.length > 0).toBe(true); + + const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }); + expect(runRes.ok).toBe(true); + + const events = await waitForSystemEvent(); + expect(events.some((event) => event.includes("cron route check"))).toBe( + true, + ); + + ws.close(); + await server.close(); + await fs.rm(dir, { recursive: true, force: true }); + testState.cronStorePath = undefined; + testState.sessionConfig = undefined; + }); + test("normalizes wrapped cron.add payloads", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-cron-")); testState.cronStorePath = path.join(dir, "cron", "jobs.json"); diff --git a/src/gateway/server.hooks.test.ts b/src/gateway/server.hooks.test.ts index a3afebda0..4ed2e59ae 100644 --- a/src/gateway/server.hooks.test.ts +++ b/src/gateway/server.hooks.test.ts @@ -1,4 +1,6 @@ import { describe, expect, test } from "vitest"; +import { loadConfig } from "../config/config.js"; +import { resolveMainSessionKey } from "../config/sessions.js"; import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js"; import { cronIsolatedRun, @@ -11,6 +13,8 @@ import { installGatewayTestHooks(); +const resolveMainKey = () => resolveMainSessionKey(loadConfig()); + describe("gateway server hooks", () => { test("hooks wake requires auth", async () => { testState.hooksConfig = { enabled: true, token: "hook-secret" }; @@ -40,7 +44,7 @@ describe("gateway server hooks", () => { expect(res.status).toBe(200); const events = await waitForSystemEvent(); expect(events.some((e) => e.includes("Ping"))).toBe(true); - drainSystemEvents(); + drainSystemEvents(resolveMainKey()); await server.close(); }); @@ -63,7 +67,7 @@ describe("gateway server hooks", () => { expect(res.status).toBe(202); const events = await waitForSystemEvent(); expect(events.some((e) => e.includes("Hook Email: done"))).toBe(true); - drainSystemEvents(); + drainSystemEvents(resolveMainKey()); await server.close(); }); @@ -94,7 +98,7 @@ describe("gateway server hooks", () => { job?: { payload?: { model?: string } }; }; expect(call?.job?.payload?.model).toBe("openai/gpt-4.1-mini"); - drainSystemEvents(); + drainSystemEvents(resolveMainKey()); await server.close(); }); @@ -113,7 +117,7 @@ describe("gateway server hooks", () => { expect(res.status).toBe(200); const events = await waitForSystemEvent(); expect(events.some((e) => e.includes("Query auth"))).toBe(true); - drainSystemEvents(); + drainSystemEvents(resolveMainKey()); await server.close(); }); @@ -130,7 +134,7 @@ describe("gateway server hooks", () => { body: JSON.stringify({ message: "Nope", provider: "sms" }), }); expect(res.status).toBe(400); - expect(peekSystemEvents().length).toBe(0); + expect(peekSystemEvents(resolveMainKey()).length).toBe(0); await server.close(); }); @@ -149,7 +153,7 @@ describe("gateway server hooks", () => { expect(res.status).toBe(200); const events = await waitForSystemEvent(); expect(events.some((e) => e.includes("Header auth"))).toBe(true); - drainSystemEvents(); + drainSystemEvents(resolveMainKey()); await server.close(); }); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 4bd840e66..e124adb2b 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -488,7 +488,8 @@ export async function startGatewayServer( text: string; mode: "now" | "next-heartbeat"; }) => { - enqueueSystemEvent(value.text); + const sessionKey = resolveMainSessionKey(loadConfig()); + enqueueSystemEvent(value.text, { sessionKey }); if (value.mode === "now") { requestHeartbeatNow({ reason: "hook:wake" }); } @@ -509,6 +510,7 @@ export async function startGatewayServer( const sessionKey = value.sessionKey.trim() ? value.sessionKey.trim() : `hook:${randomUUID()}`; + const mainSessionKey = resolveMainSessionKey(loadConfig()); const jobId = randomUUID(); const now = Date.now(); const job: CronJob = { @@ -551,13 +553,17 @@ export async function startGatewayServer( result.status === "ok" ? `Hook ${value.name}` : `Hook ${value.name} (${result.status})`; - enqueueSystemEvent(`${prefix}: ${summary}`.trim()); + enqueueSystemEvent(`${prefix}: ${summary}`.trim(), { + sessionKey: mainSessionKey, + }); if (value.wakeMode === "now") { requestHeartbeatNow({ reason: `hook:${jobId}` }); } } catch (err) { logHooks.warn(`hook agent failed: ${String(err)}`); - enqueueSystemEvent(`Hook ${value.name} (error): ${String(err)}`); + enqueueSystemEvent(`Hook ${value.name} (error): ${String(err)}`, { + sessionKey: mainSessionKey, + }); if (value.wakeMode === "now") { requestHeartbeatNow({ reason: `hook:${jobId}:error` }); } @@ -1822,7 +1828,8 @@ export async function startGatewayServer( const summary = summarizeRestartSentinel(payload); if (!sessionKey) { - enqueueSystemEvent(message); + const mainSessionKey = resolveMainSessionKey(loadConfig()); + enqueueSystemEvent(message, { sessionKey: mainSessionKey }); return; } @@ -1836,7 +1843,7 @@ export async function startGatewayServer( const provider = lastProvider ?? parsedTarget?.provider; const to = lastTo || parsedTarget?.to; if (!provider || !to) { - enqueueSystemEvent(message); + enqueueSystemEvent(message, { sessionKey }); return; } @@ -1853,7 +1860,7 @@ export async function startGatewayServer( allowFrom: cfg.whatsapp?.allowFrom ?? [], }); if (!resolved.ok) { - enqueueSystemEvent(message); + enqueueSystemEvent(message, { sessionKey }); return; } @@ -1872,7 +1879,7 @@ export async function startGatewayServer( deps, ); } catch (err) { - enqueueSystemEvent(`${summary}\n${String(err)}`); + enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey }); } }; diff --git a/src/gateway/test-helpers.ts b/src/gateway/test-helpers.ts index 2c3f19462..08a5003ed 100644 --- a/src/gateway/test-helpers.ts +++ b/src/gateway/test-helpers.ts @@ -4,6 +4,8 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, expect, vi } from "vitest"; import { WebSocket } from "ws"; +import { loadConfig } from "../config/config.js"; +import { resolveMainSessionKey } from "../config/sessions.js"; import { resetAgentRunContextForTest } from "../infra/agent-events.js"; import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js"; import { rawDataToString } from "../infra/ws.js"; @@ -373,7 +375,7 @@ export function installGatewayTestHooks() { embeddedRunMock.abortCalls = []; embeddedRunMock.waitCalls = []; embeddedRunMock.waitResults.clear(); - drainSystemEvents(); + drainSystemEvents(resolveMainSessionKey(loadConfig())); resetAgentRunContextForTest(); const mod = await import("./server.js"); mod.__resetModelCatalogCacheForTest(); @@ -553,9 +555,10 @@ export async function rpcReq( } export async function waitForSystemEvent(timeoutMs = 2000) { + const sessionKey = resolveMainSessionKey(loadConfig()); const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { - const events = peekSystemEvents(); + const events = peekSystemEvents(sessionKey); if (events.length > 0) return events; await new Promise((resolve) => setTimeout(resolve, 10)); } diff --git a/src/infra/system-events.test.ts b/src/infra/system-events.test.ts index ee3b9e200..a6d7656f2 100644 --- a/src/infra/system-events.test.ts +++ b/src/infra/system-events.test.ts @@ -2,6 +2,7 @@ import { beforeEach, describe, expect, it } from "vitest"; import { prependSystemEvents } from "../auto-reply/reply/session-updates.js"; import type { ClawdbotConfig } from "../config/config.js"; +import { resolveMainSessionKey } from "../config/sessions.js"; import { enqueueSystemEvent, peekSystemEvents, @@ -9,6 +10,7 @@ import { } from "./system-events.js"; const cfg = {} as unknown as ClawdbotConfig; +const mainKey = resolveMainSessionKey(cfg); describe("system events (session routing)", () => { beforeEach(() => { @@ -21,14 +23,14 @@ describe("system events (session routing)", () => { contextKey: "discord:reaction:added:msg:user:✅", }); - expect(peekSystemEvents()).toEqual([]); + expect(peekSystemEvents(mainKey)).toEqual([]); expect(peekSystemEvents("discord:group:123")).toEqual([ "Discord reaction added: ✅", ]); const main = await prependSystemEvents({ cfg, - sessionKey: "main", + sessionKey: mainKey, isMainSession: true, isNewSession: false, prefixedBodyBase: "hello", @@ -49,16 +51,9 @@ describe("system events (session routing)", () => { expect(peekSystemEvents("discord:group:123")).toEqual([]); }); - it("defaults system events to main", async () => { - enqueueSystemEvent("Node: Mac Studio"); - - const main = await prependSystemEvents({ - cfg, - sessionKey: "main", - isMainSession: true, - isNewSession: false, - prefixedBodyBase: "ping", - }); - expect(main).toBe("System: Node: Mac Studio\n\nping"); + it("requires an explicit session key", () => { + expect(() => + enqueueSystemEvent("Node: Mac Studio", { sessionKey: " " }), + ).toThrow("sessionKey"); }); }); diff --git a/src/infra/system-events.ts b/src/infra/system-events.ts index 5dd97bcc4..54e9e6a98 100644 --- a/src/infra/system-events.ts +++ b/src/infra/system-events.ts @@ -1,11 +1,9 @@ // Lightweight in-memory queue for human-readable system events that should be // prefixed to the next prompt. We intentionally avoid persistence to keep -// events ephemeral. Events are session-scoped; callers that don't specify a -// session key default to "main". +// events ephemeral. Events are session-scoped and require an explicit key. type SystemEvent = { text: string; ts: number }; -const DEFAULT_SESSION_KEY = "main"; const MAX_EVENTS = 20; type SessionQueue = { @@ -17,13 +15,16 @@ type SessionQueue = { const queues = new Map(); type SystemEventOptions = { + sessionKey: string; contextKey?: string | null; - sessionKey?: string | null; }; -function normalizeSessionKey(key?: string | null): string { +function requireSessionKey(key?: string | null): string { const trimmed = typeof key === "string" ? key.trim() : ""; - return trimmed || DEFAULT_SESSION_KEY; + if (!trimmed) { + throw new Error("system events require a sessionKey"); + } + return trimmed; } function normalizeContextKey(key?: string | null): string | null { @@ -34,17 +35,17 @@ function normalizeContextKey(key?: string | null): string | null { } export function isSystemEventContextChanged( + sessionKey: string, contextKey?: string | null, - sessionKey?: string | null, ): boolean { - const key = normalizeSessionKey(sessionKey); + const key = requireSessionKey(sessionKey); const existing = queues.get(key); const normalized = normalizeContextKey(contextKey); return normalized !== (existing?.lastContextKey ?? null); } -export function enqueueSystemEvent(text: string, options?: SystemEventOptions) { - const key = normalizeSessionKey(options?.sessionKey); +export function enqueueSystemEvent(text: string, options: SystemEventOptions) { + const key = requireSessionKey(options?.sessionKey); const entry = queues.get(key) ?? (() => { @@ -65,8 +66,8 @@ export function enqueueSystemEvent(text: string, options?: SystemEventOptions) { if (entry.queue.length > MAX_EVENTS) entry.queue.shift(); } -export function drainSystemEvents(sessionKey?: string | null): string[] { - const key = normalizeSessionKey(sessionKey); +export function drainSystemEvents(sessionKey: string): string[] { + const key = requireSessionKey(sessionKey); const entry = queues.get(key); if (!entry || entry.queue.length === 0) return []; const out = entry.queue.map((e) => e.text); @@ -77,13 +78,13 @@ export function drainSystemEvents(sessionKey?: string | null): string[] { return out; } -export function peekSystemEvents(sessionKey?: string | null): string[] { - const key = normalizeSessionKey(sessionKey); +export function peekSystemEvents(sessionKey: string): string[] { + const key = requireSessionKey(sessionKey); return queues.get(key)?.queue.map((e) => e.text) ?? []; } -export function hasSystemEvents(sessionKey?: string | null) { - const key = normalizeSessionKey(sessionKey); +export function hasSystemEvents(sessionKey: string) { + const key = requireSessionKey(sessionKey); return (queues.get(key)?.queue.length ?? 0) > 0; } diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 02c708d2f..cd8e54f66 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1781,6 +1781,7 @@ export async function monitorWebProvider( enqueueSystemEvent( `WhatsApp gateway disconnected (status ${statusCode ?? "unknown"})`, + { sessionKey: connectRoute.sessionKey }, ); if (loggedOut) {