diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 9a709397b..342a3e9b9 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -324,6 +324,13 @@ export async function agentCommand( thinkLevel: resolvedThinkLevel, verboseLevel: resolvedVerboseLevel, runId: sessionId, + onAgentEvent: (evt) => { + emitAgentEvent({ + runId: sessionId, + stream: evt.stream, + data: evt.data, + }); + }, }); emitAgentEvent({ runId: sessionId, diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 8704bf0a9..0494f778a 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -260,8 +260,11 @@ type DedupeEntry = { error?: ErrorShape; }; const dedupe = new Map(); -// Map runId -> sessionKey for chat events (WS WebChat clients). -const chatRunSessions = new Map(); +// Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients). +const chatRunSessions = new Map< + string, + { sessionKey: string; clientRunId: string } +>(); const chatRunBuffers = new Map(); const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN; @@ -972,25 +975,27 @@ export async function startGatewayServer( agentRunSeq.set(evt.runId, evt.seq); broadcast("agent", evt); - const sessionKey = chatRunSessions.get(evt.runId); - if (sessionKey) { + const chatLink = chatRunSessions.get(evt.runId); + if (chatLink) { // Map agent bus events to chat events for WS WebChat clients. + // Use clientRunId so the webchat can correlate with its pending promise. + const { sessionKey, clientRunId } = chatLink; const base = { - runId: evt.runId, + runId: clientRunId, sessionKey, seq: evt.seq, }; if (evt.stream === "assistant" && typeof evt.data?.text === "string") { - const buf = chatRunBuffers.get(evt.runId) ?? []; + const buf = chatRunBuffers.get(clientRunId) ?? []; buf.push(evt.data.text); - chatRunBuffers.set(evt.runId, buf); + chatRunBuffers.set(clientRunId, buf); } else if ( evt.stream === "job" && typeof evt.data?.state === "string" && (evt.data.state === "done" || evt.data.state === "error") ) { - const text = chatRunBuffers.get(evt.runId)?.join("\n").trim() ?? ""; - chatRunBuffers.delete(evt.runId); + const text = chatRunBuffers.get(clientRunId)?.join("\n").trim() ?? ""; + chatRunBuffers.delete(clientRunId); if (evt.data.state === "done") { broadcast("chat", { ...base, @@ -1448,10 +1453,13 @@ export async function startGatewayServer( await saveSessionStore(storePath, store); } } - chatRunSessions.set(sessionId, p.sessionKey); + const clientRunId = p.idempotencyKey; + chatRunSessions.set(sessionId, { + sessionKey: p.sessionKey, + clientRunId, + }); - const idem = p.idempotencyKey; - const cached = dedupe.get(`chat:${idem}`); + const cached = dedupe.get(`chat:${clientRunId}`); if (cached) { respond(cached.ok, cached.payload, cached.error, { cached: true, @@ -1473,26 +1481,30 @@ export async function startGatewayServer( deps, ); const payload = { - runId: sessionId, + runId: clientRunId, status: "ok" as const, }; - dedupe.set(`chat:${idem}`, { ts: Date.now(), ok: true, payload }); - respond(true, payload, undefined, { runId: sessionId }); + dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: true, + payload, + }); + respond(true, payload, undefined, { runId: clientRunId }); } catch (err) { const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const payload = { - runId: sessionId, + runId: clientRunId, status: "error" as const, summary: String(err), }; - dedupe.set(`chat:${idem}`, { + dedupe.set(`chat:${clientRunId}`, { ts: Date.now(), ok: false, payload, error, }); respond(false, payload, error, { - runId: sessionId, + runId: clientRunId, error: formatForLog(err), }); } @@ -2342,7 +2354,10 @@ export async function startGatewayServer( (cfg.inbound?.reply?.session?.mainKey ?? "main").trim() || "main"; if (requestedSessionKey === mainKey) { - chatRunSessions.set(sessionId, requestedSessionKey); + chatRunSessions.set(sessionId, { + sessionKey: requestedSessionKey, + clientRunId: idem, + }); bestEffortDeliver = true; } }