From 9072e35f0895d269a3fb9f88fe9fa24f45978a4f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 16 Jan 2026 21:15:25 +0000 Subject: [PATCH] fix: hard-abort clears queues on /stop --- CHANGELOG.md | 1 + docs/concepts/queue.md | 22 +++--- docs/concepts/session.md | 2 +- ...targets-active-session-native-stop.test.ts | 28 +++++++ src/auto-reply/reply/abort.test.ts | 78 ++++++++++++++++++- src/auto-reply/reply/abort.ts | 8 ++ src/auto-reply/reply/commands-session.ts | 13 ++++ src/auto-reply/reply/queue.ts | 3 + src/auto-reply/reply/queue/cleanup.ts | 27 +++++++ src/auto-reply/reply/queue/state.ts | 15 ++++ 10 files changed, 184 insertions(+), 13 deletions(-) create mode 100644 src/auto-reply/reply/queue/cleanup.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index f6a91f82d..269655c18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ - Discord: allow emoji/sticker uploads + channel actions in config defaults. (#870) — thanks @JDIVE. ### Fixes +- Messages: make `/stop` clear queued followups and pending session lane work for a hard abort. - WhatsApp: default response prefix only for self-chat, using identity name when set. - Signal/iMessage: bound transport readiness waits to 30s with periodic logging. (#1014) — thanks @Szpadel. - Auth: merge main auth profiles into per-agent stores for sub-agents and document inheritance. (#1013) — thanks @marcmarg. 
diff --git a/docs/concepts/queue.md b/docs/concepts/queue.md index 31b1f344b..da6c32741 100644 --- a/docs/concepts/queue.md +++ b/docs/concepts/queue.md @@ -1,28 +1,28 @@ --- -summary: "Command queue design that serializes auto-reply command execution" +summary: "Command queue design that serializes inbound auto-reply runs" read_when: - Changing auto-reply execution or concurrency --- -# Command Queue (2026-01-03) +# Command Queue (2026-01-16) -We now serialize command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once, while allowing safe parallelism across sessions. +We serialize inbound auto-reply runs (all channels) through a tiny in-process queue to prevent multiple agent runs from colliding, while still allowing safe parallelism across sessions. ## Why -- Some auto-reply commands are expensive (LLM calls) and can collide when multiple inbound messages arrive close together. -- Serializing avoids competing for terminal/stdin, keeps logs readable, and reduces the chance of rate limits from upstream tools. +- Auto-reply runs can be expensive (LLM calls) and can collide when multiple inbound messages arrive close together. +- Serializing avoids competing for shared resources (session files, logs, CLI stdin) and reduces the chance of upstream rate limits. ## How it works -- A lane-aware FIFO queue drains each lane synchronously. +- A lane-aware FIFO queue drains each lane with a configurable concurrency cap (default 1). - `runEmbeddedPiAgent` enqueues by **session key** (lane `session:`) to guarantee only one active run per session. - Each session run is then queued into a **global lane** (`main` by default) so overall parallelism is capped by `agents.defaults.maxConcurrent`. -- When verbose logging is enabled, queued commands emit a short notice if they waited more than ~2s before starting. 
-- Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn. +- When verbose logging is enabled, queued runs emit a short notice if they waited more than ~2s before starting. +- Typing indicators still fire immediately on enqueue (when supported by the channel) so user experience is unchanged while we wait our turn. ## Queue modes (per channel) Inbound messages can steer the current run, wait for a followup turn, or do both: - `steer`: inject immediately into the current run (cancels pending tool calls after the next tool boundary). If not streaming, falls back to followup. - `followup`: enqueue for the next agent turn after the current run ends. -- `collect`: coalesce all queued messages into a **single** followup turn (default). +- `collect`: coalesce all queued messages into a **single** followup turn (default). If messages target different channels/threads, they drain individually to preserve routing. - `steer-backlog` (aka `steer+backlog`): steer now **and** preserve the message for a followup turn. - `interrupt` (legacy): abort the active run for that session, then run the newest message. - `queue` (legacy alias): same as `steer`. @@ -66,9 +66,9 @@ Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`. - `/queue default` or `/queue reset` clears the session override. ## Scope and guarantees -- Applies only to config-driven command replies; plain text replies are unaffected. +- Applies to auto-reply agent runs across all inbound channels that use the gateway reply pipeline (WhatsApp web, Telegram, Slack, Discord, Signal, iMessage, webchat, etc.). - Default lane (`main`) is process-wide for inbound + main heartbeats; set `agents.defaults.maxConcurrent` to allow multiple sessions in parallel. -- Additional lanes may exist (e.g. `cron`) so background jobs can run in parallel without blocking inbound replies. +- Additional lanes may exist (e.g. 
`cron`, `subagent`) so background jobs can run in parallel without blocking inbound replies. - Per-session lanes guarantee that only one agent run touches a given session at a time. - No external dependencies or background worker threads; pure TypeScript + promises. diff --git a/docs/concepts/session.md b/docs/concepts/session.md index 075f4b039..94995b633 100644 --- a/docs/concepts/session.md +++ b/docs/concepts/session.md @@ -105,7 +105,7 @@ Send these as standalone messages so they register. - `clawdbot gateway call sessions.list --params '{}'` — fetch sessions from the running gateway (use `--url`/`--token` for remote gateway access). - Send `/status` as a standalone message in chat to see whether the agent is reachable, how much of the session context is used, current thinking/verbose toggles, and when your WhatsApp web creds were last refreshed (helps spot relink needs). - Send `/context list` or `/context detail` to see what’s in the system prompt and injected workspace files (and the biggest context contributors). -- Send `/stop` as a standalone message to abort the current run. +- Send `/stop` as a standalone message to abort the current run and clear queued followups for that session. - Send `/compact` (optional instructions) as a standalone message to summarize older context and free up window space. See [/concepts/compaction](/concepts/compaction). - JSONL transcripts can be opened directly to review full turns. 
diff --git a/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts b/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts index 7fd774dfd..be6393c25 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts @@ -50,6 +50,7 @@ vi.mock("../agents/model-catalog.js", () => modelCatalogMocks); import { abortEmbeddedPiRun, runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { loadSessionStore } from "../config/sessions.js"; +import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./reply/queue.js"; import { getReplyFromConfig } from "./reply.js"; const MAIN_SESSION_KEY = "agent:main:main"; @@ -113,6 +114,32 @@ describe("trigger handling", () => { 2, ), ); + const followupRun: FollowupRun = { + prompt: "queued", + enqueuedAt: Date.now(), + run: { + agentId: "main", + agentDir: join(home, "agent"), + sessionId: targetSessionId, + sessionKey: targetSessionKey, + messageProvider: "telegram", + agentAccountId: "acct", + sessionFile: join(home, "session.jsonl"), + workspaceDir: join(home, "workspace"), + config: cfg, + provider: "anthropic", + model: "claude-opus-4-5", + timeoutMs: 1000, + blockReplyBreak: "text_end", + }, + }; + enqueueFollowupRun( + targetSessionKey, + followupRun, + { mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" }, + "none", + ); + expect(getFollowupQueueDepth(targetSessionKey)).toBe(1); const res = await getReplyFromConfig( { @@ -136,6 +163,7 @@ describe("trigger handling", () => { expect(vi.mocked(abortEmbeddedPiRun)).toHaveBeenCalledWith(targetSessionId); const store = loadSessionStore(cfg.session.store); expect(store[targetSessionKey]?.abortedLastRun).toBe(true); + expect(getFollowupQueueDepth(targetSessionKey)).toBe(0); }); }); it("applies native /model to the target session", async () => { 
diff --git a/src/auto-reply/reply/abort.test.ts b/src/auto-reply/reply/abort.test.ts index 804f5ed82..3bd21633b 100644 --- a/src/auto-reply/reply/abort.test.ts +++ b/src/auto-reply/reply/abort.test.ts @@ -1,11 +1,23 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import type { ClawdbotConfig } from "../../config/config.js"; import { isAbortTrigger, tryFastAbortFromMessage } from "./abort.js"; +import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./queue.js"; import { initSessionState } from "./session.js"; +vi.mock("../../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(true), + resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, +})); + +const commandQueueMocks = vi.hoisted(() => ({ + clearCommandLane: vi.fn(), +})); + +vi.mock("../../process/command-queue.js", () => commandQueueMocks); + describe("abort detection", () => { it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-")); @@ -61,4 +73,68 @@ describe("abort detection", () => { expect(result.handled).toBe(true); }); + + it("fast-abort clears queued followups and session lane", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-")); + const storePath = path.join(root, "sessions.json"); + const cfg = { session: { store: storePath } } as ClawdbotConfig; + const sessionKey = "telegram:123"; + const sessionId = "session-123"; + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId, + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + ); + const followupRun: FollowupRun = { + prompt: "queued", + enqueuedAt: Date.now(), + run: { + agentId: "main", + agentDir: path.join(root, "agent"), + sessionId, + sessionKey, + 
messageProvider: "telegram", + agentAccountId: "acct", + sessionFile: path.join(root, "session.jsonl"), + workspaceDir: path.join(root, "workspace"), + config: cfg, + provider: "anthropic", + model: "claude-opus-4-5", + timeoutMs: 1000, + blockReplyBreak: "text_end", + }, + }; + enqueueFollowupRun( + sessionKey, + followupRun, + { mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" }, + "none", + ); + expect(getFollowupQueueDepth(sessionKey)).toBe(1); + + const result = await tryFastAbortFromMessage({ + ctx: { + CommandBody: "/stop", + RawBody: "/stop", + SessionKey: sessionKey, + Provider: "telegram", + Surface: "telegram", + From: "telegram:123", + To: "telegram:123", + }, + cfg, + }); + + expect(result.handled).toBe(true); + expect(getFollowupQueueDepth(sessionKey)).toBe(0); + expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`); + }); }); diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index 43af0f8e1..da60cdc9f 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -11,7 +11,9 @@ import { parseAgentSessionKey } from "../../routing/session-key.js"; import { resolveCommandAuthorization } from "../command-auth.js"; import { normalizeCommandBody } from "../commands-registry.js"; import type { MsgContext } from "../templating.js"; +import { logVerbose } from "../../globals.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; +import { clearSessionQueues } from "./queue.js"; const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit", "interrupt"]); const ABORT_MEMORY = new Map(); @@ -86,6 +88,12 @@ export async function tryFastAbortFromMessage(params: { const { entry, key } = resolveSessionEntryForKey(store, targetKey); const sessionId = entry?.sessionId; const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false; + const cleared = clearSessionQueues([key ?? 
targetKey, sessionId]); + if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { + logVerbose( + `abort: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, + ); + } if (entry && key) { entry.abortedLastRun = true; entry.updatedAt = Date.now(); diff --git a/src/auto-reply/reply/commands-session.ts b/src/auto-reply/reply/commands-session.ts index 1adde3b5f..2391e512d 100644 --- a/src/auto-reply/reply/commands-session.ts +++ b/src/auto-reply/reply/commands-session.ts @@ -7,6 +7,7 @@ import { parseAgentSessionKey } from "../../routing/session-key.js"; import { parseActivationCommand } from "../group-activation.js"; import { parseSendPolicyCommand } from "../send-policy.js"; import { isAbortTrigger, setAbortMemory } from "./abort.js"; +import { clearSessionQueues } from "./queue.js"; import type { CommandHandler } from "./commands-types.js"; function resolveSessionEntryForKey( @@ -189,6 +190,12 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand if (abortTarget.sessionId) { abortEmbeddedPiRun(abortTarget.sessionId); } + const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]); + if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { + logVerbose( + `stop: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, + ); + } if (abortTarget.entry && params.sessionStore && abortTarget.key) { abortTarget.entry.abortedLastRun = true; abortTarget.entry.updatedAt = Date.now(); @@ -216,6 +223,12 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman if (abortTarget.sessionId) { abortEmbeddedPiRun(abortTarget.sessionId); } + const cleared = clearSessionQueues([abortTarget.key, abortTarget.sessionId]); + if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { + logVerbose( + `stop-trigger: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} 
keys=${cleared.keys.join(",")}`, + ); + } if (abortTarget.entry && params.sessionStore && abortTarget.key) { abortTarget.entry.abortedLastRun = true; abortTarget.entry.updatedAt = Date.now(); diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index cbb5e9cbd..3d0ddb371 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -1,7 +1,10 @@ export { extractQueueDirective } from "./queue/directive.js"; +export { clearSessionQueues } from "./queue/cleanup.js"; +export type { ClearSessionQueueResult } from "./queue/cleanup.js"; export { scheduleFollowupDrain } from "./queue/drain.js"; export { enqueueFollowupRun, getFollowupQueueDepth } from "./queue/enqueue.js"; export { resolveQueueSettings } from "./queue/settings.js"; +export { clearFollowupQueue } from "./queue/state.js"; export type { FollowupRun, QueueDedupeMode, diff --git a/src/auto-reply/reply/queue/cleanup.ts b/src/auto-reply/reply/queue/cleanup.ts new file mode 100644 index 000000000..6f53c0185 --- /dev/null +++ b/src/auto-reply/reply/queue/cleanup.ts @@ -0,0 +1,27 @@ +import { resolveEmbeddedSessionLane } from "../../../agents/pi-embedded.js"; +import { clearCommandLane } from "../../../process/command-queue.js"; +import { clearFollowupQueue } from "./state.js"; + +/** Summary returned by clearSessionQueues: the summed results of clearFollowupQueue and clearCommandLane, plus the trimmed, de-duplicated keys that were processed, in first-seen order. */ export type ClearSessionQueueResult = { + followupCleared: number; + laneCleared: number; + keys: string[]; +}; + +/** Clears the followup queue and the embedded session command lane for every distinct, non-blank key. Entries that are undefined, empty, or whitespace-only are skipped, and keys are trimmed before use, so callers may pass optional identifiers (e.g. a session key and a possibly-missing session id) directly. Returns the accumulated counts and the keys actually processed. */ export function clearSessionQueues(keys: Array<string | undefined>): ClearSessionQueueResult { + /* Tracks trimmed keys already handled so the same queue/lane is never cleared (or counted) twice. */ const seen = new Set<string>(); + let followupCleared = 0; + let laneCleared = 0; + const clearedKeys: string[] = []; + + for (const key of keys) { + const cleaned = key?.trim(); + if (!cleaned || seen.has(cleaned)) continue; + seen.add(cleaned); + clearedKeys.push(cleaned); + followupCleared += clearFollowupQueue(cleaned); + /* The lane name is derived from the key by resolveEmbeddedSessionLane; clearCommandLane is assumed to return the number of pending entries it removed - confirm in process/command-queue. */ laneCleared += clearCommandLane(resolveEmbeddedSessionLane(cleaned)); + } + + return { followupCleared, laneCleared, keys: clearedKeys }; +} diff --git a/src/auto-reply/reply/queue/state.ts
b/src/auto-reply/reply/queue/state.ts index d7977ae57..0357ff8c8 100644 --- a/src/auto-reply/reply/queue/state.ts +++ b/src/auto-reply/reply/queue/state.ts @@ -55,3 +55,18 @@ export function getFollowupQueue(key: string, settings: QueueSettings): Followup FOLLOWUP_QUEUES.set(key, created); return created; } + +/** Discards all followup state registered under `key` and removes that queue from FOLLOWUP_QUEUES. The key is trimmed before lookup. Returns the number of entries discarded (queued items plus the queue's dropped-item counter), or 0 when the key is blank or no queue exists for it. NOTE(review): lookup uses the trimmed key - confirm getFollowupQueue registers queues under the same normalized form, otherwise a key with surrounding whitespace would not be found here. */ export function clearFollowupQueue(key: string): number { + const cleaned = key.trim(); + if (!cleaned) return 0; + const queue = FOLLOWUP_QUEUES.get(cleaned); + if (!queue) return 0; + /* Capture the count before the fields are reset below. */ const cleared = queue.items.length + queue.droppedCount; + /* The queue object is emptied in place in addition to being unregistered - presumably so code still holding a reference (e.g. an in-flight drain) sees no pending work; confirm against the drain logic. */ queue.items.length = 0; + queue.droppedCount = 0; + queue.summaryLines = []; + queue.lastRun = undefined; + queue.lastEnqueuedAt = 0; + FOLLOWUP_QUEUES.delete(cleaned); + return cleared; +}