diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts new file mode 100644 index 000000000..15dbb087c --- /dev/null +++ b/src/agents/subagent-announce-queue.ts @@ -0,0 +1,241 @@ +import { type QueueDropPolicy, type QueueMode } from "../auto-reply/reply/queue.js"; +import { defaultRuntime } from "../runtime.js"; +import { + type DeliveryContext, + deliveryContextKey, + normalizeDeliveryContext, +} from "../utils/delivery-context.js"; + +export type AnnounceQueueItem = { + prompt: string; + summaryLine?: string; + enqueuedAt: number; + sessionKey: string; + origin?: DeliveryContext; + originKey?: string; +}; + +export type AnnounceQueueSettings = { + mode: QueueMode; + debounceMs?: number; + cap?: number; + dropPolicy?: QueueDropPolicy; +}; + +type AnnounceQueueState = { + items: AnnounceQueueItem[]; + draining: boolean; + lastEnqueuedAt: number; + mode: QueueMode; + debounceMs: number; + cap: number; + dropPolicy: QueueDropPolicy; + droppedCount: number; + summaryLines: string[]; + send: (item: AnnounceQueueItem) => Promise; +}; + +const ANNOUNCE_QUEUES = new Map(); + +function getAnnounceQueue( + key: string, + settings: AnnounceQueueSettings, + send: (item: AnnounceQueueItem) => Promise, +) { + const existing = ANNOUNCE_QUEUES.get(key); + if (existing) { + existing.mode = settings.mode; + existing.debounceMs = + typeof settings.debounceMs === "number" + ? Math.max(0, settings.debounceMs) + : existing.debounceMs; + existing.cap = + typeof settings.cap === "number" && settings.cap > 0 + ? Math.floor(settings.cap) + : existing.cap; + existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy; + existing.send = send; + return existing; + } + const created: AnnounceQueueState = { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: settings.mode, + debounceMs: typeof settings.debounceMs === "number" ? Math.max(0, settings.debounceMs) : 1000, + cap: typeof settings.cap === "number" && settings.cap > 0 ? Math.floor(settings.cap) : 20, + dropPolicy: settings.dropPolicy ?? "summarize", + droppedCount: 0, + summaryLines: [], + send, + }; + ANNOUNCE_QUEUES.set(key, created); + return created; +} + +function elideText(text: string, limit = 140): string { + if (text.length <= limit) return text; + return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; +} + +function buildQueueSummaryLine(item: AnnounceQueueItem): string { + const base = item.summaryLine?.trim() || item.prompt.trim(); + const cleaned = base.replace(/\s+/g, " ").trim(); + return elideText(cleaned, 160); +} + +function waitForQueueDebounce(queue: { debounceMs: number; lastEnqueuedAt: number }) { + const debounceMs = Math.max(0, queue.debounceMs); + if (debounceMs <= 0) return Promise.resolve(); + return new Promise((resolve) => { + const check = () => { + const since = Date.now() - queue.lastEnqueuedAt; + if (since >= debounceMs) { + resolve(); + return; + } + setTimeout(check, debounceMs - since); + }; + check(); + }); +} + +function buildSummaryPrompt(queue: { + dropPolicy: QueueDropPolicy; + droppedCount: number; + summaryLines: string[]; +}): string | undefined { + if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) { + return undefined; + } + const lines = [ + `[Queue overflow] Dropped ${queue.droppedCount} announce${queue.droppedCount === 1 ? "" : "s"} due to cap.`, + ]; + if (queue.summaryLines.length > 0) { + lines.push("Summary:"); + for (const line of queue.summaryLines) { + lines.push(`- ${line}`); + } + } + queue.droppedCount = 0; + queue.summaryLines = []; + return lines.join("\n"); +} + +function buildCollectPrompt(items: AnnounceQueueItem[], summary?: string): string { + const blocks: string[] = ["[Queued announce messages while agent was busy]"]; + if (summary) blocks.push(summary); + items.forEach((item, idx) => { + blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim()); + }); + return blocks.join("\n\n"); +} + +function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean { + const keys = new Set(); + let hasUnkeyed = false; + for (const item of items) { + if (!item.origin) { + hasUnkeyed = true; + continue; + } + if (!item.originKey) { + return true; + } + keys.add(item.originKey); + } + if (keys.size === 0) return false; + if (hasUnkeyed) return true; + return keys.size > 1; +} + +function scheduleAnnounceDrain(key: string) { + const queue = ANNOUNCE_QUEUES.get(key); + if (!queue || queue.draining) return; + queue.draining = true; + void (async () => { + try { + let forceIndividualCollect = false; + while (queue.items.length > 0 || queue.droppedCount > 0) { + await waitForQueueDebounce(queue); + if (queue.mode === "collect") { + if (forceIndividualCollect) { + const next = queue.items.shift(); + if (!next) break; + await queue.send(next); + continue; + } + const isCrossChannel = hasCrossChannelItems(queue.items); + if (isCrossChannel) { + forceIndividualCollect = true; + const next = queue.items.shift(); + if (!next) break; + await queue.send(next); + continue; + } + const items = queue.items.splice(0, queue.items.length); + const summary = buildSummaryPrompt(queue); + const prompt = buildCollectPrompt(items, summary); + const last = items.at(-1); + if (!last) break; + await queue.send({ ...last, prompt }); + continue; + } + + const summaryPrompt = buildSummaryPrompt(queue); + if (summaryPrompt) { + const next = queue.items.shift(); + if (!next) break; + await queue.send({ ...next, prompt: summaryPrompt }); + continue; + } + + const next = queue.items.shift(); + if (!next) break; + await queue.send(next); + } + } catch (err) { + defaultRuntime.error?.(`announce queue drain failed for ${key}: ${String(err)}`); + } finally { + queue.draining = false; + if (queue.items.length === 0 && queue.droppedCount === 0) { + ANNOUNCE_QUEUES.delete(key); + } else { + scheduleAnnounceDrain(key); + } + } + })(); +} + +export function enqueueAnnounce(params: { + key: string; + item: AnnounceQueueItem; + settings: AnnounceQueueSettings; + send: (item: AnnounceQueueItem) => Promise; +}): boolean { + const queue = getAnnounceQueue(params.key, params.settings, params.send); + queue.lastEnqueuedAt = Date.now(); + + const cap = queue.cap; + if (cap > 0 && queue.items.length >= cap) { + if (queue.dropPolicy === "new") { + scheduleAnnounceDrain(params.key); + return false; + } + const dropCount = queue.items.length - cap + 1; + const dropped = queue.items.splice(0, dropCount); + if (queue.dropPolicy === "summarize") { + for (const droppedItem of dropped) { + queue.droppedCount += 1; + queue.summaryLines.push(buildQueueSummaryLine(droppedItem)); + } + while (queue.summaryLines.length > cap) queue.summaryLines.shift(); + } + } + + const origin = normalizeDeliveryContext(params.item.origin); + const originKey = deliveryContextKey(origin); + queue.items.push({ ...params.item, origin, originKey }); + scheduleAnnounceDrain(params.key); + return true; +} diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 6f0c3c11f..e9ef75e0c 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -11,19 +11,17 @@ import { import { normalizeMainKey } from "../routing/session-key.js"; import { resolveQueueSettings, - type QueueDropPolicy, - type QueueMode, } from "../auto-reply/reply/queue.js"; import { callGateway } from "../gateway/call.js"; import { defaultRuntime } from "../runtime.js"; import { type DeliveryContext, deliveryContextFromSession, - deliveryContextKey, mergeDeliveryContext, normalizeDeliveryContext, } from "../utils/delivery-context.js"; import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js"; +import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js"; import { readLatestAssistantReply } from "./tools/agent-step.js"; function formatDurationShort(valueMs?: number) { @@ -91,29 +89,6 @@ async function waitForSessionUsage(params: { sessionKey: string }) { return { entry, storePath }; } -type AnnounceQueueItem = { - prompt: string; - summaryLine?: string; - enqueuedAt: number; - sessionKey: string; - origin?: DeliveryContext; - originKey?: string; -}; - -type AnnounceQueueState = { - items: AnnounceQueueItem[]; - draining: boolean; - lastEnqueuedAt: number; - mode: QueueMode; - debounceMs: number; - cap: number; - dropPolicy: QueueDropPolicy; - droppedCount: number; - summaryLines: string[]; -}; - -const ANNOUNCE_QUEUES = new Map(); - type DeliveryContextSource = Parameters[0]; function resolveAnnounceOrigin( @@ -123,197 +98,6 @@ function resolveAnnounceOrigin( return mergeDeliveryContext(deliveryContextFromSession(entry), requesterOrigin); } -function getAnnounceQueue( - key: string, - settings: { mode: QueueMode; debounceMs?: number; cap?: number; dropPolicy?: QueueDropPolicy }, -) { - const existing = ANNOUNCE_QUEUES.get(key); - if (existing) { - existing.mode = settings.mode; - existing.debounceMs = - typeof settings.debounceMs === "number" - ? Math.max(0, settings.debounceMs) - : existing.debounceMs; - existing.cap = - typeof settings.cap === "number" && settings.cap > 0 - ? Math.floor(settings.cap) - : existing.cap; - existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy; - return existing; - } - const created: AnnounceQueueState = { - items: [], - draining: false, - lastEnqueuedAt: 0, - mode: settings.mode, - debounceMs: typeof settings.debounceMs === "number" ? Math.max(0, settings.debounceMs) : 1000, - cap: typeof settings.cap === "number" && settings.cap > 0 ? Math.floor(settings.cap) : 20, - dropPolicy: settings.dropPolicy ?? "summarize", - droppedCount: 0, - summaryLines: [], - }; - ANNOUNCE_QUEUES.set(key, created); - return created; -} - -function elideText(text: string, limit = 140): string { - if (text.length <= limit) return text; - return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; -} - -function buildQueueSummaryLine(item: AnnounceQueueItem): string { - const base = item.summaryLine?.trim() || item.prompt.trim(); - const cleaned = base.replace(/\s+/g, " ").trim(); - return elideText(cleaned, 160); -} - -function enqueueAnnounce( - key: string, - item: AnnounceQueueItem, - settings: { mode: QueueMode; debounceMs?: number; cap?: number; dropPolicy?: QueueDropPolicy }, -): boolean { - const queue = getAnnounceQueue(key, settings); - queue.lastEnqueuedAt = Date.now(); - - const cap = queue.cap; - if (cap > 0 && queue.items.length >= cap) { - if (queue.dropPolicy === "new") { - return false; - } - const dropCount = queue.items.length - cap + 1; - const dropped = queue.items.splice(0, dropCount); - if (queue.dropPolicy === "summarize") { - for (const droppedItem of dropped) { - queue.droppedCount += 1; - queue.summaryLines.push(buildQueueSummaryLine(droppedItem)); - } - while (queue.summaryLines.length > cap) queue.summaryLines.shift(); - } - } - - const origin = normalizeDeliveryContext(item.origin); - const originKey = deliveryContextKey(origin); - queue.items.push({ ...item, origin, originKey }); - return true; -} - -async function waitForQueueDebounce(queue: { debounceMs: number; lastEnqueuedAt: number }) { - const debounceMs = Math.max(0, queue.debounceMs); - if (debounceMs <= 0) return; - while (true) { - const since = Date.now() - queue.lastEnqueuedAt; - if (since >= debounceMs) return; - await new Promise((resolve) => setTimeout(resolve, debounceMs - since)); - } -} - -function buildSummaryPrompt(queue: { - dropPolicy: QueueDropPolicy; - droppedCount: number; - summaryLines: string[]; -}): string | undefined { - if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) { - return undefined; - } - const lines = [ - `[Queue overflow] Dropped ${queue.droppedCount} announce${queue.droppedCount === 1 ? "" : "s"} due to cap.`, - ]; - if (queue.summaryLines.length > 0) { - lines.push("Summary:"); - for (const line of queue.summaryLines) { - lines.push(`- ${line}`); - } - } - queue.droppedCount = 0; - queue.summaryLines = []; - return lines.join("\n"); -} - -function buildCollectPrompt(items: AnnounceQueueItem[], summary?: string): string { - const blocks: string[] = ["[Queued announce messages while agent was busy]"]; - if (summary) blocks.push(summary); - items.forEach((item, idx) => { - blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim()); - }); - return blocks.join("\n\n"); -} - -function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean { - const keys = new Set(); - let hasUnkeyed = false; - for (const item of items) { - if (!item.origin) { - hasUnkeyed = true; - continue; - } - if (!item.originKey) { - return true; - } - keys.add(item.originKey); - } - if (keys.size === 0) return false; - if (hasUnkeyed) return true; - return keys.size > 1; -} - -function scheduleAnnounceDrain(key: string) { - const queue = ANNOUNCE_QUEUES.get(key); - if (!queue || queue.draining) return; - queue.draining = true; - void (async () => { - try { - let forceIndividualCollect = false; - while (queue.items.length > 0 || queue.droppedCount > 0) { - await waitForQueueDebounce(queue); - if (queue.mode === "collect") { - if (forceIndividualCollect) { - const next = queue.items.shift(); - if (!next) break; - await sendAnnounce(next); - continue; - } - const isCrossChannel = hasCrossChannelItems(queue.items); - if (isCrossChannel) { - forceIndividualCollect = true; - const next = queue.items.shift(); - if (!next) break; - await sendAnnounce(next); - continue; - } - const items = queue.items.splice(0, queue.items.length); - const summary = buildSummaryPrompt(queue); - const prompt = buildCollectPrompt(items, summary); - const last = items.at(-1); - if (!last) break; - await sendAnnounce({ ...last, prompt }); - continue; - } - - const summaryPrompt = buildSummaryPrompt(queue); - if (summaryPrompt) { - const next = queue.items.shift(); - if (!next) break; - await sendAnnounce({ ...next, prompt: summaryPrompt }); - continue; - } - - const next = queue.items.shift(); - if (!next) break; - await sendAnnounce(next); - } - } catch (err) { - defaultRuntime.error?.(`announce queue drain failed for ${key}: ${String(err)}`); - } finally { - queue.draining = false; - if (queue.items.length === 0 && queue.droppedCount === 0) { - ANNOUNCE_QUEUES.delete(key); - } else { - scheduleAnnounceDrain(key); - } - } - })(); -} - async function sendAnnounce(item: AnnounceQueueItem) { const origin = item.origin; await callGateway({ @@ -393,18 +177,18 @@ async function maybeQueueSubagentAnnounce(params: { queueSettings.mode === "interrupt"; if (isActive && (shouldFollowup || queueSettings.mode === "steer")) { const origin = resolveAnnounceOrigin(entry, params.requesterOrigin); - enqueueAnnounce( - canonicalKey, - { + enqueueAnnounce({ + key: canonicalKey, + item: { prompt: params.triggerMessage, summaryLine: params.summaryLine, enqueuedAt: Date.now(), sessionKey: canonicalKey, origin, }, - queueSettings, - ); - scheduleAnnounceDrain(canonicalKey); + settings: queueSettings, + send: sendAnnounce, + }); return "queued"; }