fix: harden node bridge keepalive
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import ClawdbotKit
|
||||
import Foundation
|
||||
import Network
|
||||
import OSLog
|
||||
|
||||
actor MacNodeBridgeSession {
|
||||
private struct TimeoutError: LocalizedError {
|
||||
@@ -15,8 +16,10 @@ actor MacNodeBridgeSession {
|
||||
case failed(message: String)
|
||||
}
|
||||
|
||||
private let logger = Logger(subsystem: "com.clawdbot", category: "node.bridge-session")
|
||||
private let encoder = JSONEncoder()
|
||||
private let decoder = JSONDecoder()
|
||||
private let clock = ContinuousClock()
|
||||
|
||||
private var connection: NWConnection?
|
||||
private var queue: DispatchQueue?
|
||||
@@ -24,8 +27,7 @@ actor MacNodeBridgeSession {
|
||||
private var pendingRPC: [String: CheckedContinuation<BridgeRPCResponse, Error>] = [:]
|
||||
private var serverEventSubscribers: [UUID: AsyncStream<BridgeEventFrame>.Continuation] = [:]
|
||||
private var pingTask: Task<Void, Never>?
|
||||
private var lastPongAt: Date?
|
||||
private var lastPingId: String?
|
||||
private var lastPongAt: ContinuousClock.Instant?
|
||||
|
||||
private(set) var state: State = .idle
|
||||
|
||||
@@ -41,6 +43,12 @@ actor MacNodeBridgeSession {
|
||||
|
||||
let params = NWParameters.tcp
|
||||
params.includePeerToPeer = true
|
||||
let tcpOptions = NWProtocolTCP.Options()
|
||||
tcpOptions.enableKeepalive = true
|
||||
tcpOptions.keepaliveIdle = 30
|
||||
tcpOptions.keepaliveInterval = 15
|
||||
tcpOptions.keepaliveCount = 3
|
||||
params.defaultProtocolStack.transportProtocol = tcpOptions
|
||||
let connection = NWConnection(to: endpoint, using: params)
|
||||
let queue = DispatchQueue(label: "com.clawdbot.macos.bridge-session")
|
||||
self.connection = connection
|
||||
@@ -50,6 +58,10 @@ actor MacNodeBridgeSession {
|
||||
connection.start(queue: queue)
|
||||
|
||||
try await Self.waitForReady(stateStream, timeoutSeconds: 6)
|
||||
connection.stateUpdateHandler = { [weak self] state in
|
||||
guard let self else { return }
|
||||
Task { await self.handleConnectionState(state) }
|
||||
}
|
||||
|
||||
try await AsyncTimeout.withTimeout(
|
||||
seconds: 6,
|
||||
@@ -193,7 +205,6 @@ actor MacNodeBridgeSession {
|
||||
self.pingTask?.cancel()
|
||||
self.pingTask = nil
|
||||
self.lastPongAt = nil
|
||||
self.lastPingId = nil
|
||||
|
||||
self.connection?.cancel()
|
||||
self.connection = nil
|
||||
@@ -300,7 +311,7 @@ actor MacNodeBridgeSession {
|
||||
|
||||
private func startPingLoop() {
|
||||
self.pingTask?.cancel()
|
||||
self.lastPongAt = Date()
|
||||
self.lastPongAt = self.clock.now
|
||||
self.pingTask = Task { [weak self] in
|
||||
guard let self else { return }
|
||||
await self.runPingLoop()
|
||||
@@ -308,30 +319,29 @@ actor MacNodeBridgeSession {
|
||||
}
|
||||
|
||||
private func runPingLoop() async {
|
||||
let intervalSeconds = 15.0
|
||||
let timeoutSeconds = 45.0
|
||||
let interval: Duration = .seconds(15)
|
||||
let timeout: Duration = .seconds(45)
|
||||
|
||||
while !Task.isCancelled {
|
||||
do {
|
||||
try await Task.sleep(nanoseconds: UInt64(intervalSeconds * 1_000_000_000))
|
||||
} catch {
|
||||
return
|
||||
}
|
||||
try? await Task.sleep(for: interval)
|
||||
|
||||
guard self.connection != nil else { return }
|
||||
|
||||
if let last = self.lastPongAt,
|
||||
Date().timeIntervalSince(last) > timeoutSeconds
|
||||
{
|
||||
await self.disconnect()
|
||||
return
|
||||
if let last = self.lastPongAt {
|
||||
let now = self.clock.now
|
||||
if now > last.advanced(by: timeout) {
|
||||
let age = last.duration(to: now)
|
||||
self.logger.warning("Node bridge heartbeat timed out; disconnecting (age: \(String(describing: age), privacy: .public)).")
|
||||
await self.disconnect()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
let id = UUID().uuidString
|
||||
self.lastPingId = id
|
||||
do {
|
||||
try await self.send(BridgePing(type: "ping", id: id))
|
||||
} catch {
|
||||
self.logger.warning("Node bridge ping send failed; disconnecting (error: \(String(describing: error), privacy: .public)).")
|
||||
await self.disconnect()
|
||||
return
|
||||
}
|
||||
@@ -340,7 +350,20 @@ actor MacNodeBridgeSession {
|
||||
|
||||
private func notePong(_ pong: BridgePong) {
|
||||
_ = pong
|
||||
self.lastPongAt = Date()
|
||||
self.lastPongAt = self.clock.now
|
||||
}
|
||||
|
||||
private func handleConnectionState(_ state: NWConnection.State) async {
|
||||
switch state {
|
||||
case let .failed(error):
|
||||
self.logger.warning("Node bridge connection failed; disconnecting (error: \(String(describing: error), privacy: .public)).")
|
||||
await self.disconnect()
|
||||
case .cancelled:
|
||||
self.logger.warning("Node bridge connection cancelled; disconnecting.")
|
||||
await self.disconnect()
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
private static func makeStateStream(
|
||||
|
||||
Reference in New Issue
Block a user