fix(webchat): stream assistant events and correlate runId

This commit is contained in:
Peter Steinberger
2025-12-14 00:56:06 +00:00
parent 037ea92679
commit 2583fb66cc
2 changed files with 41 additions and 19 deletions

View File

@@ -260,8 +260,11 @@ type DedupeEntry = {
error?: ErrorShape;
};
const dedupe = new Map<string, DedupeEntry>();
// Map runId -> sessionKey for chat events (WS WebChat clients).
const chatRunSessions = new Map<string, string>();
// Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients).
const chatRunSessions = new Map<
string,
{ sessionKey: string; clientRunId: string }
>();
const chatRunBuffers = new Map<string, string[]>();
const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN;
@@ -972,25 +975,27 @@ export async function startGatewayServer(
agentRunSeq.set(evt.runId, evt.seq);
broadcast("agent", evt);
const sessionKey = chatRunSessions.get(evt.runId);
if (sessionKey) {
const chatLink = chatRunSessions.get(evt.runId);
if (chatLink) {
// Map agent bus events to chat events for WS WebChat clients.
// Use clientRunId so the webchat can correlate with its pending promise.
const { sessionKey, clientRunId } = chatLink;
const base = {
runId: evt.runId,
runId: clientRunId,
sessionKey,
seq: evt.seq,
};
if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
const buf = chatRunBuffers.get(evt.runId) ?? [];
const buf = chatRunBuffers.get(clientRunId) ?? [];
buf.push(evt.data.text);
chatRunBuffers.set(evt.runId, buf);
chatRunBuffers.set(clientRunId, buf);
} else if (
evt.stream === "job" &&
typeof evt.data?.state === "string" &&
(evt.data.state === "done" || evt.data.state === "error")
) {
const text = chatRunBuffers.get(evt.runId)?.join("\n").trim() ?? "";
chatRunBuffers.delete(evt.runId);
const text = chatRunBuffers.get(clientRunId)?.join("\n").trim() ?? "";
chatRunBuffers.delete(clientRunId);
if (evt.data.state === "done") {
broadcast("chat", {
...base,
@@ -1448,10 +1453,13 @@ export async function startGatewayServer(
await saveSessionStore(storePath, store);
}
}
chatRunSessions.set(sessionId, p.sessionKey);
const clientRunId = p.idempotencyKey;
chatRunSessions.set(sessionId, {
sessionKey: p.sessionKey,
clientRunId,
});
const idem = p.idempotencyKey;
const cached = dedupe.get(`chat:${idem}`);
const cached = dedupe.get(`chat:${clientRunId}`);
if (cached) {
respond(cached.ok, cached.payload, cached.error, {
cached: true,
@@ -1473,26 +1481,30 @@ export async function startGatewayServer(
deps,
);
const payload = {
runId: sessionId,
runId: clientRunId,
status: "ok" as const,
};
dedupe.set(`chat:${idem}`, { ts: Date.now(), ok: true, payload });
respond(true, payload, undefined, { runId: sessionId });
dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: true,
payload,
});
respond(true, payload, undefined, { runId: clientRunId });
} catch (err) {
const error = errorShape(ErrorCodes.UNAVAILABLE, String(err));
const payload = {
runId: sessionId,
runId: clientRunId,
status: "error" as const,
summary: String(err),
};
dedupe.set(`chat:${idem}`, {
dedupe.set(`chat:${clientRunId}`, {
ts: Date.now(),
ok: false,
payload,
error,
});
respond(false, payload, error, {
runId: sessionId,
runId: clientRunId,
error: formatForLog(err),
});
}
@@ -2342,7 +2354,10 @@ export async function startGatewayServer(
(cfg.inbound?.reply?.session?.mainKey ?? "main").trim() ||
"main";
if (requestedSessionKey === mainKey) {
chatRunSessions.set(sessionId, requestedSessionKey);
chatRunSessions.set(sessionId, {
sessionKey: requestedSessionKey,
clientRunId: idem,
});
bestEffortDeliver = true;
}
}