Files
clawdbot/src/gateway/server-chat.ts
2026-01-10 17:23:27 +01:00

304 lines
8.4 KiB
TypeScript

import { normalizeVerboseLevel } from "../auto-reply/thinking.js";
import {
type AgentEventPayload,
getAgentRunContext,
} from "../infra/agent-events.js";
import { loadSessionEntry } from "./session-utils.js";
import { formatForLog } from "./ws-log.js";
export type ChatRunEntry = {
sessionKey: string;
clientRunId: string;
};
export type ChatRunRegistry = {
add: (sessionId: string, entry: ChatRunEntry) => void;
peek: (sessionId: string) => ChatRunEntry | undefined;
shift: (sessionId: string) => ChatRunEntry | undefined;
remove: (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
clear: () => void;
};
export function createChatRunRegistry(): ChatRunRegistry {
const chatRunSessions = new Map<string, ChatRunEntry[]>();
const add = (sessionId: string, entry: ChatRunEntry) => {
const queue = chatRunSessions.get(sessionId);
if (queue) {
queue.push(entry);
} else {
chatRunSessions.set(sessionId, [entry]);
}
};
const peek = (sessionId: string) => chatRunSessions.get(sessionId)?.[0];
const shift = (sessionId: string) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) return undefined;
const entry = queue.shift();
if (!queue.length) chatRunSessions.delete(sessionId);
return entry;
};
const remove = (
sessionId: string,
clientRunId: string,
sessionKey?: string,
) => {
const queue = chatRunSessions.get(sessionId);
if (!queue || queue.length === 0) return undefined;
const idx = queue.findIndex(
(entry) =>
entry.clientRunId === clientRunId &&
(sessionKey ? entry.sessionKey === sessionKey : true),
);
if (idx < 0) return undefined;
const [entry] = queue.splice(idx, 1);
if (!queue.length) chatRunSessions.delete(sessionId);
return entry;
};
const clear = () => {
chatRunSessions.clear();
};
return { add, peek, shift, remove, clear };
}
export type ChatRunState = {
registry: ChatRunRegistry;
buffers: Map<string, string>;
deltaSentAt: Map<string, number>;
abortedRuns: Map<string, number>;
clear: () => void;
};
export function createChatRunState(): ChatRunState {
const registry = createChatRunRegistry();
const buffers = new Map<string, string>();
const deltaSentAt = new Map<string, number>();
const abortedRuns = new Map<string, number>();
const clear = () => {
registry.clear();
buffers.clear();
deltaSentAt.clear();
abortedRuns.clear();
};
return {
registry,
buffers,
deltaSentAt,
abortedRuns,
clear,
};
}
export type ChatEventBroadcast = (
event: string,
payload: unknown,
opts?: { dropIfSlow?: boolean },
) => void;
export type BridgeSendToSession = (
sessionKey: string,
event: string,
payload: unknown,
) => void;
export type AgentEventHandlerOptions = {
broadcast: ChatEventBroadcast;
bridgeSendToSession: BridgeSendToSession;
agentRunSeq: Map<string, number>;
chatRunState: ChatRunState;
resolveSessionKeyForRun: (runId: string) => string | undefined;
clearAgentRunContext: (runId: string) => void;
};
export function createAgentEventHandler({
broadcast,
bridgeSendToSession,
agentRunSeq,
chatRunState,
resolveSessionKeyForRun,
clearAgentRunContext,
}: AgentEventHandlerOptions) {
const emitChatDelta = (
sessionKey: string,
clientRunId: string,
seq: number,
text: string,
) => {
chatRunState.buffers.set(clientRunId, text);
const now = Date.now();
const last = chatRunState.deltaSentAt.get(clientRunId) ?? 0;
if (now - last < 150) return;
chatRunState.deltaSentAt.set(clientRunId, now);
const payload = {
runId: clientRunId,
sessionKey,
seq,
state: "delta" as const,
message: {
role: "assistant",
content: [{ type: "text", text }],
timestamp: now,
},
};
broadcast("chat", payload, { dropIfSlow: true });
bridgeSendToSession(sessionKey, "chat", payload);
};
const emitChatFinal = (
sessionKey: string,
clientRunId: string,
seq: number,
jobState: "done" | "error",
error?: unknown,
) => {
const text = chatRunState.buffers.get(clientRunId)?.trim() ?? "";
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
if (jobState === "done") {
const payload = {
runId: clientRunId,
sessionKey,
seq,
state: "final" as const,
message: text
? {
role: "assistant",
content: [{ type: "text", text }],
timestamp: Date.now(),
}
: undefined,
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
return;
}
const payload = {
runId: clientRunId,
sessionKey,
seq,
state: "error" as const,
errorMessage: error ? formatForLog(error) : undefined,
};
broadcast("chat", payload);
bridgeSendToSession(sessionKey, "chat", payload);
};
const shouldEmitToolEvents = (runId: string, sessionKey?: string) => {
const runContext = getAgentRunContext(runId);
const runVerbose = normalizeVerboseLevel(runContext?.verboseLevel);
if (runVerbose) return runVerbose === "on";
if (!sessionKey) return false;
try {
const { cfg, entry } = loadSessionEntry(sessionKey);
const sessionVerbose = normalizeVerboseLevel(entry?.verboseLevel);
if (sessionVerbose) return sessionVerbose === "on";
const defaultVerbose = normalizeVerboseLevel(
cfg.agents?.defaults?.verboseDefault,
);
return defaultVerbose === "on";
} catch {
return false;
}
};
return (evt: AgentEventPayload) => {
const chatLink = chatRunState.registry.peek(evt.runId);
const sessionKey =
chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId);
const clientRunId = chatLink?.clientRunId ?? evt.runId;
const isAborted =
chatRunState.abortedRuns.has(clientRunId) ||
chatRunState.abortedRuns.has(evt.runId);
// Include sessionKey so Control UI can filter tool streams per session.
const agentPayload = sessionKey ? { ...evt, sessionKey } : evt;
const last = agentRunSeq.get(evt.runId) ?? 0;
if (evt.stream === "tool" && !shouldEmitToolEvents(evt.runId, sessionKey)) {
agentRunSeq.set(evt.runId, evt.seq);
return;
}
if (evt.seq !== last + 1) {
broadcast("agent", {
runId: evt.runId,
stream: "error",
ts: Date.now(),
sessionKey,
data: {
reason: "seq gap",
expected: last + 1,
received: evt.seq,
},
});
}
agentRunSeq.set(evt.runId, evt.seq);
broadcast("agent", agentPayload);
const lifecyclePhase =
evt.stream === "lifecycle" && typeof evt.data?.phase === "string"
? evt.data.phase
: null;
if (sessionKey) {
bridgeSendToSession(sessionKey, "agent", agentPayload);
if (
!isAborted &&
evt.stream === "assistant" &&
typeof evt.data?.text === "string"
) {
emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text);
} else if (
!isAborted &&
(lifecyclePhase === "end" || lifecyclePhase === "error")
) {
if (chatLink) {
const finished = chatRunState.registry.shift(evt.runId);
if (!finished) {
clearAgentRunContext(evt.runId);
return;
}
emitChatFinal(
finished.sessionKey,
finished.clientRunId,
evt.seq,
lifecyclePhase === "error" ? "error" : "done",
evt.data?.error,
);
} else {
emitChatFinal(
sessionKey,
evt.runId,
evt.seq,
lifecyclePhase === "error" ? "error" : "done",
evt.data?.error,
);
}
} else if (
isAborted &&
(lifecyclePhase === "end" || lifecyclePhase === "error")
) {
chatRunState.abortedRuns.delete(clientRunId);
chatRunState.abortedRuns.delete(evt.runId);
chatRunState.buffers.delete(clientRunId);
chatRunState.deltaSentAt.delete(clientRunId);
if (chatLink) {
chatRunState.registry.remove(evt.runId, clientRunId, sessionKey);
}
}
}
if (lifecyclePhase === "end" || lifecyclePhase === "error") {
clearAgentRunContext(evt.runId);
}
};
}