fix(subagents): align wait timeout with run timeout

This commit is contained in:
Peter Steinberger
2026-01-12 01:58:24 +00:00
parent b518fb29c6
commit 2941a7002d
3 changed files with 158 additions and 35 deletions

View File

@@ -46,6 +46,7 @@ describe("subagents", () => {
let deletedKey: string | undefined; let deletedKey: string | undefined;
let childRunId: string | undefined; let childRunId: string | undefined;
let childSessionKey: string | undefined; let childSessionKey: string | undefined;
const waitCalls: Array<{ runId?: string; timeoutMs?: number }> = [];
const sessionLastAssistantText = new Map<string, string>(); const sessionLastAssistantText = new Map<string, string>();
callGatewayMock.mockImplementation(async (opts: unknown) => { callGatewayMock.mockImplementation(async (opts: unknown) => {
@@ -81,20 +82,9 @@ describe("subagents", () => {
const params = request.params as const params = request.params as
| { runId?: string; timeoutMs?: number } | { runId?: string; timeoutMs?: number }
| undefined; | undefined;
if ( waitCalls.push(params ?? {});
params?.runId && const status = params?.runId === childRunId ? "timeout" : "ok";
params.runId === childRunId && return { runId: params?.runId ?? "run-1", status };
typeof params.timeoutMs === "number" &&
params.timeoutMs > 0
) {
throw new Error(
"sessions_spawn must not wait for sub-agent completion",
);
}
if (params?.timeoutMs === 0) {
return { runId: params?.runId ?? "run-1", status: "timeout" };
}
return { runId: params?.runId ?? "run-1", status: "ok" };
} }
if (request.method === "chat.history") { if (request.method === "chat.history") {
const params = request.params as { sessionKey?: string } | undefined; const params = request.params as { sessionKey?: string } | undefined;
@@ -154,6 +144,8 @@ describe("subagents", () => {
await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0));
const childWait = waitCalls.find((call) => call.runId === childRunId);
expect(childWait?.timeoutMs).toBe(1000);
const agentCalls = calls.filter((call) => call.method === "agent"); const agentCalls = calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(2); expect(agentCalls).toHaveLength(2);
const first = agentCalls[0]?.params as const first = agentCalls[0]?.params as
@@ -183,6 +175,126 @@ describe("subagents", () => {
expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true); expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true);
}); });
it("sessions_spawn announces via agent.wait when lifecycle events are missing", async () => {
resetSubagentRegistryForTests();
callGatewayMock.mockReset();
const calls: Array<{ method?: string; params?: unknown }> = [];
let agentCallCount = 0;
let sendParams: { to?: string; provider?: string; message?: string } = {};
let deletedKey: string | undefined;
let childRunId: string | undefined;
let childSessionKey: string | undefined;
const waitCalls: Array<{ runId?: string; timeoutMs?: number }> = [];
const sessionLastAssistantText = new Map<string, string>();
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
calls.push(request);
if (request.method === "agent") {
agentCallCount += 1;
const runId = `run-${agentCallCount}`;
const params = request.params as {
message?: string;
sessionKey?: string;
provider?: string;
timeout?: number;
};
const message = params?.message ?? "";
const sessionKey = params?.sessionKey ?? "";
if (message === "Sub-agent announce step.") {
sessionLastAssistantText.set(sessionKey, "announce now");
} else {
childRunId = runId;
childSessionKey = sessionKey;
sessionLastAssistantText.set(sessionKey, "result");
expect(params?.provider).toBe("discord");
expect(params?.timeout).toBe(1);
}
return {
runId,
status: "accepted",
acceptedAt: 2000 + agentCallCount,
};
}
if (request.method === "agent.wait") {
const params = request.params as
| { runId?: string; timeoutMs?: number }
| undefined;
waitCalls.push(params ?? {});
return {
runId: params?.runId ?? "run-1",
status: "ok",
startedAt: 3000,
endedAt: 4000,
};
}
if (request.method === "chat.history") {
const params = request.params as { sessionKey?: string } | undefined;
const text =
sessionLastAssistantText.get(params?.sessionKey ?? "") ?? "";
return {
messages: [{ role: "assistant", content: [{ type: "text", text }] }],
};
}
if (request.method === "send") {
const params = request.params as
| { to?: string; provider?: string; message?: string }
| undefined;
sendParams = {
to: params?.to,
provider: params?.provider,
message: params?.message,
};
return { messageId: "m-announce" };
}
if (request.method === "sessions.delete") {
const params = request.params as { key?: string } | undefined;
deletedKey = params?.key;
return { ok: true };
}
return {};
});
const tool = createClawdbotTools({
agentSessionKey: "discord:group:req",
agentProvider: "discord",
}).find((candidate) => candidate.name === "sessions_spawn");
if (!tool) throw new Error("missing sessions_spawn tool");
const result = await tool.execute("call1b", {
task: "do thing",
runTimeoutSeconds: 1,
cleanup: "delete",
});
expect(result.details).toMatchObject({
status: "accepted",
runId: "run-1",
});
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
const childWait = waitCalls.find((call) => call.runId === childRunId);
expect(childWait?.timeoutMs).toBe(1000);
expect(childSessionKey?.startsWith("agent:main:subagent:")).toBe(true);
const agentCalls = calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(2);
const second = agentCalls[1]?.params as
| { provider?: string; deliver?: boolean; lane?: string }
| undefined;
expect(second?.lane).toBe("nested");
expect(second?.deliver).toBe(false);
expect(second?.provider).toBe("webchat");
expect(sendParams.provider).toBe("discord");
expect(sendParams.to).toBe("channel:req");
expect(sendParams.message ?? "").toContain("announce now");
expect(sendParams.message ?? "").toContain("Stats:");
expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true);
});
it("sessions_spawn resolves main announce target from sessions.list", async () => { it("sessions_spawn resolves main announce target from sessions.list", async () => {
resetSubagentRegistryForTests(); resetSubagentRegistryForTests();
callGatewayMock.mockReset(); callGatewayMock.mockReset();
@@ -191,6 +303,7 @@ describe("subagents", () => {
let sendParams: { to?: string; provider?: string; message?: string } = {}; let sendParams: { to?: string; provider?: string; message?: string } = {};
let childRunId: string | undefined; let childRunId: string | undefined;
let childSessionKey: string | undefined; let childSessionKey: string | undefined;
const waitCalls: Array<{ runId?: string; timeoutMs?: number }> = [];
const sessionLastAssistantText = new Map<string, string>(); const sessionLastAssistantText = new Map<string, string>();
callGatewayMock.mockImplementation(async (opts: unknown) => { callGatewayMock.mockImplementation(async (opts: unknown) => {
@@ -233,10 +346,9 @@ describe("subagents", () => {
const params = request.params as const params = request.params as
| { runId?: string; timeoutMs?: number } | { runId?: string; timeoutMs?: number }
| undefined; | undefined;
if (params?.timeoutMs === 0) { waitCalls.push(params ?? {});
return { runId: params?.runId ?? "run-1", status: "timeout" }; const status = params?.runId === childRunId ? "timeout" : "ok";
} return { runId: params?.runId ?? "run-1", status };
return { runId: params?.runId ?? "run-1", status: "ok" };
} }
if (request.method === "chat.history") { if (request.method === "chat.history") {
const params = request.params as { sessionKey?: string } | undefined; const params = request.params as { sessionKey?: string } | undefined;
@@ -293,6 +405,8 @@ describe("subagents", () => {
await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0));
const childWait = waitCalls.find((call) => call.runId === childRunId);
expect(childWait?.timeoutMs).toBe(1000);
expect(sendParams.provider).toBe("whatsapp"); expect(sendParams.provider).toBe("whatsapp");
expect(sendParams.to).toBe("+123"); expect(sendParams.to).toBe("+123");
expect(sendParams.message ?? "").toContain("hello from sub"); expect(sendParams.message ?? "").toContain("hello from sub");
@@ -534,9 +648,7 @@ describe("subagents", () => {
}; };
} }
if (request.method === "agent.wait") { if (request.method === "agent.wait") {
const params = request.params as { timeoutMs?: number } | undefined; return { status: "timeout" };
if (params?.timeoutMs === 0) return { status: "timeout" };
return { status: "ok" };
} }
if (request.method === "sessions.delete") { if (request.method === "sessions.delete") {
return { ok: true }; return { ok: true };
@@ -597,9 +709,7 @@ describe("subagents", () => {
}; };
} }
if (request.method === "agent.wait") { if (request.method === "agent.wait") {
const params = request.params as { timeoutMs?: number } | undefined; return { status: "timeout" };
if (params?.timeoutMs === 0) return { status: "timeout" };
return { status: "ok" };
} }
if (request.method === "sessions.delete") { if (request.method === "sessions.delete") {
return { ok: true }; return { ok: true };

View File

@@ -2,6 +2,7 @@ import { loadConfig } from "../config/config.js";
import { callGateway } from "../gateway/call.js"; import { callGateway } from "../gateway/call.js";
import { onAgentEvent } from "../infra/agent-events.js"; import { onAgentEvent } from "../infra/agent-events.js";
import { runSubagentAnnounceFlow } from "./subagent-announce.js"; import { runSubagentAnnounceFlow } from "./subagent-announce.js";
import { resolveAgentTimeoutMs } from "./timeout.js";
export type SubagentRunRecord = { export type SubagentRunRecord = {
runId: string; runId: string;
@@ -23,13 +24,20 @@ const subagentRuns = new Map<string, SubagentRunRecord>();
let sweeper: NodeJS.Timeout | null = null; let sweeper: NodeJS.Timeout | null = null;
let listenerStarted = false; let listenerStarted = false;
function resolveArchiveAfterMs() { function resolveArchiveAfterMs(cfg?: ReturnType<typeof loadConfig>) {
const cfg = loadConfig(); const config = cfg ?? loadConfig();
const minutes = cfg.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60; const minutes = config.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60;
if (!Number.isFinite(minutes) || minutes <= 0) return undefined; if (!Number.isFinite(minutes) || minutes <= 0) return undefined;
return Math.max(1, Math.floor(minutes)) * 60_000; return Math.max(1, Math.floor(minutes)) * 60_000;
} }
function resolveSubagentWaitTimeoutMs(
cfg: ReturnType<typeof loadConfig>,
runTimeoutSeconds?: number,
) {
return resolveAgentTimeoutMs({ cfg, overrideSeconds: runTimeoutSeconds });
}
function startSweeper() { function startSweeper() {
if (sweeper) return; if (sweeper) return;
sweeper = setInterval(() => { sweeper = setInterval(() => {
@@ -130,10 +138,16 @@ export function registerSubagentRun(params: {
task: string; task: string;
cleanup: "delete" | "keep"; cleanup: "delete" | "keep";
label?: string; label?: string;
runTimeoutSeconds?: number;
}) { }) {
const now = Date.now(); const now = Date.now();
const archiveAfterMs = resolveArchiveAfterMs(); const cfg = loadConfig();
const archiveAfterMs = resolveArchiveAfterMs(cfg);
const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined; const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined;
const waitTimeoutMs = resolveSubagentWaitTimeoutMs(
cfg,
params.runTimeoutSeconds,
);
subagentRuns.set(params.runId, { subagentRuns.set(params.runId, {
runId: params.runId, runId: params.runId,
childSessionKey: params.childSessionKey, childSessionKey: params.childSessionKey,
@@ -152,21 +166,19 @@ export function registerSubagentRun(params: {
if (archiveAfterMs) startSweeper(); if (archiveAfterMs) startSweeper();
// Wait for subagent completion via gateway RPC (cross-process). // Wait for subagent completion via gateway RPC (cross-process).
// The in-process lifecycle listener is a fallback for embedded runs. // The in-process lifecycle listener is a fallback for embedded runs.
void waitForSubagentCompletion(params.runId); void waitForSubagentCompletion(params.runId, waitTimeoutMs);
} }
// Default wait timeout: 10 minutes. This covers most subagent runs. async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) {
const DEFAULT_SUBAGENT_WAIT_TIMEOUT_MS = 10 * 60 * 1000;
async function waitForSubagentCompletion(runId: string) {
try { try {
const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs));
const wait = (await callGateway({ const wait = (await callGateway({
method: "agent.wait", method: "agent.wait",
params: { params: {
runId, runId,
timeoutMs: DEFAULT_SUBAGENT_WAIT_TIMEOUT_MS, timeoutMs,
}, },
timeoutMs: DEFAULT_SUBAGENT_WAIT_TIMEOUT_MS + 10_000, timeoutMs: timeoutMs + 10_000,
})) as { status?: string; startedAt?: number; endedAt?: number }; })) as { status?: string; startedAt?: number; endedAt?: number };
if (wait?.status !== "ok" && wait?.status !== "error") return; if (wait?.status !== "ok" && wait?.status !== "error") return;
const entry = subagentRuns.get(runId); const entry = subagentRuns.get(runId);

View File

@@ -211,6 +211,7 @@ export function createSessionsSpawnTool(opts?: {
task, task,
cleanup, cleanup,
label: label || undefined, label: label || undefined,
runTimeoutSeconds,
}); });
return jsonResult({ return jsonResult({