Merge pull request #600 from samratjha96/fix/discord-duplicate-messages
fix(queue): deduplicate followup queue entries
This commit is contained in:
@@ -57,6 +57,7 @@
|
|||||||
- Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj
|
- Auto-reply: relax reply tag parsing to allow whitespace. (#560) — thanks @mcinteerj
|
||||||
- Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj
|
- Auto-reply: add per-provider block streaming toggles and coalesce streamed blocks to reduce line spam. (#536) — thanks @mcinteerj
|
||||||
- Auto-reply: fix /status usage summary filtering for the active provider.
|
- Auto-reply: fix /status usage summary filtering for the active provider.
|
||||||
|
- Auto-reply: deduplicate followup queue entries using message id/routing to avoid duplicate replies. (#600) — thanks @samratjha96
|
||||||
- Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj
|
- Status: show provider prefix in /status model display. (#506) — thanks @mcinteerj
|
||||||
- Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth).
|
- Status: compact /status with session token usage + estimated cost, add `/cost` per-response usage lines (tokens-only for OAuth).
|
||||||
- Status: show active auth profile and key snippet in /status.
|
- Status: show active auth profile and key snippet in /status.
|
||||||
|
|||||||
@@ -846,6 +846,7 @@ export async function getReplyFromConfig(
|
|||||||
const authProfileId = sessionEntry?.authProfileOverride;
|
const authProfileId = sessionEntry?.authProfileOverride;
|
||||||
const followupRun = {
|
const followupRun = {
|
||||||
prompt: queuedBody,
|
prompt: queuedBody,
|
||||||
|
messageId: sessionCtx.MessageSid,
|
||||||
summaryLine: baseBodyTrimmedRaw,
|
summaryLine: baseBodyTrimmedRaw,
|
||||||
enqueuedAt: Date.now(),
|
enqueuedAt: Date.now(),
|
||||||
// Originating channel for reply routing.
|
// Originating channel for reply routing.
|
||||||
|
|||||||
@@ -6,14 +6,20 @@ import { enqueueFollowupRun, scheduleFollowupDrain } from "./queue.js";
|
|||||||
|
|
||||||
function createRun(params: {
|
function createRun(params: {
|
||||||
prompt: string;
|
prompt: string;
|
||||||
|
messageId?: string;
|
||||||
originatingChannel?: FollowupRun["originatingChannel"];
|
originatingChannel?: FollowupRun["originatingChannel"];
|
||||||
originatingTo?: string;
|
originatingTo?: string;
|
||||||
|
originatingAccountId?: string;
|
||||||
|
originatingThreadId?: number;
|
||||||
}): FollowupRun {
|
}): FollowupRun {
|
||||||
return {
|
return {
|
||||||
prompt: params.prompt,
|
prompt: params.prompt,
|
||||||
|
messageId: params.messageId,
|
||||||
enqueuedAt: Date.now(),
|
enqueuedAt: Date.now(),
|
||||||
originatingChannel: params.originatingChannel,
|
originatingChannel: params.originatingChannel,
|
||||||
originatingTo: params.originatingTo,
|
originatingTo: params.originatingTo,
|
||||||
|
originatingAccountId: params.originatingAccountId,
|
||||||
|
originatingThreadId: params.originatingThreadId,
|
||||||
run: {
|
run: {
|
||||||
agentId: "agent",
|
agentId: "agent",
|
||||||
agentDir: "/tmp",
|
agentDir: "/tmp",
|
||||||
@@ -29,6 +35,146 @@ function createRun(params: {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
describe("followup queue deduplication", () => {
|
||||||
|
it("deduplicates messages with same Discord message_id", async () => {
|
||||||
|
const key = `test-dedup-message-id-${Date.now()}`;
|
||||||
|
const calls: FollowupRun[] = [];
|
||||||
|
const runFollowup = async (run: FollowupRun) => {
|
||||||
|
calls.push(run);
|
||||||
|
};
|
||||||
|
const settings: QueueSettings = {
|
||||||
|
mode: "collect",
|
||||||
|
debounceMs: 0,
|
||||||
|
cap: 50,
|
||||||
|
dropPolicy: "summarize",
|
||||||
|
};
|
||||||
|
|
||||||
|
// First enqueue should succeed
|
||||||
|
const first = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "[Discord Guild #test channel id:123] Hello",
|
||||||
|
messageId: "m1",
|
||||||
|
originatingChannel: "discord",
|
||||||
|
originatingTo: "channel:123",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(first).toBe(true);
|
||||||
|
|
||||||
|
// Second enqueue with same message id should be deduplicated
|
||||||
|
const second = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "[Discord Guild #test channel id:123] Hello (dupe)",
|
||||||
|
messageId: "m1",
|
||||||
|
originatingChannel: "discord",
|
||||||
|
originatingTo: "channel:123",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(second).toBe(false);
|
||||||
|
|
||||||
|
// Third enqueue with different message id should succeed
|
||||||
|
const third = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "[Discord Guild #test channel id:123] World",
|
||||||
|
messageId: "m2",
|
||||||
|
originatingChannel: "discord",
|
||||||
|
originatingTo: "channel:123",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(third).toBe(true);
|
||||||
|
|
||||||
|
scheduleFollowupDrain(key, runFollowup);
|
||||||
|
await expect.poll(() => calls.length).toBe(1);
|
||||||
|
// Should collect both unique messages
|
||||||
|
expect(calls[0]?.prompt).toContain(
|
||||||
|
"[Queued messages while agent was busy]",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("deduplicates exact prompt when routing matches and no message id", async () => {
|
||||||
|
const key = `test-dedup-whatsapp-${Date.now()}`;
|
||||||
|
const settings: QueueSettings = {
|
||||||
|
mode: "collect",
|
||||||
|
debounceMs: 0,
|
||||||
|
cap: 50,
|
||||||
|
dropPolicy: "summarize",
|
||||||
|
};
|
||||||
|
|
||||||
|
// First enqueue should succeed
|
||||||
|
const first = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "Hello world",
|
||||||
|
originatingChannel: "whatsapp",
|
||||||
|
originatingTo: "+1234567890",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(first).toBe(true);
|
||||||
|
|
||||||
|
// Second enqueue with same prompt should be deduplicated
|
||||||
|
const second = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "Hello world",
|
||||||
|
originatingChannel: "whatsapp",
|
||||||
|
originatingTo: "+1234567890",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(second).toBe(false);
|
||||||
|
|
||||||
|
// Third enqueue with different prompt should succeed
|
||||||
|
const third = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "Hello world 2",
|
||||||
|
originatingChannel: "whatsapp",
|
||||||
|
originatingTo: "+1234567890",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(third).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not deduplicate across different providers without message id", async () => {
|
||||||
|
const key = `test-dedup-cross-provider-${Date.now()}`;
|
||||||
|
const settings: QueueSettings = {
|
||||||
|
mode: "collect",
|
||||||
|
debounceMs: 0,
|
||||||
|
cap: 50,
|
||||||
|
dropPolicy: "summarize",
|
||||||
|
};
|
||||||
|
|
||||||
|
const first = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "Same text",
|
||||||
|
originatingChannel: "whatsapp",
|
||||||
|
originatingTo: "+1234567890",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(first).toBe(true);
|
||||||
|
|
||||||
|
const second = enqueueFollowupRun(
|
||||||
|
key,
|
||||||
|
createRun({
|
||||||
|
prompt: "Same text",
|
||||||
|
originatingChannel: "discord",
|
||||||
|
originatingTo: "channel:123",
|
||||||
|
}),
|
||||||
|
settings,
|
||||||
|
);
|
||||||
|
expect(second).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe("followup queue collect routing", () => {
|
describe("followup queue collect routing", () => {
|
||||||
it("does not collect when destinations differ", async () => {
|
it("does not collect when destinations differ", async () => {
|
||||||
const key = `test-collect-diff-to-${Date.now()}`;
|
const key = `test-collect-diff-to-${Date.now()}`;
|
||||||
|
|||||||
@@ -27,6 +27,8 @@ export type QueueSettings = {
|
|||||||
};
|
};
|
||||||
export type FollowupRun = {
|
export type FollowupRun = {
|
||||||
prompt: string;
|
prompt: string;
|
||||||
|
/** Provider message ID, when available (for deduplication). */
|
||||||
|
messageId?: string;
|
||||||
summaryLine?: string;
|
summaryLine?: string;
|
||||||
enqueuedAt: number;
|
enqueuedAt: number;
|
||||||
/**
|
/**
|
||||||
@@ -337,14 +339,45 @@ function getFollowupQueue(
|
|||||||
FOLLOWUP_QUEUES.set(key, created);
|
FOLLOWUP_QUEUES.set(key, created);
|
||||||
return created;
|
return created;
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Check if a run is already queued using a stable dedup key.
|
||||||
|
*/
|
||||||
|
function isRunAlreadyQueued(
|
||||||
|
run: FollowupRun,
|
||||||
|
queue: FollowupQueueState,
|
||||||
|
): boolean {
|
||||||
|
const hasSameRouting = (item: FollowupRun) =>
|
||||||
|
item.originatingChannel === run.originatingChannel &&
|
||||||
|
item.originatingTo === run.originatingTo &&
|
||||||
|
item.originatingAccountId === run.originatingAccountId &&
|
||||||
|
item.originatingThreadId === run.originatingThreadId;
|
||||||
|
|
||||||
|
const messageId = run.messageId?.trim();
|
||||||
|
if (messageId) {
|
||||||
|
return queue.items.some(
|
||||||
|
(item) => item.messageId?.trim() === messageId && hasSameRouting(item),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return queue.items.some(
|
||||||
|
(item) => item.prompt === run.prompt && hasSameRouting(item),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
export function enqueueFollowupRun(
|
export function enqueueFollowupRun(
|
||||||
key: string,
|
key: string,
|
||||||
run: FollowupRun,
|
run: FollowupRun,
|
||||||
settings: QueueSettings,
|
settings: QueueSettings,
|
||||||
): boolean {
|
): boolean {
|
||||||
const queue = getFollowupQueue(key, settings);
|
const queue = getFollowupQueue(key, settings);
|
||||||
|
|
||||||
|
// Deduplicate: skip if the same message is already queued.
|
||||||
|
if (isRunAlreadyQueued(run, queue)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
queue.lastEnqueuedAt = Date.now();
|
queue.lastEnqueuedAt = Date.now();
|
||||||
queue.lastRun = run.run;
|
queue.lastRun = run.run;
|
||||||
|
|
||||||
const cap = queue.cap;
|
const cap = queue.cap;
|
||||||
if (cap > 0 && queue.items.length >= cap) {
|
if (cap > 0 && queue.items.length >= cap) {
|
||||||
if (queue.dropPolicy === "new") {
|
if (queue.dropPolicy === "new") {
|
||||||
|
|||||||
Reference in New Issue
Block a user