fix: remove WhatsApp batching delay

This commit is contained in:
Peter Steinberger
2025-12-20 01:36:37 +01:00
parent 46b9006de2
commit afa4a234f9
3 changed files with 54 additions and 89 deletions

View File

@@ -51,7 +51,7 @@ Notes:
- Manual smoke:
- Send an `@clawd` ping in the group and confirm a reply that references the sender name.
- Send a second ping and verify the history block is included then cleared on the next turn.
- Check gateway logs (run with `--verbose`) to see `inbound web message (batched)` entries showing `from: <groupJid>` and the `[from: …]` suffix.
- Check gateway logs (run with `--verbose`) to see `inbound web message` entries showing `from: <groupJid>` and the `[from: …]` suffix.
## Known considerations
- Heartbeats are intentionally skipped for groups to avoid noisy broadcasts.

View File

@@ -79,7 +79,7 @@ const makeSessionStore = async (
await fs.writeFile(storePath, JSON.stringify(entries));
const cleanup = async () => {
// Session store writes can be in-flight when the test finishes (e.g. updateLastRoute
// after a batched message flush). `fs.rm({ recursive })` can race and throw ENOTEMPTY.
// after a message flush). `fs.rm({ recursive })` can race and throw ENOTEMPTY.
for (let attempt = 0; attempt < 10; attempt += 1) {
try {
await fs.rm(dir, { recursive: true, force: true });
@@ -866,8 +866,7 @@ describe("web auto-reply", () => {
}
});
it("batches inbound messages while queue is busy and preserves timestamps", async () => {
vi.useFakeTimers();
it("processes inbound messages without batching and preserves timestamps", async () => {
const originalMax = process.getMaxListeners();
process.setMaxListeners?.(1); // force low to confirm bump
@@ -878,7 +877,7 @@ describe("web auto-reply", () => {
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
const resolver = vi.fn().mockResolvedValue({ text: "batched" });
const resolver = vi.fn().mockResolvedValue({ text: "ok" });
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
@@ -892,12 +891,6 @@ describe("web auto-reply", () => {
return { close: vi.fn() };
};
// Queue starts busy, then frees after one polling tick.
let queueBusy = true;
const queueSpy = vi
.spyOn(commandQueue, "getQueueSize")
.mockImplementation(() => (queueBusy ? 1 : 0));
setLoadConfigMock(() => ({
inbound: {
timestampPrefix: "UTC",
@@ -930,25 +923,22 @@ describe("web auto-reply", () => {
sendMedia,
});
// Let the queued batch flush once the queue is free
queueBusy = false;
await vi.advanceTimersByTimeAsync(200);
expect(resolver).toHaveBeenCalledTimes(1);
const args = resolver.mock.calls[0][0];
expect(args.Body).toContain(
expect(resolver).toHaveBeenCalledTimes(2);
const firstArgs = resolver.mock.calls[0][0];
const secondArgs = resolver.mock.calls[1][0];
expect(firstArgs.Body).toContain(
"[WhatsApp +1 2025-01-01 00:00] [clawdis] first",
);
expect(args.Body).toContain(
expect(firstArgs.Body).not.toContain("second");
expect(secondArgs.Body).toContain(
"[WhatsApp +1 2025-01-01 01:00] [clawdis] second",
);
expect(secondArgs.Body).not.toContain("first");
// Max listeners bumped to avoid warnings in multi-instance test runs
expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50);
queueSpy.mockRestore();
process.setMaxListeners?.(originalMax);
vi.useRealTimers();
await store.cleanup();
});

View File

@@ -768,9 +768,6 @@ export async function monitorWebProvider(
const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
// Batch inbound messages per conversation while command queue is busy.
type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout };
const pendingBatches = new Map<string, PendingBatch>();
const backgroundTasks = new Set<Promise<unknown>>();
const buildLine = (msg: WebInboundMsg) => {
@@ -799,20 +796,11 @@ export async function monitorWebProvider(
});
};
const processBatch = async (conversationId: string) => {
const batch = pendingBatches.get(conversationId);
if (!batch || batch.messages.length === 0) return;
if (getQueueSize() > 0) {
batch.timer = setTimeout(() => processBatch(conversationId), 150);
return;
}
pendingBatches.delete(conversationId);
const processMessage = async (msg: WebInboundMsg) => {
const conversationId = msg.conversationId ?? msg.from;
let combinedBody = buildLine(msg);
const messages = batch.messages;
const latest = messages[messages.length - 1];
let combinedBody = messages.map(buildLine).join("\n");
if (latest.chatType === "group") {
if (msg.chatType === "group") {
const history = groupHistories.get(conversationId) ?? [];
const historyWithoutCurrent =
history.length > 0 ? history.slice(0, -1) : [];
@@ -827,13 +815,13 @@ export async function monitorWebProvider(
}),
)
.join("\\n");
combinedBody = `[Chat messages since your last reply - for context]\\n${historyText}\\n\\n[Current message - respond to this]\\n${buildLine(latest)}`;
combinedBody = `[Chat messages since your last reply - for context]\\n${historyText}\\n\\n[Current message - respond to this]\\n${buildLine(msg)}`;
}
// Always surface who sent the triggering message so the agent can address them.
const senderLabel =
latest.senderName && latest.senderE164
? `${latest.senderName} (${latest.senderE164})`
: (latest.senderName ?? latest.senderE164 ?? "Unknown");
msg.senderName && msg.senderE164
? `${msg.senderName} (${msg.senderE164})`
: (msg.senderName ?? msg.senderE164 ?? "Unknown");
combinedBody = `${combinedBody}\\n[from: ${senderLabel}]`;
// Clear stored history after using it
groupHistories.set(conversationId, []);
@@ -841,46 +829,45 @@ export async function monitorWebProvider(
// Echo detection uses combined body so we don't respond twice.
if (recentlySent.has(combinedBody)) {
logVerbose(`Skipping auto-reply: detected echo for combined batch`);
logVerbose(`Skipping auto-reply: detected echo for combined message`);
recentlySent.delete(combinedBody);
return;
}
const correlationId = latest.id ?? newConnectionId();
const correlationId = msg.id ?? newConnectionId();
replyLogger.info(
{
connectionId,
correlationId,
from: latest.chatType === "group" ? conversationId : latest.from,
to: latest.to,
from: msg.chatType === "group" ? conversationId : msg.from,
to: msg.to,
body: elide(combinedBody, 240),
mediaType: latest.mediaType ?? null,
mediaPath: latest.mediaPath ?? null,
batchSize: messages.length,
mediaType: msg.mediaType ?? null,
mediaPath: msg.mediaPath ?? null,
},
"inbound web message (batched)",
"inbound web message",
);
const tsDisplay = latest.timestamp
? new Date(latest.timestamp).toISOString()
const tsDisplay = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
const fromDisplay =
latest.chatType === "group" ? conversationId : latest.from;
msg.chatType === "group" ? conversationId : msg.from;
console.log(
`\n[${tsDisplay}] ${fromDisplay} -> ${latest.to}: ${combinedBody}`,
`\n[${tsDisplay}] ${fromDisplay} -> ${msg.to}: ${combinedBody}`,
);
if (latest.chatType !== "group") {
if (msg.chatType !== "group") {
const sessionCfg = cfg.inbound?.session;
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const storePath = resolveStorePath(sessionCfg?.store);
const to = (() => {
if (latest.senderE164) return normalizeE164(latest.senderE164);
// In direct chats, `latest.from` is already the canonical conversation id,
if (msg.senderE164) return normalizeE164(msg.senderE164);
// In direct chats, `msg.from` is already the canonical conversation id,
// which is an E.164 string (e.g. "+1555"). Only fall back to JID parsing
// when we were handed a JID-like string.
if (latest.from.includes("@")) return jidToE164(latest.from);
return normalizeE164(latest.from);
if (msg.from.includes("@")) return jidToE164(msg.from);
return normalizeE164(msg.from);
})();
if (to) {
const task = updateLastRoute({
@@ -904,21 +891,21 @@ export async function monitorWebProvider(
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: combinedBody,
From: latest.from,
To: latest.to,
MessageSid: latest.id,
MediaPath: latest.mediaPath,
MediaUrl: latest.mediaUrl,
MediaType: latest.mediaType,
ChatType: latest.chatType,
GroupSubject: latest.groupSubject,
GroupMembers: latest.groupParticipants?.join(", "),
SenderName: latest.senderName,
SenderE164: latest.senderE164,
From: msg.from,
To: msg.to,
MessageSid: msg.id,
MediaPath: msg.mediaPath,
MediaUrl: msg.mediaUrl,
MediaType: msg.mediaType,
ChatType: msg.chatType,
GroupSubject: msg.groupSubject,
GroupMembers: msg.groupParticipants?.join(", "),
SenderName: msg.senderName,
SenderE164: msg.senderE164,
Surface: "whatsapp",
},
{
onReplyStart: latest.sendComposing,
onReplyStart: msg.sendComposing,
},
);
@@ -949,7 +936,7 @@ export async function monitorWebProvider(
try {
await deliverWebReply({
replyResult: replyPayload,
msg: latest,
msg,
maxMediaBytes,
replyLogger,
runtime,
@@ -958,7 +945,7 @@ export async function monitorWebProvider(
if (replyPayload.text) {
recentlySent.add(replyPayload.text);
recentlySent.add(combinedBody); // Prevent echo on the batch text itself
recentlySent.add(combinedBody); // Prevent echo on the combined text itself
logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyPayload.text.substring(0, 50)}...`,
);
@@ -969,13 +956,13 @@ export async function monitorWebProvider(
}
const fromDisplay =
latest.chatType === "group"
msg.chatType === "group"
? conversationId
: (latest.from ?? "unknown");
: (msg.from ?? "unknown");
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${fromDisplay} (web${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`,
`↩️ Auto-replied to ${fromDisplay} (web${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? ", media" : ""})`,
),
);
} else {
@@ -988,25 +975,13 @@ export async function monitorWebProvider(
} catch (err) {
console.error(
danger(
`Failed sending web auto-reply to ${latest.from ?? conversationId}: ${String(err)}`,
`Failed sending web auto-reply to ${msg.from ?? conversationId}: ${String(err)}`,
),
);
}
}
};
const enqueueBatch = async (msg: WebInboundMsg) => {
const key = msg.conversationId ?? msg.from;
const bucket = pendingBatches.get(key) ?? { messages: [] };
bucket.messages.push(msg);
pendingBatches.set(key, bucket);
if (getQueueSize() === 0) {
await processBatch(key);
} else {
bucket.timer = bucket.timer ?? setTimeout(() => processBatch(key), 150);
}
};
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
onMessage: async (msg) => {
@@ -1060,7 +1035,7 @@ export async function monitorWebProvider(
}
}
return enqueueBatch(msg);
return processMessage(msg);
},
});