fix: wait for final agent response in sessions_send

This commit is contained in:
Peter Steinberger
2026-01-04 00:40:40 +01:00
parent e07fdd117d
commit 3bc24bf179
2 changed files with 75 additions and 182 deletions

View File

@@ -1,54 +1,10 @@
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
const callGatewayMock = vi.fn(); const callGatewayMock = vi.fn();
const nextRunId = "run-1";
const nextRunState: "done" | "error" = "done";
vi.mock("../gateway/call.js", () => ({ vi.mock("../gateway/call.js", () => ({
callGateway: (opts: unknown) => callGatewayMock(opts), callGateway: (opts: unknown) => callGatewayMock(opts),
})); }));
vi.mock("../gateway/client.js", () => ({
GatewayClient: class {
private opts: {
onEvent?: (evt: {
event?: string;
payload?: {
runId?: string;
stream?: string;
data?: Record<string, unknown>;
};
}) => void;
};
constructor(opts: {
onEvent?: (evt: {
event?: string;
payload?: {
runId?: string;
stream?: string;
data?: Record<string, unknown>;
};
}) => void;
}) {
this.opts = opts;
}
start() {
setTimeout(() => {
this.opts.onEvent?.({
event: "agent",
payload: {
runId: nextRunId,
stream: "job",
data:
nextRunState === "error" ? { state: "error" } : { state: "done" },
},
});
}, 1);
}
stop() {}
},
}));
vi.mock("../config/config.js", () => ({ vi.mock("../config/config.js", () => ({
loadConfig: () => ({ loadConfig: () => ({
session: { mainKey: "main", scope: "per-sender" }, session: { mainKey: "main", scope: "per-sender" },
@@ -60,6 +16,7 @@ import { createClawdisTools } from "./clawdis-tools.js";
describe("sessions tools", () => { describe("sessions tools", () => {
it("sessions_list filters kinds and includes messages", async () => { it("sessions_list filters kinds and includes messages", async () => {
callGatewayMock.mockReset();
callGatewayMock.mockImplementation(async (opts: unknown) => { callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string }; const request = opts as { method?: string };
if (request.method === "sessions.list") { if (request.method === "sessions.list") {
@@ -131,6 +88,7 @@ describe("sessions tools", () => {
}); });
it("sessions_history filters tool messages by default", async () => { it("sessions_history filters tool messages by default", async () => {
callGatewayMock.mockReset();
callGatewayMock.mockImplementation(async (opts: unknown) => { callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string }; const request = opts as { method?: string };
if (request.method === "chat.history") { if (request.method === "chat.history") {
@@ -164,8 +122,11 @@ describe("sessions tools", () => {
}); });
it("sessions_send supports fire-and-forget and wait", async () => { it("sessions_send supports fire-and-forget and wait", async () => {
callGatewayMock.mockReset();
const calls: Array<{ method?: string; expectFinal?: boolean }> = [];
callGatewayMock.mockImplementation(async (opts: unknown) => { callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string }; const request = opts as { method?: string; expectFinal?: boolean };
calls.push(request);
if (request.method === "agent") { if (request.method === "agent") {
return { runId: "run-1", status: "accepted" }; return { runId: "run-1", status: "accepted" };
} }
@@ -203,5 +164,9 @@ describe("sessions tools", () => {
runId: "run-1", runId: "run-1",
reply: "done", reply: "done",
}); });
const agentCalls = calls.filter((call) => call.method === "agent");
expect(agentCalls[0]?.expectFinal).toBeUndefined();
expect(agentCalls[1]?.expectFinal).toBe(true);
}); });
}); });

View File

@@ -45,7 +45,6 @@ import {
type ClawdisConfig, type ClawdisConfig,
type DiscordActionConfig, type DiscordActionConfig,
loadConfig, loadConfig,
resolveGatewayPort,
} from "../config/config.js"; } from "../config/config.js";
import { import {
addRoleDiscord, addRoleDiscord,
@@ -78,7 +77,6 @@ import {
unpinMessageDiscord, unpinMessageDiscord,
} from "../discord/send.js"; } from "../discord/send.js";
import { callGateway } from "../gateway/call.js"; import { callGateway } from "../gateway/call.js";
import { GatewayClient } from "../gateway/client.js";
import { detectMime, imageMimeFromFormat } from "../media/mime.js"; import { detectMime, imageMimeFromFormat } from "../media/mime.js";
import { sanitizeToolResultImages } from "./tool-images.js"; import { sanitizeToolResultImages } from "./tool-images.js";
@@ -339,88 +337,6 @@ function extractAssistantText(message: unknown): string | undefined {
return joined ? joined : undefined; return joined ? joined : undefined;
} }
function resolveGatewayConnection(opts: GatewayCallOptions) {
const cfg = loadConfig();
const isRemoteMode = cfg.gateway?.mode === "remote";
const remote = isRemoteMode ? cfg.gateway?.remote : undefined;
const localPort = resolveGatewayPort(cfg);
const url =
normalizeKey(opts.gatewayUrl) ??
(typeof remote?.url === "string" && remote.url.trim()
? remote.url.trim()
: undefined) ??
`ws://127.0.0.1:${localPort}`;
const token =
normalizeKey(opts.gatewayToken) ??
(isRemoteMode
? normalizeKey(remote?.token)
: (normalizeKey(process.env.CLAWDIS_GATEWAY_TOKEN) ??
normalizeKey(cfg.gateway?.auth?.token)));
const password =
normalizeKey(process.env.CLAWDIS_GATEWAY_PASSWORD) ??
normalizeKey(remote?.password);
return { url, token, password };
}
async function waitForAgentCompletion(params: {
connection: ReturnType<typeof resolveGatewayConnection>;
runId: string;
timeoutMs: number;
}): Promise<{ status: "done" | "error" | "timeout"; error?: string }> {
const { connection, runId, timeoutMs } = params;
return await new Promise((resolve) => {
let settled = false;
const done = (status: "done" | "error" | "timeout", error?: string) => {
if (settled) return;
settled = true;
clearTimeout(timer);
client.stop();
resolve({ status, error });
};
const client = new GatewayClient({
url: connection.url,
token: connection.token,
password: connection.password,
clientName: "agent",
clientVersion: "dev",
platform: process.platform,
mode: "agent",
instanceId: crypto.randomUUID(),
onEvent: (evt) => {
if (evt.event !== "agent") return;
const payload = evt.payload as {
runId?: unknown;
stream?: unknown;
data?: Record<string, unknown>;
};
if (payload?.runId !== runId) return;
if (payload.stream !== "job") return;
const state = payload.data?.state;
if (state === "done") {
done("done");
return;
}
if (state === "error") {
done(
"error",
typeof payload.data?.error === "string"
? payload.data.error
: undefined,
);
}
},
onClose: (_code, _reason) => {
done("timeout");
},
});
const timer = setTimeout(() => done("timeout"), timeoutMs);
client.start();
});
}
async function imageResult(params: { async function imageResult(params: {
label: string; label: string;
path: string; path: string;
@@ -2775,23 +2691,33 @@ function createSessionsSendTool(): AnyAgentTool {
: 30; : 30;
const idempotencyKey = crypto.randomUUID(); const idempotencyKey = crypto.randomUUID();
let runId = idempotencyKey; let runId = idempotencyKey;
try { const displayKey = resolveDisplaySessionKey({
const response = (await callGateway({ key: sessionKey,
method: "agent", alias,
params: { mainKey,
});
const sendParams = {
message, message,
sessionKey: resolvedKey, sessionKey: resolvedKey,
idempotencyKey, idempotencyKey,
deliver: false, deliver: false,
}, };
timeoutMs:
timeoutSeconds > 0 if (timeoutSeconds === 0) {
? Math.min(timeoutSeconds * 1000, 10_000) try {
: 10_000, const response = (await callGateway({
})) as { runId?: string; status?: string }; method: "agent",
params: sendParams,
timeoutMs: 10_000,
})) as { runId?: string };
if (typeof response?.runId === "string" && response.runId) { if (typeof response?.runId === "string" && response.runId) {
runId = response.runId; runId = response.runId;
} }
return jsonResult({
runId,
status: "accepted",
sessionKey: displayKey,
});
} catch (err) { } catch (err) {
const message = const message =
err instanceof Error ? err.message : String(err ?? "error"); err instanceof Error ? err.message : String(err ?? "error");
@@ -2799,42 +2725,49 @@ function createSessionsSendTool(): AnyAgentTool {
runId, runId,
status: "error", status: "error",
error: message, error: message,
sessionKey: resolveDisplaySessionKey({ sessionKey: displayKey,
key: sessionKey,
alias,
mainKey,
}),
}); });
} }
if (timeoutSeconds === 0) {
return jsonResult({
runId,
status: "accepted",
sessionKey: resolveDisplaySessionKey({
key: sessionKey,
alias,
mainKey,
}),
});
} }
const connection = resolveGatewayConnection({}); try {
const wait = await waitForAgentCompletion({ const response = (await callGateway({
connection, method: "agent",
runId, params: sendParams,
expectFinal: true,
timeoutMs: timeoutSeconds * 1000, timeoutMs: timeoutSeconds * 1000,
}); })) as { runId?: string; status?: string };
if (typeof response?.runId === "string" && response.runId) {
if (wait.status === "timeout") { runId = response.runId;
}
} catch (err) {
const message =
err instanceof Error ? err.message : String(err ?? "error");
if (message.includes("gateway timeout")) {
try {
const cached = (await callGateway({
method: "agent",
params: sendParams,
timeoutMs: 5_000,
})) as { runId?: string };
if (typeof cached?.runId === "string" && cached.runId) {
runId = cached.runId;
}
} catch {
/* ignore */
}
return jsonResult({ return jsonResult({
runId, runId,
status: "timeout", status: "timeout",
sessionKey: resolveDisplaySessionKey({ error: message,
key: sessionKey, sessionKey: displayKey,
alias, });
mainKey, }
}), return jsonResult({
runId,
status: "error",
error: message,
sessionKey: displayKey,
}); });
} }
@@ -2851,14 +2784,9 @@ function createSessionsSendTool(): AnyAgentTool {
return jsonResult({ return jsonResult({
runId, runId,
status: wait.status === "error" ? "error" : "ok", status: "ok",
error: wait.error,
reply, reply,
sessionKey: resolveDisplaySessionKey({ sessionKey: displayKey,
key: sessionKey,
alias,
mainKey,
}),
}); });
}, },
}; };