diff --git a/docs/concepts/provider-routing.md b/docs/concepts/provider-routing.md index 958d3d83e..ab27321e1 100644 --- a/docs/concepts/provider-routing.md +++ b/docs/concepts/provider-routing.md @@ -51,6 +51,24 @@ Routing picks **one agent** for each inbound message: The matched agent determines which workspace and session store are used. +## Broadcast groups (run multiple agents) + +Broadcast groups let you run **multiple agents** for the same peer **when Clawdbot would normally reply** (for example: in WhatsApp groups, after mention/activation gating). + +Config: + +```json5 +{ + broadcast: { + strategy: "parallel", + "120363403215116621@g.us": ["alfred", "baerbel"], + "+15555550123": ["support", "logger"] + } +} +``` + +See: [Broadcast Groups](/broadcast-groups). + ## Config overview - `agents.list`: named agent definitions (workspace, model, etc.). diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index e05ead895..823c218db 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -1079,393 +1079,423 @@ const AgentDefaultsSchema = z }) .optional(); -export const ClawdbotSchema = z.object({ - env: z - .object({ - shellEnv: z - .object({ - enabled: z.boolean().optional(), - timeoutMs: z.number().int().nonnegative().optional(), - }) - .optional(), - vars: z.record(z.string(), z.string()).optional(), - }) - .catchall(z.string()) - .optional(), - wizard: z - .object({ - lastRunAt: z.string().optional(), - lastRunVersion: z.string().optional(), - lastRunCommit: z.string().optional(), - lastRunCommand: z.string().optional(), - lastRunMode: z - .union([z.literal("local"), z.literal("remote")]) - .optional(), - }) - .optional(), - logging: z - .object({ - level: z - .union([ - z.literal("silent"), - z.literal("fatal"), - z.literal("error"), - z.literal("warn"), - z.literal("info"), - z.literal("debug"), - z.literal("trace"), - ]) - .optional(), - file: z.string().optional(), - consoleLevel: z - .union([ - z.literal("silent"), - z.literal("fatal"), - z.literal("error"), - z.literal("warn"), - z.literal("info"), - z.literal("debug"), - z.literal("trace"), - ]) - .optional(), - consoleStyle: z - .union([z.literal("pretty"), z.literal("compact"), z.literal("json")]) - .optional(), - redactSensitive: z - .union([z.literal("off"), z.literal("tools")]) - .optional(), - redactPatterns: z.array(z.string()).optional(), - }) - .optional(), - browser: z - .object({ - enabled: z.boolean().optional(), - controlUrl: z.string().optional(), - cdpUrl: z.string().optional(), - color: z.string().optional(), - executablePath: z.string().optional(), - headless: z.boolean().optional(), - noSandbox: z.boolean().optional(), - attachOnly: z.boolean().optional(), - defaultProfile: z.string().optional(), - profiles: z - .record( - z - .string() - .regex( - /^[a-z0-9-]+$/, - "Profile names must be alphanumeric with hyphens only", - ), - z - .object({ - cdpPort: z.number().int().min(1).max(65535).optional(), - cdpUrl: z.string().optional(), - color: HexColorSchema, - }) - .refine((value) => value.cdpPort || value.cdpUrl, { - message: "Profile must set cdpPort or cdpUrl", +export const ClawdbotSchema = z + .object({ + env: z + .object({ + shellEnv: z + .object({ + enabled: z.boolean().optional(), + timeoutMs: z.number().int().nonnegative().optional(), + }) + .optional(), + vars: z.record(z.string(), z.string()).optional(), + }) + .catchall(z.string()) + .optional(), + wizard: z + .object({ + lastRunAt: z.string().optional(), + lastRunVersion: z.string().optional(), + lastRunCommit: z.string().optional(), + lastRunCommand: z.string().optional(), + lastRunMode: z + .union([z.literal("local"), z.literal("remote")]) + .optional(), + }) + .optional(), + logging: z + .object({ + level: z + .union([ + z.literal("silent"), + z.literal("fatal"), + z.literal("error"), + z.literal("warn"), + z.literal("info"), + z.literal("debug"), + z.literal("trace"), + ]) + .optional(), + file: z.string().optional(), + consoleLevel: z + .union([ + z.literal("silent"), + z.literal("fatal"), + z.literal("error"), + z.literal("warn"), + z.literal("info"), + z.literal("debug"), + z.literal("trace"), + ]) + .optional(), + consoleStyle: z + .union([z.literal("pretty"), z.literal("compact"), z.literal("json")]) + .optional(), + redactSensitive: z + .union([z.literal("off"), z.literal("tools")]) + .optional(), + redactPatterns: z.array(z.string()).optional(), + }) + .optional(), + browser: z + .object({ + enabled: z.boolean().optional(), + controlUrl: z.string().optional(), + cdpUrl: z.string().optional(), + color: z.string().optional(), + executablePath: z.string().optional(), + headless: z.boolean().optional(), + noSandbox: z.boolean().optional(), + attachOnly: z.boolean().optional(), + defaultProfile: z.string().optional(), + profiles: z + .record( + z + .string() + .regex( + /^[a-z0-9-]+$/, + "Profile names must be alphanumeric with hyphens only", + ), + z + .object({ + cdpPort: z.number().int().min(1).max(65535).optional(), + cdpUrl: z.string().optional(), + color: HexColorSchema, + }) + .refine((value) => value.cdpPort || value.cdpUrl, { + message: "Profile must set cdpPort or cdpUrl", + }), + ) + .optional(), + }) + .optional(), + ui: z + .object({ + seamColor: HexColorSchema.optional(), + }) + .optional(), + auth: z + .object({ + profiles: z + .record( + z.string(), + z.object({ + provider: z.string(), + mode: z.union([ + z.literal("api_key"), + z.literal("oauth"), + z.literal("token"), + ]), + email: z.string().optional(), }), - ) - .optional(), - }) - .optional(), - ui: z - .object({ - seamColor: HexColorSchema.optional(), - }) - .optional(), - auth: z - .object({ - profiles: z - .record( - z.string(), - z.object({ - provider: z.string(), - mode: z.union([ - z.literal("api_key"), - z.literal("oauth"), - z.literal("token"), - ]), - email: z.string().optional(), - }), - ) - .optional(), - order: z.record(z.string(), z.array(z.string())).optional(), - }) - .optional(), - models: ModelsConfigSchema, - agents: AgentsSchema, - tools: ToolsSchema, - bindings: BindingsSchema, - broadcast: BroadcastSchema, - audio: AudioSchema, - messages: MessagesSchema, - commands: CommandsSchema, - session: SessionSchema, - cron: z - .object({ - enabled: z.boolean().optional(), - store: z.string().optional(), - maxConcurrentRuns: z.number().int().positive().optional(), - }) - .optional(), - hooks: z - .object({ - enabled: z.boolean().optional(), - path: z.string().optional(), - token: z.string().optional(), - maxBodyBytes: z.number().int().positive().optional(), - presets: z.array(z.string()).optional(), - transformsDir: z.string().optional(), - mappings: z.array(HookMappingSchema).optional(), - gmail: HooksGmailSchema, - }) - .optional(), - web: z - .object({ - enabled: z.boolean().optional(), - heartbeatSeconds: z.number().int().positive().optional(), - reconnect: z - .object({ - initialMs: z.number().positive().optional(), - maxMs: z.number().positive().optional(), - factor: z.number().positive().optional(), - jitter: z.number().min(0).max(1).optional(), - maxAttempts: z.number().int().min(0).optional(), - }) - .optional(), - }) - .optional(), - whatsapp: z - .object({ - accounts: z - .record( - z.string(), - z - .object({ - name: z.string().optional(), - capabilities: z.array(z.string()).optional(), - enabled: z.boolean().optional(), - messagePrefix: z.string().optional(), - /** Override auth directory for this WhatsApp account (Baileys multi-file auth state). */ - authDir: z.string().optional(), - dmPolicy: DmPolicySchema.optional().default("pairing"), - selfChatMode: z.boolean().optional(), - allowFrom: z.array(z.string()).optional(), - groupAllowFrom: z.array(z.string()).optional(), - groupPolicy: GroupPolicySchema.optional().default("open"), - textChunkLimit: z.number().int().positive().optional(), - mediaMaxMb: z.number().int().positive().optional(), - blockStreaming: z.boolean().optional(), - blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), - groups: z - .record( - z.string(), - z - .object({ - requireMention: z.boolean().optional(), - }) - .optional(), - ) - .optional(), - }) - .superRefine((value, ctx) => { - if (value.dmPolicy !== "open") return; - const allow = (value.allowFrom ?? []) - .map((v) => String(v).trim()) - .filter(Boolean); - if (allow.includes("*")) return; - ctx.addIssue({ - code: z.ZodIssueCode.custom, - path: ["allowFrom"], - message: - 'whatsapp.accounts.*.dmPolicy="open" requires allowFrom to include "*"', - }); - }) - .optional(), - ) - .optional(), - capabilities: z.array(z.string()).optional(), - dmPolicy: DmPolicySchema.optional().default("pairing"), - messagePrefix: z.string().optional(), - selfChatMode: z.boolean().optional(), - allowFrom: z.array(z.string()).optional(), - groupAllowFrom: z.array(z.string()).optional(), - groupPolicy: GroupPolicySchema.optional().default("open"), - textChunkLimit: z.number().int().positive().optional(), - mediaMaxMb: z.number().int().positive().optional().default(50), - blockStreaming: z.boolean().optional(), - blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), - actions: z - .object({ - reactions: z.boolean().optional(), - sendMessage: z.boolean().optional(), - polls: z.boolean().optional(), - }) - .optional(), - groups: z - .record( - z.string(), - z - .object({ - requireMention: z.boolean().optional(), - }) - .optional(), - ) - .optional(), - }) - .superRefine((value, ctx) => { - if (value.dmPolicy !== "open") return; - const allow = (value.allowFrom ?? []) - .map((v) => String(v).trim()) - .filter(Boolean); - if (allow.includes("*")) return; - ctx.addIssue({ - code: z.ZodIssueCode.custom, - path: ["allowFrom"], - message: - 'whatsapp.dmPolicy="open" requires whatsapp.allowFrom to include "*"', - }); - }) - .optional(), - telegram: TelegramConfigSchema.optional(), - discord: DiscordConfigSchema.optional(), - slack: SlackConfigSchema.optional(), - signal: SignalConfigSchema.optional(), - imessage: IMessageConfigSchema.optional(), - msteams: MSTeamsConfigSchema.optional(), - bridge: z - .object({ - enabled: z.boolean().optional(), - port: z.number().int().positive().optional(), - bind: z - .union([ - z.literal("auto"), - z.literal("lan"), - z.literal("tailnet"), - z.literal("loopback"), - ]) - .optional(), - }) - .optional(), - discovery: z - .object({ - wideArea: z - .object({ - enabled: z.boolean().optional(), - }) - .optional(), - }) - .optional(), - canvasHost: z - .object({ - enabled: z.boolean().optional(), - root: z.string().optional(), - port: z.number().int().positive().optional(), - liveReload: z.boolean().optional(), - }) - .optional(), - talk: z - .object({ - voiceId: z.string().optional(), - voiceAliases: z.record(z.string(), z.string()).optional(), - modelId: z.string().optional(), - outputFormat: z.string().optional(), - apiKey: z.string().optional(), - interruptOnSpeech: z.boolean().optional(), - }) - .optional(), - gateway: z - .object({ - port: z.number().int().positive().optional(), - mode: z.union([z.literal("local"), z.literal("remote")]).optional(), - bind: z - .union([ - z.literal("auto"), - z.literal("lan"), - z.literal("tailnet"), - z.literal("loopback"), - ]) - .optional(), - controlUi: z - .object({ - enabled: z.boolean().optional(), - basePath: z.string().optional(), - }) - .optional(), - auth: z - .object({ - mode: z.union([z.literal("token"), z.literal("password")]).optional(), - token: z.string().optional(), - password: z.string().optional(), - allowTailscale: z.boolean().optional(), - }) - .optional(), - tailscale: z - .object({ - mode: z - .union([z.literal("off"), z.literal("serve"), z.literal("funnel")]) - .optional(), - resetOnExit: z.boolean().optional(), - }) - .optional(), - remote: z - .object({ - url: z.string().optional(), - token: z.string().optional(), - password: z.string().optional(), - sshTarget: z.string().optional(), - sshIdentity: z.string().optional(), - }) - .optional(), - reload: z - .object({ - mode: z - .union([ - z.literal("off"), - z.literal("restart"), - z.literal("hot"), - z.literal("hybrid"), - ]) - .optional(), - debounceMs: z.number().int().min(0).optional(), - }) - .optional(), - }) - .optional(), - skills: z - .object({ - allowBundled: z.array(z.string()).optional(), - load: z - .object({ - extraDirs: z.array(z.string()).optional(), - }) - .optional(), - install: z - .object({ - preferBrew: z.boolean().optional(), - nodeManager: z - .union([ - z.literal("npm"), - z.literal("pnpm"), - z.literal("yarn"), - z.literal("bun"), - ]) - .optional(), - }) - .optional(), - entries: z - .record( - z.string(), - z - .object({ - enabled: z.boolean().optional(), - apiKey: z.string().optional(), - env: z.record(z.string(), z.string()).optional(), - }) - .passthrough(), - ) - .optional(), - }) - .optional(), -}); + ) + .optional(), + order: z.record(z.string(), z.array(z.string())).optional(), + }) + .optional(), + models: ModelsConfigSchema, + agents: AgentsSchema, + tools: ToolsSchema, + bindings: BindingsSchema, + broadcast: BroadcastSchema, + audio: AudioSchema, + messages: MessagesSchema, + commands: CommandsSchema, + session: SessionSchema, + cron: z + .object({ + enabled: z.boolean().optional(), + store: z.string().optional(), + maxConcurrentRuns: z.number().int().positive().optional(), + }) + .optional(), + hooks: z + .object({ + enabled: z.boolean().optional(), + path: z.string().optional(), + token: z.string().optional(), + maxBodyBytes: z.number().int().positive().optional(), + presets: z.array(z.string()).optional(), + transformsDir: z.string().optional(), + mappings: z.array(HookMappingSchema).optional(), + gmail: HooksGmailSchema, + }) + .optional(), + web: z + .object({ + enabled: z.boolean().optional(), + heartbeatSeconds: z.number().int().positive().optional(), + reconnect: z + .object({ + initialMs: z.number().positive().optional(), + maxMs: z.number().positive().optional(), + factor: z.number().positive().optional(), + jitter: z.number().min(0).max(1).optional(), + maxAttempts: z.number().int().min(0).optional(), + }) + .optional(), + }) + .optional(), + whatsapp: z + .object({ + accounts: z + .record( + z.string(), + z + .object({ + name: z.string().optional(), + capabilities: z.array(z.string()).optional(), + enabled: z.boolean().optional(), + messagePrefix: z.string().optional(), + /** Override auth directory for this WhatsApp account (Baileys multi-file auth state). */ + authDir: z.string().optional(), + dmPolicy: DmPolicySchema.optional().default("pairing"), + selfChatMode: z.boolean().optional(), + allowFrom: z.array(z.string()).optional(), + groupAllowFrom: z.array(z.string()).optional(), + groupPolicy: GroupPolicySchema.optional().default("open"), + textChunkLimit: z.number().int().positive().optional(), + mediaMaxMb: z.number().int().positive().optional(), + blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), + groups: z + .record( + z.string(), + z + .object({ + requireMention: z.boolean().optional(), + }) + .optional(), + ) + .optional(), + }) + .superRefine((value, ctx) => { + if (value.dmPolicy !== "open") return; + const allow = (value.allowFrom ?? []) + .map((v) => String(v).trim()) + .filter(Boolean); + if (allow.includes("*")) return; + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["allowFrom"], + message: + 'whatsapp.accounts.*.dmPolicy="open" requires allowFrom to include "*"', + }); + }) + .optional(), + ) + .optional(), + capabilities: z.array(z.string()).optional(), + dmPolicy: DmPolicySchema.optional().default("pairing"), + messagePrefix: z.string().optional(), + selfChatMode: z.boolean().optional(), + allowFrom: z.array(z.string()).optional(), + groupAllowFrom: z.array(z.string()).optional(), + groupPolicy: GroupPolicySchema.optional().default("open"), + textChunkLimit: z.number().int().positive().optional(), + mediaMaxMb: z.number().int().positive().optional().default(50), + blockStreaming: z.boolean().optional(), + blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), + actions: z + .object({ + reactions: z.boolean().optional(), + sendMessage: z.boolean().optional(), + polls: z.boolean().optional(), + }) + .optional(), + groups: z + .record( + z.string(), + z + .object({ + requireMention: z.boolean().optional(), + }) + .optional(), + ) + .optional(), + }) + .superRefine((value, ctx) => { + if (value.dmPolicy !== "open") return; + const allow = (value.allowFrom ?? []) + .map((v) => String(v).trim()) + .filter(Boolean); + if (allow.includes("*")) return; + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["allowFrom"], + message: + 'whatsapp.dmPolicy="open" requires whatsapp.allowFrom to include "*"', + }); + }) + .optional(), + telegram: TelegramConfigSchema.optional(), + discord: DiscordConfigSchema.optional(), + slack: SlackConfigSchema.optional(), + signal: SignalConfigSchema.optional(), + imessage: IMessageConfigSchema.optional(), + msteams: MSTeamsConfigSchema.optional(), + bridge: z + .object({ + enabled: z.boolean().optional(), + port: z.number().int().positive().optional(), + bind: z + .union([ + z.literal("auto"), + z.literal("lan"), + z.literal("tailnet"), + z.literal("loopback"), + ]) + .optional(), + }) + .optional(), + discovery: z + .object({ + wideArea: z + .object({ + enabled: z.boolean().optional(), + }) + .optional(), + }) + .optional(), + canvasHost: z + .object({ + enabled: z.boolean().optional(), + root: z.string().optional(), + port: z.number().int().positive().optional(), + liveReload: z.boolean().optional(), + }) + .optional(), + talk: z + .object({ + voiceId: z.string().optional(), + voiceAliases: z.record(z.string(), z.string()).optional(), + modelId: z.string().optional(), + outputFormat: z.string().optional(), + apiKey: z.string().optional(), + interruptOnSpeech: z.boolean().optional(), + }) + .optional(), + gateway: z + .object({ + port: z.number().int().positive().optional(), + mode: z.union([z.literal("local"), z.literal("remote")]).optional(), + bind: z + .union([ + z.literal("auto"), + z.literal("lan"), + z.literal("tailnet"), + z.literal("loopback"), + ]) + .optional(), + controlUi: z + .object({ + enabled: z.boolean().optional(), + basePath: z.string().optional(), + }) + .optional(), + auth: z + .object({ + mode: z + .union([z.literal("token"), z.literal("password")]) + .optional(), + token: z.string().optional(), + password: z.string().optional(), + allowTailscale: z.boolean().optional(), + }) + .optional(), + tailscale: z + .object({ + mode: z + .union([ + z.literal("off"), + z.literal("serve"), + z.literal("funnel"), + ]) + .optional(), + resetOnExit: z.boolean().optional(), + }) + .optional(), + remote: z + .object({ + url: z.string().optional(), + token: z.string().optional(), + password: z.string().optional(), + sshTarget: z.string().optional(), + sshIdentity: z.string().optional(), + }) + .optional(), + reload: z + .object({ + mode: z + .union([ + z.literal("off"), + z.literal("restart"), + z.literal("hot"), + z.literal("hybrid"), + ]) + .optional(), + debounceMs: z.number().int().min(0).optional(), + }) + .optional(), + }) + .optional(), + skills: z + .object({ + allowBundled: z.array(z.string()).optional(), + load: z + .object({ + extraDirs: z.array(z.string()).optional(), + }) + .optional(), + install: z + .object({ + preferBrew: z.boolean().optional(), + nodeManager: z + .union([ + z.literal("npm"), + z.literal("pnpm"), + z.literal("yarn"), + z.literal("bun"), + ]) + .optional(), + }) + .optional(), + entries: z + .record( + z.string(), + z + .object({ + enabled: z.boolean().optional(), + apiKey: z.string().optional(), + env: z.record(z.string(), z.string()).optional(), + }) + .passthrough(), + ) + .optional(), + }) + .optional(), + }) + .superRefine((cfg, ctx) => { + const agents = cfg.agents?.list ?? []; + if (agents.length === 0) return; + const agentIds = new Set(agents.map((agent) => agent.id)); + + const broadcast = cfg.broadcast; + if (!broadcast) return; + + for (const [peerId, ids] of Object.entries(broadcast)) { + if (peerId === "strategy") continue; + if (!Array.isArray(ids)) continue; + for (let idx = 0; idx < ids.length; idx += 1) { + const agentId = ids[idx]; + if (!agentIds.has(agentId)) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["broadcast", peerId, idx], + message: `Unknown agent id "${agentId}" (not in agents.list).`, + }); + } + } + } + }); diff --git a/src/routing/session-key.ts b/src/routing/session-key.ts index 2ab004150..1506b1f8a 100644 --- a/src/routing/session-key.ts +++ b/src/routing/session-key.ts @@ -2,6 +2,10 @@ export const DEFAULT_AGENT_ID = "main"; export const DEFAULT_MAIN_KEY = "main"; export const DEFAULT_ACCOUNT_ID = "default"; +function normalizeToken(value: string | undefined | null): string { + return (value ?? "").trim().toLowerCase(); +} + export type ParsedAgentSessionKey = { agentId: string; rest: string; @@ -97,6 +101,18 @@ export function buildAgentPeerSessionKey(params: { return `agent:${normalizeAgentId(params.agentId)}:${provider}:${peerKind}:${peerId}`; } +export function buildGroupHistoryKey(params: { + provider: string; + accountId?: string | null; + peerKind: "group" | "channel"; + peerId: string; +}): string { + const provider = normalizeToken(params.provider) || "unknown"; + const accountId = normalizeAccountId(params.accountId); + const peerId = params.peerId.trim() || "unknown"; + return `${provider}:${accountId}:${params.peerKind}:${peerId}`; +} + export function resolveThreadSessionKeys(params: { baseSessionKey: string; threadId?: string | null; diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index f172ba0af..6152c56ff 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -2081,3 +2081,183 @@ describe("web auto-reply", () => { resetLoadConfigMock(); }); }); + +describe("broadcast groups", () => { + it("broadcasts sequentially in configured order", async () => { + setLoadConfigMock({ + whatsapp: { allowFrom: ["*"] }, + agents: { + defaults: { maxConcurrent: 10 }, + list: [{ id: "alfred" }, { id: "baerbel" }], + }, + broadcast: { + strategy: "sequential", + "+1000": ["alfred", "baerbel"], + }, + } satisfies ClawdbotConfig); + + const sendMedia = vi.fn(); + const reply = vi.fn().mockResolvedValue(undefined); + const sendComposing = vi.fn(); + const seen: string[] = []; + const resolver = vi.fn(async (ctx: { SessionKey?: unknown }) => { + seen.push(String(ctx.SessionKey)); + return { text: "ok" }; + }); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + await capturedOnMessage?.({ + id: "m1", + from: "+1000", + conversationId: "+1000", + to: "+2000", + body: "hello", + timestamp: Date.now(), + chatType: "direct", + chatId: "direct:+1000", + sendComposing, + reply, + sendMedia, + }); + + expect(resolver).toHaveBeenCalledTimes(2); + expect(seen[0]).toContain("agent:alfred:"); + expect(seen[1]).toContain("agent:baerbel:"); + resetLoadConfigMock(); + }); + + it("broadcasts in parallel by default", async () => { + setLoadConfigMock({ + whatsapp: { allowFrom: ["*"] }, + agents: { + defaults: { maxConcurrent: 10 }, + list: [{ id: "alfred" }, { id: "baerbel" }], + }, + broadcast: { + strategy: "parallel", + "+1000": ["alfred", "baerbel"], + }, + } satisfies ClawdbotConfig); + + const sendMedia = vi.fn(); + const reply = vi.fn().mockResolvedValue(undefined); + const sendComposing = vi.fn(); + + let started = 0; + let release: (() => void) | undefined; + const gate = new Promise((resolve) => { + release = resolve; + }); + + const resolver = vi.fn(async () => { + started += 1; + if (started < 2) { + await gate; + } else { + release?.(); + } + return { text: "ok" }; + }); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + await capturedOnMessage?.({ + id: "m1", + from: "+1000", + conversationId: "+1000", + to: "+2000", + body: "hello", + timestamp: Date.now(), + chatType: "direct", + chatId: "direct:+1000", + sendComposing, + reply, + sendMedia, + }); + + expect(resolver).toHaveBeenCalledTimes(2); + resetLoadConfigMock(); + }); + + it("skips unknown broadcast agent ids when agents.list is present", async () => { + setLoadConfigMock({ + whatsapp: { allowFrom: ["*"] }, + agents: { + defaults: { maxConcurrent: 10 }, + list: [{ id: "alfred" }], + }, + broadcast: { + "+1000": ["alfred", "missing"], + }, + } satisfies ClawdbotConfig); + + const sendMedia = vi.fn(); + const reply = vi.fn().mockResolvedValue(undefined); + const sendComposing = vi.fn(); + const seen: string[] = []; + const resolver = vi.fn(async (ctx: { SessionKey?: unknown }) => { + seen.push(String(ctx.SessionKey)); + return { text: "ok" }; + }); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + await capturedOnMessage?.({ + id: "m1", + from: "+1000", + conversationId: "+1000", + to: "+2000", + body: "hello", + timestamp: Date.now(), + chatType: "direct", + chatId: "direct:+1000", + sendComposing, + reply, + sendMedia, + }); + + expect(resolver).toHaveBeenCalledTimes(1); + expect(seen[0]).toContain("agent:alfred:"); + resetLoadConfigMock(); + }); +}); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index a83757a7b..994283617 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -54,6 +54,7 @@ import { } from "../routing/resolve-route.js"; import { buildAgentMainSessionKey, + buildGroupHistoryKey, DEFAULT_MAIN_KEY, normalizeAgentId, } from "../routing/session-key.js"; @@ -1001,14 +1002,27 @@ export async function monitorWebProvider( // Track recently sent messages to prevent echo loops const recentlySent = new Set(); const MAX_RECENT_MESSAGES = 100; + const buildCombinedEchoKey = (params: { + sessionKey: string; + combinedBody: string; + }) => `combined:${params.sessionKey}:${params.combinedBody}`; const rememberSentText = ( text: string | undefined, - opts: { combinedBody: string; logVerboseMessage?: boolean }, + opts: { + combinedBody?: string; + combinedBodySessionKey?: string; + logVerboseMessage?: boolean; + }, ) => { if (!text) return; recentlySent.add(text); - if (opts.combinedBody) { - recentlySent.add(opts.combinedBody); + if (opts.combinedBody && opts.combinedBodySessionKey) { + recentlySent.add( + buildCombinedEchoKey({ + sessionKey: opts.combinedBodySessionKey, + combinedBody: opts.combinedBody, + }), + ); } if (opts.logVerboseMessage) { logVerbose( @@ -1117,9 +1131,13 @@ export async function monitorWebProvider( } // Echo detection uses combined body so we don't respond twice. - if (recentlySent.has(combinedBody)) { + const combinedEchoKey = buildCombinedEchoKey({ + sessionKey: route.sessionKey, + combinedBody, + }); + if (recentlySent.has(combinedEchoKey)) { logVerbose(`Skipping auto-reply: detected echo for combined message`); - recentlySent.delete(combinedBody); + recentlySent.delete(combinedEchoKey); return; } @@ -1213,13 +1231,14 @@ export async function monitorWebProvider( }); didSendReply = true; if (info.kind === "tool") { - rememberSentText(payload.text, { combinedBody: "" }); + rememberSentText(payload.text, {}); return; } const shouldLog = info.kind === "final" && payload.text ? true : undefined; rememberSentText(payload.text, { combinedBody, + combinedBodySessionKey: route.sessionKey, logVerboseMessage: shouldLog, }); if (info.kind === "final") { @@ -1274,7 +1293,7 @@ export async function monitorWebProvider( GroupSubject: msg.groupSubject, GroupMembers: formatGroupMembers( msg.groupParticipants, - groupMemberNames.get(route.sessionKey), + groupMemberNames.get(groupHistoryKey), msg.senderE164, ), SenderName: msg.senderName, @@ -1313,6 +1332,70 @@ export async function monitorWebProvider( } }; + const maybeBroadcastMessage = async (params: { + msg: WebInboundMsg; + peerId: string; + route: ReturnType; + groupHistoryKey: string; + }): Promise => { + const { msg, peerId, route, groupHistoryKey } = params; + const broadcastAgents = cfg.broadcast?.[peerId]; + if (!broadcastAgents || !Array.isArray(broadcastAgents)) return false; + if (broadcastAgents.length === 0) return false; + + const strategy = cfg.broadcast?.strategy || "parallel"; + whatsappInboundLog.info( + `Broadcasting message to ${broadcastAgents.length} agents (${strategy})`, + ); + + const agentIds = cfg.agents?.list?.map((agent) => + normalizeAgentId(agent.id), + ); + const hasKnownAgents = (agentIds?.length ?? 0) > 0; + + const processForAgent = (agentId: string) => { + const normalizedAgentId = normalizeAgentId(agentId); + if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) { + whatsappInboundLog.warn( + `Broadcast agent ${agentId} not found in agents.list; skipping`, + ); + return Promise.resolve(); + } + const agentRoute = { + ...route, + agentId: normalizedAgentId, + sessionKey: buildAgentSessionKey({ + agentId: normalizedAgentId, + provider: "whatsapp", + peer: { + kind: msg.chatType === "group" ? "group" : "dm", + id: peerId, + }, + }), + mainSessionKey: buildAgentMainSessionKey({ + agentId: normalizedAgentId, + mainKey: DEFAULT_MAIN_KEY, + }), + }; + + return processMessage(msg, agentRoute, groupHistoryKey).catch((err) => { + whatsappInboundLog.error( + `Broadcast agent ${agentId} failed: ${formatError(err)}`, + ); + }); + }; + + if (strategy === "sequential") { + for (const agentId of broadcastAgents) { + await processForAgent(agentId); + } + } else { + await Promise.allSettled(broadcastAgents.map(processForAgent)); + } + + return true; + }; + const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, accountId: account.accountId, @@ -1349,7 +1432,12 @@ export async function monitorWebProvider( }); const groupHistoryKey = msg.chatType === "group" - ? `whatsapp:${route.accountId}:group:${peerId.trim() || "unknown"}` + ? buildGroupHistoryKey({ + provider: "whatsapp", + accountId: route.accountId, + peerKind: "group", + peerId, + }) : route.sessionKey; // Same-phone mode logging retained @@ -1467,65 +1555,9 @@ export async function monitorWebProvider( // Broadcast groups: when we'd reply anyway, run multiple agents. // Does not bypass group mention/activation gating above (Option A). - const broadcastAgents = cfg.broadcast?.[peerId]; if ( - broadcastAgents && - Array.isArray(broadcastAgents) && - broadcastAgents.length > 0 + await maybeBroadcastMessage({ msg, peerId, route, groupHistoryKey }) ) { - const strategy = cfg.broadcast?.strategy || "parallel"; - whatsappInboundLog.info( - `Broadcasting message to ${broadcastAgents.length} agents (${strategy})`, - ); - - const agentIds = cfg.agents?.list?.map((agent) => - normalizeAgentId(agent.id), - ); - const hasKnownAgents = (agentIds?.length ?? 0) > 0; - - const processForAgent = (agentId: string) => { - const normalizedAgentId = normalizeAgentId(agentId); - if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) { - whatsappInboundLog.warn( - `Broadcast agent ${agentId} not found in agents.list; skipping`, - ); - return Promise.resolve(); - } - const agentRoute = { - ...route, - agentId: normalizedAgentId, - sessionKey: buildAgentSessionKey({ - agentId: normalizedAgentId, - provider: "whatsapp", - peer: { - kind: msg.chatType === "group" ? "group" : "dm", - id: peerId, - }, - }), - mainSessionKey: buildAgentMainSessionKey({ - agentId: normalizedAgentId, - mainKey: DEFAULT_MAIN_KEY, - }), - }; - - return processMessage(msg, agentRoute, groupHistoryKey).catch( - (err) => { - whatsappInboundLog.error( - `Broadcast agent ${agentId} failed: ${formatError(err)}`, - ); - }, - ); - }; - - if (strategy === "sequential") { - for (const agentId of broadcastAgents) { - await processForAgent(agentId); - } - } else { - // Parallel processing (default) - await Promise.allSettled(broadcastAgents.map(processForAgent)); - } - return; }