fix(gateway): make chat.abort reliable
This commit is contained in:
@@ -68,6 +68,22 @@ let testSessionStorePath: string | undefined;
|
|||||||
let testAllowFrom: string[] | undefined;
|
let testAllowFrom: string[] | undefined;
|
||||||
let testCronStorePath: string | undefined;
|
let testCronStorePath: string | undefined;
|
||||||
let testCronEnabled: boolean | undefined = false;
|
let testCronEnabled: boolean | undefined = false;
|
||||||
|
const sessionStoreSaveDelayMs = vi.hoisted(() => ({ value: 0 }));
|
||||||
|
vi.mock("../config/sessions.js", async () => {
|
||||||
|
const actual = await vi.importActual<typeof import("../config/sessions.js")>(
|
||||||
|
"../config/sessions.js",
|
||||||
|
);
|
||||||
|
return {
|
||||||
|
...actual,
|
||||||
|
saveSessionStore: vi.fn(async (storePath: string, store: unknown) => {
|
||||||
|
const delay = sessionStoreSaveDelayMs.value;
|
||||||
|
if (delay > 0) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
|
}
|
||||||
|
return actual.saveSessionStore(storePath, store as never);
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
});
|
||||||
vi.mock("../config/config.js", () => ({
|
vi.mock("../config/config.js", () => ({
|
||||||
loadConfig: () => ({
|
loadConfig: () => ({
|
||||||
inbound: {
|
inbound: {
|
||||||
@@ -109,6 +125,7 @@ beforeEach(async () => {
|
|||||||
previousHome = process.env.HOME;
|
previousHome = process.env.HOME;
|
||||||
tempHome = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gateway-home-"));
|
tempHome = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gateway-home-"));
|
||||||
process.env.HOME = tempHome;
|
process.env.HOME = tempHome;
|
||||||
|
sessionStoreSaveDelayMs.value = 0;
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(async () => {
|
afterEach(async () => {
|
||||||
@@ -2075,6 +2092,93 @@ describe("gateway server", () => {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
test("chat.abort cancels while saving the session store", async () => {
|
||||||
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
||||||
|
testSessionStorePath = path.join(dir, "sessions.json");
|
||||||
|
await fs.writeFile(
|
||||||
|
testSessionStorePath,
|
||||||
|
JSON.stringify(
|
||||||
|
{
|
||||||
|
main: {
|
||||||
|
sessionId: "sess-main",
|
||||||
|
updatedAt: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
2,
|
||||||
|
),
|
||||||
|
"utf-8",
|
||||||
|
);
|
||||||
|
|
||||||
|
sessionStoreSaveDelayMs.value = 120;
|
||||||
|
|
||||||
|
const { server, ws } = await startServerWithClient();
|
||||||
|
await connectOk(ws);
|
||||||
|
|
||||||
|
const spy = vi.mocked(agentCommand);
|
||||||
|
spy.mockImplementationOnce(async (opts) => {
|
||||||
|
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
if (!signal) return resolve();
|
||||||
|
if (signal.aborted) return resolve();
|
||||||
|
signal.addEventListener("abort", () => resolve(), { once: true });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const abortedEventP = onceMessage(
|
||||||
|
ws,
|
||||||
|
(o) =>
|
||||||
|
o.type === "event" &&
|
||||||
|
o.event === "chat" &&
|
||||||
|
o.payload?.state === "aborted",
|
||||||
|
);
|
||||||
|
|
||||||
|
const sendResP = onceMessage(
|
||||||
|
ws,
|
||||||
|
(o) => o.type === "res" && o.id === "send-abort-save-1",
|
||||||
|
);
|
||||||
|
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "req",
|
||||||
|
id: "send-abort-save-1",
|
||||||
|
method: "chat.send",
|
||||||
|
params: {
|
||||||
|
sessionKey: "main",
|
||||||
|
message: "hello",
|
||||||
|
idempotencyKey: "idem-abort-save-1",
|
||||||
|
timeoutMs: 30_000,
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const abortResP = onceMessage(
|
||||||
|
ws,
|
||||||
|
(o) => o.type === "res" && o.id === "abort-save-1",
|
||||||
|
);
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: "req",
|
||||||
|
id: "abort-save-1",
|
||||||
|
method: "chat.abort",
|
||||||
|
params: { sessionKey: "main", runId: "idem-abort-save-1" },
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const abortRes = await abortResP;
|
||||||
|
expect(abortRes.ok).toBe(true);
|
||||||
|
|
||||||
|
const sendRes = await sendResP;
|
||||||
|
expect(sendRes.ok).toBe(true);
|
||||||
|
|
||||||
|
const evt = await abortedEventP;
|
||||||
|
expect(evt.payload?.runId).toBe("idem-abort-save-1");
|
||||||
|
expect(evt.payload?.sessionKey).toBe("main");
|
||||||
|
|
||||||
|
ws.close();
|
||||||
|
await server.close();
|
||||||
|
});
|
||||||
|
|
||||||
test("chat.abort returns aborted=false for unknown runId", async () => {
|
test("chat.abort returns aborted=false for unknown runId", async () => {
|
||||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
||||||
testSessionStorePath = path.join(dir, "sessions.json");
|
testSessionStorePath = path.join(dir, "sessions.json");
|
||||||
@@ -2454,6 +2558,69 @@ describe("gateway server", () => {
|
|||||||
await server.close();
|
await server.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test("bridge chat.abort cancels while saving the session store", async () => {
|
||||||
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-"));
|
||||||
|
testSessionStorePath = path.join(dir, "sessions.json");
|
||||||
|
await fs.writeFile(
|
||||||
|
testSessionStorePath,
|
||||||
|
JSON.stringify(
|
||||||
|
{
|
||||||
|
main: {
|
||||||
|
sessionId: "sess-main",
|
||||||
|
updatedAt: Date.now(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
null,
|
||||||
|
2,
|
||||||
|
),
|
||||||
|
"utf-8",
|
||||||
|
);
|
||||||
|
|
||||||
|
sessionStoreSaveDelayMs.value = 120;
|
||||||
|
|
||||||
|
const port = await getFreePort();
|
||||||
|
const server = await startGatewayServer(port);
|
||||||
|
const bridgeCall = bridgeStartCalls.at(-1);
|
||||||
|
expect(bridgeCall?.onRequest).toBeDefined();
|
||||||
|
|
||||||
|
const spy = vi.mocked(agentCommand);
|
||||||
|
spy.mockImplementationOnce(async (opts) => {
|
||||||
|
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
if (!signal) return resolve();
|
||||||
|
if (signal.aborted) return resolve();
|
||||||
|
signal.addEventListener("abort", () => resolve(), { once: true });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const sendP = bridgeCall?.onRequest?.("ios-node", {
|
||||||
|
id: "send-abort-save-bridge-1",
|
||||||
|
method: "chat.send",
|
||||||
|
paramsJSON: JSON.stringify({
|
||||||
|
sessionKey: "main",
|
||||||
|
message: "hello",
|
||||||
|
idempotencyKey: "idem-abort-save-bridge-1",
|
||||||
|
timeoutMs: 30_000,
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
const abortRes = await bridgeCall?.onRequest?.("ios-node", {
|
||||||
|
id: "abort-save-bridge-1",
|
||||||
|
method: "chat.abort",
|
||||||
|
paramsJSON: JSON.stringify({
|
||||||
|
sessionKey: "main",
|
||||||
|
runId: "idem-abort-save-bridge-1",
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(abortRes?.ok).toBe(true);
|
||||||
|
|
||||||
|
const sendRes = await sendP;
|
||||||
|
expect(sendRes?.ok).toBe(true);
|
||||||
|
|
||||||
|
await server.close();
|
||||||
|
});
|
||||||
|
|
||||||
test("presence includes client fingerprint", async () => {
|
test("presence includes client fingerprint", async () => {
|
||||||
const { server, ws } = await startServerWithClient();
|
const { server, ws } = await startServerWithClient();
|
||||||
await connectOk(ws, {
|
await connectOk(ws, {
|
||||||
|
|||||||
@@ -241,12 +241,9 @@ function isLoopbackAddress(ip: string | undefined): boolean {
|
|||||||
|
|
||||||
let presenceVersion = 1;
|
let presenceVersion = 1;
|
||||||
let healthVersion = 1;
|
let healthVersion = 1;
|
||||||
let seq = 0;
|
|
||||||
let healthCache: HealthSummary | null = null;
|
let healthCache: HealthSummary | null = null;
|
||||||
let healthRefresh: Promise<HealthSummary> | null = null;
|
let healthRefresh: Promise<HealthSummary> | null = null;
|
||||||
let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null;
|
let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null;
|
||||||
// Track per-run sequence to detect out-of-order/lost agent events.
|
|
||||||
const agentRunSeq = new Map<string, number>();
|
|
||||||
|
|
||||||
function buildSnapshot(): Snapshot {
|
function buildSnapshot(): Snapshot {
|
||||||
const presence = listSystemPresence();
|
const presence = listSystemPresence();
|
||||||
@@ -277,17 +274,6 @@ type DedupeEntry = {
|
|||||||
payload?: unknown;
|
payload?: unknown;
|
||||||
error?: ErrorShape;
|
error?: ErrorShape;
|
||||||
};
|
};
|
||||||
const dedupe = new Map<string, DedupeEntry>();
|
|
||||||
// Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients).
|
|
||||||
const chatRunSessions = new Map<
|
|
||||||
string,
|
|
||||||
{ sessionKey: string; clientRunId: string }
|
|
||||||
>();
|
|
||||||
const chatRunBuffers = new Map<string, string>();
|
|
||||||
const chatAbortControllers = new Map<
|
|
||||||
string,
|
|
||||||
{ controller: AbortController; sessionId: string; sessionKey: string }
|
|
||||||
>();
|
|
||||||
|
|
||||||
const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN;
|
const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN;
|
||||||
|
|
||||||
@@ -582,6 +568,20 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
|
|||||||
const providerAbort = new AbortController();
|
const providerAbort = new AbortController();
|
||||||
const providerTasks: Array<Promise<unknown>> = [];
|
const providerTasks: Array<Promise<unknown>> = [];
|
||||||
const clients = new Set<Client>();
|
const clients = new Set<Client>();
|
||||||
|
let seq = 0;
|
||||||
|
// Track per-run sequence to detect out-of-order/lost agent events.
|
||||||
|
const agentRunSeq = new Map<string, number>();
|
||||||
|
const dedupe = new Map<string, DedupeEntry>();
|
||||||
|
// Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients).
|
||||||
|
const chatRunSessions = new Map<
|
||||||
|
string,
|
||||||
|
{ sessionKey: string; clientRunId: string }
|
||||||
|
>();
|
||||||
|
const chatRunBuffers = new Map<string, string>();
|
||||||
|
const chatAbortControllers = new Map<
|
||||||
|
string,
|
||||||
|
{ controller: AbortController; sessionId: string; sessionKey: string }
|
||||||
|
>();
|
||||||
const cfgAtStart = loadConfig();
|
const cfgAtStart = loadConfig();
|
||||||
setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1);
|
setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1);
|
||||||
|
|
||||||
@@ -1201,18 +1201,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
|
|||||||
lastChannel: entry?.lastChannel,
|
lastChannel: entry?.lastChannel,
|
||||||
lastTo: entry?.lastTo,
|
lastTo: entry?.lastTo,
|
||||||
};
|
};
|
||||||
if (store) {
|
|
||||||
store[p.sessionKey] = sessionEntry;
|
|
||||||
if (storePath) {
|
|
||||||
await saveSessionStore(storePath, store);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const clientRunId = p.idempotencyKey;
|
const clientRunId = p.idempotencyKey;
|
||||||
chatRunSessions.set(sessionId, {
|
|
||||||
sessionKey: p.sessionKey,
|
|
||||||
clientRunId,
|
|
||||||
});
|
|
||||||
|
|
||||||
const cached = dedupe.get(`chat:${clientRunId}`);
|
const cached = dedupe.get(`chat:${clientRunId}`);
|
||||||
if (cached) {
|
if (cached) {
|
||||||
@@ -1228,14 +1217,25 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
const abortController = new AbortController();
|
|
||||||
chatAbortControllers.set(clientRunId, {
|
|
||||||
controller: abortController,
|
|
||||||
sessionId,
|
|
||||||
sessionKey: p.sessionKey,
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
const abortController = new AbortController();
|
||||||
|
chatAbortControllers.set(clientRunId, {
|
||||||
|
controller: abortController,
|
||||||
|
sessionId,
|
||||||
|
sessionKey: p.sessionKey,
|
||||||
|
});
|
||||||
|
chatRunSessions.set(sessionId, {
|
||||||
|
sessionKey: p.sessionKey,
|
||||||
|
clientRunId,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (store) {
|
||||||
|
store[p.sessionKey] = sessionEntry;
|
||||||
|
if (storePath) {
|
||||||
|
await saveSessionStore(storePath, store);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await agentCommand(
|
await agentCommand(
|
||||||
{
|
{
|
||||||
message: messageWithAttachments,
|
message: messageWithAttachments,
|
||||||
@@ -2287,17 +2287,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
|
|||||||
lastChannel: entry?.lastChannel,
|
lastChannel: entry?.lastChannel,
|
||||||
lastTo: entry?.lastTo,
|
lastTo: entry?.lastTo,
|
||||||
};
|
};
|
||||||
if (store) {
|
|
||||||
store[p.sessionKey] = sessionEntry;
|
|
||||||
if (storePath) {
|
|
||||||
await saveSessionStore(storePath, store);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const clientRunId = p.idempotencyKey;
|
const clientRunId = p.idempotencyKey;
|
||||||
chatRunSessions.set(sessionId, {
|
|
||||||
sessionKey: p.sessionKey,
|
|
||||||
clientRunId,
|
|
||||||
});
|
|
||||||
|
|
||||||
const cached = dedupe.get(`chat:${clientRunId}`);
|
const cached = dedupe.get(`chat:${clientRunId}`);
|
||||||
if (cached) {
|
if (cached) {
|
||||||
@@ -2314,6 +2304,17 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
|
|||||||
sessionId,
|
sessionId,
|
||||||
sessionKey: p.sessionKey,
|
sessionKey: p.sessionKey,
|
||||||
});
|
});
|
||||||
|
chatRunSessions.set(sessionId, {
|
||||||
|
sessionKey: p.sessionKey,
|
||||||
|
clientRunId,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (store) {
|
||||||
|
store[p.sessionKey] = sessionEntry;
|
||||||
|
if (storePath) {
|
||||||
|
await saveSessionStore(storePath, store);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await agentCommand(
|
await agentCommand(
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user