fix(agents): make sessions_spawn non-blocking

This commit is contained in:
Peter Steinberger
2026-01-07 16:14:25 +00:00
parent 28b8349bd5
commit 77024cf776
6 changed files with 176 additions and 191 deletions

View File

@@ -19,17 +19,21 @@ vi.mock("../config/config.js", async (importOriginal) => {
};
});
import { emitAgentEvent } from "../infra/agent-events.js";
import { createClawdbotTools } from "./clawdbot-tools.js";
import { resetSubagentRegistryForTests } from "./subagent-registry.js";
describe("subagents", () => {
it("sessions_spawn announces back to the requester group provider", async () => {
resetSubagentRegistryForTests();
callGatewayMock.mockReset();
const calls: Array<{ method?: string; params?: unknown }> = [];
let agentCallCount = 0;
let lastWaitedRunId: string | undefined;
const replyByRunId = new Map<string, string>();
let sendParams: { to?: string; provider?: string; message?: string } = {};
let deletedKey: string | undefined;
let childRunId: string | undefined;
let childSessionKey: string | undefined;
const sessionLastAssistantText = new Map<string, string>();
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
@@ -37,13 +41,21 @@ describe("subagents", () => {
if (request.method === "agent") {
agentCallCount += 1;
const runId = `run-${agentCallCount}`;
const params = request.params as
| { message?: string; sessionKey?: string }
| undefined;
const params = request.params as {
message?: string;
sessionKey?: string;
timeout?: number;
};
const message = params?.message ?? "";
const reply =
message === "Sub-agent announce step." ? "announce now" : "result";
replyByRunId.set(runId, reply);
const sessionKey = params?.sessionKey ?? "";
if (message === "Sub-agent announce step.") {
sessionLastAssistantText.set(sessionKey, "announce now");
} else {
childRunId = runId;
childSessionKey = sessionKey;
sessionLastAssistantText.set(sessionKey, "result");
expect(params?.timeout).toBe(1);
}
return {
runId,
status: "accepted",
@@ -51,13 +63,28 @@ describe("subagents", () => {
};
}
if (request.method === "agent.wait") {
const params = request.params as { runId?: string } | undefined;
lastWaitedRunId = params?.runId;
const params = request.params as
| { runId?: string; timeoutMs?: number }
| undefined;
if (
params?.runId &&
params.runId === childRunId &&
typeof params.timeoutMs === "number" &&
params.timeoutMs > 0
) {
throw new Error(
"sessions_spawn must not wait for sub-agent completion",
);
}
if (params?.timeoutMs === 0) {
return { runId: params?.runId ?? "run-1", status: "timeout" };
}
return { runId: params?.runId ?? "run-1", status: "ok" };
}
if (request.method === "chat.history") {
const params = request.params as { sessionKey?: string } | undefined;
const text =
(lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? "";
sessionLastAssistantText.get(params?.sessionKey ?? "") ?? "";
return {
messages: [{ role: "assistant", content: [{ type: "text", text }] }],
};
@@ -89,11 +116,26 @@ describe("subagents", () => {
const result = await tool.execute("call1", {
task: "do thing",
timeoutSeconds: 1,
runTimeoutSeconds: 1,
cleanup: "delete",
});
expect(result.details).toMatchObject({ status: "ok", reply: "result" });
expect(result.details).toMatchObject({
status: "accepted",
runId: "run-1",
});
if (!childRunId) throw new Error("missing child runId");
emitAgentEvent({
runId: childRunId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1234,
endedAt: 2345,
},
});
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
@@ -105,6 +147,7 @@ describe("subagents", () => {
expect(first?.lane).toBe("subagent");
expect(first?.deliver).toBe(false);
expect(first?.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
expect(childSessionKey?.startsWith("agent:main:subagent:")).toBe(true);
expect(sendParams.provider).toBe("discord");
expect(sendParams.to).toBe("channel:req");
@@ -114,12 +157,14 @@ describe("subagents", () => {
});
it("sessions_spawn resolves main announce target from sessions.list", async () => {
resetSubagentRegistryForTests();
callGatewayMock.mockReset();
const calls: Array<{ method?: string; params?: unknown }> = [];
let agentCallCount = 0;
let lastWaitedRunId: string | undefined;
const replyByRunId = new Map<string, string>();
let sendParams: { to?: string; provider?: string; message?: string } = {};
let childRunId: string | undefined;
let childSessionKey: string | undefined;
const sessionLastAssistantText = new Map<string, string>();
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
@@ -138,13 +183,19 @@ describe("subagents", () => {
if (request.method === "agent") {
agentCallCount += 1;
const runId = `run-${agentCallCount}`;
const params = request.params as
| { message?: string; sessionKey?: string }
| undefined;
const params = request.params as {
message?: string;
sessionKey?: string;
};
const message = params?.message ?? "";
const reply =
message === "Sub-agent announce step." ? "hello from sub" : "done";
replyByRunId.set(runId, reply);
const sessionKey = params?.sessionKey ?? "";
if (message === "Sub-agent announce step.") {
sessionLastAssistantText.set(sessionKey, "hello from sub");
} else {
childRunId = runId;
childSessionKey = sessionKey;
sessionLastAssistantText.set(sessionKey, "done");
}
return {
runId,
status: "accepted",
@@ -152,13 +203,18 @@ describe("subagents", () => {
};
}
if (request.method === "agent.wait") {
const params = request.params as { runId?: string } | undefined;
lastWaitedRunId = params?.runId;
const params = request.params as
| { runId?: string; timeoutMs?: number }
| undefined;
if (params?.timeoutMs === 0) {
return { runId: params?.runId ?? "run-1", status: "timeout" };
}
return { runId: params?.runId ?? "run-1", status: "ok" };
}
if (request.method === "chat.history") {
const params = request.params as { sessionKey?: string } | undefined;
const text =
(lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? "";
sessionLastAssistantText.get(params?.sessionKey ?? "") ?? "";
return {
messages: [{ role: "assistant", content: [{ type: "text", text }] }],
};
@@ -188,10 +244,25 @@ describe("subagents", () => {
const result = await tool.execute("call2", {
task: "do thing",
timeoutSeconds: 1,
runTimeoutSeconds: 1,
});
expect(result.details).toMatchObject({
status: "accepted",
runId: "run-1",
});
expect(result.details).toMatchObject({ status: "ok", reply: "done" });
if (!childRunId) throw new Error("missing child runId");
emitAgentEvent({
runId: childRunId,
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1000,
endedAt: 2000,
},
});
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
await new Promise((resolve) => setTimeout(resolve, 0));
@@ -199,14 +270,14 @@ describe("subagents", () => {
expect(sendParams.to).toBe("+123");
expect(sendParams.message ?? "").toContain("hello from sub");
expect(sendParams.message ?? "").toContain("Stats:");
expect(childSessionKey?.startsWith("agent:main:subagent:")).toBe(true);
});
it("sessions_spawn applies a model to the child session", async () => {
resetSubagentRegistryForTests();
callGatewayMock.mockReset();
const calls: Array<{ method?: string; params?: unknown }> = [];
let agentCallCount = 0;
let lastWaitedRunId: string | undefined;
const replyByRunId = new Map<string, string>();
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
@@ -217,13 +288,6 @@ describe("subagents", () => {
if (request.method === "agent") {
agentCallCount += 1;
const runId = `run-${agentCallCount}`;
const params = request.params as
| { message?: string; sessionKey?: string }
| undefined;
const message = params?.message ?? "";
const reply =
message === "Sub-agent announce step." ? "ANNOUNCE_SKIP" : "done";
replyByRunId.set(runId, reply);
return {
runId,
status: "accepted",
@@ -231,16 +295,9 @@ describe("subagents", () => {
};
}
if (request.method === "agent.wait") {
const params = request.params as { runId?: string } | undefined;
lastWaitedRunId = params?.runId;
return { runId: params?.runId ?? "run-1", status: "ok" };
}
if (request.method === "chat.history") {
const text =
(lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? "";
return {
messages: [{ role: "assistant", content: [{ type: "text", text }] }],
};
const params = request.params as { timeoutMs?: number } | undefined;
if (params?.timeoutMs === 0) return { status: "timeout" };
return { status: "ok" };
}
if (request.method === "sessions.delete") {
return { ok: true };
@@ -256,11 +313,14 @@ describe("subagents", () => {
const result = await tool.execute("call3", {
task: "do thing",
timeoutSeconds: 1,
runTimeoutSeconds: 1,
model: "claude-haiku-4-5",
cleanup: "keep",
});
expect(result.details).toMatchObject({ status: "ok", reply: "done" });
expect(result.details).toMatchObject({
status: "accepted",
modelApplied: true,
});
const patchIndex = calls.findIndex(
(call) => call.method === "sessions.patch",
@@ -277,11 +337,10 @@ describe("subagents", () => {
});
it("sessions_spawn skips invalid model overrides and continues", async () => {
resetSubagentRegistryForTests();
callGatewayMock.mockReset();
const calls: Array<{ method?: string; params?: unknown }> = [];
let agentCallCount = 0;
let lastWaitedRunId: string | undefined;
const replyByRunId = new Map<string, string>();
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
@@ -292,13 +351,6 @@ describe("subagents", () => {
if (request.method === "agent") {
agentCallCount += 1;
const runId = `run-${agentCallCount}`;
const params = request.params as
| { message?: string; sessionKey?: string }
| undefined;
const message = params?.message ?? "";
const reply =
message === "Sub-agent announce step." ? "ANNOUNCE_SKIP" : "done";
replyByRunId.set(runId, reply);
return {
runId,
status: "accepted",
@@ -306,16 +358,9 @@ describe("subagents", () => {
};
}
if (request.method === "agent.wait") {
const params = request.params as { runId?: string } | undefined;
lastWaitedRunId = params?.runId;
return { runId: params?.runId ?? "run-1", status: "ok" };
}
if (request.method === "chat.history") {
const text =
(lastWaitedRunId && replyByRunId.get(lastWaitedRunId)) ?? "";
return {
messages: [{ role: "assistant", content: [{ type: "text", text }] }],
};
const params = request.params as { timeoutMs?: number } | undefined;
if (params?.timeoutMs === 0) return { status: "timeout" };
return { status: "ok" };
}
if (request.method === "sessions.delete") {
return { ok: true };
@@ -331,11 +376,11 @@ describe("subagents", () => {
const result = await tool.execute("call4", {
task: "do thing",
timeoutSeconds: 1,
runTimeoutSeconds: 1,
model: "bad-model",
});
expect(result.details).toMatchObject({
status: "ok",
status: "accepted",
modelApplied: false,
});
expect(
@@ -343,4 +388,36 @@ describe("subagents", () => {
).toContain("invalid model");
expect(calls.some((call) => call.method === "agent")).toBe(true);
});
it("sessions_spawn supports legacy timeoutSeconds alias", async () => {
resetSubagentRegistryForTests();
callGatewayMock.mockReset();
let spawnedTimeout: number | undefined;
callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; params?: unknown };
if (request.method === "agent") {
const params = request.params as { timeout?: number } | undefined;
spawnedTimeout = params?.timeout;
return { runId: "run-1", status: "accepted", acceptedAt: 1000 };
}
return {};
});
const tool = createClawdbotTools({
agentSessionKey: "main",
agentProvider: "whatsapp",
}).find((candidate) => candidate.name === "sessions_spawn");
if (!tool) throw new Error("missing sessions_spawn tool");
const result = await tool.execute("call5", {
task: "do thing",
timeoutSeconds: 2,
});
expect(result.details).toMatchObject({
status: "accepted",
runId: "run-1",
});
expect(spawnedTimeout).toBe(2);
});
});

View File

@@ -168,7 +168,7 @@
"sessions_spawn": {
"emoji": "🧑‍🔧",
"title": "Sub-agent",
"detailKeys": ["label", "timeoutSeconds", "cleanup"]
"detailKeys": ["label", "runTimeoutSeconds", "cleanup"]
},
"whatsapp_login": {
"emoji": "🟢",

View File

@@ -9,15 +9,8 @@ import {
normalizeAgentId,
parseAgentSessionKey,
} from "../../routing/session-key.js";
import {
buildSubagentSystemPrompt,
runSubagentAnnounceFlow,
} from "../subagent-announce.js";
import {
beginSubagentAnnounce,
registerSubagentRun,
} from "../subagent-registry.js";
import { readLatestAssistantReply } from "./agent-step.js";
import { buildSubagentSystemPrompt } from "../subagent-announce.js";
import { registerSubagentRun } from "../subagent-registry.js";
import type { AnyAgentTool } from "./common.js";
import { jsonResult, readStringParam } from "./common.js";
import {
@@ -30,6 +23,8 @@ const SessionsSpawnToolSchema = Type.Object({
task: Type.String(),
label: Type.Optional(Type.String()),
model: Type.Optional(Type.String()),
runTimeoutSeconds: Type.Optional(Type.Integer({ minimum: 0 })),
// Back-compat alias. Prefer runTimeoutSeconds.
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 0 })),
cleanup: Type.Optional(
Type.Union([Type.Literal("delete"), Type.Literal("keep")]),
@@ -56,12 +51,20 @@ export function createSessionsSpawnTool(opts?: {
params.cleanup === "keep" || params.cleanup === "delete"
? (params.cleanup as "keep" | "delete")
: "keep";
const timeoutSeconds =
typeof params.timeoutSeconds === "number" &&
Number.isFinite(params.timeoutSeconds)
? Math.max(0, Math.floor(params.timeoutSeconds))
: 0;
const timeoutMs = timeoutSeconds * 1000;
const runTimeoutSeconds = (() => {
const explicit =
typeof params.runTimeoutSeconds === "number" &&
Number.isFinite(params.runTimeoutSeconds)
? Math.max(0, Math.floor(params.runTimeoutSeconds))
: undefined;
if (explicit !== undefined) return explicit;
const legacy =
typeof params.timeoutSeconds === "number" &&
Number.isFinite(params.timeoutSeconds)
? Math.max(0, Math.floor(params.timeoutSeconds))
: undefined;
return legacy ?? 0;
})();
let modelWarning: string | undefined;
let modelApplied = false;
@@ -152,6 +155,7 @@ export function createSessionsSpawnTool(opts?: {
deliver: false,
lane: "subagent",
extraSystemPrompt: childSystemPrompt,
timeout: runTimeoutSeconds > 0 ? runTimeoutSeconds : undefined,
},
timeoutMs: 10_000,
})) as { runId?: string };
@@ -183,109 +187,10 @@ export function createSessionsSpawnTool(opts?: {
cleanup,
});
if (timeoutSeconds === 0) {
return jsonResult({
status: "accepted",
childSessionKey,
runId: childRunId,
modelApplied: model ? modelApplied : undefined,
warning: modelWarning,
});
}
let waitStatus: string | undefined;
let waitError: string | undefined;
let waitStartedAt: number | undefined;
let waitEndedAt: number | undefined;
try {
const wait = (await callGateway({
method: "agent.wait",
params: {
runId: childRunId,
timeoutMs,
},
timeoutMs: timeoutMs + 2000,
})) as {
status?: string;
error?: string;
startedAt?: number;
endedAt?: number;
};
waitStatus = typeof wait?.status === "string" ? wait.status : undefined;
waitError = typeof wait?.error === "string" ? wait.error : undefined;
waitStartedAt =
typeof wait?.startedAt === "number" ? wait.startedAt : undefined;
waitEndedAt =
typeof wait?.endedAt === "number" ? wait.endedAt : undefined;
} catch (err) {
const messageText =
err instanceof Error
? err.message
: typeof err === "string"
? err
: "error";
return jsonResult({
status: messageText.includes("gateway timeout") ? "timeout" : "error",
error: messageText,
childSessionKey,
runId: childRunId,
});
}
if (waitStatus === "timeout") {
try {
await callGateway({
method: "chat.abort",
params: { sessionKey: childSessionKey, runId: childRunId },
timeoutMs: 5_000,
});
} catch {
// best-effort
}
return jsonResult({
status: "timeout",
error: waitError,
childSessionKey,
runId: childRunId,
modelApplied: model ? modelApplied : undefined,
warning: modelWarning,
});
}
if (waitStatus === "error") {
return jsonResult({
status: "error",
error: waitError ?? "agent error",
childSessionKey,
runId: childRunId,
modelApplied: model ? modelApplied : undefined,
warning: modelWarning,
});
}
const replyText = await readLatestAssistantReply({
sessionKey: childSessionKey,
});
if (beginSubagentAnnounce(childRunId)) {
void runSubagentAnnounceFlow({
childSessionKey,
childRunId,
requesterSessionKey: requesterInternalKey,
requesterProvider: opts?.agentProvider,
requesterDisplayKey,
task,
timeoutMs: 30_000,
cleanup,
roundOneReply: replyText,
startedAt: waitStartedAt,
endedAt: waitEndedAt,
});
}
return jsonResult({
status: "ok",
status: "accepted",
childSessionKey,
runId: childRunId,
reply: replyText,
modelApplied: model ? modelApplied : undefined,
warning: modelWarning,
});