import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import { type AddressInfo, createServer } from "node:net"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { WebSocket } from "ws"; import { agentCommand } from "../commands/agent.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { PROTOCOL_VERSION } from "./protocol/index.js"; import { startGatewayServer } from "./server.js"; type BridgeClientInfo = { nodeId: string; displayName?: string; platform?: string; version?: string; remoteIp?: string; }; type BridgeStartOpts = { onAuthenticated?: (node: BridgeClientInfo) => Promise | void; onDisconnected?: (node: BridgeClientInfo) => Promise | void; onPairRequested?: (request: unknown) => Promise | void; onEvent?: ( nodeId: string, evt: { event: string; payloadJSON?: string | null }, ) => Promise | void; onRequest?: ( nodeId: string, req: { id: string; method: string; paramsJSON?: string | null }, ) => Promise< | { ok: true; payloadJSON?: string | null } | { ok: false; error: { code: string; message: string; details?: unknown } } >; }; const bridgeStartCalls = vi.hoisted(() => [] as BridgeStartOpts[]); const bridgeInvoke = vi.hoisted(() => vi.fn(async () => ({ type: "invoke-res", id: "1", ok: true, payloadJSON: JSON.stringify({ ok: true }), error: null, })), ); const bridgeListConnected = vi.hoisted(() => vi.fn(() => [] as BridgeClientInfo[]), ); const bridgeSendEvent = vi.hoisted(() => vi.fn()); vi.mock("../infra/bridge/server.js", () => ({ startNodeBridgeServer: vi.fn(async (opts: BridgeStartOpts) => { bridgeStartCalls.push(opts); return { port: 18790, close: async () => {}, listConnected: bridgeListConnected, invoke: bridgeInvoke, sendEvent: bridgeSendEvent, }; }), })); let testSessionStorePath: string | undefined; let testAllowFrom: string[] | undefined; let testCronStorePath: string | undefined; let testCronEnabled: boolean | undefined = false; vi.mock("../config/config.js", () => ({ loadConfig: () => ({ inbound: { allowFrom: testAllowFrom, workspace: path.join(os.tmpdir(), "clawd-gateway-test"), agent: { provider: "anthropic", model: "claude-opus-4-5" }, session: { mainKey: "main", store: testSessionStorePath }, }, cron: (() => { const cron: Record = {}; if (typeof testCronEnabled === "boolean") cron.enabled = testCronEnabled; if (typeof testCronStorePath === "string") cron.store = testCronStorePath; return Object.keys(cron).length > 0 ? cron : undefined; })(), }), })); vi.mock("../commands/health.js", () => ({ getHealthSnapshot: vi.fn().mockResolvedValue({ ok: true, stub: true }), })); vi.mock("../commands/status.js", () => ({ getStatusSummary: vi.fn().mockResolvedValue({ ok: true }), })); vi.mock("../web/outbound.js", () => ({ sendMessageWhatsApp: vi .fn() .mockResolvedValue({ messageId: "msg-1", toJid: "jid-1" }), })); vi.mock("../commands/agent.js", () => ({ agentCommand: vi.fn().mockResolvedValue(undefined), })); process.env.CLAWDIS_SKIP_PROVIDERS = "1"; let previousHome: string | undefined; let tempHome: string | undefined; beforeEach(async () => { previousHome = process.env.HOME; tempHome = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gateway-home-")); process.env.HOME = tempHome; }); afterEach(async () => { process.env.HOME = previousHome; if (tempHome) { await fs.rm(tempHome, { recursive: true, force: true }); tempHome = undefined; } }); async function getFreePort(): Promise { return await new Promise((resolve, reject) => { const server = createServer(); server.listen(0, "127.0.0.1", () => { const port = (server.address() as AddressInfo).port; server.close((err) => (err ? reject(err) : resolve(port))); }); }); } async function occupyPort(): Promise<{ server: ReturnType; port: number; }> { return await new Promise((resolve, reject) => { const server = createServer(); server.once("error", reject); server.listen(0, "127.0.0.1", () => { const port = (server.address() as AddressInfo).port; resolve({ server, port }); }); }); } function onceMessage( ws: WebSocket, filter: (obj: unknown) => boolean, timeoutMs = 3000, ): Promise { return new Promise((resolve, reject) => { const timer = setTimeout(() => reject(new Error("timeout")), timeoutMs); const closeHandler = (code: number, reason: Buffer) => { clearTimeout(timer); ws.off("message", handler); reject(new Error(`closed ${code}: ${reason.toString()}`)); }; const handler = (data: WebSocket.RawData) => { const obj = JSON.parse(String(data)); if (filter(obj)) { clearTimeout(timer); ws.off("message", handler); ws.off("close", closeHandler); resolve(obj as T); } }; ws.on("message", handler); ws.once("close", closeHandler); }); } async function startServerWithClient(token?: string) { const port = await getFreePort(); const prev = process.env.CLAWDIS_GATEWAY_TOKEN; if (token === undefined) { delete process.env.CLAWDIS_GATEWAY_TOKEN; } else { process.env.CLAWDIS_GATEWAY_TOKEN = token; } const server = await startGatewayServer(port); const ws = new WebSocket(`ws://127.0.0.1:${port}`); await new Promise((resolve) => ws.once("open", resolve)); return { server, ws, port, prevToken: prev }; } type ConnectResponse = { type: "res"; id: string; ok: boolean; payload?: unknown; error?: { message?: string }; }; async function connectReq( ws: WebSocket, opts?: { token?: string; minProtocol?: number; maxProtocol?: number; client?: { name: string; version: string; platform: string; mode: string; instanceId?: string; }; }, ): Promise { const id = randomUUID(); ws.send( JSON.stringify({ type: "req", id, method: "connect", params: { minProtocol: opts?.minProtocol ?? PROTOCOL_VERSION, maxProtocol: opts?.maxProtocol ?? PROTOCOL_VERSION, client: opts?.client ?? { name: "test", version: "1.0.0", platform: "test", mode: "test", }, caps: [], auth: opts?.token ? { token: opts.token } : undefined, }, }), ); return await onceMessage( ws, (o) => o.type === "res" && o.id === id, ); } async function connectOk( ws: WebSocket, opts?: Parameters[1], ) { const res = await connectReq(ws, opts); expect(res.ok).toBe(true); expect((res.payload as { type?: unknown } | undefined)?.type).toBe( "hello-ok", ); return res.payload as { type: "hello-ok" }; } async function rpcReq( ws: WebSocket, method: string, params?: unknown, ) { const id = randomUUID(); ws.send(JSON.stringify({ type: "req", id, method, params })); return await onceMessage<{ type: "res"; id: string; ok: boolean; payload?: T; error?: { message?: string }; }>(ws, (o) => o.type === "res" && o.id === id); } describe("gateway server", () => { test("voicewake.get returns defaults and voicewake.set broadcasts", async () => { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); const prevHome = process.env.HOME; process.env.HOME = homeDir; const { server, ws } = await startServerWithClient(); await connectOk(ws); const initial = await rpcReq<{ triggers: string[] }>(ws, "voicewake.get"); expect(initial.ok).toBe(true); expect(initial.payload?.triggers).toEqual(["clawd", "claude"]); const changedP = onceMessage<{ type: "event"; event: string; payload?: unknown; }>(ws, (o) => o.type === "event" && o.event === "voicewake.changed"); const setRes = await rpcReq<{ triggers: string[] }>(ws, "voicewake.set", { triggers: [" hi ", "", "there"], }); expect(setRes.ok).toBe(true); expect(setRes.payload?.triggers).toEqual(["hi", "there"]); const changed = await changedP; expect(changed.event).toBe("voicewake.changed"); expect( (changed.payload as { triggers?: unknown } | undefined)?.triggers, ).toEqual(["hi", "there"]); const after = await rpcReq<{ triggers: string[] }>(ws, "voicewake.get"); expect(after.ok).toBe(true); expect(after.payload?.triggers).toEqual(["hi", "there"]); const onDisk = JSON.parse( await fs.readFile( path.join(homeDir, ".clawdis", "settings", "voicewake.json"), "utf8", ), ) as { triggers?: unknown; updatedAtMs?: unknown }; expect(onDisk.triggers).toEqual(["hi", "there"]); expect(typeof onDisk.updatedAtMs).toBe("number"); ws.close(); await server.close(); if (prevHome === undefined) { delete process.env.HOME; } else { process.env.HOME = prevHome; } }); test("pushes voicewake.changed to nodes on connect and on updates", async () => { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); const prevHome = process.env.HOME; process.env.HOME = homeDir; bridgeSendEvent.mockClear(); bridgeListConnected.mockReturnValue([{ nodeId: "n1" }]); const { server, ws } = await startServerWithClient(); await connectOk(ws); const startCall = bridgeStartCalls.at(-1); expect(startCall).toBeTruthy(); await startCall?.onAuthenticated?.({ nodeId: "n1" }); const first = bridgeSendEvent.mock.calls.find( (c) => c[0]?.event === "voicewake.changed" && c[0]?.nodeId === "n1", )?.[0] as { payloadJSON?: string | null } | undefined; expect(first?.payloadJSON).toBeTruthy(); const firstPayload = JSON.parse(String(first?.payloadJSON)) as { triggers?: unknown; }; expect(firstPayload.triggers).toEqual(["clawd", "claude"]); bridgeSendEvent.mockClear(); const setRes = await rpcReq<{ triggers: string[] }>(ws, "voicewake.set", { triggers: ["clawd", "computer"], }); expect(setRes.ok).toBe(true); const broadcast = bridgeSendEvent.mock.calls.find( (c) => c[0]?.event === "voicewake.changed" && c[0]?.nodeId === "n1", )?.[0] as { payloadJSON?: string | null } | undefined; expect(broadcast?.payloadJSON).toBeTruthy(); const broadcastPayload = JSON.parse(String(broadcast?.payloadJSON)) as { triggers?: unknown; }; expect(broadcastPayload.triggers).toEqual(["clawd", "computer"]); ws.close(); await server.close(); if (prevHome === undefined) { delete process.env.HOME; } else { process.env.HOME = prevHome; } }); test("supports gateway-owned node pairing methods and events", async () => { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); const prevHome = process.env.HOME; process.env.HOME = homeDir; const { server, ws } = await startServerWithClient(); await connectOk(ws); const requestedP = onceMessage<{ type: "event"; event: string; payload?: unknown; }>(ws, (o) => o.type === "event" && o.event === "node.pair.requested"); ws.send( JSON.stringify({ type: "req", id: "pair-req-1", method: "node.pair.request", params: { nodeId: "n1", displayName: "Iris" }, }), ); const res1 = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "pair-req-1"); expect(res1.ok).toBe(true); const req1 = (res1.payload as { request?: { requestId?: unknown } } | null) ?.request; const requestId = String(req1?.requestId ?? ""); expect(requestId.length).toBeGreaterThan(0); const evt1 = await requestedP; expect(evt1.event).toBe("node.pair.requested"); expect((evt1.payload as { requestId?: unknown } | null)?.requestId).toBe( requestId, ); // Second request for same node should return the existing pending request // without emitting a second requested event. ws.send( JSON.stringify({ type: "req", id: "pair-req-2", method: "node.pair.request", params: { nodeId: "n1", displayName: "Iris" }, }), ); const res2 = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "pair-req-2"); expect(res2.ok).toBe(true); await expect( onceMessage( ws, (o) => o.type === "event" && o.event === "node.pair.requested", 200, ), ).rejects.toThrow(); const resolvedP = onceMessage<{ type: "event"; event: string; payload?: unknown; }>(ws, (o) => o.type === "event" && o.event === "node.pair.resolved"); ws.send( JSON.stringify({ type: "req", id: "pair-approve-1", method: "node.pair.approve", params: { requestId }, }), ); const approveRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "pair-approve-1"); expect(approveRes.ok).toBe(true); const token = String( (approveRes.payload as { node?: { token?: unknown } } | null)?.node ?.token ?? "", ); expect(token.length).toBeGreaterThan(0); const evt2 = await resolvedP; expect((evt2.payload as { requestId?: unknown } | null)?.requestId).toBe( requestId, ); expect((evt2.payload as { decision?: unknown } | null)?.decision).toBe( "approved", ); ws.send( JSON.stringify({ type: "req", id: "pair-verify-1", method: "node.pair.verify", params: { nodeId: "n1", token }, }), ); const verifyRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "pair-verify-1"); expect(verifyRes.ok).toBe(true); expect((verifyRes.payload as { ok?: unknown } | null)?.ok).toBe(true); ws.send( JSON.stringify({ type: "req", id: "pair-list-1", method: "node.pair.list", params: {}, }), ); const listRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "pair-list-1"); expect(listRes.ok).toBe(true); const paired = (listRes.payload as { paired?: unknown } | null)?.paired; expect(Array.isArray(paired)).toBe(true); expect( (paired as Array<{ nodeId?: unknown }>).some((n) => n.nodeId === "n1"), ).toBe(true); ws.close(); await server.close(); await fs.rm(homeDir, { recursive: true, force: true }); if (prevHome === undefined) { delete process.env.HOME; } else { process.env.HOME = prevHome; } }); test("routes node.invoke to the node bridge", async () => { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); const prevHome = process.env.HOME; process.env.HOME = homeDir; try { bridgeInvoke.mockResolvedValueOnce({ type: "invoke-res", id: "inv-1", ok: true, payloadJSON: JSON.stringify({ result: "4" }), error: null, }); const { server, ws } = await startServerWithClient(); try { await connectOk(ws); const res = await rpcReq(ws, "node.invoke", { nodeId: "ios-node", command: "screen.eval", params: { javaScript: "2+2" }, timeoutMs: 123, idempotencyKey: "idem-1", }); expect(res.ok).toBe(true); expect(bridgeInvoke).toHaveBeenCalledWith( expect.objectContaining({ nodeId: "ios-node", command: "screen.eval", paramsJSON: JSON.stringify({ javaScript: "2+2" }), timeoutMs: 123, }), ); } finally { ws.close(); await server.close(); } } finally { await fs.rm(homeDir, { recursive: true, force: true }); if (prevHome === undefined) { delete process.env.HOME; } else { process.env.HOME = prevHome; } } }); test("emits presence updates for bridge connect/disconnect", async () => { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); const prevHome = process.env.HOME; process.env.HOME = homeDir; try { const before = bridgeStartCalls.length; const { server, ws } = await startServerWithClient(); try { await connectOk(ws); const bridgeCall = bridgeStartCalls[before]; expect(bridgeCall).toBeTruthy(); const waitPresenceReason = async (reason: string) => { await onceMessage( ws, (o) => { if (o.type !== "event" || o.event !== "presence") return false; const payload = o.payload as { presence?: unknown } | null; const list = payload?.presence; if (!Array.isArray(list)) return false; return list.some( (p) => typeof p === "object" && p !== null && (p as { instanceId?: unknown }).instanceId === "iris-1" && (p as { reason?: unknown }).reason === reason, ); }, 3000, ); }; const presenceConnectedP = waitPresenceReason("iris-connected"); await bridgeCall?.onAuthenticated?.({ nodeId: "iris-1", displayName: "Iris", platform: "ios", version: "1.0", remoteIp: "10.0.0.10", }); await presenceConnectedP; const presenceDisconnectedP = waitPresenceReason("iris-disconnected"); await bridgeCall?.onDisconnected?.({ nodeId: "iris-1", displayName: "Iris", platform: "ios", version: "1.0", remoteIp: "10.0.0.10", }); await presenceDisconnectedP; } finally { try { ws.close(); } catch { /* ignore */ } await server.close(); await fs.rm(homeDir, { recursive: true, force: true }); } } finally { if (prevHome === undefined) { delete process.env.HOME; } else { process.env.HOME = prevHome; } } }); test("supports cron.add and cron.list", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-cron-")); testCronStorePath = path.join(dir, "cron", "jobs.json"); await fs.mkdir(path.dirname(testCronStorePath), { recursive: true }); await fs.writeFile( testCronStorePath, JSON.stringify({ version: 1, jobs: [] }), ); const { server, ws } = await startServerWithClient(); await connectOk(ws); ws.send( JSON.stringify({ type: "req", id: "cron-add-1", method: "cron.add", params: { name: "daily", enabled: true, schedule: { kind: "every", everyMs: 60_000 }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "hello" }, }, }), ); const addRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-add-1"); expect(addRes.ok).toBe(true); expect(typeof (addRes.payload as { id?: unknown } | null)?.id).toBe( "string", ); ws.send( JSON.stringify({ type: "req", id: "cron-list-1", method: "cron.list", params: { includeDisabled: true }, }), ); const listRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-list-1"); expect(listRes.ok).toBe(true); const jobs = (listRes.payload as { jobs?: unknown } | null)?.jobs; expect(Array.isArray(jobs)).toBe(true); expect((jobs as unknown[]).length).toBe(1); expect(((jobs as Array<{ name?: unknown }>)[0]?.name as string) ?? "").toBe( "daily", ); ws.close(); await server.close(); await fs.rm(dir, { recursive: true, force: true }); testCronStorePath = undefined; }); test("writes cron run history to runs/.jsonl", async () => { const dir = await fs.mkdtemp( path.join(os.tmpdir(), "clawdis-gw-cron-log-"), ); testCronStorePath = path.join(dir, "cron", "jobs.json"); await fs.mkdir(path.dirname(testCronStorePath), { recursive: true }); await fs.writeFile( testCronStorePath, JSON.stringify({ version: 1, jobs: [] }), ); const { server, ws } = await startServerWithClient(); await connectOk(ws); const atMs = Date.now() - 1; ws.send( JSON.stringify({ type: "req", id: "cron-add-log-1", method: "cron.add", params: { enabled: true, schedule: { kind: "at", atMs }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "hello" }, }, }), ); const addRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-add-log-1"); expect(addRes.ok).toBe(true); const jobId = String((addRes.payload as { id?: unknown } | null)?.id ?? ""); expect(jobId.length > 0).toBe(true); ws.send( JSON.stringify({ type: "req", id: "cron-run-log-1", method: "cron.run", params: { id: jobId, mode: "force" }, }), ); const runRes = await onceMessage<{ type: "res"; ok: boolean }>( ws, (o) => o.type === "res" && o.id === "cron-run-log-1", 8000, ); expect(runRes.ok).toBe(true); const logPath = path.join(dir, "cron", "runs", `${jobId}.jsonl`); const waitForLog = async () => { for (let i = 0; i < 200; i++) { const raw = await fs.readFile(logPath, "utf-8").catch(() => ""); if (raw.trim().length > 0) return raw; await new Promise((r) => setTimeout(r, 10)); } throw new Error("timeout waiting for cron run log"); }; const raw = await waitForLog(); const line = raw .split("\n") .map((l) => l.trim()) .filter(Boolean) .at(-1); const last = JSON.parse(line ?? "{}") as { jobId?: unknown; action?: unknown; status?: unknown; summary?: unknown; }; expect(last.action).toBe("finished"); expect(last.jobId).toBe(jobId); expect(last.status).toBe("ok"); expect(last.summary).toBe("hello"); ws.send( JSON.stringify({ type: "req", id: "cron-runs-1", method: "cron.runs", params: { id: jobId, limit: 50 }, }), ); const runsRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-runs-1", 8000); expect(runsRes.ok).toBe(true); const entries = (runsRes.payload as { entries?: unknown } | null)?.entries; expect(Array.isArray(entries)).toBe(true); expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId); expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe( "hello", ); ws.close(); await server.close(); await fs.rm(dir, { recursive: true, force: true }); testCronStorePath = undefined; }); test("writes cron run history to per-job runs/ when store is jobs.json", async () => { const dir = await fs.mkdtemp( path.join(os.tmpdir(), "clawdis-gw-cron-log-jobs-"), ); const cronDir = path.join(dir, "cron"); testCronStorePath = path.join(cronDir, "jobs.json"); await fs.mkdir(cronDir, { recursive: true }); await fs.writeFile( testCronStorePath, JSON.stringify({ version: 1, jobs: [] }), ); const { server, ws } = await startServerWithClient(); await connectOk(ws); const atMs = Date.now() - 1; ws.send( JSON.stringify({ type: "req", id: "cron-add-log-2", method: "cron.add", params: { enabled: true, schedule: { kind: "at", atMs }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "hello" }, }, }), ); const addRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-add-log-2"); expect(addRes.ok).toBe(true); const jobId = String((addRes.payload as { id?: unknown } | null)?.id ?? ""); expect(jobId.length > 0).toBe(true); ws.send( JSON.stringify({ type: "req", id: "cron-run-log-2", method: "cron.run", params: { id: jobId, mode: "force" }, }), ); const runRes = await onceMessage<{ type: "res"; ok: boolean }>( ws, (o) => o.type === "res" && o.id === "cron-run-log-2", 8000, ); expect(runRes.ok).toBe(true); const logPath = path.join(cronDir, "runs", `${jobId}.jsonl`); const waitForLog = async () => { for (let i = 0; i < 200; i++) { const raw = await fs.readFile(logPath, "utf-8").catch(() => ""); if (raw.trim().length > 0) return raw; await new Promise((r) => setTimeout(r, 10)); } throw new Error("timeout waiting for per-job cron run log"); }; const raw = await waitForLog(); const line = raw .split("\n") .map((l) => l.trim()) .filter(Boolean) .at(-1); const last = JSON.parse(line ?? "{}") as { jobId?: unknown; action?: unknown; summary?: unknown; }; expect(last.action).toBe("finished"); expect(last.jobId).toBe(jobId); expect(last.summary).toBe("hello"); ws.send( JSON.stringify({ type: "req", id: "cron-runs-2", method: "cron.runs", params: { id: jobId, limit: 20 }, }), ); const runsRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-runs-2", 8000); expect(runsRes.ok).toBe(true); const entries = (runsRes.payload as { entries?: unknown } | null)?.entries; expect(Array.isArray(entries)).toBe(true); expect((entries as Array<{ jobId?: unknown }>).at(-1)?.jobId).toBe(jobId); expect((entries as Array<{ summary?: unknown }>).at(-1)?.summary).toBe( "hello", ); ws.close(); await server.close(); await fs.rm(dir, { recursive: true, force: true }); testCronStorePath = undefined; }); test("enables cron scheduler by default and runs due jobs automatically", async () => { const dir = await fs.mkdtemp( path.join(os.tmpdir(), "clawdis-gw-cron-default-on-"), ); testCronStorePath = path.join(dir, "cron", "jobs.json"); testCronEnabled = undefined; // omitted config => enabled by default try { await fs.mkdir(path.dirname(testCronStorePath), { recursive: true }); await fs.writeFile( testCronStorePath, JSON.stringify({ version: 1, jobs: [] }), ); const { server, ws } = await startServerWithClient(); await connectOk(ws); ws.send( JSON.stringify({ type: "req", id: "cron-status-1", method: "cron.status", params: {}, }), ); const statusRes = await onceMessage<{ type: "res"; id: string; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-status-1"); expect(statusRes.ok).toBe(true); const statusPayload = statusRes.payload as | { enabled?: unknown; storePath?: unknown } | undefined; expect(statusPayload?.enabled).toBe(true); expect(String(statusPayload?.storePath ?? "")).toContain("jobs.json"); const atMs = Date.now() + 80; ws.send( JSON.stringify({ type: "req", id: "cron-add-auto-1", method: "cron.add", params: { enabled: true, schedule: { kind: "at", atMs }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "auto" }, }, }), ); const addRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-add-auto-1"); expect(addRes.ok).toBe(true); const jobId = String( (addRes.payload as { id?: unknown } | null)?.id ?? "", ); expect(jobId.length > 0).toBe(true); const finishedEvt = await onceMessage<{ type: "event"; event: string; payload?: { jobId?: string; action?: string; status?: string } | null; }>( ws, (o) => o.type === "event" && o.event === "cron" && (o.payload as { jobId?: unknown } | null)?.jobId === jobId && (o.payload as { action?: unknown } | null)?.action === "finished", 8000, ); expect(finishedEvt.payload?.status).toBe("ok"); const waitForRuns = async () => { for (let i = 0; i < 200; i++) { ws.send( JSON.stringify({ type: "req", id: "cron-runs-auto-1", method: "cron.runs", params: { id: jobId, limit: 10 }, }), ); const runsRes = await onceMessage<{ type: "res"; ok: boolean; payload?: unknown; }>(ws, (o) => o.type === "res" && o.id === "cron-runs-auto-1", 8000); expect(runsRes.ok).toBe(true); const entries = (runsRes.payload as { entries?: unknown } | null) ?.entries; if (Array.isArray(entries) && entries.length > 0) return entries; await new Promise((r) => setTimeout(r, 10)); } throw new Error("timeout waiting for cron.runs entries"); }; const entries = (await waitForRuns()) as Array<{ jobId?: unknown }>; expect(entries.at(-1)?.jobId).toBe(jobId); ws.close(); await server.close(); } finally { testCronEnabled = false; testCronStorePath = undefined; await fs.rm(dir, { recursive: true, force: true }); } }); test("broadcasts heartbeat events and serves last-heartbeat", async () => { type HeartbeatPayload = { ts: number; status: string; to?: string; preview?: string; durationMs?: number; hasMedia?: boolean; reason?: string; }; type EventFrame = { type: "event"; event: string; payload?: HeartbeatPayload | null; }; type ResFrame = { type: "res"; id: string; ok: boolean; payload?: unknown; }; const { server, ws } = await startServerWithClient(); await connectOk(ws); const waitHeartbeat = onceMessage( ws, (o) => o.type === "event" && o.event === "heartbeat", ); emitHeartbeatEvent({ status: "sent", to: "+123", preview: "ping" }); const evt = await waitHeartbeat; expect(evt.payload?.status).toBe("sent"); expect(typeof evt.payload?.ts).toBe("number"); ws.send( JSON.stringify({ type: "req", id: "hb-last", method: "last-heartbeat", }), ); const last = await onceMessage( ws, (o) => o.type === "res" && o.id === "hb-last", ); expect(last.ok).toBe(true); const lastPayload = last.payload as HeartbeatPayload | null | undefined; expect(lastPayload?.status).toBe("sent"); expect(lastPayload?.ts).toBe(evt.payload?.ts); ws.send( JSON.stringify({ type: "req", id: "hb-toggle-off", method: "set-heartbeats", params: { enabled: false }, }), ); const toggle = await onceMessage( ws, (o) => o.type === "res" && o.id === "hb-toggle-off", ); expect(toggle.ok).toBe(true); expect((toggle.payload as { enabled?: boolean } | undefined)?.enabled).toBe( false, ); ws.close(); await server.close(); }); test("agent falls back to allowFrom when lastTo is stale", async () => { testAllowFrom = ["+436769770569"]; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main-stale", updatedAt: Date.now(), lastChannel: "whatsapp", lastTo: "+1555", }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); ws.send( JSON.stringify({ type: "req", id: "agent-last-stale", method: "agent", params: { message: "hi", sessionKey: "main", channel: "last", deliver: true, idempotencyKey: "idem-agent-last-stale", }, }), ); await onceMessage( ws, (o) => o.type === "res" && o.id === "agent-last-stale", ); const spy = vi.mocked(agentCommand); expect(spy).toHaveBeenCalled(); const call = spy.mock.calls.at(-1)?.[0] as Record; expect(call.provider).toBe("whatsapp"); expect(call.to).toBe("+436769770569"); expect(call.sessionId).toBe("sess-main-stale"); ws.close(); await server.close(); testAllowFrom = undefined; }); test("agent routes main last-channel whatsapp", async () => { testAllowFrom = undefined; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main-whatsapp", updatedAt: Date.now(), lastChannel: "whatsapp", lastTo: "+1555", }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); ws.send( JSON.stringify({ type: "req", id: "agent-last-whatsapp", method: "agent", params: { message: "hi", sessionKey: "main", channel: "last", deliver: true, idempotencyKey: "idem-agent-last-whatsapp", }, }), ); await onceMessage( ws, (o) => o.type === "res" && o.id === "agent-last-whatsapp", ); const spy = vi.mocked(agentCommand); expect(spy).toHaveBeenCalled(); const call = spy.mock.calls.at(-1)?.[0] as Record; expect(call.provider).toBe("whatsapp"); expect(call.to).toBe("+1555"); expect(call.deliver).toBe(true); expect(call.bestEffortDeliver).toBe(true); expect(call.sessionId).toBe("sess-main-whatsapp"); ws.close(); await server.close(); }); test("agent routes main last-channel telegram", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), lastChannel: "telegram", lastTo: "123", }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); ws.send( JSON.stringify({ type: "req", id: "agent-last", method: "agent", params: { message: "hi", sessionKey: "main", channel: "last", deliver: true, idempotencyKey: "idem-agent-last", }, }), ); await onceMessage(ws, (o) => o.type === "res" && o.id === "agent-last"); const spy = vi.mocked(agentCommand); expect(spy).toHaveBeenCalled(); const call = spy.mock.calls.at(-1)?.[0] as Record; expect(call.provider).toBe("telegram"); expect(call.to).toBe("123"); expect(call.deliver).toBe(true); expect(call.bestEffortDeliver).toBe(true); expect(call.sessionId).toBe("sess-main"); ws.close(); await server.close(); }); test("agent ignores webchat last-channel for routing", async () => { testAllowFrom = ["+1555"]; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main-webchat", updatedAt: Date.now(), lastChannel: "webchat", lastTo: "+1555", }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); ws.send( JSON.stringify({ type: "req", id: "agent-webchat", method: "agent", params: { message: "hi", sessionKey: "main", channel: "last", deliver: true, idempotencyKey: "idem-agent-webchat", }, }), ); await onceMessage(ws, (o) => o.type === "res" && o.id === "agent-webchat"); const spy = vi.mocked(agentCommand); expect(spy).toHaveBeenCalled(); const call = spy.mock.calls.at(-1)?.[0] as Record; expect(call.provider).toBe("whatsapp"); expect(call.to).toBe("+1555"); expect(call.deliver).toBe(true); expect(call.bestEffortDeliver).toBe(true); expect(call.sessionId).toBe("sess-main-webchat"); ws.close(); await server.close(); }); test("rejects protocol mismatch", async () => { const { server, ws } = await startServerWithClient(); try { const res = await connectReq(ws, { minProtocol: PROTOCOL_VERSION + 1, maxProtocol: PROTOCOL_VERSION + 2, }); expect(res.ok).toBe(false); } catch { // If the server closed before we saw the frame, that's acceptable for mismatch. } ws.close(); await server.close(); }); test("rejects invalid token", async () => { const { server, ws, prevToken } = await startServerWithClient("secret"); const res = await connectReq(ws, { token: "wrong" }); expect(res.ok).toBe(false); expect(res.error?.message ?? "").toContain("unauthorized"); ws.close(); await server.close(); process.env.CLAWDIS_GATEWAY_TOKEN = prevToken; }); test( "closes silent handshakes after timeout", { timeout: 15_000 }, async () => { const { server, ws } = await startServerWithClient(); const closed = await new Promise((resolve) => { const timer = setTimeout(() => resolve(false), 12_000); ws.once("close", () => { clearTimeout(timer); resolve(true); }); }); expect(closed).toBe(true); await server.close(); }, ); test("connect (req) handshake returns hello-ok payload", async () => { const { server, ws } = await startServerWithClient(); const id = randomUUID(); ws.send( JSON.stringify({ type: "req", id, method: "connect", params: { minProtocol: PROTOCOL_VERSION, maxProtocol: PROTOCOL_VERSION, client: { name: "test", version: "1.0.0", platform: "test", mode: "test", }, caps: [], }, }), ); const res = await onceMessage<{ ok: boolean; payload?: unknown }>( ws, (o) => o.type === "res" && o.id === id, ); expect(res.ok).toBe(true); expect((res.payload as { type?: unknown } | undefined)?.type).toBe( "hello-ok", ); ws.close(); await server.close(); }); test("rejects non-connect first request", async () => { const { server, ws } = await startServerWithClient(); ws.send(JSON.stringify({ type: "req", id: "h1", method: "health" })); const res = await onceMessage<{ ok: boolean; error?: unknown }>( ws, (o) => o.type === "res" && o.id === "h1", ); expect(res.ok).toBe(false); await new Promise((resolve) => ws.once("close", () => resolve())); await server.close(); }); test( "connect + health + presence + status succeed", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const healthP = onceMessage( ws, (o) => o.type === "res" && o.id === "health1", ); const statusP = onceMessage( ws, (o) => o.type === "res" && o.id === "status1", ); const presenceP = onceMessage( ws, (o) => o.type === "res" && o.id === "presence1", ); const sendReq = (id: string, method: string) => ws.send(JSON.stringify({ type: "req", id, method })); sendReq("health1", "health"); sendReq("status1", "status"); sendReq("presence1", "system-presence"); const health = await healthP; const status = await statusP; const presence = await presenceP; expect(health.ok).toBe(true); expect(status.ok).toBe(true); expect(presence.ok).toBe(true); expect(Array.isArray(presence.payload)).toBe(true); ws.close(); await server.close(); }, ); test( "presence events carry seq + stateVersion", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const presenceEventP = onceMessage( ws, (o) => o.type === "event" && o.event === "presence", ); ws.send( JSON.stringify({ type: "req", id: "evt-1", method: "system-event", params: { text: "note from test" }, }), ); const evt = await presenceEventP; expect(typeof evt.seq).toBe("number"); expect(evt.stateVersion?.presence).toBeGreaterThan(0); expect(Array.isArray(evt.payload?.presence)).toBe(true); ws.close(); await server.close(); }, ); test("agent events stream with seq", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); // Emit a fake agent event directly through the shared emitter. const runId = randomUUID(); const evtPromise = onceMessage( ws, (o) => o.type === "event" && o.event === "agent" && o.payload?.runId === runId && o.payload?.stream === "job", ); emitAgentEvent({ runId, stream: "job", data: { msg: "hi" } }); const evt = await evtPromise; expect(evt.payload.runId).toBe(runId); expect(typeof evt.seq).toBe("number"); expect(evt.payload.data.msg).toBe("hi"); ws.close(); await server.close(); }); test( "agent ack response then final response", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const ackP = onceMessage( ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status === "accepted", ); const finalP = onceMessage( ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted", ); ws.send( JSON.stringify({ type: "req", id: "ag1", method: "agent", params: { message: "hi", idempotencyKey: "idem-ag" }, }), ); const ack = await ackP; const final = await finalP; expect(ack.payload.runId).toBeDefined(); expect(final.payload.runId).toBe(ack.payload.runId); expect(final.payload.status).toBe("ok"); ws.close(); await server.close(); }, ); test( "agent dedupes by idempotencyKey after completion", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const firstFinalP = onceMessage( ws, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted", ); ws.send( JSON.stringify({ type: "req", id: "ag1", method: "agent", params: { message: "hi", idempotencyKey: "same-agent" }, }), ); const firstFinal = await firstFinalP; const secondP = onceMessage( ws, (o) => o.type === "res" && o.id === "ag2", ); ws.send( JSON.stringify({ type: "req", id: "ag2", method: "agent", params: { message: "hi again", idempotencyKey: "same-agent" }, }), ); const second = await secondP; expect(second.payload).toEqual(firstFinal.payload); ws.close(); await server.close(); }, ); test("shutdown event is broadcast on close", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const shutdownP = onceMessage( ws, (o) => o.type === "event" && o.event === "shutdown", 5000, ); await server.close(); const evt = await shutdownP; expect(evt.payload?.reason).toBeDefined(); }); test( "presence broadcast reaches multiple clients", { timeout: 8000 }, async () => { const port = await getFreePort(); const server = await startGatewayServer(port); const mkClient = async () => { const c = new WebSocket(`ws://127.0.0.1:${port}`); await new Promise((resolve) => c.once("open", resolve)); await connectOk(c); return c; }; const clients = await Promise.all([mkClient(), mkClient(), mkClient()]); const waits = clients.map((c) => onceMessage(c, (o) => o.type === "event" && o.event === "presence"), ); clients[0].send( JSON.stringify({ type: "req", id: "broadcast", method: "system-event", params: { text: "fanout" }, }), ); const events = await Promise.all(waits); for (const evt of events) { expect(evt.payload?.presence?.length).toBeGreaterThan(0); expect(typeof evt.seq).toBe("number"); } for (const c of clients) c.close(); await server.close(); }, ); test("send dedupes by idempotencyKey", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const idem = "same-key"; const res1P = onceMessage(ws, (o) => o.type === "res" && o.id === "a1"); const res2P = onceMessage(ws, (o) => o.type === "res" && o.id === "a2"); const sendReq = (id: string) => ws.send( JSON.stringify({ type: "req", id, method: "send", params: { to: "+15550000000", message: "hi", idempotencyKey: idem }, }), ); sendReq("a1"); sendReq("a2"); const res1 = await res1P; const res2 = await res2P; expect(res1.ok).toBe(true); expect(res2.ok).toBe(true); expect(res1.payload).toEqual(res2.payload); ws.close(); await server.close(); }); test("agent dedupe survives reconnect", { timeout: 15000 }, async () => { const port = await getFreePort(); const server = await startGatewayServer(port); const dial = async () => { const ws = new WebSocket(`ws://127.0.0.1:${port}`); await new Promise((resolve) => ws.once("open", resolve)); await connectOk(ws); return ws; }; const idem = "reconnect-agent"; const ws1 = await dial(); const final1P = onceMessage( ws1, (o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted", 6000, ); ws1.send( JSON.stringify({ type: "req", id: "ag1", method: "agent", params: { message: "hi", idempotencyKey: idem }, }), ); const final1 = await final1P; ws1.close(); const ws2 = await dial(); const final2P = onceMessage( ws2, (o) => o.type === "res" && o.id === "ag2" && o.payload?.status !== "accepted", 6000, ); ws2.send( JSON.stringify({ type: "req", id: "ag2", method: "agent", params: { message: "hi again", idempotencyKey: idem }, }), ); const res = await final2P; expect(res.payload).toEqual(final1.payload); ws2.close(); await server.close(); }); test("chat.send accepts image attachment", { timeout: 12000 }, async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); const reqId = "chat-img"; ws.send( JSON.stringify({ type: "req", id: reqId, method: "chat.send", params: { sessionKey: "main", message: "see image", idempotencyKey: "idem-img", attachments: [ { type: "image", mimeType: "image/png", fileName: "dot.png", content: "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/woAAn8B9FD5fHAAAAAASUVORK5CYII=", }, ], }, }), ); const res = await onceMessage( ws, (o) => o.type === "res" && o.id === reqId, 8000, ); expect(res.ok).toBe(true); expect(res.payload?.runId).toBeDefined(); ws.close(); await server.close(); }); test("chat.history caps large histories and honors limit", async () => { const firstContentText = (msg: unknown): string | undefined => { if (!msg || typeof msg !== "object") return undefined; const content = (msg as { content?: unknown }).content; if (!Array.isArray(content) || content.length === 0) return undefined; const first = content[0]; if (!first || typeof first !== "object") return undefined; const text = (first as { text?: unknown }).text; return typeof text === "string" ? text : undefined; }; const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); const lines: string[] = []; for (let i = 0; i < 300; i += 1) { lines.push( JSON.stringify({ message: { role: "user", content: [{ type: "text", text: `m${i}` }], timestamp: Date.now() + i, }, }), ); } await fs.writeFile( path.join(dir, "sess-main.jsonl"), lines.join("\n"), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); const defaultRes = await rpcReq<{ messages?: unknown[] }>( ws, "chat.history", { sessionKey: "main", }, ); expect(defaultRes.ok).toBe(true); const defaultMsgs = defaultRes.payload?.messages ?? []; expect(defaultMsgs.length).toBe(200); expect(firstContentText(defaultMsgs[0])).toBe("m100"); const limitedRes = await rpcReq<{ messages?: unknown[] }>( ws, "chat.history", { sessionKey: "main", limit: 5, }, ); expect(limitedRes.ok).toBe(true); const limitedMsgs = limitedRes.payload?.messages ?? []; expect(limitedMsgs.length).toBe(5); expect(firstContentText(limitedMsgs[0])).toBe("m295"); const largeLines: string[] = []; for (let i = 0; i < 1500; i += 1) { largeLines.push( JSON.stringify({ message: { role: "user", content: [{ type: "text", text: `b${i}` }], timestamp: Date.now() + i, }, }), ); } await fs.writeFile( path.join(dir, "sess-main.jsonl"), largeLines.join("\n"), "utf-8", ); const cappedRes = await rpcReq<{ messages?: unknown[] }>( ws, "chat.history", { sessionKey: "main", }, ); expect(cappedRes.ok).toBe(true); const cappedMsgs = cappedRes.payload?.messages ?? []; expect(cappedMsgs.length).toBe(200); expect(firstContentText(cappedMsgs[0])).toBe("b1300"); const maxRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", { sessionKey: "main", limit: 1000, }); expect(maxRes.ok).toBe(true); const maxMsgs = maxRes.payload?.messages ?? []; expect(maxMsgs.length).toBe(1000); expect(firstContentText(maxMsgs[0])).toBe("b500"); ws.close(); await server.close(); }); test("chat.history caps payload bytes", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); const bigText = "x".repeat(300_000); const largeLines: string[] = []; for (let i = 0; i < 60; i += 1) { largeLines.push( JSON.stringify({ message: { role: "user", content: [{ type: "text", text: `${i}:${bigText}` }], timestamp: Date.now() + i, }, }), ); } await fs.writeFile( path.join(dir, "sess-main.jsonl"), largeLines.join("\n"), "utf-8", ); const cappedRes = await rpcReq<{ messages?: unknown[] }>( ws, "chat.history", { sessionKey: "main", limit: 1000 }, ); expect(cappedRes.ok).toBe(true); const cappedMsgs = cappedRes.payload?.messages ?? []; const bytes = Buffer.byteLength(JSON.stringify(cappedMsgs), "utf8"); expect(bytes).toBeLessThanOrEqual(6 * 1024 * 1024); expect(cappedMsgs.length).toBeLessThan(60); ws.close(); await server.close(); }); test("chat.send does not overwrite last delivery route", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), lastChannel: "whatsapp", lastTo: "+1555", }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); const reqId = "chat-route"; ws.send( JSON.stringify({ type: "req", id: reqId, method: "chat.send", params: { sessionKey: "main", message: "hello", idempotencyKey: "idem-route", }, }), ); const res = await onceMessage( ws, (o) => o.type === "res" && o.id === reqId, ); expect(res.ok).toBe(true); const stored = JSON.parse( await fs.readFile(testSessionStorePath, "utf-8"), ) as { main?: { lastChannel?: string; lastTo?: string }; }; expect(stored.main?.lastChannel).toBe("whatsapp"); expect(stored.main?.lastTo).toBe("+1555"); ws.close(); await server.close(); }); test( "chat.abort cancels an in-flight chat.send", { timeout: 15000 }, async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); let inFlight: Promise | undefined; try { await connectOk(ws); const spy = vi.mocked(agentCommand); const callsBefore = spy.mock.calls.length; spy.mockImplementationOnce(async (opts) => { const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; await new Promise((resolve) => { if (!signal) return resolve(); if (signal.aborted) return resolve(); signal.addEventListener("abort", () => resolve(), { once: true }); }); }); const sendResP = onceMessage( ws, (o) => o.type === "res" && o.id === "send-abort-1", 8000, ); const abortResP = onceMessage( ws, (o) => o.type === "res" && o.id === "abort-1", 8000, ); const abortedEventP = onceMessage( ws, (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted", 8000, ); inFlight = Promise.allSettled([sendResP, abortResP, abortedEventP]); ws.send( JSON.stringify({ type: "req", id: "send-abort-1", method: "chat.send", params: { sessionKey: "main", message: "hello", idempotencyKey: "idem-abort-1", timeoutMs: 30_000, }, }), ); await new Promise((resolve, reject) => { const deadline = Date.now() + 1000; const tick = () => { if (spy.mock.calls.length > callsBefore) return resolve(); if (Date.now() > deadline) return reject(new Error("timeout waiting for agentCommand")); setTimeout(tick, 5); }; tick(); }); ws.send( JSON.stringify({ type: "req", id: "abort-1", method: "chat.abort", params: { sessionKey: "main", runId: "idem-abort-1" }, }), ); const abortRes = await abortResP; expect(abortRes.ok).toBe(true); const sendRes = await sendResP; expect(sendRes.ok).toBe(true); const evt = await abortedEventP; expect(evt.payload?.runId).toBe("idem-abort-1"); expect(evt.payload?.sessionKey).toBe("main"); } finally { ws.close(); await inFlight; await server.close(); } }, ); test("chat.abort returns aborted=false for unknown runId", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile(testSessionStorePath, JSON.stringify({}, null, 2), "utf-8"); const { server, ws } = await startServerWithClient(); await connectOk(ws); ws.send( JSON.stringify({ type: "req", id: "abort-unknown-1", method: "chat.abort", params: { sessionKey: "main", runId: "missing-run" }, }), ); const abortRes = await onceMessage<{ type: "res"; id: string; ok: boolean; payload?: { ok?: boolean; aborted?: boolean }; }>(ws, (o) => o.type === "res" && o.id === "abort-unknown-1"); expect(abortRes.ok).toBe(true); expect(abortRes.payload?.aborted).toBe(false); ws.close(); await server.close(); }); test("chat.abort rejects mismatched sessionKey", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); const spy = vi.mocked(agentCommand); let agentStartedResolve: (() => void) | undefined; const agentStartedP = new Promise((resolve) => { agentStartedResolve = resolve; }); spy.mockImplementationOnce(async (opts) => { agentStartedResolve?.(); const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; await new Promise((resolve) => { if (!signal) return resolve(); if (signal.aborted) return resolve(); signal.addEventListener("abort", () => resolve(), { once: true }); }); }); ws.send( JSON.stringify({ type: "req", id: "send-mismatch-1", method: "chat.send", params: { sessionKey: "main", message: "hello", idempotencyKey: "idem-mismatch-1", timeoutMs: 30_000, }, }), ); await agentStartedP; ws.send( JSON.stringify({ type: "req", id: "abort-mismatch-1", method: "chat.abort", params: { sessionKey: "other", runId: "idem-mismatch-1" }, }), ); const abortRes = await onceMessage( ws, (o) => o.type === "res" && o.id === "abort-mismatch-1", ); expect(abortRes.ok).toBe(false); expect(abortRes.error?.code).toBe("INVALID_REQUEST"); ws.send( JSON.stringify({ type: "req", id: "abort-mismatch-2", method: "chat.abort", params: { sessionKey: "main", runId: "idem-mismatch-1" }, }), ); const abortRes2 = await onceMessage( ws, (o) => o.type === "res" && o.id === "abort-mismatch-2", ); expect(abortRes2.ok).toBe(true); const sendRes = await onceMessage( ws, (o) => o.type === "res" && o.id === "send-mismatch-1", ); expect(sendRes.ok).toBe(true); ws.close(); await server.close(); }); test("chat.abort is a no-op after chat.send completes", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); await connectOk(ws); const spy = vi.mocked(agentCommand); spy.mockResolvedValueOnce(undefined); ws.send( JSON.stringify({ type: "req", id: "send-complete-1", method: "chat.send", params: { sessionKey: "main", message: "hello", idempotencyKey: "idem-complete-1", timeoutMs: 30_000, }, }), ); const sendRes = await onceMessage( ws, (o) => o.type === "res" && o.id === "send-complete-1", ); expect(sendRes.ok).toBe(true); ws.send( JSON.stringify({ type: "req", id: "abort-complete-1", method: "chat.abort", params: { sessionKey: "main", runId: "idem-complete-1" }, }), ); const abortRes = await onceMessage( ws, (o) => o.type === "res" && o.id === "abort-complete-1", ); expect(abortRes.ok).toBe(true); expect(abortRes.payload?.aborted).toBe(false); ws.close(); await server.close(); }); test("bridge RPC chat.history returns session messages", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); await fs.writeFile( path.join(dir, "sess-main.jsonl"), [ JSON.stringify({ message: { role: "user", content: [{ type: "text", text: "hi" }], timestamp: Date.now(), }, }), ].join("\n"), "utf-8", ); const port = await getFreePort(); const server = await startGatewayServer(port); const bridgeCall = bridgeStartCalls.at(-1); expect(bridgeCall?.onRequest).toBeDefined(); const res = await bridgeCall?.onRequest?.("ios-node", { id: "r1", method: "chat.history", paramsJSON: JSON.stringify({ sessionKey: "main" }), }); expect(res?.ok).toBe(true); const payload = JSON.parse( String((res as { payloadJSON?: string }).payloadJSON ?? "{}"), ) as { sessionKey?: string; sessionId?: string; messages?: unknown[]; }; expect(payload.sessionKey).toBe("main"); expect(payload.sessionId).toBe("sess-main"); expect(Array.isArray(payload.messages)).toBe(true); expect(payload.messages?.length).toBeGreaterThan(0); await server.close(); }); test("bridge RPC sessions.list returns session rows", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); const port = await getFreePort(); const server = await startGatewayServer(port); const bridgeCall = bridgeStartCalls.at(-1); expect(bridgeCall?.onRequest).toBeDefined(); const res = await bridgeCall?.onRequest?.("ios-node", { id: "r1", method: "sessions.list", paramsJSON: JSON.stringify({ includeGlobal: true, includeUnknown: false, limit: 50, }), }); expect(res?.ok).toBe(true); const payload = JSON.parse( String((res as { payloadJSON?: string }).payloadJSON ?? "{}"), ) as { sessions?: unknown[]; count?: number; path?: string; }; expect(Array.isArray(payload.sessions)).toBe(true); expect(typeof payload.count).toBe("number"); expect(typeof payload.path).toBe("string"); await server.close(); }); test("bridge chat events are pushed to subscribed nodes", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); await fs.writeFile( testSessionStorePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: Date.now(), }, }, null, 2, ), "utf-8", ); const port = await getFreePort(); const server = await startGatewayServer(port); const bridgeCall = bridgeStartCalls.at(-1); expect(bridgeCall?.onEvent).toBeDefined(); expect(bridgeCall?.onRequest).toBeDefined(); // Subscribe the node to chat events for main. await bridgeCall?.onEvent?.("ios-node", { event: "chat.subscribe", payloadJSON: JSON.stringify({ sessionKey: "main" }), }); bridgeSendEvent.mockClear(); // Trigger a chat.send, then simulate agent bus completion for the sessionId. const reqRes = await bridgeCall?.onRequest?.("ios-node", { id: "s1", method: "chat.send", paramsJSON: JSON.stringify({ sessionKey: "main", message: "hello", idempotencyKey: "idem-bridge-chat", timeoutMs: 30_000, }), }); expect(reqRes?.ok).toBe(true); emitAgentEvent({ runId: "sess-main", seq: 1, ts: Date.now(), stream: "assistant", data: { text: "hi from agent" }, }); emitAgentEvent({ runId: "sess-main", seq: 2, ts: Date.now(), stream: "job", data: { state: "done" }, }); // Wait a tick for the bridge send to happen. await new Promise((r) => setTimeout(r, 25)); expect(bridgeSendEvent).toHaveBeenCalledWith( expect.objectContaining({ nodeId: "ios-node", event: "agent", }), ); expect(bridgeSendEvent).toHaveBeenCalledWith( expect.objectContaining({ nodeId: "ios-node", event: "chat", }), ); await server.close(); }); test("presence includes client fingerprint", async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws, { client: { name: "fingerprint", version: "9.9.9", platform: "test", deviceFamily: "iPad", modelIdentifier: "iPad16,6", mode: "ui", instanceId: "abc", }, }); const presenceP = onceMessage( ws, (o) => o.type === "res" && o.id === "fingerprint", 4000, ); ws.send( JSON.stringify({ type: "req", id: "fingerprint", method: "system-presence", }), ); const presenceRes = await presenceP; const entries = presenceRes.payload as Array>; const clientEntry = entries.find((e) => e.instanceId === "abc"); expect(clientEntry?.host).toBe("fingerprint"); expect(clientEntry?.version).toBe("9.9.9"); expect(clientEntry?.mode).toBe("ui"); expect(clientEntry?.deviceFamily).toBe("iPad"); expect(clientEntry?.modelIdentifier).toBe("iPad16,6"); ws.close(); await server.close(); }); test("cli connections are not tracked as instances", async () => { const { server, ws } = await startServerWithClient(); const cliId = `cli-${randomUUID()}`; await connectOk(ws, { client: { name: "cli", version: "dev", platform: "test", mode: "cli", instanceId: cliId, }, }); const presenceP = onceMessage( ws, (o) => o.type === "res" && o.id === "cli-presence", 4000, ); ws.send( JSON.stringify({ type: "req", id: "cli-presence", method: "system-presence", }), ); const presenceRes = await presenceP; const entries = presenceRes.payload as Array>; expect(entries.some((e) => e.instanceId === cliId)).toBe(false); ws.close(); await server.close(); }); test("lists and patches session store via sessions.* RPC", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-sessions-")); const storePath = path.join(dir, "sessions.json"); const now = Date.now(); testSessionStorePath = storePath; await fs.writeFile( storePath, JSON.stringify( { main: { sessionId: "sess-main", updatedAt: now - 30_000, inputTokens: 10, outputTokens: 20, thinkingLevel: "low", verboseLevel: "on", }, "group:dev": { sessionId: "sess-group", updatedAt: now - 120_000, totalTokens: 50, }, global: { sessionId: "sess-global", updatedAt: now - 10_000, }, }, null, 2, ), "utf-8", ); const { server, ws } = await startServerWithClient(); const hello = await connectOk(ws); expect( (hello as unknown as { features?: { methods?: string[] } }).features ?.methods, ).toEqual(expect.arrayContaining(["sessions.list", "sessions.patch"])); const list1 = await rpcReq<{ path: string; sessions: Array<{ key: string; totalTokens?: number; thinkingLevel?: string; verboseLevel?: string; }>; }>(ws, "sessions.list", { includeGlobal: false, includeUnknown: false }); expect(list1.ok).toBe(true); expect(list1.payload?.path).toBe(storePath); expect(list1.payload?.sessions.some((s) => s.key === "global")).toBe(false); const main = list1.payload?.sessions.find((s) => s.key === "main"); expect(main?.totalTokens).toBe(30); expect(main?.thinkingLevel).toBe("low"); expect(main?.verboseLevel).toBe("on"); const active = await rpcReq<{ sessions: Array<{ key: string }>; }>(ws, "sessions.list", { includeGlobal: false, includeUnknown: false, activeMinutes: 1, }); expect(active.ok).toBe(true); expect(active.payload?.sessions.map((s) => s.key)).toEqual(["main"]); const limited = await rpcReq<{ sessions: Array<{ key: string }>; }>(ws, "sessions.list", { includeGlobal: true, includeUnknown: false, limit: 1, }); expect(limited.ok).toBe(true); expect(limited.payload?.sessions).toHaveLength(1); expect(limited.payload?.sessions[0]?.key).toBe("global"); const patched = await rpcReq<{ ok: true; key: string }>( ws, "sessions.patch", { key: "main", thinkingLevel: "medium", verboseLevel: null }, ); expect(patched.ok).toBe(true); expect(patched.payload?.ok).toBe(true); expect(patched.payload?.key).toBe("main"); const list2 = await rpcReq<{ sessions: Array<{ key: string; thinkingLevel?: string; verboseLevel?: string; }>; }>(ws, "sessions.list", {}); expect(list2.ok).toBe(true); const main2 = list2.payload?.sessions.find((s) => s.key === "main"); expect(main2?.thinkingLevel).toBe("medium"); expect(main2?.verboseLevel).toBeUndefined(); const badThinking = await rpcReq(ws, "sessions.patch", { key: "main", thinkingLevel: "banana", }); expect(badThinking.ok).toBe(false); expect( (badThinking.error as { message?: unknown } | undefined)?.message ?? "", ).toMatch(/invalid thinkinglevel/i); ws.close(); await server.close(); }); test("refuses to start when port already bound", async () => { const { server: blocker, port } = await occupyPort(); await expect(startGatewayServer(port)).rejects.toBeInstanceOf( GatewayLockError, ); await expect(startGatewayServer(port)).rejects.toThrow( /already listening/i, ); blocker.close(); }); test("releases port after close", async () => { const port = await getFreePort(); const server = await startGatewayServer(port); await server.close(); // If the port was released, another listener can bind immediately. const probe = createServer(); await new Promise((resolve, reject) => { probe.once("error", reject); probe.listen(port, "127.0.0.1", () => resolve()); }); await new Promise((resolve, reject) => probe.close((err) => (err ? reject(err) : resolve())), ); }); });