diff --git a/CHANGELOG.md b/CHANGELOG.md index e6d3cb31d..f44ef60e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage. - Onboarding/Gateway: persist non-interactive gateway token auth in config; add WS wizard + gateway tool-calling regression coverage. - Gateway/Control UI: make `chat.send` non-blocking, wire Stop to `chat.abort`, and treat `/stop` as an out-of-band abort. (#653) +- Gateway/Control UI: allow `chat.abort` without `runId` (abort active runs), suppress post-abort chat streaming, and prune stuck chat runs. (#653) - CLI: `clawdbot sessions` now includes `elev:*` + `usage:*` flags in the table output. - CLI/Pairing: accept positional provider for `pairing list|approve` (npm-run compatible); update docs/bot hints. - Branding: normalize user-facing “ClawdBot”/“CLAWDBOT” → “Clawdbot” (CLI, status, docs). diff --git a/docs/web/control-ui.md b/docs/web/control-ui.md index 7d00288c8..0b4e6e8e4 100644 --- a/docs/web/control-ui.md +++ b/docs/web/control-ui.md @@ -42,6 +42,15 @@ The dashboard settings panel lets you store a token; passwords are not persisted - Logs: live tail of gateway file logs with filter/export (`logs.tail`) - Update: run a package/git update + restart (`update.run`) with a restart report +## Chat behavior + +- `chat.send` is **non-blocking**: it acks immediately with `{ runId, status: "started" }` and the response streams via `chat` events. +- Re-sending with the same `idempotencyKey` returns `{ status: "in_flight" }` while running, and `{ status: "ok" }` after completion. +- Stop: + - Click **Stop** (calls `chat.abort`) + - Type `/stop` (or `stop|esc|abort|wait|exit`) to abort out-of-band + - `chat.abort` supports `{ sessionKey }` (no `runId`) to abort all active runs for that session + ## Tailnet access (recommended) ### Integrated Tailscale Serve (preferred) diff --git a/src/gateway/chat-abort.ts b/src/gateway/chat-abort.ts new file mode 100644 index 000000000..a93a35f26 --- /dev/null +++ b/src/gateway/chat-abort.ts @@ -0,0 +1,119 @@ +import { isAbortTrigger } from "../auto-reply/reply/abort.js"; + +export type ChatAbortControllerEntry = { + controller: AbortController; + sessionId: string; + sessionKey: string; + startedAtMs: number; + expiresAtMs: number; +}; + +export function isChatStopCommandText(text: string): boolean { + const trimmed = text.trim(); + if (!trimmed) return false; + return trimmed.toLowerCase() === "/stop" || isAbortTrigger(trimmed); +} + +export function resolveChatRunExpiresAtMs(params: { + now: number; + timeoutMs: number; + graceMs?: number; + minMs?: number; + maxMs?: number; +}): number { + const { + now, + timeoutMs, + graceMs = 60_000, + minMs = 2 * 60_000, + maxMs = 24 * 60 * 60_000, + } = params; + const boundedTimeoutMs = Math.max(0, timeoutMs); + const target = now + boundedTimeoutMs + graceMs; + const min = now + minMs; + const max = now + maxMs; + return Math.min(max, Math.max(min, target)); +} + +export type ChatAbortOps = { + chatAbortControllers: Map; + chatRunBuffers: Map; + chatDeltaSentAt: Map; + chatAbortedRuns: Map; + removeChatRun: ( + sessionId: string, + clientRunId: string, + sessionKey?: string, + ) => { sessionKey: string; clientRunId: string } | undefined; + agentRunSeq: Map; + broadcast: ( + event: string, + payload: unknown, + opts?: { dropIfSlow?: boolean }, + ) => void; + bridgeSendToSession: ( + sessionKey: string, + event: string, + payload: unknown, + ) => void; +}; + +function broadcastChatAborted( + ops: ChatAbortOps, + params: { + runId: string; + sessionKey: string; + stopReason?: string; + }, +) { + const { runId, sessionKey, stopReason } = params; + const payload = { + runId, + sessionKey, + seq: (ops.agentRunSeq.get(runId) ?? 0) + 1, + state: "aborted" as const, + stopReason, + }; + ops.broadcast("chat", payload); + ops.bridgeSendToSession(sessionKey, "chat", payload); +} + +export function abortChatRunById( + ops: ChatAbortOps, + params: { + runId: string; + sessionKey: string; + stopReason?: string; + }, +): { aborted: boolean } { + const { runId, sessionKey, stopReason } = params; + const active = ops.chatAbortControllers.get(runId); + if (!active) return { aborted: false }; + if (active.sessionKey !== sessionKey) return { aborted: false }; + + ops.chatAbortedRuns.set(runId, Date.now()); + active.controller.abort(); + ops.chatAbortControllers.delete(runId); + ops.chatRunBuffers.delete(runId); + ops.chatDeltaSentAt.delete(runId); + ops.removeChatRun(runId, runId, sessionKey); + broadcastChatAborted(ops, { runId, sessionKey, stopReason }); + return { aborted: true }; +} + +export function abortChatRunsForSessionKey( + ops: ChatAbortOps, + params: { + sessionKey: string; + stopReason?: string; + }, +): { aborted: boolean; runIds: string[] } { + const { sessionKey, stopReason } = params; + const runIds: string[] = []; + for (const [runId, active] of ops.chatAbortControllers) { + if (active.sessionKey !== sessionKey) continue; + const res = abortChatRunById(ops, { runId, sessionKey, stopReason }); + if (res.aborted) runIds.push(runId); + } + return { aborted: runIds.length > 0, runIds }; +} diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index a76734337..acceefe46 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -903,7 +903,7 @@ export const ChatSendParamsSchema = Type.Object( export const ChatAbortParamsSchema = Type.Object( { sessionKey: NonEmptyString, - runId: NonEmptyString, + runId: Type.Optional(NonEmptyString), }, { additionalProperties: false }, ); diff --git a/src/gateway/server-bridge.ts b/src/gateway/server-bridge.ts index d44649482..1de965f9a 100644 --- a/src/gateway/server-bridge.ts +++ b/src/gateway/server-bridge.ts @@ -9,7 +9,6 @@ import { waitForEmbeddedPiRunEnd, } from "../agents/pi-embedded.js"; import { resolveAgentTimeoutMs } from "../agents/timeout.js"; -import { isAbortTrigger } from "../auto-reply/reply/abort.js"; import type { CliDeps } from "../cli/deps.js"; import { agentCommand } from "../commands/agent.js"; import type { HealthSummary } from "../commands/health.js"; @@ -37,6 +36,13 @@ import { import { clearCommandLane } from "../process/command-queue.js"; import { normalizeMainKey } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; +import { + abortChatRunById, + abortChatRunsForSessionKey, + type ChatAbortControllerEntry, + isChatStopCommandText, + resolveChatRunExpiresAtMs, +} from "./chat-abort.js"; import { buildMessageWithAttachments } from "./chat-attachments.js"; import { ErrorCodes, @@ -107,10 +113,8 @@ export type BridgeHandlersContext = { clientRunId: string, sessionKey?: string, ) => ChatRunEntry | undefined; - chatAbortControllers: Map< - string, - { controller: AbortController; sessionId: string; sessionKey: string } - >; + chatAbortControllers: Map; + chatAbortedRuns: Map; chatRunBuffers: Map; chatDeltaSentAt: Map; dedupe: Map; @@ -701,13 +705,41 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { const { sessionKey, runId } = params as { sessionKey: string; - runId: string; + runId?: string; }; + const ops = { + chatAbortControllers: ctx.chatAbortControllers, + chatRunBuffers: ctx.chatRunBuffers, + chatDeltaSentAt: ctx.chatDeltaSentAt, + chatAbortedRuns: ctx.chatAbortedRuns, + removeChatRun: ctx.removeChatRun, + agentRunSeq: ctx.agentRunSeq, + broadcast: ctx.broadcast, + bridgeSendToSession: ctx.bridgeSendToSession, + }; + if (!runId) { + const res = abortChatRunsForSessionKey(ops, { + sessionKey, + stopReason: "rpc", + }); + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + aborted: res.aborted, + runIds: res.runIds, + }), + }; + } const active = ctx.chatAbortControllers.get(runId); if (!active) { return { ok: true, - payloadJSON: JSON.stringify({ ok: true, aborted: false }), + payloadJSON: JSON.stringify({ + ok: true, + aborted: false, + runIds: [], + }), }; } if (active.sessionKey !== sessionKey) { @@ -719,24 +751,18 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { }, }; } - - active.controller.abort(); - ctx.chatAbortControllers.delete(runId); - ctx.chatRunBuffers.delete(runId); - ctx.chatDeltaSentAt.delete(runId); - ctx.removeChatRun(runId, runId, sessionKey); - - const payload = { + const res = abortChatRunById(ops, { runId, sessionKey, - seq: (ctx.agentRunSeq.get(runId) ?? 0) + 1, - state: "aborted" as const, - }; - ctx.broadcast("chat", payload); - ctx.bridgeSendToSession(sessionKey, "chat", payload); + stopReason: "rpc", + }); return { ok: true, - payloadJSON: JSON.stringify({ ok: true, aborted: true }), + payloadJSON: JSON.stringify({ + ok: true, + aborted: res.aborted, + runIds: res.aborted ? [runId] : [], + }), }; } case "chat.send": { @@ -765,12 +791,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { timeoutMs?: number; idempotencyKey: string; }; - const stopCommand = (() => { - const msg = p.message.trim(); - if (!msg) return false; - const normalized = msg.toLowerCase(); - return normalized === "/stop" || isAbortTrigger(msg); - })(); + const stopCommand = isChatStopCommandText(p.message); const normalizedAttachments = p.attachments?.map((a) => ({ type: typeof a?.type === "string" ? a.type : undefined, @@ -826,30 +847,25 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { registerAgentRunContext(clientRunId, { sessionKey: p.sessionKey }); if (stopCommand) { - const runIds: string[] = []; - for (const [runId, active] of ctx.chatAbortControllers) { - if (active.sessionKey !== p.sessionKey) continue; - active.controller.abort(); - ctx.chatAbortControllers.delete(runId); - ctx.chatRunBuffers.delete(runId); - ctx.chatDeltaSentAt.delete(runId); - ctx.removeChatRun(runId, runId, p.sessionKey); - const payload = { - runId, - sessionKey: p.sessionKey, - seq: (ctx.agentRunSeq.get(runId) ?? 0) + 1, - state: "aborted" as const, - }; - ctx.broadcast("chat", payload); - ctx.bridgeSendToSession(p.sessionKey, "chat", payload); - runIds.push(runId); - } + const res = abortChatRunsForSessionKey( + { + chatAbortControllers: ctx.chatAbortControllers, + chatRunBuffers: ctx.chatRunBuffers, + chatDeltaSentAt: ctx.chatDeltaSentAt, + chatAbortedRuns: ctx.chatAbortedRuns, + removeChatRun: ctx.removeChatRun, + agentRunSeq: ctx.agentRunSeq, + broadcast: ctx.broadcast, + bridgeSendToSession: ctx.bridgeSendToSession, + }, + { sessionKey: p.sessionKey, stopReason: "stop" }, + ); return { ok: true, payloadJSON: JSON.stringify({ ok: true, - aborted: runIds.length > 0, - runIds, + aborted: res.aborted, + runIds: res.runIds, }), }; } @@ -885,6 +901,8 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { controller: abortController, sessionId, sessionKey: p.sessionKey, + startedAtMs: now, + expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }), }); ctx.addChatRun(clientRunId, { sessionKey: p.sessionKey, diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 8d418b62d..ddf36da7f 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -74,6 +74,7 @@ export type ChatRunState = { registry: ChatRunRegistry; buffers: Map; deltaSentAt: Map; + abortedRuns: Map; clear: () => void; }; @@ -81,17 +82,20 @@ export function createChatRunState(): ChatRunState { const registry = createChatRunRegistry(); const buffers = new Map(); const deltaSentAt = new Map(); + const abortedRuns = new Map(); const clear = () => { registry.clear(); buffers.clear(); deltaSentAt.clear(); + abortedRuns.clear(); }; return { registry, buffers, deltaSentAt, + abortedRuns, clear, }; } @@ -212,6 +216,10 @@ export function createAgentEventHandler({ const chatLink = chatRunState.registry.peek(evt.runId); const sessionKey = chatLink?.sessionKey ?? resolveSessionKeyForRun(evt.runId); + const clientRunId = chatLink?.clientRunId ?? evt.runId; + const isAborted = + chatRunState.abortedRuns.has(clientRunId) || + chatRunState.abortedRuns.has(evt.runId); // Include sessionKey so Control UI can filter tool streams per session. const agentPayload = sessionKey ? { ...evt, sessionKey } : evt; const last = agentRunSeq.get(evt.runId) ?? 0; @@ -242,10 +250,16 @@ export function createAgentEventHandler({ if (sessionKey) { bridgeSendToSession(sessionKey, "agent", agentPayload); - if (evt.stream === "assistant" && typeof evt.data?.text === "string") { - const clientRunId = chatLink?.clientRunId ?? evt.runId; + if ( + !isAborted && + evt.stream === "assistant" && + typeof evt.data?.text === "string" + ) { emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text); - } else if (lifecyclePhase === "end" || lifecyclePhase === "error") { + } else if ( + !isAborted && + (lifecyclePhase === "end" || lifecyclePhase === "error") + ) { if (chatLink) { const finished = chatRunState.registry.shift(evt.runId); if (!finished) { @@ -268,6 +282,17 @@ export function createAgentEventHandler({ evt.data?.error, ); } + } else if ( + isAborted && + (lifecyclePhase === "end" || lifecyclePhase === "error") + ) { + chatRunState.abortedRuns.delete(clientRunId); + chatRunState.abortedRuns.delete(evt.runId); + chatRunState.buffers.delete(clientRunId); + chatRunState.deltaSentAt.delete(clientRunId); + if (chatLink) { + chatRunState.registry.remove(evt.runId, clientRunId, sessionKey); + } } } diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 4305db9d2..e2c5db37b 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -2,12 +2,17 @@ import { randomUUID } from "node:crypto"; import { resolveThinkingDefault } from "../../agents/model-selection.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; -import { isAbortTrigger } from "../../auto-reply/reply/abort.js"; import { agentCommand } from "../../commands/agent.js"; import { mergeSessionEntry, saveSessionStore } from "../../config/sessions.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { defaultRuntime } from "../../runtime.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; +import { + abortChatRunById, + abortChatRunsForSessionKey, + isChatStopCommandText, + resolveChatRunExpiresAtMs, +} from "../chat-abort.js"; import { buildMessageWithAttachments } from "../chat-attachments.js"; import { ErrorCodes, @@ -97,11 +102,32 @@ export const chatHandlers: GatewayRequestHandlers = { } const { sessionKey, runId } = params as { sessionKey: string; - runId: string; + runId?: string; }; + + const ops = { + chatAbortControllers: context.chatAbortControllers, + chatRunBuffers: context.chatRunBuffers, + chatDeltaSentAt: context.chatDeltaSentAt, + chatAbortedRuns: context.chatAbortedRuns, + removeChatRun: context.removeChatRun, + agentRunSeq: context.agentRunSeq, + broadcast: context.broadcast, + bridgeSendToSession: context.bridgeSendToSession, + }; + + if (!runId) { + const res = abortChatRunsForSessionKey(ops, { + sessionKey, + stopReason: "rpc", + }); + respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); + return; + } + const active = context.chatAbortControllers.get(runId); if (!active) { - respond(true, { ok: true, aborted: false }); + respond(true, { ok: true, aborted: false, runIds: [] }); return; } if (active.sessionKey !== sessionKey) { @@ -116,21 +142,16 @@ export const chatHandlers: GatewayRequestHandlers = { return; } - active.controller.abort(); - context.chatAbortControllers.delete(runId); - context.chatRunBuffers.delete(runId); - context.chatDeltaSentAt.delete(runId); - context.removeChatRun(runId, runId, sessionKey); - - const payload = { + const res = abortChatRunById(ops, { runId, sessionKey, - seq: (context.agentRunSeq.get(runId) ?? 0) + 1, - state: "aborted" as const, - }; - context.broadcast("chat", payload); - context.bridgeSendToSession(sessionKey, "chat", payload); - respond(true, { ok: true, aborted: true }); + stopReason: "rpc", + }); + respond(true, { + ok: true, + aborted: res.aborted, + runIds: res.aborted ? [runId] : [], + }); }, "chat.send": async ({ params, respond, context }) => { if (!validateChatSendParams(params)) { @@ -158,12 +179,7 @@ export const chatHandlers: GatewayRequestHandlers = { timeoutMs?: number; idempotencyKey: string; }; - const stopCommand = (() => { - const msg = p.message.trim(); - if (!msg) return false; - const normalized = msg.toLowerCase(); - return normalized === "/stop" || isAbortTrigger(msg); - })(); + const stopCommand = isChatStopCommandText(p.message); const normalizedAttachments = p.attachments?.map((a) => ({ type: typeof a?.type === "string" ? a.type : undefined, @@ -231,29 +247,20 @@ export const chatHandlers: GatewayRequestHandlers = { } if (stopCommand) { - const runIds: string[] = []; - for (const [runId, active] of context.chatAbortControllers) { - if (active.sessionKey !== p.sessionKey) continue; - active.controller.abort(); - context.chatAbortControllers.delete(runId); - context.chatRunBuffers.delete(runId); - context.chatDeltaSentAt.delete(runId); - context.removeChatRun(runId, runId, p.sessionKey); - const payload = { - runId, - sessionKey: p.sessionKey, - seq: (context.agentRunSeq.get(runId) ?? 0) + 1, - state: "aborted" as const, - }; - context.broadcast("chat", payload); - context.bridgeSendToSession(p.sessionKey, "chat", payload); - runIds.push(runId); - } - respond(true, { - ok: true, - aborted: runIds.length > 0, - runIds, - }); + const res = abortChatRunsForSessionKey( + { + chatAbortControllers: context.chatAbortControllers, + chatRunBuffers: context.chatRunBuffers, + chatDeltaSentAt: context.chatDeltaSentAt, + chatAbortedRuns: context.chatAbortedRuns, + removeChatRun: context.removeChatRun, + agentRunSeq: context.agentRunSeq, + broadcast: context.broadcast, + bridgeSendToSession: context.bridgeSendToSession, + }, + { sessionKey: p.sessionKey, stopReason: "stop" }, + ); + respond(true, { ok: true, aborted: res.aborted, runIds: res.runIds }); return; } @@ -282,6 +289,8 @@ export const chatHandlers: GatewayRequestHandlers = { controller: abortController, sessionId, sessionKey: p.sessionKey, + startedAtMs: now, + expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }), }); context.addChatRun(clientRunId, { sessionKey: p.sessionKey, diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index 6cb18672a..79545fe80 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -4,6 +4,7 @@ import type { HealthSummary } from "../../commands/health.js"; import type { CronService } from "../../cron/service.js"; import type { startNodeBridgeServer } from "../../infra/bridge/server.js"; import type { WizardSession } from "../../wizard/session.js"; +import type { ChatAbortControllerEntry } from "../chat-abort.js"; import type { ConnectParams, ErrorShape, @@ -49,10 +50,8 @@ export type GatewayRequestContext = { ) => void; hasConnectedMobileNode: () => boolean; agentRunSeq: Map; - chatAbortControllers: Map< - string, - { controller: AbortController; sessionId: string; sessionKey: string } - >; + chatAbortControllers: Map; + chatAbortedRuns: Map; chatRunBuffers: Map; chatDeltaSentAt: Map; addChatRun: ( diff --git a/src/gateway/server.chat.test.ts b/src/gateway/server.chat.test.ts index f6d37f18b..9ab565b84 100644 --- a/src/gateway/server.chat.test.ts +++ b/src/gateway/server.chat.test.ts @@ -836,6 +836,136 @@ describe("gateway server chat", () => { }, ); + test("chat.send idempotency returns started → in_flight → ok", async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const spy = vi.mocked(agentCommand); + let resolveRun: (() => void) | undefined; + const runDone = new Promise((resolve) => { + resolveRun = resolve; + }); + spy.mockImplementationOnce(async () => { + await runDone; + }); + + const started = await rpcReq<{ runId?: string; status?: string }>( + ws, + "chat.send", + { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-status-1", + }, + ); + expect(started.ok).toBe(true); + expect(started.payload?.status).toBe("started"); + + const inFlight = await rpcReq<{ runId?: string; status?: string }>( + ws, + "chat.send", + { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-status-1", + }, + ); + expect(inFlight.ok).toBe(true); + expect(inFlight.payload?.status).toBe("in_flight"); + + resolveRun?.(); + + let completed = false; + for (let i = 0; i < 50; i++) { + const again = await rpcReq<{ runId?: string; status?: string }>( + ws, + "chat.send", + { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-status-1", + }, + ); + if (again.ok && again.payload?.status === "ok") { + completed = true; + break; + } + await new Promise((r) => setTimeout(r, 10)); + } + expect(completed).toBe(true); + + ws.close(); + await server.close(); + }); + + test("chat.abort without runId aborts active runs and suppresses chat events after abort", async () => { + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const spy = vi.mocked(agentCommand); + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + + const abortedEventP = onceMessage( + ws, + (o) => + o.type === "event" && + o.event === "chat" && + o.payload?.state === "aborted" && + o.payload?.runId === "idem-abort-all-1", + ); + + const started = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-abort-all-1", + }); + expect(started.ok).toBe(true); + + const abortRes = await rpcReq<{ + ok?: boolean; + aborted?: boolean; + runIds?: string[]; + }>(ws, "chat.abort", { sessionKey: "main" }); + expect(abortRes.ok).toBe(true); + expect(abortRes.payload?.aborted).toBe(true); + expect(abortRes.payload?.runIds ?? []).toContain("idem-abort-all-1"); + + await abortedEventP; + + const noDeltaP = onceMessage( + ws, + (o) => + o.type === "event" && + o.event === "chat" && + (o.payload?.state === "delta" || o.payload?.state === "final") && + o.payload?.runId === "idem-abort-all-1", + 250, + ); + + emitAgentEvent({ + runId: "idem-abort-all-1", + stream: "assistant", + data: { text: "should be suppressed" }, + }); + emitAgentEvent({ + runId: "idem-abort-all-1", + stream: "lifecycle", + data: { phase: "end" }, + }); + + await expect(noDeltaP).rejects.toThrow(/timeout/i); + + ws.close(); + await server.close(); + }); + test("chat.abort returns aborted=false for unknown runId", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); testState.sessionStorePath = path.join(dir, "sessions.json"); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 0d1e8ebc2..805ae7ffe 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -110,6 +110,10 @@ import { type ResolvedGatewayAuth, resolveGatewayAuth, } from "./auth.js"; +import { + abortChatRunById, + type ChatAbortControllerEntry, +} from "./chat-abort.js"; import { type GatewayReloadPlan, type ProviderKind, @@ -685,10 +689,7 @@ export async function startGatewayServer( } return sessionKey; }; - const chatAbortControllers = new Map< - string, - { controller: AbortController; sessionId: string; sessionKey: string } - >(); + const chatAbortControllers = new Map(); setCommandLaneConcurrency("cron", cfgAtStart.cron?.maxConcurrentRuns ?? 1); setCommandLaneConcurrency( "main", @@ -967,6 +968,7 @@ export async function startGatewayServer( addChatRun, removeChatRun, chatAbortControllers, + chatAbortedRuns: chatRunState.abortedRuns, chatRunBuffers, chatDeltaSentAt, dedupe, @@ -1192,6 +1194,31 @@ export async function startGatewayServer( dedupe.delete(entries[i][0]); } } + + for (const [runId, entry] of chatAbortControllers) { + if (now <= entry.expiresAtMs) continue; + abortChatRunById( + { + chatAbortControllers, + chatRunBuffers, + chatDeltaSentAt, + chatAbortedRuns: chatRunState.abortedRuns, + removeChatRun, + agentRunSeq, + broadcast, + bridgeSendToSession, + }, + { runId, sessionKey: entry.sessionKey, stopReason: "timeout" }, + ); + } + + const ABORTED_RUN_TTL_MS = 60 * 60_000; + for (const [runId, abortedAt] of chatRunState.abortedRuns) { + if (now - abortedAt <= ABORTED_RUN_TTL_MS) continue; + chatRunState.abortedRuns.delete(runId); + chatRunBuffers.delete(runId); + chatDeltaSentAt.delete(runId); + } }, 60_000); const agentUnsub = onAgentEvent( @@ -1647,6 +1674,7 @@ export async function startGatewayServer( hasConnectedMobileNode, agentRunSeq, chatAbortControllers, + chatAbortedRuns: chatRunState.abortedRuns, chatRunBuffers, chatDeltaSentAt, addChatRun, diff --git a/ui/src/ui/app.ts b/ui/src/ui/app.ts index e16bc2739..d6bd06f9a 100644 --- a/ui/src/ui/app.ts +++ b/ui/src/ui/app.ts @@ -1033,13 +1033,19 @@ export class ClawdbotApp extends LitElement { const trimmed = text.trim(); if (!trimmed) return false; const normalized = trimmed.toLowerCase(); - return normalized === "/stop" || normalized === "stop" || normalized === "abort"; + if (normalized === "/stop") return true; + return ( + normalized === "stop" || + normalized === "esc" || + normalized === "abort" || + normalized === "wait" || + normalized === "exit" + ); } async handleAbortChat() { if (!this.connected) return; this.chatMessage = ""; - if (!this.chatRunId) return; await abortChatRun(this); } diff --git a/ui/src/ui/controllers/chat.ts b/ui/src/ui/controllers/chat.ts index 55c71b65f..8ea5ad84d 100644 --- a/ui/src/ui/controllers/chat.ts +++ b/ui/src/ui/controllers/chat.ts @@ -95,12 +95,13 @@ export async function sendChatMessage(state: ChatState, message: string): Promis export async function abortChatRun(state: ChatState): Promise { if (!state.client || !state.connected) return false; const runId = state.chatRunId; - if (!runId) return false; try { - await state.client.request("chat.abort", { - sessionKey: state.sessionKey, - runId, - }); + await state.client.request( + "chat.abort", + runId + ? { sessionKey: state.sessionKey, runId } + : { sessionKey: state.sessionKey }, + ); return true; } catch (err) { state.lastError = String(err);