diff --git a/src/agents/clawdis-tools.sessions.test.ts b/src/agents/clawdis-tools.sessions.test.ts index 1453f9701..cb9ac6ef7 100644 --- a/src/agents/clawdis-tools.sessions.test.ts +++ b/src/agents/clawdis-tools.sessions.test.ts @@ -1,11 +1,54 @@ import { describe, expect, it, vi } from "vitest"; const callGatewayMock = vi.fn(); +const nextRunId = "run-1"; +const nextRunState: "done" | "error" = "done"; vi.mock("../gateway/call.js", () => ({ callGateway: (opts: unknown) => callGatewayMock(opts), })); +vi.mock("../gateway/client.js", () => ({ + GatewayClient: class { + private opts: { + onEvent?: (evt: { + event?: string; + payload?: { + runId?: string; + stream?: string; + data?: Record; + }; + }) => void; + }; + constructor(opts: { + onEvent?: (evt: { + event?: string; + payload?: { + runId?: string; + stream?: string; + data?: Record; + }; + }) => void; + }) { + this.opts = opts; + } + start() { + setTimeout(() => { + this.opts.onEvent?.({ + event: "agent", + payload: { + runId: nextRunId, + stream: "job", + data: + nextRunState === "error" ? { state: "error" } : { state: "done" }, + }, + }); + }, 1); + } + stop() {} + }, +})); + vi.mock("../config/config.js", () => ({ loadConfig: () => ({ session: { mainKey: "main", scope: "per-sender" }, @@ -122,11 +165,9 @@ describe("sessions tools", () => { it("sessions_send supports fire-and-forget and wait", async () => { callGatewayMock.mockImplementation(async (opts: unknown) => { - const request = opts as { method?: string; expectFinal?: boolean }; + const request = opts as { method?: string }; if (request.method === "agent") { - return request.expectFinal - ? { runId: "run-1", status: "ok" } - : { runId: "run-1", status: "accepted" }; + return { runId: "run-1", status: "accepted" }; } if (request.method === "chat.history") { return { diff --git a/src/agents/clawdis-tools.ts b/src/agents/clawdis-tools.ts index e8e8b1f04..7ca44f6eb 100644 --- a/src/agents/clawdis-tools.ts +++ b/src/agents/clawdis-tools.ts @@ -45,6 +45,7 @@ import { type ClawdisConfig, type DiscordActionConfig, loadConfig, + resolveGatewayPort, } from "../config/config.js"; import { addRoleDiscord, @@ -77,6 +78,7 @@ import { unpinMessageDiscord, } from "../discord/send.js"; import { callGateway } from "../gateway/call.js"; +import { GatewayClient } from "../gateway/client.js"; import { detectMime, imageMimeFromFormat } from "../media/mime.js"; import { sanitizeToolResultImages } from "./tool-images.js"; @@ -337,6 +339,88 @@ function extractAssistantText(message: unknown): string | undefined { return joined ? joined : undefined; } +function resolveGatewayConnection(opts: GatewayCallOptions) { + const cfg = loadConfig(); + const isRemoteMode = cfg.gateway?.mode === "remote"; + const remote = isRemoteMode ? cfg.gateway?.remote : undefined; + const localPort = resolveGatewayPort(cfg); + + const url = + normalizeKey(opts.gatewayUrl) ?? + (typeof remote?.url === "string" && remote.url.trim() + ? remote.url.trim() + : undefined) ?? + `ws://127.0.0.1:${localPort}`; + const token = + normalizeKey(opts.gatewayToken) ?? + (isRemoteMode + ? normalizeKey(remote?.token) + : (normalizeKey(process.env.CLAWDIS_GATEWAY_TOKEN) ?? + normalizeKey(cfg.gateway?.auth?.token))); + const password = + normalizeKey(process.env.CLAWDIS_GATEWAY_PASSWORD) ?? + normalizeKey(remote?.password); + return { url, token, password }; +} + +async function waitForAgentCompletion(params: { + connection: ReturnType; + runId: string; + timeoutMs: number; +}): Promise<{ status: "done" | "error" | "timeout"; error?: string }> { + const { connection, runId, timeoutMs } = params; + return await new Promise((resolve) => { + let settled = false; + const done = (status: "done" | "error" | "timeout", error?: string) => { + if (settled) return; + settled = true; + clearTimeout(timer); + client.stop(); + resolve({ status, error }); + }; + + const client = new GatewayClient({ + url: connection.url, + token: connection.token, + password: connection.password, + clientName: "agent", + clientVersion: "dev", + platform: process.platform, + mode: "agent", + instanceId: crypto.randomUUID(), + onEvent: (evt) => { + if (evt.event !== "agent") return; + const payload = evt.payload as { + runId?: unknown; + stream?: unknown; + data?: Record; + }; + if (payload?.runId !== runId) return; + if (payload.stream !== "job") return; + const state = payload.data?.state; + if (state === "done") { + done("done"); + return; + } + if (state === "error") { + done( + "error", + typeof payload.data?.error === "string" + ? payload.data.error + : undefined, + ); + } + }, + onClose: (_code, _reason) => { + done("timeout"); + }, + }); + + const timer = setTimeout(() => done("timeout"), timeoutMs); + client.start(); + }); +} + async function imageResult(params: { label: string; path: string; @@ -2690,6 +2774,7 @@ function createSessionsSendTool(): AnyAgentTool { ? Math.max(0, Math.floor(params.timeoutSeconds)) : 30; const idempotencyKey = crypto.randomUUID(); + let runId = idempotencyKey; try { const response = (await callGateway({ method: "agent", @@ -2699,55 +2784,20 @@ function createSessionsSendTool(): AnyAgentTool { idempotencyKey, deliver: false, }, - expectFinal: timeoutSeconds > 0, - timeoutMs: timeoutSeconds > 0 ? timeoutSeconds * 1000 : undefined, + timeoutMs: + timeoutSeconds > 0 + ? Math.min(timeoutSeconds * 1000, 10_000) + : 10_000, })) as { runId?: string; status?: string }; - - const runId = - typeof response?.runId === "string" && response.runId - ? response.runId - : idempotencyKey; - - if (timeoutSeconds === 0) { - return jsonResult({ - runId, - status: "accepted", - sessionKey: resolveDisplaySessionKey({ - key: sessionKey, - alias, - mainKey, - }), - }); + if (typeof response?.runId === "string" && response.runId) { + runId = response.runId; } - - const history = (await callGateway({ - method: "chat.history", - params: { sessionKey: resolvedKey, limit: 50 }, - })) as { messages?: unknown[] }; - const filtered = stripToolMessages( - Array.isArray(history?.messages) ? history.messages : [], - ); - const last = - filtered.length > 0 ? filtered[filtered.length - 1] : undefined; - const reply = last ? extractAssistantText(last) : undefined; - - return jsonResult({ - runId, - status: "ok", - reply, - sessionKey: resolveDisplaySessionKey({ - key: sessionKey, - alias, - mainKey, - }), - }); } catch (err) { const message = err instanceof Error ? err.message : String(err ?? "error"); - const isTimeout = message.toLowerCase().includes("timeout"); return jsonResult({ - runId: idempotencyKey, - status: isTimeout ? "timeout" : "error", + runId, + status: "error", error: message, sessionKey: resolveDisplaySessionKey({ key: sessionKey, @@ -2756,6 +2806,60 @@ function createSessionsSendTool(): AnyAgentTool { }), }); } + + if (timeoutSeconds === 0) { + return jsonResult({ + runId, + status: "accepted", + sessionKey: resolveDisplaySessionKey({ + key: sessionKey, + alias, + mainKey, + }), + }); + } + + const connection = resolveGatewayConnection({}); + const wait = await waitForAgentCompletion({ + connection, + runId, + timeoutMs: timeoutSeconds * 1000, + }); + + if (wait.status === "timeout") { + return jsonResult({ + runId, + status: "timeout", + sessionKey: resolveDisplaySessionKey({ + key: sessionKey, + alias, + mainKey, + }), + }); + } + + const history = (await callGateway({ + method: "chat.history", + params: { sessionKey: resolvedKey, limit: 50 }, + })) as { messages?: unknown[] }; + const filtered = stripToolMessages( + Array.isArray(history?.messages) ? history.messages : [], + ); + const last = + filtered.length > 0 ? filtered[filtered.length - 1] : undefined; + const reply = last ? extractAssistantText(last) : undefined; + + return jsonResult({ + runId, + status: wait.status === "error" ? "error" : "ok", + error: wait.error, + reply, + sessionKey: resolveDisplaySessionKey({ + key: sessionKey, + alias, + mainKey, + }), + }); }, }; }