refactor: fast-lane directives helpers

This commit is contained in:
Peter Steinberger
2026-01-12 22:33:59 +00:00
parent 9f90d0721a
commit fd768334a9
5 changed files with 311 additions and 89 deletions

View File

@@ -455,6 +455,72 @@ describe("directive behavior", () => {
});
});
it("keeps reasoning acks for rapid mixed directives", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "ok" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const blockReplies: string[] = [];
const storePath = path.join(home, "sessions.json");
await getReplyFromConfig(
{
Body: "do it\n/reasoning on",
From: "+1222",
To: "+1222",
Provider: "whatsapp",
},
{
onBlockReply: (payload) => {
if (payload.text) blockReplies.push(payload.text);
},
},
{
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
},
whatsapp: { allowFrom: ["*"] },
session: { store: storePath },
},
);
await getReplyFromConfig(
{
Body: "again\n/reasoning on",
From: "+1222",
To: "+1222",
Provider: "whatsapp",
},
{
onBlockReply: (payload) => {
if (payload.text) blockReplies.push(payload.text);
},
},
{
agents: {
defaults: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
},
whatsapp: { allowFrom: ["*"] },
session: { store: storePath },
},
);
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2);
expect(blockReplies.length).toBe(0);
});
});
it("acks verbose directive immediately with system marker", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();

View File

@@ -57,6 +57,7 @@ import {
handleCommands,
} from "./reply/commands.js";
import {
applyInlineDirectivesFastLane,
handleDirectiveOnly,
type InlineDirectives,
isDirectiveOnly,
@@ -810,6 +811,54 @@ export async function getReplyFromConfig(
return statusReply ?? directiveReply;
}
const hasAnyDirective =
directives.hasThinkDirective ||
directives.hasVerboseDirective ||
directives.hasReasoningDirective ||
directives.hasElevatedDirective ||
directives.hasModelDirective ||
directives.hasQueueDirective ||
directives.hasStatusDirective;
if (hasAnyDirective && command.isAuthorizedSender) {
const fastLane = await applyInlineDirectivesFastLane({
directives,
commandAuthorized: command.isAuthorizedSender,
ctx,
cfg,
agentId,
isGroup,
sessionEntry,
sessionStore,
sessionKey,
storePath,
elevatedEnabled,
elevatedAllowed,
elevatedFailures,
messageProviderKey,
defaultProvider,
defaultModel,
aliasIndex,
allowedModelKeys: modelState.allowedModelKeys,
allowedModelCatalog: modelState.allowedModelCatalog,
resetModelOverride: modelState.resetModelOverride,
provider,
model,
initialModelLabel,
formatModelSwitchEvent,
agentCfg,
modelState: {
resolveDefaultThinkingLevel: modelState.resolveDefaultThinkingLevel,
allowedModelKeys: modelState.allowedModelKeys,
allowedModelCatalog: modelState.allowedModelCatalog,
resetModelOverride: modelState.resetModelOverride,
},
});
directiveAck = fastLane.directiveAck;
provider = fastLane.provider;
model = fastLane.model;
}
const persisted = await persistInlineDirectives({
directives,
effectiveModelDirective,
@@ -854,77 +903,6 @@ export async function getReplyFromConfig(
await opts.onBlockReply(reply);
};
const hasAnyDirective =
directives.hasThinkDirective ||
directives.hasVerboseDirective ||
directives.hasReasoningDirective ||
directives.hasElevatedDirective ||
directives.hasModelDirective ||
directives.hasQueueDirective ||
directives.hasStatusDirective;
if (
hasAnyDirective &&
command.isAuthorizedSender &&
!isDirectiveOnly({
directives,
cleanedBody: directives.cleaned,
ctx,
cfg,
agentId,
isGroup,
})
) {
const resolvedDefaultThinkLevel =
(sessionEntry?.thinkingLevel as ThinkLevel | undefined) ??
(agentCfg?.thinkingDefault as ThinkLevel | undefined) ??
(await modelState.resolveDefaultThinkingLevel());
const currentThinkLevel = resolvedDefaultThinkLevel;
const currentVerboseLevel =
(sessionEntry?.verboseLevel as VerboseLevel | undefined) ??
(agentCfg?.verboseDefault as VerboseLevel | undefined);
const currentReasoningLevel =
(sessionEntry?.reasoningLevel as ReasoningLevel | undefined) ?? "off";
const currentElevatedLevel =
(sessionEntry?.elevatedLevel as ElevatedLevel | undefined) ??
(agentCfg?.elevatedDefault as ElevatedLevel | undefined);
directiveAck = await handleDirectiveOnly({
cfg,
directives,
sessionEntry,
sessionStore,
sessionKey,
storePath,
elevatedEnabled,
elevatedAllowed,
elevatedFailures,
messageProviderKey,
defaultProvider,
defaultModel,
aliasIndex,
allowedModelKeys: modelState.allowedModelKeys,
allowedModelCatalog: modelState.allowedModelCatalog,
resetModelOverride: modelState.resetModelOverride,
provider,
model,
initialModelLabel,
formatModelSwitchEvent,
currentThinkLevel,
currentVerboseLevel,
currentReasoningLevel,
currentElevatedLevel,
});
// Refresh provider/model from session overrides applied by directives.
if (sessionEntry?.providerOverride) {
provider = sessionEntry.providerOverride;
}
if (sessionEntry?.modelOverride) {
model = sessionEntry.modelOverride;
}
}
const inlineCommand =
allowTextCommands && command.isAuthorizedSender
? extractInlineSimpleCommand(cleanedBody)

View File

@@ -62,6 +62,12 @@ import {
} from "./queue.js";
const SYSTEM_MARK = "⚙️";
export const formatDirectiveAck = (text: string): string => {
if (!text) return text;
if (text.startsWith(SYSTEM_MARK)) return text;
return `${SYSTEM_MARK} ${text}`;
};
const formatOptionsLine = (options: string) => `Options: ${options}.`;
const withOptions = (line: string, options: string) =>
`${line}\n${formatOptionsLine(options)}`;
@@ -597,6 +603,135 @@ export function isDirectiveOnly(params: {
return noMentions.length === 0;
}
export async function applyInlineDirectivesFastLane(params: {
directives: InlineDirectives;
commandAuthorized: boolean;
ctx: MsgContext;
cfg: ClawdbotConfig;
agentId?: string;
isGroup: boolean;
sessionEntry?: SessionEntry;
sessionStore?: Record<string, SessionEntry>;
sessionKey: string;
storePath?: string;
elevatedEnabled: boolean;
elevatedAllowed: boolean;
elevatedFailures?: Array<{ gate: string; key: string }>;
messageProviderKey?: string;
defaultProvider: string;
defaultModel: string;
aliasIndex: ModelAliasIndex;
allowedModelKeys: Set<string>;
allowedModelCatalog: Awaited<
ReturnType<typeof import("../../agents/model-catalog.js").loadModelCatalog>
>;
resetModelOverride: boolean;
provider: string;
model: string;
initialModelLabel: string;
formatModelSwitchEvent: (label: string, alias?: string) => string;
agentCfg?: NonNullable<ClawdbotConfig["agents"]>["defaults"];
modelState: {
resolveDefaultThinkingLevel: () => Promise<ThinkLevel>;
allowedModelKeys: Set<string>;
allowedModelCatalog: Awaited<
ReturnType<typeof import("../../agents/model-catalog.js").loadModelCatalog>
>;
resetModelOverride: boolean;
};
}): Promise<{ directiveAck?: ReplyPayload; provider: string; model: string }> {
const {
directives,
commandAuthorized,
ctx,
cfg,
agentId,
isGroup,
sessionEntry,
sessionStore,
sessionKey,
storePath,
elevatedEnabled,
elevatedAllowed,
elevatedFailures,
messageProviderKey,
defaultProvider,
defaultModel,
aliasIndex,
allowedModelKeys,
allowedModelCatalog,
resetModelOverride,
formatModelSwitchEvent,
modelState,
} = params;
let { provider, model } = params;
if (
!commandAuthorized ||
isDirectiveOnly({
directives,
cleanedBody: directives.cleaned,
ctx,
cfg,
agentId,
isGroup,
})
) {
return { directiveAck: undefined, provider, model };
}
const agentCfg = params.agentCfg;
const resolvedDefaultThinkLevel =
(sessionEntry?.thinkingLevel as ThinkLevel | undefined) ??
(agentCfg?.thinkingDefault as ThinkLevel | undefined) ??
(await modelState.resolveDefaultThinkingLevel());
const currentThinkLevel = resolvedDefaultThinkLevel;
const currentVerboseLevel =
(sessionEntry?.verboseLevel as VerboseLevel | undefined) ??
(agentCfg?.verboseDefault as VerboseLevel | undefined);
const currentReasoningLevel =
(sessionEntry?.reasoningLevel as ReasoningLevel | undefined) ?? "off";
const currentElevatedLevel =
(sessionEntry?.elevatedLevel as ElevatedLevel | undefined) ??
(agentCfg?.elevatedDefault as ElevatedLevel | undefined);
const directiveAck = await handleDirectiveOnly({
cfg,
directives,
sessionEntry,
sessionStore,
sessionKey,
storePath,
elevatedEnabled,
elevatedAllowed,
elevatedFailures,
messageProviderKey,
defaultProvider,
defaultModel,
aliasIndex,
allowedModelKeys,
allowedModelCatalog,
resetModelOverride,
provider,
model,
initialModelLabel: params.initialModelLabel,
formatModelSwitchEvent,
currentThinkLevel,
currentVerboseLevel,
currentReasoningLevel,
currentElevatedLevel,
});
if (sessionEntry?.providerOverride) {
provider = sessionEntry.providerOverride;
}
if (sessionEntry?.modelOverride) {
model = sessionEntry.modelOverride;
}
return { directiveAck, provider, model };
}
export async function handleDirectiveOnly(params: {
cfg: ClawdbotConfig;
directives: InlineDirectives;
@@ -1186,24 +1321,24 @@ export async function handleDirectiveOnly(params: {
if (directives.hasVerboseDirective && directives.verboseLevel) {
parts.push(
directives.verboseLevel === "off"
? `${SYSTEM_MARK} Verbose logging disabled.`
: `${SYSTEM_MARK} Verbose logging enabled.`,
? formatDirectiveAck("Verbose logging disabled.")
: formatDirectiveAck("Verbose logging enabled."),
);
}
if (directives.hasReasoningDirective && directives.reasoningLevel) {
parts.push(
directives.reasoningLevel === "off"
? `${SYSTEM_MARK} Reasoning visibility disabled.`
? formatDirectiveAck("Reasoning visibility disabled.")
: directives.reasoningLevel === "stream"
? `${SYSTEM_MARK} Reasoning stream enabled (Telegram only).`
: `${SYSTEM_MARK} Reasoning visibility enabled.`,
? formatDirectiveAck("Reasoning stream enabled (Telegram only).")
: formatDirectiveAck("Reasoning visibility enabled."),
);
}
if (directives.hasElevatedDirective && directives.elevatedLevel) {
parts.push(
directives.elevatedLevel === "off"
? `${SYSTEM_MARK} Elevated mode disabled.`
: `${SYSTEM_MARK} Elevated mode enabled.`,
? formatDirectiveAck("Elevated mode disabled.")
: formatDirectiveAck("Elevated mode enabled."),
);
if (shouldHintDirectRuntime) parts.push(formatElevatedRuntimeHint());
}
@@ -1222,23 +1357,23 @@ export async function handleDirectiveOnly(params: {
}
}
if (directives.hasQueueDirective && directives.queueMode) {
parts.push(`${SYSTEM_MARK} Queue mode set to ${directives.queueMode}.`);
parts.push(formatDirectiveAck(`Queue mode set to ${directives.queueMode}.`));
} else if (directives.hasQueueDirective && directives.queueReset) {
parts.push(`${SYSTEM_MARK} Queue mode reset to default.`);
parts.push(formatDirectiveAck("Queue mode reset to default."));
}
if (
directives.hasQueueDirective &&
typeof directives.debounceMs === "number"
) {
parts.push(
`${SYSTEM_MARK} Queue debounce set to ${directives.debounceMs}ms.`,
formatDirectiveAck(`Queue debounce set to ${directives.debounceMs}ms.`),
);
}
if (directives.hasQueueDirective && typeof directives.cap === "number") {
parts.push(`${SYSTEM_MARK} Queue cap set to ${directives.cap}.`);
parts.push(formatDirectiveAck(`Queue cap set to ${directives.cap}.`));
}
if (directives.hasQueueDirective && directives.dropPolicy) {
parts.push(`${SYSTEM_MARK} Queue drop set to ${directives.dropPolicy}.`);
parts.push(formatDirectiveAck(`Queue drop set to ${directives.dropPolicy}.`));
}
const ack = parts.join(" ").trim();
if (!ack && directives.hasStatusDirective) return undefined;

View File

@@ -117,7 +117,7 @@ describe("followup queue deduplication", () => {
);
expect(first).toBe(true);
// Second enqueue with same prompt should be allowed (we only dedupe with message id)
// Second enqueue with same prompt should be allowed (default dedupe: message id only)
const second = enqueueFollowupRun(
key,
createRun({
@@ -173,6 +173,40 @@ describe("followup queue deduplication", () => {
);
expect(second).toBe(true);
});
it("can opt-in to prompt-based dedupe when message id is absent", async () => {
const key = `test-dedup-prompt-mode-${Date.now()}`;
const settings: QueueSettings = {
mode: "collect",
debounceMs: 0,
cap: 50,
dropPolicy: "summarize",
};
const first = enqueueFollowupRun(
key,
createRun({
prompt: "Hello world",
originatingChannel: "whatsapp",
originatingTo: "+1234567890",
}),
settings,
"prompt",
);
expect(first).toBe(true);
const second = enqueueFollowupRun(
key,
createRun({
prompt: "Hello world",
originatingChannel: "whatsapp",
originatingTo: "+1234567890",
}),
settings,
"prompt",
);
expect(second).toBe(false);
});
});
describe("followup queue collect routing", () => {

View File

@@ -25,6 +25,7 @@ export type QueueSettings = {
cap?: number;
dropPolicy?: QueueDropPolicy;
};
export type QueueDedupeMode = "message-id" | "prompt" | "none";
export type FollowupRun = {
prompt: string;
/** Provider message ID, when available (for deduplication). */
@@ -345,6 +346,7 @@ function getFollowupQueue(
function isRunAlreadyQueued(
run: FollowupRun,
queue: FollowupQueueState,
allowPromptFallback = false,
): boolean {
const hasSameRouting = (item: FollowupRun) =>
item.originatingChannel === run.originatingChannel &&
@@ -358,21 +360,28 @@ function isRunAlreadyQueued(
(item) => item.messageId?.trim() === messageId && hasSameRouting(item),
);
}
// Without a provider message id, avoid prompt-based dedupe to ensure rapid
// directive messages are not dropped.
return false;
if (!allowPromptFallback) return false;
return queue.items.some(
(item) => item.prompt === run.prompt && hasSameRouting(item),
);
}
export function enqueueFollowupRun(
key: string,
run: FollowupRun,
settings: QueueSettings,
dedupeMode: QueueDedupeMode = "message-id",
): boolean {
const queue = getFollowupQueue(key, settings);
// Deduplicate: skip if the same message is already queued.
if (isRunAlreadyQueued(run, queue)) {
return false;
if (dedupeMode !== "none") {
if (dedupeMode === "message-id" && isRunAlreadyQueued(run, queue)) {
return false;
}
if (dedupeMode === "prompt" && isRunAlreadyQueued(run, queue, true)) {
return false;
}
}
queue.lastEnqueuedAt = Date.now();