diff --git a/apps/macos/Sources/Clawdis/Bridge/BridgeServer.swift b/apps/macos/Sources/Clawdis/Bridge/BridgeServer.swift index 63706136c..e3c536fd2 100644 --- a/apps/macos/Sources/Clawdis/Bridge/BridgeServer.swift +++ b/apps/macos/Sources/Clawdis/Bridge/BridgeServer.swift @@ -28,13 +28,6 @@ actor BridgeServer { params.includePeerToPeer = true let listener = try NWListener(using: params, on: .any) - let name = Host.current().localizedName ?? ProcessInfo.processInfo.hostName - listener.service = NWListener.Service( - name: "\(name) (Clawdis)", - type: ClawdisBonjour.bridgeServiceType, - domain: ClawdisBonjour.bridgeServiceDomain, - txtRecord: nil) - listener.newConnectionHandler = { [weak self] connection in guard let self else { return } Task { await self.handle(connection: connection) } diff --git a/apps/macos/Sources/Clawdis/GeneralSettings.swift b/apps/macos/Sources/Clawdis/GeneralSettings.swift index e6ff0ce29..24e582be2 100644 --- a/apps/macos/Sources/Clawdis/GeneralSettings.swift +++ b/apps/macos/Sources/Clawdis/GeneralSettings.swift @@ -5,6 +5,7 @@ struct GeneralSettings: View { @ObservedObject var state: AppState @ObservedObject private var healthStore = HealthStore.shared @ObservedObject private var gatewayManager = GatewayProcessManager.shared + @StateObject private var masterDiscovery = MasterDiscoveryModel() @State private var isInstallingCLI = false @State private var cliStatus: String? @State private var cliInstalled = false @@ -124,6 +125,18 @@ struct GeneralSettings: View { TextField("user@host[:22]", text: self.$state.remoteTarget) .textFieldStyle(.roundedBorder) .frame(maxWidth: .infinity) + Menu { + if self.masterDiscovery.masters.isEmpty { + Button(self.masterDiscovery.statusText) {}.disabled(true) + } else { + ForEach(self.masterDiscovery.masters) { master in + Button(master.displayName) { self.applyDiscoveredMaster(master) } + } + } + } label: { + Image(systemName: "dot.radiowaves.left.and.right") + } + .help("Discover Clawdis masters on your LAN") Button { Task { await self.testRemote() } } label: { @@ -188,6 +201,8 @@ struct GeneralSettings: View { .lineLimit(1) } .transition(.opacity) + .onAppear { self.masterDiscovery.start() } + .onDisappear { self.masterDiscovery.stop() } } private var controlStatusLine: String { @@ -562,6 +577,17 @@ extension GeneralSettings { alert.addButton(withTitle: "OK") alert.runModal() } + + private func applyDiscoveredMaster(_ master: MasterDiscoveryModel.DiscoveredMaster) { + let host = master.tailnetDns ?? master.lanHost + guard let host else { return } + let user = NSUserName() + var target = "\(user)@\(host)" + if master.sshPort != 22 { + target += ":\(master.sshPort)" + } + self.state.remoteTarget = target + } } private func healthAgeString(_ ms: Double?) -> String { diff --git a/apps/macos/Sources/Clawdis/MasterDiscoveryModel.swift b/apps/macos/Sources/Clawdis/MasterDiscoveryModel.swift new file mode 100644 index 000000000..4960fe7e2 --- /dev/null +++ b/apps/macos/Sources/Clawdis/MasterDiscoveryModel.swift @@ -0,0 +1,103 @@ +import Foundation +import Network + +@MainActor +final class MasterDiscoveryModel: ObservableObject { + struct DiscoveredMaster: Identifiable, Equatable { + var id: String { self.debugID } + var displayName: String + var lanHost: String? + var tailnetDns: String? + var sshPort: Int + var debugID: String + } + + @Published var masters: [DiscoveredMaster] = [] + @Published var statusText: String = "Idle" + + private var browser: NWBrowser? + + private static let serviceType = "_clawdis-master._tcp" + private static let serviceDomain = "local." + + func start() { + if self.browser != nil { return } + + let params = NWParameters.tcp + params.includePeerToPeer = true + + let browser = NWBrowser(for: .bonjour(type: Self.serviceType, domain: Self.serviceDomain), using: params) + + browser.stateUpdateHandler = { [weak self] state in + Task { @MainActor in + guard let self else { return } + switch state { + case .setup: + self.statusText = "Setup" + case .ready: + self.statusText = "Searching…" + case let .failed(err): + self.statusText = "Failed: \(err)" + case .cancelled: + self.statusText = "Stopped" + case let .waiting(err): + self.statusText = "Waiting: \(err)" + @unknown default: + self.statusText = "Unknown" + } + } + } + + browser.browseResultsChangedHandler = { [weak self] results, _ in + Task { @MainActor in + guard let self else { return } + self.masters = results.compactMap { result -> DiscoveredMaster? in + guard case let .service(name, _, _, _) = result.endpoint else { return nil } + + var lanHost: String? + var tailnetDns: String? + var sshPort = 22 + if case let .bonjour(txt) = result.metadata { + let dict = txt.dictionary + if let value = dict["lanHost"] { + let trimmed = value.trimmingCharacters(in: .whitespacesAndNewlines) + lanHost = trimmed.isEmpty ? nil : trimmed + } + if let value = dict["tailnetDns"] { + let trimmed = value.trimmingCharacters(in: .whitespacesAndNewlines) + tailnetDns = trimmed.isEmpty ? nil : trimmed + } + if let value = dict["sshPort"], + let parsed = Int(value.trimmingCharacters(in: .whitespacesAndNewlines)), + parsed > 0 + { + sshPort = parsed + } + } + + return DiscoveredMaster( + displayName: name, + lanHost: lanHost, + tailnetDns: tailnetDns, + sshPort: sshPort, + debugID: Self.prettyEndpointDebugID(result.endpoint)) + } + .sorted { $0.displayName.localizedCaseInsensitiveCompare($1.displayName) == .orderedAscending } + } + } + + self.browser = browser + browser.start(queue: DispatchQueue(label: "com.steipete.clawdis.macos.master-discovery")) + } + + func stop() { + self.browser?.cancel() + self.browser = nil + self.masters = [] + self.statusText = "Stopped" + } + + private static func prettyEndpointDebugID(_ endpoint: NWEndpoint) -> String { + String(describing: endpoint) + } +} diff --git a/apps/macos/Sources/Clawdis/MenuBar.swift b/apps/macos/Sources/Clawdis/MenuBar.swift index a55c70ec7..2535499fd 100644 --- a/apps/macos/Sources/Clawdis/MenuBar.swift +++ b/apps/macos/Sources/Clawdis/MenuBar.swift @@ -182,7 +182,6 @@ final class AppDelegate: NSObject, NSApplicationDelegate { Task { await HealthStore.shared.refresh(onDemand: true) } Task { await PortGuardian.shared.sweep(mode: AppStateStore.shared.connectionMode) } Task { await self.socketServer.start() } - Task { await BridgeServer.shared.start() } self.scheduleFirstRunOnboardingIfNeeded() // Developer/testing helper: auto-open WebChat when launched with --webchat @@ -201,7 +200,6 @@ final class AppDelegate: NSObject, NSApplicationDelegate { Task { await AgentRPC.shared.shutdown() } Task { await GatewayConnection.shared.shutdown() } Task { await self.socketServer.stop() } - Task { await BridgeServer.shared.stop() } } @MainActor diff --git a/docs/discovery.md b/docs/discovery.md new file mode 100644 index 000000000..a9e33a247 --- /dev/null +++ b/docs/discovery.md @@ -0,0 +1,106 @@ +--- +summary: "Node discovery and transports (Bonjour, Tailscale, SSH) for finding the master gateway" +read_when: + - Implementing or changing Bonjour discovery/advertising + - Adjusting remote connection modes (direct vs SSH) + - Designing bridge + pairing for remote nodes +--- +# Discovery & transports + +Clawdis has two distinct problems that look similar on the surface: + +1) **Operator remote control**: the macOS menu bar app controlling a “master” gateway running elsewhere. +2) **Node pairing**: Iris/iOS (and future nodes) finding a gateway and pairing securely. + +The design goal is to keep all network discovery/advertising in the **Node Gateway** (`clawd` / `clawdis gateway`) and keep clients (mac app, iOS) as consumers. + +## Terms + +- **Master gateway**: the single, long-running gateway process that owns state (sessions, pairing, node registry) and runs providers. +- **Gateway WS (loopback)**: the existing gateway WebSocket control endpoint on `127.0.0.1:18789`. +- **Bridge (direct transport)**: a LAN/tailnet-facing endpoint owned by the gateway that allows authenticated clients/nodes to call a scoped subset of gateway methods. The bridge exists so the gateway can remain loopback-only. +- **SSH transport (fallback)**: remote control by forwarding `127.0.0.1:18789` over SSH. + +## Why we keep both “direct” and SSH + +- **Direct bridge** is the best UX on the same network and within a tailnet: + - auto-discovery on LAN via Bonjour + - pairing tokens + ACLs owned by the gateway + - no shell access required; protocol surface can stay tight and auditable +- **SSH** remains the universal fallback: + - works anywhere you have SSH access (even across unrelated networks) + - survives multicast/mDNS issues + - requires no new inbound ports besides SSH + +## Discovery inputs (how clients learn where the master is) + +### 1) Bonjour / mDNS (LAN only) + +Bonjour is best-effort and does not cross networks. It is only used for “same LAN” convenience. + +Target direction: +- The **gateway** advertises itself (and/or its bridge) via Bonjour. +- Clients browse and show a “pick a master” list, then store the chosen endpoint. + +#### Current implementation + +- Service types: + - `_clawdis-master._tcp` (gateway “master” beacon) + - `_clawdis-bridge._tcp` (optional; bridge transport beacon) +- TXT keys (non-secret): + - `role=master` + - `lanHost=.local` + - `sshPort=22` (or whatever is advertised) + - `gatewayPort=18789` (loopback WS port; informational) + - `bridgePort=18790` (when bridge is enabled) + - `tailnetDns=` (optional hint) + +Disable/override: +- `CLAWDIS_DISABLE_BONJOUR=1` disables advertising. +- `CLAWDIS_BRIDGE_ENABLED=0` disables the bridge listener. +- `CLAWDIS_BRIDGE_HOST` / `CLAWDIS_BRIDGE_PORT` control bind/port. + +### 2) Tailnet (cross-network) + +For London/Vienna style setups, Bonjour won’t help. The recommended “direct” target is: +- Tailscale MagicDNS name (preferred) or a stable tailnet IP. + +If the gateway can detect it is running under Tailscale, it can publish `tailnetDns` as an optional hint for clients. + +### 3) Manual / SSH target + +When there is no direct route (or direct is disabled), clients can always connect via SSH by forwarding the loopback gateway port. + +See `docs/remote.md`. + +## Transport selection (client policy) + +Recommended client behavior: + +1) If a paired direct endpoint is configured and reachable, use it. +2) Else, if Bonjour finds a master on LAN, offer a one-tap “Use this master” choice and save it as the direct endpoint. +3) Else, if a tailnet DNS/IP is configured, try direct. +4) Else, fall back to SSH. + +## Pairing + auth (direct transport) + +The gateway is the source of truth for node/client admission. + +- Pairing requests are created/approved/rejected in the gateway (see `docs/gateway/pairing.md`). +- The bridge enforces: + - auth (token / keypair) + - scopes/ACLs (bridge is not a raw proxy to every gateway method) + - rate limits + +## Where the code lives (target architecture) + +- Node gateway: + - advertises discovery beacons (Bonjour) + - owns pairing storage + decisions + - runs the bridge listener (direct transport) +- macOS app: + - UI for picking a master, showing pairing prompts, and troubleshooting + - SSH tunneling only for the fallback path +- iOS node: + - browses Bonjour (LAN) as a convenience only + - uses direct transport + pairing to connect to the gateway diff --git a/docs/gateway/pairing.md b/docs/gateway/pairing.md index 43662c523..578844b27 100644 --- a/docs/gateway/pairing.md +++ b/docs/gateway/pairing.md @@ -17,7 +17,7 @@ This enables: ## Concepts - **Pending request**: a node asked to join; requires explicit approve/reject. - **Paired node**: node is allowed; gateway returns an auth token for subsequent connects. -- **Bridge**: LAN transport that forwards between node ↔ gateway. The bridge does not decide membership. +- **Bridge**: direct transport endpoint owned by the gateway. The bridge does not decide membership. ## API surface (gateway protocol) These are conceptual method names; wire them into `src/gateway/protocol/schema.ts` and regenerate Swift types. @@ -46,20 +46,24 @@ These are conceptual method names; wire them into `src/gateway/protocol/schema.t - Creates (or returns) a pending request. - Params: node metadata (same shape as `node.pair.requested` payload, minus `requestId`/`ts`). - Result: - - `requestId` - - `status` ("pending" | "alreadyPaired") - - If already paired: may include `token` directly to allow fast path. + - `status` ("pending") + - `created` (boolean) — whether this call created the pending request + - `request` (pending request object), including `isRepair` when the node was already paired + - Security: **never returns an existing token**. If a paired node “lost” its token, it must be approved again (token rotation). - `node.pair.list` - Returns: - `pending[]` (pending requests) - `paired[]` (paired node records) - `node.pair.approve` - Params: `{ requestId }` - - Result: `{ nodeId, token }` + - Result: `{ requestId, node: { nodeId, token, ... } }` - Must be idempotent (first decision wins). - `node.pair.reject` - Params: `{ requestId }` - - Result: `{ nodeId }` + - Result: `{ requestId, nodeId }` +- `node.pair.verify` + - Params: `{ nodeId, token }` + - Result: `{ ok: boolean, node?: { nodeId, ... } }` ## CLI flows CLI must be able to fully operate without any GUI: @@ -80,10 +84,9 @@ Notes: - Pending entries should have a TTL (e.g. 5 minutes) and expire automatically. ## Bridge integration -The macOS Bridge is responsible for: -- Surfacing the pairing request to the gateway (`node.pair.request`). -- Waiting for the decision (`node.pair.approve`/`reject`) and completing the on-wire pairing handshake to the node. -- Enforcing ACLs on what the node can call, even after paired. +Target direction: +- The gateway runs the bridge listener (LAN/tailnet-facing) and advertises discovery beacons (Bonjour). +- The bridge is transport only; it forwards/scopes requests and enforces ACLs, but pairing decisions are made by the gateway. The macOS UI (Swift) can: - Subscribe to `node.pair.requested`, show an alert, and call `node.pair.approve` or `node.pair.reject`. @@ -91,5 +94,4 @@ The macOS UI (Swift) can: ## Implementation note If the bridge is only provided by the macOS app, then “no Swift app running” cannot work end-to-end. -To support headless pairing, also add a `clawdis bridge` CLI mode that provides the Bonjour bridge service and forwards to the local gateway. - +The long-term goal is to move bridge hosting + Bonjour advertising into the Node gateway so headless pairing works by default. diff --git a/docs/ios/spec.md b/docs/ios/spec.md index e2b38210b..bfd2a2743 100644 --- a/docs/ios/spec.md +++ b/docs/ios/spec.md @@ -31,10 +31,10 @@ Non-goals (v1): - macOS “Canvas” exists today, but is **mac-only** and controlled via mac app IPC (`clawdis-mac canvas ...`) rather than the Gateway protocol (`docs/mac/canvas.md`). - Voice wake forwards via `GatewayChannel` to Gateway `agent` (mac app: `VoiceWakeForwarder` → `AgentRPC`). -## Recommended topology (B): macOS Bridge + loopback Gateway -Keep the Node gateway loopback-only; expose a dedicated **macOS bridge** to the LAN. +## Recommended topology (B): Gateway-owned Bridge + loopback Gateway +Keep the Node gateway loopback-only; expose a dedicated **gateway-owned bridge** to the LAN/tailnet. -**iOS App** ⇄ (TLS + pairing) ⇄ **macOS Bridge** ⇄ (loopback) ⇄ **Gateway WS** (`ws://127.0.0.1:18789`) +**iOS App** ⇄ (TLS + pairing) ⇄ **Bridge (in gateway)** ⇄ (loopback) ⇄ **Gateway WS** (`ws://127.0.0.1:18789`) Why: - Preserves current threat model: Gateway remains local-only. @@ -71,6 +71,11 @@ Desired behavior: See `docs/gateway/pairing.md` for the API/events and storage. +CLI (headless approvals): +- `clawdis nodes pending` +- `clawdis nodes approve ` +- `clawdis nodes reject ` + ### Authorization / scope control (bridge-side ACL) The bridge must not be a raw proxy to every gateway method. @@ -183,8 +188,8 @@ open ClawdisNode.xcodeproj - Keep current Canvas root (already implemented): - `~/Library/Application Support/Clawdis/canvas//...` - Bridge state: - - `~/Library/Application Support/Clawdis/bridge/paired-nodes.json` - - `~/Library/Application Support/Clawdis/bridge/keys/...` + - No local pairing store (pairing is gateway-owned). + - Any local bridge-only state should remain private under Application Support. ### Gateway (node) - Pairing (source of truth): diff --git a/docs/remote.md b/docs/remote.md index b67dcfe8d..c61541613 100644 --- a/docs/remote.md +++ b/docs/remote.md @@ -7,6 +7,9 @@ read_when: This repo supports “remote over SSH” by keeping a single gateway (the master) running on a host (e.g., your Mac Studio) and connecting one or more macOS menu bar clients to it. The menu app no longer shells out to `pnpm clawdis …`; it talks to the gateway over a persistent control channel that is tunneled through SSH. +Remote mode is the SSH fallback transport. As Clawdis adds a direct “bridge” transport for LAN/tailnet setups, SSH remains supported for universal reach. +See `docs/discovery.md` for how clients choose between direct vs SSH. + ## Topology - Master: runs the gateway + control server on `127.0.0.1:18789` (in-process TCP server). - Clients: when “Remote over SSH” is selected, the app opens one SSH tunnel: diff --git a/package.json b/package.json index 794e67976..74709d8e7 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "packageManager": "pnpm@10.23.0", "dependencies": { "@grammyjs/transformer-throttler": "^1.2.1", + "@homebridge/ciao": "^1.3.4", "@mariozechner/pi-ai": "^0.18.0", "@mariozechner/pi-coding-agent": "^0.18.0", "@sinclair/typebox": "^0.34.41", diff --git a/src/cli/nodes-cli.ts b/src/cli/nodes-cli.ts new file mode 100644 index 000000000..e3562fcd5 --- /dev/null +++ b/src/cli/nodes-cli.ts @@ -0,0 +1,210 @@ +import type { Command } from "commander"; +import { callGateway } from "../gateway/call.js"; +import { defaultRuntime } from "../runtime.js"; + +type NodesRpcOpts = { + url?: string; + token?: string; + timeout?: string; + json?: boolean; +}; + +type PendingRequest = { + requestId: string; + nodeId: string; + displayName?: string; + platform?: string; + version?: string; + remoteIp?: string; + isRepair?: boolean; + ts: number; +}; + +type PairedNode = { + nodeId: string; + token?: string; + displayName?: string; + platform?: string; + version?: string; + remoteIp?: string; + createdAtMs?: number; + approvedAtMs?: number; +}; + +type PairingList = { + pending: PendingRequest[]; + paired: PairedNode[]; +}; + +const nodesCallOpts = (cmd: Command) => + cmd + .option("--url ", "Gateway WebSocket URL", "ws://127.0.0.1:18789") + .option("--token ", "Gateway token (if required)") + .option("--timeout ", "Timeout in ms", "10000") + .option("--json", "Output JSON", false); + +const callGatewayCli = async ( + method: string, + opts: NodesRpcOpts, + params?: unknown, +) => + callGateway({ + url: opts.url, + token: opts.token, + method, + params, + timeoutMs: Number(opts.timeout ?? 10_000), + clientName: "cli", + mode: "cli", + }); + +function formatAge(msAgo: number) { + const s = Math.max(0, Math.floor(msAgo / 1000)); + if (s < 60) return `${s}s`; + const m = Math.floor(s / 60); + if (m < 60) return `${m}m`; + const h = Math.floor(m / 60); + if (h < 24) return `${h}h`; + const d = Math.floor(h / 24); + return `${d}d`; +} + +function parsePairingList(value: unknown): PairingList { + const obj = + typeof value === "object" && value !== null + ? (value as Record) + : {}; + const pending = Array.isArray(obj.pending) + ? (obj.pending as PendingRequest[]) + : []; + const paired = Array.isArray(obj.paired) ? (obj.paired as PairedNode[]) : []; + return { pending, paired }; +} + +export function registerNodesCli(program: Command) { + const nodes = program + .command("nodes") + .description("Manage gateway-owned node pairing"); + + nodesCallOpts( + nodes + .command("list") + .description("List pending and paired nodes") + .action(async (opts: NodesRpcOpts) => { + try { + const result = (await callGatewayCli( + "node.pair.list", + opts, + {}, + )) as unknown; + if (opts.json) { + defaultRuntime.log(JSON.stringify(result, null, 2)); + return; + } + const { pending, paired } = parsePairingList(result); + defaultRuntime.log( + `Pending: ${pending.length} · Paired: ${paired.length}`, + ); + if (pending.length > 0) { + defaultRuntime.log("\nPending:"); + for (const r of pending) { + const name = r.displayName || r.nodeId; + const repair = r.isRepair ? " (repair)" : ""; + const ip = r.remoteIp ? ` · ${r.remoteIp}` : ""; + const age = + typeof r.ts === "number" + ? ` · ${formatAge(Date.now() - r.ts)} ago` + : ""; + defaultRuntime.log( + `- ${r.requestId}: ${name}${repair}${ip}${age}`, + ); + } + } + if (paired.length > 0) { + defaultRuntime.log("\nPaired:"); + for (const n of paired) { + const name = n.displayName || n.nodeId; + const ip = n.remoteIp ? ` · ${n.remoteIp}` : ""; + defaultRuntime.log(`- ${n.nodeId}: ${name}${ip}`); + } + } + } catch (err) { + defaultRuntime.error(`nodes list failed: ${String(err)}`); + defaultRuntime.exit(1); + } + }), + ); + + nodesCallOpts( + nodes + .command("pending") + .description("List pending pairing requests") + .action(async (opts: NodesRpcOpts) => { + try { + const result = (await callGatewayCli( + "node.pair.list", + opts, + {}, + )) as unknown; + const { pending } = parsePairingList(result); + if (opts.json) { + defaultRuntime.log(JSON.stringify(pending, null, 2)); + return; + } + if (pending.length === 0) { + defaultRuntime.log("No pending pairing requests."); + return; + } + for (const r of pending) { + const name = r.displayName || r.nodeId; + const repair = r.isRepair ? " (repair)" : ""; + const ip = r.remoteIp ? ` · ${r.remoteIp}` : ""; + const age = + typeof r.ts === "number" + ? ` · ${formatAge(Date.now() - r.ts)} ago` + : ""; + defaultRuntime.log(`- ${r.requestId}: ${name}${repair}${ip}${age}`); + } + } catch (err) { + defaultRuntime.error(`nodes pending failed: ${String(err)}`); + defaultRuntime.exit(1); + } + }), + ); + + nodesCallOpts( + nodes + .command("approve") + .description("Approve a pending pairing request") + .argument("", "Pending request id") + .action(async (requestId: string, opts: NodesRpcOpts) => { + try { + const result = await callGatewayCli("node.pair.approve", opts, { + requestId, + }); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(`nodes approve failed: ${String(err)}`); + defaultRuntime.exit(1); + } + }), + ); + + nodesCallOpts( + nodes + .command("reject") + .description("Reject a pending pairing request") + .argument("", "Pending request id") + .action(async (requestId: string, opts: NodesRpcOpts) => { + try { + const result = await callGatewayCli("node.pair.reject", opts, { + requestId, + }); + defaultRuntime.log(JSON.stringify(result, null, 2)); + } catch (err) { + defaultRuntime.error(`nodes reject failed: ${String(err)}`); + defaultRuntime.exit(1); + } + }), + ); +} diff --git a/src/cli/program.ts b/src/cli/program.ts index 83e55f3be..299c4a3b2 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -13,6 +13,7 @@ import { startWebChatServer } from "../webchat/server.js"; import { registerCronCli } from "./cron-cli.js"; import { createDefaultDeps } from "./deps.js"; import { registerGatewayCli } from "./gateway-cli.js"; +import { registerNodesCli } from "./nodes-cli.js"; import { forceFreePort } from "./ports.js"; export { forceFreePort }; @@ -209,6 +210,7 @@ Examples: }); registerGatewayCli(program); + registerNodesCli(program); registerCronCli(program); program .command("status") diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 1746e9d62..edb541278 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -36,6 +36,16 @@ import { GatewayFrameSchema, type HelloOk, HelloOkSchema, + type NodePairApproveParams, + NodePairApproveParamsSchema, + type NodePairListParams, + NodePairListParamsSchema, + type NodePairRejectParams, + NodePairRejectParamsSchema, + type NodePairRequestParams, + NodePairRequestParamsSchema, + type NodePairVerifyParams, + NodePairVerifyParamsSchema, PROTOCOL_VERSION, type PresenceEntry, PresenceEntrySchema, @@ -74,6 +84,21 @@ export const validateRequestFrame = export const validateSendParams = ajv.compile(SendParamsSchema); export const validateAgentParams = ajv.compile(AgentParamsSchema); export const validateWakeParams = ajv.compile(WakeParamsSchema); +export const validateNodePairRequestParams = ajv.compile( + NodePairRequestParamsSchema, +); +export const validateNodePairListParams = ajv.compile( + NodePairListParamsSchema, +); +export const validateNodePairApproveParams = ajv.compile( + NodePairApproveParamsSchema, +); +export const validateNodePairRejectParams = ajv.compile( + NodePairRejectParamsSchema, +); +export const validateNodePairVerifyParams = ajv.compile( + NodePairVerifyParamsSchema, +); export const validateCronListParams = ajv.compile(CronListParamsSchema); export const validateCronStatusParams = ajv.compile( @@ -118,6 +143,11 @@ export { SendParamsSchema, AgentParamsSchema, WakeParamsSchema, + NodePairRequestParamsSchema, + NodePairListParamsSchema, + NodePairApproveParamsSchema, + NodePairRejectParamsSchema, + NodePairVerifyParamsSchema, CronJobSchema, CronListParamsSchema, CronStatusParamsSchema, @@ -152,6 +182,11 @@ export type { TickEvent, ShutdownEvent, WakeParams, + NodePairRequestParams, + NodePairListParams, + NodePairApproveParams, + NodePairRejectParams, + NodePairVerifyParams, CronJob, CronListParams, CronStatusParams, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index a1ed78e4d..0721426d0 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -211,6 +211,37 @@ export const WakeParamsSchema = Type.Object( { additionalProperties: false }, ); +export const NodePairRequestParamsSchema = Type.Object( + { + nodeId: NonEmptyString, + displayName: Type.Optional(NonEmptyString), + platform: Type.Optional(NonEmptyString), + version: Type.Optional(NonEmptyString), + remoteIp: Type.Optional(NonEmptyString), + }, + { additionalProperties: false }, +); + +export const NodePairListParamsSchema = Type.Object( + {}, + { additionalProperties: false }, +); + +export const NodePairApproveParamsSchema = Type.Object( + { requestId: NonEmptyString }, + { additionalProperties: false }, +); + +export const NodePairRejectParamsSchema = Type.Object( + { requestId: NonEmptyString }, + { additionalProperties: false }, +); + +export const NodePairVerifyParamsSchema = Type.Object( + { nodeId: NonEmptyString, token: NonEmptyString }, + { additionalProperties: false }, +); + export const CronScheduleSchema = Type.Union([ Type.Object( { @@ -441,6 +472,11 @@ export const ProtocolSchemas: Record = { SendParams: SendParamsSchema, AgentParams: AgentParamsSchema, WakeParams: WakeParamsSchema, + NodePairRequestParams: NodePairRequestParamsSchema, + NodePairListParams: NodePairListParamsSchema, + NodePairApproveParams: NodePairApproveParamsSchema, + NodePairRejectParams: NodePairRejectParamsSchema, + NodePairVerifyParams: NodePairVerifyParamsSchema, CronJob: CronJobSchema, CronListParams: CronListParamsSchema, CronStatusParams: CronStatusParamsSchema, @@ -471,6 +507,11 @@ export type ErrorShape = Static; export type StateVersion = Static; export type AgentEvent = Static; export type WakeParams = Static; +export type NodePairRequestParams = Static; +export type NodePairListParams = Static; +export type NodePairApproveParams = Static; +export type NodePairRejectParams = Static; +export type NodePairVerifyParams = Static; export type CronJob = Static; export type CronListParams = Static; export type CronStatusParams = Static; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index d8662ec42..84b9aa30d 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -30,11 +30,20 @@ import { resolveCronStorePath } from "../cron/store.js"; import type { CronJobCreate, CronJobPatch } from "../cron/types.js"; import { isVerbose } from "../globals.js"; import { onAgentEvent } from "../infra/agent-events.js"; +import { startGatewayBonjourAdvertiser } from "../infra/bonjour.js"; +import { startNodeBridgeServer } from "../infra/bridge/server.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; import { getLastHeartbeatEvent, onHeartbeatEvent, } from "../infra/heartbeat-events.js"; +import { + approveNodePairing, + listNodePairing, + rejectNodePairing, + requestNodePairing, + verifyNodeToken, +} from "../infra/node-pairing.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { listSystemPresence, @@ -78,6 +87,11 @@ import { validateCronRunsParams, validateCronStatusParams, validateCronUpdateParams, + validateNodePairApproveParams, + validateNodePairListParams, + validateNodePairRejectParams, + validateNodePairRequestParams, + validateNodePairVerifyParams, validateRequestFrame, validateSendParams, validateWakeParams, @@ -96,6 +110,11 @@ const METHODS = [ "last-heartbeat", "set-heartbeats", "wake", + "node.pair.request", + "node.pair.list", + "node.pair.approve", + "node.pair.reject", + "node.pair.verify", "cron.list", "cron.status", "cron.add", @@ -121,6 +140,8 @@ const EVENTS = [ "health", "heartbeat", "cron", + "node.pair.requested", + "node.pair.resolved", ]; export type GatewayServer = { @@ -319,6 +340,8 @@ export async function startGatewayServer( ): Promise { const host = "127.0.0.1"; const httpServer: HttpServer = createHttpServer(); + let bonjourStop: (() => Promise) | null = null; + let bridge: Awaited> | null = null; try { await new Promise((resolve, reject) => { const onError = (err: NodeJS.ErrnoException) => { @@ -362,7 +385,7 @@ export async function startGatewayServer( module: "cron", storePath: cronStorePath, }); - const cronDeps = createDefaultDeps(); + const deps = createDefaultDeps(); const cronEnabled = process.env.CLAWDIS_SKIP_CRON !== "1" && cfgAtStart.cron?.enabled === true; const cron = new CronService({ @@ -374,7 +397,7 @@ export async function startGatewayServer( const cfg = loadConfig(); return await runCronIsolatedAgentTurn({ cfg, - deps: cronDeps, + deps, job, message, sessionKey: `cron:${job.id}`, @@ -496,6 +519,176 @@ export async function startGatewayServer( } }; + const bridgeHost = process.env.CLAWDIS_BRIDGE_HOST ?? "0.0.0.0"; + const bridgePort = + process.env.CLAWDIS_BRIDGE_PORT !== undefined + ? Number.parseInt(process.env.CLAWDIS_BRIDGE_PORT, 10) + : 18790; + const bridgeEnabled = process.env.CLAWDIS_BRIDGE_ENABLED !== "0"; + + const handleBridgeEvent = async ( + nodeId: string, + evt: { event: string; payloadJSON?: string | null }, + ) => { + switch (evt.event) { + case "voice.transcript": { + if (!evt.payloadJSON) return; + let payload: unknown; + try { + payload = JSON.parse(evt.payloadJSON) as unknown; + } catch { + return; + } + const obj = + typeof payload === "object" && payload !== null + ? (payload as Record) + : {}; + const text = typeof obj.text === "string" ? obj.text.trim() : ""; + if (!text) return; + if (text.length > 20_000) return; + const sessionKeyRaw = + typeof obj.sessionKey === "string" ? obj.sessionKey.trim() : ""; + const sessionKey = + sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`; + const { storePath, store, entry } = loadSessionEntry(sessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + store[sessionKey] = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + if (storePath) { + await saveSessionStore(storePath, store); + } + + void agentCommand( + { + message: text, + sessionId, + thinking: "low", + deliver: false, + surface: "Iris", + }, + defaultRuntime, + deps, + ).catch((err) => { + logWarn(`bridge: agent failed node=${nodeId}: ${formatForLog(err)}`); + }); + return; + } + case "agent.request": { + if (!evt.payloadJSON) return; + type AgentDeepLink = { + message?: string; + sessionKey?: string | null; + thinking?: string | null; + deliver?: boolean; + to?: string | null; + channel?: string | null; + timeoutSeconds?: number | null; + key?: string | null; + }; + let link: AgentDeepLink | null = null; + try { + link = JSON.parse(evt.payloadJSON) as AgentDeepLink; + } catch { + return; + } + const message = (link?.message ?? "").trim(); + if (!message) return; + if (message.length > 20_000) return; + + const channelRaw = + typeof link?.channel === "string" ? link.channel.trim() : ""; + const channel = channelRaw.toLowerCase(); + const provider = + channel === "whatsapp" || channel === "telegram" + ? channel + : undefined; + const to = + typeof link?.to === "string" && link.to.trim() + ? link.to.trim() + : undefined; + const deliver = Boolean(link?.deliver) && Boolean(provider); + + const sessionKeyRaw = (link?.sessionKey ?? "").trim(); + const sessionKey = + sessionKeyRaw.length > 0 ? sessionKeyRaw : `node-${nodeId}`; + const { storePath, store, entry } = loadSessionEntry(sessionKey); + const now = Date.now(); + const sessionId = entry?.sessionId ?? randomUUID(); + store[sessionKey] = { + sessionId, + updatedAt: now, + thinkingLevel: entry?.thinkingLevel, + verboseLevel: entry?.verboseLevel, + systemSent: entry?.systemSent, + lastChannel: entry?.lastChannel, + lastTo: entry?.lastTo, + }; + if (storePath) { + await saveSessionStore(storePath, store); + } + + void agentCommand( + { + message, + sessionId, + thinking: link?.thinking ?? undefined, + deliver, + to, + provider, + timeout: + typeof link?.timeoutSeconds === "number" + ? link.timeoutSeconds.toString() + : undefined, + surface: "Iris", + }, + defaultRuntime, + deps, + ).catch((err) => { + logWarn(`bridge: agent failed node=${nodeId}: ${formatForLog(err)}`); + }); + return; + } + default: + return; + } + }; + + if (bridgeEnabled && bridgePort > 0) { + try { + const started = await startNodeBridgeServer({ + host: bridgeHost, + port: bridgePort, + onEvent: handleBridgeEvent, + }); + if (started.port > 0) { + bridge = started; + defaultRuntime.log( + `bridge listening on tcp://${bridgeHost}:${bridge.port} (Iris)`, + ); + } + } catch (err) { + logWarn(`gateway: bridge failed to start: ${String(err)}`); + } + } + + try { + const bonjour = await startGatewayBonjourAdvertiser({ + gatewayPort: port, + bridgePort: bridge?.port, + }); + bonjourStop = bonjour.stop; + } catch (err) { + logWarn(`gateway: bonjour advertising failed: ${String(err)}`); + } + broadcastHealthUpdate = (snap: HealthSummary) => { broadcast("health", snap, { stateVersion: { presence: presenceVersion, health: healthVersion }, @@ -606,7 +799,6 @@ export async function startGatewayServer( let client: Client | null = null; let closed = false; const connId = randomUUID(); - const deps = createDefaultDeps(); const remoteAddr = ( socket as WebSocket & { _socket?: { remoteAddress?: string } } )._socket?.remoteAddress; @@ -1338,6 +1530,191 @@ export async function startGatewayServer( respond(true, { ok: true }, undefined); break; } + case "node.pair.request": { + const params = (req.params ?? {}) as Record; + if (!validateNodePairRequestParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid node.pair.request params: ${formatValidationErrors(validateNodePairRequestParams.errors)}`, + ), + ); + break; + } + const p = params as { + nodeId: string; + displayName?: string; + platform?: string; + version?: string; + remoteIp?: string; + }; + try { + const result = await requestNodePairing({ + nodeId: p.nodeId, + displayName: p.displayName, + platform: p.platform, + version: p.version, + remoteIp: p.remoteIp, + }); + if (result.status === "pending" && result.created) { + broadcast("node.pair.requested", result.request, { + dropIfSlow: true, + }); + } + respond(true, result, undefined); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)), + ); + } + break; + } + case "node.pair.list": { + const params = (req.params ?? {}) as Record; + if (!validateNodePairListParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid node.pair.list params: ${formatValidationErrors(validateNodePairListParams.errors)}`, + ), + ); + break; + } + try { + const list = await listNodePairing(); + respond(true, list, undefined); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)), + ); + } + break; + } + case "node.pair.approve": { + const params = (req.params ?? {}) as Record; + if (!validateNodePairApproveParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid node.pair.approve params: ${formatValidationErrors(validateNodePairApproveParams.errors)}`, + ), + ); + break; + } + const { requestId } = params as { requestId: string }; + try { + const approved = await approveNodePairing(requestId); + if (!approved) { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, "unknown requestId"), + ); + break; + } + broadcast( + "node.pair.resolved", + { + requestId, + nodeId: approved.node.nodeId, + decision: "approved", + ts: Date.now(), + }, + { dropIfSlow: true }, + ); + respond(true, approved, undefined); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)), + ); + } + break; + } + case "node.pair.reject": { + const params = (req.params ?? {}) as Record; + if (!validateNodePairRejectParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid node.pair.reject params: ${formatValidationErrors(validateNodePairRejectParams.errors)}`, + ), + ); + break; + } + const { requestId } = params as { requestId: string }; + try { + const rejected = await rejectNodePairing(requestId); + if (!rejected) { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, "unknown requestId"), + ); + break; + } + broadcast( + "node.pair.resolved", + { + requestId, + nodeId: rejected.nodeId, + decision: "rejected", + ts: Date.now(), + }, + { dropIfSlow: true }, + ); + respond(true, rejected, undefined); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)), + ); + } + break; + } + case "node.pair.verify": { + const params = (req.params ?? {}) as Record; + if (!validateNodePairVerifyParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid node.pair.verify params: ${formatValidationErrors(validateNodePairVerifyParams.errors)}`, + ), + ); + break; + } + const { nodeId, token } = params as { + nodeId: string; + token: string; + }; + try { + const result = await verifyNodeToken(nodeId, token); + respond(true, result, undefined); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)), + ); + } + break; + } case "send": { const p = (req.params ?? {}) as Record; if (!validateSendParams(p)) { @@ -1682,6 +2059,20 @@ export async function startGatewayServer( return { close: async () => { + if (bonjourStop) { + try { + await bonjourStop(); + } catch { + /* ignore */ + } + } + if (bridge) { + try { + await bridge.close(); + } catch { + /* ignore */ + } + } providerAbort.abort(); cron.stop(); broadcast("shutdown", { diff --git a/src/infra/bonjour.ts b/src/infra/bonjour.ts new file mode 100644 index 000000000..3f1b7ae1d --- /dev/null +++ b/src/infra/bonjour.ts @@ -0,0 +1,112 @@ +import os from "node:os"; + +import { type CiaoService, getResponder, Protocol } from "@homebridge/ciao"; + +export type GatewayBonjourAdvertiser = { + stop: () => Promise; +}; + +export type GatewayBonjourAdvertiseOpts = { + instanceName?: string; + gatewayPort: number; + sshPort?: number; + bridgePort?: number; + tailnetDns?: string; +}; + +function isDisabledByEnv() { + if (process.env.CLAWDIS_DISABLE_BONJOUR === "1") return true; + if (process.env.NODE_ENV === "test") return true; + if (process.env.VITEST) return true; + return false; +} + +function safeServiceName(name: string) { + const trimmed = name.trim(); + return trimmed.length > 0 ? trimmed : "Clawdis"; +} + +export async function startGatewayBonjourAdvertiser( + opts: GatewayBonjourAdvertiseOpts, +): Promise { + if (isDisabledByEnv()) { + return { stop: async () => {} }; + } + + const responder = getResponder(); + + const hostname = os.hostname().replace(/\.local$/i, ""); + const instanceName = + typeof opts.instanceName === "string" && opts.instanceName.trim() + ? opts.instanceName.trim() + : `${hostname} (Clawdis)`; + + const txtBase: Record = { + role: "master", + gatewayPort: String(opts.gatewayPort), + lanHost: `${hostname}.local`, + }; + if (typeof opts.bridgePort === "number" && opts.bridgePort > 0) { + txtBase.bridgePort = String(opts.bridgePort); + } + if (typeof opts.tailnetDns === "string" && opts.tailnetDns.trim()) { + txtBase.tailnetDns = opts.tailnetDns.trim(); + } + + const services: CiaoService[] = []; + + // Master beacon: used for discovery (auto-fill SSH/direct targets). + // We advertise a TCP service so clients can resolve the host; the port itself is informational. + const master = responder.createService({ + name: safeServiceName(instanceName), + type: "clawdis-master", + protocol: Protocol.TCP, + port: opts.sshPort ?? 22, + txt: { + ...txtBase, + sshPort: String(opts.sshPort ?? 22), + }, + }); + services.push(master); + + // Optional bridge beacon (same type used by Iris/iOS today). + if (typeof opts.bridgePort === "number" && opts.bridgePort > 0) { + const bridge = responder.createService({ + name: safeServiceName(instanceName), + type: "clawdis-bridge", + protocol: Protocol.TCP, + port: opts.bridgePort, + txt: { + ...txtBase, + transport: "bridge", + }, + }); + services.push(bridge); + } + + // Do not block gateway startup on mDNS probing/announce. Advertising can take + // multiple seconds depending on network state; the gateway should come up even + // if Bonjour is slow or fails. + for (const svc of services) { + void svc.advertise().catch(() => { + /* ignore */ + }); + } + + return { + stop: async () => { + for (const svc of services) { + try { + await svc.destroy(); + } catch { + /* ignore */ + } + } + try { + await responder.shutdown(); + } catch { + /* ignore */ + } + }, + }; +} diff --git a/src/infra/bridge/server.test.ts b/src/infra/bridge/server.test.ts new file mode 100644 index 000000000..7ec26c62a --- /dev/null +++ b/src/infra/bridge/server.test.ts @@ -0,0 +1,129 @@ +import fs from "node:fs/promises"; +import net from "node:net"; +import os from "node:os"; +import path from "node:path"; + +import { afterAll, beforeAll, describe, expect, it } from "vitest"; + +import { approveNodePairing, listNodePairing } from "../node-pairing.js"; +import { startNodeBridgeServer } from "./server.js"; + +function createLineReader(socket: net.Socket) { + let buffer = ""; + const pending: Array<(line: string) => void> = []; + + const flush = () => { + while (pending.length > 0) { + const idx = buffer.indexOf("\n"); + if (idx === -1) return; + const line = buffer.slice(0, idx); + buffer = buffer.slice(idx + 1); + const resolve = pending.shift(); + resolve?.(line); + } + }; + + socket.on("data", (chunk) => { + buffer += chunk.toString("utf8"); + flush(); + }); + + const readLine = async () => { + flush(); + const idx = buffer.indexOf("\n"); + if (idx !== -1) { + const line = buffer.slice(0, idx); + buffer = buffer.slice(idx + 1); + return line; + } + return await new Promise((resolve) => pending.push(resolve)); + }; + + return readLine; +} + +function sendLine(socket: net.Socket, obj: unknown) { + socket.write(`${JSON.stringify(obj)}\n`); +} + +describe("node bridge server", () => { + let baseDir = ""; + + beforeAll(async () => { + process.env.CLAWDIS_ENABLE_BRIDGE_IN_TESTS = "1"; + baseDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-bridge-test-")); + }); + + afterAll(async () => { + await fs.rm(baseDir, { recursive: true, force: true }); + delete process.env.CLAWDIS_ENABLE_BRIDGE_IN_TESTS; + }); + + it("rejects hello when not paired", async () => { + const server = await startNodeBridgeServer({ + host: "127.0.0.1", + port: 0, + pairingBaseDir: baseDir, + }); + + const socket = net.connect({ host: "127.0.0.1", port: server.port }); + const readLine = createLineReader(socket); + sendLine(socket, { type: "hello", nodeId: "n1" }); + const line = await readLine(); + const msg = JSON.parse(line) as { type: string; code?: string }; + expect(msg.type).toBe("error"); + expect(msg.code).toBe("NOT_PAIRED"); + socket.destroy(); + await server.close(); + }); + + it("pairs after approval and then accepts hello", async () => { + const server = await startNodeBridgeServer({ + host: "127.0.0.1", + port: 0, + pairingBaseDir: baseDir, + }); + + const socket = net.connect({ host: "127.0.0.1", port: server.port }); + const readLine = createLineReader(socket); + sendLine(socket, { type: "pair-request", nodeId: "n2", platform: "ios" }); + + // Approve the pending request from the gateway side. + let reqId: string | undefined; + for (let i = 0; i < 40; i += 1) { + const list = await listNodePairing(baseDir); + const req = list.pending.find((p) => p.nodeId === "n2"); + if (req) { + reqId = req.requestId; + break; + } + await new Promise((r) => setTimeout(r, 25)); + } + expect(reqId).toBeTruthy(); + if (!reqId) throw new Error("expected a pending requestId"); + await approveNodePairing(reqId, baseDir); + + const line1 = JSON.parse(await readLine()) as { + type: string; + token?: string; + }; + expect(line1.type).toBe("pair-ok"); + expect(typeof line1.token).toBe("string"); + if (!line1.token) throw new Error("expected pair-ok token"); + const token = line1.token; + + const line2 = JSON.parse(await readLine()) as { type: string }; + expect(line2.type).toBe("hello-ok"); + + socket.destroy(); + + const socket2 = net.connect({ host: "127.0.0.1", port: server.port }); + const readLine2 = createLineReader(socket2); + sendLine(socket2, { type: "hello", nodeId: "n2", token }); + const line3 = JSON.parse(await readLine2()) as { type: string }; + expect(line3.type).toBe("hello-ok"); + socket2.destroy(); + + await server.close(); + }); +}); diff --git a/src/infra/bridge/server.ts b/src/infra/bridge/server.ts new file mode 100644 index 000000000..3f5659674 --- /dev/null +++ b/src/infra/bridge/server.ts @@ -0,0 +1,356 @@ +import net from "node:net"; +import os from "node:os"; + +import { + getPairedNode, + listNodePairing, + requestNodePairing, + verifyNodeToken, +} from "../node-pairing.js"; + +type BridgeHelloFrame = { + type: "hello"; + nodeId: string; + displayName?: string; + token?: string; + platform?: string; + version?: string; +}; + +type BridgePairRequestFrame = { + type: "pair-request"; + nodeId: string; + displayName?: string; + platform?: string; + version?: string; + remoteAddress?: string; +}; + +type BridgeEventFrame = { + type: "event"; + event: string; + payloadJSON?: string | null; +}; + +type BridgePingFrame = { type: "ping"; id: string }; +type BridgePongFrame = { type: "pong"; id: string }; + +type BridgeInvokeResponseFrame = { + type: "invoke-res"; + id: string; + ok: boolean; + payloadJSON?: string | null; + error?: { code: string; message: string } | null; +}; + +type BridgeHelloOkFrame = { type: "hello-ok"; serverName: string }; +type BridgePairOkFrame = { type: "pair-ok"; token: string }; +type BridgeErrorFrame = { type: "error"; code: string; message: string }; + +type AnyBridgeFrame = + | BridgeHelloFrame + | BridgePairRequestFrame + | BridgeEventFrame + | BridgePingFrame + | BridgePongFrame + | BridgeInvokeResponseFrame + | BridgeHelloOkFrame + | BridgePairOkFrame + | BridgeErrorFrame + | { type: string; [k: string]: unknown }; + +export type NodeBridgeServer = { + port: number; + close: () => Promise; +}; + +export type NodeBridgeServerOpts = { + host: string; + port: number; // 0 = ephemeral + pairingBaseDir?: string; + onEvent?: (nodeId: string, evt: BridgeEventFrame) => Promise | void; + onAuthenticated?: (nodeId: string) => Promise | void; + onDisconnected?: (nodeId: string) => Promise | void; + serverName?: string; +}; + +function isTestEnv() { + return process.env.NODE_ENV === "test" || Boolean(process.env.VITEST); +} + +function encodeLine(frame: AnyBridgeFrame) { + return `${JSON.stringify(frame)}\n`; +} + +async function sleep(ms: number) { + await new Promise((resolve) => setTimeout(resolve, ms)); +} + +export async function startNodeBridgeServer( + opts: NodeBridgeServerOpts, +): Promise { + if (isTestEnv() && process.env.CLAWDIS_ENABLE_BRIDGE_IN_TESTS !== "1") { + return { + port: 0, + close: async () => {}, + }; + } + + const serverName = + typeof opts.serverName === "string" && opts.serverName.trim() + ? opts.serverName.trim() + : os.hostname(); + + const connections = new Map(); + + const server = net.createServer((socket) => { + socket.setNoDelay(true); + + let buffer = ""; + let isAuthenticated = false; + let nodeId: string | null = null; + const invokeWaiters = new Map< + string, + { + resolve: (value: BridgeInvokeResponseFrame) => void; + reject: (err: Error) => void; + } + >(); + + const abort = new AbortController(); + const stop = () => { + if (!abort.signal.aborted) abort.abort(); + if (nodeId) connections.delete(nodeId); + for (const [, waiter] of invokeWaiters) { + waiter.reject(new Error("bridge connection closed")); + } + invokeWaiters.clear(); + }; + + const send = (frame: AnyBridgeFrame) => { + try { + socket.write(encodeLine(frame)); + } catch { + // ignore + } + }; + + const sendError = (code: string, message: string) => { + send({ type: "error", code, message } satisfies BridgeErrorFrame); + }; + + const remoteAddress = (() => { + const addr = socket.remoteAddress?.trim(); + return addr && addr.length > 0 ? addr : undefined; + })(); + + const handleHello = async (hello: BridgeHelloFrame) => { + nodeId = String(hello.nodeId ?? "").trim(); + if (!nodeId) { + sendError("INVALID_REQUEST", "nodeId required"); + return; + } + + const token = typeof hello.token === "string" ? hello.token.trim() : ""; + if (!token) { + const paired = await getPairedNode(nodeId, opts.pairingBaseDir); + sendError(paired ? "UNAUTHORIZED" : "NOT_PAIRED", "pairing required"); + return; + } + + const verified = await verifyNodeToken( + nodeId, + token, + opts.pairingBaseDir, + ); + if (!verified.ok) { + sendError("UNAUTHORIZED", "invalid token"); + return; + } + + isAuthenticated = true; + connections.set(nodeId, socket); + send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame); + await opts.onAuthenticated?.(nodeId); + }; + + const waitForApproval = async (request: { + requestId: string; + nodeId: string; + ts: number; + isRepair?: boolean; + }): Promise< + { ok: true; token: string } | { ok: false; reason: string } + > => { + const deadline = Date.now() + 5 * 60 * 1000; + while (!abort.signal.aborted && Date.now() < deadline) { + const list = await listNodePairing(opts.pairingBaseDir); + const stillPending = list.pending.some( + (p) => p.requestId === request.requestId, + ); + if (stillPending) { + await sleep(250); + continue; + } + + const paired = await getPairedNode(request.nodeId, opts.pairingBaseDir); + if (!paired) return { ok: false, reason: "pairing rejected" }; + + // For a repair, ensure this approval happened after the request was created. + if (paired.approvedAtMs < request.ts) { + return { ok: false, reason: "pairing rejected" }; + } + + return { ok: true, token: paired.token }; + } + + return { + ok: false, + reason: abort.signal.aborted ? "disconnected" : "pairing expired", + }; + }; + + const handlePairRequest = async (req: BridgePairRequestFrame) => { + nodeId = String(req.nodeId ?? "").trim(); + if (!nodeId) { + sendError("INVALID_REQUEST", "nodeId required"); + return; + } + + const result = await requestNodePairing( + { + nodeId, + displayName: req.displayName, + platform: req.platform, + version: req.version, + remoteIp: remoteAddress, + }, + opts.pairingBaseDir, + ); + + const wait = await waitForApproval(result.request); + if (!wait.ok) { + sendError("UNAUTHORIZED", wait.reason); + return; + } + + isAuthenticated = true; + connections.set(nodeId, socket); + send({ type: "pair-ok", token: wait.token } satisfies BridgePairOkFrame); + send({ type: "hello-ok", serverName } satisfies BridgeHelloOkFrame); + await opts.onAuthenticated?.(nodeId); + }; + + const handleEvent = async (evt: BridgeEventFrame) => { + if (!isAuthenticated || !nodeId) { + sendError("UNAUTHORIZED", "not authenticated"); + return; + } + await opts.onEvent?.(nodeId, evt); + }; + + socket.on("data", (chunk) => { + buffer += chunk.toString("utf8"); + while (true) { + const idx = buffer.indexOf("\n"); + if (idx === -1) break; + const line = buffer.slice(0, idx); + buffer = buffer.slice(idx + 1); + const trimmed = line.trim(); + if (!trimmed) continue; + + void (async () => { + let frame: AnyBridgeFrame; + try { + frame = JSON.parse(trimmed) as AnyBridgeFrame; + } catch (err) { + sendError("INVALID_REQUEST", String(err)); + return; + } + + const type = typeof frame.type === "string" ? frame.type : ""; + try { + switch (type) { + case "hello": + await handleHello(frame as BridgeHelloFrame); + break; + case "pair-request": + await handlePairRequest(frame as BridgePairRequestFrame); + break; + case "event": + await handleEvent(frame as BridgeEventFrame); + break; + case "ping": { + if (!isAuthenticated) { + sendError("UNAUTHORIZED", "not authenticated"); + break; + } + const ping = frame as BridgePingFrame; + send({ + type: "pong", + id: String(ping.id ?? ""), + } satisfies BridgePongFrame); + break; + } + case "invoke-res": { + if (!isAuthenticated) { + sendError("UNAUTHORIZED", "not authenticated"); + break; + } + const res = frame as BridgeInvokeResponseFrame; + const waiter = invokeWaiters.get(res.id); + if (waiter) { + invokeWaiters.delete(res.id); + waiter.resolve(res); + } + break; + } + case "pong": + // ignore + break; + default: + sendError("INVALID_REQUEST", "unknown type"); + } + } catch (err) { + sendError("INVALID_REQUEST", String(err)); + } + })(); + } + }); + + socket.on("close", () => { + const id = nodeId; + stop(); + if (id && isAuthenticated) void opts.onDisconnected?.(id); + }); + socket.on("error", () => { + // close handler will run after close + }); + }); + + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(opts.port, opts.host, () => resolve()); + }); + + const address = server.address(); + const port = + typeof address === "object" && address ? address.port : opts.port; + + return { + port, + close: async () => { + for (const sock of connections.values()) { + try { + sock.destroy(); + } catch { + /* ignore */ + } + } + connections.clear(); + await new Promise((resolve, reject) => + server.close((err) => (err ? reject(err) : resolve())), + ); + }, + }; +} diff --git a/src/infra/node-pairing.ts b/src/infra/node-pairing.ts new file mode 100644 index 000000000..c7d727f3c --- /dev/null +++ b/src/infra/node-pairing.ts @@ -0,0 +1,238 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +export type NodePairingPendingRequest = { + requestId: string; + nodeId: string; + displayName?: string; + platform?: string; + version?: string; + remoteIp?: string; + isRepair?: boolean; + ts: number; +}; + +export type NodePairingPairedNode = { + nodeId: string; + token: string; + displayName?: string; + platform?: string; + version?: string; + remoteIp?: string; + createdAtMs: number; + approvedAtMs: number; +}; + +export type NodePairingList = { + pending: NodePairingPendingRequest[]; + paired: NodePairingPairedNode[]; +}; + +type NodePairingStateFile = { + pendingById: Record; + pairedByNodeId: Record; +}; + +const PENDING_TTL_MS = 5 * 60 * 1000; + +function defaultBaseDir() { + return path.join(os.homedir(), ".clawdis"); +} + +function resolvePaths(baseDir?: string) { + const root = baseDir ?? defaultBaseDir(); + const dir = path.join(root, "nodes"); + return { + dir, + pendingPath: path.join(dir, "pending.json"), + pairedPath: path.join(dir, "paired.json"), + }; +} + +async function readJSON(filePath: string): Promise { + try { + const raw = await fs.readFile(filePath, "utf8"); + return JSON.parse(raw) as T; + } catch { + return null; + } +} + +async function writeJSONAtomic(filePath: string, value: unknown) { + const dir = path.dirname(filePath); + await fs.mkdir(dir, { recursive: true }); + const tmp = `${filePath}.${randomUUID()}.tmp`; + await fs.writeFile(tmp, JSON.stringify(value, null, 2), "utf8"); + await fs.rename(tmp, filePath); +} + +function pruneExpiredPending( + pendingById: Record, + nowMs: number, +) { + for (const [id, req] of Object.entries(pendingById)) { + if (nowMs - req.ts > PENDING_TTL_MS) { + delete pendingById[id]; + } + } +} + +let lock: Promise = Promise.resolve(); +async function withLock(fn: () => Promise): Promise { + const prev = lock; + let release: (() => void) | undefined; + lock = new Promise((resolve) => { + release = resolve; + }); + await prev; + try { + return await fn(); + } finally { + release?.(); + } +} + +async function loadState(baseDir?: string): Promise { + const { pendingPath, pairedPath } = resolvePaths(baseDir); + const [pending, paired] = await Promise.all([ + readJSON>(pendingPath), + readJSON>(pairedPath), + ]); + const state: NodePairingStateFile = { + pendingById: pending ?? {}, + pairedByNodeId: paired ?? {}, + }; + pruneExpiredPending(state.pendingById, Date.now()); + return state; +} + +async function persistState(state: NodePairingStateFile, baseDir?: string) { + const { pendingPath, pairedPath } = resolvePaths(baseDir); + await Promise.all([ + writeJSONAtomic(pendingPath, state.pendingById), + writeJSONAtomic(pairedPath, state.pairedByNodeId), + ]); +} + +function normalizeNodeId(nodeId: string) { + return nodeId.trim(); +} + +function newToken() { + return randomUUID().replaceAll("-", ""); +} + +export async function listNodePairing( + baseDir?: string, +): Promise { + const state = await loadState(baseDir); + const pending = Object.values(state.pendingById).sort((a, b) => b.ts - a.ts); + const paired = Object.values(state.pairedByNodeId).sort( + (a, b) => b.approvedAtMs - a.approvedAtMs, + ); + return { pending, paired }; +} + +export async function getPairedNode( + nodeId: string, + baseDir?: string, +): Promise { + const state = await loadState(baseDir); + return state.pairedByNodeId[normalizeNodeId(nodeId)] ?? null; +} + +export async function requestNodePairing( + req: Omit, + baseDir?: string, +): Promise<{ + status: "pending"; + request: NodePairingPendingRequest; + created: boolean; +}> { + return await withLock(async () => { + const state = await loadState(baseDir); + const nodeId = normalizeNodeId(req.nodeId); + if (!nodeId) { + throw new Error("nodeId required"); + } + + const existing = Object.values(state.pendingById).find( + (p) => p.nodeId === nodeId, + ); + if (existing) { + return { status: "pending", request: existing, created: false }; + } + + const isRepair = Boolean(state.pairedByNodeId[nodeId]); + const request: NodePairingPendingRequest = { + requestId: randomUUID(), + nodeId, + displayName: req.displayName, + platform: req.platform, + version: req.version, + remoteIp: req.remoteIp, + isRepair, + ts: Date.now(), + }; + state.pendingById[request.requestId] = request; + await persistState(state, baseDir); + return { status: "pending", request, created: true }; + }); +} + +export async function approveNodePairing( + requestId: string, + baseDir?: string, +): Promise<{ requestId: string; node: NodePairingPairedNode } | null> { + return await withLock(async () => { + const state = await loadState(baseDir); + const pending = state.pendingById[requestId]; + if (!pending) return null; + + const now = Date.now(); + const existing = state.pairedByNodeId[pending.nodeId]; + const node: NodePairingPairedNode = { + nodeId: pending.nodeId, + token: newToken(), + displayName: pending.displayName, + platform: pending.platform, + version: pending.version, + remoteIp: pending.remoteIp, + createdAtMs: existing?.createdAtMs ?? now, + approvedAtMs: now, + }; + + delete state.pendingById[requestId]; + state.pairedByNodeId[pending.nodeId] = node; + await persistState(state, baseDir); + return { requestId, node }; + }); +} + +export async function rejectNodePairing( + requestId: string, + baseDir?: string, +): Promise<{ requestId: string; nodeId: string } | null> { + return await withLock(async () => { + const state = await loadState(baseDir); + const pending = state.pendingById[requestId]; + if (!pending) return null; + delete state.pendingById[requestId]; + await persistState(state, baseDir); + return { requestId, nodeId: pending.nodeId }; + }); +} + +export async function verifyNodeToken( + nodeId: string, + token: string, + baseDir?: string, +): Promise<{ ok: boolean; node?: NodePairingPairedNode }> { + const state = await loadState(baseDir); + const normalized = normalizeNodeId(nodeId); + const node = state.pairedByNodeId[normalized]; + if (!node) return { ok: false }; + return node.token === token ? { ok: true, node } : { ok: false }; +}