Persist subagent registry across restarts
This commit is contained in:
committed by
Peter Steinberger
parent
6b587fa411
commit
714de9d996
85
src/agents/subagent-registry.persistence.test.ts
Normal file
85
src/agents/subagent-registry.persistence.test.ts
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import fs from "node:fs/promises";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
const noop = () => {};
|
||||||
|
|
||||||
|
vi.mock("../gateway/call.js", () => ({
|
||||||
|
callGateway: vi.fn(async () => ({
|
||||||
|
status: "ok",
|
||||||
|
startedAt: 111,
|
||||||
|
endedAt: 222,
|
||||||
|
})),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("../infra/agent-events.js", () => ({
|
||||||
|
onAgentEvent: vi.fn(() => noop),
|
||||||
|
}));
|
||||||
|
|
||||||
|
const announceSpy = vi.fn(async () => {});
|
||||||
|
vi.mock("./subagent-announce.js", () => ({
|
||||||
|
runSubagentAnnounceFlow: (...args: unknown[]) => announceSpy(...args),
|
||||||
|
}));
|
||||||
|
|
||||||
|
describe("subagent registry persistence", () => {
|
||||||
|
const previousStateDir = process.env.CLAWDBOT_STATE_DIR;
|
||||||
|
let tempStateDir: string | null = null;
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
announceSpy.mockClear();
|
||||||
|
vi.resetModules();
|
||||||
|
if (tempStateDir) {
|
||||||
|
await fs.rm(tempStateDir, { recursive: true, force: true });
|
||||||
|
tempStateDir = null;
|
||||||
|
}
|
||||||
|
if (previousStateDir === undefined) {
|
||||||
|
delete process.env.CLAWDBOT_STATE_DIR;
|
||||||
|
} else {
|
||||||
|
process.env.CLAWDBOT_STATE_DIR = previousStateDir;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("persists runs to disk and resumes after restart", async () => {
|
||||||
|
tempStateDir = await fs.mkdtemp(
|
||||||
|
path.join(os.tmpdir(), "clawdbot-subagent-"),
|
||||||
|
);
|
||||||
|
process.env.CLAWDBOT_STATE_DIR = tempStateDir;
|
||||||
|
|
||||||
|
vi.resetModules();
|
||||||
|
const mod1 = await import("./subagent-registry.js");
|
||||||
|
|
||||||
|
mod1.registerSubagentRun({
|
||||||
|
runId: "run-1",
|
||||||
|
childSessionKey: "agent:main:subagent:test",
|
||||||
|
requesterSessionKey: "agent:main:main",
|
||||||
|
requesterDisplayKey: "main",
|
||||||
|
task: "do the thing",
|
||||||
|
cleanup: "keep",
|
||||||
|
});
|
||||||
|
|
||||||
|
const registryPath = path.join(tempStateDir, "subagents", "runs.json");
|
||||||
|
const raw = await fs.readFile(registryPath, "utf8");
|
||||||
|
const parsed = JSON.parse(raw) as { runs?: Record<string, unknown> };
|
||||||
|
expect(parsed.runs && Object.keys(parsed.runs)).toContain("run-1");
|
||||||
|
|
||||||
|
// Simulate a process restart: module re-import should load persisted runs
|
||||||
|
// and trigger the announce flow once the run resolves.
|
||||||
|
vi.resetModules();
|
||||||
|
await import("./subagent-registry.js");
|
||||||
|
|
||||||
|
// allow queued async wait/announce to execute
|
||||||
|
await new Promise((r) => setTimeout(r, 0));
|
||||||
|
|
||||||
|
expect(announceSpy).toHaveBeenCalled();
|
||||||
|
|
||||||
|
type AnnounceParams = {
|
||||||
|
childRunId: string;
|
||||||
|
childSessionKey: string;
|
||||||
|
};
|
||||||
|
const first = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams;
|
||||||
|
expect(first.childRunId).toBe("run-1");
|
||||||
|
expect(first.childSessionKey).toBe("agent:main:subagent:test");
|
||||||
|
});
|
||||||
|
});
|
||||||
47
src/agents/subagent-registry.store.ts
Normal file
47
src/agents/subagent-registry.store.ts
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
import { STATE_DIR_CLAWDBOT } from "../config/paths.js";
|
||||||
|
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
|
||||||
|
import type { SubagentRunRecord } from "./subagent-registry.js";
|
||||||
|
|
||||||
|
export type PersistedSubagentRegistryVersion = 1;
|
||||||
|
|
||||||
|
type PersistedSubagentRegistry = {
|
||||||
|
version: 1;
|
||||||
|
runs: Record<string, SubagentRunRecord>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const REGISTRY_VERSION = 1 as const;
|
||||||
|
|
||||||
|
export function resolveSubagentRegistryPath(): string {
|
||||||
|
return path.join(STATE_DIR_CLAWDBOT, "subagents", "runs.json");
|
||||||
|
}
|
||||||
|
|
||||||
|
export function loadSubagentRegistryFromDisk(): Map<string, SubagentRunRecord> {
|
||||||
|
const pathname = resolveSubagentRegistryPath();
|
||||||
|
const raw = loadJsonFile(pathname);
|
||||||
|
if (!raw || typeof raw !== "object") return new Map();
|
||||||
|
const record = raw as Partial<PersistedSubagentRegistry>;
|
||||||
|
if (record.version !== REGISTRY_VERSION) return new Map();
|
||||||
|
const runsRaw = record.runs;
|
||||||
|
if (!runsRaw || typeof runsRaw !== "object") return new Map();
|
||||||
|
const out = new Map<string, SubagentRunRecord>();
|
||||||
|
for (const [runId, entry] of Object.entries(runsRaw)) {
|
||||||
|
if (!entry || typeof entry !== "object") continue;
|
||||||
|
const typed = entry as SubagentRunRecord;
|
||||||
|
if (!typed.runId || typeof typed.runId !== "string") continue;
|
||||||
|
out.set(runId, typed);
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function saveSubagentRegistryToDisk(
|
||||||
|
runs: Map<string, SubagentRunRecord>,
|
||||||
|
) {
|
||||||
|
const pathname = resolveSubagentRegistryPath();
|
||||||
|
const out: PersistedSubagentRegistry = {
|
||||||
|
version: REGISTRY_VERSION,
|
||||||
|
runs: Object.fromEntries(runs.entries()),
|
||||||
|
};
|
||||||
|
saveJsonFile(pathname, out);
|
||||||
|
}
|
||||||
@@ -2,6 +2,10 @@ import { loadConfig } from "../config/config.js";
|
|||||||
import { callGateway } from "../gateway/call.js";
|
import { callGateway } from "../gateway/call.js";
|
||||||
import { onAgentEvent } from "../infra/agent-events.js";
|
import { onAgentEvent } from "../infra/agent-events.js";
|
||||||
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
|
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
|
||||||
|
import {
|
||||||
|
loadSubagentRegistryFromDisk,
|
||||||
|
saveSubagentRegistryToDisk,
|
||||||
|
} from "./subagent-registry.store.js";
|
||||||
import { resolveAgentTimeoutMs } from "./timeout.js";
|
import { resolveAgentTimeoutMs } from "./timeout.js";
|
||||||
|
|
||||||
export type SubagentRunRecord = {
|
export type SubagentRunRecord = {
|
||||||
@@ -23,6 +27,81 @@ export type SubagentRunRecord = {
|
|||||||
const subagentRuns = new Map<string, SubagentRunRecord>();
|
const subagentRuns = new Map<string, SubagentRunRecord>();
|
||||||
let sweeper: NodeJS.Timeout | null = null;
|
let sweeper: NodeJS.Timeout | null = null;
|
||||||
let listenerStarted = false;
|
let listenerStarted = false;
|
||||||
|
let restoreAttempted = false;
|
||||||
|
|
||||||
|
function persistSubagentRuns() {
|
||||||
|
try {
|
||||||
|
saveSubagentRegistryToDisk(subagentRuns);
|
||||||
|
} catch {
|
||||||
|
// ignore persistence failures
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const resumedRuns = new Set<string>();
|
||||||
|
|
||||||
|
function resumeSubagentRun(runId: string) {
|
||||||
|
if (!runId || resumedRuns.has(runId)) return;
|
||||||
|
const entry = subagentRuns.get(runId);
|
||||||
|
if (!entry) return;
|
||||||
|
if (entry.announceHandled) return;
|
||||||
|
|
||||||
|
if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
|
||||||
|
if (!beginSubagentAnnounce(runId)) return;
|
||||||
|
void runSubagentAnnounceFlow({
|
||||||
|
childSessionKey: entry.childSessionKey,
|
||||||
|
childRunId: entry.runId,
|
||||||
|
requesterSessionKey: entry.requesterSessionKey,
|
||||||
|
requesterProvider: entry.requesterProvider,
|
||||||
|
requesterDisplayKey: entry.requesterDisplayKey,
|
||||||
|
task: entry.task,
|
||||||
|
timeoutMs: 30_000,
|
||||||
|
cleanup: entry.cleanup,
|
||||||
|
waitForCompletion: false,
|
||||||
|
startedAt: entry.startedAt,
|
||||||
|
endedAt: entry.endedAt,
|
||||||
|
label: entry.label,
|
||||||
|
});
|
||||||
|
if (entry.cleanup === "delete") {
|
||||||
|
subagentRuns.delete(runId);
|
||||||
|
persistSubagentRuns();
|
||||||
|
}
|
||||||
|
resumedRuns.add(runId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for completion again after restart.
|
||||||
|
const cfg = loadConfig();
|
||||||
|
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(cfg, undefined);
|
||||||
|
void waitForSubagentCompletion(runId, waitTimeoutMs);
|
||||||
|
resumedRuns.add(runId);
|
||||||
|
}
|
||||||
|
|
||||||
|
function restoreSubagentRunsOnce() {
|
||||||
|
if (restoreAttempted) return;
|
||||||
|
restoreAttempted = true;
|
||||||
|
try {
|
||||||
|
const restored = loadSubagentRegistryFromDisk();
|
||||||
|
if (restored.size === 0) return;
|
||||||
|
for (const [runId, entry] of restored.entries()) {
|
||||||
|
if (!runId || !entry) continue;
|
||||||
|
// Keep any newer in-memory entries.
|
||||||
|
if (!subagentRuns.has(runId)) {
|
||||||
|
subagentRuns.set(runId, entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resume pending work.
|
||||||
|
ensureListener();
|
||||||
|
if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
|
||||||
|
startSweeper();
|
||||||
|
}
|
||||||
|
for (const runId of subagentRuns.keys()) {
|
||||||
|
resumeSubagentRun(runId);
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// ignore restore failures
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
|
function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
|
||||||
const config = cfg ?? loadConfig();
|
const config = cfg ?? loadConfig();
|
||||||
@@ -54,9 +133,11 @@ function stopSweeper() {
|
|||||||
|
|
||||||
async function sweepSubagentRuns() {
|
async function sweepSubagentRuns() {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
|
let mutated = false;
|
||||||
for (const [runId, entry] of subagentRuns.entries()) {
|
for (const [runId, entry] of subagentRuns.entries()) {
|
||||||
if (!entry.archiveAtMs || entry.archiveAtMs > now) continue;
|
if (!entry.archiveAtMs || entry.archiveAtMs > now) continue;
|
||||||
subagentRuns.delete(runId);
|
subagentRuns.delete(runId);
|
||||||
|
mutated = true;
|
||||||
try {
|
try {
|
||||||
await callGateway({
|
await callGateway({
|
||||||
method: "sessions.delete",
|
method: "sessions.delete",
|
||||||
@@ -67,6 +148,7 @@ async function sweepSubagentRuns() {
|
|||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (mutated) persistSubagentRuns();
|
||||||
if (subagentRuns.size === 0) stopSweeper();
|
if (subagentRuns.size === 0) stopSweeper();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,7 +167,10 @@ function ensureListener() {
|
|||||||
typeof evt.data?.startedAt === "number"
|
typeof evt.data?.startedAt === "number"
|
||||||
? (evt.data.startedAt as number)
|
? (evt.data.startedAt as number)
|
||||||
: undefined;
|
: undefined;
|
||||||
if (startedAt) entry.startedAt = startedAt;
|
if (startedAt) {
|
||||||
|
entry.startedAt = startedAt;
|
||||||
|
persistSubagentRuns();
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (phase !== "end" && phase !== "error") return;
|
if (phase !== "end" && phase !== "error") return;
|
||||||
@@ -94,10 +179,12 @@ function ensureListener() {
|
|||||||
? (evt.data.endedAt as number)
|
? (evt.data.endedAt as number)
|
||||||
: Date.now();
|
: Date.now();
|
||||||
entry.endedAt = endedAt;
|
entry.endedAt = endedAt;
|
||||||
|
persistSubagentRuns();
|
||||||
|
|
||||||
if (!beginSubagentAnnounce(evt.runId)) {
|
if (!beginSubagentAnnounce(evt.runId)) {
|
||||||
if (entry.cleanup === "delete") {
|
if (entry.cleanup === "delete") {
|
||||||
subagentRuns.delete(evt.runId);
|
subagentRuns.delete(evt.runId);
|
||||||
|
persistSubagentRuns();
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -117,6 +204,7 @@ function ensureListener() {
|
|||||||
});
|
});
|
||||||
if (entry.cleanup === "delete") {
|
if (entry.cleanup === "delete") {
|
||||||
subagentRuns.delete(evt.runId);
|
subagentRuns.delete(evt.runId);
|
||||||
|
persistSubagentRuns();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -126,6 +214,7 @@ export function beginSubagentAnnounce(runId: string) {
|
|||||||
if (!entry) return false;
|
if (!entry) return false;
|
||||||
if (entry.announceHandled) return false;
|
if (entry.announceHandled) return false;
|
||||||
entry.announceHandled = true;
|
entry.announceHandled = true;
|
||||||
|
persistSubagentRuns();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -163,6 +252,7 @@ export function registerSubagentRun(params: {
|
|||||||
announceHandled: false,
|
announceHandled: false,
|
||||||
});
|
});
|
||||||
ensureListener();
|
ensureListener();
|
||||||
|
persistSubagentRuns();
|
||||||
if (archiveAfterMs) startSweeper();
|
if (archiveAfterMs) startSweeper();
|
||||||
// Wait for subagent completion via gateway RPC (cross-process).
|
// Wait for subagent completion via gateway RPC (cross-process).
|
||||||
// The in-process lifecycle listener is a fallback for embedded runs.
|
// The in-process lifecycle listener is a fallback for embedded runs.
|
||||||
@@ -183,9 +273,20 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
|
|||||||
if (wait?.status !== "ok" && wait?.status !== "error") return;
|
if (wait?.status !== "ok" && wait?.status !== "error") return;
|
||||||
const entry = subagentRuns.get(runId);
|
const entry = subagentRuns.get(runId);
|
||||||
if (!entry) return;
|
if (!entry) return;
|
||||||
if (typeof wait.startedAt === "number") entry.startedAt = wait.startedAt;
|
let mutated = false;
|
||||||
if (typeof wait.endedAt === "number") entry.endedAt = wait.endedAt;
|
if (typeof wait.startedAt === "number") {
|
||||||
if (!entry.endedAt) entry.endedAt = Date.now();
|
entry.startedAt = wait.startedAt;
|
||||||
|
mutated = true;
|
||||||
|
}
|
||||||
|
if (typeof wait.endedAt === "number") {
|
||||||
|
entry.endedAt = wait.endedAt;
|
||||||
|
mutated = true;
|
||||||
|
}
|
||||||
|
if (!entry.endedAt) {
|
||||||
|
entry.endedAt = Date.now();
|
||||||
|
mutated = true;
|
||||||
|
}
|
||||||
|
if (mutated) persistSubagentRuns();
|
||||||
if (!beginSubagentAnnounce(runId)) return;
|
if (!beginSubagentAnnounce(runId)) return;
|
||||||
void runSubagentAnnounceFlow({
|
void runSubagentAnnounceFlow({
|
||||||
childSessionKey: entry.childSessionKey,
|
childSessionKey: entry.childSessionKey,
|
||||||
@@ -203,6 +304,7 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
|
|||||||
});
|
});
|
||||||
if (entry.cleanup === "delete") {
|
if (entry.cleanup === "delete") {
|
||||||
subagentRuns.delete(runId);
|
subagentRuns.delete(runId);
|
||||||
|
persistSubagentRuns();
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
// ignore
|
// ignore
|
||||||
@@ -212,9 +314,17 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
|
|||||||
export function resetSubagentRegistryForTests() {
|
export function resetSubagentRegistryForTests() {
|
||||||
subagentRuns.clear();
|
subagentRuns.clear();
|
||||||
stopSweeper();
|
stopSweeper();
|
||||||
|
restoreAttempted = false;
|
||||||
|
listenerStarted = false;
|
||||||
|
persistSubagentRuns();
|
||||||
}
|
}
|
||||||
|
|
||||||
export function releaseSubagentRun(runId: string) {
|
export function releaseSubagentRun(runId: string) {
|
||||||
subagentRuns.delete(runId);
|
const didDelete = subagentRuns.delete(runId);
|
||||||
|
if (didDelete) persistSubagentRuns();
|
||||||
if (subagentRuns.size === 0) stopSweeper();
|
if (subagentRuns.size === 0) stopSweeper();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Best-effort: restore persisted runs on process start so announces/cleanup can
|
||||||
|
// continue after gateway restarts.
|
||||||
|
restoreSubagentRunsOnce();
|
||||||
|
|||||||
Reference in New Issue
Block a user