From fd768334a9175f50bff2edcbc199b766d994e262 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 12 Jan 2026 22:33:59 +0000 Subject: [PATCH] refactor: fast-lane directives helpers --- src/auto-reply/reply.directive.test.ts | 66 ++++++++ src/auto-reply/reply.ts | 120 ++++++------- src/auto-reply/reply/directive-handling.ts | 159 ++++++++++++++++-- .../reply/queue.collect-routing.test.ts | 36 +++- src/auto-reply/reply/queue.ts | 19 ++- 5 files changed, 311 insertions(+), 89 deletions(-) diff --git a/src/auto-reply/reply.directive.test.ts b/src/auto-reply/reply.directive.test.ts index 4b9199a48..46166c2f1 100644 --- a/src/auto-reply/reply.directive.test.ts +++ b/src/auto-reply/reply.directive.test.ts @@ -455,6 +455,72 @@ describe("directive behavior", () => { }); }); + it("keeps reasoning acks for rapid mixed directives", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const blockReplies: string[] = []; + const storePath = path.join(home, "sessions.json"); + + await getReplyFromConfig( + { + Body: "do it\n/reasoning on", + From: "+1222", + To: "+1222", + Provider: "whatsapp", + }, + { + onBlockReply: (payload) => { + if (payload.text) blockReplies.push(payload.text); + }, + }, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: storePath }, + }, + ); + + await getReplyFromConfig( + { + Body: "again\n/reasoning on", + From: "+1222", + To: "+1222", + Provider: "whatsapp", + }, + { + onBlockReply: (payload) => { + if (payload.text) blockReplies.push(payload.text); + }, + }, + { + agents: { + defaults: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: storePath }, + }, + ); + + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2); + expect(blockReplies.length).toBe(0); + }); + }); + it("acks verbose directive immediately with system marker", async () => { await withTempHome(async (home) => { vi.mocked(runEmbeddedPiAgent).mockReset(); diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 38561bf2f..7299dadaa 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -57,6 +57,7 @@ import { handleCommands, } from "./reply/commands.js"; import { + applyInlineDirectivesFastLane, handleDirectiveOnly, type InlineDirectives, isDirectiveOnly, @@ -810,6 +811,54 @@ export async function getReplyFromConfig( return statusReply ?? directiveReply; } + const hasAnyDirective = + directives.hasThinkDirective || + directives.hasVerboseDirective || + directives.hasReasoningDirective || + directives.hasElevatedDirective || + directives.hasModelDirective || + directives.hasQueueDirective || + directives.hasStatusDirective; + + if (hasAnyDirective && command.isAuthorizedSender) { + const fastLane = await applyInlineDirectivesFastLane({ + directives, + commandAuthorized: command.isAuthorizedSender, + ctx, + cfg, + agentId, + isGroup, + sessionEntry, + sessionStore, + sessionKey, + storePath, + elevatedEnabled, + elevatedAllowed, + elevatedFailures, + messageProviderKey, + defaultProvider, + defaultModel, + aliasIndex, + allowedModelKeys: modelState.allowedModelKeys, + allowedModelCatalog: modelState.allowedModelCatalog, + resetModelOverride: modelState.resetModelOverride, + provider, + model, + initialModelLabel, + formatModelSwitchEvent, + agentCfg, + modelState: { + resolveDefaultThinkingLevel: modelState.resolveDefaultThinkingLevel, + allowedModelKeys: modelState.allowedModelKeys, + allowedModelCatalog: modelState.allowedModelCatalog, + resetModelOverride: modelState.resetModelOverride, + }, + }); + directiveAck = fastLane.directiveAck; + provider = fastLane.provider; + model = fastLane.model; + } + const persisted = await persistInlineDirectives({ directives, effectiveModelDirective, @@ -854,77 +903,6 @@ export async function getReplyFromConfig( await opts.onBlockReply(reply); }; - const hasAnyDirective = - directives.hasThinkDirective || - directives.hasVerboseDirective || - directives.hasReasoningDirective || - directives.hasElevatedDirective || - directives.hasModelDirective || - directives.hasQueueDirective || - directives.hasStatusDirective; - - if ( - hasAnyDirective && - command.isAuthorizedSender && - !isDirectiveOnly({ - directives, - cleanedBody: directives.cleaned, - ctx, - cfg, - agentId, - isGroup, - }) - ) { - const resolvedDefaultThinkLevel = - (sessionEntry?.thinkingLevel as ThinkLevel | undefined) ?? - (agentCfg?.thinkingDefault as ThinkLevel | undefined) ?? - (await modelState.resolveDefaultThinkingLevel()); - const currentThinkLevel = resolvedDefaultThinkLevel; - const currentVerboseLevel = - (sessionEntry?.verboseLevel as VerboseLevel | undefined) ?? - (agentCfg?.verboseDefault as VerboseLevel | undefined); - const currentReasoningLevel = - (sessionEntry?.reasoningLevel as ReasoningLevel | undefined) ?? "off"; - const currentElevatedLevel = - (sessionEntry?.elevatedLevel as ElevatedLevel | undefined) ?? - (agentCfg?.elevatedDefault as ElevatedLevel | undefined); - - directiveAck = await handleDirectiveOnly({ - cfg, - directives, - sessionEntry, - sessionStore, - sessionKey, - storePath, - elevatedEnabled, - elevatedAllowed, - elevatedFailures, - messageProviderKey, - defaultProvider, - defaultModel, - aliasIndex, - allowedModelKeys: modelState.allowedModelKeys, - allowedModelCatalog: modelState.allowedModelCatalog, - resetModelOverride: modelState.resetModelOverride, - provider, - model, - initialModelLabel, - formatModelSwitchEvent, - currentThinkLevel, - currentVerboseLevel, - currentReasoningLevel, - currentElevatedLevel, - }); - - // Refresh provider/model from session overrides applied by directives. - if (sessionEntry?.providerOverride) { - provider = sessionEntry.providerOverride; - } - if (sessionEntry?.modelOverride) { - model = sessionEntry.modelOverride; - } - } - const inlineCommand = allowTextCommands && command.isAuthorizedSender ? extractInlineSimpleCommand(cleanedBody) diff --git a/src/auto-reply/reply/directive-handling.ts b/src/auto-reply/reply/directive-handling.ts index 7395bf6c2..658425b12 100644 --- a/src/auto-reply/reply/directive-handling.ts +++ b/src/auto-reply/reply/directive-handling.ts @@ -62,6 +62,12 @@ import { } from "./queue.js"; const SYSTEM_MARK = "⚙️"; +export const formatDirectiveAck = (text: string): string => { + if (!text) return text; + if (text.startsWith(SYSTEM_MARK)) return text; + return `${SYSTEM_MARK} ${text}`; +}; + const formatOptionsLine = (options: string) => `Options: ${options}.`; const withOptions = (line: string, options: string) => `${line}\n${formatOptionsLine(options)}`; @@ -597,6 +603,135 @@ export function isDirectiveOnly(params: { return noMentions.length === 0; } +export async function applyInlineDirectivesFastLane(params: { + directives: InlineDirectives; + commandAuthorized: boolean; + ctx: MsgContext; + cfg: ClawdbotConfig; + agentId?: string; + isGroup: boolean; + sessionEntry?: SessionEntry; + sessionStore?: Record; + sessionKey: string; + storePath?: string; + elevatedEnabled: boolean; + elevatedAllowed: boolean; + elevatedFailures?: Array<{ gate: string; key: string }>; + messageProviderKey?: string; + defaultProvider: string; + defaultModel: string; + aliasIndex: ModelAliasIndex; + allowedModelKeys: Set; + allowedModelCatalog: Awaited< + ReturnType + >; + resetModelOverride: boolean; + provider: string; + model: string; + initialModelLabel: string; + formatModelSwitchEvent: (label: string, alias?: string) => string; + agentCfg?: NonNullable["defaults"]; + modelState: { + resolveDefaultThinkingLevel: () => Promise; + allowedModelKeys: Set; + allowedModelCatalog: Awaited< + ReturnType + >; + resetModelOverride: boolean; + }; +}): Promise<{ directiveAck?: ReplyPayload; provider: string; model: string }> { + const { + directives, + commandAuthorized, + ctx, + cfg, + agentId, + isGroup, + sessionEntry, + sessionStore, + sessionKey, + storePath, + elevatedEnabled, + elevatedAllowed, + elevatedFailures, + messageProviderKey, + defaultProvider, + defaultModel, + aliasIndex, + allowedModelKeys, + allowedModelCatalog, + resetModelOverride, + formatModelSwitchEvent, + modelState, + } = params; + + let { provider, model } = params; + if ( + !commandAuthorized || + isDirectiveOnly({ + directives, + cleanedBody: directives.cleaned, + ctx, + cfg, + agentId, + isGroup, + }) + ) { + return { directiveAck: undefined, provider, model }; + } + + const agentCfg = params.agentCfg; + const resolvedDefaultThinkLevel = + (sessionEntry?.thinkingLevel as ThinkLevel | undefined) ?? + (agentCfg?.thinkingDefault as ThinkLevel | undefined) ?? + (await modelState.resolveDefaultThinkingLevel()); + const currentThinkLevel = resolvedDefaultThinkLevel; + const currentVerboseLevel = + (sessionEntry?.verboseLevel as VerboseLevel | undefined) ?? + (agentCfg?.verboseDefault as VerboseLevel | undefined); + const currentReasoningLevel = + (sessionEntry?.reasoningLevel as ReasoningLevel | undefined) ?? "off"; + const currentElevatedLevel = + (sessionEntry?.elevatedLevel as ElevatedLevel | undefined) ?? + (agentCfg?.elevatedDefault as ElevatedLevel | undefined); + + const directiveAck = await handleDirectiveOnly({ + cfg, + directives, + sessionEntry, + sessionStore, + sessionKey, + storePath, + elevatedEnabled, + elevatedAllowed, + elevatedFailures, + messageProviderKey, + defaultProvider, + defaultModel, + aliasIndex, + allowedModelKeys, + allowedModelCatalog, + resetModelOverride, + provider, + model, + initialModelLabel: params.initialModelLabel, + formatModelSwitchEvent, + currentThinkLevel, + currentVerboseLevel, + currentReasoningLevel, + currentElevatedLevel, + }); + + if (sessionEntry?.providerOverride) { + provider = sessionEntry.providerOverride; + } + if (sessionEntry?.modelOverride) { + model = sessionEntry.modelOverride; + } + + return { directiveAck, provider, model }; +} + export async function handleDirectiveOnly(params: { cfg: ClawdbotConfig; directives: InlineDirectives; @@ -1186,24 +1321,24 @@ export async function handleDirectiveOnly(params: { if (directives.hasVerboseDirective && directives.verboseLevel) { parts.push( directives.verboseLevel === "off" - ? `${SYSTEM_MARK} Verbose logging disabled.` - : `${SYSTEM_MARK} Verbose logging enabled.`, + ? formatDirectiveAck("Verbose logging disabled.") + : formatDirectiveAck("Verbose logging enabled."), ); } if (directives.hasReasoningDirective && directives.reasoningLevel) { parts.push( directives.reasoningLevel === "off" - ? `${SYSTEM_MARK} Reasoning visibility disabled.` + ? formatDirectiveAck("Reasoning visibility disabled.") : directives.reasoningLevel === "stream" - ? `${SYSTEM_MARK} Reasoning stream enabled (Telegram only).` - : `${SYSTEM_MARK} Reasoning visibility enabled.`, + ? formatDirectiveAck("Reasoning stream enabled (Telegram only).") + : formatDirectiveAck("Reasoning visibility enabled."), ); } if (directives.hasElevatedDirective && directives.elevatedLevel) { parts.push( directives.elevatedLevel === "off" - ? `${SYSTEM_MARK} Elevated mode disabled.` - : `${SYSTEM_MARK} Elevated mode enabled.`, + ? formatDirectiveAck("Elevated mode disabled.") + : formatDirectiveAck("Elevated mode enabled."), ); if (shouldHintDirectRuntime) parts.push(formatElevatedRuntimeHint()); } @@ -1222,23 +1357,23 @@ export async function handleDirectiveOnly(params: { } } if (directives.hasQueueDirective && directives.queueMode) { - parts.push(`${SYSTEM_MARK} Queue mode set to ${directives.queueMode}.`); + parts.push(formatDirectiveAck(`Queue mode set to ${directives.queueMode}.`)); } else if (directives.hasQueueDirective && directives.queueReset) { - parts.push(`${SYSTEM_MARK} Queue mode reset to default.`); + parts.push(formatDirectiveAck("Queue mode reset to default.")); } if ( directives.hasQueueDirective && typeof directives.debounceMs === "number" ) { parts.push( - `${SYSTEM_MARK} Queue debounce set to ${directives.debounceMs}ms.`, + formatDirectiveAck(`Queue debounce set to ${directives.debounceMs}ms.`), ); } if (directives.hasQueueDirective && typeof directives.cap === "number") { - parts.push(`${SYSTEM_MARK} Queue cap set to ${directives.cap}.`); + parts.push(formatDirectiveAck(`Queue cap set to ${directives.cap}.`)); } if (directives.hasQueueDirective && directives.dropPolicy) { - parts.push(`${SYSTEM_MARK} Queue drop set to ${directives.dropPolicy}.`); + parts.push(formatDirectiveAck(`Queue drop set to ${directives.dropPolicy}.`)); } const ack = parts.join(" ").trim(); if (!ack && directives.hasStatusDirective) return undefined; diff --git a/src/auto-reply/reply/queue.collect-routing.test.ts b/src/auto-reply/reply/queue.collect-routing.test.ts index 84ad1bf19..58a957c62 100644 --- a/src/auto-reply/reply/queue.collect-routing.test.ts +++ b/src/auto-reply/reply/queue.collect-routing.test.ts @@ -117,7 +117,7 @@ describe("followup queue deduplication", () => { ); expect(first).toBe(true); - // Second enqueue with same prompt should be allowed (we only dedupe with message id) + // Second enqueue with same prompt should be allowed (default dedupe: message id only) const second = enqueueFollowupRun( key, createRun({ @@ -173,6 +173,40 @@ describe("followup queue deduplication", () => { ); expect(second).toBe(true); }); + + it("can opt-in to prompt-based dedupe when message id is absent", async () => { + const key = `test-dedup-prompt-mode-${Date.now()}`; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + "prompt", + ); + expect(first).toBe(true); + + const second = enqueueFollowupRun( + key, + createRun({ + prompt: "Hello world", + originatingChannel: "whatsapp", + originatingTo: "+1234567890", + }), + settings, + "prompt", + ); + expect(second).toBe(false); + }); }); describe("followup queue collect routing", () => { diff --git a/src/auto-reply/reply/queue.ts b/src/auto-reply/reply/queue.ts index 49f3b0801..53225b8e3 100644 --- a/src/auto-reply/reply/queue.ts +++ b/src/auto-reply/reply/queue.ts @@ -25,6 +25,7 @@ export type QueueSettings = { cap?: number; dropPolicy?: QueueDropPolicy; }; +export type QueueDedupeMode = "message-id" | "prompt" | "none"; export type FollowupRun = { prompt: string; /** Provider message ID, when available (for deduplication). */ @@ -345,6 +346,7 @@ function getFollowupQueue( function isRunAlreadyQueued( run: FollowupRun, queue: FollowupQueueState, + allowPromptFallback = false, ): boolean { const hasSameRouting = (item: FollowupRun) => item.originatingChannel === run.originatingChannel && @@ -358,21 +360,28 @@ function isRunAlreadyQueued( (item) => item.messageId?.trim() === messageId && hasSameRouting(item), ); } - // Without a provider message id, avoid prompt-based dedupe to ensure rapid - // directive messages are not dropped. - return false; + if (!allowPromptFallback) return false; + return queue.items.some( + (item) => item.prompt === run.prompt && hasSameRouting(item), + ); } export function enqueueFollowupRun( key: string, run: FollowupRun, settings: QueueSettings, + dedupeMode: QueueDedupeMode = "message-id", ): boolean { const queue = getFollowupQueue(key, settings); // Deduplicate: skip if the same message is already queued. - if (isRunAlreadyQueued(run, queue)) { - return false; + if (dedupeMode !== "none") { + if (dedupeMode === "message-id" && isRunAlreadyQueued(run, queue)) { + return false; + } + if (dedupeMode === "prompt" && isRunAlreadyQueued(run, queue, true)) { + return false; + } } queue.lastEnqueuedAt = Date.now();