fix: land broadcast groups (#547) (thanks @pasogott)

This commit is contained in:
Peter Steinberger
2026-01-09 21:14:19 +01:00
parent 09769d127f
commit 76964162c7
6 changed files with 169 additions and 108 deletions

View File

@@ -808,6 +808,41 @@ describe("talk.voiceAliases", () => {
});
});
describe("broadcast", () => {
it("accepts a broadcast peer map with strategy", async () => {
vi.resetModules();
const { validateConfigObject } = await import("./config.js");
const res = validateConfigObject({
agents: {
list: [{ id: "alfred" }, { id: "baerbel" }],
},
broadcast: {
strategy: "parallel",
"120363403215116621@g.us": ["alfred", "baerbel"],
},
});
expect(res.ok).toBe(true);
});
it("rejects invalid broadcast strategy", async () => {
vi.resetModules();
const { validateConfigObject } = await import("./config.js");
const res = validateConfigObject({
broadcast: { strategy: "nope" },
});
expect(res.ok).toBe(false);
});
it("rejects non-array broadcast entries", async () => {
vi.resetModules();
const { validateConfigObject } = await import("./config.js");
const res = validateConfigObject({
broadcast: { "120363403215116621@g.us": 123 },
});
expect(res.ok).toBe(false);
});
});
describe("legacy config detection", () => {
it("rejects routing.allowFrom", async () => {
vi.resetModules();

View File

@@ -946,6 +946,19 @@ export type AgentBinding = {
};
};
export type BroadcastStrategy = "parallel" | "sequential";
export type BroadcastConfig = {
/** Default processing strategy for broadcast peers. */
strategy?: BroadcastStrategy;
/**
* Map peer IDs to arrays of agent IDs that should ALL process messages.
*
* Note: the index signature includes `undefined` so `strategy?: ...` remains type-safe.
*/
[peerId: string]: string[] | BroadcastStrategy | undefined;
};
export type AudioConfig = {
transcription?: {
// Optional CLI to turn inbound audio into text; templated args, must output transcript to stdout.
@@ -1373,6 +1386,7 @@ export type ClawdbotConfig = {
agents?: AgentsConfig;
tools?: ToolsConfig;
bindings?: AgentBinding[];
broadcast?: BroadcastConfig;
audio?: AudioConfig;
messages?: MessagesConfig;
commands?: CommandsConfig;

View File

@@ -842,6 +842,15 @@ const BindingsSchema = z
)
.optional();
const BroadcastStrategySchema = z.enum(["parallel", "sequential"]);
const BroadcastSchema = z
.object({
strategy: BroadcastStrategySchema.optional(),
})
.catchall(z.array(z.string()))
.optional();
const AudioSchema = z
.object({
transcription: TranscribeAudioSchema,
@@ -1188,6 +1197,7 @@ export const ClawdbotSchema = z.object({
agents: AgentsSchema,
tools: ToolsSchema,
bindings: BindingsSchema,
broadcast: BroadcastSchema,
audio: AudioSchema,
messages: MessagesSchema,
commands: CommandsSchema,

View File

@@ -48,13 +48,14 @@ import { enqueueSystemEvent } from "../infra/system-events.js";
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js";
import { createSubsystemLogger, getChildLogger } from "../logging.js";
import { toLocationContext } from "../providers/location.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import {
buildAgentSessionKey,
resolveAgentRoute,
} from "../routing/resolve-route.js";
import {
buildAgentMainSessionKey,
buildAgentPeerSessionKey,
DEFAULT_MAIN_KEY,
normalizeAgentId,
normalizeId,
} from "../routing/session-key.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { isSelfChatMode, jidToE164, normalizeE164 } from "../utils.js";
@@ -1077,6 +1078,7 @@ export async function monitorWebProvider(
const processMessage = async (
msg: WebInboundMsg,
route: ReturnType<typeof resolveAgentRoute>,
groupHistoryKey: string,
) => {
status.lastMessageAt = Date.now();
status.lastEventAt = status.lastMessageAt;
@@ -1086,7 +1088,7 @@ export async function monitorWebProvider(
let shouldClearGroupHistory = false;
if (msg.chatType === "group") {
const history = groupHistories.get(route.sessionKey) ?? [];
const history = groupHistories.get(groupHistoryKey) ?? [];
const historyWithoutCurrent =
history.length > 0 ? history.slice(0, -1) : [];
if (historyWithoutCurrent.length > 0) {
@@ -1298,7 +1300,7 @@ export async function monitorWebProvider(
markDispatchIdle();
if (!queuedFinal) {
if (shouldClearGroupHistory && didSendReply) {
groupHistories.set(route.sessionKey, []);
groupHistories.set(groupHistoryKey, []);
}
logVerbose(
"Skipping auto-reply: silent token or no text/media returned from resolver",
@@ -1307,7 +1309,7 @@ export async function monitorWebProvider(
}
if (shouldClearGroupHistory && didSendReply) {
groupHistories.set(route.sessionKey, []);
groupHistories.set(groupHistoryKey, []);
}
};
@@ -1345,7 +1347,10 @@ export async function monitorWebProvider(
id: peerId,
},
});
const groupHistoryKey = route.sessionKey;
const groupHistoryKey =
msg.chatType === "group"
? `whatsapp:${route.accountId}:group:${peerId.trim() || "unknown"}`
: route.sessionKey;
// Same-phone mode logging retained
if (msg.from === msg.to) {
@@ -1460,29 +1465,42 @@ export async function monitorWebProvider(
}
}
// Check for broadcast groups
const broadcastAgents = cfg.routing?.broadcast?.[peerId];
// Broadcast groups: when we'd reply anyway, run multiple agents.
// Does not bypass group mention/activation gating above (Option A).
const broadcastAgents = cfg.broadcast?.[peerId];
if (
broadcastAgents &&
Array.isArray(broadcastAgents) &&
broadcastAgents.length > 0
) {
const strategy = cfg.routing?.broadcast?.strategy || "parallel";
const strategy = cfg.broadcast?.strategy || "parallel";
whatsappInboundLog.info(
`Broadcasting message to ${broadcastAgents.length} agents (${strategy})`,
);
const agentIds = cfg.agents?.list?.map((agent) =>
normalizeAgentId(agent.id),
);
const hasKnownAgents = (agentIds?.length ?? 0) > 0;
const processForAgent = (agentId: string) => {
const normalizedAgentId = normalizeAgentId(agentId);
if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) {
whatsappInboundLog.warn(
`Broadcast agent ${agentId} not found in agents.list; skipping`,
);
return Promise.resolve();
}
const agentRoute = {
...route,
agentId: normalizedAgentId,
sessionKey: buildAgentPeerSessionKey({
sessionKey: buildAgentSessionKey({
agentId: normalizedAgentId,
mainKey: DEFAULT_MAIN_KEY,
provider: "whatsapp",
peerKind: msg.chatType === "group" ? "group" : "dm",
peerId: normalizeId(peerId),
peer: {
kind: msg.chatType === "group" ? "group" : "dm",
id: peerId,
},
}),
mainSessionKey: buildAgentMainSessionKey({
agentId: normalizedAgentId,
@@ -1490,11 +1508,13 @@ export async function monitorWebProvider(
}),
};
return processMessage(msg, agentRoute).catch((err) => {
whatsappInboundLog.error(
`Broadcast agent ${agentId} failed: ${formatError(err)}`,
);
});
return processMessage(msg, agentRoute, groupHistoryKey).catch(
(err) => {
whatsappInboundLog.error(
`Broadcast agent ${agentId} failed: ${formatError(err)}`,
);
},
);
};
if (strategy === "sequential") {
@@ -1509,7 +1529,7 @@ export async function monitorWebProvider(
return;
}
return processMessage(msg, route);
return processMessage(msg, route, groupHistoryKey);
},
});