From 19016f16e05da594b605564e8f706f177620de5a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 17 Jan 2026 01:44:09 +0000 Subject: [PATCH] fix: queue subagent announce delivery --- src/agents/subagent-announce.format.test.ts | 77 ++++- src/agents/subagent-announce.ts | 339 +++++++++++++++++++- 2 files changed, 409 insertions(+), 7 deletions(-) diff --git a/src/agents/subagent-announce.format.test.ts b/src/agents/subagent-announce.format.test.ts index b8c2542fa..5d7484e8a 100644 --- a/src/agents/subagent-announce.format.test.ts +++ b/src/agents/subagent-announce.format.test.ts @@ -1,6 +1,19 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const agentSpy = vi.fn(async () => ({ runId: "run-main", status: "ok" })); +const embeddedRunMock = { + isEmbeddedPiRunActive: vi.fn(() => false), + isEmbeddedPiRunStreaming: vi.fn(() => false), + queueEmbeddedPiMessage: vi.fn(() => false), + waitForEmbeddedPiRunEnd: vi.fn(async () => true), +}; +let sessionStore: Record> = {}; +let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = { + session: { + mainKey: "main", + scope: "per-sender", + }, +}; vi.mock("../gateway/call.js", () => ({ callGateway: vi.fn(async (req: unknown) => { @@ -22,20 +35,36 @@ vi.mock("./tools/agent-step.js", () => ({ })); vi.mock("../config/sessions.js", () => ({ - loadSessionStore: vi.fn(() => ({})), + loadSessionStore: vi.fn(() => sessionStore), resolveAgentIdFromSessionKey: () => "main", resolveStorePath: () => "/tmp/sessions.json", + resolveMainSessionKey: () => "agent:main:main", })); -vi.mock("../config/config.js", () => ({ - loadConfig: () => ({ - session: { mainKey: "agent:main:main" }, - }), -})); +vi.mock("./pi-embedded.js", () => embeddedRunMock); + +vi.mock("../config/config.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + loadConfig: () => configOverride, + }; +}); describe("subagent announce formatting", () => { beforeEach(() => { agentSpy.mockClear(); + embeddedRunMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false); + embeddedRunMock.queueEmbeddedPiMessage.mockReset().mockReturnValue(false); + embeddedRunMock.waitForEmbeddedPiRunEnd.mockReset().mockResolvedValue(true); + sessionStore = {}; + configOverride = { + session: { + mainKey: "main", + scope: "per-sender", + }, + }; }); it("sends instructional message to main agent with status and findings", async () => { @@ -88,4 +117,40 @@ describe("subagent announce formatting", () => { const msg = call?.params?.message as string; expect(msg).toContain("completed successfully"); }); + + it("steers announcements into an active run when queue mode is steer", async () => { + const { runSubagentAnnounceFlow } = await import("./subagent-announce.js"); + embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true); + embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(true); + embeddedRunMock.queueEmbeddedPiMessage.mockReturnValue(true); + sessionStore = { + "agent:main:main": { + sessionId: "session-123", + lastChannel: "whatsapp", + lastTo: "+1555", + queueMode: "steer", + }, + }; + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:test", + childRunId: "run-789", + requesterSessionKey: "main", + requesterDisplayKey: "main", + task: "do thing", + timeoutMs: 1000, + cleanup: "keep", + waitForCompletion: false, + startedAt: 10, + endedAt: 20, + outcome: { status: "ok" }, + }); + + expect(didAnnounce).toBe(true); + expect(embeddedRunMock.queueEmbeddedPiMessage).toHaveBeenCalledWith( + "session-123", + expect.stringContaining("background task"), + ); + expect(agentSpy).not.toHaveBeenCalled(); + }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 71615306a..dfff96966 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -5,9 +5,21 @@ import { loadConfig } from "../config/config.js"; import { loadSessionStore, resolveAgentIdFromSessionKey, + resolveMainSessionKey, resolveStorePath, } from "../config/sessions.js"; +import { normalizeMainKey } from "../routing/session-key.js"; +import { + resolveQueueSettings, + type QueueDropPolicy, + type QueueMode, +} from "../auto-reply/reply/queue.js"; import { callGateway } from "../gateway/call.js"; +import { defaultRuntime } from "../runtime.js"; +import { + isEmbeddedPiRunActive, + queueEmbeddedPiMessage, +} from "./pi-embedded.js"; import { readLatestAssistantReply } from "./tools/agent-step.js"; function formatDurationShort(valueMs?: number) { @@ -75,6 +87,315 @@ async function waitForSessionUsage(params: { sessionKey: string }) { return { entry, storePath }; } +type AnnounceQueueItem = { + prompt: string; + summaryLine?: string; + enqueuedAt: number; + sessionKey: string; + originatingChannel?: string; + originatingTo?: string; + originatingAccountId?: string; +}; + +type AnnounceQueueState = { + items: AnnounceQueueItem[]; + draining: boolean; + lastEnqueuedAt: number; + mode: QueueMode; + debounceMs: number; + cap: number; + dropPolicy: QueueDropPolicy; + droppedCount: number; + summaryLines: string[]; +}; + +const ANNOUNCE_QUEUES = new Map(); + +function getAnnounceQueue(key: string, settings: { mode: QueueMode; debounceMs?: number; cap?: number; dropPolicy?: QueueDropPolicy }) { + const existing = ANNOUNCE_QUEUES.get(key); + if (existing) { + existing.mode = settings.mode; + existing.debounceMs = + typeof settings.debounceMs === "number" + ? Math.max(0, settings.debounceMs) + : existing.debounceMs; + existing.cap = + typeof settings.cap === "number" && settings.cap > 0 + ? Math.floor(settings.cap) + : existing.cap; + existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy; + return existing; + } + const created: AnnounceQueueState = { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: settings.mode, + debounceMs: + typeof settings.debounceMs === "number" ? Math.max(0, settings.debounceMs) : 1000, + cap: typeof settings.cap === "number" && settings.cap > 0 ? Math.floor(settings.cap) : 20, + dropPolicy: settings.dropPolicy ?? "summarize", + droppedCount: 0, + summaryLines: [], + }; + ANNOUNCE_QUEUES.set(key, created); + return created; +} + +function elideText(text: string, limit = 140): string { + if (text.length <= limit) return text; + return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; +} + +function buildQueueSummaryLine(item: AnnounceQueueItem): string { + const base = item.summaryLine?.trim() || item.prompt.trim(); + const cleaned = base.replace(/\s+/g, " ").trim(); + return elideText(cleaned, 160); +} + +function enqueueAnnounce( + key: string, + item: AnnounceQueueItem, + settings: { mode: QueueMode; debounceMs?: number; cap?: number; dropPolicy?: QueueDropPolicy }, +): boolean { + const queue = getAnnounceQueue(key, settings); + queue.lastEnqueuedAt = Date.now(); + + const cap = queue.cap; + if (cap > 0 && queue.items.length >= cap) { + if (queue.dropPolicy === "new") { + return false; + } + const dropCount = queue.items.length - cap + 1; + const dropped = queue.items.splice(0, dropCount); + if (queue.dropPolicy === "summarize") { + for (const droppedItem of dropped) { + queue.droppedCount += 1; + queue.summaryLines.push(buildQueueSummaryLine(droppedItem)); + } + while (queue.summaryLines.length > cap) queue.summaryLines.shift(); + } + } + + queue.items.push(item); + return true; +} + +async function waitForQueueDebounce(queue: { debounceMs: number; lastEnqueuedAt: number }) { + const debounceMs = Math.max(0, queue.debounceMs); + if (debounceMs <= 0) return; + while (true) { + const since = Date.now() - queue.lastEnqueuedAt; + if (since >= debounceMs) return; + await new Promise((resolve) => setTimeout(resolve, debounceMs - since)); + } +} + +function buildSummaryPrompt(queue: { + dropPolicy: QueueDropPolicy; + droppedCount: number; + summaryLines: string[]; +}): string | undefined { + if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) { + return undefined; + } + const lines = [ + `[Queue overflow] Dropped ${queue.droppedCount} announce${queue.droppedCount === 1 ? "" : "s"} due to cap.`, + ]; + if (queue.summaryLines.length > 0) { + lines.push("Summary:"); + for (const line of queue.summaryLines) { + lines.push(`- ${line}`); + } + } + queue.droppedCount = 0; + queue.summaryLines = []; + return lines.join("\n"); +} + +function buildCollectPrompt(items: AnnounceQueueItem[], summary?: string): string { + const blocks: string[] = ["[Queued announce messages while agent was busy]"]; + if (summary) blocks.push(summary); + items.forEach((item, idx) => { + blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim()); + }); + return blocks.join("\n\n"); +} + +function hasCrossChannelItems(items: AnnounceQueueItem[]): boolean { + const keys = new Set(); + let hasUnkeyed = false; + for (const item of items) { + const channel = item.originatingChannel; + const to = item.originatingTo; + const accountId = item.originatingAccountId; + if (!channel && !to && !accountId) { + hasUnkeyed = true; + continue; + } + if (!channel || !to) { + return true; + } + keys.add([channel, to, accountId || ""].join("|")); + } + if (keys.size === 0) return false; + if (hasUnkeyed) return true; + return keys.size > 1; +} + +function scheduleAnnounceDrain(key: string) { + const queue = ANNOUNCE_QUEUES.get(key); + if (!queue || queue.draining) return; + queue.draining = true; + void (async () => { + try { + let forceIndividualCollect = false; + while (queue.items.length > 0 || queue.droppedCount > 0) { + await waitForQueueDebounce(queue); + if (queue.mode === "collect") { + if (forceIndividualCollect) { + const next = queue.items.shift(); + if (!next) break; + await sendAnnounce(next); + continue; + } + const isCrossChannel = hasCrossChannelItems(queue.items); + if (isCrossChannel) { + forceIndividualCollect = true; + const next = queue.items.shift(); + if (!next) break; + await sendAnnounce(next); + continue; + } + const items = queue.items.splice(0, queue.items.length); + const summary = buildSummaryPrompt(queue); + const prompt = buildCollectPrompt(items, summary); + const last = items.at(-1); + if (!last) break; + await sendAnnounce({ ...last, prompt }); + continue; + } + + const summaryPrompt = buildSummaryPrompt(queue); + if (summaryPrompt) { + const next = queue.items.shift(); + if (!next) break; + await sendAnnounce({ ...next, prompt: summaryPrompt }); + continue; + } + + const next = queue.items.shift(); + if (!next) break; + await sendAnnounce(next); + } + } catch (err) { + defaultRuntime.error?.(`announce queue drain failed for ${key}: ${String(err)}`); + } finally { + queue.draining = false; + if (queue.items.length === 0 && queue.droppedCount === 0) { + ANNOUNCE_QUEUES.delete(key); + } else { + scheduleAnnounceDrain(key); + } + } + })(); +} + +async function sendAnnounce(item: AnnounceQueueItem) { + await callGateway({ + method: "agent", + params: { + sessionKey: item.sessionKey, + message: item.prompt, + deliver: true, + idempotencyKey: crypto.randomUUID(), + }, + expectFinal: true, + timeoutMs: 60_000, + }); +} + +function resolveRequesterStoreKey( + cfg: ReturnType, + requesterSessionKey: string, +): string { + const raw = requesterSessionKey.trim(); + if (!raw) return raw; + if (raw === "global" || raw === "unknown") return raw; + if (raw.startsWith("agent:")) return raw; + const mainKey = normalizeMainKey(cfg.session?.mainKey); + if (raw === "main" || raw === mainKey) { + return resolveMainSessionKey(cfg); + } + const agentId = resolveAgentIdFromSessionKey(raw); + return `agent:${agentId}:${raw}`; +} + +function loadRequesterSessionEntry(requesterSessionKey: string) { + const cfg = loadConfig(); + const canonicalKey = resolveRequesterStoreKey(cfg, requesterSessionKey); + const agentId = resolveAgentIdFromSessionKey(canonicalKey); + const storePath = resolveStorePath(cfg.session?.store, { agentId }); + const store = loadSessionStore(storePath); + const legacyKey = canonicalKey.startsWith("agent:") + ? canonicalKey.split(":").slice(2).join(":") + : undefined; + const entry = + store[canonicalKey] ?? + store[requesterSessionKey] ?? + (legacyKey ? store[legacyKey] : undefined); + return { cfg, entry, canonicalKey }; +} + +async function maybeQueueSubagentAnnounce(params: { + requesterSessionKey: string; + triggerMessage: string; + summaryLine?: string; +}): Promise<"steered" | "queued" | "none"> { + const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey); + const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey); + const sessionId = entry?.sessionId; + if (!sessionId) return "none"; + + const queueSettings = resolveQueueSettings({ + cfg, + channel: entry?.channel ?? entry?.lastChannel, + sessionEntry: entry, + }); + const isActive = isEmbeddedPiRunActive(sessionId); + + const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog"; + if (shouldSteer) { + const steered = queueEmbeddedPiMessage(sessionId, params.triggerMessage); + if (steered) return "steered"; + } + + const shouldFollowup = + queueSettings.mode === "followup" || + queueSettings.mode === "collect" || + queueSettings.mode === "steer-backlog" || + queueSettings.mode === "interrupt"; + if (isActive && (shouldFollowup || queueSettings.mode === "steer")) { + enqueueAnnounce( + canonicalKey, + { + prompt: params.triggerMessage, + summaryLine: params.summaryLine, + enqueuedAt: Date.now(), + sessionKey: canonicalKey, + originatingChannel: entry?.lastChannel, + originatingTo: entry?.lastTo, + originatingAccountId: entry?.lastAccountId, + }, + queueSettings, + ); + scheduleAnnounceDrain(canonicalKey); + return "queued"; + } + + return "none"; +} + async function buildSubagentStatsLine(params: { sessionKey: string; startedAt?: number; @@ -278,6 +599,20 @@ export async function runSubagentAnnounceFlow(params: { "You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).", ].join("\n"); + const queued = await maybeQueueSubagentAnnounce({ + requesterSessionKey: params.requesterSessionKey, + triggerMessage, + summaryLine: taskLabel, + }); + if (queued === "steered") { + didAnnounce = true; + return true; + } + if (queued === "queued") { + didAnnounce = true; + return true; + } + // Send to main agent - it will respond in its own voice await callGateway({ method: "agent", @@ -287,11 +622,13 @@ export async function runSubagentAnnounceFlow(params: { deliver: true, idempotencyKey: crypto.randomUUID(), }, + expectFinal: true, timeoutMs: 60_000, }); didAnnounce = true; - } catch { + } catch (err) { + defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`); // Best-effort follow-ups; ignore failures to avoid breaking the caller response. } finally { // Patch label after all writes complete