fix: make sessions_send wait via agent events

This commit is contained in:
Peter Steinberger
2026-01-04 00:12:14 +01:00
parent 03ee77b0e1
commit 0f6e566a20
2 changed files with 192 additions and 47 deletions

View File

@@ -1,11 +1,54 @@
import { describe, expect, it, vi } from "vitest"; import { describe, expect, it, vi } from "vitest";
const callGatewayMock = vi.fn(); const callGatewayMock = vi.fn();
const nextRunId = "run-1";
const nextRunState: "done" | "error" = "done";
vi.mock("../gateway/call.js", () => ({ vi.mock("../gateway/call.js", () => ({
callGateway: (opts: unknown) => callGatewayMock(opts), callGateway: (opts: unknown) => callGatewayMock(opts),
})); }));
vi.mock("../gateway/client.js", () => ({
GatewayClient: class {
private opts: {
onEvent?: (evt: {
event?: string;
payload?: {
runId?: string;
stream?: string;
data?: Record<string, unknown>;
};
}) => void;
};
constructor(opts: {
onEvent?: (evt: {
event?: string;
payload?: {
runId?: string;
stream?: string;
data?: Record<string, unknown>;
};
}) => void;
}) {
this.opts = opts;
}
start() {
setTimeout(() => {
this.opts.onEvent?.({
event: "agent",
payload: {
runId: nextRunId,
stream: "job",
data:
nextRunState === "error" ? { state: "error" } : { state: "done" },
},
});
}, 1);
}
stop() {}
},
}));
vi.mock("../config/config.js", () => ({ vi.mock("../config/config.js", () => ({
loadConfig: () => ({ loadConfig: () => ({
session: { mainKey: "main", scope: "per-sender" }, session: { mainKey: "main", scope: "per-sender" },
@@ -122,11 +165,9 @@ describe("sessions tools", () => {
it("sessions_send supports fire-and-forget and wait", async () => { it("sessions_send supports fire-and-forget and wait", async () => {
callGatewayMock.mockImplementation(async (opts: unknown) => { callGatewayMock.mockImplementation(async (opts: unknown) => {
const request = opts as { method?: string; expectFinal?: boolean }; const request = opts as { method?: string };
if (request.method === "agent") { if (request.method === "agent") {
return request.expectFinal return { runId: "run-1", status: "accepted" };
? { runId: "run-1", status: "ok" }
: { runId: "run-1", status: "accepted" };
} }
if (request.method === "chat.history") { if (request.method === "chat.history") {
return { return {

View File

@@ -45,6 +45,7 @@ import {
type ClawdisConfig, type ClawdisConfig,
type DiscordActionConfig, type DiscordActionConfig,
loadConfig, loadConfig,
resolveGatewayPort,
} from "../config/config.js"; } from "../config/config.js";
import { import {
addRoleDiscord, addRoleDiscord,
@@ -77,6 +78,7 @@ import {
unpinMessageDiscord, unpinMessageDiscord,
} from "../discord/send.js"; } from "../discord/send.js";
import { callGateway } from "../gateway/call.js"; import { callGateway } from "../gateway/call.js";
import { GatewayClient } from "../gateway/client.js";
import { detectMime, imageMimeFromFormat } from "../media/mime.js"; import { detectMime, imageMimeFromFormat } from "../media/mime.js";
import { sanitizeToolResultImages } from "./tool-images.js"; import { sanitizeToolResultImages } from "./tool-images.js";
@@ -337,6 +339,88 @@ function extractAssistantText(message: unknown): string | undefined {
return joined ? joined : undefined; return joined ? joined : undefined;
} }
function resolveGatewayConnection(opts: GatewayCallOptions) {
const cfg = loadConfig();
const isRemoteMode = cfg.gateway?.mode === "remote";
const remote = isRemoteMode ? cfg.gateway?.remote : undefined;
const localPort = resolveGatewayPort(cfg);
const url =
normalizeKey(opts.gatewayUrl) ??
(typeof remote?.url === "string" && remote.url.trim()
? remote.url.trim()
: undefined) ??
`ws://127.0.0.1:${localPort}`;
const token =
normalizeKey(opts.gatewayToken) ??
(isRemoteMode
? normalizeKey(remote?.token)
: (normalizeKey(process.env.CLAWDIS_GATEWAY_TOKEN) ??
normalizeKey(cfg.gateway?.auth?.token)));
const password =
normalizeKey(process.env.CLAWDIS_GATEWAY_PASSWORD) ??
normalizeKey(remote?.password);
return { url, token, password };
}
async function waitForAgentCompletion(params: {
connection: ReturnType<typeof resolveGatewayConnection>;
runId: string;
timeoutMs: number;
}): Promise<{ status: "done" | "error" | "timeout"; error?: string }> {
const { connection, runId, timeoutMs } = params;
return await new Promise((resolve) => {
let settled = false;
const done = (status: "done" | "error" | "timeout", error?: string) => {
if (settled) return;
settled = true;
clearTimeout(timer);
client.stop();
resolve({ status, error });
};
const client = new GatewayClient({
url: connection.url,
token: connection.token,
password: connection.password,
clientName: "agent",
clientVersion: "dev",
platform: process.platform,
mode: "agent",
instanceId: crypto.randomUUID(),
onEvent: (evt) => {
if (evt.event !== "agent") return;
const payload = evt.payload as {
runId?: unknown;
stream?: unknown;
data?: Record<string, unknown>;
};
if (payload?.runId !== runId) return;
if (payload.stream !== "job") return;
const state = payload.data?.state;
if (state === "done") {
done("done");
return;
}
if (state === "error") {
done(
"error",
typeof payload.data?.error === "string"
? payload.data.error
: undefined,
);
}
},
onClose: (_code, _reason) => {
done("timeout");
},
});
const timer = setTimeout(() => done("timeout"), timeoutMs);
client.start();
});
}
async function imageResult(params: { async function imageResult(params: {
label: string; label: string;
path: string; path: string;
@@ -2690,6 +2774,7 @@ function createSessionsSendTool(): AnyAgentTool {
? Math.max(0, Math.floor(params.timeoutSeconds)) ? Math.max(0, Math.floor(params.timeoutSeconds))
: 30; : 30;
const idempotencyKey = crypto.randomUUID(); const idempotencyKey = crypto.randomUUID();
let runId = idempotencyKey;
try { try {
const response = (await callGateway({ const response = (await callGateway({
method: "agent", method: "agent",
@@ -2699,55 +2784,20 @@ function createSessionsSendTool(): AnyAgentTool {
idempotencyKey, idempotencyKey,
deliver: false, deliver: false,
}, },
expectFinal: timeoutSeconds > 0, timeoutMs:
timeoutMs: timeoutSeconds > 0 ? timeoutSeconds * 1000 : undefined, timeoutSeconds > 0
? Math.min(timeoutSeconds * 1000, 10_000)
: 10_000,
})) as { runId?: string; status?: string }; })) as { runId?: string; status?: string };
if (typeof response?.runId === "string" && response.runId) {
const runId = runId = response.runId;
typeof response?.runId === "string" && response.runId
? response.runId
: idempotencyKey;
if (timeoutSeconds === 0) {
return jsonResult({
runId,
status: "accepted",
sessionKey: resolveDisplaySessionKey({
key: sessionKey,
alias,
mainKey,
}),
});
} }
const history = (await callGateway({
method: "chat.history",
params: { sessionKey: resolvedKey, limit: 50 },
})) as { messages?: unknown[] };
const filtered = stripToolMessages(
Array.isArray(history?.messages) ? history.messages : [],
);
const last =
filtered.length > 0 ? filtered[filtered.length - 1] : undefined;
const reply = last ? extractAssistantText(last) : undefined;
return jsonResult({
runId,
status: "ok",
reply,
sessionKey: resolveDisplaySessionKey({
key: sessionKey,
alias,
mainKey,
}),
});
} catch (err) { } catch (err) {
const message = const message =
err instanceof Error ? err.message : String(err ?? "error"); err instanceof Error ? err.message : String(err ?? "error");
const isTimeout = message.toLowerCase().includes("timeout");
return jsonResult({ return jsonResult({
runId: idempotencyKey, runId,
status: isTimeout ? "timeout" : "error", status: "error",
error: message, error: message,
sessionKey: resolveDisplaySessionKey({ sessionKey: resolveDisplaySessionKey({
key: sessionKey, key: sessionKey,
@@ -2756,6 +2806,60 @@ function createSessionsSendTool(): AnyAgentTool {
}), }),
}); });
} }
if (timeoutSeconds === 0) {
return jsonResult({
runId,
status: "accepted",
sessionKey: resolveDisplaySessionKey({
key: sessionKey,
alias,
mainKey,
}),
});
}
const connection = resolveGatewayConnection({});
const wait = await waitForAgentCompletion({
connection,
runId,
timeoutMs: timeoutSeconds * 1000,
});
if (wait.status === "timeout") {
return jsonResult({
runId,
status: "timeout",
sessionKey: resolveDisplaySessionKey({
key: sessionKey,
alias,
mainKey,
}),
});
}
const history = (await callGateway({
method: "chat.history",
params: { sessionKey: resolvedKey, limit: 50 },
})) as { messages?: unknown[] };
const filtered = stripToolMessages(
Array.isArray(history?.messages) ? history.messages : [],
);
const last =
filtered.length > 0 ? filtered[filtered.length - 1] : undefined;
const reply = last ? extractAssistantText(last) : undefined;
return jsonResult({
runId,
status: wait.status === "error" ? "error" : "ok",
error: wait.error,
reply,
sessionKey: resolveDisplaySessionKey({
key: sessionKey,
alias,
mainKey,
}),
});
}, },
}; };
} }