diff --git a/CHANGELOG.md b/CHANGELOG.md index 17843b5ab..58ff1f56a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Fixes - Packaging: include `dist/memory/**` in the npm tarball (fixes `ERR_MODULE_NOT_FOUND` for `dist/memory/index.js`). +- Agents: persist sub-agent registry across gateway restarts and resume announce flow safely. (#831) — thanks @roshanasingh4. ## 2026.1.12-1 diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 088e4883a..cae87cd64 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -236,7 +236,8 @@ export async function runSubagentAnnounceFlow(params: { startedAt?: number; endedAt?: number; label?: string; -}) { +}): Promise { + let didAnnounce = false; try { let reply = params.roundOneReply; if (!reply && params.waitForCompletion !== false) { @@ -249,7 +250,7 @@ export async function runSubagentAnnounceFlow(params: { }, timeoutMs: waitMs + 2000, })) as { status?: string }; - if (wait?.status !== "ok") return; + if (wait?.status !== "ok") return false; reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey, }); @@ -265,7 +266,7 @@ export async function runSubagentAnnounceFlow(params: { sessionKey: params.requesterSessionKey, displayKey: params.requesterDisplayKey, }); - if (!announceTarget) return; + if (!announceTarget) return false; const announcePrompt = buildSubagentAnnouncePrompt({ requesterSessionKey: params.requesterSessionKey, @@ -289,7 +290,7 @@ export async function runSubagentAnnounceFlow(params: { !announceReply.trim() || isAnnounceSkip(announceReply) ) - return; + return false; const statsLine = await buildSubagentStatsLine({ sessionKey: params.childSessionKey, @@ -311,6 +312,7 @@ export async function runSubagentAnnounceFlow(params: { }, timeoutMs: 10_000, }); + didAnnounce = true; } catch { // Best-effort follow-ups; ignore failures to avoid breaking the caller response. } finally { @@ -338,4 +340,5 @@ export async function runSubagentAnnounceFlow(params: { } } } + return didAnnounce; } diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts new file mode 100644 index 000000000..942685d01 --- /dev/null +++ b/src/agents/subagent-registry.persistence.test.ts @@ -0,0 +1,126 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { afterEach, describe, expect, it, vi } from "vitest"; + +const noop = () => {}; + +vi.mock("../gateway/call.js", () => ({ + callGateway: vi.fn(async () => ({ + status: "ok", + startedAt: 111, + endedAt: 222, + })), +})); + +vi.mock("../infra/agent-events.js", () => ({ + onAgentEvent: vi.fn(() => noop), +})); + +const announceSpy = vi.fn(async () => true); +vi.mock("./subagent-announce.js", () => ({ + runSubagentAnnounceFlow: (...args: unknown[]) => announceSpy(...args), +})); + +describe("subagent registry persistence", () => { + const previousStateDir = process.env.CLAWDBOT_STATE_DIR; + let tempStateDir: string | null = null; + + afterEach(async () => { + announceSpy.mockClear(); + vi.resetModules(); + if (tempStateDir) { + await fs.rm(tempStateDir, { recursive: true, force: true }); + tempStateDir = null; + } + if (previousStateDir === undefined) { + delete process.env.CLAWDBOT_STATE_DIR; + } else { + process.env.CLAWDBOT_STATE_DIR = previousStateDir; + } + }); + + it("persists runs to disk and resumes after restart", async () => { + tempStateDir = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdbot-subagent-"), + ); + process.env.CLAWDBOT_STATE_DIR = tempStateDir; + + vi.resetModules(); + const mod1 = await import("./subagent-registry.js"); + + mod1.registerSubagentRun({ + runId: "run-1", + childSessionKey: "agent:main:subagent:test", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "do the thing", + cleanup: "keep", + }); + + const registryPath = path.join(tempStateDir, "subagents", "runs.json"); + const raw = await fs.readFile(registryPath, "utf8"); + const parsed = JSON.parse(raw) as { runs?: Record }; + expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1"); + + // Simulate a process restart: module re-import should load persisted runs + // and trigger the announce flow once the run resolves. + vi.resetModules(); + const mod2 = await import("./subagent-registry.js"); + mod2.initSubagentRegistry(); + + // allow queued async wait/announce to execute + await new Promise((r) => setTimeout(r, 0)); + + expect(announceSpy).toHaveBeenCalled(); + + type AnnounceParams = { + childRunId: string; + childSessionKey: string; + }; + const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams; + expect(first.childRunId).toBe("run-1"); + expect(first.childSessionKey).toBe("agent:main:subagent:test"); + }); + + it("retries announce even when announceHandled was persisted", async () => { + tempStateDir = await fs.mkdtemp( + path.join(os.tmpdir(), "clawdbot-subagent-"), + ); + process.env.CLAWDBOT_STATE_DIR = tempStateDir; + + const registryPath = path.join(tempStateDir, "subagents", "runs.json"); + const persisted = { + version: 1, + runs: { + "run-2": { + runId: "run-2", + childSessionKey: "agent:main:subagent:two", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "do the other thing", + cleanup: "keep", + createdAt: 1, + startedAt: 1, + endedAt: 2, + announceHandled: true, + }, + }, + }; + await fs.mkdir(path.dirname(registryPath), { recursive: true }); + await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8"); + + vi.resetModules(); + const mod = await import("./subagent-registry.js"); + mod.initSubagentRegistry(); + + await new Promise((r) => setTimeout(r, 0)); + + const calls = announceSpy.mock.calls.map((call) => call[0]); + const match = calls.find( + (params) => (params as { childRunId?: string }).childRunId === "run-2", + ); + expect(match).toBeTruthy(); + }); +}); diff --git a/src/agents/subagent-registry.store.ts b/src/agents/subagent-registry.store.ts new file mode 100644 index 000000000..24c9d6e15 --- /dev/null +++ b/src/agents/subagent-registry.store.ts @@ -0,0 +1,62 @@ +import path from "node:path"; + +import { STATE_DIR_CLAWDBOT } from "../config/paths.js"; +import { loadJsonFile, saveJsonFile } from "../infra/json-file.js"; +import type { SubagentRunRecord } from "./subagent-registry.js"; + +export type PersistedSubagentRegistryVersion = 1; + +type PersistedSubagentRegistry = { + version: 1; + runs: Record; +}; + +const REGISTRY_VERSION = 1 as const; + +type PersistedSubagentRunRecord = Omit; + +export function resolveSubagentRegistryPath(): string { + return path.join(STATE_DIR_CLAWDBOT, "subagents", "runs.json"); +} + +export function loadSubagentRegistryFromDisk(): Map { + const pathname = resolveSubagentRegistryPath(); + const raw = loadJsonFile(pathname); + if (!raw || typeof raw !== "object") return new Map(); + const record = raw as Partial; + if (record.version !== REGISTRY_VERSION) return new Map(); + const runsRaw = record.runs; + if (!runsRaw || typeof runsRaw !== "object") return new Map(); + const out = new Map(); + for (const [runId, entry] of Object.entries(runsRaw)) { + if (!entry || typeof entry !== "object") continue; + const typed = entry as PersistedSubagentRunRecord; + if (!typed.runId || typeof typed.runId !== "string") continue; + const announceCompletedAt = + typeof typed.announceCompletedAt === "number" + ? typed.announceCompletedAt + : undefined; + out.set(runId, { + ...typed, + announceCompletedAt, + announceHandled: Boolean(announceCompletedAt), + }); + } + return out; +} + +export function saveSubagentRegistryToDisk( + runs: Map, +) { + const pathname = resolveSubagentRegistryPath(); + const serialized: Record = {}; + for (const [runId, entry] of runs.entries()) { + const { announceHandled: _ignored, ...persisted } = entry; + serialized[runId] = persisted; + } + const out: PersistedSubagentRegistry = { + version: REGISTRY_VERSION, + runs: serialized, + }; + saveJsonFile(pathname, out); +} diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index b9c6e5168..d37ce0453 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -2,6 +2,10 @@ import { loadConfig } from "../config/config.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { runSubagentAnnounceFlow } from "./subagent-announce.js"; +import { + loadSubagentRegistryFromDisk, + saveSubagentRegistryToDisk, +} from "./subagent-registry.store.js"; import { resolveAgentTimeoutMs } from "./timeout.js"; export type SubagentRunRecord = { @@ -17,12 +21,88 @@ export type SubagentRunRecord = { startedAt?: number; endedAt?: number; archiveAtMs?: number; + announceCompletedAt?: number; announceHandled: boolean; }; const subagentRuns = new Map(); let sweeper: NodeJS.Timeout | null = null; let listenerStarted = false; +let listenerStop: (() => void) | null = null; +let restoreAttempted = false; + +function persistSubagentRuns() { + try { + saveSubagentRegistryToDisk(subagentRuns); + } catch { + // ignore persistence failures + } +} + +const resumedRuns = new Set(); + +function resumeSubagentRun(runId: string) { + if (!runId || resumedRuns.has(runId)) return; + const entry = subagentRuns.get(runId); + if (!entry) return; + if (entry.announceCompletedAt) return; + + if (typeof entry.endedAt === "number" && entry.endedAt > 0) { + if (!beginSubagentAnnounce(runId)) return; + const announce = runSubagentAnnounceFlow({ + childSessionKey: entry.childSessionKey, + childRunId: entry.runId, + requesterSessionKey: entry.requesterSessionKey, + requesterChannel: entry.requesterChannel, + requesterDisplayKey: entry.requesterDisplayKey, + task: entry.task, + timeoutMs: 30_000, + cleanup: entry.cleanup, + waitForCompletion: false, + startedAt: entry.startedAt, + endedAt: entry.endedAt, + label: entry.label, + }); + void announce.then((didAnnounce) => { + finalizeSubagentAnnounce(runId, entry.cleanup, didAnnounce); + }); + resumedRuns.add(runId); + return; + } + + // Wait for completion again after restart. + const cfg = loadConfig(); + const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, undefined); + void waitForSubagentCompletion(runId, waitTimeoutMs); + resumedRuns.add(runId); +} + +function restoreSubagentRunsOnce() { + if (restoreAttempted) return; + restoreAttempted = true; + try { + const restored = loadSubagentRegistryFromDisk(); + if (restored.size === 0) return; + for (const [runId, entry] of restored.entries()) { + if (!runId || !entry) continue; + // Keep any newer in-memory entries. + if (!subagentRuns.has(runId)) { + subagentRuns.set(runId, entry); + } + } + + // Resume pending work. + ensureListener(); + if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) { + startSweeper(); + } + for (const runId of subagentRuns.keys()) { + resumeSubagentRun(runId); + } + } catch { + // ignore restore failures + } +} function resolveArchiveAfterMs(cfg?: ReturnType) { const config = cfg ?? loadConfig(); @@ -54,9 +134,11 @@ function stopSweeper() { async function sweepSubagentRuns() { const now = Date.now(); + let mutated = false; for (const [runId, entry] of subagentRuns.entries()) { if (!entry.archiveAtMs || entry.archiveAtMs > now) continue; subagentRuns.delete(runId); + mutated = true; try { await callGateway({ method: "sessions.delete", @@ -67,13 +149,14 @@ async function sweepSubagentRuns() { // ignore } } + if (mutated) persistSubagentRuns(); if (subagentRuns.size === 0) stopSweeper(); } function ensureListener() { if (listenerStarted) return; listenerStarted = true; - onAgentEvent((evt) => { + listenerStop = onAgentEvent((evt) => { if (!evt || evt.stream !== "lifecycle") return; const entry = subagentRuns.get(evt.runId); if (!entry) { @@ -85,7 +168,10 @@ function ensureListener() { typeof evt.data?.startedAt === "number" ? (evt.data.startedAt as number) : undefined; - if (startedAt) entry.startedAt = startedAt; + if (startedAt) { + entry.startedAt = startedAt; + persistSubagentRuns(); + } return; } if (phase !== "end" && phase !== "error") return; @@ -94,14 +180,12 @@ function ensureListener() { ? (evt.data.endedAt as number) : Date.now(); entry.endedAt = endedAt; + persistSubagentRuns(); if (!beginSubagentAnnounce(evt.runId)) { - if (entry.cleanup === "delete") { - subagentRuns.delete(evt.runId); - } return; } - void runSubagentAnnounceFlow({ + const announce = runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, @@ -115,17 +199,36 @@ function ensureListener() { endedAt: entry.endedAt, label: entry.label, }); - if (entry.cleanup === "delete") { - subagentRuns.delete(evt.runId); - } + void announce.then((didAnnounce) => { + finalizeSubagentAnnounce(evt.runId, entry.cleanup, didAnnounce); + }); }); } +function finalizeSubagentAnnounce( + runId: string, + cleanup: "delete" | "keep", + didAnnounce: boolean, +) { + const entry = subagentRuns.get(runId); + if (!entry) return; + if (cleanup === "delete") { + subagentRuns.delete(runId); + persistSubagentRuns(); + return; + } + if (!didAnnounce) return; + entry.announceCompletedAt = Date.now(); + persistSubagentRuns(); +} + export function beginSubagentAnnounce(runId: string) { const entry = subagentRuns.get(runId); if (!entry) return false; + if (entry.announceCompletedAt) return false; if (entry.announceHandled) return false; entry.announceHandled = true; + persistSubagentRuns(); return true; } @@ -163,6 +266,7 @@ export function registerSubagentRun(params: { announceHandled: false, }); ensureListener(); + persistSubagentRuns(); if (archiveAfterMs) startSweeper(); // Wait for subagent completion via gateway RPC (cross-process). // The in-process lifecycle listener is a fallback for embedded runs. @@ -183,11 +287,22 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { if (wait?.status !== "ok" && wait?.status !== "error") return; const entry = subagentRuns.get(runId); if (!entry) return; - if (typeof wait.startedAt === "number") entry.startedAt = wait.startedAt; - if (typeof wait.endedAt === "number") entry.endedAt = wait.endedAt; - if (!entry.endedAt) entry.endedAt = Date.now(); + let mutated = false; + if (typeof wait.startedAt === "number") { + entry.startedAt = wait.startedAt; + mutated = true; + } + if (typeof wait.endedAt === "number") { + entry.endedAt = wait.endedAt; + mutated = true; + } + if (!entry.endedAt) { + entry.endedAt = Date.now(); + mutated = true; + } + if (mutated) persistSubagentRuns(); if (!beginSubagentAnnounce(runId)) return; - void runSubagentAnnounceFlow({ + const announce = runSubagentAnnounceFlow({ childSessionKey: entry.childSessionKey, childRunId: entry.runId, requesterSessionKey: entry.requesterSessionKey, @@ -201,9 +316,9 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { endedAt: entry.endedAt, label: entry.label, }); - if (entry.cleanup === "delete") { - subagentRuns.delete(runId); - } + void announce.then((didAnnounce) => { + finalizeSubagentAnnounce(runId, entry.cleanup, didAnnounce); + }); } catch { // ignore } @@ -211,10 +326,23 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { export function resetSubagentRegistryForTests() { subagentRuns.clear(); + resumedRuns.clear(); stopSweeper(); + restoreAttempted = false; + if (listenerStop) { + listenerStop(); + listenerStop = null; + } + listenerStarted = false; + persistSubagentRuns(); } export function releaseSubagentRun(runId: string) { - subagentRuns.delete(runId); + const didDelete = subagentRuns.delete(runId); + if (didDelete) persistSubagentRuns(); if (subagentRuns.size === 0) stopSweeper(); } + +export function initSubagentRegistry() { + restoreSubagentRunsOnce(); +} diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 886af68cc..486214491 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -19,6 +19,7 @@ import { resolveConfiguredModelRef, resolveHooksGmailModel, } from "../agents/model-selection.js"; +import { initSubagentRegistry } from "../agents/subagent-registry.js"; import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js"; import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js"; import { @@ -460,6 +461,7 @@ export async function startGatewayServer( } const cfgAtStart = loadConfig(); + initSubagentRegistry(); await autoMigrateLegacyState({ cfg: cfgAtStart, log }); const defaultAgentId = resolveDefaultAgentId(cfgAtStart); const defaultWorkspaceDir = resolveAgentWorkspaceDir(