From 9185fdc89694f43b113954eeb0dddd8af5931cc4 Mon Sep 17 00:00:00 2001 From: Samrat Jha Date: Fri, 9 Jan 2026 13:35:55 -0500 Subject: [PATCH 1/2] fix(queue): deduplicate followup queue entries to prevent duplicate responses ## Problem When messages arrived while the agent was busy processing a previous message, the same message could be enqueued multiple times into the followup queue. This happened because Discord's event system can emit the same message multiple times (e.g., during reconnects or due to slow listener processing), and the followup queue had no deduplication logic. This caused the bot to respond to the same user message 2-4+ times. ## Solution Add simple exact-match deduplication in `enqueueFollowupRun()`: if a prompt is already in the queue, skip adding it again. Extracted into a small `isPromptAlreadyQueued()` helper for clarity. ## Testing - Added test cases for deduplication (same prompt rejected, different accepted) - Manually verified on Discord: single response per message even when multiple events fire during slow agent processing --- .../reply/queue.collect-routing.test.ts | 105 ++++++++++++++++++ src/auto-reply/reply/queue.ts | 16 +++ 2 files changed, 121 insertions(+) diff --git a/src/auto-reply/reply/queue.collect-routing.test.ts b/src/auto-reply/reply/queue.collect-routing.test.ts index 089f38ae1..118a098cd 100644 --- a/src/auto-reply/reply/queue.collect-routing.test.ts +++ b/src/auto-reply/reply/queue.collect-routing.test.ts @@ -29,6 +29,111 @@ function createRun(params: { }; } +describe("followup queue deduplication", () => { + it("deduplicates messages with same Discord message_id", async () => { + const key = `test-dedup-message-id-${Date.now()}`; + const calls: FollowupRun[] = []; + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + // First enqueue should succeed + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "[Discord Guild #test channel id:123] Hello", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(first).toBe(true); + + // Second enqueue with same prompt should be deduplicated + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "[Discord Guild #test channel id:123] Hello", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(second).toBe(false); + + // Third enqueue with different prompt should succeed + const third = enqueueFollowupRun( + key, + createRun({ + prompt: "[Discord Guild #test channel id:123] World", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(third).toBe(true); + + scheduleFollowupDrain(key, runFollowup); + await expect.poll(() => calls.length).toBe(1); + // Should collect both unique messages + expect(calls[0]?.prompt).toContain( + "[Queued messages while agent was busy]", + ); + }); + + it("deduplicates across different providers using exact prompt match", async () => { + const key = `test-dedup-whatsapp-${Date.now()}`; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + // First enqueue should succeed + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(first).toBe(true); + + // Second enqueue with same prompt should be deduplicated + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(second).toBe(false); + + // Third enqueue with different prompt should succeed + const third = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world 2", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(third).toBe(true); + }); +}); + describe("followup queue collect routing", () => { it("does not collect when destinations differ", async () => { const key = `test-collect-diff-to-${Date.now()}`; diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 4b14afa43..4bb7ec630 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -337,6 +337,16 @@ function getFollowupQueue( FOLLOWUP_QUEUES.set(key, created); return created; } +/** + * Check if a prompt is already queued using exact match. + */ +function isPromptAlreadyQueued( + prompt: string, + queue: FollowupQueueState, +): boolean { + return queue.items.some((item) => item.prompt === prompt); +} + export function enqueueFollowupRun( key: string, run: FollowupRun, @@ -345,6 +355,12 @@ export function enqueueFollowupRun( const queue = getFollowupQueue(key, settings); queue.lastEnqueuedAt = Date.now(); queue.lastRun = run.run; + + // Deduplicate: skip if the same prompt is already queued. + if (isPromptAlreadyQueued(run.prompt, queue)) { + return false; + } + const cap = queue.cap; if (cap > 0 && queue.items.length >= cap) { if (queue.dropPolicy === "new") { From d3a0114b6b759ba6b8033acc408eb75860f67883 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 20:44:11 +0100 Subject: [PATCH 2/2] fix: dedupe followup queue by message id (#600) (thanks @samratjha96) --- CHANGELOG.md | 1 + src/auto-reply/reply.ts | 1 + .../reply/queue.collect-routing.test.ts | 49 +++++++++++++++++-- src/auto-reply/reply/queue.ts | 33 ++++++++++--- 4 files changed, 72 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ea5ae402..ea11920be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ - Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj - Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj - Auto-reply: fix /status usage summary filtering for the active provider. +- Auto-reply: deduplicate followup queue entries using message id/routing to avoid duplicate replies. (#600) — thanks @samratjha96 - Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj - Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth). - Status: show active auth profile and key snippet in /status. diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index a8e7de8fa..c8d0d664d 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -846,6 +846,7 @@ export async function getReplyFromConfig( const authProfileId = sessionEntry?.authProfileOverride; const followupRun = { prompt: queuedBody, + messageId: sessionCtx.MessageSid, summaryLine: baseBodyTrimmedRaw, enqueuedAt: Date.now(), // Originating channel for reply routing. diff --git a/src/auto-reply/reply/queue.collect-routing.test.ts b/src/auto-reply/reply/queue.collect-routing.test.ts index 118a098cd..74295f1ae 100644 --- a/src/auto-reply/reply/queue.collect-routing.test.ts +++ b/src/auto-reply/reply/queue.collect-routing.test.ts @@ -6,14 +6,20 @@ import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js"; function createRun(params: { prompt: string; + messageId?: string; originatingChannel?: FollowupRun["originatingChannel"]; originatingTo?: string; + originatingAccountId?: string; + originatingThreadId?: number; }): FollowupRun { return { prompt: params.prompt, + messageId: params.messageId, enqueuedAt: Date.now(), originatingChannel: params.originatingChannel, originatingTo: params.originatingTo, + originatingAccountId: params.originatingAccountId, + originatingThreadId: params.originatingThreadId, run: { agentId: "agent", agentDir: "/tmp", @@ -48,6 +54,7 @@ describe("followup queue deduplication", () => { key, createRun({ prompt: "[Discord Guild #test channel id:123] Hello", + messageId: "m1", originatingChannel: "discord", originatingTo: "channel:123", }), @@ -55,11 +62,12 @@ describe("followup queue deduplication", () => { ); expect(first).toBe(true); - // Second enqueue with same prompt should be deduplicated + // Second enqueue with same message id should be deduplicated const second = enqueueFollowupRun( key, createRun({ - prompt: "[Discord Guild #test channel id:123] Hello", + prompt: "[Discord Guild #test channel id:123] Hello (dupe)", + messageId: "m1", originatingChannel: "discord", originatingTo: "channel:123", }), @@ -67,11 +75,12 @@ describe("followup queue deduplication", () => { ); expect(second).toBe(false); - // Third enqueue with different prompt should succeed + // Third enqueue with different message id should succeed const third = enqueueFollowupRun( key, createRun({ prompt: "[Discord Guild #test channel id:123] World", + messageId: "m2", originatingChannel: "discord", originatingTo: "channel:123", }), @@ -87,7 +96,7 @@ describe("followup queue deduplication", () => { ); }); - it("deduplicates across different providers using exact prompt match", async () => { + it("deduplicates exact prompt when routing matches and no message id", async () => { const key = `test-dedup-whatsapp-${Date.now()}`; const settings: QueueSettings = { mode: "collect", @@ -132,6 +141,38 @@ describe("followup queue deduplication", () => { ); expect(third).toBe(true); }); + + it("does not deduplicate across different providers without message id", async () => { + const key = `test-dedup-cross-provider-${Date.now()}`; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "Same text", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(first).toBe(true); + + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "Same text", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(second).toBe(true); + }); }); describe("followup queue collect routing", () => { diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 4bb7ec630..7fabf083b 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -27,6 +27,8 @@ export type QueueSettings = { }; export type FollowupRun = { prompt: string; + /** Provider message ID, when available (for deduplication). */ + messageId?: string; summaryLine?: string; enqueuedAt: number; /** @@ -338,13 +340,27 @@ function getFollowupQueue( return created; } /** - * Check if a prompt is already queued using exact match. + * Check if a run is already queued using a stable dedup key. */ -function isPromptAlreadyQueued( - prompt: string, +function isRunAlreadyQueued( + run: FollowupRun, queue: FollowupQueueState, ): boolean { - return queue.items.some((item) => item.prompt === prompt); + const hasSameRouting = (item: FollowupRun) => + item.originatingChannel === run.originatingChannel && + item.originatingTo === run.originatingTo && + item.originatingAccountId === run.originatingAccountId && + item.originatingThreadId === run.originatingThreadId; + + const messageId = run.messageId?.trim(); + if (messageId) { + return queue.items.some( + (item) => item.messageId?.trim() === messageId && hasSameRouting(item), + ); + } + return queue.items.some( + (item) => item.prompt === run.prompt && hasSameRouting(item), + ); } export function enqueueFollowupRun( @@ -353,14 +369,15 @@ export function enqueueFollowupRun( settings: QueueSettings, ): boolean { const queue = getFollowupQueue(key, settings); - queue.lastEnqueuedAt = Date.now(); - queue.lastRun = run.run; - // Deduplicate: skip if the same prompt is already queued. - if (isPromptAlreadyQueued(run.prompt, queue)) { + // Deduplicate: skip if the same message is already queued. + if (isRunAlreadyQueued(run, queue)) { return false; } + queue.lastEnqueuedAt = Date.now(); + queue.lastRun = run.run; + const cap = queue.cap; if (cap > 0 && queue.items.length >= cap) { if (queue.dropPolicy === "new") {