feat(gateway): add node.invoke for iOS canvas

This commit is contained in:
Peter Steinberger
2025-12-13 23:45:16 +00:00
parent b01cb41950
commit 3fcee21ff7
9 changed files with 469 additions and 7 deletions

View File

@@ -7,6 +7,10 @@ type NodesRpcOpts = {
token?: string;
timeout?: string;
json?: boolean;
node?: string;
command?: string;
params?: string;
invokeTimeout?: string;
};
type PendingRequest = {
@@ -207,4 +211,50 @@ export function registerNodesCli(program: Command) {
}
}),
);
nodesCallOpts(
nodes
.command("invoke")
.description("Invoke a command on a connected node")
.requiredOption("--node <nodeId>", "Node id (instanceId)")
.requiredOption("--command <command>", "Command (e.g. screen.eval)")
.option("--params <json>", "JSON object string for params")
.option(
"--invoke-timeout <ms>",
"Node invoke timeout in ms (default 15000)",
)
.action(async (opts: NodesRpcOpts) => {
try {
const nodeId = String(opts.node ?? "").trim();
const command = String(opts.command ?? "").trim();
if (!nodeId || !command) {
defaultRuntime.error("--node and --command required");
defaultRuntime.exit(1);
return;
}
const params = opts.params
? (JSON.parse(String(opts.params)) as unknown)
: undefined;
const timeoutMs = opts.invokeTimeout
? Number.parseInt(String(opts.invokeTimeout), 10)
: undefined;
const invokeParams: Record<string, unknown> = { nodeId, command };
if (params !== undefined) invokeParams.params = params;
if (typeof timeoutMs === "number" && Number.isFinite(timeoutMs)) {
invokeParams.timeoutMs = timeoutMs;
}
const result = await callGatewayCli(
"node.invoke",
opts,
invokeParams,
);
defaultRuntime.log(JSON.stringify(result, null, 2));
} catch (err) {
defaultRuntime.error(`nodes invoke failed: ${String(err)}`);
defaultRuntime.exit(1);
}
}),
);
}

View File

@@ -91,4 +91,41 @@ describe("cli program", () => {
);
expect(runtime.log).toHaveBeenCalled();
});
it("runs nodes invoke and calls node.invoke", async () => {
callGateway.mockResolvedValue({
ok: true,
nodeId: "ios-node",
command: "screen.eval",
payload: { result: "ok" },
});
const program = buildProgram();
runtime.log.mockClear();
await program.parseAsync(
[
"nodes",
"invoke",
"--node",
"ios-node",
"--command",
"screen.eval",
"--params",
'{"javaScript":"1+1"}',
],
{ from: "user" },
);
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "node.invoke",
params: {
nodeId: "ios-node",
command: "screen.eval",
params: { javaScript: "1+1" },
},
}),
);
expect(runtime.log).toHaveBeenCalled();
});
});

View File

@@ -36,6 +36,8 @@ import {
GatewayFrameSchema,
type HelloOk,
HelloOkSchema,
type NodeInvokeParams,
NodeInvokeParamsSchema,
type NodePairApproveParams,
NodePairApproveParamsSchema,
type NodePairListParams,
@@ -103,6 +105,9 @@ export const validateNodePairRejectParams = ajv.compile<NodePairRejectParams>(
export const validateNodePairVerifyParams = ajv.compile<NodePairVerifyParams>(
NodePairVerifyParamsSchema,
);
export const validateNodeInvokeParams = ajv.compile<NodeInvokeParams>(
NodeInvokeParamsSchema,
);
export const validateSessionsListParams = ajv.compile<SessionsListParams>(
SessionsListParamsSchema,
);
@@ -158,6 +163,7 @@ export {
NodePairApproveParamsSchema,
NodePairRejectParamsSchema,
NodePairVerifyParamsSchema,
NodeInvokeParamsSchema,
SessionsListParamsSchema,
SessionsPatchParamsSchema,
CronJobSchema,
@@ -199,6 +205,7 @@ export type {
NodePairApproveParams,
NodePairRejectParams,
NodePairVerifyParams,
NodeInvokeParams,
SessionsListParams,
SessionsPatchParams,
CronJob,

View File

@@ -242,6 +242,16 @@ export const NodePairVerifyParamsSchema = Type.Object(
{ additionalProperties: false },
);
export const NodeInvokeParamsSchema = Type.Object(
{
nodeId: NonEmptyString,
command: NonEmptyString,
params: Type.Optional(Type.Unknown()),
timeoutMs: Type.Optional(Type.Integer({ minimum: 0 })),
},
{ additionalProperties: false },
);
export const SessionsListParamsSchema = Type.Object(
{
limit: Type.Optional(Type.Integer({ minimum: 1 })),
@@ -496,6 +506,7 @@ export const ProtocolSchemas: Record<string, TSchema> = {
NodePairApproveParams: NodePairApproveParamsSchema,
NodePairRejectParams: NodePairRejectParamsSchema,
NodePairVerifyParams: NodePairVerifyParamsSchema,
NodeInvokeParams: NodeInvokeParamsSchema,
SessionsListParams: SessionsListParamsSchema,
SessionsPatchParams: SessionsPatchParamsSchema,
CronJob: CronJobSchema,
@@ -533,6 +544,7 @@ export type NodePairListParams = Static<typeof NodePairListParamsSchema>;
export type NodePairApproveParams = Static<typeof NodePairApproveParamsSchema>;
export type NodePairRejectParams = Static<typeof NodePairRejectParamsSchema>;
export type NodePairVerifyParams = Static<typeof NodePairVerifyParamsSchema>;
export type NodeInvokeParams = Static<typeof NodeInvokeParamsSchema>;
export type SessionsListParams = Static<typeof SessionsListParamsSchema>;
export type SessionsPatchParams = Static<typeof SessionsPatchParamsSchema>;
export type CronJob = Static<typeof CronJobSchema>;

View File

@@ -27,10 +27,24 @@ type BridgeStartOpts = {
};
const bridgeStartCalls = vi.hoisted(() => [] as BridgeStartOpts[]);
const bridgeInvoke = vi.hoisted(() =>
vi.fn(async () => ({
type: "invoke-res",
id: "1",
ok: true,
payloadJSON: JSON.stringify({ ok: true }),
error: null,
})),
);
vi.mock("../infra/bridge/server.js", () => ({
startNodeBridgeServer: vi.fn(async (opts: BridgeStartOpts) => {
bridgeStartCalls.push(opts);
return { port: 0, close: async () => {} };
return {
port: 18790,
close: async () => {},
listConnected: () => [],
invoke: bridgeInvoke,
};
}),
}));
@@ -362,6 +376,54 @@ describe("gateway server", () => {
}
});
test("routes node.invoke to the node bridge", async () => {
const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-"));
const prevHome = process.env.HOME;
process.env.HOME = homeDir;
try {
bridgeInvoke.mockResolvedValueOnce({
type: "invoke-res",
id: "inv-1",
ok: true,
payloadJSON: JSON.stringify({ result: "4" }),
error: null,
});
const { server, ws } = await startServerWithClient();
try {
await connectOk(ws);
const res = await rpcReq(ws, "node.invoke", {
nodeId: "ios-node",
command: "screen.eval",
params: { javaScript: "2+2" },
timeoutMs: 123,
});
expect(res.ok).toBe(true);
expect(bridgeInvoke).toHaveBeenCalledWith(
expect.objectContaining({
nodeId: "ios-node",
command: "screen.eval",
paramsJSON: JSON.stringify({ javaScript: "2+2" }),
timeoutMs: 123,
}),
);
} finally {
ws.close();
await server.close();
}
} finally {
await fs.rm(homeDir, { recursive: true, force: true });
if (prevHome === undefined) {
delete process.env.HOME;
} else {
process.env.HOME = prevHome;
}
}
});
test("emits presence updates for bridge connect/disconnect", async () => {
const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-home-"));
const prevHome = process.env.HOME;

View File

@@ -100,6 +100,7 @@ import {
validateCronRunsParams,
validateCronStatusParams,
validateCronUpdateParams,
validateNodeInvokeParams,
validateNodePairApproveParams,
validateNodePairListParams,
validateNodePairRejectParams,
@@ -176,6 +177,7 @@ const METHODS = [
"node.pair.approve",
"node.pair.reject",
"node.pair.verify",
"node.invoke",
"cron.list",
"cron.status",
"cron.add",
@@ -2046,6 +2048,100 @@ export async function startGatewayServer(
}
break;
}
case "node.invoke": {
const params = (req.params ?? {}) as Record<string, unknown>;
if (!validateNodeInvokeParams(params)) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
`invalid node.invoke params: ${formatValidationErrors(validateNodeInvokeParams.errors)}`,
),
);
break;
}
if (!bridge) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, "bridge not running"),
);
break;
}
const p = params as {
nodeId: string;
command: string;
params?: unknown;
timeoutMs?: number;
};
const nodeId = String(p.nodeId ?? "").trim();
const command = String(p.command ?? "").trim();
if (!nodeId || !command) {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"nodeId and command required",
),
);
break;
}
try {
const paramsJSON =
"params" in p && p.params !== undefined
? JSON.stringify(p.params)
: null;
const res = await bridge.invoke({
nodeId,
command,
paramsJSON,
timeoutMs: p.timeoutMs,
});
if (!res.ok) {
respond(
false,
undefined,
errorShape(
ErrorCodes.UNAVAILABLE,
res.error?.message ?? "node invoke failed",
{ details: { nodeError: res.error ?? null } },
),
);
break;
}
const payload =
typeof res.payloadJSON === "string" && res.payloadJSON.trim()
? (() => {
try {
return JSON.parse(res.payloadJSON) as unknown;
} catch {
return { payloadJSON: res.payloadJSON };
}
})()
: undefined;
respond(
true,
{
ok: true,
nodeId,
command,
payload,
payloadJSON: res.payloadJSON ?? null,
},
undefined,
);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
break;
}
case "send": {
const p = (req.params ?? {}) as Record<string, unknown>;
if (!validateSendParams(p)) {

View File

@@ -253,4 +253,87 @@ describe("node bridge server", () => {
await server.close();
});
it("supports invoke roundtrip to a connected node", async () => {
const server = await startNodeBridgeServer({
host: "127.0.0.1",
port: 0,
pairingBaseDir: baseDir,
});
const socket = net.connect({ host: "127.0.0.1", port: server.port });
const readLine = createLineReader(socket);
sendLine(socket, { type: "pair-request", nodeId: "n5", platform: "ios" });
// Approve the pending request from the gateway side.
let reqId: string | undefined;
for (let i = 0; i < 40; i += 1) {
const list = await listNodePairing(baseDir);
const req = list.pending.find((p) => p.nodeId === "n5");
if (req) {
reqId = req.requestId;
break;
}
await new Promise((r) => setTimeout(r, 25));
}
expect(reqId).toBeTruthy();
if (!reqId) throw new Error("expected a pending requestId");
await approveNodePairing(reqId, baseDir);
const pairOk = JSON.parse(await readLine()) as {
type: string;
token?: string;
};
expect(pairOk.type).toBe("pair-ok");
expect(typeof pairOk.token).toBe("string");
if (!pairOk.token) throw new Error("expected pair-ok token");
const token = pairOk.token;
const helloOk = JSON.parse(await readLine()) as { type: string };
expect(helloOk.type).toBe("hello-ok");
const responder = (async () => {
while (true) {
const frame = JSON.parse(await readLine()) as {
type: string;
id?: string;
command?: string;
};
if (frame.type !== "invoke") continue;
sendLine(socket, {
type: "invoke-res",
id: frame.id,
ok: true,
payloadJSON: JSON.stringify({ echo: frame.command }),
});
break;
}
})();
const res = await server.invoke({
nodeId: "n5",
command: "screen.eval",
paramsJSON: JSON.stringify({ javaScript: "1+1" }),
timeoutMs: 3000,
});
expect(res.ok).toBe(true);
const payload = JSON.parse(String(res.payloadJSON ?? "null")) as {
echo?: string;
};
expect(payload.echo).toBe("screen.eval");
await responder;
socket.destroy();
// Ensure invoke works only for connected nodes (hello with token on a new socket).
const socket2 = net.connect({ host: "127.0.0.1", port: server.port });
const readLine2 = createLineReader(socket2);
sendLine(socket2, { type: "hello", nodeId: "n5", token });
const hello2 = JSON.parse(await readLine2()) as { type: string };
expect(hello2.type).toBe("hello-ok");
socket2.destroy();
await server.close();
});
});

View File

@@ -1,3 +1,4 @@
import { randomUUID } from "node:crypto";
import net from "node:net";
import os from "node:os";
@@ -36,6 +37,13 @@ type BridgeEventFrame = {
type BridgePingFrame = { type: "ping"; id: string };
type BridgePongFrame = { type: "pong"; id: string };
type BridgeInvokeRequestFrame = {
type: "invoke";
id: string;
command: string;
paramsJSON?: string | null;
};
type BridgeInvokeResponseFrame = {
type: "invoke-res";
id: string;
@@ -54,6 +62,7 @@ type AnyBridgeFrame =
| BridgeEventFrame
| BridgePingFrame
| BridgePongFrame
| BridgeInvokeRequestFrame
| BridgeInvokeResponseFrame
| BridgeHelloOkFrame
| BridgePairOkFrame
@@ -63,6 +72,13 @@ type AnyBridgeFrame =
export type NodeBridgeServer = {
port: number;
close: () => Promise<void>;
invoke: (opts: {
nodeId: string;
command: string;
paramsJSON?: string | null;
timeoutMs?: number;
}) => Promise<BridgeInvokeResponseFrame>;
listConnected: () => NodeBridgeClientInfo[];
};
export type NodeBridgeClientInfo = {
@@ -105,6 +121,10 @@ export async function startNodeBridgeServer(
return {
port: 0,
close: async () => {},
invoke: async () => {
throw new Error("bridge disabled in tests");
},
listConnected: () => [],
};
}
@@ -113,7 +133,20 @@ export async function startNodeBridgeServer(
? opts.serverName.trim()
: os.hostname();
const connections = new Map<string, net.Socket>();
type ConnectionState = {
socket: net.Socket;
nodeInfo: NodeBridgeClientInfo;
invokeWaiters: Map<
string,
{
resolve: (value: BridgeInvokeResponseFrame) => void;
reject: (err: Error) => void;
timer: ReturnType<typeof setTimeout>;
}
>;
};
const connections = new Map<string, ConnectionState>();
const server = net.createServer((socket) => {
socket.setNoDelay(true);
@@ -127,17 +160,22 @@ export async function startNodeBridgeServer(
{
resolve: (value: BridgeInvokeResponseFrame) => void;
reject: (err: Error) => void;
timer: ReturnType<typeof setTimeout>;
}
>();
const abort = new AbortController();
const stop = () => {
if (!abort.signal.aborted) abort.abort();
if (nodeId) connections.delete(nodeId);
for (const [, waiter] of invokeWaiters) {
clearTimeout(waiter.timer);
waiter.reject(new Error("bridge connection closed"));
}
invokeWaiters.clear();
if (nodeId) {
const existing = connections.get(nodeId);
if (existing?.socket === socket) connections.delete(nodeId);
}
};
const send = (frame: AnyBridgeFrame) => {
@@ -182,7 +220,14 @@ export async function startNodeBridgeServer(
}
isAuthenticated = true;
connections.set(nodeId, socket);
const existing = connections.get(nodeId);
if (existing?.socket && existing.socket !== socket) {
try {
existing.socket.destroy();
} catch {
/* ignore */
}
}
nodeInfo = {
nodeId,
displayName: verified.node.displayName ?? hello.displayName,
@@ -190,6 +235,7 @@ export async function startNodeBridgeServer(
version: verified.node.version ?? hello.version,
remoteIp: remoteAddress,
};
connections.set(nodeId, { socket, nodeInfo, invokeWaiters });
send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame);
await opts.onAuthenticated?.(nodeInfo);
};
@@ -258,7 +304,14 @@ export async function startNodeBridgeServer(
}
isAuthenticated = true;
connections.set(nodeId, socket);
const existing = connections.get(nodeId);
if (existing?.socket && existing.socket !== socket) {
try {
existing.socket.destroy();
} catch {
/* ignore */
}
}
nodeInfo = {
nodeId,
displayName: req.displayName,
@@ -266,6 +319,7 @@ export async function startNodeBridgeServer(
version: req.version,
remoteIp: remoteAddress,
};
connections.set(nodeId, { socket, nodeInfo, invokeWaiters });
send({ type: "pair-ok", token: wait.token } satisfies BridgePairOkFrame);
send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame);
await opts.onAuthenticated?.(nodeInfo);
@@ -331,10 +385,16 @@ export async function startNodeBridgeServer(
const waiter = invokeWaiters.get(res.id);
if (waiter) {
invokeWaiters.delete(res.id);
clearTimeout(waiter.timer);
waiter.resolve(res);
}
break;
}
case "invoke": {
// Direction is gateway -> node only.
sendError("INVALID_REQUEST", "invoke not allowed from node");
break;
}
case "pong":
// ignore
break;
@@ -372,7 +432,7 @@ export async function startNodeBridgeServer(
close: async () => {
for (const sock of connections.values()) {
try {
sock.destroy();
sock.socket.destroy();
} catch {
/* ignore */
}
@@ -382,5 +442,52 @@ export async function startNodeBridgeServer(
server.close((err) => (err ? reject(err) : resolve())),
);
},
listConnected: () => [...connections.values()].map((c) => c.nodeInfo),
invoke: async ({ nodeId, command, paramsJSON, timeoutMs }) => {
const normalizedNodeId = String(nodeId ?? "").trim();
const normalizedCommand = String(command ?? "").trim();
if (!normalizedNodeId) {
throw new Error("INVALID_REQUEST: nodeId required");
}
if (!normalizedCommand) {
throw new Error("INVALID_REQUEST: command required");
}
const conn = connections.get(normalizedNodeId);
if (!conn) {
throw new Error(
`UNAVAILABLE: node not connected (${normalizedNodeId})`,
);
}
const id = randomUUID();
const timeout = Number.isFinite(timeoutMs) ? Number(timeoutMs) : 15_000;
return await new Promise<BridgeInvokeResponseFrame>((resolve, reject) => {
const timer = setTimeout(
() => {
conn.invokeWaiters.delete(id);
reject(new Error("UNAVAILABLE: invoke timeout"));
},
Math.max(0, timeout),
);
conn.invokeWaiters.set(id, { resolve, reject, timer });
try {
conn.socket.write(
encodeLine({
type: "invoke",
id,
command: normalizedCommand,
paramsJSON: paramsJSON ?? null,
} satisfies BridgeInvokeRequestFrame),
);
} catch (err) {
conn.invokeWaiters.delete(id);
clearTimeout(timer);
reject(err instanceof Error ? err : new Error(String(err)));
}
});
},
};
}