From 0df7c3addf4147458add391032376b52e0cb029f Mon Sep 17 00:00:00 2001 From: Nacho Iacovino Date: Tue, 6 Jan 2026 12:06:37 +0000 Subject: [PATCH] feat(telegram): add /stop command to abort running agent Adds a /stop command that: - Can interrupt a running agent session mid-execution - Works in both DMs and group chats (including forum topics) - Uses grammy's bot.command() to run before the main message handler - Returns status: stopped, stop requested, or nothing running Also fixes session key lookup in pi-embedded-runner to use sessionKey instead of sessionId, ensuring /stop finds the correct active run. --- src/agents/pi-embedded-runner.ts | 14 +++++++------- src/telegram/bot.ts | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index a80b6a982..314511b9b 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -593,9 +593,9 @@ export async function runEmbeddedPiAgent(params: { ownerNumbers?: string[]; enforceFinalTag?: boolean; }): Promise { - const sessionLane = resolveSessionLane( - params.sessionKey?.trim() || params.sessionId, - ); + // Use sessionKey as the lookup key for /stop, falling back to sessionId + const activeRunKey = params.sessionKey?.trim() || params.sessionId; + const sessionLane = resolveSessionLane(activeRunKey); const globalLane = resolveGlobalLane(params.lane); const enqueueGlobal = params.enqueue ?? @@ -859,7 +859,7 @@ export async function runEmbeddedPiAgent(params: { isCompacting: () => subscription.isCompacting(), abort: abortRun, }; - ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); + ACTIVE_EMBEDDED_RUNS.set(activeRunKey, queueHandle); let abortWarnTimer: NodeJS.Timeout | undefined; const abortTimer = setTimeout( @@ -919,9 +919,9 @@ export async function runEmbeddedPiAgent(params: { abortWarnTimer = undefined; } unsubscribe(); - if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { - ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); - notifyEmbeddedRunEnded(params.sessionId); + if (ACTIVE_EMBEDDED_RUNS.get(activeRunKey) === queueHandle) { + ACTIVE_EMBEDDED_RUNS.delete(activeRunKey); + notifyEmbeddedRunEnded(activeRunKey); } session.dispose(); params.abortSignal?.removeEventListener?.("abort", onAbort); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index d02ab6ae6..4a6d6bc17 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -121,6 +121,38 @@ export function createTelegramBot(opts: TelegramBotOptions) { const mediaGroupBuffer = new Map(); const cfg = loadConfig(); + + // Handle /stop command - abort current agent run + // Registered before bot.on("message") to ensure it runs first via grammy's middleware chain + bot.command("stop", async (ctx) => { + const { abortEmbeddedPiRun, isEmbeddedPiRunActive } = await import( + "../agents/pi-embedded.js" + ); + const msg = ctx.message; + const chatId = msg?.chat.id; + const isGroup = + msg?.chat.type === "group" || msg?.chat.type === "supergroup"; + const messageThreadId = (msg as { message_thread_id?: number }) + ?.message_thread_id; + + // Build session key matching the format used by the reply flow + const sessionCfg = cfg.session; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const sessionId = isGroup + ? messageThreadId + ? `telegram:group:${chatId}:topic:${messageThreadId}` + : `telegram:group:${chatId}` + : mainKey; + + const wasActive = isEmbeddedPiRunActive(sessionId); + const aborted = abortEmbeddedPiRun(sessionId); + const statusMsg = wasActive + ? aborted + ? "🛑 stopped" + : "🛑 stop requested (finishing current step)" + : "nothing running"; + await ctx.reply(statusMsg); + }); const textLimit = resolveTextChunkLimit(cfg, "telegram"); const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing"; const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom;