diff --git a/apps/macos/Sources/Clawdis/AgentRPC.swift b/apps/macos/Sources/Clawdis/AgentRPC.swift index 30df9da16..2466fe494 100644 --- a/apps/macos/Sources/Clawdis/AgentRPC.swift +++ b/apps/macos/Sources/Clawdis/AgentRPC.swift @@ -14,6 +14,12 @@ actor AgentRPC { let reason: String? } + struct JobStateEvent: Codable { + let id: String + let state: String + let durationMs: Double? + } + static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat") private var process: Process? @@ -23,6 +29,7 @@ actor AgentRPC { private var waiters: [CheckedContinuation] = [] private let logger = Logger(subsystem: "com.steipete.clawdis", category: "agent.rpc") private var starting = false + private var activeJobs = 0 private struct RpcError: Error { let message: String } @@ -196,6 +203,10 @@ actor AgentRPC { } continue } + if let jobEvent = self.parseJobStateEvent(from: line) { + Task { await self.updateJobState(jobEvent) } + continue + } if let waiter = waiters.first { self.waiters.removeFirst() @@ -204,6 +215,21 @@ actor AgentRPC { } } + private func updateJobState(_ evt: JobStateEvent) async { + switch evt.state.lowercased() { + case "started", "streaming": + self.activeJobs &+= 1 + case "done", "error": + self.activeJobs = max(0, self.activeJobs - 1) + default: + break + } + let working = self.activeJobs > 0 + await MainActor.run { + AppStateStore.shared.setWorking(working) + } + } + private func parseHeartbeatEvent(from line: String) -> HeartbeatEvent? { guard let data = line.data(using: .utf8) else { return nil } guard @@ -222,6 +248,24 @@ actor AgentRPC { return try? decoder.decode(HeartbeatEvent.self, from: payloadData) } + private func parseJobStateEvent(from line: String) -> JobStateEvent? { + guard let data = line.data(using: .utf8) else { return nil } + guard + let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], + let type = obj["type"] as? String, + type == "event", + let evt = obj["event"] as? String, + evt == "job-state", + let payload = obj["payload"] as? [String: Any] + else { + return nil + } + + let decoder = JSONDecoder() + guard let payloadData = try? JSONSerialization.data(withJSONObject: payload) else { return nil } + return try? decoder.decode(JobStateEvent.self, from: payloadData) + } + private func nextLine() async throws -> String { try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in self.waiters.append(cont) diff --git a/src/cli/program.ts b/src/cli/program.ts index 0e5f1a99d..a58b29a87 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -1,4 +1,5 @@ import chalk from "chalk"; +import { randomUUID } from "node:crypto"; import { Command } from "commander"; import { agentCommand } from "../commands/agent.js"; import { healthCommand } from "../commands/health.js"; @@ -272,6 +273,14 @@ Examples: return; } + const jobId = cmd.jobId ? String(cmd.jobId) : randomUUID(); + const startedAt = Date.now(); + respond({ + type: "event", + event: "job-state", + payload: { id: jobId, state: "started", startedAt }, + }); + const logs: string[] = []; const runtime: RuntimeEnv = { log: (msg: string) => logs.push(String(msg)), @@ -299,9 +308,31 @@ Examples: try { await agentCommand(opts, runtime, createDefaultDeps()); + const endedAt = Date.now(); + respond({ + type: "event", + event: "job-state", + payload: { + id: jobId, + state: "done", + durationMs: endedAt - startedAt, + endedAt, + }, + }); const payload = extractPayload(logs); respond({ type: "result", ok: true, payload }); } catch (err) { + const endedAt = Date.now(); + respond({ + type: "event", + event: "job-state", + payload: { + id: jobId, + state: "error", + durationMs: endedAt - startedAt, + endedAt, + }, + }); respond({ type: "error", error: String(err) }); } } catch (err) { diff --git a/src/infra/control-channel.test.ts b/src/infra/control-channel.test.ts index f5bd310b3..27d19d338 100644 --- a/src/infra/control-channel.test.ts +++ b/src/infra/control-channel.test.ts @@ -50,11 +50,18 @@ describe("control channel", () => { const frame = { type: "request", id, method, params }; client.write(`${JSON.stringify(frame)}\n`); const onData = (chunk: Buffer) => { - const line = chunk.toString("utf8").trim(); - const parsed = JSON.parse(line) as { id?: string }; - if (parsed.id === id) { - client.off("data", onData); - resolve(parsed as Record); + const lines = chunk.toString("utf8").trim().split(/\n/); + for (const line of lines) { + try { + const parsed = JSON.parse(line) as { id?: string }; + if (parsed.id === id) { + client.off("data", onData); + resolve(parsed as Record); + return; + } + } catch { + /* ignore non-JSON noise */ + } } }; client.on("data", onData);