import type { AgentEventPayload } from "../infra/agent-events.js"; import { formatForLog } from "./ws-log.js"; export type ChatRunEntry = { sessionKey: string; clientRunId: string; }; export type ChatRunRegistry = { add: (sessionId: string, entry: ChatRunEntry) => void; peek: (sessionId: string) => ChatRunEntry | undefined; shift: (sessionId: string) => ChatRunEntry | undefined; remove: ( sessionId: string, clientRunId: string, sessionKey?: string, ) => ChatRunEntry | undefined; clear: () => void; }; export function createChatRunRegistry(): ChatRunRegistry { const chatRunSessions = new Map(); const add = (sessionId: string, entry: ChatRunEntry) => { const queue = chatRunSessions.get(sessionId); if (queue) { queue.push(entry); } else { chatRunSessions.set(sessionId, [entry]); } }; const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0]; const shift = (sessionId: string) => { const queue = chatRunSessions.get(sessionId); if (!queue || queue.length === 0) return undefined; const entry = queue.shift(); if (!queue.length) chatRunSessions.delete(sessionId); return entry; }; const remove = ( sessionId: string, clientRunId: string, sessionKey?: string, ) => { const queue = chatRunSessions.get(sessionId); if (!queue || queue.length === 0) return undefined; const idx = queue.findIndex( (entry) => entry.clientRunId === clientRunId && (sessionKey ? entry.sessionKey === sessionKey : true), ); if (idx < 0) return undefined; const [entry] = queue.splice(idx, 1); if (!queue.length) chatRunSessions.delete(sessionId); return entry; }; const clear = () => { chatRunSessions.clear(); }; return { add, peek, shift, remove, clear }; } export type ChatRunState = { registry: ChatRunRegistry; buffers: Map; deltaSentAt: Map; clear: () => void; }; export function createChatRunState(): ChatRunState { const registry = createChatRunRegistry(); const buffers = new Map(); const deltaSentAt = new Map(); const clear = () => { registry.clear(); buffers.clear(); deltaSentAt.clear(); }; return { registry, buffers, deltaSentAt, clear, }; } export type ChatEventBroadcast = ( event: string, payload: unknown, opts?: { dropIfSlow?: boolean }, ) => void; export type BridgeSendToSession = ( sessionKey: string, event: string, payload: unknown, ) => void; export type AgentEventHandlerOptions = { broadcast: ChatEventBroadcast; bridgeSendToSession: BridgeSendToSession; agentRunSeq: Map; chatRunState: ChatRunState; resolveSessionKeyForRun: (runId: string) => string | undefined; clearAgentRunContext: (runId: string) => void; }; export function createAgentEventHandler({ broadcast, bridgeSendToSession, agentRunSeq, chatRunState, resolveSessionKeyForRun, clearAgentRunContext, }: AgentEventHandlerOptions) { const emitChatDelta = ( sessionKey: string, clientRunId: string, seq: number, text: string, ) => { chatRunState.buffers.set(clientRunId, text); const now = Date.now(); const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0; if (now - last < 150) return; chatRunState.deltaSentAt.set(clientRunId, now); const payload = { runId: clientRunId, sessionKey, seq, state: "delta" as const, message: { role: "assistant", content: [{ type: "text", text }], timestamp: now, }, }; broadcast("chat", payload, { dropIfSlow: true }); bridgeSendToSession(sessionKey, "chat", payload); }; const emitChatFinal = ( sessionKey: string, clientRunId: string, seq: number, jobState: "done" | "error", error?: unknown, ) => { const text = chatRunState.buffers.get(clientRunId)?.trim() ?? ""; chatRunState.buffers.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId); if (jobState === "done") { const payload = { runId: clientRunId, sessionKey, seq, state: "final" as const, message: text ? { role: "assistant", content: [{ type: "text", text }], timestamp: Date.now(), } : undefined, }; broadcast("chat", payload); bridgeSendToSession(sessionKey, "chat", payload); return; } const payload = { runId: clientRunId, sessionKey, seq, state: "error" as const, errorMessage: error ? formatForLog(error) : undefined, }; broadcast("chat", payload); bridgeSendToSession(sessionKey, "chat", payload); }; return (evt: AgentEventPayload) => { const chatLink = chatRunState.registry.peek(evt.runId); const sessionKey = chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId); // Include sessionKey so Control UI can filter tool streams per session. const agentPayload = sessionKey ? { ...evt, sessionKey } : evt; const last = agentRunSeq.get(evt.runId) ?? 0; if (evt.seq !== last + 1) { broadcast("agent", { runId: evt.runId, stream: "error", ts: Date.now(), sessionKey, data: { reason: "seq gap", expected: last + 1, received: evt.seq, }, }); } agentRunSeq.set(evt.runId, evt.seq); broadcast("agent", agentPayload); const lifecyclePhase = evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; if (sessionKey) { bridgeSendToSession(sessionKey, "agent", agentPayload); if (evt.stream === "assistant" && typeof evt.data?.text === "string") { const clientRunId = chatLink?.clientRunId ?? evt.runId; emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text); } else if (lifecyclePhase === "end" || lifecyclePhase === "error") { if (chatLink) { const finished = chatRunState.registry.shift(evt.runId); if (!finished) { clearAgentRunContext(evt.runId); return; } emitChatFinal( finished.sessionKey, finished.clientRunId, evt.seq, lifecyclePhase === "error" ? "error" : "done", evt.data?.error, ); } else { emitChatFinal( sessionKey, evt.runId, evt.seq, lifecyclePhase === "error" ? "error" : "done", evt.data?.error, ); } } } if (lifecyclePhase === "end" || lifecyclePhase === "error") { clearAgentRunContext(evt.runId); } }; }