feat(web): batch inbound messages

This commit is contained in:
Peter Steinberger
2025-12-02 07:54:13 +00:00
parent 96152f6577
commit 5b54d4de7a
2 changed files with 250 additions and 135 deletions

View File

@@ -25,6 +25,8 @@ import {
stripHeartbeatToken, stripHeartbeatToken,
} from "./auto-reply.js"; } from "./auto-reply.js";
import type { sendMessageWeb } from "./outbound.js"; import type { sendMessageWeb } from "./outbound.js";
import * as commandQueue from "../process/command-queue.js";
import { getQueueSize } from "../process/command-queue.js";
const makeSessionStore = async ( const makeSessionStore = async (
entries: Record<string, unknown> = {}, entries: Record<string, unknown> = {},
@@ -573,6 +575,78 @@ describe("web auto-reply", () => {
} }
}); });
// Verifies that messages arriving while the command queue reports work in
// progress are held back, then delivered to the resolver as one combined
// prompt in which every line carries its own message timestamp. Also checks
// that monitorWebProvider raises the process max-listener limit.
it("batches inbound messages while queue is busy and preserves timestamps", async () => {
  vi.useFakeTimers();
  const originalMax = process.getMaxListeners();
  process.setMaxListeners?.(1); // force low to confirm bump

  // Queue starts busy; the test frees it below before advancing the poll timer.
  let queueBusy = true;
  const queueSpy = vi
    .spyOn(commandQueue, "getQueueSize")
    .mockImplementation(() => (queueBusy ? 1 : 0));

  // All global state touched above (fake timers, spy, listener limit) is
  // restored in `finally` so a failing assertion cannot leak into other tests.
  try {
    const sendMedia = vi.fn();
    const reply = vi.fn().mockResolvedValue(undefined);
    const sendComposing = vi.fn();
    const resolver = vi.fn().mockResolvedValue({ text: "batched" });

    // Capture the onMessage handler the provider registers so the test can
    // feed inbound messages directly.
    let capturedOnMessage:
      | ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
      | undefined;
    const listenerFactory = async (opts: {
      onMessage: (
        msg: import("./inbound.js").WebInboundMessage,
      ) => Promise<void>;
    }) => {
      capturedOnMessage = opts.onMessage;
      return { close: vi.fn() };
    };

    setLoadConfigMock(() => ({ inbound: { timestampPrefix: "UTC" } }));

    await monitorWebProvider(false, listenerFactory, false, resolver);
    expect(capturedOnMessage).toBeDefined();

    // Two messages from the same sender with fixed timestamps
    await capturedOnMessage?.({
      body: "first",
      from: "+1",
      to: "+2",
      id: "m1",
      timestamp: 1735689600000, // Jan 1 2025 00:00:00 UTC
      sendComposing,
      reply,
      sendMedia,
    });
    await capturedOnMessage?.({
      body: "second",
      from: "+1",
      to: "+2",
      id: "m2",
      timestamp: 1735693200000, // Jan 1 2025 01:00:00 UTC
      sendComposing,
      reply,
      sendMedia,
    });

    // Let the queued batch flush once the queue is free
    queueBusy = false;
    vi.advanceTimersByTime(200);

    expect(resolver).toHaveBeenCalledTimes(1);
    const args = resolver.mock.calls[0][0];
    expect(args.Body).toContain("[Jan 1 00:00] [warelay] first");
    expect(args.Body).toContain("[Jan 1 01:00] [warelay] second");

    // Max listeners bumped to avoid warnings in multi-instance test runs
    expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50);
  } finally {
    queueSpy.mockRestore();
    process.setMaxListeners?.(originalMax);
    vi.useRealTimers();
  }
});
it("falls back to text when media send fails", async () => { it("falls back to text when media send fails", async () => {
const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
const reply = vi.fn().mockResolvedValue(undefined); const reply = vi.fn().mockResolvedValue(undefined);

View File

@@ -515,6 +515,13 @@ export async function monitorWebProvider(
}), }),
); );
// Avoid noisy MaxListenersExceededWarning output when multiple relay
// instances are constructed in the same process (most visible in test runs).
// Note this raises the limit on the global `process` emitter in every
// environment. A limit of 0 means "unlimited" in Node, so it must be left
// alone rather than lowered to 50.
const currentMaxListeners = process.getMaxListeners?.() ?? 10;
if (
  process.setMaxListeners &&
  currentMaxListeners !== 0 &&
  currentMaxListeners < 50
) {
  process.setMaxListeners(50);
}
let sigintStop = false; let sigintStop = false;
const handleSigint = () => { const handleSigint = () => {
sigintStop = true; sigintStop = true;
@@ -544,111 +551,108 @@ export async function monitorWebProvider(
const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
const listener = await (listenerFactory ?? monitorWebInbox)({ // Batch inbound messages while command queue is busy, then send one
verbose, // combined prompt with per-message timestamps (inbound-only behavior).
onMessage: async (msg) => { type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout };
// Also add IPC-sent messages to echo detection const pendingBatches = new Map<string, PendingBatch>();
// (this is handled below in the IPC sendHandler)
handledMessages += 1;
lastMessageAt = Date.now();
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
const correlationId = msg.id ?? newConnectionId();
replyLogger.info(
{
connectionId,
correlationId,
from: msg.from,
to: msg.to,
body: msg.body,
mediaType: msg.mediaType ?? null,
mediaPath: msg.mediaPath ?? null,
},
"inbound web message",
);
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`); const formatTimestamp = (ts?: number) => {
// Detect same-phone mode (self-messaging)
const isSamePhoneMode = msg.from === msg.to;
if (isSamePhoneMode) {
logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`);
}
// Skip if this is a message we just sent (echo detection)
if (recentlySent.has(msg.body)) {
console.log(`⏭️ Skipping echo: detected recently sent message`);
logVerbose(
`Skipping auto-reply: detected echo (message matches recently sent text)`,
);
recentlySent.delete(msg.body); // Remove from set to allow future identical messages
return;
}
logVerbose(
`Echo check: message not in recent set (size: ${recentlySent.size})`,
);
lastInboundMsg = msg;
// Build timestamp prefix (default: enabled with UTC)
// Can be: true (UTC), false (disabled), or "America/New_York" (custom timezone)
let timestampStr = "";
const tsCfg = cfg.inbound?.timestampPrefix; const tsCfg = cfg.inbound?.timestampPrefix;
const tsEnabled = tsCfg !== false; // default true const tsEnabled = tsCfg !== false; // default true
if (tsEnabled) { if (!tsEnabled) return "";
const tz = typeof tsCfg === "string" ? tsCfg : "UTC"; const tz = typeof tsCfg === "string" ? tsCfg : "UTC";
const now = new Date(); const date = ts ? new Date(ts) : new Date();
try { try {
// Format: "Nov 29 06:30" - compact but informative return `[${date.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${date.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `;
timestampStr = `[${now.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${now.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `;
} catch { } catch {
// Fallback to UTC if timezone invalid return `[${date.toISOString().slice(5, 16).replace("T", " ")}] `;
timestampStr = `[${now.toISOString().slice(5, 16).replace("T", " ")}] `;
}
} }
};
const buildLine = (msg: WebInboundMsg) => {
// Build message prefix: explicit config > default based on allowFrom // Build message prefix: explicit config > default based on allowFrom
// If allowFrom is configured, user likely has a specific setup - no default prefix
// If no allowFrom, add "[warelay]" so AI knows it's coming through warelay
let messagePrefix = cfg.inbound?.messagePrefix; let messagePrefix = cfg.inbound?.messagePrefix;
if (messagePrefix === undefined) { if (messagePrefix === undefined) {
const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0; const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0;
messagePrefix = hasAllowFrom ? "" : "[warelay]"; messagePrefix = hasAllowFrom ? "" : "[warelay]";
} }
const prefixStr = messagePrefix ? `${messagePrefix} ` : ""; const prefixStr = messagePrefix ? `${messagePrefix} ` : "";
const bodyForCommand = `${timestampStr}${prefixStr}${msg.body}`; return `${formatTimestamp(msg.timestamp)}${prefixStr}${msg.body}`;
};
const processBatch = async (from: string) => {
const batch = pendingBatches.get(from);
if (!batch || batch.messages.length === 0) return;
if (getQueueSize() > 0) {
// Wait until command queue is free to run the combined prompt.
batch.timer = setTimeout(() => void processBatch(from), 150);
return;
}
pendingBatches.delete(from);
const messages = batch.messages;
const latest = messages[messages.length - 1];
const combinedBody = messages.map(buildLine).join("\n");
// Echo detection uses combined body so we don't respond twice.
if (recentlySent.has(combinedBody)) {
logVerbose(`Skipping auto-reply: detected echo for combined batch`);
recentlySent.delete(combinedBody);
return;
}
const correlationId = latest.id ?? newConnectionId();
replyLogger.info(
{
connectionId,
correlationId,
from,
to: latest.to,
body: combinedBody,
mediaType: latest.mediaType ?? null,
mediaPath: latest.mediaPath ?? null,
batchSize: messages.length,
},
"inbound web message (batched)",
);
const tsDisplay = latest.timestamp
? new Date(latest.timestamp).toISOString()
: new Date().toISOString();
console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`);
const replyResult = await (replyResolver ?? getReplyFromConfig)( const replyResult = await (replyResolver ?? getReplyFromConfig)(
{ {
Body: bodyForCommand, Body: combinedBody,
From: msg.from, From: latest.from,
To: msg.to, To: latest.to,
MessageSid: msg.id, MessageSid: latest.id,
MediaPath: msg.mediaPath, MediaPath: latest.mediaPath,
MediaUrl: msg.mediaUrl, MediaUrl: latest.mediaUrl,
MediaType: msg.mediaType, MediaType: latest.mediaType,
}, },
{ {
onReplyStart: msg.sendComposing, onReplyStart: latest.sendComposing,
}, },
); );
if ( if (
!replyResult || !replyResult ||
(!replyResult.text && (!replyResult.text &&
!replyResult.mediaUrl && !replyResult.mediaUrl &&
!replyResult.mediaUrls?.length) !replyResult.mediaUrls?.length)
) { ) {
logVerbose( logVerbose("Skipping auto-reply: no text/media returned from resolver");
"Skipping auto-reply: no text/media returned from resolver",
);
return; return;
} }
// Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match) // Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match)
const responsePrefix = cfg.inbound?.responsePrefix; const responsePrefix = cfg.inbound?.responsePrefix;
if (responsePrefix && replyResult.text && replyResult.text.trim() !== HEARTBEAT_TOKEN) { if (
// Only add prefix if not already present responsePrefix &&
replyResult.text &&
replyResult.text.trim() !== HEARTBEAT_TOKEN
) {
if (!replyResult.text.startsWith(responsePrefix)) { if (!replyResult.text.startsWith(responsePrefix)) {
replyResult.text = `${responsePrefix} ${replyResult.text}`; replyResult.text = `${responsePrefix} ${replyResult.text}`;
} }
@@ -657,20 +661,19 @@ export async function monitorWebProvider(
try { try {
await deliverWebReply({ await deliverWebReply({
replyResult, replyResult,
msg, msg: latest,
maxMediaBytes, maxMediaBytes,
replyLogger, replyLogger,
runtime, runtime,
connectionId, connectionId,
}); });
// Track sent message to prevent echo loops
if (replyResult.text) { if (replyResult.text) {
recentlySent.add(replyResult.text); recentlySent.add(replyResult.text);
recentlySent.add(combinedBody); // Prevent echo on the batch text itself
logVerbose( logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`, `Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`,
); );
// Keep set bounded - remove oldest if too large
if (recentlySent.size > MAX_RECENT_MESSAGES) { if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value; const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey); if (firstKey) recentlySent.delete(firstKey);
@@ -680,7 +683,7 @@ export async function monitorWebProvider(
if (isVerbose()) { if (isVerbose()) {
console.log( console.log(
success( success(
`↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`, `↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`,
), ),
); );
} else { } else {
@@ -693,10 +696,48 @@ export async function monitorWebProvider(
} catch (err) { } catch (err) {
console.error( console.error(
danger( danger(
`Failed sending web auto-reply to ${msg.from}: ${String(err)}`, `Failed sending web auto-reply to ${from}: ${String(err)}`,
), ),
); );
} }
};
// Adds an inbound message to the pending batch for its sender (keyed by
// `msg.from`). If the command queue is idle the batch is processed right
// away; otherwise a single 150 ms poll timer is armed per sender and
// processBatch is retried when it fires.
const enqueueBatch = async (msg: WebInboundMsg) => {
  const bucket = pendingBatches.get(msg.from) ?? { messages: [] };
  bucket.messages.push(msg);
  pendingBatches.set(msg.from, bucket);

  // Process immediately when queue is free; otherwise wait until it drains.
  if (getQueueSize() === 0) {
    // A poll timer may still be armed from an earlier message that arrived
    // while the queue was busy. Cancel it so it cannot fire later against a
    // missing or newer batch and start a second polling chain.
    if (bucket.timer) {
      clearTimeout(bucket.timer);
      bucket.timer = undefined;
    }
    await processBatch(msg.from);
  } else {
    bucket.timer =
      bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150);
  }
};
// Start the inbound listener (injected factory if provided, otherwise
// monitorWebInbox). Each inbound message updates the activity counters,
// is dropped if it matches text we recently sent, and is otherwise handed
// to enqueueBatch.
const listener = await (listenerFactory ?? monitorWebInbox)({
  verbose,
  onMessage: async (msg) => {
    // Activity counters are updated for every message, echoes included.
    handledMessages += 1;
    lastMessageAt = Date.now();

    // Same-phone mode logging retained
    if (msg.from === msg.to) {
      logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`);
    }

    // Skip if this is a message we just sent (echo detection)
    if (recentlySent.has(msg.body)) {
      console.log(`⏭️ Skipping echo: detected recently sent message`);
      logVerbose(
        `Skipping auto-reply: detected echo (message matches recently sent text)`,
      );
      // Remove from the set so a future identical message is not skipped.
      recentlySent.delete(msg.body);
      return;
    }

    // Record the message as the last inbound one only after the echo check,
    // so our own echoed text is never treated as the latest inbound message.
    // NOTE(review): lastInboundMsg's consumers are outside this view — confirm.
    lastInboundMsg = msg;

    return enqueueBatch(msg);
  },
});