fix(mac): serialize gateway connect
This commit is contained in:
@@ -23,6 +23,8 @@ private actor GatewayChannelActor {
|
|||||||
private var task: URLSessionWebSocketTask?
|
private var task: URLSessionWebSocketTask?
|
||||||
private var pending: [String: CheckedContinuation<GatewayFrame, Error>] = [:]
|
private var pending: [String: CheckedContinuation<GatewayFrame, Error>] = [:]
|
||||||
private var connected = false
|
private var connected = false
|
||||||
|
private var isConnecting = false
|
||||||
|
private var connectWaiters: [CheckedContinuation<Void, Error>] = []
|
||||||
private var url: URL
|
private var url: URL
|
||||||
private var token: String?
|
private var token: String?
|
||||||
private let session = URLSession(configuration: .default)
|
private let session = URLSession(configuration: .default)
|
||||||
@@ -68,6 +70,15 @@ private actor GatewayChannelActor {
|
|||||||
|
|
||||||
func connect() async throws {
|
func connect() async throws {
|
||||||
if self.connected, self.task?.state == .running { return }
|
if self.connected, self.task?.state == .running { return }
|
||||||
|
if self.isConnecting {
|
||||||
|
try await withCheckedThrowingContinuation { cont in
|
||||||
|
self.connectWaiters.append(cont)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
self.isConnecting = true
|
||||||
|
defer { self.isConnecting = false }
|
||||||
|
|
||||||
self.task?.cancel(with: .goingAway, reason: nil)
|
self.task?.cancel(with: .goingAway, reason: nil)
|
||||||
self.task = self.session.webSocketTask(with: self.url)
|
self.task = self.session.webSocketTask(with: self.url)
|
||||||
self.task?.resume()
|
self.task?.resume()
|
||||||
@@ -75,12 +86,26 @@ private actor GatewayChannelActor {
|
|||||||
try await self.sendHello()
|
try await self.sendHello()
|
||||||
} catch {
|
} catch {
|
||||||
let wrapped = self.wrap(error, context: "connect to gateway @ \(self.url.absoluteString)")
|
let wrapped = self.wrap(error, context: "connect to gateway @ \(self.url.absoluteString)")
|
||||||
|
self.connected = false
|
||||||
|
self.task?.cancel(with: .goingAway, reason: nil)
|
||||||
|
let waiters = self.connectWaiters
|
||||||
|
self.connectWaiters.removeAll()
|
||||||
|
for waiter in waiters {
|
||||||
|
waiter.resume(throwing: wrapped)
|
||||||
|
}
|
||||||
|
self.logger.error("gateway ws connect failed \(wrapped.localizedDescription, privacy: .public)")
|
||||||
throw wrapped
|
throw wrapped
|
||||||
}
|
}
|
||||||
self.listen()
|
self.listen()
|
||||||
self.connected = true
|
self.connected = true
|
||||||
self.backoffMs = 500
|
self.backoffMs = 500
|
||||||
self.lastSeq = nil
|
self.lastSeq = nil
|
||||||
|
|
||||||
|
let waiters = self.connectWaiters
|
||||||
|
self.connectWaiters.removeAll()
|
||||||
|
for waiter in waiters {
|
||||||
|
waiter.resume(returning: ())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func sendHello() async throws {
|
private func sendHello() async throws {
|
||||||
@@ -141,7 +166,7 @@ private actor GatewayChannelActor {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if let err = try? decoder.decode(HelloError.self, from: data) {
|
if let err = try? decoder.decode(HelloError.self, from: data) {
|
||||||
let reason = err.reason ?? "unknown"
|
let reason = err.reason
|
||||||
// Log and throw a detailed error so UI can surface token/hello issues.
|
// Log and throw a detailed error so UI can surface token/hello issues.
|
||||||
self.logger.error("gateway hello-error: \(reason, privacy: .public)")
|
self.logger.error("gateway hello-error: \(reason, privacy: .public)")
|
||||||
throw NSError(
|
throw NSError(
|
||||||
@@ -284,6 +309,13 @@ private actor GatewayChannelActor {
|
|||||||
try await self.task?.send(.data(data))
|
try await self.task?.send(.data(data))
|
||||||
} catch {
|
} catch {
|
||||||
self.pending.removeValue(forKey: id)
|
self.pending.removeValue(forKey: id)
|
||||||
|
// Treat send failures as a broken socket: mark disconnected and trigger reconnect.
|
||||||
|
self.connected = false
|
||||||
|
self.task?.cancel(with: .goingAway, reason: nil)
|
||||||
|
Task { [weak self] in
|
||||||
|
guard let self else { return }
|
||||||
|
await self.scheduleReconnect()
|
||||||
|
}
|
||||||
cont.resume(throwing: self.wrap(error, context: "gateway send \(method)"))
|
cont.resume(throwing: self.wrap(error, context: "gateway send \(method)"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user