From 293b4960f39b90aebd5bcdde58af5340af516939 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 8 Dec 2025 21:50:51 +0100 Subject: [PATCH] macos: use control channel for health and heartbeat --- .../Sources/Clawdis/ControlChannel.swift | 273 ++++++++++++++++++ .../Sources/Clawdis/GeneralSettings.swift | 59 +++- apps/macos/Sources/Clawdis/HealthStore.swift | 54 ++-- .../Sources/Clawdis/HeartbeatStore.swift | 15 +- apps/macos/Sources/Clawdis/MenuBar.swift | 5 + 5 files changed, 350 insertions(+), 56 deletions(-) create mode 100644 apps/macos/Sources/Clawdis/ControlChannel.swift diff --git a/apps/macos/Sources/Clawdis/ControlChannel.swift b/apps/macos/Sources/Clawdis/ControlChannel.swift new file mode 100644 index 000000000..07859d5a9 --- /dev/null +++ b/apps/macos/Sources/Clawdis/ControlChannel.swift @@ -0,0 +1,273 @@ +import Foundation +import Network +import OSLog +import Darwin + +struct ControlHeartbeatEvent: Codable { + let ts: Double + let status: String + let to: String? + let preview: String? + let durationMs: Double? + let hasMedia: Bool? + let reason: String? +} + +struct ControlHealthSnapshot: Codable { + struct Web: Codable { + let linked: Bool + let authAgeMs: Double? + let connect: Connect? + + struct Connect: Codable { + let ok: Bool + let status: Int? + let error: String? + let elapsedMs: Double? + } + } + + struct Sessions: Codable { + struct Entry: Codable { + let key: String + let updatedAt: Double? + let age: Double? + } + let path: String + let count: Int + let recent: [Entry] + } + + struct IPC: Codable { + let path: String + let exists: Bool + } + + let ts: Double + let durationMs: Double + let web: Web + let heartbeatSeconds: Int + let sessions: Sessions + let ipc: IPC +} + +enum ControlChannelError: Error, LocalizedError { + case disconnected + case badResponse(String) + case sshFailed(String) + + var errorDescription: String? { + switch self { + case .disconnected: return "Control channel disconnected" + case let .badResponse(msg): return msg + case let .sshFailed(msg): return "SSH tunnel failed: \(msg)" + } + } +} + +@MainActor +final class ControlChannel: ObservableObject { + static let shared = ControlChannel() + + enum Mode: Equatable { + case local + case remote(target: String, identity: String) + } + + private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control") + private var connection: NWConnection? + private var sshProcess: Process? + private var buffer = Data() + private var pending: [String: CheckedContinuation] = [:] + private var listenTask: Task? + private var mode: Mode = .local + private var localPort: UInt16 = 18789 + + func configure(mode: Mode) async throws { + if mode == self.mode, self.connection != nil { return } + await self.disconnect() + self.mode = mode + try await self.connect() + } + + func disconnect() async { + self.listenTask?.cancel() + self.listenTask = nil + if let conn = self.connection { + conn.cancel() + } + self.connection = nil + if let ssh = self.sshProcess, ssh.isRunning { ssh.terminate() } + self.sshProcess = nil + for (_, cont) in self.pending { + cont.resume(throwing: ControlChannelError.disconnected) + } + self.pending.removeAll() + } + + func health(timeout: TimeInterval? = nil) async throws -> Data { + try await self.ensureConnected() + let payload = try await self.request(method: "health", params: timeout.map { ["timeoutMs": Int($0 * 1000)] }) + return payload + } + + private func request(method: String, params: [String: Any]? = nil) async throws -> Data { + try await self.ensureConnected() + let id = UUID().uuidString + var frame: [String: Any] = ["type": "request", "id": id, "method": method] + if let params { frame["params"] = params } + let data = try JSONSerialization.data(withJSONObject: frame) + try await self.send(data) + return try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in + self.pending[id] = cont + } + } + + private func ensureConnected() async throws { + if let conn = self.connection { + switch conn.state { + case .ready: return + default: break + } + } + try await self.connect() + } + + private func connect() async throws { + switch self.mode { + case .local: + self.localPort = 18789 + case let .remote(target, identity): + self.localPort = try self.startSSHTunnel(target: target, identity: identity) + } + + let host = NWEndpoint.Host("127.0.0.1") + let port = NWEndpoint.Port(rawValue: self.localPort)! + let conn = NWConnection(host: host, port: port, using: .tcp) + self.connection = conn + + try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in + conn.stateUpdateHandler = { state in + switch state { + case .ready: + cont.resume(returning: ()) + case let .failed(err): + cont.resume(throwing: err) + case let .waiting(err): + cont.resume(throwing: err) + default: + break + } + } + conn.start(queue: .global()) + } + + self.listenTask = Task.detached { [weak self] in + await self?.listen() + } + } + + private func startSSHTunnel(target: String, identity: String) throws -> UInt16 { + let localPort = Self.pickAvailablePort() + let proc = Process() + proc.executableURL = URL(fileURLWithPath: "/usr/bin/ssh") + var args: [String] = ["-o", "BatchMode=yes", "-o", "ExitOnForwardFailure=yes", "-L", "\(localPort):127.0.0.1:18789", target] + if !identity.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty { + args.insert(contentsOf: ["-i", identity], at: 2) + } + proc.arguments = args + proc.standardInput = nil + proc.standardOutput = Pipe() + proc.standardError = Pipe() + try proc.run() + self.sshProcess = proc + return localPort + } + + private func send(_ data: Data) async throws { + guard let conn = self.connection else { throw ControlChannelError.disconnected } + let line = data + Data([0x0A]) + try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in + conn.send(content: line, completion: .contentProcessed { error in + if let error { cont.resume(throwing: error) } + else { cont.resume(returning: ()) } + }) + } + } + + private func listen() async { + guard let conn = self.connection else { return } + while true { + let result: (Data?, Bool, NWError?) = await withCheckedContinuation { cont in + conn.receiveMessage { data, _, isComplete, error in + cont.resume(returning: (data, isComplete, error)) + } + } + + let (data, isComplete, error) = result + if let error { + self.logger.debug("control receive error: \(error.localizedDescription, privacy: .public)") + break + } + if isComplete { break } + guard let data else { continue } + self.buffer.append(data) + while let range = buffer.firstRange(of: Data([0x0A])) { + let lineData = buffer.subdata(in: buffer.startIndex.. UInt16 { + var port: UInt16 = 0 + let socket = socket(AF_INET, SOCK_STREAM, 0) + defer { close(socket) } + var addr = sockaddr_in() + addr.sin_family = sa_family_t(AF_INET) + addr.sin_port = in_port_t(0).bigEndian + addr.sin_addr = in_addr(s_addr: inet_addr("127.0.0.1")) + _ = withUnsafePointer(to: &addr) { + $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { + bind(socket, $0, socklen_t(MemoryLayout.size)) + } + } + var len = socklen_t(MemoryLayout.size) + getsockname(socket, withUnsafeMutablePointer(to: &addr) { + $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { $0 } + }, &len) + port = UInt16(bigEndian: addr.sin_port) + return port + } +} + +extension Notification.Name { + static let controlHeartbeat = Notification.Name("clawdis.control.heartbeat") +} diff --git a/apps/macos/Sources/Clawdis/GeneralSettings.swift b/apps/macos/Sources/Clawdis/GeneralSettings.swift index 1ae28596b..53c3dec1a 100644 --- a/apps/macos/Sources/Clawdis/GeneralSettings.swift +++ b/apps/macos/Sources/Clawdis/GeneralSettings.swift @@ -313,25 +313,54 @@ extension GeneralSettings { @MainActor private func testRemote() async { self.remoteStatus = .checking - let command = CommandResolver.clawdisCommand(subcommand: "status", extraArgs: ["--json"]) - let response = await ShellRunner.run(command: command, cwd: nil, env: nil, timeout: 10) - if response.ok { - self.remoteStatus = .ok + let settings = CommandResolver.connectionSettings() + guard !settings.target.isEmpty else { + self.remoteStatus = .failed("Set an SSH target first") return } - let msg: String - if let payload = response.payload, - let text = String(data: payload, encoding: .utf8), - !text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty - { - msg = text.trimmingCharacters(in: .whitespacesAndNewlines) - } else if let message = response.message, !message.isEmpty { - msg = message - } else { - msg = "Remote status failed (is clawdis on PATH on the remote host?)" + // Step 1: basic SSH reachability check + let sshResult = await ShellRunner.run( + command: Self.sshCheckCommand(target: settings.target, identity: settings.identity), + cwd: nil, + env: nil, + timeout: 8) + + guard sshResult.ok else { + let msg = sshResult.message ?? "SSH check failed" + self.remoteStatus = .failed(msg) + return } - self.remoteStatus = .failed(msg) + + // Step 2: control channel health over tunnel + let originalMode = AppStateStore.shared.connectionMode + do { + try await ControlChannel.shared.configure(mode: .remote(target: settings.target, identity: settings.identity)) + let data = try await ControlChannel.shared.health(timeout: 10) + if decodeHealthSnapshot(from: data) != nil { + self.remoteStatus = .ok + } else { + self.remoteStatus = .failed("Control channel returned invalid health JSON") + } + } catch { + self.remoteStatus = .failed(error.localizedDescription) + } + + // Restore original mode if we temporarily switched + if originalMode != .remote { + let restoreMode: ControlChannel.Mode = .local + try? await ControlChannel.shared.configure(mode: restoreMode) + } + } + + private static func sshCheckCommand(target: String, identity: String) -> [String] { + var args: [String] = ["/usr/bin/ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5"] + if !identity.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty { + args.append(contentsOf: ["-i", identity]) + } + args.append(target) + args.append("echo ok") + return args } private func revealLogs() { diff --git a/apps/macos/Sources/Clawdis/HealthStore.swift b/apps/macos/Sources/Clawdis/HealthStore.swift index baa923028..7b576c3fd 100644 --- a/apps/macos/Sources/Clawdis/HealthStore.swift +++ b/apps/macos/Sources/Clawdis/HealthStore.swift @@ -1,6 +1,7 @@ import Foundation import OSLog import SwiftUI +import Network struct HealthSnapshot: Codable, Sendable { struct Web: Codable, Sendable { @@ -96,43 +97,30 @@ final class HealthStore: ObservableObject { self.isRefreshing = true defer { self.isRefreshing = false } - guard CommandResolver.hasAnyClawdisInvoker() else { - self.lastError = "clawdis CLI not found; install deps in the configured project root or add it to PATH" - if onDemand { self.snapshot = nil } - return - } - - var env = ProcessInfo.processInfo.environment - env["PATH"] = CommandResolver.preferredPaths().joined(separator: ":") - - let response = await ShellRunner.run( - command: CommandResolver.clawdisCommand(subcommand: "health", extraArgs: ["--json"]), - cwd: CommandResolver.projectRootPath(), - env: env, - timeout: 15) - - if let data = response.payload, !data.isEmpty { - if let decoded = decodeHealthSnapshot(from: data) { - self.snapshot = decoded - if response.ok { - self.lastSuccess = Date() - self.lastError = nil - } else { - self.lastError = self.describeFailure(from: decoded, fallback: response.message) - } - return + do { + let mode = AppStateStore.shared.connectionMode + switch mode { + case .local: + try await ControlChannel.shared.configure(mode: .local) + case .remote: + let target = AppStateStore.shared.remoteTarget + let identity = AppStateStore.shared.remoteIdentity + try await ControlChannel.shared.configure(mode: .remote(target: target, identity: identity)) } - let text = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "" - let snippet = String(text.prefix(220)) - Self.logger.error("health decode failed; payload=\(snippet, privacy: .public)") - self.lastError = snippet.isEmpty ? (response.message ?? "health probe failed") : "health output not JSON: \(snippet)" + let data = try await ControlChannel.shared.health(timeout: 15) + if let decoded = decodeHealthSnapshot(from: data) { + self.snapshot = decoded + self.lastSuccess = Date() + self.lastError = nil + } else { + self.lastError = "health output not JSON" + if onDemand { self.snapshot = nil } + } + } catch { + self.lastError = error.localizedDescription if onDemand { self.snapshot = nil } - return } - - self.lastError = response.message ?? "health probe failed" - if onDemand { self.snapshot = nil } } var state: HealthState { diff --git a/apps/macos/Sources/Clawdis/HeartbeatStore.swift b/apps/macos/Sources/Clawdis/HeartbeatStore.swift index afe0948dd..84d00891b 100644 --- a/apps/macos/Sources/Clawdis/HeartbeatStore.swift +++ b/apps/macos/Sources/Clawdis/HeartbeatStore.swift @@ -5,21 +5,20 @@ import SwiftUI final class HeartbeatStore: ObservableObject { static let shared = HeartbeatStore() - @Published private(set) var lastEvent: AgentRPC.HeartbeatEvent? + @Published private(set) var lastEvent: ControlHeartbeatEvent? private var observer: NSObjectProtocol? private init() { self.observer = NotificationCenter.default.addObserver( - forName: AgentRPC.heartbeatNotification, + forName: .controlHeartbeat, object: nil, - queue: .main - ) { [weak self] note in - guard let event = note.object as? AgentRPC.HeartbeatEvent else { return } - Task { @MainActor in - self?.lastEvent = event + queue: .main) { [weak self] note in + guard let data = note.object as? Data else { return } + if let decoded = try? JSONDecoder().decode(ControlHeartbeatEvent.self, from: data) { + Task { @MainActor in self?.lastEvent = decoded } + } } - } } @MainActor diff --git a/apps/macos/Sources/Clawdis/MenuBar.swift b/apps/macos/Sources/Clawdis/MenuBar.swift index e3f2e7a69..3b786abf2 100644 --- a/apps/macos/Sources/Clawdis/MenuBar.swift +++ b/apps/macos/Sources/Clawdis/MenuBar.swift @@ -6,6 +6,7 @@ import MenuBarExtraAccess import OSLog import Security import SwiftUI +import Network @main struct ClawdisApp: App { @@ -661,6 +662,10 @@ final class AppDelegate: NSObject, NSApplicationDelegate, NSXPCListenerDelegate RelayProcessManager.shared.setActive(!state.isPaused) } Task { + let controlMode: ControlChannel.Mode = AppStateStore.shared.connectionMode == .remote + ? .remote(target: AppStateStore.shared.remoteTarget, identity: AppStateStore.shared.remoteIdentity) + : .local + try? await ControlChannel.shared.configure(mode: controlMode) try? await AgentRPC.shared.start() _ = await AgentRPC.shared.setHeartbeatsEnabled(AppStateStore.shared.heartbeatsEnabled) }