From 5b54d4de7a992abb2bbc742226af14c326ad6695 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 07:54:13 +0000 Subject: [PATCH] feat(web): batch inbound messages --- src/web/auto-reply.test.ts | 74 +++++++++ src/web/auto-reply.ts | 311 +++++++++++++++++++++---------------- 2 files changed, 250 insertions(+), 135 deletions(-) diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 059c6e570..945df6212 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -25,6 +25,8 @@ import { stripHeartbeatToken, } from "./auto-reply.js"; import type { sendMessageWeb } from "./outbound.js"; +import * as commandQueue from "../process/command-queue.js"; +import { getQueueSize } from "../process/command-queue.js"; const makeSessionStore = async ( entries: Record = {}, @@ -573,6 +575,78 @@ describe("web auto-reply", () => { } }); + it("batches inbound messages while queue is busy and preserves timestamps", async () => { + vi.useFakeTimers(); + const originalMax = process.getMaxListeners(); + process.setMaxListeners?.(1); // force low to confirm bump + + const sendMedia = vi.fn(); + const reply = vi.fn().mockResolvedValue(undefined); + const sendComposing = vi.fn(); + const resolver = vi.fn().mockResolvedValue({ text: "batched" }); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + // Queue starts busy, then frees after one polling tick. + let queueBusy = true; + const queueSpy = vi + .spyOn(commandQueue, "getQueueSize") + .mockImplementation(() => (queueBusy ? 1 : 0)); + + setLoadConfigMock(() => ({ inbound: { timestampPrefix: "UTC" } })); + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + // Two messages from the same sender with fixed timestamps + await capturedOnMessage?.({ + body: "first", + from: "+1", + to: "+2", + id: "m1", + timestamp: 1735689600000, // Jan 1 2025 00:00:00 UTC + sendComposing, + reply, + sendMedia, + }); + await capturedOnMessage?.({ + body: "second", + from: "+1", + to: "+2", + id: "m2", + timestamp: 1735693200000, // Jan 1 2025 01:00:00 UTC + sendComposing, + reply, + sendMedia, + }); + + // Let the queued batch flush once the queue is free + queueBusy = false; + vi.advanceTimersByTime(200); + + expect(resolver).toHaveBeenCalledTimes(1); + const args = resolver.mock.calls[0][0]; + expect(args.Body).toContain("[Jan 1 00:00] [warelay] first"); + expect(args.Body).toContain("[Jan 1 01:00] [warelay] second"); + + // Max listeners bumped to avoid warnings in multi-instance test runs + expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50); + + queueSpy.mockRestore(); + process.setMaxListeners?.(originalMax); + vi.useRealTimers(); + }); + it("falls back to text when media send fails", async () => { const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const reply = vi.fn().mockResolvedValue(undefined); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index cc965dab0..19634a3f1 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -515,6 +515,13 @@ export async function monitorWebProvider( }), ); + // Avoid noisy MaxListenersExceeded warnings in test environments where + // multiple relay instances may be constructed. + const currentMaxListeners = process.getMaxListeners?.() ?? 10; + if (process.setMaxListeners && currentMaxListeners < 50) { + process.setMaxListeners(50); + } + let sigintStop = false; const handleSigint = () => { sigintStop = true; @@ -544,35 +551,179 @@ export async function monitorWebProvider( const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute + // Batch inbound messages while command queue is busy, then send one + // combined prompt with per-message timestamps (inbound-only behavior). + type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout }; + const pendingBatches = new Map(); + + const formatTimestamp = (ts?: number) => { + const tsCfg = cfg.inbound?.timestampPrefix; + const tsEnabled = tsCfg !== false; // default true + if (!tsEnabled) return ""; + const tz = typeof tsCfg === "string" ? tsCfg : "UTC"; + const date = ts ? new Date(ts) : new Date(); + try { + return `[${date.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${date.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `; + } catch { + return `[${date.toISOString().slice(5, 16).replace("T", " ")}] `; + } + }; + + const buildLine = (msg: WebInboundMsg) => { + // Build message prefix: explicit config > default based on allowFrom + let messagePrefix = cfg.inbound?.messagePrefix; + if (messagePrefix === undefined) { + const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0; + messagePrefix = hasAllowFrom ? "" : "[warelay]"; + } + const prefixStr = messagePrefix ? `${messagePrefix} ` : ""; + return `${formatTimestamp(msg.timestamp)}${prefixStr}${msg.body}`; + }; + + const processBatch = async (from: string) => { + const batch = pendingBatches.get(from); + if (!batch || batch.messages.length === 0) return; + if (getQueueSize() > 0) { + // Wait until command queue is free to run the combined prompt. + batch.timer = setTimeout(() => void processBatch(from), 150); + return; + } + pendingBatches.delete(from); + + const messages = batch.messages; + const latest = messages[messages.length - 1]; + const combinedBody = messages.map(buildLine).join("\n"); + + // Echo detection uses combined body so we don't respond twice. + if (recentlySent.has(combinedBody)) { + logVerbose(`Skipping auto-reply: detected echo for combined batch`); + recentlySent.delete(combinedBody); + return; + } + + const correlationId = latest.id ?? newConnectionId(); + replyLogger.info( + { + connectionId, + correlationId, + from, + to: latest.to, + body: combinedBody, + mediaType: latest.mediaType ?? null, + mediaPath: latest.mediaPath ?? null, + batchSize: messages.length, + }, + "inbound web message (batched)", + ); + + const tsDisplay = latest.timestamp + ? new Date(latest.timestamp).toISOString() + : new Date().toISOString(); + console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`); + + const replyResult = await (replyResolver ?? getReplyFromConfig)( + { + Body: combinedBody, + From: latest.from, + To: latest.to, + MessageSid: latest.id, + MediaPath: latest.mediaPath, + MediaUrl: latest.mediaUrl, + MediaType: latest.mediaType, + }, + { + onReplyStart: latest.sendComposing, + }, + ); + + if ( + !replyResult || + (!replyResult.text && + !replyResult.mediaUrl && + !replyResult.mediaUrls?.length) + ) { + logVerbose("Skipping auto-reply: no text/media returned from resolver"); + return; + } + + // Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match) + const responsePrefix = cfg.inbound?.responsePrefix; + if ( + responsePrefix && + replyResult.text && + replyResult.text.trim() !== HEARTBEAT_TOKEN + ) { + if (!replyResult.text.startsWith(responsePrefix)) { + replyResult.text = `${responsePrefix} ${replyResult.text}`; + } + } + + try { + await deliverWebReply({ + replyResult, + msg: latest, + maxMediaBytes, + replyLogger, + runtime, + connectionId, + }); + + if (replyResult.text) { + recentlySent.add(replyResult.text); + recentlySent.add(combinedBody); // Prevent echo on the batch text itself + logVerbose( + `Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`, + ); + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } + } + + if (isVerbose()) { + console.log( + success( + `↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`, + ), + ); + } else { + console.log( + success( + `↩️ ${replyResult.text ?? ""}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`, + ), + ); + } + } catch (err) { + console.error( + danger( + `Failed sending web auto-reply to ${from}: ${String(err)}`, + ), + ); + } + }; + + const enqueueBatch = async (msg: WebInboundMsg) => { + const bucket = pendingBatches.get(msg.from) ?? { messages: [] }; + bucket.messages.push(msg); + pendingBatches.set(msg.from, bucket); + + // Process immediately when queue is free; otherwise wait until it drains. + if (getQueueSize() === 0) { + await processBatch(msg.from); + } else { + bucket.timer = bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150); + } + }; + const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, onMessage: async (msg) => { - // Also add IPC-sent messages to echo detection - // (this is handled below in the IPC sendHandler) handledMessages += 1; lastMessageAt = Date.now(); - const ts = msg.timestamp - ? new Date(msg.timestamp).toISOString() - : new Date().toISOString(); - const correlationId = msg.id ?? newConnectionId(); - replyLogger.info( - { - connectionId, - correlationId, - from: msg.from, - to: msg.to, - body: msg.body, - mediaType: msg.mediaType ?? null, - mediaPath: msg.mediaPath ?? null, - }, - "inbound web message", - ); + lastInboundMsg = msg; - console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`); - - // Detect same-phone mode (self-messaging) - const isSamePhoneMode = msg.from === msg.to; - if (isSamePhoneMode) { + // Same-phone mode logging retained + if (msg.from === msg.to) { logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`); } @@ -582,121 +733,11 @@ export async function monitorWebProvider( logVerbose( `Skipping auto-reply: detected echo (message matches recently sent text)`, ); - recentlySent.delete(msg.body); // Remove from set to allow future identical messages + recentlySent.delete(msg.body); return; } - logVerbose( - `Echo check: message not in recent set (size: ${recentlySent.size})`, - ); - - lastInboundMsg = msg; - - // Build timestamp prefix (default: enabled with UTC) - // Can be: true (UTC), false (disabled), or "America/New_York" (custom timezone) - let timestampStr = ""; - const tsCfg = cfg.inbound?.timestampPrefix; - const tsEnabled = tsCfg !== false; // default true - if (tsEnabled) { - const tz = typeof tsCfg === "string" ? tsCfg : "UTC"; - const now = new Date(); - try { - // Format: "Nov 29 06:30" - compact but informative - timestampStr = `[${now.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${now.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `; - } catch { - // Fallback to UTC if timezone invalid - timestampStr = `[${now.toISOString().slice(5, 16).replace("T", " ")}] `; - } - } - - // Build message prefix: explicit config > default based on allowFrom - // If allowFrom is configured, user likely has a specific setup - no default prefix - // If no allowFrom, add "[warelay]" so AI knows it's coming through warelay - let messagePrefix = cfg.inbound?.messagePrefix; - if (messagePrefix === undefined) { - const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0; - messagePrefix = hasAllowFrom ? "" : "[warelay]"; - } - const prefixStr = messagePrefix ? `${messagePrefix} ` : ""; - const bodyForCommand = `${timestampStr}${prefixStr}${msg.body}`; - - const replyResult = await (replyResolver ?? getReplyFromConfig)( - { - Body: bodyForCommand, - From: msg.from, - To: msg.to, - MessageSid: msg.id, - MediaPath: msg.mediaPath, - MediaUrl: msg.mediaUrl, - MediaType: msg.mediaType, - }, - { - onReplyStart: msg.sendComposing, - }, - ); - if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) - ) { - logVerbose( - "Skipping auto-reply: no text/media returned from resolver", - ); - return; - } - // Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match) - const responsePrefix = cfg.inbound?.responsePrefix; - if (responsePrefix && replyResult.text && replyResult.text.trim() !== HEARTBEAT_TOKEN) { - // Only add prefix if not already present - if (!replyResult.text.startsWith(responsePrefix)) { - replyResult.text = `${responsePrefix} ${replyResult.text}`; - } - } - - try { - await deliverWebReply({ - replyResult, - msg, - maxMediaBytes, - replyLogger, - runtime, - connectionId, - }); - - // Track sent message to prevent echo loops - if (replyResult.text) { - recentlySent.add(replyResult.text); - logVerbose( - `Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`, - ); - // Keep set bounded - remove oldest if too large - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); - } - } - - if (isVerbose()) { - console.log( - success( - `↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`, - ), - ); - } else { - console.log( - success( - `↩️ ${replyResult.text ?? ""}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`, - ), - ); - } - } catch (err) { - console.error( - danger( - `Failed sending web auto-reply to ${msg.from}: ${String(err)}`, - ), - ); - } + return enqueueBatch(msg); }, });