diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts
new file mode 100644
index 000000000..1d9c9d050
--- /dev/null
+++ b/src/agents/subagent-registry.persistence.test.ts
@@ -0,0 +1,85 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+
+import { afterEach, describe, expect, it, vi } from "vitest";
+
+const noop = () => {};
+
+vi.mock("../gateway/call.js", () => ({
+  callGateway: vi.fn(async () => ({
+    status: "ok",
+    startedAt: 111,
+    endedAt: 222,
+  })),
+}));
+
+vi.mock("../infra/agent-events.js", () => ({
+  onAgentEvent: vi.fn(() => noop),
+}));
+
+const announceSpy = vi.fn(async () => {});
+vi.mock("./subagent-announce.js", () => ({
+  runSubagentAnnounceFlow: (...args: unknown[]) => announceSpy(...args),
+}));
+
+describe("subagent registry persistence", () => {
+  const previousStateDir = process.env.CLAWDBOT_STATE_DIR;
+  let tempStateDir: string | null = null;
+
+  afterEach(async () => {
+    announceSpy.mockClear();
+    vi.resetModules();
+    if (tempStateDir) {
+      await fs.rm(tempStateDir, { recursive: true, force: true });
+      tempStateDir = null;
+    }
+    if (previousStateDir === undefined) {
+      delete process.env.CLAWDBOT_STATE_DIR;
+    } else {
+      process.env.CLAWDBOT_STATE_DIR = previousStateDir;
+    }
+  });
+
+  it("persists runs to disk and resumes after restart", async () => {
+    tempStateDir = await fs.mkdtemp(
+      path.join(os.tmpdir(), "clawdbot-subagent-"),
+    );
+    process.env.CLAWDBOT_STATE_DIR = tempStateDir;
+
+    vi.resetModules();
+    const mod1 = await import("./subagent-registry.js");
+
+    mod1.registerSubagentRun({
+      runId: "run-1",
+      childSessionKey: "agent:main:subagent:test",
+      requesterSessionKey: "agent:main:main",
+      requesterDisplayKey: "main",
+      task: "do the thing",
+      cleanup: "keep",
+    });
+
+    const registryPath = path.join(tempStateDir, "subagents", "runs.json");
+    const raw = await fs.readFile(registryPath, "utf8");
+    const parsed = JSON.parse(raw) as { runs?: Record<string, unknown> };
+    expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1");
+
+    // Simulate a process restart: module re-import should load persisted runs
+    // and trigger the announce flow once the run resolves.
+    vi.resetModules();
+    await import("./subagent-registry.js");
+
+    // allow queued async wait/announce to execute
+    await new Promise((r) => setTimeout(r, 0));
+
+    expect(announceSpy).toHaveBeenCalled();
+
+    type AnnounceParams = {
+      childRunId: string;
+      childSessionKey: string;
+    };
+    const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams;
+    expect(first.childRunId).toBe("run-1");
+    expect(first.childSessionKey).toBe("agent:main:subagent:test");
+  });
+});
diff --git a/src/agents/subagent-registry.store.ts b/src/agents/subagent-registry.store.ts
new file mode 100644
index 000000000..8ecfeaf1d
--- /dev/null
+++ b/src/agents/subagent-registry.store.ts
@@ -0,0 +1,55 @@
+import path from "node:path";
+
+import { STATE_DIR_CLAWDBOT } from "../config/paths.js";
+import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
+import type { SubagentRunRecord } from "./subagent-registry.js";
+
+export type PersistedSubagentRegistryVersion = 1;
+
+type PersistedSubagentRegistry = {
+  version: 1;
+  runs: Record<string, SubagentRunRecord>;
+};
+
+const REGISTRY_VERSION = 1 as const;
+
+/** Path of the registry file: `<state dir>/subagents/runs.json`. */
+export function resolveSubagentRegistryPath(): string {
+  return path.join(STATE_DIR_CLAWDBOT, "subagents", "runs.json");
+}
+
+/**
+ * Loads persisted runs from disk, keyed by run id. Returns an empty map when
+ * the loaded value is not an object, its `version` differs from
+ * `REGISTRY_VERSION`, or it has no `runs` object. Entries that are not
+ * objects or lack a non-empty string `runId` are skipped.
+ */
+export function loadSubagentRegistryFromDisk(): Map<string, SubagentRunRecord> {
+  const pathname = resolveSubagentRegistryPath();
+  const raw = loadJsonFile(pathname);
+  if (!raw || typeof raw !== "object") return new Map();
+  const record = raw as Partial<PersistedSubagentRegistry>;
+  if (record.version !== REGISTRY_VERSION) return new Map();
+  const runsRaw = record.runs;
+  if (!runsRaw || typeof runsRaw !== "object") return new Map();
+  const out = new Map<string, SubagentRunRecord>();
+  for (const [runId, entry] of Object.entries(runsRaw)) {
+    if (!entry || typeof entry !== "object") continue;
+    const typed = entry as SubagentRunRecord;
+    if (!typed.runId || typeof typed.runId !== "string") continue;
+    out.set(runId, typed);
+  }
+  return out;
+}
+
+/** Writes `runs` to the registry file together with `REGISTRY_VERSION`. */
+export function saveSubagentRegistryToDisk(
+  runs: Map<string, SubagentRunRecord>,
+) {
+  const pathname = resolveSubagentRegistryPath();
+  const out: PersistedSubagentRegistry = {
+    version: REGISTRY_VERSION,
+    runs: Object.fromEntries(runs.entries()),
+  };
+  saveJsonFile(pathname, out);
+}
diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts
index b9c6e5168..c4382a79b 100644
--- a/src/agents/subagent-registry.ts
+++ b/src/agents/subagent-registry.ts
@@ -2,6 +2,10 @@ import { loadConfig } from "../config/config.js";
 import { callGateway } from "../gateway/call.js";
 import { onAgentEvent } from "../infra/agent-events.js";
 import { runSubagentAnnounceFlow } from "./subagent-announce.js";
+import {
+  loadSubagentRegistryFromDisk,
+  saveSubagentRegistryToDisk,
+} from "./subagent-registry.store.js";
 import { resolveAgentTimeoutMs } from "./timeout.js";
 
 export type SubagentRunRecord = {
@@ -23,6 +27,93 @@ export type SubagentRunRecord = {
 const subagentRuns = new Map<string, SubagentRunRecord>();
 let sweeper: NodeJS.Timeout | null = null;
 let listenerStarted = false;
+let restoreAttempted = false;
+
+/** Writes the in-memory runs to disk; persistence failures are ignored. */
+function persistSubagentRuns() {
+  try {
+    saveSubagentRegistryToDisk(subagentRuns);
+  } catch {
+    // ignore persistence failures
+  }
+}
+
+/** Run ids that resumeSubagentRun has already processed in this process. */
+const resumedRuns = new Set<string>();
+
+/**
+ * Resumes a registered run at most once: a run that already has an end time
+ * is announced right away, any other run is waited on again. Runs whose
+ * announce was already handled are left untouched.
+ */
+function resumeSubagentRun(runId: string) {
+  if (!runId || resumedRuns.has(runId)) return;
+  const entry = subagentRuns.get(runId);
+  if (!entry) return;
+  if (entry.announceHandled) return;
+
+  if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
+    if (!beginSubagentAnnounce(runId)) return;
+    void runSubagentAnnounceFlow({
+      childSessionKey: entry.childSessionKey,
+      childRunId: entry.runId,
+      requesterSessionKey: entry.requesterSessionKey,
+      requesterProvider: entry.requesterProvider,
+      requesterDisplayKey: entry.requesterDisplayKey,
+      task: entry.task,
+      timeoutMs: 30_000,
+      cleanup: entry.cleanup,
+      waitForCompletion: false,
+      startedAt: entry.startedAt,
+      endedAt: entry.endedAt,
+      label: entry.label,
+    });
+    if (entry.cleanup === "delete") {
+      subagentRuns.delete(runId);
+      persistSubagentRuns();
+    }
+    resumedRuns.add(runId);
+    return;
+  }
+
+  // Wait for completion again after restart.
+  const cfg = loadConfig();
+  const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, undefined);
+  void waitForSubagentCompletion(runId, waitTimeoutMs);
+  resumedRuns.add(runId);
+}
+
+/**
+ * Loads persisted runs (guarded by `restoreAttempted`), merges them into the
+ * in-memory registry without replacing existing entries, then restarts the
+ * listener, the sweeper (if any run has an archive time) and pending runs.
+ */
+function restoreSubagentRunsOnce() {
+  if (restoreAttempted) return;
+  restoreAttempted = true;
+  try {
+    const restored = loadSubagentRegistryFromDisk();
+    if (restored.size === 0) return;
+    for (const [runId, entry] of restored.entries()) {
+      if (!runId || !entry) continue;
+      // Keep any newer in-memory entries.
+      if (!subagentRuns.has(runId)) {
+        subagentRuns.set(runId, entry);
+      }
+    }
+
+    // Resume pending work.
+    ensureListener();
+    if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
+      startSweeper();
+    }
+    for (const runId of subagentRuns.keys()) {
+      resumeSubagentRun(runId);
+    }
+  } catch {
+    // ignore restore failures
+  }
+}
 
 function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
   const config = cfg ?? loadConfig();
@@ -54,9 +145,11 @@ function stopSweeper() {
 
 async function sweepSubagentRuns() {
   const now = Date.now();
+  let mutated = false;
   for (const [runId, entry] of subagentRuns.entries()) {
     if (!entry.archiveAtMs || entry.archiveAtMs > now) continue;
     subagentRuns.delete(runId);
+    mutated = true;
     try {
       await callGateway({
         method: "sessions.delete",
@@ -67,6 +160,7 @@ async function sweepSubagentRuns() {
       // ignore
     }
   }
+  if (mutated) persistSubagentRuns();
   if (subagentRuns.size === 0) stopSweeper();
 }
 
@@ -85,7 +179,10 @@ function ensureListener() {
         typeof evt.data?.startedAt === "number"
           ? (evt.data.startedAt as number)
           : undefined;
-      if (startedAt) entry.startedAt = startedAt;
+      if (startedAt) {
+        entry.startedAt = startedAt;
+        persistSubagentRuns();
+      }
       return;
     }
     if (phase !== "end" && phase !== "error") return;
@@ -94,10 +191,12 @@ function ensureListener() {
         ? (evt.data.endedAt as number)
         : Date.now();
     entry.endedAt = endedAt;
+    persistSubagentRuns();
 
     if (!beginSubagentAnnounce(evt.runId)) {
       if (entry.cleanup === "delete") {
         subagentRuns.delete(evt.runId);
+        persistSubagentRuns();
       }
       return;
     }
@@ -117,6 +216,7 @@ function ensureListener() {
     });
     if (entry.cleanup === "delete") {
       subagentRuns.delete(evt.runId);
+      persistSubagentRuns();
     }
   });
 }
@@ -126,6 +226,7 @@ export function beginSubagentAnnounce(runId: string) {
   if (!entry) return false;
   if (entry.announceHandled) return false;
   entry.announceHandled = true;
+  persistSubagentRuns();
   return true;
 }
 
@@ -163,6 +264,7 @@ export function registerSubagentRun(params: {
     announceHandled: false,
   });
   ensureListener();
+  persistSubagentRuns();
   if (archiveAfterMs) startSweeper();
   // Wait for subagent completion via gateway RPC (cross-process).
   // The in-process lifecycle listener is a fallback for embedded runs.
@@ -183,9 +285,20 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
     if (wait?.status !== "ok" && wait?.status !== "error") return;
     const entry = subagentRuns.get(runId);
     if (!entry) return;
-    if (typeof wait.startedAt === "number") entry.startedAt = wait.startedAt;
-    if (typeof wait.endedAt === "number") entry.endedAt = wait.endedAt;
-    if (!entry.endedAt) entry.endedAt = Date.now();
+    let mutated = false;
+    if (typeof wait.startedAt === "number") {
+      entry.startedAt = wait.startedAt;
+      mutated = true;
+    }
+    if (typeof wait.endedAt === "number") {
+      entry.endedAt = wait.endedAt;
+      mutated = true;
+    }
+    if (!entry.endedAt) {
+      entry.endedAt = Date.now();
+      mutated = true;
+    }
+    if (mutated) persistSubagentRuns();
     if (!beginSubagentAnnounce(runId)) return;
     void runSubagentAnnounceFlow({
       childSessionKey: entry.childSessionKey,
@@ -203,6 +316,7 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
     });
     if (entry.cleanup === "delete") {
       subagentRuns.delete(runId);
+      persistSubagentRuns();
     }
   } catch {
     // ignore
@@ -212,9 +326,18 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
 export function resetSubagentRegistryForTests() {
   subagentRuns.clear();
+  resumedRuns.clear();
   stopSweeper();
+  restoreAttempted = false;
+  listenerStarted = false;
+  persistSubagentRuns();
 }
 
 export function releaseSubagentRun(runId: string) {
-  subagentRuns.delete(runId);
+  const didDelete = subagentRuns.delete(runId);
+  if (didDelete) persistSubagentRuns();
   if (subagentRuns.size === 0) stopSweeper();
 }
+
+// Best-effort: restore persisted runs on process start so announces/cleanup can
+// continue after gateway restarts.
+restoreSubagentRunsOnce();