fix(mac): probe loopback bridge
This commit is contained in:
@@ -172,6 +172,102 @@ final class MacNodeModeCoordinator {
|
|||||||
"mac-\(InstanceIdentity.instanceId)"
|
"mac-\(InstanceIdentity.instanceId)"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func resolveLoopbackBridgeEndpoint(timeoutSeconds: Double) async -> NWEndpoint? {
|
||||||
|
guard let port = Self.loopbackBridgePort(),
|
||||||
|
let endpointPort = NWEndpoint.Port(rawValue: port)
|
||||||
|
else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
let endpoint = NWEndpoint.hostPort(host: "127.0.0.1", port: endpointPort)
|
||||||
|
let reachable = await Self.probeEndpoint(endpoint, timeoutSeconds: timeoutSeconds)
|
||||||
|
return reachable ? endpoint : nil
|
||||||
|
}
|
||||||
|
|
||||||
|
static func loopbackBridgePort() -> UInt16? {
|
||||||
|
if let raw = ProcessInfo.processInfo.environment["CLAWDIS_BRIDGE_PORT"],
|
||||||
|
let parsed = Int(raw.trimmingCharacters(in: .whitespacesAndNewlines)),
|
||||||
|
parsed > 0,
|
||||||
|
parsed <= Int(UInt16.max)
|
||||||
|
{
|
||||||
|
return UInt16(parsed)
|
||||||
|
}
|
||||||
|
return 18790
|
||||||
|
}
|
||||||
|
|
||||||
|
static func probeEndpoint(_ endpoint: NWEndpoint, timeoutSeconds: Double) async -> Bool {
|
||||||
|
let connection = NWConnection(to: endpoint, using: .tcp)
|
||||||
|
let stream = Self.makeStateStream(for: connection)
|
||||||
|
connection.start(queue: DispatchQueue(label: "com.steipete.clawdis.macos.bridge-loopback-probe"))
|
||||||
|
do {
|
||||||
|
try await Self.waitForReady(stream, timeoutSeconds: timeoutSeconds)
|
||||||
|
connection.cancel()
|
||||||
|
return true
|
||||||
|
} catch {
|
||||||
|
connection.cancel()
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static func makeStateStream(
|
||||||
|
for connection: NWConnection) -> AsyncStream<NWConnection.State>
|
||||||
|
{
|
||||||
|
AsyncStream { continuation in
|
||||||
|
connection.stateUpdateHandler = { state in
|
||||||
|
continuation.yield(state)
|
||||||
|
switch state {
|
||||||
|
case .ready, .failed, .cancelled:
|
||||||
|
continuation.finish()
|
||||||
|
default:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static func waitForReady(
|
||||||
|
_ stream: AsyncStream<NWConnection.State>,
|
||||||
|
timeoutSeconds: Double) async throws
|
||||||
|
{
|
||||||
|
try await self.withTimeout(seconds: timeoutSeconds) {
|
||||||
|
for await state in stream {
|
||||||
|
switch state {
|
||||||
|
case .ready:
|
||||||
|
return
|
||||||
|
case let .failed(err):
|
||||||
|
throw err
|
||||||
|
case .cancelled:
|
||||||
|
throw NSError(domain: "Bridge", code: 20, userInfo: [
|
||||||
|
NSLocalizedDescriptionKey: "Connection cancelled",
|
||||||
|
])
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw NSError(domain: "Bridge", code: 21, userInfo: [
|
||||||
|
NSLocalizedDescriptionKey: "Connection closed",
|
||||||
|
])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static func withTimeout<T: Sendable>(
|
||||||
|
seconds: Double,
|
||||||
|
operation: @escaping @Sendable () async throws -> T) async throws -> T
|
||||||
|
{
|
||||||
|
let task = Task { try await operation() }
|
||||||
|
let timeout = Task {
|
||||||
|
try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
|
||||||
|
throw NSError(domain: "Bridge", code: 22, userInfo: [
|
||||||
|
NSLocalizedDescriptionKey: "operation timed out",
|
||||||
|
])
|
||||||
|
}
|
||||||
|
defer { timeout.cancel() }
|
||||||
|
return try await withTaskCancellationHandler(operation: {
|
||||||
|
try await task.value
|
||||||
|
}, onCancel: {
|
||||||
|
timeout.cancel()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
private func resolveBridgeEndpoint(timeoutSeconds: Double) async -> NWEndpoint? {
|
private func resolveBridgeEndpoint(timeoutSeconds: Double) async -> NWEndpoint? {
|
||||||
let mode = await MainActor.run(body: { AppStateStore.shared.connectionMode })
|
let mode = await MainActor.run(body: { AppStateStore.shared.connectionMode })
|
||||||
if mode == .remote {
|
if mode == .remote {
|
||||||
@@ -193,6 +289,9 @@ final class MacNodeModeCoordinator {
|
|||||||
tunnel.terminate()
|
tunnel.terminate()
|
||||||
self.tunnel = nil
|
self.tunnel = nil
|
||||||
}
|
}
|
||||||
|
if mode == .local, let endpoint = await self.resolveLoopbackBridgeEndpoint(timeoutSeconds: 0.4) {
|
||||||
|
return endpoint
|
||||||
|
}
|
||||||
return await Self.discoverBridgeEndpoint(timeoutSeconds: timeoutSeconds)
|
return await Self.discoverBridgeEndpoint(timeoutSeconds: timeoutSeconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,120 @@
|
|||||||
|
import Foundation
|
||||||
|
import Network
|
||||||
|
import Testing
|
||||||
|
@testable import Clawdis
|
||||||
|
|
||||||
|
@Suite struct MacNodeBridgeDiscoveryTests {
|
||||||
|
@MainActor
|
||||||
|
@Test func loopbackBridgePortDefaultsAndOverrides() {
|
||||||
|
withEnv("CLAWDIS_BRIDGE_PORT", value: nil) {
|
||||||
|
#expect(MacNodeModeCoordinator.loopbackBridgePort() == 18790)
|
||||||
|
}
|
||||||
|
withEnv("CLAWDIS_BRIDGE_PORT", value: "19991") {
|
||||||
|
#expect(MacNodeModeCoordinator.loopbackBridgePort() == 19991)
|
||||||
|
}
|
||||||
|
withEnv("CLAWDIS_BRIDGE_PORT", value: "not-a-port") {
|
||||||
|
#expect(MacNodeModeCoordinator.loopbackBridgePort() == 18790)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@MainActor
|
||||||
|
@Test func probeEndpointSucceedsForOpenPort() async throws {
|
||||||
|
let listener = try NWListener(using: .tcp, on: .any)
|
||||||
|
listener.newConnectionHandler = { connection in
|
||||||
|
connection.cancel()
|
||||||
|
}
|
||||||
|
listener.start(queue: DispatchQueue(label: "com.steipete.clawdis.tests.bridge-listener"))
|
||||||
|
try await waitForListenerReady(listener, timeoutSeconds: 1.0)
|
||||||
|
|
||||||
|
guard let port = listener.port else {
|
||||||
|
listener.cancel()
|
||||||
|
throw TestError(message: "listener port missing")
|
||||||
|
}
|
||||||
|
|
||||||
|
let endpoint = NWEndpoint.hostPort(host: "127.0.0.1", port: port)
|
||||||
|
let ok = await MacNodeModeCoordinator.probeEndpoint(endpoint, timeoutSeconds: 0.6)
|
||||||
|
listener.cancel()
|
||||||
|
#expect(ok == true)
|
||||||
|
}
|
||||||
|
|
||||||
|
@MainActor
|
||||||
|
@Test func probeEndpointFailsForClosedPort() async throws {
|
||||||
|
let listener = try NWListener(using: .tcp, on: .any)
|
||||||
|
listener.start(queue: DispatchQueue(label: "com.steipete.clawdis.tests.bridge-listener-close"))
|
||||||
|
try await waitForListenerReady(listener, timeoutSeconds: 1.0)
|
||||||
|
let port = listener.port
|
||||||
|
listener.cancel()
|
||||||
|
try await Task.sleep(nanoseconds: 150_000_000)
|
||||||
|
|
||||||
|
guard let port else {
|
||||||
|
throw TestError(message: "listener port missing")
|
||||||
|
}
|
||||||
|
|
||||||
|
let endpoint = NWEndpoint.hostPort(host: "127.0.0.1", port: port)
|
||||||
|
let ok = await MacNodeModeCoordinator.probeEndpoint(endpoint, timeoutSeconds: 0.4)
|
||||||
|
#expect(ok == false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private struct TestError: Error {
|
||||||
|
let message: String
|
||||||
|
}
|
||||||
|
|
||||||
|
private struct ListenerTimeoutError: Error {}
|
||||||
|
|
||||||
|
private func waitForListenerReady(_ listener: NWListener, timeoutSeconds: Double) async throws {
|
||||||
|
try await withThrowingTaskGroup(of: Void.self) { group in
|
||||||
|
group.addTask {
|
||||||
|
try await withCheckedThrowingContinuation { cont in
|
||||||
|
final class ListenerState: @unchecked Sendable {
|
||||||
|
let lock = NSLock()
|
||||||
|
var finished = false
|
||||||
|
}
|
||||||
|
let state = ListenerState()
|
||||||
|
let finish: @Sendable (Result<Void, Error>) -> Void = { result in
|
||||||
|
state.lock.lock()
|
||||||
|
defer { state.lock.unlock() }
|
||||||
|
guard !state.finished else { return }
|
||||||
|
state.finished = true
|
||||||
|
cont.resume(with: result)
|
||||||
|
}
|
||||||
|
|
||||||
|
listener.stateUpdateHandler = { state in
|
||||||
|
switch state {
|
||||||
|
case .ready:
|
||||||
|
finish(.success(()))
|
||||||
|
case .failed(let err):
|
||||||
|
finish(.failure(err))
|
||||||
|
case .cancelled:
|
||||||
|
finish(.failure(ListenerTimeoutError()))
|
||||||
|
default:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
group.addTask {
|
||||||
|
try await Task.sleep(nanoseconds: UInt64(timeoutSeconds * 1_000_000_000))
|
||||||
|
throw ListenerTimeoutError()
|
||||||
|
}
|
||||||
|
_ = try await group.next()
|
||||||
|
group.cancelAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func withEnv(_ key: String, value: String?, _ body: () -> Void) {
|
||||||
|
let existing = getenv(key).map { String(cString: $0) }
|
||||||
|
if let value {
|
||||||
|
setenv(key, value, 1)
|
||||||
|
} else {
|
||||||
|
unsetenv(key)
|
||||||
|
}
|
||||||
|
defer {
|
||||||
|
if let existing {
|
||||||
|
setenv(key, existing, 1)
|
||||||
|
} else {
|
||||||
|
unsetenv(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
body()
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user