chore: single-source working state from agent events
This commit is contained in:
@@ -14,12 +14,6 @@ actor AgentRPC {
|
|||||||
let reason: String?
|
let reason: String?
|
||||||
}
|
}
|
||||||
|
|
||||||
struct JobStateEvent: Codable {
|
|
||||||
let id: String
|
|
||||||
let state: String
|
|
||||||
let durationMs: Double?
|
|
||||||
}
|
|
||||||
|
|
||||||
static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat")
|
static let heartbeatNotification = Notification.Name("clawdis.rpc.heartbeat")
|
||||||
|
|
||||||
private var process: Process?
|
private var process: Process?
|
||||||
@@ -202,8 +196,6 @@ actor AgentRPC {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if self.parseJobStateEvent(from: line) != nil { continue }
|
|
||||||
|
|
||||||
if let waiter = waiters.first {
|
if let waiter = waiters.first {
|
||||||
self.waiters.removeFirst()
|
self.waiters.removeFirst()
|
||||||
waiter.resume(returning: line)
|
waiter.resume(returning: line)
|
||||||
@@ -229,24 +221,6 @@ actor AgentRPC {
|
|||||||
return try? decoder.decode(HeartbeatEvent.self, from: payloadData)
|
return try? decoder.decode(HeartbeatEvent.self, from: payloadData)
|
||||||
}
|
}
|
||||||
|
|
||||||
private func parseJobStateEvent(from line: String) -> JobStateEvent? {
|
|
||||||
guard let data = line.data(using: .utf8) else { return nil }
|
|
||||||
guard
|
|
||||||
let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
|
|
||||||
let type = obj["type"] as? String,
|
|
||||||
type == "event",
|
|
||||||
let evt = obj["event"] as? String,
|
|
||||||
evt == "job-state",
|
|
||||||
let payload = obj["payload"] as? [String: Any]
|
|
||||||
else {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
let decoder = JSONDecoder()
|
|
||||||
guard let payloadData = try? JSONSerialization.data(withJSONObject: payload) else { return nil }
|
|
||||||
return try? decoder.decode(JobStateEvent.self, from: payloadData)
|
|
||||||
}
|
|
||||||
|
|
||||||
private func nextLine() async throws -> String {
|
private func nextLine() async throws -> String {
|
||||||
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<String, Error>) in
|
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<String, Error>) in
|
||||||
self.waiters.append(cont)
|
self.waiters.append(cont)
|
||||||
|
|||||||
@@ -190,7 +190,7 @@ final class ControlChannel: ObservableObject {
|
|||||||
private var mode: Mode = .local
|
private var mode: Mode = .local
|
||||||
private var localPort: UInt16 = 18789
|
private var localPort: UInt16 = 18789
|
||||||
private var pingTask: Task<Void, Never>?
|
private var pingTask: Task<Void, Never>?
|
||||||
private var activeJobs: Int = 0
|
private var jobStates: [String: String] = [:]
|
||||||
|
|
||||||
@Published private(set) var state: ConnectionState = .disconnected
|
@Published private(set) var state: ConnectionState = .disconnected
|
||||||
@Published private(set) var lastPingMs: Double?
|
@Published private(set) var lastPingMs: Double?
|
||||||
@@ -208,13 +208,7 @@ final class ControlChannel: ObservableObject {
|
|||||||
{ note in
|
{ note in
|
||||||
if let evt = note.object as? ControlAgentEvent {
|
if let evt = note.object as? ControlAgentEvent {
|
||||||
DispatchQueue.main.async { @MainActor in
|
DispatchQueue.main.async { @MainActor in
|
||||||
let payload = ControlAgentEvent(
|
AgentEventStore.shared.append(evt)
|
||||||
runId: evt.runId,
|
|
||||||
seq: evt.seq,
|
|
||||||
stream: evt.stream,
|
|
||||||
ts: evt.ts,
|
|
||||||
data: evt.data.mapValues { AnyCodable($0.value) })
|
|
||||||
AgentEventStore.shared.append(payload)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -459,15 +453,15 @@ final class ControlChannel: ObservableObject {
|
|||||||
private func handleAgentEvent(_ event: ControlAgentEvent) {
|
private func handleAgentEvent(_ event: ControlAgentEvent) {
|
||||||
if event.stream == "job" {
|
if event.stream == "job" {
|
||||||
if let state = event.data["state"]?.value as? String {
|
if let state = event.data["state"]?.value as? String {
|
||||||
switch state.lowercased() {
|
let normalized = state.lowercased()
|
||||||
case "started", "streaming":
|
if normalized == "done" || normalized == "error" {
|
||||||
self.activeJobs &+= 1
|
self.jobStates.removeValue(forKey: event.runId)
|
||||||
case "done", "error":
|
} else {
|
||||||
self.activeJobs = max(0, self.activeJobs - 1)
|
self.jobStates[event.runId] = normalized
|
||||||
default:
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
let working = self.activeJobs > 0
|
|
||||||
|
let workingStates: Set<String> = ["started", "streaming", "running", "queued", "waiting"]
|
||||||
|
let working = self.jobStates.values.contains { workingStates.contains($0) }
|
||||||
Task { @MainActor in
|
Task { @MainActor in
|
||||||
AppStateStore.shared.setWorking(working)
|
AppStateStore.shared.setWorking(working)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import Darwin
|
||||||
import Foundation
|
import Foundation
|
||||||
import Testing
|
import Testing
|
||||||
@testable import Clawdis
|
@testable import Clawdis
|
||||||
@@ -35,8 +36,20 @@ import Testing
|
|||||||
let nodePath = tmp.appendingPathComponent("node_modules/.bin/node")
|
let nodePath = tmp.appendingPathComponent("node_modules/.bin/node")
|
||||||
let scriptPath = tmp.appendingPathComponent("bin/clawdis.js")
|
let scriptPath = tmp.appendingPathComponent("bin/clawdis.js")
|
||||||
try makeExec(at: nodePath)
|
try makeExec(at: nodePath)
|
||||||
|
try "#!/bin/sh\necho v22.0.0\n".write(to: nodePath, atomically: true, encoding: .utf8)
|
||||||
|
try FileManager.default.setAttributes([.posixPermissions: 0o755], ofItemAtPath: nodePath.path)
|
||||||
try makeExec(at: scriptPath)
|
try makeExec(at: scriptPath)
|
||||||
|
|
||||||
|
let previous = getenv("CLAWDIS_RUNTIME").flatMap { String(validatingCString: $0) }
|
||||||
|
setenv("CLAWDIS_RUNTIME", "node", 1)
|
||||||
|
defer {
|
||||||
|
if let previous {
|
||||||
|
setenv("CLAWDIS_RUNTIME", previous, 1)
|
||||||
|
} else {
|
||||||
|
unsetenv("CLAWDIS_RUNTIME")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let cmd = CommandResolver.clawdisCommand(subcommand: "rpc")
|
let cmd = CommandResolver.clawdisCommand(subcommand: "rpc")
|
||||||
|
|
||||||
#expect(cmd.count >= 3)
|
#expect(cmd.count >= 3)
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import { randomUUID } from "node:crypto";
|
|
||||||
import chalk from "chalk";
|
import chalk from "chalk";
|
||||||
import { Command } from "commander";
|
import { Command } from "commander";
|
||||||
import { agentCommand } from "../commands/agent.js";
|
import { agentCommand } from "../commands/agent.js";
|
||||||
@@ -273,14 +272,6 @@ Examples:
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const jobId = cmd.jobId ? String(cmd.jobId) : randomUUID();
|
|
||||||
const startedAt = Date.now();
|
|
||||||
respond({
|
|
||||||
type: "event",
|
|
||||||
event: "job-state",
|
|
||||||
payload: { id: jobId, state: "started", startedAt },
|
|
||||||
});
|
|
||||||
|
|
||||||
const logs: string[] = [];
|
const logs: string[] = [];
|
||||||
const runtime: RuntimeEnv = {
|
const runtime: RuntimeEnv = {
|
||||||
log: (msg: string) => logs.push(String(msg)),
|
log: (msg: string) => logs.push(String(msg)),
|
||||||
@@ -308,31 +299,9 @@ Examples:
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
await agentCommand(opts, runtime, createDefaultDeps());
|
await agentCommand(opts, runtime, createDefaultDeps());
|
||||||
const endedAt = Date.now();
|
|
||||||
respond({
|
|
||||||
type: "event",
|
|
||||||
event: "job-state",
|
|
||||||
payload: {
|
|
||||||
id: jobId,
|
|
||||||
state: "done",
|
|
||||||
durationMs: endedAt - startedAt,
|
|
||||||
endedAt,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
const payload = extractPayload(logs);
|
const payload = extractPayload(logs);
|
||||||
respond({ type: "result", ok: true, payload });
|
respond({ type: "result", ok: true, payload });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const endedAt = Date.now();
|
|
||||||
respond({
|
|
||||||
type: "event",
|
|
||||||
event: "job-state",
|
|
||||||
payload: {
|
|
||||||
id: jobId,
|
|
||||||
state: "error",
|
|
||||||
durationMs: endedAt - startedAt,
|
|
||||||
endedAt,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
respond({ type: "error", error: String(err) });
|
respond({ type: "error", error: String(err) });
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|||||||
@@ -300,6 +300,7 @@ export async function agentCommand(
|
|||||||
stream: "job",
|
stream: "job",
|
||||||
data: {
|
data: {
|
||||||
state: "started",
|
state: "started",
|
||||||
|
startedAt,
|
||||||
to: opts.to,
|
to: opts.to,
|
||||||
sessionId,
|
sessionId,
|
||||||
isNewSession,
|
isNewSession,
|
||||||
@@ -327,6 +328,8 @@ export async function agentCommand(
|
|||||||
stream: "job",
|
stream: "job",
|
||||||
data: {
|
data: {
|
||||||
state: "done",
|
state: "done",
|
||||||
|
startedAt,
|
||||||
|
endedAt: Date.now(),
|
||||||
to: opts.to,
|
to: opts.to,
|
||||||
sessionId,
|
sessionId,
|
||||||
durationMs: Date.now() - startedAt,
|
durationMs: Date.now() - startedAt,
|
||||||
@@ -338,6 +341,8 @@ export async function agentCommand(
|
|||||||
stream: "job",
|
stream: "job",
|
||||||
data: {
|
data: {
|
||||||
state: "error",
|
state: "error",
|
||||||
|
startedAt,
|
||||||
|
endedAt: Date.now(),
|
||||||
to: opts.to,
|
to: opts.to,
|
||||||
sessionId,
|
sessionId,
|
||||||
durationMs: Date.now() - startedAt,
|
durationMs: Date.now() - startedAt,
|
||||||
|
|||||||
Reference in New Issue
Block a user