feat(queue): add queue modes and discord gating

This commit is contained in:
Peter Steinberger
2025-12-26 13:35:44 +01:00
parent e9f1851c5d
commit 8dda07a1e9
16 changed files with 378 additions and 12 deletions

View File

@@ -5,8 +5,11 @@ import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: vi.fn(),
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
resolveEmbeddedSessionLane: (key: string) =>
`session:${key.trim() || "main"}`,
}));
vi.mock("../agents/model-catalog.js", () => ({
loadModelCatalog: vi.fn(),
@@ -20,6 +23,7 @@ import {
saveSessionStore,
} from "../config/sessions.js";
import {
extractQueueDirective,
extractThinkDirective,
extractVerboseDirective,
getReplyFromConfig,
@@ -83,6 +87,13 @@ describe("directive parsing", () => {
expect(res.thinkLevel).toBe("high");
});
it("matches queue directive", () => {
const res = extractQueueDirective("please /queue interrupt now");
expect(res.hasDirective).toBe(true);
expect(res.queueMode).toBe("interrupt");
expect(res.cleaned).toBe("please now");
});
it("applies inline think and still runs agent content", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
@@ -142,6 +153,33 @@ describe("directive parsing", () => {
});
});
it("acks queue directive and persists override", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();
const storePath = path.join(home, "sessions.json");
const res = await getReplyFromConfig(
{ Body: "/queue interrupt", From: "+1222", To: "+1222" },
{},
{
agent: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
routing: { allowFrom: ["*"] },
session: { store: storePath },
},
);
const text = Array.isArray(res) ? res[0]?.text : res?.text;
expect(text).toMatch(/^⚙️ Queue mode set to interrupt\./);
const store = loadSessionStore(storePath);
const entry = Object.values(store)[0];
expect(entry?.queueMode).toBe("interrupt");
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
});
});
it("updates tool verbose during an in-flight run (toggle on)", async () => {
await withTempHome(async (home) => {
const storePath = path.join(home, "sessions.json");

View File

@@ -4,8 +4,11 @@ import { join } from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
vi.mock("../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
runEmbeddedPiAgent: vi.fn(),
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
resolveEmbeddedSessionLane: (key: string) =>
`session:${key.trim() || "main"}`,
}));
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";

View File

@@ -14,7 +14,9 @@ import {
resolveConfiguredModelRef,
} from "../agents/model-selection.js";
import {
abortEmbeddedPiRun,
queueEmbeddedPiMessage,
resolveEmbeddedSessionLane,
runEmbeddedPiAgent,
} from "../agents/pi-embedded.js";
import { buildWorkspaceSkillSnapshot } from "../agents/skills.js";
@@ -37,6 +39,7 @@ import { logVerbose } from "../globals.js";
import { buildProviderSummary } from "../infra/provider-summary.js";
import { triggerClawdisRestart } from "../infra/restart.js";
import { drainSystemEvents } from "../infra/system-events.js";
import { clearCommandLane, getQueueSize } from "../process/command-queue.js";
import { defaultRuntime } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { resolveHeartbeatSeconds } from "../web/reconnect.js";
@@ -67,6 +70,8 @@ const SYSTEM_MARK = "⚙️";
const BARE_SESSION_RESET_PROMPT =
"A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. Do not mention internal steps, files, tools, or reasoning.";
type QueueMode = "queue" | "interrupt" | "drop";
export function extractThinkDirective(body?: string): {
cleaned: string;
thinkLevel?: ThinkLevel;
@@ -112,6 +117,36 @@ export function extractVerboseDirective(body?: string): {
};
}
function normalizeQueueMode(raw?: string): QueueMode | undefined {
if (!raw) return undefined;
const cleaned = raw.trim().toLowerCase();
if (cleaned === "queue" || cleaned === "queued") return "queue";
if (cleaned === "interrupt" || cleaned === "interrupts" || cleaned === "abort")
return "interrupt";
if (cleaned === "drop" || cleaned === "discard") return "drop";
return undefined;
}
export function extractQueueDirective(body?: string): {
cleaned: string;
queueMode?: QueueMode;
rawMode?: string;
hasDirective: boolean;
} {
if (!body) return { cleaned: "", hasDirective: false };
const match = body.match(/(?:^|\s)\/queue(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i);
const queueMode = normalizeQueueMode(match?.[1]);
const cleaned = match
? body.replace(match[0], "").replace(/\s+/g, " ").trim()
: body.trim();
return {
cleaned,
queueMode,
rawMode: match?.[1],
hasDirective: !!match,
};
}
function isAbortTrigger(text?: string): boolean {
if (!text) return false;
const normalized = text.trim().toLowerCase();
@@ -156,9 +191,41 @@ function stripMentions(
}
// Generic mention patterns like @123456789 or plain digits
result = result.replace(/@[0-9+]{5,}/g, " ");
// Discord-style mentions (<@123> or <@!123>)
result = result.replace(/<@!?\d+>/g, " ");
return result.replace(/\s+/g, " ").trim();
}
function defaultQueueModeForSurface(surface?: string): QueueMode {
const normalized = surface?.trim().toLowerCase();
if (normalized === "discord") return "queue";
if (normalized === "webchat") return "queue";
return "interrupt";
}
function resolveQueueMode(params: {
cfg: ClawdisConfig;
surface?: string;
sessionEntry?: SessionEntry;
inlineMode?: QueueMode;
}): QueueMode {
const surfaceKey = params.surface?.trim().toLowerCase();
const queueCfg = params.cfg.routing?.queue;
const surfaceMode =
surfaceKey && queueCfg?.bySurface
? (queueCfg.bySurface as Record<string, QueueMode | undefined>)[
surfaceKey
]
: undefined;
return (
params.inlineMode ??
params.sessionEntry?.queueMode ??
surfaceMode ??
queueCfg?.mode ??
defaultQueueModeForSurface(surfaceKey)
);
}
export async function getReplyFromConfig(
ctx: MsgContext,
opts?: GetReplyOptions,
@@ -343,6 +410,7 @@ export async function getReplyFromConfig(
verboseLevel: persistedVerbose ?? baseEntry?.verboseLevel,
modelOverride: persistedModelOverride ?? baseEntry?.modelOverride,
providerOverride: persistedProviderOverride ?? baseEntry?.providerOverride,
queueMode: baseEntry?.queueMode,
};
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
@@ -371,8 +439,14 @@ export async function getReplyFromConfig(
rawModel: rawModelDirective,
hasDirective: hasModelDirective,
} = extractModelDirective(verboseCleaned);
sessionCtx.Body = modelCleaned;
sessionCtx.BodyStripped = modelCleaned;
const {
cleaned: queueCleaned,
queueMode: inlineQueueMode,
rawMode: rawQueueMode,
hasDirective: hasQueueDirective,
} = extractQueueDirective(modelCleaned);
sessionCtx.Body = queueCleaned;
sessionCtx.BodyStripped = queueCleaned;
const defaultGroupActivation = () => {
const requireMention = cfg.routing?.groupChat?.requireMention;
@@ -457,9 +531,14 @@ export async function getReplyFromConfig(
DEFAULT_CONTEXT_TOKENS;
const directiveOnly = (() => {
if (!hasThinkDirective && !hasVerboseDirective && !hasModelDirective)
if (
!hasThinkDirective &&
!hasVerboseDirective &&
!hasModelDirective &&
!hasQueueDirective
)
return false;
const stripped = stripStructuralPrefixes(modelCleaned ?? "");
const stripped = stripStructuralPrefixes(queueCleaned ?? "");
const noMentions = isGroup ? stripMentions(stripped, ctx, cfg) : stripped;
return noMentions.length === 0;
})();
@@ -501,6 +580,12 @@ export async function getReplyFromConfig(
text: `Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`,
};
}
if (hasQueueDirective && !inlineQueueMode) {
cleanupTyping();
return {
text: `Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: queue, interrupt, drop.`,
};
}
let modelSelection:
| { provider: string; model: string; isDefault: boolean }
@@ -543,6 +628,9 @@ export async function getReplyFromConfig(
sessionEntry.modelOverride = modelSelection.model;
}
}
if (hasQueueDirective && inlineQueueMode) {
sessionEntry.queueMode = inlineQueueMode;
}
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
@@ -571,6 +659,9 @@ export async function getReplyFromConfig(
: `Model set to ${label}.`,
);
}
if (hasQueueDirective && inlineQueueMode) {
parts.push(`${SYSTEM_MARK} Queue mode set to ${inlineQueueMode}.`);
}
const ack = parts.join(" ").trim();
cleanupTyping();
return { text: ack || "OK." };
@@ -626,6 +717,7 @@ export async function getReplyFromConfig(
await saveSessionStore(storePath, sessionStore);
}
}
const perMessageQueueMode = hasQueueDirective ? inlineQueueMode : undefined;
// Optional allowlist by origin number (E.164 without whatsapp: prefix)
const configuredAllowFrom = cfg.routing?.allowFrom;
@@ -990,7 +1082,35 @@ export async function getReplyFromConfig(
.trim()
: queueBodyBase;
if (queueEmbeddedPiMessage(sessionIdFinal, queuedBody)) {
const resolvedQueueMode = resolveQueueMode({
cfg,
surface: sessionCtx.Surface,
sessionEntry,
inlineMode: perMessageQueueMode,
});
const sessionLaneKey = resolveEmbeddedSessionLane(
sessionKey ?? sessionIdFinal,
);
const laneSize = getQueueSize(sessionLaneKey);
if (resolvedQueueMode === "drop" && laneSize > 0) {
logVerbose(
`Dropping inbound message for ${sessionLaneKey} (queue busy, mode=drop)`,
);
cleanupTyping();
return undefined;
}
if (resolvedQueueMode === "interrupt" && laneSize > 0) {
const cleared = clearCommandLane(sessionLaneKey);
const aborted = abortEmbeddedPiRun(sessionIdFinal);
logVerbose(
`Interrupting ${sessionLaneKey} (cleared ${cleared}, aborted=${aborted})`,
);
}
if (
resolvedQueueMode === "queue" &&
queueEmbeddedPiMessage(sessionIdFinal, queuedBody)
) {
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;