refactor(gateway): share request encoding
This commit is contained in:
committed by
Peter Steinberger
parent
81e915110e
commit
b6581e77f6
@@ -574,46 +574,22 @@ public actor GatewayChannelActor {
|
||||
params: [String: AnyCodable]?,
|
||||
timeoutMs: Double? = nil) async throws -> Data
|
||||
{
|
||||
do {
|
||||
try await self.connect()
|
||||
} catch {
|
||||
throw self.wrap(error, context: "gateway connect")
|
||||
}
|
||||
let id = UUID().uuidString
|
||||
try await self.connectOrThrow(context: "gateway connect")
|
||||
let effectiveTimeout = timeoutMs ?? self.defaultRequestTimeoutMs
|
||||
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
|
||||
let paramsObject: ProtoAnyCodable? = params.map { entries in
|
||||
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
|
||||
dict[entry.key] = ProtoAnyCodable(entry.value.value)
|
||||
}
|
||||
return ProtoAnyCodable(dict)
|
||||
}
|
||||
let frame = RequestFrame(
|
||||
type: "req",
|
||||
id: id,
|
||||
method: method,
|
||||
params: paramsObject)
|
||||
let data: Data
|
||||
do {
|
||||
data = try self.encoder.encode(frame)
|
||||
} catch {
|
||||
self.logger.error(
|
||||
"gateway request encode failed \(method, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
|
||||
throw error
|
||||
}
|
||||
let payload = try self.encodeRequest(method: method, params: params, kind: "request")
|
||||
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<GatewayFrame, Error>) in
|
||||
self.pending[id] = cont
|
||||
self.pending[payload.id] = cont
|
||||
Task { [weak self] in
|
||||
guard let self else { return }
|
||||
try? await Task.sleep(nanoseconds: UInt64(effectiveTimeout * 1_000_000))
|
||||
await self.timeoutRequest(id: id, timeoutMs: effectiveTimeout)
|
||||
await self.timeoutRequest(id: payload.id, timeoutMs: effectiveTimeout)
|
||||
}
|
||||
Task {
|
||||
do {
|
||||
try await self.task?.send(.data(data))
|
||||
try await self.task?.send(.data(payload.data))
|
||||
} catch {
|
||||
let wrapped = self.wrap(error, context: "gateway send \(method)")
|
||||
let waiter = self.pending.removeValue(forKey: id)
|
||||
let waiter = self.pending.removeValue(forKey: payload.id)
|
||||
// Treat send failures as a broken socket: mark disconnected and trigger reconnect.
|
||||
self.connected = false
|
||||
self.task?.cancel(with: .goingAway, reason: nil)
|
||||
@@ -644,31 +620,8 @@ public actor GatewayChannelActor {
|
||||
}
|
||||
|
||||
public func send(method: String, params: [String: AnyCodable]?) async throws {
|
||||
do {
|
||||
try await self.connect()
|
||||
} catch {
|
||||
throw self.wrap(error, context: "gateway connect")
|
||||
}
|
||||
let id = UUID().uuidString
|
||||
let paramsObject: ProtoAnyCodable? = params.map { entries in
|
||||
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
|
||||
dict[entry.key] = ProtoAnyCodable(entry.value.value)
|
||||
}
|
||||
return ProtoAnyCodable(dict)
|
||||
}
|
||||
let frame = RequestFrame(
|
||||
type: "req",
|
||||
id: id,
|
||||
method: method,
|
||||
params: paramsObject)
|
||||
let data: Data
|
||||
do {
|
||||
data = try self.encoder.encode(frame)
|
||||
} catch {
|
||||
self.logger.error(
|
||||
"gateway send encode failed \(method, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
|
||||
throw error
|
||||
}
|
||||
try await self.connectOrThrow(context: "gateway connect")
|
||||
let payload = try self.encodeRequest(method: method, params: params, kind: "send")
|
||||
guard let task = self.task else {
|
||||
throw NSError(
|
||||
domain: "Gateway",
|
||||
@@ -676,7 +629,7 @@ public actor GatewayChannelActor {
|
||||
userInfo: [NSLocalizedDescriptionKey: "gateway socket unavailable"])
|
||||
}
|
||||
do {
|
||||
try await task.send(.data(data))
|
||||
try await task.send(.data(payload.data))
|
||||
} catch {
|
||||
let wrapped = self.wrap(error, context: "gateway send \(method)")
|
||||
self.connected = false
|
||||
@@ -703,6 +656,42 @@ public actor GatewayChannelActor {
|
||||
return NSError(domain: ns.domain, code: ns.code, userInfo: [NSLocalizedDescriptionKey: "\(context): \(desc)"])
|
||||
}
|
||||
|
||||
private func connectOrThrow(context: String) async throws {
|
||||
do {
|
||||
try await self.connect()
|
||||
} catch {
|
||||
throw self.wrap(error, context: context)
|
||||
}
|
||||
}
|
||||
|
||||
private func encodeRequest(
|
||||
method: String,
|
||||
params: [String: AnyCodable]?,
|
||||
kind: String) throws -> (id: String, data: Data)
|
||||
{
|
||||
let id = UUID().uuidString
|
||||
// Encode request using the generated models to avoid JSONSerialization/ObjC bridging pitfalls.
|
||||
let paramsObject: ProtoAnyCodable? = params.map { entries in
|
||||
let dict = entries.reduce(into: [String: ProtoAnyCodable]()) { dict, entry in
|
||||
dict[entry.key] = ProtoAnyCodable(entry.value.value)
|
||||
}
|
||||
return ProtoAnyCodable(dict)
|
||||
}
|
||||
let frame = RequestFrame(
|
||||
type: "req",
|
||||
id: id,
|
||||
method: method,
|
||||
params: paramsObject)
|
||||
do {
|
||||
let data = try self.encoder.encode(frame)
|
||||
return (id: id, data: data)
|
||||
} catch {
|
||||
self.logger.error(
|
||||
"gateway \(kind) encode failed \(method, privacy: .public) error=\(error.localizedDescription, privacy: .public)")
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func failPending(_ error: Error) async {
|
||||
let waiters = self.pending
|
||||
self.pending.removeAll()
|
||||
|
||||
Reference in New Issue
Block a user