fix(gateway): harden chat abort semantics

This commit is contained in:
Peter Steinberger
2026-01-10 17:23:16 +01:00
parent 84d64f9395
commit a1533a17f7
12 changed files with 456 additions and 111 deletions

View File

@@ -9,7 +9,6 @@ import {
waitForEmbeddedPiRunEnd,
} from "../agents/pi-embedded.js";
import { resolveAgentTimeoutMs } from "../agents/timeout.js";
import { isAbortTrigger } from "../auto-reply/reply/abort.js";
import type { CliDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import type { HealthSummary } from "../commands/health.js";
@@ -37,6 +36,13 @@ import {
import { clearCommandLane } from "../process/command-queue.js";
import { normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import {
abortChatRunById,
abortChatRunsForSessionKey,
type ChatAbortControllerEntry,
isChatStopCommandText,
resolveChatRunExpiresAtMs,
} from "./chat-abort.js";
import { buildMessageWithAttachments } from "./chat-attachments.js";
import {
ErrorCodes,
@@ -107,10 +113,8 @@ export type BridgeHandlersContext = {
clientRunId: string,
sessionKey?: string,
) => ChatRunEntry | undefined;
chatAbortControllers: Map<
string,
{ controller: AbortController; sessionId: string; sessionKey: string }
>;
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
chatAbortedRuns: Map<string, number>;
chatRunBuffers: Map<string, string>;
chatDeltaSentAt: Map<string, number>;
dedupe: Map<string, DedupeEntry>;
@@ -701,13 +705,41 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
const { sessionKey, runId } = params as {
sessionKey: string;
runId: string;
runId?: string;
};
const ops = {
chatAbortControllers: ctx.chatAbortControllers,
chatRunBuffers: ctx.chatRunBuffers,
chatDeltaSentAt: ctx.chatDeltaSentAt,
chatAbortedRuns: ctx.chatAbortedRuns,
removeChatRun: ctx.removeChatRun,
agentRunSeq: ctx.agentRunSeq,
broadcast: ctx.broadcast,
bridgeSendToSession: ctx.bridgeSendToSession,
};
if (!runId) {
const res = abortChatRunsForSessionKey(ops, {
sessionKey,
stopReason: "rpc",
});
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
aborted: res.aborted,
runIds: res.runIds,
}),
};
}
const active = ctx.chatAbortControllers.get(runId);
if (!active) {
return {
ok: true,
payloadJSON: JSON.stringify({ ok: true, aborted: false }),
payloadJSON: JSON.stringify({
ok: true,
aborted: false,
runIds: [],
}),
};
}
if (active.sessionKey !== sessionKey) {
@@ -719,24 +751,18 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
},
};
}
active.controller.abort();
ctx.chatAbortControllers.delete(runId);
ctx.chatRunBuffers.delete(runId);
ctx.chatDeltaSentAt.delete(runId);
ctx.removeChatRun(runId, runId, sessionKey);
const payload = {
const res = abortChatRunById(ops, {
runId,
sessionKey,
seq: (ctx.agentRunSeq.get(runId) ?? 0) + 1,
state: "aborted" as const,
};
ctx.broadcast("chat", payload);
ctx.bridgeSendToSession(sessionKey, "chat", payload);
stopReason: "rpc",
});
return {
ok: true,
payloadJSON: JSON.stringify({ ok: true, aborted: true }),
payloadJSON: JSON.stringify({
ok: true,
aborted: res.aborted,
runIds: res.aborted ? [runId] : [],
}),
};
}
case "chat.send": {
@@ -765,12 +791,7 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
timeoutMs?: number;
idempotencyKey: string;
};
const stopCommand = (() => {
const msg = p.message.trim();
if (!msg) return false;
const normalized = msg.toLowerCase();
return normalized === "/stop" || isAbortTrigger(msg);
})();
const stopCommand = isChatStopCommandText(p.message);
const normalizedAttachments =
p.attachments?.map((a) => ({
type: typeof a?.type === "string" ? a.type : undefined,
@@ -826,30 +847,25 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
registerAgentRunContext(clientRunId, { sessionKey: p.sessionKey });
if (stopCommand) {
const runIds: string[] = [];
for (const [runId, active] of ctx.chatAbortControllers) {
if (active.sessionKey !== p.sessionKey) continue;
active.controller.abort();
ctx.chatAbortControllers.delete(runId);
ctx.chatRunBuffers.delete(runId);
ctx.chatDeltaSentAt.delete(runId);
ctx.removeChatRun(runId, runId, p.sessionKey);
const payload = {
runId,
sessionKey: p.sessionKey,
seq: (ctx.agentRunSeq.get(runId) ?? 0) + 1,
state: "aborted" as const,
};
ctx.broadcast("chat", payload);
ctx.bridgeSendToSession(p.sessionKey, "chat", payload);
runIds.push(runId);
}
const res = abortChatRunsForSessionKey(
{
chatAbortControllers: ctx.chatAbortControllers,
chatRunBuffers: ctx.chatRunBuffers,
chatDeltaSentAt: ctx.chatDeltaSentAt,
chatAbortedRuns: ctx.chatAbortedRuns,
removeChatRun: ctx.removeChatRun,
agentRunSeq: ctx.agentRunSeq,
broadcast: ctx.broadcast,
bridgeSendToSession: ctx.bridgeSendToSession,
},
{ sessionKey: p.sessionKey, stopReason: "stop" },
);
return {
ok: true,
payloadJSON: JSON.stringify({
ok: true,
aborted: runIds.length > 0,
runIds,
aborted: res.aborted,
runIds: res.runIds,
}),
};
}
@@ -885,6 +901,8 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) {
controller: abortController,
sessionId,
sessionKey: p.sessionKey,
startedAtMs: now,
expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }),
});
ctx.addChatRun(clientRunId, {
sessionKey: p.sessionKey,