diff --git a/apps/macos/Sources/Clawdis/ControlChannel.swift b/apps/macos/Sources/Clawdis/ControlChannel.swift index 76b87c27e..fe02db608 100644 --- a/apps/macos/Sources/Clawdis/ControlChannel.swift +++ b/apps/macos/Sources/Clawdis/ControlChannel.swift @@ -56,7 +56,7 @@ final class ControlChannel: ObservableObject { private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control") - private var eventTokens: [NSObjectProtocol] = [] + private var eventTask: Task? private init() { self.startEventStream() @@ -198,42 +198,36 @@ final class ControlChannel: ObservableObject { } private func startEventStream() { - for tok in self.eventTokens { - NotificationCenter.default.removeObserver(tok) - } - self.eventTokens.removeAll() - let ev = NotificationCenter.default.addObserver( - forName: .gatewayEvent, - object: nil, - queue: .main) - { [weak self] note in - guard let self, - let frame = note.object as? GatewayFrame else { return } - switch frame { - case let .event(evt) where evt.event == "agent": - if let payload = evt.payload, - let payloadData = try? JSONEncoder().encode(payload), - let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData) - { - Task { @MainActor in - AgentEventStore.shared.append(agent) - self.routeWorkActivity(from: agent) - } + self.eventTask?.cancel() + self.eventTask = Task { [weak self] in + guard let self else { return } + let stream = await GatewayConnection.shared.subscribe() + for await push in stream { + if Task.isCancelled { return } + await MainActor.run { [weak self] in + self?.handle(push: push) } - case let .event(evt) where evt.event == "shutdown": - Task { @MainActor in self.state = .degraded("gateway shutdown") } - default: - break } } - let tick = NotificationCenter.default.addObserver( - forName: .gatewaySnapshot, - object: nil, - queue: .main) - { [weak self] _ in - Task { @MainActor [weak self] in self?.state = .connected } + } + + private func handle(push: GatewayPush) { + switch push { + case let .event(evt) where evt.event == "agent": + if let payload = evt.payload, + let payloadData = try? JSONEncoder().encode(payload), + let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData) + { + AgentEventStore.shared.append(agent) + self.routeWorkActivity(from: agent) + } + case let .event(evt) where evt.event == "shutdown": + self.state = .degraded("gateway shutdown") + case .snapshot: + self.state = .connected + default: + break } - self.eventTokens = [ev, tick] } private func routeWorkActivity(from event: ControlAgentEvent) { diff --git a/apps/macos/Sources/Clawdis/GatewayChannel.swift b/apps/macos/Sources/Clawdis/GatewayChannel.swift index e69e47ac3..f4118d602 100644 --- a/apps/macos/Sources/Clawdis/GatewayChannel.swift +++ b/apps/macos/Sources/Clawdis/GatewayChannel.swift @@ -51,22 +51,9 @@ struct WebSocketSessionBox: @unchecked Sendable { let session: any WebSocketSessioning } -struct GatewayEvent: Codable { - let type: String - let event: String? - let payload: AnyCodable? - let seq: Int? -} - // Avoid ambiguity with the app's own AnyCodable type. private typealias ProtoAnyCodable = ClawdisProtocol.AnyCodable -extension Notification.Name { - static let gatewaySnapshot = Notification.Name("clawdis.gateway.snapshot") - static let gatewayEvent = Notification.Name("clawdis.gateway.event") - static let gatewaySeqGap = Notification.Name("clawdis.gateway.seqgap") -} - actor GatewayChannelActor { private let logger = Logger(subsystem: "com.steipete.clawdis", category: "gateway") private var task: WebSocketTaskBox? @@ -87,11 +74,18 @@ actor GatewayChannelActor { private var watchdogTask: Task? private var tickTask: Task? private let defaultRequestTimeoutMs: Double = 15000 + private let pushHandler: (@Sendable (GatewayPush) async -> Void)? - init(url: URL, token: String?, session: WebSocketSessionBox? = nil) { + init( + url: URL, + token: String?, + session: WebSocketSessionBox? = nil, + pushHandler: (@Sendable (GatewayPush) async -> Void)? = nil) + { self.url = url self.token = token self.session = session?.session ?? URLSession(configuration: .default) + self.pushHandler = pushHandler Task { [weak self] in await self?.startWatchdog() } @@ -240,8 +234,7 @@ actor GatewayChannelActor { guard let self else { return } await self.watchTicks() } - let frame = GatewayFrame.helloOk(ok) - NotificationCenter.default.post(name: .gatewaySnapshot, object: frame) + await self.pushHandler?(.snapshot(ok)) return } if let err = try? decoder.decode(HelloError.self, from: data) { @@ -302,18 +295,15 @@ actor GatewayChannelActor { case let .event(evt): if let seq = evt.seq { if let last = lastSeq, seq > last + 1 { - NotificationCenter.default.post( - name: .gatewaySeqGap, - object: frame, - userInfo: ["expected": last + 1, "received": seq]) + await self.pushHandler?(.seqGap(expected: last + 1, received: seq)) } self.lastSeq = seq } if evt.event == "tick" { self.lastTick = Date() } - NotificationCenter.default.post(name: .gatewayEvent, object: frame) - case .helloOk: + await self.pushHandler?(.event(evt)) + case let .helloOk(ok): self.lastTick = Date() - NotificationCenter.default.post(name: .gatewaySnapshot, object: frame) + await self.pushHandler?(.snapshot(ok)) default: break } diff --git a/apps/macos/Sources/Clawdis/GatewayConnection.swift b/apps/macos/Sources/Clawdis/GatewayConnection.swift index 999dcf992..7d29d650a 100644 --- a/apps/macos/Sources/Clawdis/GatewayConnection.swift +++ b/apps/macos/Sources/Clawdis/GatewayConnection.swift @@ -1,3 +1,4 @@ +import ClawdisProtocol import Foundation /// Single, shared Gateway websocket connection for the whole app. @@ -16,6 +17,9 @@ actor GatewayConnection { private var configuredURL: URL? private var configuredToken: String? + private var subscribers: [UUID: AsyncStream.Continuation] = [:] + private var lastSnapshot: HelloOk? + init( configProvider: @escaping @Sendable () async throws -> Config = GatewayConnection.defaultConfigProvider, sessionBox: WebSocketSessionBox? = nil) @@ -50,6 +54,35 @@ actor GatewayConnection { self.client = nil self.configuredURL = nil self.configuredToken = nil + self.lastSnapshot = nil + } + + func subscribe(bufferingNewest: Int = 100) -> AsyncStream { + let id = UUID() + let snapshot = self.lastSnapshot + let connection = self + return AsyncStream(bufferingPolicy: .bufferingNewest(bufferingNewest)) { continuation in + if let snapshot { + continuation.yield(.snapshot(snapshot)) + } + self.subscribers[id] = continuation + continuation.onTermination = { @Sendable _ in + Task { await connection.removeSubscriber(id) } + } + } + } + + private func removeSubscriber(_ id: UUID) { + self.subscribers[id] = nil + } + + private func broadcast(_ push: GatewayPush) { + if case let .snapshot(snapshot) = push { + self.lastSnapshot = snapshot + } + for (_, continuation) in self.subscribers { + continuation.yield(push) + } } private func configure(url: URL, token: String?) async { @@ -59,11 +92,22 @@ actor GatewayConnection { if let client { await client.shutdown() } - self.client = GatewayChannelActor(url: url, token: token, session: self.sessionBox) + self.lastSnapshot = nil + self.client = GatewayChannelActor( + url: url, + token: token, + session: self.sessionBox, + pushHandler: { [weak self] push in + await self?.handle(push: push) + }) self.configuredURL = url self.configuredToken = token } + private func handle(push: GatewayPush) { + self.broadcast(push) + } + private static func defaultConfigProvider() async throws -> Config { let mode = await MainActor.run { AppStateStore.shared.connectionMode } let token = ProcessInfo.processInfo.environment["CLAWDIS_GATEWAY_TOKEN"] @@ -72,9 +116,13 @@ actor GatewayConnection { let port = GatewayEnvironment.gatewayPort() return (URL(string: "ws://127.0.0.1:\(port)")!, token) case .remote: - let forwarded = try await RemoteTunnelManager.shared.ensureControlTunnel() - return (URL(string: "ws://127.0.0.1:\(Int(forwarded))")!, token) + if let forwarded = await RemoteTunnelManager.shared.controlTunnelPortIfRunning() { + return (URL(string: "ws://127.0.0.1:\(Int(forwarded))")!, token) + } + throw NSError( + domain: "RemoteTunnel", + code: 2, + userInfo: [NSLocalizedDescriptionKey: "Remote mode is enabled, but the control tunnel is not active"]) } } } - diff --git a/apps/macos/Sources/Clawdis/GatewayProtocolSendable.swift b/apps/macos/Sources/Clawdis/GatewayProtocolSendable.swift new file mode 100644 index 000000000..cd2ce252a --- /dev/null +++ b/apps/macos/Sources/Clawdis/GatewayProtocolSendable.swift @@ -0,0 +1,7 @@ +import ClawdisProtocol + +// The generated gateway protocol models are value types, but they don't currently declare Sendable. +// We use them across actors via GatewayConnection's event stream, so mark them as unchecked. +extension HelloOk: @unchecked Sendable {} +extension EventFrame: @unchecked Sendable {} + diff --git a/apps/macos/Sources/Clawdis/GatewayPush.swift b/apps/macos/Sources/Clawdis/GatewayPush.swift new file mode 100644 index 000000000..02585ed9a --- /dev/null +++ b/apps/macos/Sources/Clawdis/GatewayPush.swift @@ -0,0 +1,14 @@ +import ClawdisProtocol + +/// Server-push messages from the gateway websocket. +/// +/// This is the in-process replacement for the legacy `NotificationCenter` fan-out. +enum GatewayPush: Sendable { + /// A full snapshot that arrives on connect (or reconnect). + case snapshot(HelloOk) + /// A server push event frame. + case event(EventFrame) + /// A detected sequence gap (`expected...received`) for event frames. + case seqGap(expected: Int, received: Int) +} + diff --git a/apps/macos/Sources/Clawdis/InstancesStore.swift b/apps/macos/Sources/Clawdis/InstancesStore.swift index cf6e26a70..d55423fa4 100644 --- a/apps/macos/Sources/Clawdis/InstancesStore.swift +++ b/apps/macos/Sources/Clawdis/InstancesStore.swift @@ -38,7 +38,7 @@ final class InstancesStore: ObservableObject { private let logger = Logger(subsystem: "com.steipete.clawdis", category: "instances") private var task: Task? private let interval: TimeInterval = 30 - private var observers: [NSObjectProtocol] = [] + private var eventTask: Task? private struct PresenceEventPayload: Codable { let presence: [PresenceEntry] @@ -51,7 +51,7 @@ final class InstancesStore: ObservableObject { func start() { guard !self.isPreview else { return } guard self.task == nil else { return } - self.observeGatewayEvents() + self.startGatewaySubscription() self.task = Task.detached { [weak self] in guard let self else { return } await self.refresh() @@ -65,56 +65,41 @@ final class InstancesStore: ObservableObject { func stop() { self.task?.cancel() self.task = nil - for token in self.observers { - NotificationCenter.default.removeObserver(token) - } - self.observers.removeAll() + self.eventTask?.cancel() + self.eventTask = nil } - private func observeGatewayEvents() { - let ev = NotificationCenter.default.addObserver( - forName: .gatewayEvent, - object: nil, - queue: .main) - { [weak self] note in - guard let self, - let frame = note.object as? GatewayFrame else { return } - switch frame { - case let .event(evt) where evt.event == "presence": - if let payload = evt.payload { - Task { @MainActor [weak self] in self?.handlePresenceEventPayload(payload) } - } - default: - break - } - } - let gap = NotificationCenter.default.addObserver( - forName: .gatewaySeqGap, - object: nil, - queue: .main) - { [weak self] _ in + private func startGatewaySubscription() { + self.eventTask?.cancel() + self.eventTask = Task { [weak self] in guard let self else { return } - Task { await self.refresh() } - } - let snap = NotificationCenter.default.addObserver( - forName: .gatewaySnapshot, - object: nil, - queue: .main) - { [weak self] note in - guard let self, - let frame = note.object as? GatewayFrame else { return } - switch frame { - case let .helloOk(hello): - if JSONSerialization.isValidJSONObject(hello.snapshot.presence), - let data = try? JSONEncoder().encode(hello.snapshot.presence) - { - Task { @MainActor [weak self] in self?.decodeAndApplyPresenceData(data) } + let stream = await GatewayConnection.shared.subscribe() + for await push in stream { + if Task.isCancelled { return } + await MainActor.run { [weak self] in + self?.handle(push: push) } - default: - break } } - self.observers = [ev, snap, gap] + } + + private func handle(push: GatewayPush) { + switch push { + case let .event(evt) where evt.event == "presence": + if let payload = evt.payload { + self.handlePresenceEventPayload(payload) + } + case .seqGap: + Task { await self.refresh() } + case let .snapshot(hello): + if JSONSerialization.isValidJSONObject(hello.snapshot.presence), + let data = try? JSONEncoder().encode(hello.snapshot.presence) + { + self.decodeAndApplyPresenceData(data) + } + default: + break + } } func refresh() async { diff --git a/apps/macos/Sources/Clawdis/RemoteTunnelManager.swift b/apps/macos/Sources/Clawdis/RemoteTunnelManager.swift index e1a6b7423..98e10d41b 100644 --- a/apps/macos/Sources/Clawdis/RemoteTunnelManager.swift +++ b/apps/macos/Sources/Clawdis/RemoteTunnelManager.swift @@ -6,6 +6,16 @@ actor RemoteTunnelManager { private var controlTunnel: WebChatTunnel? + func controlTunnelPortIfRunning() -> UInt16? { + if let tunnel = self.controlTunnel, + tunnel.process.isRunning, + let local = tunnel.localPort + { + return local + } + return nil + } + /// Ensure an SSH tunnel is running for the gateway control port. /// Returns the local forwarded port (usually 18789). func ensureControlTunnel() async throws -> UInt16 { @@ -17,12 +27,7 @@ actor RemoteTunnelManager { userInfo: [NSLocalizedDescriptionKey: "Remote mode is not enabled"]) } - if let tunnel = self.controlTunnel, - tunnel.process.isRunning, - let local = tunnel.localPort - { - return local - } + if let local = self.controlTunnelPortIfRunning() { return local } let desiredPort = UInt16(GatewayEnvironment.gatewayPort()) let tunnel = try await WebChatTunnel.create( diff --git a/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift b/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift index 388cbb00e..7913c5e42 100644 --- a/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift +++ b/apps/macos/Sources/Clawdis/WebChatSwiftUI.swift @@ -4,9 +4,6 @@ import OSLog import SwiftUI import UniformTypeIdentifiers -extension GatewayFrame: @unchecked Sendable {} -extension EventFrame: @unchecked Sendable {} - private let webChatSwiftLogger = Logger(subsystem: "com.steipete.clawdis", category: "WebChatSwiftUI") private enum WebChatSwiftUILayout { @@ -79,25 +76,26 @@ final class WebChatViewModel: ObservableObject { @Published var healthOK: Bool = true private let sessionKey: String - private var eventToken: NSObjectProtocol? + private var eventTask: Task? private var pendingRuns = Set() init(sessionKey: String) { self.sessionKey = sessionKey - self.eventToken = NotificationCenter.default.addObserver( - forName: .gatewayEvent, - object: nil, - queue: .main) - { [weak self] note in - guard let frame = note.object as? GatewayFrame else { return } - Task { @MainActor in - self?.handleGatewayFrame(frame) + self.eventTask = Task { [weak self] in + guard let self else { return } + let stream = await GatewayConnection.shared.subscribe() + for await push in stream { + if Task.isCancelled { return } + guard case let .event(evt) = push else { continue } + await MainActor.run { [weak self] in + self?.handleGatewayEvent(evt) + } } } } deinit { - // Intentionally no cleanup; NotificationCenter observer is weakly captured and drops with this instance. + self.eventTask?.cancel() } func load() { @@ -212,8 +210,8 @@ final class WebChatViewModel: ObservableObject { return try JSONDecoder().decode(ChatHistoryPayload.self, from: data) } - private func handleGatewayFrame(_ frame: GatewayFrame) { - guard case let .event(evt) = frame, evt.event == "chat" else { return } + private func handleGatewayEvent(_ evt: EventFrame) { + guard evt.event == "chat" else { return } guard let payload = evt.payload else { return } guard let data = try? JSONEncoder().encode(payload) else { return } guard let chat = try? JSONDecoder().decode(ChatEventPayload.self, from: data) else { return } diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift index f62330b54..2f3d5b35a 100644 --- a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConfigureTests.swift @@ -70,6 +70,11 @@ import Testing self.pendingReceiveHandler.withLock { $0 = completionHandler } } + func emitIncoming(_ data: Data) { + let handler = self.pendingReceiveHandler.withLock { $0 } + handler?(Result.success(.data(data))) + } + private static func helloOkData() -> Data { let json = """ { @@ -118,6 +123,10 @@ import Testing } } + func latestTask() -> FakeWebSocketTask? { + self.tasks.withLock { $0.last } + } + func makeWebSocketTask(url: URL) -> WebSocketTaskBox { _ = url self.makeCount.withLock { $0 += 1 } @@ -185,4 +194,74 @@ import Testing #expect(session.snapshotMakeCount() == 1) } + + @Test func subscribeReplaysLatestSnapshot() async throws { + let session = FakeWebSocketSession() + let url = URL(string: "ws://example.invalid")! + let cfg = ConfigSource(token: nil) + let conn = GatewayConnection( + configProvider: { (url, cfg.snapshotToken()) }, + sessionBox: WebSocketSessionBox(session: session)) + + _ = try await conn.request(method: "status", params: nil) + + let stream = await conn.subscribe(bufferingNewest: 10) + var iterator = stream.makeAsyncIterator() + let first = await iterator.next() + + guard case let .snapshot(snap) = first else { + Issue.record("expected snapshot, got \(String(describing: first))") + return + } + #expect(snap.type == "hello-ok") + } + + @Test func subscribeEmitsSeqGapBeforeEvent() async throws { + let session = FakeWebSocketSession() + let url = URL(string: "ws://example.invalid")! + let cfg = ConfigSource(token: nil) + let conn = GatewayConnection( + configProvider: { (url, cfg.snapshotToken()) }, + sessionBox: WebSocketSessionBox(session: session)) + + let stream = await conn.subscribe(bufferingNewest: 10) + var iterator = stream.makeAsyncIterator() + + _ = try await conn.request(method: "status", params: nil) + _ = await iterator.next() // snapshot + + let evt1 = Data( + """ + {"type":"event","event":"presence","payload":{"presence":[]},"seq":1} + """.utf8) + session.latestTask()?.emitIncoming(evt1) + + let firstEvent = await iterator.next() + guard case let .event(firstFrame) = firstEvent else { + Issue.record("expected event, got \(String(describing: firstEvent))") + return + } + #expect(firstFrame.seq == 1) + + let evt3 = Data( + """ + {"type":"event","event":"presence","payload":{"presence":[]},"seq":3} + """.utf8) + session.latestTask()?.emitIncoming(evt3) + + let gap = await iterator.next() + guard case let .seqGap(expected, received) = gap else { + Issue.record("expected seqGap, got \(String(describing: gap))") + return + } + #expect(expected == 2) + #expect(received == 3) + + let secondEvent = await iterator.next() + guard case let .event(secondFrame) = secondEvent else { + Issue.record("expected event, got \(String(describing: secondEvent))") + return + } + #expect(secondFrame.seq == 3) + } } diff --git a/docs/refactor/gateway.md b/docs/refactor/gateway.md new file mode 100644 index 000000000..79531285b --- /dev/null +++ b/docs/refactor/gateway.md @@ -0,0 +1,131 @@ +--- +summary: "Refactor notes for the macOS gateway client: single shared websocket + follow-ups" +read_when: + - Investigating duplicate/stale Gateway WS connections + - Refactoring macOS gateway client architecture + - Debugging noisy reconnect storms on gateway restart +--- +# Gateway Refactor Notes (macOS client) + +Last updated: 2025-12-12 + +This document captures the rationale and direction for the macOS app’s Gateway client refactor: **one shared websocket connection per app process**, plus follow-up improvements to simplify lifetimes and reduce “hidden” reconnection behavior. + +Related docs: +- `docs/refactor/new-arch.md` (overall gateway protocol/server plan) +- `docs/gateway.md` (gateway operations/runbook) +- `docs/presence.md` (presence semantics and dedupe) +- `docs/mac/webchat.md` (WebChat surfaces and debugging) + +--- + +## Background: what was wrong + +Symptoms: +- Restarting the gateway produced a *storm* of reconnects/log spam (`gateway/ws in connect`, `hello`, `hello-ok`) and elevated `clients=` counts. +- Even with “one panel open”, the mac app could hold tens of websocket connections to `ws://127.0.0.1:18789`. + +Root cause (historical bug): +- The mac app was repeatedly “reconfiguring” a gateway client on a timer (via health polling), creating a new websocket owner each time. +- Old websocket owners were not fully torn down and could keep watchdog/tick tasks alive, leading to **connection accumulation** over time. + +--- + +## Current architecture (as of 2025-12-12) + +Goal: enforce the invariant **“one gateway websocket per app process (per effective config)”**. + +Key elements: +- `GatewayConnection.shared` owns the one websocket and is the *only* supported entry point for app code that needs gateway RPC. +- Consumers (e.g. Control UI, Agent RPC, SwiftUI WebChat) call `GatewayConnection.shared.request(...)` and do not create their own sockets. +- If the effective connection config changes (local ↔ remote tunnel port, token change), `GatewayConnection` replaces the underlying connection. +- Server-push frames are delivered via `GatewayConnection.shared.subscribe(...) -> AsyncStream`, which is the in-process event bus (no `NotificationCenter`). + +Notes: +- Remote mode requires an SSH control tunnel. `GatewayConnection` **does not** start tunnels; it consumes the already-established forwarded port (owned by `ConnectionModeCoordinator` / `RemoteTunnelManager`). + +--- + +## Design constraints / principles + +- **Single ownership:** Exactly one component owns the actual socket and reconnect policy. +- **Explicit config changes:** Recreate/reconnect only when config changes, not as a side effect of periodic work. +- **No implicit fan-out sockets:** Adding new UI features must not accidentally add new persistent gateway connections. +- **Testable seams:** Connection config and websocket session creation should be overridable in tests. + +--- + +## Follow-up refactors (recommended) + +### Status (as of 2025-12-12) + +- ✅ One shared websocket per app process (per config) +- ✅ Event streaming moved into `GatewayConnection` (`AsyncStream`) +- ✅ `NotificationCenter` removed for in-process gateway events +- ✅ `GatewayConnection` no longer implicitly starts the remote control tunnel +- ⏳ Further separation of concerns (polish/cleanup): push parsing helpers + clearer UI adapters +- ⏳ Optional: a dedicated “resolved endpoint” publisher for remote mode (to make mode transitions observable) + +### 1) Move event streaming into `GatewayConnection` (done) + +Implemented: +- `GatewayChannelActor` no longer posts global notifications; it forwards pushes to `GatewayConnection` via a callback. +- `GatewayConnection` fans out pushes via `subscribe(...) -> AsyncStream` and replays the latest snapshot to new subscribers. + +### 2) Replace `NotificationCenter` for in-process events (done) + +Implemented: +- `ControlChannel`, `InstancesStore`, and SwiftUI WebChat now subscribe to `GatewayConnection` directly. +- This removed the risk of leaking `NotificationCenter` observer tokens when views/controllers churn. + +### 3) Separate control-plane vs chat-plane concerns (partially done) + +As features grow, split responsibilities: +- **RPC layer**: request/response, retries, timeouts. +- **Event bus**: typed gateway events with buffering/backpressure. +- **UI adapters**: user-facing state and error mapping. + +This reduces the risk that “a UI refresh” causes connection or tunnel side effects. + +Notes: +- The RPC layer and event bus are now centralized in `GatewayConnection`. +- There’s still room to extract small helpers for decoding specific event payloads (agent/chat/presence) so UI code stays thin. + +### 4) Centralize tunnel lifecycle (remote mode) (done for GatewayConnection) + +Previously, “first request wins” could implicitly start/ensure a tunnel (via `GatewayConnection`’s default config provider). + +Now: +- `GatewayConnection` uses the already-running forwarded port from `RemoteTunnelManager` and will error if remote mode is enabled but no tunnel is active. +- Remote tunnel lifecycle is owned by mode/application coordinators (e.g. `ConnectionModeCoordinator`), not by incidental RPC calls. + +Future improvement: +- A dedicated coordinator that owns remote tunnel lifecycle and publishes a resolved endpoint. +- `GatewayConnection` consumes that endpoint rather than calling into tunnel code itself. + +This makes remote mode behavior easier to reason about (and test). + +--- + +## Testing strategy (what we want to cover) + +Minimum invariants: +- Repeated requests under the same config do **not** create additional websocket tasks. +- Concurrent requests still create **exactly one** websocket and reuse it. +- Shutdown prevents any reconnect loop after failures. +- Config changes (token / endpoint) cancel the old socket and reconnect once. + +Nice-to-have integration coverage: +- Multiple “consumers” (Control UI + Agent RPC + SwiftUI WebChat) all call through the shared connection and still produce only one websocket. + +Additional coverage added (macOS): +- Subscribing after connect replays the latest snapshot. +- Sequence gaps emit an explicit `GatewayPush.seqGap(...)` before the corresponding event. + +--- + +## Debug notes (operational) + +When diagnosing “too many connections”: +- Prefer counting actual TCP connections on port 18789 and grouping by PID to see which process is holding sockets. +- Gateway `--verbose` prints *every* connect/hello and event broadcast; use it only when needed and filter output if you’re just sanity-checking.