From 7c765615699ab81893252e28e48abcfdd14e824f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 11 Jan 2026 00:12:17 +0100 Subject: [PATCH] fix: dedupe inbound messages across providers --- CHANGELOG.md | 1 + docs/concepts/messages.md | 6 ++ .../reply/dispatch-from-config.test.ts | 36 ++++++++- src/auto-reply/reply/dispatch-from-config.ts | 5 ++ src/auto-reply/reply/inbound-dedupe.test.ts | 81 +++++++++++++++++++ src/auto-reply/reply/inbound-dedupe.ts | 52 ++++++++++++ src/infra/dedupe.test.ts | 34 ++++++++ src/infra/dedupe.ts | 59 ++++++++++++++ src/signal/monitor.tool-result.test.ts | 2 + src/slack/monitor.tool-result.test.ts | 2 + src/slack/monitor.ts | 18 +---- src/telegram/bot.media.test.ts | 5 ++ src/telegram/bot.test.ts | 2 + src/telegram/bot.ts | 42 +++------- src/web/auto-reply.test.ts | 2 + src/web/inbound.media.test.ts | 3 +- src/web/inbound.ts | 20 ++++- src/web/monitor-inbox.test.ts | 36 ++++++++- 18 files changed, 353 insertions(+), 53 deletions(-) create mode 100644 src/auto-reply/reply/inbound-dedupe.test.ts create mode 100644 src/auto-reply/reply/inbound-dedupe.ts create mode 100644 src/infra/dedupe.test.ts create mode 100644 src/infra/dedupe.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b45a3066..50a23b371 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ - Docker: allow optional home volume + extra bind mounts in `docker-setup.sh`. (#679) — thanks @gabriel-trigo. ### Fixes +- Providers: dedupe inbound messages across providers to avoid duplicate LLM runs on redeliveries/reconnects. (#689) — thanks @adam91holt. - Agents: strip ``/`` tags from hidden reasoning output and cover tag variants in tests. (#688) — thanks @theglove44. - macOS: save model picker selections as normalized provider/model IDs and keep manual entries aligned. (#683) — thanks @benithors. - Agents: recognize "usage limit" errors as rate limits for failover. (#687) — thanks @evalexpr. diff --git a/docs/concepts/messages.md b/docs/concepts/messages.md index 8050a14e1..d96a36d05 100644 --- a/docs/concepts/messages.md +++ b/docs/concepts/messages.md @@ -27,6 +27,12 @@ Key knobs live in configuration: See [Configuration](/gateway/configuration) for full schema. +## Inbound dedupe + +Providers can redeliver the same message after reconnects. Clawdbot keeps a +short-lived cache keyed by provider/account/peer/session/message id so duplicate +deliveries do not trigger another agent run. + ## Sessions and devices Sessions are owned by the gateway, not by clients. diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 26bb863cb..7feb79158 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import type { ClawdbotConfig } from "../../config/config.js"; import type { MsgContext } from "../templating.js"; @@ -34,6 +34,7 @@ vi.mock("./abort.js", () => ({ })); const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js"); +const { resetInboundDedupe } = await import("./inbound-dedupe.js"); function createDispatcher(): ReplyDispatcher { return { @@ -46,6 +47,9 @@ function createDispatcher(): ReplyDispatcher { } describe("dispatchReplyFromConfig", () => { + beforeEach(() => { + resetInboundDedupe(); + }); it("does not route when Provider matches OriginatingChannel (even if Surface is missing)", async () => { mocks.tryFastAbortFromMessage.mockResolvedValue({ handled: false, @@ -125,4 +129,34 @@ describe("dispatchReplyFromConfig", () => { text: "⚙️ Agent was aborted.", }); }); + + it("deduplicates inbound messages by MessageSid and origin", async () => { + mocks.tryFastAbortFromMessage.mockResolvedValue({ + handled: false, + aborted: false, + }); + const cfg = {} as ClawdbotConfig; + const ctx: MsgContext = { + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "whatsapp:+15555550123", + MessageSid: "msg-1", + }; + const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload); + + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }); + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 2c07cb4d8..18b562e1a 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -4,6 +4,7 @@ import { getReplyFromConfig } from "../reply.js"; import type { MsgContext } from "../templating.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { tryFastAbortFromMessage } from "./abort.js"; +import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; @@ -21,6 +22,10 @@ export async function dispatchReplyFromConfig(params: { }): Promise { const { ctx, cfg, dispatcher } = params; + if (shouldSkipDuplicateInbound(ctx)) { + return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; + } + // Check if we should route replies to originating channel instead of dispatcher. // Only route when the originating channel is DIFFERENT from the current surface. // This handles cross-provider routing (e.g., message from Telegram being processed diff --git a/src/auto-reply/reply/inbound-dedupe.test.ts b/src/auto-reply/reply/inbound-dedupe.test.ts new file mode 100644 index 000000000..9a5588ce3 --- /dev/null +++ b/src/auto-reply/reply/inbound-dedupe.test.ts @@ -0,0 +1,81 @@ +import { describe, expect, it } from "vitest"; + +import type { MsgContext } from "../templating.js"; +import { + buildInboundDedupeKey, + resetInboundDedupe, + shouldSkipDuplicateInbound, +} from "./inbound-dedupe.js"; + +describe("inbound dedupe", () => { + it("builds a stable key when MessageSid is present", () => { + const ctx: MsgContext = { + Provider: "telegram", + OriginatingChannel: "telegram", + OriginatingTo: "telegram:123", + MessageSid: "42", + }; + expect(buildInboundDedupeKey(ctx)).toBe("telegram|telegram:123|42"); + }); + + it("skips duplicates with the same key", () => { + resetInboundDedupe(); + const ctx: MsgContext = { + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "whatsapp:+1555", + MessageSid: "msg-1", + }; + expect(shouldSkipDuplicateInbound(ctx, { now: 100 })).toBe(false); + expect(shouldSkipDuplicateInbound(ctx, { now: 200 })).toBe(true); + }); + + it("does not dedupe when the peer changes", () => { + resetInboundDedupe(); + const base: MsgContext = { + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + MessageSid: "msg-1", + }; + expect( + shouldSkipDuplicateInbound( + { ...base, OriginatingTo: "whatsapp:+1000" }, + { now: 100 }, + ), + ).toBe(false); + expect( + shouldSkipDuplicateInbound( + { ...base, OriginatingTo: "whatsapp:+2000" }, + { now: 200 }, + ), + ).toBe(false); + }); + + it("does not dedupe across session keys", () => { + resetInboundDedupe(); + const base: MsgContext = { + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "whatsapp:+1555", + MessageSid: "msg-1", + }; + expect( + shouldSkipDuplicateInbound( + { ...base, SessionKey: "agent:alpha:main" }, + { now: 100 }, + ), + ).toBe(false); + expect( + shouldSkipDuplicateInbound( + { ...base, SessionKey: "agent:bravo:main" }, + { now: 200 }, + ), + ).toBe(false); + expect( + shouldSkipDuplicateInbound( + { ...base, SessionKey: "agent:alpha:main" }, + { now: 300 }, + ), + ).toBe(true); + }); +}); diff --git a/src/auto-reply/reply/inbound-dedupe.ts b/src/auto-reply/reply/inbound-dedupe.ts new file mode 100644 index 000000000..2246b6b79 --- /dev/null +++ b/src/auto-reply/reply/inbound-dedupe.ts @@ -0,0 +1,52 @@ +import { logVerbose, shouldLogVerbose } from "../../globals.js"; +import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js"; +import type { MsgContext } from "../templating.js"; + +const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000; +const DEFAULT_INBOUND_DEDUPE_MAX = 5000; + +const inboundDedupeCache = createDedupeCache({ + ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS, + maxSize: DEFAULT_INBOUND_DEDUPE_MAX, +}); + +const normalizeProvider = (value?: string | null) => + value?.trim().toLowerCase() || ""; + +const resolveInboundPeerId = (ctx: MsgContext) => + ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? ctx.SessionKey; + +export function buildInboundDedupeKey(ctx: MsgContext): string | null { + const provider = normalizeProvider( + ctx.OriginatingChannel ?? ctx.Provider ?? ctx.Surface, + ); + const messageId = ctx.MessageSid?.trim(); + if (!provider || !messageId) return null; + const peerId = resolveInboundPeerId(ctx); + if (!peerId) return null; + const sessionKey = ctx.SessionKey?.trim() ?? ""; + const accountId = ctx.AccountId?.trim() ?? ""; + const threadId = + typeof ctx.MessageThreadId === "number" ? String(ctx.MessageThreadId) : ""; + return [provider, accountId, sessionKey, peerId, threadId, messageId] + .filter(Boolean) + .join("|"); +} + +export function shouldSkipDuplicateInbound( + ctx: MsgContext, + opts?: { cache?: DedupeCache; now?: number }, +): boolean { + const key = buildInboundDedupeKey(ctx); + if (!key) return false; + const cache = opts?.cache ?? inboundDedupeCache; + const skipped = cache.check(key, opts?.now); + if (skipped && shouldLogVerbose()) { + logVerbose(`inbound dedupe: skipped ${key}`); + } + return skipped; +} + +export function resetInboundDedupe(): void { + inboundDedupeCache.clear(); +} diff --git a/src/infra/dedupe.test.ts b/src/infra/dedupe.test.ts new file mode 100644 index 000000000..3d41938a4 --- /dev/null +++ b/src/infra/dedupe.test.ts @@ -0,0 +1,34 @@ +import { describe, expect, it } from "vitest"; + +import { createDedupeCache } from "./dedupe.js"; + +describe("createDedupeCache", () => { + it("marks duplicates within TTL", () => { + const cache = createDedupeCache({ ttlMs: 1000, maxSize: 10 }); + expect(cache.check("a", 100)).toBe(false); + expect(cache.check("a", 500)).toBe(true); + }); + + it("expires entries after TTL", () => { + const cache = createDedupeCache({ ttlMs: 1000, maxSize: 10 }); + expect(cache.check("a", 100)).toBe(false); + expect(cache.check("a", 1501)).toBe(false); + }); + + it("evicts oldest entries when over max size", () => { + const cache = createDedupeCache({ ttlMs: 10_000, maxSize: 2 }); + expect(cache.check("a", 100)).toBe(false); + expect(cache.check("b", 200)).toBe(false); + expect(cache.check("c", 300)).toBe(false); + expect(cache.check("a", 400)).toBe(false); + }); + + it("prunes expired entries even when refreshed keys are older in insertion order", () => { + const cache = createDedupeCache({ ttlMs: 100, maxSize: 10 }); + expect(cache.check("a", 0)).toBe(false); + expect(cache.check("b", 50)).toBe(false); + expect(cache.check("a", 120)).toBe(false); + expect(cache.check("c", 200)).toBe(false); + expect(cache.size()).toBe(2); + }); +}); diff --git a/src/infra/dedupe.ts b/src/infra/dedupe.ts new file mode 100644 index 000000000..ad49d65bd --- /dev/null +++ b/src/infra/dedupe.ts @@ -0,0 +1,59 @@ +export type DedupeCache = { + check: (key: string | undefined | null, now?: number) => boolean; + clear: () => void; + size: () => number; +}; + +type DedupeCacheOptions = { + ttlMs: number; + maxSize: number; +}; + +export function createDedupeCache(options: DedupeCacheOptions): DedupeCache { + const ttlMs = Math.max(0, options.ttlMs); + const maxSize = Math.max(0, Math.floor(options.maxSize)); + const cache = new Map(); + + const touch = (key: string, now: number) => { + cache.delete(key); + cache.set(key, now); + }; + + const prune = (now: number) => { + const cutoff = ttlMs > 0 ? now - ttlMs : undefined; + if (cutoff !== undefined) { + for (const [entryKey, entryTs] of cache) { + if (entryTs < cutoff) { + cache.delete(entryKey); + } + } + } + if (maxSize <= 0) { + cache.clear(); + return; + } + while (cache.size > maxSize) { + const oldestKey = cache.keys().next().value as string | undefined; + if (!oldestKey) break; + cache.delete(oldestKey); + } + }; + + return { + check: (key, now = Date.now()) => { + if (!key) return false; + const existing = cache.get(key); + if (existing !== undefined && (ttlMs <= 0 || now - existing < ttlMs)) { + touch(key, now); + return true; + } + touch(key, now); + prune(now); + return false; + }, + clear: () => { + cache.clear(); + }, + size: () => cache.size, + }; +} diff --git a/src/signal/monitor.tool-result.test.ts b/src/signal/monitor.tool-result.test.ts index dba9405db..65c9ccd02 100644 --- a/src/signal/monitor.tool-result.test.ts +++ b/src/signal/monitor.tool-result.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js"; import type { ClawdbotConfig } from "../config/config.js"; import { peekSystemEvents, @@ -61,6 +62,7 @@ vi.mock("./daemon.js", () => ({ const flush = () => new Promise((resolve) => setTimeout(resolve, 0)); beforeEach(() => { + resetInboundDedupe(); config = { messages: { responsePrefix: "PFX" }, signal: { autoStart: false, dmPolicy: "open", allowFrom: ["*"] }, diff --git a/src/slack/monitor.tool-result.test.ts b/src/slack/monitor.tool-result.test.ts index b7ef431a8..52d201d92 100644 --- a/src/slack/monitor.tool-result.test.ts +++ b/src/slack/monitor.tool-result.test.ts @@ -1,6 +1,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js"; +import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js"; import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js"; import { monitorSlackProvider } from "./monitor.js"; @@ -100,6 +101,7 @@ async function waitForEvent(name: string) { } beforeEach(() => { + resetInboundDedupe(); config = { messages: { responsePrefix: "PFX", diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index 44ddd66ad..072b233c0 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -50,6 +50,7 @@ import { updateLastRoute, } from "../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; +import { createDedupeCache } from "../infra/dedupe.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; import { type FetchLike, fetchRemoteMedia } from "../media/fetch.js"; @@ -516,24 +517,11 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { } >(); const userCache = new Map(); - const seenMessages = new Map(); + const seenMessages = createDedupeCache({ ttlMs: 60_000, maxSize: 500 }); const markMessageSeen = (channelId: string | undefined, ts?: string) => { if (!channelId || !ts) return false; - const key = `${channelId}:${ts}`; - if (seenMessages.has(key)) return true; - seenMessages.set(key, Date.now()); - if (seenMessages.size > 500) { - const cutoff = Date.now() - 60_000; - for (const [entry, seenAt] of seenMessages) { - if (seenAt < cutoff || seenMessages.size > 450) { - seenMessages.delete(entry); - } else { - break; - } - } - } - return false; + return seenMessages.check(`${channelId}:${ts}`); }; const app = new App({ diff --git a/src/telegram/bot.media.test.ts b/src/telegram/bot.media.test.ts index a30c10d8d..3bcef4c54 100644 --- a/src/telegram/bot.media.test.ts +++ b/src/telegram/bot.media.test.ts @@ -1,4 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js"; const useSpy = vi.fn(); const middlewareUseSpy = vi.fn(); @@ -16,6 +17,10 @@ const apiStub: ApiStub = { sendChatAction: sendChatActionSpy, }; +beforeEach(() => { + resetInboundDedupe(); +}); + vi.mock("grammy", () => ({ Bot: class { api = apiStub; diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index dd908b6bf..5b510d049 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js"; import * as replyModule from "../auto-reply/reply.js"; import { createTelegramBot, getTelegramSequentialKey } from "./bot.js"; import { resolveTelegramFetch } from "./fetch.js"; @@ -124,6 +125,7 @@ const getOnHandler = (event: string) => { describe("createTelegramBot", () => { beforeEach(() => { + resetInboundDedupe(); loadConfig.mockReturnValue({ telegram: { dmPolicy: "open", allowFrom: ["*"] }, }); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 9eadd1469..dfaf49ea0 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -45,6 +45,7 @@ import { updateLastRoute, } from "../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; +import { createDedupeCache } from "../infra/dedupe.js"; import { formatErrorMessage } from "../infra/errors.js"; import { recordProviderActivity } from "../infra/provider-activity.js"; import { getChildLogger } from "../logging.js"; @@ -120,32 +121,11 @@ const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => { return undefined; }; -const shouldSkipTelegramUpdate = ( - cache: Map, - key?: string, -) => { - if (!key) return false; - const now = Date.now(); - const existing = cache.get(key); - if (existing && now - existing.ts < RECENT_TELEGRAM_UPDATE_TTL_MS) { - return true; - } - if (existing) cache.delete(key); - cache.set(key, { ts: now }); - if (cache.size > RECENT_TELEGRAM_UPDATE_MAX) { - for (const [cachedKey, entry] of cache) { - if (now - entry.ts > RECENT_TELEGRAM_UPDATE_TTL_MS) { - cache.delete(cachedKey); - } - } - while (cache.size > RECENT_TELEGRAM_UPDATE_MAX) { - const oldestKey = cache.keys().next().value as string | undefined; - if (!oldestKey) break; - cache.delete(oldestKey); - } - } - return false; -}; +const createTelegramUpdateDedupe = () => + createDedupeCache({ + ttlMs: RECENT_TELEGRAM_UPDATE_TTL_MS, + maxSize: RECENT_TELEGRAM_UPDATE_MAX, + }); /** Telegram Location object */ interface TelegramLocation { @@ -233,10 +213,10 @@ export function createTelegramBot(opts: TelegramBotOptions) { bot.api.config.use(apiThrottler()); bot.use(sequentialize(getTelegramSequentialKey)); - const recentUpdates = new Map(); + const recentUpdates = createTelegramUpdateDedupe(); const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => { const key = buildTelegramUpdateKey(ctx); - const skipped = shouldSkipTelegramUpdate(recentUpdates, key); + const skipped = recentUpdates.check(key); if (skipped && key && shouldLogVerbose()) { logVerbose(`telegram dedupe: skipped ${key}`); } @@ -388,7 +368,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { primaryCtx: TelegramContext, allMedia: Array<{ path: string; contentType?: string }>, storeAllowFrom: string[], - options?: { forceWasMentioned?: boolean }, + options?: { forceWasMentioned?: boolean; messageIdOverride?: string }, ) => { const msg = primaryCtx.message; recordProviderActivity({ @@ -720,7 +700,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { SenderUsername: senderUsername || undefined, Provider: "telegram", Surface: "telegram", - MessageSid: String(msg.message_id), + MessageSid: options?.messageIdOverride ?? String(msg.message_id), ReplyToId: replyTarget?.id, ReplyToBody: replyTarget?.body, ReplyToSender: replyTarget?.sender, @@ -1163,7 +1143,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { { message: syntheticMessage, me: ctx.me, getFile }, [], storeAllowFrom, - { forceWasMentioned: true }, + { forceWasMentioned: true, messageIdOverride: callback.id }, ); } catch (err) { runtime.error?.(danger(`callback handler failed: ${String(err)}`)); diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index f1cea72ba..7e0c78c6e 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -17,6 +17,7 @@ vi.mock("../agents/pi-embedded.js", () => ({ })); import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ClawdbotConfig } from "../config/config.js"; import { resetLogger, setLoggerOverride } from "../logging.js"; @@ -57,6 +58,7 @@ const rmDirWithRetries = async (dir: string): Promise => { }; beforeEach(async () => { + resetInboundDedupe(); previousHome = process.env.HOME; tempHome = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-web-home-")); process.env.HOME = tempHome; diff --git a/src/web/inbound.media.test.ts b/src/web/inbound.media.test.ts index c18cf7f26..6551d79b4 100644 --- a/src/web/inbound.media.test.ts +++ b/src/web/inbound.media.test.ts @@ -106,11 +106,12 @@ vi.mock("./session.js", () => { }; }); -import { monitorWebInbox } from "./inbound.js"; +import { monitorWebInbox, resetWebInboundDedupe } from "./inbound.js"; describe("web inbound media saves with extension", () => { beforeEach(() => { saveMediaBufferSpy.mockClear(); + resetWebInboundDedupe(); }); beforeAll(async () => { diff --git a/src/web/inbound.ts b/src/web/inbound.ts index 87ea51d88..3a90479bd 100644 --- a/src/web/inbound.ts +++ b/src/web/inbound.ts @@ -14,6 +14,7 @@ import { import { loadConfig } from "../config/config.js"; import { logVerbose, shouldLogVerbose } from "../globals.js"; +import { createDedupeCache } from "../infra/dedupe.js"; import { recordProviderActivity } from "../infra/provider-activity.js"; import { createSubsystemLogger, getChildLogger } from "../logging.js"; import { saveMediaBuffer } from "../media/store.js"; @@ -48,6 +49,17 @@ export type WebListenerCloseReason = { error?: unknown; }; +const RECENT_WEB_MESSAGE_TTL_MS = 20 * 60_000; +const RECENT_WEB_MESSAGE_MAX = 5000; +const recentInboundMessages = createDedupeCache({ + ttlMs: RECENT_WEB_MESSAGE_TTL_MS, + maxSize: RECENT_WEB_MESSAGE_MAX, +}); + +export function resetWebInboundDedupe(): void { + recentInboundMessages.clear(); +} + export type WebInboundMessage = { id?: string; from: string; // conversation id: E.164 for direct chats, group JID for groups @@ -117,7 +129,6 @@ export async function monitorWebInbox(options: { } const selfJid = sock.user?.id; const selfE164 = selfJid ? jidToE164(selfJid) : null; - const seen = new Set(); const groupMetaCache = new Map< string, { subject?: string; participants?: string[]; expires: number } @@ -169,9 +180,6 @@ export async function monitorWebInbox(options: { direction: "inbound", }); const id = msg.key?.id ?? undefined; - // De-dupe on message id; Baileys can emit retries. - if (id && seen.has(id)) continue; - if (id) seen.add(id); // Note: not filtering fromMe here - echo detection happens in auto-reply layer const remoteJid = msg.key?.remoteJid; if (!remoteJid) continue; @@ -179,6 +187,10 @@ export async function monitorWebInbox(options: { if (remoteJid.endsWith("@status") || remoteJid.endsWith("@broadcast")) continue; const group = isJidGroup(remoteJid); + if (id) { + const dedupeKey = `${options.accountId}:${remoteJid}:${id}`; + if (recentInboundMessages.check(dedupeKey)) continue; + } const participantJid = msg.key?.participant ?? undefined; const from = group ? remoteJid : await resolveInboundJid(remoteJid); // Skip if we still can't resolve an id to key conversation diff --git a/src/web/monitor-inbox.test.ts b/src/web/monitor-inbox.test.ts index 6463d5797..6a48e4dd4 100644 --- a/src/web/monitor-inbox.test.ts +++ b/src/web/monitor-inbox.test.ts @@ -76,7 +76,7 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { resetLogger, setLoggerOverride } from "../logging.js"; -import { monitorWebInbox } from "./inbound.js"; +import { monitorWebInbox, resetWebInboundDedupe } from "./inbound.js"; const ACCOUNT_ID = "default"; let authDir: string; @@ -89,6 +89,7 @@ describe("web monitor inbox", () => { code: "PAIRCODE", created: true, }); + resetWebInboundDedupe(); authDir = fsSync.mkdtempSync(path.join(os.tmpdir(), "clawdbot-auth-")); }); @@ -151,6 +152,39 @@ describe("web monitor inbox", () => { await listener.close(); }); + it("deduplicates redelivered messages by id", async () => { + const onMessage = vi.fn(async () => { + return; + }); + + const listener = await monitorWebInbox({ + verbose: false, + onMessage, + accountId: ACCOUNT_ID, + authDir, + }); + const sock = await createWaSocket(); + const upsert = { + type: "notify", + messages: [ + { + key: { id: "abc", fromMe: false, remoteJid: "999@s.whatsapp.net" }, + message: { conversation: "ping" }, + messageTimestamp: 1_700_000_000, + pushName: "Tester", + }, + ], + }; + + sock.ev.emit("messages.upsert", upsert); + sock.ev.emit("messages.upsert", upsert); + await new Promise((resolve) => setImmediate(resolve)); + + expect(onMessage).toHaveBeenCalledTimes(1); + + await listener.close(); + }); + it("resolves LID JIDs using Baileys LID mapping store", async () => { const onMessage = vi.fn(async () => { return;