From c7ae5100fae432f4225fa3ee3aee1c5833f82b4c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 17 Jan 2026 06:01:37 +0000 Subject: [PATCH] refactor: share queue helpers Co-authored-by: adam91holt --- src/agents/subagent-announce-queue.ts | 103 ++++++------------------ src/auto-reply/reply/queue/drain.ts | 110 +++++++------------------- src/auto-reply/reply/queue/enqueue.ts | 15 +--- src/utils/queue-helpers.ts | 94 ++++++++++++++++++++++ 4 files changed, 149 insertions(+), 173 deletions(-) create mode 100644 src/utils/queue-helpers.ts diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 15dbb087c..29e6770db 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -5,6 +5,13 @@ import { deliveryContextKey, normalizeDeliveryContext, } from "../utils/delivery-context.js"; +import { + buildCollectPrompt, + buildQueueSummaryLine, + buildQueueSummaryPrompt, + hasCrossChannelItems, + waitForQueueDebounce, +} from "../utils/queue-helpers.js"; export type AnnounceQueueItem = { prompt: string; @@ -73,82 +80,6 @@ function getAnnounceQueue( return created; } -function elideText(text: string, limit = 140): string { - if (text.length <= limit) return text; - return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; -} - -function buildQueueSummaryLine(item: AnnounceQueueItem): string { - const base = item.summaryLine?.trim() || item.prompt.trim(); - const cleaned = base.replace(/\s+/g, " ").trim(); - return elideText(cleaned, 160); -} - -function waitForQueueDebounce(queue: { debounceMs: number; lastEnqueuedAt: number }) { - const debounceMs = Math.max(0, queue.debounceMs); - if (debounceMs <= 0) return Promise.resolve(); - return new Promise((resolve) => { - const check = () => { - const since = Date.now() - queue.lastEnqueuedAt; - if (since >= debounceMs) { - resolve(); - return; - } - setTimeout(check, debounceMs - since); - }; - check(); - }); -} - -function buildSummaryPrompt(queue: { - dropPolicy: QueueDropPolicy; - droppedCount: number; - summaryLines: string[]; -}): string | undefined { - if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) { - return undefined; - } - const lines = [ - `[Queue overflow] Dropped ${queue.droppedCount} announce${queue.droppedCount === 1 ? "" : "s"} due to cap.`, - ]; - if (queue.summaryLines.length > 0) { - lines.push("Summary:"); - for (const line of queue.summaryLines) { - lines.push(`- ${line}`); - } - } - queue.droppedCount = 0; - queue.summaryLines = []; - return lines.join("\n"); -} - -function buildCollectPrompt(items: AnnounceQueueItem[], summary?: string): string { - const blocks: string[] = ["[Queued announce messages while agent was busy]"]; - if (summary) blocks.push(summary); - items.forEach((item, idx) => { - blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim()); - }); - return blocks.join("\n\n"); -} - -function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean { - const keys = new Set(); - let hasUnkeyed = false; - for (const item of items) { - if (!item.origin) { - hasUnkeyed = true; - continue; - } - if (!item.originKey) { - return true; - } - keys.add(item.originKey); - } - if (keys.size === 0) return false; - if (hasUnkeyed) return true; - return keys.size > 1; -} - function scheduleAnnounceDrain(key: string) { const queue = ANNOUNCE_QUEUES.get(key); if (!queue || queue.draining) return; @@ -165,7 +96,11 @@ function scheduleAnnounceDrain(key: string) { await queue.send(next); continue; } - const isCrossChannel = hasCrossChannelItems(queue.items); + const isCrossChannel = hasCrossChannelItems(queue.items, (item) => { + if (!item.origin) return {}; + if (!item.originKey) return { cross: true }; + return { key: item.originKey }; + }); if (isCrossChannel) { forceIndividualCollect = true; const next = queue.items.shift(); @@ -174,15 +109,20 @@ function scheduleAnnounceDrain(key: string) { continue; } const items = queue.items.splice(0, queue.items.length); - const summary = buildSummaryPrompt(queue); - const prompt = buildCollectPrompt(items, summary); + const summary = buildQueueSummaryPrompt({ state: queue, noun: "announce" }); + const prompt = buildCollectPrompt({ + title: "[Queued announce messages while agent was busy]", + items, + summary, + renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(), + }); const last = items.at(-1); if (!last) break; await queue.send({ ...last, prompt }); continue; } - const summaryPrompt = buildSummaryPrompt(queue); + const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "announce" }); if (summaryPrompt) { const next = queue.items.shift(); if (!next) break; @@ -227,7 +167,8 @@ export function enqueueAnnounce(params: { if (queue.dropPolicy === "summarize") { for (const droppedItem of dropped) { queue.droppedCount += 1; - queue.summaryLines.push(buildQueueSummaryLine(droppedItem)); + const base = droppedItem.summaryLine?.trim() || droppedItem.prompt.trim(); + queue.summaryLines.push(buildQueueSummaryLine(base)); } while (queue.summaryLines.length > cap) queue.summaryLines.shift(); } diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index e76f1b71d..01361a1ec 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -1,84 +1,14 @@ import { defaultRuntime } from "../../../runtime.js"; +import { + buildCollectPrompt, + buildQueueSummaryPrompt, + hasCrossChannelItems, + waitForQueueDebounce, +} from "../../../utils/queue-helpers.js"; import { isRoutableChannel } from "../route-reply.js"; import { FOLLOWUP_QUEUES } from "./state.js"; import type { FollowupRun } from "./types.js"; -async function waitForQueueDebounce(queue: { debounceMs: number; lastEnqueuedAt: number }) { - const debounceMs = Math.max(0, queue.debounceMs); - if (debounceMs <= 0) return; - while (true) { - const since = Date.now() - queue.lastEnqueuedAt; - if (since >= debounceMs) return; - await new Promise((resolve) => setTimeout(resolve, debounceMs - since)); - } -} - -function buildSummaryPrompt(queue: { - dropPolicy: "summarize" | "old" | "new"; - droppedCount: number; - summaryLines: string[]; -}): string | undefined { - if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) { - return undefined; - } - const lines = [ - `[Queue overflow] Dropped ${queue.droppedCount} message${queue.droppedCount === 1 ? "" : "s"} due to cap.`, - ]; - if (queue.summaryLines.length > 0) { - lines.push("Summary:"); - for (const line of queue.summaryLines) { - lines.push(`- ${line}`); - } - } - queue.droppedCount = 0; - queue.summaryLines = []; - return lines.join("\\n"); -} - -function buildCollectPrompt(items: FollowupRun[], summary?: string): string { - const blocks: string[] = ["[Queued messages while agent was busy]"]; - if (summary) blocks.push(summary); - items.forEach((item, idx) => { - blocks.push(`---\\nQueued #${idx + 1}\\n${item.prompt}`.trim()); - }); - return blocks.join("\\n\\n"); -} - -/** - * Checks if queued items have different routable originating channels. - * - * Returns true if messages come from different channels (e.g., Slack + Telegram), - * meaning they cannot be safely collected into one prompt without losing routing. - * Also returns true for a mix of routable and non-routable channels. - */ -function hasCrossChannelItems(items: FollowupRun[]): boolean { - const keys = new Set(); - let hasUnkeyed = false; - - for (const item of items) { - const channel = item.originatingChannel; - const to = item.originatingTo; - const accountId = item.originatingAccountId; - const threadId = item.originatingThreadId; - if (!channel && !to && !accountId && typeof threadId !== "number") { - hasUnkeyed = true; - continue; - } - if (!isRoutableChannel(channel) || !to) { - return true; - } - keys.add( - [channel, to, accountId || "", typeof threadId === "number" ? String(threadId) : ""].join( - "|", - ), - ); - } - - if (keys.size === 0) return false; - if (hasUnkeyed) return true; - return keys.size > 1; -} - export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, @@ -105,7 +35,22 @@ export function scheduleFollowupDrain( // Check if messages span multiple channels. // If so, process individually to preserve per-message routing. - const isCrossChannel = hasCrossChannelItems(queue.items); + const isCrossChannel = hasCrossChannelItems(queue.items, (item) => { + const channel = item.originatingChannel; + const to = item.originatingTo; + const accountId = item.originatingAccountId; + const threadId = item.originatingThreadId; + if (!channel && !to && !accountId && typeof threadId !== "number") { + return {}; + } + if (!isRoutableChannel(channel) || !to) { + return { cross: true }; + } + const threadKey = typeof threadId === "number" ? String(threadId) : ""; + return { + key: [channel, to, accountId || "", threadKey].join("|"), + }; + }); if (isCrossChannel) { forceIndividualCollect = true; @@ -116,7 +61,7 @@ export function scheduleFollowupDrain( } const items = queue.items.splice(0, queue.items.length); - const summary = buildSummaryPrompt(queue); + const summary = buildQueueSummaryPrompt({ state: queue, noun: "message" }); const run = items.at(-1)?.run ?? queue.lastRun; if (!run) break; @@ -130,7 +75,12 @@ export function scheduleFollowupDrain( (i) => typeof i.originatingThreadId === "number", )?.originatingThreadId; - const prompt = buildCollectPrompt(items, summary); + const prompt = buildCollectPrompt({ + title: "[Queued messages while agent was busy]", + items, + summary, + renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(), + }); await runFollowup({ prompt, run, @@ -143,7 +93,7 @@ export function scheduleFollowupDrain( continue; } - const summaryPrompt = buildSummaryPrompt(queue); + const summaryPrompt = buildQueueSummaryPrompt({ state: queue, noun: "message" }); if (summaryPrompt) { const run = queue.lastRun; if (!run) break; diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 4c830d3a3..f9c57044a 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,17 +1,7 @@ +import { buildQueueSummaryLine } from "../../../utils/queue-helpers.js"; import { FOLLOWUP_QUEUES, getFollowupQueue } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; -function elideText(text: string, limit = 140): string { - if (text.length <= limit) return text; - return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; -} - -function buildQueueSummaryLine(run: FollowupRun): string { - const base = run.summaryLine?.trim() || run.prompt.trim(); - const cleaned = base.replace(/\\s+/g, " ").trim(); - return elideText(cleaned, 160); -} - function isRunAlreadyQueued( run: FollowupRun, items: FollowupRun[], @@ -62,7 +52,8 @@ export function enqueueFollowupRun( if (queue.dropPolicy === "summarize") { for (const item of dropped) { queue.droppedCount += 1; - queue.summaryLines.push(buildQueueSummaryLine(item)); + const base = item.summaryLine?.trim() || item.prompt.trim(); + queue.summaryLines.push(buildQueueSummaryLine(base)); } while (queue.summaryLines.length > cap) queue.summaryLines.shift(); } diff --git a/src/utils/queue-helpers.ts b/src/utils/queue-helpers.ts new file mode 100644 index 000000000..f5f369c62 --- /dev/null +++ b/src/utils/queue-helpers.ts @@ -0,0 +1,94 @@ +export type QueueSummaryState = { + dropPolicy: "summarize" | "old" | "new"; + droppedCount: number; + summaryLines: string[]; +}; + +export function elideQueueText(text: string, limit = 140): string { + if (text.length <= limit) return text; + return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; +} + +export function buildQueueSummaryLine(text: string, limit = 160): string { + const cleaned = text.replace(/\s+/g, " ").trim(); + return elideQueueText(cleaned, limit); +} + +export function waitForQueueDebounce(queue: { + debounceMs: number; + lastEnqueuedAt: number; +}): Promise { + const debounceMs = Math.max(0, queue.debounceMs); + if (debounceMs <= 0) return Promise.resolve(); + return new Promise((resolve) => { + const check = () => { + const since = Date.now() - queue.lastEnqueuedAt; + if (since >= debounceMs) { + resolve(); + return; + } + setTimeout(check, debounceMs - since); + }; + check(); + }); +} + +export function buildQueueSummaryPrompt(params: { + state: QueueSummaryState; + noun: string; + title?: string; +}): string | undefined { + if (params.state.dropPolicy !== "summarize" || params.state.droppedCount <= 0) { + return undefined; + } + const noun = params.noun; + const title = + params.title ?? + `[Queue overflow] Dropped ${params.state.droppedCount} ${noun}${params.state.droppedCount === 1 ? "" : "s"} due to cap.`; + const lines = [title]; + if (params.state.summaryLines.length > 0) { + lines.push("Summary:"); + for (const line of params.state.summaryLines) { + lines.push(`- ${line}`); + } + } + params.state.droppedCount = 0; + params.state.summaryLines = []; + return lines.join("\n"); +} + +export function buildCollectPrompt(params: { + title: string; + items: T[]; + summary?: string; + renderItem: (item: T, index: number) => string; +}): string { + const blocks: string[] = [params.title]; + if (params.summary) blocks.push(params.summary); + params.items.forEach((item, idx) => { + blocks.push(params.renderItem(item, idx)); + }); + return blocks.join("\n\n"); +} + +export function hasCrossChannelItems( + items: T[], + resolveKey: (item: T) => { key?: string; cross?: boolean }, +): boolean { + const keys = new Set(); + let hasUnkeyed = false; + + for (const item of items) { + const resolved = resolveKey(item); + if (resolved.cross) return true; + if (!resolved.key) { + hasUnkeyed = true; + continue; + } + keys.add(resolved.key); + } + + if (keys.size === 0) return false; + if (hasUnkeyed) return true; + return keys.size > 1; +}