feat: broadcast agent events over control channel

This commit is contained in:
Peter Steinberger
2025-12-09 00:28:03 +01:00
parent 3114dfd39b
commit 40dd23337c
4 changed files with 161 additions and 12 deletions

View File

@@ -13,6 +13,62 @@ struct ControlHeartbeatEvent: Codable {
let reason: String?
}
struct ControlAgentEvent: Codable {
let runId: String
let seq: Int
let stream: String
let ts: Double
let data: [String: AnyCodable]
}
struct AnyCodable: Codable {
let value: Any
init(_ value: Any) { self.value = value }
init(from decoder: Decoder) throws {
let container = try decoder.singleValueContainer()
if let intVal = try? container.decode(Int.self) {
self.value = intVal; return
}
if let doubleVal = try? container.decode(Double.self) {
self.value = doubleVal; return
}
if let boolVal = try? container.decode(Bool.self) {
self.value = boolVal; return
}
if let stringVal = try? container.decode(String.self) {
self.value = stringVal; return
}
if container.decodeNil() {
self.value = NSNull(); return
}
if let dict = try? container.decode([String: AnyCodable].self) {
self.value = dict; return
}
if let array = try? container.decode([AnyCodable].self) {
self.value = array; return
}
throw DecodingError.dataCorruptedError(in: container, debugDescription: "Unsupported type")
}
func encode(to encoder: Encoder) throws {
var container = encoder.singleValueContainer()
switch self.value {
case let intVal as Int: try container.encode(intVal)
case let doubleVal as Double: try container.encode(doubleVal)
case let boolVal as Bool: try container.encode(boolVal)
case let stringVal as String: try container.encode(stringVal)
case is NSNull: try container.encodeNil()
case let dict as [String: AnyCodable]: try container.encode(dict)
case let array as [AnyCodable]: try container.encode(array)
default:
let context = EncodingError.Context(codingPath: encoder.codingPath, debugDescription: "Unsupported type")
throw EncodingError.invalidValue(self.value, context)
}
}
}
// Handles single-shot continuation resumption without Sendable capture issues
actor ConnectionWaiter {
private var cont: CheckedContinuation<Void, Error>?
@@ -349,6 +405,11 @@ final class ControlChannel: ObservableObject {
if let payloadData = try? JSONSerialization.data(withJSONObject: payload) {
NotificationCenter.default.post(name: .controlHeartbeat, object: payloadData)
}
} else if event == "agent", let payload = obj["payload"] {
if let payloadData = try? JSONSerialization.data(withJSONObject: payload),
let agent = try? JSONDecoder().decode(ControlAgentEvent.self, from: payloadData) {
self.handleAgentEvent(agent)
}
}
return
}
@@ -366,6 +427,17 @@ final class ControlChannel: ObservableObject {
}
}
private func handleAgentEvent(_ event: ControlAgentEvent) {
if event.stream == "job" {
if let state = event.data["state"]?.value as? String {
let working = state.lowercased() == "started" || state.lowercased() == "streaming"
Task { @MainActor in
AppStateStore.shared.setWorking(working)
}
}
}
}
private static func pickAvailablePort() -> UInt16 {
var port: UInt16 = 0
let socket = socket(AF_INET, SOCK_STREAM, 0)

View File

@@ -25,6 +25,7 @@ import { runCommandWithTimeout } from "../process/exec.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { sendViaIpc } from "../web/ipc.js";
import { emitAgentEvent } from "../infra/agent-events.js";
type AgentCommandOpts = {
message: string;
@@ -293,20 +294,58 @@ export async function agentCommand(
BodyStripped: commandBody,
};
const result = await runCommandReply({
reply: { ...replyCfg, mode: "command" },
templatingCtx,
sendSystemOnce,
isNewSession,
isFirstTurnInSession,
systemSent,
timeoutMs,
timeoutSeconds,
commandRunner: runCommandWithTimeout,
thinkLevel: resolvedThinkLevel,
verboseLevel: resolvedVerboseLevel,
const startedAt = Date.now();
emitAgentEvent({
runId: sessionId,
stream: "job",
data: {
state: "started",
to: opts.to,
sessionId,
isNewSession,
},
});
let result;
try {
result = await runCommandReply({
reply: { ...replyCfg, mode: "command" },
templatingCtx,
sendSystemOnce,
isNewSession,
isFirstTurnInSession,
systemSent,
timeoutMs,
timeoutSeconds,
commandRunner: runCommandWithTimeout,
thinkLevel: resolvedThinkLevel,
verboseLevel: resolvedVerboseLevel,
});
emitAgentEvent({
runId: sessionId,
stream: "job",
data: {
state: "done",
to: opts.to,
sessionId,
durationMs: Date.now() - startedAt,
},
});
} catch (err) {
emitAgentEvent({
runId: sessionId,
stream: "job",
data: {
state: "error",
to: opts.to,
sessionId,
durationMs: Date.now() - startedAt,
error: String(err),
},
});
throw err;
}
// If the agent returned a new session id, persist it.
const returnedSessionId = result.meta.agentMeta?.sessionId;
if (

31
src/infra/agent-events.ts Normal file
View File

@@ -0,0 +1,31 @@
export type AgentEventPayload = {
runId: string;
seq: number;
stream: "job" | "tool" | string;
ts: number;
data: Record<string, unknown>;
};
let seq = 0;
const listeners = new Set<(evt: AgentEventPayload) => void>();
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
const enriched: AgentEventPayload = {
...event,
seq: ++seq,
ts: Date.now(),
};
for (const listener of listeners) {
try {
listener(enriched);
} catch {
/* ignore */
}
}
}
export function onAgentEvent(listener: (evt: AgentEventPayload) => void) {
listeners.add(listener);
return () => listeners.delete(listener);
}

View File

@@ -9,6 +9,7 @@ import {
type HeartbeatEventPayload,
onHeartbeatEvent,
} from "./heartbeat-events.js";
import { onAgentEvent, type AgentEventPayload } from "./agent-events.js";
type ControlRequest = {
type: "request";
@@ -38,6 +39,7 @@ type Handlers = {
type ControlServer = {
close: () => Promise<void>;
broadcastHeartbeat: (evt: HeartbeatEventPayload) => void;
broadcastAgentEvent: (evt: AgentEventPayload) => void;
};
const DEFAULT_PORT = 18789;
@@ -91,6 +93,7 @@ export async function startControlChannel(
});
const stopHeartbeat = onHeartbeatEvent((evt) => broadcast("heartbeat", evt));
const stopAgent = onAgentEvent((evt) => broadcast("agent", evt));
const handleLine = async (socket: net.Socket, line: string) => {
if (!line) return;
@@ -184,6 +187,7 @@ export async function startControlChannel(
return {
close: async () => {
stopHeartbeat();
stopAgent();
await new Promise<void>((resolve) => server.close(() => resolve()));
for (const client of [...clients]) {
client.destroy();
@@ -194,5 +198,8 @@ export async function startControlChannel(
emitHeartbeatEvent(evt);
broadcast("heartbeat", evt);
},
broadcastAgentEvent: (evt: AgentEventPayload) => {
broadcast("agent", evt);
},
};
}