diff --git a/docs/configuration.md b/docs/configuration.md index 181a31f13..091a2deb3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -85,6 +85,26 @@ Group messages default to **require mention** (either metadata mention or regex } ``` +### `routing.queue` + +Controls how inbound messages behave when an agent run is already active. + +```json5 +{ + routing: { + queue: { + mode: "interrupt", // global default: queue | interrupt | drop + bySurface: { + whatsapp: "interrupt", + telegram: "interrupt", + discord: "queue", + webchat: "queue" + } + } + } +} +``` + ### `discord` (bot transport) Configure the Discord bot by setting the bot token and optional gating: @@ -94,6 +114,10 @@ Configure the Discord bot by setting the bot token and optional gating: discord: { token: "your-bot-token", allowFrom: ["discord:1234567890", "*"], // optional DM allowlist (user ids) + guildAllowFrom: { + guilds: ["123456789012345678"], // optional guild allowlist (ids) + users: ["987654321098765432"] // optional user allowlist (ids) + }, requireMention: true, // require @bot mentions in guilds mediaMaxMb: 8 // clamp inbound media size } diff --git a/docs/discord.md b/docs/discord.md index 34a109340..e6c37f7a6 100644 --- a/docs/discord.md +++ b/docs/discord.md @@ -22,6 +22,7 @@ Status: ready for DM and guild text channels via the official Discord bot gatewa 5. Direct chats: use `user:` (or a `<@id>` mention) when delivering; all turns land in the shared `main` session. 6. Guild channels: use `channel:` for delivery. Mentions are required by default; disable with `discord.requireMention = false`. 7. Optional DM allowlist: reuse `discord.allowFrom` with user ids (`1234567890` or `discord:1234567890`). Use `"*"` to allow all DMs. +8. Optional guild allowlist: set `discord.guildAllowFrom` with `guilds` and/or `users` to gate who can invoke the bot in servers. Note: Discord does not provide a simple username → id lookup without extra guild context, so prefer ids or `<@id>` mentions for DM delivery targets. @@ -38,6 +39,10 @@ Note: Discord does not provide a simple username → id lookup without extra gui discord: { token: "abc.123", allowFrom: ["123456789012345678"], + guildAllowFrom: { + guilds: ["123456789012345678"], + users: ["987654321098765432"] + }, requireMention: true, mediaMaxMb: 8 } @@ -45,6 +50,7 @@ Note: Discord does not provide a simple username → id lookup without extra gui ``` - `allowFrom`: DM allowlist (user ids). Omit or set to `["*"]` to allow any DM sender. +- `guildAllowFrom`: Optional allowlist for guild messages. Set `guilds` and/or `users` (ids). When both are set, both must match. - `requireMention`: when `true`, messages in guild channels must mention the bot. - `mediaMaxMb`: clamp inbound media saved to disk. diff --git a/docs/queue.md b/docs/queue.md index ef78d7884..122039daa 100644 --- a/docs/queue.md +++ b/docs/queue.md @@ -18,6 +18,33 @@ We now serialize command-based auto-replies (WhatsApp Web listener) through a ti - When verbose logging is enabled, queued commands emit a short notice if they waited more than ~2s before starting. - Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn. +## Queue modes (per surface) +Inbound messages can either queue or interrupt when a run is already active: +- `queue`: serialize per session; if the agent is streaming, the new message is appended to the current run. +- `interrupt`: abort the active run for that session, then run the newest message. +- `drop`: ignore the message if the session lane is busy. + +Defaults (when unset in config): +- WhatsApp + Telegram → `interrupt` +- Discord + WebChat → `queue` + +Configure globally or per surface via `routing.queue`: + +```json5 +{ + routing: { + queue: { + mode: "interrupt", + bySurface: { discord: "queue", telegram: "interrupt" } + } + } +} +``` + +## Per-session overrides +- `/queue ` as a standalone command stores the mode for the current session. +- `/queue ` embedded in a message applies **once** (no persistence). + ## Scope and guarantees - Applies only to config-driven command replies; plain text replies are unaffected. - Default lane (`main`) is process-wide for inbound + main heartbeats; set `agent.maxConcurrent` to allow multiple sessions in parallel. diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index a6a4c7a07..b2a216453 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -86,6 +86,7 @@ export type EmbeddedPiRunResult = { type EmbeddedPiQueueHandle = { queueMessage: (text: string) => Promise; isStreaming: () => boolean; + abort: () => void; }; const ACTIVE_EMBEDDED_RUNS = new Map(); @@ -203,6 +204,27 @@ export function queueEmbeddedPiMessage( return true; } +export function abortEmbeddedPiRun(sessionId: string): boolean { + const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); + if (!handle) return false; + handle.abort(); + return true; +} + +export function isEmbeddedPiRunActive(sessionId: string): boolean { + return ACTIVE_EMBEDDED_RUNS.has(sessionId); +} + +export function isEmbeddedPiRunStreaming(sessionId: string): boolean { + const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); + if (!handle) return false; + return handle.isStreaming(); +} + +export function resolveEmbeddedSessionLane(key: string) { + return resolveSessionLane(key); +} + function mapThinkingLevel(level?: ThinkLevel): ThinkingLevel { // pi-agent-core supports "xhigh" too; Clawdis doesn't surface it for now. if (!level) return "off"; @@ -445,14 +467,19 @@ export async function runEmbeddedPiAgent(params: { if (prior.length > 0) { session.agent.replaceMessages(prior); } + let aborted = Boolean(params.abortSignal?.aborted); + const abortRun = () => { + aborted = true; + void session.abort(); + }; const queueHandle: EmbeddedPiQueueHandle = { queueMessage: async (text: string) => { await session.queueMessage(text); }, isStreaming: () => session.isStreaming, + abort: abortRun, }; ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); - let aborted = Boolean(params.abortSignal?.aborted); const { assistantTexts, @@ -473,8 +500,7 @@ export async function runEmbeddedPiAgent(params: { const abortTimer = setTimeout( () => { - aborted = true; - void session.abort(); + abortRun(); }, Math.max(1, params.timeoutMs), ); @@ -482,8 +508,7 @@ export async function runEmbeddedPiAgent(params: { let messagesSnapshot: AppMessage[] = []; let sessionIdUsed = session.sessionId; const onAbort = () => { - aborted = true; - void session.abort(); + abortRun(); }; if (params.abortSignal) { if (params.abortSignal.aborted) { diff --git a/src/agents/pi-embedded.ts b/src/agents/pi-embedded.ts index 3611934e2..b521dbf48 100644 --- a/src/agents/pi-embedded.ts +++ b/src/agents/pi-embedded.ts @@ -4,6 +4,10 @@ export type { EmbeddedPiRunResult, } from "./pi-embedded-runner.js"; export { + abortEmbeddedPiRun, + isEmbeddedPiRunActive, + isEmbeddedPiRunStreaming, queueEmbeddedPiMessage, + resolveEmbeddedSessionLane, runEmbeddedPiAgent, } from "./pi-embedded-runner.js"; diff --git a/src/auto-reply/reply.directive.test.ts b/src/auto-reply/reply.directive.test.ts index f8f43cde9..70738c35f 100644 --- a/src/auto-reply/reply.directive.test.ts +++ b/src/auto-reply/reply.directive.test.ts @@ -5,8 +5,11 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; vi.mock("../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: vi.fn(), queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, })); vi.mock("../agents/model-catalog.js", () => ({ loadModelCatalog: vi.fn(), @@ -20,6 +23,7 @@ import { saveSessionStore, } from "../config/sessions.js"; import { + extractQueueDirective, extractThinkDirective, extractVerboseDirective, getReplyFromConfig, @@ -83,6 +87,13 @@ describe("directive parsing", () => { expect(res.thinkLevel).toBe("high"); }); + it("matches queue directive", () => { + const res = extractQueueDirective("please /queue interrupt now"); + expect(res.hasDirective).toBe(true); + expect(res.queueMode).toBe("interrupt"); + expect(res.cleaned).toBe("please now"); + }); + it("applies inline think and still runs agent content", async () => { await withTempHome(async (home) => { vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ @@ -142,6 +153,33 @@ describe("directive parsing", () => { }); }); + it("acks queue directive and persists override", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + const storePath = path.join(home, "sessions.json"); + + const res = await getReplyFromConfig( + { Body: "/queue interrupt", From: "+1222", To: "+1222" }, + {}, + { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + routing: { allowFrom: ["*"] }, + session: { store: storePath }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toMatch(/^⚙️ Queue mode set to interrupt\./); + const store = loadSessionStore(storePath); + const entry = Object.values(store)[0]; + expect(entry?.queueMode).toBe("interrupt"); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); + it("updates tool verbose during an in-flight run (toggle on)", async () => { await withTempHome(async (home) => { const storePath = path.join(home, "sessions.json"); diff --git a/src/auto-reply/reply.triggers.test.ts b/src/auto-reply/reply.triggers.test.ts index a5e177b6c..1c6835f33 100644 --- a/src/auto-reply/reply.triggers.test.ts +++ b/src/auto-reply/reply.triggers.test.ts @@ -4,8 +4,11 @@ import { join } from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; vi.mock("../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: vi.fn(), queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, })); import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index b7756f65f..b749632a3 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -14,7 +14,9 @@ import { resolveConfiguredModelRef, } from "../agents/model-selection.js"; import { + abortEmbeddedPiRun, queueEmbeddedPiMessage, + resolveEmbeddedSessionLane, runEmbeddedPiAgent, } from "../agents/pi-embedded.js"; import { buildWorkspaceSkillSnapshot } from "../agents/skills.js"; @@ -37,6 +39,7 @@ import { logVerbose } from "../globals.js"; import { buildProviderSummary } from "../infra/provider-summary.js"; import { triggerClawdisRestart } from "../infra/restart.js"; import { drainSystemEvents } from "../infra/system-events.js"; +import { clearCommandLane, getQueueSize } from "../process/command-queue.js"; import { defaultRuntime } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { resolveHeartbeatSeconds } from "../web/reconnect.js"; @@ -67,6 +70,8 @@ const SYSTEM_MARK = "⚙️"; const BARE_SESSION_RESET_PROMPT = "A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. Do not mention internal steps, files, tools, or reasoning."; +type QueueMode = "queue" | "interrupt" | "drop"; + export function extractThinkDirective(body?: string): { cleaned: string; thinkLevel?: ThinkLevel; @@ -112,6 +117,36 @@ export function extractVerboseDirective(body?: string): { }; } +function normalizeQueueMode(raw?: string): QueueMode | undefined { + if (!raw) return undefined; + const cleaned = raw.trim().toLowerCase(); + if (cleaned === "queue" || cleaned === "queued") return "queue"; + if (cleaned === "interrupt" || cleaned === "interrupts" || cleaned === "abort") + return "interrupt"; + if (cleaned === "drop" || cleaned === "discard") return "drop"; + return undefined; +} + +export function extractQueueDirective(body?: string): { + cleaned: string; + queueMode?: QueueMode; + rawMode?: string; + hasDirective: boolean; +} { + if (!body) return { cleaned: "", hasDirective: false }; + const match = body.match(/(?:^|\s)\/queue(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i); + const queueMode = normalizeQueueMode(match?.[1]); + const cleaned = match + ? body.replace(match[0], "").replace(/\s+/g, " ").trim() + : body.trim(); + return { + cleaned, + queueMode, + rawMode: match?.[1], + hasDirective: !!match, + }; +} + function isAbortTrigger(text?: string): boolean { if (!text) return false; const normalized = text.trim().toLowerCase(); @@ -156,9 +191,41 @@ function stripMentions( } // Generic mention patterns like @123456789 or plain digits result = result.replace(/@[0-9+]{5,}/g, " "); + // Discord-style mentions (<@123> or <@!123>) + result = result.replace(/<@!?\d+>/g, " "); return result.replace(/\s+/g, " ").trim(); } +function defaultQueueModeForSurface(surface?: string): QueueMode { + const normalized = surface?.trim().toLowerCase(); + if (normalized === "discord") return "queue"; + if (normalized === "webchat") return "queue"; + return "interrupt"; +} + +function resolveQueueMode(params: { + cfg: ClawdisConfig; + surface?: string; + sessionEntry?: SessionEntry; + inlineMode?: QueueMode; +}): QueueMode { + const surfaceKey = params.surface?.trim().toLowerCase(); + const queueCfg = params.cfg.routing?.queue; + const surfaceMode = + surfaceKey && queueCfg?.bySurface + ? (queueCfg.bySurface as Record)[ + surfaceKey + ] + : undefined; + return ( + params.inlineMode ?? + params.sessionEntry?.queueMode ?? + surfaceMode ?? + queueCfg?.mode ?? + defaultQueueModeForSurface(surfaceKey) + ); +} + export async function getReplyFromConfig( ctx: MsgContext, opts?: GetReplyOptions, @@ -343,6 +410,7 @@ export async function getReplyFromConfig( verboseLevel: persistedVerbose ?? baseEntry?.verboseLevel, modelOverride: persistedModelOverride ?? baseEntry?.modelOverride, providerOverride: persistedProviderOverride ?? baseEntry?.providerOverride, + queueMode: baseEntry?.queueMode, }; sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); @@ -371,8 +439,14 @@ export async function getReplyFromConfig( rawModel: rawModelDirective, hasDirective: hasModelDirective, } = extractModelDirective(verboseCleaned); - sessionCtx.Body = modelCleaned; - sessionCtx.BodyStripped = modelCleaned; + const { + cleaned: queueCleaned, + queueMode: inlineQueueMode, + rawMode: rawQueueMode, + hasDirective: hasQueueDirective, + } = extractQueueDirective(modelCleaned); + sessionCtx.Body = queueCleaned; + sessionCtx.BodyStripped = queueCleaned; const defaultGroupActivation = () => { const requireMention = cfg.routing?.groupChat?.requireMention; @@ -457,9 +531,14 @@ export async function getReplyFromConfig( DEFAULT_CONTEXT_TOKENS; const directiveOnly = (() => { - if (!hasThinkDirective && !hasVerboseDirective && !hasModelDirective) + if ( + !hasThinkDirective && + !hasVerboseDirective && + !hasModelDirective && + !hasQueueDirective + ) return false; - const stripped = stripStructuralPrefixes(modelCleaned ?? ""); + const stripped = stripStructuralPrefixes(queueCleaned ?? ""); const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped; return noMentions.length === 0; })(); @@ -501,6 +580,12 @@ export async function getReplyFromConfig( text: `Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`, }; } + if (hasQueueDirective && !inlineQueueMode) { + cleanupTyping(); + return { + text: `Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: queue, interrupt, drop.`, + }; + } let modelSelection: | { provider: string; model: string; isDefault: boolean } @@ -543,6 +628,9 @@ export async function getReplyFromConfig( sessionEntry.modelOverride = modelSelection.model; } } + if (hasQueueDirective && inlineQueueMode) { + sessionEntry.queueMode = inlineQueueMode; + } sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); @@ -571,6 +659,9 @@ export async function getReplyFromConfig( : `Model set to ${label}.`, ); } + if (hasQueueDirective && inlineQueueMode) { + parts.push(`${SYSTEM_MARK} Queue mode set to ${inlineQueueMode}.`); + } const ack = parts.join(" ").trim(); cleanupTyping(); return { text: ack || "OK." }; @@ -626,6 +717,7 @@ export async function getReplyFromConfig( await saveSessionStore(storePath, sessionStore); } } + const perMessageQueueMode = hasQueueDirective ? inlineQueueMode : undefined; // Optional allowlist by origin number (E.164 without whatsapp: prefix) const configuredAllowFrom = cfg.routing?.allowFrom; @@ -990,7 +1082,35 @@ export async function getReplyFromConfig( .trim() : queueBodyBase; - if (queueEmbeddedPiMessage(sessionIdFinal, queuedBody)) { + const resolvedQueueMode = resolveQueueMode({ + cfg, + surface: sessionCtx.Surface, + sessionEntry, + inlineMode: perMessageQueueMode, + }); + const sessionLaneKey = resolveEmbeddedSessionLane( + sessionKey ?? sessionIdFinal, + ); + const laneSize = getQueueSize(sessionLaneKey); + if (resolvedQueueMode === "drop" && laneSize > 0) { + logVerbose( + `Dropping inbound message for ${sessionLaneKey} (queue busy, mode=drop)`, + ); + cleanupTyping(); + return undefined; + } + if (resolvedQueueMode === "interrupt" && laneSize > 0) { + const cleared = clearCommandLane(sessionLaneKey); + const aborted = abortEmbeddedPiRun(sessionIdFinal); + logVerbose( + `Interrupting ${sessionLaneKey} (cleared ${cleared}, aborted=${aborted})`, + ); + } + + if ( + resolvedQueueMode === "queue" && + queueEmbeddedPiMessage(sessionIdFinal, queuedBody) + ) { if (sessionEntry && sessionStore && sessionKey) { sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; diff --git a/src/commands/agent.test.ts b/src/commands/agent.test.ts index 974c9e8b2..55015f407 100644 --- a/src/commands/agent.test.ts +++ b/src/commands/agent.test.ts @@ -12,7 +12,10 @@ import { } from "vitest"; vi.mock("../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: vi.fn(), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, })); import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; diff --git a/src/config/config.ts b/src/config/config.ts index c2d2ce0e1..4602709da 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -139,10 +139,23 @@ export type TelegramConfig = { export type DiscordConfig = { token?: string; allowFrom?: Array; + guildAllowFrom?: { + guilds?: Array; + users?: Array; + }; requireMention?: boolean; mediaMaxMb?: number; }; +export type QueueMode = "queue" | "interrupt" | "drop"; + +export type QueueModeBySurface = { + whatsapp?: QueueMode; + telegram?: QueueMode; + discord?: QueueMode; + webchat?: QueueMode; +}; + export type GroupChatConfig = { requireMention?: boolean; mentionPatterns?: string[]; @@ -157,6 +170,10 @@ export type RoutingConfig = { timeoutSeconds?: number; }; groupChat?: GroupChatConfig; + queue?: { + mode?: QueueMode; + bySurface?: QueueModeBySurface; + }; }; export type MessagesConfig = { @@ -437,6 +454,21 @@ const GroupChatSchema = z }) .optional(); +const QueueModeSchema = z.union([ + z.literal("queue"), + z.literal("interrupt"), + z.literal("drop"), +]); + +const QueueModeBySurfaceSchema = z + .object({ + whatsapp: QueueModeSchema.optional(), + telegram: QueueModeSchema.optional(), + discord: QueueModeSchema.optional(), + webchat: QueueModeSchema.optional(), + }) + .optional(); + const TranscribeAudioSchema = z .object({ command: z.array(z.string()), @@ -498,6 +530,12 @@ const RoutingSchema = z allowFrom: z.array(z.string()).optional(), groupChat: GroupChatSchema, transcribeAudio: TranscribeAudioSchema, + queue: z + .object({ + mode: QueueModeSchema.optional(), + bySurface: QueueModeBySurfaceSchema, + }) + .optional(), }) .optional(); @@ -698,6 +736,12 @@ const ClawdisSchema = z.object({ .object({ token: z.string().optional(), allowFrom: z.array(z.union([z.string(), z.number()])).optional(), + guildAllowFrom: z + .object({ + guilds: z.array(z.union([z.string(), z.number()])).optional(), + users: z.array(z.union([z.string(), z.number()])).optional(), + }) + .optional(), requireMention: z.boolean().optional(), mediaMaxMb: z.number().positive().optional(), }) diff --git a/src/config/sessions.ts b/src/config/sessions.ts index ab5593054..8b8871b45 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -21,6 +21,7 @@ export type SessionEntry = { modelOverride?: string; groupActivation?: "mention" | "always"; groupActivationNeedsSystemIntro?: boolean; + queueMode?: "queue" | "interrupt" | "drop"; inputTokens?: number; outputTokens?: number; totalTokens?: number; @@ -132,6 +133,7 @@ export async function updateLastRoute(params: { verboseLevel: existing?.verboseLevel, providerOverride: existing?.providerOverride, modelOverride: existing?.modelOverride, + queueMode: existing?.queueMode, inputTokens: existing?.inputTokens, outputTokens: existing?.outputTokens, totalTokens: existing?.totalTokens, diff --git a/src/cron/isolated-agent.test.ts b/src/cron/isolated-agent.test.ts index 16b268147..8fc319d08 100644 --- a/src/cron/isolated-agent.test.ts +++ b/src/cron/isolated-agent.test.ts @@ -9,7 +9,10 @@ import type { ClawdisConfig } from "../config/config.js"; import type { CronJob } from "./types.js"; vi.mock("../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: vi.fn(), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, })); import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 88c6f890b..5b89e49fc 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -25,6 +25,10 @@ export type MonitorDiscordOpts = { runtime?: RuntimeEnv; abortSignal?: AbortSignal; allowFrom?: Array; + guildAllowFrom?: { + guilds?: Array; + users?: Array; + }; requireMention?: boolean; mediaMaxMb?: number; }; @@ -55,6 +59,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { }; const allowFrom = opts.allowFrom ?? cfg.discord?.allowFrom; + const guildAllowFrom = opts.guildAllowFrom ?? cfg.discord?.guildAllowFrom; const requireMention = opts.requireMention ?? cfg.discord?.requireMention ?? true; const mediaMaxBytes = @@ -86,9 +91,11 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { if (!message.author) return; const isDirectMessage = !message.guild; + const botId = client.user?.id; + const wasMentioned = + !isDirectMessage && Boolean(botId && message.mentions.has(botId)); if (!isDirectMessage && requireMention) { - const botId = client.user?.id; - if (botId && !message.mentions.has(botId)) { + if (botId && !wasMentioned) { logger.info( { channelId: message.channelId, @@ -100,6 +107,31 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { } } + if (!isDirectMessage && guildAllowFrom) { + const guilds = normalizeDiscordAllowList(guildAllowFrom.guilds, [ + "guild:", + ]); + const users = normalizeDiscordAllowList(guildAllowFrom.users, [ + "discord:", + "user:", + ]); + if (guilds || users) { + const guildId = message.guild?.id ?? ""; + const userId = message.author.id; + const guildOk = + !guilds || + guilds.allowAll || + (guildId && guilds.ids.has(guildId)); + const userOk = !users || users.allowAll || users.ids.has(userId); + if (!guildOk || !userOk) { + logVerbose( + `Blocked discord guild sender ${userId} (guild ${guildId || "unknown"}) not in guildAllowFrom`, + ); + return; + } + } + } + if (isDirectMessage && Array.isArray(allowFrom) && allowFrom.length > 0) { const allowed = allowFrom .map((entry) => String(entry).trim()) @@ -155,6 +187,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { ? message.channel.name : undefined, Surface: "discord" as const, + WasMentioned: wasMentioned, MessageSid: message.id, Timestamp: message.createdTimestamp, MediaPath: media?.path, @@ -276,6 +309,27 @@ function buildGuildLabel(message: import("discord.js").Message) { return `${message.guild?.name ?? "Guild"} #${channelName} id:${message.channelId}`; } +function normalizeDiscordAllowList( + raw: Array | undefined, + prefixes: string[], +): { allowAll: boolean; ids: Set } | null { + if (!raw || raw.length === 0) return null; + const cleaned = raw + .map((entry) => String(entry).trim()) + .filter(Boolean) + .map((entry) => { + for (const prefix of prefixes) { + if (entry.toLowerCase().startsWith(prefix)) { + return entry.slice(prefix.length); + } + } + return entry; + }); + const allowAll = cleaned.includes("*"); + const ids = new Set(cleaned.filter((entry) => entry !== "*")); + return { allowAll, ids }; +} + async function sendTyping(message: Message) { try { const channel = message.channel; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index ef7d72985..472bbd3af 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -2014,6 +2014,7 @@ export async function startGatewayServer( runtime: discordRuntimeEnv, abortSignal: discordAbort.signal, allowFrom: cfg.discord?.allowFrom, + guildAllowFrom: cfg.discord?.guildAllowFrom, requireMention: cfg.discord?.requireMention, mediaMaxMb: cfg.discord?.mediaMaxMb, }) diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 81066c01e..0370c20dd 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -122,3 +122,12 @@ export function getTotalQueueSize() { } return total; } + +export function clearCommandLane(lane = "main") { + const cleaned = lane.trim() || "main"; + const state = lanes.get(cleaned); + if (!state) return 0; + const removed = state.queue.length; + state.queue.length = 0; + return removed; +} diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index a3714074a..8f6188adb 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -7,8 +7,11 @@ import sharp from "sharp"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; vi.mock("../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: vi.fn(), queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, })); import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";