refactor: remove bridge protocol
This commit is contained in:
105
apps/macos/Sources/Clawdbot/NodeMode/GatewayTLSPinning.swift
Normal file
105
apps/macos/Sources/Clawdbot/NodeMode/GatewayTLSPinning.swift
Normal file
@@ -0,0 +1,105 @@
|
||||
import CryptoKit
|
||||
import Foundation
|
||||
import Security
|
||||
|
||||
struct GatewayTLSParams: Sendable {
|
||||
let required: Bool
|
||||
let expectedFingerprint: String?
|
||||
let allowTOFU: Bool
|
||||
let storeKey: String?
|
||||
}
|
||||
|
||||
enum GatewayTLSStore {
|
||||
private static let suiteName = "com.clawdbot.shared"
|
||||
private static let keyPrefix = "gateway.tls."
|
||||
|
||||
private static var defaults: UserDefaults {
|
||||
UserDefaults(suiteName: suiteName) ?? .standard
|
||||
}
|
||||
|
||||
static func loadFingerprint(stableID: String) -> String? {
|
||||
let key = self.keyPrefix + stableID
|
||||
let raw = self.defaults.string(forKey: key)?.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
return raw?.isEmpty == false ? raw : nil
|
||||
}
|
||||
|
||||
static func saveFingerprint(_ value: String, stableID: String) {
|
||||
let key = self.keyPrefix + stableID
|
||||
self.defaults.set(value, forKey: key)
|
||||
}
|
||||
}
|
||||
|
||||
final class GatewayTLSPinningSession: NSObject, WebSocketSessioning, URLSessionDelegate {
|
||||
private let params: GatewayTLSParams
|
||||
private lazy var session: URLSession = {
|
||||
let config = URLSessionConfiguration.default
|
||||
config.waitsForConnectivity = true
|
||||
return URLSession(configuration: config, delegate: self, delegateQueue: nil)
|
||||
}()
|
||||
|
||||
init(params: GatewayTLSParams) {
|
||||
self.params = params
|
||||
super.init()
|
||||
}
|
||||
|
||||
func makeWebSocketTask(url: URL) -> WebSocketTaskBox {
|
||||
let task = self.session.webSocketTask(with: url)
|
||||
task.maximumMessageSize = 16 * 1024 * 1024
|
||||
return WebSocketTaskBox(task: task)
|
||||
}
|
||||
|
||||
func urlSession(
|
||||
_ session: URLSession,
|
||||
didReceive challenge: URLAuthenticationChallenge,
|
||||
completionHandler: @escaping (URLSession.AuthChallengeDisposition, URLCredential?) -> Void
|
||||
) {
|
||||
guard challenge.protectionSpace.authenticationMethod == NSURLAuthenticationMethodServerTrust,
|
||||
let trust = challenge.protectionSpace.serverTrust
|
||||
else {
|
||||
completionHandler(.performDefaultHandling, nil)
|
||||
return
|
||||
}
|
||||
|
||||
let expected = params.expectedFingerprint.map(normalizeFingerprint)
|
||||
if let fingerprint = certificateFingerprint(trust) {
|
||||
if let expected {
|
||||
if fingerprint == expected {
|
||||
completionHandler(.useCredential, URLCredential(trust: trust))
|
||||
} else {
|
||||
completionHandler(.cancelAuthenticationChallenge, nil)
|
||||
}
|
||||
return
|
||||
}
|
||||
if params.allowTOFU {
|
||||
if let storeKey = params.storeKey {
|
||||
GatewayTLSStore.saveFingerprint(fingerprint, stableID: storeKey)
|
||||
}
|
||||
completionHandler(.useCredential, URLCredential(trust: trust))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
let ok = SecTrustEvaluateWithError(trust, nil)
|
||||
if ok || !params.required {
|
||||
completionHandler(.useCredential, URLCredential(trust: trust))
|
||||
} else {
|
||||
completionHandler(.cancelAuthenticationChallenge, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func certificateFingerprint(_ trust: SecTrust) -> String? {
|
||||
let count = SecTrustGetCertificateCount(trust)
|
||||
guard count > 0, let cert = SecTrustGetCertificateAtIndex(trust, 0) else { return nil }
|
||||
let data = SecCertificateCopyData(cert) as Data
|
||||
return sha256Hex(data)
|
||||
}
|
||||
|
||||
private func sha256Hex(_ data: Data) -> String {
|
||||
let digest = SHA256.hash(data: data)
|
||||
return digest.map { String(format: "%02x", $0) }.joined()
|
||||
}
|
||||
|
||||
private func normalizeFingerprint(_ raw: String) -> String {
|
||||
raw.lowercased().filter(\.isHexDigit)
|
||||
}
|
||||
@@ -1,238 +0,0 @@
|
||||
import ClawdbotKit
|
||||
import Foundation
|
||||
import Network
|
||||
|
||||
actor MacNodeBridgePairingClient {
|
||||
private let encoder = JSONEncoder()
|
||||
private let decoder = JSONDecoder()
|
||||
private var lineBuffer = Data()
|
||||
|
||||
func pairAndHello(
|
||||
endpoint: NWEndpoint,
|
||||
hello: BridgeHello,
|
||||
silent: Bool,
|
||||
tls: MacNodeBridgeTLSParams? = nil,
|
||||
onStatus: (@Sendable (String) -> Void)? = nil) async throws -> String
|
||||
{
|
||||
do {
|
||||
return try await self.pairAndHelloOnce(
|
||||
endpoint: endpoint,
|
||||
hello: hello,
|
||||
silent: silent,
|
||||
tls: tls,
|
||||
onStatus: onStatus)
|
||||
} catch {
|
||||
if let tls, !tls.required {
|
||||
return try await self.pairAndHelloOnce(
|
||||
endpoint: endpoint,
|
||||
hello: hello,
|
||||
silent: silent,
|
||||
tls: nil,
|
||||
onStatus: onStatus)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func pairAndHelloOnce(
|
||||
endpoint: NWEndpoint,
|
||||
hello: BridgeHello,
|
||||
silent: Bool,
|
||||
tls: MacNodeBridgeTLSParams?,
|
||||
onStatus: (@Sendable (String) -> Void)? = nil) async throws -> String
|
||||
{
|
||||
self.lineBuffer = Data()
|
||||
let params = self.makeParameters(tls: tls)
|
||||
let connection = NWConnection(to: endpoint, using: params)
|
||||
let queue = DispatchQueue(label: "com.clawdbot.macos.bridge-client")
|
||||
defer { connection.cancel() }
|
||||
try await AsyncTimeout.withTimeout(
|
||||
seconds: 8,
|
||||
onTimeout: {
|
||||
NSError(domain: "Bridge", code: 0, userInfo: [
|
||||
NSLocalizedDescriptionKey: "connect timed out",
|
||||
])
|
||||
},
|
||||
operation: {
|
||||
try await self.startAndWaitForReady(connection, queue: queue)
|
||||
})
|
||||
|
||||
onStatus?("Authenticating…")
|
||||
try await self.send(hello, over: connection)
|
||||
|
||||
let first = try await AsyncTimeout.withTimeout(
|
||||
seconds: 10,
|
||||
onTimeout: {
|
||||
NSError(domain: "Bridge", code: 0, userInfo: [
|
||||
NSLocalizedDescriptionKey: "hello timed out",
|
||||
])
|
||||
},
|
||||
operation: { () -> ReceivedFrame in
|
||||
guard let frame = try await self.receiveFrame(over: connection) else {
|
||||
throw NSError(domain: "Bridge", code: 0, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Bridge closed connection during hello",
|
||||
])
|
||||
}
|
||||
return frame
|
||||
})
|
||||
|
||||
switch first.base.type {
|
||||
case "hello-ok":
|
||||
return hello.token ?? ""
|
||||
|
||||
case "error":
|
||||
let err = try self.decoder.decode(BridgeErrorFrame.self, from: first.data)
|
||||
if err.code != "NOT_PAIRED", err.code != "UNAUTHORIZED" {
|
||||
throw NSError(domain: "Bridge", code: 1, userInfo: [
|
||||
NSLocalizedDescriptionKey: "\(err.code): \(err.message)",
|
||||
])
|
||||
}
|
||||
|
||||
onStatus?("Requesting approval…")
|
||||
try await self.send(
|
||||
BridgePairRequest(
|
||||
nodeId: hello.nodeId,
|
||||
displayName: hello.displayName,
|
||||
platform: hello.platform,
|
||||
version: hello.version,
|
||||
coreVersion: hello.coreVersion,
|
||||
uiVersion: hello.uiVersion,
|
||||
deviceFamily: hello.deviceFamily,
|
||||
modelIdentifier: hello.modelIdentifier,
|
||||
caps: hello.caps,
|
||||
commands: hello.commands,
|
||||
silent: silent),
|
||||
over: connection)
|
||||
|
||||
onStatus?("Waiting for approval…")
|
||||
let ok = try await AsyncTimeout.withTimeout(
|
||||
seconds: 60,
|
||||
onTimeout: {
|
||||
NSError(domain: "Bridge", code: 0, userInfo: [
|
||||
NSLocalizedDescriptionKey: "pairing approval timed out",
|
||||
])
|
||||
},
|
||||
operation: {
|
||||
while let next = try await self.receiveFrame(over: connection) {
|
||||
switch next.base.type {
|
||||
case "pair-ok":
|
||||
return try self.decoder.decode(BridgePairOk.self, from: next.data)
|
||||
case "error":
|
||||
let e = try self.decoder.decode(BridgeErrorFrame.self, from: next.data)
|
||||
throw NSError(domain: "Bridge", code: 2, userInfo: [
|
||||
NSLocalizedDescriptionKey: "\(e.code): \(e.message)",
|
||||
])
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
throw NSError(domain: "Bridge", code: 3, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Pairing failed: bridge closed connection",
|
||||
])
|
||||
})
|
||||
|
||||
return ok.token
|
||||
|
||||
default:
|
||||
throw NSError(domain: "Bridge", code: 0, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Unexpected bridge response",
|
||||
])
|
||||
}
|
||||
}
|
||||
|
||||
private func send(_ obj: some Encodable, over connection: NWConnection) async throws {
|
||||
let data = try self.encoder.encode(obj)
|
||||
var line = Data()
|
||||
line.append(data)
|
||||
line.append(0x0A)
|
||||
try await withCheckedThrowingContinuation(isolation: nil) { (cont: CheckedContinuation<Void, Error>) in
|
||||
connection.send(content: line, completion: .contentProcessed { err in
|
||||
if let err { cont.resume(throwing: err) } else { cont.resume(returning: ()) }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private struct ReceivedFrame {
|
||||
var base: BridgeBaseFrame
|
||||
var data: Data
|
||||
}
|
||||
|
||||
private func receiveFrame(over connection: NWConnection) async throws -> ReceivedFrame? {
|
||||
guard let lineData = try await self.receiveLineData(over: connection) else {
|
||||
return nil
|
||||
}
|
||||
let base = try self.decoder.decode(BridgeBaseFrame.self, from: lineData)
|
||||
return ReceivedFrame(base: base, data: lineData)
|
||||
}
|
||||
|
||||
private func receiveChunk(over connection: NWConnection) async throws -> Data {
|
||||
try await withCheckedThrowingContinuation(isolation: nil) { (cont: CheckedContinuation<Data, Error>) in
|
||||
connection.receive(minimumIncompleteLength: 1, maximumLength: 64 * 1024) { data, _, isComplete, error in
|
||||
if let error {
|
||||
cont.resume(throwing: error)
|
||||
return
|
||||
}
|
||||
if isComplete {
|
||||
cont.resume(returning: Data())
|
||||
return
|
||||
}
|
||||
cont.resume(returning: data ?? Data())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func receiveLineData(over connection: NWConnection) async throws -> Data? {
|
||||
while true {
|
||||
if let idx = self.lineBuffer.firstIndex(of: 0x0A) {
|
||||
let line = self.lineBuffer.prefix(upTo: idx)
|
||||
self.lineBuffer.removeSubrange(...idx)
|
||||
return Data(line)
|
||||
}
|
||||
|
||||
let chunk = try await self.receiveChunk(over: connection)
|
||||
if chunk.isEmpty { return nil }
|
||||
self.lineBuffer.append(chunk)
|
||||
}
|
||||
}
|
||||
|
||||
private func makeParameters(tls: MacNodeBridgeTLSParams?) -> NWParameters {
|
||||
let tcpOptions = NWProtocolTCP.Options()
|
||||
if let tlsOptions = makeMacNodeTLSOptions(tls) {
|
||||
let params = NWParameters(tls: tlsOptions, tcp: tcpOptions)
|
||||
params.includePeerToPeer = true
|
||||
return params
|
||||
}
|
||||
let params = NWParameters.tcp
|
||||
params.includePeerToPeer = true
|
||||
return params
|
||||
}
|
||||
|
||||
private func startAndWaitForReady(
|
||||
_ connection: NWConnection,
|
||||
queue: DispatchQueue) async throws
|
||||
{
|
||||
let states = AsyncStream<NWConnection.State> { continuation in
|
||||
connection.stateUpdateHandler = { state in
|
||||
continuation.yield(state)
|
||||
if case .ready = state { continuation.finish() }
|
||||
if case .failed = state { continuation.finish() }
|
||||
if case .cancelled = state { continuation.finish() }
|
||||
}
|
||||
}
|
||||
connection.start(queue: queue)
|
||||
for await state in states {
|
||||
switch state {
|
||||
case .ready:
|
||||
return
|
||||
case let .failed(err):
|
||||
throw err
|
||||
case .cancelled:
|
||||
throw NSError(domain: "Bridge", code: 0, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Bridge connection cancelled",
|
||||
])
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,519 +0,0 @@
|
||||
import ClawdbotKit
|
||||
import Foundation
|
||||
import Network
|
||||
import OSLog
|
||||
|
||||
actor MacNodeBridgeSession {
|
||||
private struct TimeoutError: LocalizedError {
|
||||
var message: String
|
||||
var errorDescription: String? { self.message }
|
||||
}
|
||||
|
||||
enum State: Sendable, Equatable {
|
||||
case idle
|
||||
case connecting
|
||||
case connected(serverName: String)
|
||||
case failed(message: String)
|
||||
}
|
||||
|
||||
private let logger = Logger(subsystem: "com.clawdbot", category: "node.bridge-session")
|
||||
private let encoder = JSONEncoder()
|
||||
private let decoder = JSONDecoder()
|
||||
private let clock = ContinuousClock()
|
||||
private var disconnectHandler: (@Sendable (String) async -> Void)?
|
||||
|
||||
private var connection: NWConnection?
|
||||
private var queue: DispatchQueue?
|
||||
private var buffer = Data()
|
||||
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
|
||||
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
|
||||
private var invokeTasks: [UUID: Task<Void, Never>] = [:]
|
||||
private var pingTask: Task<Void, Never>?
|
||||
private var lastPongAt: ContinuousClock.Instant?
|
||||
|
||||
private(set) var state: State = .idle
|
||||
|
||||
func connect(
|
||||
endpoint: NWEndpoint,
|
||||
hello: BridgeHello,
|
||||
tls: MacNodeBridgeTLSParams? = nil,
|
||||
onConnected: (@Sendable (String, String?) async -> Void)? = nil,
|
||||
onDisconnected: (@Sendable (String) async -> Void)? = nil,
|
||||
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)
|
||||
async throws
|
||||
{
|
||||
await self.disconnect()
|
||||
self.disconnectHandler = onDisconnected
|
||||
self.state = .connecting
|
||||
do {
|
||||
try await self.connectOnce(
|
||||
endpoint: endpoint,
|
||||
hello: hello,
|
||||
tls: tls,
|
||||
onConnected: onConnected,
|
||||
onInvoke: onInvoke)
|
||||
} catch {
|
||||
if let tls, !tls.required {
|
||||
try await self.connectOnce(
|
||||
endpoint: endpoint,
|
||||
hello: hello,
|
||||
tls: nil,
|
||||
onConnected: onConnected,
|
||||
onInvoke: onInvoke)
|
||||
return
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func connectOnce(
|
||||
endpoint: NWEndpoint,
|
||||
hello: BridgeHello,
|
||||
tls: MacNodeBridgeTLSParams?,
|
||||
onConnected: (@Sendable (String, String?) async -> Void)? = nil,
|
||||
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse) async throws
|
||||
{
|
||||
let params = self.makeParameters(tls: tls)
|
||||
let connection = NWConnection(to: endpoint, using: params)
|
||||
let queue = DispatchQueue(label: "com.clawdbot.macos.bridge-session")
|
||||
self.connection = connection
|
||||
self.queue = queue
|
||||
|
||||
let stateStream = Self.makeStateStream(for: connection)
|
||||
connection.start(queue: queue)
|
||||
|
||||
try await Self.waitForReady(stateStream, timeoutSeconds: 6)
|
||||
connection.stateUpdateHandler = { [weak self] state in
|
||||
guard let self else { return }
|
||||
Task { await self.handleConnectionState(state) }
|
||||
}
|
||||
|
||||
try await AsyncTimeout.withTimeout(
|
||||
seconds: 6,
|
||||
onTimeout: {
|
||||
TimeoutError(message: "operation timed out")
|
||||
},
|
||||
operation: {
|
||||
try await self.send(hello)
|
||||
})
|
||||
|
||||
guard let line = try await AsyncTimeout.withTimeout(
|
||||
seconds: 6,
|
||||
onTimeout: {
|
||||
TimeoutError(message: "operation timed out")
|
||||
},
|
||||
operation: {
|
||||
try await self.receiveLine()
|
||||
}),
|
||||
let data = line.data(using: .utf8),
|
||||
let base = try? self.decoder.decode(BridgeBaseFrame.self, from: data)
|
||||
else {
|
||||
self.logger.error("node bridge hello failed (unexpected response)")
|
||||
await self.disconnect()
|
||||
throw NSError(domain: "Bridge", code: 1, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Unexpected bridge response",
|
||||
])
|
||||
}
|
||||
|
||||
if base.type == "hello-ok" {
|
||||
let ok = try self.decoder.decode(BridgeHelloOk.self, from: data)
|
||||
self.state = .connected(serverName: ok.serverName)
|
||||
self.startPingLoop()
|
||||
let mainKey = ok.mainSessionKey?.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
await onConnected?(ok.serverName, mainKey?.isEmpty == false ? mainKey : nil)
|
||||
} else if base.type == "error" {
|
||||
let err = try self.decoder.decode(BridgeErrorFrame.self, from: data)
|
||||
self.state = .failed(message: "\(err.code): \(err.message)")
|
||||
self.logger.error("node bridge hello error: \(err.code, privacy: .public)")
|
||||
await self.disconnect()
|
||||
throw NSError(domain: "Bridge", code: 2, userInfo: [
|
||||
NSLocalizedDescriptionKey: "\(err.code): \(err.message)",
|
||||
])
|
||||
} else {
|
||||
self.state = .failed(message: "Unexpected bridge response")
|
||||
self.logger.error("node bridge hello failed (unexpected frame)")
|
||||
await self.disconnect()
|
||||
throw NSError(domain: "Bridge", code: 3, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Unexpected bridge response",
|
||||
])
|
||||
}
|
||||
|
||||
do {
|
||||
while true {
|
||||
guard let next = try await self.receiveLine() else { break }
|
||||
guard let nextData = next.data(using: .utf8) else { continue }
|
||||
guard let nextBase = try? self.decoder.decode(BridgeBaseFrame.self, from: nextData) else { continue }
|
||||
|
||||
switch nextBase.type {
|
||||
case "res":
|
||||
let res = try self.decoder.decode(BridgeRPCResponse.self, from: nextData)
|
||||
if let cont = self.pendingRPC.removeValue(forKey: res.id) {
|
||||
cont.resume(returning: res)
|
||||
}
|
||||
|
||||
case "event":
|
||||
let evt = try self.decoder.decode(BridgeEventFrame.self, from: nextData)
|
||||
self.broadcastServerEvent(evt)
|
||||
|
||||
case "ping":
|
||||
let ping = try self.decoder.decode(BridgePing.self, from: nextData)
|
||||
try await self.send(BridgePong(type: "pong", id: ping.id))
|
||||
|
||||
case "pong":
|
||||
let pong = try self.decoder.decode(BridgePong.self, from: nextData)
|
||||
self.notePong(pong)
|
||||
|
||||
case "invoke":
|
||||
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
|
||||
let taskID = UUID()
|
||||
let task = Task { [weak self] in
|
||||
let res = await onInvoke(req)
|
||||
guard let self else { return }
|
||||
await self.sendInvokeResponse(res, taskID: taskID)
|
||||
}
|
||||
self.invokeTasks[taskID] = task
|
||||
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
await self.handleDisconnect(reason: "connection closed")
|
||||
} catch {
|
||||
self.logger.error(
|
||||
"node bridge receive failed: \(error.localizedDescription, privacy: .public)")
|
||||
await self.handleDisconnect(reason: "receive failed")
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
func sendEvent(event: String, payloadJSON: String?) async throws {
|
||||
try await self.send(BridgeEventFrame(type: "event", event: event, payloadJSON: payloadJSON))
|
||||
}
|
||||
|
||||
func request(method: String, paramsJSON: String?, timeoutSeconds: Int = 15) async throws -> Data {
|
||||
guard self.connection != nil else {
|
||||
throw NSError(domain: "Bridge", code: 11, userInfo: [
|
||||
NSLocalizedDescriptionKey: "not connected",
|
||||
])
|
||||
}
|
||||
|
||||
let id = UUID().uuidString
|
||||
let req = BridgeRPCRequest(type: "req", id: id, method: method, paramsJSON: paramsJSON)
|
||||
|
||||
let timeoutTask = Task {
|
||||
try await Task.sleep(nanoseconds: UInt64(timeoutSeconds) * 1_000_000_000)
|
||||
await self.timeoutRPC(id: id)
|
||||
}
|
||||
defer { timeoutTask.cancel() }
|
||||
|
||||
let res: BridgeRPCResponse = try await withCheckedThrowingContinuation { cont in
|
||||
Task { [weak self] in
|
||||
guard let self else { return }
|
||||
await self.beginRPC(id: id, request: req, continuation: cont)
|
||||
}
|
||||
}
|
||||
|
||||
if res.ok {
|
||||
let payload = res.payloadJSON ?? ""
|
||||
guard let data = payload.data(using: .utf8) else {
|
||||
throw NSError(domain: "Bridge", code: 12, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Bridge response not UTF-8",
|
||||
])
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
let code = res.error?.code ?? "UNAVAILABLE"
|
||||
let message = res.error?.message ?? "request failed"
|
||||
throw NSError(domain: "Bridge", code: 13, userInfo: [
|
||||
NSLocalizedDescriptionKey: "\(code): \(message)",
|
||||
])
|
||||
}
|
||||
|
||||
func subscribeServerEvents(bufferingNewest: Int = 200) -> AsyncStream<BridgeEventFrame> {
|
||||
let id = UUID()
|
||||
let session = self
|
||||
return AsyncStream(bufferingPolicy: .bufferingNewest(bufferingNewest)) { continuation in
|
||||
self.serverEventSubscribers[id] = continuation
|
||||
continuation.onTermination = { @Sendable _ in
|
||||
Task { await session.removeServerEventSubscriber(id) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func disconnect() async {
|
||||
self.pingTask?.cancel()
|
||||
self.pingTask = nil
|
||||
self.lastPongAt = nil
|
||||
self.disconnectHandler = nil
|
||||
self.cancelInvokeTasks()
|
||||
|
||||
self.connection?.cancel()
|
||||
self.connection = nil
|
||||
self.queue = nil
|
||||
self.buffer = Data()
|
||||
|
||||
let pending = self.pendingRPC.values
|
||||
self.pendingRPC.removeAll()
|
||||
for cont in pending {
|
||||
cont.resume(throwing: NSError(domain: "Bridge", code: 14, userInfo: [
|
||||
NSLocalizedDescriptionKey: "UNAVAILABLE: connection closed",
|
||||
]))
|
||||
}
|
||||
|
||||
for (_, cont) in self.serverEventSubscribers {
|
||||
cont.finish()
|
||||
}
|
||||
self.serverEventSubscribers.removeAll()
|
||||
|
||||
self.state = .idle
|
||||
}
|
||||
|
||||
private func beginRPC(
|
||||
id: String,
|
||||
request: BridgeRPCRequest,
|
||||
continuation: CheckedContinuation<BridgeRPCResponse, Error>) async
|
||||
{
|
||||
self.pendingRPC[id] = continuation
|
||||
do {
|
||||
try await self.send(request)
|
||||
} catch {
|
||||
await self.failRPC(id: id, error: error)
|
||||
}
|
||||
}
|
||||
|
||||
private func makeParameters(tls: MacNodeBridgeTLSParams?) -> NWParameters {
|
||||
let tcpOptions = NWProtocolTCP.Options()
|
||||
tcpOptions.enableKeepalive = true
|
||||
tcpOptions.keepaliveIdle = 30
|
||||
tcpOptions.keepaliveInterval = 15
|
||||
tcpOptions.keepaliveCount = 3
|
||||
|
||||
if let tlsOptions = makeMacNodeTLSOptions(tls) {
|
||||
let params = NWParameters(tls: tlsOptions, tcp: tcpOptions)
|
||||
params.includePeerToPeer = true
|
||||
return params
|
||||
}
|
||||
|
||||
let params = NWParameters.tcp
|
||||
params.includePeerToPeer = true
|
||||
params.defaultProtocolStack.transportProtocol = tcpOptions
|
||||
return params
|
||||
}
|
||||
|
||||
private func failRPC(id: String, error: Error) async {
|
||||
if let cont = self.pendingRPC.removeValue(forKey: id) {
|
||||
cont.resume(throwing: error)
|
||||
}
|
||||
}
|
||||
|
||||
private func timeoutRPC(id: String) async {
|
||||
if let cont = self.pendingRPC.removeValue(forKey: id) {
|
||||
cont.resume(throwing: TimeoutError(message: "request timed out"))
|
||||
}
|
||||
}
|
||||
|
||||
private func removeServerEventSubscriber(_ id: UUID) {
|
||||
self.serverEventSubscribers[id] = nil
|
||||
}
|
||||
|
||||
private func broadcastServerEvent(_ evt: BridgeEventFrame) {
|
||||
for (_, cont) in self.serverEventSubscribers {
|
||||
cont.yield(evt)
|
||||
}
|
||||
}
|
||||
|
||||
private func send(_ obj: some Encodable) async throws {
|
||||
guard let connection = self.connection else {
|
||||
throw NSError(domain: "Bridge", code: 15, userInfo: [
|
||||
NSLocalizedDescriptionKey: "not connected",
|
||||
])
|
||||
}
|
||||
let data = try self.encoder.encode(obj)
|
||||
var line = Data()
|
||||
line.append(data)
|
||||
line.append(0x0A)
|
||||
try await withCheckedThrowingContinuation(isolation: self) { (cont: CheckedContinuation<Void, Error>) in
|
||||
connection.send(content: line, completion: .contentProcessed { err in
|
||||
if let err { cont.resume(throwing: err) } else { cont.resume(returning: ()) }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private func receiveLine() async throws -> String? {
|
||||
while true {
|
||||
if let idx = self.buffer.firstIndex(of: 0x0A) {
|
||||
let line = self.buffer.prefix(upTo: idx)
|
||||
self.buffer.removeSubrange(...idx)
|
||||
return String(data: line, encoding: .utf8)
|
||||
}
|
||||
let chunk = try await self.receiveChunk()
|
||||
if chunk.isEmpty { return nil }
|
||||
self.buffer.append(chunk)
|
||||
}
|
||||
}
|
||||
|
||||
private func receiveChunk() async throws -> Data {
|
||||
guard let connection else { return Data() }
|
||||
return try await withCheckedThrowingContinuation(isolation: self) { (cont: CheckedContinuation<Data, Error>) in
|
||||
connection.receive(minimumIncompleteLength: 1, maximumLength: 64 * 1024) { data, _, isComplete, error in
|
||||
if let error {
|
||||
cont.resume(throwing: error)
|
||||
return
|
||||
}
|
||||
if isComplete {
|
||||
cont.resume(returning: Data())
|
||||
return
|
||||
}
|
||||
cont.resume(returning: data ?? Data())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func startPingLoop() {
|
||||
self.pingTask?.cancel()
|
||||
self.lastPongAt = self.clock.now
|
||||
self.logger.debug("node bridge ping loop started")
|
||||
self.pingTask = Task { [weak self] in
|
||||
guard let self else { return }
|
||||
await self.runPingLoop()
|
||||
}
|
||||
}
|
||||
|
||||
private func runPingLoop() async {
|
||||
let interval: Duration = .seconds(15)
|
||||
let timeout: Duration = .seconds(45)
|
||||
|
||||
while !Task.isCancelled {
|
||||
try? await Task.sleep(for: interval)
|
||||
|
||||
guard self.connection != nil else { return }
|
||||
|
||||
if let last = self.lastPongAt {
|
||||
let now = self.clock.now
|
||||
if now > last.advanced(by: timeout) {
|
||||
let age = last.duration(to: now)
|
||||
let ageDescription = String(describing: age)
|
||||
let message =
|
||||
"Node bridge heartbeat timed out; disconnecting " +
|
||||
"(age: \(ageDescription, privacy: .public))."
|
||||
self.logger.warning(message)
|
||||
await self.handleDisconnect(reason: "ping timeout")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
let id = UUID().uuidString
|
||||
do {
|
||||
try await self.send(BridgePing(type: "ping", id: id))
|
||||
} catch {
|
||||
let errorDescription = String(describing: error)
|
||||
let message =
|
||||
"Node bridge ping send failed; disconnecting " +
|
||||
"(error: \(errorDescription, privacy: .public))."
|
||||
self.logger.warning(message)
|
||||
await self.handleDisconnect(reason: "ping send failed")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func notePong(_ pong: BridgePong) {
|
||||
_ = pong
|
||||
self.lastPongAt = self.clock.now
|
||||
}
|
||||
|
||||
private func handleConnectionState(_ state: NWConnection.State) async {
|
||||
switch state {
|
||||
case let .failed(error):
|
||||
let errorDescription = String(describing: error)
|
||||
let message =
|
||||
"Node bridge connection failed; disconnecting " +
|
||||
"(error: \(errorDescription, privacy: .public))."
|
||||
self.logger.warning(message)
|
||||
await self.handleDisconnect(reason: "connection failed")
|
||||
case .cancelled:
|
||||
self.logger.warning("Node bridge connection cancelled; disconnecting.")
|
||||
await self.handleDisconnect(reason: "connection cancelled")
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
private func handleDisconnect(reason: String) async {
|
||||
self.logger.info("node bridge disconnect reason=\(reason, privacy: .public)")
|
||||
if let handler = self.disconnectHandler {
|
||||
await handler(reason)
|
||||
}
|
||||
await self.disconnect()
|
||||
}
|
||||
|
||||
private func logInvokeSendFailure(_ error: Error) {
|
||||
self.logger.error(
|
||||
"node bridge invoke response send failed: \(error.localizedDescription, privacy: .public)")
|
||||
}
|
||||
|
||||
private func sendInvokeResponse(_ response: BridgeInvokeResponse, taskID: UUID) async {
|
||||
defer { self.invokeTasks[taskID] = nil }
|
||||
if Task.isCancelled { return }
|
||||
do {
|
||||
try await self.send(response)
|
||||
} catch {
|
||||
self.logInvokeSendFailure(error)
|
||||
}
|
||||
}
|
||||
|
||||
private func cancelInvokeTasks() {
|
||||
for task in self.invokeTasks.values {
|
||||
task.cancel()
|
||||
}
|
||||
self.invokeTasks.removeAll()
|
||||
}
|
||||
|
||||
private static func makeStateStream(
|
||||
for connection: NWConnection) -> AsyncStream<NWConnection.State>
|
||||
{
|
||||
AsyncStream { continuation in
|
||||
connection.stateUpdateHandler = { state in
|
||||
continuation.yield(state)
|
||||
switch state {
|
||||
case .ready, .failed, .cancelled:
|
||||
continuation.finish()
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static func waitForReady(
|
||||
_ stream: AsyncStream<NWConnection.State>,
|
||||
timeoutSeconds: Double) async throws
|
||||
{
|
||||
try await AsyncTimeout.withTimeout(
|
||||
seconds: timeoutSeconds,
|
||||
onTimeout: {
|
||||
TimeoutError(message: "operation timed out")
|
||||
},
|
||||
operation: {
|
||||
for await state in stream {
|
||||
switch state {
|
||||
case .ready:
|
||||
return
|
||||
case let .failed(err):
|
||||
throw err
|
||||
case .cancelled:
|
||||
throw NSError(domain: "Bridge", code: 20, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Connection cancelled",
|
||||
])
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
throw NSError(domain: "Bridge", code: 21, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Connection closed",
|
||||
])
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,74 +0,0 @@
|
||||
import CryptoKit
|
||||
import Foundation
|
||||
import Network
|
||||
import Security
|
||||
|
||||
struct MacNodeBridgeTLSParams: Sendable {
|
||||
let required: Bool
|
||||
let expectedFingerprint: String?
|
||||
let allowTOFU: Bool
|
||||
let storeKey: String?
|
||||
}
|
||||
|
||||
enum MacNodeBridgeTLSStore {
|
||||
private static let suiteName = "com.clawdbot.shared"
|
||||
private static let keyPrefix = "mac.node.bridge.tls."
|
||||
|
||||
private static var defaults: UserDefaults {
|
||||
UserDefaults(suiteName: suiteName) ?? .standard
|
||||
}
|
||||
|
||||
static func loadFingerprint(stableID: String) -> String? {
|
||||
let key = self.keyPrefix + stableID
|
||||
let raw = self.defaults.string(forKey: key)?.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
return raw?.isEmpty == false ? raw : nil
|
||||
}
|
||||
|
||||
static func saveFingerprint(_ value: String, stableID: String) {
|
||||
let key = self.keyPrefix + stableID
|
||||
self.defaults.set(value, forKey: key)
|
||||
}
|
||||
}
|
||||
|
||||
func makeMacNodeTLSOptions(_ params: MacNodeBridgeTLSParams?) -> NWProtocolTLS.Options? {
|
||||
guard let params else { return nil }
|
||||
let options = NWProtocolTLS.Options()
|
||||
let expected = params.expectedFingerprint.map(normalizeMacNodeFingerprint)
|
||||
let allowTOFU = params.allowTOFU
|
||||
let storeKey = params.storeKey
|
||||
|
||||
sec_protocol_options_set_verify_block(
|
||||
options.securityProtocolOptions,
|
||||
{ _, trust, complete in
|
||||
let trustRef = sec_trust_copy_ref(trust).takeRetainedValue()
|
||||
if let chain = SecTrustCopyCertificateChain(trustRef) as? [SecCertificate],
|
||||
let cert = chain.first
|
||||
{
|
||||
let data = SecCertificateCopyData(cert) as Data
|
||||
let fingerprint = sha256Hex(data)
|
||||
if let expected {
|
||||
complete(fingerprint == expected)
|
||||
return
|
||||
}
|
||||
if allowTOFU {
|
||||
if let storeKey { MacNodeBridgeTLSStore.saveFingerprint(fingerprint, stableID: storeKey) }
|
||||
complete(true)
|
||||
return
|
||||
}
|
||||
}
|
||||
let ok = SecTrustEvaluateWithError(trustRef, nil)
|
||||
complete(ok)
|
||||
},
|
||||
DispatchQueue(label: "com.clawdbot.macos.bridge.tls.verify"))
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
private func sha256Hex(_ data: Data) -> String {
|
||||
let digest = SHA256.hash(data: data)
|
||||
return digest.map { String(format: "%02x", $0) }.joined()
|
||||
}
|
||||
|
||||
private func normalizeMacNodeFingerprint(_ raw: String) -> String {
|
||||
raw.lowercased().filter(\.isHexDigit)
|
||||
}
|
||||
150
apps/macos/Sources/Clawdbot/NodeMode/MacNodeGatewaySession.swift
Normal file
150
apps/macos/Sources/Clawdbot/NodeMode/MacNodeGatewaySession.swift
Normal file
@@ -0,0 +1,150 @@
|
||||
import ClawdbotKit
|
||||
import ClawdbotProtocol
|
||||
import Foundation
|
||||
import OSLog
|
||||
|
||||
private struct NodeInvokeRequestPayload: Codable, Sendable {
|
||||
var id: String
|
||||
var nodeId: String
|
||||
var command: String
|
||||
var paramsJSON: String?
|
||||
var timeoutMs: Int?
|
||||
var idempotencyKey: String?
|
||||
}
|
||||
|
||||
actor MacNodeGatewaySession {
|
||||
private let logger = Logger(subsystem: "com.clawdbot", category: "node.gateway")
|
||||
private let decoder = JSONDecoder()
|
||||
private let encoder = JSONEncoder()
|
||||
private var channel: GatewayChannelActor?
|
||||
private var activeURL: URL?
|
||||
private var activeToken: String?
|
||||
private var activePassword: String?
|
||||
private var connectOptions: GatewayConnectOptions?
|
||||
private var onConnected: (@Sendable () async -> Void)?
|
||||
private var onDisconnected: (@Sendable (String) async -> Void)?
|
||||
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
|
||||
|
||||
func connect(
|
||||
url: URL,
|
||||
token: String?,
|
||||
password: String?,
|
||||
connectOptions: GatewayConnectOptions,
|
||||
sessionBox: WebSocketSessionBox?,
|
||||
onConnected: @escaping @Sendable () async -> Void,
|
||||
onDisconnected: @escaping @Sendable (String) async -> Void,
|
||||
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse
|
||||
) async throws {
|
||||
let shouldReconnect = self.activeURL != url ||
|
||||
self.activeToken != token ||
|
||||
self.activePassword != password ||
|
||||
self.channel == nil
|
||||
|
||||
self.connectOptions = connectOptions
|
||||
self.onConnected = onConnected
|
||||
self.onDisconnected = onDisconnected
|
||||
self.onInvoke = onInvoke
|
||||
|
||||
if shouldReconnect {
|
||||
if let existing = self.channel {
|
||||
await existing.shutdown()
|
||||
}
|
||||
let channel = GatewayChannelActor(
|
||||
url: url,
|
||||
token: token,
|
||||
password: password,
|
||||
session: sessionBox,
|
||||
pushHandler: { [weak self] push in
|
||||
await self?.handlePush(push)
|
||||
},
|
||||
connectOptions: connectOptions,
|
||||
disconnectHandler: { [weak self] reason in
|
||||
await self?.onDisconnected?(reason)
|
||||
})
|
||||
self.channel = channel
|
||||
self.activeURL = url
|
||||
self.activeToken = token
|
||||
self.activePassword = password
|
||||
}
|
||||
|
||||
guard let channel = self.channel else {
|
||||
throw NSError(domain: "Gateway", code: 0, userInfo: [
|
||||
NSLocalizedDescriptionKey: "gateway channel unavailable",
|
||||
])
|
||||
}
|
||||
|
||||
do {
|
||||
try await channel.connect()
|
||||
await onConnected()
|
||||
} catch {
|
||||
await onDisconnected(error.localizedDescription)
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
func disconnect() async {
|
||||
await self.channel?.shutdown()
|
||||
self.channel = nil
|
||||
self.activeURL = nil
|
||||
self.activeToken = nil
|
||||
self.activePassword = nil
|
||||
}
|
||||
|
||||
func sendEvent(event: String, payloadJSON: String?) async {
|
||||
guard let channel = self.channel else { return }
|
||||
let params: [String: ClawdbotProtocol.AnyCodable] = [
|
||||
"event": ClawdbotProtocol.AnyCodable(event),
|
||||
"payloadJSON": ClawdbotProtocol.AnyCodable(payloadJSON ?? NSNull()),
|
||||
]
|
||||
do {
|
||||
_ = try await channel.request(method: "node.event", params: params, timeoutMs: 8000)
|
||||
} catch {
|
||||
self.logger.error("node event failed: \(error.localizedDescription, privacy: .public)")
|
||||
}
|
||||
}
|
||||
|
||||
private func handlePush(_ push: GatewayPush) async {
|
||||
switch push {
|
||||
case let .event(evt):
|
||||
await self.handleEvent(evt)
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
private func handleEvent(_ evt: EventFrame) async {
|
||||
guard evt.event == "node.invoke.request" else { return }
|
||||
guard let payload = evt.payload else { return }
|
||||
do {
|
||||
let data = try self.encoder.encode(payload)
|
||||
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
|
||||
guard let onInvoke else { return }
|
||||
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
|
||||
let response = await onInvoke(req)
|
||||
await self.sendInvokeResult(request: request, response: response)
|
||||
} catch {
|
||||
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")
|
||||
}
|
||||
}
|
||||
|
||||
private func sendInvokeResult(request: NodeInvokeRequestPayload, response: BridgeInvokeResponse) async {
|
||||
guard let channel = self.channel else { return }
|
||||
var params: [String: ClawdbotProtocol.AnyCodable] = [
|
||||
"id": ClawdbotProtocol.AnyCodable(request.id),
|
||||
"nodeId": ClawdbotProtocol.AnyCodable(request.nodeId),
|
||||
"ok": ClawdbotProtocol.AnyCodable(response.ok),
|
||||
"payloadJSON": ClawdbotProtocol.AnyCodable(response.payloadJSON ?? NSNull()),
|
||||
]
|
||||
if let error = response.error {
|
||||
params["error"] = ClawdbotProtocol.AnyCodable([
|
||||
"code": ClawdbotProtocol.AnyCodable(error.code.rawValue),
|
||||
"message": ClawdbotProtocol.AnyCodable(error.message),
|
||||
])
|
||||
}
|
||||
do {
|
||||
_ = try await channel.request(method: "node.invoke.result", params: params, timeoutMs: 15000)
|
||||
} catch {
|
||||
self.logger.error("node invoke result failed: \(error.localizedDescription, privacy: .public)")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,15 +1,7 @@
|
||||
import ClawdbotDiscovery
|
||||
import ClawdbotKit
|
||||
import Foundation
|
||||
import Network
|
||||
import OSLog
|
||||
|
||||
private struct BridgeTarget {
|
||||
let endpoint: NWEndpoint
|
||||
let stableID: String
|
||||
let tls: MacNodeBridgeTLSParams?
|
||||
}
|
||||
|
||||
@MainActor
|
||||
final class MacNodeModeCoordinator {
|
||||
static let shared = MacNodeModeCoordinator()
|
||||
@@ -17,8 +9,7 @@ final class MacNodeModeCoordinator {
|
||||
private let logger = Logger(subsystem: "com.clawdbot", category: "mac-node")
|
||||
private var task: Task<Void, Never>?
|
||||
private let runtime = MacNodeRuntime()
|
||||
private let session = MacNodeBridgeSession()
|
||||
private var tunnel: RemotePortTunnel?
|
||||
private let session = MacNodeGatewaySession()
|
||||
|
||||
func start() {
|
||||
guard self.task == nil else { return }
|
||||
@@ -31,12 +22,10 @@ final class MacNodeModeCoordinator {
|
||||
self.task?.cancel()
|
||||
self.task = nil
|
||||
Task { await self.session.disconnect() }
|
||||
self.tunnel?.terminate()
|
||||
self.tunnel = nil
|
||||
}
|
||||
|
||||
func setPreferredBridgeStableID(_ stableID: String?) {
|
||||
BridgeDiscoveryPreferences.setPreferredStableID(stableID)
|
||||
func setPreferredGatewayStableID(_ stableID: String?) {
|
||||
GatewayDiscoveryPreferences.setPreferredStableID(stableID)
|
||||
Task { await self.session.disconnect() }
|
||||
}
|
||||
|
||||
@@ -44,6 +33,7 @@ final class MacNodeModeCoordinator {
|
||||
var retryDelay: UInt64 = 1_000_000_000
|
||||
var lastCameraEnabled: Bool?
|
||||
let defaults = UserDefaults.standard
|
||||
|
||||
while !Task.isCancelled {
|
||||
if await MainActor.run(body: { AppStateStore.shared.isPaused }) {
|
||||
try? await Task.sleep(nanoseconds: 1_000_000_000)
|
||||
@@ -59,34 +49,42 @@ final class MacNodeModeCoordinator {
|
||||
try? await Task.sleep(nanoseconds: 200_000_000)
|
||||
}
|
||||
|
||||
guard let target = await self.resolveBridgeEndpoint(timeoutSeconds: 5) else {
|
||||
try? await Task.sleep(nanoseconds: min(retryDelay, 5_000_000_000))
|
||||
retryDelay = min(retryDelay * 2, 10_000_000_000)
|
||||
continue
|
||||
}
|
||||
|
||||
retryDelay = 1_000_000_000
|
||||
do {
|
||||
let hello = await self.makeHello()
|
||||
self.logger.info(
|
||||
"mac node bridge connecting endpoint=\(target.endpoint, privacy: .public)")
|
||||
let config = try await GatewayEndpointStore.shared.requireConfig()
|
||||
let caps = self.currentCaps()
|
||||
let commands = self.currentCommands(caps: caps)
|
||||
let permissions = await self.currentPermissions()
|
||||
let connectOptions = GatewayConnectOptions(
|
||||
role: "node",
|
||||
scopes: [],
|
||||
caps: caps,
|
||||
commands: commands,
|
||||
permissions: permissions,
|
||||
clientId: "clawdbot-macos",
|
||||
clientMode: "node",
|
||||
clientDisplayName: InstanceIdentity.displayName)
|
||||
let sessionBox = self.buildSessionBox(url: config.url)
|
||||
|
||||
try await self.session.connect(
|
||||
endpoint: target.endpoint,
|
||||
hello: hello,
|
||||
tls: target.tls,
|
||||
onConnected: { [weak self] serverName, mainSessionKey in
|
||||
self?.logger.info("mac node connected to \(serverName, privacy: .public)")
|
||||
if let mainSessionKey {
|
||||
await self?.runtime.updateMainSessionKey(mainSessionKey)
|
||||
}
|
||||
await self?.runtime.setEventSender { [weak self] event, payload in
|
||||
url: config.url,
|
||||
token: config.token,
|
||||
password: config.password,
|
||||
connectOptions: connectOptions,
|
||||
sessionBox: sessionBox,
|
||||
onConnected: { [weak self] in
|
||||
guard let self else { return }
|
||||
self.logger.info("mac node connected to gateway")
|
||||
let mainSessionKey = await GatewayConnection.shared.mainSessionKey()
|
||||
await self.runtime.updateMainSessionKey(mainSessionKey)
|
||||
await self.runtime.setEventSender { [weak self] event, payload in
|
||||
guard let self else { return }
|
||||
try? await self.session.sendEvent(event: event, payloadJSON: payload)
|
||||
await self.session.sendEvent(event: event, payloadJSON: payload)
|
||||
}
|
||||
},
|
||||
onDisconnected: { [weak self] reason in
|
||||
await self?.runtime.setEventSender(nil)
|
||||
await MacNodeModeCoordinator.handleBridgeDisconnect(reason: reason)
|
||||
guard let self else { return }
|
||||
await self.runtime.setEventSender(nil)
|
||||
self.logger.error("mac node disconnected: \(reason, privacy: .public)")
|
||||
},
|
||||
onInvoke: { [weak self] req in
|
||||
guard let self else {
|
||||
@@ -97,43 +95,17 @@ final class MacNodeModeCoordinator {
|
||||
}
|
||||
return await self.runtime.handleInvoke(req)
|
||||
})
|
||||
|
||||
retryDelay = 1_000_000_000
|
||||
try? await Task.sleep(nanoseconds: 1_000_000_000)
|
||||
} catch {
|
||||
if await self.tryPair(target: target, error: error) {
|
||||
continue
|
||||
}
|
||||
self.logger.error(
|
||||
"mac node bridge connect failed: \(error.localizedDescription, privacy: .public)")
|
||||
try? await Task.sleep(nanoseconds: min(retryDelay, 5_000_000_000))
|
||||
self.logger.error("mac node gateway connect failed: \(error.localizedDescription, privacy: .public)")
|
||||
try? await Task.sleep(nanoseconds: min(retryDelay, 10_000_000_000))
|
||||
retryDelay = min(retryDelay * 2, 10_000_000_000)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func makeHello() async -> BridgeHello {
|
||||
let token = MacNodeTokenStore.loadToken()
|
||||
let caps = self.currentCaps()
|
||||
let commands = self.currentCommands(caps: caps)
|
||||
let permissions = await self.currentPermissions()
|
||||
let uiVersion = Bundle.main.object(forInfoDictionaryKey: "CFBundleShortVersionString") as? String
|
||||
let liveGatewayVersion = await GatewayConnection.shared.cachedGatewayVersion()
|
||||
let fallbackGatewayVersion = GatewayProcessManager.shared.environmentStatus.gatewayVersion
|
||||
let coreVersion = (liveGatewayVersion ?? fallbackGatewayVersion)?
|
||||
.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
return BridgeHello(
|
||||
nodeId: Self.nodeId(),
|
||||
displayName: InstanceIdentity.displayName,
|
||||
token: token,
|
||||
platform: "macos",
|
||||
version: uiVersion,
|
||||
coreVersion: coreVersion?.isEmpty == false ? coreVersion : nil,
|
||||
uiVersion: uiVersion,
|
||||
deviceFamily: "Mac",
|
||||
modelIdentifier: InstanceIdentity.modelIdentifier,
|
||||
caps: caps,
|
||||
commands: commands,
|
||||
permissions: permissions)
|
||||
}
|
||||
|
||||
private func currentCaps() -> [String] {
|
||||
var caps: [String] = [ClawdbotCapability.canvas.rawValue, ClawdbotCapability.screen.rawValue]
|
||||
if UserDefaults.standard.object(forKey: cameraEnabledKey) as? Bool ?? false {
|
||||
@@ -182,370 +154,18 @@ final class MacNodeModeCoordinator {
|
||||
return commands
|
||||
}
|
||||
|
||||
private func tryPair(target: BridgeTarget, error: Error) async -> Bool {
|
||||
let text = error.localizedDescription.uppercased()
|
||||
guard text.contains("NOT_PAIRED") || text.contains("UNAUTHORIZED") else { return false }
|
||||
|
||||
do {
|
||||
let shouldSilent = await MainActor.run {
|
||||
AppStateStore.shared.connectionMode == .remote
|
||||
}
|
||||
let hello = await self.makeHello()
|
||||
let token = try await MacNodeBridgePairingClient().pairAndHello(
|
||||
endpoint: target.endpoint,
|
||||
hello: hello,
|
||||
silent: shouldSilent,
|
||||
tls: target.tls,
|
||||
onStatus: { [weak self] status in
|
||||
self?.logger.info("mac node pairing: \(status, privacy: .public)")
|
||||
})
|
||||
if !token.isEmpty {
|
||||
MacNodeTokenStore.saveToken(token)
|
||||
}
|
||||
return true
|
||||
} catch {
|
||||
self.logger.error("mac node pairing failed: \(error.localizedDescription, privacy: .public)")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
private static func nodeId() -> String {
|
||||
"mac-\(InstanceIdentity.instanceId)"
|
||||
}
|
||||
|
||||
private func resolveLoopbackBridgeEndpoint(timeoutSeconds: Double) async -> BridgeTarget? {
|
||||
guard let port = Self.loopbackBridgePort(),
|
||||
let endpointPort = NWEndpoint.Port(rawValue: port)
|
||||
else {
|
||||
return nil
|
||||
}
|
||||
let endpoint = NWEndpoint.hostPort(host: "127.0.0.1", port: endpointPort)
|
||||
let reachable = await Self.probeEndpoint(endpoint, timeoutSeconds: timeoutSeconds)
|
||||
guard reachable else { return nil }
|
||||
let stableID = BridgeEndpointID.stableID(endpoint)
|
||||
let tlsParams = Self.resolveManualTLSParams(stableID: stableID)
|
||||
return BridgeTarget(endpoint: endpoint, stableID: stableID, tls: tlsParams)
|
||||
}
|
||||
|
||||
static func loopbackBridgePort() -> UInt16? {
|
||||
if let raw = ProcessInfo.processInfo.environment["CLAWDBOT_BRIDGE_PORT"],
|
||||
let parsed = Int(raw.trimmingCharacters(in: .whitespacesAndNewlines)),
|
||||
parsed > 0,
|
||||
parsed <= Int(UInt16.max)
|
||||
{
|
||||
return UInt16(parsed)
|
||||
}
|
||||
return 18790
|
||||
}
|
||||
|
||||
static func remoteBridgePort() -> Int {
|
||||
let fallback = Int(Self.loopbackBridgePort() ?? 18790)
|
||||
let settings = CommandResolver.connectionSettings()
|
||||
let sshHost = CommandResolver.parseSSHTarget(settings.target)?.host ?? ""
|
||||
let base =
|
||||
ClawdbotConfigFile.remoteGatewayPort(matchingHost: sshHost) ??
|
||||
GatewayEnvironment.gatewayPort()
|
||||
guard base > 0 else { return fallback }
|
||||
return Self.derivePort(base: base, offset: 1, fallback: fallback)
|
||||
}
|
||||
|
||||
private static func derivePort(base: Int, offset: Int, fallback: Int) -> Int {
|
||||
let derived = base + offset
|
||||
guard derived > 0, derived <= Int(UInt16.max) else { return fallback }
|
||||
return derived
|
||||
}
|
||||
|
||||
static func probeEndpoint(_ endpoint: NWEndpoint, timeoutSeconds: Double) async -> Bool {
|
||||
let connection = NWConnection(to: endpoint, using: .tcp)
|
||||
let stream = Self.makeStateStream(for: connection)
|
||||
connection.start(queue: DispatchQueue(label: "com.clawdbot.macos.bridge-loopback-probe"))
|
||||
do {
|
||||
try await Self.waitForReady(stream, timeoutSeconds: timeoutSeconds)
|
||||
connection.cancel()
|
||||
return true
|
||||
} catch {
|
||||
connection.cancel()
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
private static func makeStateStream(
|
||||
for connection: NWConnection) -> AsyncStream<NWConnection.State>
|
||||
{
|
||||
AsyncStream { continuation in
|
||||
connection.stateUpdateHandler = { state in
|
||||
continuation.yield(state)
|
||||
switch state {
|
||||
case .ready, .failed, .cancelled:
|
||||
continuation.finish()
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static func waitForReady(
|
||||
_ stream: AsyncStream<NWConnection.State>,
|
||||
timeoutSeconds: Double) async throws
|
||||
{
|
||||
try await AsyncTimeout.withTimeout(
|
||||
seconds: timeoutSeconds,
|
||||
onTimeout: {
|
||||
NSError(domain: "Bridge", code: 22, userInfo: [
|
||||
NSLocalizedDescriptionKey: "operation timed out",
|
||||
])
|
||||
},
|
||||
operation: {
|
||||
for await state in stream {
|
||||
switch state {
|
||||
case .ready:
|
||||
return
|
||||
case let .failed(err):
|
||||
throw err
|
||||
case .cancelled:
|
||||
throw NSError(domain: "Bridge", code: 20, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Connection cancelled",
|
||||
])
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
throw NSError(domain: "Bridge", code: 21, userInfo: [
|
||||
NSLocalizedDescriptionKey: "Connection closed",
|
||||
])
|
||||
})
|
||||
}
|
||||
|
||||
private func resolveBridgeEndpoint(timeoutSeconds: Double) async -> BridgeTarget? {
|
||||
let mode = await MainActor.run(body: { AppStateStore.shared.connectionMode })
|
||||
if mode == .remote {
|
||||
do {
|
||||
if let tunnel = self.tunnel,
|
||||
tunnel.process.isRunning,
|
||||
let localPort = tunnel.localPort
|
||||
{
|
||||
let healthy = await self.bridgeTunnelHealthy(localPort: localPort, timeoutSeconds: 1.0)
|
||||
if healthy, let port = NWEndpoint.Port(rawValue: localPort) {
|
||||
self.logger.info(
|
||||
"reusing mac node bridge tunnel localPort=\(localPort, privacy: .public)")
|
||||
let endpoint = NWEndpoint.hostPort(host: "127.0.0.1", port: port)
|
||||
let stableID = BridgeEndpointID.stableID(endpoint)
|
||||
let tlsParams = Self.resolveManualTLSParams(stableID: stableID)
|
||||
return BridgeTarget(endpoint: endpoint, stableID: stableID, tls: tlsParams)
|
||||
}
|
||||
self.logger.error(
|
||||
"mac node bridge tunnel unhealthy localPort=\(localPort, privacy: .public); restarting")
|
||||
tunnel.terminate()
|
||||
self.tunnel = nil
|
||||
}
|
||||
|
||||
let remotePort = Self.remoteBridgePort()
|
||||
let preferredLocalPort = Self.loopbackBridgePort()
|
||||
if let preferredLocalPort {
|
||||
self.logger.info(
|
||||
"mac node bridge tunnel starting " +
|
||||
"preferredLocalPort=\(preferredLocalPort, privacy: .public) " +
|
||||
"remotePort=\(remotePort, privacy: .public)")
|
||||
} else {
|
||||
self.logger.info(
|
||||
"mac node bridge tunnel starting " +
|
||||
"preferredLocalPort=none " +
|
||||
"remotePort=\(remotePort, privacy: .public)")
|
||||
}
|
||||
self.tunnel = try await RemotePortTunnel.create(
|
||||
remotePort: remotePort,
|
||||
preferredLocalPort: preferredLocalPort,
|
||||
allowRemoteUrlOverride: false,
|
||||
allowRandomLocalPort: true)
|
||||
if let localPort = self.tunnel?.localPort,
|
||||
let port = NWEndpoint.Port(rawValue: localPort)
|
||||
{
|
||||
self.logger.info(
|
||||
"mac node bridge tunnel ready " +
|
||||
"localPort=\(localPort, privacy: .public) " +
|
||||
"remotePort=\(remotePort, privacy: .public)")
|
||||
let endpoint = NWEndpoint.hostPort(host: "127.0.0.1", port: port)
|
||||
let stableID = BridgeEndpointID.stableID(endpoint)
|
||||
let tlsParams = Self.resolveManualTLSParams(stableID: stableID)
|
||||
return BridgeTarget(endpoint: endpoint, stableID: stableID, tls: tlsParams)
|
||||
}
|
||||
} catch {
|
||||
self.logger.error("mac node bridge tunnel failed: \(error.localizedDescription, privacy: .public)")
|
||||
self.tunnel?.terminate()
|
||||
self.tunnel = nil
|
||||
}
|
||||
} else if let tunnel = self.tunnel {
|
||||
tunnel.terminate()
|
||||
self.tunnel = nil
|
||||
}
|
||||
if mode == .local, let target = await self.resolveLoopbackBridgeEndpoint(timeoutSeconds: 0.4) {
|
||||
return target
|
||||
}
|
||||
return await Self.discoverBridgeEndpoint(timeoutSeconds: timeoutSeconds)
|
||||
}
|
||||
|
||||
@MainActor
|
||||
private static func handleBridgeDisconnect(reason: String) async {
|
||||
guard reason.localizedCaseInsensitiveContains("ping") else { return }
|
||||
let coordinator = MacNodeModeCoordinator.shared
|
||||
coordinator.logger.error(
|
||||
"mac node bridge disconnected (\(reason, privacy: .public)); resetting tunnel")
|
||||
coordinator.tunnel?.terminate()
|
||||
coordinator.tunnel = nil
|
||||
}
|
||||
|
||||
private func bridgeTunnelHealthy(localPort: UInt16, timeoutSeconds: Double) async -> Bool {
|
||||
guard let port = NWEndpoint.Port(rawValue: localPort) else { return false }
|
||||
return await Self.probeEndpoint(.hostPort(host: "127.0.0.1", port: port), timeoutSeconds: timeoutSeconds)
|
||||
}
|
||||
|
||||
private static func discoverBridgeEndpoint(timeoutSeconds: Double) async -> BridgeTarget? {
|
||||
final class DiscoveryState: @unchecked Sendable {
|
||||
let lock = NSLock()
|
||||
var resolved = false
|
||||
var browsers: [NWBrowser] = []
|
||||
var continuation: CheckedContinuation<BridgeTarget?, Never>?
|
||||
|
||||
func finish(_ target: BridgeTarget?) {
|
||||
self.lock.lock()
|
||||
defer { lock.unlock() }
|
||||
if self.resolved { return }
|
||||
self.resolved = true
|
||||
for browser in self.browsers {
|
||||
browser.cancel()
|
||||
}
|
||||
self.continuation?.resume(returning: target)
|
||||
self.continuation = nil
|
||||
}
|
||||
}
|
||||
|
||||
return await withCheckedContinuation { cont in
|
||||
let state = DiscoveryState()
|
||||
state.continuation = cont
|
||||
|
||||
let params = NWParameters.tcp
|
||||
params.includePeerToPeer = true
|
||||
|
||||
for domain in ClawdbotBonjour.bridgeServiceDomains {
|
||||
let browser = NWBrowser(
|
||||
for: .bonjour(type: ClawdbotBonjour.bridgeServiceType, domain: domain),
|
||||
using: params)
|
||||
browser.browseResultsChangedHandler = { results, _ in
|
||||
let preferred = BridgeDiscoveryPreferences.preferredStableID()
|
||||
if let preferred,
|
||||
let match = results.first(where: {
|
||||
if case .service = $0.endpoint {
|
||||
return BridgeEndpointID.stableID($0.endpoint) == preferred
|
||||
}
|
||||
return false
|
||||
})
|
||||
{
|
||||
state.finish(Self.targetFromResult(match))
|
||||
return
|
||||
}
|
||||
|
||||
if let result = results.first(where: { if case .service = $0.endpoint { true } else { false } }) {
|
||||
state.finish(Self.targetFromResult(result))
|
||||
}
|
||||
}
|
||||
browser.stateUpdateHandler = { browserState in
|
||||
if case .failed = browserState {
|
||||
state.finish(nil)
|
||||
}
|
||||
}
|
||||
state.browsers.append(browser)
|
||||
browser.start(queue: DispatchQueue(label: "com.clawdbot.macos.bridge-discovery.\(domain)"))
|
||||
}
|
||||
|
||||
Task {
|
||||
try? await Task.sleep(nanoseconds: UInt64(timeoutSeconds * 1_000_000_000))
|
||||
state.finish(nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private nonisolated static func targetFromResult(_ result: NWBrowser.Result) -> BridgeTarget? {
|
||||
let endpoint = result.endpoint
|
||||
guard case .service = endpoint else { return nil }
|
||||
let stableID = BridgeEndpointID.stableID(endpoint)
|
||||
let txt = result.endpoint.txtRecord?.dictionary ?? [:]
|
||||
let tlsEnabled = Self.txtBoolValue(txt, key: "bridgeTls")
|
||||
let tlsFingerprint = Self.txtValue(txt, key: "bridgeTlsSha256")
|
||||
let tlsParams = Self.resolveDiscoveredTLSParams(
|
||||
stableID: stableID,
|
||||
tlsEnabled: tlsEnabled,
|
||||
tlsFingerprintSha256: tlsFingerprint)
|
||||
return BridgeTarget(endpoint: endpoint, stableID: stableID, tls: tlsParams)
|
||||
}
|
||||
|
||||
private nonisolated static func resolveDiscoveredTLSParams(
|
||||
stableID: String,
|
||||
tlsEnabled: Bool,
|
||||
tlsFingerprintSha256: String?) -> MacNodeBridgeTLSParams?
|
||||
{
|
||||
let stored = MacNodeBridgeTLSStore.loadFingerprint(stableID: stableID)
|
||||
|
||||
if tlsEnabled || tlsFingerprintSha256 != nil {
|
||||
return MacNodeBridgeTLSParams(
|
||||
required: true,
|
||||
expectedFingerprint: tlsFingerprintSha256 ?? stored,
|
||||
allowTOFU: stored == nil,
|
||||
storeKey: stableID)
|
||||
}
|
||||
|
||||
if let stored {
|
||||
return MacNodeBridgeTLSParams(
|
||||
required: true,
|
||||
expectedFingerprint: stored,
|
||||
allowTOFU: false,
|
||||
storeKey: stableID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
private nonisolated static func resolveManualTLSParams(stableID: String) -> MacNodeBridgeTLSParams? {
|
||||
if let stored = MacNodeBridgeTLSStore.loadFingerprint(stableID: stableID) {
|
||||
return MacNodeBridgeTLSParams(
|
||||
required: true,
|
||||
expectedFingerprint: stored,
|
||||
allowTOFU: false,
|
||||
storeKey: stableID)
|
||||
}
|
||||
|
||||
return MacNodeBridgeTLSParams(
|
||||
required: false,
|
||||
expectedFingerprint: nil,
|
||||
allowTOFU: true,
|
||||
private func buildSessionBox(url: URL) -> WebSocketSessionBox? {
|
||||
guard url.scheme?.lowercased() == "wss" else { return nil }
|
||||
let host = url.host ?? "gateway"
|
||||
let port = url.port ?? 443
|
||||
let stableID = "\(host):\(port)"
|
||||
let stored = GatewayTLSStore.loadFingerprint(stableID: stableID)
|
||||
let params = GatewayTLSParams(
|
||||
required: true,
|
||||
expectedFingerprint: stored,
|
||||
allowTOFU: stored == nil,
|
||||
storeKey: stableID)
|
||||
}
|
||||
|
||||
private nonisolated static func txtValue(_ dict: [String: String], key: String) -> String? {
|
||||
let raw = dict[key]?.trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
|
||||
return raw.isEmpty ? nil : raw
|
||||
}
|
||||
|
||||
private nonisolated static func txtBoolValue(_ dict: [String: String], key: String) -> Bool {
|
||||
guard let raw = self.txtValue(dict, key: key)?.lowercased() else { return false }
|
||||
return raw == "1" || raw == "true" || raw == "yes"
|
||||
}
|
||||
}
|
||||
|
||||
enum MacNodeTokenStore {
|
||||
private static let suiteName = "com.clawdbot.shared"
|
||||
private static let tokenKey = "mac.node.bridge.token"
|
||||
|
||||
private static var defaults: UserDefaults {
|
||||
UserDefaults(suiteName: suiteName) ?? .standard
|
||||
}
|
||||
|
||||
static func loadToken() -> String? {
|
||||
let raw = self.defaults.string(forKey: self.tokenKey)?.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
return raw?.isEmpty == false ? raw : nil
|
||||
}
|
||||
|
||||
static func saveToken(_ token: String) {
|
||||
self.defaults.set(token, forKey: self.tokenKey)
|
||||
let session = GatewayTLSPinningSession(params: params)
|
||||
return WebSocketSessionBox(session: session)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -486,46 +486,20 @@ actor MacNodeRuntime {
|
||||
return false
|
||||
}()
|
||||
|
||||
var approvedByAsk = false
|
||||
if requiresAsk {
|
||||
let decision = await ExecApprovalsPromptPresenter.prompt(
|
||||
ExecApprovalPromptRequest(
|
||||
command: displayCommand,
|
||||
cwd: params.cwd,
|
||||
let approvedByAsk = params.approved == true
|
||||
if requiresAsk && !approvedByAsk {
|
||||
await self.emitExecEvent(
|
||||
"exec.denied",
|
||||
payload: ExecEventPayload(
|
||||
sessionKey: sessionKey,
|
||||
runId: runId,
|
||||
host: "node",
|
||||
security: security.rawValue,
|
||||
ask: ask.rawValue,
|
||||
agentId: agentId,
|
||||
resolvedPath: resolution?.resolvedPath))
|
||||
|
||||
switch decision {
|
||||
case .deny:
|
||||
await self.emitExecEvent(
|
||||
"exec.denied",
|
||||
payload: ExecEventPayload(
|
||||
sessionKey: sessionKey,
|
||||
runId: runId,
|
||||
host: "node",
|
||||
command: displayCommand,
|
||||
reason: "user-denied"))
|
||||
return Self.errorResponse(
|
||||
req,
|
||||
code: .unavailable,
|
||||
message: "SYSTEM_RUN_DENIED: user denied")
|
||||
case .allowAlways:
|
||||
approvedByAsk = true
|
||||
if security == .allowlist {
|
||||
let pattern = resolution?.resolvedPath ??
|
||||
resolution?.rawExecutable ??
|
||||
command.first?.trimmingCharacters(in: .whitespacesAndNewlines) ??
|
||||
""
|
||||
if !pattern.isEmpty {
|
||||
ExecApprovalsStore.addAllowlistEntry(agentId: agentId, pattern: pattern)
|
||||
}
|
||||
}
|
||||
case .allowOnce:
|
||||
approvedByAsk = true
|
||||
}
|
||||
command: displayCommand,
|
||||
reason: "approval-required"))
|
||||
return Self.errorResponse(
|
||||
req,
|
||||
code: .unavailable,
|
||||
message: "SYSTEM_RUN_DENIED: approval required")
|
||||
}
|
||||
|
||||
if security == .allowlist && allowlistMatch == nil && !skillAllow && !approvedByAsk {
|
||||
@@ -762,7 +736,7 @@ actor MacNodeRuntime {
|
||||
|
||||
private static func decodeParams<T: Decodable>(_ type: T.Type, from json: String?) throws -> T {
|
||||
guard let json, let data = json.data(using: .utf8) else {
|
||||
throw NSError(domain: "Bridge", code: 20, userInfo: [
|
||||
throw NSError(domain: "Gateway", code: 20, userInfo: [
|
||||
NSLocalizedDescriptionKey: "INVALID_REQUEST: paramsJSON required",
|
||||
])
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user