fix: dedupe inbound messages across providers

This commit is contained in:
Peter Steinberger
2026-01-11 00:12:17 +01:00
parent bd2002010c
commit 7c76561569
18 changed files with 353 additions and 53 deletions

View File

@@ -15,6 +15,7 @@
- Docker: allow optional home volume + extra bind mounts in `docker-setup.sh`. (#679) — thanks @gabriel-trigo.
### Fixes
- Providers: dedupe inbound messages across providers to avoid duplicate LLM runs on redeliveries/reconnects. (#689) — thanks @adam91holt.
- Agents: strip `<thought>`/`<antthinking>` tags from hidden reasoning output and cover tag variants in tests. (#688) — thanks @theglove44.
- macOS: save model picker selections as normalized provider/model IDs and keep manual entries aligned. (#683) — thanks @benithors.
- Agents: recognize "usage limit" errors as rate limits for failover. (#687) — thanks @evalexpr.

View File

@@ -27,6 +27,12 @@ Key knobs live in configuration:
See [Configuration](/gateway/configuration) for full schema.
## Inbound dedupe
Providers can redeliver the same message after reconnects. Clawdbot keeps a
short-lived in-memory cache keyed by provider/account/session/peer/thread/message
id, so duplicate deliveries do not trigger another agent run.
## Sessions and devices
Sessions are owned by the gateway, not by clients.

View File

@@ -1,4 +1,4 @@
import { describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { ClawdbotConfig } from "../../config/config.js";
import type { MsgContext } from "../templating.js";
@@ -34,6 +34,7 @@ vi.mock("./abort.js", () => ({
}));
const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js");
const { resetInboundDedupe } = await import("./inbound-dedupe.js");
function createDispatcher(): ReplyDispatcher {
return {
@@ -46,6 +47,9 @@ function createDispatcher(): ReplyDispatcher {
}
describe("dispatchReplyFromConfig", () => {
beforeEach(() => {
resetInboundDedupe();
});
it("does not route when Provider matches OriginatingChannel (even if Surface is missing)", async () => {
mocks.tryFastAbortFromMessage.mockResolvedValue({
handled: false,
@@ -125,4 +129,34 @@ describe("dispatchReplyFromConfig", () => {
text: "⚙️ Agent was aborted.",
});
});
it("deduplicates inbound messages by MessageSid and origin", async () => {
  // No fast-abort: both deliveries must reach the dedupe check.
  mocks.tryFastAbortFromMessage.mockResolvedValue({
    handled: false,
    aborted: false,
  });

  const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload);
  const cfg = {} as ClawdbotConfig;
  const ctx: MsgContext = {
    Provider: "whatsapp",
    OriginatingChannel: "whatsapp",
    OriginatingTo: "whatsapp:+15555550123",
    MessageSid: "msg-1",
  };

  // Deliver the identical context twice, each time with a fresh dispatcher.
  for (let delivery = 0; delivery < 2; delivery += 1) {
    await dispatchReplyFromConfig({
      ctx,
      cfg,
      dispatcher: createDispatcher(),
      replyResolver,
    });
  }

  // Only the first delivery may reach the reply resolver.
  expect(replyResolver).toHaveBeenCalledTimes(1);
});
});

View File

@@ -4,6 +4,7 @@ import { getReplyFromConfig } from "../reply.js";
import type { MsgContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { tryFastAbortFromMessage } from "./abort.js";
import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
import { isRoutableChannel, routeReply } from "./route-reply.js";
@@ -21,6 +22,10 @@ export async function dispatchReplyFromConfig(params: {
}): Promise<DispatchFromConfigResult> {
const { ctx, cfg, dispatcher } = params;
if (shouldSkipDuplicateInbound(ctx)) {
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
// Check if we should route replies to originating channel instead of dispatcher.
// Only route when the originating channel is DIFFERENT from the current surface.
// This handles cross-provider routing (e.g., message from Telegram being processed

View File

@@ -0,0 +1,81 @@
import { describe, expect, it } from "vitest";
import type { MsgContext } from "../templating.js";
import {
buildInboundDedupeKey,
resetInboundDedupe,
shouldSkipDuplicateInbound,
} from "./inbound-dedupe.js";
describe("inbound dedupe", () => {
  // WhatsApp context without a peer; individual tests add the fields they vary.
  const whatsappBase: MsgContext = {
    Provider: "whatsapp",
    OriginatingChannel: "whatsapp",
    MessageSid: "msg-1",
  };

  // Runs the dedupe check at an explicit timestamp so tests never depend on the clock.
  const skippedAt = (ctx: MsgContext, now: number) =>
    shouldSkipDuplicateInbound(ctx, { now });

  it("builds a stable key when MessageSid is present", () => {
    const key = buildInboundDedupeKey({
      Provider: "telegram",
      OriginatingChannel: "telegram",
      OriginatingTo: "telegram:123",
      MessageSid: "42",
    });
    expect(key).toBe("telegram|telegram:123|42");
  });

  it("skips duplicates with the same key", () => {
    resetInboundDedupe();
    const ctx: MsgContext = { ...whatsappBase, OriginatingTo: "whatsapp:+1555" };
    expect(skippedAt(ctx, 100)).toBe(false);
    expect(skippedAt(ctx, 200)).toBe(true);
  });

  it("does not dedupe when the peer changes", () => {
    resetInboundDedupe();
    const firstPeer: MsgContext = {
      ...whatsappBase,
      OriginatingTo: "whatsapp:+1000",
    };
    const secondPeer: MsgContext = {
      ...whatsappBase,
      OriginatingTo: "whatsapp:+2000",
    };
    expect(skippedAt(firstPeer, 100)).toBe(false);
    expect(skippedAt(secondPeer, 200)).toBe(false);
  });

  it("does not dedupe across session keys", () => {
    resetInboundDedupe();
    const forSession = (SessionKey: string): MsgContext => ({
      ...whatsappBase,
      OriginatingTo: "whatsapp:+1555",
      SessionKey,
    });
    expect(skippedAt(forSession("agent:alpha:main"), 100)).toBe(false);
    expect(skippedAt(forSession("agent:bravo:main"), 200)).toBe(false);
    // Same session as the first delivery, so this one is a duplicate.
    expect(skippedAt(forSession("agent:alpha:main"), 300)).toBe(true);
  });
});

View File

@@ -0,0 +1,52 @@
import { logVerbose, shouldLogVerbose } from "../../globals.js";
import { createDedupeCache, type DedupeCache } from "../../infra/dedupe.js";
import type { MsgContext } from "../templating.js";
// Inbound dedupe defaults: a key is remembered for 20 minutes (in ms) and at
// most 5000 keys are tracked at once.
const DEFAULT_INBOUND_DEDUPE_TTL_MS = 20 * 60_000;
const DEFAULT_INBOUND_DEDUPE_MAX = 5000;
// Module-level cache used by shouldSkipDuplicateInbound when the caller does
// not pass its own cache; resetInboundDedupe clears it.
const inboundDedupeCache = createDedupeCache({
ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS,
maxSize: DEFAULT_INBOUND_DEDUPE_MAX,
});
/** Trims and lower-cases a provider name; a missing or blank value becomes "". */
const normalizeProvider = (value?: string | null) => {
  if (value == null) return "";
  return value.trim().toLowerCase();
};
/**
 * Picks the peer identifier for a message: the first of OriginatingTo, To,
 * From and SessionKey that is neither null nor undefined.
 */
const resolveInboundPeerId = (ctx: MsgContext) => {
  const { OriginatingTo, To, From, SessionKey } = ctx;
  return OriginatingTo ?? To ?? From ?? SessionKey;
};
/**
 * Builds the cache key that identifies one inbound message.
 *
 * The key joins, with "|", the provider, account id, session key, peer id,
 * thread id and message id, in that order. Account, session and thread are
 * left out when absent or blank.
 *
 * @returns the key, or `null` when the provider, message id or peer id is
 *   missing, in which case the message cannot be deduplicated.
 */
export function buildInboundDedupeKey(ctx: MsgContext): string | null {
  const channel = ctx.OriginatingChannel ?? ctx.Provider ?? ctx.Surface;
  const provider = normalizeProvider(channel);
  if (!provider) return null;

  const messageId = ctx.MessageSid?.trim();
  if (!messageId) return null;

  const peerId = resolveInboundPeerId(ctx);
  if (!peerId) return null;

  // Required segments always appear; optional ones are appended only when set.
  const segments = [provider];

  const accountId = ctx.AccountId?.trim();
  if (accountId) segments.push(accountId);

  const sessionKey = ctx.SessionKey?.trim();
  if (sessionKey) segments.push(sessionKey);

  segments.push(peerId);

  // Only numeric thread ids take part in the key.
  if (typeof ctx.MessageThreadId === "number") {
    segments.push(String(ctx.MessageThreadId));
  }

  segments.push(messageId);
  return segments.join("|");
}
/**
 * Reports whether this inbound message was already seen and should be skipped.
 *
 * Messages for which no dedupe key can be built are never skipped. Calling
 * this also records the message in the cache.
 *
 * @param ctx  inbound message context
 * @param opts optional overrides: `cache` replaces the module-level cache and
 *   `now` is forwarded to the cache as the current timestamp
 * @returns `true` when the message is a duplicate
 */
export function shouldSkipDuplicateInbound(
  ctx: MsgContext,
  opts?: { cache?: DedupeCache; now?: number },
): boolean {
  const dedupeKey = buildInboundDedupeKey(ctx);
  if (!dedupeKey) return false;

  const cache = opts?.cache ?? inboundDedupeCache;
  const isDuplicate = cache.check(dedupeKey, opts?.now);
  if (!isDuplicate) return false;

  if (shouldLogVerbose()) {
    logVerbose(`inbound dedupe: skipped ${dedupeKey}`);
  }
  return true;
}
/**
 * Empties the module-level inbound dedupe cache so previously seen messages
 * are treated as new again. The test suites call this from `beforeEach`.
 */
export function resetInboundDedupe(): void {
inboundDedupeCache.clear();
}

34
src/infra/dedupe.test.ts Normal file
View File

@@ -0,0 +1,34 @@
import { describe, expect, it } from "vitest";
import { createDedupeCache } from "./dedupe.js";
describe("createDedupeCache", () => {
  // Every case drives the cache with explicit timestamps instead of the clock.
  const makeCache = (ttlMs: number, maxSize: number) =>
    createDedupeCache({ ttlMs, maxSize });

  it("marks duplicates within TTL", () => {
    const cache = makeCache(1000, 10);
    const first = cache.check("a", 100);
    const second = cache.check("a", 500);
    expect(first).toBe(false);
    expect(second).toBe(true);
  });

  it("expires entries after TTL", () => {
    const cache = makeCache(1000, 10);
    const first = cache.check("a", 100);
    const afterTtl = cache.check("a", 1501);
    expect(first).toBe(false);
    expect(afterTtl).toBe(false);
  });

  it("evicts oldest entries when over max size", () => {
    const cache = makeCache(10_000, 2);
    const sightings: Array<[string, number]> = [
      ["a", 100],
      ["b", 200],
      ["c", 300],
      // "a" was evicted when "c" arrived, so it is new again.
      ["a", 400],
    ];
    for (const [key, now] of sightings) {
      expect(cache.check(key, now)).toBe(false);
    }
  });

  it("prunes expired entries even when refreshed keys are older in insertion order", () => {
    const cache = makeCache(100, 10);
    const sightings: Array<[string, number]> = [
      ["a", 0],
      ["b", 50],
      ["a", 120],
      ["c", 200],
    ];
    for (const [key, now] of sightings) {
      expect(cache.check(key, now)).toBe(false);
    }
    expect(cache.size()).toBe(2);
  });
});

59
src/infra/dedupe.ts Normal file
View File

@@ -0,0 +1,59 @@
/** A small in-memory "have I seen this key recently?" cache. */
export type DedupeCache = {
  /**
   * Records `key` as seen at `now` and reports whether it had already been
   * seen within the TTL. Empty, `null` and `undefined` keys are never
   * recorded and always return `false`. `now` defaults to `Date.now()`.
   */
  check: (key: string | undefined | null, now?: number) => boolean;
  /** Forgets every remembered key. */
  clear: () => void;
  /** Number of keys currently remembered. */
  size: () => number;
};

type DedupeCacheOptions = {
  /** How long a key counts as "seen" after its last sighting, in ms; `<= 0` disables expiry. */
  ttlMs: number;
  /** Maximum number of remembered keys (floored); `<= 0` means nothing is retained. */
  maxSize: number;
};

/**
 * Creates a dedupe cache bounded by both age (`ttlMs`) and entry count
 * (`maxSize`).
 *
 * Every `check` refreshes the key's timestamp, so the TTL is measured from the
 * most recent sighting. When the cache is over `maxSize`, the least recently
 * seen keys are evicted first.
 *
 * @param options TTL and size limits; negative values are clamped to 0
 * @returns the cache handle
 */
export function createDedupeCache(options: DedupeCacheOptions): DedupeCache {
  const ttlMs = Math.max(0, options.ttlMs);
  const maxSize = Math.max(0, Math.floor(options.maxSize));
  // Map iteration follows insertion order and `touch` re-inserts, so the first
  // key is always the least recently seen one.
  const cache = new Map<string, number>();

  const touch = (key: string, now: number) => {
    cache.delete(key);
    cache.set(key, now);
  };

  // Single source of truth for "is this entry still inside its TTL?". It is
  // used by both lookup and pruning so they cannot disagree at the boundary
  // where age === ttlMs.
  const isFresh = (seenAt: number, now: number) =>
    ttlMs <= 0 || now - seenAt < ttlMs;

  const prune = (now: number) => {
    if (ttlMs > 0) {
      for (const [entryKey, entryTs] of cache) {
        if (!isFresh(entryTs, now)) {
          cache.delete(entryKey);
        }
      }
    }
    if (maxSize <= 0) {
      cache.clear();
      return;
    }
    while (cache.size > maxSize) {
      const oldestKey = cache.keys().next().value as string | undefined;
      if (!oldestKey) break;
      cache.delete(oldestKey);
    }
  };

  return {
    check: (key, now = Date.now()) => {
      if (!key) return false;
      const existing = cache.get(key);
      const duplicate = existing !== undefined && isFresh(existing, now);
      touch(key, now);
      // Pruning only matters when an entry may have been added, i.e. on a miss.
      if (!duplicate) prune(now);
      return duplicate;
    },
    clear: () => {
      cache.clear();
    },
    size: () => cache.size,
  };
}

View File

@@ -1,5 +1,6 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
import type { ClawdbotConfig } from "../config/config.js";
import {
peekSystemEvents,
@@ -61,6 +62,7 @@ vi.mock("./daemon.js", () => ({
const flush = () => new Promise((resolve) => setTimeout(resolve, 0));
beforeEach(() => {
resetInboundDedupe();
config = {
messages: { responsePrefix: "PFX" },
signal: { autoStart: false, dmPolicy: "open", allowFrom: ["*"] },

View File

@@ -1,6 +1,7 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
import { monitorSlackProvider } from "./monitor.js";
@@ -100,6 +101,7 @@ async function waitForEvent(name: string) {
}
beforeEach(() => {
resetInboundDedupe();
config = {
messages: {
responsePrefix: "PFX",

View File

@@ -50,6 +50,7 @@ import {
updateLastRoute,
} from "../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { createDedupeCache } from "../infra/dedupe.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { type FetchLike, fetchRemoteMedia } from "../media/fetch.js";
@@ -516,24 +517,11 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
}
>();
const userCache = new Map<string, { name?: string }>();
const seenMessages = new Map<string, number>();
const seenMessages = createDedupeCache({ ttlMs: 60_000, maxSize: 500 });
const markMessageSeen = (channelId: string | undefined, ts?: string) => {
if (!channelId || !ts) return false;
const key = `${channelId}:${ts}`;
if (seenMessages.has(key)) return true;
seenMessages.set(key, Date.now());
if (seenMessages.size > 500) {
const cutoff = Date.now() - 60_000;
for (const [entry, seenAt] of seenMessages) {
if (seenAt < cutoff || seenMessages.size > 450) {
seenMessages.delete(entry);
} else {
break;
}
}
}
return false;
return seenMessages.check(`${channelId}:${ts}`);
};
const app = new App({

View File

@@ -1,4 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
const useSpy = vi.fn();
const middlewareUseSpy = vi.fn();
@@ -16,6 +17,10 @@ const apiStub: ApiStub = {
sendChatAction: sendChatActionSpy,
};
beforeEach(() => {
resetInboundDedupe();
});
vi.mock("grammy", () => ({
Bot: class {
api = apiStub;

View File

@@ -2,6 +2,7 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
import * as replyModule from "../auto-reply/reply.js";
import { createTelegramBot, getTelegramSequentialKey } from "./bot.js";
import { resolveTelegramFetch } from "./fetch.js";
@@ -124,6 +125,7 @@ const getOnHandler = (event: string) => {
describe("createTelegramBot", () => {
beforeEach(() => {
resetInboundDedupe();
loadConfig.mockReturnValue({
telegram: { dmPolicy: "open", allowFrom: ["*"] },
});

View File

@@ -45,6 +45,7 @@ import {
updateLastRoute,
} from "../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { createDedupeCache } from "../infra/dedupe.js";
import { formatErrorMessage } from "../infra/errors.js";
import { recordProviderActivity } from "../infra/provider-activity.js";
import { getChildLogger } from "../logging.js";
@@ -120,32 +121,11 @@ const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => {
return undefined;
};
const shouldSkipTelegramUpdate = (
cache: Map<string, { ts: number }>,
key?: string,
) => {
if (!key) return false;
const now = Date.now();
const existing = cache.get(key);
if (existing && now - existing.ts < RECENT_TELEGRAM_UPDATE_TTL_MS) {
return true;
}
if (existing) cache.delete(key);
cache.set(key, { ts: now });
if (cache.size > RECENT_TELEGRAM_UPDATE_MAX) {
for (const [cachedKey, entry] of cache) {
if (now - entry.ts > RECENT_TELEGRAM_UPDATE_TTL_MS) {
cache.delete(cachedKey);
}
}
while (cache.size > RECENT_TELEGRAM_UPDATE_MAX) {
const oldestKey = cache.keys().next().value as string | undefined;
if (!oldestKey) break;
cache.delete(oldestKey);
}
}
return false;
};
// Builds a fresh dedupe cache for Telegram updates, bounded by the
// RECENT_TELEGRAM_UPDATE_TTL_MS / RECENT_TELEGRAM_UPDATE_MAX limits.
const createTelegramUpdateDedupe = () =>
createDedupeCache({
ttlMs: RECENT_TELEGRAM_UPDATE_TTL_MS,
maxSize: RECENT_TELEGRAM_UPDATE_MAX,
});
/** Telegram Location object */
interface TelegramLocation {
@@ -233,10 +213,10 @@ export function createTelegramBot(opts: TelegramBotOptions) {
bot.api.config.use(apiThrottler());
bot.use(sequentialize(getTelegramSequentialKey));
const recentUpdates = new Map<string, { ts: number }>();
const recentUpdates = createTelegramUpdateDedupe();
const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => {
const key = buildTelegramUpdateKey(ctx);
const skipped = shouldSkipTelegramUpdate(recentUpdates, key);
const skipped = recentUpdates.check(key);
if (skipped && key && shouldLogVerbose()) {
logVerbose(`telegram dedupe: skipped ${key}`);
}
@@ -388,7 +368,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
primaryCtx: TelegramContext,
allMedia: Array<{ path: string; contentType?: string }>,
storeAllowFrom: string[],
options?: { forceWasMentioned?: boolean },
options?: { forceWasMentioned?: boolean; messageIdOverride?: string },
) => {
const msg = primaryCtx.message;
recordProviderActivity({
@@ -720,7 +700,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
SenderUsername: senderUsername || undefined,
Provider: "telegram",
Surface: "telegram",
MessageSid: String(msg.message_id),
MessageSid: options?.messageIdOverride ?? String(msg.message_id),
ReplyToId: replyTarget?.id,
ReplyToBody: replyTarget?.body,
ReplyToSender: replyTarget?.sender,
@@ -1163,7 +1143,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
{ message: syntheticMessage, me: ctx.me, getFile },
[],
storeAllowFrom,
{ forceWasMentioned: true },
{ forceWasMentioned: true, messageIdOverride: callback.id },
);
} catch (err) {
runtime.error?.(danger(`callback handler failed: ${String(err)}`));

View File

@@ -17,6 +17,7 @@ vi.mock("../agents/pi-embedded.js", () => ({
}));
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { resetInboundDedupe } from "../auto-reply/reply/inbound-dedupe.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ClawdbotConfig } from "../config/config.js";
import { resetLogger, setLoggerOverride } from "../logging.js";
@@ -57,6 +58,7 @@ const rmDirWithRetries = async (dir: string): Promise<void> => {
};
beforeEach(async () => {
resetInboundDedupe();
previousHome = process.env.HOME;
tempHome = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-web-home-"));
process.env.HOME = tempHome;

View File

@@ -106,11 +106,12 @@ vi.mock("./session.js", () => {
};
});
import { monitorWebInbox } from "./inbound.js";
import { monitorWebInbox, resetWebInboundDedupe } from "./inbound.js";
describe("web inbound media saves with extension", () => {
beforeEach(() => {
saveMediaBufferSpy.mockClear();
resetWebInboundDedupe();
});
beforeAll(async () => {

View File

@@ -14,6 +14,7 @@ import {
import { loadConfig } from "../config/config.js";
import { logVerbose, shouldLogVerbose } from "../globals.js";
import { createDedupeCache } from "../infra/dedupe.js";
import { recordProviderActivity } from "../infra/provider-activity.js";
import { createSubsystemLogger, getChildLogger } from "../logging.js";
import { saveMediaBuffer } from "../media/store.js";
@@ -48,6 +49,17 @@ export type WebListenerCloseReason = {
error?: unknown;
};
// Web inbound dedupe limits: remember a message for 20 minutes (in ms), and
// track at most 5000 messages.
const RECENT_WEB_MESSAGE_TTL_MS = 20 * 60_000;
const RECENT_WEB_MESSAGE_MAX = 5000;
// Module-level cache of recently handled inbound messages. monitorWebInbox
// checks it with keys of the form `${accountId}:${remoteJid}:${id}`.
const recentInboundMessages = createDedupeCache({
ttlMs: RECENT_WEB_MESSAGE_TTL_MS,
maxSize: RECENT_WEB_MESSAGE_MAX,
});
/**
 * Empties the module-level web inbound dedupe cache. The web inbox test
 * suites call this from `beforeEach`.
 */
export function resetWebInboundDedupe(): void {
recentInboundMessages.clear();
}
export type WebInboundMessage = {
id?: string;
from: string; // conversation id: E.164 for direct chats, group JID for groups
@@ -117,7 +129,6 @@ export async function monitorWebInbox(options: {
}
const selfJid = sock.user?.id;
const selfE164 = selfJid ? jidToE164(selfJid) : null;
const seen = new Set<string>();
const groupMetaCache = new Map<
string,
{ subject?: string; participants?: string[]; expires: number }
@@ -169,9 +180,6 @@ export async function monitorWebInbox(options: {
direction: "inbound",
});
const id = msg.key?.id ?? undefined;
// De-dupe on message id; Baileys can emit retries.
if (id && seen.has(id)) continue;
if (id) seen.add(id);
// Note: not filtering fromMe here - echo detection happens in auto-reply layer
const remoteJid = msg.key?.remoteJid;
if (!remoteJid) continue;
@@ -179,6 +187,10 @@ export async function monitorWebInbox(options: {
if (remoteJid.endsWith("@status") || remoteJid.endsWith("@broadcast"))
continue;
const group = isJidGroup(remoteJid);
if (id) {
const dedupeKey = `${options.accountId}:${remoteJid}:${id}`;
if (recentInboundMessages.check(dedupeKey)) continue;
}
const participantJid = msg.key?.participant ?? undefined;
const from = group ? remoteJid : await resolveInboundJid(remoteJid);
// Skip if we still can't resolve an id to key conversation

View File

@@ -76,7 +76,7 @@ import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { resetLogger, setLoggerOverride } from "../logging.js";
import { monitorWebInbox } from "./inbound.js";
import { monitorWebInbox, resetWebInboundDedupe } from "./inbound.js";
const ACCOUNT_ID = "default";
let authDir: string;
@@ -89,6 +89,7 @@ describe("web monitor inbox", () => {
code: "PAIRCODE",
created: true,
});
resetWebInboundDedupe();
authDir = fsSync.mkdtempSync(path.join(os.tmpdir(), "clawdbot-auth-"));
});
@@ -151,6 +152,39 @@ describe("web monitor inbox", () => {
await listener.close();
});
it("deduplicates redelivered messages by id", async () => {
  const onMessage = vi.fn(async () => {
    return;
  });
  const listener = await monitorWebInbox({
    authDir,
    accountId: ACCOUNT_ID,
    onMessage,
    verbose: false,
  });
  const sock = await createWaSocket();

  // One inbound message, delivered twice with the same key id.
  const inboundMessage = {
    key: { id: "abc", fromMe: false, remoteJid: "999@s.whatsapp.net" },
    message: { conversation: "ping" },
    messageTimestamp: 1_700_000_000,
    pushName: "Tester",
  };
  const redelivery = {
    type: "notify",
    messages: [inboundMessage],
  };
  for (let delivery = 0; delivery < 2; delivery += 1) {
    sock.ev.emit("messages.upsert", redelivery);
  }

  // Let the listener's async handling settle before asserting.
  await new Promise((resolve) => setImmediate(resolve));

  expect(onMessage).toHaveBeenCalledTimes(1);
  await listener.close();
});
it("resolves LID JIDs using Baileys LID mapping store", async () => {
const onMessage = vi.fn(async () => {
return;