diff --git a/src/agents/pi-embedded-runner.ts b/src/agents/pi-embedded-runner.ts index a80b6a982..314511b9b 100644 --- a/src/agents/pi-embedded-runner.ts +++ b/src/agents/pi-embedded-runner.ts @@ -593,9 +593,9 @@ export async function runEmbeddedPiAgent(params: { ownerNumbers?: string[]; enforceFinalTag?: boolean; }): Promise { - const sessionLane = resolveSessionLane( - params.sessionKey?.trim() || params.sessionId, - ); + // Use sessionKey as the lookup key for /stop, falling back to sessionId + const activeRunKey = params.sessionKey?.trim() || params.sessionId; + const sessionLane = resolveSessionLane(activeRunKey); const globalLane = resolveGlobalLane(params.lane); const enqueueGlobal = params.enqueue ?? @@ -859,7 +859,7 @@ export async function runEmbeddedPiAgent(params: { isCompacting: () => subscription.isCompacting(), abort: abortRun, }; - ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle); + ACTIVE_EMBEDDED_RUNS.set(activeRunKey, queueHandle); let abortWarnTimer: NodeJS.Timeout | undefined; const abortTimer = setTimeout( @@ -919,9 +919,9 @@ export async function runEmbeddedPiAgent(params: { abortWarnTimer = undefined; } unsubscribe(); - if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) { - ACTIVE_EMBEDDED_RUNS.delete(params.sessionId); - notifyEmbeddedRunEnded(params.sessionId); + if (ACTIVE_EMBEDDED_RUNS.get(activeRunKey) === queueHandle) { + ACTIVE_EMBEDDED_RUNS.delete(activeRunKey); + notifyEmbeddedRunEnded(activeRunKey); } session.dispose(); params.abortSignal?.removeEventListener?.("abort", onAbort); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index d02ab6ae6..4a6d6bc17 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -121,6 +121,38 @@ export function createTelegramBot(opts: TelegramBotOptions) { const mediaGroupBuffer = new Map(); const cfg = loadConfig(); + + // Handle /stop command - abort current agent run + // Registered before bot.on("message") to ensure it runs first via grammy's middleware chain + bot.command("stop", async (ctx) => { + const { abortEmbeddedPiRun, isEmbeddedPiRunActive } = await import( + "../agents/pi-embedded.js" + ); + const msg = ctx.message; + const chatId = msg?.chat.id; + const isGroup = + msg?.chat.type === "group" || msg?.chat.type === "supergroup"; + const messageThreadId = (msg as { message_thread_id?: number }) + ?.message_thread_id; + + // Build session key matching the format used by the reply flow + const sessionCfg = cfg.session; + const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main"; + const sessionId = isGroup + ? messageThreadId + ? `telegram:group:${chatId}:topic:${messageThreadId}` + : `telegram:group:${chatId}` + : mainKey; + + const wasActive = isEmbeddedPiRunActive(sessionId); + const aborted = abortEmbeddedPiRun(sessionId); + const statusMsg = wasActive + ? aborted + ? "🛑 stopped" + : "🛑 stop requested (finishing current step)" + : "nothing running"; + await ctx.reply(statusMsg); + }); const textLimit = resolveTextChunkLimit(cfg, "telegram"); const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing"; const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom;