macos: use control channel for health and heartbeat
This commit is contained in:
273
apps/macos/Sources/Clawdis/ControlChannel.swift
Normal file
273
apps/macos/Sources/Clawdis/ControlChannel.swift
Normal file
@@ -0,0 +1,273 @@
|
||||
import Foundation
|
||||
import Network
|
||||
import OSLog
|
||||
import Darwin
|
||||
|
||||
/// Payload of a `heartbeat` event received on the control channel.
///
/// Decoded with the synthesized `Codable` conformance, so property names are the JSON keys.
/// Only `ts` and `status` are required; every other key may be absent.
struct ControlHeartbeatEvent: Codable {
    /// Event timestamp as sent by the server (unit/epoch not established here — TODO confirm).
    let ts: Double
    /// Server-provided status string for this heartbeat.
    let status: String
    /// Optional `to` value from the payload (presumably the recipient — verify against the sender).
    let to: String?
    /// Optional preview text.
    let preview: String?
    /// Optional duration; the key name indicates milliseconds.
    let durationMs: Double?
    /// Optional media flag.
    let hasMedia: Bool?
    /// Optional reason string.
    let reason: String?
}
|
||||
|
||||
/// Shape of the `health` response payload on the control channel.
///
/// All conformances are synthesized, so property names double as JSON keys.
struct ControlHealthSnapshot: Codable {
    // MARK: Nested payload sections

    /// The `web` section of the snapshot.
    struct Web: Codable {
        /// Result of a connection probe, when one was reported.
        struct Connect: Codable {
            let ok: Bool
            let status: Int?
            let error: String?
            let elapsedMs: Double?
        }

        let linked: Bool
        let authAgeMs: Double?
        let connect: Connect?
    }

    /// The `sessions` section of the snapshot.
    struct Sessions: Codable {
        /// One entry of the `recent` list.
        struct Entry: Codable {
            let key: String
            let updatedAt: Double?
            let age: Double?
        }

        let path: String
        let count: Int
        let recent: [Entry]
    }

    /// The `ipc` section of the snapshot.
    struct IPC: Codable {
        let path: String
        let exists: Bool
    }

    // MARK: Top-level fields

    /// Snapshot timestamp as sent by the server (unit/epoch not established here — TODO confirm).
    let ts: Double
    /// Duration reported by the server; the key name indicates milliseconds.
    let durationMs: Double
    let web: Web
    let heartbeatSeconds: Int
    let sessions: Sessions
    let ipc: IPC
}
|
||||
|
||||
/// Errors surfaced by the control channel client.
enum ControlChannelError: Error, LocalizedError {
    /// The connection is gone (or was torn down) while work was outstanding.
    case disconnected
    /// The peer answered with a failure; the associated value is the message to show.
    case badResponse(String)
    /// The SSH tunnel could not be used; the associated value carries the detail.
    case sshFailed(String)

    /// Human-readable text returned through `LocalizedError`.
    var errorDescription: String? {
        let text: String
        switch self {
        case .disconnected:
            text = "Control channel disconnected"
        case .badResponse(let msg):
            text = msg
        case .sshFailed(let msg):
            text = "SSH tunnel failed: \(msg)"
        }
        return text
    }
}
|
||||
|
||||
/// Line-delimited JSON client for the control endpoint on `127.0.0.1:18789`.
///
/// In `.local` mode the endpoint is dialed directly; in `.remote` mode an `ssh -L` port
/// forward is started first and the client dials the forwarded local port.
///
/// Wire format as used by this class: every frame is one JSON object followed by `\n`.
/// Requests are `{"type":"request","id":…,"method":…,"params":…}` and are matched by `id`
/// to `{"type":"response","id":…,"ok":…,"payload":…|"error":…}` frames. Frames of type
/// `event` with `event == "heartbeat"` are re-posted as a `.controlHeartbeat` notification
/// whose `object` is the payload re-encoded as JSON `Data`.
///
/// All mutable state is main-actor isolated. Because actor methods are re-entrant across
/// `await`, every teardown bumps `generation`; suspended work compares the value it
/// captured against the current one and backs off when it has been superseded.
@MainActor
final class ControlChannel: ObservableObject {
    static let shared = ControlChannel()

    enum Mode: Equatable {
        case local
        case remote(target: String, identity: String)
    }

    /// One-shot latch so a continuation is resumed exactly once even though
    /// `NWConnection` may report several states (`.waiting`, then `.ready`/`.failed`, …).
    private final class ResumeGate: @unchecked Sendable {
        private let lock = NSLock()
        private var claimed = false

        /// Returns `true` for the first caller only.
        func claim() -> Bool {
            self.lock.lock()
            defer { self.lock.unlock() }
            if self.claimed { return false }
            self.claimed = true
            return true
        }
    }

    /// Port the control endpoint listens on (locally, and on the remote end of the tunnel).
    private static let controlPort: UInt16 = 18789
    /// How many times to dial a freshly started tunnel before giving up.
    private static let tunnelConnectAttempts = 20
    /// Pause between tunnel dial attempts (250 ms).
    private static let tunnelRetryDelay: UInt64 = 250_000_000
    /// Upper bound for a single socket read.
    private static let maxReceiveLength = 64 * 1024

    private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control")
    /// Serial queue for `NWConnection` callbacks.
    private let queue = DispatchQueue(label: "com.steipete.clawdis.control")
    private var connection: NWConnection?
    private var sshProcess: Process?
    private var buffer = Data()
    private var pending: [String: CheckedContinuation<Data, Error>] = [:]
    private var listenTask: Task<Void, Never>?
    private var mode: Mode = .local
    private var localPort: UInt16 = 18789
    /// Incremented by every teardown; see the type-level note on re-entrancy.
    private var generation = 0

    // MARK: - Public API

    /// Switches to `mode` and connects. A no-op when already configured for the same mode
    /// with a connection in place.
    /// - Throws: whatever `connect()` throws when the endpoint cannot be reached.
    func configure(mode: Mode) async throws {
        if mode == self.mode, self.connection != nil { return }
        await self.disconnect()
        self.mode = mode
        try await self.connect()
    }

    /// Closes the connection and tunnel; outstanding requests fail with `.disconnected`.
    func disconnect() async {
        self.teardown(failingPendingWith: ControlChannelError.disconnected)
    }

    /// Issues a `health` request and returns the response payload as JSON `Data`.
    /// - Parameter timeout: forwarded to the peer as `timeoutMs`; it is not enforced locally.
    /// - Throws: connection errors, `.disconnected`, or `.badResponse` from the peer.
    func health(timeout: TimeInterval? = nil) async throws -> Data {
        let params: [String: Any]? = timeout.map { ["timeoutMs": Int($0 * 1000)] }
        return try await self.request(method: "health", params: params)
    }

    // MARK: - Requests

    /// Sends one request frame and suspends until the matching response arrives.
    private func request(method: String, params: [String: Any]? = nil) async throws -> Data {
        try await self.ensureConnected()
        let id = UUID().uuidString
        var frame: [String: Any] = ["type": "request", "id": id, "method": method]
        if let params { frame["params"] = params }
        let data = try JSONSerialization.data(withJSONObject: frame)

        return try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Data, Error>) in
            // Register the waiter BEFORE the bytes leave, otherwise a fast reply can be
            // handled while `send` is still suspended and the request would never finish.
            self.pending[id] = cont
            Task { @MainActor in
                do {
                    try await self.send(data)
                } catch {
                    // The waiter may already be gone (teardown); then there is nothing to resume.
                    self.pending.removeValue(forKey: id)?.resume(throwing: error)
                }
            }
        }
    }

    /// Connects when there is no connection in the `.ready` state.
    private func ensureConnected() async throws {
        if let conn = self.connection, case .ready = conn.state { return }
        try await self.connect()
    }

    // MARK: - Connection lifecycle

    /// Releases connection, listener, tunnel and buffered bytes, and fails every pending request.
    private func teardown(failingPendingWith error: Error) {
        self.generation &+= 1
        self.listenTask?.cancel()
        self.listenTask = nil
        // The state handler is left installed on purpose: a `connect()` that is still
        // waiting on this connection is released by the resulting `.cancelled` state.
        self.connection?.cancel()
        self.connection = nil
        if let ssh = self.sshProcess, ssh.isRunning { ssh.terminate() }
        self.sshProcess = nil
        // A partial line from the old stream must not be glued onto the next one.
        self.buffer.removeAll()
        let waiters = self.pending
        self.pending.removeAll()
        for waiter in waiters.values {
            waiter.resume(throwing: error)
        }
    }

    /// Establishes a fresh connection for the current mode and starts the read loop.
    ///
    /// Any previous connection/tunnel is torn down first so reconnects do not leak an
    /// `ssh` process. In remote mode the dial is retried for a few seconds because `ssh`
    /// only opens the forwarded port after it has authenticated.
    private func connect() async throws {
        self.teardown(failingPendingWith: ControlChannelError.disconnected)
        let gen = self.generation

        switch self.mode {
        case .local:
            self.localPort = Self.controlPort
        case let .remote(target, identity):
            self.localPort = try self.startSSHTunnel(target: target, identity: identity)
        }
        let tunnel = self.sshProcess

        guard let port = NWEndpoint.Port(rawValue: self.localPort) else {
            let failure = ControlChannelError.badResponse("Invalid control port \(self.localPort)")
            self.teardown(failingPendingWith: failure)
            throw failure
        }

        let maxAttempts = tunnel == nil ? 1 : Self.tunnelConnectAttempts
        var attempt = 1
        while true {
            do {
                try await self.openConnection(port: port)
                guard gen == self.generation else { throw ControlChannelError.disconnected }
                self.listenTask = Task.detached { [weak self] in
                    await self?.listen(generation: gen)
                }
                return
            } catch {
                // Someone else reconfigured the channel meanwhile: leave their state alone.
                guard gen == self.generation else { throw error }

                var failure: Error = error
                var shouldRetry = attempt < maxAttempts
                if let tunnel, !tunnel.isRunning {
                    // ssh gave up (auth failure, forward failure, …): retrying cannot help.
                    failure = ControlChannelError.sshFailed("ssh exited with status \(tunnel.terminationStatus)")
                    shouldRetry = false
                }
                if shouldRetry {
                    do {
                        try await Task.sleep(nanoseconds: Self.tunnelRetryDelay)
                    } catch let sleepError {
                        // Task cancelled while waiting: stop retrying.
                        failure = sleepError
                        shouldRetry = false
                    }
                    guard gen == self.generation else { throw ControlChannelError.disconnected }
                }
                if !shouldRetry {
                    self.teardown(failingPendingWith: failure)
                    throw failure
                }
                attempt += 1
            }
        }
    }

    /// Dials `127.0.0.1:port` and returns once the connection is ready.
    /// - Throws: the `NWError` reported by `.failed`/`.waiting`, or `.disconnected` if the
    ///   connection is cancelled before it becomes ready.
    private func openConnection(port: NWEndpoint.Port) async throws {
        let conn = NWConnection(host: NWEndpoint.Host("127.0.0.1"), port: port, using: .tcp)
        self.connection?.cancel() // leftover from a previous failed attempt, if any
        self.connection = conn

        let gate = ResumeGate()
        let queue = self.queue
        do {
            try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Void, Error>) in
                conn.stateUpdateHandler = { state in
                    let outcome: Result<Void, Error>
                    switch state {
                    case .ready:
                        outcome = .success(())
                    case let .failed(err):
                        outcome = .failure(err)
                    case let .waiting(err):
                        outcome = .failure(err)
                    case .cancelled:
                        outcome = .failure(ControlChannelError.disconnected)
                    default:
                        return
                    }
                    // Later state changes must not resume the continuation again.
                    guard gate.claim() else { return }
                    cont.resume(with: outcome)
                }
                conn.start(queue: queue)
            }
        } catch {
            conn.cancel()
            throw error
        }
    }

    /// Starts `ssh -N -L <free port>:127.0.0.1:18789 target` and returns the local port.
    /// - Throws: `.sshFailed` when no local port could be reserved, or the error from
    ///   launching `/usr/bin/ssh`.
    private func startSSHTunnel(target: String, identity: String) throws -> UInt16 {
        let localPort = Self.pickAvailablePort()
        guard localPort != 0 else {
            throw ControlChannelError.sshFailed("no free local port available")
        }

        // -N: forward only; without it ssh would start a remote shell for this session.
        var args: [String] = [
            "-o", "BatchMode=yes",
            "-o", "ExitOnForwardFailure=yes",
            "-N",
            "-L", "\(localPort):127.0.0.1:\(Self.controlPort)",
        ]
        let identityPath = identity.trimmingCharacters(in: .whitespacesAndNewlines)
        if !identityPath.isEmpty {
            args.append(contentsOf: ["-i", identityPath])
        }
        args.append(target)

        let proc = Process()
        proc.executableURL = URL(fileURLWithPath: "/usr/bin/ssh")
        proc.arguments = args
        // Nothing reads the tunnel's stdio, so do not hand it pipes that could fill up.
        proc.standardInput = FileHandle.nullDevice
        proc.standardOutput = FileHandle.nullDevice
        proc.standardError = FileHandle.nullDevice
        try proc.run()
        self.sshProcess = proc
        return localPort
    }

    // MARK: - I/O

    /// Writes `data` followed by a newline and returns once the stack has processed it.
    private func send(_ data: Data) async throws {
        guard let conn = self.connection else { throw ControlChannelError.disconnected }
        let line = data + Data([0x0A])
        try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Void, Error>) in
            conn.send(content: line, completion: .contentProcessed { error in
                if let error {
                    cont.resume(throwing: error)
                } else {
                    cont.resume(returning: ())
                }
            })
        }
    }

    /// Read loop for the connection that belongs to `gen`. Ends when the peer closes, a
    /// read fails, or the channel is torn down; in the first two cases it tears the
    /// channel down itself so pending requests fail instead of waiting forever.
    private func listen(generation gen: Int) async {
        guard gen == self.generation, let conn = self.connection else { return }
        let chunkLimit = Self.maxReceiveLength

        while true {
            // TCP is a byte stream, so read byte ranges rather than "messages".
            let result: (Data?, Bool, NWError?) = await withCheckedContinuation { cont in
                conn.receive(minimumIncompleteLength: 1, maximumLength: chunkLimit) { data, _, isComplete, error in
                    cont.resume(returning: (data, isComplete, error))
                }
            }
            // Torn down or replaced while suspended: the new owner handles everything.
            guard gen == self.generation else { return }

            let (data, isComplete, error) = result
            // Bytes can arrive together with end-of-stream; process them before leaving.
            if let data, !data.isEmpty {
                self.buffer.append(data)
                self.drainBufferedLines()
            }
            if let error {
                self.logger.debug("control receive error: \(error.localizedDescription, privacy: .public)")
                break
            }
            if isComplete { break }
        }

        self.teardown(failingPendingWith: ControlChannelError.disconnected)
    }

    /// Hands every complete (newline-terminated) line in `buffer` to `handleLine`.
    private func drainBufferedLines() {
        let newline = UInt8(ascii: "\n")
        while let end = self.buffer.firstIndex(of: newline) {
            let line = self.buffer.subdata(in: self.buffer.startIndex..<end)
            self.buffer.removeSubrange(self.buffer.startIndex...end)
            if !line.isEmpty { self.handleLine(line) }
        }
    }

    /// Dispatches one decoded frame: heartbeat events are broadcast, responses complete
    /// the matching pending request. Anything unparseable or unknown is ignored.
    private func handleLine(_ data: Data) {
        guard let obj = (try? JSONSerialization.jsonObject(with: data)) as? [String: Any],
              let type = obj["type"] as? String
        else { return }

        switch type {
        case "event":
            guard (obj["event"] as? String) == "heartbeat", let payload = obj["payload"] else { return }
            if let payloadData = Self.encodeJSON(payload) {
                NotificationCenter.default.post(name: .controlHeartbeat, object: payloadData)
            }
        case "response":
            guard let id = obj["id"] as? String,
                  let waiter = self.pending.removeValue(forKey: id)
            else { return }
            if (obj["ok"] as? Bool) == true, let payload = obj["payload"] {
                waiter.resume(returning: Self.encodeJSON(payload) ?? Data())
            } else {
                let message = (obj["error"] as? String) ?? "control error"
                waiter.resume(throwing: ControlChannelError.badResponse(message))
            }
        default:
            return
        }
    }

    /// Re-encodes a value obtained from `JSONSerialization.jsonObject`.
    ///
    /// `.fragmentsAllowed` matters: without it a scalar payload (string, number, null)
    /// makes `data(withJSONObject:)` raise an Objective-C exception that `try?` cannot catch.
    private static func encodeJSON(_ value: Any) -> Data? {
        try? JSONSerialization.data(withJSONObject: value, options: [.fragmentsAllowed])
    }

    // MARK: - Port selection

    /// Asks the kernel for a free loopback TCP port by binding port 0 and reading it back.
    ///
    /// The probe socket is closed before returning, so another process could in principle
    /// grab the port before `ssh` binds it.
    /// - Returns: the port number, or `0` when any of the socket calls fails.
    private static func pickAvailablePort() -> UInt16 {
        let fd = socket(AF_INET, SOCK_STREAM, 0)
        guard fd >= 0 else { return 0 }
        defer { close(fd) }

        var addr = sockaddr_in()
        addr.sin_len = UInt8(MemoryLayout<sockaddr_in>.size)
        addr.sin_family = sa_family_t(AF_INET)
        addr.sin_port = in_port_t(0).bigEndian
        addr.sin_addr = in_addr(s_addr: inet_addr("127.0.0.1"))

        let bindResult = withUnsafePointer(to: &addr) { ptr in
            ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
                bind(fd, $0, socklen_t(MemoryLayout<sockaddr_in>.size))
            }
        }
        guard bindResult == 0 else { return 0 }

        // The rebound pointer is only valid inside the closure, so call getsockname there.
        var len = socklen_t(MemoryLayout<sockaddr_in>.size)
        let nameResult = withUnsafeMutablePointer(to: &addr) { ptr in
            ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
                getsockname(fd, $0, &len)
            }
        }
        guard nameResult == 0 else { return 0 }

        return UInt16(bigEndian: addr.sin_port)
    }
}
|
||||
|
||||
extension Notification.Name {
    /// Notification name used for control-channel heartbeat events.
    static let controlHeartbeat = Notification.Name(rawValue: "clawdis.control.heartbeat")
}
|
||||
@@ -313,25 +313,54 @@ extension GeneralSettings {
|
||||
@MainActor
|
||||
private func testRemote() async {
|
||||
self.remoteStatus = .checking
|
||||
let command = CommandResolver.clawdisCommand(subcommand: "status", extraArgs: ["--json"])
|
||||
let response = await ShellRunner.run(command: command, cwd: nil, env: nil, timeout: 10)
|
||||
if response.ok {
|
||||
self.remoteStatus = .ok
|
||||
let settings = CommandResolver.connectionSettings()
|
||||
guard !settings.target.isEmpty else {
|
||||
self.remoteStatus = .failed("Set an SSH target first")
|
||||
return
|
||||
}
|
||||
|
||||
let msg: String
|
||||
if let payload = response.payload,
|
||||
let text = String(data: payload, encoding: .utf8),
|
||||
!text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty
|
||||
{
|
||||
msg = text.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
} else if let message = response.message, !message.isEmpty {
|
||||
msg = message
|
||||
} else {
|
||||
msg = "Remote status failed (is clawdis on PATH on the remote host?)"
|
||||
// Step 1: basic SSH reachability check
|
||||
let sshResult = await ShellRunner.run(
|
||||
command: Self.sshCheckCommand(target: settings.target, identity: settings.identity),
|
||||
cwd: nil,
|
||||
env: nil,
|
||||
timeout: 8)
|
||||
|
||||
guard sshResult.ok else {
|
||||
let msg = sshResult.message ?? "SSH check failed"
|
||||
self.remoteStatus = .failed(msg)
|
||||
return
|
||||
}
|
||||
self.remoteStatus = .failed(msg)
|
||||
|
||||
// Step 2: control channel health over tunnel
|
||||
let originalMode = AppStateStore.shared.connectionMode
|
||||
do {
|
||||
try await ControlChannel.shared.configure(mode: .remote(target: settings.target, identity: settings.identity))
|
||||
let data = try await ControlChannel.shared.health(timeout: 10)
|
||||
if decodeHealthSnapshot(from: data) != nil {
|
||||
self.remoteStatus = .ok
|
||||
} else {
|
||||
self.remoteStatus = .failed("Control channel returned invalid health JSON")
|
||||
}
|
||||
} catch {
|
||||
self.remoteStatus = .failed(error.localizedDescription)
|
||||
}
|
||||
|
||||
// Restore original mode if we temporarily switched
|
||||
if originalMode != .remote {
|
||||
let restoreMode: ControlChannel.Mode = .local
|
||||
try? await ControlChannel.shared.configure(mode: restoreMode)
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the argv for a quick SSH reachability probe: batch mode, 5 s connect timeout,
/// an optional `-i identity`, then the target and the remote command `echo ok`.
/// - Parameters:
///   - target: SSH destination, passed through unchanged.
///   - identity: identity file path; omitted when empty or whitespace-only.
/// - Returns: the full command, starting with `/usr/bin/ssh`.
private static func sshCheckCommand(target: String, identity: String) -> [String] {
    let hasIdentity = !identity.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty
    let identityArgs: [String] = hasIdentity ? ["-i", identity] : []
    let baseArgs: [String] = ["/usr/bin/ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5"]
    return baseArgs + identityArgs + [target, "echo ok"]
}
|
||||
|
||||
private func revealLogs() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import Foundation
|
||||
import OSLog
|
||||
import SwiftUI
|
||||
import Network
|
||||
|
||||
struct HealthSnapshot: Codable, Sendable {
|
||||
struct Web: Codable, Sendable {
|
||||
@@ -96,43 +97,30 @@ final class HealthStore: ObservableObject {
|
||||
self.isRefreshing = true
|
||||
defer { self.isRefreshing = false }
|
||||
|
||||
guard CommandResolver.hasAnyClawdisInvoker() else {
|
||||
self.lastError = "clawdis CLI not found; install deps in the configured project root or add it to PATH"
|
||||
if onDemand { self.snapshot = nil }
|
||||
return
|
||||
}
|
||||
|
||||
var env = ProcessInfo.processInfo.environment
|
||||
env["PATH"] = CommandResolver.preferredPaths().joined(separator: ":")
|
||||
|
||||
let response = await ShellRunner.run(
|
||||
command: CommandResolver.clawdisCommand(subcommand: "health", extraArgs: ["--json"]),
|
||||
cwd: CommandResolver.projectRootPath(),
|
||||
env: env,
|
||||
timeout: 15)
|
||||
|
||||
if let data = response.payload, !data.isEmpty {
|
||||
if let decoded = decodeHealthSnapshot(from: data) {
|
||||
self.snapshot = decoded
|
||||
if response.ok {
|
||||
self.lastSuccess = Date()
|
||||
self.lastError = nil
|
||||
} else {
|
||||
self.lastError = self.describeFailure(from: decoded, fallback: response.message)
|
||||
}
|
||||
return
|
||||
do {
|
||||
let mode = AppStateStore.shared.connectionMode
|
||||
switch mode {
|
||||
case .local:
|
||||
try await ControlChannel.shared.configure(mode: .local)
|
||||
case .remote:
|
||||
let target = AppStateStore.shared.remoteTarget
|
||||
let identity = AppStateStore.shared.remoteIdentity
|
||||
try await ControlChannel.shared.configure(mode: .remote(target: target, identity: identity))
|
||||
}
|
||||
|
||||
let text = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
|
||||
let snippet = String(text.prefix(220))
|
||||
Self.logger.error("health decode failed; payload=\(snippet, privacy: .public)")
|
||||
self.lastError = snippet.isEmpty ? (response.message ?? "health probe failed") : "health output not JSON: \(snippet)"
|
||||
let data = try await ControlChannel.shared.health(timeout: 15)
|
||||
if let decoded = decodeHealthSnapshot(from: data) {
|
||||
self.snapshot = decoded
|
||||
self.lastSuccess = Date()
|
||||
self.lastError = nil
|
||||
} else {
|
||||
self.lastError = "health output not JSON"
|
||||
if onDemand { self.snapshot = nil }
|
||||
}
|
||||
} catch {
|
||||
self.lastError = error.localizedDescription
|
||||
if onDemand { self.snapshot = nil }
|
||||
return
|
||||
}
|
||||
|
||||
self.lastError = response.message ?? "health probe failed"
|
||||
if onDemand { self.snapshot = nil }
|
||||
}
|
||||
|
||||
var state: HealthState {
|
||||
|
||||
@@ -5,21 +5,20 @@ import SwiftUI
|
||||
final class HeartbeatStore: ObservableObject {
|
||||
static let shared = HeartbeatStore()
|
||||
|
||||
@Published private(set) var lastEvent: AgentRPC.HeartbeatEvent?
|
||||
@Published private(set) var lastEvent: ControlHeartbeatEvent?
|
||||
|
||||
private var observer: NSObjectProtocol?
|
||||
|
||||
private init() {
|
||||
self.observer = NotificationCenter.default.addObserver(
|
||||
forName: AgentRPC.heartbeatNotification,
|
||||
forName: .controlHeartbeat,
|
||||
object: nil,
|
||||
queue: .main
|
||||
) { [weak self] note in
|
||||
guard let event = note.object as? AgentRPC.HeartbeatEvent else { return }
|
||||
Task { @MainActor in
|
||||
self?.lastEvent = event
|
||||
queue: .main) { [weak self] note in
|
||||
guard let data = note.object as? Data else { return }
|
||||
if let decoded = try? JSONDecoder().decode(ControlHeartbeatEvent.self, from: data) {
|
||||
Task { @MainActor in self?.lastEvent = decoded }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@MainActor
|
||||
|
||||
@@ -6,6 +6,7 @@ import MenuBarExtraAccess
|
||||
import OSLog
|
||||
import Security
|
||||
import SwiftUI
|
||||
import Network
|
||||
|
||||
@main
|
||||
struct ClawdisApp: App {
|
||||
@@ -661,6 +662,10 @@ final class AppDelegate: NSObject, NSApplicationDelegate, NSXPCListenerDelegate
|
||||
RelayProcessManager.shared.setActive(!state.isPaused)
|
||||
}
|
||||
Task {
|
||||
let controlMode: ControlChannel.Mode = AppStateStore.shared.connectionMode == .remote
|
||||
? .remote(target: AppStateStore.shared.remoteTarget, identity: AppStateStore.shared.remoteIdentity)
|
||||
: .local
|
||||
try? await ControlChannel.shared.configure(mode: controlMode)
|
||||
try? await AgentRPC.shared.start()
|
||||
_ = await AgentRPC.shared.setHeartbeatsEnabled(AppStateStore.shared.heartbeatsEnabled)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user