fix: dedupe followup queue by message id (#600) (thanks @samratjha96)

This commit is contained in:
Peter Steinberger
2026-01-09 20:44:11 +01:00
parent 9185fdc896
commit d3a0114b6b
4 changed files with 72 additions and 12 deletions

View File

@@ -57,6 +57,7 @@
- Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj - Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj
- Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj - Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj
- Auto-reply: fix /status usage summary filtering for the active provider. - Auto-reply: fix /status usage summary filtering for the active provider.
- Auto-reply: deduplicate followup queue entries using message id/routing to avoid duplicate replies. (#600) — thanks @samratjha96
- Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj - Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj
- Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth). - Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth).
- Status: show active auth profile and key snippet in /status. - Status: show active auth profile and key snippet in /status.

View File

@@ -846,6 +846,7 @@ export async function getReplyFromConfig(
const authProfileId = sessionEntry?.authProfileOverride; const authProfileId = sessionEntry?.authProfileOverride;
const followupRun = { const followupRun = {
prompt: queuedBody, prompt: queuedBody,
messageId: sessionCtx.MessageSid,
summaryLine: baseBodyTrimmedRaw, summaryLine: baseBodyTrimmedRaw,
enqueuedAt: Date.now(), enqueuedAt: Date.now(),
// Originating channel for reply routing. // Originating channel for reply routing.

View File

@@ -6,14 +6,20 @@ import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js";
function createRun(params: { function createRun(params: {
prompt: string; prompt: string;
messageId?: string;
originatingChannel?: FollowupRun["originatingChannel"]; originatingChannel?: FollowupRun["originatingChannel"];
originatingTo?: string; originatingTo?: string;
originatingAccountId?: string;
originatingThreadId?: number;
}): FollowupRun { }): FollowupRun {
return { return {
prompt: params.prompt, prompt: params.prompt,
messageId: params.messageId,
enqueuedAt: Date.now(), enqueuedAt: Date.now(),
originatingChannel: params.originatingChannel, originatingChannel: params.originatingChannel,
originatingTo: params.originatingTo, originatingTo: params.originatingTo,
originatingAccountId: params.originatingAccountId,
originatingThreadId: params.originatingThreadId,
run: { run: {
agentId: "agent", agentId: "agent",
agentDir: "/tmp", agentDir: "/tmp",
@@ -48,6 +54,7 @@ describe("followup queue deduplication", () => {
key, key,
createRun({ createRun({
prompt: "[Discord Guild #test channel id:123] Hello", prompt: "[Discord Guild #test channel id:123] Hello",
messageId: "m1",
originatingChannel: "discord", originatingChannel: "discord",
originatingTo: "channel:123", originatingTo: "channel:123",
}), }),
@@ -55,11 +62,12 @@ describe("followup queue deduplication", () => {
); );
expect(first).toBe(true); expect(first).toBe(true);
// Second enqueue with same prompt should be deduplicated // Second enqueue with same message id should be deduplicated
const second = enqueueFollowupRun( const second = enqueueFollowupRun(
key, key,
createRun({ createRun({
prompt: "[Discord Guild #test channel id:123] Hello", prompt: "[Discord Guild #test channel id:123] Hello (dupe)",
messageId: "m1",
originatingChannel: "discord", originatingChannel: "discord",
originatingTo: "channel:123", originatingTo: "channel:123",
}), }),
@@ -67,11 +75,12 @@ describe("followup queue deduplication", () => {
); );
expect(second).toBe(false); expect(second).toBe(false);
// Third enqueue with different prompt should succeed // Third enqueue with different message id should succeed
const third = enqueueFollowupRun( const third = enqueueFollowupRun(
key, key,
createRun({ createRun({
prompt: "[Discord Guild #test channel id:123] World", prompt: "[Discord Guild #test channel id:123] World",
messageId: "m2",
originatingChannel: "discord", originatingChannel: "discord",
originatingTo: "channel:123", originatingTo: "channel:123",
}), }),
@@ -87,7 +96,7 @@ describe("followup queue deduplication", () => {
); );
}); });
it("deduplicates across different providers using exact prompt match", async () => { it("deduplicates exact prompt when routing matches and no message id", async () => {
const key = `test-dedup-whatsapp-${Date.now()}`; const key = `test-dedup-whatsapp-${Date.now()}`;
const settings: QueueSettings = { const settings: QueueSettings = {
mode: "collect", mode: "collect",
@@ -132,6 +141,38 @@ describe("followup queue deduplication", () => {
); );
expect(third).toBe(true); expect(third).toBe(true);
}); });
it("does not deduplicate across different providers without message id", async () => {
const key = `test-dedup-cross-provider-${Date.now()}`;
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const first = enqueueFollowupRun(
key,
createRun({
prompt: "Same text",
originatingChannel: "whatsapp",
originatingTo: "+1234567890",
}),
settings,
);
expect(first).toBe(true);
const second = enqueueFollowupRun(
key,
createRun({
prompt: "Same text",
originatingChannel: "discord",
originatingTo: "channel:123",
}),
settings,
);
expect(second).toBe(true);
});
}); });
describe("followup queue collect routing", () => { describe("followup queue collect routing", () => {

View File

@@ -27,6 +27,8 @@ export type QueueSettings = {
}; };
export type FollowupRun = { export type FollowupRun = {
prompt: string; prompt: string;
/** Provider message ID, when available (for deduplication). */
messageId?: string;
summaryLine?: string; summaryLine?: string;
enqueuedAt: number; enqueuedAt: number;
/** /**
@@ -338,13 +340,27 @@ function getFollowupQueue(
return created; return created;
} }
/** /**
* Check if a prompt is already queued using exact match. * Check if a run is already queued using a stable dedup key.
*/ */
function isPromptAlreadyQueued( function isRunAlreadyQueued(
prompt: string, run: FollowupRun,
queue: FollowupQueueState, queue: FollowupQueueState,
): boolean { ): boolean {
return queue.items.some((item) => item.prompt === prompt); const hasSameRouting = (item: FollowupRun) =>
item.originatingChannel === run.originatingChannel &&
item.originatingTo === run.originatingTo &&
item.originatingAccountId === run.originatingAccountId &&
item.originatingThreadId === run.originatingThreadId;
const messageId = run.messageId?.trim();
if (messageId) {
return queue.items.some(
(item) => item.messageId?.trim() === messageId && hasSameRouting(item),
);
}
return queue.items.some(
(item) => item.prompt === run.prompt && hasSameRouting(item),
);
} }
export function enqueueFollowupRun( export function enqueueFollowupRun(
@@ -353,14 +369,15 @@ export function enqueueFollowupRun(
settings: QueueSettings, settings: QueueSettings,
): boolean { ): boolean {
const queue = getFollowupQueue(key, settings); const queue = getFollowupQueue(key, settings);
queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run;
// Deduplicate: skip if the same prompt is already queued. // Deduplicate: skip if the same message is already queued.
if (isPromptAlreadyQueued(run.prompt, queue)) { if (isRunAlreadyQueued(run, queue)) {
return false; return false;
} }
queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run;
const cap = queue.cap; const cap = queue.cap;
if (cap > 0 && queue.items.length >= cap) { if (cap > 0 && queue.items.length >= cap) {
if (queue.dropPolicy === "new") { if (queue.dropPolicy === "new") {