fix(macos): reduce node pairing polling
This commit is contained in:
@@ -5,6 +5,15 @@ import Foundation
|
||||
import OSLog
|
||||
import UserNotifications
|
||||
|
||||
struct NodePairingReconcilePolicy {
|
||||
static let activeIntervalMs: UInt64 = 15_000
|
||||
static let resyncDelayMs: UInt64 = 250
|
||||
|
||||
static func shouldPoll(pendingCount: Int, isPresenting: Bool) -> Bool {
|
||||
pendingCount > 0 || isPresenting
|
||||
}
|
||||
}
|
||||
|
||||
@MainActor
|
||||
final class NodePairingApprovalPrompter {
|
||||
static let shared = NodePairingApprovalPrompter()
|
||||
@@ -12,6 +21,8 @@ final class NodePairingApprovalPrompter {
|
||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "node-pairing")
|
||||
private var task: Task<Void, Never>?
|
||||
private var reconcileTask: Task<Void, Never>?
|
||||
private var reconcileOnceTask: Task<Void, Never>?
|
||||
private var reconcileInFlight = false
|
||||
private var isStopping = false
|
||||
private var isPresenting = false
|
||||
private var queue: [PendingRequest] = []
|
||||
@@ -54,6 +65,13 @@ final class NodePairingApprovalPrompter {
|
||||
var id: String { self.requestId }
|
||||
}
|
||||
|
||||
private struct PairingResolvedEvent: Codable {
|
||||
let requestId: String
|
||||
let nodeId: String
|
||||
let decision: String
|
||||
let ts: Double
|
||||
}
|
||||
|
||||
private enum PairingResolution: String {
|
||||
case approved
|
||||
case rejected
|
||||
@@ -63,9 +81,7 @@ final class NodePairingApprovalPrompter {
|
||||
guard self.task == nil else { return }
|
||||
self.isStopping = false
|
||||
self.reconcileTask?.cancel()
|
||||
self.reconcileTask = Task { [weak self] in
|
||||
await self?.reconcileLoop()
|
||||
}
|
||||
self.reconcileTask = nil
|
||||
self.task = Task { [weak self] in
|
||||
guard let self else { return }
|
||||
_ = try? await GatewayConnection.shared.refresh()
|
||||
@@ -85,6 +101,8 @@ final class NodePairingApprovalPrompter {
|
||||
self.task = nil
|
||||
self.reconcileTask?.cancel()
|
||||
self.reconcileTask = nil
|
||||
self.reconcileOnceTask?.cancel()
|
||||
self.reconcileOnceTask = nil
|
||||
self.queue.removeAll(keepingCapacity: false)
|
||||
self.isPresenting = false
|
||||
self.activeRequestId = nil
|
||||
@@ -108,16 +126,11 @@ final class NodePairingApprovalPrompter {
|
||||
timeoutMs: 6000)
|
||||
guard !data.isEmpty else { return }
|
||||
let list = try JSONDecoder().decode(PairingList.self, from: data)
|
||||
let pending = list.pending.sorted { $0.ts < $1.ts }
|
||||
guard !pending.isEmpty else { return }
|
||||
await MainActor.run { [weak self] in
|
||||
guard let self else { return }
|
||||
self.logger.info(
|
||||
"loaded \(pending.count, privacy: .public) pending node pairing request(s) on startup")
|
||||
for req in pending {
|
||||
self.enqueue(req)
|
||||
}
|
||||
}
|
||||
let pendingCount = list.pending.count
|
||||
guard pendingCount > 0 else { return }
|
||||
self.logger.info(
|
||||
"loaded \(pendingCount, privacy: .public) pending node pairing request(s) on startup")
|
||||
await self.apply(list: list)
|
||||
return
|
||||
} catch {
|
||||
if attempt == 8 {
|
||||
@@ -135,17 +148,17 @@ final class NodePairingApprovalPrompter {
|
||||
private func reconcileLoop() async {
|
||||
// Reconcile requests periodically so multiple running apps stay in sync
|
||||
// (e.g. close dialogs + notify if another machine approves/rejects via app or CLI).
|
||||
let intervalMs: UInt64 = 800
|
||||
while !Task.isCancelled {
|
||||
if self.isStopping { return }
|
||||
do {
|
||||
let list = try await self.fetchPairingList(timeoutMs: 2500)
|
||||
await self.apply(list: list)
|
||||
} catch {
|
||||
// best effort: ignore transient connectivity failures
|
||||
if self.isStopping { break }
|
||||
if !self.shouldPoll {
|
||||
self.reconcileTask = nil
|
||||
return
|
||||
}
|
||||
try? await Task.sleep(nanoseconds: intervalMs * 1_000_000)
|
||||
await self.reconcileOnce(timeoutMs: 2500)
|
||||
try? await Task.sleep(
|
||||
nanoseconds: NodePairingReconcilePolicy.activeIntervalMs * 1_000_000)
|
||||
}
|
||||
self.reconcileTask = nil
|
||||
}
|
||||
|
||||
private func fetchPairingList(timeoutMs: Double) async throws -> PairingList {
|
||||
@@ -193,6 +206,7 @@ final class NodePairingApprovalPrompter {
|
||||
self.isPresenting = false
|
||||
}
|
||||
self.presentNextIfNeeded()
|
||||
self.updateReconcileLoop()
|
||||
}
|
||||
|
||||
private func inferResolution(for request: PendingRequest, list: PairingList) -> PairingResolution {
|
||||
@@ -239,14 +253,32 @@ final class NodePairingApprovalPrompter {
|
||||
}
|
||||
|
||||
private func handle(push: GatewayPush) {
|
||||
guard case let .event(evt) = push else { return }
|
||||
guard evt.event == "node.pair.requested" else { return }
|
||||
guard let payload = evt.payload else { return }
|
||||
do {
|
||||
let req = try GatewayPayloadDecoding.decode(payload, as: PendingRequest.self)
|
||||
self.enqueue(req)
|
||||
} catch {
|
||||
self.logger.error("failed to decode pairing request: \(error.localizedDescription, privacy: .public)")
|
||||
switch push {
|
||||
case let .event(evt) where evt.event == "node.pair.requested":
|
||||
guard let payload = evt.payload else { return }
|
||||
do {
|
||||
let req = try GatewayPayloadDecoding.decode(payload, as: PendingRequest.self)
|
||||
self.enqueue(req)
|
||||
} catch {
|
||||
self.logger
|
||||
.error("failed to decode pairing request: \(error.localizedDescription, privacy: .public)")
|
||||
}
|
||||
case let .event(evt) where evt.event == "node.pair.resolved":
|
||||
guard let payload = evt.payload else { return }
|
||||
do {
|
||||
let resolved = try GatewayPayloadDecoding.decode(payload, as: PairingResolvedEvent.self)
|
||||
self.handleResolved(resolved)
|
||||
} catch {
|
||||
self.logger
|
||||
.error(
|
||||
"failed to decode pairing resolution: \(error.localizedDescription, privacy: .public)")
|
||||
}
|
||||
case .snapshot:
|
||||
self.scheduleReconcileOnce(delayMs: 0)
|
||||
case .seqGap:
|
||||
self.scheduleReconcileOnce()
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -254,6 +286,7 @@ final class NodePairingApprovalPrompter {
|
||||
if self.queue.contains(req) { return }
|
||||
self.queue.append(req)
|
||||
self.presentNextIfNeeded()
|
||||
self.updateReconcileLoop()
|
||||
}
|
||||
|
||||
private func presentNextIfNeeded() {
|
||||
@@ -324,6 +357,7 @@ final class NodePairingApprovalPrompter {
|
||||
}
|
||||
self.isPresenting = false
|
||||
self.presentNextIfNeeded()
|
||||
self.updateReconcileLoop()
|
||||
}
|
||||
|
||||
// Never approve/reject while shutting down (alerts can get dismissed during app termination).
|
||||
@@ -462,6 +496,7 @@ final class NodePairingApprovalPrompter {
|
||||
}
|
||||
self.isPresenting = false
|
||||
self.presentNextIfNeeded()
|
||||
self.updateReconcileLoop()
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -534,4 +569,74 @@ final class NodePairingApprovalPrompter {
|
||||
return process.terminationStatus == 0
|
||||
}.value
|
||||
}
|
||||
|
||||
private var shouldPoll: Bool {
|
||||
NodePairingReconcilePolicy.shouldPoll(
|
||||
pendingCount: self.queue.count,
|
||||
isPresenting: self.isPresenting)
|
||||
}
|
||||
|
||||
private func updateReconcileLoop() {
|
||||
guard !self.isStopping else { return }
|
||||
if self.shouldPoll {
|
||||
if self.reconcileTask == nil {
|
||||
self.reconcileTask = Task { [weak self] in
|
||||
await self?.reconcileLoop()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
self.reconcileTask?.cancel()
|
||||
self.reconcileTask = nil
|
||||
}
|
||||
}
|
||||
|
||||
private func reconcileOnce(timeoutMs: Double) async {
|
||||
if self.isStopping { return }
|
||||
if self.reconcileInFlight { return }
|
||||
self.reconcileInFlight = true
|
||||
defer { self.reconcileInFlight = false }
|
||||
do {
|
||||
let list = try await self.fetchPairingList(timeoutMs: timeoutMs)
|
||||
await self.apply(list: list)
|
||||
} catch {
|
||||
// best effort: ignore transient connectivity failures
|
||||
}
|
||||
}
|
||||
|
||||
private func scheduleReconcileOnce(delayMs: UInt64 = NodePairingReconcilePolicy.resyncDelayMs) {
|
||||
self.reconcileOnceTask?.cancel()
|
||||
self.reconcileOnceTask = Task { [weak self] in
|
||||
guard let self else { return }
|
||||
if delayMs > 0 {
|
||||
try? await Task.sleep(nanoseconds: delayMs * 1_000_000)
|
||||
}
|
||||
await self.reconcileOnce(timeoutMs: 2500)
|
||||
}
|
||||
}
|
||||
|
||||
private func handleResolved(_ resolved: PairingResolvedEvent) {
|
||||
let resolution: PairingResolution =
|
||||
resolved.decision == PairingResolution.approved.rawValue ? .approved : .rejected
|
||||
|
||||
if self.activeRequestId == resolved.requestId, self.activeAlert != nil {
|
||||
self.remoteResolutionsByRequestId[resolved.requestId] = resolution
|
||||
self.logger.info(
|
||||
"pairing request resolved elsewhere; closing dialog requestId=\(resolved.requestId, privacy: .public) resolution=\(resolution.rawValue, privacy: .public)")
|
||||
self.endActiveAlert()
|
||||
return
|
||||
}
|
||||
|
||||
guard let request = self.queue.first(where: { $0.requestId == resolved.requestId }) else {
|
||||
return
|
||||
}
|
||||
self.queue.removeAll { $0.requestId == resolved.requestId }
|
||||
Task { @MainActor in
|
||||
await self.notify(resolution: resolution, request: request, via: "remote")
|
||||
}
|
||||
if self.queue.isEmpty {
|
||||
self.isPresenting = false
|
||||
}
|
||||
self.presentNextIfNeeded()
|
||||
self.updateReconcileLoop()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user