From e0efcda77f5e4eea1e1bb0209c21efee33988dd3 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 6 Jan 2026 23:05:05 +0000 Subject: [PATCH] fix(commands): wire /stop across chat commands --- CHANGELOG.md | 1 + docs/slash-commands.md | 2 + src/agents/pi-embedded-runner.ts | 14 ++-- src/auto-reply/commands-registry.test.ts | 2 + src/auto-reply/commands-registry.ts | 6 ++ src/auto-reply/reply.triggers.test.ts | 17 +++++ src/auto-reply/reply/commands.ts | 86 ++++++++++++++++++++++-- src/auto-reply/templating.ts | 1 + src/discord/monitor.ts | 1 + src/slack/monitor.ts | 1 + src/telegram/bot.ts | 41 +++-------- 11 files changed, 129 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fe04238e..80961d142 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - Messages: stop defaulting ack reactions to 👀 when identity emoji is missing. - Auto-reply: require slash for control commands to avoid false triggers in normal text. - Auto-reply: flag error payloads and improve Bun socket error messaging. Thanks @emanuelst for PR #331. +- Commands: add `/stop` to the registry and route native aborts to the active chat session. Thanks @nachoiacovino for PR #295. - Commands: unify native + text chat commands behind `commands.*` config (Discord/Slack/Telegram). Thanks @thewilloftheshadow for PR #275. - Auto-reply: treat steer during compaction as a follow-up, queued until compaction completes. - Auth: lock auth profile refreshes to avoid multi-instance OAuth logouts; keep credentials on refresh failure. diff --git a/docs/slash-commands.md b/docs/slash-commands.md index 81cdd12cf..d558af97a 100644 --- a/docs/slash-commands.md +++ b/docs/slash-commands.md @@ -33,6 +33,7 @@ Inline text like `hello /status` is ignored. Text + native (when enabled): - `/help` - `/status` +- `/stop` - `/restart` - `/activation mention|always` (groups only) - `/send on|off|inherit` (owner-only) @@ -50,4 +51,5 @@ Text-only: - **Text commands** run in the normal chat session (DMs share `main`, groups have their own session). - **Native commands** use isolated sessions: `discord:slash:`, `slack:slash:`, `telegram:slash:`. +- **`/stop`** targets the active chat session so it can abort the current run. - **Slack:** `slack.slashCommand` is still supported for a single `/clawd`-style command. If you enable `commands.native`, you must create one Slack slash command per built-in command (same names as `/help`). diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index 314511b9b..a80b6a982 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -593,9 +593,9 @@ export async function runEmbeddedPiAgent(params: { ownerNumbers?: string[]; enforceFinalTag?: boolean; }): Promise { - // Use sessionKey as the lookup key for /stop, falling back to sessionId - const activeRunKey = params.sessionKey?.trim() || params.sessionId; - const sessionLane = resolveSessionLane(activeRunKey); + const sessionLane = resolveSessionLane( + params.sessionKey?.trim() || params.sessionId, + ); const globalLane = resolveGlobalLane(params.lane); const enqueueGlobal = params.enqueue ?? @@ -859,7 +859,7 @@ export async function runEmbeddedPiAgent(params: { isCompacting: () => subscription.isCompacting(), abort: abortRun, }; - ACTIVE_EMBEDDED_RUNS.set(activeRunKey, queueHandle); + ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); let abortWarnTimer: NodeJS.Timeout | undefined; const abortTimer = setTimeout( @@ -919,9 +919,9 @@ export async function runEmbeddedPiAgent(params: { abortWarnTimer = undefined; } unsubscribe(); - if (ACTIVE_EMBEDDED_RUNS.get(activeRunKey) === queueHandle) { - ACTIVE_EMBEDDED_RUNS.delete(activeRunKey); - notifyEmbeddedRunEnded(activeRunKey); + if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { + ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); + notifyEmbeddedRunEnded(params.sessionId); } session.dispose(); params.abortSignal?.removeEventListener?.("abort", onAbort); diff --git a/src/auto-reply/commands-registry.test.ts b/src/auto-reply/commands-registry.test.ts index 7e07e9b81..394540a1d 100644 --- a/src/auto-reply/commands-registry.test.ts +++ b/src/auto-reply/commands-registry.test.ts @@ -16,12 +16,14 @@ describe("commands registry", () => { it("exposes native specs", () => { const specs = listNativeCommandSpecs(); expect(specs.find((spec) => spec.name === "help")).toBeTruthy(); + expect(specs.find((spec) => spec.name === "stop")).toBeTruthy(); }); it("detects known text commands", () => { const detection = getCommandDetection(); expect(detection.exact.has("/help")).toBe(true); expect(detection.regex.test("/status")).toBe(true); + expect(detection.regex.test("/stop")).toBe(true); expect(detection.regex.test("try /status")).toBe(false); }); diff --git a/src/auto-reply/commands-registry.ts b/src/auto-reply/commands-registry.ts index cc90e0be9..44219f66d 100644 --- a/src/auto-reply/commands-registry.ts +++ b/src/auto-reply/commands-registry.ts @@ -27,6 +27,12 @@ const CHAT_COMMANDS: ChatCommandDefinition[] = [ description: "Show current status.", textAliases: ["/status"], }, + { + key: "stop", + nativeName: "stop", + description: "Stop the current run.", + textAliases: ["/stop"], + }, { key: "restart", nativeName: "restart", diff --git a/src/auto-reply/reply.triggers.test.ts b/src/auto-reply/reply.triggers.test.ts index f3efa1368..a312a1f55 100644 --- a/src/auto-reply/reply.triggers.test.ts +++ b/src/auto-reply/reply.triggers.test.ts @@ -81,6 +81,23 @@ describe("trigger handling", () => { }); }); + it("handles /stop without invoking the agent", async () => { + await withTempHome(async (home) => { + const res = await getReplyFromConfig( + { + Body: "/stop", + From: "+1003", + To: "+2000", + }, + {}, + makeCfg(home), + ); + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toBe("⚙️ Agent was aborted."); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); + it("restarts even with prefix/whitespace", async () => { await withTempHome(async (home) => { const res = await getReplyFromConfig( diff --git a/src/auto-reply/reply/commands.ts b/src/auto-reply/reply/commands.ts index f7b7e7122..030fa902e 100644 --- a/src/auto-reply/reply/commands.ts +++ b/src/auto-reply/reply/commands.ts @@ -22,6 +22,7 @@ import { import { logVerbose } from "../../globals.js"; import { triggerClawdbotRestart } from "../../infra/restart.js"; import { enqueueSystemEvent } from "../../infra/system-events.js"; +import { parseAgentSessionKey } from "../../routing/session-key.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { normalizeE164 } from "../../utils.js"; import { resolveHeartbeatSeconds } from "../../web/reconnect.js"; @@ -47,6 +48,21 @@ import type { InlineDirectives } from "./directive-handling.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; import { incrementCompactionCount } from "./session-updates.js"; +function resolveSessionEntryForKey( + store: Record | undefined, + sessionKey: string | undefined, +) { + if (!store || !sessionKey) return {}; + const direct = store[sessionKey]; + if (direct) return { entry: direct, key: sessionKey }; + const parsed = parseAgentSessionKey(sessionKey); + const legacyKey = parsed?.rest; + if (legacyKey && store[legacyKey]) { + return { entry: store[legacyKey], key: legacyKey }; + } + return {}; +} + export type CommandContext = { surface: string; provider: string; @@ -149,6 +165,29 @@ export function buildCommandContext(params: { }; } +function resolveAbortTarget(params: { + ctx: MsgContext; + sessionKey?: string; + sessionEntry?: SessionEntry; + sessionStore?: Record; +}) { + const targetSessionKey = + params.ctx.CommandTargetSessionKey?.trim() || params.sessionKey; + const { entry, key } = resolveSessionEntryForKey( + params.sessionStore, + targetSessionKey, + ); + if (entry && key) return { entry, key, sessionId: entry.sessionId }; + if (params.sessionEntry && params.sessionKey) { + return { + entry: params.sessionEntry, + key: params.sessionKey, + sessionId: params.sessionEntry.sessionId, + }; + } + return { entry: undefined, key: targetSessionKey, sessionId: undefined }; +} + export async function handleCommands(params: { ctx: MsgContext; cfg: ClawdbotConfig; @@ -375,6 +414,36 @@ export async function handleCommands(params: { return { shouldContinue: false, reply: { text: statusText } }; } + const stopRequested = command.commandBodyNormalized === "/stop"; + if (allowTextCommands && stopRequested) { + if (!command.isAuthorizedSender) { + logVerbose( + `Ignoring /stop from unauthorized sender: ${command.senderE164 || ""}`, + ); + return { shouldContinue: false }; + } + const abortTarget = resolveAbortTarget({ + ctx, + sessionKey, + sessionEntry, + sessionStore, + }); + if (abortTarget.sessionId) { + abortEmbeddedPiRun(abortTarget.sessionId); + } + if (abortTarget.entry && sessionStore && abortTarget.key) { + abortTarget.entry.abortedLastRun = true; + abortTarget.entry.updatedAt = Date.now(); + sessionStore[abortTarget.key] = abortTarget.entry; + if (storePath) { + await saveSessionStore(storePath, sessionStore); + } + } else if (command.abortKey) { + setAbortMemory(command.abortKey, true); + } + return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } }; + } + const compactRequested = command.commandBodyNormalized === "/compact" || command.commandBodyNormalized.startsWith("/compact "); @@ -455,10 +524,19 @@ export async function handleCommands(params: { const abortRequested = isAbortTrigger(command.rawBodyNormalized); if (allowTextCommands && abortRequested) { - if (sessionEntry && sessionStore && sessionKey) { - sessionEntry.abortedLastRun = true; - sessionEntry.updatedAt = Date.now(); - sessionStore[sessionKey] = sessionEntry; + const abortTarget = resolveAbortTarget({ + ctx, + sessionKey, + sessionEntry, + sessionStore, + }); + if (abortTarget.sessionId) { + abortEmbeddedPiRun(abortTarget.sessionId); + } + if (abortTarget.entry && sessionStore && abortTarget.key) { + abortTarget.entry.abortedLastRun = true; + abortTarget.entry.updatedAt = Date.now(); + sessionStore[abortTarget.key] = abortTarget.entry; if (storePath) { await saveSessionStore(storePath, sessionStore); } diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 5b229c076..65e90f319 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -33,6 +33,7 @@ export type MsgContext = { WasMentioned?: boolean; CommandAuthorized?: boolean; CommandSource?: "text" | "native"; + CommandTargetSessionKey?: string; }; export type TemplateContext = MsgContext & { diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 7f6e9f355..b4800c23a 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -1178,6 +1178,7 @@ function createDiscordNativeCommand(params: { From: isDirectMessage ? `discord:${user.id}` : `group:${channelId}`, To: `slash:${user.id}`, SessionKey: `agent:${route.agentId}:${sessionPrefix}:${user.id}`, + CommandTargetSessionKey: route.sessionKey, AccountId: route.accountId, ChatType: isDirectMessage ? "direct" : "group", GroupSubject: isGuild ? interaction.guild?.name : undefined, diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index f98841d41..2f855fbb1 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -1562,6 +1562,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { MessageSid: command.trigger_id, Timestamp: Date.now(), SessionKey: `agent:${route.agentId}:${slashCommand.sessionPrefix}:${command.user_id}`, + CommandTargetSessionKey: route.sessionKey, AccountId: route.accountId, CommandSource: "native" as const, CommandAuthorized: commandAuthorized, diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 4a6d6bc17..daa87403a 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -121,38 +121,6 @@ export function createTelegramBot(opts: TelegramBotOptions) { const mediaGroupBuffer = new Map(); const cfg = loadConfig(); - - // Handle /stop command - abort current agent run - // Registered before bot.on("message") to ensure it runs first via grammy's middleware chain - bot.command("stop", async (ctx) => { - const { abortEmbeddedPiRun, isEmbeddedPiRunActive } = await import( - "../agents/pi-embedded.js" - ); - const msg = ctx.message; - const chatId = msg?.chat.id; - const isGroup = - msg?.chat.type === "group" || msg?.chat.type === "supergroup"; - const messageThreadId = (msg as { message_thread_id?: number }) - ?.message_thread_id; - - // Build session key matching the format used by the reply flow - const sessionCfg = cfg.session; - const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; - const sessionId = isGroup - ? messageThreadId - ? `telegram:group:${chatId}:topic:${messageThreadId}` - : `telegram:group:${chatId}` - : mainKey; - - const wasActive = isEmbeddedPiRunActive(sessionId); - const aborted = abortEmbeddedPiRun(sessionId); - const statusMsg = wasActive - ? aborted - ? "🛑 stopped" - : "🛑 stop requested (finishing current step)" - : "nothing running"; - await ctx.reply(statusMsg); - }); const textLimit = resolveTextChunkLimit(cfg, "telegram"); const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing"; const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom; @@ -602,6 +570,14 @@ export function createTelegramBot(opts: TelegramBotOptions) { } const prompt = buildCommandText(command.name, ctx.match ?? ""); + const route = resolveAgentRoute({ + cfg, + provider: "telegram", + peer: { + kind: isGroup ? "group" : "dm", + id: String(chatId), + }, + }); const ctxPayload = { Body: prompt, From: isGroup ? `group:${chatId}` : `telegram:${chatId}`, @@ -618,6 +594,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { CommandAuthorized: commandAuthorized, CommandSource: "native" as const, SessionKey: `telegram:slash:${senderId || chatId}`, + CommandTargetSessionKey: route.sessionKey, }; const replyResult = await getReplyFromConfig(