diff --git a/CHANGELOG.md b/CHANGELOG.md index c2ef0699d..7e68db013 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - Hooks: default hook agent delivery to true. (#533) — thanks @mcinteerj - Hooks: normalize hook agent providers (aliases + msteams support). - WhatsApp: route queued replies to the original sender instead of the bot's own number. (#534) — thanks @mcinteerj +- WhatsApp: add broadcast groups for multi-agent replies. (#547) — thanks @pasogott - iMessage: isolate group-ish threads by chat_id. (#535) — thanks @mdahmann - Models: add OAuth expiry checks in doctor, expanded `models status` auth output (missing auth + `--check` exit codes). (#538) — thanks @latitudeki5223 - Deps: bump Pi to 0.40.0 and drop pi-ai patch (upstream 429 fix). (#543) — thanks @mcinteerj diff --git a/docs/broadcast-groups.md b/docs/broadcast-groups.md index 5a736ac0c..58833dbdc 100644 --- a/docs/broadcast-groups.md +++ b/docs/broadcast-groups.md @@ -7,6 +7,8 @@ Broadcast Groups enable multiple agents to process and respond to the same message simultaneously. This allows you to create specialized agent teams that work together in a single WhatsApp group, DM, or channel - all using one phone number. +Broadcast groups are evaluated after provider allowlists and group activation rules. In WhatsApp groups, this means broadcasts happen when Clawdbot would normally reply (for example: on mention, depending on your group settings). + ## Use Cases ### 1. Specialized Agent Teams @@ -52,19 +54,17 @@ Agents: ### Basic Setup -Add a `broadcast` section to your routing config: +Add a top-level `broadcast` section (next to `bindings`): ```json { - "routing": { - "broadcast": { - "120363403215116621@g.us": ["alfred", "baerbel", "assistant3"] - } + "broadcast": { + "120363403215116621@g.us": ["alfred", "baerbel", "assistant3"] } } ``` -**Result:** All three agents process every message in this WhatsApp group. +**Result:** When Clawdbot would reply in this chat, it will run all three agents. ### Processing Strategy @@ -74,11 +74,9 @@ Control how agents process messages: All agents process simultaneously: ```json { - "routing": { - "broadcast": { - "strategy": "parallel", - "120363403215116621@g.us": ["alfred", "baerbel"] - } + "broadcast": { + "strategy": "parallel", + "120363403215116621@g.us": ["alfred", "baerbel"] } } ``` @@ -87,11 +85,9 @@ All agents process simultaneously: Agents process in order (one waits for previous to finish): ```json { - "routing": { - "broadcast": { - "strategy": "sequential", - "120363403215116621@g.us": ["alfred", "baerbel"] - } + "broadcast": { + "strategy": "sequential", + "120363403215116621@g.us": ["alfred", "baerbel"] } } ``` @@ -100,31 +96,33 @@ Agents process in order (one waits for previous to finish): ```json { - "routing": { - "defaultAgentId": "main", - "broadcast": { - "strategy": "parallel", - "120363403215116621@g.us": ["code-reviewer", "security-auditor", "docs-generator"], - "120363424282127706@g.us": ["support-en", "support-de"], - "+15555550123": ["assistant", "logger"] - }, - "agents": { - "code-reviewer": { + "agents": { + "list": [ + { + "id": "code-reviewer", "name": "Code Reviewer", "workspace": "/path/to/code-reviewer", "sandbox": { "mode": "all" } }, - "security-auditor": { - "name": "Security Auditor", + { + "id": "security-auditor", + "name": "Security Auditor", "workspace": "/path/to/security-auditor", "sandbox": { "mode": "all" } }, - "docs-generator": { + { + "id": "docs-generator", "name": "Documentation Generator", "workspace": "/path/to/docs-generator", "sandbox": { "mode": "all" } } - } + ] + }, + "broadcast": { + "strategy": "parallel", + "120363403215116621@g.us": ["code-reviewer", "security-auditor", "docs-generator"], + "120363424282127706@g.us": ["support-en", "support-de"], + "+15555550123": ["assistant", "logger"] } } ``` @@ -134,7 +132,7 @@ Agents process in order (one waits for previous to finish): ### Message Flow 1. **Incoming message** arrives in a WhatsApp group -2. **Routing check**: System checks if peer ID is in `routing.broadcast` +2. **Broadcast check**: System checks if peer ID is in `broadcast` 3. **If in broadcast list**: - All listed agents process the message - Each agent has its own session key and isolated context @@ -142,6 +140,8 @@ Agents process in order (one waits for previous to finish): 4. **If not in broadcast list**: - Normal routing applies (first matching binding) +Note: broadcast groups do not bypass provider allowlists or group activation rules (mentions/commands/etc). They only change *which agents run* when a message is eligible for processing. + ### Session Isolation Each agent in a broadcast group maintains completely separate: @@ -151,6 +151,7 @@ Each agent in a broadcast group maintains completely separate: - **Workspace** (separate sandboxes if configured) - **Tool access** (different allow/deny lists) - **Memory/context** (separate IDENTITY.md, SOUL.md, etc.) +- **Group context buffer** (recent group messages used for context) is shared per peer, so all broadcast agents see the same context when triggered This allows each agent to have: - Different personalities @@ -258,13 +259,11 @@ Broadcast groups work alongside existing routing: ```json { - "routing": { - "bindings": [ - { "agentId": "alfred", "match": { "peer": { "id": "GROUP_A" } } } - ], - "broadcast": { - "GROUP_B": ["agent1", "agent2"] - } + "bindings": [ + { "match": { "provider": "whatsapp", "peer": { "kind": "group", "id": "GROUP_A" } }, "agentId": "alfred" } + ], + "broadcast": { + "GROUP_B": ["agent1", "agent2"] } } ``` @@ -279,7 +278,7 @@ Broadcast groups work alongside existing routing: ### Agents Not Responding **Check:** -1. Agent IDs exist in `routing.agents` +1. Agent IDs exist in `agents.list` 2. Peer ID format is correct (e.g., `120363403215116621@g.us`) 3. Agents are not in deny lists @@ -307,34 +306,22 @@ tail -f ~/.clawdbot/logs/gateway.log | grep broadcast ```json { - "routing": { - "broadcast": { - "strategy": "parallel", - "120363403215116621@g.us": [ - "code-formatter", - "security-scanner", - "test-coverage", - "docs-checker" - ] - }, - "agents": { - "code-formatter": { - "workspace": "~/agents/formatter", - "tools": { "allow": ["read", "write"] } - }, - "security-scanner": { - "workspace": "~/agents/security", - "tools": { "allow": ["read", "bash"] } - }, - "test-coverage": { - "workspace": "~/agents/testing", - "tools": { "allow": ["read", "bash"] } - }, - "docs-checker": { - "workspace": "~/agents/docs", - "tools": { "allow": ["read"] } - } - } + "broadcast": { + "strategy": "parallel", + "120363403215116621@g.us": [ + "code-formatter", + "security-scanner", + "test-coverage", + "docs-checker" + ] + }, + "agents": { + "list": [ + { "id": "code-formatter", "workspace": "~/agents/formatter", "tools": { "allow": ["read", "write"] } }, + { "id": "security-scanner", "workspace": "~/agents/security", "tools": { "allow": ["read", "bash"] } }, + { "id": "test-coverage", "workspace": "~/agents/testing", "tools": { "allow": ["read", "bash"] } }, + { "id": "docs-checker", "workspace": "~/agents/docs", "tools": { "allow": ["read"] } } + ] } } ``` @@ -350,22 +337,16 @@ tail -f ~/.clawdbot/logs/gateway.log | grep broadcast ```json { - "routing": { - "broadcast": { - "strategy": "sequential", - "+15555550123": ["detect-language", "translator-en", "translator-de"] - }, - "agents": { - "detect-language": { - "workspace": "~/agents/lang-detect" - }, - "translator-en": { - "workspace": "~/agents/translate-en" - }, - "translator-de": { - "workspace": "~/agents/translate-de" - } - } + "broadcast": { + "strategy": "sequential", + "+15555550123": ["detect-language", "translator-en", "translator-de"] + }, + "agents": { + "list": [ + { "id": "detect-language", "workspace": "~/agents/lang-detect" }, + { "id": "translator-en", "workspace": "~/agents/translate-en" }, + { "id": "translator-de", "workspace": "~/agents/translate-de" } + ] } } ``` @@ -375,10 +356,10 @@ tail -f ~/.clawdbot/logs/gateway.log | grep broadcast ### Config Schema ```typescript -interface RoutingConfig { +interface ClawdbotConfig { broadcast?: { strategy?: "parallel" | "sequential"; - [peerId: string]: string[] | "parallel" | "sequential"; + [peerId: string]: string[]; }; } ``` @@ -409,6 +390,6 @@ Planned features: ## See Also -- [Multi-Agent Configuration](multi-agent-sandbox-tools.md) -- [Routing Configuration](routing.md) -- [Session Management](sessions.md) +- [Multi-Agent Configuration](/multi-agent-sandbox-tools) +- [Routing Configuration](/concepts/provider-routing) +- [Session Management](/concepts/sessions) diff --git a/src/config/config.test.ts b/src/config/config.test.ts index 4d9da2597..22015846c 100644 --- a/src/config/config.test.ts +++ b/src/config/config.test.ts @@ -808,6 +808,41 @@ describe("talk.voiceAliases", () => { }); }); +describe("broadcast", () => { + it("accepts a broadcast peer map with strategy", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ + agents: { + list: [{ id: "alfred" }, { id: "baerbel" }], + }, + broadcast: { + strategy: "parallel", + "120363403215116621@g.us": ["alfred", "baerbel"], + }, + }); + expect(res.ok).toBe(true); + }); + + it("rejects invalid broadcast strategy", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ + broadcast: { strategy: "nope" }, + }); + expect(res.ok).toBe(false); + }); + + it("rejects non-array broadcast entries", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ + broadcast: { "120363403215116621@g.us": 123 }, + }); + expect(res.ok).toBe(false); + }); +}); + describe("legacy config detection", () => { it("rejects routing.allowFrom", async () => { vi.resetModules(); diff --git a/src/config/types.ts b/src/config/types.ts index 30ab5f806..28dda35a1 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -946,6 +946,19 @@ export type AgentBinding = { }; }; +export type BroadcastStrategy = "parallel" | "sequential"; + +export type BroadcastConfig = { + /** Default processing strategy for broadcast peers. */ + strategy?: BroadcastStrategy; + /** + * Map peer IDs to arrays of agent IDs that should ALL process messages. + * + * Note: the index signature includes `undefined` so `strategy?: ...` remains type-safe. + */ + [peerId: string]: string[] | BroadcastStrategy | undefined; +}; + export type AudioConfig = { transcription?: { // Optional CLI to turn inbound audio into text; templated args, must output transcript to stdout. @@ -1373,6 +1386,7 @@ export type ClawdbotConfig = { agents?: AgentsConfig; tools?: ToolsConfig; bindings?: AgentBinding[]; + broadcast?: BroadcastConfig; audio?: AudioConfig; messages?: MessagesConfig; commands?: CommandsConfig; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index ad279d679..e05ead895 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -842,6 +842,15 @@ const BindingsSchema = z ) .optional(); +const BroadcastStrategySchema = z.enum(["parallel", "sequential"]); + +const BroadcastSchema = z + .object({ + strategy: BroadcastStrategySchema.optional(), + }) + .catchall(z.array(z.string())) + .optional(); + const AudioSchema = z .object({ transcription: TranscribeAudioSchema, @@ -1188,6 +1197,7 @@ export const ClawdbotSchema = z.object({ agents: AgentsSchema, tools: ToolsSchema, bindings: BindingsSchema, + broadcast: BroadcastSchema, audio: AudioSchema, messages: MessagesSchema, commands: CommandsSchema, diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index bb20decba..a83757a7b 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -48,13 +48,14 @@ import { enqueueSystemEvent } from "../infra/system-events.js"; import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js"; import { createSubsystemLogger, getChildLogger } from "../logging.js"; import { toLocationContext } from "../providers/location.js"; -import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { + buildAgentSessionKey, + resolveAgentRoute, +} from "../routing/resolve-route.js"; import { buildAgentMainSessionKey, - buildAgentPeerSessionKey, DEFAULT_MAIN_KEY, normalizeAgentId, - normalizeId, } from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { isSelfChatMode, jidToE164, normalizeE164 } from "../utils.js"; @@ -1077,6 +1078,7 @@ export async function monitorWebProvider( const processMessage = async ( msg: WebInboundMsg, route: ReturnType, + groupHistoryKey: string, ) => { status.lastMessageAt = Date.now(); status.lastEventAt = status.lastMessageAt; @@ -1086,7 +1088,7 @@ export async function monitorWebProvider( let shouldClearGroupHistory = false; if (msg.chatType === "group") { - const history = groupHistories.get(route.sessionKey) ?? []; + const history = groupHistories.get(groupHistoryKey) ?? []; const historyWithoutCurrent = history.length > 0 ? history.slice(0, -1) : []; if (historyWithoutCurrent.length > 0) { @@ -1298,7 +1300,7 @@ export async function monitorWebProvider( markDispatchIdle(); if (!queuedFinal) { if (shouldClearGroupHistory && didSendReply) { - groupHistories.set(route.sessionKey, []); + groupHistories.set(groupHistoryKey, []); } logVerbose( "Skipping auto-reply: silent token or no text/media returned from resolver", @@ -1307,7 +1309,7 @@ export async function monitorWebProvider( } if (shouldClearGroupHistory && didSendReply) { - groupHistories.set(route.sessionKey, []); + groupHistories.set(groupHistoryKey, []); } }; @@ -1345,7 +1347,10 @@ export async function monitorWebProvider( id: peerId, }, }); - const groupHistoryKey = route.sessionKey; + const groupHistoryKey = + msg.chatType === "group" + ? `whatsapp:${route.accountId}:group:${peerId.trim() || "unknown"}` + : route.sessionKey; // Same-phone mode logging retained if (msg.from === msg.to) { @@ -1460,29 +1465,42 @@ export async function monitorWebProvider( } } - // Check for broadcast groups - const broadcastAgents = cfg.routing?.broadcast?.[peerId]; + // Broadcast groups: when we'd reply anyway, run multiple agents. + // Does not bypass group mention/activation gating above (Option A). + const broadcastAgents = cfg.broadcast?.[peerId]; if ( broadcastAgents && Array.isArray(broadcastAgents) && broadcastAgents.length > 0 ) { - const strategy = cfg.routing?.broadcast?.strategy || "parallel"; + const strategy = cfg.broadcast?.strategy || "parallel"; whatsappInboundLog.info( `Broadcasting message to ${broadcastAgents.length} agents (${strategy})`, ); + const agentIds = cfg.agents?.list?.map((agent) => + normalizeAgentId(agent.id), + ); + const hasKnownAgents = (agentIds?.length ?? 0) > 0; + const processForAgent = (agentId: string) => { const normalizedAgentId = normalizeAgentId(agentId); + if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) { + whatsappInboundLog.warn( + `Broadcast agent ${agentId} not found in agents.list; skipping`, + ); + return Promise.resolve(); + } const agentRoute = { ...route, agentId: normalizedAgentId, - sessionKey: buildAgentPeerSessionKey({ + sessionKey: buildAgentSessionKey({ agentId: normalizedAgentId, - mainKey: DEFAULT_MAIN_KEY, provider: "whatsapp", - peerKind: msg.chatType === "group" ? "group" : "dm", - peerId: normalizeId(peerId), + peer: { + kind: msg.chatType === "group" ? "group" : "dm", + id: peerId, + }, }), mainSessionKey: buildAgentMainSessionKey({ agentId: normalizedAgentId, @@ -1490,11 +1508,13 @@ export async function monitorWebProvider( }), }; - return processMessage(msg, agentRoute).catch((err) => { - whatsappInboundLog.error( - `Broadcast agent ${agentId} failed: ${formatError(err)}`, - ); - }); + return processMessage(msg, agentRoute, groupHistoryKey).catch( + (err) => { + whatsappInboundLog.error( + `Broadcast agent ${agentId} failed: ${formatError(err)}`, + ); + }, + ); }; if (strategy === "sequential") { @@ -1509,7 +1529,7 @@ export async function monitorWebProvider( return; } - return processMessage(msg, route); + return processMessage(msg, route, groupHistoryKey); }, });