diff --git a/src/agents/clawdbot-tools.subagents.test.ts b/src/agents/clawdbot-tools.subagents.test.ts index e98bfbe4d..d8c99cbfc 100644 --- a/src/agents/clawdbot-tools.subagents.test.ts +++ b/src/agents/clawdbot-tools.subagents.test.ts @@ -46,6 +46,7 @@ describe("subagents", () => { let deletedKey: string | undefined; let childRunId: string | undefined; let childSessionKey: string | undefined; + const waitCalls: Array<{ runId?: string; timeoutMs?: number }> = []; const sessionLastAssistantText = new Map(); callGatewayMock.mockImplementation(async (opts: unknown) => { @@ -81,20 +82,9 @@ describe("subagents", () => { const params = request.params as | { runId?: string; timeoutMs?: number } | undefined; - if ( - params?.runId && - params.runId === childRunId && - typeof params.timeoutMs === "number" && - params.timeoutMs > 0 - ) { - throw new Error( - "sessions_spawn must not wait for sub-agent completion", - ); - } - if (params?.timeoutMs === 0) { - return { runId: params?.runId ?? "run-1", status: "timeout" }; - } - return { runId: params?.runId ?? "run-1", status: "ok" }; + waitCalls.push(params ?? {}); + const status = params?.runId === childRunId ? "timeout" : "ok"; + return { runId: params?.runId ?? "run-1", status }; } if (request.method === "chat.history") { const params = request.params as { sessionKey?: string } | undefined; @@ -154,6 +144,8 @@ describe("subagents", () => { await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0)); + const childWait = waitCalls.find((call) => call.runId === childRunId); + expect(childWait?.timeoutMs).toBe(1000); const agentCalls = calls.filter((call) => call.method === "agent"); expect(agentCalls).toHaveLength(2); const first = agentCalls[0]?.params as @@ -183,6 +175,126 @@ describe("subagents", () => { expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true); }); + it("sessions_spawn announces via agent.wait when lifecycle events are missing", async () => { + resetSubagentRegistryForTests(); + callGatewayMock.mockReset(); + const calls: Array<{ method?: string; params?: unknown }> = []; + let agentCallCount = 0; + let sendParams: { to?: string; provider?: string; message?: string } = {}; + let deletedKey: string | undefined; + let childRunId: string | undefined; + let childSessionKey: string | undefined; + const waitCalls: Array<{ runId?: string; timeoutMs?: number }> = []; + const sessionLastAssistantText = new Map(); + + callGatewayMock.mockImplementation(async (opts: unknown) => { + const request = opts as { method?: string; params?: unknown }; + calls.push(request); + if (request.method === "agent") { + agentCallCount += 1; + const runId = `run-${agentCallCount}`; + const params = request.params as { + message?: string; + sessionKey?: string; + provider?: string; + timeout?: number; + }; + const message = params?.message ?? ""; + const sessionKey = params?.sessionKey ?? ""; + if (message === "Sub-agent announce step.") { + sessionLastAssistantText.set(sessionKey, "announce now"); + } else { + childRunId = runId; + childSessionKey = sessionKey; + sessionLastAssistantText.set(sessionKey, "result"); + expect(params?.provider).toBe("discord"); + expect(params?.timeout).toBe(1); + } + return { + runId, + status: "accepted", + acceptedAt: 2000 + agentCallCount, + }; + } + if (request.method === "agent.wait") { + const params = request.params as + | { runId?: string; timeoutMs?: number } + | undefined; + waitCalls.push(params ?? {}); + return { + runId: params?.runId ?? "run-1", + status: "ok", + startedAt: 3000, + endedAt: 4000, + }; + } + if (request.method === "chat.history") { + const params = request.params as { sessionKey?: string } | undefined; + const text = + sessionLastAssistantText.get(params?.sessionKey ?? "") ?? ""; + return { + messages: [{ role: "assistant", content: [{ type: "text", text }] }], + }; + } + if (request.method === "send") { + const params = request.params as + | { to?: string; provider?: string; message?: string } + | undefined; + sendParams = { + to: params?.to, + provider: params?.provider, + message: params?.message, + }; + return { messageId: "m-announce" }; + } + if (request.method === "sessions.delete") { + const params = request.params as { key?: string } | undefined; + deletedKey = params?.key; + return { ok: true }; + } + return {}; + }); + + const tool = createClawdbotTools({ + agentSessionKey: "discord:group:req", + agentProvider: "discord", + }).find((candidate) => candidate.name === "sessions_spawn"); + if (!tool) throw new Error("missing sessions_spawn tool"); + + const result = await tool.execute("call1b", { + task: "do thing", + runTimeoutSeconds: 1, + cleanup: "delete", + }); + expect(result.details).toMatchObject({ + status: "accepted", + runId: "run-1", + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); + await new Promise((resolve) => setTimeout(resolve, 0)); + await new Promise((resolve) => setTimeout(resolve, 0)); + + const childWait = waitCalls.find((call) => call.runId === childRunId); + expect(childWait?.timeoutMs).toBe(1000); + expect(childSessionKey?.startsWith("agent:main:subagent:")).toBe(true); + + const agentCalls = calls.filter((call) => call.method === "agent"); + expect(agentCalls).toHaveLength(2); + const second = agentCalls[1]?.params as + | { provider?: string; deliver?: boolean; lane?: string } + | undefined; + expect(second?.lane).toBe("nested"); + expect(second?.deliver).toBe(false); + expect(second?.provider).toBe("webchat"); + + expect(sendParams.provider).toBe("discord"); + expect(sendParams.to).toBe("channel:req"); + expect(sendParams.message ?? "").toContain("announce now"); + expect(sendParams.message ?? "").toContain("Stats:"); + expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true); + }); + it("sessions_spawn resolves main announce target from sessions.list", async () => { resetSubagentRegistryForTests(); callGatewayMock.mockReset(); @@ -191,6 +303,7 @@ describe("subagents", () => { let sendParams: { to?: string; provider?: string; message?: string } = {}; let childRunId: string | undefined; let childSessionKey: string | undefined; + const waitCalls: Array<{ runId?: string; timeoutMs?: number }> = []; const sessionLastAssistantText = new Map(); callGatewayMock.mockImplementation(async (opts: unknown) => { @@ -233,10 +346,9 @@ describe("subagents", () => { const params = request.params as | { runId?: string; timeoutMs?: number } | undefined; - if (params?.timeoutMs === 0) { - return { runId: params?.runId ?? "run-1", status: "timeout" }; - } - return { runId: params?.runId ?? "run-1", status: "ok" }; + waitCalls.push(params ?? {}); + const status = params?.runId === childRunId ? "timeout" : "ok"; + return { runId: params?.runId ?? "run-1", status }; } if (request.method === "chat.history") { const params = request.params as { sessionKey?: string } | undefined; @@ -293,6 +405,8 @@ describe("subagents", () => { await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0)); + const childWait = waitCalls.find((call) => call.runId === childRunId); + expect(childWait?.timeoutMs).toBe(1000); expect(sendParams.provider).toBe("whatsapp"); expect(sendParams.to).toBe("+123"); expect(sendParams.message ?? "").toContain("hello from sub"); @@ -534,9 +648,7 @@ describe("subagents", () => { }; } if (request.method === "agent.wait") { - const params = request.params as { timeoutMs?: number } | undefined; - if (params?.timeoutMs === 0) return { status: "timeout" }; - return { status: "ok" }; + return { status: "timeout" }; } if (request.method === "sessions.delete") { return { ok: true }; @@ -597,9 +709,7 @@ describe("subagents", () => { }; } if (request.method === "agent.wait") { - const params = request.params as { timeoutMs?: number } | undefined; - if (params?.timeoutMs === 0) return { status: "timeout" }; - return { status: "ok" }; + return { status: "timeout" }; } if (request.method === "sessions.delete") { return { ok: true }; diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 14dca3f03..3ad5ff415 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -2,6 +2,7 @@ import { loadConfig } from "../config/config.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { runSubagentAnnounceFlow } from "./subagent-announce.js"; +import { resolveAgentTimeoutMs } from "./timeout.js"; export type SubagentRunRecord = { runId: string; @@ -23,13 +24,20 @@ const subagentRuns = new Map(); let sweeper: NodeJS.Timeout | null = null; let listenerStarted = false; -function resolveArchiveAfterMs() { - const cfg = loadConfig(); - const minutes = cfg.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60; +function resolveArchiveAfterMs(cfg?: ReturnType) { + const config = cfg ?? loadConfig(); + const minutes = config.agents?.defaults?.subagents?.archiveAfterMinutes ?? 60; if (!Number.isFinite(minutes) || minutes <= 0) return undefined; return Math.max(1, Math.floor(minutes)) * 60_000; } +function resolveSubagentWaitTimeoutMs( + cfg: ReturnType, + runTimeoutSeconds?: number, +) { + return resolveAgentTimeoutMs({ cfg, overrideSeconds: runTimeoutSeconds }); +} + function startSweeper() { if (sweeper) return; sweeper = setInterval(() => { @@ -130,10 +138,16 @@ export function registerSubagentRun(params: { task: string; cleanup: "delete" | "keep"; label?: string; + runTimeoutSeconds?: number; }) { const now = Date.now(); - const archiveAfterMs = resolveArchiveAfterMs(); + const cfg = loadConfig(); + const archiveAfterMs = resolveArchiveAfterMs(cfg); const archiveAtMs = archiveAfterMs ? now + archiveAfterMs : undefined; + const waitTimeoutMs = resolveSubagentWaitTimeoutMs( + cfg, + params.runTimeoutSeconds, + ); subagentRuns.set(params.runId, { runId: params.runId, childSessionKey: params.childSessionKey, @@ -152,21 +166,19 @@ export function registerSubagentRun(params: { if (archiveAfterMs) startSweeper(); // Wait for subagent completion via gateway RPC (cross-process). // The in-process lifecycle listener is a fallback for embedded runs. - void waitForSubagentCompletion(params.runId); + void waitForSubagentCompletion(params.runId, waitTimeoutMs); } -// Default wait timeout: 10 minutes. This covers most subagent runs. -const DEFAULT_SUBAGENT_WAIT_TIMEOUT_MS = 10 * 60 * 1000; - -async function waitForSubagentCompletion(runId: string) { +async function waitForSubagentCompletion(runId: string, waitTimeoutMs: number) { try { + const timeoutMs = Math.max(1, Math.floor(waitTimeoutMs)); const wait = (await callGateway({ method: "agent.wait", params: { runId, - timeoutMs: DEFAULT_SUBAGENT_WAIT_TIMEOUT_MS, + timeoutMs, }, - timeoutMs: DEFAULT_SUBAGENT_WAIT_TIMEOUT_MS + 10_000, + timeoutMs: timeoutMs + 10_000, })) as { status?: string; startedAt?: number; endedAt?: number }; if (wait?.status !== "ok" && wait?.status !== "error") return; const entry = subagentRuns.get(runId); diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index 2806f22c3..1a0eb5565 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -211,6 +211,7 @@ export function createSessionsSpawnTool(opts?: { task, cleanup, label: label || undefined, + runTimeoutSeconds, }); return jsonResult({