diff --git a/apps/macos/Sources/Clawdis/ControlChannel.swift b/apps/macos/Sources/Clawdis/ControlChannel.swift index b24043c36..d8921ef7d 100644 --- a/apps/macos/Sources/Clawdis/ControlChannel.swift +++ b/apps/macos/Sources/Clawdis/ControlChannel.swift @@ -13,6 +13,62 @@ struct ControlHeartbeatEvent: Codable { let reason: String? } +struct ControlAgentEvent: Codable { + let runId: String + let seq: Int + let stream: String + let ts: Double + let data: [String: AnyCodable] +} + +struct AnyCodable: Codable { + let value: Any + + init(_ value: Any) { self.value = value } + + init(from decoder: Decoder) throws { + let container = try decoder.singleValueContainer() + if let intVal = try? container.decode(Int.self) { + self.value = intVal; return + } + if let doubleVal = try? container.decode(Double.self) { + self.value = doubleVal; return + } + if let boolVal = try? container.decode(Bool.self) { + self.value = boolVal; return + } + if let stringVal = try? container.decode(String.self) { + self.value = stringVal; return + } + if container.decodeNil() { + self.value = NSNull(); return + } + if let dict = try? container.decode([String: AnyCodable].self) { + self.value = dict; return + } + if let array = try? container.decode([AnyCodable].self) { + self.value = array; return + } + throw DecodingError.dataCorruptedError(in: container, debugDescription: "Unsupported type") + } + + func encode(to encoder: Encoder) throws { + var container = encoder.singleValueContainer() + switch self.value { + case let intVal as Int: try container.encode(intVal) + case let doubleVal as Double: try container.encode(doubleVal) + case let boolVal as Bool: try container.encode(boolVal) + case let stringVal as String: try container.encode(stringVal) + case is NSNull: try container.encodeNil() + case let dict as [String: AnyCodable]: try container.encode(dict) + case let array as [AnyCodable]: try container.encode(array) + default: + let context = EncodingError.Context(codingPath: encoder.codingPath, debugDescription: "Unsupported type") + throw EncodingError.invalidValue(self.value, context) + } + } +} + // Handles single-shot continuation resumption without Sendable capture issues actor ConnectionWaiter { private var cont: CheckedContinuation? @@ -349,6 +405,11 @@ final class ControlChannel: ObservableObject { if let payloadData = try? JSONSerialization.data(withJSONObject: payload) { NotificationCenter.default.post(name: .controlHeartbeat, object: payloadData) } + } else if event == "agent", let payload = obj["payload"] { + if let payloadData = try? JSONSerialization.data(withJSONObject: payload), + let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData) { + self.handleAgentEvent(agent) + } } return } @@ -366,6 +427,17 @@ final class ControlChannel: ObservableObject { } } + private func handleAgentEvent(_ event: ControlAgentEvent) { + if event.stream == "job" { + if let state = event.data["state"]?.value as? String { + let working = state.lowercased() == "started" || state.lowercased() == "streaming" + Task { @MainActor in + AppStateStore.shared.setWorking(working) + } + } + } + } + private static func pickAvailablePort() -> UInt16 { var port: UInt16 = 0 let socket = socket(AF_INET, SOCK_STREAM, 0) diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 90b349c5b..ad534407a 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -25,6 +25,7 @@ import { runCommandWithTimeout } from "../process/exec.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { sendViaIpc } from "../web/ipc.js"; +import { emitAgentEvent } from "../infra/agent-events.js"; type AgentCommandOpts = { message: string; @@ -293,20 +294,58 @@ export async function agentCommand( BodyStripped: commandBody, }; - const result = await runCommandReply({ - reply: { ...replyCfg, mode: "command" }, - templatingCtx, - sendSystemOnce, - isNewSession, - isFirstTurnInSession, - systemSent, - timeoutMs, - timeoutSeconds, - commandRunner: runCommandWithTimeout, - thinkLevel: resolvedThinkLevel, - verboseLevel: resolvedVerboseLevel, + const startedAt = Date.now(); + emitAgentEvent({ + runId: sessionId, + stream: "job", + data: { + state: "started", + to: opts.to, + sessionId, + isNewSession, + }, }); + let result; + try { + result = await runCommandReply({ + reply: { ...replyCfg, mode: "command" }, + templatingCtx, + sendSystemOnce, + isNewSession, + isFirstTurnInSession, + systemSent, + timeoutMs, + timeoutSeconds, + commandRunner: runCommandWithTimeout, + thinkLevel: resolvedThinkLevel, + verboseLevel: resolvedVerboseLevel, + }); + emitAgentEvent({ + runId: sessionId, + stream: "job", + data: { + state: "done", + to: opts.to, + sessionId, + durationMs: Date.now() - startedAt, + }, + }); + } catch (err) { + emitAgentEvent({ + runId: sessionId, + stream: "job", + data: { + state: "error", + to: opts.to, + sessionId, + durationMs: Date.now() - startedAt, + error: String(err), + }, + }); + throw err; + } + // If the agent returned a new session id, persist it. const returnedSessionId = result.meta.agentMeta?.sessionId; if ( diff --git a/src/infra/agent-events.ts b/src/infra/agent-events.ts new file mode 100644 index 000000000..3a0f447d1 --- /dev/null +++ b/src/infra/agent-events.ts @@ -0,0 +1,31 @@ +export type AgentEventPayload = { + runId: string; + seq: number; + stream: "job" | "tool" | string; + ts: number; + data: Record; +}; + +let seq = 0; +const listeners = new Set<(evt: AgentEventPayload) => void>(); + +export function emitAgentEvent(event: Omit) { + const enriched: AgentEventPayload = { + ...event, + seq: ++seq, + ts: Date.now(), + }; + for (const listener of listeners) { + try { + listener(enriched); + } catch { + /* ignore */ + } + } +} + +export function onAgentEvent(listener: (evt: AgentEventPayload) => void) { + listeners.add(listener); + return () => listeners.delete(listener); +} + diff --git a/src/infra/control-channel.ts b/src/infra/control-channel.ts index bcba6fc38..658add6dc 100644 --- a/src/infra/control-channel.ts +++ b/src/infra/control-channel.ts @@ -9,6 +9,7 @@ import { type HeartbeatEventPayload, onHeartbeatEvent, } from "./heartbeat-events.js"; +import { onAgentEvent, type AgentEventPayload } from "./agent-events.js"; type ControlRequest = { type: "request"; @@ -38,6 +39,7 @@ type Handlers = { type ControlServer = { close: () => Promise; broadcastHeartbeat: (evt: HeartbeatEventPayload) => void; + broadcastAgentEvent: (evt: AgentEventPayload) => void; }; const DEFAULT_PORT = 18789; @@ -91,6 +93,7 @@ export async function startControlChannel( }); const stopHeartbeat = onHeartbeatEvent((evt) => broadcast("heartbeat", evt)); + const stopAgent = onAgentEvent((evt) => broadcast("agent", evt)); const handleLine = async (socket: net.Socket, line: string) => { if (!line) return; @@ -184,6 +187,7 @@ export async function startControlChannel( return { close: async () => { stopHeartbeat(); + stopAgent(); await new Promise((resolve) => server.close(() => resolve())); for (const client of [...clients]) { client.destroy(); @@ -194,5 +198,8 @@ export async function startControlChannel( emitHeartbeatEvent(evt); broadcast("heartbeat", evt); }, + broadcastAgentEvent: (evt: AgentEventPayload) => { + broadcast("agent", evt); + }, }; }