Merge pull request #831 from roshanasingh4/fix/817-persist-subagent-registry
Persist subagent run registry to survive gateway restarts
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
|
||||
### Fixes
|
||||
- Packaging: include `dist/memory/**` in the npm tarball (fixes `ERR_MODULE_NOT_FOUND` for `dist/memory/index.js`).
|
||||
- Agents: persist sub-agent registry across gateway restarts and resume announce flow safely. (#831) — thanks @roshanasingh4.
|
||||
|
||||
## 2026.1.12-1
|
||||
|
||||
|
||||
@@ -236,7 +236,8 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
startedAt?: number;
|
||||
endedAt?: number;
|
||||
label?: string;
|
||||
}) {
|
||||
}): Promise<boolean> {
|
||||
let didAnnounce = false;
|
||||
try {
|
||||
let reply = params.roundOneReply;
|
||||
if (!reply && params.waitForCompletion !== false) {
|
||||
@@ -249,7 +250,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
},
|
||||
timeoutMs: waitMs + 2000,
|
||||
})) as { status?: string };
|
||||
if (wait?.status !== "ok") return;
|
||||
if (wait?.status !== "ok") return false;
|
||||
reply = await readLatestAssistantReply({
|
||||
sessionKey: params.childSessionKey,
|
||||
});
|
||||
@@ -265,7 +266,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
sessionKey: params.requesterSessionKey,
|
||||
displayKey: params.requesterDisplayKey,
|
||||
});
|
||||
if (!announceTarget) return;
|
||||
if (!announceTarget) return false;
|
||||
|
||||
const announcePrompt = buildSubagentAnnouncePrompt({
|
||||
requesterSessionKey: params.requesterSessionKey,
|
||||
@@ -289,7 +290,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
!announceReply.trim() ||
|
||||
isAnnounceSkip(announceReply)
|
||||
)
|
||||
return;
|
||||
return false;
|
||||
|
||||
const statsLine = await buildSubagentStatsLine({
|
||||
sessionKey: params.childSessionKey,
|
||||
@@ -311,6 +312,7 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
didAnnounce = true;
|
||||
} catch {
|
||||
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.
|
||||
} finally {
|
||||
@@ -338,4 +340,5 @@ export async function runSubagentAnnounceFlow(params: {
|
||||
}
|
||||
}
|
||||
}
|
||||
return didAnnounce;
|
||||
}
|
||||
|
||||
126
src/agents/subagent-registry.persistence.test.ts
Normal file
126
src/agents/subagent-registry.persistence.test.ts
Normal file
@@ -0,0 +1,126 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const noop = () => {};
|
||||
|
||||
vi.mock("../gateway/call.js", () => ({
|
||||
callGateway: vi.fn(async () => ({
|
||||
status: "ok",
|
||||
startedAt: 111,
|
||||
endedAt: 222,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("../infra/agent-events.js", () => ({
|
||||
onAgentEvent: vi.fn(() => noop),
|
||||
}));
|
||||
|
||||
const announceSpy = vi.fn(async () => true);
|
||||
vi.mock("./subagent-announce.js", () => ({
|
||||
runSubagentAnnounceFlow: (...args: unknown[]) => announceSpy(...args),
|
||||
}));
|
||||
|
||||
describe("subagent registry persistence", () => {
  const originalStateDir = process.env.CLAWDBOT_STATE_DIR;
  let tempStateDir: string | null = null;

  // Creates a throwaway state directory and points CLAWDBOT_STATE_DIR at it.
  const useTempStateDir = async (): Promise<string> => {
    const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-"));
    tempStateDir = dir;
    process.env.CLAWDBOT_STATE_DIR = dir;
    return dir;
  };

  // Yields to the timer queue so queued async wait/announce work can execute.
  const flushQueuedWork = () =>
    new Promise((resolve) => setTimeout(resolve, 0));

  afterEach(async () => {
    announceSpy.mockClear();
    vi.resetModules();
    if (tempStateDir) {
      await fs.rm(tempStateDir, { recursive: true, force: true });
      tempStateDir = null;
    }
    // Put the environment variable back exactly as it was before the suite.
    if (originalStateDir !== undefined) {
      process.env.CLAWDBOT_STATE_DIR = originalStateDir;
    } else {
      delete process.env.CLAWDBOT_STATE_DIR;
    }
  });

  it("persists runs to disk and resumes after restart", async () => {
    const stateDir = await useTempStateDir();

    vi.resetModules();
    const beforeRestart = await import("./subagent-registry.js");

    beforeRestart.registerSubagentRun({
      runId: "run-1",
      childSessionKey: "agent:main:subagent:test",
      requesterSessionKey: "agent:main:main",
      requesterDisplayKey: "main",
      task: "do the thing",
      cleanup: "keep",
    });

    const registryPath = path.join(stateDir, "subagents", "runs.json");
    const onDisk = JSON.parse(await fs.readFile(registryPath, "utf8")) as {
      runs?: Record<string, unknown>;
    };
    expect(onDisk.runs && Object.keys(onDisk.runs)).toContain("run-1");

    // Simulate a process restart: a fresh module instance should load the
    // persisted runs and trigger the announce flow once the run resolves.
    vi.resetModules();
    const afterRestart = await import("./subagent-registry.js");
    afterRestart.initSubagentRegistry();

    await flushQueuedWork();

    expect(announceSpy).toHaveBeenCalled();

    type AnnounceParams = {
      childRunId: string;
      childSessionKey: string;
    };
    const firstCall = announceSpy.mock.calls[0]?.[0] as unknown as AnnounceParams;
    expect(firstCall.childRunId).toBe("run-1");
    expect(firstCall.childSessionKey).toBe("agent:main:subagent:test");
  });

  it("retries announce even when announceHandled was persisted", async () => {
    const stateDir = await useTempStateDir();

    const registryPath = path.join(stateDir, "subagents", "runs.json");
    // Hand-written registry file: the run has ended and carries a stale
    // announceHandled flag but no announceCompletedAt.
    const seededRun = {
      runId: "run-2",
      childSessionKey: "agent:main:subagent:two",
      requesterSessionKey: "agent:main:main",
      requesterDisplayKey: "main",
      task: "do the other thing",
      cleanup: "keep",
      createdAt: 1,
      startedAt: 1,
      endedAt: 2,
      announceHandled: true,
    };
    const seededRegistry = { version: 1, runs: { "run-2": seededRun } };
    await fs.mkdir(path.dirname(registryPath), { recursive: true });
    await fs.writeFile(
      registryPath,
      `${JSON.stringify(seededRegistry)}\n`,
      "utf8",
    );

    vi.resetModules();
    const registry = await import("./subagent-registry.js");
    registry.initSubagentRegistry();

    await flushQueuedWork();

    const announcedParams = announceSpy.mock.calls.map((call) => call[0]);
    const runTwoAnnounce = announcedParams.find(
      (params) => (params as { childRunId?: string }).childRunId === "run-2",
    );
    expect(runTwoAnnounce).toBeTruthy();
  });
});
|
||||
62
src/agents/subagent-registry.store.ts
Normal file
62
src/agents/subagent-registry.store.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import path from "node:path";
|
||||
|
||||
import { STATE_DIR_CLAWDBOT } from "../config/paths.js";
|
||||
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.js";
|
||||
|
||||
export type PersistedSubagentRegistryVersion = 1;
|
||||
|
||||
type PersistedSubagentRegistry = {
|
||||
version: 1;
|
||||
runs: Record<string, PersistedSubagentRunRecord>;
|
||||
};
|
||||
|
||||
const REGISTRY_VERSION = 1 as const;
|
||||
|
||||
type PersistedSubagentRunRecord = Omit<SubagentRunRecord, "announceHandled">;
|
||||
|
||||
/**
 * Builds the path of the persisted sub-agent run registry:
 * `<STATE_DIR_CLAWDBOT>/subagents/runs.json`.
 */
export function resolveSubagentRegistryPath(): string {
  const segments = [STATE_DIR_CLAWDBOT, "subagents", "runs.json"];
  return path.join(...segments);
}
|
||||
|
||||
/**
 * Reads the persisted registry file and rebuilds the run map from it.
 *
 * An empty map is returned when the file content is not an object, when its
 * `version` differs from `REGISTRY_VERSION`, or when `runs` is not an object.
 * Entries that are not objects or lack a non-empty string `runId` are skipped.
 *
 * `announceHandled` is never taken from disk: it is recomputed so that it is
 * true only when a numeric, truthy `announceCompletedAt` was stored.
 *
 * @returns Restored records keyed by the key used in the file's `runs` object.
 */
export function loadSubagentRegistryFromDisk(): Map<string, SubagentRunRecord> {
  const restored = new Map<string, SubagentRunRecord>();

  const parsed = loadJsonFile(resolveSubagentRegistryPath());
  if (!parsed || typeof parsed !== "object") return restored;

  const registry = parsed as Partial<PersistedSubagentRegistry>;
  if (registry.version !== REGISTRY_VERSION) return restored;

  const persistedRuns = registry.runs;
  if (!persistedRuns || typeof persistedRuns !== "object") return restored;

  Object.entries(persistedRuns).forEach(([runId, candidate]) => {
    if (!candidate || typeof candidate !== "object") return;
    const persisted = candidate as PersistedSubagentRunRecord;
    if (!persisted.runId || typeof persisted.runId !== "string") return;

    // Only a numeric timestamp counts; anything else is treated as "not announced".
    let announceCompletedAt: number | undefined;
    if (typeof persisted.announceCompletedAt === "number") {
      announceCompletedAt = persisted.announceCompletedAt;
    }

    restored.set(runId, {
      ...persisted,
      announceCompletedAt,
      announceHandled: Boolean(announceCompletedAt),
    });
  });

  return restored;
}
|
||||
|
||||
/**
 * Writes the given run map to the registry file as
 * `{ version: REGISTRY_VERSION, runs: { [runId]: record } }`.
 *
 * The `announceHandled` flag is stripped from every record before writing.
 *
 * @param runs In-memory run records keyed by run id.
 */
export function saveSubagentRegistryToDisk(
  runs: Map<string, SubagentRunRecord>,
) {
  const target = resolveSubagentRegistryPath();

  const persistedRuns: Record<string, PersistedSubagentRunRecord> = {};
  runs.forEach((record, runId) => {
    // Drop the in-memory-only flag; everything else is stored as-is.
    const { announceHandled: _dropped, ...rest } = record;
    persistedRuns[runId] = rest;
  });

  const payload: PersistedSubagentRegistry = {
    version: REGISTRY_VERSION,
    runs: persistedRuns,
  };
  saveJsonFile(target, payload);
}
|
||||
@@ -2,6 +2,10 @@ import { loadConfig } from "../config/config.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { runSubagentAnnounceFlow } from "./subagent-announce.js";
|
||||
import {
|
||||
loadSubagentRegistryFromDisk,
|
||||
saveSubagentRegistryToDisk,
|
||||
} from "./subagent-registry.store.js";
|
||||
import { resolveAgentTimeoutMs } from "./timeout.js";
|
||||
|
||||
export type SubagentRunRecord = {
|
||||
@@ -17,12 +21,88 @@ export type SubagentRunRecord = {
|
||||
startedAt?: number;
|
||||
endedAt?: number;
|
||||
archiveAtMs?: number;
|
||||
announceCompletedAt?: number;
|
||||
announceHandled: boolean;
|
||||
};
|
||||
|
||||
const subagentRuns = new Map<string, SubagentRunRecord>();
|
||||
let sweeper: NodeJS.Timeout | null = null;
|
||||
let listenerStarted = false;
|
||||
let listenerStop: (() => void) | null = null;
|
||||
let restoreAttempted = false;
|
||||
|
||||
/**
 * Saves the in-memory run map to disk on a best-effort basis: any error
 * thrown by `saveSubagentRegistryToDisk` is swallowed.
 */
function persistSubagentRuns() {
  try {
    saveSubagentRegistryToDisk(subagentRuns);
  } catch (_persistError) {
    // Deliberately ignored: a failed write must not break the caller.
  }
}
|
||||
|
||||
const resumedRuns = new Set<string>();
|
||||
|
||||
/**
 * Picks a registered run back up, at most once per run id.
 *
 * Nothing happens when the id is empty, the run was already resumed, the run
 * is not in the registry, or it already has `announceCompletedAt` set.
 * A run with a positive numeric `endedAt` is announced immediately (with
 * `waitForCompletion: false`); any other run goes back to waiting for
 * completion.
 *
 * @param runId Id of the run to resume.
 */
function resumeSubagentRun(runId: string) {
  if (!runId) return;
  if (resumedRuns.has(runId)) return;

  const run = subagentRuns.get(runId);
  if (!run || run.announceCompletedAt) return;

  const hasEnded = typeof run.endedAt === "number" && run.endedAt > 0;
  if (!hasEnded) {
    // No recorded end time: wait for completion again after restart.
    const waitTimeoutMs = resolveSubagentWaitTimeoutMs(loadConfig(), undefined);
    void waitForSubagentCompletion(runId, waitTimeoutMs);
    resumedRuns.add(runId);
    return;
  }

  // Returns false when the announce was already claimed or completed.
  if (!beginSubagentAnnounce(runId)) return;

  void runSubagentAnnounceFlow({
    childSessionKey: run.childSessionKey,
    childRunId: run.runId,
    requesterSessionKey: run.requesterSessionKey,
    requesterChannel: run.requesterChannel,
    requesterDisplayKey: run.requesterDisplayKey,
    task: run.task,
    timeoutMs: 30_000,
    cleanup: run.cleanup,
    waitForCompletion: false,
    startedAt: run.startedAt,
    endedAt: run.endedAt,
    label: run.label,
  }).then((didAnnounce) => {
    finalizeSubagentAnnounce(runId, run.cleanup, didAnnounce);
  });
  resumedRuns.add(runId);
}
|
||||
|
||||
/**
 * Loads persisted runs into the in-memory registry and resumes them.
 *
 * Guarded by `restoreAttempted`, so only the first call does any work.
 * Entries already present in memory are kept over their on-disk versions.
 * When at least one run was restored, the lifecycle listener is ensured, the
 * sweeper is started if any run has `archiveAtMs`, and every registered run
 * is handed to `resumeSubagentRun`. Any error thrown along the way is
 * swallowed.
 */
function restoreSubagentRunsOnce() {
  if (restoreAttempted) return;
  restoreAttempted = true;

  try {
    const persisted = loadSubagentRegistryFromDisk();
    if (persisted.size === 0) return;

    persisted.forEach((record, runId) => {
      if (!runId || !record) return;
      // An in-memory entry is newer than the file, so it wins.
      if (subagentRuns.has(runId)) return;
      subagentRuns.set(runId, record);
    });

    // Resume pending work.
    ensureListener();
    const needsSweeper = Array.from(subagentRuns.values()).some(
      (record) => record.archiveAtMs,
    );
    if (needsSweeper) startSweeper();

    for (const runId of subagentRuns.keys()) {
      resumeSubagentRun(runId);
    }
  } catch (_restoreError) {
    // Restore is best-effort; failures are ignored.
  }
}
|
||||
|
||||
function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
|
||||
const config = cfg ?? loadConfig();
|
||||
@@ -54,9 +134,11 @@ function stopSweeper() {
|
||||
|
||||
async function sweepSubagentRuns() {
|
||||
const now = Date.now();
|
||||
let mutated = false;
|
||||
for (const [runId, entry] of subagentRuns.entries()) {
|
||||
if (!entry.archiveAtMs || entry.archiveAtMs > now) continue;
|
||||
subagentRuns.delete(runId);
|
||||
mutated = true;
|
||||
try {
|
||||
await callGateway({
|
||||
method: "sessions.delete",
|
||||
@@ -67,13 +149,14 @@ async function sweepSubagentRuns() {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
if (mutated) persistSubagentRuns();
|
||||
if (subagentRuns.size === 0) stopSweeper();
|
||||
}
|
||||
|
||||
function ensureListener() {
|
||||
if (listenerStarted) return;
|
||||
listenerStarted = true;
|
||||
onAgentEvent((evt) => {
|
||||
listenerStop = onAgentEvent((evt) => {
|
||||
if (!evt || evt.stream !== "lifecycle") return;
|
||||
const entry = subagentRuns.get(evt.runId);
|
||||
if (!entry) {
|
||||
@@ -85,7 +168,10 @@ function ensureListener() {
|
||||
typeof evt.data?.startedAt === "number"
|
||||
? (evt.data.startedAt as number)
|
||||
: undefined;
|
||||
if (startedAt) entry.startedAt = startedAt;
|
||||
if (startedAt) {
|
||||
entry.startedAt = startedAt;
|
||||
persistSubagentRuns();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (phase !== "end" && phase !== "error") return;
|
||||
@@ -94,14 +180,12 @@ function ensureListener() {
|
||||
? (evt.data.endedAt as number)
|
||||
: Date.now();
|
||||
entry.endedAt = endedAt;
|
||||
persistSubagentRuns();
|
||||
|
||||
if (!beginSubagentAnnounce(evt.runId)) {
|
||||
if (entry.cleanup === "delete") {
|
||||
subagentRuns.delete(evt.runId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
void runSubagentAnnounceFlow({
|
||||
const announce = runSubagentAnnounceFlow({
|
||||
childSessionKey: entry.childSessionKey,
|
||||
childRunId: entry.runId,
|
||||
requesterSessionKey: entry.requesterSessionKey,
|
||||
@@ -115,17 +199,36 @@ function ensureListener() {
|
||||
endedAt: entry.endedAt,
|
||||
label: entry.label,
|
||||
});
|
||||
if (entry.cleanup === "delete") {
|
||||
subagentRuns.delete(evt.runId);
|
||||
}
|
||||
void announce.then((didAnnounce) => {
|
||||
finalizeSubagentAnnounce(evt.runId, entry.cleanup, didAnnounce);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Records the outcome of an announce attempt for a run.
 *
 * - Unknown run id: nothing happens.
 * - `cleanup === "delete"`: the run is removed from the registry whether or
 *   not the announce succeeded, and the registry is persisted.
 * - Otherwise: `announceCompletedAt` is stamped and persisted only when
 *   `didAnnounce` is true; a failed announce leaves the entry untouched.
 *
 * @param runId Id of the run whose announce finished.
 * @param cleanup Cleanup policy captured when the announce started.
 * @param didAnnounce Result reported by the announce flow.
 */
function finalizeSubagentAnnounce(
  runId: string,
  cleanup: "delete" | "keep",
  didAnnounce: boolean,
) {
  const run = subagentRuns.get(runId);
  if (!run) return;

  if (cleanup !== "delete") {
    if (!didAnnounce) return;
    run.announceCompletedAt = Date.now();
  } else {
    subagentRuns.delete(runId);
  }
  persistSubagentRuns();
}
|
||||
|
||||
/**
 * Claims the announce step for a run.
 *
 * @param runId Id of the run to claim.
 * @returns `false` when the run is unknown, already has `announceCompletedAt`,
 *   or was already claimed (`announceHandled`); otherwise marks the run as
 *   handled, persists the registry, and returns `true`.
 */
export function beginSubagentAnnounce(runId: string) {
  const run = subagentRuns.get(runId);
  if (!run || run.announceCompletedAt || run.announceHandled) return false;

  run.announceHandled = true;
  persistSubagentRuns();
  return true;
}
|
||||
|
||||
@@ -163,6 +266,7 @@ export function registerSubagentRun(params: {
|
||||
announceHandled: false,
|
||||
});
|
||||
ensureListener();
|
||||
persistSubagentRuns();
|
||||
if (archiveAfterMs) startSweeper();
|
||||
// Wait for subagent completion via gateway RPC (cross-process).
|
||||
// The in-process lifecycle listener is a fallback for embedded runs.
|
||||
@@ -183,11 +287,22 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
|
||||
if (wait?.status !== "ok" && wait?.status !== "error") return;
|
||||
const entry = subagentRuns.get(runId);
|
||||
if (!entry) return;
|
||||
if (typeof wait.startedAt === "number") entry.startedAt = wait.startedAt;
|
||||
if (typeof wait.endedAt === "number") entry.endedAt = wait.endedAt;
|
||||
if (!entry.endedAt) entry.endedAt = Date.now();
|
||||
let mutated = false;
|
||||
if (typeof wait.startedAt === "number") {
|
||||
entry.startedAt = wait.startedAt;
|
||||
mutated = true;
|
||||
}
|
||||
if (typeof wait.endedAt === "number") {
|
||||
entry.endedAt = wait.endedAt;
|
||||
mutated = true;
|
||||
}
|
||||
if (!entry.endedAt) {
|
||||
entry.endedAt = Date.now();
|
||||
mutated = true;
|
||||
}
|
||||
if (mutated) persistSubagentRuns();
|
||||
if (!beginSubagentAnnounce(runId)) return;
|
||||
void runSubagentAnnounceFlow({
|
||||
const announce = runSubagentAnnounceFlow({
|
||||
childSessionKey: entry.childSessionKey,
|
||||
childRunId: entry.runId,
|
||||
requesterSessionKey: entry.requesterSessionKey,
|
||||
@@ -201,9 +316,9 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
|
||||
endedAt: entry.endedAt,
|
||||
label: entry.label,
|
||||
});
|
||||
if (entry.cleanup === "delete") {
|
||||
subagentRuns.delete(runId);
|
||||
}
|
||||
void announce.then((didAnnounce) => {
|
||||
finalizeSubagentAnnounce(runId, entry.cleanup, didAnnounce);
|
||||
});
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
@@ -211,10 +326,23 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
|
||||
|
||||
/**
 * Test helper that returns the registry module to a pristine state: clears
 * the run map and the resumed-run set, stops the sweeper, re-arms the one-time
 * restore, detaches the lifecycle listener if one is attached, and persists
 * the now-empty registry.
 */
export function resetSubagentRegistryForTests() {
  subagentRuns.clear();
  resumedRuns.clear();
  stopSweeper();
  restoreAttempted = false;

  // Unsubscribe the lifecycle listener (if any) so the next test re-attaches it.
  listenerStop?.();
  listenerStop = null;
  listenerStarted = false;

  persistSubagentRuns();
}
|
||||
|
||||
/**
 * Removes a run from the registry.
 *
 * The registry is persisted only when an entry was actually removed, and the
 * sweeper is stopped once no runs remain.
 *
 * @param runId Id of the run to release.
 */
export function releaseSubagentRun(runId: string) {
  // Delete exactly once: Map.prototype.delete returns false for a key that is
  // already gone, so a preceding delete would make `didDelete` always false
  // and the removal would never reach disk.
  const didDelete = subagentRuns.delete(runId);
  if (didDelete) persistSubagentRuns();
  if (subagentRuns.size === 0) stopSweeper();
}
|
||||
|
||||
/**
 * Public entry point for restoring persisted sub-agent runs.
 *
 * Delegates to `restoreSubagentRunsOnce`, so repeated calls after the first
 * are no-ops.
 */
export function initSubagentRegistry(): void {
  restoreSubagentRunsOnce();
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
resolveConfiguredModelRef,
|
||||
resolveHooksGmailModel,
|
||||
} from "../agents/model-selection.js";
|
||||
import { initSubagentRegistry } from "../agents/subagent-registry.js";
|
||||
import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js";
|
||||
import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js";
|
||||
import {
|
||||
@@ -460,6 +461,7 @@ export async function startGatewayServer(
|
||||
}
|
||||
|
||||
const cfgAtStart = loadConfig();
|
||||
initSubagentRegistry();
|
||||
await autoMigrateLegacyState({ cfg: cfgAtStart, log });
|
||||
const defaultAgentId = resolveDefaultAgentId(cfgAtStart);
|
||||
const defaultWorkspaceDir = resolveAgentWorkspaceDir(
|
||||
|
||||
Reference in New Issue
Block a user