From 55d2608808a35aeaa750f80d08e98297917d7c7f Mon Sep 17 00:00:00 2001 From: Nimrod Gutman Date: Sat, 10 Jan 2026 20:36:25 +0200 Subject: [PATCH] fix(macos): stabilize bridge tunnels --- .../NodeMode/MacNodeBridgeSession.swift | 105 ++++++++++++------ .../NodeMode/MacNodeModeCoordinator.swift | 49 +++++++- .../Sources/Clawdbot/RemotePortTunnel.swift | 17 ++- .../Clawdbot/RemoteTunnelManager.swift | 47 +++++++- .../Sources/Clawdbot/ShellExecutor.swift | 7 +- 5 files changed, 178 insertions(+), 47 deletions(-) diff --git a/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift b/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift index 93b2d53c6..7b7dfeff3 100644 --- a/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift +++ b/apps/macos/Sources/Clawdbot/NodeMode/MacNodeBridgeSession.swift @@ -20,6 +20,7 @@ actor MacNodeBridgeSession { private let encoder = JSONEncoder() private let decoder = JSONDecoder() private let clock = ContinuousClock() + private var disconnectHandler: (@Sendable (String) async -> Void)? private var connection: NWConnection? private var queue: DispatchQueue? @@ -35,10 +36,12 @@ actor MacNodeBridgeSession { endpoint: NWEndpoint, hello: BridgeHello, onConnected: (@Sendable (String) async -> Void)? = nil, + onDisconnected: (@Sendable (String) async -> Void)? = nil, onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse) async throws { await self.disconnect() + self.disconnectHandler = onDisconnected self.state = .connecting let params = NWParameters.tcp @@ -83,6 +86,7 @@ actor MacNodeBridgeSession { let data = line.data(using: .utf8), let base = try? self.decoder.decode(BridgeBaseFrame.self, from: data) else { + self.logger.error("node bridge hello failed (unexpected response)") await self.disconnect() throw NSError(domain: "Bridge", code: 1, userInfo: [ NSLocalizedDescriptionKey: "Unexpected bridge response", @@ -97,53 +101,69 @@ actor MacNodeBridgeSession { } else if base.type == "error" { let err = try self.decoder.decode(BridgeErrorFrame.self, from: data) self.state = .failed(message: "\(err.code): \(err.message)") + self.logger.error("node bridge hello error: \(err.code, privacy: .public)") await self.disconnect() throw NSError(domain: "Bridge", code: 2, userInfo: [ NSLocalizedDescriptionKey: "\(err.code): \(err.message)", ]) } else { self.state = .failed(message: "Unexpected bridge response") + self.logger.error("node bridge hello failed (unexpected frame)") await self.disconnect() throw NSError(domain: "Bridge", code: 3, userInfo: [ NSLocalizedDescriptionKey: "Unexpected bridge response", ]) } - while true { - guard let next = try await self.receiveLine() else { break } - guard let nextData = next.data(using: .utf8) else { continue } - guard let nextBase = try? self.decoder.decode(BridgeBaseFrame.self, from: nextData) else { continue } + do { + while true { + guard let next = try await self.receiveLine() else { break } + guard let nextData = next.data(using: .utf8) else { continue } + guard let nextBase = try? self.decoder.decode(BridgeBaseFrame.self, from: nextData) else { continue } - switch nextBase.type { - case "res": - let res = try self.decoder.decode(BridgeRPCResponse.self, from: nextData) - if let cont = self.pendingRPC.removeValue(forKey: res.id) { - cont.resume(returning: res) + switch nextBase.type { + case "res": + let res = try self.decoder.decode(BridgeRPCResponse.self, from: nextData) + if let cont = self.pendingRPC.removeValue(forKey: res.id) { + cont.resume(returning: res) + } + + case "event": + let evt = try self.decoder.decode(BridgeEventFrame.self, from: nextData) + self.broadcastServerEvent(evt) + + case "ping": + let ping = try self.decoder.decode(BridgePing.self, from: nextData) + try await self.send(BridgePong(type: "pong", id: ping.id)) + + case "pong": + let pong = try self.decoder.decode(BridgePong.self, from: nextData) + self.notePong(pong) + + case "invoke": + let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData) + Task.detached { [weak self] in + let res = await onInvoke(req) + guard let self else { return } + do { + try await self.send(res) + } catch { + await self.logInvokeSendFailure(error) + } + } + + default: + continue } - - case "event": - let evt = try self.decoder.decode(BridgeEventFrame.self, from: nextData) - self.broadcastServerEvent(evt) - - case "ping": - let ping = try self.decoder.decode(BridgePing.self, from: nextData) - try await self.send(BridgePong(type: "pong", id: ping.id)) - - case "pong": - let pong = try self.decoder.decode(BridgePong.self, from: nextData) - self.notePong(pong) - - case "invoke": - let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData) - let res = await onInvoke(req) - try await self.send(res) - - default: - continue } - } - await self.disconnect() + await self.handleDisconnect(reason: "connection closed") + } catch { + self.logger.error( + "node bridge receive failed: \(error.localizedDescription, privacy: .public)") + await self.handleDisconnect(reason: "receive failed") + throw error + } } func sendEvent(event: String, payloadJSON: String?) async throws { @@ -205,6 +225,7 @@ actor MacNodeBridgeSession { self.pingTask?.cancel() self.pingTask = nil self.lastPongAt = nil + self.disconnectHandler = nil self.connection?.cancel() self.connection = nil @@ -312,6 +333,7 @@ actor MacNodeBridgeSession { private func startPingLoop() { self.pingTask?.cancel() self.lastPongAt = self.clock.now + self.logger.debug("node bridge ping loop started") self.pingTask = Task { [weak self] in guard let self else { return } await self.runPingLoop() @@ -336,7 +358,7 @@ actor MacNodeBridgeSession { "Node bridge heartbeat timed out; disconnecting " + "(age: \(ageDescription, privacy: .public))." self.logger.warning(message) - await self.disconnect() + await self.handleDisconnect(reason: "ping timeout") return } } @@ -350,7 +372,7 @@ actor MacNodeBridgeSession { "Node bridge ping send failed; disconnecting " + "(error: \(errorDescription, privacy: .public))." self.logger.warning(message) - await self.disconnect() + await self.handleDisconnect(reason: "ping send failed") return } } @@ -369,15 +391,28 @@ actor MacNodeBridgeSession { "Node bridge connection failed; disconnecting " + "(error: \(errorDescription, privacy: .public))." self.logger.warning(message) - await self.disconnect() + await self.handleDisconnect(reason: "connection failed") case .cancelled: self.logger.warning("Node bridge connection cancelled; disconnecting.") - await self.disconnect() + await self.handleDisconnect(reason: "connection cancelled") default: break } } + private func handleDisconnect(reason: String) async { + self.logger.info("node bridge disconnect reason=\(reason, privacy: .public)") + if let handler = self.disconnectHandler { + await handler(reason) + } + await self.disconnect() + } + + private func logInvokeSendFailure(_ error: Error) { + self.logger.error( + "node bridge invoke response send failed: \(error.localizedDescription, privacy: .public)") + } + private static func makeStateStream( for connection: NWConnection) -> AsyncStream { diff --git a/apps/macos/Sources/Clawdbot/NodeMode/MacNodeModeCoordinator.swift b/apps/macos/Sources/Clawdbot/NodeMode/MacNodeModeCoordinator.swift index 12f268511..dd87f506e 100644 --- a/apps/macos/Sources/Clawdbot/NodeMode/MacNodeModeCoordinator.swift +++ b/apps/macos/Sources/Clawdbot/NodeMode/MacNodeModeCoordinator.swift @@ -61,12 +61,17 @@ final class MacNodeModeCoordinator { retryDelay = 1_000_000_000 do { let hello = await self.makeHello() + self.logger.info( + "mac node bridge connecting endpoint=\(endpoint, privacy: .public)") try await self.session.connect( endpoint: endpoint, hello: hello, onConnected: { [weak self] serverName in self?.logger.info("mac node connected to \(serverName, privacy: .public)") }, + onDisconnected: { reason in + await MacNodeModeCoordinator.handleBridgeDisconnect(reason: reason) + }, onInvoke: { [weak self] req in guard let self else { return BridgeInvokeResponse( @@ -80,7 +85,8 @@ final class MacNodeModeCoordinator { if await self.tryPair(endpoint: endpoint, error: error) { continue } - self.logger.error("mac node bridge connect failed: \(error.localizedDescription, privacy: .public)") + self.logger.error( + "mac node bridge connect failed: \(error.localizedDescription, privacy: .public)") try? await Task.sleep(nanoseconds: min(retryDelay, 5_000_000_000)) retryDelay = min(retryDelay * 2, 10_000_000_000) } @@ -285,15 +291,31 @@ final class MacNodeModeCoordinator { let mode = await MainActor.run(body: { AppStateStore.shared.connectionMode }) if mode == .remote { do { - if self.tunnel == nil || self.tunnel?.process.isRunning == false { - let remotePort = Self.remoteBridgePort() - self.tunnel = try await RemotePortTunnel.create( - remotePort: remotePort, - allowRemoteUrlOverride: false) + if let tunnel = self.tunnel, + tunnel.process.isRunning, + let localPort = tunnel.localPort + { + let healthy = await self.bridgeTunnelHealthy(localPort: localPort, timeoutSeconds: 1.0) + if healthy, let port = NWEndpoint.Port(rawValue: localPort) { + self.logger.info( + "reusing mac node bridge tunnel localPort=\(localPort, privacy: .public)") + return .hostPort(host: "127.0.0.1", port: port) + } + self.logger.error( + "mac node bridge tunnel unhealthy localPort=\(localPort, privacy: .public); restarting") + tunnel.terminate() + self.tunnel = nil } + + let remotePort = Self.remoteBridgePort() + self.tunnel = try await RemotePortTunnel.create( + remotePort: remotePort, + allowRemoteUrlOverride: false) if let localPort = self.tunnel?.localPort, let port = NWEndpoint.Port(rawValue: localPort) { + self.logger.info( + "mac node bridge tunnel ready localPort=\(localPort, privacy: .public) remotePort=\(remotePort, privacy: .public)") return .hostPort(host: "127.0.0.1", port: port) } } catch { @@ -311,6 +333,21 @@ final class MacNodeModeCoordinator { return await Self.discoverBridgeEndpoint(timeoutSeconds: timeoutSeconds) } + @MainActor + private static func handleBridgeDisconnect(reason: String) async { + guard reason.localizedCaseInsensitiveContains("ping") else { return } + let coordinator = MacNodeModeCoordinator.shared + coordinator.logger.error( + "mac node bridge disconnected (\(reason, privacy: .public)); resetting tunnel") + coordinator.tunnel?.terminate() + coordinator.tunnel = nil + } + + private func bridgeTunnelHealthy(localPort: UInt16, timeoutSeconds: Double) async -> Bool { + guard let port = NWEndpoint.Port(rawValue: localPort) else { return false } + return await Self.probeEndpoint(.hostPort(host: "127.0.0.1", port: port), timeoutSeconds: timeoutSeconds) + } + private static func discoverBridgeEndpoint(timeoutSeconds: Double) async -> NWEndpoint? { final class DiscoveryState: @unchecked Sendable { let lock = NSLock() diff --git a/apps/macos/Sources/Clawdbot/RemotePortTunnel.swift b/apps/macos/Sources/Clawdbot/RemotePortTunnel.swift index 4b8bcbe83..ccbeb6e8d 100644 --- a/apps/macos/Sources/Clawdbot/RemotePortTunnel.swift +++ b/apps/macos/Sources/Clawdbot/RemotePortTunnel.swift @@ -41,7 +41,8 @@ final class RemotePortTunnel { static func create( remotePort: Int, preferredLocalPort: UInt16? = nil, - allowRemoteUrlOverride: Bool = true) async throws -> RemotePortTunnel + allowRemoteUrlOverride: Bool = true, + allowRandomLocalPort: Bool = true) async throws -> RemotePortTunnel { let settings = CommandResolver.connectionSettings() guard settings.mode == .remote, let parsed = CommandResolver.parseSSHTarget(settings.target) else { @@ -51,7 +52,9 @@ final class RemotePortTunnel { userInfo: [NSLocalizedDescriptionKey: "Remote mode is not configured"]) } - let localPort = try await Self.findPort(preferred: preferredLocalPort) + let localPort = try await Self.findPort( + preferred: preferredLocalPort, + allowRandom: allowRandomLocalPort) let sshHost = parsed.host.trimmingCharacters(in: .whitespacesAndNewlines) let remotePortOverride = allowRemoteUrlOverride && remotePort == GatewayEnvironment.gatewayPort() @@ -172,8 +175,16 @@ final class RemotePortTunnel { return trimmed.split(separator: ".").first.map(String.init) ?? trimmed } - private static func findPort(preferred: UInt16?) async throws -> UInt16 { + private static func findPort(preferred: UInt16?, allowRandom: Bool) async throws -> UInt16 { if let preferred, self.portIsFree(preferred) { return preferred } + if let preferred, !allowRandom { + throw NSError( + domain: "RemotePortTunnel", + code: 5, + userInfo: [ + NSLocalizedDescriptionKey: "Local port \(preferred) is unavailable", + ]) + } return try await withCheckedThrowingContinuation { cont in let queue = DispatchQueue(label: "com.clawdbot.remote.tunnel.port", qos: .utility) diff --git a/apps/macos/Sources/Clawdbot/RemoteTunnelManager.swift b/apps/macos/Sources/Clawdbot/RemoteTunnelManager.swift index 9330e9c27..5e42fbd05 100644 --- a/apps/macos/Sources/Clawdbot/RemoteTunnelManager.swift +++ b/apps/macos/Sources/Clawdbot/RemoteTunnelManager.swift @@ -7,8 +7,15 @@ actor RemoteTunnelManager { private let logger = Logger(subsystem: "com.clawdbot", category: "remote-tunnel") private var controlTunnel: RemotePortTunnel? + private var restartInFlight = false + private var lastRestartAt: Date? + private let restartBackoffSeconds: TimeInterval = 2.0 func controlTunnelPortIfRunning() async -> UInt16? { + if self.restartInFlight { + self.logger.info("control tunnel restart in flight; skipping reuse check") + return nil + } if let tunnel = self.controlTunnel, tunnel.process.isRunning, let local = tunnel.localPort @@ -18,6 +25,7 @@ actor RemoteTunnelManager { return local } self.logger.error("active SSH tunnel on port \(local, privacy: .public) is unhealthy; restarting") + await self.beginRestart() tunnel.terminate() self.controlTunnel = nil } @@ -34,6 +42,11 @@ actor RemoteTunnelManager { "pid=\(desc.pid, privacy: .public)") return desiredPort } + if self.restartInFlight { + self.logger.info("control tunnel restart in flight; skip stale tunnel cleanup") + return nil + } + await self.beginRestart() await self.cleanupStaleTunnel(desc: desc, port: desiredPort) } return nil @@ -56,12 +69,15 @@ actor RemoteTunnelManager { "identitySet=\(identitySet, privacy: .public)") if let local = await self.controlTunnelPortIfRunning() { return local } + await self.waitForRestartBackoffIfNeeded() let desiredPort = UInt16(GatewayEnvironment.gatewayPort()) let tunnel = try await RemotePortTunnel.create( remotePort: GatewayEnvironment.gatewayPort(), - preferredLocalPort: desiredPort) + preferredLocalPort: desiredPort, + allowRandomLocalPort: false) self.controlTunnel = tunnel + self.endRestart() let resolvedPort = tunnel.localPort ?? desiredPort self.logger.info("ssh tunnel ready localPort=\(resolvedPort, privacy: .public)") return tunnel.localPort ?? desiredPort @@ -83,6 +99,35 @@ actor RemoteTunnelManager { return false } + private func beginRestart() async { + guard !self.restartInFlight else { return } + self.restartInFlight = true + self.lastRestartAt = Date() + self.logger.info("control tunnel restart started") + Task { [weak self] in + guard let self else { return } + try? await Task.sleep(nanoseconds: UInt64(self.restartBackoffSeconds * 1_000_000_000)) + await self.endRestart() + } + } + + private func endRestart() { + if self.restartInFlight { + self.restartInFlight = false + self.logger.info("control tunnel restart finished") + } + } + + private func waitForRestartBackoffIfNeeded() async { + guard let last = self.lastRestartAt else { return } + let elapsed = Date().timeIntervalSince(last) + let remaining = self.restartBackoffSeconds - elapsed + guard remaining > 0 else { return } + self.logger.info( + "control tunnel restart backoff \(remaining, privacy: .public)s") + try? await Task.sleep(nanoseconds: UInt64(remaining * 1_000_000_000)) + } + private func cleanupStaleTunnel(desc: PortGuardian.Descriptor, port: UInt16) async { let pid = desc.pid self.logger.error( diff --git a/apps/macos/Sources/Clawdbot/ShellExecutor.swift b/apps/macos/Sources/Clawdbot/ShellExecutor.swift index 77bc011c1..faad2eb63 100644 --- a/apps/macos/Sources/Clawdbot/ShellExecutor.swift +++ b/apps/macos/Sources/Clawdbot/ShellExecutor.swift @@ -50,10 +50,13 @@ enum ShellExecutor { errorMessage: "failed to start: \(error.localizedDescription)") } + let outTask = Task { stdoutPipe.fileHandleForReading.readToEndSafely() } + let errTask = Task { stderrPipe.fileHandleForReading.readToEndSafely() } + let waitTask = Task { () -> ShellResult in process.waitUntilExit() - let out = stdoutPipe.fileHandleForReading.readToEndSafely() - let err = stderrPipe.fileHandleForReading.readToEndSafely() + let out = await outTask.value + let err = await errTask.value let status = Int(process.terminationStatus) return ShellResult( stdout: String(bytes: out, encoding: .utf8) ?? "",