diff --git a/apps/macos/Sources/Clawdis/ControlChannel.swift b/apps/macos/Sources/Clawdis/ControlChannel.swift index a1de1cb37..8d64f58d5 100644 --- a/apps/macos/Sources/Clawdis/ControlChannel.swift +++ b/apps/macos/Sources/Clawdis/ControlChannel.swift @@ -1,6 +1,7 @@ import Foundation import OSLog import SwiftUI +import ClawdisProtocol struct ControlHeartbeatEvent: Codable { let ts: Double @@ -132,7 +133,7 @@ final class ControlChannel: ObservableObject { forName: .gatewayEvent, object: nil, queue: .main) - { [weak self] @MainActor note in + { [weak self] note in guard let self, let obj = note.userInfo as? [String: Any], let event = obj["event"] as? String else { return } @@ -146,18 +147,20 @@ final class ControlChannel: ObservableObject { let dataDict = payload["data"] as? [String: Any] { let wrapped = dataDict.mapValues { AnyCodable($0) } - AgentEventStore.shared.append(ControlAgentEvent( - runId: runId, - seq: seq, - stream: stream, - ts: ts, - data: wrapped)) + Task { @MainActor in + AgentEventStore.shared.append(ControlAgentEvent( + runId: runId, + seq: seq, + stream: stream, + ts: ts, + data: wrapped)) + } } case "presence": // InstancesStore listens separately via notification break case "shutdown": - self.state = .degraded("gateway shutdown") + Task { @MainActor in self.state = .degraded("gateway shutdown") } default: break } @@ -166,8 +169,8 @@ final class ControlChannel: ObservableObject { forName: .gatewaySnapshot, object: nil, queue: .main) - { [weak self] @MainActor _ in - self?.state = .connected + { [weak self] _ in + Task { @MainActor [weak self] in self?.state = .connected } } self.eventTokens = [ev, tick] } diff --git a/apps/macos/Sources/Clawdis/GatewayChannel.swift b/apps/macos/Sources/Clawdis/GatewayChannel.swift index 2349bd400..41426668a 100644 --- a/apps/macos/Sources/Clawdis/GatewayChannel.swift +++ b/apps/macos/Sources/Clawdis/GatewayChannel.swift @@ -1,5 +1,6 @@ import Foundation import OSLog +import ClawdisProtocol struct GatewayEvent: Codable { let type: String @@ -17,7 +18,7 @@ extension Notification.Name { private actor GatewayChannelActor { private let logger = Logger(subsystem: "com.steipete.clawdis", category: "gateway") private var task: URLSessionWebSocketTask? - private var pending: [String: CheckedContinuation] = [:] + private var pending: [String: CheckedContinuation] = [:] private var connected = false private var url: URL private var token: String? @@ -25,6 +26,8 @@ private actor GatewayChannelActor { private var backoffMs: Double = 500 private var shouldReconnect = true private var lastSeq: Int? + private let decoder = JSONDecoder() + private let encoder = JSONEncoder() init(url: URL, token: String?) { self.url = url @@ -90,8 +93,10 @@ private actor GatewayChannelActor { case let .failure(err): Task { await self.handleReceiveFailure(err) } case let .success(msg): - Task { await self.handle(msg) } - self.listen() + Task { + await self.handle(msg) + await self.listen() + } } } } @@ -109,26 +114,28 @@ private actor GatewayChannelActor { @unknown default: nil } guard let data else { return } - guard let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], - let type = obj["type"] as? String else { return } - switch type { - case "res": - if let id = obj["id"] as? String, let waiter = pending.removeValue(forKey: id) { - waiter.resume(returning: data) + guard let frame = try? self.decoder.decode(GatewayFrame.self, from: data) else { + self.logger.error("gateway decode failed") + return + } + switch frame { + case let .res(res): + if let id = res.id, let waiter = pending.removeValue(forKey: id) { + waiter.resume(returning: .res(res)) } - case "event": - if let seq = obj["seq"] as? Int { + case let .event(evt): + if let seq = evt.seq { if let last = lastSeq, seq > last + 1 { NotificationCenter.default.post( name: .gatewaySeqGap, - object: nil, + object: frame, userInfo: ["expected": last + 1, "received": seq]) } self.lastSeq = seq } - NotificationCenter.default.post(name: .gatewayEvent, object: nil, userInfo: obj) - case "hello-ok": - NotificationCenter.default.post(name: .gatewaySnapshot, object: nil, userInfo: obj) + NotificationCenter.default.post(name: .gatewayEvent, object: frame) + case .helloOk: + NotificationCenter.default.post(name: .gatewaySnapshot, object: frame) default: break } @@ -160,7 +167,7 @@ private actor GatewayChannelActor { "params": paramsObject, ] let data = try JSONSerialization.data(withJSONObject: frame) - let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in + let response = try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in self.pending[id] = cont Task { do { @@ -171,7 +178,18 @@ private actor GatewayChannelActor { } } } - return response + guard case let .res(res) = response else { + throw NSError(domain: "Gateway", code: 2, userInfo: [NSLocalizedDescriptionKey: "unexpected frame"]) + } + if res.ok == false { + let msg = (res.error?.message) ?? "gateway error" + throw NSError(domain: "Gateway", code: 3, userInfo: [NSLocalizedDescriptionKey: msg]) + } + if let payload = res.payload?.value { + let payloadData = try JSONSerialization.data(withJSONObject: payload) + return payloadData + } + return Data() } } diff --git a/apps/macos/Sources/Clawdis/InstancesStore.swift b/apps/macos/Sources/Clawdis/InstancesStore.swift index f006cebad..64c55c8cc 100644 --- a/apps/macos/Sources/Clawdis/InstancesStore.swift +++ b/apps/macos/Sources/Clawdis/InstancesStore.swift @@ -1,6 +1,7 @@ import Cocoa import Foundation import OSLog +import ClawdisProtocol struct InstanceInfo: Identifiable, Codable { let id: String @@ -65,12 +66,18 @@ final class InstancesStore: ObservableObject { forName: .gatewayEvent, object: nil, queue: .main) - { [weak self] @MainActor note in + { [weak self] note in guard let self, - let obj = note.userInfo as? [String: Any], - let event = obj["event"] as? String else { return } - if event == "presence", let payload = obj["payload"] as? [String: Any] { - self.handlePresencePayload(payload) + let frame = note.object as? GatewayFrame else { return } + switch frame { + case let .event(evt) where evt.event == "presence": + if let payload = evt.payload?.value as? [String: Any], + let presence = payload["presence"], + let presenceData = try? JSONSerialization.data(withJSONObject: presence) { + Task { @MainActor [weak self] in self?.decodeAndApplyPresenceData(presenceData) } + } + default: + break } } let gap = NotificationCenter.default.addObserver( @@ -85,12 +92,18 @@ final class InstancesStore: ObservableObject { forName: .gatewaySnapshot, object: nil, queue: .main) - { [weak self] @MainActor note in + { [weak self] note in guard let self, - let obj = note.userInfo as? [String: Any], - let snapshot = obj["snapshot"] as? [String: Any], - let presence = snapshot["presence"] else { return } - self.decodeAndApplyPresence(presence: presence) + let frame = note.object as? GatewayFrame else { return } + switch frame { + case let .helloOk(hello): + let presence = hello.snapshot.presence + if let data = try? JSONEncoder().encode(presence) { + Task { @MainActor [weak self] in self?.decodeAndApplyPresenceData(data) } + } + default: + break + } } self.observers = [ev, snap, gap] } @@ -257,14 +270,7 @@ final class InstancesStore: ObservableObject { } } - private func handlePresencePayload(_ payload: [String: Any]) { - if let presence = payload["presence"] { - self.decodeAndApplyPresence(presence: presence) - } - } - - private func decodeAndApplyPresence(presence: Any) { - guard let data = try? JSONSerialization.data(withJSONObject: presence) else { return } + private func decodeAndApplyPresenceData(_ data: Data) { do { let decoded = try JSONDecoder().decode([InstanceInfo].self, from: data) let withIDs = decoded.map { entry -> InstanceInfo in @@ -285,6 +291,7 @@ final class InstancesStore: ObservableObject { self.lastError = nil } catch { self.logger.error("presence decode from event failed: \(error.localizedDescription, privacy: .public)") + self.lastError = error.localizedDescription } } } diff --git a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift index 40952bc5e..11f7e84d7 100644 --- a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift +++ b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift @@ -341,6 +341,7 @@ public enum GatewayFrame: Codable { case req(RequestFrame) case res(ResponseFrame) case event(EventFrame) + case unknown(type: String, raw: [String: AnyCodable]) public init(from decoder: Decoder) throws { let container = try decoder.singleValueContainer() @@ -362,7 +363,7 @@ public enum GatewayFrame: Codable { case "event": self = .event(try decodePayload(EventFrame.self, from: raw)) default: - throw DecodingError.dataCorruptedError(in: container, debugDescription: "unknown type (type)") + self = .unknown(type: type, raw: raw) } } @@ -374,6 +375,9 @@ public enum GatewayFrame: Codable { case .req(let v): try v.encode(to: encoder) case .res(let v): try v.encode(to: encoder) case .event(let v): try v.encode(to: encoder) + case .unknown(_, let raw): + var container = encoder.singleValueContainer() + try container.encode(raw) } } diff --git a/scripts/protocol-gen-swift.ts b/scripts/protocol-gen-swift.ts index 293805ad0..1a46488b6 100644 --- a/scripts/protocol-gen-swift.ts +++ b/scripts/protocol-gen-swift.ts @@ -135,7 +135,7 @@ function emitGatewayFrame(): string { case "event": self = .event(try decodePayload(EventFrame.self, from: raw)) default: - throw DecodingError.dataCorruptedError(in: container, debugDescription: "unknown type \(type)") + self = .unknown(type: type, raw: raw) } } @@ -147,6 +147,9 @@ function emitGatewayFrame(): string { case .req(let v): try v.encode(to: encoder) case .res(let v): try v.encode(to: encoder) case .event(let v): try v.encode(to: encoder) + case .unknown(_, let raw): + var container = encoder.singleValueContainer() + try container.encode(raw) } } `; @@ -162,6 +165,7 @@ function emitGatewayFrame(): string { return [ "public enum GatewayFrame: Codable {", ...caseLines, + " case unknown(type: String, raw: [String: AnyCodable])", initLines, helper, "}",