Merge pull request #676 from ngutman/fix/macos-bridge-tunnel-health
fix(macos): stabilize bridge tunnels
This commit is contained in:
@@ -12,9 +12,8 @@
|
|||||||
- Gateway: add OpenAI-compatible `/v1/chat/completions` HTTP endpoint (auth, SSE streaming, per-agent routing). (#680) — thanks @steipete.
|
- Gateway: add OpenAI-compatible `/v1/chat/completions` HTTP endpoint (auth, SSE streaming, per-agent routing). (#680) — thanks @steipete.
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
- Block Streaming: enable for all providers, not just Telegram. (#684) — thanks @rubyrunsstuff.
|
- macOS: stabilize bridge tunnels, guard invoke senders on disconnect, and drain stdout/stderr to avoid deadlocks. (#676) — thanks @ngutman.
|
||||||
- Agents/System: clarify sandboxed runtime in system prompt and surface elevated availability when sandboxed.
|
- Agents/System: clarify sandboxed runtime in system prompt and surface elevated availability when sandboxed.
|
||||||
- Agents/System: add reasoning visibility hint + /reasoning and /status guidance in system prompt.
|
|
||||||
- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj.
|
- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj.
|
||||||
- WhatsApp: fix group reactions by preserving message IDs and sender JIDs in history; normalize participant phone numbers to JIDs in outbound reactions. (#640) — thanks @mcinteerj.
|
- WhatsApp: fix group reactions by preserving message IDs and sender JIDs in history; normalize participant phone numbers to JIDs in outbound reactions. (#640) — thanks @mcinteerj.
|
||||||
- WhatsApp: expose group participant IDs to the model so reactions can target the right sender.
|
- WhatsApp: expose group participant IDs to the model so reactions can target the right sender.
|
||||||
|
|||||||
@@ -20,12 +20,14 @@ actor MacNodeBridgeSession {
|
|||||||
private let encoder = JSONEncoder()
|
private let encoder = JSONEncoder()
|
||||||
private let decoder = JSONDecoder()
|
private let decoder = JSONDecoder()
|
||||||
private let clock = ContinuousClock()
|
private let clock = ContinuousClock()
|
||||||
|
private var disconnectHandler: (@Sendable (String) async -> Void)?
|
||||||
|
|
||||||
private var connection: NWConnection?
|
private var connection: NWConnection?
|
||||||
private var queue: DispatchQueue?
|
private var queue: DispatchQueue?
|
||||||
private var buffer = Data()
|
private var buffer = Data()
|
||||||
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
|
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
|
||||||
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
|
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
|
||||||
|
private var invokeTasks: [UUID: Task<Void, Never>] = [:]
|
||||||
private var pingTask: Task<Void, Never>?
|
private var pingTask: Task<Void, Never>?
|
||||||
private var lastPongAt: ContinuousClock.Instant?
|
private var lastPongAt: ContinuousClock.Instant?
|
||||||
|
|
||||||
@@ -35,10 +37,12 @@ actor MacNodeBridgeSession {
|
|||||||
endpoint: NWEndpoint,
|
endpoint: NWEndpoint,
|
||||||
hello: BridgeHello,
|
hello: BridgeHello,
|
||||||
onConnected: (@Sendable (String) async -> Void)? = nil,
|
onConnected: (@Sendable (String) async -> Void)? = nil,
|
||||||
|
onDisconnected: (@Sendable (String) async -> Void)? = nil,
|
||||||
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)
|
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)
|
||||||
async throws
|
async throws
|
||||||
{
|
{
|
||||||
await self.disconnect()
|
await self.disconnect()
|
||||||
|
self.disconnectHandler = onDisconnected
|
||||||
self.state = .connecting
|
self.state = .connecting
|
||||||
|
|
||||||
let params = NWParameters.tcp
|
let params = NWParameters.tcp
|
||||||
@@ -83,6 +87,7 @@ actor MacNodeBridgeSession {
|
|||||||
let data = line.data(using: .utf8),
|
let data = line.data(using: .utf8),
|
||||||
let base = try? self.decoder.decode(BridgeBaseFrame.self, from: data)
|
let base = try? self.decoder.decode(BridgeBaseFrame.self, from: data)
|
||||||
else {
|
else {
|
||||||
|
self.logger.error("node bridge hello failed (unexpected response)")
|
||||||
await self.disconnect()
|
await self.disconnect()
|
||||||
throw NSError(domain: "Bridge", code: 1, userInfo: [
|
throw NSError(domain: "Bridge", code: 1, userInfo: [
|
||||||
NSLocalizedDescriptionKey: "Unexpected bridge response",
|
NSLocalizedDescriptionKey: "Unexpected bridge response",
|
||||||
@@ -97,53 +102,67 @@ actor MacNodeBridgeSession {
|
|||||||
} else if base.type == "error" {
|
} else if base.type == "error" {
|
||||||
let err = try self.decoder.decode(BridgeErrorFrame.self, from: data)
|
let err = try self.decoder.decode(BridgeErrorFrame.self, from: data)
|
||||||
self.state = .failed(message: "\(err.code): \(err.message)")
|
self.state = .failed(message: "\(err.code): \(err.message)")
|
||||||
|
self.logger.error("node bridge hello error: \(err.code, privacy: .public)")
|
||||||
await self.disconnect()
|
await self.disconnect()
|
||||||
throw NSError(domain: "Bridge", code: 2, userInfo: [
|
throw NSError(domain: "Bridge", code: 2, userInfo: [
|
||||||
NSLocalizedDescriptionKey: "\(err.code): \(err.message)",
|
NSLocalizedDescriptionKey: "\(err.code): \(err.message)",
|
||||||
])
|
])
|
||||||
} else {
|
} else {
|
||||||
self.state = .failed(message: "Unexpected bridge response")
|
self.state = .failed(message: "Unexpected bridge response")
|
||||||
|
self.logger.error("node bridge hello failed (unexpected frame)")
|
||||||
await self.disconnect()
|
await self.disconnect()
|
||||||
throw NSError(domain: "Bridge", code: 3, userInfo: [
|
throw NSError(domain: "Bridge", code: 3, userInfo: [
|
||||||
NSLocalizedDescriptionKey: "Unexpected bridge response",
|
NSLocalizedDescriptionKey: "Unexpected bridge response",
|
||||||
])
|
])
|
||||||
}
|
}
|
||||||
|
|
||||||
while true {
|
do {
|
||||||
guard let next = try await self.receiveLine() else { break }
|
while true {
|
||||||
guard let nextData = next.data(using: .utf8) else { continue }
|
guard let next = try await self.receiveLine() else { break }
|
||||||
guard let nextBase = try? self.decoder.decode(BridgeBaseFrame.self, from: nextData) else { continue }
|
guard let nextData = next.data(using: .utf8) else { continue }
|
||||||
|
guard let nextBase = try? self.decoder.decode(BridgeBaseFrame.self, from: nextData) else { continue }
|
||||||
|
|
||||||
switch nextBase.type {
|
switch nextBase.type {
|
||||||
case "res":
|
case "res":
|
||||||
let res = try self.decoder.decode(BridgeRPCResponse.self, from: nextData)
|
let res = try self.decoder.decode(BridgeRPCResponse.self, from: nextData)
|
||||||
if let cont = self.pendingRPC.removeValue(forKey: res.id) {
|
if let cont = self.pendingRPC.removeValue(forKey: res.id) {
|
||||||
cont.resume(returning: res)
|
cont.resume(returning: res)
|
||||||
|
}
|
||||||
|
|
||||||
|
case "event":
|
||||||
|
let evt = try self.decoder.decode(BridgeEventFrame.self, from: nextData)
|
||||||
|
self.broadcastServerEvent(evt)
|
||||||
|
|
||||||
|
case "ping":
|
||||||
|
let ping = try self.decoder.decode(BridgePing.self, from: nextData)
|
||||||
|
try await self.send(BridgePong(type: "pong", id: ping.id))
|
||||||
|
|
||||||
|
case "pong":
|
||||||
|
let pong = try self.decoder.decode(BridgePong.self, from: nextData)
|
||||||
|
self.notePong(pong)
|
||||||
|
|
||||||
|
case "invoke":
|
||||||
|
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
|
||||||
|
let taskID = UUID()
|
||||||
|
let task = Task { [weak self] in
|
||||||
|
let res = await onInvoke(req)
|
||||||
|
guard let self else { return }
|
||||||
|
await self.sendInvokeResponse(res, taskID: taskID)
|
||||||
|
}
|
||||||
|
self.invokeTasks[taskID] = task
|
||||||
|
|
||||||
|
default:
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
case "event":
|
|
||||||
let evt = try self.decoder.decode(BridgeEventFrame.self, from: nextData)
|
|
||||||
self.broadcastServerEvent(evt)
|
|
||||||
|
|
||||||
case "ping":
|
|
||||||
let ping = try self.decoder.decode(BridgePing.self, from: nextData)
|
|
||||||
try await self.send(BridgePong(type: "pong", id: ping.id))
|
|
||||||
|
|
||||||
case "pong":
|
|
||||||
let pong = try self.decoder.decode(BridgePong.self, from: nextData)
|
|
||||||
self.notePong(pong)
|
|
||||||
|
|
||||||
case "invoke":
|
|
||||||
let req = try self.decoder.decode(BridgeInvokeRequest.self, from: nextData)
|
|
||||||
let res = await onInvoke(req)
|
|
||||||
try await self.send(res)
|
|
||||||
|
|
||||||
default:
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
await self.disconnect()
|
await self.handleDisconnect(reason: "connection closed")
|
||||||
|
} catch {
|
||||||
|
self.logger.error(
|
||||||
|
"node bridge receive failed: \(error.localizedDescription, privacy: .public)")
|
||||||
|
await self.handleDisconnect(reason: "receive failed")
|
||||||
|
throw error
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sendEvent(event: String, payloadJSON: String?) async throws {
|
func sendEvent(event: String, payloadJSON: String?) async throws {
|
||||||
@@ -205,6 +224,8 @@ actor MacNodeBridgeSession {
|
|||||||
self.pingTask?.cancel()
|
self.pingTask?.cancel()
|
||||||
self.pingTask = nil
|
self.pingTask = nil
|
||||||
self.lastPongAt = nil
|
self.lastPongAt = nil
|
||||||
|
self.disconnectHandler = nil
|
||||||
|
self.cancelInvokeTasks()
|
||||||
|
|
||||||
self.connection?.cancel()
|
self.connection?.cancel()
|
||||||
self.connection = nil
|
self.connection = nil
|
||||||
@@ -312,6 +333,7 @@ actor MacNodeBridgeSession {
|
|||||||
private func startPingLoop() {
|
private func startPingLoop() {
|
||||||
self.pingTask?.cancel()
|
self.pingTask?.cancel()
|
||||||
self.lastPongAt = self.clock.now
|
self.lastPongAt = self.clock.now
|
||||||
|
self.logger.debug("node bridge ping loop started")
|
||||||
self.pingTask = Task { [weak self] in
|
self.pingTask = Task { [weak self] in
|
||||||
guard let self else { return }
|
guard let self else { return }
|
||||||
await self.runPingLoop()
|
await self.runPingLoop()
|
||||||
@@ -336,7 +358,7 @@ actor MacNodeBridgeSession {
|
|||||||
"Node bridge heartbeat timed out; disconnecting " +
|
"Node bridge heartbeat timed out; disconnecting " +
|
||||||
"(age: \(ageDescription, privacy: .public))."
|
"(age: \(ageDescription, privacy: .public))."
|
||||||
self.logger.warning(message)
|
self.logger.warning(message)
|
||||||
await self.disconnect()
|
await self.handleDisconnect(reason: "ping timeout")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -350,7 +372,7 @@ actor MacNodeBridgeSession {
|
|||||||
"Node bridge ping send failed; disconnecting " +
|
"Node bridge ping send failed; disconnecting " +
|
||||||
"(error: \(errorDescription, privacy: .public))."
|
"(error: \(errorDescription, privacy: .public))."
|
||||||
self.logger.warning(message)
|
self.logger.warning(message)
|
||||||
await self.disconnect()
|
await self.handleDisconnect(reason: "ping send failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -369,15 +391,45 @@ actor MacNodeBridgeSession {
|
|||||||
"Node bridge connection failed; disconnecting " +
|
"Node bridge connection failed; disconnecting " +
|
||||||
"(error: \(errorDescription, privacy: .public))."
|
"(error: \(errorDescription, privacy: .public))."
|
||||||
self.logger.warning(message)
|
self.logger.warning(message)
|
||||||
await self.disconnect()
|
await self.handleDisconnect(reason: "connection failed")
|
||||||
case .cancelled:
|
case .cancelled:
|
||||||
self.logger.warning("Node bridge connection cancelled; disconnecting.")
|
self.logger.warning("Node bridge connection cancelled; disconnecting.")
|
||||||
await self.disconnect()
|
await self.handleDisconnect(reason: "connection cancelled")
|
||||||
default:
|
default:
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func handleDisconnect(reason: String) async {
|
||||||
|
self.logger.info("node bridge disconnect reason=\(reason, privacy: .public)")
|
||||||
|
if let handler = self.disconnectHandler {
|
||||||
|
await handler(reason)
|
||||||
|
}
|
||||||
|
await self.disconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
private func logInvokeSendFailure(_ error: Error) {
|
||||||
|
self.logger.error(
|
||||||
|
"node bridge invoke response send failed: \(error.localizedDescription, privacy: .public)")
|
||||||
|
}
|
||||||
|
|
||||||
|
private func sendInvokeResponse(_ response: BridgeInvokeResponse, taskID: UUID) async {
|
||||||
|
defer { self.invokeTasks[taskID] = nil }
|
||||||
|
if Task.isCancelled { return }
|
||||||
|
do {
|
||||||
|
try await self.send(response)
|
||||||
|
} catch {
|
||||||
|
await self.logInvokeSendFailure(error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func cancelInvokeTasks() {
|
||||||
|
for task in self.invokeTasks.values {
|
||||||
|
task.cancel()
|
||||||
|
}
|
||||||
|
self.invokeTasks.removeAll()
|
||||||
|
}
|
||||||
|
|
||||||
private static func makeStateStream(
|
private static func makeStateStream(
|
||||||
for connection: NWConnection) -> AsyncStream<NWConnection.State>
|
for connection: NWConnection) -> AsyncStream<NWConnection.State>
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -61,12 +61,17 @@ final class MacNodeModeCoordinator {
|
|||||||
retryDelay = 1_000_000_000
|
retryDelay = 1_000_000_000
|
||||||
do {
|
do {
|
||||||
let hello = await self.makeHello()
|
let hello = await self.makeHello()
|
||||||
|
self.logger.info(
|
||||||
|
"mac node bridge connecting endpoint=\(endpoint, privacy: .public)")
|
||||||
try await self.session.connect(
|
try await self.session.connect(
|
||||||
endpoint: endpoint,
|
endpoint: endpoint,
|
||||||
hello: hello,
|
hello: hello,
|
||||||
onConnected: { [weak self] serverName in
|
onConnected: { [weak self] serverName in
|
||||||
self?.logger.info("mac node connected to \(serverName, privacy: .public)")
|
self?.logger.info("mac node connected to \(serverName, privacy: .public)")
|
||||||
},
|
},
|
||||||
|
onDisconnected: { reason in
|
||||||
|
await MacNodeModeCoordinator.handleBridgeDisconnect(reason: reason)
|
||||||
|
},
|
||||||
onInvoke: { [weak self] req in
|
onInvoke: { [weak self] req in
|
||||||
guard let self else {
|
guard let self else {
|
||||||
return BridgeInvokeResponse(
|
return BridgeInvokeResponse(
|
||||||
@@ -80,7 +85,8 @@ final class MacNodeModeCoordinator {
|
|||||||
if await self.tryPair(endpoint: endpoint, error: error) {
|
if await self.tryPair(endpoint: endpoint, error: error) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
self.logger.error("mac node bridge connect failed: \(error.localizedDescription, privacy: .public)")
|
self.logger.error(
|
||||||
|
"mac node bridge connect failed: \(error.localizedDescription, privacy: .public)")
|
||||||
try? await Task.sleep(nanoseconds: min(retryDelay, 5_000_000_000))
|
try? await Task.sleep(nanoseconds: min(retryDelay, 5_000_000_000))
|
||||||
retryDelay = min(retryDelay * 2, 10_000_000_000)
|
retryDelay = min(retryDelay * 2, 10_000_000_000)
|
||||||
}
|
}
|
||||||
@@ -285,15 +291,31 @@ final class MacNodeModeCoordinator {
|
|||||||
let mode = await MainActor.run(body: { AppStateStore.shared.connectionMode })
|
let mode = await MainActor.run(body: { AppStateStore.shared.connectionMode })
|
||||||
if mode == .remote {
|
if mode == .remote {
|
||||||
do {
|
do {
|
||||||
if self.tunnel == nil || self.tunnel?.process.isRunning == false {
|
if let tunnel = self.tunnel,
|
||||||
let remotePort = Self.remoteBridgePort()
|
tunnel.process.isRunning,
|
||||||
self.tunnel = try await RemotePortTunnel.create(
|
let localPort = tunnel.localPort
|
||||||
remotePort: remotePort,
|
{
|
||||||
allowRemoteUrlOverride: false)
|
let healthy = await self.bridgeTunnelHealthy(localPort: localPort, timeoutSeconds: 1.0)
|
||||||
|
if healthy, let port = NWEndpoint.Port(rawValue: localPort) {
|
||||||
|
self.logger.info(
|
||||||
|
"reusing mac node bridge tunnel localPort=\(localPort, privacy: .public)")
|
||||||
|
return .hostPort(host: "127.0.0.1", port: port)
|
||||||
|
}
|
||||||
|
self.logger.error(
|
||||||
|
"mac node bridge tunnel unhealthy localPort=\(localPort, privacy: .public); restarting")
|
||||||
|
tunnel.terminate()
|
||||||
|
self.tunnel = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let remotePort = Self.remoteBridgePort()
|
||||||
|
self.tunnel = try await RemotePortTunnel.create(
|
||||||
|
remotePort: remotePort,
|
||||||
|
allowRemoteUrlOverride: false)
|
||||||
if let localPort = self.tunnel?.localPort,
|
if let localPort = self.tunnel?.localPort,
|
||||||
let port = NWEndpoint.Port(rawValue: localPort)
|
let port = NWEndpoint.Port(rawValue: localPort)
|
||||||
{
|
{
|
||||||
|
self.logger.info(
|
||||||
|
"mac node bridge tunnel ready localPort=\(localPort, privacy: .public) remotePort=\(remotePort, privacy: .public)")
|
||||||
return .hostPort(host: "127.0.0.1", port: port)
|
return .hostPort(host: "127.0.0.1", port: port)
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
@@ -311,6 +333,21 @@ final class MacNodeModeCoordinator {
|
|||||||
return await Self.discoverBridgeEndpoint(timeoutSeconds: timeoutSeconds)
|
return await Self.discoverBridgeEndpoint(timeoutSeconds: timeoutSeconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@MainActor
|
||||||
|
private static func handleBridgeDisconnect(reason: String) async {
|
||||||
|
guard reason.localizedCaseInsensitiveContains("ping") else { return }
|
||||||
|
let coordinator = MacNodeModeCoordinator.shared
|
||||||
|
coordinator.logger.error(
|
||||||
|
"mac node bridge disconnected (\(reason, privacy: .public)); resetting tunnel")
|
||||||
|
coordinator.tunnel?.terminate()
|
||||||
|
coordinator.tunnel = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
private func bridgeTunnelHealthy(localPort: UInt16, timeoutSeconds: Double) async -> Bool {
|
||||||
|
guard let port = NWEndpoint.Port(rawValue: localPort) else { return false }
|
||||||
|
return await Self.probeEndpoint(.hostPort(host: "127.0.0.1", port: port), timeoutSeconds: timeoutSeconds)
|
||||||
|
}
|
||||||
|
|
||||||
private static func discoverBridgeEndpoint(timeoutSeconds: Double) async -> NWEndpoint? {
|
private static func discoverBridgeEndpoint(timeoutSeconds: Double) async -> NWEndpoint? {
|
||||||
final class DiscoveryState: @unchecked Sendable {
|
final class DiscoveryState: @unchecked Sendable {
|
||||||
let lock = NSLock()
|
let lock = NSLock()
|
||||||
|
|||||||
@@ -41,7 +41,8 @@ final class RemotePortTunnel {
|
|||||||
static func create(
|
static func create(
|
||||||
remotePort: Int,
|
remotePort: Int,
|
||||||
preferredLocalPort: UInt16? = nil,
|
preferredLocalPort: UInt16? = nil,
|
||||||
allowRemoteUrlOverride: Bool = true) async throws -> RemotePortTunnel
|
allowRemoteUrlOverride: Bool = true,
|
||||||
|
allowRandomLocalPort: Bool = true) async throws -> RemotePortTunnel
|
||||||
{
|
{
|
||||||
let settings = CommandResolver.connectionSettings()
|
let settings = CommandResolver.connectionSettings()
|
||||||
guard settings.mode == .remote, let parsed = CommandResolver.parseSSHTarget(settings.target) else {
|
guard settings.mode == .remote, let parsed = CommandResolver.parseSSHTarget(settings.target) else {
|
||||||
@@ -51,7 +52,9 @@ final class RemotePortTunnel {
|
|||||||
userInfo: [NSLocalizedDescriptionKey: "Remote mode is not configured"])
|
userInfo: [NSLocalizedDescriptionKey: "Remote mode is not configured"])
|
||||||
}
|
}
|
||||||
|
|
||||||
let localPort = try await Self.findPort(preferred: preferredLocalPort)
|
let localPort = try await Self.findPort(
|
||||||
|
preferred: preferredLocalPort,
|
||||||
|
allowRandom: allowRandomLocalPort)
|
||||||
let sshHost = parsed.host.trimmingCharacters(in: .whitespacesAndNewlines)
|
let sshHost = parsed.host.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||||
let remotePortOverride =
|
let remotePortOverride =
|
||||||
allowRemoteUrlOverride && remotePort == GatewayEnvironment.gatewayPort()
|
allowRemoteUrlOverride && remotePort == GatewayEnvironment.gatewayPort()
|
||||||
@@ -172,8 +175,16 @@ final class RemotePortTunnel {
|
|||||||
return trimmed.split(separator: ".").first.map(String.init) ?? trimmed
|
return trimmed.split(separator: ".").first.map(String.init) ?? trimmed
|
||||||
}
|
}
|
||||||
|
|
||||||
private static func findPort(preferred: UInt16?) async throws -> UInt16 {
|
private static func findPort(preferred: UInt16?, allowRandom: Bool) async throws -> UInt16 {
|
||||||
if let preferred, self.portIsFree(preferred) { return preferred }
|
if let preferred, self.portIsFree(preferred) { return preferred }
|
||||||
|
if let preferred, !allowRandom {
|
||||||
|
throw NSError(
|
||||||
|
domain: "RemotePortTunnel",
|
||||||
|
code: 5,
|
||||||
|
userInfo: [
|
||||||
|
NSLocalizedDescriptionKey: "Local port \(preferred) is unavailable",
|
||||||
|
])
|
||||||
|
}
|
||||||
|
|
||||||
return try await withCheckedThrowingContinuation { cont in
|
return try await withCheckedThrowingContinuation { cont in
|
||||||
let queue = DispatchQueue(label: "com.clawdbot.remote.tunnel.port", qos: .utility)
|
let queue = DispatchQueue(label: "com.clawdbot.remote.tunnel.port", qos: .utility)
|
||||||
|
|||||||
@@ -7,8 +7,15 @@ actor RemoteTunnelManager {
|
|||||||
|
|
||||||
private let logger = Logger(subsystem: "com.clawdbot", category: "remote-tunnel")
|
private let logger = Logger(subsystem: "com.clawdbot", category: "remote-tunnel")
|
||||||
private var controlTunnel: RemotePortTunnel?
|
private var controlTunnel: RemotePortTunnel?
|
||||||
|
private var restartInFlight = false
|
||||||
|
private var lastRestartAt: Date?
|
||||||
|
private let restartBackoffSeconds: TimeInterval = 2.0
|
||||||
|
|
||||||
func controlTunnelPortIfRunning() async -> UInt16? {
|
func controlTunnelPortIfRunning() async -> UInt16? {
|
||||||
|
if self.restartInFlight {
|
||||||
|
self.logger.info("control tunnel restart in flight; skipping reuse check")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
if let tunnel = self.controlTunnel,
|
if let tunnel = self.controlTunnel,
|
||||||
tunnel.process.isRunning,
|
tunnel.process.isRunning,
|
||||||
let local = tunnel.localPort
|
let local = tunnel.localPort
|
||||||
@@ -18,6 +25,7 @@ actor RemoteTunnelManager {
|
|||||||
return local
|
return local
|
||||||
}
|
}
|
||||||
self.logger.error("active SSH tunnel on port \(local, privacy: .public) is unhealthy; restarting")
|
self.logger.error("active SSH tunnel on port \(local, privacy: .public) is unhealthy; restarting")
|
||||||
|
await self.beginRestart()
|
||||||
tunnel.terminate()
|
tunnel.terminate()
|
||||||
self.controlTunnel = nil
|
self.controlTunnel = nil
|
||||||
}
|
}
|
||||||
@@ -34,6 +42,11 @@ actor RemoteTunnelManager {
|
|||||||
"pid=\(desc.pid, privacy: .public)")
|
"pid=\(desc.pid, privacy: .public)")
|
||||||
return desiredPort
|
return desiredPort
|
||||||
}
|
}
|
||||||
|
if self.restartInFlight {
|
||||||
|
self.logger.info("control tunnel restart in flight; skip stale tunnel cleanup")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
await self.beginRestart()
|
||||||
await self.cleanupStaleTunnel(desc: desc, port: desiredPort)
|
await self.cleanupStaleTunnel(desc: desc, port: desiredPort)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -56,12 +69,15 @@ actor RemoteTunnelManager {
|
|||||||
"identitySet=\(identitySet, privacy: .public)")
|
"identitySet=\(identitySet, privacy: .public)")
|
||||||
|
|
||||||
if let local = await self.controlTunnelPortIfRunning() { return local }
|
if let local = await self.controlTunnelPortIfRunning() { return local }
|
||||||
|
await self.waitForRestartBackoffIfNeeded()
|
||||||
|
|
||||||
let desiredPort = UInt16(GatewayEnvironment.gatewayPort())
|
let desiredPort = UInt16(GatewayEnvironment.gatewayPort())
|
||||||
let tunnel = try await RemotePortTunnel.create(
|
let tunnel = try await RemotePortTunnel.create(
|
||||||
remotePort: GatewayEnvironment.gatewayPort(),
|
remotePort: GatewayEnvironment.gatewayPort(),
|
||||||
preferredLocalPort: desiredPort)
|
preferredLocalPort: desiredPort,
|
||||||
|
allowRandomLocalPort: false)
|
||||||
self.controlTunnel = tunnel
|
self.controlTunnel = tunnel
|
||||||
|
self.endRestart()
|
||||||
let resolvedPort = tunnel.localPort ?? desiredPort
|
let resolvedPort = tunnel.localPort ?? desiredPort
|
||||||
self.logger.info("ssh tunnel ready localPort=\(resolvedPort, privacy: .public)")
|
self.logger.info("ssh tunnel ready localPort=\(resolvedPort, privacy: .public)")
|
||||||
return tunnel.localPort ?? desiredPort
|
return tunnel.localPort ?? desiredPort
|
||||||
@@ -83,6 +99,35 @@ actor RemoteTunnelManager {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func beginRestart() async {
|
||||||
|
guard !self.restartInFlight else { return }
|
||||||
|
self.restartInFlight = true
|
||||||
|
self.lastRestartAt = Date()
|
||||||
|
self.logger.info("control tunnel restart started")
|
||||||
|
Task { [weak self] in
|
||||||
|
guard let self else { return }
|
||||||
|
try? await Task.sleep(nanoseconds: UInt64(self.restartBackoffSeconds * 1_000_000_000))
|
||||||
|
await self.endRestart()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func endRestart() {
|
||||||
|
if self.restartInFlight {
|
||||||
|
self.restartInFlight = false
|
||||||
|
self.logger.info("control tunnel restart finished")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func waitForRestartBackoffIfNeeded() async {
|
||||||
|
guard let last = self.lastRestartAt else { return }
|
||||||
|
let elapsed = Date().timeIntervalSince(last)
|
||||||
|
let remaining = self.restartBackoffSeconds - elapsed
|
||||||
|
guard remaining > 0 else { return }
|
||||||
|
self.logger.info(
|
||||||
|
"control tunnel restart backoff \(remaining, privacy: .public)s")
|
||||||
|
try? await Task.sleep(nanoseconds: UInt64(remaining * 1_000_000_000))
|
||||||
|
}
|
||||||
|
|
||||||
private func cleanupStaleTunnel(desc: PortGuardian.Descriptor, port: UInt16) async {
|
private func cleanupStaleTunnel(desc: PortGuardian.Descriptor, port: UInt16) async {
|
||||||
let pid = desc.pid
|
let pid = desc.pid
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
|
|||||||
@@ -50,10 +50,13 @@ enum ShellExecutor {
|
|||||||
errorMessage: "failed to start: \(error.localizedDescription)")
|
errorMessage: "failed to start: \(error.localizedDescription)")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let outTask = Task { stdoutPipe.fileHandleForReading.readToEndSafely() }
|
||||||
|
let errTask = Task { stderrPipe.fileHandleForReading.readToEndSafely() }
|
||||||
|
|
||||||
let waitTask = Task { () -> ShellResult in
|
let waitTask = Task { () -> ShellResult in
|
||||||
process.waitUntilExit()
|
process.waitUntilExit()
|
||||||
let out = stdoutPipe.fileHandleForReading.readToEndSafely()
|
let out = await outTask.value
|
||||||
let err = stderrPipe.fileHandleForReading.readToEndSafely()
|
let err = await errTask.value
|
||||||
let status = Int(process.terminationStatus)
|
let status = Int(process.terminationStatus)
|
||||||
return ShellResult(
|
return ShellResult(
|
||||||
stdout: String(bytes: out, encoding: .utf8) ?? "",
|
stdout: String(bytes: out, encoding: .utf8) ?? "",
|
||||||
|
|||||||
@@ -57,6 +57,25 @@ struct LowCoverageHelperTests {
|
|||||||
#expect(result.timedOut == true)
|
#expect(result.timedOut == true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test func shellExecutorDrainsStdoutAndStderr() async {
|
||||||
|
let script = """
|
||||||
|
i=0
|
||||||
|
while [ $i -lt 2000 ]; do
|
||||||
|
echo "stdout-$i"
|
||||||
|
echo "stderr-$i" 1>&2
|
||||||
|
i=$((i+1))
|
||||||
|
done
|
||||||
|
"""
|
||||||
|
let result = await ShellExecutor.runDetailed(
|
||||||
|
command: ["/bin/sh", "-c", script],
|
||||||
|
cwd: nil,
|
||||||
|
env: nil,
|
||||||
|
timeout: 2)
|
||||||
|
#expect(result.success == true)
|
||||||
|
#expect(result.stdout.contains("stdout-1999"))
|
||||||
|
#expect(result.stderr.contains("stderr-1999"))
|
||||||
|
}
|
||||||
|
|
||||||
@Test func pairedNodesStorePersists() async throws {
|
@Test func pairedNodesStorePersists() async throws {
|
||||||
let dir = FileManager.default.temporaryDirectory
|
let dir = FileManager.default.temporaryDirectory
|
||||||
.appendingPathComponent("paired-\(UUID().uuidString)", isDirectory: true)
|
.appendingPathComponent("paired-\(UUID().uuidString)", isDirectory: true)
|
||||||
|
|||||||
Reference in New Issue
Block a user