Control: route health/heartbeat over RPC stdio
This commit is contained in:
@@ -15,12 +15,58 @@ actor AgentRPC {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat")
|
static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat")
|
||||||
|
static let agentEventNotification = Notification.Name("clawdis.rpc.agent")
|
||||||
|
|
||||||
|
private struct ControlResponse: Decodable {
|
||||||
|
let type: String
|
||||||
|
let id: String
|
||||||
|
let ok: Bool
|
||||||
|
let payload: AnyCodable?
|
||||||
|
let error: String?
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AnyCodable: Codable {
|
||||||
|
let value: Any
|
||||||
|
|
||||||
|
init(_ value: Any) { self.value = value }
|
||||||
|
|
||||||
|
init(from decoder: Decoder) throws {
|
||||||
|
let container = try decoder.singleValueContainer()
|
||||||
|
if let intVal = try? container.decode(Int.self) { self.value = intVal; return }
|
||||||
|
if let doubleVal = try? container.decode(Double.self) { self.value = doubleVal; return }
|
||||||
|
if let boolVal = try? container.decode(Bool.self) { self.value = boolVal; return }
|
||||||
|
if let stringVal = try? container.decode(String.self) { self.value = stringVal; return }
|
||||||
|
if container.decodeNil() { self.value = NSNull(); return }
|
||||||
|
if let dict = try? container.decode([String: AnyCodable].self) { self.value = dict; return }
|
||||||
|
if let array = try? container.decode([AnyCodable].self) { self.value = array; return }
|
||||||
|
throw DecodingError.dataCorruptedError(in: container, debugDescription: "Unsupported type")
|
||||||
|
}
|
||||||
|
|
||||||
|
func encode(to encoder: Encoder) throws {
|
||||||
|
var container = encoder.singleValueContainer()
|
||||||
|
switch self.value {
|
||||||
|
case let intVal as Int: try container.encode(intVal)
|
||||||
|
case let doubleVal as Double: try container.encode(doubleVal)
|
||||||
|
case let boolVal as Bool: try container.encode(boolVal)
|
||||||
|
case let stringVal as String: try container.encode(stringVal)
|
||||||
|
case is NSNull: try container.encodeNil()
|
||||||
|
case let dict as [String: AnyCodable]: try container.encode(dict)
|
||||||
|
case let array as [AnyCodable]: try container.encode(array)
|
||||||
|
default:
|
||||||
|
let context = EncodingError.Context(
|
||||||
|
codingPath: encoder.codingPath,
|
||||||
|
debugDescription: "Unsupported type")
|
||||||
|
throw EncodingError.invalidValue(self.value, context)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private var process: Process?
|
private var process: Process?
|
||||||
private var stdinHandle: FileHandle?
|
private var stdinHandle: FileHandle?
|
||||||
private var stdoutHandle: FileHandle?
|
private var stdoutHandle: FileHandle?
|
||||||
private var buffer = Data()
|
private var buffer = Data()
|
||||||
private var waiters: [CheckedContinuation<String, Error>] = []
|
private var waiters: [CheckedContinuation<String, Error>] = []
|
||||||
|
private var controlWaiters: [String: CheckedContinuation<Data, Error>] = [:]
|
||||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "agent.rpc")
|
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "agent.rpc")
|
||||||
private var starting = false
|
private var starting = false
|
||||||
|
|
||||||
@@ -127,6 +173,22 @@ actor AgentRPC {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func controlRequest(method: String, params: [String: Any]? = nil) async throws -> Data {
|
||||||
|
if self.process?.isRunning != true {
|
||||||
|
try await self.start()
|
||||||
|
}
|
||||||
|
let id = UUID().uuidString
|
||||||
|
var frame: [String: Any] = ["type": "control-request", "id": id, "method": method]
|
||||||
|
if let params { frame["params"] = params }
|
||||||
|
let data = try JSONSerialization.data(withJSONObject: frame)
|
||||||
|
guard let stdinHandle else { throw RpcError(message: "stdin missing") }
|
||||||
|
return try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Data, Error>) in
|
||||||
|
self.controlWaiters[id] = cont
|
||||||
|
stdinHandle.write(data)
|
||||||
|
stdinHandle.write(Data([0x0A]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// MARK: - Process lifecycle
|
// MARK: - Process lifecycle
|
||||||
|
|
||||||
func start() async throws {
|
func start() async throws {
|
||||||
@@ -180,6 +242,11 @@ actor AgentRPC {
|
|||||||
for waiter in waiters {
|
for waiter in waiters {
|
||||||
waiter.resume(throwing: RpcError(message: "rpc process stopped"))
|
waiter.resume(throwing: RpcError(message: "rpc process stopped"))
|
||||||
}
|
}
|
||||||
|
let control = self.controlWaiters
|
||||||
|
self.controlWaiters.removeAll()
|
||||||
|
for (_, waiter) in control {
|
||||||
|
waiter.resume(throwing: RpcError(message: "rpc process stopped"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func ingest(data: Data) {
|
private func ingest(data: Data) {
|
||||||
@@ -189,11 +256,11 @@ actor AgentRPC {
|
|||||||
self.buffer.removeSubrange(self.buffer.startIndex...range.lowerBound)
|
self.buffer.removeSubrange(self.buffer.startIndex...range.lowerBound)
|
||||||
guard let line = String(data: lineData, encoding: .utf8) else { continue }
|
guard let line = String(data: lineData, encoding: .utf8) else { continue }
|
||||||
|
|
||||||
// Event frames are pushed without request/response pairing (e.g., heartbeats).
|
// Event frames are pushed without request/response pairing (e.g., heartbeats/agent).
|
||||||
if let event = self.parseHeartbeatEvent(from: line) {
|
if self.handleEventLine(line) {
|
||||||
DispatchQueue.main.async {
|
continue
|
||||||
NotificationCenter.default.post(name: Self.heartbeatNotification, object: event)
|
}
|
||||||
}
|
if self.handleControlResponse(line) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if let waiter = waiters.first {
|
if let waiter = waiters.first {
|
||||||
@@ -221,6 +288,62 @@ actor AgentRPC {
|
|||||||
return try? decoder.decode(HeartbeatEvent.self, from: payloadData)
|
return try? decoder.decode(HeartbeatEvent.self, from: payloadData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private func parseAgentEvent(from line: String) -> ControlAgentEvent? {
|
||||||
|
guard let data = line.data(using: .utf8) else { return nil }
|
||||||
|
guard
|
||||||
|
let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
|
||||||
|
let type = obj["type"] as? String,
|
||||||
|
type == "event",
|
||||||
|
let evt = obj["event"] as? String,
|
||||||
|
evt == "agent",
|
||||||
|
let payload = obj["payload"]
|
||||||
|
else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
guard let payloadData = try? JSONSerialization.data(withJSONObject: payload) else { return nil }
|
||||||
|
return try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData)
|
||||||
|
}
|
||||||
|
|
||||||
|
private func handleEventLine(_ line: String) -> Bool {
|
||||||
|
if let hb = self.parseHeartbeatEvent(from: line) {
|
||||||
|
DispatchQueue.main.async {
|
||||||
|
NotificationCenter.default.post(name: Self.heartbeatNotification, object: hb)
|
||||||
|
NotificationCenter.default.post(name: .controlHeartbeat, object: hb)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if let agent = self.parseAgentEvent(from: line) {
|
||||||
|
DispatchQueue.main.async {
|
||||||
|
NotificationCenter.default.post(name: Self.agentEventNotification, object: agent)
|
||||||
|
NotificationCenter.default.post(name: .controlAgentEvent, object: agent)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
private func handleControlResponse(_ line: String) -> Bool {
|
||||||
|
guard let data = line.data(using: .utf8) else { return false }
|
||||||
|
guard let parsed = try? JSONDecoder().decode(ControlResponse.self, from: data) else { return false }
|
||||||
|
guard parsed.type == "control-response" else { return false }
|
||||||
|
guard let waiter = self.controlWaiters.removeValue(forKey: parsed.id) else {
|
||||||
|
self.logger.debug("control response with no waiter id=\(parsed.id, privacy: .public)")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if parsed.ok {
|
||||||
|
let payloadData: Data = if let payload = parsed.payload {
|
||||||
|
(try? JSONEncoder().encode(payload)) ?? Data()
|
||||||
|
} else {
|
||||||
|
Data()
|
||||||
|
}
|
||||||
|
waiter.resume(returning: payloadData)
|
||||||
|
} else {
|
||||||
|
waiter.resume(throwing: RpcError(message: parsed.error ?? "control error"))
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
private func nextLine() async throws -> String {
|
private func nextLine() async throws -> String {
|
||||||
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<String, Error>) in
|
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<String, Error>) in
|
||||||
self.waiters.append(cont)
|
self.waiters.append(cont)
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
import Foundation
|
import Foundation
|
||||||
import Network
|
|
||||||
import OSLog
|
import OSLog
|
||||||
import Darwin
|
|
||||||
|
|
||||||
struct ControlHeartbeatEvent: Codable {
|
struct ControlHeartbeatEvent: Codable {
|
||||||
let ts: Double
|
let ts: Double
|
||||||
@@ -21,10 +19,6 @@ struct ControlAgentEvent: Codable, Sendable {
|
|||||||
let data: [String: AnyCodable]
|
let data: [String: AnyCodable]
|
||||||
}
|
}
|
||||||
|
|
||||||
extension Notification.Name {
|
|
||||||
static let controlAgentEvent = Notification.Name("clawdis.control.agent")
|
|
||||||
}
|
|
||||||
|
|
||||||
struct AnyCodable: Codable, @unchecked Sendable {
|
struct AnyCodable: Codable, @unchecked Sendable {
|
||||||
let value: Any
|
let value: Any
|
||||||
|
|
||||||
@@ -32,27 +26,13 @@ struct AnyCodable: Codable, @unchecked Sendable {
|
|||||||
|
|
||||||
init(from decoder: Decoder) throws {
|
init(from decoder: Decoder) throws {
|
||||||
let container = try decoder.singleValueContainer()
|
let container = try decoder.singleValueContainer()
|
||||||
if let intVal = try? container.decode(Int.self) {
|
if let intVal = try? container.decode(Int.self) { self.value = intVal; return }
|
||||||
self.value = intVal; return
|
if let doubleVal = try? container.decode(Double.self) { self.value = doubleVal; return }
|
||||||
}
|
if let boolVal = try? container.decode(Bool.self) { self.value = boolVal; return }
|
||||||
if let doubleVal = try? container.decode(Double.self) {
|
if let stringVal = try? container.decode(String.self) { self.value = stringVal; return }
|
||||||
self.value = doubleVal; return
|
if container.decodeNil() { self.value = NSNull(); return }
|
||||||
}
|
if let dict = try? container.decode([String: AnyCodable].self) { self.value = dict; return }
|
||||||
if let boolVal = try? container.decode(Bool.self) {
|
if let array = try? container.decode([AnyCodable].self) { self.value = array; return }
|
||||||
self.value = boolVal; return
|
|
||||||
}
|
|
||||||
if let stringVal = try? container.decode(String.self) {
|
|
||||||
self.value = stringVal; return
|
|
||||||
}
|
|
||||||
if container.decodeNil() {
|
|
||||||
self.value = NSNull(); return
|
|
||||||
}
|
|
||||||
if let dict = try? container.decode([String: AnyCodable].self) {
|
|
||||||
self.value = dict; return
|
|
||||||
}
|
|
||||||
if let array = try? container.decode([AnyCodable].self) {
|
|
||||||
self.value = array; return
|
|
||||||
}
|
|
||||||
throw DecodingError.dataCorruptedError(in: container, debugDescription: "Unsupported type")
|
throw DecodingError.dataCorruptedError(in: container, debugDescription: "Unsupported type")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,94 +53,14 @@ struct AnyCodable: Codable, @unchecked Sendable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handles single-shot continuation resumption without Sendable capture issues
|
|
||||||
actor ConnectionWaiter {
|
|
||||||
private var cont: CheckedContinuation<Void, Error>?
|
|
||||||
private var resumed = false
|
|
||||||
private var pendingResult: Result<Void, Error>?
|
|
||||||
|
|
||||||
func wait() async throws {
|
|
||||||
// Acts like a one-shot Future; if the connection resolves before wait() is called,
|
|
||||||
// stash the result so the waiter resumes immediately.
|
|
||||||
try await withCheckedThrowingContinuation { (c: CheckedContinuation<Void, Error>) in
|
|
||||||
if let pending = pendingResult {
|
|
||||||
pendingResult = nil
|
|
||||||
resumed = true
|
|
||||||
c.resume(with: pending)
|
|
||||||
} else {
|
|
||||||
cont = c
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func succeed() {
|
|
||||||
resume(.success(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
func fail(_ error: Error) {
|
|
||||||
resume(.failure(error))
|
|
||||||
}
|
|
||||||
|
|
||||||
private func resume(_ result: Result<Void, Error>) {
|
|
||||||
if resumed { return }
|
|
||||||
if let c = cont {
|
|
||||||
resumed = true
|
|
||||||
cont = nil
|
|
||||||
c.resume(with: result)
|
|
||||||
} else {
|
|
||||||
pendingResult = result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ControlHealthSnapshot: Codable {
|
|
||||||
struct Web: Codable {
|
|
||||||
let linked: Bool
|
|
||||||
let authAgeMs: Double?
|
|
||||||
let connect: Connect?
|
|
||||||
|
|
||||||
struct Connect: Codable {
|
|
||||||
let ok: Bool
|
|
||||||
let status: Int?
|
|
||||||
let error: String?
|
|
||||||
let elapsedMs: Double?
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Sessions: Codable {
|
|
||||||
struct Entry: Codable {
|
|
||||||
let key: String
|
|
||||||
let updatedAt: Double?
|
|
||||||
let age: Double?
|
|
||||||
}
|
|
||||||
let path: String
|
|
||||||
let count: Int
|
|
||||||
let recent: [Entry]
|
|
||||||
}
|
|
||||||
|
|
||||||
struct IPC: Codable {
|
|
||||||
let path: String
|
|
||||||
let exists: Bool
|
|
||||||
}
|
|
||||||
|
|
||||||
let ts: Double
|
|
||||||
let durationMs: Double
|
|
||||||
let web: Web
|
|
||||||
let heartbeatSeconds: Int
|
|
||||||
let sessions: Sessions
|
|
||||||
let ipc: IPC
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ControlChannelError: Error, LocalizedError {
|
enum ControlChannelError: Error, LocalizedError {
|
||||||
case disconnected
|
case disconnected
|
||||||
case badResponse(String)
|
case badResponse(String)
|
||||||
case sshFailed(String)
|
|
||||||
|
|
||||||
var errorDescription: String? {
|
var errorDescription: String? {
|
||||||
switch self {
|
switch self {
|
||||||
case .disconnected: return "Control channel disconnected"
|
case .disconnected: "Control channel disconnected"
|
||||||
case let .badResponse(msg): return msg
|
case let .badResponse(msg): msg
|
||||||
case let .sshFailed(msg): return "SSH tunnel failed: \(msg)"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -169,356 +69,74 @@ enum ControlChannelError: Error, LocalizedError {
|
|||||||
final class ControlChannel: ObservableObject {
|
final class ControlChannel: ObservableObject {
|
||||||
static let shared = ControlChannel()
|
static let shared = ControlChannel()
|
||||||
|
|
||||||
|
enum ConnectionState: Equatable {
|
||||||
|
case disconnected
|
||||||
|
case connected
|
||||||
|
case degraded(String)
|
||||||
|
}
|
||||||
|
|
||||||
enum Mode: Equatable {
|
enum Mode: Equatable {
|
||||||
case local
|
case local
|
||||||
case remote(target: String, identity: String)
|
case remote(target: String, identity: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
enum ConnectionState: Equatable {
|
|
||||||
case disconnected
|
|
||||||
case connecting
|
|
||||||
case connected
|
|
||||||
case degraded(String)
|
|
||||||
}
|
|
||||||
|
|
||||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control")
|
|
||||||
private var connection: NWConnection?
|
|
||||||
private var sshProcess: Process?
|
|
||||||
private var buffer = Data()
|
|
||||||
private var pending: [String: CheckedContinuation<Data, Error>] = [:]
|
|
||||||
private var listenTask: Task<Void, Never>?
|
|
||||||
private var mode: Mode = .local
|
|
||||||
private var localPort: UInt16 = 18789
|
|
||||||
private var pingTask: Task<Void, Never>?
|
|
||||||
private var jobStates: [String: String] = [:]
|
|
||||||
|
|
||||||
@Published private(set) var state: ConnectionState = .disconnected
|
@Published private(set) var state: ConnectionState = .disconnected
|
||||||
@Published private(set) var lastPingMs: Double?
|
@Published private(set) var lastPingMs: Double?
|
||||||
|
|
||||||
func configure(mode: Mode) async throws {
|
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control")
|
||||||
if mode == self.mode, self.connection != nil { return }
|
|
||||||
await self.disconnect()
|
|
||||||
self.mode = mode
|
|
||||||
try await self.connect()
|
|
||||||
|
|
||||||
NotificationCenter.default.addObserver(
|
func configure() async {
|
||||||
forName: .controlAgentEvent,
|
do {
|
||||||
object: nil,
|
try await AgentRPC.shared.start()
|
||||||
queue: .main)
|
self.state = .connected
|
||||||
{ note in
|
} catch {
|
||||||
if let evt = note.object as? ControlAgentEvent {
|
self.state = .degraded(error.localizedDescription)
|
||||||
DispatchQueue.main.async { @MainActor in
|
|
||||||
AgentEventStore.shared.append(evt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func disconnect() async {
|
func configure(mode: Mode) async throws {
|
||||||
self.listenTask?.cancel()
|
// Mode is retained for API compatibility; transport is always stdio now.
|
||||||
self.listenTask = nil
|
try await self.configure()
|
||||||
self.pingTask?.cancel()
|
|
||||||
self.pingTask = nil
|
|
||||||
if let conn = self.connection {
|
|
||||||
conn.cancel()
|
|
||||||
}
|
|
||||||
self.connection = nil
|
|
||||||
if let ssh = self.sshProcess, ssh.isRunning { ssh.terminate() }
|
|
||||||
self.sshProcess = nil
|
|
||||||
for (_, cont) in self.pending {
|
|
||||||
cont.resume(throwing: ControlChannelError.disconnected)
|
|
||||||
}
|
|
||||||
self.pending.removeAll()
|
|
||||||
self.state = .disconnected
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func health(timeout: TimeInterval? = nil) async throws -> Data {
|
func health(timeout: TimeInterval? = nil) async throws -> Data {
|
||||||
try await self.ensureConnected()
|
let params = timeout.map { ["timeoutMs": Int($0 * 1000)] }
|
||||||
let start = Date()
|
do {
|
||||||
self.logger.debug("health probe start timeout=\(timeout?.description ?? "nil", privacy: .public)")
|
let start = Date()
|
||||||
let payload = try await self.request(
|
let payload = try await AgentRPC.shared.controlRequest(method: "health", params: params)
|
||||||
method: "health",
|
let ms = Date().timeIntervalSince(start) * 1000
|
||||||
params: timeout.map { ["timeoutMs": Int($0 * 1000)] },
|
self.lastPingMs = ms
|
||||||
timeout: timeout.map { $0 + 1 } // small cushion over server-side timeout
|
self.state = .connected
|
||||||
)
|
return payload
|
||||||
let ms = Int(Date().timeIntervalSince(start) * 1000)
|
} catch {
|
||||||
self.logger.debug("health probe ok in \(ms)ms")
|
self.state = .degraded(error.localizedDescription)
|
||||||
return payload
|
throw error
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func lastHeartbeat() async throws -> ControlHeartbeatEvent? {
|
func lastHeartbeat() async throws -> ControlHeartbeatEvent? {
|
||||||
try await self.ensureConnected()
|
let data = try await AgentRPC.shared.controlRequest(method: "last-heartbeat")
|
||||||
let data = try await self.request(method: "last-heartbeat")
|
|
||||||
if data.isEmpty { return nil }
|
if data.isEmpty { return nil }
|
||||||
return try? JSONDecoder().decode(ControlHeartbeatEvent.self, from: data)
|
return try? JSONDecoder().decode(ControlHeartbeatEvent.self, from: data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func request(method: String, params: [String: Any]? = nil) async throws -> Data {
|
||||||
|
do {
|
||||||
|
let data = try await AgentRPC.shared.controlRequest(method: method, params: params)
|
||||||
|
self.state = .connected
|
||||||
|
return data
|
||||||
|
} catch {
|
||||||
|
self.state = .degraded(error.localizedDescription)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func sendSystemEvent(_ text: String) async throws {
|
func sendSystemEvent(_ text: String) async throws {
|
||||||
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
|
_ = try await self.request(method: "system-event", params: ["text": text])
|
||||||
guard !trimmed.isEmpty else { return }
|
|
||||||
_ = try await self.request(method: "system-event", params: ["text": trimmed], timeout: 5)
|
|
||||||
}
|
|
||||||
|
|
||||||
func request(method: String, params: [String: Any]? = nil, timeout: TimeInterval? = nil) async throws -> Data {
|
|
||||||
try await self.ensureConnected()
|
|
||||||
let id = UUID().uuidString
|
|
||||||
var frame: [String: Any] = ["type": "request", "id": id, "method": method]
|
|
||||||
if let params { frame["params"] = params }
|
|
||||||
let data = try JSONSerialization.data(withJSONObject: frame)
|
|
||||||
try await self.send(data)
|
|
||||||
return try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Data, Error>) in
|
|
||||||
self.pending[id] = cont
|
|
||||||
if let timeout {
|
|
||||||
Task { [weak self] in
|
|
||||||
try? await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000))
|
|
||||||
guard let self else { return }
|
|
||||||
if let pending = self.pending.removeValue(forKey: id) {
|
|
||||||
self.logger.error("control request \(method) timed out after \(Int(timeout))s")
|
|
||||||
pending.resume(throwing: ControlChannelError.badResponse("timeout after \(Int(timeout))s"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func ensureConnected() async throws {
|
|
||||||
if let conn = self.connection {
|
|
||||||
switch conn.state {
|
|
||||||
case .ready: return
|
|
||||||
default: break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try await self.connect()
|
|
||||||
}
|
|
||||||
|
|
||||||
private func connect() async throws {
|
|
||||||
switch self.mode {
|
|
||||||
case .local:
|
|
||||||
self.localPort = 18789
|
|
||||||
case let .remote(target, identity):
|
|
||||||
self.localPort = try self.startSSHTunnel(target: target, identity: identity)
|
|
||||||
}
|
|
||||||
|
|
||||||
self.state = .connecting
|
|
||||||
|
|
||||||
let host = NWEndpoint.Host("127.0.0.1")
|
|
||||||
let port = NWEndpoint.Port(rawValue: self.localPort)!
|
|
||||||
let conn = NWConnection(host: host, port: port, using: .tcp)
|
|
||||||
self.connection = conn
|
|
||||||
|
|
||||||
let waiter = ConnectionWaiter()
|
|
||||||
|
|
||||||
conn.stateUpdateHandler = { [weak self, weak conn] state in
|
|
||||||
switch state {
|
|
||||||
case .ready:
|
|
||||||
Task { @MainActor in self?.state = .connected }
|
|
||||||
Task {
|
|
||||||
await waiter.succeed()
|
|
||||||
conn?.stateUpdateHandler = nil
|
|
||||||
}
|
|
||||||
case let .failed(err):
|
|
||||||
Task { @MainActor in self?.state = .degraded(err.localizedDescription) }
|
|
||||||
Task {
|
|
||||||
await waiter.fail(err)
|
|
||||||
conn?.stateUpdateHandler = nil
|
|
||||||
}
|
|
||||||
case let .waiting(err):
|
|
||||||
Task { @MainActor in self?.state = .degraded(err.localizedDescription) }
|
|
||||||
Task {
|
|
||||||
await waiter.fail(err)
|
|
||||||
conn?.stateUpdateHandler = nil
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.start(queue: .global())
|
|
||||||
try await waiter.wait()
|
|
||||||
|
|
||||||
self.listenTask = Task.detached { [weak self] in
|
|
||||||
await self?.listen()
|
|
||||||
}
|
|
||||||
|
|
||||||
self.pingTask = Task.detached { [weak self] in
|
|
||||||
guard let self else { return }
|
|
||||||
while !Task.isCancelled {
|
|
||||||
do {
|
|
||||||
try await Task.sleep(nanoseconds: 30 * 1_000_000_000)
|
|
||||||
let start = Date()
|
|
||||||
_ = try await self.request(method: "ping")
|
|
||||||
let ms = Date().timeIntervalSince(start) * 1000
|
|
||||||
await MainActor.run { self.lastPingMs = ms; self.state = .connected }
|
|
||||||
} catch {
|
|
||||||
await MainActor.run { self.state = .degraded(error.localizedDescription) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func startSSHTunnel(target: String, identity: String) throws -> UInt16 {
|
|
||||||
let localPort = Self.pickAvailablePort()
|
|
||||||
let proc = Process()
|
|
||||||
proc.executableURL = URL(fileURLWithPath: "/usr/bin/ssh")
|
|
||||||
var args: [String] = [
|
|
||||||
"-o", "BatchMode=yes",
|
|
||||||
"-o", "ExitOnForwardFailure=yes",
|
|
||||||
"-N", // don't run a remote shell; keep the tunnel open
|
|
||||||
"-T", // no pseudo-tty
|
|
||||||
"-L", "\(localPort):127.0.0.1:18789",
|
|
||||||
target,
|
|
||||||
]
|
|
||||||
if !identity.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty {
|
|
||||||
args.insert(contentsOf: ["-i", identity], at: 2)
|
|
||||||
}
|
|
||||||
proc.arguments = args
|
|
||||||
proc.standardInput = nil
|
|
||||||
let outPipe = Pipe()
|
|
||||||
let errPipe = Pipe()
|
|
||||||
proc.standardOutput = outPipe
|
|
||||||
proc.standardError = errPipe
|
|
||||||
try proc.run()
|
|
||||||
// Give ssh a brief moment; if it exits immediately we surface stderr instead of silently failing.
|
|
||||||
Thread.sleep(forTimeInterval: 0.2) // 200ms
|
|
||||||
if !proc.isRunning {
|
|
||||||
let err = String(data: errPipe.fileHandleForReading.readDataToEndOfFile(), encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines)
|
|
||||||
throw ControlChannelError.sshFailed(err ?? "ssh exited")
|
|
||||||
}
|
|
||||||
self.sshProcess = proc
|
|
||||||
return localPort
|
|
||||||
}
|
|
||||||
|
|
||||||
private func send(_ data: Data) async throws {
|
|
||||||
guard let conn = self.connection else { throw ControlChannelError.disconnected }
|
|
||||||
let line = data + Data([0x0A])
|
|
||||||
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Void, Error>) in
|
|
||||||
conn.send(content: line, completion: .contentProcessed { error in
|
|
||||||
if let error { cont.resume(throwing: error) }
|
|
||||||
else { cont.resume(returning: ()) }
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func listen() async {
|
|
||||||
guard let conn = self.connection else { return }
|
|
||||||
while true {
|
|
||||||
let result: (Data?, Bool, NWError?) = await withCheckedContinuation { cont in
|
|
||||||
conn.receiveMessage { data, _, isComplete, error in
|
|
||||||
cont.resume(returning: (data, isComplete, error))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let (data, isComplete, error) = result
|
|
||||||
if let error {
|
|
||||||
self.logger.debug("control receive error: \(error.localizedDescription, privacy: .public)")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if isComplete {
|
|
||||||
self.logger.debug("control receive complete")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
guard let data else { continue }
|
|
||||||
self.buffer.append(data)
|
|
||||||
while let range = buffer.firstRange(of: Data([0x0A])) {
|
|
||||||
let lineData = buffer.subdata(in: buffer.startIndex..<range.lowerBound)
|
|
||||||
buffer.removeSubrange(buffer.startIndex...range.lowerBound)
|
|
||||||
self.handleLine(lineData)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// If we exit the loop, drop the connection so the next request reconnects.
|
|
||||||
await self.disconnect()
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleLine(_ data: Data) {
|
|
||||||
guard let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
|
|
||||||
let type = obj["type"] as? String else { return }
|
|
||||||
|
|
||||||
if type == "event", let event = obj["event"] as? String {
|
|
||||||
if event == "heartbeat", let payload = obj["payload"] {
|
|
||||||
if let payloadData = try? JSONSerialization.data(withJSONObject: payload) {
|
|
||||||
NotificationCenter.default.post(name: .controlHeartbeat, object: payloadData)
|
|
||||||
}
|
|
||||||
} else if event == "agent", let payload = obj["payload"] {
|
|
||||||
if let payloadData = try? JSONSerialization.data(withJSONObject: payload),
|
|
||||||
let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData) {
|
|
||||||
self.handleAgentEvent(agent)
|
|
||||||
NotificationCenter.default.post(name: .controlAgentEvent, object: agent)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if type == "response", let id = obj["id"] as? String {
|
|
||||||
let ok = obj["ok"] as? Bool ?? false
|
|
||||||
if ok, let payload = obj["payload"] {
|
|
||||||
let payloadData = (try? JSONSerialization.data(withJSONObject: payload)) ?? Data()
|
|
||||||
self.pending[id]?.resume(returning: payloadData)
|
|
||||||
} else {
|
|
||||||
let err = (obj["error"] as? String) ?? "control error"
|
|
||||||
self.pending[id]?.resume(throwing: ControlChannelError.badResponse(err))
|
|
||||||
}
|
|
||||||
self.pending.removeValue(forKey: id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private func handleAgentEvent(_ event: ControlAgentEvent) {
|
|
||||||
if event.stream == "job" {
|
|
||||||
if let state = event.data["state"]?.value as? String {
|
|
||||||
let normalized = state.lowercased()
|
|
||||||
if normalized == "done" || normalized == "error" {
|
|
||||||
self.jobStates.removeValue(forKey: event.runId)
|
|
||||||
} else {
|
|
||||||
self.jobStates[event.runId] = normalized
|
|
||||||
}
|
|
||||||
|
|
||||||
let workingStates: Set<String> = ["started", "streaming", "running", "queued", "waiting"]
|
|
||||||
let working = self.jobStates.values.contains { workingStates.contains($0) }
|
|
||||||
Task { @MainActor in
|
|
||||||
AppStateStore.shared.setWorking(working)
|
|
||||||
WorkActivityStore.shared.handleJob(
|
|
||||||
sessionKey: event.runId,
|
|
||||||
state: state)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if event.stream == "tool" {
|
|
||||||
guard let phase = event.data["phase"]?.value as? String else { return }
|
|
||||||
let name = event.data["name"]?.value as? String
|
|
||||||
let meta = event.data["meta"]?.value as? String
|
|
||||||
let args = event.data["args"]?.value as? [String: AnyCodable]
|
|
||||||
Task { @MainActor in
|
|
||||||
WorkActivityStore.shared.handleTool(
|
|
||||||
sessionKey: event.runId,
|
|
||||||
phase: phase,
|
|
||||||
name: name,
|
|
||||||
meta: meta,
|
|
||||||
args: args)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static func pickAvailablePort() -> UInt16 {
|
|
||||||
var port: UInt16 = 0
|
|
||||||
let socket = socket(AF_INET, SOCK_STREAM, 0)
|
|
||||||
defer { close(socket) }
|
|
||||||
var addr = sockaddr_in()
|
|
||||||
addr.sin_family = sa_family_t(AF_INET)
|
|
||||||
addr.sin_port = in_port_t(0).bigEndian
|
|
||||||
addr.sin_addr = in_addr(s_addr: inet_addr("127.0.0.1"))
|
|
||||||
_ = withUnsafePointer(to: &addr) {
|
|
||||||
$0.withMemoryRebound(to: sockaddr.self, capacity: 1) {
|
|
||||||
bind(socket, $0, socklen_t(MemoryLayout<sockaddr_in>.size))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
var len = socklen_t(MemoryLayout<sockaddr_in>.size)
|
|
||||||
getsockname(socket, withUnsafeMutablePointer(to: &addr) {
|
|
||||||
$0.withMemoryRebound(to: sockaddr.self, capacity: 1) { $0 }
|
|
||||||
}, &len)
|
|
||||||
// Asking the kernel for port 0 yields an ephemeral free port; reuse it for the SSH tunnel.
|
|
||||||
port = UInt16(bigEndian: addr.sin_port)
|
|
||||||
return port
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension Notification.Name {
|
extension Notification.Name {
|
||||||
static let controlHeartbeat = Notification.Name("clawdis.control.heartbeat")
|
static let controlHeartbeat = Notification.Name("clawdis.control.heartbeat")
|
||||||
|
static let controlAgentEvent = Notification.Name("clawdis.control.agent")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,10 @@
|
|||||||
import chalk from "chalk";
|
import chalk from "chalk";
|
||||||
import { Command } from "commander";
|
import { Command } from "commander";
|
||||||
import { agentCommand } from "../commands/agent.js";
|
import { agentCommand } from "../commands/agent.js";
|
||||||
import { healthCommand } from "../commands/health.js";
|
import { getHealthSnapshot, healthCommand, type HealthSummary } from "../commands/health.js";
|
||||||
import { sendCommand } from "../commands/send.js";
|
import { sendCommand } from "../commands/send.js";
|
||||||
import { sessionsCommand } from "../commands/sessions.js";
|
import { sessionsCommand } from "../commands/sessions.js";
|
||||||
import { statusCommand } from "../commands/status.js";
|
import { getStatusSummary, statusCommand, type StatusSummary } from "../commands/status.js";
|
||||||
import { loadConfig } from "../config/config.js";
|
import { loadConfig } from "../config/config.js";
|
||||||
import { danger, info, setVerbose } from "../globals.js";
|
import { danger, info, setVerbose } from "../globals.js";
|
||||||
import { startControlChannel } from "../infra/control-channel.js";
|
import { startControlChannel } from "../infra/control-channel.js";
|
||||||
@@ -12,6 +12,7 @@ import {
|
|||||||
getLastHeartbeatEvent,
|
getLastHeartbeatEvent,
|
||||||
onHeartbeatEvent,
|
onHeartbeatEvent,
|
||||||
} from "../infra/heartbeat-events.js";
|
} from "../infra/heartbeat-events.js";
|
||||||
|
import { onAgentEvent } from "../infra/agent-events.js";
|
||||||
import { getResolvedLoggerSettings } from "../logging.js";
|
import { getResolvedLoggerSettings } from "../logging.js";
|
||||||
import {
|
import {
|
||||||
loginWeb,
|
loginWeb,
|
||||||
@@ -33,6 +34,12 @@ import {
|
|||||||
startWebChatServer,
|
startWebChatServer,
|
||||||
} from "../webchat/server.js";
|
} from "../webchat/server.js";
|
||||||
import { createDefaultDeps, logWebSelfId } from "./deps.js";
|
import { createDefaultDeps, logWebSelfId } from "./deps.js";
|
||||||
|
import { onAgentEvent } from "../infra/agent-events.js";
|
||||||
|
import {
|
||||||
|
enqueueSystemEvent,
|
||||||
|
listSystemPresence,
|
||||||
|
updateSystemPresence,
|
||||||
|
} from "../infra/system-presence.js";
|
||||||
|
|
||||||
export function buildProgram() {
|
export function buildProgram() {
|
||||||
const program = new Command();
|
const program = new Command();
|
||||||
@@ -249,10 +256,14 @@ Examples:
|
|||||||
const forwardHeartbeat = (payload: unknown) => {
|
const forwardHeartbeat = (payload: unknown) => {
|
||||||
respond({ type: "event", event: "heartbeat", payload });
|
respond({ type: "event", event: "heartbeat", payload });
|
||||||
};
|
};
|
||||||
|
const forwardAgent = (payload: unknown) => {
|
||||||
|
respond({ type: "event", event: "agent", payload });
|
||||||
|
};
|
||||||
|
|
||||||
const latest = getLastHeartbeatEvent();
|
const latest = getLastHeartbeatEvent();
|
||||||
if (latest) forwardHeartbeat(latest);
|
if (latest) forwardHeartbeat(latest);
|
||||||
const stopBus = onHeartbeatEvent(forwardHeartbeat);
|
const stopBus = onHeartbeatEvent(forwardHeartbeat);
|
||||||
|
const stopAgentBus = onAgentEvent(forwardAgent);
|
||||||
|
|
||||||
rl.on("line", async (line: string) => {
|
rl.on("line", async (line: string) => {
|
||||||
if (!line.trim()) return;
|
if (!line.trim()) return;
|
||||||
@@ -267,6 +278,52 @@ Examples:
|
|||||||
respond({ type: "result", ok: true });
|
respond({ type: "result", ok: true });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (cmd.type === "control-request" && cmd.id && cmd.method) {
|
||||||
|
const id = String(cmd.id);
|
||||||
|
const method = String(cmd.method);
|
||||||
|
const params = (cmd.params ?? {}) as Record<string, unknown>;
|
||||||
|
const controlRespond = (ok: boolean, payload?: unknown, error?: string) =>
|
||||||
|
respond({ type: "control-response", id, ok, payload, error });
|
||||||
|
try {
|
||||||
|
if (method === "health") {
|
||||||
|
const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : undefined;
|
||||||
|
const payload = await getHealthSnapshot(timeoutMs);
|
||||||
|
controlRespond(true, payload satisfies HealthSummary);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (method === "status") {
|
||||||
|
const payload = await getStatusSummary();
|
||||||
|
controlRespond(true, payload satisfies StatusSummary);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (method === "last-heartbeat") {
|
||||||
|
controlRespond(true, getLastHeartbeatEvent());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (method === "set-heartbeats") {
|
||||||
|
setHeartbeatsEnabled(Boolean(params.enabled));
|
||||||
|
controlRespond(true, { ok: true });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (method === "system-event") {
|
||||||
|
const text = String(params.text ?? "").trim();
|
||||||
|
if (text) {
|
||||||
|
enqueueSystemEvent(text);
|
||||||
|
updateSystemPresence(text);
|
||||||
|
}
|
||||||
|
controlRespond(true, { ok: true });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (method === "system-presence") {
|
||||||
|
controlRespond(true, listSystemPresence());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
controlRespond(false, undefined, `unknown control method: ${method}`);
|
||||||
|
} catch (err) {
|
||||||
|
controlRespond(false, undefined, String(err));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (cmd.type !== "send" || !cmd.text) {
|
if (cmd.type !== "send" || !cmd.text) {
|
||||||
respond({ type: "error", error: "unsupported command" });
|
respond({ type: "error", error: "unsupported command" });
|
||||||
return;
|
return;
|
||||||
@@ -326,6 +383,7 @@ Examples:
|
|||||||
await new Promise(() => {});
|
await new Promise(() => {});
|
||||||
|
|
||||||
stopBus();
|
stopBus();
|
||||||
|
stopAgentBus();
|
||||||
});
|
});
|
||||||
|
|
||||||
program
|
program
|
||||||
|
|||||||
Reference in New Issue
Block a user