fix: hard-abort clears queues on /stop

This commit is contained in:
Peter Steinberger
2026-01-16 21:15:25 +00:00
parent d887027e95
commit 9072e35f08
10 changed files with 184 additions and 13 deletions

View File

@@ -50,6 +50,7 @@ vi.mock("../agents/model-catalog.js", () => modelCatalogMocks);
import { abortEmbeddedPiRun, runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { loadSessionStore } from "../config/sessions.js";
import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./reply/queue.js";
import { getReplyFromConfig } from "./reply.js";
const MAIN_SESSION_KEY = "agent:main:main";
@@ -113,6 +114,32 @@ describe("trigger handling", () => {
2,
),
);
const followupRun: FollowupRun = {
prompt: "queued",
enqueuedAt: Date.now(),
run: {
agentId: "main",
agentDir: join(home, "agent"),
sessionId: targetSessionId,
sessionKey: targetSessionKey,
messageProvider: "telegram",
agentAccountId: "acct",
sessionFile: join(home, "session.jsonl"),
workspaceDir: join(home, "workspace"),
config: cfg,
provider: "anthropic",
model: "claude-opus-4-5",
timeoutMs: 1000,
blockReplyBreak: "text_end",
},
};
enqueueFollowupRun(
targetSessionKey,
followupRun,
{ mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" },
"none",
);
expect(getFollowupQueueDepth(targetSessionKey)).toBe(1);
const res = await getReplyFromConfig(
{
@@ -136,6 +163,7 @@ describe("trigger handling", () => {
expect(vi.mocked(abortEmbeddedPiRun)).toHaveBeenCalledWith(targetSessionId);
const store = loadSessionStore(cfg.session.store);
expect(store[targetSessionKey]?.abortedLastRun).toBe(true);
expect(getFollowupQueueDepth(targetSessionKey)).toBe(0);
});
});
it("applies native /model to the target session", async () => {

View File

@@ -1,11 +1,23 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import type { ClawdbotConfig } from "../../config/config.js";
import { isAbortTrigger, tryFastAbortFromMessage } from "./abort.js";
import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./queue.js";
import { initSessionState } from "./session.js";
vi.mock("../../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(true),
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
}));
const commandQueueMocks = vi.hoisted(() => ({
clearCommandLane: vi.fn(),
}));
vi.mock("../../process/command-queue.js", () => commandQueueMocks);
describe("abort detection", () => {
it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-"));
@@ -61,4 +73,68 @@ describe("abort detection", () => {
expect(result.handled).toBe(true);
});
it("fast-abort clears queued followups and session lane", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-"));
const storePath = path.join(root, "sessions.json");
const cfg = { session: { store: storePath } } as ClawdbotConfig;
const sessionKey = "telegram:123";
const sessionId = "session-123";
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId,
updatedAt: Date.now(),
},
},
null,
2,
),
);
const followupRun: FollowupRun = {
prompt: "queued",
enqueuedAt: Date.now(),
run: {
agentId: "main",
agentDir: path.join(root, "agent"),
sessionId,
sessionKey,
messageProvider: "telegram",
agentAccountId: "acct",
sessionFile: path.join(root, "session.jsonl"),
workspaceDir: path.join(root, "workspace"),
config: cfg,
provider: "anthropic",
model: "claude-opus-4-5",
timeoutMs: 1000,
blockReplyBreak: "text_end",
},
};
enqueueFollowupRun(
sessionKey,
followupRun,
{ mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" },
"none",
);
expect(getFollowupQueueDepth(sessionKey)).toBe(1);
const result = await tryFastAbortFromMessage({
ctx: {
CommandBody: "/stop",
RawBody: "/stop",
SessionKey: sessionKey,
Provider: "telegram",
Surface: "telegram",
From: "telegram:123",
To: "telegram:123",
},
cfg,
});
expect(result.handled).toBe(true);
expect(getFollowupQueueDepth(sessionKey)).toBe(0);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`);
});
});

View File

@@ -11,7 +11,9 @@ import { parseAgentSessionKey } from "../../routing/session-key.js";
import { resolveCommandAuthorization } from "../command-auth.js";
import { normalizeCommandBody } from "../commands-registry.js";
import type { MsgContext } from "../templating.js";
import { logVerbose } from "../../globals.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
import { clearSessionQueues } from "./queue.js";
const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit", "interrupt"]);
const ABORT_MEMORY = new Map<string, boolean>();
@@ -86,6 +88,12 @@ export async function tryFastAbortFromMessage(params: {
const { entry, key } = resolveSessionEntryForKey(store, targetKey);
const sessionId = entry?.sessionId;
const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false;
const cleared = clearSessionQueues([key ?? targetKey, sessionId]);
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
logVerbose(
`abort: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
);
}
if (entry && key) {
entry.abortedLastRun = true;
entry.updatedAt = Date.now();

View File

@@ -7,6 +7,7 @@ import { parseAgentSessionKey } from "../../routing/session-key.js";
import { parseActivationCommand } from "../group-activation.js";
import { parseSendPolicyCommand } from "../send-policy.js";
import { isAbortTrigger, setAbortMemory } from "./abort.js";
import { clearSessionQueues } from "./queue.js";
import type { CommandHandler } from "./commands-types.js";
function resolveSessionEntryForKey(
@@ -189,6 +190,12 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand
if (abortTarget.sessionId) {
abortEmbeddedPiRun(abortTarget.sessionId);
}
const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]);
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
logVerbose(
`stop: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
);
}
if (abortTarget.entry && params.sessionStore && abortTarget.key) {
abortTarget.entry.abortedLastRun = true;
abortTarget.entry.updatedAt = Date.now();
@@ -216,6 +223,12 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman
if (abortTarget.sessionId) {
abortEmbeddedPiRun(abortTarget.sessionId);
}
const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]);
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
logVerbose(
`stop-trigger: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
);
}
if (abortTarget.entry && params.sessionStore && abortTarget.key) {
abortTarget.entry.abortedLastRun = true;
abortTarget.entry.updatedAt = Date.now();

View File

@@ -1,7 +1,10 @@
export { extractQueueDirective } from "./queue/directive.js";
export { clearSessionQueues } from "./queue/cleanup.js";
export type { ClearSessionQueueResult } from "./queue/cleanup.js";
export { scheduleFollowupDrain } from "./queue/drain.js";
export { enqueueFollowupRun, getFollowupQueueDepth } from "./queue/enqueue.js";
export { resolveQueueSettings } from "./queue/settings.js";
export { clearFollowupQueue } from "./queue/state.js";
export type {
FollowupRun,
QueueDedupeMode,

View File

@@ -0,0 +1,27 @@
import { resolveEmbeddedSessionLane } from "../../../agents/pi-embedded.js";
import { clearCommandLane } from "../../../process/command-queue.js";
import { clearFollowupQueue } from "./state.js";
export type ClearSessionQueueResult = {
followupCleared: number;
laneCleared: number;
keys: string[];
};
export function clearSessionQueues(keys: Array<string | undefined>): ClearSessionQueueResult {
const seen = new Set<string>();
let followupCleared = 0;
let laneCleared = 0;
const clearedKeys: string[] = [];
for (const key of keys) {
const cleaned = key?.trim();
if (!cleaned || seen.has(cleaned)) continue;
seen.add(cleaned);
clearedKeys.push(cleaned);
followupCleared += clearFollowupQueue(cleaned);
laneCleared += clearCommandLane(resolveEmbeddedSessionLane(cleaned));
}
return { followupCleared, laneCleared, keys: clearedKeys };
}

View File

@@ -55,3 +55,18 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup
FOLLOWUP_QUEUES.set(key, created);
return created;
}
export function clearFollowupQueue(key: string): number {
const cleaned = key.trim();
if (!cleaned) return 0;
const queue = FOLLOWUP_QUEUES.get(cleaned);
if (!queue) return 0;
const cleared = queue.items.length + queue.droppedCount;
queue.items.length = 0;
queue.droppedCount = 0;
queue.summaryLines = [];
queue.lastRun = undefined;
queue.lastEnqueuedAt = 0;
FOLLOWUP_QUEUES.delete(cleaned);
return cleared;
}