diff --git a/docs/group-messages.md b/docs/group-messages.md index 0317d7f18..0ae9b3252 100644 --- a/docs/group-messages.md +++ b/docs/group-messages.md @@ -51,7 +51,7 @@ Notes: - Manual smoke: - Send an `@clawd` ping in the group and confirm a reply that references the sender name. - Send a second ping and verify the history block is included then cleared on the next turn. - - Check gateway logs (run with `--verbose`) to see `inbound web message (batched)` entries showing `from: ` and the `[from: …]` suffix. +- Check gateway logs (run with `--verbose`) to see `inbound web message` entries showing `from: ` and the `[from: …]` suffix. ## Known considerations - Heartbeats are intentionally skipped for groups to avoid noisy broadcasts. diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index e42d70c87..107a09b40 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -79,7 +79,7 @@ const makeSessionStore = async ( await fs.writeFile(storePath, JSON.stringify(entries)); const cleanup = async () => { // Session store writes can be in-flight when the test finishes (e.g. updateLastRoute - // after a batched message flush). `fs.rm({ recursive })` can race and throw ENOTEMPTY. + // after a message flush). `fs.rm({ recursive })` can race and throw ENOTEMPTY. for (let attempt = 0; attempt < 10; attempt += 1) { try { await fs.rm(dir, { recursive: true, force: true }); @@ -866,8 +866,7 @@ describe("web auto-reply", () => { } }); - it("batches inbound messages while queue is busy and preserves timestamps", async () => { - vi.useFakeTimers(); + it("processes inbound messages without batching and preserves timestamps", async () => { const originalMax = process.getMaxListeners(); process.setMaxListeners?.(1); // force low to confirm bump @@ -878,7 +877,7 @@ describe("web auto-reply", () => { const sendMedia = vi.fn(); const reply = vi.fn().mockResolvedValue(undefined); const sendComposing = vi.fn(); - const resolver = vi.fn().mockResolvedValue({ text: "batched" }); + const resolver = vi.fn().mockResolvedValue({ text: "ok" }); let capturedOnMessage: | ((msg: import("./inbound.js").WebInboundMessage) => Promise) @@ -892,12 +891,6 @@ describe("web auto-reply", () => { return { close: vi.fn() }; }; - // Queue starts busy, then frees after one polling tick. - let queueBusy = true; - const queueSpy = vi - .spyOn(commandQueue, "getQueueSize") - .mockImplementation(() => (queueBusy ? 1 : 0)); - setLoadConfigMock(() => ({ inbound: { timestampPrefix: "UTC", @@ -930,25 +923,22 @@ describe("web auto-reply", () => { sendMedia, }); - // Let the queued batch flush once the queue is free - queueBusy = false; - await vi.advanceTimersByTimeAsync(200); - - expect(resolver).toHaveBeenCalledTimes(1); - const args = resolver.mock.calls[0][0]; - expect(args.Body).toContain( + expect(resolver).toHaveBeenCalledTimes(2); + const firstArgs = resolver.mock.calls[0][0]; + const secondArgs = resolver.mock.calls[1][0]; + expect(firstArgs.Body).toContain( "[WhatsApp +1 2025-01-01 00:00] [clawdis] first", ); - expect(args.Body).toContain( + expect(firstArgs.Body).not.toContain("second"); + expect(secondArgs.Body).toContain( "[WhatsApp +1 2025-01-01 01:00] [clawdis] second", ); + expect(secondArgs.Body).not.toContain("first"); // Max listeners bumped to avoid warnings in multi-instance test runs expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50); - queueSpy.mockRestore(); process.setMaxListeners?.(originalMax); - vi.useRealTimers(); await store.cleanup(); }); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 7ab0a0328..b8d73ce1d 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -768,9 +768,6 @@ export async function monitorWebProvider( const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute - // Batch inbound messages per conversation while command queue is busy. - type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout }; - const pendingBatches = new Map(); const backgroundTasks = new Set>(); const buildLine = (msg: WebInboundMsg) => { @@ -799,20 +796,11 @@ export async function monitorWebProvider( }); }; - const processBatch = async (conversationId: string) => { - const batch = pendingBatches.get(conversationId); - if (!batch || batch.messages.length === 0) return; - if (getQueueSize() > 0) { - batch.timer = setTimeout(() => processBatch(conversationId), 150); - return; - } - pendingBatches.delete(conversationId); + const processMessage = async (msg: WebInboundMsg) => { + const conversationId = msg.conversationId ?? msg.from; + let combinedBody = buildLine(msg); - const messages = batch.messages; - const latest = messages[messages.length - 1]; - let combinedBody = messages.map(buildLine).join("\n"); - - if (latest.chatType === "group") { + if (msg.chatType === "group") { const history = groupHistories.get(conversationId) ?? []; const historyWithoutCurrent = history.length > 0 ? history.slice(0, -1) : []; @@ -827,13 +815,13 @@ export async function monitorWebProvider( }), ) .join("\\n"); - combinedBody = `[Chat messages since your last reply - for context]\\n${historyText}\\n\\n[Current message - respond to this]\\n${buildLine(latest)}`; + combinedBody = `[Chat messages since your last reply - for context]\\n${historyText}\\n\\n[Current message - respond to this]\\n${buildLine(msg)}`; } // Always surface who sent the triggering message so the agent can address them. const senderLabel = - latest.senderName && latest.senderE164 - ? `${latest.senderName} (${latest.senderE164})` - : (latest.senderName ?? latest.senderE164 ?? "Unknown"); + msg.senderName && msg.senderE164 + ? `${msg.senderName} (${msg.senderE164})` + : (msg.senderName ?? msg.senderE164 ?? "Unknown"); combinedBody = `${combinedBody}\\n[from: ${senderLabel}]`; // Clear stored history after using it groupHistories.set(conversationId, []); @@ -841,46 +829,45 @@ export async function monitorWebProvider( // Echo detection uses combined body so we don't respond twice. if (recentlySent.has(combinedBody)) { - logVerbose(`Skipping auto-reply: detected echo for combined batch`); + logVerbose(`Skipping auto-reply: detected echo for combined message`); recentlySent.delete(combinedBody); return; } - const correlationId = latest.id ?? newConnectionId(); + const correlationId = msg.id ?? newConnectionId(); replyLogger.info( { connectionId, correlationId, - from: latest.chatType === "group" ? conversationId : latest.from, - to: latest.to, + from: msg.chatType === "group" ? conversationId : msg.from, + to: msg.to, body: elide(combinedBody, 240), - mediaType: latest.mediaType ?? null, - mediaPath: latest.mediaPath ?? null, - batchSize: messages.length, + mediaType: msg.mediaType ?? null, + mediaPath: msg.mediaPath ?? null, }, - "inbound web message (batched)", + "inbound web message", ); - const tsDisplay = latest.timestamp - ? new Date(latest.timestamp).toISOString() + const tsDisplay = msg.timestamp + ? new Date(msg.timestamp).toISOString() : new Date().toISOString(); const fromDisplay = - latest.chatType === "group" ? conversationId : latest.from; + msg.chatType === "group" ? conversationId : msg.from; console.log( - `\n[${tsDisplay}] ${fromDisplay} -> ${latest.to}: ${combinedBody}`, + `\n[${tsDisplay}] ${fromDisplay} -> ${msg.to}: ${combinedBody}`, ); - if (latest.chatType !== "group") { + if (msg.chatType !== "group") { const sessionCfg = cfg.inbound?.session; const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; const storePath = resolveStorePath(sessionCfg?.store); const to = (() => { - if (latest.senderE164) return normalizeE164(latest.senderE164); - // In direct chats, `latest.from` is already the canonical conversation id, + if (msg.senderE164) return normalizeE164(msg.senderE164); + // In direct chats, `msg.from` is already the canonical conversation id, // which is an E.164 string (e.g. "+1555"). Only fall back to JID parsing // when we were handed a JID-like string. - if (latest.from.includes("@")) return jidToE164(latest.from); - return normalizeE164(latest.from); + if (msg.from.includes("@")) return jidToE164(msg.from); + return normalizeE164(msg.from); })(); if (to) { const task = updateLastRoute({ @@ -904,21 +891,21 @@ export async function monitorWebProvider( const replyResult = await (replyResolver ?? getReplyFromConfig)( { Body: combinedBody, - From: latest.from, - To: latest.to, - MessageSid: latest.id, - MediaPath: latest.mediaPath, - MediaUrl: latest.mediaUrl, - MediaType: latest.mediaType, - ChatType: latest.chatType, - GroupSubject: latest.groupSubject, - GroupMembers: latest.groupParticipants?.join(", "), - SenderName: latest.senderName, - SenderE164: latest.senderE164, + From: msg.from, + To: msg.to, + MessageSid: msg.id, + MediaPath: msg.mediaPath, + MediaUrl: msg.mediaUrl, + MediaType: msg.mediaType, + ChatType: msg.chatType, + GroupSubject: msg.groupSubject, + GroupMembers: msg.groupParticipants?.join(", "), + SenderName: msg.senderName, + SenderE164: msg.senderE164, Surface: "whatsapp", }, { - onReplyStart: latest.sendComposing, + onReplyStart: msg.sendComposing, }, ); @@ -949,7 +936,7 @@ export async function monitorWebProvider( try { await deliverWebReply({ replyResult: replyPayload, - msg: latest, + msg, maxMediaBytes, replyLogger, runtime, @@ -958,7 +945,7 @@ export async function monitorWebProvider( if (replyPayload.text) { recentlySent.add(replyPayload.text); - recentlySent.add(combinedBody); // Prevent echo on the batch text itself + recentlySent.add(combinedBody); // Prevent echo on the combined text itself logVerbose( `Added to echo detection set (size now: ${recentlySent.size}): ${replyPayload.text.substring(0, 50)}...`, ); @@ -969,13 +956,13 @@ export async function monitorWebProvider( } const fromDisplay = - latest.chatType === "group" + msg.chatType === "group" ? conversationId - : (latest.from ?? "unknown"); + : (msg.from ?? "unknown"); if (isVerbose()) { console.log( success( - `↩️ Auto-replied to ${fromDisplay} (web${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`, + `↩️ Auto-replied to ${fromDisplay} (web${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? ", media" : ""})`, ), ); } else { @@ -988,25 +975,13 @@ export async function monitorWebProvider( } catch (err) { console.error( danger( - `Failed sending web auto-reply to ${latest.from ?? conversationId}: ${String(err)}`, + `Failed sending web auto-reply to ${msg.from ?? conversationId}: ${String(err)}`, ), ); } } }; - const enqueueBatch = async (msg: WebInboundMsg) => { - const key = msg.conversationId ?? msg.from; - const bucket = pendingBatches.get(key) ?? { messages: [] }; - bucket.messages.push(msg); - pendingBatches.set(key, bucket); - if (getQueueSize() === 0) { - await processBatch(key); - } else { - bucket.timer = bucket.timer ?? setTimeout(() => processBatch(key), 150); - } - }; - const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, onMessage: async (msg) => { @@ -1060,7 +1035,7 @@ export async function monitorWebProvider( } } - return enqueueBatch(msg); + return processMessage(msg); }, });