diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index dccc02c6a..9cc635248 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -24,6 +24,17 @@ type BridgeStartOpts = { onAuthenticated?: (node: BridgeClientInfo) => Promise | void; onDisconnected?: (node: BridgeClientInfo) => Promise | void; onPairRequested?: (request: unknown) => Promise | void; + onEvent?: ( + nodeId: string, + evt: { event: string; payloadJSON?: string | null }, + ) => Promise | void; + onRequest?: ( + nodeId: string, + req: { id: string; method: string; paramsJSON?: string | null }, + ) => Promise< + | { ok: true; payloadJSON?: string | null } + | { ok: false; error: { code: string; message: string; details?: unknown } } + >; }; const bridgeStartCalls = vi.hoisted(() => [] as BridgeStartOpts[]); @@ -36,6 +47,7 @@ const bridgeInvoke = vi.hoisted(() => error: null, })), ); +const bridgeSendEvent = vi.hoisted(() => vi.fn()); vi.mock("../infra/bridge/server.js", () => ({ startNodeBridgeServer: vi.fn(async (opts: BridgeStartOpts) => { bridgeStartCalls.push(opts); @@ -44,6 +56,7 @@ vi.mock("../infra/bridge/server.js", () => ({ close: async () => {}, listConnected: () => [], invoke: bridgeInvoke, + sendEvent: bridgeSendEvent, }; }), })); @@ -1663,6 +1676,138 @@ describe("gateway server", () => { await server.close(); }); + test("bridge RPC chat.history returns session messages", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + await fs.writeFile( + path.join(dir, "sess-main.jsonl"), + [ + JSON.stringify({ + message: { + role: "user", + content: [{ type: "text", text: "hi" }], + timestamp: Date.now(), + }, + }), + ].join("\n"), + "utf-8", + ); + + const port = await getFreePort(); + const server = await startGatewayServer(port); + const bridgeCall = bridgeStartCalls.at(-1); + expect(bridgeCall?.onRequest).toBeDefined(); + + const res = await bridgeCall?.onRequest?.("ios-node", { + id: "r1", + method: "chat.history", + paramsJSON: JSON.stringify({ sessionKey: "main" }), + }); + + expect(res?.ok).toBe(true); + const payload = JSON.parse( + String((res as { payloadJSON?: string }).payloadJSON ?? "{}"), + ) as { + sessionKey?: string; + sessionId?: string; + messages?: unknown[]; + }; + expect(payload.sessionKey).toBe("main"); + expect(payload.sessionId).toBe("sess-main"); + expect(Array.isArray(payload.messages)).toBe(true); + expect(payload.messages?.length).toBeGreaterThan(0); + + await server.close(); + }); + + test("bridge chat events are pushed to subscribed nodes", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + const port = await getFreePort(); + const server = await startGatewayServer(port); + const bridgeCall = bridgeStartCalls.at(-1); + expect(bridgeCall?.onEvent).toBeDefined(); + expect(bridgeCall?.onRequest).toBeDefined(); + + // Subscribe the node to chat events for main. + await bridgeCall?.onEvent?.("ios-node", { + event: "chat.subscribe", + payloadJSON: JSON.stringify({ sessionKey: "main" }), + }); + + bridgeSendEvent.mockClear(); + + // Trigger a chat.send, then simulate agent bus completion for the sessionId. + const reqRes = await bridgeCall?.onRequest?.("ios-node", { + id: "s1", + method: "chat.send", + paramsJSON: JSON.stringify({ + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-bridge-chat", + timeoutMs: 30_000, + }), + }); + expect(reqRes?.ok).toBe(true); + + emitAgentEvent({ + runId: "sess-main", + seq: 1, + ts: Date.now(), + stream: "assistant", + data: { text: "hi from agent" }, + }); + emitAgentEvent({ + runId: "sess-main", + seq: 2, + ts: Date.now(), + stream: "job", + data: { state: "done" }, + }); + + // Wait a tick for the bridge send to happen. + await new Promise((r) => setTimeout(r, 25)); + + expect(bridgeSendEvent).toHaveBeenCalledWith( + expect.objectContaining({ + nodeId: "ios-node", + event: "chat", + }), + ); + + await server.close(); + }); + test("presence includes client fingerprint", async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws, { diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 0494f778a..808cdf9c6 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -492,6 +492,8 @@ export async function startGatewayServer( const httpServer: HttpServer = createHttpServer(); let bonjourStop: (() => Promise) | null = null; let bridge: Awaited> | null = null; + const bridgeNodeSubscriptions = new Map>(); + const bridgeSessionSubscribers = new Map>(); try { await new Promise((resolve, reject) => { const onError = (err: NodeJS.ErrnoException) => { @@ -677,6 +679,311 @@ export async function startGatewayServer( : 18790; const bridgeEnabled = process.env.CLAWDIS_BRIDGE_ENABLED !== "0"; + const bridgeSubscribe = (nodeId: string, sessionKey: string) => { + const normalizedNodeId = nodeId.trim(); + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedNodeId || !normalizedSessionKey) return; + + let nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); + if (!nodeSet) { + nodeSet = new Set(); + bridgeNodeSubscriptions.set(normalizedNodeId, nodeSet); + } + if (nodeSet.has(normalizedSessionKey)) return; + nodeSet.add(normalizedSessionKey); + + let sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey); + if (!sessionSet) { + sessionSet = new Set(); + bridgeSessionSubscribers.set(normalizedSessionKey, sessionSet); + } + sessionSet.add(normalizedNodeId); + }; + + const bridgeUnsubscribe = (nodeId: string, sessionKey: string) => { + const normalizedNodeId = nodeId.trim(); + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedNodeId || !normalizedSessionKey) return; + + const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); + nodeSet?.delete(normalizedSessionKey); + if (nodeSet?.size === 0) bridgeNodeSubscriptions.delete(normalizedNodeId); + + const sessionSet = bridgeSessionSubscribers.get(normalizedSessionKey); + sessionSet?.delete(normalizedNodeId); + if (sessionSet?.size === 0) + bridgeSessionSubscribers.delete(normalizedSessionKey); + }; + + const bridgeUnsubscribeAll = (nodeId: string) => { + const normalizedNodeId = nodeId.trim(); + const nodeSet = bridgeNodeSubscriptions.get(normalizedNodeId); + if (!nodeSet) return; + for (const sessionKey of nodeSet) { + const sessionSet = bridgeSessionSubscribers.get(sessionKey); + sessionSet?.delete(normalizedNodeId); + if (sessionSet?.size === 0) bridgeSessionSubscribers.delete(sessionKey); + } + bridgeNodeSubscriptions.delete(normalizedNodeId); + }; + + const bridgeSendToSession = ( + sessionKey: string, + event: string, + payload: unknown, + ) => { + const normalizedSessionKey = sessionKey.trim(); + if (!normalizedSessionKey) return; + const subs = bridgeSessionSubscribers.get(normalizedSessionKey); + if (!subs || subs.size === 0) return; + if (!bridge) return; + + const payloadJSON = payload ? JSON.stringify(payload) : null; + for (const nodeId of subs) { + bridge.sendEvent({ nodeId, event, payloadJSON }); + } + }; + + const bridgeSendToAllSubscribed = (event: string, payload: unknown) => { + if (!bridge) return; + const payloadJSON = payload ? JSON.stringify(payload) : null; + for (const nodeId of bridgeNodeSubscriptions.keys()) { + bridge.sendEvent({ nodeId, event, payloadJSON }); + } + }; + + const handleBridgeRequest = async ( + nodeId: string, + req: { id: string; method: string; paramsJSON?: string | null }, + ): Promise< + | { ok: true; payloadJSON?: string | null } + | { ok: false; error: { code: string; message: string; details?: unknown } } + > => { + const method = req.method.trim(); + + const parseParams = (): Record => { + const raw = typeof req.paramsJSON === "string" ? req.paramsJSON : ""; + const trimmed = raw.trim(); + if (!trimmed) return {}; + const parsed = JSON.parse(trimmed) as unknown; + return typeof parsed === "object" && parsed !== null + ? (parsed as Record) + : {}; + }; + + try { + switch (method) { + case "health": { + const now = Date.now(); + const cached = healthCache; + if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) { + return { ok: true, payloadJSON: JSON.stringify(cached) }; + } + const snap = await refreshHealthSnapshot({ probe: false }); + return { ok: true, payloadJSON: JSON.stringify(snap) }; + } + case "chat.history": { + const params = parseParams(); + if (!validateChatHistoryParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid chat.history params: ${formatValidationErrors(validateChatHistoryParams.errors)}`, + }, + }; + } + const { sessionKey } = params as { sessionKey: string }; + const { storePath, entry } = loadSessionEntry(sessionKey); + const sessionId = entry?.sessionId; + const messages = + sessionId && storePath + ? readSessionMessages(sessionId, storePath) + : []; + const thinkingLevel = + entry?.thinkingLevel ?? + loadConfig().inbound?.reply?.thinkingDefault ?? + "off"; + return { + ok: true, + payloadJSON: JSON.stringify({ + sessionKey, + sessionId, + messages, + thinkingLevel, + }), + }; + } + case "chat.send": { + const params = parseParams(); + if (!validateChatSendParams(params)) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: `invalid chat.send params: ${formatValidationErrors(validateChatSendParams.errors)}`, + }, + }; + } + + const p = params as { + sessionKey: string; + message: string; + thinking?: string; + deliver?: boolean; + attachments?: Array<{ + type?: string; + mimeType?: string; + fileName?: string; + content?: unknown; + }>; + timeoutMs?: number; + idempotencyKey: string; + }; + const timeoutMs = Math.min( + Math.max(p.timeoutMs ?? 30_000, 0), + 30_000, + ); + const normalizedAttachments = + p.attachments?.map((a) => ({ + type: typeof a?.type === "string" ? a.type : undefined, + mimeType: + typeof a?.mimeType === "string" ? a.mimeType : undefined, + fileName: + typeof a?.fileName === "string" ? a.fileName : undefined, + content: + typeof a?.content === "string" + ? a.content + : ArrayBuffer.isView(a?.content) + ? Buffer.from( + a.content.buffer, + a.content.byteOffset, + a.content.byteLength, + ).toString("base64") + : undefined, + })) ?? []; + + let messageWithAttachments = p.message; + if (normalizedAttachments.length > 0) { + try { + messageWithAttachments = buildMessageWithAttachments( + p.message, + normalizedAttachments, + { maxBytes: 5_000_000 }, + ); + } catch (err) { + return { + ok: false, + error: { + code: ErrorCodes.INVALID_REQUEST, + message: String(err), + }, + }; + } + } + + const { storePath, store, entry } = loadSessionEntry(p.sessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + const sessionEntry: SessionEntry = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + if (store) { + store[p.sessionKey] = sessionEntry; + if (storePath) { + await saveSessionStore(storePath, store); + } + } + + const clientRunId = p.idempotencyKey; + chatRunSessions.set(sessionId, { + sessionKey: p.sessionKey, + clientRunId, + }); + + const cached = dedupe.get(`chat:${clientRunId}`); + if (cached) { + if (cached.ok) { + return { ok: true, payloadJSON: JSON.stringify(cached.payload) }; + } + return { + ok: false, + error: cached.error ?? { + code: ErrorCodes.UNAVAILABLE, + message: "request failed", + }, + }; + } + + try { + await agentCommand( + { + message: messageWithAttachments, + sessionId, + thinking: p.thinking, + deliver: p.deliver, + timeout: Math.ceil(timeoutMs / 1000).toString(), + surface: `Iris(${nodeId})`, + }, + defaultRuntime, + deps, + ); + const payload = { + runId: clientRunId, + status: "ok" as const, + }; + dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: true, + payload, + }); + return { ok: true, payloadJSON: JSON.stringify(payload) }; + } catch (err) { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); + const payload = { + runId: clientRunId, + status: "error" as const, + summary: String(err), + }; + dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: false, + payload, + error, + }); + return { + ok: false, + error: error ?? { + code: ErrorCodes.UNAVAILABLE, + message: String(err), + }, + }; + } + } + default: + return { + ok: false, + error: { + code: "FORBIDDEN", + message: "Method not allowed", + details: { method }, + }, + }; + } + } catch (err) { + return { + ok: false, + error: { code: ErrorCodes.INVALID_REQUEST, message: String(err) }, + }; + } + }; + const handleBridgeEvent = async ( nodeId: string, evt: { event: string; payloadJSON?: string | null }, @@ -807,6 +1114,42 @@ export async function startGatewayServer( }); return; } + case "chat.subscribe": { + if (!evt.payloadJSON) return; + let payload: unknown; + try { + payload = JSON.parse(evt.payloadJSON) as unknown; + } catch { + return; + } + const obj = + typeof payload === "object" && payload !== null + ? (payload as Record) + : {}; + const sessionKey = + typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; + if (!sessionKey) return; + bridgeSubscribe(nodeId, sessionKey); + return; + } + case "chat.unsubscribe": { + if (!evt.payloadJSON) return; + let payload: unknown; + try { + payload = JSON.parse(evt.payloadJSON) as unknown; + } catch { + return; + } + const obj = + typeof payload === "object" && payload !== null + ? (payload as Record) + : {}; + const sessionKey = + typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; + if (!sessionKey) return; + bridgeUnsubscribe(nodeId, sessionKey); + return; + } default: return; } @@ -820,6 +1163,7 @@ export async function startGatewayServer( host: bridgeHost, port: bridgePort, serverName: machineDisplayName, + onRequest: (nodeId, req) => handleBridgeRequest(nodeId, req), onAuthenticated: (node) => { const host = node.displayName?.trim() || node.nodeId; const ip = node.remoteIp?.trim(); @@ -851,6 +1195,7 @@ export async function startGatewayServer( ); }, onDisconnected: (node) => { + bridgeUnsubscribeAll(node.nodeId); const host = node.displayName?.trim() || node.nodeId; const ip = node.remoteIp?.trim(); const version = node.version?.trim() || "unknown"; @@ -924,11 +1269,14 @@ export async function startGatewayServer( broadcast("health", snap, { stateVersion: { presence: presenceVersion, health: healthVersion }, }); + bridgeSendToAllSubscribed("health", snap); }; // periodic keepalive const tickInterval = setInterval(() => { - broadcast("tick", { ts: Date.now() }, { dropIfSlow: true }); + const payload = { ts: Date.now() }; + broadcast("tick", payload, { dropIfSlow: true }); + bridgeSendToAllSubscribed("tick", payload); }, TICK_INTERVAL_MS); // periodic health refresh to keep cached snapshot warm @@ -997,7 +1345,7 @@ export async function startGatewayServer( const text = chatRunBuffers.get(clientRunId)?.join("\n").trim() ?? ""; chatRunBuffers.delete(clientRunId); if (evt.data.state === "done") { - broadcast("chat", { + const payload = { ...base, state: "final", message: text @@ -1007,13 +1355,17 @@ export async function startGatewayServer( timestamp: Date.now(), } : undefined, - }); + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); } else { - broadcast("chat", { + const payload = { ...base, state: "error", errorMessage: evt.data.error ? String(evt.data.error) : undefined, - }); + }; + broadcast("chat", payload); + bridgeSendToSession(sessionKey, "chat", payload); } chatRunSessions.delete(evt.runId); } diff --git a/src/infra/bridge/server.test.ts b/src/infra/bridge/server.test.ts index 44a375013..dedeee960 100644 --- a/src/infra/bridge/server.test.ts +++ b/src/infra/bridge/server.test.ts @@ -153,6 +153,72 @@ describe("node bridge server", () => { await server.close(); }); + it("handles req/res RPC after authentication", async () => { + let lastRequest: { nodeId?: string; id?: string; method?: string } | null = + null; + + const server = await startNodeBridgeServer({ + host: "127.0.0.1", + port: 0, + pairingBaseDir: baseDir, + onRequest: async (nodeId, req) => { + lastRequest = { nodeId, id: req.id, method: req.method }; + return { ok: true, payloadJSON: JSON.stringify({ ok: true }) }; + }, + }); + + const socket = net.connect({ host: "127.0.0.1", port: server.port }); + const readLine = createLineReader(socket); + sendLine(socket, { + type: "pair-request", + nodeId: "n3-rpc", + platform: "ios", + }); + + // Approve the pending request from the gateway side. + let reqId: string | undefined; + for (let i = 0; i < 40; i += 1) { + const list = await listNodePairing(baseDir); + const req = list.pending.find((p) => p.nodeId === "n3-rpc"); + if (req) { + reqId = req.requestId; + break; + } + await new Promise((r) => setTimeout(r, 25)); + } + expect(reqId).toBeTruthy(); + if (!reqId) throw new Error("expected a pending requestId"); + await approveNodePairing(reqId, baseDir); + + const line1 = JSON.parse(await readLine()) as { type: string }; + expect(line1.type).toBe("pair-ok"); + const line2 = JSON.parse(await readLine()) as { type: string }; + expect(line2.type).toBe("hello-ok"); + + sendLine(socket, { type: "req", id: "r1", method: "health" }); + const res = JSON.parse(await readLine()) as { + type: string; + id?: string; + ok?: boolean; + payloadJSON?: string | null; + error?: unknown; + }; + expect(res.type).toBe("res"); + expect(res.id).toBe("r1"); + expect(res.ok).toBe(true); + expect(res.payloadJSON).toBe(JSON.stringify({ ok: true })); + expect(res.error).toBeUndefined(); + + expect(lastRequest).toEqual({ + nodeId: "n3-rpc", + id: "r1", + method: "health", + }); + + socket.destroy(); + await server.close(); + }); + it("passes node metadata to onAuthenticated and onDisconnected", async () => { let lastAuthed: { nodeId?: string; diff --git a/src/infra/bridge/server.ts b/src/infra/bridge/server.ts index dba596f38..323e1d6e5 100644 --- a/src/infra/bridge/server.ts +++ b/src/infra/bridge/server.ts @@ -34,6 +34,21 @@ type BridgeEventFrame = { payloadJSON?: string | null; }; +type BridgeRPCRequestFrame = { + type: "req"; + id: string; + method: string; + paramsJSON?: string | null; +}; + +type BridgeRPCResponseFrame = { + type: "res"; + id: string; + ok: boolean; + payloadJSON?: string | null; + error?: { code: string; message: string; details?: unknown } | null; +}; + type BridgePingFrame = { type: "ping"; id: string }; type BridgePongFrame = { type: "pong"; id: string }; @@ -60,6 +75,8 @@ type AnyBridgeFrame = | BridgeHelloFrame | BridgePairRequestFrame | BridgeEventFrame + | BridgeRPCRequestFrame + | BridgeRPCResponseFrame | BridgePingFrame | BridgePongFrame | BridgeInvokeRequestFrame @@ -78,6 +95,11 @@ export type NodeBridgeServer = { paramsJSON?: string | null; timeoutMs?: number; }) => Promise; + sendEvent: (opts: { + nodeId: string; + event: string; + payloadJSON?: string | null; + }) => void; listConnected: () => NodeBridgeClientInfo[]; }; @@ -94,6 +116,13 @@ export type NodeBridgeServerOpts = { port: number; // 0 = ephemeral pairingBaseDir?: string; onEvent?: (nodeId: string, evt: BridgeEventFrame) => Promise | void; + onRequest?: ( + nodeId: string, + req: BridgeRPCRequestFrame, + ) => Promise< + | { ok: true; payloadJSON?: string | null } + | { ok: false; error: { code: string; message: string; details?: unknown } } + >; onAuthenticated?: (node: NodeBridgeClientInfo) => Promise | void; onDisconnected?: (node: NodeBridgeClientInfo) => Promise | void; onPairRequested?: ( @@ -124,6 +153,7 @@ export async function startNodeBridgeServer( invoke: async () => { throw new Error("bridge disabled in tests"); }, + sendEvent: () => {}, listConnected: () => [], }; } @@ -333,6 +363,71 @@ export async function startNodeBridgeServer( await opts.onEvent?.(nodeId, evt); }; + const handleRequest = async (req: BridgeRPCRequestFrame) => { + if (!isAuthenticated || !nodeId) { + send({ + type: "res", + id: String(req.id ?? ""), + ok: false, + error: { code: "UNAUTHORIZED", message: "not authenticated" }, + } satisfies BridgeRPCResponseFrame); + return; + } + + if (!opts.onRequest) { + send({ + type: "res", + id: String(req.id ?? ""), + ok: false, + error: { code: "UNAVAILABLE", message: "RPC not supported" }, + } satisfies BridgeRPCResponseFrame); + return; + } + + const id = String(req.id ?? ""); + const method = String(req.method ?? ""); + if (!id || !method) { + send({ + type: "res", + id: id || "invalid", + ok: false, + error: { code: "INVALID_REQUEST", message: "id and method required" }, + } satisfies BridgeRPCResponseFrame); + return; + } + + try { + const result = await opts.onRequest(nodeId, { + type: "req", + id, + method, + paramsJSON: req.paramsJSON ?? null, + }); + if (result.ok) { + send({ + type: "res", + id, + ok: true, + payloadJSON: result.payloadJSON ?? null, + } satisfies BridgeRPCResponseFrame); + } else { + send({ + type: "res", + id, + ok: false, + error: result.error, + } satisfies BridgeRPCResponseFrame); + } + } catch (err) { + send({ + type: "res", + id, + ok: false, + error: { code: "UNAVAILABLE", message: String(err) }, + } satisfies BridgeRPCResponseFrame); + } + }; + socket.on("data", (chunk) => { buffer += chunk.toString("utf8"); while (true) { @@ -364,6 +459,9 @@ export async function startNodeBridgeServer( case "event": await handleEvent(frame as BridgeEventFrame); break; + case "req": + await handleRequest(frame as BridgeRPCRequestFrame); + break; case "ping": { if (!isAuthenticated) { sendError("UNAUTHORIZED", "not authenticated"); @@ -395,6 +493,10 @@ export async function startNodeBridgeServer( sendError("INVALID_REQUEST", "invoke not allowed from node"); break; } + case "res": + // Direction is node -> gateway only. + sendError("INVALID_REQUEST", "res not allowed from node"); + break; case "pong": // ignore break; @@ -443,6 +545,24 @@ export async function startNodeBridgeServer( ); }, listConnected: () => [...connections.values()].map((c) => c.nodeInfo), + sendEvent: ({ nodeId, event, payloadJSON }) => { + const normalizedNodeId = String(nodeId ?? "").trim(); + const normalizedEvent = String(event ?? "").trim(); + if (!normalizedNodeId || !normalizedEvent) return; + const conn = connections.get(normalizedNodeId); + if (!conn) return; + try { + conn.socket.write( + encodeLine({ + type: "event", + event: normalizedEvent, + payloadJSON: payloadJSON ?? null, + } satisfies BridgeEventFrame), + ); + } catch { + // ignore + } + }, invoke: async ({ nodeId, command, paramsJSON, timeoutMs }) => { const normalizedNodeId = String(nodeId ?? "").trim(); const normalizedCommand = String(command ?? "").trim();