diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 600687845..3bd7aa3da 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -68,6 +68,22 @@ let testSessionStorePath: string | undefined; let testAllowFrom: string[] | undefined; let testCronStorePath: string | undefined; let testCronEnabled: boolean | undefined = false; +const sessionStoreSaveDelayMs = vi.hoisted(() => ({ value: 0 })); +vi.mock("../config/sessions.js", async () => { + const actual = await vi.importActual( + "../config/sessions.js", + ); + return { + ...actual, + saveSessionStore: vi.fn(async (storePath: string, store: unknown) => { + const delay = sessionStoreSaveDelayMs.value; + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)); + } + return actual.saveSessionStore(storePath, store as never); + }), + }; +}); vi.mock("../config/config.js", () => ({ loadConfig: () => ({ inbound: { @@ -109,6 +125,7 @@ beforeEach(async () => { previousHome = process.env.HOME; tempHome = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gateway-home-")); process.env.HOME = tempHome; + sessionStoreSaveDelayMs.value = 0; }); afterEach(async () => { @@ -2075,6 +2092,93 @@ describe("gateway server", () => { }, ); + test("chat.abort cancels while saving the session store", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + sessionStoreSaveDelayMs.value = 120; + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const spy = vi.mocked(agentCommand); + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + + const abortedEventP = onceMessage( + ws, + (o) => + o.type === "event" && + o.event === "chat" && + o.payload?.state === "aborted", + ); + + const sendResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-abort-save-1", + ); + + ws.send( + JSON.stringify({ + type: "req", + id: "send-abort-save-1", + method: "chat.send", + params: { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-abort-save-1", + timeoutMs: 30_000, + }, + }), + ); + + const abortResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "abort-save-1", + ); + ws.send( + JSON.stringify({ + type: "req", + id: "abort-save-1", + method: "chat.abort", + params: { sessionKey: "main", runId: "idem-abort-save-1" }, + }), + ); + + const abortRes = await abortResP; + expect(abortRes.ok).toBe(true); + + const sendRes = await sendResP; + expect(sendRes.ok).toBe(true); + + const evt = await abortedEventP; + expect(evt.payload?.runId).toBe("idem-abort-save-1"); + expect(evt.payload?.sessionKey).toBe("main"); + + ws.close(); + await server.close(); + }); + test("chat.abort returns aborted=false for unknown runId", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); testSessionStorePath = path.join(dir, "sessions.json"); @@ -2454,6 +2558,69 @@ describe("gateway server", () => { await server.close(); }); + test("bridge chat.abort cancels while saving the session store", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-gw-")); + testSessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testSessionStorePath, + JSON.stringify( + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + "utf-8", + ); + + sessionStoreSaveDelayMs.value = 120; + + const port = await getFreePort(); + const server = await startGatewayServer(port); + const bridgeCall = bridgeStartCalls.at(-1); + expect(bridgeCall?.onRequest).toBeDefined(); + + const spy = vi.mocked(agentCommand); + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + + const sendP = bridgeCall?.onRequest?.("ios-node", { + id: "send-abort-save-bridge-1", + method: "chat.send", + paramsJSON: JSON.stringify({ + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-abort-save-bridge-1", + timeoutMs: 30_000, + }), + }); + + const abortRes = await bridgeCall?.onRequest?.("ios-node", { + id: "abort-save-bridge-1", + method: "chat.abort", + paramsJSON: JSON.stringify({ + sessionKey: "main", + runId: "idem-abort-save-bridge-1", + }), + }); + + expect(abortRes?.ok).toBe(true); + + const sendRes = await sendP; + expect(sendRes?.ok).toBe(true); + + await server.close(); + }); + test("presence includes client fingerprint", async () => { const { server, ws } = await startServerWithClient(); await connectOk(ws, { diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 0eed58948..83c28ac4c 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -241,12 +241,9 @@ function isLoopbackAddress(ip: string | undefined): boolean { let presenceVersion = 1; let healthVersion = 1; -let seq = 0; let healthCache: HealthSummary | null = null; let healthRefresh: Promise | null = null; let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null; -// Track per-run sequence to detect out-of-order/lost agent events. -const agentRunSeq = new Map(); function buildSnapshot(): Snapshot { const presence = listSystemPresence(); @@ -277,17 +274,6 @@ type DedupeEntry = { payload?: unknown; error?: ErrorShape; }; -const dedupe = new Map(); -// Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients). -const chatRunSessions = new Map< - string, - { sessionKey: string; clientRunId: string } ->(); -const chatRunBuffers = new Map(); -const chatAbortControllers = new Map< - string, - { controller: AbortController; sessionId: string; sessionKey: string } ->(); const getGatewayToken = () => process.env.CLAWDIS_GATEWAY_TOKEN; @@ -582,6 +568,20 @@ export async function startGatewayServer(port = 18789): Promise { const providerAbort = new AbortController(); const providerTasks: Array> = []; const clients = new Set(); + let seq = 0; + // Track per-run sequence to detect out-of-order/lost agent events. + const agentRunSeq = new Map(); + const dedupe = new Map(); + // Map agent sessionId -> {sessionKey, clientRunId} for chat events (WS WebChat clients). + const chatRunSessions = new Map< + string, + { sessionKey: string; clientRunId: string } + >(); + const chatRunBuffers = new Map(); + const chatAbortControllers = new Map< + string, + { controller: AbortController; sessionId: string; sessionKey: string } + >(); const cfgAtStart = loadConfig(); setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1); @@ -1201,18 +1201,7 @@ export async function startGatewayServer(port = 18789): Promise { lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, }; - if (store) { - store[p.sessionKey] = sessionEntry; - if (storePath) { - await saveSessionStore(storePath, store); - } - } - const clientRunId = p.idempotencyKey; - chatRunSessions.set(sessionId, { - sessionKey: p.sessionKey, - clientRunId, - }); const cached = dedupe.get(`chat:${clientRunId}`); if (cached) { @@ -1228,14 +1217,25 @@ export async function startGatewayServer(port = 18789): Promise { }; } - const abortController = new AbortController(); - chatAbortControllers.set(clientRunId, { - controller: abortController, - sessionId, - sessionKey: p.sessionKey, - }); - try { + const abortController = new AbortController(); + chatAbortControllers.set(clientRunId, { + controller: abortController, + sessionId, + sessionKey: p.sessionKey, + }); + chatRunSessions.set(sessionId, { + sessionKey: p.sessionKey, + clientRunId, + }); + + if (store) { + store[p.sessionKey] = sessionEntry; + if (storePath) { + await saveSessionStore(storePath, store); + } + } + await agentCommand( { message: messageWithAttachments, @@ -2287,17 +2287,7 @@ export async function startGatewayServer(port = 18789): Promise { lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, }; - if (store) { - store[p.sessionKey] = sessionEntry; - if (storePath) { - await saveSessionStore(storePath, store); - } - } const clientRunId = p.idempotencyKey; - chatRunSessions.set(sessionId, { - sessionKey: p.sessionKey, - clientRunId, - }); const cached = dedupe.get(`chat:${clientRunId}`); if (cached) { @@ -2314,6 +2304,17 @@ export async function startGatewayServer(port = 18789): Promise { sessionId, sessionKey: p.sessionKey, }); + chatRunSessions.set(sessionId, { + sessionKey: p.sessionKey, + clientRunId, + }); + + if (store) { + store[p.sessionKey] = sessionEntry; + if (storePath) { + await saveSessionStore(storePath, store); + } + } await agentCommand( {