refactor: harden broadcast groups

This commit is contained in:
Peter Steinberger
2026-01-09 21:29:07 +01:00
parent 374aa856f2
commit 7641b142ad
5 changed files with 730 additions and 454 deletions

View File

@@ -2081,3 +2081,183 @@ describe("web auto-reply", () => {
resetLoadConfigMock();
});
});
describe("broadcast groups", () => {
it("broadcasts sequentially in configured order", async () => {
setLoadConfigMock({
whatsapp: { allowFrom: ["*"] },
agents: {
defaults: { maxConcurrent: 10 },
list: [{ id: "alfred" }, { id: "baerbel" }],
},
broadcast: {
strategy: "sequential",
"+1000": ["alfred", "baerbel"],
},
} satisfies ClawdbotConfig);
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
const seen: string[] = [];
const resolver = vi.fn(async (ctx: { SessionKey?: unknown }) => {
seen.push(String(ctx.SessionKey));
return { text: "ok" };
});
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
id: "m1",
from: "+1000",
conversationId: "+1000",
to: "+2000",
body: "hello",
timestamp: Date.now(),
chatType: "direct",
chatId: "direct:+1000",
sendComposing,
reply,
sendMedia,
});
expect(resolver).toHaveBeenCalledTimes(2);
expect(seen[0]).toContain("agent:alfred:");
expect(seen[1]).toContain("agent:baerbel:");
resetLoadConfigMock();
});
it("broadcasts in parallel by default", async () => {
setLoadConfigMock({
whatsapp: { allowFrom: ["*"] },
agents: {
defaults: { maxConcurrent: 10 },
list: [{ id: "alfred" }, { id: "baerbel" }],
},
broadcast: {
strategy: "parallel",
"+1000": ["alfred", "baerbel"],
},
} satisfies ClawdbotConfig);
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
let started = 0;
let release: (() => void) | undefined;
const gate = new Promise<void>((resolve) => {
release = resolve;
});
const resolver = vi.fn(async () => {
started += 1;
if (started < 2) {
await gate;
} else {
release?.();
}
return { text: "ok" };
});
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
id: "m1",
from: "+1000",
conversationId: "+1000",
to: "+2000",
body: "hello",
timestamp: Date.now(),
chatType: "direct",
chatId: "direct:+1000",
sendComposing,
reply,
sendMedia,
});
expect(resolver).toHaveBeenCalledTimes(2);
resetLoadConfigMock();
});
it("skips unknown broadcast agent ids when agents.list is present", async () => {
setLoadConfigMock({
whatsapp: { allowFrom: ["*"] },
agents: {
defaults: { maxConcurrent: 10 },
list: [{ id: "alfred" }],
},
broadcast: {
"+1000": ["alfred", "missing"],
},
} satisfies ClawdbotConfig);
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
const seen: string[] = [];
const resolver = vi.fn(async (ctx: { SessionKey?: unknown }) => {
seen.push(String(ctx.SessionKey));
return { text: "ok" };
});
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
id: "m1",
from: "+1000",
conversationId: "+1000",
to: "+2000",
body: "hello",
timestamp: Date.now(),
chatType: "direct",
chatId: "direct:+1000",
sendComposing,
reply,
sendMedia,
});
expect(resolver).toHaveBeenCalledTimes(1);
expect(seen[0]).toContain("agent:alfred:");
resetLoadConfigMock();
});
});

View File

@@ -54,6 +54,7 @@ import {
} from "../routing/resolve-route.js";
import {
buildAgentMainSessionKey,
buildGroupHistoryKey,
DEFAULT_MAIN_KEY,
normalizeAgentId,
} from "../routing/session-key.js";
@@ -1001,14 +1002,27 @@ export async function monitorWebProvider(
// Track recently sent messages to prevent echo loops
const recentlySent = new Set<string>();
const MAX_RECENT_MESSAGES = 100;
const buildCombinedEchoKey = (params: {
sessionKey: string;
combinedBody: string;
}) => `combined:${params.sessionKey}:${params.combinedBody}`;
const rememberSentText = (
text: string | undefined,
opts: { combinedBody: string; logVerboseMessage?: boolean },
opts: {
combinedBody?: string;
combinedBodySessionKey?: string;
logVerboseMessage?: boolean;
},
) => {
if (!text) return;
recentlySent.add(text);
if (opts.combinedBody) {
recentlySent.add(opts.combinedBody);
if (opts.combinedBody && opts.combinedBodySessionKey) {
recentlySent.add(
buildCombinedEchoKey({
sessionKey: opts.combinedBodySessionKey,
combinedBody: opts.combinedBody,
}),
);
}
if (opts.logVerboseMessage) {
logVerbose(
@@ -1117,9 +1131,13 @@ export async function monitorWebProvider(
}
// Echo detection uses combined body so we don't respond twice.
if (recentlySent.has(combinedBody)) {
const combinedEchoKey = buildCombinedEchoKey({
sessionKey: route.sessionKey,
combinedBody,
});
if (recentlySent.has(combinedEchoKey)) {
logVerbose(`Skipping auto-reply: detected echo for combined message`);
recentlySent.delete(combinedBody);
recentlySent.delete(combinedEchoKey);
return;
}
@@ -1213,13 +1231,14 @@ export async function monitorWebProvider(
});
didSendReply = true;
if (info.kind === "tool") {
rememberSentText(payload.text, { combinedBody: "" });
rememberSentText(payload.text, {});
return;
}
const shouldLog =
info.kind === "final" && payload.text ? true : undefined;
rememberSentText(payload.text, {
combinedBody,
combinedBodySessionKey: route.sessionKey,
logVerboseMessage: shouldLog,
});
if (info.kind === "final") {
@@ -1274,7 +1293,7 @@ export async function monitorWebProvider(
GroupSubject: msg.groupSubject,
GroupMembers: formatGroupMembers(
msg.groupParticipants,
groupMemberNames.get(route.sessionKey),
groupMemberNames.get(groupHistoryKey),
msg.senderE164,
),
SenderName: msg.senderName,
@@ -1313,6 +1332,70 @@ export async function monitorWebProvider(
}
};
const maybeBroadcastMessage = async (params: {
msg: WebInboundMsg;
peerId: string;
route: ReturnType<typeof resolveAgentRoute>;
groupHistoryKey: string;
}): Promise<boolean> => {
const { msg, peerId, route, groupHistoryKey } = params;
const broadcastAgents = cfg.broadcast?.[peerId];
if (!broadcastAgents || !Array.isArray(broadcastAgents)) return false;
if (broadcastAgents.length === 0) return false;
const strategy = cfg.broadcast?.strategy || "parallel";
whatsappInboundLog.info(
`Broadcasting message to ${broadcastAgents.length} agents (${strategy})`,
);
const agentIds = cfg.agents?.list?.map((agent) =>
normalizeAgentId(agent.id),
);
const hasKnownAgents = (agentIds?.length ?? 0) > 0;
const processForAgent = (agentId: string) => {
const normalizedAgentId = normalizeAgentId(agentId);
if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) {
whatsappInboundLog.warn(
`Broadcast agent ${agentId} not found in agents.list; skipping`,
);
return Promise.resolve();
}
const agentRoute = {
...route,
agentId: normalizedAgentId,
sessionKey: buildAgentSessionKey({
agentId: normalizedAgentId,
provider: "whatsapp",
peer: {
kind: msg.chatType === "group" ? "group" : "dm",
id: peerId,
},
}),
mainSessionKey: buildAgentMainSessionKey({
agentId: normalizedAgentId,
mainKey: DEFAULT_MAIN_KEY,
}),
};
return processMessage(msg, agentRoute, groupHistoryKey).catch((err) => {
whatsappInboundLog.error(
`Broadcast agent ${agentId} failed: ${formatError(err)}`,
);
});
};
if (strategy === "sequential") {
for (const agentId of broadcastAgents) {
await processForAgent(agentId);
}
} else {
await Promise.allSettled(broadcastAgents.map(processForAgent));
}
return true;
};
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
accountId: account.accountId,
@@ -1349,7 +1432,12 @@ export async function monitorWebProvider(
});
const groupHistoryKey =
msg.chatType === "group"
? `whatsapp:${route.accountId}:group:${peerId.trim() || "unknown"}`
? buildGroupHistoryKey({
provider: "whatsapp",
accountId: route.accountId,
peerKind: "group",
peerId,
})
: route.sessionKey;
// Same-phone mode logging retained
@@ -1467,65 +1555,9 @@ export async function monitorWebProvider(
// Broadcast groups: when we'd reply anyway, run multiple agents.
// Does not bypass group mention/activation gating above (Option A).
const broadcastAgents = cfg.broadcast?.[peerId];
if (
broadcastAgents &&
Array.isArray(broadcastAgents) &&
broadcastAgents.length > 0
await maybeBroadcastMessage({ msg, peerId, route, groupHistoryKey })
) {
const strategy = cfg.broadcast?.strategy || "parallel";
whatsappInboundLog.info(
`Broadcasting message to ${broadcastAgents.length} agents (${strategy})`,
);
const agentIds = cfg.agents?.list?.map((agent) =>
normalizeAgentId(agent.id),
);
const hasKnownAgents = (agentIds?.length ?? 0) > 0;
const processForAgent = (agentId: string) => {
const normalizedAgentId = normalizeAgentId(agentId);
if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) {
whatsappInboundLog.warn(
`Broadcast agent ${agentId} not found in agents.list; skipping`,
);
return Promise.resolve();
}
const agentRoute = {
...route,
agentId: normalizedAgentId,
sessionKey: buildAgentSessionKey({
agentId: normalizedAgentId,
provider: "whatsapp",
peer: {
kind: msg.chatType === "group" ? "group" : "dm",
id: peerId,
},
}),
mainSessionKey: buildAgentMainSessionKey({
agentId: normalizedAgentId,
mainKey: DEFAULT_MAIN_KEY,
}),
};
return processMessage(msg, agentRoute, groupHistoryKey).catch(
(err) => {
whatsappInboundLog.error(
`Broadcast agent ${agentId} failed: ${formatError(err)}`,
);
},
);
};
if (strategy === "sequential") {
for (const agentId of broadcastAgents) {
await processForAgent(agentId);
}
} else {
// Parallel processing (default)
await Promise.allSettled(broadcastAgents.map(processForAgent));
}
return;
}