Mac: use typed GatewayFrame + forward-compatible Swift generator
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import Foundation
|
||||
import OSLog
|
||||
import SwiftUI
|
||||
import ClawdisProtocol
|
||||
|
||||
struct ControlHeartbeatEvent: Codable {
|
||||
let ts: Double
|
||||
@@ -132,7 +133,7 @@ final class ControlChannel: ObservableObject {
|
||||
forName: .gatewayEvent,
|
||||
object: nil,
|
||||
queue: .main)
|
||||
{ [weak self] @MainActor note in
|
||||
{ [weak self] note in
|
||||
guard let self,
|
||||
let obj = note.userInfo as? [String: Any],
|
||||
let event = obj["event"] as? String else { return }
|
||||
@@ -146,18 +147,20 @@ final class ControlChannel: ObservableObject {
|
||||
let dataDict = payload["data"] as? [String: Any]
|
||||
{
|
||||
let wrapped = dataDict.mapValues { AnyCodable($0) }
|
||||
AgentEventStore.shared.append(ControlAgentEvent(
|
||||
runId: runId,
|
||||
seq: seq,
|
||||
stream: stream,
|
||||
ts: ts,
|
||||
data: wrapped))
|
||||
Task { @MainActor in
|
||||
AgentEventStore.shared.append(ControlAgentEvent(
|
||||
runId: runId,
|
||||
seq: seq,
|
||||
stream: stream,
|
||||
ts: ts,
|
||||
data: wrapped))
|
||||
}
|
||||
}
|
||||
case "presence":
|
||||
// InstancesStore listens separately via notification
|
||||
break
|
||||
case "shutdown":
|
||||
self.state = .degraded("gateway shutdown")
|
||||
Task { @MainActor in self.state = .degraded("gateway shutdown") }
|
||||
default:
|
||||
break
|
||||
}
|
||||
@@ -166,8 +169,8 @@ final class ControlChannel: ObservableObject {
|
||||
forName: .gatewaySnapshot,
|
||||
object: nil,
|
||||
queue: .main)
|
||||
{ [weak self] @MainActor _ in
|
||||
self?.state = .connected
|
||||
{ [weak self] _ in
|
||||
Task { @MainActor [weak self] in self?.state = .connected }
|
||||
}
|
||||
self.eventTokens = [ev, tick]
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import Foundation
|
||||
import OSLog
|
||||
import ClawdisProtocol
|
||||
|
||||
struct GatewayEvent: Codable {
|
||||
let type: String
|
||||
@@ -17,7 +18,7 @@ extension Notification.Name {
|
||||
private actor GatewayChannelActor {
|
||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "gateway")
|
||||
private var task: URLSessionWebSocketTask?
|
||||
private var pending: [String: CheckedContinuation<Data, Error>] = [:]
|
||||
private var pending: [String: CheckedContinuation<GatewayFrame, Error>] = [:]
|
||||
private var connected = false
|
||||
private var url: URL
|
||||
private var token: String?
|
||||
@@ -25,6 +26,8 @@ private actor GatewayChannelActor {
|
||||
private var backoffMs: Double = 500
|
||||
private var shouldReconnect = true
|
||||
private var lastSeq: Int?
|
||||
private let decoder = JSONDecoder()
|
||||
private let encoder = JSONEncoder()
|
||||
|
||||
init(url: URL, token: String?) {
|
||||
self.url = url
|
||||
@@ -90,8 +93,10 @@ private actor GatewayChannelActor {
|
||||
case let .failure(err):
|
||||
Task { await self.handleReceiveFailure(err) }
|
||||
case let .success(msg):
|
||||
Task { await self.handle(msg) }
|
||||
self.listen()
|
||||
Task {
|
||||
await self.handle(msg)
|
||||
await self.listen()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -109,26 +114,28 @@ private actor GatewayChannelActor {
|
||||
@unknown default: nil
|
||||
}
|
||||
guard let data else { return }
|
||||
guard let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
|
||||
let type = obj["type"] as? String else { return }
|
||||
switch type {
|
||||
case "res":
|
||||
if let id = obj["id"] as? String, let waiter = pending.removeValue(forKey: id) {
|
||||
waiter.resume(returning: data)
|
||||
guard let frame = try? self.decoder.decode(GatewayFrame.self, from: data) else {
|
||||
self.logger.error("gateway decode failed")
|
||||
return
|
||||
}
|
||||
switch frame {
|
||||
case let .res(res):
|
||||
if let id = res.id, let waiter = pending.removeValue(forKey: id) {
|
||||
waiter.resume(returning: .res(res))
|
||||
}
|
||||
case "event":
|
||||
if let seq = obj["seq"] as? Int {
|
||||
case let .event(evt):
|
||||
if let seq = evt.seq {
|
||||
if let last = lastSeq, seq > last + 1 {
|
||||
NotificationCenter.default.post(
|
||||
name: .gatewaySeqGap,
|
||||
object: nil,
|
||||
object: frame,
|
||||
userInfo: ["expected": last + 1, "received": seq])
|
||||
}
|
||||
self.lastSeq = seq
|
||||
}
|
||||
NotificationCenter.default.post(name: .gatewayEvent, object: nil, userInfo: obj)
|
||||
case "hello-ok":
|
||||
NotificationCenter.default.post(name: .gatewaySnapshot, object: nil, userInfo: obj)
|
||||
NotificationCenter.default.post(name: .gatewayEvent, object: frame)
|
||||
case .helloOk:
|
||||
NotificationCenter.default.post(name: .gatewaySnapshot, object: frame)
|
||||
default:
|
||||
break
|
||||
}
|
||||
@@ -160,7 +167,7 @@ private actor GatewayChannelActor {
|
||||
"params": paramsObject,
|
||||
]
|
||||
let data = try JSONSerialization.data(withJSONObject: frame)
|
||||
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Data, Error>) in
|
||||
let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<GatewayFrame, Error>) in
|
||||
self.pending[id] = cont
|
||||
Task {
|
||||
do {
|
||||
@@ -171,7 +178,18 @@ private actor GatewayChannelActor {
|
||||
}
|
||||
}
|
||||
}
|
||||
return response
|
||||
guard case let .res(res) = response else {
|
||||
throw NSError(domain: "Gateway", code: 2, userInfo: [NSLocalizedDescriptionKey: "unexpected frame"])
|
||||
}
|
||||
if res.ok == false {
|
||||
let msg = (res.error?.message) ?? "gateway error"
|
||||
throw NSError(domain: "Gateway", code: 3, userInfo: [NSLocalizedDescriptionKey: msg])
|
||||
}
|
||||
if let payload = res.payload?.value {
|
||||
let payloadData = try JSONSerialization.data(withJSONObject: payload)
|
||||
return payloadData
|
||||
}
|
||||
return Data()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import Cocoa
|
||||
import Foundation
|
||||
import OSLog
|
||||
import ClawdisProtocol
|
||||
|
||||
struct InstanceInfo: Identifiable, Codable {
|
||||
let id: String
|
||||
@@ -65,12 +66,18 @@ final class InstancesStore: ObservableObject {
|
||||
forName: .gatewayEvent,
|
||||
object: nil,
|
||||
queue: .main)
|
||||
{ [weak self] @MainActor note in
|
||||
{ [weak self] note in
|
||||
guard let self,
|
||||
let obj = note.userInfo as? [String: Any],
|
||||
let event = obj["event"] as? String else { return }
|
||||
if event == "presence", let payload = obj["payload"] as? [String: Any] {
|
||||
self.handlePresencePayload(payload)
|
||||
let frame = note.object as? GatewayFrame else { return }
|
||||
switch frame {
|
||||
case let .event(evt) where evt.event == "presence":
|
||||
if let payload = evt.payload?.value as? [String: Any],
|
||||
let presence = payload["presence"],
|
||||
let presenceData = try? JSONSerialization.data(withJSONObject: presence) {
|
||||
Task { @MainActor [weak self] in self?.decodeAndApplyPresenceData(presenceData) }
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
let gap = NotificationCenter.default.addObserver(
|
||||
@@ -85,12 +92,18 @@ final class InstancesStore: ObservableObject {
|
||||
forName: .gatewaySnapshot,
|
||||
object: nil,
|
||||
queue: .main)
|
||||
{ [weak self] @MainActor note in
|
||||
{ [weak self] note in
|
||||
guard let self,
|
||||
let obj = note.userInfo as? [String: Any],
|
||||
let snapshot = obj["snapshot"] as? [String: Any],
|
||||
let presence = snapshot["presence"] else { return }
|
||||
self.decodeAndApplyPresence(presence: presence)
|
||||
let frame = note.object as? GatewayFrame else { return }
|
||||
switch frame {
|
||||
case let .helloOk(hello):
|
||||
let presence = hello.snapshot.presence
|
||||
if let data = try? JSONEncoder().encode(presence) {
|
||||
Task { @MainActor [weak self] in self?.decodeAndApplyPresenceData(data) }
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
self.observers = [ev, snap, gap]
|
||||
}
|
||||
@@ -257,14 +270,7 @@ final class InstancesStore: ObservableObject {
|
||||
}
|
||||
}
|
||||
|
||||
private func handlePresencePayload(_ payload: [String: Any]) {
|
||||
if let presence = payload["presence"] {
|
||||
self.decodeAndApplyPresence(presence: presence)
|
||||
}
|
||||
}
|
||||
|
||||
private func decodeAndApplyPresence(presence: Any) {
|
||||
guard let data = try? JSONSerialization.data(withJSONObject: presence) else { return }
|
||||
private func decodeAndApplyPresenceData(_ data: Data) {
|
||||
do {
|
||||
let decoded = try JSONDecoder().decode([InstanceInfo].self, from: data)
|
||||
let withIDs = decoded.map { entry -> InstanceInfo in
|
||||
@@ -285,6 +291,7 @@ final class InstancesStore: ObservableObject {
|
||||
self.lastError = nil
|
||||
} catch {
|
||||
self.logger.error("presence decode from event failed: \(error.localizedDescription, privacy: .public)")
|
||||
self.lastError = error.localizedDescription
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,6 +341,7 @@ public enum GatewayFrame: Codable {
|
||||
case req(RequestFrame)
|
||||
case res(ResponseFrame)
|
||||
case event(EventFrame)
|
||||
case unknown(type: String, raw: [String: AnyCodable])
|
||||
|
||||
public init(from decoder: Decoder) throws {
|
||||
let container = try decoder.singleValueContainer()
|
||||
@@ -362,7 +363,7 @@ public enum GatewayFrame: Codable {
|
||||
case "event":
|
||||
self = .event(try decodePayload(EventFrame.self, from: raw))
|
||||
default:
|
||||
throw DecodingError.dataCorruptedError(in: container, debugDescription: "unknown type (type)")
|
||||
self = .unknown(type: type, raw: raw)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -374,6 +375,9 @@ public enum GatewayFrame: Codable {
|
||||
case .req(let v): try v.encode(to: encoder)
|
||||
case .res(let v): try v.encode(to: encoder)
|
||||
case .event(let v): try v.encode(to: encoder)
|
||||
case .unknown(_, let raw):
|
||||
var container = encoder.singleValueContainer()
|
||||
try container.encode(raw)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -135,7 +135,7 @@ function emitGatewayFrame(): string {
|
||||
case "event":
|
||||
self = .event(try decodePayload(EventFrame.self, from: raw))
|
||||
default:
|
||||
throw DecodingError.dataCorruptedError(in: container, debugDescription: "unknown type \(type)")
|
||||
self = .unknown(type: type, raw: raw)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,6 +147,9 @@ function emitGatewayFrame(): string {
|
||||
case .req(let v): try v.encode(to: encoder)
|
||||
case .res(let v): try v.encode(to: encoder)
|
||||
case .event(let v): try v.encode(to: encoder)
|
||||
case .unknown(_, let raw):
|
||||
var container = encoder.singleValueContainer()
|
||||
try container.encode(raw)
|
||||
}
|
||||
}
|
||||
`;
|
||||
@@ -162,6 +165,7 @@ function emitGatewayFrame(): string {
|
||||
return [
|
||||
"public enum GatewayFrame: Codable {",
|
||||
...caseLines,
|
||||
" case unknown(type: String, raw: [String: AnyCodable])",
|
||||
initLines,
|
||||
helper,
|
||||
"}",
|
||||
|
||||
Reference in New Issue
Block a user