refactor(macos): replace gateway NotificationCenter with event bus

This commit is contained in:
Peter Steinberger
2025-12-12 22:06:40 +00:00
parent 9cf457be0a
commit 6a7f955818
10 changed files with 378 additions and 127 deletions

View File

@@ -56,7 +56,7 @@ final class ControlChannel: ObservableObject {
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control")
private var eventTokens: [NSObjectProtocol] = []
private var eventTask: Task<Void, Never>?
private init() {
self.startEventStream()
@@ -198,42 +198,36 @@ final class ControlChannel: ObservableObject {
}
private func startEventStream() {
for tok in self.eventTokens {
NotificationCenter.default.removeObserver(tok)
}
self.eventTokens.removeAll()
let ev = NotificationCenter.default.addObserver(
forName: .gatewayEvent,
object: nil,
queue: .main)
{ [weak self] note in
guard let self,
let frame = note.object as? GatewayFrame else { return }
switch frame {
case let .event(evt) where evt.event == "agent":
if let payload = evt.payload,
let payloadData = try? JSONEncoder().encode(payload),
let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData)
{
Task { @MainActor in
AgentEventStore.shared.append(agent)
self.routeWorkActivity(from: agent)
}
self.eventTask?.cancel()
self.eventTask = Task { [weak self] in
guard let self else { return }
let stream = await GatewayConnection.shared.subscribe()
for await push in stream {
if Task.isCancelled { return }
await MainActor.run { [weak self] in
self?.handle(push: push)
}
case let .event(evt) where evt.event == "shutdown":
Task { @MainActor in self.state = .degraded("gateway shutdown") }
default:
break
}
}
let tick = NotificationCenter.default.addObserver(
forName: .gatewaySnapshot,
object: nil,
queue: .main)
{ [weak self] _ in
Task { @MainActor [weak self] in self?.state = .connected }
}
/// Applies one gateway push to the control channel.
///
/// - `agent` events: the payload is re-encoded to JSON and decoded as `ControlAgentEvent`; on success the
///   event is appended to `AgentEventStore.shared` and passed to `routeWorkActivity(from:)`. Payloads that
///   fail to encode or decode are dropped without an error (`try?`).
/// - `shutdown` events: `state` becomes `.degraded("gateway shutdown")`.
/// - Snapshots: `state` becomes `.connected`.
/// - Every other push is ignored.
private func handle(push: GatewayPush) {
switch push {
case let .event(evt) where evt.event == "agent":
if let payload = evt.payload,
let payloadData = try? JSONEncoder().encode(payload),
let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData)
{
AgentEventStore.shared.append(agent)
self.routeWorkActivity(from: agent)
}
case let .event(evt) where evt.event == "shutdown":
self.state = .degraded("gateway shutdown")
case .snapshot:
self.state = .connected
default:
break
}
self.eventTokens = [ev, tick]
}
private func routeWorkActivity(from event: ControlAgentEvent) {

View File

@@ -51,22 +51,9 @@ struct WebSocketSessionBox: @unchecked Sendable {
let session: any WebSocketSessioning
}
struct GatewayEvent: Codable {
let type: String
let event: String?
let payload: AnyCodable?
let seq: Int?
}
// Avoid ambiguity with the app's own AnyCodable type.
private typealias ProtoAnyCodable = ClawdisProtocol.AnyCodable
extension Notification.Name {
static let gatewaySnapshot = Notification.Name("clawdis.gateway.snapshot")
static let gatewayEvent = Notification.Name("clawdis.gateway.event")
static let gatewaySeqGap = Notification.Name("clawdis.gateway.seqgap")
}
actor GatewayChannelActor {
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "gateway")
private var task: WebSocketTaskBox?
@@ -87,11 +74,18 @@ actor GatewayChannelActor {
private var watchdogTask: Task<Void, Never>?
private var tickTask: Task<Void, Never>?
private let defaultRequestTimeoutMs: Double = 15000
private let pushHandler: (@Sendable (GatewayPush) async -> Void)?
init(url: URL, token: String?, session: WebSocketSessionBox? = nil) {
init(
url: URL,
token: String?,
session: WebSocketSessionBox? = nil,
pushHandler: (@Sendable (GatewayPush) async -> Void)? = nil)
{
self.url = url
self.token = token
self.session = session?.session ?? URLSession(configuration: .default)
self.pushHandler = pushHandler
Task { [weak self] in
await self?.startWatchdog()
}
@@ -240,8 +234,7 @@ actor GatewayChannelActor {
guard let self else { return }
await self.watchTicks()
}
let frame = GatewayFrame.helloOk(ok)
NotificationCenter.default.post(name: .gatewaySnapshot, object: frame)
await self.pushHandler?(.snapshot(ok))
return
}
if let err = try? decoder.decode(HelloError.self, from: data) {
@@ -302,18 +295,15 @@ actor GatewayChannelActor {
case let .event(evt):
if let seq = evt.seq {
if let last = lastSeq, seq > last + 1 {
NotificationCenter.default.post(
name: .gatewaySeqGap,
object: frame,
userInfo: ["expected": last + 1, "received": seq])
await self.pushHandler?(.seqGap(expected: last + 1, received: seq))
}
self.lastSeq = seq
}
if evt.event == "tick" { self.lastTick = Date() }
NotificationCenter.default.post(name: .gatewayEvent, object: frame)
case .helloOk:
await self.pushHandler?(.event(evt))
case let .helloOk(ok):
self.lastTick = Date()
NotificationCenter.default.post(name: .gatewaySnapshot, object: frame)
await self.pushHandler?(.snapshot(ok))
default:
break
}

View File

@@ -1,3 +1,4 @@
import ClawdisProtocol
import Foundation
/// Single, shared Gateway websocket connection for the whole app.
@@ -16,6 +17,9 @@ actor GatewayConnection {
private var configuredURL: URL?
private var configuredToken: String?
private var subscribers: [UUID: AsyncStream<GatewayPush>.Continuation] = [:]
private var lastSnapshot: HelloOk?
init(
configProvider: @escaping @Sendable () async throws -> Config = GatewayConnection.defaultConfigProvider,
sessionBox: WebSocketSessionBox? = nil)
@@ -50,6 +54,35 @@ actor GatewayConnection {
self.client = nil
self.configuredURL = nil
self.configuredToken = nil
self.lastSnapshot = nil
}
/// Registers a new subscriber and returns the stream it will receive gateway pushes on.
///
/// When a snapshot has already been cached, it is yielded first so a late subscriber starts from the
/// latest known state. The subscriber is unregistered again once its stream terminates.
///
/// - Parameter bufferingNewest: Limit handed to `AsyncStream`'s `.bufferingNewest` policy for this subscriber.
/// - Returns: A stream of pushes broadcast by this connection after the subscription was registered.
func subscribe(bufferingNewest: Int = 100) -> AsyncStream<GatewayPush> {
    let subscriberID = UUID()
    let cachedSnapshot = self.lastSnapshot
    let owner = self
    let policy = AsyncStream<GatewayPush>.Continuation.BufferingPolicy.bufferingNewest(bufferingNewest)
    return AsyncStream(bufferingPolicy: policy) { sink in
        // Replay the cached snapshot before registering, so it is always the first element delivered.
        if let cachedSnapshot {
            sink.yield(.snapshot(cachedSnapshot))
        }
        self.subscribers[subscriberID] = sink
        // The termination handler is @Sendable and runs outside the actor, so hop back in to unregister.
        sink.onTermination = { @Sendable _ in
            Task { await owner.removeSubscriber(subscriberID) }
        }
    }
}
/// Forgets the subscriber registered under `id`; does nothing when no such subscriber exists.
private func removeSubscriber(_ id: UUID) {
    self.subscribers.removeValue(forKey: id)
}
/// Delivers `push` to every registered subscriber.
///
/// Snapshots are additionally cached in `lastSnapshot` before delivery.
private func broadcast(_ push: GatewayPush) {
    switch push {
    case let .snapshot(latest):
        self.lastSnapshot = latest
    case .event, .seqGap:
        break
    }
    for sink in self.subscribers.values {
        sink.yield(push)
    }
}
private func configure(url: URL, token: String?) async {
@@ -59,11 +92,22 @@ actor GatewayConnection {
if let client {
await client.shutdown()
}
self.client = GatewayChannelActor(url: url, token: token, session: self.sessionBox)
self.lastSnapshot = nil
self.client = GatewayChannelActor(
url: url,
token: token,
session: self.sessionBox,
pushHandler: { [weak self] push in
await self?.handle(push: push)
})
self.configuredURL = url
self.configuredToken = token
}
/// Receives a push from the channel's `pushHandler` callback and forwards it to `broadcast(_:)`.
private func handle(push: GatewayPush) {
self.broadcast(push)
}
private static func defaultConfigProvider() async throws -> Config {
let mode = await MainActor.run { AppStateStore.shared.connectionMode }
let token = ProcessInfo.processInfo.environment["CLAWDIS_GATEWAY_TOKEN"]
@@ -72,9 +116,13 @@ actor GatewayConnection {
let port = GatewayEnvironment.gatewayPort()
return (URL(string: "ws://127.0.0.1:\(port)")!, token)
case .remote:
let forwarded = try await RemoteTunnelManager.shared.ensureControlTunnel()
return (URL(string: "ws://127.0.0.1:\(Int(forwarded))")!, token)
if let forwarded = await RemoteTunnelManager.shared.controlTunnelPortIfRunning() {
return (URL(string: "ws://127.0.0.1:\(Int(forwarded))")!, token)
}
throw NSError(
domain: "RemoteTunnel",
code: 2,
userInfo: [NSLocalizedDescriptionKey: "Remote mode is enabled, but the control tunnel is not active"])
}
}
}

View File

@@ -0,0 +1,7 @@
import ClawdisProtocol
// The generated gateway protocol models are value types, but they don't currently declare Sendable.
// We use them across actors via GatewayConnection's event stream, so mark them as unchecked.
// NOTE(review): `@unchecked` means the compiler does not verify these conformances; re-audit them if the
// generated models change shape.
extension HelloOk: @unchecked Sendable {}
extension EventFrame: @unchecked Sendable {}

View File

@@ -0,0 +1,14 @@
import ClawdisProtocol
/// Server-push messages from the gateway websocket.
///
/// This is the in-process replacement for the legacy `NotificationCenter` fan-out.
enum GatewayPush: Sendable {
/// A full snapshot that arrives on connect (or reconnect).
case snapshot(HelloOk)
/// A server push event frame.
case event(EventFrame)
/// A detected gap in event sequence numbers: `expected` is the sequence number that should have come
/// next, `received` is the (larger) one that actually arrived.
case seqGap(expected: Int, received: Int)
}

View File

@@ -38,7 +38,7 @@ final class InstancesStore: ObservableObject {
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "instances")
private var task: Task<Void, Never>?
private let interval: TimeInterval = 30
private var observers: [NSObjectProtocol] = []
private var eventTask: Task<Void, Never>?
private struct PresenceEventPayload: Codable {
let presence: [PresenceEntry]
@@ -51,7 +51,7 @@ final class InstancesStore: ObservableObject {
func start() {
guard !self.isPreview else { return }
guard self.task == nil else { return }
self.observeGatewayEvents()
self.startGatewaySubscription()
self.task = Task.detached { [weak self] in
guard let self else { return }
await self.refresh()
@@ -65,56 +65,41 @@ final class InstancesStore: ObservableObject {
func stop() {
self.task?.cancel()
self.task = nil
for token in self.observers {
NotificationCenter.default.removeObserver(token)
}
self.observers.removeAll()
self.eventTask?.cancel()
self.eventTask = nil
}
private func observeGatewayEvents() {
let ev = NotificationCenter.default.addObserver(
forName: .gatewayEvent,
object: nil,
queue: .main)
{ [weak self] note in
guard let self,
let frame = note.object as? GatewayFrame else { return }
switch frame {
case let .event(evt) where evt.event == "presence":
if let payload = evt.payload {
Task { @MainActor [weak self] in self?.handlePresenceEventPayload(payload) }
}
default:
break
}
}
let gap = NotificationCenter.default.addObserver(
forName: .gatewaySeqGap,
object: nil,
queue: .main)
{ [weak self] _ in
private func startGatewaySubscription() {
self.eventTask?.cancel()
self.eventTask = Task { [weak self] in
guard let self else { return }
Task { await self.refresh() }
}
let snap = NotificationCenter.default.addObserver(
forName: .gatewaySnapshot,
object: nil,
queue: .main)
{ [weak self] note in
guard let self,
let frame = note.object as? GatewayFrame else { return }
switch frame {
case let .helloOk(hello):
if JSONSerialization.isValidJSONObject(hello.snapshot.presence),
let data = try? JSONEncoder().encode(hello.snapshot.presence)
{
Task { @MainActor [weak self] in self?.decodeAndApplyPresenceData(data) }
let stream = await GatewayConnection.shared.subscribe()
for await push in stream {
if Task.isCancelled { return }
await MainActor.run { [weak self] in
self?.handle(push: push)
}
default:
break
}
}
self.observers = [ev, snap, gap]
}
/// Applies one gateway push to the store.
///
/// - Snapshots: the snapshot's presence list is encoded to JSON and passed to `decodeAndApplyPresenceData(_:)`.
/// - Sequence gaps: a `refresh()` is started in a new task.
/// - `presence` events carrying a payload: passed to `handlePresenceEventPayload(_:)`.
/// - All other events are ignored.
private func handle(push: GatewayPush) {
    switch push {
    case let .snapshot(hello):
        let presence = hello.snapshot.presence
        guard JSONSerialization.isValidJSONObject(presence),
              let encoded = try? JSONEncoder().encode(presence)
        else { return }
        self.decodeAndApplyPresenceData(encoded)
    case .seqGap:
        Task { await self.refresh() }
    case let .event(frame):
        guard frame.event == "presence", let payload = frame.payload else { return }
        self.handlePresenceEventPayload(payload)
    }
}
func refresh() async {

View File

@@ -6,6 +6,16 @@ actor RemoteTunnelManager {
private var controlTunnel: WebChatTunnel?
/// Returns the local port of the control tunnel when one exists and its process is still running.
///
/// This never starts a tunnel.
///
/// - Returns: The tunnel's local port, or `nil` when there is no tunnel, its process is not running,
///   or it has no local port.
func controlTunnelPortIfRunning() -> UInt16? {
    guard let tunnel = self.controlTunnel, tunnel.process.isRunning else { return nil }
    return tunnel.localPort
}
/// Ensure an SSH tunnel is running for the gateway control port.
/// Returns the local forwarded port (usually 18789).
func ensureControlTunnel() async throws -> UInt16 {
@@ -17,12 +27,7 @@ actor RemoteTunnelManager {
userInfo: [NSLocalizedDescriptionKey: "Remote mode is not enabled"])
}
if let tunnel = self.controlTunnel,
tunnel.process.isRunning,
let local = tunnel.localPort
{
return local
}
if let local = self.controlTunnelPortIfRunning() { return local }
let desiredPort = UInt16(GatewayEnvironment.gatewayPort())
let tunnel = try await WebChatTunnel.create(

View File

@@ -4,9 +4,6 @@ import OSLog
import SwiftUI
import UniformTypeIdentifiers
extension GatewayFrame: @unchecked Sendable {}
extension EventFrame: @unchecked Sendable {}
private let webChatSwiftLogger = Logger(subsystem: "com.steipete.clawdis", category: "WebChatSwiftUI")
private enum WebChatSwiftUILayout {
@@ -79,25 +76,26 @@ final class WebChatViewModel: ObservableObject {
@Published var healthOK: Bool = true
private let sessionKey: String
private var eventToken: NSObjectProtocol?
private var eventTask: Task<Void, Never>?
private var pendingRuns = Set<String>()
init(sessionKey: String) {
self.sessionKey = sessionKey
self.eventToken = NotificationCenter.default.addObserver(
forName: .gatewayEvent,
object: nil,
queue: .main)
{ [weak self] note in
guard let frame = note.object as? GatewayFrame else { return }
Task { @MainActor in
self?.handleGatewayFrame(frame)
self.eventTask = Task { [weak self] in
guard let self else { return }
let stream = await GatewayConnection.shared.subscribe()
for await push in stream {
if Task.isCancelled { return }
guard case let .event(evt) = push else { continue }
await MainActor.run { [weak self] in
self?.handleGatewayEvent(evt)
}
}
}
}
deinit {
// Intentionally no cleanup; NotificationCenter observer is weakly captured and drops with this instance.
self.eventTask?.cancel()
}
func load() {
@@ -212,8 +210,8 @@ final class WebChatViewModel: ObservableObject {
return try JSONDecoder().decode(ChatHistoryPayload.self, from: data)
}
private func handleGatewayFrame(_ frame: GatewayFrame) {
guard case let .event(evt) = frame, evt.event == "chat" else { return }
private func handleGatewayEvent(_ evt: EventFrame) {
guard evt.event == "chat" else { return }
guard let payload = evt.payload else { return }
guard let data = try? JSONEncoder().encode(payload) else { return }
guard let chat = try? JSONDecoder().decode(ChatEventPayload.self, from: data) else { return }

View File

@@ -70,6 +70,11 @@ import Testing
self.pendingReceiveHandler.withLock { $0 = completionHandler }
}
/// Feeds `data` to the currently stored receive handler as a successful binary websocket message.
///
/// Does nothing when no handler is stored. The stored handler is left in place.
func emitIncoming(_ data: Data) {
    guard let deliver = self.pendingReceiveHandler.withLock({ $0 }) else { return }
    deliver(Result<URLSessionWebSocketTask.Message, Error>.success(.data(data)))
}
private static func helloOkData() -> Data {
let json = """
{
@@ -118,6 +123,10 @@ import Testing
}
}
/// Returns the last element of `tasks` (read under its lock), or `nil` when `tasks` is empty.
func latestTask() -> FakeWebSocketTask? {
self.tasks.withLock { $0.last }
}
func makeWebSocketTask(url: URL) -> WebSocketTaskBox {
_ = url
self.makeCount.withLock { $0 += 1 }
@@ -185,4 +194,74 @@ import Testing
#expect(session.snapshotMakeCount() == 1)
}
/// A subscriber that joins after a request has completed receives a `hello-ok` snapshot as its first element.
@Test func subscribeReplaysLatestSnapshot() async throws {
    let endpoint = URL(string: "ws://example.invalid")!
    let config = ConfigSource(token: nil)
    let fakeSession = FakeWebSocketSession()
    let connection = GatewayConnection(
        configProvider: { (endpoint, config.snapshotToken()) },
        sessionBox: WebSocketSessionBox(session: fakeSession))

    // Issue a request first so the subscription below happens afterwards.
    _ = try await connection.request(method: "status", params: nil)

    let pushes = await connection.subscribe(bufferingNewest: 10)
    var pushIterator = pushes.makeAsyncIterator()
    let replayed = await pushIterator.next()

    guard case let .snapshot(hello) = replayed else {
        Issue.record("expected snapshot, got \(String(describing: replayed))")
        return
    }
    #expect(hello.type == "hello-ok")
}
/// Emits events with `seq` 1 and then 3 and expects the subscriber to see, in order:
/// the event with seq 1, a `seqGap(expected: 2, received: 3)`, and then the event with seq 3.
@Test func subscribeEmitsSeqGapBeforeEvent() async throws {
let session = FakeWebSocketSession()
let url = URL(string: "ws://example.invalid")!
let cfg = ConfigSource(token: nil)
let conn = GatewayConnection(
configProvider: { (url, cfg.snapshotToken()) },
sessionBox: WebSocketSessionBox(session: session))
// Subscribe before the first request, so the first stream element is consumed below as the snapshot.
let stream = await conn.subscribe(bufferingNewest: 10)
var iterator = stream.makeAsyncIterator()
_ = try await conn.request(method: "status", params: nil)
_ = await iterator.next() // snapshot
// First event: seq 1.
let evt1 = Data(
"""
{"type":"event","event":"presence","payload":{"presence":[]},"seq":1}
""".utf8)
session.latestTask()?.emitIncoming(evt1)
let firstEvent = await iterator.next()
guard case let .event(firstFrame) = firstEvent else {
Issue.record("expected event, got \(String(describing: firstEvent))")
return
}
#expect(firstFrame.seq == 1)
// Second event skips seq 2, which should surface as a gap before the event itself.
let evt3 = Data(
"""
{"type":"event","event":"presence","payload":{"presence":[]},"seq":3}
""".utf8)
session.latestTask()?.emitIncoming(evt3)
let gap = await iterator.next()
guard case let .seqGap(expected, received) = gap else {
Issue.record("expected seqGap, got \(String(describing: gap))")
return
}
#expect(expected == 2)
#expect(received == 3)
// The event that revealed the gap is still delivered afterwards.
let secondEvent = await iterator.next()
guard case let .event(secondFrame) = secondEvent else {
Issue.record("expected event, got \(String(describing: secondEvent))")
return
}
#expect(secondFrame.seq == 3)
}
}

131
docs/refactor/gateway.md Normal file
View File

@@ -0,0 +1,131 @@
---
summary: "Refactor notes for the macOS gateway client: single shared websocket + follow-ups"
read_when:
- Investigating duplicate/stale Gateway WS connections
- Refactoring macOS gateway client architecture
- Debugging noisy reconnect storms on gateway restart
---
# Gateway Refactor Notes (macOS client)
Last updated: 2025-12-12
This document captures the rationale and direction for the macOS app's Gateway client refactor: **one shared websocket connection per app process**, plus follow-up improvements to simplify lifetimes and reduce “hidden” reconnection behavior.
Related docs:
- `docs/refactor/new-arch.md` (overall gateway protocol/server plan)
- `docs/gateway.md` (gateway operations/runbook)
- `docs/presence.md` (presence semantics and dedupe)
- `docs/mac/webchat.md` (WebChat surfaces and debugging)
---
## Background: what was wrong
Symptoms:
- Restarting the gateway produced a *storm* of reconnects/log spam (`gateway/ws in connect`, `hello`, `hello-ok`) and elevated `clients=` counts.
- Even with “one panel open”, the mac app could hold tens of websocket connections to `ws://127.0.0.1:18789`.
Root cause (historical bug):
- The mac app was repeatedly “reconfiguring” a gateway client on a timer (via health polling), creating a new websocket owner each time.
- Old websocket owners were not fully torn down and could keep watchdog/tick tasks alive, leading to **connection accumulation** over time.
---
## Current architecture (as of 2025-12-12)
Goal: enforce the invariant **“one gateway websocket per app process (per effective config)”**.
Key elements:
- `GatewayConnection.shared` owns the one websocket and is the *only* supported entry point for app code that needs gateway RPC.
- Consumers (e.g. Control UI, Agent RPC, SwiftUI WebChat) call `GatewayConnection.shared.request(...)` and do not create their own sockets.
- If the effective connection config changes (local ↔ remote tunnel port, token change), `GatewayConnection` replaces the underlying connection.
- Server-push frames are delivered via `GatewayConnection.shared.subscribe(...) -> AsyncStream<GatewayPush>`, which is the in-process event bus (no `NotificationCenter`).
Notes:
- Remote mode requires an SSH control tunnel. `GatewayConnection` **does not** start tunnels; it consumes the already-established forwarded port (owned by `ConnectionModeCoordinator` / `RemoteTunnelManager`).
---
## Design constraints / principles
- **Single ownership:** Exactly one component owns the actual socket and reconnect policy.
- **Explicit config changes:** Recreate/reconnect only when config changes, not as a side effect of periodic work.
- **No implicit fan-out sockets:** Adding new UI features must not accidentally add new persistent gateway connections.
- **Testable seams:** Connection config and websocket session creation should be overridable in tests.
---
## Follow-up refactors (recommended)
### Status (as of 2025-12-12)
- ✅ One shared websocket per app process (per config)
- ✅ Event streaming moved into `GatewayConnection` (`AsyncStream<GatewayPush>`)
- ✅ `NotificationCenter` removed for in-process gateway events
- ✅ `GatewayConnection` no longer implicitly starts the remote control tunnel
- ⏳ Further separation of concerns (polish/cleanup): push parsing helpers + clearer UI adapters
- ⏳ Optional: a dedicated “resolved endpoint” publisher for remote mode (to make mode transitions observable)
### 1) Move event streaming into `GatewayConnection` (done)
Implemented:
- `GatewayChannelActor` no longer posts global notifications; it forwards pushes to `GatewayConnection` via a callback.
- `GatewayConnection` fans out pushes via `subscribe(...) -> AsyncStream<GatewayPush>` and replays the latest snapshot to new subscribers.
### 2) Replace `NotificationCenter` for in-process events (done)
Implemented:
- `ControlChannel`, `InstancesStore`, and SwiftUI WebChat now subscribe to `GatewayConnection` directly.
- This removed the risk of leaking `NotificationCenter` observer tokens when views/controllers churn.
### 3) Separate control-plane vs chat-plane concerns (partially done)
As features grow, split responsibilities:
- **RPC layer**: request/response, retries, timeouts.
- **Event bus**: typed gateway events with buffering/backpressure.
- **UI adapters**: user-facing state and error mapping.
This reduces the risk that “a UI refresh” causes connection or tunnel side effects.
Notes:
- The RPC layer and event bus are now centralized in `GatewayConnection`.
- There's still room to extract small helpers for decoding specific event payloads (agent/chat/presence) so UI code stays thin.
### 4) Centralize tunnel lifecycle (remote mode) (done for GatewayConnection)
Previously, “first request wins” could implicitly start/ensure a tunnel (via `GatewayConnection`'s default config provider).
Now:
- `GatewayConnection` uses the already-running forwarded port from `RemoteTunnelManager` and will error if remote mode is enabled but no tunnel is active.
- Remote tunnel lifecycle is owned by mode/application coordinators (e.g. `ConnectionModeCoordinator`), not by incidental RPC calls.
Future improvement:
- A dedicated coordinator that owns remote tunnel lifecycle and publishes a resolved endpoint.
- `GatewayConnection` consumes that endpoint rather than calling into tunnel code itself.
This makes remote mode behavior easier to reason about (and test).
---
## Testing strategy (what we want to cover)
Minimum invariants:
- Repeated requests under the same config do **not** create additional websocket tasks.
- Concurrent requests still create **exactly one** websocket and reuse it.
- Shutdown prevents any reconnect loop after failures.
- Config changes (token / endpoint) cancel the old socket and reconnect once.
Nice-to-have integration coverage:
- Multiple “consumers” (Control UI + Agent RPC + SwiftUI WebChat) all call through the shared connection and still produce only one websocket.
Additional coverage added (macOS):
- Subscribing after connect replays the latest snapshot.
- Sequence gaps emit an explicit `GatewayPush.seqGap(...)` before the corresponding event.
---
## Debug notes (operational)
When diagnosing “too many connections”:
- Prefer counting actual TCP connections on port 18789 and grouping by PID to see which process is holding sockets.
- Gateway `--verbose` prints *every* connect/hello and event broadcast; use it only when needed and filter output if you're just sanity-checking.