diff --git a/apps/macos/Sources/Clawdbot/ConnectionModeCoordinator.swift b/apps/macos/Sources/Clawdbot/ConnectionModeCoordinator.swift index b1f893b1e..00f93bd85 100644 --- a/apps/macos/Sources/Clawdbot/ConnectionModeCoordinator.swift +++ b/apps/macos/Sources/Clawdbot/ConnectionModeCoordinator.swift @@ -6,15 +6,20 @@ final class ConnectionModeCoordinator { static let shared = ConnectionModeCoordinator() private let logger = Logger(subsystem: "com.clawdbot", category: "connection") + private var lastMode: AppState.ConnectionMode? /// Apply the requested connection mode by starting/stopping local gateway, /// managing the control-channel SSH tunnel, and cleaning up chat windows/panels. func apply(mode: AppState.ConnectionMode, paused: Bool) async { + if let lastMode = self.lastMode, lastMode != mode { + GatewayProcessManager.shared.clearLastFailure() + NodesStore.shared.lastError = nil + } + self.lastMode = mode switch mode { case .unconfigured: - if let error = await NodeServiceManager.stop() { - NodesStore.shared.lastError = "Node service stop failed: \(error)" - } + _ = await NodeServiceManager.stop() + NodesStore.shared.lastError = nil await RemoteTunnelManager.shared.stopAll() WebChatManager.shared.resetTunnels() GatewayProcessManager.shared.stop() @@ -23,9 +28,8 @@ final class ConnectionModeCoordinator { Task.detached { await PortGuardian.shared.sweep(mode: .unconfigured) } case .local: - if let error = await NodeServiceManager.stop() { - NodesStore.shared.lastError = "Node service stop failed: \(error)" - } + _ = await NodeServiceManager.stop() + NodesStore.shared.lastError = nil await RemoteTunnelManager.shared.stopAll() WebChatManager.shared.resetTunnels() let shouldStart = GatewayAutostartPolicy.shouldStartGateway(mode: .local, paused: paused) @@ -56,6 +60,7 @@ final class ConnectionModeCoordinator { WebChatManager.shared.resetTunnels() do { + NodesStore.shared.lastError = nil if let error = await NodeServiceManager.start() { NodesStore.shared.lastError = "Node service start failed: \(error)" } diff --git a/apps/macos/Sources/Clawdbot/GatewayProcessManager.swift b/apps/macos/Sources/Clawdbot/GatewayProcessManager.swift index 9c1761544..33156f58f 100644 --- a/apps/macos/Sources/Clawdbot/GatewayProcessManager.swift +++ b/apps/macos/Sources/Clawdbot/GatewayProcessManager.swift @@ -42,10 +42,20 @@ final class GatewayProcessManager { private var environmentRefreshTask: Task? private var lastEnvironmentRefresh: Date? private var logRefreshTask: Task? + #if DEBUG + private var testingConnection: GatewayConnection? + #endif private let logger = Logger(subsystem: "com.clawdbot", category: "gateway.process") private let logLimit = 20000 // characters to keep in-memory private let environmentRefreshMinInterval: TimeInterval = 30 + private var connection: GatewayConnection { + #if DEBUG + return self.testingConnection ?? .shared + #else + return .shared + #endif + } func setActive(_ active: Bool) { // Remote mode should never spawn a local gateway; treat as stopped. @@ -126,6 +136,10 @@ final class GatewayProcessManager { } } + func clearLastFailure() { + self.lastFailureReason = nil + } + func refreshEnvironmentStatus(force: Bool = false) { let now = Date() if !force { @@ -178,7 +192,7 @@ final class GatewayProcessManager { let hasListener = instance != nil let attemptAttach = { - try await GatewayConnection.shared.requestRaw(method: .health, timeoutMs: 2000) + try await self.connection.requestRaw(method: .health, timeoutMs: 2000) } for attempt in 0..<(hasListener ? 3 : 1) { @@ -187,6 +201,7 @@ final class GatewayProcessManager { let snap = decodeHealthSnapshot(from: data) let details = self.describe(details: instanceText, port: port, snap: snap) self.existingGatewayDetails = details + self.clearLastFailure() self.status = .attachedExisting(details: details) self.appendLog("[gateway] using existing instance: \(details)\n") self.logger.info("gateway using existing instance details=\(details)") @@ -310,9 +325,10 @@ final class GatewayProcessManager { while Date() < deadline { if !self.desiredActive { return } do { - _ = try await GatewayConnection.shared.requestRaw(method: .health, timeoutMs: 1500) + _ = try await self.connection.requestRaw(method: .health, timeoutMs: 1500) let instance = await PortGuardian.shared.describe(port: port) let details = instance.map { "pid \($0.pid)" } + self.clearLastFailure() self.status = .running(details: details) self.logger.info("gateway started details=\(details ?? "ok")") self.refreshControlChannelIfNeeded(reason: "gateway started") @@ -352,7 +368,8 @@ final class GatewayProcessManager { while Date() < deadline { if !self.desiredActive { return false } do { - _ = try await GatewayConnection.shared.requestRaw(method: .health, timeoutMs: 1500) + _ = try await self.connection.requestRaw(method: .health, timeoutMs: 1500) + self.clearLastFailure() return true } catch { try? await Task.sleep(nanoseconds: 300_000_000) @@ -385,3 +402,19 @@ final class GatewayProcessManager { return String(text.suffix(limit)) } } + +#if DEBUG +extension GatewayProcessManager { + func setTestingConnection(_ connection: GatewayConnection?) { + self.testingConnection = connection + } + + func setTestingDesiredActive(_ active: Bool) { + self.desiredActive = active + } + + func setTestingLastFailureReason(_ reason: String?) { + self.lastFailureReason = reason + } +} +#endif diff --git a/apps/macos/Tests/ClawdbotIPCTests/GatewayProcessManagerTests.swift b/apps/macos/Tests/ClawdbotIPCTests/GatewayProcessManagerTests.swift new file mode 100644 index 000000000..18e529389 --- /dev/null +++ b/apps/macos/Tests/ClawdbotIPCTests/GatewayProcessManagerTests.swift @@ -0,0 +1,146 @@ +import Foundation +import os +import Testing +@testable import Clawdbot + +@Suite(.serialized) +@MainActor +struct GatewayProcessManagerTests { + private final class FakeWebSocketTask: WebSocketTasking, @unchecked Sendable { + private let connectRequestID = OSAllocatedUnfairLock(initialState: nil) + private let pendingReceiveHandler = + OSAllocatedUnfairLock<(@Sendable (Result) + -> Void)?>(initialState: nil) + private let cancelCount = OSAllocatedUnfairLock(initialState: 0) + private let sendCount = OSAllocatedUnfairLock(initialState: 0) + + var state: URLSessionTask.State = .suspended + + func resume() { + self.state = .running + } + + func cancel(with closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + _ = (closeCode, reason) + self.state = .canceling + self.cancelCount.withLock { $0 += 1 } + let handler = self.pendingReceiveHandler.withLock { handler in + defer { handler = nil } + return handler + } + handler?(Result.failure(URLError(.cancelled))) + } + + func send(_ message: URLSessionWebSocketTask.Message) async throws { + let currentSendCount = self.sendCount.withLock { count in + defer { count += 1 } + return count + } + + if currentSendCount == 0 { + guard case let .data(data) = message else { return } + if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + (obj["type"] as? String) == "req", + (obj["method"] as? String) == "connect", + let id = obj["id"] as? String + { + self.connectRequestID.withLock { $0 = id } + } + return + } + + guard case let .data(data) = message else { return } + guard + let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + (obj["type"] as? String) == "req", + let id = obj["id"] as? String + else { + return + } + + let response = Self.responseData(id: id) + let handler = self.pendingReceiveHandler.withLock { $0 } + handler?(Result.success(.data(response))) + } + + func receive() async throws -> URLSessionWebSocketTask.Message { + let id = self.connectRequestID.withLock { $0 } ?? "connect" + return .data(Self.connectOkData(id: id)) + } + + func receive( + completionHandler: @escaping @Sendable (Result) -> Void) + { + self.pendingReceiveHandler.withLock { $0 = completionHandler } + } + + private static func connectOkData(id: String) -> Data { + let json = """ + { + "type": "res", + "id": "\(id)", + "ok": true, + "payload": { + "type": "hello-ok", + "protocol": 2, + "server": { "version": "test", "connId": "test" }, + "features": { "methods": [], "events": [] }, + "snapshot": { + "presence": [ { "ts": 1 } ], + "health": {}, + "stateVersion": { "presence": 0, "health": 0 }, + "uptimeMs": 0 + }, + "policy": { "maxPayload": 1, "maxBufferedBytes": 1, "tickIntervalMs": 30000 } + } + } + """ + return Data(json.utf8) + } + + private static func responseData(id: String) -> Data { + let json = """ + { + "type": "res", + "id": "\(id)", + "ok": true, + "payload": { "ok": true } + } + """ + return Data(json.utf8) + } + } + + private final class FakeWebSocketSession: WebSocketSessioning, @unchecked Sendable { + private let tasks = OSAllocatedUnfairLock(initialState: [FakeWebSocketTask]()) + + func makeWebSocketTask(url: URL) -> WebSocketTaskBox { + _ = url + let task = FakeWebSocketTask() + self.tasks.withLock { $0.append(task) } + return WebSocketTaskBox(task: task) + } + } + + @Test func clearsLastFailureWhenHealthSucceeds() async { + let session = FakeWebSocketSession() + let url = URL(string: "ws://example.invalid")! + let connection = GatewayConnection( + configProvider: { (url: url, token: nil, password: nil) }, + sessionBox: WebSocketSessionBox(session: session)) + + let manager = GatewayProcessManager.shared + manager.setTestingConnection(connection) + manager.setTestingDesiredActive(true) + manager.setTestingLastFailureReason("health failed") + defer { + manager.setTestingConnection(nil) + manager.setTestingDesiredActive(false) + manager.setTestingLastFailureReason(nil) + } + + let ready = await manager.waitForGatewayReady(timeout: 0.5) + #expect(ready) + #expect(manager.lastFailureReason == nil) + } +}