fix(macos): stabilize bridge tunnels

This commit is contained in:
Nimrod Gutman
2026-01-10 20:36:25 +02:00
committed by Peter Steinberger
parent a6a9930a34
commit 55d2608808
5 changed files with 178 additions and 47 deletions

View File

@@ -20,6 +20,7 @@ actor MacNodeBridgeSession {
private let encoder = JSONEncoder()
private let decoder = JSONDecoder()
private let clock = ContinuousClock()
private var disconnectHandler: (@Sendable (String) async -> Void)?
private var connection: NWConnection?
private var queue: DispatchQueue?
@@ -35,10 +36,12 @@ actor MacNodeBridgeSession {
endpoint: NWEndpoint,
hello: BridgeHello,
onConnected: (@Sendable (String) async -> Void)? = nil,
onDisconnected: (@Sendable (String) async -> Void)? = nil,
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)
async throws
{
await self.disconnect()
self.disconnectHandler = onDisconnected
self.state = .connecting
let params = NWParameters.tcp
@@ -83,6 +86,7 @@ actor MacNodeBridgeSession {
let data = line.data(using: .utf8),
let base = try? self.decoder.decode(BridgeBaseFrame.self, from: data)
else {
self.logger.error("node bridge hello failed (unexpected response)")
await self.disconnect()
throw NSError(domain: "Bridge", code: 1, userInfo: [
NSLocalizedDescriptionKey: "Unexpected bridge response",
@@ -97,53 +101,69 @@ actor MacNodeBridgeSession {
} else if base.type == "error" {
let err = try self.decoder.decode(BridgeErrorFrame.self, from: data)
self.state = .failed(message: "\(err.code): \(err.message)")
self.logger.error("node bridge hello error: \(err.code, privacy: .public)")
await self.disconnect()
throw NSError(domain: "Bridge", code: 2, userInfo: [
NSLocalizedDescriptionKey: "\(err.code): \(err.message)",
])
} else {
self.state = .failed(message: "Unexpected bridge response")
self.logger.error("node bridge hello failed (unexpected frame)")
await self.disconnect()
throw NSError(domain: "Bridge", code: 3, userInfo: [
NSLocalizedDescriptionKey: "Unexpected bridge response",
])
}
while true {
guard let next = try await self.receiveLine() else { break }
guard let nextData = next.data(using: .utf8) else { continue }
guard let nextBase = try? self.decoder.decode(BridgeBaseFrame.self, from: nextData) else { continue }
do {
while true {
guard let next = try await self.receiveLine() else { break }
guard let nextData = next.data(using: .utf8) else { continue }
guard let nextBase = try? self.decoder.decode(BridgeBaseFrame.self, from: nextData) else { continue }
switch nextBase.type {
case "res":
let res = try self.decoder.decode(BridgeRPCResponse.self, from: nextData)
if let cont = self.pendingRPC.removeValue(forKey: res.id) {
cont.resume(returning: res)
switch nextBase.type {
case "res":
let res = try self.decoder.decode(BridgeRPCResponse.self, from: nextData)
if let cont = self.pendingRPC.removeValue(forKey: res.id) {
cont.resume(returning: res)
}
case "event":
let evt = try self.decoder.decode(BridgeEventFrame.self, from: nextData)
self.broadcastServerEvent(evt)
case "ping":
let ping = try self.decoder.decode(BridgePing.self, from: nextData)
try await self.send(BridgePong(type: "pong", id: ping.id))
case "pong":
let pong = try self.decoder.decode(BridgePong.self, from: nextData)
self.notePong(pong)
case "invoke":
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
Task.detached { [weak self] in
let res = await onInvoke(req)
guard let self else { return }
do {
try await self.send(res)
} catch {
await self.logInvokeSendFailure(error)
}
}
default:
continue
}
case "event":
let evt = try self.decoder.decode(BridgeEventFrame.self, from: nextData)
self.broadcastServerEvent(evt)
case "ping":
let ping = try self.decoder.decode(BridgePing.self, from: nextData)
try await self.send(BridgePong(type: "pong", id: ping.id))
case "pong":
let pong = try self.decoder.decode(BridgePong.self, from: nextData)
self.notePong(pong)
case "invoke":
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
let res = await onInvoke(req)
try await self.send(res)
default:
continue
}
}
await self.disconnect()
await self.handleDisconnect(reason: "connection closed")
} catch {
self.logger.error(
"node bridge receive failed: \(error.localizedDescription, privacy: .public)")
await self.handleDisconnect(reason: "receive failed")
throw error
}
}
func sendEvent(event: String, payloadJSON: String?) async throws {
@@ -205,6 +225,7 @@ actor MacNodeBridgeSession {
self.pingTask?.cancel()
self.pingTask = nil
self.lastPongAt = nil
self.disconnectHandler = nil
self.connection?.cancel()
self.connection = nil
@@ -312,6 +333,7 @@ actor MacNodeBridgeSession {
private func startPingLoop() {
self.pingTask?.cancel()
self.lastPongAt = self.clock.now
self.logger.debug("node bridge ping loop started")
self.pingTask = Task { [weak self] in
guard let self else { return }
await self.runPingLoop()
@@ -336,7 +358,7 @@ actor MacNodeBridgeSession {
"Node bridge heartbeat timed out; disconnecting " +
"(age: \(ageDescription, privacy: .public))."
self.logger.warning(message)
await self.disconnect()
await self.handleDisconnect(reason: "ping timeout")
return
}
}
@@ -350,7 +372,7 @@ actor MacNodeBridgeSession {
"Node bridge ping send failed; disconnecting " +
"(error: \(errorDescription, privacy: .public))."
self.logger.warning(message)
await self.disconnect()
await self.handleDisconnect(reason: "ping send failed")
return
}
}
@@ -369,15 +391,28 @@ actor MacNodeBridgeSession {
"Node bridge connection failed; disconnecting " +
"(error: \(errorDescription, privacy: .public))."
self.logger.warning(message)
await self.disconnect()
await self.handleDisconnect(reason: "connection failed")
case .cancelled:
self.logger.warning("Node bridge connection cancelled; disconnecting.")
await self.disconnect()
await self.handleDisconnect(reason: "connection cancelled")
default:
break
}
}
private func handleDisconnect(reason: String) async {
self.logger.info("node bridge disconnect reason=\(reason, privacy: .public)")
if let handler = self.disconnectHandler {
await handler(reason)
}
await self.disconnect()
}
private func logInvokeSendFailure(_ error: Error) {
self.logger.error(
"node bridge invoke response send failed: \(error.localizedDescription, privacy: .public)")
}
private static func makeStateStream(
for connection: NWConnection) -> AsyncStream<NWConnection.State>
{

View File

@@ -61,12 +61,17 @@ final class MacNodeModeCoordinator {
retryDelay = 1_000_000_000
do {
let hello = await self.makeHello()
self.logger.info(
"mac node bridge connecting endpoint=\(endpoint, privacy: .public)")
try await self.session.connect(
endpoint: endpoint,
hello: hello,
onConnected: { [weak self] serverName in
self?.logger.info("mac node connected to \(serverName, privacy: .public)")
},
onDisconnected: { reason in
await MacNodeModeCoordinator.handleBridgeDisconnect(reason: reason)
},
onInvoke: { [weak self] req in
guard let self else {
return BridgeInvokeResponse(
@@ -80,7 +85,8 @@ final class MacNodeModeCoordinator {
if await self.tryPair(endpoint: endpoint, error: error) {
continue
}
self.logger.error("mac node bridge connect failed: \(error.localizedDescription, privacy: .public)")
self.logger.error(
"mac node bridge connect failed: \(error.localizedDescription, privacy: .public)")
try? await Task.sleep(nanoseconds: min(retryDelay, 5_000_000_000))
retryDelay = min(retryDelay * 2, 10_000_000_000)
}
@@ -285,15 +291,31 @@ final class MacNodeModeCoordinator {
let mode = await MainActor.run(body: { AppStateStore.shared.connectionMode })
if mode == .remote {
do {
if self.tunnel == nil || self.tunnel?.process.isRunning == false {
let remotePort = Self.remoteBridgePort()
self.tunnel = try await RemotePortTunnel.create(
remotePort: remotePort,
allowRemoteUrlOverride: false)
if let tunnel = self.tunnel,
tunnel.process.isRunning,
let localPort = tunnel.localPort
{
let healthy = await self.bridgeTunnelHealthy(localPort: localPort, timeoutSeconds: 1.0)
if healthy, let port = NWEndpoint.Port(rawValue: localPort) {
self.logger.info(
"reusing mac node bridge tunnel localPort=\(localPort, privacy: .public)")
return .hostPort(host: "127.0.0.1", port: port)
}
self.logger.error(
"mac node bridge tunnel unhealthy localPort=\(localPort, privacy: .public); restarting")
tunnel.terminate()
self.tunnel = nil
}
let remotePort = Self.remoteBridgePort()
self.tunnel = try await RemotePortTunnel.create(
remotePort: remotePort,
allowRemoteUrlOverride: false)
if let localPort = self.tunnel?.localPort,
let port = NWEndpoint.Port(rawValue: localPort)
{
self.logger.info(
"mac node bridge tunnel ready localPort=\(localPort, privacy: .public) remotePort=\(remotePort, privacy: .public)")
return .hostPort(host: "127.0.0.1", port: port)
}
} catch {
@@ -311,6 +333,21 @@ final class MacNodeModeCoordinator {
return await Self.discoverBridgeEndpoint(timeoutSeconds: timeoutSeconds)
}
@MainActor
private static func handleBridgeDisconnect(reason: String) async {
guard reason.localizedCaseInsensitiveContains("ping") else { return }
let coordinator = MacNodeModeCoordinator.shared
coordinator.logger.error(
"mac node bridge disconnected (\(reason, privacy: .public)); resetting tunnel")
coordinator.tunnel?.terminate()
coordinator.tunnel = nil
}
private func bridgeTunnelHealthy(localPort: UInt16, timeoutSeconds: Double) async -> Bool {
guard let port = NWEndpoint.Port(rawValue: localPort) else { return false }
return await Self.probeEndpoint(.hostPort(host: "127.0.0.1", port: port), timeoutSeconds: timeoutSeconds)
}
private static func discoverBridgeEndpoint(timeoutSeconds: Double) async -> NWEndpoint? {
final class DiscoveryState: @unchecked Sendable {
let lock = NSLock()

View File

@@ -41,7 +41,8 @@ final class RemotePortTunnel {
static func create(
remotePort: Int,
preferredLocalPort: UInt16? = nil,
allowRemoteUrlOverride: Bool = true) async throws -> RemotePortTunnel
allowRemoteUrlOverride: Bool = true,
allowRandomLocalPort: Bool = true) async throws -> RemotePortTunnel
{
let settings = CommandResolver.connectionSettings()
guard settings.mode == .remote, let parsed = CommandResolver.parseSSHTarget(settings.target) else {
@@ -51,7 +52,9 @@ final class RemotePortTunnel {
userInfo: [NSLocalizedDescriptionKey: "Remote mode is not configured"])
}
let localPort = try await Self.findPort(preferred: preferredLocalPort)
let localPort = try await Self.findPort(
preferred: preferredLocalPort,
allowRandom: allowRandomLocalPort)
let sshHost = parsed.host.trimmingCharacters(in: .whitespacesAndNewlines)
let remotePortOverride =
allowRemoteUrlOverride && remotePort == GatewayEnvironment.gatewayPort()
@@ -172,8 +175,16 @@ final class RemotePortTunnel {
return trimmed.split(separator: ".").first.map(String.init) ?? trimmed
}
private static func findPort(preferred: UInt16?) async throws -> UInt16 {
private static func findPort(preferred: UInt16?, allowRandom: Bool) async throws -> UInt16 {
if let preferred, self.portIsFree(preferred) { return preferred }
if let preferred, !allowRandom {
throw NSError(
domain: "RemotePortTunnel",
code: 5,
userInfo: [
NSLocalizedDescriptionKey: "Local port \(preferred) is unavailable",
])
}
return try await withCheckedThrowingContinuation { cont in
let queue = DispatchQueue(label: "com.clawdbot.remote.tunnel.port", qos: .utility)

View File

@@ -7,8 +7,15 @@ actor RemoteTunnelManager {
private let logger = Logger(subsystem: "com.clawdbot", category: "remote-tunnel")
private var controlTunnel: RemotePortTunnel?
private var restartInFlight = false
private var lastRestartAt: Date?
private let restartBackoffSeconds: TimeInterval = 2.0
func controlTunnelPortIfRunning() async -> UInt16? {
if self.restartInFlight {
self.logger.info("control tunnel restart in flight; skipping reuse check")
return nil
}
if let tunnel = self.controlTunnel,
tunnel.process.isRunning,
let local = tunnel.localPort
@@ -18,6 +25,7 @@ actor RemoteTunnelManager {
return local
}
self.logger.error("active SSH tunnel on port \(local, privacy: .public) is unhealthy; restarting")
await self.beginRestart()
tunnel.terminate()
self.controlTunnel = nil
}
@@ -34,6 +42,11 @@ actor RemoteTunnelManager {
"pid=\(desc.pid, privacy: .public)")
return desiredPort
}
if self.restartInFlight {
self.logger.info("control tunnel restart in flight; skip stale tunnel cleanup")
return nil
}
await self.beginRestart()
await self.cleanupStaleTunnel(desc: desc, port: desiredPort)
}
return nil
@@ -56,12 +69,15 @@ actor RemoteTunnelManager {
"identitySet=\(identitySet, privacy: .public)")
if let local = await self.controlTunnelPortIfRunning() { return local }
await self.waitForRestartBackoffIfNeeded()
let desiredPort = UInt16(GatewayEnvironment.gatewayPort())
let tunnel = try await RemotePortTunnel.create(
remotePort: GatewayEnvironment.gatewayPort(),
preferredLocalPort: desiredPort)
preferredLocalPort: desiredPort,
allowRandomLocalPort: false)
self.controlTunnel = tunnel
self.endRestart()
let resolvedPort = tunnel.localPort ?? desiredPort
self.logger.info("ssh tunnel ready localPort=\(resolvedPort, privacy: .public)")
return tunnel.localPort ?? desiredPort
@@ -83,6 +99,35 @@ actor RemoteTunnelManager {
return false
}
private func beginRestart() async {
guard !self.restartInFlight else { return }
self.restartInFlight = true
self.lastRestartAt = Date()
self.logger.info("control tunnel restart started")
Task { [weak self] in
guard let self else { return }
try? await Task.sleep(nanoseconds: UInt64(self.restartBackoffSeconds * 1_000_000_000))
await self.endRestart()
}
}
private func endRestart() {
if self.restartInFlight {
self.restartInFlight = false
self.logger.info("control tunnel restart finished")
}
}
private func waitForRestartBackoffIfNeeded() async {
guard let last = self.lastRestartAt else { return }
let elapsed = Date().timeIntervalSince(last)
let remaining = self.restartBackoffSeconds - elapsed
guard remaining > 0 else { return }
self.logger.info(
"control tunnel restart backoff \(remaining, privacy: .public)s")
try? await Task.sleep(nanoseconds: UInt64(remaining * 1_000_000_000))
}
private func cleanupStaleTunnel(desc: PortGuardian.Descriptor, port: UInt16) async {
let pid = desc.pid
self.logger.error(

View File

@@ -50,10 +50,13 @@ enum ShellExecutor {
errorMessage: "failed to start: \(error.localizedDescription)")
}
let outTask = Task { stdoutPipe.fileHandleForReading.readToEndSafely() }
let errTask = Task { stderrPipe.fileHandleForReading.readToEndSafely() }
let waitTask = Task { () -> ShellResult in
process.waitUntilExit()
let out = stdoutPipe.fileHandleForReading.readToEndSafely()
let err = stderrPipe.fileHandleForReading.readToEndSafely()
let out = await outTask.value
let err = await errTask.value
let status = Int(process.terminationStatus)
return ShellResult(
stdout: String(bytes: out, encoding: .utf8) ?? "",