fix: prevent stuck mac health checks
This commit is contained in:
@@ -93,7 +93,8 @@ final class ControlChannel: ObservableObject {
|
|||||||
if let timeout {
|
if let timeout {
|
||||||
params = ["timeout": AnyHashable(Int(timeout * 1000))]
|
params = ["timeout": AnyHashable(Int(timeout * 1000))]
|
||||||
}
|
}
|
||||||
let payload = try await self.request(method: "health", params: params)
|
let timeoutMs = (timeout ?? 15) * 1000
|
||||||
|
let payload = try await self.request(method: "health", params: params, timeoutMs: timeoutMs)
|
||||||
let ms = Date().timeIntervalSince(start) * 1000
|
let ms = Date().timeIntervalSince(start) * 1000
|
||||||
self.lastPingMs = ms
|
self.lastPingMs = ms
|
||||||
self.state = .connected
|
self.state = .connected
|
||||||
@@ -110,10 +111,14 @@ final class ControlChannel: ObservableObject {
|
|||||||
nil
|
nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func request(method: String, params: [String: AnyHashable]? = nil) async throws -> Data {
|
func request(
|
||||||
|
method: String,
|
||||||
|
params: [String: AnyHashable]? = nil,
|
||||||
|
timeoutMs: Double? = nil) async throws -> Data
|
||||||
|
{
|
||||||
do {
|
do {
|
||||||
let rawParams = params?.reduce(into: [String: AnyCodable]()) { $0[$1.key] = AnyCodable($1.value) }
|
let rawParams = params?.reduce(into: [String: AnyCodable]()) { $0[$1.key] = AnyCodable($1.value) }
|
||||||
let data = try await self.gateway.request(method: method, params: rawParams)
|
let data = try await self.gateway.request(method: method, params: rawParams, timeoutMs: timeoutMs)
|
||||||
self.state = .connected
|
self.state = .connected
|
||||||
return data
|
return data
|
||||||
} catch {
|
} catch {
|
||||||
@@ -147,6 +152,11 @@ final class ControlChannel: ObservableObject {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let nsError = error as NSError
|
||||||
|
if nsError.domain == "Gateway", nsError.code == 5 {
|
||||||
|
return "Gateway request timed out; check the gateway process on localhost:\(GatewayEnvironment.gatewayPort())."
|
||||||
|
}
|
||||||
|
|
||||||
let nsError = error as NSError
|
let nsError = error as NSError
|
||||||
let detail = nsError.localizedDescription.isEmpty ? "unknown gateway error" : nsError.localizedDescription
|
let detail = nsError.localizedDescription.isEmpty ? "unknown gateway error" : nsError.localizedDescription
|
||||||
return "Gateway error: \(detail)"
|
return "Gateway error: \(detail)"
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ private actor GatewayChannelActor {
|
|||||||
private let decoder = JSONDecoder()
|
private let decoder = JSONDecoder()
|
||||||
private let encoder = JSONEncoder()
|
private let encoder = JSONEncoder()
|
||||||
private var watchdogTask: Task<Void, Never>?
|
private var watchdogTask: Task<Void, Never>?
|
||||||
|
private let defaultRequestTimeoutMs: Double = 15_000
|
||||||
|
|
||||||
init(url: URL, token: String?) {
|
init(url: URL, token: String?) {
|
||||||
self.url = url
|
self.url = url
|
||||||
@@ -157,6 +158,7 @@ private actor GatewayChannelActor {
|
|||||||
let wrapped = self.wrap(err, context: "gateway receive")
|
let wrapped = self.wrap(err, context: "gateway receive")
|
||||||
self.logger.error("gateway ws receive failed \(wrapped.localizedDescription, privacy: .public)")
|
self.logger.error("gateway ws receive failed \(wrapped.localizedDescription, privacy: .public)")
|
||||||
self.connected = false
|
self.connected = false
|
||||||
|
await self.failPending(wrapped)
|
||||||
await self.scheduleReconnect()
|
await self.scheduleReconnect()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -207,6 +209,11 @@ private actor GatewayChannelActor {
|
|||||||
if delta > tolerance {
|
if delta > tolerance {
|
||||||
self.logger.error("gateway tick missed; reconnecting")
|
self.logger.error("gateway tick missed; reconnecting")
|
||||||
self.connected = false
|
self.connected = false
|
||||||
|
await self.failPending(
|
||||||
|
NSError(
|
||||||
|
domain: "Gateway",
|
||||||
|
code: 4,
|
||||||
|
userInfo: [NSLocalizedDescriptionKey: "gateway tick missed; reconnecting"]))
|
||||||
await self.scheduleReconnect()
|
await self.scheduleReconnect()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -228,13 +235,14 @@ private actor GatewayChannelActor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func request(method: String, params: [String: AnyCodable]?) async throws -> Data {
|
func request(method: String, params: [String: AnyCodable]?, timeoutMs: Double? = nil) async throws -> Data {
|
||||||
do {
|
do {
|
||||||
try await self.connect()
|
try await self.connect()
|
||||||
} catch {
|
} catch {
|
||||||
throw self.wrap(error, context: "gateway connect")
|
throw self.wrap(error, context: "gateway connect")
|
||||||
}
|
}
|
||||||
let id = UUID().uuidString
|
let id = UUID().uuidString
|
||||||
|
let effectiveTimeout = timeoutMs ?? self.defaultRequestTimeoutMs
|
||||||
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
|
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
|
||||||
let paramsObject: ProtoAnyCodable? = params.map { entries in
|
let paramsObject: ProtoAnyCodable? = params.map { entries in
|
||||||
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
|
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
|
||||||
@@ -250,6 +258,11 @@ private actor GatewayChannelActor {
|
|||||||
let data = try self.encoder.encode(frame)
|
let data = try self.encoder.encode(frame)
|
||||||
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<GatewayFrame, Error>) in
|
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<GatewayFrame, Error>) in
|
||||||
self.pending[id] = cont
|
self.pending[id] = cont
|
||||||
|
Task { [weak self] in
|
||||||
|
guard let self else { return }
|
||||||
|
try? await Task.sleep(nanoseconds: UInt64(effectiveTimeout * 1_000_000))
|
||||||
|
await self.timeoutRequest(id: id, timeoutMs: effectiveTimeout)
|
||||||
|
}
|
||||||
Task {
|
Task {
|
||||||
do {
|
do {
|
||||||
try await self.task?.send(.data(data))
|
try await self.task?.send(.data(data))
|
||||||
@@ -286,6 +299,23 @@ private actor GatewayChannelActor {
|
|||||||
let desc = ns.localizedDescription.isEmpty ? "unknown" : ns.localizedDescription
|
let desc = ns.localizedDescription.isEmpty ? "unknown" : ns.localizedDescription
|
||||||
return NSError(domain: ns.domain, code: ns.code, userInfo: [NSLocalizedDescriptionKey: "\(context): \(desc)"])
|
return NSError(domain: ns.domain, code: ns.code, userInfo: [NSLocalizedDescriptionKey: "\(context): \(desc)"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func failPending(_ error: Error) async {
|
||||||
|
let waiters = self.pending
|
||||||
|
self.pending.removeAll()
|
||||||
|
for (_, waiter) in waiters {
|
||||||
|
waiter.resume(throwing: error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private func timeoutRequest(id: String, timeoutMs: Double) async {
|
||||||
|
guard let waiter = self.pending.removeValue(forKey: id) else { return }
|
||||||
|
let err = NSError(
|
||||||
|
domain: "Gateway",
|
||||||
|
code: 5,
|
||||||
|
userInfo: [NSLocalizedDescriptionKey: "gateway request timed out after \(Int(timeoutMs))ms"])
|
||||||
|
waiter.resume(throwing: err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
actor GatewayChannel {
|
actor GatewayChannel {
|
||||||
@@ -295,10 +325,14 @@ actor GatewayChannel {
|
|||||||
self.inner = GatewayChannelActor(url: url, token: token)
|
self.inner = GatewayChannelActor(url: url, token: token)
|
||||||
}
|
}
|
||||||
|
|
||||||
func request(method: String, params: [String: AnyCodable]?) async throws -> Data {
|
func request(
|
||||||
|
method: String,
|
||||||
|
params: [String: AnyCodable]?,
|
||||||
|
timeoutMs: Double? = nil) async throws -> Data
|
||||||
|
{
|
||||||
guard let inner else {
|
guard let inner else {
|
||||||
throw NSError(domain: "Gateway", code: 0, userInfo: [NSLocalizedDescriptionKey: "not configured"])
|
throw NSError(domain: "Gateway", code: 0, userInfo: [NSLocalizedDescriptionKey: "not configured"])
|
||||||
}
|
}
|
||||||
return try await inner.request(method: method, params: params)
|
return try await inner.request(method: method, params: params, timeoutMs: timeoutMs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user