feat(telegram): add /stop command to abort running agent

Adds a /stop command that:
- Can interrupt a running agent session mid-execution
- Works in both DMs and group chats (including forum topics)
- Uses grammy's bot.command() to run before the main message handler
- Returns status: stopped, stop requested, or nothing running

Also fixes session key lookup in pi-embedded-runner to use sessionKey
instead of sessionId, ensuring /stop finds the correct active run.
This commit is contained in:
Nacho Iacovino
2026-01-06 12:06:37 +00:00
committed by Peter Steinberger
parent 5bc3f13b46
commit 0df7c3addf
2 changed files with 39 additions and 7 deletions

View File

@@ -593,9 +593,9 @@ export async function runEmbeddedPiAgent(params: {
ownerNumbers?: string[];
enforceFinalTag?: boolean;
}): Promise<EmbeddedPiRunResult> {
const sessionLane = resolveSessionLane(
params.sessionKey?.trim() || params.sessionId,
);
// Use sessionKey as the lookup key for /stop, falling back to sessionId
const activeRunKey = params.sessionKey?.trim() || params.sessionId;
const sessionLane = resolveSessionLane(activeRunKey);
const globalLane = resolveGlobalLane(params.lane);
const enqueueGlobal =
params.enqueue ??
@@ -859,7 +859,7 @@ export async function runEmbeddedPiAgent(params: {
isCompacting: () => subscription.isCompacting(),
abort: abortRun,
};
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
ACTIVE_EMBEDDED_RUNS.set(activeRunKey, queueHandle);
let abortWarnTimer: NodeJS.Timeout | undefined;
const abortTimer = setTimeout(
@@ -919,9 +919,9 @@ export async function runEmbeddedPiAgent(params: {
abortWarnTimer = undefined;
}
unsubscribe();
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
notifyEmbeddedRunEnded(params.sessionId);
if (ACTIVE_EMBEDDED_RUNS.get(activeRunKey) === queueHandle) {
ACTIVE_EMBEDDED_RUNS.delete(activeRunKey);
notifyEmbeddedRunEnded(activeRunKey);
}
session.dispose();
params.abortSignal?.removeEventListener?.("abort", onAbort);

View File

@@ -121,6 +121,38 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
const cfg = loadConfig();
// Handle /stop command - abort current agent run
// Registered before bot.on("message") to ensure it runs first via grammy's middleware chain
bot.command("stop", async (ctx) => {
const { abortEmbeddedPiRun, isEmbeddedPiRunActive } = await import(
"../agents/pi-embedded.js"
);
const msg = ctx.message;
const chatId = msg?.chat.id;
const isGroup =
msg?.chat.type === "group" || msg?.chat.type === "supergroup";
const messageThreadId = (msg as { message_thread_id?: number })
?.message_thread_id;
// Build session key matching the format used by the reply flow
const sessionCfg = cfg.session;
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
const sessionId = isGroup
? messageThreadId
? `telegram:group:${chatId}:topic:${messageThreadId}`
: `telegram:group:${chatId}`
: mainKey;
const wasActive = isEmbeddedPiRunActive(sessionId);
const aborted = abortEmbeddedPiRun(sessionId);
const statusMsg = wasActive
? aborted
? "🛑 stopped"
: "🛑 stop requested (finishing current step)"
: "nothing running";
await ctx.reply(statusMsg);
});
const textLimit = resolveTextChunkLimit(cfg, "telegram");
const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing";
const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom;