fix(commands): wire /stop across chat commands
This commit is contained in:
@@ -30,6 +30,7 @@
|
||||
- Messages: stop defaulting ack reactions to 👀 when identity emoji is missing.
|
||||
- Auto-reply: require slash for control commands to avoid false triggers in normal text.
|
||||
- Auto-reply: flag error payloads and improve Bun socket error messaging. Thanks @emanuelst for PR #331.
|
||||
- Commands: add `/stop` to the registry and route native aborts to the active chat session. Thanks @nachoiacovino for PR #295.
|
||||
- Commands: unify native + text chat commands behind `commands.*` config (Discord/Slack/Telegram). Thanks @thewilloftheshadow for PR #275.
|
||||
- Auto-reply: treat steer during compaction as a follow-up, queued until compaction completes.
|
||||
- Auth: lock auth profile refreshes to avoid multi-instance OAuth logouts; keep credentials on refresh failure.
|
||||
|
||||
@@ -33,6 +33,7 @@ Inline text like `hello /status` is ignored.
|
||||
Text + native (when enabled):
|
||||
- `/help`
|
||||
- `/status`
|
||||
- `/stop`
|
||||
- `/restart`
|
||||
- `/activation mention|always` (groups only)
|
||||
- `/send on|off|inherit` (owner-only)
|
||||
@@ -50,4 +51,5 @@ Text-only:
|
||||
|
||||
- **Text commands** run in the normal chat session (DMs share `main`, groups have their own session).
|
||||
- **Native commands** use isolated sessions: `discord:slash:<userId>`, `slack:slash:<userId>`, `telegram:slash:<userId>`.
|
||||
- **`/stop`** targets the active chat session so it can abort the current run.
|
||||
- **Slack:** `slack.slashCommand` is still supported for a single `/clawd`-style command. If you enable `commands.native`, you must create one Slack slash command per built-in command (same names as `/help`).
|
||||
|
||||
@@ -593,9 +593,9 @@ export async function runEmbeddedPiAgent(params: {
|
||||
ownerNumbers?: string[];
|
||||
enforceFinalTag?: boolean;
|
||||
}): Promise<EmbeddedPiRunResult> {
|
||||
// Use sessionKey as the lookup key for /stop, falling back to sessionId
|
||||
const activeRunKey = params.sessionKey?.trim() || params.sessionId;
|
||||
const sessionLane = resolveSessionLane(activeRunKey);
|
||||
const sessionLane = resolveSessionLane(
|
||||
params.sessionKey?.trim() || params.sessionId,
|
||||
);
|
||||
const globalLane = resolveGlobalLane(params.lane);
|
||||
const enqueueGlobal =
|
||||
params.enqueue ??
|
||||
@@ -859,7 +859,7 @@ export async function runEmbeddedPiAgent(params: {
|
||||
isCompacting: () => subscription.isCompacting(),
|
||||
abort: abortRun,
|
||||
};
|
||||
ACTIVE_EMBEDDED_RUNS.set(activeRunKey, queueHandle);
|
||||
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
|
||||
|
||||
let abortWarnTimer: NodeJS.Timeout | undefined;
|
||||
const abortTimer = setTimeout(
|
||||
@@ -919,9 +919,9 @@ export async function runEmbeddedPiAgent(params: {
|
||||
abortWarnTimer = undefined;
|
||||
}
|
||||
unsubscribe();
|
||||
if (ACTIVE_EMBEDDED_RUNS.get(activeRunKey) === queueHandle) {
|
||||
ACTIVE_EMBEDDED_RUNS.delete(activeRunKey);
|
||||
notifyEmbeddedRunEnded(activeRunKey);
|
||||
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
|
||||
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
|
||||
notifyEmbeddedRunEnded(params.sessionId);
|
||||
}
|
||||
session.dispose();
|
||||
params.abortSignal?.removeEventListener?.("abort", onAbort);
|
||||
|
||||
@@ -16,12 +16,14 @@ describe("commands registry", () => {
|
||||
it("exposes native specs", () => {
|
||||
const specs = listNativeCommandSpecs();
|
||||
expect(specs.find((spec) => spec.name === "help")).toBeTruthy();
|
||||
expect(specs.find((spec) => spec.name === "stop")).toBeTruthy();
|
||||
});
|
||||
|
||||
it("detects known text commands", () => {
|
||||
const detection = getCommandDetection();
|
||||
expect(detection.exact.has("/help")).toBe(true);
|
||||
expect(detection.regex.test("/status")).toBe(true);
|
||||
expect(detection.regex.test("/stop")).toBe(true);
|
||||
expect(detection.regex.test("try /status")).toBe(false);
|
||||
});
|
||||
|
||||
|
||||
@@ -27,6 +27,12 @@ const CHAT_COMMANDS: ChatCommandDefinition[] = [
|
||||
description: "Show current status.",
|
||||
textAliases: ["/status"],
|
||||
},
|
||||
{
|
||||
key: "stop",
|
||||
nativeName: "stop",
|
||||
description: "Stop the current run.",
|
||||
textAliases: ["/stop"],
|
||||
},
|
||||
{
|
||||
key: "restart",
|
||||
nativeName: "restart",
|
||||
|
||||
@@ -81,6 +81,23 @@ describe("trigger handling", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("handles /stop without invoking the agent", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const res = await getReplyFromConfig(
|
||||
{
|
||||
Body: "/stop",
|
||||
From: "+1003",
|
||||
To: "+2000",
|
||||
},
|
||||
{},
|
||||
makeCfg(home),
|
||||
);
|
||||
const text = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
expect(text).toBe("⚙️ Agent was aborted.");
|
||||
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
it("restarts even with prefix/whitespace", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const res = await getReplyFromConfig(
|
||||
|
||||
@@ -22,6 +22,7 @@ import {
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { triggerClawdbotRestart } from "../../infra/restart.js";
|
||||
import { enqueueSystemEvent } from "../../infra/system-events.js";
|
||||
import { parseAgentSessionKey } from "../../routing/session-key.js";
|
||||
import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
||||
import { normalizeE164 } from "../../utils.js";
|
||||
import { resolveHeartbeatSeconds } from "../../web/reconnect.js";
|
||||
@@ -47,6 +48,21 @@ import type { InlineDirectives } from "./directive-handling.js";
|
||||
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
|
||||
import { incrementCompactionCount } from "./session-updates.js";
|
||||
|
||||
function resolveSessionEntryForKey(
|
||||
store: Record<string, SessionEntry> | undefined,
|
||||
sessionKey: string | undefined,
|
||||
) {
|
||||
if (!store || !sessionKey) return {};
|
||||
const direct = store[sessionKey];
|
||||
if (direct) return { entry: direct, key: sessionKey };
|
||||
const parsed = parseAgentSessionKey(sessionKey);
|
||||
const legacyKey = parsed?.rest;
|
||||
if (legacyKey && store[legacyKey]) {
|
||||
return { entry: store[legacyKey], key: legacyKey };
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
export type CommandContext = {
|
||||
surface: string;
|
||||
provider: string;
|
||||
@@ -149,6 +165,29 @@ export function buildCommandContext(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function resolveAbortTarget(params: {
|
||||
ctx: MsgContext;
|
||||
sessionKey?: string;
|
||||
sessionEntry?: SessionEntry;
|
||||
sessionStore?: Record<string, SessionEntry>;
|
||||
}) {
|
||||
const targetSessionKey =
|
||||
params.ctx.CommandTargetSessionKey?.trim() || params.sessionKey;
|
||||
const { entry, key } = resolveSessionEntryForKey(
|
||||
params.sessionStore,
|
||||
targetSessionKey,
|
||||
);
|
||||
if (entry && key) return { entry, key, sessionId: entry.sessionId };
|
||||
if (params.sessionEntry && params.sessionKey) {
|
||||
return {
|
||||
entry: params.sessionEntry,
|
||||
key: params.sessionKey,
|
||||
sessionId: params.sessionEntry.sessionId,
|
||||
};
|
||||
}
|
||||
return { entry: undefined, key: targetSessionKey, sessionId: undefined };
|
||||
}
|
||||
|
||||
export async function handleCommands(params: {
|
||||
ctx: MsgContext;
|
||||
cfg: ClawdbotConfig;
|
||||
@@ -375,6 +414,36 @@ export async function handleCommands(params: {
|
||||
return { shouldContinue: false, reply: { text: statusText } };
|
||||
}
|
||||
|
||||
const stopRequested = command.commandBodyNormalized === "/stop";
|
||||
if (allowTextCommands && stopRequested) {
|
||||
if (!command.isAuthorizedSender) {
|
||||
logVerbose(
|
||||
`Ignoring /stop from unauthorized sender: ${command.senderE164 || "<unknown>"}`,
|
||||
);
|
||||
return { shouldContinue: false };
|
||||
}
|
||||
const abortTarget = resolveAbortTarget({
|
||||
ctx,
|
||||
sessionKey,
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
});
|
||||
if (abortTarget.sessionId) {
|
||||
abortEmbeddedPiRun(abortTarget.sessionId);
|
||||
}
|
||||
if (abortTarget.entry && sessionStore && abortTarget.key) {
|
||||
abortTarget.entry.abortedLastRun = true;
|
||||
abortTarget.entry.updatedAt = Date.now();
|
||||
sessionStore[abortTarget.key] = abortTarget.entry;
|
||||
if (storePath) {
|
||||
await saveSessionStore(storePath, sessionStore);
|
||||
}
|
||||
} else if (command.abortKey) {
|
||||
setAbortMemory(command.abortKey, true);
|
||||
}
|
||||
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
|
||||
}
|
||||
|
||||
const compactRequested =
|
||||
command.commandBodyNormalized === "/compact" ||
|
||||
command.commandBodyNormalized.startsWith("/compact ");
|
||||
@@ -455,10 +524,19 @@ export async function handleCommands(params: {
|
||||
|
||||
const abortRequested = isAbortTrigger(command.rawBodyNormalized);
|
||||
if (allowTextCommands && abortRequested) {
|
||||
if (sessionEntry && sessionStore && sessionKey) {
|
||||
sessionEntry.abortedLastRun = true;
|
||||
sessionEntry.updatedAt = Date.now();
|
||||
sessionStore[sessionKey] = sessionEntry;
|
||||
const abortTarget = resolveAbortTarget({
|
||||
ctx,
|
||||
sessionKey,
|
||||
sessionEntry,
|
||||
sessionStore,
|
||||
});
|
||||
if (abortTarget.sessionId) {
|
||||
abortEmbeddedPiRun(abortTarget.sessionId);
|
||||
}
|
||||
if (abortTarget.entry && sessionStore && abortTarget.key) {
|
||||
abortTarget.entry.abortedLastRun = true;
|
||||
abortTarget.entry.updatedAt = Date.now();
|
||||
sessionStore[abortTarget.key] = abortTarget.entry;
|
||||
if (storePath) {
|
||||
await saveSessionStore(storePath, sessionStore);
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ export type MsgContext = {
|
||||
WasMentioned?: boolean;
|
||||
CommandAuthorized?: boolean;
|
||||
CommandSource?: "text" | "native";
|
||||
CommandTargetSessionKey?: string;
|
||||
};
|
||||
|
||||
export type TemplateContext = MsgContext & {
|
||||
|
||||
@@ -1178,6 +1178,7 @@ function createDiscordNativeCommand(params: {
|
||||
From: isDirectMessage ? `discord:${user.id}` : `group:${channelId}`,
|
||||
To: `slash:${user.id}`,
|
||||
SessionKey: `agent:${route.agentId}:${sessionPrefix}:${user.id}`,
|
||||
CommandTargetSessionKey: route.sessionKey,
|
||||
AccountId: route.accountId,
|
||||
ChatType: isDirectMessage ? "direct" : "group",
|
||||
GroupSubject: isGuild ? interaction.guild?.name : undefined,
|
||||
|
||||
@@ -1562,6 +1562,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) {
|
||||
MessageSid: command.trigger_id,
|
||||
Timestamp: Date.now(),
|
||||
SessionKey: `agent:${route.agentId}:${slashCommand.sessionPrefix}:${command.user_id}`,
|
||||
CommandTargetSessionKey: route.sessionKey,
|
||||
AccountId: route.accountId,
|
||||
CommandSource: "native" as const,
|
||||
CommandAuthorized: commandAuthorized,
|
||||
|
||||
@@ -121,38 +121,6 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
||||
|
||||
const cfg = loadConfig();
|
||||
|
||||
// Handle /stop command - abort current agent run
|
||||
// Registered before bot.on("message") to ensure it runs first via grammy's middleware chain
|
||||
bot.command("stop", async (ctx) => {
|
||||
const { abortEmbeddedPiRun, isEmbeddedPiRunActive } = await import(
|
||||
"../agents/pi-embedded.js"
|
||||
);
|
||||
const msg = ctx.message;
|
||||
const chatId = msg?.chat.id;
|
||||
const isGroup =
|
||||
msg?.chat.type === "group" || msg?.chat.type === "supergroup";
|
||||
const messageThreadId = (msg as { message_thread_id?: number })
|
||||
?.message_thread_id;
|
||||
|
||||
// Build session key matching the format used by the reply flow
|
||||
const sessionCfg = cfg.session;
|
||||
const mainKey = (sessionCfg?.mainKey ?? "main").trim() || "main";
|
||||
const sessionId = isGroup
|
||||
? messageThreadId
|
||||
? `telegram:group:${chatId}:topic:${messageThreadId}`
|
||||
: `telegram:group:${chatId}`
|
||||
: mainKey;
|
||||
|
||||
const wasActive = isEmbeddedPiRunActive(sessionId);
|
||||
const aborted = abortEmbeddedPiRun(sessionId);
|
||||
const statusMsg = wasActive
|
||||
? aborted
|
||||
? "🛑 stopped"
|
||||
: "🛑 stop requested (finishing current step)"
|
||||
: "nothing running";
|
||||
await ctx.reply(statusMsg);
|
||||
});
|
||||
const textLimit = resolveTextChunkLimit(cfg, "telegram");
|
||||
const dmPolicy = cfg.telegram?.dmPolicy ?? "pairing";
|
||||
const allowFrom = opts.allowFrom ?? cfg.telegram?.allowFrom;
|
||||
@@ -602,6 +570,14 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
}
|
||||
|
||||
const prompt = buildCommandText(command.name, ctx.match ?? "");
|
||||
const route = resolveAgentRoute({
|
||||
cfg,
|
||||
provider: "telegram",
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "dm",
|
||||
id: String(chatId),
|
||||
},
|
||||
});
|
||||
const ctxPayload = {
|
||||
Body: prompt,
|
||||
From: isGroup ? `group:${chatId}` : `telegram:${chatId}`,
|
||||
@@ -618,6 +594,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
CommandAuthorized: commandAuthorized,
|
||||
CommandSource: "native" as const,
|
||||
SessionKey: `telegram:slash:${senderId || chatId}`,
|
||||
CommandTargetSessionKey: route.sessionKey,
|
||||
};
|
||||
|
||||
const replyResult = await getReplyFromConfig(
|
||||
|
||||
Reference in New Issue
Block a user