diff --git a/CHANGELOG.md b/CHANGELOG.md index d87f7c099..0631d4d4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ - Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj - Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj - Auto-reply: fix /status usage summary filtering for the active provider. +- Auto-reply: deduplicate followup queue entries using message id/routing to avoid duplicate replies. (#600) — thanks @samratjha96 - Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj - Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth). - Status: show active auth profile and key snippet in /status. diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index a8e7de8fa..c8d0d664d 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -846,6 +846,7 @@ export async function getReplyFromConfig( const authProfileId = sessionEntry?.authProfileOverride; const followupRun = { prompt: queuedBody, + messageId: sessionCtx.MessageSid, summaryLine: baseBodyTrimmedRaw, enqueuedAt: Date.now(), // Originating channel for reply routing. 
diff --git a/src/auto-reply/reply/queue.collect-routing.test.ts b/src/auto-reply/reply/queue.collect-routing.test.ts index 089f38ae1..74295f1ae 100644 --- a/src/auto-reply/reply/queue.collect-routing.test.ts +++ b/src/auto-reply/reply/queue.collect-routing.test.ts @@ -6,14 +6,20 @@ import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js"; function createRun(params: { prompt: string; + messageId?: string; originatingChannel?: FollowupRun["originatingChannel"]; originatingTo?: string; + originatingAccountId?: string; + originatingThreadId?: number; }): FollowupRun { return { prompt: params.prompt, + messageId: params.messageId, enqueuedAt: Date.now(), originatingChannel: params.originatingChannel, originatingTo: params.originatingTo, + originatingAccountId: params.originatingAccountId, + originatingThreadId: params.originatingThreadId, run: { agentId: "agent", agentDir: "/tmp", @@ -29,6 +35,146 @@ function createRun(params: { }; } +describe("followup queue deduplication", () => { + it("deduplicates messages with same Discord message_id", async () => { + const key = `test-dedup-message-id-${Date.now()}`; + const calls: FollowupRun[] = []; + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + // First enqueue should succeed + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "[Discord Guild #test channel id:123] Hello", + messageId: "m1", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(first).toBe(true); + + // Second enqueue reuses message id m1 with identical routing, so it should be deduplicated even though the prompt text differs + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "[Discord Guild #test channel id:123] Hello (dupe)", + messageId: "m1", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(second).toBe(false); + + // Third enqueue with 
different message id should succeed + const third = enqueueFollowupRun( + key, + createRun({ + prompt: "[Discord Guild #test channel id:123] World", + messageId: "m2", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(third).toBe(true); + + scheduleFollowupDrain(key, runFollowup); + await expect.poll(() => calls.length).toBe(1); + // Expect one drained call whose prompt carries the collect-mode header (m1 and m2 were the accepted entries) + expect(calls[0]?.prompt).toContain( + "[Queued messages while agent was busy]", + ); + }); + + it("deduplicates exact prompt when routing matches and no message id", async () => { + const key = `test-dedup-whatsapp-${Date.now()}`; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + // First enqueue should succeed + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(first).toBe(true); + + // Second enqueue has no message id, so the identical prompt plus identical routing should be deduplicated + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(second).toBe(false); + + // Third enqueue with different prompt should succeed + const third = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world 2", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(third).toBe(true); + }); + + it("does not deduplicate across different providers without message id", async () => { + const key = `test-dedup-cross-provider-${Date.now()}`; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "Same text", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(first).toBe(true); + + 
const second = enqueueFollowupRun( + key, + createRun({ + prompt: "Same text", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(second).toBe(true); + }); +}); + describe("followup queue collect routing", () => { it("does not collect when destinations differ", async () => { const key = `test-collect-diff-to-${Date.now()}`; diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 4b14afa43..7fabf083b 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -27,6 +27,8 @@ export type QueueSettings = { }; export type FollowupRun = { prompt: string; + /** Provider message ID, when available (for deduplication). */ + messageId?: string; summaryLine?: string; enqueuedAt: number; /** @@ -337,14 +339,45 @@ function getFollowupQueue( FOLLOWUP_QUEUES.set(key, created); return created; } +/** + * Check whether an equivalent run is already queued: when run.messageId is non-blank, match on the trimmed messageId plus identical routing (channel, to, account id, thread id); otherwise match on an identical prompt plus identical routing. + */ +function isRunAlreadyQueued( + run: FollowupRun, + queue: FollowupQueueState, +): boolean { + const hasSameRouting = (item: FollowupRun) => + item.originatingChannel === run.originatingChannel && + item.originatingTo === run.originatingTo && + item.originatingAccountId === run.originatingAccountId && + item.originatingThreadId === run.originatingThreadId; + + const messageId = run.messageId?.trim(); + if (messageId) { + return queue.items.some( + (item) => item.messageId?.trim() === messageId && hasSameRouting(item), + ); + } + return queue.items.some( + (item) => item.prompt === run.prompt && hasSameRouting(item), + ); +} + export function enqueueFollowupRun( key: string, run: FollowupRun, settings: QueueSettings, ): boolean { const queue = getFollowupQueue(key, settings); + + // Deduplicate: skip if the same message is already queued. 
+ if (isRunAlreadyQueued(run, queue)) { + return false; + } + queue.lastEnqueuedAt = Date.now(); queue.lastRun = run.run; + const cap = queue.cap; if (cap > 0 && queue.items.length >= cap) { if (queue.dropPolicy === "new") {