fix: /stop aborts subagents
This commit is contained in:
@@ -57,6 +57,7 @@
|
||||
|
||||
### Fixes
|
||||
- Messages: make `/stop` clear queued followups and pending session lane work for a hard abort.
|
||||
- Messages: make `/stop` abort active sub-agent runs spawned from the requester session and report how many were stopped.
|
||||
- WhatsApp: default response prefix only for self-chat, using identity name when set.
|
||||
- Signal/iMessage: bound transport readiness waits to 30s with periodic logging. (#1014) — thanks @Szpadel.
|
||||
- Auth: merge main auth profiles into per-agent stores for sub-agents and document inheritance. (#1013) — thanks @marcmarg.
|
||||
|
||||
@@ -106,7 +106,7 @@ Send these as standalone messages so they register.
|
||||
- `clawdbot gateway call sessions.list --params '{}'` — fetch sessions from the running gateway (use `--url`/`--token` for remote gateway access).
|
||||
- Send `/status` as a standalone message in chat to see whether the agent is reachable, how much of the session context is used, current thinking/verbose toggles, and when your WhatsApp web creds were last refreshed (helps spot relink needs).
|
||||
- Send `/context list` or `/context detail` to see what’s in the system prompt and injected workspace files (and the biggest context contributors).
|
||||
- Send `/stop` as a standalone message to abort the current run and clear queued followups for that session.
|
||||
- Send `/stop` as a standalone message to abort the current run, clear queued followups for that session, and stop any sub-agent runs spawned from it (the reply includes the stopped count).
|
||||
- Send `/compact` (optional instructions) as a standalone message to summarize older context and free up window space. See [/concepts/compaction](/concepts/compaction).
|
||||
- JSONL transcripts can be opened directly to review full turns.
|
||||
|
||||
|
||||
@@ -108,6 +108,11 @@ Sub-agents use a dedicated in-process queue lane:
|
||||
- Lane name: `subagent`
|
||||
- Concurrency: `agents.defaults.subagents.maxConcurrent` (default `1`)
|
||||
|
||||
## Stopping
|
||||
|
||||
- Sending `/stop` in the requester chat aborts the requester session and stops any active sub-agent runs spawned from it.
|
||||
- The `/stop` reply includes how many sub-agent runs were stopped.
|
||||
|
||||
## Limitations
|
||||
|
||||
- Sub-agent announce is **best-effort**. If the gateway restarts, pending “announce back” work is lost.
|
||||
|
||||
@@ -354,6 +354,12 @@ export function releaseSubagentRun(runId: string) {
|
||||
if (subagentRuns.size === 0) stopSweeper();
|
||||
}
|
||||
|
||||
export function listSubagentRunsForRequester(requesterSessionKey: string): SubagentRunRecord[] {
|
||||
const key = requesterSessionKey.trim();
|
||||
if (!key) return [];
|
||||
return [...subagentRuns.values()].filter((entry) => entry.requesterSessionKey === key);
|
||||
}
|
||||
|
||||
export function initSubagentRegistry() {
|
||||
restoreSubagentRunsOnce();
|
||||
}
|
||||
|
||||
@@ -222,7 +222,7 @@ describe("trigger handling", () => {
|
||||
makeCfg(home),
|
||||
);
|
||||
const text = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
expect(text).toBe("⚙️ Agent was aborted.");
|
||||
expect(text).toBe("⚙️ Agent was aborted. Stopped 0 sub-agents.");
|
||||
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -238,7 +238,7 @@ describe("trigger handling", () => {
|
||||
makeCfg(home),
|
||||
);
|
||||
const text = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
expect(text).toBe("⚙️ Agent was aborted.");
|
||||
expect(text).toBe("⚙️ Agent was aborted. Stopped 0 sub-agents.");
|
||||
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -159,7 +159,7 @@ describe("trigger handling", () => {
|
||||
);
|
||||
|
||||
const text = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
expect(text).toBe("⚙️ Agent was aborted.");
|
||||
expect(text).toBe("⚙️ Agent was aborted. Stopped 0 sub-agents.");
|
||||
expect(vi.mocked(abortEmbeddedPiRun)).toHaveBeenCalledWith(targetSessionId);
|
||||
const store = loadSessionStore(cfg.session.store);
|
||||
expect(store[targetSessionKey]?.abortedLastRun).toBe(true);
|
||||
|
||||
@@ -18,6 +18,14 @@ const commandQueueMocks = vi.hoisted(() => ({
|
||||
|
||||
vi.mock("../../process/command-queue.js", () => commandQueueMocks);
|
||||
|
||||
const subagentRegistryMocks = vi.hoisted(() => ({
|
||||
listSubagentRunsForRequester: vi.fn(() => []),
|
||||
}));
|
||||
|
||||
vi.mock("../../agents/subagent-registry.js", () => ({
|
||||
listSubagentRunsForRequester: subagentRegistryMocks.listSubagentRunsForRequester,
|
||||
}));
|
||||
|
||||
describe("abort detection", () => {
|
||||
it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-"));
|
||||
@@ -137,4 +145,59 @@ describe("abort detection", () => {
|
||||
expect(getFollowupQueueDepth(sessionKey)).toBe(0);
|
||||
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`);
|
||||
});
|
||||
|
||||
it("fast-abort stops active subagent runs for requester session", async () => {
|
||||
const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-"));
|
||||
const storePath = path.join(root, "sessions.json");
|
||||
const cfg = { session: { store: storePath } } as ClawdbotConfig;
|
||||
const sessionKey = "telegram:parent";
|
||||
const childKey = "agent:main:subagent:child-1";
|
||||
const sessionId = "session-parent";
|
||||
const childSessionId = "session-child";
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
sessionId,
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
[childKey]: {
|
||||
sessionId: childSessionId,
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
|
||||
subagentRegistryMocks.listSubagentRunsForRequester.mockReturnValueOnce([
|
||||
{
|
||||
runId: "run-1",
|
||||
childSessionKey: childKey,
|
||||
requesterSessionKey: sessionKey,
|
||||
requesterDisplayKey: "telegram:parent",
|
||||
task: "do work",
|
||||
cleanup: "keep",
|
||||
createdAt: Date.now(),
|
||||
},
|
||||
]);
|
||||
|
||||
const result = await tryFastAbortFromMessage({
|
||||
ctx: {
|
||||
CommandBody: "/stop",
|
||||
RawBody: "/stop",
|
||||
SessionKey: sessionKey,
|
||||
Provider: "telegram",
|
||||
Surface: "telegram",
|
||||
From: "telegram:parent",
|
||||
To: "telegram:parent",
|
||||
},
|
||||
cfg,
|
||||
});
|
||||
|
||||
expect(result.stoppedSubagents).toBe(1);
|
||||
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${childKey}`);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
||||
import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js";
|
||||
import { listSubagentRunsForRequester } from "../../agents/subagent-registry.js";
|
||||
import type { ClawdbotConfig } from "../../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
@@ -14,6 +15,7 @@ import type { MsgContext } from "../templating.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
|
||||
import { clearSessionQueues } from "./queue.js";
|
||||
import { resolveInternalSessionKey, resolveMainSessionAlias } from "../../agents/tools/sessions-helpers.js";
|
||||
|
||||
const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit", "interrupt"]);
|
||||
const ABORT_MEMORY = new Map<string, boolean>();
|
||||
@@ -32,6 +34,14 @@ export function setAbortMemory(key: string, value: boolean): void {
|
||||
ABORT_MEMORY.set(key, value);
|
||||
}
|
||||
|
||||
export function formatAbortReplyText(stoppedSubagents?: number): string {
|
||||
if (typeof stoppedSubagents !== "number") {
|
||||
return "⚙️ Agent was aborted.";
|
||||
}
|
||||
const label = stoppedSubagents === 1 ? "sub-agent" : "sub-agents";
|
||||
return `⚙️ Agent was aborted. Stopped ${stoppedSubagents} ${label}.`;
|
||||
}
|
||||
|
||||
function resolveSessionEntryForKey(
|
||||
store: Record<string, SessionEntry> | undefined,
|
||||
sessionKey: string | undefined,
|
||||
@@ -54,10 +64,62 @@ function resolveAbortTargetKey(ctx: MsgContext): string | undefined {
|
||||
return sessionKey || undefined;
|
||||
}
|
||||
|
||||
function normalizeRequesterSessionKey(
|
||||
cfg: ClawdbotConfig,
|
||||
key: string | undefined,
|
||||
): string | undefined {
|
||||
const cleaned = key?.trim();
|
||||
if (!cleaned) return undefined;
|
||||
const { mainKey, alias } = resolveMainSessionAlias(cfg);
|
||||
return resolveInternalSessionKey({ key: cleaned, alias, mainKey });
|
||||
}
|
||||
|
||||
export function stopSubagentsForRequester(params: {
|
||||
cfg: ClawdbotConfig;
|
||||
requesterSessionKey?: string;
|
||||
}): { stopped: number } {
|
||||
const requesterKey = normalizeRequesterSessionKey(params.cfg, params.requesterSessionKey);
|
||||
if (!requesterKey) return { stopped: 0 };
|
||||
const runs = listSubagentRunsForRequester(requesterKey);
|
||||
if (runs.length === 0) return { stopped: 0 };
|
||||
|
||||
const storeCache = new Map<string, Record<string, SessionEntry>>();
|
||||
const seenChildKeys = new Set<string>();
|
||||
let stopped = 0;
|
||||
|
||||
for (const run of runs) {
|
||||
if (run.endedAt) continue;
|
||||
const childKey = run.childSessionKey?.trim();
|
||||
if (!childKey || seenChildKeys.has(childKey)) continue;
|
||||
seenChildKeys.add(childKey);
|
||||
|
||||
const cleared = clearSessionQueues([childKey]);
|
||||
const parsed = parseAgentSessionKey(childKey);
|
||||
const storePath = resolveStorePath(params.cfg.session?.store, { agentId: parsed?.agentId });
|
||||
let store = storeCache.get(storePath);
|
||||
if (!store) {
|
||||
store = loadSessionStore(storePath);
|
||||
storeCache.set(storePath, store);
|
||||
}
|
||||
const entry = store[childKey];
|
||||
const sessionId = entry?.sessionId;
|
||||
const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false;
|
||||
|
||||
if (aborted || cleared.followupCleared > 0 || cleared.laneCleared > 0) {
|
||||
stopped += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (stopped > 0) {
|
||||
logVerbose(`abort: stopped ${stopped} subagent run(s) for ${requesterKey}`);
|
||||
}
|
||||
return { stopped };
|
||||
}
|
||||
|
||||
export async function tryFastAbortFromMessage(params: {
|
||||
ctx: MsgContext;
|
||||
cfg: ClawdbotConfig;
|
||||
}): Promise<{ handled: boolean; aborted: boolean }> {
|
||||
}): Promise<{ handled: boolean; aborted: boolean; stoppedSubagents?: number }> {
|
||||
const { ctx, cfg } = params;
|
||||
const commandAuthorized = ctx.CommandAuthorized ?? true;
|
||||
const auth = resolveCommandAuthorization({
|
||||
@@ -81,6 +143,7 @@ export async function tryFastAbortFromMessage(params: {
|
||||
if (!abortRequested) return { handled: false, aborted: false };
|
||||
|
||||
const abortKey = targetKey ?? auth.from ?? auth.to;
|
||||
const requesterSessionKey = targetKey ?? ctx.SessionKey ?? abortKey;
|
||||
|
||||
if (targetKey) {
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
@@ -108,11 +171,13 @@ export async function tryFastAbortFromMessage(params: {
|
||||
} else if (abortKey) {
|
||||
setAbortMemory(abortKey, true);
|
||||
}
|
||||
return { handled: true, aborted };
|
||||
const { stopped } = stopSubagentsForRequester({ cfg, requesterSessionKey });
|
||||
return { handled: true, aborted, stoppedSubagents: stopped };
|
||||
}
|
||||
|
||||
if (abortKey) {
|
||||
setAbortMemory(abortKey, true);
|
||||
}
|
||||
return { handled: true, aborted: false };
|
||||
const { stopped } = stopSubagentsForRequester({ cfg, requesterSessionKey });
|
||||
return { handled: true, aborted: false, stoppedSubagents: stopped };
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import { scheduleGatewaySigusr1Restart, triggerClawdbotRestart } from "../../inf
|
||||
import { parseAgentSessionKey } from "../../routing/session-key.js";
|
||||
import { parseActivationCommand } from "../group-activation.js";
|
||||
import { parseSendPolicyCommand } from "../send-policy.js";
|
||||
import { isAbortTrigger, setAbortMemory } from "./abort.js";
|
||||
import { formatAbortReplyText, isAbortTrigger, setAbortMemory, stopSubagentsForRequester } from "./abort.js";
|
||||
import { clearSessionQueues } from "./queue.js";
|
||||
import type { CommandHandler } from "./commands-types.js";
|
||||
|
||||
@@ -208,7 +208,14 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand
|
||||
} else if (params.command.abortKey) {
|
||||
setAbortMemory(params.command.abortKey, true);
|
||||
}
|
||||
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
|
||||
const { stopped } = stopSubagentsForRequester({
|
||||
cfg: params.cfg,
|
||||
requesterSessionKey: abortTarget.key ?? params.sessionKey,
|
||||
});
|
||||
return {
|
||||
shouldContinue: false,
|
||||
reply: { text: formatAbortReplyText(stopped) },
|
||||
};
|
||||
};
|
||||
|
||||
export const handleAbortTrigger: CommandHandler = async (params, allowTextCommands) => {
|
||||
@@ -241,5 +248,12 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman
|
||||
} else if (params.command.abortKey) {
|
||||
setAbortMemory(params.command.abortKey, true);
|
||||
}
|
||||
return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } };
|
||||
const { stopped } = stopSubagentsForRequester({
|
||||
cfg: params.cfg,
|
||||
requesterSessionKey: abortTarget.key ?? params.sessionKey,
|
||||
});
|
||||
return {
|
||||
shouldContinue: false,
|
||||
reply: { text: formatAbortReplyText(stopped) },
|
||||
};
|
||||
};
|
||||
|
||||
@@ -24,6 +24,11 @@ vi.mock("./route-reply.js", () => ({
|
||||
|
||||
vi.mock("./abort.js", () => ({
|
||||
tryFastAbortFromMessage: mocks.tryFastAbortFromMessage,
|
||||
formatAbortReplyText: (stoppedSubagents?: number) => {
|
||||
if (typeof stoppedSubagents !== "number") return "⚙️ Agent was aborted.";
|
||||
const label = stoppedSubagents === 1 ? "sub-agent" : "sub-agents";
|
||||
return `⚙️ Agent was aborted. Stopped ${stoppedSubagents} ${label}.`;
|
||||
},
|
||||
}));
|
||||
|
||||
const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js");
|
||||
@@ -123,6 +128,31 @@ describe("dispatchReplyFromConfig", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("fast-abort reply includes stopped subagent count when provided", async () => {
|
||||
mocks.tryFastAbortFromMessage.mockResolvedValue({
|
||||
handled: true,
|
||||
aborted: true,
|
||||
stoppedSubagents: 2,
|
||||
});
|
||||
const cfg = {} as ClawdbotConfig;
|
||||
const dispatcher = createDispatcher();
|
||||
const ctx: MsgContext = {
|
||||
Provider: "telegram",
|
||||
Body: "/stop",
|
||||
};
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg,
|
||||
dispatcher,
|
||||
replyResolver: vi.fn(async () => ({ text: "hi" }) as ReplyPayload),
|
||||
});
|
||||
|
||||
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({
|
||||
text: "⚙️ Agent was aborted. Stopped 2 sub-agents.",
|
||||
});
|
||||
});
|
||||
|
||||
it("deduplicates inbound messages by MessageSid and origin", async () => {
|
||||
mocks.tryFastAbortFromMessage.mockResolvedValue({
|
||||
handled: false,
|
||||
|
||||
@@ -3,7 +3,7 @@ import { logVerbose } from "../../globals.js";
|
||||
import { getReplyFromConfig } from "../reply.js";
|
||||
import type { MsgContext } from "../templating.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "../types.js";
|
||||
import { tryFastAbortFromMessage } from "./abort.js";
|
||||
import { formatAbortReplyText, tryFastAbortFromMessage } from "./abort.js";
|
||||
import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
|
||||
import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js";
|
||||
import { isRoutableChannel, routeReply } from "./route-reply.js";
|
||||
@@ -70,7 +70,9 @@ export async function dispatchReplyFromConfig(params: {
|
||||
|
||||
const fastAbort = await tryFastAbortFromMessage({ ctx, cfg });
|
||||
if (fastAbort.handled) {
|
||||
const payload = { text: "⚙️ Agent was aborted." } satisfies ReplyPayload;
|
||||
const payload = {
|
||||
text: formatAbortReplyText(fastAbort.stoppedSubagents),
|
||||
} satisfies ReplyPayload;
|
||||
let queuedFinal = false;
|
||||
let routedFinalCount = 0;
|
||||
if (shouldRouteToOriginating && originatingChannel && originatingTo) {
|
||||
|
||||
Reference in New Issue
Block a user