diff --git a/CHANGELOG.md b/CHANGELOG.md index d9eef2209..c387d1d75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ - macOS Talk Mode: fix audio stop ordering so disabling Talk Mode always stops in-flight playback. - macOS Talk Mode: throttle audio-level updates (avoid per-buffer task creation) to reduce CPU/task churn. - macOS Talk Mode: increase overlay window size so wave rings don’t clip; close button is hover-only and closer to the orb. +- WebChat: preserve chat run ordering per session so concurrent runs don’t strand the typing indicator. - Talk Mode: fall back to system TTS when ElevenLabs is unavailable, returns non-audio, or playback fails (macOS/iOS/Android). - Talk Mode: stream PCM on macOS/iOS for lower latency (incremental playback); Android continues MP3 streaming. - Talk Mode: validate ElevenLabs v3 stability and latency tier directives before sending requests. diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index cff7b0a18..f6ec100bc 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -3187,6 +3187,121 @@ describe("gateway server", () => { await server.close(); }); + test("chat.send preserves run ordering for queued runs", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + ws.send( + JSON.stringify({ + type: "req", + id: "chat-1", + method: "chat.send", + params: { + sessionKey: "main", + message: "first", + idempotencyKey: "idem-1", + }, + }), + ); + const res1 = await onceMessage( + ws, + (o) => o.type === "res" && o.id === "chat-1", + ); + expect(res1.ok).toBe(true); + + ws.send( + JSON.stringify({ + type: "req", + id: "chat-2", + method: "chat.send", + params: { + sessionKey: "main", + message: "second", + idempotencyKey: "idem-2", + }, + }), + ); + const res2 = await onceMessage( + ws, + (o) => o.type === "res" && o.id === "chat-2", + ); + expect(res2.ok).toBe(true); + + const final1P = onceMessage<{ + type: "event"; + event: string; + payload?: unknown; + }>( + ws, + (o) => { + if (o.type !== "event" || o.event !== "chat") return false; + const payload = o.payload as { state?: unknown } | undefined; + return payload?.state === "final"; + }, + 8000, + ); + + emitAgentEvent({ + runId: "sess-main", + stream: "job", + data: { state: "done" }, + }); + + const final1 = await final1P; + const run1 = + final1.payload && typeof final1.payload === "object" + ? (final1.payload as { runId?: string }).runId + : undefined; + expect(run1).toBe("idem-1"); + + const final2P = onceMessage<{ + type: "event"; + event: string; + payload?: unknown; + }>( + ws, + (o) => { + if (o.type !== "event" || o.event !== "chat") return false; + const payload = o.payload as { state?: unknown } | undefined; + return payload?.state === "final"; + }, + 8000, + ); + + emitAgentEvent({ + runId: "sess-main", + stream: "job", + data: { state: "done" }, + }); + + const final2 = await final2P; + const run2 = + final2.payload && typeof final2.payload === "object" + ? (final2.payload as { runId?: string }).runId + : undefined; + expect(run2).toBe("idem-2"); + + ws.close(); + await server.close(); + }); + test("bridge RPC chat.history returns session messages", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 3ddf91841..8867cbd09 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -1770,11 +1770,48 @@ export async function startGatewayServer( // Track per-run sequence to detect out-of-order/lost agent events. const agentRunSeq = new Map(); const dedupe = new Map(); - // Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients). + // Map agent sessionId -> pending chat runs for WebChat clients. const chatRunSessions = new Map< string, - { sessionKey: string; clientRunId: string } + Array<{ sessionKey: string; clientRunId: string }> >(); + const addChatRun = ( + sessionId: string, + entry: { sessionKey: string; clientRunId: string }, + ) => { + const queue = chatRunSessions.get(sessionId); + if (queue) { + queue.push(entry); + } else { + chatRunSessions.set(sessionId, [entry]); + } + }; + const peekChatRun = (sessionId: string) => + chatRunSessions.get(sessionId)?.[0]; + const shiftChatRun = (sessionId: string) => { + const queue = chatRunSessions.get(sessionId); + if (!queue || queue.length === 0) return undefined; + const entry = queue.shift(); + if (!queue.length) chatRunSessions.delete(sessionId); + return entry; + }; + const removeChatRun = ( + sessionId: string, + clientRunId: string, + sessionKey?: string, + ) => { + const queue = chatRunSessions.get(sessionId); + if (!queue || queue.length === 0) return undefined; + const idx = queue.findIndex( + (entry) => + entry.clientRunId === clientRunId && + (sessionKey ? entry.sessionKey === sessionKey : true), + ); + if (idx < 0) return undefined; + const [entry] = queue.splice(idx, 1); + if (!queue.length) chatRunSessions.delete(sessionId); + return entry; + }; const chatRunBuffers = new Map(); const chatDeltaSentAt = new Map(); const chatAbortControllers = new Map< @@ -2948,13 +2985,7 @@ export async function startGatewayServer( chatAbortControllers.delete(runId); chatRunBuffers.delete(runId); chatDeltaSentAt.delete(runId); - const current = chatRunSessions.get(active.sessionId); - if ( - current?.clientRunId === runId && - current.sessionKey === sessionKey - ) { - chatRunSessions.delete(active.sessionId); - } + removeChatRun(active.sessionId, runId, sessionKey); const payload = { runId, @@ -3072,10 +3103,7 @@ export async function startGatewayServer( sessionId, sessionKey: p.sessionKey, }); - chatRunSessions.set(sessionId, { - sessionKey: p.sessionKey, - clientRunId, - }); + addChatRun(sessionId, { sessionKey: p.sessionKey, clientRunId }); if (store) { store[p.sessionKey] = sessionEntry; @@ -3192,7 +3220,7 @@ export async function startGatewayServer( // Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send). // This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId). - chatRunSessions.set(sessionId, { + addChatRun(sessionId, { sessionKey, clientRunId: `voice-${randomUUID()}`, }); @@ -3556,18 +3584,18 @@ export async function startGatewayServer( agentRunSeq.set(evt.runId, evt.seq); broadcast("agent", evt); - const chatLink = chatRunSessions.get(evt.runId); + const chatLink = peekChatRun(evt.runId); if (chatLink) { // Map agent bus events to chat events for WS WebChat clients. // Use clientRunId so the webchat can correlate with its pending promise. const { sessionKey, clientRunId } = chatLink; bridgeSendToSession(sessionKey, "agent", evt); - const base = { - runId: clientRunId, - sessionKey, - seq: evt.seq, - }; if (evt.stream === "assistant" && typeof evt.data?.text === "string") { + const base = { + runId: clientRunId, + sessionKey, + seq: evt.seq, + }; chatRunBuffers.set(clientRunId, evt.data.text); const now = Date.now(); const last = chatDeltaSentAt.get(clientRunId) ?? 0; @@ -3591,9 +3619,20 @@ export async function startGatewayServer( typeof evt.data?.state === "string" && (evt.data.state === "done" || evt.data.state === "error") ) { - const text = chatRunBuffers.get(clientRunId)?.trim() ?? ""; - chatRunBuffers.delete(clientRunId); - chatDeltaSentAt.delete(clientRunId); + const finished = shiftChatRun(evt.runId); + if (!finished) { + return; + } + const { sessionKey: finishedSessionKey, clientRunId: finishedRunId } = + finished; + const base = { + runId: finishedRunId, + sessionKey: finishedSessionKey, + seq: evt.seq, + }; + const text = chatRunBuffers.get(finishedRunId)?.trim() ?? ""; + chatRunBuffers.delete(finishedRunId); + chatDeltaSentAt.delete(finishedRunId); if (evt.data.state === "done") { const payload = { ...base, @@ -3607,7 +3646,7 @@ export async function startGatewayServer( : undefined, }; broadcast("chat", payload); - bridgeSendToSession(sessionKey, "chat", payload); + bridgeSendToSession(finishedSessionKey, "chat", payload); } else { const payload = { ...base, @@ -3617,9 +3656,8 @@ export async function startGatewayServer( : undefined, }; broadcast("chat", payload); - bridgeSendToSession(sessionKey, "chat", payload); + bridgeSendToSession(finishedSessionKey, "chat", payload); } - chatRunSessions.delete(evt.runId); } } }); @@ -4221,13 +4259,7 @@ export async function startGatewayServer( chatAbortControllers.delete(runId); chatRunBuffers.delete(runId); chatDeltaSentAt.delete(runId); - const current = chatRunSessions.get(active.sessionId); - if ( - current?.clientRunId === runId && - current.sessionKey === sessionKey - ) { - chatRunSessions.delete(active.sessionId); - } + removeChatRun(active.sessionId, runId, sessionKey); const payload = { runId, @@ -4352,7 +4384,7 @@ export async function startGatewayServer( sessionId, sessionKey: p.sessionKey, }); - chatRunSessions.set(sessionId, { + addChatRun(sessionId, { sessionKey: p.sessionKey, clientRunId, }); @@ -6152,7 +6184,7 @@ export async function startGatewayServer( const mainKey = (cfg.session?.mainKey ?? "main").trim() || "main"; if (requestedSessionKey === mainKey) { - chatRunSessions.set(sessionId, { + addChatRun(sessionId, { sessionKey: requestedSessionKey, clientRunId: idem, });