From d5d80f424799c273384c9a18d1667b203deca944 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 12 Dec 2025 23:29:57 +0000 Subject: [PATCH] feat(gateway)!: switch handshake to req:connect (protocol v2) --- .../Sources/Clawdis/GatewayChannel.swift | 118 +++--- .../Clawdis/Resources/WebChat/bootstrap.js | 38 +- .../Resources/WebChat/webchat.bundle.js | 45 +- .../ClawdisProtocol/GatewayModels.swift | 45 +- .../GatewayChannelConfigureTests.swift | 47 ++- .../GatewayChannelConnectTests.swift | 47 ++- .../GatewayChannelRequestTests.swift | 50 ++- .../GatewayChannelShutdownTests.swift | 48 ++- dist/protocol.schema.json | 334 +-------------- docs/architecture.md | 20 +- docs/gateway.md | 20 +- docs/ios/spec.md | 6 +- docs/presence.md | 10 +- docs/refactor/new-arch.md | 27 +- docs/refactor/webagent-session.md | 2 +- docs/typebox.md | 10 +- docs/webchat.md | 2 +- scripts/protocol-gen-swift.ts | 14 +- scripts/protocol-gen.ts | 6 - src/cli/program.ts | 2 +- src/gateway/client.test.ts | 10 +- src/gateway/client.ts | 42 +- src/gateway/protocol/index.ts | 15 +- src/gateway/protocol/schema.ts | 30 +- src/gateway/server.test.ts | 396 +++++++----------- src/gateway/server.ts | 157 ++++--- 26 files changed, 586 insertions(+), 955 deletions(-) diff --git a/apps/macos/Sources/Clawdis/GatewayChannel.swift b/apps/macos/Sources/Clawdis/GatewayChannel.swift index f4118d602..fbad33efa 100644 --- a/apps/macos/Sources/Clawdis/GatewayChannel.swift +++ b/apps/macos/Sources/Clawdis/GatewayChannel.swift @@ -151,7 +151,7 @@ actor GatewayChannelActor { self.task = self.session.makeWebSocketTask(url: self.url) self.task?.resume() do { - try await self.sendHello() + try await self.sendConnect() } catch { let wrapped = self.wrap(error, context: "connect to gateway @ \(self.url.absoluteString)") self.connected = false @@ -176,40 +176,50 @@ actor GatewayChannelActor { } } - private func sendHello() async throws { + private func sendConnect() async throws { let osVersion = 
ProcessInfo.processInfo.operatingSystemVersion let platform = "macos \(osVersion.majorVersion).\(osVersion.minorVersion).\(osVersion.patchVersion)" let primaryLocale = Locale.preferredLanguages.first ?? Locale.current.identifier let clientName = InstanceIdentity.displayName - let hello = Hello( - type: "hello", - minprotocol: GATEWAY_PROTOCOL_VERSION, - maxprotocol: GATEWAY_PROTOCOL_VERSION, - client: [ - "name": ClawdisProtocol.AnyCodable(clientName), - "version": ClawdisProtocol.AnyCodable( - Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "dev"), - "platform": ClawdisProtocol.AnyCodable(platform), - "mode": ClawdisProtocol.AnyCodable("app"), - "instanceId": ClawdisProtocol.AnyCodable(InstanceIdentity.instanceId), - ], - caps: [], - auth: self.token.map { ["token": ClawdisProtocol.AnyCodable($0)] }, - locale: primaryLocale, - useragent: ProcessInfo.processInfo.operatingSystemVersionString) - let data = try JSONEncoder().encode(hello) + let reqId = UUID().uuidString + let client: [String: ProtoAnyCodable] = [ + "name": ProtoAnyCodable(clientName), + "version": ProtoAnyCodable( + Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? 
"dev"), + "platform": ProtoAnyCodable(platform), + "mode": ProtoAnyCodable("app"), + "instanceId": ProtoAnyCodable(InstanceIdentity.instanceId), + ] + var params: [String: ProtoAnyCodable] = [ + "minProtocol": ProtoAnyCodable(GATEWAY_PROTOCOL_VERSION), + "maxProtocol": ProtoAnyCodable(GATEWAY_PROTOCOL_VERSION), + "client": ProtoAnyCodable(client), + "caps": ProtoAnyCodable([] as [String]), + "locale": ProtoAnyCodable(primaryLocale), + "userAgent": ProtoAnyCodable(ProcessInfo.processInfo.operatingSystemVersionString), + ] + if let token = self.token { + params["auth"] = ProtoAnyCodable(["token": ProtoAnyCodable(token)]) + } + + let frame = RequestFrame( + type: "req", + id: reqId, + method: "connect", + params: ProtoAnyCodable(params)) + let data = try self.encoder.encode(frame) try await self.task?.send(.data(data)) guard let msg = try await task?.receive() else { throw NSError( domain: "Gateway", code: 1, - userInfo: [NSLocalizedDescriptionKey: "hello failed (no response)"]) + userInfo: [NSLocalizedDescriptionKey: "connect failed (no response)"]) } - try await self.handleHelloResponse(msg) + try await self.handleConnectResponse(msg, reqId: reqId) } - private func handleHelloResponse(_ msg: URLSessionWebSocketTask.Message) async throws { + private func handleConnectResponse(_ msg: URLSessionWebSocketTask.Message, reqId: String) async throws { let data: Data? = switch msg { case let .data(d): d case let .string(s): s.data(using: .utf8) @@ -219,37 +229,46 @@ actor GatewayChannelActor { throw NSError( domain: "Gateway", code: 1, - userInfo: [NSLocalizedDescriptionKey: "hello failed (empty response)"]) + userInfo: [NSLocalizedDescriptionKey: "connect failed (empty response)"]) } let decoder = JSONDecoder() - if let ok = try? decoder.decode(HelloOk.self, from: data) { - if let tick = ok.policy["tickIntervalMs"]?.value as? Double { - self.tickIntervalMs = tick - } else if let tick = ok.policy["tickIntervalMs"]?.value as? 
Int { - self.tickIntervalMs = Double(tick) - } - self.lastTick = Date() - self.tickTask?.cancel() - self.tickTask = Task { [weak self] in - guard let self else { return } - await self.watchTicks() - } - await self.pushHandler?(.snapshot(ok)) - return - } - if let err = try? decoder.decode(HelloError.self, from: data) { - let reason = err.reason - // Log and throw a detailed error so UI can surface token/hello issues. - self.logger.error("gateway hello-error: \(reason, privacy: .public)") + guard let frame = try? decoder.decode(GatewayFrame.self, from: data) else { throw NSError( domain: "Gateway", - code: 1008, - userInfo: [NSLocalizedDescriptionKey: "hello-error: \(reason)"]) + code: 1, + userInfo: [NSLocalizedDescriptionKey: "connect failed (invalid response)"]) } - throw NSError( - domain: "Gateway", - code: 1, - userInfo: [NSLocalizedDescriptionKey: "hello failed (unexpected response)"]) + guard case let .res(res) = frame, res.id == reqId else { + throw NSError( + domain: "Gateway", + code: 1, + userInfo: [NSLocalizedDescriptionKey: "connect failed (unexpected response)"]) + } + if res.ok == false { + let msg = (res.error?["message"]?.value as? String) ?? "gateway connect failed" + throw NSError(domain: "Gateway", code: 1008, userInfo: [NSLocalizedDescriptionKey: msg]) + } + guard let payload = res.payload else { + throw NSError( + domain: "Gateway", + code: 1, + userInfo: [NSLocalizedDescriptionKey: "connect failed (missing payload)"]) + } + let payloadData = try self.encoder.encode(payload) + let ok = try decoder.decode(HelloOk.self, from: payloadData) + if let tick = ok.policy["tickIntervalMs"]?.value as? Double { + self.tickIntervalMs = tick + } else if let tick = ok.policy["tickIntervalMs"]?.value as? 
Int { + self.tickIntervalMs = Double(tick) + } + self.lastTick = Date() + self.tickTask?.cancel() + self.tickTask = Task { [weak self] in + guard let self else { return } + await self.watchTicks() + } + await self.pushHandler?(.snapshot(ok)) + return } private func listen() { @@ -301,9 +320,6 @@ actor GatewayChannelActor { } if evt.event == "tick" { self.lastTick = Date() } await self.pushHandler?(.event(evt)) - case let .helloOk(ok): - self.lastTick = Date() - await self.pushHandler?(.snapshot(ok)) default: break } diff --git a/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js b/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js index b317a29c9..6a7239ec8 100644 --- a/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js +++ b/apps/macos/Sources/Clawdis/Resources/WebChat/bootstrap.js @@ -51,11 +51,11 @@ class GatewaySocket { this.ws = ws; ws.onopen = () => { - logStatus(`ws: open -> sending hello (${this.url})`); - const hello = { - type: "hello", - minProtocol: 1, - maxProtocol: 1, + const id = randomId(); + logStatus(`ws: open -> sending connect (${this.url})`); + const params = { + minProtocol: 2, + maxProtocol: 2, client: { name: "webchat-ui", version: "dev", @@ -63,8 +63,10 @@ class GatewaySocket { mode: "webchat", instanceId: randomId(), }, + caps: [], }; - ws.send(JSON.stringify(hello)); + ws.send(JSON.stringify({ type: "req", id, method: "connect", params })); + this.pending.set(id, { resolve, reject, _handshake: true }); }; ws.onerror = (err) => { @@ -91,14 +93,6 @@ class GatewaySocket { } catch { return; } - if (msg.type === "hello-ok") { - logStatus( - `ws: hello-ok presence=${msg?.snapshot?.presence?.length ?? 0} healthOk=${msg?.snapshot?.health?.ok ?? 
"n/a"}`, - ); - this.handlers.set("snapshot", msg.snapshot); - resolve(msg); - return; - } if (msg.type === "event") { const cb = this.handlers.get(msg.event); if (cb) cb(msg.payload, msg); @@ -108,8 +102,20 @@ class GatewaySocket { const pending = this.pending.get(msg.id); if (!pending) return; this.pending.delete(msg.id); - if (msg.ok) pending.resolve(msg.payload); - else pending.reject(new Error(msg.error?.message || "gateway error")); + if (msg.ok) { + if (pending._handshake) { + const helloOk = msg.payload; + logStatus( + `ws: hello-ok presence=${helloOk?.snapshot?.presence?.length ?? 0} healthOk=${helloOk?.snapshot?.health?.ok ?? "n/a"}`, + ); + this.handlers.set("snapshot", helloOk.snapshot); + pending.resolve(helloOk); + } else { + pending.resolve(msg.payload); + } + } else { + pending.reject(new Error(msg.error?.message || "gateway error")); + } } }; }); diff --git a/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js b/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js index e49db5fa5..225ecd899 100644 --- a/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js +++ b/apps/macos/Sources/Clawdis/Resources/WebChat/webchat.bundle.js @@ -196394,20 +196394,31 @@ var GatewaySocket = class { const ws = new WebSocket(this.url); this.ws = ws; ws.onopen = () => { - logStatus(`ws: open -> sending hello (${this.url})`); - const hello = { - type: "hello", - minProtocol: 1, - maxProtocol: 1, + const id = randomId(); + logStatus(`ws: open -> sending connect (${this.url})`); + const params = { + minProtocol: 2, + maxProtocol: 2, client: { name: "webchat-ui", version: "dev", platform: "browser", mode: "webchat", instanceId: randomId() - } + }, + caps: [] }; - ws.send(JSON.stringify(hello)); + ws.send(JSON.stringify({ + type: "req", + id, + method: "connect", + params + })); + this.pending.set(id, { + resolve, + reject, + _handshake: true + }); }; ws.onerror = (err) => { logStatus(`ws: error ${formatError(err)}`); @@ -196428,12 +196439,6 @@ 
var GatewaySocket = class { } catch { return; } - if (msg.type === "hello-ok") { - logStatus(`ws: hello-ok presence=${msg?.snapshot?.presence?.length ?? 0} healthOk=${msg?.snapshot?.health?.ok ?? "n/a"}`); - this.handlers.set("snapshot", msg.snapshot); - resolve(msg); - return; - } if (msg.type === "event") { const cb = this.handlers.get(msg.event); if (cb) cb(msg.payload, msg); @@ -196443,8 +196448,18 @@ var GatewaySocket = class { const pending = this.pending.get(msg.id); if (!pending) return; this.pending.delete(msg.id); - if (msg.ok) pending.resolve(msg.payload); - else pending.reject(new Error(msg.error?.message || "gateway error")); + if (msg.ok) { + if (pending._handshake) { + const helloOk = msg.payload; + logStatus(`ws: hello-ok presence=${helloOk?.snapshot?.presence?.length ?? 0} healthOk=${helloOk?.snapshot?.health?.ok ?? "n/a"}`); + this.handlers.set("snapshot", helloOk.snapshot); + pending.resolve(helloOk); + } else { + pending.resolve(msg.payload); + } + } else { + pending.reject(new Error(msg.error?.message || "gateway error")); + } } }; }); diff --git a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift index 7f88763ef..4cb504446 100644 --- a/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift +++ b/apps/macos/Sources/ClawdisProtocol/GatewayModels.swift @@ -1,7 +1,7 @@ // Generated by scripts/protocol-gen-swift.ts — do not edit by hand import Foundation -public let GATEWAY_PROTOCOL_VERSION = 1 +public let GATEWAY_PROTOCOL_VERSION = 2 public enum ErrorCode: String, Codable { case notLinked = "NOT_LINKED" @@ -10,8 +10,7 @@ public enum ErrorCode: String, Codable { case unavailable = "UNAVAILABLE" } -public struct Hello: Codable { - public let type: String +public struct ConnectParams: Codable { public let minprotocol: Int public let maxprotocol: Int public let client: [String: AnyCodable] @@ -21,7 +20,6 @@ public struct Hello: Codable { public let useragent: String? 
public init( - type: String, minprotocol: Int, maxprotocol: Int, client: [String: AnyCodable], @@ -30,7 +28,6 @@ public struct Hello: Codable { locale: String?, useragent: String? ) { - self.type = type self.minprotocol = minprotocol self.maxprotocol = maxprotocol self.client = client @@ -40,7 +37,6 @@ public struct Hello: Codable { self.useragent = useragent } private enum CodingKeys: String, CodingKey { - case type case minprotocol = "minProtocol" case maxprotocol = "maxProtocol" case client @@ -84,31 +80,6 @@ public struct HelloOk: Codable { } } -public struct HelloError: Codable { - public let type: String - public let reason: String - public let expectedprotocol: Int? - public let minclient: String? - - public init( - type: String, - reason: String, - expectedprotocol: Int?, - minclient: String? - ) { - self.type = type - self.reason = reason - self.expectedprotocol = expectedprotocol - self.minclient = minclient - } - private enum CodingKeys: String, CodingKey { - case type - case reason - case expectedprotocol = "expectedProtocol" - case minclient = "minClient" - } -} - public struct RequestFrame: Codable { public let type: String public let id: String @@ -537,9 +508,6 @@ public struct ShutdownEvent: Codable { } public enum GatewayFrame: Codable { - case hello(Hello) - case helloOk(HelloOk) - case helloError(HelloError) case req(RequestFrame) case res(ResponseFrame) case event(EventFrame) @@ -553,12 +521,6 @@ public enum GatewayFrame: Codable { let typeContainer = try decoder.container(keyedBy: CodingKeys.self) let type = try typeContainer.decode(String.self, forKey: .type) switch type { - case "hello": - self = .hello(try Hello(from: decoder)) - case "hello-ok": - self = .helloOk(try HelloOk(from: decoder)) - case "hello-error": - self = .helloError(try HelloError(from: decoder)) case "req": self = .req(try RequestFrame(from: decoder)) case "res": @@ -574,9 +536,6 @@ public enum GatewayFrame: Codable { public func encode(to encoder: Encoder) throws { switch 
self { - case .hello(let v): try v.encode(to: encoder) - case .helloOk(let v): try v.encode(to: encoder) - case .helloError(let v): try v.encode(to: encoder) case .req(let v): try v.encode(to: encoder) case .res(let v): try v.encode(to: encoder) case .event(let v): try v.encode(to: encoder) diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift index 2f3d5b35a..d2e457242 100644 --- a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift @@ -5,6 +5,7 @@ import Testing @Suite struct GatewayConnectionTests { private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable { + private let connectRequestID = OSAllocatedUnfairLock(initialState: nil) private let pendingReceiveHandler = OSAllocatedUnfairLock<(@Sendable (Result) -> Void)?>(initialState: nil) private let cancelCount = OSAllocatedUnfairLock(initialState: 0) @@ -40,8 +41,18 @@ import Testing return count } - // First send is the hello frame. Subsequent sends are request frames. - if currentSendCount == 0 { return } + // First send is the connect handshake request. Subsequent sends are request frames. + if currentSendCount == 0 { + guard case let .data(data) = message else { return } + if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + (obj["type"] as? String) == "req", + (obj["method"] as? String) == "connect", + let id = obj["id"] as? String + { + self.connectRequestID.withLock { $0 = id } + } + return + } guard case let .data(data) = message else { return } guard @@ -61,7 +72,8 @@ import Testing if self.helloDelayMs > 0 { try await Task.sleep(nanoseconds: UInt64(self.helloDelayMs) * 1_000_000) } - return .data(Self.helloOkData()) + let id = self.connectRequestID.withLock { $0 } ?? 
"connect" + return .data(Self.connectOkData(id: id)) } func receive( @@ -75,20 +87,25 @@ import Testing handler?(Result.success(.data(data))) } - private static func helloOkData() -> Data { + private static func connectOkData(id: String) -> Data { let json = """ { - "type": "hello-ok", - "protocol": 1, - "server": { "version": "test", "connId": "test" }, - "features": { "methods": [], "events": [] }, - "snapshot": { - "presence": [ { "ts": 1 } ], - "health": {}, - "stateVersion": { "presence": 0, "health": 0 }, - "uptimeMs": 0 - }, - "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + "type": "res", + "id": "\(id)", + "ok": true, + "payload": { + "type": "hello-ok", + "protocol": 2, + "server": { "version": "test", "connId": "test" }, + "features": { "methods": [], "events": [] }, + "snapshot": { + "presence": [ { "ts": 1 } ], + "health": {}, + "stateVersion": { "presence": 0, "health": 0 }, + "uptimeMs": 0 + }, + "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + } } """ return Data(json.utf8) diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift index af5c335ca..8b589885a 100644 --- a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift @@ -11,6 +11,7 @@ import Testing private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable { private let response: FakeResponse + private let connectRequestID = OSAllocatedUnfairLock(initialState: nil) private let pendingReceiveHandler = OSAllocatedUnfairLock<(@Sendable (Result) -> Void)?>( initialState: nil) @@ -36,13 +37,26 @@ import Testing } func send(_ message: URLSessionWebSocketTask.Message) async throws { - _ = message + let data: Data? 
= switch message { + case let .data(d): d + case let .string(s): s.data(using: .utf8) + @unknown default: nil + } + guard let data else { return } + if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + obj["type"] as? String == "req", + obj["method"] as? String == "connect", + let id = obj["id"] as? String + { + self.connectRequestID.withLock { $0 = id } + } } func receive() async throws -> URLSessionWebSocketTask.Message { let (delayMs, msg): (Int, URLSessionWebSocketTask.Message) = switch self.response { case let .helloOk(delayMs): - (delayMs, .data(Self.helloOkData())) + let id = self.connectRequestID.withLock { $0 } ?? "connect" + (delayMs, .data(Self.connectOkData(id: id))) case let .invalid(delayMs): (delayMs, .string("not json")) } @@ -58,20 +72,25 @@ import Testing self.pendingReceiveHandler.withLock { $0 = completionHandler } } - private static func helloOkData() -> Data { + private static func connectOkData(id: String) -> Data { let json = """ { - "type": "hello-ok", - "protocol": 1, - "server": { "version": "test", "connId": "test" }, - "features": { "methods": [], "events": [] }, - "snapshot": { - "presence": [ { "ts": 1 } ], - "health": {}, - "stateVersion": { "presence": 0, "health": 0 }, - "uptimeMs": 0 - }, - "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + "type": "res", + "id": "\(id)", + "ok": true, + "payload": { + "type": "hello-ok", + "protocol": 2, + "server": { "version": "test", "connId": "test" }, + "features": { "methods": [], "events": [] }, + "snapshot": { + "presence": [ { "ts": 1 } ], + "health": {}, + "stateVersion": { "presence": 0, "health": 0 }, + "uptimeMs": 0 + }, + "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + } } """ return Data(json.utf8) diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift index 7bac7d4f3..0d1110744 100644 --- 
a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelRequestTests.swift @@ -6,6 +6,7 @@ import Testing @Suite struct GatewayChannelRequestTests { private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable { private let requestSendDelayMs: Int + private let connectRequestID = OSAllocatedUnfairLock(initialState: nil) private let pendingReceiveHandler = OSAllocatedUnfairLock<(@Sendable (Result) -> Void)?>(initialState: nil) private let sendCount = OSAllocatedUnfairLock(initialState: 0) @@ -37,7 +38,22 @@ import Testing return count } - // First send is the hello frame. Second send is the request frame. + // First send is the connect handshake. Second send is the request frame. + if currentSendCount == 0 { + let data: Data? = switch message { + case let .data(d): d + case let .string(s): s.data(using: .utf8) + @unknown default: nil + } + guard let data else { return } + if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + obj["type"] as? String == "req", + obj["method"] as? String == "connect", + let id = obj["id"] as? String + { + self.connectRequestID.withLock { $0 = id } + } + } if currentSendCount == 1 { try await Task.sleep(nanoseconds: UInt64(self.requestSendDelayMs) * 1_000_000) throw URLError(.cannotConnectToHost) @@ -45,7 +61,8 @@ import Testing } func receive() async throws -> URLSessionWebSocketTask.Message { - .data(Self.helloOkData()) + let id = self.connectRequestID.withLock { $0 } ?? 
"connect" + return .data(Self.connectOkData(id: id)) } func receive( @@ -54,20 +71,25 @@ import Testing self.pendingReceiveHandler.withLock { $0 = completionHandler } } - private static func helloOkData() -> Data { + private static func connectOkData(id: String) -> Data { let json = """ { - "type": "hello-ok", - "protocol": 1, - "server": { "version": "test", "connId": "test" }, - "features": { "methods": [], "events": [] }, - "snapshot": { - "presence": [ { "ts": 1 } ], - "health": {}, - "stateVersion": { "presence": 0, "health": 0 }, - "uptimeMs": 0 - }, - "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + "type": "res", + "id": "\(id)", + "ok": true, + "payload": { + "type": "hello-ok", + "protocol": 2, + "server": { "version": "test", "connId": "test" }, + "features": { "methods": [], "events": [] }, + "snapshot": { + "presence": [ { "ts": 1 } ], + "health": {}, + "stateVersion": { "presence": 0, "health": 0 }, + "uptimeMs": 0 + }, + "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + } } """ return Data(json.utf8) diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelShutdownTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelShutdownTests.swift index f2a78fbfe..0a210ad37 100644 --- a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelShutdownTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelShutdownTests.swift @@ -5,6 +5,7 @@ import Testing @Suite struct GatewayChannelShutdownTests { private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable { + private let connectRequestID = OSAllocatedUnfairLock(initialState: nil) private let pendingReceiveHandler = OSAllocatedUnfairLock<(@Sendable (Result) -> Void)?>(initialState: nil) private let cancelCount = OSAllocatedUnfairLock(initialState: 0) @@ -29,11 +30,24 @@ import Testing } func send(_ message: URLSessionWebSocketTask.Message) async throws { - _ = message + let data: Data? 
= switch message { + case let .data(d): d + case let .string(s): s.data(using: .utf8) + @unknown default: nil + } + guard let data else { return } + if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + obj["type"] as? String == "req", + obj["method"] as? String == "connect", + let id = obj["id"] as? String + { + self.connectRequestID.withLock { $0 = id } + } } func receive() async throws -> URLSessionWebSocketTask.Message { - .data(Self.helloOkData()) + let id = self.connectRequestID.withLock { $0 } ?? "connect" + return .data(Self.connectOkData(id: id)) } func receive( @@ -47,20 +61,25 @@ import Testing handler?(Result.failure(URLError(.networkConnectionLost))) } - private static func helloOkData() -> Data { + private static func connectOkData(id: String) -> Data { let json = """ { - "type": "hello-ok", - "protocol": 1, - "server": { "version": "test", "connId": "test" }, - "features": { "methods": [], "events": [] }, - "snapshot": { - "presence": [ { "ts": 1 } ], - "health": {}, - "stateVersion": { "presence": 0, "health": 0 }, - "uptimeMs": 0 - }, - "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + "type": "res", + "id": "\(id)", + "ok": true, + "payload": { + "type": "hello-ok", + "protocol": 2, + "server": { "version": "test", "connId": "test" }, + "features": { "methods": [], "events": [] }, + "snapshot": { + "presence": [ { "ts": 1 } ], + "health": {}, + "stateVersion": { "presence": 0, "health": 0 }, + "uptimeMs": 0 + }, + "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + } } """ return Data(json.utf8) @@ -106,4 +125,3 @@ import Testing #expect(session.snapshotMakeCount() == 1) } } - diff --git a/dist/protocol.schema.json b/dist/protocol.schema.json index f22d0d7a9..4cc5e086b 100644 --- a/dist/protocol.schema.json +++ b/dist/protocol.schema.json @@ -4,15 +4,6 @@ "title": "Clawdis Gateway Protocol", "description": "Handshake, request/response, and event frames for the 
Gateway WebSocket.", "oneOf": [ - { - "$ref": "#/definitions/Hello" - }, - { - "$ref": "#/definitions/HelloOk" - }, - { - "$ref": "#/definitions/HelloError" - }, { "$ref": "#/definitions/RequestFrame" }, @@ -26,23 +17,16 @@ "discriminator": { "propertyName": "type", "mapping": { - "hello": "#/definitions/Hello", - "hello-ok": "#/definitions/HelloOk", - "hello-error": "#/definitions/HelloError", "req": "#/definitions/RequestFrame", "res": "#/definitions/ResponseFrame", "event": "#/definitions/EventFrame" } }, "definitions": { - "Hello": { + "ConnectParams": { "additionalProperties": false, "type": "object", "properties": { - "type": { - "const": "hello", - "type": "string" - }, "minProtocol": { "minimum": 1, "type": "integer" @@ -108,7 +92,6 @@ } }, "required": [ - "type", "minProtocol", "maxProtocol", "client" @@ -298,32 +281,6 @@ "policy" ] }, - "HelloError": { - "additionalProperties": false, - "type": "object", - "properties": { - "type": { - "const": "hello-error", - "type": "string" - }, - "reason": { - "minLength": 1, - "type": "string" - }, - "expectedProtocol": { - "minimum": 1, - "type": "integer" - }, - "minClient": { - "minLength": 1, - "type": "string" - } - }, - "required": [ - "type", - "reason" - ] - }, "RequestFrame": { "additionalProperties": false, "type": "object", @@ -441,295 +398,6 @@ "GatewayFrame": { "discriminator": "type", "anyOf": [ - { - "additionalProperties": false, - "type": "object", - "properties": { - "type": { - "const": "hello", - "type": "string" - }, - "minProtocol": { - "minimum": 1, - "type": "integer" - }, - "maxProtocol": { - "minimum": 1, - "type": "integer" - }, - "client": { - "additionalProperties": false, - "type": "object", - "properties": { - "name": { - "minLength": 1, - "type": "string" - }, - "version": { - "minLength": 1, - "type": "string" - }, - "platform": { - "minLength": 1, - "type": "string" - }, - "mode": { - "minLength": 1, - "type": "string" - }, - "instanceId": { - "minLength": 1, - "type": "string" - } 
- }, - "required": [ - "name", - "version", - "platform", - "mode" - ] - }, - "caps": { - "default": [], - "type": "array", - "items": { - "minLength": 1, - "type": "string" - } - }, - "auth": { - "additionalProperties": false, - "type": "object", - "properties": { - "token": { - "type": "string" - } - } - }, - "locale": { - "type": "string" - }, - "userAgent": { - "type": "string" - } - }, - "required": [ - "type", - "minProtocol", - "maxProtocol", - "client" - ] - }, - { - "additionalProperties": false, - "type": "object", - "properties": { - "type": { - "const": "hello-ok", - "type": "string" - }, - "protocol": { - "minimum": 1, - "type": "integer" - }, - "server": { - "additionalProperties": false, - "type": "object", - "properties": { - "version": { - "minLength": 1, - "type": "string" - }, - "commit": { - "minLength": 1, - "type": "string" - }, - "host": { - "minLength": 1, - "type": "string" - }, - "connId": { - "minLength": 1, - "type": "string" - } - }, - "required": [ - "version", - "connId" - ] - }, - "features": { - "additionalProperties": false, - "type": "object", - "properties": { - "methods": { - "type": "array", - "items": { - "minLength": 1, - "type": "string" - } - }, - "events": { - "type": "array", - "items": { - "minLength": 1, - "type": "string" - } - } - }, - "required": [ - "methods", - "events" - ] - }, - "snapshot": { - "additionalProperties": false, - "type": "object", - "properties": { - "presence": { - "type": "array", - "items": { - "additionalProperties": false, - "type": "object", - "properties": { - "host": { - "minLength": 1, - "type": "string" - }, - "ip": { - "minLength": 1, - "type": "string" - }, - "version": { - "minLength": 1, - "type": "string" - }, - "mode": { - "minLength": 1, - "type": "string" - }, - "lastInputSeconds": { - "minimum": 0, - "type": "integer" - }, - "reason": { - "minLength": 1, - "type": "string" - }, - "tags": { - "type": "array", - "items": { - "minLength": 1, - "type": "string" - } - }, - "text": { - 
"type": "string" - }, - "ts": { - "minimum": 0, - "type": "integer" - }, - "instanceId": { - "minLength": 1, - "type": "string" - } - }, - "required": [ - "ts" - ] - } - }, - "health": {}, - "stateVersion": { - "additionalProperties": false, - "type": "object", - "properties": { - "presence": { - "minimum": 0, - "type": "integer" - }, - "health": { - "minimum": 0, - "type": "integer" - } - }, - "required": [ - "presence", - "health" - ] - }, - "uptimeMs": { - "minimum": 0, - "type": "integer" - } - }, - "required": [ - "presence", - "health", - "stateVersion", - "uptimeMs" - ] - }, - "policy": { - "additionalProperties": false, - "type": "object", - "properties": { - "maxPayload": { - "minimum": 1, - "type": "integer" - }, - "maxBufferedBytes": { - "minimum": 1, - "type": "integer" - }, - "tickIntervalMs": { - "minimum": 1, - "type": "integer" - } - }, - "required": [ - "maxPayload", - "maxBufferedBytes", - "tickIntervalMs" - ] - } - }, - "required": [ - "type", - "protocol", - "server", - "features", - "snapshot", - "policy" - ] - }, - { - "additionalProperties": false, - "type": "object", - "properties": { - "type": { - "const": "hello-error", - "type": "string" - }, - "reason": { - "minLength": 1, - "type": "string" - }, - "expectedProtocol": { - "minimum": 1, - "type": "integer" - }, - "minClient": { - "minLength": 1, - "type": "string" - } - }, - "required": [ - "type", - "reason" - ] - }, { "additionalProperties": false, "type": "object", diff --git a/docs/architecture.md b/docs/architecture.md index 47dbe74cb..1315adfd4 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -16,7 +16,7 @@ Last updated: 2025-12-09 - **Gateway (daemon)** - Maintains Baileys/Telegram connections. - Exposes a typed WS API (req/resp + server push events). - - Validates every inbound frame against JSON Schema; rejects anything before a mandatory `hello`. + - Validates every inbound frame against JSON Schema; rejects anything before a mandatory `connect`. 
- **Clients (mac app / CLI / web admin)** - One WS connection per client. - Send requests (`health`, `status`, `send`, `agent`, `system-presence`, toggles) and subscribe to events (`tick`, `agent`, `presence`, `shutdown`). @@ -31,9 +31,9 @@ Last updated: 2025-12-09 ``` Client Gateway | | - |------- hello ----------->| - |<------ hello-ok ---------| (or hello-error + close) - | (hello-ok carries snapshot: presence + health) + |---- req:connect -------->| + |<------ res (ok) ---------| (or res error + close) + | (payload=hello-ok carries snapshot: presence + health) | | |<------ event:presence ---| (deltas) |<------ event:tick -------| (keepalive/no-op) @@ -46,13 +46,12 @@ Client Gateway ``` ## Wire protocol (summary) - Transport: WebSocket, text frames with JSON payloads. -- First frame must be `hello {type:"hello", minProtocol, maxProtocol, client:{name,version,platform,mode,instanceId}, caps, auth?, locale?, userAgent? }`. -- Server replies `hello-ok {type:"hello-ok", protocol:, server:{version,commit,host,connId}, features:{methods,events}, snapshot:{presence:[...], health:{...}, stateVersion:{presence,health}, uptimeMs}, policy:{maxPayload,maxBufferedBytes,tickIntervalMs} }` - or `hello-error {type:"hello-error", reason, expectedProtocol, minClient }` then closes. +- First frame must be `req {type:"req", id, method:"connect", params:{minProtocol, maxProtocol, client:{name,version,platform,mode,instanceId}, caps, auth?, locale?, userAgent? } }`. +- Server replies `res {type:"res", id, ok:true, payload: hello-ok }` or `ok:false` then closes. - After handshake: - Requests: `{type:"req", id, method, params}` → `{type:"res", id, ok, payload|error}` - Events: `{type:"event", event:"agent"|"presence"|"tick"|"shutdown", payload, seq?, stateVersion?}` -- If `CLAWDIS_GATEWAY_TOKEN` (or `--token`) is set, `hello.auth.token` must match; otherwise the socket closes with policy violation. 
+- If `CLAWDIS_GATEWAY_TOKEN` (or `--token`) is set, `connect.params.auth.token` must match; otherwise the socket closes with policy violation. - Presence payload is structured, not free text: `{host, ip, version, mode, lastInputSeconds?, ts, reason?, tags?[], instanceId? }`. - Agent runs are acked `{runId,status:"accepted"}` then complete with a final res `{runId,status,summary}`; streamed output arrives as `event:"agent"`. - Protocol versions are bumped on breaking changes; clients must match `minClient`; Gateway chooses within client’s min/max. @@ -69,13 +68,13 @@ Client Gateway ## Invariants - Exactly one Gateway controls a single Baileys session per host. No fallbacks to ad-hoc direct Baileys sends. -- Handshake is mandatory; any non-JSON or non-hello first frame is a hard close. +- Handshake is mandatory; any non-JSON or non-connect first frame is a hard close. - All methods and events are versioned; new fields are additive; breaking changes increment `protocol`. - No event replay: on seq gaps, clients must refresh (`health` + `system-presence`) and continue; presence is bounded via TTL/max entries. ## Remote access - Preferred: Tailscale or VPN; alternate: SSH tunnel `ssh -N -L 18789:127.0.0.1:18789 user@host`. -- Same protocol over the tunnel; same handshake. If a shared token is configured, clients must send it in `hello.auth.token` even over the tunnel. +- Same protocol over the tunnel; same handshake. If a shared token is configured, clients must send it in `connect.params.auth.token` even over the tunnel. ## Operations snapshot - Start: `clawdis gateway` (foreground, logs to stdout). 
diff --git a/docs/gateway.md b/docs/gateway.md index 01424c2dd..81fb750f2 100644 --- a/docs/gateway.md +++ b/docs/gateway.md @@ -25,7 +25,7 @@ pnpm clawdis gateway --force - Pass `--verbose` to mirror debug logging (handshakes, req/res, events) from the log file into stdio when troubleshooting. - `--force` uses `lsof` to find listeners on the chosen port, sends SIGTERM, logs what it killed, then starts the gateway (fails fast if `lsof` is missing). - If you run under a supervisor (launchd/systemd/mac app child-process mode), a stop/restart typically sends **SIGTERM**; older builds may surface this as `pnpm` `ELIFECYCLE` exit code **143** (SIGTERM), which is a normal shutdown, not a crash. -- Optional shared secret: pass `--token <token>` or set `CLAWDIS_GATEWAY_TOKEN` to require clients to send `hello.auth.token`. +- Optional shared secret: pass `--token <token>` or set `CLAWDIS_GATEWAY_TOKEN` to require clients to send `connect.params.auth.token`. ## Remote access - Tailscale/VPN preferred; otherwise SSH tunnel: @@ -33,11 +33,11 @@ pnpm clawdis gateway --force ssh -N -L 18789:127.0.0.1:18789 user@host ``` - Clients then connect to `ws://127.0.0.1:18789` through the tunnel. -- If a token is configured, clients must include it in `hello.auth.token` even over the tunnel. +- If a token is configured, clients must include it in `connect.params.auth.token` even over the tunnel. ## Protocol (operator view) -- Mandatory first frame from client: `hello {type:"hello", minProtocol, maxProtocol, client:{name,version,platform,mode,instanceId}, caps, auth?, locale?, userAgent? }`. -- Gateway replies `hello-ok {type:"hello-ok", protocol:, server:{version,commit,host,connId}, features:{methods,events}, snapshot:{presence[], health, stateVersion, uptimeMs}, policy:{maxPayload,maxBufferedBytes,tickIntervalMs} }` or `hello-error`. 
+- Mandatory first frame from client: `req {type:"req", id, method:"connect", params:{minProtocol,maxProtocol,client:{name,version,platform,mode,instanceId}, caps, auth?, locale?, userAgent? } }`. +- Gateway replies `res {type:"res", id, ok:true, payload:hello-ok }` (or `ok:false` with an error, then closes). - After handshake: - Requests: `{type:"req", id, method, params}` → `{type:"res", id, ok, payload|error}` - Events: `{type:"event", event, payload, seq?, stateVersion?}` @@ -63,13 +63,13 @@ See also: `docs/presence.md` for how presence is produced/deduped and why `insta ## WebChat integration - WebChat serves static assets locally (default port 18788, configurable). - The WebChat backend keeps a single WS connection to the Gateway for control/data; all sends and agent runs flow through that connection. -- Remote use goes through the same SSH/Tailscale tunnel; if a gateway token is configured, WebChat must include it during hello. +- Remote use goes through the same SSH/Tailscale tunnel; if a gateway token is configured, WebChat must include it during connect. - macOS app also connects via this WS (one socket); it hydrates presence from the initial snapshot and listens for `presence` events to update the UI. ## Typing and validation - Server validates every inbound frame with AJV against JSON Schema emitted from the protocol definitions. -- Clients (TS/Swift) consume generated types (TS directly; Swift via quicktype from the JSON Schema). -- Types live in `src/gateway/protocol/*.ts`; regenerate schemas/models with `pnpm protocol:gen` (writes `dist/protocol.schema.json` and `apps/macos/Sources/ClawdisProtocol/Protocol.swift`). +- Clients (TS/Swift) consume generated types (TS directly; Swift via the repo’s generator). +- Types live in `src/gateway/protocol/*.ts`; regenerate schemas/models with `pnpm protocol:gen` (writes `dist/protocol.schema.json`) and `pnpm protocol:gen:swift` (writes `apps/macos/Sources/ClawdisProtocol/GatewayModels.swift`). 
## Connection snapshot - `hello-ok` includes a `snapshot` with `presence`, `health`, `stateVersion`, and `uptimeMs` plus `policy {maxPayload,maxBufferedBytes,tickIntervalMs}` so clients can render immediately without extra requests. @@ -119,14 +119,14 @@ WantedBy=multi-user.target Enable with `systemctl enable --now clawdis-gateway.service`. ## Operational checks -- Liveness: open WS and send `hello` → expect `hello-ok` (with snapshot). +- Liveness: open WS and send `req:connect` → expect `res` with `payload.type="hello-ok"` (with snapshot). - Readiness: call `health` → expect `ok: true` and `web.linked=true`. - Debug: subscribe to `tick` and `presence` events; ensure `status` shows linked/auth age; presence entries show Gateway host and connected clients. ## Safety guarantees - Only one Gateway per host; all sends/agent calls must go through it. - No fallback to direct Baileys connections; if the Gateway is down, sends fail fast. -- Non-hello first frames or malformed JSON are rejected and the socket is closed. +- Non-connect first frames or malformed JSON are rejected and the socket is closed. - Graceful shutdown: emit `shutdown` event before closing; clients must handle close + reconnect. ## CLI helpers @@ -138,4 +138,4 @@ Enable with `systemctl enable --now clawdis-gateway.service`. ## Migration guidance - Retire uses of `clawdis gateway` and the legacy TCP control port. -- Update clients to speak the WS protocol with mandatory hello and structured presence. +- Update clients to speak the WS protocol with mandatory connect and structured presence. diff --git a/docs/ios/spec.md b/docs/ios/spec.md index 663e7e07b..c9a871cc5 100644 --- a/docs/ios/spec.md +++ b/docs/ios/spec.md @@ -82,7 +82,7 @@ Unify mac Canvas + iOS Canvas under a single conceptual surface: Add to `src/gateway/protocol/schema.ts` (and regenerate Swift models): **Identity** -- Node identity comes from `hello.client.instanceId` (stable), and `hello.client.mode = "node"` (or `"ios-node"`). 
+- Node identity comes from `connect.params.client.instanceId` (stable), and `connect.params.client.mode = "node"` (or `"ios-node"`). **Methods** - `node.list` → list paired/connected nodes + capabilities @@ -134,7 +134,7 @@ When iOS is backgrounded: ## Code sharing (macOS + iOS) Create/expand SwiftPM targets so both apps share: - `ClawdisProtocol` (generated models; platform-neutral) -- `ClawdisGatewayClient` (shared WS framing + hello/req/res + seq-gap handling) +- `ClawdisGatewayClient` (shared WS framing + connect/req/res + seq-gap handling) - `ClawdisNodeKit` (node.invoke command types + error codes) macOS continues to own: @@ -191,6 +191,6 @@ open ClawdisNode.xcodeproj - Keep existing implementation, but expose it through the unified protocol path so the agent uses one API. ## Open questions -- Should `hello.client.mode` be `"node"` with `platform="ios ..."` or a distinct mode `"ios-node"`? (Presence filtering currently excludes `"cli"` only.) +- Should `connect.params.client.mode` be `"node"` with `platform="ios ..."` or a distinct mode `"ios-node"`? (Presence filtering currently excludes `"cli"` only.) - Do we want a “permissions” model per node (voice only vs voice+screen) at pairing time? - Should “website mode” allow arbitrary https, or enforce an allowlist to reduce risk? diff --git a/docs/presence.md b/docs/presence.md index 33c380d83..9d65c7eab 100644 --- a/docs/presence.md +++ b/docs/presence.md @@ -3,7 +3,7 @@ summary: "How Clawdis presence entries are produced, merged, and displayed" read_when: - Debugging the Instances tab - Investigating duplicate or stale instance rows - - Changing gateway WS hello or system-event beacons + - Changing gateway WS connect or system-event beacons --- # Presence @@ -36,13 +36,13 @@ The Gateway seeds a “self” entry at startup so UIs always show at least the Implementation: `src/infra/system-presence.ts` (`initSelfPresence()`). 
-### 2) WebSocket hello (connection-derived presence) +### 2) WebSocket connect (connection-derived presence) -Every WS client must begin with a `hello` frame. On successful handshake, the Gateway upserts a presence entry for that connection. +Every WS client must begin with a `connect` request. On successful handshake, the Gateway upserts a presence entry for that connection. This is meant to answer: “Which clients are currently connected?” -Implementation: `src/gateway/server.ts` (WS `hello` handling uses `hello.client.instanceId` when provided; otherwise falls back to `connId`). +Implementation: `src/gateway/server.ts` (connect handling uses `connect.params.client.instanceId` when provided; otherwise falls back to `connId`). #### Why one-off CLI commands do not show up @@ -113,6 +113,6 @@ The store refreshes periodically and also applies `presence` WS events. - To see the raw list, call `system-presence` against the gateway. - If you see duplicates: - - confirm clients send a stable `instanceId` in `hello` + - confirm clients send a stable `instanceId` in the handshake (`connect.params.client.instanceId`) - confirm beaconing uses the same `instanceId` - check whether the connection-derived entry is missing `instanceId` (then it will be keyed by `connId` and duplicates are expected on reconnect) diff --git a/docs/refactor/new-arch.md b/docs/refactor/new-arch.md index 0c35d4384..38eb1f421 100644 --- a/docs/refactor/new-arch.md +++ b/docs/refactor/new-arch.md @@ -16,17 +16,16 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, - **Protocol folder**: create `protocol/` for schemas and build artifacts. ✅ `src/gateway/protocol`. - **Schema tooling**: - Prefer **TypeBox** (or ArkType) as source-of-truth types. ✅ TypeBox in `schema.ts`. - - `pnpm protocol:gen`: - 1) emits JSON Schema (`dist/protocol.schema.json`), - 2) runs quicktype → Swift `Codable` models (`apps/macos/Sources/ClawdisProtocol/Protocol.swift`). 
✅ + - `pnpm protocol:gen`: emits JSON Schema (`dist/protocol.schema.json`). ✅ + - `pnpm protocol:gen:swift`: generates Swift `Codable` models (`apps/macos/Sources/ClawdisProtocol/GatewayModels.swift`). ✅ - AJV compile step for server validators. ✅ - **CI**: add a job that fails if schema or generated Swift is stale. ✅ `pnpm protocol:check` (runs gen + git diff). ## Phase 1 — Protocol specification - Frames (WS text JSON, all with explicit `type`): - - `hello {type:"hello", minProtocol, maxProtocol, client:{name,version,platform,mode,instanceId}, caps, auth:{token?}, locale?, userAgent?}` + - `req {type:"req", id, method:"connect", params:{minProtocol,maxProtocol,client:{name,version,platform,mode,instanceId}, caps, auth:{token?}, locale?, userAgent?}}` + - `res {type:"res", id, ok:true, payload: hello-ok }` (or `ok:false` then close) - `hello-ok {type:"hello-ok", protocol:, server:{version,commit,host,connId}, features:{methods,events}, snapshot:{presence[], health, stateVersion:{presence,health}, uptimeMs}, policy:{maxPayload, maxBufferedBytes, tickIntervalMs}}` - - `hello-error {type:"hello-error", reason, expectedProtocol, minClient}` - `req {type:"req", id, method, params?}` - `res {type:"res", id, ok, payload?, error?}` where `error` = `{code,message,details?,retryable?,retryAfterMs?}` - `event {type:"event", event, payload, seq?, stateVersion?}` (presence/tick/shutdown/agent) @@ -40,8 +39,8 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, - Error codes: `NOT_LINKED`, `AGENT_TIMEOUT`, `INVALID_REQUEST`, `UNAVAILABLE`. - Error shape: `{code, message, details?, retryable?, retryAfterMs?}` - Rules: - - First frame must be `type:"hello"`; otherwise close. Add handshake timeout (e.g., 3s) for silent clients. - - Negotiate protocol: server picks within `[minProtocol,maxProtocol]`; if none, send `hello-error`. + - First frame must be `req` with `method:"connect"`; otherwise close. Add handshake timeout (e.g., 3s) for silent clients. 
+ - Negotiate protocol: server picks within `[minProtocol,maxProtocol]`; if none, reply `res ok:false` and close. - Protocol version bump on breaking changes; `hello-ok` must include `minClient` when needed. - `stateVersion` increments for presence/health to drop stale deltas. - Stable IDs: client sends `instanceId`; server issues per-connection `connId` in `hello-ok`; presence entries may include `instanceId` to dedupe reconnects. @@ -49,14 +48,14 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, - Presence is primarily connection-derived; client may add hints (e.g., lastInputSeconds); entries expire via TTL to keep the map bounded (e.g., 5m TTL, max 200 entries). - Idempotency keys: required for `send` and `agent` to safely retry after disconnects. - Size limits: bound first-frame size by `maxPayload`; reject early if exceeded. - - Close on any non-JSON or wrong `type` before hello. + - Close on any non-JSON or wrong `type` before connect. - Per-op idempotency keys: client SHOULD supply an explicit key per `send`/`agent`; if omitted, server may derive a scoped key from `instanceId+connId`, but explicit keys are safer across reconnects. - Locale/userAgent are informational; server may log them for analytics but must not rely on them for access control. ## Phase 2 — Gateway WebSocket server - New module `src/gateway/server.ts`: - Bind 127.0.0.1:18789 (configurable). - - On connect: validate `hello`, send `hello-ok` with snapshot, start event pump. + - On connect: validate `connect` params, send snapshot payload, start event pump. - Per-connection queues with backpressure (bounded; drop oldest non-critical). - WS-level caps: set `maxPayload` to cap frame size before JSON parse. - Emit `tick` every N seconds when idle (or WS ping/pong if adequate). @@ -73,7 +72,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, - Handshake edge cases: - Close on handshake timeout. 
- Close on over-limit first frame (maxPayload). - - Close immediately on non-JSON or wrong `type` before hello. + - Close immediately on non-JSON or wrong `type` before connect. - Default guardrails: `maxPayload` ~512 KB, handshake timeout ~3 s, outbound buffered amount cap ~1.5 MB (tune as you implement). - Dedupe cache: bound TTL (~5m) and max size (~1000 entries); evict oldest first (LRU) to prevent memory growth. @@ -101,7 +100,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, - Replace stdio/SSH RPC with WS client (tunneled via SSH/Tailscale for remote). ✅ AgentRPC/ControlChannel now use Gateway WS. - Implement handshake, snapshot hydration, subscriptions to `presence`, `tick`, `agent`, `shutdown`. ✅ snapshot + presence events broadcast to InstancesStore; agent events still to wire to UI if desired. - Remove immediate `health/system-presence` fetch on connect. ✅ presence hydrated from snapshot; periodic refresh kept as fallback. - - Handle `hello-error` and retry with backoff if version/token mismatched. ✅ macOS GatewayChannel reconnects with exponential backoff. + - Handle connect failures (`res ok:false`) and retry with backoff if version/token mismatched. ✅ macOS GatewayChannel reconnects with exponential backoff. - **CLI**: - Add lightweight WS client helper for `status/health/send/agent` when Gateway is up. ✅ `gateway` subcommands use the Gateway over WS. - Consider a “local only” flag to avoid accidental remote connects. (optional; not needed with tunnel-first model.) @@ -134,7 +133,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, ## Edge cases and ordering - Event ordering: all events carry `seq`; clients detect gaps and should re-fetch snapshot (or targeted refresh) on gap. -- Partial handshakes: if client connects and never sends hello, server closes after handshake timeout. 
+- Partial handshakes: if client connects and never sends `req:connect`, server closes after handshake timeout. - Garbage/oversize first frame: bounded by `maxPayload`; server closes immediately on parse failure. - Duplicate delivery on reconnect: clients must send idempotency keys; Gateway dedupe cache prevents double-send/agent execution. - Snapshot sufficiency: `hello-ok.snapshot` must contain enough to render UI after reconnect without event replay. @@ -144,7 +143,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, ## Phase 9 — Testing & validation - Unit: frame validation, handshake failure, auth/token, stateVersion on presence events, agent stream fanout, send dedupe. ✅ -- Integration: connect → snapshot → req/res → streaming agent → shutdown. ✅ Covered in gateway WS tests (hello/health/status/presence, agent ack+final, shutdown broadcast). +- Integration: connect → snapshot → req/res → streaming agent → shutdown. ✅ Covered in gateway WS tests (connect/health/status/presence, agent ack+final, shutdown broadcast). - Load: multiple concurrent WS clients; backpressure behavior under burst. ✅ Basic fanout test with 3 clients receiving presence broadcast; heavier soak still recommended. - Mac app smoke: presence/health render from snapshot; reconnect on tick loss. (Manual: open Instances tab, verify snapshot after connect, induce seq gap by toggling wifi, ensure UI refreshes.) - WebChat smoke: snapshot seed + event updates; tunnel scenario. ✅ Offline snapshot harness in `src/webchat/server.test.ts` (mock gateway) now passes; live tunnel still recommended for manual. 
@@ -161,7 +160,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway, - Quick checklist - [x] Protocol types & schemas (TS + JSON Schema + Swift via quicktype) - [x] AJV validators wired -- [x] WS server with hello → snapshot → events +- [x] WS server with connect → snapshot → events - [x] Tick + shutdown events - [x] stateVersion + presence deltas - [x] Gateway CLI command diff --git a/docs/refactor/webagent-session.md b/docs/refactor/webagent-session.md index d26d5e81c..7282a7e63 100644 --- a/docs/refactor/webagent-session.md +++ b/docs/refactor/webagent-session.md @@ -18,7 +18,7 @@ Context: web chat currently lives in a WKWebView that loads the pi-web bundle. S ## Client work (pi-web bundle) - Replace `NativeTransport` with a Gateway WS client: - - `hello` → `chat.history` for initial state. + - `connect` → `chat.history` for initial state. - Listen to `chat/presence/tick/health`; update UI from events only. - Send via `chat.send`; mark pending until `chat state:final|error`. - Enforce health gate + 30s timeout. diff --git a/docs/typebox.md b/docs/typebox.md index c510d2d12..458e443d8 100644 --- a/docs/typebox.md +++ b/docs/typebox.md @@ -7,22 +7,22 @@ read_when: Last updated: 2025-12-09 -We use TypeBox schemas in `src/gateway/protocol/schema.ts` as the single source of truth for the Gateway control plane (hello/req/res/event frames and payloads). All derived artifacts should be generated from these schemas, not edited by hand. +We use TypeBox schemas in `src/gateway/protocol/schema.ts` as the single source of truth for the Gateway control plane (connect/req/res/event frames and payloads). All derived artifacts should be generated from these schemas, not edited by hand. ## Current pipeline - **TypeBox → JSON Schema**: `pnpm protocol:gen` writes `dist/protocol.schema.json` (draft-07) and runs AJV in the server tests. 
-- **TypeBox → Swift (quicktype)**: `pnpm protocol:gen` currently also generates `apps/macos/Sources/ClawdisProtocol/Protocol.swift` via quicktype. This produces a single struct with many optionals and is not ideal for strong typing. +- **TypeBox → Swift**: `pnpm protocol:gen:swift` generates `apps/macos/Sources/ClawdisProtocol/GatewayModels.swift`. ## Problem -- Quicktype flattens `oneOf`/`discriminator` into an all-optional struct, so Swift loses exhaustiveness and safety for `GatewayFrame`. +- We want strong typing in Swift, including a sealed `GatewayFrame` enum with a discriminator and a forward-compatible `unknown` case. ## Preferred plan (next step) - Add a small, custom Swift generator driven directly by the TypeBox schemas: - - Emit a sealed `enum GatewayFrame: Codable { case hello(Hello), helloOk(HelloOk), helloError(...), req(RequestFrame), res(ResponseFrame), event(EventFrame) }`. - - Emit strongly typed payload structs/enums (`Hello`, `HelloOk`, `HelloError`, `RequestFrame`, `ResponseFrame`, `EventFrame`, `PresenceEntry`, `Snapshot`, `StateVersion`, `ErrorShape`, `AgentEvent`, `TickEvent`, `ShutdownEvent`, `SendParams`, `AgentParams`, `ErrorCode`, `PROTOCOL_VERSION`). + - Emit a sealed `enum GatewayFrame: Codable { case req(RequestFrame), res(ResponseFrame), event(EventFrame) }`. + - Emit strongly typed payload structs/enums (`ConnectParams`, `HelloOk`, `RequestFrame`, `ResponseFrame`, `EventFrame`, `PresenceEntry`, `Snapshot`, `StateVersion`, `ErrorShape`, `AgentEvent`, `TickEvent`, `ShutdownEvent`, `SendParams`, `AgentParams`, `ErrorCode`, `PROTOCOL_VERSION`). - Custom `init(from:)` / `encode(to:)` enforces the `type` discriminator and can include an `unknown` case for forward compatibility. - Wire a new script (e.g., `pnpm protocol:gen:swift`) into `protocol:check` so CI fails if the generated Swift is stale. 
diff --git a/docs/webchat.md b/docs/webchat.md index d8a705f92..1ec1d3902 100644 --- a/docs/webchat.md +++ b/docs/webchat.md @@ -20,7 +20,7 @@ Updated: 2025-12-09 - Data plane is entirely on the Gateway WS (`ws://127.0.0.1:`): methods `chat.history`, `chat.send`; events `chat`, `presence`, `tick`, `health`. ## How it connects -- Browser/WebView performs Gateway WS `hello`, then calls `chat.history` for bootstrap and `chat.send` for sends; listens to `chat/presence/tick/health` events. +- Browser/WebView performs Gateway WS `connect`, then calls `chat.history` for bootstrap and `chat.send` for sends; listens to `chat/presence/tick/health` events. - No session file watching. History comes from the Gateway via `chat.history`. - If Gateway WS is unavailable, the UI surfaces the error and blocks send. diff --git a/scripts/protocol-gen-swift.ts b/scripts/protocol-gen-swift.ts index 0d9fba9ba..8d1b21b16 100644 --- a/scripts/protocol-gen-swift.ts +++ b/scripts/protocol-gen-swift.ts @@ -146,11 +146,8 @@ function emitStruct(name: string, schema: JsonSchema): string { } function emitGatewayFrame(): string { - const cases = ["hello", "hello-ok", "hello-error", "req", "res", "event"]; + const cases = ["req", "res", "event"]; const associated: Record = { - hello: "Hello", - "hello-ok": "HelloOk", - "hello-error": "HelloError", req: "RequestFrame", res: "ResponseFrame", event: "EventFrame", @@ -165,12 +162,6 @@ function emitGatewayFrame(): string { let typeContainer = try decoder.container(keyedBy: CodingKeys.self) let type = try typeContainer.decode(String.self, forKey: .type) switch type { - case "hello": - self = .hello(try Hello(from: decoder)) - case "hello-ok": - self = .helloOk(try HelloOk(from: decoder)) - case "hello-error": - self = .helloError(try HelloError(from: decoder)) case "req": self = .req(try RequestFrame(from: decoder)) case "res": @@ -186,9 +177,6 @@ function emitGatewayFrame(): string { public func encode(to encoder: Encoder) throws { switch self { - case 
.hello(let v): try v.encode(to: encoder) - case .helloOk(let v): try v.encode(to: encoder) - case .helloError(let v): try v.encode(to: encoder) case .req(let v): try v.encode(to: encoder) case .res(let v): try v.encode(to: encoder) case .event(let v): try v.encode(to: encoder) diff --git a/scripts/protocol-gen.ts b/scripts/protocol-gen.ts index a915791a9..39874b773 100644 --- a/scripts/protocol-gen.ts +++ b/scripts/protocol-gen.ts @@ -18,9 +18,6 @@ async function writeJsonSchema() { title: "Clawdis Gateway Protocol", description: "Handshake, request/response, and event frames for the Gateway WebSocket.", oneOf: [ - { $ref: "#/definitions/Hello" }, - { $ref: "#/definitions/HelloOk" }, - { $ref: "#/definitions/HelloError" }, { $ref: "#/definitions/RequestFrame" }, { $ref: "#/definitions/ResponseFrame" }, { $ref: "#/definitions/EventFrame" }, @@ -28,9 +25,6 @@ async function writeJsonSchema() { discriminator: { propertyName: "type", mapping: { - hello: "#/definitions/Hello", - "hello-ok": "#/definitions/HelloOk", - "hello-error": "#/definitions/HelloError", req: "#/definitions/RequestFrame", res: "#/definitions/ResponseFrame", event: "#/definitions/EventFrame", diff --git a/src/cli/program.ts b/src/cli/program.ts index 481303e8c..f3dee2ca3 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -220,7 +220,7 @@ Examples: ) .option( "--token ", - "Shared token required in hello.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)", + "Shared token required in connect.params.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)", ) .option( "--force", diff --git a/src/gateway/client.test.ts b/src/gateway/client.test.ts index 3b7a2637b..9ce4ee8a6 100644 --- a/src/gateway/client.test.ts +++ b/src/gateway/client.test.ts @@ -29,11 +29,13 @@ describe("GatewayClient", () => { wss = new WebSocketServer({ port, host: "127.0.0.1" }); wss.on("connection", (socket) => { - socket.once("message", () => { + socket.once("message", (data) => { + const first = 
JSON.parse(String(data)) as { id?: string }; + const id = first.id ?? "connect"; // Respond with tiny tick interval to trigger watchdog quickly. const helloOk = { type: "hello-ok", - protocol: 1, + protocol: 2, server: { version: "dev", connId: "c1" }, features: { methods: [], events: [] }, snapshot: { @@ -48,7 +50,9 @@ describe("GatewayClient", () => { tickIntervalMs: 5, }, }; - socket.send(JSON.stringify(helloOk)); + socket.send( + JSON.stringify({ type: "res", id, ok: true, payload: helloOk }), + ); }); }); diff --git a/src/gateway/client.ts b/src/gateway/client.ts index 936f48823..9f4751095 100644 --- a/src/gateway/client.ts +++ b/src/gateway/client.ts @@ -2,8 +2,8 @@ import { randomUUID } from "node:crypto"; import { WebSocket } from "ws"; import { logDebug, logError } from "../logger.js"; import { + type ConnectParams, type EventFrame, - type Hello, type HelloOk, PROTOCOL_VERSION, type RequestFrame, @@ -53,7 +53,7 @@ export class GatewayClient { const url = this.opts.url ?? "ws://127.0.0.1:18789"; this.ws = new WebSocket(url, { maxPayload: 512 * 1024 }); - this.ws.on("open", () => this.sendHello()); + this.ws.on("open", () => this.sendConnect()); this.ws.on("message", (data) => this.handleMessage(data.toString())); this.ws.on("close", (code, reason) => { this.ws = null; @@ -79,9 +79,8 @@ export class GatewayClient { this.flushPendingErrors(new Error("gateway client stopped")); } - private sendHello() { - const hello: Hello = { - type: "hello", + private sendConnect() { + const params: ConnectParams = { minProtocol: this.opts.minProtocol ?? PROTOCOL_VERSION, maxProtocol: this.opts.maxProtocol ?? PROTOCOL_VERSION, client: { @@ -94,28 +93,27 @@ export class GatewayClient { caps: [], auth: this.opts.token ? { token: this.opts.token } : undefined, }; - this.ws?.send(JSON.stringify(hello)); + + void this.request("connect", params) + .then((helloOk) => { + this.backoffMs = 1000; + this.tickIntervalMs = + typeof helloOk.policy?.tickIntervalMs === "number" + ? 
helloOk.policy.tickIntervalMs + : 30_000; + this.lastTick = Date.now(); + this.startTickWatch(); + this.opts.onHelloOk?.(helloOk); + }) + .catch((err) => { + logError(`gateway connect failed: ${String(err)}`); + this.ws?.close(1008, "connect failed"); + }); } private handleMessage(raw: string) { try { const parsed = JSON.parse(raw); - if (parsed?.type === "hello-ok") { - this.backoffMs = 1000; - this.tickIntervalMs = - typeof parsed.policy?.tickIntervalMs === "number" - ? parsed.policy.tickIntervalMs - : 30_000; - this.lastTick = Date.now(); - this.startTickWatch(); - this.opts.onHelloOk?.(parsed as HelloOk); - return; - } - if (parsed?.type === "hello-error") { - logError(`gateway hello-error: ${parsed.reason}`); - this.ws?.close(1008, "hello-error"); - return; - } if (parsed?.type === "event") { const evt = parsed as EventFrame; const seq = typeof evt.seq === "number" ? evt.seq : null; diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index cb68697f6..23d9e8092 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -7,6 +7,8 @@ import { ChatEventSchema, ChatHistoryParamsSchema, ChatSendParamsSchema, + type ConnectParams, + ConnectParamsSchema, ErrorCodes, type ErrorShape, ErrorShapeSchema, @@ -15,12 +17,8 @@ import { errorShape, type GatewayFrame, GatewayFrameSchema, - type Hello, - type HelloError, - HelloErrorSchema, type HelloOk, HelloOkSchema, - HelloSchema, PROTOCOL_VERSION, type PresenceEntry, PresenceEntrySchema, @@ -50,7 +48,8 @@ const ajv = new ( removeAdditional: false, }); -export const validateHello = ajv.compile(HelloSchema); +export const validateConnectParams = + ajv.compile(ConnectParamsSchema); export const validateRequestFrame = ajv.compile(RequestFrameSchema); export const validateSendParams = ajv.compile(SendParamsSchema); @@ -67,9 +66,8 @@ export function formatValidationErrors( } export { - HelloSchema, + ConnectParamsSchema, HelloOkSchema, - HelloErrorSchema, RequestFrameSchema, 
ResponseFrameSchema, EventFrameSchema, @@ -94,9 +92,8 @@ export { export type { GatewayFrame, - Hello, + ConnectParams, HelloOk, - HelloError, RequestFrame, ResponseFrame, EventFrame, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index e998ada14..fc4bf018f 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -53,9 +53,8 @@ export const ShutdownEventSchema = Type.Object( { additionalProperties: false }, ); -export const HelloSchema = Type.Object( +export const ConnectParamsSchema = Type.Object( { - type: Type.Literal("hello"), minProtocol: Type.Integer({ minimum: 1 }), maxProtocol: Type.Integer({ minimum: 1 }), client: Type.Object( @@ -116,16 +115,6 @@ export const HelloOkSchema = Type.Object( { additionalProperties: false }, ); -export const HelloErrorSchema = Type.Object( - { - type: Type.Literal("hello-error"), - reason: NonEmptyString, - expectedProtocol: Type.Optional(Type.Integer({ minimum: 1 })), - minClient: Type.Optional(NonEmptyString), - }, - { additionalProperties: false }, -); - export const ErrorShapeSchema = Type.Object( { code: NonEmptyString, @@ -173,14 +162,7 @@ export const EventFrameSchema = Type.Object( // downstream codegen (quicktype) produce tighter types instead of all-optional // blobs. 
export const GatewayFrameSchema = Type.Union( - [ - HelloSchema, - HelloOkSchema, - HelloErrorSchema, - RequestFrameSchema, - ResponseFrameSchema, - EventFrameSchema, - ], + [RequestFrameSchema, ResponseFrameSchema, EventFrameSchema], { discriminator: "type" }, ); @@ -261,9 +243,8 @@ export const ChatEventSchema = Type.Object( ); export const ProtocolSchemas: Record = { - Hello: HelloSchema, + ConnectParams: ConnectParamsSchema, HelloOk: HelloOkSchema, - HelloError: HelloErrorSchema, RequestFrame: RequestFrameSchema, ResponseFrame: ResponseFrameSchema, EventFrame: EventFrameSchema, @@ -282,11 +263,10 @@ export const ProtocolSchemas: Record = { ShutdownEvent: ShutdownEventSchema, }; -export const PROTOCOL_VERSION = 1 as const; +export const PROTOCOL_VERSION = 2 as const; -export type Hello = Static; +export type ConnectParams = Static; export type HelloOk = Static; -export type HelloError = Static; export type RequestFrame = Static; export type ResponseFrame = Static; export type EventFrame = Static; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 6cbd27805..e301364b7 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -8,6 +8,7 @@ import { WebSocket } from "ws"; import { agentCommand } from "../commands/agent.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; +import { PROTOCOL_VERSION } from "./protocol/index.js"; import { startGatewayServer } from "./server.js"; let testSessionStorePath: string | undefined; @@ -109,6 +110,67 @@ async function startServerWithClient(token?: string) { return { server, ws, port, prevToken: prev }; } +type ConnectResponse = { + type: "res"; + id: string; + ok: boolean; + payload?: unknown; + error?: { message?: string }; +}; + +async function connectReq( + ws: WebSocket, + opts?: { + token?: string; + minProtocol?: number; + maxProtocol?: number; + client?: { + name: string; + version: string; + platform: 
string; + mode: string; + instanceId?: string; + }; + }, +): Promise { + const id = randomUUID(); + ws.send( + JSON.stringify({ + type: "req", + id, + method: "connect", + params: { + minProtocol: opts?.minProtocol ?? PROTOCOL_VERSION, + maxProtocol: opts?.maxProtocol ?? PROTOCOL_VERSION, + client: opts?.client ?? { + name: "test", + version: "1.0.0", + platform: "test", + mode: "test", + }, + caps: [], + auth: opts?.token ? { token: opts.token } : undefined, + }, + }), + ); + return await onceMessage( + ws, + (o) => o.type === "res" && o.id === id, + ); +} + +async function connectOk( + ws: WebSocket, + opts?: Parameters[1], +) { + const res = await connectReq(ws, opts); + expect(res.ok).toBe(true); + expect((res.payload as { type?: unknown } | undefined)?.type).toBe( + "hello-ok", + ); + return res.payload as { type: "hello-ok" }; +} + describe("gateway server", () => { test("agent falls back to allowFrom when lastTo is stale", async () => { testAllowFrom = ["+436769770569"]; @@ -132,16 +194,7 @@ describe("gateway server", () => { ); const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); ws.send( JSON.stringify({ @@ -196,16 +249,7 @@ describe("gateway server", () => { ); const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); ws.send( JSON.stringify({ @@ -260,16 +304,7 @@ describe("gateway server", () => { ); const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { 
name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); ws.send( JSON.stringify({ @@ -322,16 +357,7 @@ describe("gateway server", () => { ); const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); ws.send( JSON.stringify({ @@ -364,18 +390,12 @@ describe("gateway server", () => { test("rejects protocol mismatch", async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 2, - maxProtocol: 3, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); try { - const res = await onceMessage(ws, () => true, 2000); - expect(res.type).toBe("hello-error"); + const res = await connectReq(ws, { + minProtocol: PROTOCOL_VERSION + 1, + maxProtocol: PROTOCOL_VERSION + 2, + }); + expect(res.ok).toBe(false); } catch { // If the server closed before we saw the frame, that's acceptable for mismatch. } @@ -385,19 +405,9 @@ describe("gateway server", () => { test("rejects invalid token", async () => { const { server, ws, prevToken } = await startServerWithClient("secret"); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - auth: { token: "wrong" }, - }), - ); - const res = await onceMessage(ws, () => true); - expect(res.type).toBe("hello-error"); - expect(res.reason).toContain("unauthorized"); + const res = await connectReq(ws, { token: "wrong" }); + expect(res.ok).toBe(false); + expect(res.error?.message ?? 
"").toContain("unauthorized"); ws.close(); await server.close(); process.env.CLAWDIS_GATEWAY_TOKEN = prevToken; @@ -420,16 +430,17 @@ describe("gateway server", () => { }, ); - test( - "hello + health + presence + status succeed", - { timeout: 8000 }, - async () => { - const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, + test("connect (req) handshake returns hello-ok payload", async () => { + const { server, ws } = await startServerWithClient(); + const id = randomUUID(); + ws.send( + JSON.stringify({ + type: "req", + id, + method: "connect", + params: { + minProtocol: PROTOCOL_VERSION, + maxProtocol: PROTOCOL_VERSION, client: { name: "test", version: "1.0.0", @@ -437,9 +448,40 @@ describe("gateway server", () => { mode: "test", }, caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + }, + }), + ); + + const res = await onceMessage<{ ok: boolean; payload?: unknown }>( + ws, + (o) => o.type === "res" && o.id === id, + ); + expect(res.ok).toBe(true); + expect((res.payload as { type?: unknown } | undefined)?.type).toBe( + "hello-ok", + ); + ws.close(); + await server.close(); + }); + + test("rejects non-connect first request", async () => { + const { server, ws } = await startServerWithClient(); + ws.send(JSON.stringify({ type: "req", id: "h1", method: "health" })); + const res = await onceMessage<{ ok: boolean; error?: unknown }>( + ws, + (o) => o.type === "res" && o.id === "h1", + ); + expect(res.ok).toBe(false); + await new Promise((resolve) => ws.once("close", () => resolve())); + await server.close(); + }); + + test( + "connect + health + presence + status succeed", + { timeout: 8000 }, + async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); const healthP = onceMessage( ws, @@ -478,21 +520,7 @@ describe("gateway server", () => { { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); - 
ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const presenceEventP = onceMessage( ws, @@ -519,21 +547,7 @@ describe("gateway server", () => { test("agent events stream with seq", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); // Emit a fake agent event directly through the shared emitter. const evtPromise = onceMessage( @@ -555,21 +569,7 @@ describe("gateway server", () => { { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const ackP = onceMessage( ws, @@ -610,21 +610,7 @@ describe("gateway server", () => { { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const firstFinalP = onceMessage( ws, @@ -665,21 +651,7 @@ describe("gateway server", () => { test("shutdown event is broadcast on close", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - 
minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const shutdownP = onceMessage( ws, @@ -700,21 +672,7 @@ describe("gateway server", () => { const mkClient = async () => { const c = new WebSocket(`ws://127.0.0.1:${port}`); await new Promise((resolve) => c.once("open", resolve)); - c.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(c, (o) => o.type === "hello-ok"); + await connectOk(c); return c; }; @@ -742,21 +700,7 @@ describe("gateway server", () => { test("send dedupes by idempotencyKey", { timeout: 8000 }, async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const idem = "same-key"; const res1P = onceMessage(ws, (o) => o.type === "res" && o.id === "a1"); @@ -789,21 +733,7 @@ describe("gateway server", () => { const dial = async () => { const ws = new WebSocket(`ws://127.0.0.1:${port}`); await new Promise((resolve) => ws.once("open", resolve)); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "test", - version: "1.0.0", - platform: "test", - mode: "test", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); return ws; }; @@ -849,16 +779,7 @@ describe("gateway server", () => { test("chat.send accepts image attachment", { timeout: 12000 }, async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: 
"hello", - minProtocol: 1, - maxProtocol: 1, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const reqId = "chat-img"; ws.send( @@ -916,16 +837,7 @@ describe("gateway server", () => { ); const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { name: "test", version: "1", platform: "test", mode: "test" }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws); const reqId = "chat-route"; ws.send( @@ -961,22 +873,15 @@ describe("gateway server", () => { test("presence includes client fingerprint", async () => { const { server, ws } = await startServerWithClient(); - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "fingerprint", - version: "9.9.9", - platform: "test", - mode: "ui", - instanceId: "abc", - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws, { + client: { + name: "fingerprint", + version: "9.9.9", + platform: "test", + mode: "ui", + instanceId: "abc", + }, + }); const presenceP = onceMessage( ws, @@ -1005,22 +910,15 @@ describe("gateway server", () => { test("cli connections are not tracked as instances", async () => { const { server, ws } = await startServerWithClient(); const cliId = `cli-${randomUUID()}`; - ws.send( - JSON.stringify({ - type: "hello", - minProtocol: 1, - maxProtocol: 1, - client: { - name: "cli", - version: "dev", - platform: "test", - mode: "cli", - instanceId: cliId, - }, - caps: [], - }), - ); - await onceMessage(ws, (o) => o.type === "hello-ok"); + await connectOk(ws, { + client: { + name: "cli", + version: "dev", + platform: "test", + mode: "cli", + instanceId: cliId, + }, + }); const presenceP = onceMessage( ws, diff --git a/src/gateway/server.ts 
b/src/gateway/server.ts index 2eba1041d..21f3453d0 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -39,25 +39,25 @@ import { sendMessageWhatsApp } from "../web/outbound.js"; import { ensureWebChatServerFromConfig } from "../webchat/server.js"; import { buildMessageWithAttachments } from "./chat-attachments.js"; import { + type ConnectParams, ErrorCodes, type ErrorShape, errorShape, formatValidationErrors, - type Hello, PROTOCOL_VERSION, type RequestFrame, type Snapshot, validateAgentParams, validateChatHistoryParams, validateChatSendParams, - validateHello, + validateConnectParams, validateRequestFrame, validateSendParams, } from "./protocol/index.js"; type Client = { socket: WebSocket; - hello: Hello; + connect: ConnectParams; connId: string; presenceKey?: string; }; @@ -502,13 +502,10 @@ export async function startGatewayServer( const remoteAddr = ( socket as WebSocket & { _socket?: { remoteAddress?: string } } )._socket?.remoteAddress; - logWs("in", "connect", { connId, remoteAddr }); - const describeHello = (hello: Hello | null | undefined) => - hello - ? `${hello.client.name ?? "unknown"} ${hello.client.mode ?? "?"} v${hello.client.version ?? "?"}` - : "unknown"; - const isWebchatHello = (hello: Hello | null | undefined) => - hello?.client?.mode === "webchat" || hello?.client?.name === "webchat-ui"; + logWs("in", "open", { connId, remoteAddr }); + const isWebchatConnect = (params: ConnectParams | null | undefined) => + params?.client?.mode === "webchat" || + params?.client?.name === "webchat-ui"; const send = (obj: unknown) => { try { @@ -539,10 +536,10 @@ export async function startGatewayServer( socket.once("close", (code, reason) => { if (!client) { logWarn( - `gateway/ws closed before hello conn=${connId} remote=${remoteAddr ?? "?"} code=${code ?? "n/a"} reason=${reason?.toString() || "n/a"}`, + `gateway/ws closed before connect conn=${connId} remote=${remoteAddr ?? "?"} code=${code ?? 
"n/a"} reason=${reason?.toString() || "n/a"}`, ); } - if (client && isWebchatHello(client.hello)) { + if (client && isWebchatConnect(client.connect)) { logInfo( `webchat disconnected code=${code} reason=${reason?.toString() || "n/a"} conn=${connId}`, ); @@ -585,91 +582,115 @@ export async function startGatewayServer( try { const parsed = JSON.parse(text); if (!client) { - // Expect hello - if (!validateHello(parsed)) { - logWarn( - `gateway/ws invalid hello conn=${connId} remote=${remoteAddr ?? "?"}`, - ); - send({ - type: "hello-error", - reason: `invalid hello: ${formatValidationErrors(validateHello.errors)}`, - }); - socket.close(1008, "invalid hello"); + // Handshake must be a normal request: + // { type:"req", method:"connect", params: ConnectParams }. + if ( + !validateRequestFrame(parsed) || + (parsed as RequestFrame).method !== "connect" || + !validateConnectParams((parsed as RequestFrame).params) + ) { + if (validateRequestFrame(parsed)) { + const req = parsed as RequestFrame; + send({ + type: "res", + id: req.id, + ok: false, + error: errorShape( + ErrorCodes.INVALID_REQUEST, + req.method === "connect" + ? `invalid connect params: ${formatValidationErrors(validateConnectParams.errors)}` + : "invalid handshake: first request must be connect", + ), + }); + } else { + logWarn( + `gateway/ws invalid handshake conn=${connId} remote=${remoteAddr ?? "?"}`, + ); + } + socket.close(1008, "invalid handshake"); close(); return; } - const hello = parsed as Hello; + + const req = parsed as RequestFrame; + const connectParams = req.params as ConnectParams; + // protocol negotiation - const { minProtocol, maxProtocol } = hello; + const { minProtocol, maxProtocol } = connectParams; if ( maxProtocol < PROTOCOL_VERSION || minProtocol > PROTOCOL_VERSION ) { logWarn( - `gateway/ws protocol mismatch conn=${connId} remote=${remoteAddr ?? "?"} client=${describeHello(hello)}`, + `gateway/ws protocol mismatch conn=${connId} remote=${remoteAddr ?? 
"?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`, ); - logWs("out", "hello-error", { - connId, - reason: "protocol mismatch", - minProtocol, - maxProtocol, - expected: PROTOCOL_VERSION, - }); send({ - type: "hello-error", - reason: "protocol mismatch", - expectedProtocol: PROTOCOL_VERSION, + type: "res", + id: req.id, + ok: false, + error: errorShape( + ErrorCodes.INVALID_REQUEST, + "protocol mismatch", + { + details: { expectedProtocol: PROTOCOL_VERSION }, + }, + ), }); socket.close(1002, "protocol mismatch"); close(); return; } + // token auth if required const token = getGatewayToken(); - if (token && hello.auth?.token !== token) { + if (token && connectParams.auth?.token !== token) { logWarn( - `gateway/ws unauthorized conn=${connId} remote=${remoteAddr ?? "?"} client=${describeHello(hello)}`, + `gateway/ws unauthorized conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`, ); - logWs("out", "hello-error", { connId, reason: "unauthorized" }); send({ - type: "hello-error", - reason: "unauthorized", + type: "res", + id: req.id, + ok: false, + error: errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized"), }); socket.close(1008, "unauthorized"); close(); return; } - const shouldTrackPresence = hello.client.mode !== "cli"; - // synthesize presence entry for this connection (client fingerprint) + const shouldTrackPresence = connectParams.client.mode !== "cli"; const presenceKey = shouldTrackPresence - ? hello.client.instanceId || connId + ? connectParams.client.instanceId || connId : undefined; - logWs("in", "hello", { + + logWs("in", "connect", { connId, - client: hello.client.name, - version: hello.client.version, - mode: hello.client.mode, - instanceId: hello.client.instanceId, - platform: hello.client.platform, - token: hello.auth?.token ? 
"set" : "none", + client: connectParams.client.name, + version: connectParams.client.version, + mode: connectParams.client.mode, + instanceId: connectParams.client.instanceId, + platform: connectParams.client.platform, + token: connectParams.auth?.token ? "set" : "none", }); - if (isWebchatHello(hello)) { + + if (isWebchatConnect(connectParams)) { logInfo( - `webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${describeHello(hello)}`, + `webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`, ); } + if (presenceKey) { upsertPresence(presenceKey, { - host: hello.client.name || os.hostname(), + host: connectParams.client.name || os.hostname(), ip: isLoopbackAddress(remoteAddr) ? undefined : remoteAddr, - version: hello.client.version, - mode: hello.client.mode, - instanceId: hello.client.instanceId, + version: connectParams.client.version, + mode: connectParams.client.mode, + instanceId: connectParams.client.instanceId, reason: "connect", }); presenceVersion += 1; } + const snapshot = buildSnapshot(); if (healthCache) { snapshot.health = healthCache; @@ -695,10 +716,10 @@ export async function startGatewayServer( tickIntervalMs: TICK_INTERVAL_MS, }, }; + clearTimeout(handshakeTimer); - // Add the client only after the hello response is ready so no tick/presence - // events reach it before the handshake completes. - client = { socket, hello, connId, presenceKey }; + client = { socket, connect: connectParams, connId, presenceKey }; + logWs("out", "hello-ok", { connId, methods: METHODS.length, @@ -706,11 +727,12 @@ export async function startGatewayServer( presence: snapshot.presence.length, stateVersion: snapshot.stateVersion.presence, }); - send(helloOk); + + send({ type: "res", id: req.id, ok: true, payload: helloOk }); + clients.add(client); - // Kick a health refresh in the background to keep cache warm. 
void refreshHealthSnapshot({ probe: true }).catch((err) => - logError(`post-hello health refresh failed: ${formatError(err)}`), + logError(`post-connect health refresh failed: ${formatError(err)}`), ); return; } @@ -751,6 +773,17 @@ export async function startGatewayServer( }; switch (req.method) { + case "connect": { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + "connect is only valid as the first request", + ), + ); + break; + } case "health": { const now = Date.now(); const cached = healthCache;