From 1a97aadb6bb268589ad4820cda7bb635d99b3ada Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 21:39:28 +0100 Subject: [PATCH] fix: broadcast group history consistency (#547) --- src/web/auto-reply.test.ts | 106 +++++++++++++++++++++++++++++++++++++ src/web/auto-reply.ts | 55 ++++++++++++++----- 2 files changed, 149 insertions(+), 12 deletions(-) diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 6152c56ff..e76333e8d 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -2140,6 +2140,112 @@ describe("broadcast groups", () => { resetLoadConfigMock(); }); + it("shares group history across broadcast agents and clears after replying", async () => { + setLoadConfigMock({ + whatsapp: { allowFrom: ["*"] }, + agents: { + defaults: { maxConcurrent: 10 }, + list: [{ id: "alfred" }, { id: "baerbel" }], + }, + broadcast: { + strategy: "sequential", + "123@g.us": ["alfred", "baerbel"], + }, + } satisfies ClawdbotConfig); + + const sendMedia = vi.fn(); + const reply = vi.fn().mockResolvedValue(undefined); + const sendComposing = vi.fn(); + const resolver = vi.fn().mockResolvedValue({ text: "ok" }); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + await capturedOnMessage?.({ + body: "hello group", + from: "123@g.us", + conversationId: "123@g.us", + chatId: "123@g.us", + chatType: "group", + to: "+2", + id: "g1", + senderE164: "+111", + senderName: "Alice", + selfE164: "+999", + sendComposing, + reply, + sendMedia, + }); + + expect(resolver).not.toHaveBeenCalled(); + + await capturedOnMessage?.({ + body: "@bot ping", + from: "123@g.us", + conversationId: "123@g.us", + chatId: "123@g.us", + chatType: "group", + to: "+2", + id: "g2", + senderE164: "+222", + senderName: "Bob", + mentionedJids: ["999@s.whatsapp.net"], + selfE164: "+999", + selfJid: "999@s.whatsapp.net", + sendComposing, + reply, + sendMedia, + }); + + expect(resolver).toHaveBeenCalledTimes(2); + for (const call of resolver.mock.calls.slice(0, 2)) { + const payload = call[0] as { Body: string }; + expect(payload.Body).toContain("Chat messages since your last reply"); + expect(payload.Body).toContain("Alice: hello group"); + expect(payload.Body).toContain("@bot ping"); + expect(payload.Body).toContain("[from: Bob (+222)]"); + } + + await capturedOnMessage?.({ + body: "@bot ping 2", + from: "123@g.us", + conversationId: "123@g.us", + chatId: "123@g.us", + chatType: "group", + to: "+2", + id: "g3", + senderE164: "+333", + senderName: "Clara", + mentionedJids: ["999@s.whatsapp.net"], + selfE164: "+999", + selfJid: "999@s.whatsapp.net", + sendComposing, + reply, + sendMedia, + }); + + expect(resolver).toHaveBeenCalledTimes(4); + for (const call of resolver.mock.calls.slice(2, 4)) { + const payload = call[0] as { Body: string }; + expect(payload.Body).not.toContain("Alice: hello group"); + expect(payload.Body).not.toContain("Chat messages since your last reply"); + } + + resetLoadConfigMock(); + }); + it("broadcasts in parallel by default", async () => { setLoadConfigMock({ whatsapp: { allowFrom: ["*"] }, diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 994283617..02c708d2f 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1093,7 +1093,15 @@ export async function monitorWebProvider( msg: WebInboundMsg, route: ReturnType, groupHistoryKey: string, - ) => { + opts?: { + groupHistory?: Array<{ + sender: string; + body: string; + timestamp?: number; + }>; + suppressGroupHistoryClear?: boolean; + }, + ): Promise => { status.lastMessageAt = Date.now(); status.lastEventAt = status.lastMessageAt; emitStatus(); @@ -1102,7 +1110,8 @@ export async function monitorWebProvider( let shouldClearGroupHistory = false; if (msg.chatType === "group") { - const history = groupHistories.get(groupHistoryKey) ?? []; + const history = + opts?.groupHistory ?? groupHistories.get(groupHistoryKey) ?? []; const historyWithoutCurrent = history.length > 0 ? history.slice(0, -1) : []; if (historyWithoutCurrent.length > 0) { @@ -1127,7 +1136,7 @@ export async function monitorWebProvider( ? `${msg.senderName} (${msg.senderE164})` : (msg.senderName ?? msg.senderE164 ?? "Unknown"); combinedBody = `${combinedBody}\\n[from: ${senderLabel}]`; - shouldClearGroupHistory = true; + shouldClearGroupHistory = !(opts?.suppressGroupHistoryClear ?? false); } // Echo detection uses combined body so we don't respond twice. @@ -1138,7 +1147,7 @@ export async function monitorWebProvider( if (recentlySent.has(combinedEchoKey)) { logVerbose(`Skipping auto-reply: detected echo for combined message`); recentlySent.delete(combinedEchoKey); - return; + return false; } const correlationId = msg.id ?? newConnectionId(); @@ -1324,12 +1333,14 @@ export async function monitorWebProvider( logVerbose( "Skipping auto-reply: silent token or no text/media returned from resolver", ); - return; + return false; } if (shouldClearGroupHistory && didSendReply) { groupHistories.set(groupHistoryKey, []); } + + return didSendReply; }; const maybeBroadcastMessage = async (params: { @@ -1352,14 +1363,18 @@ export async function monitorWebProvider( normalizeAgentId(agent.id), ); const hasKnownAgents = (agentIds?.length ?? 0) > 0; + const groupHistorySnapshot = + msg.chatType === "group" + ? (groupHistories.get(groupHistoryKey) ?? []) + : undefined; - const processForAgent = (agentId: string) => { + const processForAgent = async (agentId: string): Promise => { const normalizedAgentId = normalizeAgentId(agentId); if (hasKnownAgents && !agentIds?.includes(normalizedAgentId)) { whatsappInboundLog.warn( `Broadcast agent ${agentId} not found in agents.list; skipping`, ); - return Promise.resolve(); + return false; } const agentRoute = { ...route, @@ -1378,19 +1393,35 @@ export async function monitorWebProvider( }), }; - return processMessage(msg, agentRoute, groupHistoryKey).catch((err) => { + try { + return await processMessage(msg, agentRoute, groupHistoryKey, { + groupHistory: groupHistorySnapshot, + suppressGroupHistoryClear: true, + }); + } catch (err) { whatsappInboundLog.error( `Broadcast agent ${agentId} failed: ${formatError(err)}`, ); - }); + return false; + } }; + let didSendReply = false; if (strategy === "sequential") { for (const agentId of broadcastAgents) { - await processForAgent(agentId); + if (await processForAgent(agentId)) didSendReply = true; } } else { - await Promise.allSettled(broadcastAgents.map(processForAgent)); + const results = await Promise.allSettled( + broadcastAgents.map(processForAgent), + ); + didSendReply = results.some( + (result) => result.status === "fulfilled" && result.value, + ); + } + + if (msg.chatType === "group" && didSendReply) { + groupHistories.set(groupHistoryKey, []); } return true; @@ -1561,7 +1592,7 @@ export async function monitorWebProvider( return; } - return processMessage(msg, route, groupHistoryKey); + await processMessage(msg, route, groupHistoryKey); }, });