diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 29e6770db..d7510b3f5 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -6,8 +6,8 @@ import { normalizeDeliveryContext, } from "../utils/delivery-context.js"; import { + applyQueueDropPolicy, buildCollectPrompt, - buildQueueSummaryLine, buildQueueSummaryPrompt, hasCrossChannelItems, waitForQueueDebounce, @@ -156,22 +156,15 @@ export function enqueueAnnounce(params: { const queue = getAnnounceQueue(params.key, params.settings, params.send); queue.lastEnqueuedAt = Date.now(); - const cap = queue.cap; - if (cap > 0 && queue.items.length >= cap) { + const shouldEnqueue = applyQueueDropPolicy({ + queue, + summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(), + }); + if (!shouldEnqueue) { if (queue.dropPolicy === "new") { scheduleAnnounceDrain(params.key); - return false; - } - const dropCount = queue.items.length - cap + 1; - const dropped = queue.items.splice(0, dropCount); - if (queue.dropPolicy === "summarize") { - for (const droppedItem of dropped) { - queue.droppedCount += 1; - const base = droppedItem.summaryLine?.trim() || droppedItem.prompt.trim(); - queue.summaryLines.push(buildQueueSummaryLine(base)); - } - while (queue.summaryLines.length > cap) queue.summaryLines.shift(); } + return false; } const origin = normalizeDeliveryContext(params.item.origin); diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index f9c57044a..040aeb54e 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,4 +1,7 @@ -import { buildQueueSummaryLine } from "../../../utils/queue-helpers.js"; +import { + applyQueueDropPolicy, + shouldSkipQueueItem, +} from "../../../utils/queue-helpers.js"; import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; @@ -28,36 +31,23 @@ export function enqueueFollowupRun( dedupeMode: QueueDedupeMode = "message-id", ): boolean { const queue = getFollowupQueue(key, settings); + const dedupe = + dedupeMode === "none" + ? undefined + : (item: FollowupRun, items: FollowupRun[]) => + isRunAlreadyQueued(item, items, dedupeMode === "prompt"); // Deduplicate: skip if the same message is already queued. - if (dedupeMode !== "none") { - if (dedupeMode === "message-id" && isRunAlreadyQueued(run, queue.items)) { - return false; - } - if (dedupeMode === "prompt" && isRunAlreadyQueued(run, queue.items, true)) { - return false; - } - } + if (shouldSkipQueueItem({ item: run, items: queue.items, dedupe })) return false; queue.lastEnqueuedAt = Date.now(); queue.lastRun = run.run; - const cap = queue.cap; - if (cap > 0 && queue.items.length >= cap) { - if (queue.dropPolicy === "new") { - return false; - } - const dropCount = queue.items.length - cap + 1; - const dropped = queue.items.splice(0, dropCount); - if (queue.dropPolicy === "summarize") { - for (const item of dropped) { - queue.droppedCount += 1; - const base = item.summaryLine?.trim() || item.prompt.trim(); - queue.summaryLines.push(buildQueueSummaryLine(base)); - } - while (queue.summaryLines.length > cap) queue.summaryLines.shift(); - } - } + const shouldEnqueue = applyQueueDropPolicy({ + queue, + summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(), + }); + if (!shouldEnqueue) return false; queue.items.push(run); return true; diff --git a/src/utils/queue-helpers.ts b/src/utils/queue-helpers.ts index f5f369c62..e4bac1fe0 100644 --- a/src/utils/queue-helpers.ts +++ b/src/utils/queue-helpers.ts @@ -4,6 +4,13 @@ export type QueueSummaryState = { summaryLines: string[]; }; +export type QueueDropPolicy = QueueSummaryState["dropPolicy"]; + +export type QueueState = QueueSummaryState & { + items: T[]; + cap: number; +}; + export function elideQueueText(text: string, limit = 140): string { if (text.length <= limit) return text; return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; @@ -14,6 +21,36 @@ export function buildQueueSummaryLine(text: string, limit = 160): string { return elideQueueText(cleaned, limit); } +export function shouldSkipQueueItem(params: { + item: T; + items: T[]; + dedupe?: (item: T, items: T[]) => boolean; +}): boolean { + if (!params.dedupe) return false; + return params.dedupe(params.item, params.items); +} + +export function applyQueueDropPolicy(params: { + queue: QueueState; + summarize: (item: T) => string; + summaryLimit?: number; +}): boolean { + const cap = params.queue.cap; + if (cap <= 0 || params.queue.items.length < cap) return true; + if (params.queue.dropPolicy === "new") return false; + const dropCount = params.queue.items.length - cap + 1; + const dropped = params.queue.items.splice(0, dropCount); + if (params.queue.dropPolicy === "summarize") { + for (const item of dropped) { + params.queue.droppedCount += 1; + params.queue.summaryLines.push(buildQueueSummaryLine(params.summarize(item))); + } + const limit = Math.max(0, params.summaryLimit ?? cap); + while (params.queue.summaryLines.length > limit) params.queue.summaryLines.shift(); + } + return true; +} + export function waitForQueueDebounce(queue: { debounceMs: number; lastEnqueuedAt: number;