diff --git a/CHANGELOG.md b/CHANGELOG.md index a211ecef9..b80595611 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - Hooks: default hook agent delivery to true. (#533) — thanks @mcinteerj - Hooks: normalize hook agent providers (aliases + msteams support). - WhatsApp: route queued replies to the original sender instead of the bot's own number. (#534) — thanks @mcinteerj +- WhatsApp: add broadcast groups for multi-agent replies. (#547) — thanks @pasogott - iMessage: isolate group-ish threads by chat_id. (#535) — thanks @mdahmann - Models: add OAuth expiry checks in doctor, expanded `models status` auth output (missing auth + `--check` exit codes). (#538) — thanks @latitudeki5223 - Deps: bump Pi to 0.40.0 and drop pi-ai patch (upstream 429 fix). (#543) — thanks @mcinteerj diff --git a/docs/broadcast-groups.md b/docs/broadcast-groups.md new file mode 100644 index 000000000..58833dbdc --- /dev/null +++ b/docs/broadcast-groups.md @@ -0,0 +1,395 @@ +# Broadcast Groups + +**Status:** Experimental +**Version:** Added in 2026.1.9 + +## Overview + +Broadcast Groups enable multiple agents to process and respond to the same message simultaneously. This allows you to create specialized agent teams that work together in a single WhatsApp group, DM, or channel - all using one phone number. + +Broadcast groups are evaluated after provider allowlists and group activation rules. In WhatsApp groups, this means broadcasts happen when Clawdbot would normally reply (for example: on mention, depending on your group settings). + +## Use Cases + +### 1. Specialized Agent Teams +Deploy multiple agents with atomic, focused responsibilities: +``` +Group: "Development Team" +Agents: + - CodeReviewer (reviews code snippets) + - DocumentationBot (generates docs) + - SecurityAuditor (checks for vulnerabilities) + - TestGenerator (suggests test cases) +``` + +Each agent processes the same message and provides its specialized perspective. + +### 2. Multi-Language Support +``` +Group: "International Support" +Agents: + - Agent_EN (responds in English) + - Agent_DE (responds in German) + - Agent_ES (responds in Spanish) +``` + +### 3. Quality Assurance Workflows +``` +Group: "Customer Support" +Agents: + - SupportAgent (provides answer) + - QAAgent (reviews quality, only responds if issues found) +``` + +### 4. Task Automation +``` +Group: "Project Management" +Agents: + - TaskTracker (updates task database) + - TimeLogger (logs time spent) + - ReportGenerator (creates summaries) +``` + +## Configuration + +### Basic Setup + +Add a top-level `broadcast` section (next to `bindings`): + +```json +{ + "broadcast": { + "120363403215116621@g.us": ["alfred", "baerbel", "assistant3"] + } +} +``` + +**Result:** When Clawdbot would reply in this chat, it will run all three agents. + +### Processing Strategy + +Control how agents process messages: + +#### Parallel (Default) +All agents process simultaneously: +```json +{ + "broadcast": { + "strategy": "parallel", + "120363403215116621@g.us": ["alfred", "baerbel"] + } +} +``` + +#### Sequential +Agents process in order (one waits for previous to finish): +```json +{ + "broadcast": { + "strategy": "sequential", + "120363403215116621@g.us": ["alfred", "baerbel"] + } +} +``` + +### Complete Example + +```json +{ + "agents": { + "list": [ + { + "id": "code-reviewer", + "name": "Code Reviewer", + "workspace": "/path/to/code-reviewer", + "sandbox": { "mode": "all" } + }, + { + "id": "security-auditor", + "name": "Security Auditor", + "workspace": "/path/to/security-auditor", + "sandbox": { "mode": "all" } + }, + { + "id": "docs-generator", + "name": "Documentation Generator", + "workspace": "/path/to/docs-generator", + "sandbox": { "mode": "all" } + } + ] + }, + "broadcast": { + "strategy": "parallel", + "120363403215116621@g.us": ["code-reviewer", "security-auditor", "docs-generator"], + "120363424282127706@g.us": ["support-en", "support-de"], + "+15555550123": ["assistant", "logger"] + } +} +``` + +## How It Works + +### Message Flow + +1. **Incoming message** arrives in a WhatsApp group +2. **Broadcast check**: System checks if peer ID is in `broadcast` +3. **If in broadcast list**: + - All listed agents process the message + - Each agent has its own session key and isolated context + - Agents process in parallel (default) or sequentially +4. **If not in broadcast list**: + - Normal routing applies (first matching binding) + +Note: broadcast groups do not bypass provider allowlists or group activation rules (mentions/commands/etc). They only change *which agents run* when a message is eligible for processing. + +### Session Isolation + +Each agent in a broadcast group maintains completely separate: + +- **Session keys** (`agent:alfred:whatsapp:group:120363...` vs `agent:baerbel:whatsapp:group:120363...`) +- **Conversation history** (agent doesn't see other agents' messages) +- **Workspace** (separate sandboxes if configured) +- **Tool access** (different allow/deny lists) +- **Memory/context** (separate IDENTITY.md, SOUL.md, etc.) +- **Group context buffer** (recent group messages used for context) is shared per peer, so all broadcast agents see the same context when triggered + +This allows each agent to have: +- Different personalities +- Different tool access (e.g., read-only vs. read-write) +- Different models (e.g., opus vs. sonnet) +- Different skills installed + +### Example: Isolated Sessions + +In group `120363403215116621@g.us` with agents `["alfred", "baerbel"]`: + +**Alfred's context:** +``` +Session: agent:alfred:whatsapp:group:120363403215116621@g.us +History: [user message, alfred's previous responses] +Workspace: /Users/pascal/clawd-alfred/ +Tools: read, write, bash +``` + +**Bärbel's context:** +``` +Session: agent:baerbel:whatsapp:group:120363403215116621@g.us +History: [user message, baerbel's previous responses] +Workspace: /Users/pascal/clawd-baerbel/ +Tools: read only +``` + +## Best Practices + +### 1. Keep Agents Focused + +Design each agent with a single, clear responsibility: + +```json +{ + "broadcast": { + "DEV_GROUP": ["formatter", "linter", "tester"] + } +} +``` + +✅ **Good:** Each agent has one job +❌ **Bad:** One generic "dev-helper" agent + +### 2. Use Descriptive Names + +Make it clear what each agent does: + +```json +{ + "agents": { + "security-scanner": { "name": "Security Scanner" }, + "code-formatter": { "name": "Code Formatter" }, + "test-generator": { "name": "Test Generator" } + } +} +``` + +### 3. Configure Different Tool Access + +Give agents only the tools they need: + +```json +{ + "agents": { + "reviewer": { + "tools": { "allow": ["read", "bash"] } // Read-only + }, + "fixer": { + "tools": { "allow": ["read", "write", "edit", "bash"] } // Read-write + } + } +} +``` + +### 4. Monitor Performance + +With many agents, consider: +- Using `"strategy": "parallel"` (default) for speed +- Limiting broadcast groups to 5-10 agents +- Using faster models for simpler agents + +### 5. Handle Failures Gracefully + +Agents fail independently. One agent's error doesn't block others: + +``` +Message → [Agent A ✓, Agent B ✗ error, Agent C ✓] +Result: Agent A and C respond, Agent B logs error +``` + +## Compatibility + +### Providers + +Broadcast groups currently work with: +- ✅ WhatsApp (implemented) +- 🚧 Telegram (planned) +- 🚧 Discord (planned) +- 🚧 Slack (planned) + +### Routing + +Broadcast groups work alongside existing routing: + +```json +{ + "bindings": [ + { "match": { "provider": "whatsapp", "peer": { "kind": "group", "id": "GROUP_A" } }, "agentId": "alfred" } + ], + "broadcast": { + "GROUP_B": ["agent1", "agent2"] + } +} +``` + +- `GROUP_A`: Only alfred responds (normal routing) +- `GROUP_B`: agent1 AND agent2 respond (broadcast) + +**Precedence:** `broadcast` takes priority over `bindings`. + +## Troubleshooting + +### Agents Not Responding + +**Check:** +1. Agent IDs exist in `agents.list` +2. Peer ID format is correct (e.g., `120363403215116621@g.us`) +3. Agents are not in deny lists + +**Debug:** +```bash +tail -f ~/.clawdbot/logs/gateway.log | grep broadcast +``` + +### Only One Agent Responding + +**Cause:** Peer ID might be in `bindings` but not `broadcast`. + +**Fix:** Add to broadcast config or remove from bindings. + +### Performance Issues + +**If slow with many agents:** +- Reduce number of agents per group +- Use lighter models (sonnet instead of opus) +- Check sandbox startup time + +## Examples + +### Example 1: Code Review Team + +```json +{ + "broadcast": { + "strategy": "parallel", + "120363403215116621@g.us": [ + "code-formatter", + "security-scanner", + "test-coverage", + "docs-checker" + ] + }, + "agents": { + "list": [ + { "id": "code-formatter", "workspace": "~/agents/formatter", "tools": { "allow": ["read", "write"] } }, + { "id": "security-scanner", "workspace": "~/agents/security", "tools": { "allow": ["read", "bash"] } }, + { "id": "test-coverage", "workspace": "~/agents/testing", "tools": { "allow": ["read", "bash"] } }, + { "id": "docs-checker", "workspace": "~/agents/docs", "tools": { "allow": ["read"] } } + ] + } +} +``` + +**User sends:** Code snippet +**Responses:** +- code-formatter: "Fixed indentation and added type hints" +- security-scanner: "⚠️ SQL injection vulnerability in line 12" +- test-coverage: "Coverage is 45%, missing tests for error cases" +- docs-checker: "Missing docstring for function `process_data`" + +### Example 2: Multi-Language Support + +```json +{ + "broadcast": { + "strategy": "sequential", + "+15555550123": ["detect-language", "translator-en", "translator-de"] + }, + "agents": { + "list": [ + { "id": "detect-language", "workspace": "~/agents/lang-detect" }, + { "id": "translator-en", "workspace": "~/agents/translate-en" }, + { "id": "translator-de", "workspace": "~/agents/translate-de" } + ] + } +} +``` + +## API Reference + +### Config Schema + +```typescript +interface ClawdbotConfig { + broadcast?: { + strategy?: "parallel" | "sequential"; + [peerId: string]: string[]; + }; +} +``` + +### Fields + +- `strategy` (optional): How to process agents + - `"parallel"` (default): All agents process simultaneously + - `"sequential"`: Agents process in array order + +- `[peerId]`: WhatsApp group JID, E.164 number, or other peer ID + - Value: Array of agent IDs that should process messages + +## Limitations + +1. **Max agents:** No hard limit, but 10+ agents may be slow +2. **Shared context:** Agents don't see each other's responses (by design) +3. **Message ordering:** Parallel responses may arrive in any order +4. **Rate limits:** All agents count toward WhatsApp rate limits + +## Future Enhancements + +Planned features: +- [ ] Shared context mode (agents see each other's responses) +- [ ] Agent coordination (agents can signal each other) +- [ ] Dynamic agent selection (choose agents based on message content) +- [ ] Agent priorities (some agents respond before others) + +## See Also + +- [Multi-Agent Configuration](/multi-agent-sandbox-tools) +- [Routing Configuration](/concepts/provider-routing) +- [Session Management](/concepts/sessions) diff --git a/src/config/config.test.ts b/src/config/config.test.ts index 4d9da2597..22015846c 100644 --- a/src/config/config.test.ts +++ b/src/config/config.test.ts @@ -808,6 +808,41 @@ describe("talk.voiceAliases", () => { }); }); +describe("broadcast", () => { + it("accepts a broadcast peer map with strategy", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ + agents: { + list: [{ id: "alfred" }, { id: "baerbel" }], + }, + broadcast: { + strategy: "parallel", + "120363403215116621@g.us": ["alfred", "baerbel"], + }, + }); + expect(res.ok).toBe(true); + }); + + it("rejects invalid broadcast strategy", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ + broadcast: { strategy: "nope" }, + }); + expect(res.ok).toBe(false); + }); + + it("rejects non-array broadcast entries", async () => { + vi.resetModules(); + const { validateConfigObject } = await import("./config.js"); + const res = validateConfigObject({ + broadcast: { "120363403215116621@g.us": 123 }, + }); + expect(res.ok).toBe(false); + }); +}); + describe("legacy config detection", () => { it("rejects routing.allowFrom", async () => { vi.resetModules(); diff --git a/src/config/types.ts b/src/config/types.ts index 30ab5f806..28dda35a1 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -946,6 +946,19 @@ export type AgentBinding = { }; }; +export type BroadcastStrategy = "parallel" | "sequential"; + +export type BroadcastConfig = { + /** Default processing strategy for broadcast peers. */ + strategy?: BroadcastStrategy; + /** + * Map peer IDs to arrays of agent IDs that should ALL process messages. + * + * Note: the index signature includes `undefined` so `strategy?: ...` remains type-safe. + */ + [peerId: string]: string[] | BroadcastStrategy | undefined; +}; + export type AudioConfig = { transcription?: { // Optional CLI to turn inbound audio into text; templated args, must output transcript to stdout. @@ -1373,6 +1386,7 @@ export type ClawdbotConfig = { agents?: AgentsConfig; tools?: ToolsConfig; bindings?: AgentBinding[]; + broadcast?: BroadcastConfig; audio?: AudioConfig; messages?: MessagesConfig; commands?: CommandsConfig; diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index ad279d679..e05ead895 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -842,6 +842,15 @@ const BindingsSchema = z ) .optional(); +const BroadcastStrategySchema = z.enum(["parallel", "sequential"]); + +const BroadcastSchema = z + .object({ + strategy: BroadcastStrategySchema.optional(), + }) + .catchall(z.array(z.string())) + .optional(); + const AudioSchema = z .object({ transcription: TranscribeAudioSchema, @@ -1188,6 +1197,7 @@ export const ClawdbotSchema = z.object({ agents: AgentsSchema, tools: ToolsSchema, bindings: BindingsSchema, + broadcast: BroadcastSchema, audio: AudioSchema, messages: MessagesSchema, commands: CommandsSchema, diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 52218ca6e..a83757a7b 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -48,7 +48,15 @@ import { enqueueSystemEvent } from "../infra/system-events.js"; import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js"; import { createSubsystemLogger, getChildLogger } from "../logging.js"; import { toLocationContext } from "../providers/location.js"; -import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { + buildAgentSessionKey, + resolveAgentRoute, +} from "../routing/resolve-route.js"; +import { + buildAgentMainSessionKey, + DEFAULT_MAIN_KEY, + normalizeAgentId, +} from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { isSelfChatMode, jidToE164, normalizeE164 } from "../utils.js"; import { resolveWhatsAppAccount } from "./accounts.js"; @@ -1070,6 +1078,7 @@ export async function monitorWebProvider( const processMessage = async ( msg: WebInboundMsg, route: ReturnType, + groupHistoryKey: string, ) => { status.lastMessageAt = Date.now(); status.lastEventAt = status.lastMessageAt; @@ -1079,7 +1088,7 @@ export async function monitorWebProvider( let shouldClearGroupHistory = false; if (msg.chatType === "group") { - const history = groupHistories.get(route.sessionKey) ?? []; + const history = groupHistories.get(groupHistoryKey) ?? []; const historyWithoutCurrent = history.length > 0 ? history.slice(0, -1) : []; if (historyWithoutCurrent.length > 0) { @@ -1291,7 +1300,7 @@ export async function monitorWebProvider( markDispatchIdle(); if (!queuedFinal) { if (shouldClearGroupHistory && didSendReply) { - groupHistories.set(route.sessionKey, []); + groupHistories.set(groupHistoryKey, []); } logVerbose( "Skipping auto-reply: silent token or no text/media returned from resolver", @@ -1300,7 +1309,7 @@ export async function monitorWebProvider( } if (shouldClearGroupHistory && didSendReply) { - groupHistories.set(route.sessionKey, []); + groupHistories.set(groupHistoryKey, []); } }; @@ -1338,7 +1347,10 @@ export async function monitorWebProvider( id: peerId, }, }); - const groupHistoryKey = route.sessionKey; + const groupHistoryKey = + msg.chatType === "group" + ? `whatsapp:${route.accountId}:group:${peerId.trim() || "unknown"}` + : route.sessionKey; // Same-phone mode logging retained if (msg.from === msg.to) { @@ -1453,7 +1465,71 @@ export async function monitorWebProvider( } } - return processMessage(msg, route); + // Broadcast groups: when we'd reply anyway, run multiple agents. + // Does not bypass group mention/activation gating above (Option A). + const broadcastAgents = cfg.broadcast?.[peerId]; + if ( + broadcastAgents && + Array.isArray(broadcastAgents) && + broadcastAgents.length > 0 + ) { + const strategy = cfg.broadcast?.strategy || "parallel"; + whatsappInboundLog.info( + `Broadcasting message to ${broadcastAgents.length} agents (${strategy})`, + ); + + const agentIds = cfg.agents?.list?.map((agent) => + normalizeAgentId(agent.id), + ); + const hasKnownAgents = (agentIds?.length ?? 0) > 0; + + const processForAgent = (agentId: string) => { + const normalizedAgentId = normalizeAgentId(agentId); + if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) { + whatsappInboundLog.warn( + `Broadcast agent ${agentId} not found in agents.list; skipping`, + ); + return Promise.resolve(); + } + const agentRoute = { + ...route, + agentId: normalizedAgentId, + sessionKey: buildAgentSessionKey({ + agentId: normalizedAgentId, + provider: "whatsapp", + peer: { + kind: msg.chatType === "group" ? "group" : "dm", + id: peerId, + }, + }), + mainSessionKey: buildAgentMainSessionKey({ + agentId: normalizedAgentId, + mainKey: DEFAULT_MAIN_KEY, + }), + }; + + return processMessage(msg, agentRoute, groupHistoryKey).catch( + (err) => { + whatsappInboundLog.error( + `Broadcast agent ${agentId} failed: ${formatError(err)}`, + ); + }, + ); + }; + + if (strategy === "sequential") { + for (const agentId of broadcastAgents) { + await processForAgent(agentId); + } + } else { + // Parallel processing (default) + await Promise.allSettled(broadcastAgents.map(processForAgent)); + } + + return; + } + + return processMessage(msg, route, groupHistoryKey); }, });