import { loadConfig } from "../config/config.js";
import { callGateway } from "../gateway/call.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js";
import { runSubagentAnnounceFlow, type SubagentRunOutcome } from "./subagent-announce.js";
import {
  loadSubagentRegistryFromDisk,
  saveSubagentRegistryToDisk,
} from "./subagent-registry.store.js";
import { resolveAgentTimeoutMs } from "./timeout.js";

/** Bookkeeping for one spawned subagent run; persisted to disk via the registry store. */
export type SubagentRunRecord = {
  runId: string;
  childSessionKey: string;
  requesterSessionKey: string;
  requesterOrigin?: DeliveryContext;
  requesterDisplayKey: string;
  task: string;
  cleanup: "delete" | "keep";
  label?: string;
  createdAt: number;
  startedAt?: number;
  endedAt?: number;
  outcome?: SubagentRunOutcome;
  /** Epoch ms after which the sweeper drops the record and deletes the child session. */
  archiveAtMs?: number;
  /** Set once the completion announcement succeeded for a "keep" run. */
  cleanupCompletedAt?: number;
  /** True once an announce/cleanup pass has been claimed by beginSubagentCleanup. */
  cleanupHandled?: boolean;
};

type LoadedConfig = ReturnType<typeof loadConfig>;

/** Timeout handed to the completion announcement flow. */
const ANNOUNCE_TIMEOUT_MS = 30_000;
/** How often the archive sweeper runs. */
const SWEEP_INTERVAL_MS = 60_000;
/** Timeout for the gateway `sessions.delete` call issued when archiving a run. */
const ARCHIVE_DELETE_TIMEOUT_MS = 10_000;
/** Extra slack added to the `agent.wait` RPC timeout beyond the wait itself. */
const WAIT_RPC_SLACK_MS = 10_000;

/** In-memory registry keyed by runId; mirrored to disk by persistSubagentRuns(). */
const subagentRuns = new Map<string, SubagentRunRecord>();
let sweeper: NodeJS.Timeout | null = null;
let listenerStarted = false;
let listenerStop: (() => void) | null = null;
let restoreAttempted = false;

/** Writes the registry to disk; persistence is best-effort and failures are ignored. */
function persistSubagentRuns() {
  try {
    saveSubagentRegistryToDisk(subagentRuns);
  } catch {
    // ignore persistence failures
  }
}

/** runIds already resumed after a restore, so each run is resumed at most once. */
const resumedRuns = new Set<string>();

/**
 * Starts the completion announcement for a finished run, then records the cleanup result.
 *
 * Callers must already have claimed the run via beginSubagentCleanup(). A rejected
 * announce flow is treated as "not announced". This avoids an unhandled promise
 * rejection and lets finalizeSubagentCleanup clear `cleanupHandled`, so a "keep" run
 * can be retried.
 */
function announceSubagentCompletion(runId: string, entry: SubagentRunRecord) {
  const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
  void runSubagentAnnounceFlow({
    childSessionKey: entry.childSessionKey,
    childRunId: entry.runId,
    requesterSessionKey: entry.requesterSessionKey,
    requesterOrigin,
    requesterDisplayKey: entry.requesterDisplayKey,
    task: entry.task,
    timeoutMs: ANNOUNCE_TIMEOUT_MS,
    cleanup: entry.cleanup,
    waitForCompletion: false,
    startedAt: entry.startedAt,
    endedAt: entry.endedAt,
    label: entry.label,
    outcome: entry.outcome,
  })
    .catch(() => false)
    .then((didAnnounce) => {
      finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce);
    });
}

/**
 * Picks a restored run back up after a restart.
 *
 * An already-ended run only needs its announcement. A still-running run waits for
 * completion again through the gateway.
 */
function resumeSubagentRun(runId: string) {
  if (!runId || resumedRuns.has(runId)) return;
  const entry = subagentRuns.get(runId);
  if (!entry) return;
  if (entry.cleanupCompletedAt) return;
  if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
    // NOTE(review): a `cleanupHandled: true` persisted by a process that died mid-announce
    // makes beginSubagentCleanup refuse here, so such runs are not re-announced — confirm intended.
    if (!beginSubagentCleanup(runId)) return;
    announceSubagentCompletion(runId, entry);
    resumedRuns.add(runId);
    return;
  }
  // Wait for completion again after restart.
  const cfg = loadConfig();
  const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, undefined);
  void waitForSubagentCompletion(runId, waitTimeoutMs);
  resumedRuns.add(runId);
}

/** Loads persisted runs from disk (once per process) and resumes any pending work. */
function restoreSubagentRunsOnce() {
  if (restoreAttempted) return;
  restoreAttempted = true;
  try {
    const restored = loadSubagentRegistryFromDisk();
    if (restored.size === 0) return;
    for (const [runId, entry] of restored.entries()) {
      if (!runId || !entry) continue;
      // Keep any newer in-memory entries.
      if (!subagentRuns.has(runId)) {
        subagentRuns.set(runId, entry);
      }
    }
    // Resume pending work.
    ensureListener();
    if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
      startSweeper();
    }
    for (const runId of subagentRuns.keys()) {
      resumeSubagentRun(runId);
    }
  } catch {
    // ignore restore failures
  }
}

/**
 * Resolves the archive delay in ms from `agents.defaults.subagents.archiveAfterMinutes`.
 * The setting defaults to 60 minutes. Returns undefined (archiving disabled) when the
 * value is non-finite or <= 0.
 */
function resolveArchiveAfterMs(cfg?: LoadedConfig) {
  const config = cfg ?? loadConfig();
  const minutes = config.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60;
  if (!Number.isFinite(minutes) || minutes <= 0) return undefined;
  return Math.max(1, Math.floor(minutes)) * 60_000;
}

/** Resolves how long to wait for a subagent run, honoring an optional per-run override. */
function resolveSubagentWaitTimeoutMs(cfg: LoadedConfig, runTimeoutSeconds?: number) {
  return resolveAgentTimeoutMs({ cfg, overrideSeconds: runTimeoutSeconds });
}

/** Starts the periodic archive sweeper if it is not already running. */
function startSweeper() {
  if (sweeper) return;
  sweeper = setInterval(() => {
    void sweepSubagentRuns();
  }, SWEEP_INTERVAL_MS);
  // Do not let the sweeper timer alone keep the process alive.
  sweeper.unref?.();
}

function stopSweeper() {
  if (!sweeper) return;
  clearInterval(sweeper);
  sweeper = null;
}

/**
 * Drops every run whose `archiveAtMs` has passed and asks the gateway to delete its
 * child session and transcript (best-effort). Stops the sweeper once the registry is empty.
 */
async function sweepSubagentRuns() {
  const now = Date.now();
  let mutated = false;
  for (const [runId, entry] of subagentRuns.entries()) {
    if (!entry.archiveAtMs || entry.archiveAtMs > now) continue;
    subagentRuns.delete(runId);
    mutated = true;
    try {
      await callGateway({
        method: "sessions.delete",
        params: { key: entry.childSessionKey, deleteTranscript: true },
        timeoutMs: ARCHIVE_DELETE_TIMEOUT_MS,
      });
    } catch {
      // ignore
    }
  }
  if (mutated) persistSubagentRuns();
  if (subagentRuns.size === 0) stopSweeper();
}

/**
 * Subscribes (once) to in-process agent lifecycle events.
 *
 * "start" records startedAt. "end"/"error" record endedAt and the outcome, then trigger
 * the completion announcement. Events for runs not in the registry are ignored.
 */
function ensureListener() {
  if (listenerStarted) return;
  listenerStarted = true;
  listenerStop = onAgentEvent((evt) => {
    if (!evt || evt.stream !== "lifecycle") return;
    const entry = subagentRuns.get(evt.runId);
    if (!entry) return;
    const phase = evt.data?.phase;
    if (phase === "start") {
      const startedAt =
        typeof evt.data?.startedAt === "number" ? (evt.data.startedAt as number) : undefined;
      if (startedAt) {
        entry.startedAt = startedAt;
        persistSubagentRuns();
      }
      return;
    }
    if (phase !== "end" && phase !== "error") return;
    const endedAt =
      typeof evt.data?.endedAt === "number" ? (evt.data.endedAt as number) : Date.now();
    entry.endedAt = endedAt;
    if (phase === "error") {
      const error = typeof evt.data?.error === "string" ? (evt.data.error as string) : undefined;
      entry.outcome = { status: "error", error };
    } else {
      entry.outcome = { status: "ok" };
    }
    persistSubagentRuns();
    if (!beginSubagentCleanup(evt.runId)) return;
    announceSubagentCompletion(evt.runId, entry);
  });
}

/**
 * Records the result of an announce pass.
 *
 * "delete" runs are removed from the registry regardless of `didAnnounce`. "keep" runs
 * are marked completed on success, or released for a later retry on failure.
 */
function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didAnnounce: boolean) {
  const entry = subagentRuns.get(runId);
  if (!entry) return;
  if (cleanup === "delete") {
    subagentRuns.delete(runId);
    persistSubagentRuns();
    return;
  }
  if (!didAnnounce) {
    // Allow retry on the next wake if the announce failed.
    entry.cleanupHandled = false;
    persistSubagentRuns();
    return;
  }
  entry.cleanupCompletedAt = Date.now();
  persistSubagentRuns();
}

/**
 * Claims the cleanup/announce pass for a run.
 * Returns false when the run is unknown, already completed, or already claimed, so at
 * most one of the listener / gateway wait / resume paths announces it.
 */
function beginSubagentCleanup(runId: string) {
  const entry = subagentRuns.get(runId);
  if (!entry) return false;
  if (entry.cleanupCompletedAt) return false;
  if (entry.cleanupHandled) return false;
  entry.cleanupHandled = true;
  persistSubagentRuns();
  return true;
}

/**
 * Registers a newly spawned subagent run.
 *
 * The function persists the record and starts the lifecycle listener. It starts the
 * archive sweeper when archiving is enabled. It also begins waiting for completion
 * via the gateway `agent.wait` RPC.
 */
export function registerSubagentRun(params: {
  runId: string;
  childSessionKey: string;
  requesterSessionKey: string;
  requesterOrigin?: DeliveryContext;
  requesterDisplayKey: string;
  task: string;
  cleanup: "delete" | "keep";
  label?: string;
  runTimeoutSeconds?: number;
}) {
  const now = Date.now();
  const cfg = loadConfig();
  const archiveAfterMs = resolveArchiveAfterMs(cfg);
  const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined;
  const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, params.runTimeoutSeconds);
  const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
  subagentRuns.set(params.runId, {
    runId: params.runId,
    childSessionKey: params.childSessionKey,
    requesterSessionKey: params.requesterSessionKey,
    requesterOrigin,
    requesterDisplayKey: params.requesterDisplayKey,
    task: params.task,
    cleanup: params.cleanup,
    label: params.label,
    createdAt: now,
    startedAt: now,
    archiveAtMs,
    cleanupHandled: false,
  });
  ensureListener();
  persistSubagentRuns();
  if (archiveAfterMs) startSweeper();
  // Wait for subagent completion via gateway RPC (cross-process).
  // The in-process lifecycle listener is a fallback for embedded runs.
  void waitForSubagentCompletion(params.runId, waitTimeoutMs);
}

/**
 * Waits for a run to finish through the gateway `agent.wait` RPC. On a terminal status
 * ("ok" / "error") it records timing and outcome, then triggers the announcement.
 * Any other status, or an RPC failure, leaves the record untouched.
 */
async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
  try {
    const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs));
    const wait = (await callGateway({
      method: "agent.wait",
      params: {
        runId,
        timeoutMs,
      },
      timeoutMs: timeoutMs + WAIT_RPC_SLACK_MS,
    })) as { status?: string; startedAt?: number; endedAt?: number; error?: string };
    if (wait?.status !== "ok" && wait?.status !== "error") return;
    const entry = subagentRuns.get(runId);
    if (!entry) return;
    if (typeof wait.startedAt === "number") entry.startedAt = wait.startedAt;
    if (typeof wait.endedAt === "number") entry.endedAt = wait.endedAt;
    if (!entry.endedAt) entry.endedAt = Date.now();
    entry.outcome =
      wait.status === "error" ? { status: "error", error: wait.error } : { status: "ok" };
    persistSubagentRuns();
    if (!beginSubagentCleanup(runId)) return;
    announceSubagentCompletion(runId, entry);
  } catch {
    // Best-effort: errors are ignored; the in-process lifecycle listener may still
    // complete the run.
  }
}

/**
 * Clears all registry state and persists the empty registry.
 * This also stops the sweeper and unsubscribes the lifecycle listener. Test-only.
 */
export function resetSubagentRegistryForTests() {
  subagentRuns.clear();
  resumedRuns.clear();
  stopSweeper();
  restoreAttempted = false;
  if (listenerStop) {
    listenerStop();
    listenerStop = null;
  }
  listenerStarted = false;
  persistSubagentRuns();
}

/** Removes a run from the registry; stops the sweeper once nothing is left to archive. */
export function releaseSubagentRun(runId: string) {
  const didDelete = subagentRuns.delete(runId);
  if (didDelete) persistSubagentRuns();
  if (subagentRuns.size === 0) stopSweeper();
}

/**
 * Lists runs spawned by the given requester session.
 * The key is trimmed before comparison; a blank key yields an empty list.
 */
export function listSubagentRunsForRequester(requesterSessionKey: string): SubagentRunRecord[] {
  const key = requesterSessionKey.trim();
  if (!key) return [];
  return [...subagentRuns.values()].filter((entry) => entry.requesterSessionKey === key);
}

/** Restores persisted runs from disk (at most once per process) and resumes pending work. */
export function initSubagentRegistry() {
  restoreSubagentRunsOnce();
}