fix: broadcast group history consistency (#547)

This commit is contained in:
Peter Steinberger
2026-01-09 21:39:28 +01:00
parent 7641b142ad
commit 1a97aadb6b
2 changed files with 149 additions and 12 deletions

View File

@@ -2140,6 +2140,112 @@ describe("broadcast groups", () => {
resetLoadConfigMock(); resetLoadConfigMock();
}); });
it("shares group history across broadcast agents and clears after replying", async () => {
setLoadConfigMock({
whatsapp: { allowFrom: ["*"] },
agents: {
defaults: { maxConcurrent: 10 },
list: [{ id: "alfred" }, { id: "baerbel" }],
},
broadcast: {
strategy: "sequential",
"123@g.us": ["alfred", "baerbel"],
},
} satisfies ClawdbotConfig);
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
const resolver = vi.fn().mockResolvedValue({ text: "ok" });
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "hello group",
from: "123@g.us",
conversationId: "123@g.us",
chatId: "123@g.us",
chatType: "group",
to: "+2",
id: "g1",
senderE164: "+111",
senderName: "Alice",
selfE164: "+999",
sendComposing,
reply,
sendMedia,
});
expect(resolver).not.toHaveBeenCalled();
await capturedOnMessage?.({
body: "@bot ping",
from: "123@g.us",
conversationId: "123@g.us",
chatId: "123@g.us",
chatType: "group",
to: "+2",
id: "g2",
senderE164: "+222",
senderName: "Bob",
mentionedJids: ["999@s.whatsapp.net"],
selfE164: "+999",
selfJid: "999@s.whatsapp.net",
sendComposing,
reply,
sendMedia,
});
expect(resolver).toHaveBeenCalledTimes(2);
for (const call of resolver.mock.calls.slice(0, 2)) {
const payload = call[0] as { Body: string };
expect(payload.Body).toContain("Chat messages since your last reply");
expect(payload.Body).toContain("Alice: hello group");
expect(payload.Body).toContain("@bot ping");
expect(payload.Body).toContain("[from: Bob (+222)]");
}
await capturedOnMessage?.({
body: "@bot ping 2",
from: "123@g.us",
conversationId: "123@g.us",
chatId: "123@g.us",
chatType: "group",
to: "+2",
id: "g3",
senderE164: "+333",
senderName: "Clara",
mentionedJids: ["999@s.whatsapp.net"],
selfE164: "+999",
selfJid: "999@s.whatsapp.net",
sendComposing,
reply,
sendMedia,
});
expect(resolver).toHaveBeenCalledTimes(4);
for (const call of resolver.mock.calls.slice(2, 4)) {
const payload = call[0] as { Body: string };
expect(payload.Body).not.toContain("Alice: hello group");
expect(payload.Body).not.toContain("Chat messages since your last reply");
}
resetLoadConfigMock();
});
it("broadcasts in parallel by default", async () => { it("broadcasts in parallel by default", async () => {
setLoadConfigMock({ setLoadConfigMock({
whatsapp: { allowFrom: ["*"] }, whatsapp: { allowFrom: ["*"] },

View File

@@ -1093,7 +1093,15 @@ export async function monitorWebProvider(
msg: WebInboundMsg, msg: WebInboundMsg,
route: ReturnType<typeof resolveAgentRoute>, route: ReturnType<typeof resolveAgentRoute>,
groupHistoryKey: string, groupHistoryKey: string,
) => { opts?: {
groupHistory?: Array<{
sender: string;
body: string;
timestamp?: number;
}>;
suppressGroupHistoryClear?: boolean;
},
): Promise<boolean> => {
status.lastMessageAt = Date.now(); status.lastMessageAt = Date.now();
status.lastEventAt = status.lastMessageAt; status.lastEventAt = status.lastMessageAt;
emitStatus(); emitStatus();
@@ -1102,7 +1110,8 @@ export async function monitorWebProvider(
let shouldClearGroupHistory = false; let shouldClearGroupHistory = false;
if (msg.chatType === "group") { if (msg.chatType === "group") {
const history = groupHistories.get(groupHistoryKey) ?? []; const history =
opts?.groupHistory ?? groupHistories.get(groupHistoryKey) ?? [];
const historyWithoutCurrent = const historyWithoutCurrent =
history.length > 0 ? history.slice(0, -1) : []; history.length > 0 ? history.slice(0, -1) : [];
if (historyWithoutCurrent.length > 0) { if (historyWithoutCurrent.length > 0) {
@@ -1127,7 +1136,7 @@ export async function monitorWebProvider(
? `${msg.senderName} (${msg.senderE164})` ? `${msg.senderName} (${msg.senderE164})`
: (msg.senderName ?? msg.senderE164 ?? "Unknown"); : (msg.senderName ?? msg.senderE164 ?? "Unknown");
combinedBody = `${combinedBody}\\n[from: ${senderLabel}]`; combinedBody = `${combinedBody}\\n[from: ${senderLabel}]`;
shouldClearGroupHistory = true; shouldClearGroupHistory = !(opts?.suppressGroupHistoryClear ?? false);
} }
// Echo detection uses combined body so we don't respond twice. // Echo detection uses combined body so we don't respond twice.
@@ -1138,7 +1147,7 @@ export async function monitorWebProvider(
if (recentlySent.has(combinedEchoKey)) { if (recentlySent.has(combinedEchoKey)) {
logVerbose(`Skipping auto-reply: detected echo for combined message`); logVerbose(`Skipping auto-reply: detected echo for combined message`);
recentlySent.delete(combinedEchoKey); recentlySent.delete(combinedEchoKey);
return; return false;
} }
const correlationId = msg.id ?? newConnectionId(); const correlationId = msg.id ?? newConnectionId();
@@ -1324,12 +1333,14 @@ export async function monitorWebProvider(
logVerbose( logVerbose(
"Skipping auto-reply: silent token or no text/media returned from resolver", "Skipping auto-reply: silent token or no text/media returned from resolver",
); );
return; return false;
} }
if (shouldClearGroupHistory && didSendReply) { if (shouldClearGroupHistory && didSendReply) {
groupHistories.set(groupHistoryKey, []); groupHistories.set(groupHistoryKey, []);
} }
return didSendReply;
}; };
const maybeBroadcastMessage = async (params: { const maybeBroadcastMessage = async (params: {
@@ -1352,14 +1363,18 @@ export async function monitorWebProvider(
normalizeAgentId(agent.id), normalizeAgentId(agent.id),
); );
const hasKnownAgents = (agentIds?.length ?? 0) > 0; const hasKnownAgents = (agentIds?.length ?? 0) > 0;
const groupHistorySnapshot =
msg.chatType === "group"
? (groupHistories.get(groupHistoryKey) ?? [])
: undefined;
const processForAgent = (agentId: string) => { const processForAgent = async (agentId: string): Promise<boolean> => {
const normalizedAgentId = normalizeAgentId(agentId); const normalizedAgentId = normalizeAgentId(agentId);
if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) { if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) {
whatsappInboundLog.warn( whatsappInboundLog.warn(
`Broadcast agent ${agentId} not found in agents.list; skipping`, `Broadcast agent ${agentId} not found in agents.list; skipping`,
); );
return Promise.resolve(); return false;
} }
const agentRoute = { const agentRoute = {
...route, ...route,
@@ -1378,19 +1393,35 @@ export async function monitorWebProvider(
}), }),
}; };
return processMessage(msg, agentRoute, groupHistoryKey).catch((err) => { try {
return await processMessage(msg, agentRoute, groupHistoryKey, {
groupHistory: groupHistorySnapshot,
suppressGroupHistoryClear: true,
});
} catch (err) {
whatsappInboundLog.error( whatsappInboundLog.error(
`Broadcast agent ${agentId} failed: ${formatError(err)}`, `Broadcast agent ${agentId} failed: ${formatError(err)}`,
); );
}); return false;
}
}; };
let didSendReply = false;
if (strategy === "sequential") { if (strategy === "sequential") {
for (const agentId of broadcastAgents) { for (const agentId of broadcastAgents) {
await processForAgent(agentId); if (await processForAgent(agentId)) didSendReply = true;
} }
} else { } else {
await Promise.allSettled(broadcastAgents.map(processForAgent)); const results = await Promise.allSettled(
broadcastAgents.map(processForAgent),
);
didSendReply = results.some(
(result) => result.status === "fulfilled" && result.value,
);
}
if (msg.chatType === "group" && didSendReply) {
groupHistories.set(groupHistoryKey, []);
} }
return true; return true;
@@ -1561,7 +1592,7 @@ export async function monitorWebProvider(
return; return;
} }
return processMessage(msg, route, groupHistoryKey); await processMessage(msg, route, groupHistoryKey);
}, },
}); });