refactor: unify queue drop handling

This commit is contained in:
Peter Steinberger
2026-01-17 06:37:45 +00:00
parent a4178e4062
commit eb8a0510e0
3 changed files with 59 additions and 39 deletions

View File

@@ -6,8 +6,8 @@ import {
normalizeDeliveryContext, normalizeDeliveryContext,
} from "../utils/delivery-context.js"; } from "../utils/delivery-context.js";
import { import {
applyQueueDropPolicy,
buildCollectPrompt, buildCollectPrompt,
buildQueueSummaryLine,
buildQueueSummaryPrompt, buildQueueSummaryPrompt,
hasCrossChannelItems, hasCrossChannelItems,
waitForQueueDebounce, waitForQueueDebounce,
@@ -156,22 +156,15 @@ export function enqueueAnnounce(params: {
const queue = getAnnounceQueue(params.key, params.settings, params.send); const queue = getAnnounceQueue(params.key, params.settings, params.send);
queue.lastEnqueuedAt = Date.now(); queue.lastEnqueuedAt = Date.now();
const cap = queue.cap; const shouldEnqueue = applyQueueDropPolicy({
if (cap > 0 && queue.items.length >= cap) { queue,
summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(),
});
if (!shouldEnqueue) {
if (queue.dropPolicy === "new") { if (queue.dropPolicy === "new") {
scheduleAnnounceDrain(params.key); scheduleAnnounceDrain(params.key);
return false;
}
const dropCount = queue.items.length - cap + 1;
const dropped = queue.items.splice(0, dropCount);
if (queue.dropPolicy === "summarize") {
for (const droppedItem of dropped) {
queue.droppedCount += 1;
const base = droppedItem.summaryLine?.trim() || droppedItem.prompt.trim();
queue.summaryLines.push(buildQueueSummaryLine(base));
}
while (queue.summaryLines.length > cap) queue.summaryLines.shift();
} }
return false;
} }
const origin = normalizeDeliveryContext(params.item.origin); const origin = normalizeDeliveryContext(params.item.origin);

View File

@@ -1,4 +1,7 @@
import { buildQueueSummaryLine } from "../../../utils/queue-helpers.js"; import {
applyQueueDropPolicy,
shouldSkipQueueItem,
} from "../../../utils/queue-helpers.js";
import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js"; import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js";
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
@@ -28,36 +31,23 @@ export function enqueueFollowupRun(
dedupeMode: QueueDedupeMode = "message-id", dedupeMode: QueueDedupeMode = "message-id",
): boolean { ): boolean {
const queue = getFollowupQueue(key, settings); const queue = getFollowupQueue(key, settings);
const dedupe =
dedupeMode === "none"
? undefined
: (item: FollowupRun, items: FollowupRun[]) =>
isRunAlreadyQueued(item, items, dedupeMode === "prompt");
// Deduplicate: skip if the same message is already queued. // Deduplicate: skip if the same message is already queued.
if (dedupeMode !== "none") { if (shouldSkipQueueItem({ item: run, items: queue.items, dedupe })) return false;
if (dedupeMode === "message-id" && isRunAlreadyQueued(run, queue.items)) {
return false;
}
if (dedupeMode === "prompt" && isRunAlreadyQueued(run, queue.items, true)) {
return false;
}
}
queue.lastEnqueuedAt = Date.now(); queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run; queue.lastRun = run.run;
const cap = queue.cap; const shouldEnqueue = applyQueueDropPolicy({
if (cap > 0 && queue.items.length >= cap) { queue,
if (queue.dropPolicy === "new") { summarize: (item) => item.summaryLine?.trim() || item.prompt.trim(),
return false; });
} if (!shouldEnqueue) return false;
const dropCount = queue.items.length - cap + 1;
const dropped = queue.items.splice(0, dropCount);
if (queue.dropPolicy === "summarize") {
for (const item of dropped) {
queue.droppedCount += 1;
const base = item.summaryLine?.trim() || item.prompt.trim();
queue.summaryLines.push(buildQueueSummaryLine(base));
}
while (queue.summaryLines.length > cap) queue.summaryLines.shift();
}
}
queue.items.push(run); queue.items.push(run);
return true; return true;

View File

@@ -4,6 +4,13 @@ export type QueueSummaryState = {
summaryLines: string[]; summaryLines: string[];
}; };
export type QueueDropPolicy = QueueSummaryState["dropPolicy"];
export type QueueState<T> = QueueSummaryState & {
items: T[];
cap: number;
};
export function elideQueueText(text: string, limit = 140): string { export function elideQueueText(text: string, limit = 140): string {
if (text.length <= limit) return text; if (text.length <= limit) return text;
return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}`; return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}`;
@@ -14,6 +21,36 @@ export function buildQueueSummaryLine(text: string, limit = 160): string {
return elideQueueText(cleaned, limit); return elideQueueText(cleaned, limit);
} }
export function shouldSkipQueueItem<T>(params: {
item: T;
items: T[];
dedupe?: (item: T, items: T[]) => boolean;
}): boolean {
if (!params.dedupe) return false;
return params.dedupe(params.item, params.items);
}
export function applyQueueDropPolicy<T>(params: {
queue: QueueState<T>;
summarize: (item: T) => string;
summaryLimit?: number;
}): boolean {
const cap = params.queue.cap;
if (cap <= 0 || params.queue.items.length < cap) return true;
if (params.queue.dropPolicy === "new") return false;
const dropCount = params.queue.items.length - cap + 1;
const dropped = params.queue.items.splice(0, dropCount);
if (params.queue.dropPolicy === "summarize") {
for (const item of dropped) {
params.queue.droppedCount += 1;
params.queue.summaryLines.push(buildQueueSummaryLine(params.summarize(item)));
}
const limit = Math.max(0, params.summaryLimit ?? cap);
while (params.queue.summaryLines.length > limit) params.queue.summaryLines.shift();
}
return true;
}
export function waitForQueueDebounce(queue: { export function waitForQueueDebounce(queue: {
debounceMs: number; debounceMs: number;
lastEnqueuedAt: number; lastEnqueuedAt: number;