fix: preserve webchat run ordering
This commit is contained in:
@@ -58,6 +58,7 @@
|
|||||||
- macOS Talk Mode: fix audio stop ordering so disabling Talk Mode always stops in-flight playback.
|
- macOS Talk Mode: fix audio stop ordering so disabling Talk Mode always stops in-flight playback.
|
||||||
- macOS Talk Mode: throttle audio-level updates (avoid per-buffer task creation) to reduce CPU/task churn.
|
- macOS Talk Mode: throttle audio-level updates (avoid per-buffer task creation) to reduce CPU/task churn.
|
||||||
- macOS Talk Mode: increase overlay window size so wave rings don’t clip; close button is hover-only and closer to the orb.
|
- macOS Talk Mode: increase overlay window size so wave rings don’t clip; close button is hover-only and closer to the orb.
|
||||||
|
- WebChat: preserve chat run ordering per session so concurrent runs don’t strand the typing indicator.
|
||||||
- Talk Mode: fall back to system TTS when ElevenLabs is unavailable, returns non-audio, or playback fails (macOS/iOS/Android).
|
- Talk Mode: fall back to system TTS when ElevenLabs is unavailable, returns non-audio, or playback fails (macOS/iOS/Android).
|
||||||
- Talk Mode: stream PCM on macOS/iOS for lower latency (incremental playback); Android continues MP3 streaming.
|
- Talk Mode: stream PCM on macOS/iOS for lower latency (incremental playback); Android continues MP3 streaming.
|
||||||
- Talk Mode: validate ElevenLabs v3 stability and latency tier directives before sending requests.
|
- Talk Mode: validate ElevenLabs v3 stability and latency tier directives before sending requests.
|
||||||
|
|||||||
@@ -3187,6 +3187,121 @@ describe("gateway server", () => {
|
|||||||
await server.close();
|
await server.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("chat.send preserves run ordering for queued runs", async () => {
|
||||||
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
||||||
|
testSessionStorePath = path.join(dir, "sessions.json");
|
||||||
|
await fs.writeFile(
|
||||||
|
testSessionStorePath,
|
||||||
|
JSON.stringify(
|
||||||
|
{
|
||||||
|
main: {
|
||||||
|
sessionId: "sess-main",
|
||||||
|
updatedAt: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
2,
|
||||||
|
),
|
||||||
|
"utf-8",
|
||||||
|
);
|
||||||
|
|
||||||
|
const { server, ws } = await startServerWithClient();
|
||||||
|
await connectOk(ws);
|
||||||
|
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "req",
|
||||||
|
id: "chat-1",
|
||||||
|
method: "chat.send",
|
||||||
|
params: {
|
||||||
|
sessionKey: "main",
|
||||||
|
message: "first",
|
||||||
|
idempotencyKey: "idem-1",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
const res1 = await onceMessage(
|
||||||
|
ws,
|
||||||
|
(o) => o.type === "res" && o.id === "chat-1",
|
||||||
|
);
|
||||||
|
expect(res1.ok).toBe(true);
|
||||||
|
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "req",
|
||||||
|
id: "chat-2",
|
||||||
|
method: "chat.send",
|
||||||
|
params: {
|
||||||
|
sessionKey: "main",
|
||||||
|
message: "second",
|
||||||
|
idempotencyKey: "idem-2",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
const res2 = await onceMessage(
|
||||||
|
ws,
|
||||||
|
(o) => o.type === "res" && o.id === "chat-2",
|
||||||
|
);
|
||||||
|
expect(res2.ok).toBe(true);
|
||||||
|
|
||||||
|
const final1P = onceMessage<{
|
||||||
|
type: "event";
|
||||||
|
event: string;
|
||||||
|
payload?: unknown;
|
||||||
|
}>(
|
||||||
|
ws,
|
||||||
|
(o) => {
|
||||||
|
if (o.type !== "event" || o.event !== "chat") return false;
|
||||||
|
const payload = o.payload as { state?: unknown } | undefined;
|
||||||
|
return payload?.state === "final";
|
||||||
|
},
|
||||||
|
8000,
|
||||||
|
);
|
||||||
|
|
||||||
|
emitAgentEvent({
|
||||||
|
runId: "sess-main",
|
||||||
|
stream: "job",
|
||||||
|
data: { state: "done" },
|
||||||
|
});
|
||||||
|
|
||||||
|
const final1 = await final1P;
|
||||||
|
const run1 =
|
||||||
|
final1.payload && typeof final1.payload === "object"
|
||||||
|
? (final1.payload as { runId?: string }).runId
|
||||||
|
: undefined;
|
||||||
|
expect(run1).toBe("idem-1");
|
||||||
|
|
||||||
|
const final2P = onceMessage<{
|
||||||
|
type: "event";
|
||||||
|
event: string;
|
||||||
|
payload?: unknown;
|
||||||
|
}>(
|
||||||
|
ws,
|
||||||
|
(o) => {
|
||||||
|
if (o.type !== "event" || o.event !== "chat") return false;
|
||||||
|
const payload = o.payload as { state?: unknown } | undefined;
|
||||||
|
return payload?.state === "final";
|
||||||
|
},
|
||||||
|
8000,
|
||||||
|
);
|
||||||
|
|
||||||
|
emitAgentEvent({
|
||||||
|
runId: "sess-main",
|
||||||
|
stream: "job",
|
||||||
|
data: { state: "done" },
|
||||||
|
});
|
||||||
|
|
||||||
|
const final2 = await final2P;
|
||||||
|
const run2 =
|
||||||
|
final2.payload && typeof final2.payload === "object"
|
||||||
|
? (final2.payload as { runId?: string }).runId
|
||||||
|
: undefined;
|
||||||
|
expect(run2).toBe("idem-2");
|
||||||
|
|
||||||
|
ws.close();
|
||||||
|
await server.close();
|
||||||
|
});
|
||||||
|
|
||||||
test("bridge RPC chat.history returns session messages", async () => {
|
test("bridge RPC chat.history returns session messages", async () => {
|
||||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
||||||
testSessionStorePath = path.join(dir, "sessions.json");
|
testSessionStorePath = path.join(dir, "sessions.json");
|
||||||
|
|||||||
@@ -1770,11 +1770,48 @@ export async function startGatewayServer(
|
|||||||
// Track per-run sequence to detect out-of-order/lost agent events.
|
// Track per-run sequence to detect out-of-order/lost agent events.
|
||||||
const agentRunSeq = new Map<string, number>();
|
const agentRunSeq = new Map<string, number>();
|
||||||
const dedupe = new Map<string, DedupeEntry>();
|
const dedupe = new Map<string, DedupeEntry>();
|
||||||
// Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients).
|
// Map agent sessionId -> pending chat runs for WebChat clients.
|
||||||
const chatRunSessions = new Map<
|
const chatRunSessions = new Map<
|
||||||
string,
|
string,
|
||||||
{ sessionKey: string; clientRunId: string }
|
Array<{ sessionKey: string; clientRunId: string }>
|
||||||
>();
|
>();
|
||||||
|
const addChatRun = (
|
||||||
|
sessionId: string,
|
||||||
|
entry: { sessionKey: string; clientRunId: string },
|
||||||
|
) => {
|
||||||
|
const queue = chatRunSessions.get(sessionId);
|
||||||
|
if (queue) {
|
||||||
|
queue.push(entry);
|
||||||
|
} else {
|
||||||
|
chatRunSessions.set(sessionId, [entry]);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
const peekChatRun = (sessionId: string) =>
|
||||||
|
chatRunSessions.get(sessionId)?.[0];
|
||||||
|
const shiftChatRun = (sessionId: string) => {
|
||||||
|
const queue = chatRunSessions.get(sessionId);
|
||||||
|
if (!queue || queue.length === 0) return undefined;
|
||||||
|
const entry = queue.shift();
|
||||||
|
if (!queue.length) chatRunSessions.delete(sessionId);
|
||||||
|
return entry;
|
||||||
|
};
|
||||||
|
const removeChatRun = (
|
||||||
|
sessionId: string,
|
||||||
|
clientRunId: string,
|
||||||
|
sessionKey?: string,
|
||||||
|
) => {
|
||||||
|
const queue = chatRunSessions.get(sessionId);
|
||||||
|
if (!queue || queue.length === 0) return undefined;
|
||||||
|
const idx = queue.findIndex(
|
||||||
|
(entry) =>
|
||||||
|
entry.clientRunId === clientRunId &&
|
||||||
|
(sessionKey ? entry.sessionKey === sessionKey : true),
|
||||||
|
);
|
||||||
|
if (idx < 0) return undefined;
|
||||||
|
const [entry] = queue.splice(idx, 1);
|
||||||
|
if (!queue.length) chatRunSessions.delete(sessionId);
|
||||||
|
return entry;
|
||||||
|
};
|
||||||
const chatRunBuffers = new Map<string, string>();
|
const chatRunBuffers = new Map<string, string>();
|
||||||
const chatDeltaSentAt = new Map<string, number>();
|
const chatDeltaSentAt = new Map<string, number>();
|
||||||
const chatAbortControllers = new Map<
|
const chatAbortControllers = new Map<
|
||||||
@@ -2948,13 +2985,7 @@ export async function startGatewayServer(
|
|||||||
chatAbortControllers.delete(runId);
|
chatAbortControllers.delete(runId);
|
||||||
chatRunBuffers.delete(runId);
|
chatRunBuffers.delete(runId);
|
||||||
chatDeltaSentAt.delete(runId);
|
chatDeltaSentAt.delete(runId);
|
||||||
const current = chatRunSessions.get(active.sessionId);
|
removeChatRun(active.sessionId, runId, sessionKey);
|
||||||
if (
|
|
||||||
current?.clientRunId === runId &&
|
|
||||||
current.sessionKey === sessionKey
|
|
||||||
) {
|
|
||||||
chatRunSessions.delete(active.sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
const payload = {
|
const payload = {
|
||||||
runId,
|
runId,
|
||||||
@@ -3072,10 +3103,7 @@ export async function startGatewayServer(
|
|||||||
sessionId,
|
sessionId,
|
||||||
sessionKey: p.sessionKey,
|
sessionKey: p.sessionKey,
|
||||||
});
|
});
|
||||||
chatRunSessions.set(sessionId, {
|
addChatRun(sessionId, { sessionKey: p.sessionKey, clientRunId });
|
||||||
sessionKey: p.sessionKey,
|
|
||||||
clientRunId,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (store) {
|
if (store) {
|
||||||
store[p.sessionKey] = sessionEntry;
|
store[p.sessionKey] = sessionEntry;
|
||||||
@@ -3192,7 +3220,7 @@ export async function startGatewayServer(
|
|||||||
|
|
||||||
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
|
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
|
||||||
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
|
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
|
||||||
chatRunSessions.set(sessionId, {
|
addChatRun(sessionId, {
|
||||||
sessionKey,
|
sessionKey,
|
||||||
clientRunId: `voice-${randomUUID()}`,
|
clientRunId: `voice-${randomUUID()}`,
|
||||||
});
|
});
|
||||||
@@ -3556,18 +3584,18 @@ export async function startGatewayServer(
|
|||||||
agentRunSeq.set(evt.runId, evt.seq);
|
agentRunSeq.set(evt.runId, evt.seq);
|
||||||
broadcast("agent", evt);
|
broadcast("agent", evt);
|
||||||
|
|
||||||
const chatLink = chatRunSessions.get(evt.runId);
|
const chatLink = peekChatRun(evt.runId);
|
||||||
if (chatLink) {
|
if (chatLink) {
|
||||||
// Map agent bus events to chat events for WS WebChat clients.
|
// Map agent bus events to chat events for WS WebChat clients.
|
||||||
// Use clientRunId so the webchat can correlate with its pending promise.
|
// Use clientRunId so the webchat can correlate with its pending promise.
|
||||||
const { sessionKey, clientRunId } = chatLink;
|
const { sessionKey, clientRunId } = chatLink;
|
||||||
bridgeSendToSession(sessionKey, "agent", evt);
|
bridgeSendToSession(sessionKey, "agent", evt);
|
||||||
const base = {
|
|
||||||
runId: clientRunId,
|
|
||||||
sessionKey,
|
|
||||||
seq: evt.seq,
|
|
||||||
};
|
|
||||||
if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
|
if (evt.stream === "assistant" && typeof evt.data?.text === "string") {
|
||||||
|
const base = {
|
||||||
|
runId: clientRunId,
|
||||||
|
sessionKey,
|
||||||
|
seq: evt.seq,
|
||||||
|
};
|
||||||
chatRunBuffers.set(clientRunId, evt.data.text);
|
chatRunBuffers.set(clientRunId, evt.data.text);
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const last = chatDeltaSentAt.get(clientRunId) ?? 0;
|
const last = chatDeltaSentAt.get(clientRunId) ?? 0;
|
||||||
@@ -3591,9 +3619,20 @@ export async function startGatewayServer(
|
|||||||
typeof evt.data?.state === "string" &&
|
typeof evt.data?.state === "string" &&
|
||||||
(evt.data.state === "done" || evt.data.state === "error")
|
(evt.data.state === "done" || evt.data.state === "error")
|
||||||
) {
|
) {
|
||||||
const text = chatRunBuffers.get(clientRunId)?.trim() ?? "";
|
const finished = shiftChatRun(evt.runId);
|
||||||
chatRunBuffers.delete(clientRunId);
|
if (!finished) {
|
||||||
chatDeltaSentAt.delete(clientRunId);
|
return;
|
||||||
|
}
|
||||||
|
const { sessionKey: finishedSessionKey, clientRunId: finishedRunId } =
|
||||||
|
finished;
|
||||||
|
const base = {
|
||||||
|
runId: finishedRunId,
|
||||||
|
sessionKey: finishedSessionKey,
|
||||||
|
seq: evt.seq,
|
||||||
|
};
|
||||||
|
const text = chatRunBuffers.get(finishedRunId)?.trim() ?? "";
|
||||||
|
chatRunBuffers.delete(finishedRunId);
|
||||||
|
chatDeltaSentAt.delete(finishedRunId);
|
||||||
if (evt.data.state === "done") {
|
if (evt.data.state === "done") {
|
||||||
const payload = {
|
const payload = {
|
||||||
...base,
|
...base,
|
||||||
@@ -3607,7 +3646,7 @@ export async function startGatewayServer(
|
|||||||
: undefined,
|
: undefined,
|
||||||
};
|
};
|
||||||
broadcast("chat", payload);
|
broadcast("chat", payload);
|
||||||
bridgeSendToSession(sessionKey, "chat", payload);
|
bridgeSendToSession(finishedSessionKey, "chat", payload);
|
||||||
} else {
|
} else {
|
||||||
const payload = {
|
const payload = {
|
||||||
...base,
|
...base,
|
||||||
@@ -3617,9 +3656,8 @@ export async function startGatewayServer(
|
|||||||
: undefined,
|
: undefined,
|
||||||
};
|
};
|
||||||
broadcast("chat", payload);
|
broadcast("chat", payload);
|
||||||
bridgeSendToSession(sessionKey, "chat", payload);
|
bridgeSendToSession(finishedSessionKey, "chat", payload);
|
||||||
}
|
}
|
||||||
chatRunSessions.delete(evt.runId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -4221,13 +4259,7 @@ export async function startGatewayServer(
|
|||||||
chatAbortControllers.delete(runId);
|
chatAbortControllers.delete(runId);
|
||||||
chatRunBuffers.delete(runId);
|
chatRunBuffers.delete(runId);
|
||||||
chatDeltaSentAt.delete(runId);
|
chatDeltaSentAt.delete(runId);
|
||||||
const current = chatRunSessions.get(active.sessionId);
|
removeChatRun(active.sessionId, runId, sessionKey);
|
||||||
if (
|
|
||||||
current?.clientRunId === runId &&
|
|
||||||
current.sessionKey === sessionKey
|
|
||||||
) {
|
|
||||||
chatRunSessions.delete(active.sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
const payload = {
|
const payload = {
|
||||||
runId,
|
runId,
|
||||||
@@ -4352,7 +4384,7 @@ export async function startGatewayServer(
|
|||||||
sessionId,
|
sessionId,
|
||||||
sessionKey: p.sessionKey,
|
sessionKey: p.sessionKey,
|
||||||
});
|
});
|
||||||
chatRunSessions.set(sessionId, {
|
addChatRun(sessionId, {
|
||||||
sessionKey: p.sessionKey,
|
sessionKey: p.sessionKey,
|
||||||
clientRunId,
|
clientRunId,
|
||||||
});
|
});
|
||||||
@@ -6152,7 +6184,7 @@ export async function startGatewayServer(
|
|||||||
const mainKey =
|
const mainKey =
|
||||||
(cfg.session?.mainKey ?? "main").trim() || "main";
|
(cfg.session?.mainKey ?? "main").trim() || "main";
|
||||||
if (requestedSessionKey === mainKey) {
|
if (requestedSessionKey === mainKey) {
|
||||||
chatRunSessions.set(sessionId, {
|
addChatRun(sessionId, {
|
||||||
sessionKey: requestedSessionKey,
|
sessionKey: requestedSessionKey,
|
||||||
clientRunId: idem,
|
clientRunId: idem,
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user