fix: route system events per session

This commit is contained in:
Peter Steinberger
2026-01-04 22:11:04 +01:00
parent 2ceceb8c25
commit 1657c5e3d2
7 changed files with 256 additions and 31 deletions

View File

@@ -13,6 +13,7 @@
- Cron tool passes `id` to the gateway for update/remove/run/runs (keeps `jobId` input). (#180) — thanks @adamgall - Cron tool passes `id` to the gateway for update/remove/run/runs (keeps `jobId` input). (#180) — thanks @adamgall
- macOS: treat location permission as always-only to avoid iOS-only enums. (#165) — thanks @Nachx639 - macOS: treat location permission as always-only to avoid iOS-only enums. (#165) — thanks @Nachx639
- Onboarding: when running from source, auto-build missing Control UI assets (`pnpm ui:build`). - Onboarding: when running from source, auto-build missing Control UI assets (`pnpm ui:build`).
- Discord/Slack: route reaction + system notifications to the correct session (no main-session bleed).
## 2026.1.5 ## 2026.1.5

View File

@@ -515,6 +515,7 @@ export async function getReplyFromConfig(
!isGroupSession && sessionKey === (sessionCfg?.mainKey ?? "main"); !isGroupSession && sessionKey === (sessionCfg?.mainKey ?? "main");
prefixedBodyBase = await prependSystemEvents({ prefixedBodyBase = await prependSystemEvents({
cfg, cfg,
sessionKey,
isMainSession, isMainSession,
isNewSession, isNewSession,
prefixedBodyBase, prefixedBodyBase,

View File

@@ -8,12 +8,11 @@ import { drainSystemEvents } from "../../infra/system-events.js";
export async function prependSystemEvents(params: { export async function prependSystemEvents(params: {
cfg: ClawdbotConfig; cfg: ClawdbotConfig;
sessionKey: string;
isMainSession: boolean; isMainSession: boolean;
isNewSession: boolean; isNewSession: boolean;
prefixedBodyBase: string; prefixedBodyBase: string;
}): Promise<string> { }): Promise<string> {
if (!params.isMainSession) return params.prefixedBodyBase;
const compactSystemEvent = (line: string): string | null => { const compactSystemEvent = (line: string): string | null => {
const trimmed = line.trim(); const trimmed = line.trim();
if (!trimmed) return null; if (!trimmed) return null;
@@ -27,11 +26,11 @@ export async function prependSystemEvents(params: {
}; };
const systemLines: string[] = []; const systemLines: string[] = [];
const queued = drainSystemEvents(); const queued = drainSystemEvents(params.sessionKey);
systemLines.push( systemLines.push(
...queued.map(compactSystemEvent).filter((v): v is string => Boolean(v)), ...queued.map(compactSystemEvent).filter((v): v is string => Boolean(v)),
); );
if (params.isNewSession) { if (params.isMainSession && params.isNewSession) {
const summary = await buildProviderSummary(params.cfg); const summary = await buildProviderSummary(params.cfg);
if (summary.length > 0) systemLines.unshift(...summary); if (summary.length > 0) systemLines.unshift(...summary);
} }

View File

@@ -29,7 +29,11 @@ import type {
ReplyToMode, ReplyToMode,
} from "../config/config.js"; } from "../config/config.js";
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";
import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; import {
resolveSessionKey,
resolveStorePath,
updateLastRoute,
} from "../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose, warn } from "../globals.js"; import { danger, logVerbose, shouldLogVerbose, warn } from "../globals.js";
import { enqueueSystemEvent } from "../infra/system-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
@@ -356,7 +360,22 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
const systemText = resolveDiscordSystemEvent(message); const systemText = resolveDiscordSystemEvent(message);
if (systemText) { if (systemText) {
const sessionCfg = cfg.session;
const sessionScope = sessionCfg?.scope ?? "per-sender";
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const sessionKey = resolveSessionKey(
sessionScope,
{
From: isDirectMessage
? `discord:${message.author.id}`
: `group:${message.channelId}`,
ChatType: isDirectMessage ? "direct" : "group",
Surface: "discord",
},
mainKey,
);
enqueueSystemEvent(systemText, { enqueueSystemEvent(systemText, {
sessionKey,
contextKey: `discord:system:${message.channelId}:${message.id}`, contextKey: `discord:system:${message.channelId}:${message.id}`,
}); });
return; return;
@@ -645,7 +664,20 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
const authorLabel = message.author?.tag ?? message.author?.username; const authorLabel = message.author?.tag ?? message.author?.username;
const baseText = `Discord reaction ${action}: ${emojiLabel} by ${actorLabel} on ${guildSlug} ${channelLabel} msg ${message.id}`; const baseText = `Discord reaction ${action}: ${emojiLabel} by ${actorLabel} on ${guildSlug} ${channelLabel} msg ${message.id}`;
const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText; const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText;
const sessionCfg = cfg.session;
const sessionScope = sessionCfg?.scope ?? "per-sender";
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const sessionKey = resolveSessionKey(
sessionScope,
{
From: `group:${message.channelId}`,
ChatType: "group",
Surface: "discord",
},
mainKey,
);
enqueueSystemEvent(text, { enqueueSystemEvent(text, {
sessionKey,
contextKey: `discord:reaction:${action}:${message.id}:${user.id}:${emojiLabel}`, contextKey: `discord:reaction:${action}:${message.id}:${user.id}:${emojiLabel}`,
}); });
} catch (err) { } catch (err) {

View File

@@ -0,0 +1,64 @@
import { beforeEach, describe, expect, it } from "vitest";
import { prependSystemEvents } from "../auto-reply/reply/session-updates.js";
import type { ClawdbotConfig } from "../config/config.js";
import {
enqueueSystemEvent,
peekSystemEvents,
resetSystemEventsForTest,
} from "./system-events.js";
const cfg = {} as unknown as ClawdbotConfig;
describe("system events (session routing)", () => {
beforeEach(() => {
resetSystemEventsForTest();
});
it("does not leak session-scoped events into main", async () => {
enqueueSystemEvent("Discord reaction added: ✅", {
sessionKey: "discord:group:123",
contextKey: "discord:reaction:added:msg:user:✅",
});
expect(peekSystemEvents()).toEqual([]);
expect(peekSystemEvents("discord:group:123")).toEqual([
"Discord reaction added: ✅",
]);
const main = await prependSystemEvents({
cfg,
sessionKey: "main",
isMainSession: true,
isNewSession: false,
prefixedBodyBase: "hello",
});
expect(main).toBe("hello");
expect(peekSystemEvents("discord:group:123")).toEqual([
"Discord reaction added: ✅",
]);
const discord = await prependSystemEvents({
cfg,
sessionKey: "discord:group:123",
isMainSession: false,
isNewSession: false,
prefixedBodyBase: "hi",
});
expect(discord).toBe("System: Discord reaction added: ✅\n\nhi");
expect(peekSystemEvents("discord:group:123")).toEqual([]);
});
it("defaults system events to main", async () => {
enqueueSystemEvent("Node: Mac Studio");
const main = await prependSystemEvents({
cfg,
sessionKey: "main",
isMainSession: true,
isNewSession: false,
prefixedBodyBase: "ping",
});
expect(main).toBe("System: Node: Mac Studio\n\nping");
});
});

View File

@@ -1,18 +1,31 @@
// Lightweight in-memory queue for human-readable system events that should be // Lightweight in-memory queue for human-readable system events that should be
// prefixed to the next main-session prompt/heartbeat. We intentionally avoid // prefixed to the next prompt. We intentionally avoid persistence to keep
// persistence to keep events ephemeral. // events ephemeral. Events are session-scoped; callers that don't specify a
// session key default to "main".
type SystemEvent = { text: string; ts: number }; type SystemEvent = { text: string; ts: number };
const DEFAULT_SESSION_KEY = "main";
const MAX_EVENTS = 20; const MAX_EVENTS = 20;
const queue: SystemEvent[] = [];
let lastText: string | null = null; type SessionQueue = {
let lastContextKey: string | null = null; queue: SystemEvent[];
lastText: string | null;
lastContextKey: string | null;
};
const queues = new Map<string, SessionQueue>();
type SystemEventOptions = { type SystemEventOptions = {
contextKey?: string | null; contextKey?: string | null;
sessionKey?: string | null;
}; };
function normalizeSessionKey(key?: string | null): string {
const trimmed = typeof key === "string" ? key.trim() : "";
return trimmed || DEFAULT_SESSION_KEY;
}
function normalizeContextKey(key?: string | null): string | null { function normalizeContextKey(key?: string | null): string | null {
if (!key) return null; if (!key) return null;
const trimmed = key.trim(); const trimmed = key.trim();
@@ -22,33 +35,58 @@ function normalizeContextKey(key?: string | null): string | null {
export function isSystemEventContextChanged( export function isSystemEventContextChanged(
contextKey?: string | null, contextKey?: string | null,
sessionKey?: string | null,
): boolean { ): boolean {
const key = normalizeSessionKey(sessionKey);
const existing = queues.get(key);
const normalized = normalizeContextKey(contextKey); const normalized = normalizeContextKey(contextKey);
return normalized !== lastContextKey; return normalized !== (existing?.lastContextKey ?? null);
} }
export function enqueueSystemEvent(text: string, options?: SystemEventOptions) { export function enqueueSystemEvent(text: string, options?: SystemEventOptions) {
const key = normalizeSessionKey(options?.sessionKey);
const entry =
queues.get(key) ??
(() => {
const created: SessionQueue = {
queue: [],
lastText: null,
lastContextKey: null,
};
queues.set(key, created);
return created;
})();
const cleaned = text.trim(); const cleaned = text.trim();
if (!cleaned) return; if (!cleaned) return;
lastContextKey = normalizeContextKey(options?.contextKey); entry.lastContextKey = normalizeContextKey(options?.contextKey);
if (lastText === cleaned) return; // skip consecutive duplicates if (entry.lastText === cleaned) return; // skip consecutive duplicates
lastText = cleaned; entry.lastText = cleaned;
queue.push({ text: cleaned, ts: Date.now() }); entry.queue.push({ text: cleaned, ts: Date.now() });
if (queue.length > MAX_EVENTS) queue.shift(); if (entry.queue.length > MAX_EVENTS) entry.queue.shift();
} }
export function drainSystemEvents(): string[] { export function drainSystemEvents(sessionKey?: string | null): string[] {
const out = queue.map((e) => e.text); const key = normalizeSessionKey(sessionKey);
queue.length = 0; const entry = queues.get(key);
lastText = null; if (!entry || entry.queue.length === 0) return [];
lastContextKey = null; const out = entry.queue.map((e) => e.text);
entry.queue.length = 0;
entry.lastText = null;
entry.lastContextKey = null;
queues.delete(key);
return out; return out;
} }
export function peekSystemEvents(): string[] { export function peekSystemEvents(sessionKey?: string | null): string[] {
return queue.map((e) => e.text); const key = normalizeSessionKey(sessionKey);
return queues.get(key)?.queue.map((e) => e.text) ?? [];
} }
export function hasSystemEvents() { export function hasSystemEvents(sessionKey?: string | null) {
return queue.length > 0; const key = normalizeSessionKey(sessionKey);
return (queues.get(key)?.queue.length ?? 0) > 0;
}
export function resetSystemEventsForTest() {
queues.clear();
} }

View File

@@ -14,7 +14,11 @@ import type {
SlackSlashCommandConfig, SlackSlashCommandConfig,
} from "../config/config.js"; } from "../config/config.js";
import { loadConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js";
import { resolveStorePath, updateLastRoute } from "../config/sessions.js"; import {
resolveSessionKey,
resolveStorePath,
updateLastRoute,
} from "../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { enqueueSystemEvent } from "../infra/system-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
@@ -311,6 +315,31 @@ async function resolveSlackMedia(params: {
export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const cfg = loadConfig(); const cfg = loadConfig();
const sessionCfg = cfg.session;
const sessionScope = sessionCfg?.scope ?? "per-sender";
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const resolveSlackSystemEventSessionKey = (params: {
channelId?: string | null;
channelType?: string | null;
}) => {
const channelId = params.channelId?.trim() ?? "";
if (!channelId) return mainKey;
const channelType = params.channelType?.trim().toLowerCase() ?? "";
const isRoom = channelType === "channel" || channelType === "group";
const isGroup = channelType === "mpim";
const from = isRoom
? `slack:channel:${channelId}`
: isGroup
? `slack:group:${channelId}`
: `slack:${channelId}`;
const chatType = isRoom ? "room" : isGroup ? "group" : "direct";
return resolveSessionKey(
sessionScope,
{ From: from, ChatType: chatType, Surface: "slack" },
mainKey,
);
};
const botToken = resolveSlackBotToken( const botToken = resolveSlackBotToken(
opts.botToken ?? opts.botToken ??
process.env.SLACK_BOT_TOKEN ?? process.env.SLACK_BOT_TOKEN ??
@@ -576,7 +605,22 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const inboundLabel = isDirectMessage const inboundLabel = isDirectMessage
? `Slack DM from ${senderName}` ? `Slack DM from ${senderName}`
: `Slack message in ${roomLabel} from ${senderName}`; : `Slack message in ${roomLabel} from ${senderName}`;
const slackFrom = isDirectMessage
? `slack:${message.user}`
: isRoom
? `slack:channel:${message.channel}`
: `slack:group:${message.channel}`;
const sessionKey = resolveSessionKey(
sessionScope,
{
From: slackFrom,
ChatType: isDirectMessage ? "direct" : isRoom ? "room" : "group",
Surface: "slack",
},
mainKey,
);
enqueueSystemEvent(`${inboundLabel}: ${preview}`, { enqueueSystemEvent(`${inboundLabel}: ${preview}`, {
sessionKey,
contextKey: `slack:message:${message.channel}:${message.ts ?? "unknown"}`, contextKey: `slack:message:${message.channel}:${message.ts ?? "unknown"}`,
}); });
@@ -591,11 +635,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const isRoomish = isRoom || isGroupDm; const isRoomish = isRoom || isGroupDm;
const ctxPayload = { const ctxPayload = {
Body: body, Body: body,
From: isDirectMessage From: slackFrom,
? `slack:${message.user}`
: isRoom
? `slack:channel:${message.channel}`
: `slack:group:${message.channel}`,
To: isDirectMessage To: isDirectMessage
? `user:${message.user}` ? `user:${message.user}`
: `channel:${message.channel}`, : `channel:${message.channel}`,
@@ -715,7 +755,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
channelId, channelId,
channelName: channelInfo?.name, channelName: channelInfo?.name,
}); });
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType,
});
enqueueSystemEvent(`Slack message edited in ${label}.`, { enqueueSystemEvent(`Slack message edited in ${label}.`, {
sessionKey,
contextKey: `slack:message:changed:${channelId ?? "unknown"}:${messageId ?? changed.event_ts ?? "unknown"}`, contextKey: `slack:message:changed:${channelId ?? "unknown"}:${messageId ?? changed.event_ts ?? "unknown"}`,
}); });
return; return;
@@ -740,7 +785,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
channelId, channelId,
channelName: channelInfo?.name, channelName: channelInfo?.name,
}); });
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType,
});
enqueueSystemEvent(`Slack message deleted in ${label}.`, { enqueueSystemEvent(`Slack message deleted in ${label}.`, {
sessionKey,
contextKey: `slack:message:deleted:${channelId ?? "unknown"}:${deleted.deleted_ts ?? deleted.event_ts ?? "unknown"}`, contextKey: `slack:message:deleted:${channelId ?? "unknown"}:${deleted.deleted_ts ?? deleted.event_ts ?? "unknown"}`,
}); });
return; return;
@@ -766,7 +816,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
channelName: channelInfo?.name, channelName: channelInfo?.name,
}); });
const messageId = thread.message?.ts ?? thread.event_ts; const messageId = thread.message?.ts ?? thread.event_ts;
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType,
});
enqueueSystemEvent(`Slack thread reply broadcast in ${label}.`, { enqueueSystemEvent(`Slack thread reply broadcast in ${label}.`, {
sessionKey,
contextKey: `slack:thread:broadcast:${channelId ?? "unknown"}:${messageId ?? "unknown"}`, contextKey: `slack:thread:broadcast:${channelId ?? "unknown"}:${messageId ?? "unknown"}`,
}); });
return; return;
@@ -860,7 +915,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const authorLabel = authorInfo?.name ?? event.item_user; const authorLabel = authorInfo?.name ?? event.item_user;
const baseText = `Slack reaction ${action}: :${emojiLabel}: by ${actorLabel} in ${channelLabel} msg ${item.ts}`; const baseText = `Slack reaction ${action}: :${emojiLabel}: by ${actorLabel} in ${channelLabel} msg ${item.ts}`;
const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText; const text = authorLabel ? `${baseText} from ${authorLabel}` : baseText;
const sessionKey = resolveSlackSystemEventSessionKey({
channelId: item.channel,
channelType,
});
enqueueSystemEvent(text, { enqueueSystemEvent(text, {
sessionKey,
contextKey: `slack:reaction:${action}:${item.channel}:${item.ts}:${event.user}:${emojiLabel}`, contextKey: `slack:reaction:${action}:${item.channel}:${item.ts}:${event.user}:${emojiLabel}`,
}); });
} catch (err) { } catch (err) {
@@ -909,7 +969,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
channelId, channelId,
channelName: channelInfo?.name, channelName: channelInfo?.name,
}); });
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType,
});
enqueueSystemEvent(`Slack: ${userLabel} joined ${label}.`, { enqueueSystemEvent(`Slack: ${userLabel} joined ${label}.`, {
sessionKey,
contextKey: `slack:member:joined:${channelId ?? "unknown"}:${payload.user ?? "unknown"}`, contextKey: `slack:member:joined:${channelId ?? "unknown"}:${payload.user ?? "unknown"}`,
}); });
} catch (err) { } catch (err) {
@@ -945,7 +1010,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
channelId, channelId,
channelName: channelInfo?.name, channelName: channelInfo?.name,
}); });
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType,
});
enqueueSystemEvent(`Slack: ${userLabel} left ${label}.`, { enqueueSystemEvent(`Slack: ${userLabel} left ${label}.`, {
sessionKey,
contextKey: `slack:member:left:${channelId ?? "unknown"}:${payload.user ?? "unknown"}`, contextKey: `slack:member:left:${channelId ?? "unknown"}:${payload.user ?? "unknown"}`,
}); });
} catch (err) { } catch (err) {
@@ -971,7 +1041,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
return; return;
} }
const label = resolveSlackChannelLabel({ channelId, channelName }); const label = resolveSlackChannelLabel({ channelId, channelName });
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType: "channel",
});
enqueueSystemEvent(`Slack channel created: ${label}.`, { enqueueSystemEvent(`Slack channel created: ${label}.`, {
sessionKey,
contextKey: `slack:channel:created:${channelId ?? channelName ?? "unknown"}`, contextKey: `slack:channel:created:${channelId ?? channelName ?? "unknown"}`,
}); });
} catch (err) { } catch (err) {
@@ -1000,7 +1075,12 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
return; return;
} }
const label = resolveSlackChannelLabel({ channelId, channelName }); const label = resolveSlackChannelLabel({ channelId, channelName });
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType: "channel",
});
enqueueSystemEvent(`Slack channel renamed: ${label}.`, { enqueueSystemEvent(`Slack channel renamed: ${label}.`, {
sessionKey,
contextKey: `slack:channel:renamed:${channelId ?? channelName ?? "unknown"}`, contextKey: `slack:channel:renamed:${channelId ?? channelName ?? "unknown"}`,
}); });
} catch (err) { } catch (err) {
@@ -1039,9 +1119,14 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const userLabel = userInfo?.name ?? payload.user ?? "someone"; const userLabel = userInfo?.name ?? payload.user ?? "someone";
const itemType = payload.item?.type ?? "item"; const itemType = payload.item?.type ?? "item";
const messageId = payload.item?.message?.ts ?? payload.event_ts; const messageId = payload.item?.message?.ts ?? payload.event_ts;
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType: channelInfo?.type ?? undefined,
});
enqueueSystemEvent( enqueueSystemEvent(
`Slack: ${userLabel} pinned a ${itemType} in ${label}.`, `Slack: ${userLabel} pinned a ${itemType} in ${label}.`,
{ {
sessionKey,
contextKey: `slack:pin:added:${channelId ?? "unknown"}:${messageId ?? "unknown"}`, contextKey: `slack:pin:added:${channelId ?? "unknown"}:${messageId ?? "unknown"}`,
}, },
); );
@@ -1081,9 +1166,14 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
const userLabel = userInfo?.name ?? payload.user ?? "someone"; const userLabel = userInfo?.name ?? payload.user ?? "someone";
const itemType = payload.item?.type ?? "item"; const itemType = payload.item?.type ?? "item";
const messageId = payload.item?.message?.ts ?? payload.event_ts; const messageId = payload.item?.message?.ts ?? payload.event_ts;
const sessionKey = resolveSlackSystemEventSessionKey({
channelId,
channelType: channelInfo?.type ?? undefined,
});
enqueueSystemEvent( enqueueSystemEvent(
`Slack: ${userLabel} unpinned a ${itemType} in ${label}.`, `Slack: ${userLabel} unpinned a ${itemType} in ${label}.`,
{ {
sessionKey,
contextKey: `slack:pin:removed:${channelId ?? "unknown"}:${messageId ?? "unknown"}`, contextKey: `slack:pin:removed:${channelId ?? "unknown"}:${messageId ?? "unknown"}`,
}, },
); );