fix: resume subagent registry safely (#831) (thanks @roshanasingh4)

This commit is contained in:
Peter Steinberger
2026-01-13 10:10:15 +00:00
parent 714de9d996
commit b071f73fef
6 changed files with 115 additions and 35 deletions

View File

@@ -4,6 +4,7 @@
### Fixes ### Fixes
- Packaging: include `dist/memory/**` in the npm tarball (fixes `ERR_MODULE_NOT_FOUND` for `dist/memory/index.js`). - Packaging: include `dist/memory/**` in the npm tarball (fixes `ERR_MODULE_NOT_FOUND` for `dist/memory/index.js`).
- Agents: persist sub-agent registry across gateway restarts and resume announce flow safely. (#831) — thanks @roshanasingh4.
## 2026.1.12-1 ## 2026.1.12-1

View File

@@ -236,7 +236,8 @@ export async function runSubagentAnnounceFlow(params: {
startedAt?: number; startedAt?: number;
endedAt?: number; endedAt?: number;
label?: string; label?: string;
}) { }): Promise<boolean> {
let didAnnounce = false;
try { try {
let reply = params.roundOneReply; let reply = params.roundOneReply;
if (!reply && params.waitForCompletion !== false) { if (!reply && params.waitForCompletion !== false) {
@@ -249,7 +250,7 @@ export async function runSubagentAnnounceFlow(params: {
}, },
timeoutMs: waitMs + 2000, timeoutMs: waitMs + 2000,
})) as { status?: string }; })) as { status?: string };
if (wait?.status !== "ok") return; if (wait?.status !== "ok") return false;
reply = await readLatestAssistantReply({ reply = await readLatestAssistantReply({
sessionKey: params.childSessionKey, sessionKey: params.childSessionKey,
}); });
@@ -265,7 +266,7 @@ export async function runSubagentAnnounceFlow(params: {
sessionKey: params.requesterSessionKey, sessionKey: params.requesterSessionKey,
displayKey: params.requesterDisplayKey, displayKey: params.requesterDisplayKey,
}); });
if (!announceTarget) return; if (!announceTarget) return false;
const announcePrompt = buildSubagentAnnouncePrompt({ const announcePrompt = buildSubagentAnnouncePrompt({
requesterSessionKey: params.requesterSessionKey, requesterSessionKey: params.requesterSessionKey,
@@ -289,7 +290,7 @@ export async function runSubagentAnnounceFlow(params: {
!announceReply.trim() || !announceReply.trim() ||
isAnnounceSkip(announceReply) isAnnounceSkip(announceReply)
) )
return; return false;
const statsLine = await buildSubagentStatsLine({ const statsLine = await buildSubagentStatsLine({
sessionKey: params.childSessionKey, sessionKey: params.childSessionKey,
@@ -311,6 +312,7 @@ export async function runSubagentAnnounceFlow(params: {
}, },
timeoutMs: 10_000, timeoutMs: 10_000,
}); });
didAnnounce = true;
} catch { } catch {
// Best-effort follow-ups; ignore failures to avoid breaking the caller response. // Best-effort follow-ups; ignore failures to avoid breaking the caller response.
} finally { } finally {
@@ -338,4 +340,5 @@ export async function runSubagentAnnounceFlow(params: {
} }
} }
} }
return didAnnounce;
} }

View File

@@ -18,7 +18,7 @@ vi.mock("../infra/agent-events.js", () => ({
onAgentEvent: vi.fn(() => noop), onAgentEvent: vi.fn(() => noop),
})); }));
const announceSpy = vi.fn(async () => {}); const announceSpy = vi.fn(async () => true);
vi.mock("./subagent-announce.js", () => ({ vi.mock("./subagent-announce.js", () => ({
runSubagentAnnounceFlow: (...args: unknown[]) => announceSpy(...args), runSubagentAnnounceFlow: (...args: unknown[]) => announceSpy(...args),
})); }));
@@ -67,7 +67,8 @@ describe("subagent registry persistence", () => {
// Simulate a process restart: module re-import should load persisted runs // Simulate a process restart: module re-import should load persisted runs
// and trigger the announce flow once the run resolves. // and trigger the announce flow once the run resolves.
vi.resetModules(); vi.resetModules();
await import("./subagent-registry.js"); const mod2 = await import("./subagent-registry.js");
mod2.initSubagentRegistry();
// allow queued async wait/announce to execute // allow queued async wait/announce to execute
await new Promise((r) => setTimeout(r, 0)); await new Promise((r) => setTimeout(r, 0));
@@ -82,4 +83,44 @@ describe("subagent registry persistence", () => {
expect(first.childRunId).toBe("run-1"); expect(first.childRunId).toBe("run-1");
expect(first.childSessionKey).toBe("agent:main:subagent:test"); expect(first.childSessionKey).toBe("agent:main:subagent:test");
}); });
it("retries announce even when announceHandled was persisted", async () => {
  // Point the registry at a fresh, isolated state directory.
  tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-subagent-"));
  process.env.CLAWDBOT_STATE_DIR = tempStateDir;

  // Seed runs.json with an already-ended run that carries
  // `announceHandled: true` but no `announceCompletedAt`.
  const runsFile = path.join(tempStateDir, "subagents", "runs.json");
  const seededRun = {
    runId: "run-2",
    childSessionKey: "agent:main:subagent:two",
    requesterSessionKey: "agent:main:main",
    requesterDisplayKey: "main",
    task: "do the other thing",
    cleanup: "keep",
    createdAt: 1,
    startedAt: 1,
    endedAt: 2,
    announceHandled: true,
  };
  const snapshot = { version: 1, runs: { "run-2": seededRun } };
  await fs.mkdir(path.dirname(runsFile), { recursive: true });
  await fs.writeFile(runsFile, `${JSON.stringify(snapshot)}\n`, "utf8");

  // Load a fresh module instance, then explicitly restore from disk.
  vi.resetModules();
  const registry = await import("./subagent-registry.js");
  registry.initSubagentRegistry();
  // Yield one timer tick so the queued announce work can run.
  await new Promise((resolve) => setTimeout(resolve, 0));

  // The announce flow must have been invoked for the seeded run.
  const announcedForSeededRun = announceSpy.mock.calls.some(
    (call) => (call[0] as { childRunId?: string }).childRunId === "run-2",
  );
  expect(announcedForSeededRun).toBe(true);
});
}); });

View File

@@ -8,11 +8,13 @@ export type PersistedSubagentRegistryVersion = 1;
type PersistedSubagentRegistry = { type PersistedSubagentRegistry = {
version: 1; version: 1;
runs: Record<string, SubagentRunRecord>; runs: Record<string, PersistedSubagentRunRecord>;
}; };
const REGISTRY_VERSION = 1 as const; const REGISTRY_VERSION = 1 as const;
type PersistedSubagentRunRecord = Omit<SubagentRunRecord, "announceHandled">;
export function resolveSubagentRegistryPath(): string { export function resolveSubagentRegistryPath(): string {
return path.join(STATE_DIR_CLAWDBOT, "subagents", "runs.json"); return path.join(STATE_DIR_CLAWDBOT, "subagents", "runs.json");
} }
@@ -28,9 +30,17 @@ export function loadSubagentRegistryFromDisk(): Map<string, SubagentRunRecord> {
const out = new Map<string, SubagentRunRecord>(); const out = new Map<string, SubagentRunRecord>();
for (const [runId, entry] of Object.entries(runsRaw)) { for (const [runId, entry] of Object.entries(runsRaw)) {
if (!entry || typeof entry !== "object") continue; if (!entry || typeof entry !== "object") continue;
const typed = entry as SubagentRunRecord; const typed = entry as PersistedSubagentRunRecord;
if (!typed.runId || typeof typed.runId !== "string") continue; if (!typed.runId || typeof typed.runId !== "string") continue;
out.set(runId, typed); const announceCompletedAt =
typeof typed.announceCompletedAt === "number"
? typed.announceCompletedAt
: undefined;
out.set(runId, {
...typed,
announceCompletedAt,
announceHandled: Boolean(announceCompletedAt),
});
} }
return out; return out;
} }
@@ -39,9 +49,14 @@ export function saveSubagentRegistryToDisk(
runs: Map<string, SubagentRunRecord>, runs: Map<string, SubagentRunRecord>,
) { ) {
const pathname = resolveSubagentRegistryPath(); const pathname = resolveSubagentRegistryPath();
const serialized: Record<string, PersistedSubagentRunRecord> = {};
for (const [runId, entry] of runs.entries()) {
const { announceHandled: _ignored, ...persisted } = entry;
serialized[runId] = persisted;
}
const out: PersistedSubagentRegistry = { const out: PersistedSubagentRegistry = {
version: REGISTRY_VERSION, version: REGISTRY_VERSION,
runs: Object.fromEntries(runs.entries()), runs: serialized,
}; };
saveJsonFile(pathname, out); saveJsonFile(pathname, out);
} }

View File

@@ -21,12 +21,14 @@ export type SubagentRunRecord = {
startedAt?: number; startedAt?: number;
endedAt?: number; endedAt?: number;
archiveAtMs?: number; archiveAtMs?: number;
announceCompletedAt?: number;
announceHandled: boolean; announceHandled: boolean;
}; };
const subagentRuns = new Map<string, SubagentRunRecord>(); const subagentRuns = new Map<string, SubagentRunRecord>();
let sweeper: NodeJS.Timeout | null = null; let sweeper: NodeJS.Timeout | null = null;
let listenerStarted = false; let listenerStarted = false;
let listenerStop: (() => void) | null = null;
let restoreAttempted = false; let restoreAttempted = false;
function persistSubagentRuns() { function persistSubagentRuns() {
@@ -43,15 +45,15 @@ function resumeSubagentRun(runId: string) {
if (!runId || resumedRuns.has(runId)) return; if (!runId || resumedRuns.has(runId)) return;
const entry = subagentRuns.get(runId); const entry = subagentRuns.get(runId);
if (!entry) return; if (!entry) return;
if (entry.announceHandled) return; if (entry.announceCompletedAt) return;
if (typeof entry.endedAt === "number" && entry.endedAt > 0) { if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
if (!beginSubagentAnnounce(runId)) return; if (!beginSubagentAnnounce(runId)) return;
void runSubagentAnnounceFlow({ const announce = runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey, childSessionKey: entry.childSessionKey,
childRunId: entry.runId, childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey, requesterSessionKey: entry.requesterSessionKey,
requesterProvider: entry.requesterProvider, requesterChannel: entry.requesterChannel,
requesterDisplayKey: entry.requesterDisplayKey, requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task, task: entry.task,
timeoutMs: 30_000, timeoutMs: 30_000,
@@ -61,10 +63,9 @@ function resumeSubagentRun(runId: string) {
endedAt: entry.endedAt, endedAt: entry.endedAt,
label: entry.label, label: entry.label,
}); });
if (entry.cleanup === "delete") { void announce.then((didAnnounce) => {
subagentRuns.delete(runId); finalizeSubagentAnnounce(runId, entry.cleanup, didAnnounce);
persistSubagentRuns(); });
}
resumedRuns.add(runId); resumedRuns.add(runId);
return; return;
} }
@@ -155,7 +156,7 @@ async function sweepSubagentRuns() {
function ensureListener() { function ensureListener() {
if (listenerStarted) return; if (listenerStarted) return;
listenerStarted = true; listenerStarted = true;
onAgentEvent((evt) => { listenerStop = onAgentEvent((evt) => {
if (!evt || evt.stream !== "lifecycle") return; if (!evt || evt.stream !== "lifecycle") return;
const entry = subagentRuns.get(evt.runId); const entry = subagentRuns.get(evt.runId);
if (!entry) { if (!entry) {
@@ -182,13 +183,9 @@ function ensureListener() {
persistSubagentRuns(); persistSubagentRuns();
if (!beginSubagentAnnounce(evt.runId)) { if (!beginSubagentAnnounce(evt.runId)) {
if (entry.cleanup === "delete") {
subagentRuns.delete(evt.runId);
persistSubagentRuns();
}
return; return;
} }
void runSubagentAnnounceFlow({ const announce = runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey, childSessionKey: entry.childSessionKey,
childRunId: entry.runId, childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey, requesterSessionKey: entry.requesterSessionKey,
@@ -202,16 +199,33 @@ function ensureListener() {
endedAt: entry.endedAt, endedAt: entry.endedAt,
label: entry.label, label: entry.label,
}); });
if (entry.cleanup === "delete") { void announce.then((didAnnounce) => {
subagentRuns.delete(evt.runId); finalizeSubagentAnnounce(evt.runId, entry.cleanup, didAnnounce);
persistSubagentRuns(); });
}
}); });
} }
/**
 * Settles a run's registry entry once its announce attempt has finished.
 *
 * - No entry for `runId` in `subagentRuns`: no-op.
 * - `cleanup === "delete"`: the entry is removed and the registry persisted,
 *   whether or not the announce went out.
 * - `cleanup === "keep"`: `announceCompletedAt` is stamped with the current
 *   time and persisted only when `didAnnounce` is true; otherwise the entry
 *   is left untouched and nothing is persisted.
 */
function finalizeSubagentAnnounce(
  runId: string,
  cleanup: "delete" | "keep",
  didAnnounce: boolean,
) {
  const record = subagentRuns.get(runId);
  if (!record) return;

  const shouldDelete = cleanup === "delete";
  // A kept run whose announce did not go out stays unstamped.
  if (!shouldDelete && !didAnnounce) return;

  if (shouldDelete) {
    subagentRuns.delete(runId);
  } else {
    record.announceCompletedAt = Date.now();
  }
  persistSubagentRuns();
}
export function beginSubagentAnnounce(runId: string) { export function beginSubagentAnnounce(runId: string) {
const entry = subagentRuns.get(runId); const entry = subagentRuns.get(runId);
if (!entry) return false; if (!entry) return false;
if (entry.announceCompletedAt) return false;
if (entry.announceHandled) return false; if (entry.announceHandled) return false;
entry.announceHandled = true; entry.announceHandled = true;
persistSubagentRuns(); persistSubagentRuns();
@@ -288,7 +302,7 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
} }
if (mutated) persistSubagentRuns(); if (mutated) persistSubagentRuns();
if (!beginSubagentAnnounce(runId)) return; if (!beginSubagentAnnounce(runId)) return;
void runSubagentAnnounceFlow({ const announce = runSubagentAnnounceFlow({
childSessionKey: entry.childSessionKey, childSessionKey: entry.childSessionKey,
childRunId: entry.runId, childRunId: entry.runId,
requesterSessionKey: entry.requesterSessionKey, requesterSessionKey: entry.requesterSessionKey,
@@ -302,10 +316,9 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
endedAt: entry.endedAt, endedAt: entry.endedAt,
label: entry.label, label: entry.label,
}); });
if (entry.cleanup === "delete") { void announce.then((didAnnounce) => {
subagentRuns.delete(runId); finalizeSubagentAnnounce(runId, entry.cleanup, didAnnounce);
persistSubagentRuns(); });
}
} catch { } catch {
// ignore // ignore
} }
@@ -313,8 +326,13 @@ async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
export function resetSubagentRegistryForTests() { export function resetSubagentRegistryForTests() {
subagentRuns.clear(); subagentRuns.clear();
resumedRuns.clear();
stopSweeper(); stopSweeper();
restoreAttempted = false; restoreAttempted = false;
if (listenerStop) {
listenerStop();
listenerStop = null;
}
listenerStarted = false; listenerStarted = false;
persistSubagentRuns(); persistSubagentRuns();
} }
@@ -325,6 +343,6 @@ export function releaseSubagentRun(runId: string) {
if (subagentRuns.size === 0) stopSweeper(); if (subagentRuns.size === 0) stopSweeper();
} }
// Best-effort: restore persisted runs on process start so announces/cleanup can export function initSubagentRegistry() {
// continue after gateway restarts. restoreSubagentRunsOnce();
restoreSubagentRunsOnce(); }

View File

@@ -19,6 +19,7 @@ import {
resolveConfiguredModelRef, resolveConfiguredModelRef,
resolveHooksGmailModel, resolveHooksGmailModel,
} from "../agents/model-selection.js"; } from "../agents/model-selection.js";
import { initSubagentRegistry } from "../agents/subagent-registry.js";
import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js"; import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js";
import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js"; import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js";
import { import {
@@ -460,6 +461,7 @@ export async function startGatewayServer(
} }
const cfgAtStart = loadConfig(); const cfgAtStart = loadConfig();
initSubagentRegistry();
await autoMigrateLegacyState({ cfg: cfgAtStart, log }); await autoMigrateLegacyState({ cfg: cfgAtStart, log });
const defaultAgentId = resolveDefaultAgentId(cfgAtStart); const defaultAgentId = resolveDefaultAgentId(cfgAtStart);
const defaultWorkspaceDir = resolveAgentWorkspaceDir( const defaultWorkspaceDir = resolveAgentWorkspaceDir(