diff --git a/docs/concepts/session-tool.md b/docs/concepts/session-tool.md index de4f31fb2..d1e0cb343 100644 --- a/docs/concepts/session-tool.md +++ b/docs/concepts/session-tool.md @@ -127,14 +127,15 @@ Parameters: - `task` (required) - `label?` (optional; used for logs/UI) - `model?` (optional; overrides the sub-agent model; invalid values error) -- `timeoutSeconds?` (optional; omit for long-running jobs; if set, Clawdbot aborts the sub-agent when the timeout elapses) +- `runTimeoutSeconds?` (default 0; when set, aborts the sub-agent run after N seconds) - `cleanup?` (`delete|keep`, default `keep`) Behavior: -- Starts a new `agent::subagent:` session with `deliver: false`. +- Starts a new `agent::subagent:` session with `deliver: false`. - Sub-agents default to the full tool set **minus session tools** (configurable via `agent.subagents.tools`). - Sub-agents are not allowed to call `sessions_spawn` (no sub-agent → sub-agent spawning). -- After completion (or best-effort wait), Clawdbot runs a sub-agent **announce step** and posts the result to the requester chat provider. +- Always non-blocking: returns `{ status: "accepted", runId, childSessionKey }` immediately. +- After completion, Clawdbot runs a sub-agent **announce step** and posts the result to the requester chat provider. - Reply exactly `ANNOUNCE_SKIP` during the announce step to stay silent. - Sub-agent sessions are auto-archived after `agent.subagents.archiveAfterMinutes` (default: 60). - Announce replies include a stats line (runtime, tokens, sessionKey/sessionId, transcript path, and optional cost). diff --git a/docs/tools/index.md b/docs/tools/index.md index c6db325cc..af7e2609b 100644 --- a/docs/tools/index.md +++ b/docs/tools/index.md @@ -157,13 +157,14 @@ Core parameters: - `sessions_list`: `kinds?`, `limit?`, `activeMinutes?`, `messageLimit?` (0 = none) - `sessions_history`: `sessionKey`, `limit?`, `includeTools?` - `sessions_send`: `sessionKey`, `message`, `timeoutSeconds?` (0 = fire-and-forget) -- `sessions_spawn`: `task`, `label?`, `model?`, `timeoutSeconds?`, `cleanup?` +- `sessions_spawn`: `task`, `label?`, `model?`, `runTimeoutSeconds?`, `cleanup?` Notes: - `main` is the canonical direct-chat key; global/unknown are hidden. - `messageLimit > 0` fetches last N messages per session (tool messages filtered). - `sessions_send` waits for final completion when `timeoutSeconds > 0`. - `sessions_spawn` starts a sub-agent run and posts an announce reply back to the requester chat. +- `sessions_spawn` is non-blocking and returns `status: "accepted"` immediately. - `sessions_send` runs a reply‑back ping‑pong (reply `REPLY_SKIP` to stop; max turns via `session.agentToAgent.maxPingPongTurns`, 0–5). - After the ping‑pong, the target agent runs an **announce step**; reply `ANNOUNCE_SKIP` to suppress the announcement. diff --git a/docs/tools/subagents.md b/docs/tools/subagents.md index 68a88360d..c2d25e389 100644 --- a/docs/tools/subagents.md +++ b/docs/tools/subagents.md @@ -7,7 +7,7 @@ read_when: # Sub-agents -Sub-agents are background agent runs spawned from an existing agent run. They run in their own session (`agent::subagent:`) and, when finished, **announce** their result back to the requester chat provider. +Sub-agents are background agent runs spawned from an existing agent run. They run in their own session (`agent::subagent:`) and, when finished, **announce** their result back to the requester chat provider. Primary goals: - Parallelize “research / long task / slow tool” work without blocking the main run. @@ -25,7 +25,7 @@ Tool params: - `task` (required) - `label?` (optional) - `model?` (optional; overrides the sub-agent model; invalid values are skipped and the sub-agent runs on the default model with a warning in the tool result) -- `timeoutSeconds?` (optional; omit for long-running jobs; when set, Clawdbot waits up to N seconds and aborts the sub-agent if it is still running) +- `runTimeoutSeconds?` (default `0`; when set, the sub-agent run is aborted after N seconds) - `cleanup?` (`delete|keep`, default `keep`) Auto-archive: @@ -33,7 +33,7 @@ Auto-archive: - Archive uses `sessions.delete` and renames the transcript to `*.deleted.` (same folder). - `cleanup: "delete"` archives immediately after announce (still keeps the transcript via rename). - Auto-archive is best-effort; pending timers are lost if the gateway restarts. -- Timeouts do **not** auto-archive; they only stop the run. The session remains until auto-archive. +- `runTimeoutSeconds` does **not** auto-archive; it only stops the run. The session remains until auto-archive. ## Announce @@ -84,3 +84,4 @@ Sub-agents use a dedicated in-process queue lane: - Sub-agent announce is **best-effort**. If the gateway restarts, pending “announce back” work is lost. - Sub-agents still share the same gateway process resources; treat `maxConcurrent` as a safety valve. +- `sessions_spawn` is always non-blocking: it returns `{ status: "accepted", runId, childSessionKey }` immediately. diff --git a/src/agents/clawdbot-tools.subagents.test.ts b/src/agents/clawdbot-tools.subagents.test.ts index d8be2d249..0df0a0abd 100644 --- a/src/agents/clawdbot-tools.subagents.test.ts +++ b/src/agents/clawdbot-tools.subagents.test.ts @@ -19,17 +19,21 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); +import { emitAgentEvent } from "../infra/agent-events.js"; import { createClawdbotTools } from "./clawdbot-tools.js"; +import { resetSubagentRegistryForTests } from "./subagent-registry.js"; describe("subagents", () => { it("sessions_spawn announces back to the requester group provider", async () => { + resetSubagentRegistryForTests(); callGatewayMock.mockReset(); const calls: Array<{ method?: string; params?: unknown }> = []; let agentCallCount = 0; - let lastWaitedRunId: string | undefined; - const replyByRunId = new Map(); let sendParams: { to?: string; provider?: string; message?: string } = {}; let deletedKey: string | undefined; + let childRunId: string | undefined; + let childSessionKey: string | undefined; + const sessionLastAssistantText = new Map(); callGatewayMock.mockImplementation(async (opts: unknown) => { const request = opts as { method?: string; params?: unknown }; @@ -37,13 +41,21 @@ describe("subagents", () => { if (request.method === "agent") { agentCallCount += 1; const runId = `run-${agentCallCount}`; - const params = request.params as - | { message?: string; sessionKey?: string } - | undefined; + const params = request.params as { + message?: string; + sessionKey?: string; + timeout?: number; + }; const message = params?.message ?? ""; - const reply = - message === "Sub-agent announce step." ? "announce now" : "result"; - replyByRunId.set(runId, reply); + const sessionKey = params?.sessionKey ?? ""; + if (message === "Sub-agent announce step.") { + sessionLastAssistantText.set(sessionKey, "announce now"); + } else { + childRunId = runId; + childSessionKey = sessionKey; + sessionLastAssistantText.set(sessionKey, "result"); + expect(params?.timeout).toBe(1); + } return { runId, status: "accepted", @@ -51,13 +63,28 @@ describe("subagents", () => { }; } if (request.method === "agent.wait") { - const params = request.params as { runId?: string } | undefined; - lastWaitedRunId = params?.runId; + const params = request.params as + | { runId?: string; timeoutMs?: number } + | undefined; + if ( + params?.runId && + params.runId === childRunId && + typeof params.timeoutMs === "number" && + params.timeoutMs > 0 + ) { + throw new Error( + "sessions_spawn must not wait for sub-agent completion", + ); + } + if (params?.timeoutMs === 0) { + return { runId: params?.runId ?? "run-1", status: "timeout" }; + } return { runId: params?.runId ?? "run-1", status: "ok" }; } if (request.method === "chat.history") { + const params = request.params as { sessionKey?: string } | undefined; const text = - (lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? ""; + sessionLastAssistantText.get(params?.sessionKey ?? "") ?? ""; return { messages: [{ role: "assistant", content: [{ type: "text", text }] }], }; @@ -89,11 +116,26 @@ describe("subagents", () => { const result = await tool.execute("call1", { task: "do thing", - timeoutSeconds: 1, + runTimeoutSeconds: 1, cleanup: "delete", }); - expect(result.details).toMatchObject({ status: "ok", reply: "result" }); + expect(result.details).toMatchObject({ + status: "accepted", + runId: "run-1", + }); + if (!childRunId) throw new Error("missing child runId"); + emitAgentEvent({ + runId: childRunId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 1234, + endedAt: 2345, + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0)); @@ -105,6 +147,7 @@ describe("subagents", () => { expect(first?.lane).toBe("subagent"); expect(first?.deliver).toBe(false); expect(first?.sessionKey?.startsWith("agent:main:subagent:")).toBe(true); + expect(childSessionKey?.startsWith("agent:main:subagent:")).toBe(true); expect(sendParams.provider).toBe("discord"); expect(sendParams.to).toBe("channel:req"); @@ -114,12 +157,14 @@ describe("subagents", () => { }); it("sessions_spawn resolves main announce target from sessions.list", async () => { + resetSubagentRegistryForTests(); callGatewayMock.mockReset(); const calls: Array<{ method?: string; params?: unknown }> = []; let agentCallCount = 0; - let lastWaitedRunId: string | undefined; - const replyByRunId = new Map(); let sendParams: { to?: string; provider?: string; message?: string } = {}; + let childRunId: string | undefined; + let childSessionKey: string | undefined; + const sessionLastAssistantText = new Map(); callGatewayMock.mockImplementation(async (opts: unknown) => { const request = opts as { method?: string; params?: unknown }; @@ -138,13 +183,19 @@ describe("subagents", () => { if (request.method === "agent") { agentCallCount += 1; const runId = `run-${agentCallCount}`; - const params = request.params as - | { message?: string; sessionKey?: string } - | undefined; + const params = request.params as { + message?: string; + sessionKey?: string; + }; const message = params?.message ?? ""; - const reply = - message === "Sub-agent announce step." ? "hello from sub" : "done"; - replyByRunId.set(runId, reply); + const sessionKey = params?.sessionKey ?? ""; + if (message === "Sub-agent announce step.") { + sessionLastAssistantText.set(sessionKey, "hello from sub"); + } else { + childRunId = runId; + childSessionKey = sessionKey; + sessionLastAssistantText.set(sessionKey, "done"); + } return { runId, status: "accepted", @@ -152,13 +203,18 @@ describe("subagents", () => { }; } if (request.method === "agent.wait") { - const params = request.params as { runId?: string } | undefined; - lastWaitedRunId = params?.runId; + const params = request.params as + | { runId?: string; timeoutMs?: number } + | undefined; + if (params?.timeoutMs === 0) { + return { runId: params?.runId ?? "run-1", status: "timeout" }; + } return { runId: params?.runId ?? "run-1", status: "ok" }; } if (request.method === "chat.history") { + const params = request.params as { sessionKey?: string } | undefined; const text = - (lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? ""; + sessionLastAssistantText.get(params?.sessionKey ?? "") ?? ""; return { messages: [{ role: "assistant", content: [{ type: "text", text }] }], }; @@ -188,10 +244,25 @@ describe("subagents", () => { const result = await tool.execute("call2", { task: "do thing", - timeoutSeconds: 1, + runTimeoutSeconds: 1, + }); + expect(result.details).toMatchObject({ + status: "accepted", + runId: "run-1", }); - expect(result.details).toMatchObject({ status: "ok", reply: "done" }); + if (!childRunId) throw new Error("missing child runId"); + emitAgentEvent({ + runId: childRunId, + stream: "lifecycle", + data: { + phase: "end", + startedAt: 1000, + endedAt: 2000, + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0)); await new Promise((resolve) => setTimeout(resolve, 0)); @@ -199,14 +270,14 @@ describe("subagents", () => { expect(sendParams.to).toBe("+123"); expect(sendParams.message ?? "").toContain("hello from sub"); expect(sendParams.message ?? "").toContain("Stats:"); + expect(childSessionKey?.startsWith("agent:main:subagent:")).toBe(true); }); it("sessions_spawn applies a model to the child session", async () => { + resetSubagentRegistryForTests(); callGatewayMock.mockReset(); const calls: Array<{ method?: string; params?: unknown }> = []; let agentCallCount = 0; - let lastWaitedRunId: string | undefined; - const replyByRunId = new Map(); callGatewayMock.mockImplementation(async (opts: unknown) => { const request = opts as { method?: string; params?: unknown }; @@ -217,13 +288,6 @@ describe("subagents", () => { if (request.method === "agent") { agentCallCount += 1; const runId = `run-${agentCallCount}`; - const params = request.params as - | { message?: string; sessionKey?: string } - | undefined; - const message = params?.message ?? ""; - const reply = - message === "Sub-agent announce step." ? "ANNOUNCE_SKIP" : "done"; - replyByRunId.set(runId, reply); return { runId, status: "accepted", @@ -231,16 +295,9 @@ describe("subagents", () => { }; } if (request.method === "agent.wait") { - const params = request.params as { runId?: string } | undefined; - lastWaitedRunId = params?.runId; - return { runId: params?.runId ?? "run-1", status: "ok" }; - } - if (request.method === "chat.history") { - const text = - (lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? ""; - return { - messages: [{ role: "assistant", content: [{ type: "text", text }] }], - }; + const params = request.params as { timeoutMs?: number } | undefined; + if (params?.timeoutMs === 0) return { status: "timeout" }; + return { status: "ok" }; } if (request.method === "sessions.delete") { return { ok: true }; @@ -256,11 +313,14 @@ describe("subagents", () => { const result = await tool.execute("call3", { task: "do thing", - timeoutSeconds: 1, + runTimeoutSeconds: 1, model: "claude-haiku-4-5", cleanup: "keep", }); - expect(result.details).toMatchObject({ status: "ok", reply: "done" }); + expect(result.details).toMatchObject({ + status: "accepted", + modelApplied: true, + }); const patchIndex = calls.findIndex( (call) => call.method === "sessions.patch", @@ -277,11 +337,10 @@ describe("subagents", () => { }); it("sessions_spawn skips invalid model overrides and continues", async () => { + resetSubagentRegistryForTests(); callGatewayMock.mockReset(); const calls: Array<{ method?: string; params?: unknown }> = []; let agentCallCount = 0; - let lastWaitedRunId: string | undefined; - const replyByRunId = new Map(); callGatewayMock.mockImplementation(async (opts: unknown) => { const request = opts as { method?: string; params?: unknown }; @@ -292,13 +351,6 @@ describe("subagents", () => { if (request.method === "agent") { agentCallCount += 1; const runId = `run-${agentCallCount}`; - const params = request.params as - | { message?: string; sessionKey?: string } - | undefined; - const message = params?.message ?? ""; - const reply = - message === "Sub-agent announce step." ? "ANNOUNCE_SKIP" : "done"; - replyByRunId.set(runId, reply); return { runId, status: "accepted", @@ -306,16 +358,9 @@ describe("subagents", () => { }; } if (request.method === "agent.wait") { - const params = request.params as { runId?: string } | undefined; - lastWaitedRunId = params?.runId; - return { runId: params?.runId ?? "run-1", status: "ok" }; - } - if (request.method === "chat.history") { - const text = - (lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? ""; - return { - messages: [{ role: "assistant", content: [{ type: "text", text }] }], - }; + const params = request.params as { timeoutMs?: number } | undefined; + if (params?.timeoutMs === 0) return { status: "timeout" }; + return { status: "ok" }; } if (request.method === "sessions.delete") { return { ok: true }; @@ -331,11 +376,11 @@ describe("subagents", () => { const result = await tool.execute("call4", { task: "do thing", - timeoutSeconds: 1, + runTimeoutSeconds: 1, model: "bad-model", }); expect(result.details).toMatchObject({ - status: "ok", + status: "accepted", modelApplied: false, }); expect( @@ -343,4 +388,36 @@ describe("subagents", () => { ).toContain("invalid model"); expect(calls.some((call) => call.method === "agent")).toBe(true); }); + + it("sessions_spawn supports legacy timeoutSeconds alias", async () => { + resetSubagentRegistryForTests(); + callGatewayMock.mockReset(); + let spawnedTimeout: number | undefined; + + callGatewayMock.mockImplementation(async (opts: unknown) => { + const request = opts as { method?: string; params?: unknown }; + if (request.method === "agent") { + const params = request.params as { timeout?: number } | undefined; + spawnedTimeout = params?.timeout; + return { runId: "run-1", status: "accepted", acceptedAt: 1000 }; + } + return {}; + }); + + const tool = createClawdbotTools({ + agentSessionKey: "main", + agentProvider: "whatsapp", + }).find((candidate) => candidate.name === "sessions_spawn"); + if (!tool) throw new Error("missing sessions_spawn tool"); + + const result = await tool.execute("call5", { + task: "do thing", + timeoutSeconds: 2, + }); + expect(result.details).toMatchObject({ + status: "accepted", + runId: "run-1", + }); + expect(spawnedTimeout).toBe(2); + }); }); diff --git a/src/agents/tool-display.json b/src/agents/tool-display.json index fb02b91f4..67ecb83ae 100644 --- a/src/agents/tool-display.json +++ b/src/agents/tool-display.json @@ -168,7 +168,7 @@ "sessions_spawn": { "emoji": "🧑‍🔧", "title": "Sub-agent", - "detailKeys": ["label", "timeoutSeconds", "cleanup"] + "detailKeys": ["label", "runTimeoutSeconds", "cleanup"] }, "whatsapp_login": { "emoji": "🟢", diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index ea19f370a..fe983feb5 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -9,15 +9,8 @@ import { normalizeAgentId, parseAgentSessionKey, } from "../../routing/session-key.js"; -import { - buildSubagentSystemPrompt, - runSubagentAnnounceFlow, -} from "../subagent-announce.js"; -import { - beginSubagentAnnounce, - registerSubagentRun, -} from "../subagent-registry.js"; -import { readLatestAssistantReply } from "./agent-step.js"; +import { buildSubagentSystemPrompt } from "../subagent-announce.js"; +import { registerSubagentRun } from "../subagent-registry.js"; import type { AnyAgentTool } from "./common.js"; import { jsonResult, readStringParam } from "./common.js"; import { @@ -30,6 +23,8 @@ const SessionsSpawnToolSchema = Type.Object({ task: Type.String(), label: Type.Optional(Type.String()), model: Type.Optional(Type.String()), + runTimeoutSeconds: Type.Optional(Type.Integer({ minimum: 0 })), + // Back-compat alias. Prefer runTimeoutSeconds. timeoutSeconds: Type.Optional(Type.Integer({ minimum: 0 })), cleanup: Type.Optional( Type.Union([Type.Literal("delete"), Type.Literal("keep")]), @@ -56,12 +51,20 @@ export function createSessionsSpawnTool(opts?: { params.cleanup === "keep" || params.cleanup === "delete" ? (params.cleanup as "keep" | "delete") : "keep"; - const timeoutSeconds = - typeof params.timeoutSeconds === "number" && - Number.isFinite(params.timeoutSeconds) - ? Math.max(0, Math.floor(params.timeoutSeconds)) - : 0; - const timeoutMs = timeoutSeconds * 1000; + const runTimeoutSeconds = (() => { + const explicit = + typeof params.runTimeoutSeconds === "number" && + Number.isFinite(params.runTimeoutSeconds) + ? Math.max(0, Math.floor(params.runTimeoutSeconds)) + : undefined; + if (explicit !== undefined) return explicit; + const legacy = + typeof params.timeoutSeconds === "number" && + Number.isFinite(params.timeoutSeconds) + ? Math.max(0, Math.floor(params.timeoutSeconds)) + : undefined; + return legacy ?? 0; + })(); let modelWarning: string | undefined; let modelApplied = false; @@ -152,6 +155,7 @@ export function createSessionsSpawnTool(opts?: { deliver: false, lane: "subagent", extraSystemPrompt: childSystemPrompt, + timeout: runTimeoutSeconds > 0 ? runTimeoutSeconds : undefined, }, timeoutMs: 10_000, })) as { runId?: string }; @@ -183,109 +187,10 @@ export function createSessionsSpawnTool(opts?: { cleanup, }); - if (timeoutSeconds === 0) { - return jsonResult({ - status: "accepted", - childSessionKey, - runId: childRunId, - modelApplied: model ? modelApplied : undefined, - warning: modelWarning, - }); - } - - let waitStatus: string | undefined; - let waitError: string | undefined; - let waitStartedAt: number | undefined; - let waitEndedAt: number | undefined; - try { - const wait = (await callGateway({ - method: "agent.wait", - params: { - runId: childRunId, - timeoutMs, - }, - timeoutMs: timeoutMs + 2000, - })) as { - status?: string; - error?: string; - startedAt?: number; - endedAt?: number; - }; - waitStatus = typeof wait?.status === "string" ? wait.status : undefined; - waitError = typeof wait?.error === "string" ? wait.error : undefined; - waitStartedAt = - typeof wait?.startedAt === "number" ? wait.startedAt : undefined; - waitEndedAt = - typeof wait?.endedAt === "number" ? wait.endedAt : undefined; - } catch (err) { - const messageText = - err instanceof Error - ? err.message - : typeof err === "string" - ? err - : "error"; - return jsonResult({ - status: messageText.includes("gateway timeout") ? "timeout" : "error", - error: messageText, - childSessionKey, - runId: childRunId, - }); - } - - if (waitStatus === "timeout") { - try { - await callGateway({ - method: "chat.abort", - params: { sessionKey: childSessionKey, runId: childRunId }, - timeoutMs: 5_000, - }); - } catch { - // best-effort - } - return jsonResult({ - status: "timeout", - error: waitError, - childSessionKey, - runId: childRunId, - modelApplied: model ? modelApplied : undefined, - warning: modelWarning, - }); - } - if (waitStatus === "error") { - return jsonResult({ - status: "error", - error: waitError ?? "agent error", - childSessionKey, - runId: childRunId, - modelApplied: model ? modelApplied : undefined, - warning: modelWarning, - }); - } - - const replyText = await readLatestAssistantReply({ - sessionKey: childSessionKey, - }); - if (beginSubagentAnnounce(childRunId)) { - void runSubagentAnnounceFlow({ - childSessionKey, - childRunId, - requesterSessionKey: requesterInternalKey, - requesterProvider: opts?.agentProvider, - requesterDisplayKey, - task, - timeoutMs: 30_000, - cleanup, - roundOneReply: replyText, - startedAt: waitStartedAt, - endedAt: waitEndedAt, - }); - } - return jsonResult({ - status: "ok", + status: "accepted", childSessionKey, runId: childRunId, - reply: replyText, modelApplied: model ? modelApplied : undefined, warning: modelWarning, });