From b071f73fef220fdec83918dfb6b23e57a94d1e33 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 13 Jan 2026 10:10:15 +0000 Subject: [PATCH] fix: resume subagent registry safely (#831) (thanks @roshanasingh4) --- CHANGELOG.md | 1 + src/agents/subagent-announce.ts | 11 +-- .../subagent-registry.persistence.test.ts | 45 +++++++++++- src/agents/subagent-registry.store.ts | 23 +++++-- src/agents/subagent-registry.ts | 68 ++++++++++++------- src/gateway/server.ts | 2 + 6 files changed, 115 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17843b5ab..58ff1f56a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Fixes - Packaging: include `dist/memory/**` in the npm tarball (fixes `ERR_MODULE_NOT_FOUND` for `dist/memory/index.js`). +- Agents: persist sub-agent registry across gateway restarts and resume announce flow safely. (#831) — thanks @roshanasingh4. ## 2026.1.12-1 diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 088e4883a..cae87cd64 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -236,7 +236,8 @@ export async function runSubagentAnnounceFlow(params: { startedAt?: number; endedAt?: number; label?: string; -}) { +}): Promise { + let didAnnounce = false; try { let reply = params.roundOneReply; if (!reply && params.waitForCompletion !== false) { @@ -249,7 +250,7 @@ export async function runSubagentAnnounceFlow(params: { }, timeoutMs: waitMs + 2000, })) as { status?: string }; - if (wait?.status !== "ok") return; + if (wait?.status !== "ok") return false; reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey, }); @@ -265,7 +266,7 @@ export async function runSubagentAnnounceFlow(params: { sessionKey: params.requesterSessionKey, displayKey: params.requesterDisplayKey, }); - if (!announceTarget) return; + if (!announceTarget) return false; const announcePrompt = buildSubagentAnnouncePrompt({ requesterSessionKey: params.requesterSessionKey, @@ -289,7 +290,7 @@ export async function runSubagentAnnounceFlow(params: { !announceReply.trim() || isAnnounceSkip(announceReply) ) - return; + return false; const statsLine = await buildSubagentStatsLine({ sessionKey: params.childSessionKey, @@ -311,6 +312,7 @@ export async function runSubagentAnnounceFlow(params: { }, timeoutMs: 10_000, }); + didAnnounce = true; } catch { // Best-effort follow-ups; ignore failures to avoid breaking the caller response. } finally { @@ -338,4 +340,5 @@ export async function runSubagentAnnounceFlow(params: { } } } + return didAnnounce; } diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index 1d9c9d050..942685d01 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -18,7 +18,7 @@ vi.mock("../infra/agent-events.js", () => ({ onAgentEvent: vi.fn(() => noop), })); -const announceSpy = vi.fn(async () => {}); +const announceSpy = vi.fn(async () => true); vi.mock("./subagent-announce.js", () => ({ runSubagentAnnounceFlow: (...args: unknown[]) => announceSpy(...args), })); @@ -67,7 +67,8 @@ describe("subagent registry persistence", () => { // Simulate a process restart: module re-import should load persisted runs // and trigger the announce flow once the run resolves. vi.resetModules(); - await import("./subagent-registry.js"); + const mod2 = await import("./subagent-registry.js"); + mod2.initSubagentRegistry(); // allow queued async wait/announce to execute await new Promise((r) => setTimeout(r, 0)); @@ -82,4 +83,44 @@ describe("subagent registry persistence", () => { expect(first.childRunId).toBe("run-1"); expect(first.childSessionKey).toBe("agent:main:subagent:test"); }); + + it("retries announce even when announceHandled was persisted", async () => { + tempStateDir = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdbot-subagent-"), + ); + process.env.CLAWDBOT_STATE_DIR = tempStateDir; + + const registryPath = path.join(tempStateDir, "subagents", "runs.json"); + const persisted = { + version: 1, + runs: { + "run-2": { + runId: "run-2", + childSessionKey: "agent:main:subagent:two", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "do the other thing", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + announceHandled: true, + }, + }, + }; + await fs.mkdir(path.dirname(registryPath), { recursive: true }); + await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8"); + + vi.resetModules(); + const mod = await import("./subagent-registry.js"); + mod.initSubagentRegistry(); + + await new Promise((r) => setTimeout(r, 0)); + + const calls = announceSpy.mock.calls.map((call) => call[0]); + const match = calls.find( + (params) => (params as { childRunId?: string }).childRunId === "run-2", + ); + expect(match).toBeTruthy(); + }); }); diff --git a/src/agents/subagent-registry.store.ts b/src/agents/subagent-registry.store.ts index 8ecfeaf1d..24c9d6e15 100644 --- a/src/agents/subagent-registry.store.ts +++ b/src/agents/subagent-registry.store.ts @@ -8,11 +8,13 @@ export type PersistedSubagentRegistryVersion = 1; type PersistedSubagentRegistry = { version: 1; - runs: Record; + runs: Record; }; const REGISTRY_VERSION = 1 as const; +type PersistedSubagentRunRecord = Omit; + export function resolveSubagentRegistryPath(): string { return path.join(STATE_DIR_CLAWDBOT, "subagents", "runs.json"); } @@ -28,9 +30,17 @@ export function loadSubagentRegistryFromDisk(): Map { const out = new Map(); for (const [runId, entry] of Object.entries(runsRaw)) { if (!entry || typeof entry !== "object") continue; - const typed = entry as SubagentRunRecord; + const typed = entry as PersistedSubagentRunRecord; if (!typed.runId || typeof typed.runId !== "string") continue; - out.set(runId, typed); + const announceCompletedAt = + typeof typed.announceCompletedAt === "number" + ? typed.announceCompletedAt + : undefined; + out.set(runId, { + ...typed, + announceCompletedAt, + announceHandled: Boolean(announceCompletedAt), + }); } return out; } @@ -39,9 +49,14 @@ export function saveSubagentRegistryToDisk( runs: Map, ) { const pathname = resolveSubagentRegistryPath(); + const serialized: Record = {}; + for (const [runId, entry] of runs.entries()) { + const { announceHandled: _ignored, ...persisted } = entry; + serialized[runId] = persisted; + } const out: PersistedSubagentRegistry = { version: REGISTRY_VERSION, - runs: Object.fromEntries(runs.entries()), + runs: serialized, }; saveJsonFile(pathname, out); } diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index c4382a79b..d37ce0453 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -21,12 +21,14 @@ export type SubagentRunRecord = { startedAt?: number; endedAt?: number; archiveAtMs?: number; + announceCompletedAt?: number; announceHandled: boolean; }; const subagentRuns = new Map(); let sweeper: NodeJS.Timeout | null = null; let listenerStarted = false; +let listenerStop: (() => void) | null = null; let restoreAttempted = false; function persistSubagentRuns() { @@ -43,15 +45,15 @@ function resumeSubagentRun(runId: string) { if (!runId || resumedRuns.has(runId)) return; const entry = subagentRuns.get(runId); if (!entry) return; - if (entry.announceHandled) return; + if (entry.announceCompletedAt) return; if (typeof entry.endedAt === "number" && entry.endedAt > 0) { if (!beginSubagentAnnounce(runId)) return; - void runSubagentAnnounceFlow({ + const announce = runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, - requesterProvider: entry.requesterProvider, + requesterChannel: entry.requesterChannel, requesterDisplayKey: entry.requesterDisplayKey, task: entry.task, timeoutMs: 30_000, @@ -61,10 +63,9 @@ function resumeSubagentRun(runId: string) { endedAt: entry.endedAt, label: entry.label, }); - if (entry.cleanup === "delete") { - subagentRuns.delete(runId); - persistSubagentRuns(); - } + void announce.then((didAnnounce) => { + finalizeSubagentAnnounce(runId, entry.cleanup, didAnnounce); + }); resumedRuns.add(runId); return; } @@ -155,7 +156,7 @@ async function sweepSubagentRuns() { function ensureListener() { if (listenerStarted) return; listenerStarted = true; - onAgentEvent((evt) => { + listenerStop = onAgentEvent((evt) => { if (!evt || evt.stream !== "lifecycle") return; const entry = subagentRuns.get(evt.runId); if (!entry) { @@ -182,13 +183,9 @@ function ensureListener() { persistSubagentRuns(); if (!beginSubagentAnnounce(evt.runId)) { - if (entry.cleanup === "delete") { - subagentRuns.delete(evt.runId); - persistSubagentRuns(); - } return; } - void runSubagentAnnounceFlow({ + const announce = runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, @@ -202,16 +199,33 @@ function ensureListener() { endedAt: entry.endedAt, label: entry.label, }); - if (entry.cleanup === "delete") { - subagentRuns.delete(evt.runId); - persistSubagentRuns(); - } + void announce.then((didAnnounce) => { + finalizeSubagentAnnounce(evt.runId, entry.cleanup, didAnnounce); + }); }); } +function finalizeSubagentAnnounce( + runId: string, + cleanup: "delete" | "keep", + didAnnounce: boolean, +) { + const entry = subagentRuns.get(runId); + if (!entry) return; + if (cleanup === "delete") { + subagentRuns.delete(runId); + persistSubagentRuns(); + return; + } + if (!didAnnounce) return; + entry.announceCompletedAt = Date.now(); + persistSubagentRuns(); +} + export function beginSubagentAnnounce(runId: string) { const entry = subagentRuns.get(runId); if (!entry) return false; + if (entry.announceCompletedAt) return false; if (entry.announceHandled) return false; entry.announceHandled = true; persistSubagentRuns(); @@ -288,7 +302,7 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { } if (mutated) persistSubagentRuns(); if (!beginSubagentAnnounce(runId)) return; - void runSubagentAnnounceFlow({ + const announce = runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, @@ -302,10 +316,9 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { endedAt: entry.endedAt, label: entry.label, }); - if (entry.cleanup === "delete") { - subagentRuns.delete(runId); - persistSubagentRuns(); - } + void announce.then((didAnnounce) => { + finalizeSubagentAnnounce(runId, entry.cleanup, didAnnounce); + }); } catch { // ignore } @@ -313,8 +326,13 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { export function resetSubagentRegistryForTests() { subagentRuns.clear(); + resumedRuns.clear(); stopSweeper(); restoreAttempted = false; + if (listenerStop) { + listenerStop(); + listenerStop = null; + } listenerStarted = false; persistSubagentRuns(); } @@ -325,6 +343,6 @@ export function releaseSubagentRun(runId: string) { if (subagentRuns.size === 0) stopSweeper(); } -// Best-effort: restore persisted runs on process start so announces/cleanup can -// continue after gateway restarts. -restoreSubagentRunsOnce(); +export function initSubagentRegistry() { + restoreSubagentRunsOnce(); +} diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 886af68cc..486214491 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -19,6 +19,7 @@ import { resolveConfiguredModelRef, resolveHooksGmailModel, } from "../agents/model-selection.js"; +import { initSubagentRegistry } from "../agents/subagent-registry.js"; import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js"; import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js"; import { @@ -460,6 +461,7 @@ export async function startGatewayServer( } const cfgAtStart = loadConfig(); + initSubagentRegistry(); await autoMigrateLegacyState({ cfg: cfgAtStart, log }); const defaultAgentId = resolveDefaultAgentId(cfgAtStart); const defaultWorkspaceDir = resolveAgentWorkspaceDir(