diff --git a/docs/ios/spec.md b/docs/ios/spec.md index cf3d4a080..7d6de8a41 100644 --- a/docs/ios/spec.md +++ b/docs/ios/spec.md @@ -107,7 +107,7 @@ Add to `src/gateway/protocol/schema.ts` (and regenerate Swift models): **Methods** - `node.list` → list paired/connected nodes + capabilities - `node.invoke` → send a command to a specific node - - Params: `{ nodeId, command, params, idempotencyKey }` + - Params: `{ nodeId, command, params?, timeoutMs? }` **Events** - `node.event` → async node status/errors @@ -125,6 +125,14 @@ Result pattern: - Request is a standard `req/res` with `ok` / `error`. - Long operations (loads, streaming drawing, etc.) may also emit `node.event` progress. +#### Current (implemented) +As of 2025-12-13, the Gateway supports `node.invoke` for bridge-connected nodes. + +Example: draw a diagonal line on the iOS Canvas: +```bash +clawdis nodes invoke --node ios-node --command screen.eval --params '{"javaScript":"(() => { const {ctx} = window.__clawdis; ctx.clearRect(0,0,innerWidth,innerHeight); ctx.lineWidth=6; ctx.strokeStyle=\"#ff2d55\"; ctx.beginPath(); ctx.moveTo(40,40); ctx.lineTo(innerWidth-40, innerHeight-40); ctx.stroke(); return \"ok\"; })()"}' +``` + ### Background behavior requirement When iOS is backgrounded: - Voice may still be active (subject to iOS suspension). diff --git a/src/cli/nodes-cli.ts b/src/cli/nodes-cli.ts index e3562fcd5..03e3e7656 100644 --- a/src/cli/nodes-cli.ts +++ b/src/cli/nodes-cli.ts @@ -7,6 +7,10 @@ type NodesRpcOpts = { token?: string; timeout?: string; json?: boolean; + node?: string; + command?: string; + params?: string; + invokeTimeout?: string; }; type PendingRequest = { @@ -207,4 +211,50 @@ export function registerNodesCli(program: Command) { } }), ); + + nodesCallOpts( + nodes + .command("invoke") + .description("Invoke a command on a connected node") + .requiredOption("--node ", "Node id (instanceId)") + .requiredOption("--command ", "Command (e.g. screen.eval)") + .option("--params ", "JSON object string for params") + .option( + "--invoke-timeout ", + "Node invoke timeout in ms (default 15000)", + ) + .action(async (opts: NodesRpcOpts) => { + try { + const nodeId = String(opts.node ?? "").trim(); + const command = String(opts.command ?? "").trim(); + if (!nodeId || !command) { + defaultRuntime.error("--node and --command required"); + defaultRuntime.exit(1); + return; + } + const params = opts.params + ? (JSON.parse(String(opts.params)) as unknown) + : undefined; + const timeoutMs = opts.invokeTimeout + ? Number.parseInt(String(opts.invokeTimeout), 10) + : undefined; + + const invokeParams: Record = { nodeId, command }; + if (params !== undefined) invokeParams.params = params; + if (typeof timeoutMs === "number" && Number.isFinite(timeoutMs)) { + invokeParams.timeoutMs = timeoutMs; + } + + const result = await callGatewayCli( + "node.invoke", + opts, + invokeParams, + ); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(`nodes invoke failed: ${String(err)}`); + defaultRuntime.exit(1); + } + }), + ); } diff --git a/src/cli/program.test.ts b/src/cli/program.test.ts index 54e6669c5..c255de4cc 100644 --- a/src/cli/program.test.ts +++ b/src/cli/program.test.ts @@ -91,4 +91,41 @@ describe("cli program", () => { ); expect(runtime.log).toHaveBeenCalled(); }); + + it("runs nodes invoke and calls node.invoke", async () => { + callGateway.mockResolvedValue({ + ok: true, + nodeId: "ios-node", + command: "screen.eval", + payload: { result: "ok" }, + }); + + const program = buildProgram(); + runtime.log.mockClear(); + await program.parseAsync( + [ + "nodes", + "invoke", + "--node", + "ios-node", + "--command", + "screen.eval", + "--params", + '{"javaScript":"1+1"}', + ], + { from: "user" }, + ); + + expect(callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "node.invoke", + params: { + nodeId: "ios-node", + command: "screen.eval", + params: { javaScript: "1+1" }, + }, + }), + ); + expect(runtime.log).toHaveBeenCalled(); + }); }); diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 45aa74439..ecec9a7ae 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -36,6 +36,8 @@ import { GatewayFrameSchema, type HelloOk, HelloOkSchema, + type NodeInvokeParams, + NodeInvokeParamsSchema, type NodePairApproveParams, NodePairApproveParamsSchema, type NodePairListParams, @@ -103,6 +105,9 @@ export const validateNodePairRejectParams = ajv.compile( export const validateNodePairVerifyParams = ajv.compile( NodePairVerifyParamsSchema, ); +export const validateNodeInvokeParams = ajv.compile( + NodeInvokeParamsSchema, +); export const validateSessionsListParams = ajv.compile( SessionsListParamsSchema, ); @@ -158,6 +163,7 @@ export { NodePairApproveParamsSchema, NodePairRejectParamsSchema, NodePairVerifyParamsSchema, + NodeInvokeParamsSchema, SessionsListParamsSchema, SessionsPatchParamsSchema, CronJobSchema, @@ -199,6 +205,7 @@ export type { NodePairApproveParams, NodePairRejectParams, NodePairVerifyParams, + NodeInvokeParams, SessionsListParams, SessionsPatchParams, CronJob, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index 4590223cb..3fa266ded 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -242,6 +242,16 @@ export const NodePairVerifyParamsSchema = Type.Object( { additionalProperties: false }, ); +export const NodeInvokeParamsSchema = Type.Object( + { + nodeId: NonEmptyString, + command: NonEmptyString, + params: Type.Optional(Type.Unknown()), + timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })), + }, + { additionalProperties: false }, +); + export const SessionsListParamsSchema = Type.Object( { limit: Type.Optional(Type.Integer({ minimum: 1 })), @@ -496,6 +506,7 @@ export const ProtocolSchemas: Record = { NodePairApproveParams: NodePairApproveParamsSchema, NodePairRejectParams: NodePairRejectParamsSchema, NodePairVerifyParams: NodePairVerifyParamsSchema, + NodeInvokeParams: NodeInvokeParamsSchema, SessionsListParams: SessionsListParamsSchema, SessionsPatchParams: SessionsPatchParamsSchema, CronJob: CronJobSchema, @@ -533,6 +544,7 @@ export type NodePairListParams = Static; export type NodePairApproveParams = Static; export type NodePairRejectParams = Static; export type NodePairVerifyParams = Static; +export type NodeInvokeParams = Static; export type SessionsListParams = Static; export type SessionsPatchParams = Static; export type CronJob = Static; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 8784796ea..ae6d9947f 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -27,10 +27,24 @@ type BridgeStartOpts = { }; const bridgeStartCalls = vi.hoisted(() => [] as BridgeStartOpts[]); +const bridgeInvoke = vi.hoisted(() => + vi.fn(async () => ({ + type: "invoke-res", + id: "1", + ok: true, + payloadJSON: JSON.stringify({ ok: true }), + error: null, + })), +); vi.mock("../infra/bridge/server.js", () => ({ startNodeBridgeServer: vi.fn(async (opts: BridgeStartOpts) => { bridgeStartCalls.push(opts); - return { port: 0, close: async () => {} }; + return { + port: 18790, + close: async () => {}, + listConnected: () => [], + invoke: bridgeInvoke, + }; }), })); @@ -362,6 +376,54 @@ describe("gateway server", () => { } }); + test("routes node.invoke to the node bridge", async () => { + const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); + const prevHome = process.env.HOME; + process.env.HOME = homeDir; + + try { + bridgeInvoke.mockResolvedValueOnce({ + type: "invoke-res", + id: "inv-1", + ok: true, + payloadJSON: JSON.stringify({ result: "4" }), + error: null, + }); + + const { server, ws } = await startServerWithClient(); + try { + await connectOk(ws); + + const res = await rpcReq(ws, "node.invoke", { + nodeId: "ios-node", + command: "screen.eval", + params: { javaScript: "2+2" }, + timeoutMs: 123, + }); + expect(res.ok).toBe(true); + + expect(bridgeInvoke).toHaveBeenCalledWith( + expect.objectContaining({ + nodeId: "ios-node", + command: "screen.eval", + paramsJSON: JSON.stringify({ javaScript: "2+2" }), + timeoutMs: 123, + }), + ); + } finally { + ws.close(); + await server.close(); + } + } finally { + await fs.rm(homeDir, { recursive: true, force: true }); + if (prevHome === undefined) { + delete process.env.HOME; + } else { + process.env.HOME = prevHome; + } + } + }); + test("emits presence updates for bridge connect/disconnect", async () => { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-")); const prevHome = process.env.HOME; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index dc4a1ff54..5b20e5ee0 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -100,6 +100,7 @@ import { validateCronRunsParams, validateCronStatusParams, validateCronUpdateParams, + validateNodeInvokeParams, validateNodePairApproveParams, validateNodePairListParams, validateNodePairRejectParams, @@ -176,6 +177,7 @@ const METHODS = [ "node.pair.approve", "node.pair.reject", "node.pair.verify", + "node.invoke", "cron.list", "cron.status", "cron.add", @@ -2046,6 +2048,100 @@ export async function startGatewayServer( } break; } + case "node.invoke": { + const params = (req.params ?? {}) as Record; + if (!validateNodeInvokeParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid node.invoke params: ${formatValidationErrors(validateNodeInvokeParams.errors)}`, + ), + ); + break; + } + if (!bridge) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, "bridge not running"), + ); + break; + } + const p = params as { + nodeId: string; + command: string; + params?: unknown; + timeoutMs?: number; + }; + const nodeId = String(p.nodeId ?? "").trim(); + const command = String(p.command ?? "").trim(); + if (!nodeId || !command) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "nodeId and command required", + ), + ); + break; + } + + try { + const paramsJSON = + "params" in p && p.params !== undefined + ? JSON.stringify(p.params) + : null; + const res = await bridge.invoke({ + nodeId, + command, + paramsJSON, + timeoutMs: p.timeoutMs, + }); + if (!res.ok) { + respond( + false, + undefined, + errorShape( + ErrorCodes.UNAVAILABLE, + res.error?.message ?? "node invoke failed", + { details: { nodeError: res.error ?? null } }, + ), + ); + break; + } + const payload = + typeof res.payloadJSON === "string" && res.payloadJSON.trim() + ? (() => { + try { + return JSON.parse(res.payloadJSON) as unknown; + } catch { + return { payloadJSON: res.payloadJSON }; + } + })() + : undefined; + respond( + true, + { + ok: true, + nodeId, + command, + payload, + payloadJSON: res.payloadJSON ?? null, + }, + undefined, + ); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)), + ); + } + break; + } case "send": { const p = (req.params ?? {}) as Record; if (!validateSendParams(p)) { diff --git a/src/infra/bridge/server.test.ts b/src/infra/bridge/server.test.ts index 56ac9127f..44a375013 100644 --- a/src/infra/bridge/server.test.ts +++ b/src/infra/bridge/server.test.ts @@ -253,4 +253,87 @@ describe("node bridge server", () => { await server.close(); }); + + it("supports invoke roundtrip to a connected node", async () => { + const server = await startNodeBridgeServer({ + host: "127.0.0.1", + port: 0, + pairingBaseDir: baseDir, + }); + + const socket = net.connect({ host: "127.0.0.1", port: server.port }); + const readLine = createLineReader(socket); + sendLine(socket, { type: "pair-request", nodeId: "n5", platform: "ios" }); + + // Approve the pending request from the gateway side. + let reqId: string | undefined; + for (let i = 0; i < 40; i += 1) { + const list = await listNodePairing(baseDir); + const req = list.pending.find((p) => p.nodeId === "n5"); + if (req) { + reqId = req.requestId; + break; + } + await new Promise((r) => setTimeout(r, 25)); + } + expect(reqId).toBeTruthy(); + if (!reqId) throw new Error("expected a pending requestId"); + await approveNodePairing(reqId, baseDir); + + const pairOk = JSON.parse(await readLine()) as { + type: string; + token?: string; + }; + expect(pairOk.type).toBe("pair-ok"); + expect(typeof pairOk.token).toBe("string"); + if (!pairOk.token) throw new Error("expected pair-ok token"); + const token = pairOk.token; + + const helloOk = JSON.parse(await readLine()) as { type: string }; + expect(helloOk.type).toBe("hello-ok"); + + const responder = (async () => { + while (true) { + const frame = JSON.parse(await readLine()) as { + type: string; + id?: string; + command?: string; + }; + if (frame.type !== "invoke") continue; + sendLine(socket, { + type: "invoke-res", + id: frame.id, + ok: true, + payloadJSON: JSON.stringify({ echo: frame.command }), + }); + break; + } + })(); + + const res = await server.invoke({ + nodeId: "n5", + command: "screen.eval", + paramsJSON: JSON.stringify({ javaScript: "1+1" }), + timeoutMs: 3000, + }); + + expect(res.ok).toBe(true); + const payload = JSON.parse(String(res.payloadJSON ?? "null")) as { + echo?: string; + }; + expect(payload.echo).toBe("screen.eval"); + + await responder; + socket.destroy(); + + // Ensure invoke works only for connected nodes (hello with token on a new socket). + const socket2 = net.connect({ host: "127.0.0.1", port: server.port }); + const readLine2 = createLineReader(socket2); + sendLine(socket2, { type: "hello", nodeId: "n5", token }); + const hello2 = JSON.parse(await readLine2()) as { type: string }; + expect(hello2.type).toBe("hello-ok"); + socket2.destroy(); + + await server.close(); + }); }); diff --git a/src/infra/bridge/server.ts b/src/infra/bridge/server.ts index ffd68897d..dba596f38 100644 --- a/src/infra/bridge/server.ts +++ b/src/infra/bridge/server.ts @@ -1,3 +1,4 @@ +import { randomUUID } from "node:crypto"; import net from "node:net"; import os from "node:os"; @@ -36,6 +37,13 @@ type BridgeEventFrame = { type BridgePingFrame = { type: "ping"; id: string }; type BridgePongFrame = { type: "pong"; id: string }; +type BridgeInvokeRequestFrame = { + type: "invoke"; + id: string; + command: string; + paramsJSON?: string | null; +}; + type BridgeInvokeResponseFrame = { type: "invoke-res"; id: string; @@ -54,6 +62,7 @@ type AnyBridgeFrame = | BridgeEventFrame | BridgePingFrame | BridgePongFrame + | BridgeInvokeRequestFrame | BridgeInvokeResponseFrame | BridgeHelloOkFrame | BridgePairOkFrame @@ -63,6 +72,13 @@ type AnyBridgeFrame = export type NodeBridgeServer = { port: number; close: () => Promise; + invoke: (opts: { + nodeId: string; + command: string; + paramsJSON?: string | null; + timeoutMs?: number; + }) => Promise; + listConnected: () => NodeBridgeClientInfo[]; }; export type NodeBridgeClientInfo = { @@ -105,6 +121,10 @@ export async function startNodeBridgeServer( return { port: 0, close: async () => {}, + invoke: async () => { + throw new Error("bridge disabled in tests"); + }, + listConnected: () => [], }; } @@ -113,7 +133,20 @@ export async function startNodeBridgeServer( ? opts.serverName.trim() : os.hostname(); - const connections = new Map(); + type ConnectionState = { + socket: net.Socket; + nodeInfo: NodeBridgeClientInfo; + invokeWaiters: Map< + string, + { + resolve: (value: BridgeInvokeResponseFrame) => void; + reject: (err: Error) => void; + timer: ReturnType; + } + >; + }; + + const connections = new Map(); const server = net.createServer((socket) => { socket.setNoDelay(true); @@ -127,17 +160,22 @@ export async function startNodeBridgeServer( { resolve: (value: BridgeInvokeResponseFrame) => void; reject: (err: Error) => void; + timer: ReturnType; } >(); const abort = new AbortController(); const stop = () => { if (!abort.signal.aborted) abort.abort(); - if (nodeId) connections.delete(nodeId); for (const [, waiter] of invokeWaiters) { + clearTimeout(waiter.timer); waiter.reject(new Error("bridge connection closed")); } invokeWaiters.clear(); + if (nodeId) { + const existing = connections.get(nodeId); + if (existing?.socket === socket) connections.delete(nodeId); + } }; const send = (frame: AnyBridgeFrame) => { @@ -182,7 +220,14 @@ export async function startNodeBridgeServer( } isAuthenticated = true; - connections.set(nodeId, socket); + const existing = connections.get(nodeId); + if (existing?.socket && existing.socket !== socket) { + try { + existing.socket.destroy(); + } catch { + /* ignore */ + } + } nodeInfo = { nodeId, displayName: verified.node.displayName ?? hello.displayName, @@ -190,6 +235,7 @@ export async function startNodeBridgeServer( version: verified.node.version ?? hello.version, remoteIp: remoteAddress, }; + connections.set(nodeId, { socket, nodeInfo, invokeWaiters }); send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame); await opts.onAuthenticated?.(nodeInfo); }; @@ -258,7 +304,14 @@ export async function startNodeBridgeServer( } isAuthenticated = true; - connections.set(nodeId, socket); + const existing = connections.get(nodeId); + if (existing?.socket && existing.socket !== socket) { + try { + existing.socket.destroy(); + } catch { + /* ignore */ + } + } nodeInfo = { nodeId, displayName: req.displayName, @@ -266,6 +319,7 @@ export async function startNodeBridgeServer( version: req.version, remoteIp: remoteAddress, }; + connections.set(nodeId, { socket, nodeInfo, invokeWaiters }); send({ type: "pair-ok", token: wait.token } satisfies BridgePairOkFrame); send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame); await opts.onAuthenticated?.(nodeInfo); @@ -331,10 +385,16 @@ export async function startNodeBridgeServer( const waiter = invokeWaiters.get(res.id); if (waiter) { invokeWaiters.delete(res.id); + clearTimeout(waiter.timer); waiter.resolve(res); } break; } + case "invoke": { + // Direction is gateway -> node only. + sendError("INVALID_REQUEST", "invoke not allowed from node"); + break; + } case "pong": // ignore break; @@ -372,7 +432,7 @@ export async function startNodeBridgeServer( close: async () => { for (const sock of connections.values()) { try { - sock.destroy(); + sock.socket.destroy(); } catch { /* ignore */ } @@ -382,5 +442,52 @@ export async function startNodeBridgeServer( server.close((err) => (err ? reject(err) : resolve())), ); }, + listConnected: () => [...connections.values()].map((c) => c.nodeInfo), + invoke: async ({ nodeId, command, paramsJSON, timeoutMs }) => { + const normalizedNodeId = String(nodeId ?? "").trim(); + const normalizedCommand = String(command ?? "").trim(); + if (!normalizedNodeId) { + throw new Error("INVALID_REQUEST: nodeId required"); + } + if (!normalizedCommand) { + throw new Error("INVALID_REQUEST: command required"); + } + + const conn = connections.get(normalizedNodeId); + if (!conn) { + throw new Error( + `UNAVAILABLE: node not connected (${normalizedNodeId})`, + ); + } + + const id = randomUUID(); + const timeout = Number.isFinite(timeoutMs) ? Number(timeoutMs) : 15_000; + + return await new Promise((resolve, reject) => { + const timer = setTimeout( + () => { + conn.invokeWaiters.delete(id); + reject(new Error("UNAVAILABLE: invoke timeout")); + }, + Math.max(0, timeout), + ); + + conn.invokeWaiters.set(id, { resolve, reject, timer }); + try { + conn.socket.write( + encodeLine({ + type: "invoke", + id, + command: normalizedCommand, + paramsJSON: paramsJSON ?? null, + } satisfies BridgeInvokeRequestFrame), + ); + } catch (err) { + conn.invokeWaiters.delete(id); + clearTimeout(timer); + reject(err instanceof Error ? err : new Error(String(err))); + } + }); + }, }; }