From 04f595cd97324315a76942a1705b20ffc85ea297 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 9 Dec 2025 02:25:01 +0000 Subject: [PATCH] Control: route health/heartbeat over RPC stdio --- apps/macos/Sources/Clawdis/AgentRPC.swift | 133 ++++- .../Sources/Clawdis/ControlChannel.swift | 484 ++---------------- src/cli/program.ts | 62 ++- 3 files changed, 239 insertions(+), 440 deletions(-) diff --git a/apps/macos/Sources/Clawdis/AgentRPC.swift b/apps/macos/Sources/Clawdis/AgentRPC.swift index d6f83a234..c8f9bb94f 100644 --- a/apps/macos/Sources/Clawdis/AgentRPC.swift +++ b/apps/macos/Sources/Clawdis/AgentRPC.swift @@ -15,12 +15,58 @@ actor AgentRPC { } static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat") + static let agentEventNotification = Notification.Name("clawdis.rpc.agent") + + private struct ControlResponse: Decodable { + let type: String + let id: String + let ok: Bool + let payload: AnyCodable? + let error: String? + } + + struct AnyCodable: Codable { + let value: Any + + init(_ value: Any) { self.value = value } + + init(from decoder: Decoder) throws { + let container = try decoder.singleValueContainer() + if let intVal = try? container.decode(Int.self) { self.value = intVal; return } + if let doubleVal = try? container.decode(Double.self) { self.value = doubleVal; return } + if let boolVal = try? container.decode(Bool.self) { self.value = boolVal; return } + if let stringVal = try? container.decode(String.self) { self.value = stringVal; return } + if container.decodeNil() { self.value = NSNull(); return } + if let dict = try? container.decode([String: AnyCodable].self) { self.value = dict; return } + if let array = try? container.decode([AnyCodable].self) { self.value = array; return } + throw DecodingError.dataCorruptedError(in: container, debugDescription: "Unsupported type") + } + + func encode(to encoder: Encoder) throws { + var container = encoder.singleValueContainer() + switch self.value { + case let intVal as Int: try container.encode(intVal) + case let doubleVal as Double: try container.encode(doubleVal) + case let boolVal as Bool: try container.encode(boolVal) + case let stringVal as String: try container.encode(stringVal) + case is NSNull: try container.encodeNil() + case let dict as [String: AnyCodable]: try container.encode(dict) + case let array as [AnyCodable]: try container.encode(array) + default: + let context = EncodingError.Context( + codingPath: encoder.codingPath, + debugDescription: "Unsupported type") + throw EncodingError.invalidValue(self.value, context) + } + } + } private var process: Process? private var stdinHandle: FileHandle? private var stdoutHandle: FileHandle? private var buffer = Data() private var waiters: [CheckedContinuation] = [] + private var controlWaiters: [String: CheckedContinuation] = [:] private let logger = Logger(subsystem: "com.steipete.clawdis", category: "agent.rpc") private var starting = false @@ -127,6 +173,22 @@ actor AgentRPC { } } + func controlRequest(method: String, params: [String: Any]? = nil) async throws -> Data { + if self.process?.isRunning != true { + try await self.start() + } + let id = UUID().uuidString + var frame: [String: Any] = ["type": "control-request", "id": id, "method": method] + if let params { frame["params"] = params } + let data = try JSONSerialization.data(withJSONObject: frame) + guard let stdinHandle else { throw RpcError(message: "stdin missing") } + return try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in + self.controlWaiters[id] = cont + stdinHandle.write(data) + stdinHandle.write(Data([0x0A])) + } + } + // MARK: - Process lifecycle func start() async throws { @@ -180,6 +242,11 @@ actor AgentRPC { for waiter in waiters { waiter.resume(throwing: RpcError(message: "rpc process stopped")) } + let control = self.controlWaiters + self.controlWaiters.removeAll() + for (_, waiter) in control { + waiter.resume(throwing: RpcError(message: "rpc process stopped")) + } } private func ingest(data: Data) { @@ -189,11 +256,11 @@ actor AgentRPC { self.buffer.removeSubrange(self.buffer.startIndex...range.lowerBound) guard let line = String(data: lineData, encoding: .utf8) else { continue } - // Event frames are pushed without request/response pairing (e.g., heartbeats). - if let event = self.parseHeartbeatEvent(from: line) { - DispatchQueue.main.async { - NotificationCenter.default.post(name: Self.heartbeatNotification, object: event) - } + // Event frames are pushed without request/response pairing (e.g., heartbeats/agent). + if self.handleEventLine(line) { + continue + } + if self.handleControlResponse(line) { continue } if let waiter = waiters.first { @@ -221,6 +288,62 @@ actor AgentRPC { return try? decoder.decode(HeartbeatEvent.self, from: payloadData) } + private func parseAgentEvent(from line: String) -> ControlAgentEvent? { + guard let data = line.data(using: .utf8) else { return nil } + guard + let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + let type = obj["type"] as? String, + type == "event", + let evt = obj["event"] as? String, + evt == "agent", + let payload = obj["payload"] + else { + return nil + } + + guard let payloadData = try? JSONSerialization.data(withJSONObject: payload) else { return nil } + return try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData) + } + + private func handleEventLine(_ line: String) -> Bool { + if let hb = self.parseHeartbeatEvent(from: line) { + DispatchQueue.main.async { + NotificationCenter.default.post(name: Self.heartbeatNotification, object: hb) + NotificationCenter.default.post(name: .controlHeartbeat, object: hb) + } + return true + } + if let agent = self.parseAgentEvent(from: line) { + DispatchQueue.main.async { + NotificationCenter.default.post(name: Self.agentEventNotification, object: agent) + NotificationCenter.default.post(name: .controlAgentEvent, object: agent) + } + return true + } + return false + } + + private func handleControlResponse(_ line: String) -> Bool { + guard let data = line.data(using: .utf8) else { return false } + guard let parsed = try? JSONDecoder().decode(ControlResponse.self, from: data) else { return false } + guard parsed.type == "control-response" else { return false } + guard let waiter = self.controlWaiters.removeValue(forKey: parsed.id) else { + self.logger.debug("control response with no waiter id=\(parsed.id, privacy: .public)") + return true + } + if parsed.ok { + let payloadData: Data = if let payload = parsed.payload { + (try? JSONEncoder().encode(payload)) ?? Data() + } else { + Data() + } + waiter.resume(returning: payloadData) + } else { + waiter.resume(throwing: RpcError(message: parsed.error ?? "control error")) + } + return true + } + private func nextLine() async throws -> String { try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in self.waiters.append(cont) diff --git a/apps/macos/Sources/Clawdis/ControlChannel.swift b/apps/macos/Sources/Clawdis/ControlChannel.swift index e60ad8669..e94a02da1 100644 --- a/apps/macos/Sources/Clawdis/ControlChannel.swift +++ b/apps/macos/Sources/Clawdis/ControlChannel.swift @@ -1,7 +1,5 @@ import Foundation -import Network import OSLog -import Darwin struct ControlHeartbeatEvent: Codable { let ts: Double @@ -21,10 +19,6 @@ struct ControlAgentEvent: Codable, Sendable { let data: [String: AnyCodable] } -extension Notification.Name { - static let controlAgentEvent = Notification.Name("clawdis.control.agent") -} - struct AnyCodable: Codable, @unchecked Sendable { let value: Any @@ -32,27 +26,13 @@ struct AnyCodable: Codable, @unchecked Sendable { init(from decoder: Decoder) throws { let container = try decoder.singleValueContainer() - if let intVal = try? container.decode(Int.self) { - self.value = intVal; return - } - if let doubleVal = try? container.decode(Double.self) { - self.value = doubleVal; return - } - if let boolVal = try? container.decode(Bool.self) { - self.value = boolVal; return - } - if let stringVal = try? container.decode(String.self) { - self.value = stringVal; return - } - if container.decodeNil() { - self.value = NSNull(); return - } - if let dict = try? container.decode([String: AnyCodable].self) { - self.value = dict; return - } - if let array = try? container.decode([AnyCodable].self) { - self.value = array; return - } + if let intVal = try? container.decode(Int.self) { self.value = intVal; return } + if let doubleVal = try? container.decode(Double.self) { self.value = doubleVal; return } + if let boolVal = try? container.decode(Bool.self) { self.value = boolVal; return } + if let stringVal = try? container.decode(String.self) { self.value = stringVal; return } + if container.decodeNil() { self.value = NSNull(); return } + if let dict = try? container.decode([String: AnyCodable].self) { self.value = dict; return } + if let array = try? container.decode([AnyCodable].self) { self.value = array; return } throw DecodingError.dataCorruptedError(in: container, debugDescription: "Unsupported type") } @@ -73,94 +53,14 @@ struct AnyCodable: Codable, @unchecked Sendable { } } -// Handles single-shot continuation resumption without Sendable capture issues -actor ConnectionWaiter { - private var cont: CheckedContinuation? - private var resumed = false - private var pendingResult: Result? - - func wait() async throws { - // Acts like a one-shot Future; if the connection resolves before wait() is called, - // stash the result so the waiter resumes immediately. - try await withCheckedThrowingContinuation { (c: CheckedContinuation) in - if let pending = pendingResult { - pendingResult = nil - resumed = true - c.resume(with: pending) - } else { - cont = c - } - } - } - - func succeed() { - resume(.success(())) - } - - func fail(_ error: Error) { - resume(.failure(error)) - } - - private func resume(_ result: Result) { - if resumed { return } - if let c = cont { - resumed = true - cont = nil - c.resume(with: result) - } else { - pendingResult = result - } - } -} - -struct ControlHealthSnapshot: Codable { - struct Web: Codable { - let linked: Bool - let authAgeMs: Double? - let connect: Connect? - - struct Connect: Codable { - let ok: Bool - let status: Int? - let error: String? - let elapsedMs: Double? - } - } - - struct Sessions: Codable { - struct Entry: Codable { - let key: String - let updatedAt: Double? - let age: Double? - } - let path: String - let count: Int - let recent: [Entry] - } - - struct IPC: Codable { - let path: String - let exists: Bool - } - - let ts: Double - let durationMs: Double - let web: Web - let heartbeatSeconds: Int - let sessions: Sessions - let ipc: IPC -} - enum ControlChannelError: Error, LocalizedError { case disconnected case badResponse(String) - case sshFailed(String) var errorDescription: String? { switch self { - case .disconnected: return "Control channel disconnected" - case let .badResponse(msg): return msg - case let .sshFailed(msg): return "SSH tunnel failed: \(msg)" + case .disconnected: "Control channel disconnected" + case let .badResponse(msg): msg } } } @@ -169,356 +69,74 @@ enum ControlChannelError: Error, LocalizedError { final class ControlChannel: ObservableObject { static let shared = ControlChannel() + enum ConnectionState: Equatable { + case disconnected + case connected + case degraded(String) + } + enum Mode: Equatable { case local case remote(target: String, identity: String) } - enum ConnectionState: Equatable { - case disconnected - case connecting - case connected - case degraded(String) - } - - private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control") - private var connection: NWConnection? - private var sshProcess: Process? - private var buffer = Data() - private var pending: [String: CheckedContinuation] = [:] - private var listenTask: Task? - private var mode: Mode = .local - private var localPort: UInt16 = 18789 - private var pingTask: Task? - private var jobStates: [String: String] = [:] - @Published private(set) var state: ConnectionState = .disconnected @Published private(set) var lastPingMs: Double? - func configure(mode: Mode) async throws { - if mode == self.mode, self.connection != nil { return } - await self.disconnect() - self.mode = mode - try await self.connect() + private let logger = Logger(subsystem: "com.steipete.clawdis", category: "control") - NotificationCenter.default.addObserver( - forName: .controlAgentEvent, - object: nil, - queue: .main) - { note in - if let evt = note.object as? ControlAgentEvent { - DispatchQueue.main.async { @MainActor in - AgentEventStore.shared.append(evt) - } - } + func configure() async { + do { + try await AgentRPC.shared.start() + self.state = .connected + } catch { + self.state = .degraded(error.localizedDescription) } } - func disconnect() async { - self.listenTask?.cancel() - self.listenTask = nil - self.pingTask?.cancel() - self.pingTask = nil - if let conn = self.connection { - conn.cancel() - } - self.connection = nil - if let ssh = self.sshProcess, ssh.isRunning { ssh.terminate() } - self.sshProcess = nil - for (_, cont) in self.pending { - cont.resume(throwing: ControlChannelError.disconnected) - } - self.pending.removeAll() - self.state = .disconnected + func configure(mode: Mode) async throws { + // Mode is retained for API compatibility; transport is always stdio now. + try await self.configure() } func health(timeout: TimeInterval? = nil) async throws -> Data { - try await self.ensureConnected() - let start = Date() - self.logger.debug("health probe start timeout=\(timeout?.description ?? "nil", privacy: .public)") - let payload = try await self.request( - method: "health", - params: timeout.map { ["timeoutMs": Int($0 * 1000)] }, - timeout: timeout.map { $0 + 1 } // small cushion over server-side timeout - ) - let ms = Int(Date().timeIntervalSince(start) * 1000) - self.logger.debug("health probe ok in \(ms)ms") - return payload + let params = timeout.map { ["timeoutMs": Int($0 * 1000)] } + do { + let start = Date() + let payload = try await AgentRPC.shared.controlRequest(method: "health", params: params) + let ms = Date().timeIntervalSince(start) * 1000 + self.lastPingMs = ms + self.state = .connected + return payload + } catch { + self.state = .degraded(error.localizedDescription) + throw error + } } func lastHeartbeat() async throws -> ControlHeartbeatEvent? { - try await self.ensureConnected() - let data = try await self.request(method: "last-heartbeat") + let data = try await AgentRPC.shared.controlRequest(method: "last-heartbeat") if data.isEmpty { return nil } return try? JSONDecoder().decode(ControlHeartbeatEvent.self, from: data) } + func request(method: String, params: [String: Any]? = nil) async throws -> Data { + do { + let data = try await AgentRPC.shared.controlRequest(method: method, params: params) + self.state = .connected + return data + } catch { + self.state = .degraded(error.localizedDescription) + throw error + } + } + func sendSystemEvent(_ text: String) async throws { - let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines) - guard !trimmed.isEmpty else { return } - _ = try await self.request(method: "system-event", params: ["text": trimmed], timeout: 5) - } - - func request(method: String, params: [String: Any]? = nil, timeout: TimeInterval? = nil) async throws -> Data { - try await self.ensureConnected() - let id = UUID().uuidString - var frame: [String: Any] = ["type": "request", "id": id, "method": method] - if let params { frame["params"] = params } - let data = try JSONSerialization.data(withJSONObject: frame) - try await self.send(data) - return try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in - self.pending[id] = cont - if let timeout { - Task { [weak self] in - try? await Task.sleep(nanoseconds: UInt64(timeout * 1_000_000_000)) - guard let self else { return } - if let pending = self.pending.removeValue(forKey: id) { - self.logger.error("control request \(method) timed out after \(Int(timeout))s") - pending.resume(throwing: ControlChannelError.badResponse("timeout after \(Int(timeout))s")) - } - } - } - } - } - - private func ensureConnected() async throws { - if let conn = self.connection { - switch conn.state { - case .ready: return - default: break - } - } - try await self.connect() - } - - private func connect() async throws { - switch self.mode { - case .local: - self.localPort = 18789 - case let .remote(target, identity): - self.localPort = try self.startSSHTunnel(target: target, identity: identity) - } - - self.state = .connecting - - let host = NWEndpoint.Host("127.0.0.1") - let port = NWEndpoint.Port(rawValue: self.localPort)! - let conn = NWConnection(host: host, port: port, using: .tcp) - self.connection = conn - - let waiter = ConnectionWaiter() - - conn.stateUpdateHandler = { [weak self, weak conn] state in - switch state { - case .ready: - Task { @MainActor in self?.state = .connected } - Task { - await waiter.succeed() - conn?.stateUpdateHandler = nil - } - case let .failed(err): - Task { @MainActor in self?.state = .degraded(err.localizedDescription) } - Task { - await waiter.fail(err) - conn?.stateUpdateHandler = nil - } - case let .waiting(err): - Task { @MainActor in self?.state = .degraded(err.localizedDescription) } - Task { - await waiter.fail(err) - conn?.stateUpdateHandler = nil - } - default: - break - } - } - - conn.start(queue: .global()) - try await waiter.wait() - - self.listenTask = Task.detached { [weak self] in - await self?.listen() - } - - self.pingTask = Task.detached { [weak self] in - guard let self else { return } - while !Task.isCancelled { - do { - try await Task.sleep(nanoseconds: 30 * 1_000_000_000) - let start = Date() - _ = try await self.request(method: "ping") - let ms = Date().timeIntervalSince(start) * 1000 - await MainActor.run { self.lastPingMs = ms; self.state = .connected } - } catch { - await MainActor.run { self.state = .degraded(error.localizedDescription) } - } - } - } - } - - private func startSSHTunnel(target: String, identity: String) throws -> UInt16 { - let localPort = Self.pickAvailablePort() - let proc = Process() - proc.executableURL = URL(fileURLWithPath: "/usr/bin/ssh") - var args: [String] = [ - "-o", "BatchMode=yes", - "-o", "ExitOnForwardFailure=yes", - "-N", // don't run a remote shell; keep the tunnel open - "-T", // no pseudo-tty - "-L", "\(localPort):127.0.0.1:18789", - target, - ] - if !identity.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty { - args.insert(contentsOf: ["-i", identity], at: 2) - } - proc.arguments = args - proc.standardInput = nil - let outPipe = Pipe() - let errPipe = Pipe() - proc.standardOutput = outPipe - proc.standardError = errPipe - try proc.run() - // Give ssh a brief moment; if it exits immediately we surface stderr instead of silently failing. - Thread.sleep(forTimeInterval: 0.2) // 200ms - if !proc.isRunning { - let err = String(data: errPipe.fileHandleForReading.readDataToEndOfFile(), encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines) - throw ControlChannelError.sshFailed(err ?? "ssh exited") - } - self.sshProcess = proc - return localPort - } - - private func send(_ data: Data) async throws { - guard let conn = self.connection else { throw ControlChannelError.disconnected } - let line = data + Data([0x0A]) - try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in - conn.send(content: line, completion: .contentProcessed { error in - if let error { cont.resume(throwing: error) } - else { cont.resume(returning: ()) } - }) - } - } - - private func listen() async { - guard let conn = self.connection else { return } - while true { - let result: (Data?, Bool, NWError?) = await withCheckedContinuation { cont in - conn.receiveMessage { data, _, isComplete, error in - cont.resume(returning: (data, isComplete, error)) - } - } - - let (data, isComplete, error) = result - if let error { - self.logger.debug("control receive error: \(error.localizedDescription, privacy: .public)") - break - } - if isComplete { - self.logger.debug("control receive complete") - break - } - guard let data else { continue } - self.buffer.append(data) - while let range = buffer.firstRange(of: Data([0x0A])) { - let lineData = buffer.subdata(in: buffer.startIndex.. = ["started", "streaming", "running", "queued", "waiting"] - let working = self.jobStates.values.contains { workingStates.contains($0) } - Task { @MainActor in - AppStateStore.shared.setWorking(working) - WorkActivityStore.shared.handleJob( - sessionKey: event.runId, - state: state) - } - } - } else if event.stream == "tool" { - guard let phase = event.data["phase"]?.value as? String else { return } - let name = event.data["name"]?.value as? String - let meta = event.data["meta"]?.value as? String - let args = event.data["args"]?.value as? [String: AnyCodable] - Task { @MainActor in - WorkActivityStore.shared.handleTool( - sessionKey: event.runId, - phase: phase, - name: name, - meta: meta, - args: args) - } - } - } - - private static func pickAvailablePort() -> UInt16 { - var port: UInt16 = 0 - let socket = socket(AF_INET, SOCK_STREAM, 0) - defer { close(socket) } - var addr = sockaddr_in() - addr.sin_family = sa_family_t(AF_INET) - addr.sin_port = in_port_t(0).bigEndian - addr.sin_addr = in_addr(s_addr: inet_addr("127.0.0.1")) - _ = withUnsafePointer(to: &addr) { - $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { - bind(socket, $0, socklen_t(MemoryLayout.size)) - } - } - var len = socklen_t(MemoryLayout.size) - getsockname(socket, withUnsafeMutablePointer(to: &addr) { - $0.withMemoryRebound(to: sockaddr.self, capacity: 1) { $0 } - }, &len) - // Asking the kernel for port 0 yields an ephemeral free port; reuse it for the SSH tunnel. - port = UInt16(bigEndian: addr.sin_port) - return port + _ = try await self.request(method: "system-event", params: ["text": text]) } } extension Notification.Name { static let controlHeartbeat = Notification.Name("clawdis.control.heartbeat") + static let controlAgentEvent = Notification.Name("clawdis.control.agent") } diff --git a/src/cli/program.ts b/src/cli/program.ts index 0e5f1a99d..71167fc90 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -1,10 +1,10 @@ import chalk from "chalk"; import { Command } from "commander"; import { agentCommand } from "../commands/agent.js"; -import { healthCommand } from "../commands/health.js"; +import { getHealthSnapshot, healthCommand, type HealthSummary } from "../commands/health.js"; import { sendCommand } from "../commands/send.js"; import { sessionsCommand } from "../commands/sessions.js"; -import { statusCommand } from "../commands/status.js"; +import { getStatusSummary, statusCommand, type StatusSummary } from "../commands/status.js"; import { loadConfig } from "../config/config.js"; import { danger, info, setVerbose } from "../globals.js"; import { startControlChannel } from "../infra/control-channel.js"; @@ -12,6 +12,7 @@ import { getLastHeartbeatEvent, onHeartbeatEvent, } from "../infra/heartbeat-events.js"; +import { onAgentEvent } from "../infra/agent-events.js"; import { getResolvedLoggerSettings } from "../logging.js"; import { loginWeb, @@ -33,6 +34,12 @@ import { startWebChatServer, } from "../webchat/server.js"; import { createDefaultDeps, logWebSelfId } from "./deps.js"; +import { onAgentEvent } from "../infra/agent-events.js"; +import { + enqueueSystemEvent, + listSystemPresence, + updateSystemPresence, +} from "../infra/system-presence.js"; export function buildProgram() { const program = new Command(); @@ -249,10 +256,14 @@ Examples: const forwardHeartbeat = (payload: unknown) => { respond({ type: "event", event: "heartbeat", payload }); }; + const forwardAgent = (payload: unknown) => { + respond({ type: "event", event: "agent", payload }); + }; const latest = getLastHeartbeatEvent(); if (latest) forwardHeartbeat(latest); const stopBus = onHeartbeatEvent(forwardHeartbeat); + const stopAgentBus = onAgentEvent(forwardAgent); rl.on("line", async (line: string) => { if (!line.trim()) return; @@ -267,6 +278,52 @@ Examples: respond({ type: "result", ok: true }); return; } + if (cmd.type === "control-request" && cmd.id && cmd.method) { + const id = String(cmd.id); + const method = String(cmd.method); + const params = (cmd.params ?? {}) as Record; + const controlRespond = (ok: boolean, payload?: unknown, error?: string) => + respond({ type: "control-response", id, ok, payload, error }); + try { + if (method === "health") { + const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : undefined; + const payload = await getHealthSnapshot(timeoutMs); + controlRespond(true, payload satisfies HealthSummary); + return; + } + if (method === "status") { + const payload = await getStatusSummary(); + controlRespond(true, payload satisfies StatusSummary); + return; + } + if (method === "last-heartbeat") { + controlRespond(true, getLastHeartbeatEvent()); + return; + } + if (method === "set-heartbeats") { + setHeartbeatsEnabled(Boolean(params.enabled)); + controlRespond(true, { ok: true }); + return; + } + if (method === "system-event") { + const text = String(params.text ?? "").trim(); + if (text) { + enqueueSystemEvent(text); + updateSystemPresence(text); + } + controlRespond(true, { ok: true }); + return; + } + if (method === "system-presence") { + controlRespond(true, listSystemPresence()); + return; + } + controlRespond(false, undefined, `unknown control method: ${method}`); + } catch (err) { + controlRespond(false, undefined, String(err)); + } + return; + } if (cmd.type !== "send" || !cmd.text) { respond({ type: "error", error: "unsupported command" }); return; @@ -326,6 +383,7 @@ Examples: await new Promise(() => {}); stopBus(); + stopAgentBus(); }); program