From 1657c5e3d2d7628438063ff1f4a468842b706c17 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 4 Jan 2026 22:11:04 +0100 Subject: [PATCH] fix: route system events per session --- CHANGELOG.md | 1 + src/auto-reply/reply.ts | 1 + src/auto-reply/reply/session-updates.ts | 7 +- src/discord/monitor.ts | 34 +++++++- src/infra/system-events.test.ts | 64 +++++++++++++++ src/infra/system-events.ts | 78 +++++++++++++----- src/slack/monitor.ts | 102 ++++++++++++++++++++++-- 7 files changed, 256 insertions(+), 31 deletions(-) create mode 100644 src/infra/system-events.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 7aa9f71d4..b8486ef9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Cron tool passes `id` to the gateway for update/remove/run/runs (keeps `jobId` input). (#180) — thanks @adamgall - macOS: treat location permission as always-only to avoid iOS-only enums. (#165) — thanks @Nachx639 - Onboarding: when running from source, auto-build missing Control UI assets (`pnpm ui:build`). +- Discord/Slack: route reaction + system notifications to the correct session (no main-session bleed). ## 2026.1.5 diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 2ac40c61f..366066e59 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -515,6 +515,7 @@ export async function getReplyFromConfig( !isGroupSession && sessionKey === (sessionCfg?.mainKey ?? "main"); prefixedBodyBase = await prependSystemEvents({ cfg, + sessionKey, isMainSession, isNewSession, prefixedBodyBase, diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 1f1033968..23e1a8777 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -8,12 +8,11 @@ import { drainSystemEvents } from "../../infra/system-events.js"; export async function prependSystemEvents(params: { cfg: ClawdbotConfig; + sessionKey: string; isMainSession: boolean; isNewSession: boolean; prefixedBodyBase: string; }): Promise { - if (!params.isMainSession) return params.prefixedBodyBase; - const compactSystemEvent = (line: string): string | null => { const trimmed = line.trim(); if (!trimmed) return null; @@ -27,11 +26,11 @@ export async function prependSystemEvents(params: { }; const systemLines: string[] = []; - const queued = drainSystemEvents(); + const queued = drainSystemEvents(params.sessionKey); systemLines.push( ...queued.map(compactSystemEvent).filter((v): v is string => Boolean(v)), ); - if (params.isNewSession) { + if (params.isMainSession && params.isNewSession) { const summary = await buildProviderSummary(params.cfg); if (summary.length > 0) systemLines.unshift(...summary); } diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 663418cc7..95b07049f 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -29,7 +29,11 @@ import type { ReplyToMode, } from "../config/config.js"; import { loadConfig } from "../config/config.js"; -import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; +import { + resolveSessionKey, + resolveStorePath, + updateLastRoute, +} from "../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose, warn } from "../globals.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; @@ -356,7 +360,22 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { const systemText = resolveDiscordSystemEvent(message); if (systemText) { + const sessionCfg = cfg.session; + const sessionScope = sessionCfg?.scope ?? "per-sender"; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const sessionKey = resolveSessionKey( + sessionScope, + { + From: isDirectMessage + ? `discord:${message.author.id}` + : `group:${message.channelId}`, + ChatType: isDirectMessage ? "direct" : "group", + Surface: "discord", + }, + mainKey, + ); enqueueSystemEvent(systemText, { + sessionKey, contextKey: `discord:system:${message.channelId}:${message.id}`, }); return; @@ -645,7 +664,20 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { const authorLabel = message.author?.tag ?? message.author?.username; const baseText = `Discord reaction ${action}: ${emojiLabel} by ${actorLabel} on ${guildSlug} ${channelLabel} msg ${message.id}`; const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText; + const sessionCfg = cfg.session; + const sessionScope = sessionCfg?.scope ?? "per-sender"; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const sessionKey = resolveSessionKey( + sessionScope, + { + From: `group:${message.channelId}`, + ChatType: "group", + Surface: "discord", + }, + mainKey, + ); enqueueSystemEvent(text, { + sessionKey, contextKey: `discord:reaction:${action}:${message.id}:${user.id}:${emojiLabel}`, }); } catch (err) { diff --git a/src/infra/system-events.test.ts b/src/infra/system-events.test.ts new file mode 100644 index 000000000..ee3b9e200 --- /dev/null +++ b/src/infra/system-events.test.ts @@ -0,0 +1,64 @@ +import { beforeEach, describe, expect, it } from "vitest"; + +import { prependSystemEvents } from "../auto-reply/reply/session-updates.js"; +import type { ClawdbotConfig } from "../config/config.js"; +import { + enqueueSystemEvent, + peekSystemEvents, + resetSystemEventsForTest, +} from "./system-events.js"; + +const cfg = {} as unknown as ClawdbotConfig; + +describe("system events (session routing)", () => { + beforeEach(() => { + resetSystemEventsForTest(); + }); + + it("does not leak session-scoped events into main", async () => { + enqueueSystemEvent("Discord reaction added: ✅", { + sessionKey: "discord:group:123", + contextKey: "discord:reaction:added:msg:user:✅", + }); + + expect(peekSystemEvents()).toEqual([]); + expect(peekSystemEvents("discord:group:123")).toEqual([ + "Discord reaction added: ✅", + ]); + + const main = await prependSystemEvents({ + cfg, + sessionKey: "main", + isMainSession: true, + isNewSession: false, + prefixedBodyBase: "hello", + }); + expect(main).toBe("hello"); + expect(peekSystemEvents("discord:group:123")).toEqual([ + "Discord reaction added: ✅", + ]); + + const discord = await prependSystemEvents({ + cfg, + sessionKey: "discord:group:123", + isMainSession: false, + isNewSession: false, + prefixedBodyBase: "hi", + }); + expect(discord).toBe("System: Discord reaction added: ✅\n\nhi"); + expect(peekSystemEvents("discord:group:123")).toEqual([]); + }); + + it("defaults system events to main", async () => { + enqueueSystemEvent("Node: Mac Studio"); + + const main = await prependSystemEvents({ + cfg, + sessionKey: "main", + isMainSession: true, + isNewSession: false, + prefixedBodyBase: "ping", + }); + expect(main).toBe("System: Node: Mac Studio\n\nping"); + }); +}); diff --git a/src/infra/system-events.ts b/src/infra/system-events.ts index a2eef7d75..5dd97bcc4 100644 --- a/src/infra/system-events.ts +++ b/src/infra/system-events.ts @@ -1,18 +1,31 @@ // Lightweight in-memory queue for human-readable system events that should be -// prefixed to the next main-session prompt/heartbeat. We intentionally avoid -// persistence to keep events ephemeral. +// prefixed to the next prompt. We intentionally avoid persistence to keep +// events ephemeral. Events are session-scoped; callers that don't specify a +// session key default to "main". type SystemEvent = { text: string; ts: number }; +const DEFAULT_SESSION_KEY = "main"; const MAX_EVENTS = 20; -const queue: SystemEvent[] = []; -let lastText: string | null = null; -let lastContextKey: string | null = null; + +type SessionQueue = { + queue: SystemEvent[]; + lastText: string | null; + lastContextKey: string | null; +}; + +const queues = new Map(); type SystemEventOptions = { contextKey?: string | null; + sessionKey?: string | null; }; +function normalizeSessionKey(key?: string | null): string { + const trimmed = typeof key === "string" ? key.trim() : ""; + return trimmed || DEFAULT_SESSION_KEY; +} + function normalizeContextKey(key?: string | null): string | null { if (!key) return null; const trimmed = key.trim(); @@ -22,33 +35,58 @@ function normalizeContextKey(key?: string | null): string | null { export function isSystemEventContextChanged( contextKey?: string | null, + sessionKey?: string | null, ): boolean { + const key = normalizeSessionKey(sessionKey); + const existing = queues.get(key); const normalized = normalizeContextKey(contextKey); - return normalized !== lastContextKey; + return normalized !== (existing?.lastContextKey ?? null); } export function enqueueSystemEvent(text: string, options?: SystemEventOptions) { + const key = normalizeSessionKey(options?.sessionKey); + const entry = + queues.get(key) ?? + (() => { + const created: SessionQueue = { + queue: [], + lastText: null, + lastContextKey: null, + }; + queues.set(key, created); + return created; + })(); const cleaned = text.trim(); if (!cleaned) return; - lastContextKey = normalizeContextKey(options?.contextKey); - if (lastText === cleaned) return; // skip consecutive duplicates - lastText = cleaned; - queue.push({ text: cleaned, ts: Date.now() }); - if (queue.length > MAX_EVENTS) queue.shift(); + entry.lastContextKey = normalizeContextKey(options?.contextKey); + if (entry.lastText === cleaned) return; // skip consecutive duplicates + entry.lastText = cleaned; + entry.queue.push({ text: cleaned, ts: Date.now() }); + if (entry.queue.length > MAX_EVENTS) entry.queue.shift(); } -export function drainSystemEvents(): string[] { - const out = queue.map((e) => e.text); - queue.length = 0; - lastText = null; - lastContextKey = null; +export function drainSystemEvents(sessionKey?: string | null): string[] { + const key = normalizeSessionKey(sessionKey); + const entry = queues.get(key); + if (!entry || entry.queue.length === 0) return []; + const out = entry.queue.map((e) => e.text); + entry.queue.length = 0; + entry.lastText = null; + entry.lastContextKey = null; + queues.delete(key); return out; } -export function peekSystemEvents(): string[] { - return queue.map((e) => e.text); +export function peekSystemEvents(sessionKey?: string | null): string[] { + const key = normalizeSessionKey(sessionKey); + return queues.get(key)?.queue.map((e) => e.text) ?? []; } -export function hasSystemEvents() { - return queue.length > 0; +export function hasSystemEvents(sessionKey?: string | null) { + const key = normalizeSessionKey(sessionKey); + return (queues.get(key)?.queue.length ?? 0) > 0; +} + +export function resetSystemEventsForTest() { + queues.clear(); } diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index f4b0afd86..b34ee2851 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -14,7 +14,11 @@ import type { SlackSlashCommandConfig, } from "../config/config.js"; import { loadConfig } from "../config/config.js"; -import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; +import { + resolveSessionKey, + resolveStorePath, + updateLastRoute, +} from "../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; @@ -311,6 +315,31 @@ async function resolveSlackMedia(params: { export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const cfg = loadConfig(); + const sessionCfg = cfg.session; + const sessionScope = sessionCfg?.scope ?? "per-sender"; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + + const resolveSlackSystemEventSessionKey = (params: { + channelId?: string | null; + channelType?: string | null; + }) => { + const channelId = params.channelId?.trim() ?? ""; + if (!channelId) return mainKey; + const channelType = params.channelType?.trim().toLowerCase() ?? ""; + const isRoom = channelType === "channel" || channelType === "group"; + const isGroup = channelType === "mpim"; + const from = isRoom + ? `slack:channel:${channelId}` + : isGroup + ? `slack:group:${channelId}` + : `slack:${channelId}`; + const chatType = isRoom ? "room" : isGroup ? "group" : "direct"; + return resolveSessionKey( + sessionScope, + { From: from, ChatType: chatType, Surface: "slack" }, + mainKey, + ); + }; const botToken = resolveSlackBotToken( opts.botToken ?? process.env.SLACK_BOT_TOKEN ?? @@ -576,7 +605,22 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const inboundLabel = isDirectMessage ? `Slack DM from ${senderName}` : `Slack message in ${roomLabel} from ${senderName}`; + const slackFrom = isDirectMessage + ? `slack:${message.user}` + : isRoom + ? `slack:channel:${message.channel}` + : `slack:group:${message.channel}`; + const sessionKey = resolveSessionKey( + sessionScope, + { + From: slackFrom, + ChatType: isDirectMessage ? "direct" : isRoom ? "room" : "group", + Surface: "slack", + }, + mainKey, + ); enqueueSystemEvent(`${inboundLabel}: ${preview}`, { + sessionKey, contextKey: `slack:message:${message.channel}:${message.ts ?? "unknown"}`, }); @@ -591,11 +635,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const isRoomish = isRoom || isGroupDm; const ctxPayload = { Body: body, - From: isDirectMessage - ? `slack:${message.user}` - : isRoom - ? `slack:channel:${message.channel}` - : `slack:group:${message.channel}`, + From: slackFrom, To: isDirectMessage ? `user:${message.user}` : `channel:${message.channel}`, @@ -715,7 +755,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { channelId, channelName: channelInfo?.name, }); + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType, + }); enqueueSystemEvent(`Slack message edited in ${label}.`, { + sessionKey, contextKey: `slack:message:changed:${channelId ?? "unknown"}:${messageId ?? changed.event_ts ?? "unknown"}`, }); return; @@ -740,7 +785,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { channelId, channelName: channelInfo?.name, }); + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType, + }); enqueueSystemEvent(`Slack message deleted in ${label}.`, { + sessionKey, contextKey: `slack:message:deleted:${channelId ?? "unknown"}:${deleted.deleted_ts ?? deleted.event_ts ?? "unknown"}`, }); return; @@ -766,7 +816,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { channelName: channelInfo?.name, }); const messageId = thread.message?.ts ?? thread.event_ts; + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType, + }); enqueueSystemEvent(`Slack thread reply broadcast in ${label}.`, { + sessionKey, contextKey: `slack:thread:broadcast:${channelId ?? "unknown"}:${messageId ?? "unknown"}`, }); return; @@ -860,7 +915,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const authorLabel = authorInfo?.name ?? event.item_user; const baseText = `Slack reaction ${action}: :${emojiLabel}: by ${actorLabel} in ${channelLabel} msg ${item.ts}`; const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText; + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId: item.channel, + channelType, + }); enqueueSystemEvent(text, { + sessionKey, contextKey: `slack:reaction:${action}:${item.channel}:${item.ts}:${event.user}:${emojiLabel}`, }); } catch (err) { @@ -909,7 +969,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { channelId, channelName: channelInfo?.name, }); + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType, + }); enqueueSystemEvent(`Slack: ${userLabel} joined ${label}.`, { + sessionKey, contextKey: `slack:member:joined:${channelId ?? "unknown"}:${payload.user ?? "unknown"}`, }); } catch (err) { @@ -945,7 +1010,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { channelId, channelName: channelInfo?.name, }); + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType, + }); enqueueSystemEvent(`Slack: ${userLabel} left ${label}.`, { + sessionKey, contextKey: `slack:member:left:${channelId ?? "unknown"}:${payload.user ?? "unknown"}`, }); } catch (err) { @@ -971,7 +1041,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { return; } const label = resolveSlackChannelLabel({ channelId, channelName }); + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType: "channel", + }); enqueueSystemEvent(`Slack channel created: ${label}.`, { + sessionKey, contextKey: `slack:channel:created:${channelId ?? channelName ?? "unknown"}`, }); } catch (err) { @@ -1000,7 +1075,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { return; } const label = resolveSlackChannelLabel({ channelId, channelName }); + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType: "channel", + }); enqueueSystemEvent(`Slack channel renamed: ${label}.`, { + sessionKey, contextKey: `slack:channel:renamed:${channelId ?? channelName ?? "unknown"}`, }); } catch (err) { @@ -1039,9 +1119,14 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const userLabel = userInfo?.name ?? payload.user ?? "someone"; const itemType = payload.item?.type ?? "item"; const messageId = payload.item?.message?.ts ?? payload.event_ts; + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType: channelInfo?.type ?? undefined, + }); enqueueSystemEvent( `Slack: ${userLabel} pinned a ${itemType} in ${label}.`, { + sessionKey, contextKey: `slack:pin:added:${channelId ?? "unknown"}:${messageId ?? "unknown"}`, }, ); @@ -1081,9 +1166,14 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const userLabel = userInfo?.name ?? payload.user ?? "someone"; const itemType = payload.item?.type ?? "item"; const messageId = payload.item?.message?.ts ?? payload.event_ts; + const sessionKey = resolveSlackSystemEventSessionKey({ + channelId, + channelType: channelInfo?.type ?? undefined, + }); enqueueSystemEvent( `Slack: ${userLabel} unpinned a ${itemType} in ${label}.`, { + sessionKey, contextKey: `slack:pin:removed:${channelId ?? "unknown"}:${messageId ?? "unknown"}`, }, );