diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ea5ae402..ea11920be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ - Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj - Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj - Auto-reply: fix /status usage summary filtering for the active provider. +- Auto-reply: deduplicate followup queue entries using message id/routing to avoid duplicate replies. (#600) — thanks @samratjha96 - Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj - Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth). - Status: show active auth profile and key snippet in /status. diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index a8e7de8fa..c8d0d664d 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -846,6 +846,7 @@ export async function getReplyFromConfig( const authProfileId = sessionEntry?.authProfileOverride; const followupRun = { prompt: queuedBody, + messageId: sessionCtx.MessageSid, summaryLine: baseBodyTrimmedRaw, enqueuedAt: Date.now(), // Originating channel for reply routing. diff --git a/src/auto-reply/reply/queue.collect-routing.test.ts b/src/auto-reply/reply/queue.collect-routing.test.ts index 118a098cd..74295f1ae 100644 --- a/src/auto-reply/reply/queue.collect-routing.test.ts +++ b/src/auto-reply/reply/queue.collect-routing.test.ts @@ -6,14 +6,20 @@ import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js"; function createRun(params: { prompt: string; + messageId?: string; originatingChannel?: FollowupRun["originatingChannel"]; originatingTo?: string; + originatingAccountId?: string; + originatingThreadId?: number; }): FollowupRun { return { prompt: params.prompt, + messageId: params.messageId, enqueuedAt: Date.now(), originatingChannel: params.originatingChannel, originatingTo: params.originatingTo, + originatingAccountId: params.originatingAccountId, + originatingThreadId: params.originatingThreadId, run: { agentId: "agent", agentDir: "/tmp", @@ -48,6 +54,7 @@ describe("followup queue deduplication", () => { key, createRun({ prompt: "[Discord Guild #test channel id:123] Hello", + messageId: "m1", originatingChannel: "discord", originatingTo: "channel:123", }), @@ -55,11 +62,12 @@ describe("followup queue deduplication", () => { ); expect(first).toBe(true); - // Second enqueue with same prompt should be deduplicated + // Second enqueue with same message id should be deduplicated const second = enqueueFollowupRun( key, createRun({ - prompt: "[Discord Guild #test channel id:123] Hello", + prompt: "[Discord Guild #test channel id:123] Hello (dupe)", + messageId: "m1", originatingChannel: "discord", originatingTo: "channel:123", }), @@ -67,11 +75,12 @@ describe("followup queue deduplication", () => { ); expect(second).toBe(false); - // Third enqueue with different prompt should succeed + // Third enqueue with different message id should succeed const third = enqueueFollowupRun( key, createRun({ prompt: "[Discord Guild #test channel id:123] World", + messageId: "m2", originatingChannel: "discord", originatingTo: "channel:123", }), @@ -87,7 +96,7 @@ describe("followup queue deduplication", () => { ); }); - it("deduplicates across different providers using exact prompt match", async () => { + it("deduplicates exact prompt when routing matches and no message id", async () => { const key = `test-dedup-whatsapp-${Date.now()}`; const settings: QueueSettings = { mode: "collect", @@ -132,6 +141,38 @@ describe("followup queue deduplication", () => { ); expect(third).toBe(true); }); + + it("does not deduplicate across different providers without message id", async () => { + const key = `test-dedup-cross-provider-${Date.now()}`; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "Same text", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + ); + expect(first).toBe(true); + + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "Same text", + originatingChannel: "discord", + originatingTo: "channel:123", + }), + settings, + ); + expect(second).toBe(true); + }); }); describe("followup queue collect routing", () => { diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 4bb7ec630..7fabf083b 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -27,6 +27,8 @@ export type QueueSettings = { }; export type FollowupRun = { prompt: string; + /** Provider message ID, when available (for deduplication). */ + messageId?: string; summaryLine?: string; enqueuedAt: number; /** @@ -338,13 +340,27 @@ function getFollowupQueue( return created; } /** - * Check if a prompt is already queued using exact match. + * Check if a run is already queued using a stable dedup key. */ -function isPromptAlreadyQueued( - prompt: string, +function isRunAlreadyQueued( + run: FollowupRun, queue: FollowupQueueState, ): boolean { - return queue.items.some((item) => item.prompt === prompt); + const hasSameRouting = (item: FollowupRun) => + item.originatingChannel === run.originatingChannel && + item.originatingTo === run.originatingTo && + item.originatingAccountId === run.originatingAccountId && + item.originatingThreadId === run.originatingThreadId; + + const messageId = run.messageId?.trim(); + if (messageId) { + return queue.items.some( + (item) => item.messageId?.trim() === messageId && hasSameRouting(item), + ); + } + return queue.items.some( + (item) => item.prompt === run.prompt && hasSameRouting(item), + ); } export function enqueueFollowupRun( @@ -353,14 +369,15 @@ export function enqueueFollowupRun( settings: QueueSettings, ): boolean { const queue = getFollowupQueue(key, settings); - queue.lastEnqueuedAt = Date.now(); - queue.lastRun = run.run; - // Deduplicate: skip if the same prompt is already queued. - if (isPromptAlreadyQueued(run.prompt, queue)) { + // Deduplicate: skip if the same message is already queued. + if (isRunAlreadyQueued(run, queue)) { return false; } + queue.lastEnqueuedAt = Date.now(); + queue.lastRun = run.run; + const cap = queue.cap; if (cap > 0 && queue.items.length >= cap) { if (queue.dropPolicy === "new") {