From ac36eba8220bf88ca7e191b081c70ea7e7fc6d35 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 3 Jan 2026 04:26:36 +0100 Subject: [PATCH] feat: expand queue modes and followup backlog --- docs/agent.md | 11 +- docs/configuration.md | 15 +- docs/queue.md | 32 +- src/auto-reply/reply.directive.test.ts | 52 ++ src/auto-reply/reply.ts | 804 +++++++++++++++++++++++-- src/config/config.ts | 31 +- src/config/sessions.ts | 12 +- 7 files changed, 884 insertions(+), 73 deletions(-) diff --git a/docs/agent.md b/docs/agent.md index 1c4525fb9..2dac0b3f3 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -70,7 +70,16 @@ Legacy Pi/Tau session folders are **not** read. ## Steering while streaming -Incoming user messages are queued while the agent is streaming. The queue is checked **after each tool call**. If a queued message is present, remaining tool calls from the current assistant message are skipped (error tool results with "Skipped due to queued user message."), then the queued user message is injected before the next assistant response. +When queue mode is `steer`, inbound messages are injected into the current run. +The queue is checked **after each tool call**; if a queued message is present, +remaining tool calls from the current assistant message are skipped (error tool +results with "Skipped due to queued user message."), then the queued user +message is injected before the next assistant response. + +When queue mode is `followup` or `collect`, inbound messages are held until the +current turn ends, then a new agent turn starts with the queued payloads. See +`docs/queue.md` for mode + debounce/cap behavior. + Block streaming sends completed assistant blocks as soon as they finish; disable via `agent.blockStreamingDefault: "off"` if you only want the final response. Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`). diff --git a/docs/configuration.md b/docs/configuration.md index 4b618621d..40e932c82 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -156,13 +156,16 @@ Controls how inbound messages behave when an agent run is already active. { routing: { queue: { - mode: "interrupt", // global default: queue | interrupt + mode: "collect", // steer | followup | collect | steer-backlog (steer+backlog ok) | interrupt (queue=steer legacy) + debounceMs: 1000, + cap: 20, + drop: "summarize", // old | new | summarize bySurface: { - whatsapp: "interrupt", - telegram: "interrupt", - discord: "queue", - imessage: "interrupt", - webchat: "queue" + whatsapp: "collect", + telegram: "collect", + discord: "steer-backlog", + imessage: "collect", + webchat: "collect" } } } diff --git a/docs/queue.md b/docs/queue.md index 4f8d1083d..fd11d8eb6 100644 --- a/docs/queue.md +++ b/docs/queue.md @@ -3,7 +3,7 @@ summary: "Command queue design that serializes auto-reply command execution" read_when: - Changing auto-reply execution or concurrency --- -# Command Queue (2025-11-25) +# Command Queue (2026-01-03) We now serialize command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once, while allowing safe parallelism across sessions. @@ -19,13 +19,16 @@ We now serialize command-based auto-replies (WhatsApp Web listener) through a ti - Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn. ## Queue modes (per surface) -Inbound messages can either queue or interrupt when a run is already active: -- `queue`: serialize per session; if the agent is streaming, the new message is appended to the current run. -- `interrupt`: abort the active run for that session, then run the newest message. +Inbound messages can steer the current run, wait for a followup turn, or do both: +- `steer`: inject immediately into the current run (cancels pending tool calls after the next tool boundary). If not streaming, falls back to followup. +- `followup`: enqueue for the next agent turn after the current run ends. +- `collect`: coalesce all queued messages into a **single** followup turn (default). +- `steer-backlog` (aka `steer+backlog`): steer now **and** preserve the message for a followup turn. +- `interrupt` (legacy): abort the active run for that session, then run the newest message. +- `queue` (legacy alias): same as `steer`. Defaults (when unset in config): -- WhatsApp + Telegram → `interrupt` -- Discord + WebChat → `queue` +- All surfaces → `collect` Configure globally or per surface via `routing.queue`: @@ -33,16 +36,29 @@ Configure globally or per surface via `routing.queue`: { routing: { queue: { - mode: "interrupt", - bySurface: { discord: "queue", telegram: "interrupt" } + mode: "collect", + debounceMs: 1000, + cap: 20, + drop: "summarize", + bySurface: { discord: "steer-backlog" } } } } ``` +## Queue options +Options apply to `followup`, `collect`, and `steer-backlog` (and to `steer` when it falls back to followup): +- `debounceMs`: wait for quiet before starting a followup turn (prevents “continue, continue”). +- `cap`: max queued messages per session. +- `drop`: overflow policy (`old`, `new`, `summarize`). + +Summarize keeps a short bullet list of dropped messages and injects it as a synthetic followup prompt. +Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`. + ## Per-session overrides - `/queue ` as a standalone command stores the mode for the current session. - `/queue ` embedded in a message applies **once** (no persistence). +- Options can be combined: `/queue collect debounce:2s cap:25 drop:summarize` - `/queue default` or `/queue reset` clears the session override. ## Scope and guarantees diff --git a/src/auto-reply/reply.directive.test.ts b/src/auto-reply/reply.directive.test.ts index aac22e6b2..7dbc8fcc9 100644 --- a/src/auto-reply/reply.directive.test.ts +++ b/src/auto-reply/reply.directive.test.ts @@ -97,6 +97,18 @@ describe("directive parsing", () => { expect(res.cleaned).toBe("please now"); }); + it("parses queue options and modes", () => { + const res = extractQueueDirective( + "please /queue steer+backlog debounce:2s cap:5 drop:summarize now", + ); + expect(res.hasDirective).toBe(true); + expect(res.queueMode).toBe("steer-backlog"); + expect(res.debounceMs).toBe(2000); + expect(res.cap).toBe(5); + expect(res.dropPolicy).toBe("summarize"); + expect(res.cleaned).toBe("please now"); + }); + it("extracts reply_to_current tag", () => { const res = extractReplyToTag("ok [[reply_to_current]]", "msg-1"); expect(res.replyToId).toBe("msg-1"); @@ -276,6 +288,43 @@ describe("directive parsing", () => { }); }); + it("persists queue options when directive is standalone", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + const storePath = path.join(home, "sessions.json"); + + const res = await getReplyFromConfig( + { + Body: "/queue collect debounce:2s cap:5 drop:old", + From: "+1222", + To: "+1222", + }, + {}, + { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: storePath }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toMatch(/^⚙️ Queue mode set to collect\./); + expect(text).toMatch(/Queue debounce set to 2000ms/); + expect(text).toMatch(/Queue cap set to 5/); + expect(text).toMatch(/Queue drop set to old/); + const store = loadSessionStore(storePath); + const entry = Object.values(store)[0]; + expect(entry?.queueMode).toBe("collect"); + expect(entry?.queueDebounceMs).toBe(2000); + expect(entry?.queueCap).toBe(5); + expect(entry?.queueDrop).toBe("old"); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); + it("resets queue mode to default", async () => { await withTempHome(async (home) => { vi.mocked(runEmbeddedPiAgent).mockReset(); @@ -312,6 +361,9 @@ describe("directive parsing", () => { const store = loadSessionStore(storePath); const entry = Object.values(store)[0]; expect(entry?.queueMode).toBeUndefined(); + expect(entry?.queueDebounceMs).toBeUndefined(); + expect(entry?.queueCap).toBeUndefined(); + expect(entry?.queueDrop).toBeUndefined(); expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); }); }); diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index a74267a96..00adadb51 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -16,11 +16,16 @@ import { } from "../agents/model-selection.js"; import { abortEmbeddedPiRun, + isEmbeddedPiRunActive, + isEmbeddedPiRunStreaming, queueEmbeddedPiMessage, resolveEmbeddedSessionLane, runEmbeddedPiAgent, } from "../agents/pi-embedded.js"; -import { buildWorkspaceSkillSnapshot } from "../agents/skills.js"; +import { + buildWorkspaceSkillSnapshot, + type SkillSnapshot, +} from "../agents/skills.js"; import { DEFAULT_AGENT_WORKSPACE_DIR, ensureAgentWorkspace, @@ -49,6 +54,7 @@ import { import { clearCommandLane, getQueueSize } from "../process/command-queue.js"; import { defaultRuntime } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; +import { parseDurationMs } from "../cli/parse-duration.js"; import { resolveHeartbeatSeconds } from "../web/reconnect.js"; import { getWebAuthAgeMs, webAuthExists } from "../web/session.js"; import { @@ -75,11 +81,68 @@ const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]); const ABORT_MEMORY = new Map(); const SYSTEM_MARK = "⚙️"; +type QueueMode = + | "steer" + | "followup" + | "collect" + | "steer-backlog" + | "interrupt" + | "queue"; + +type QueueDropPolicy = "old" | "new" | "summarize"; + +type QueueSettings = { + mode: QueueMode; + debounceMs?: number; + cap?: number; + dropPolicy?: QueueDropPolicy; +}; + +type FollowupRun = { + prompt: string; + summaryLine?: string; + enqueuedAt: number; + run: { + sessionId: string; + sessionKey?: string; + sessionFile: string; + workspaceDir: string; + config: ClawdisConfig; + skillsSnapshot?: SkillSnapshot; + provider: string; + model: string; + thinkLevel?: ThinkLevel; + verboseLevel?: VerboseLevel; + timeoutMs: number; + blockReplyBreak: "text_end" | "message_end"; + ownerNumbers?: string[]; + extraSystemPrompt?: string; + enforceFinalTag?: boolean; + }; +}; + +type FollowupQueueState = { + items: FollowupRun[]; + draining: boolean; + lastEnqueuedAt: number; + mode: QueueMode; + debounceMs: number; + cap: number; + dropPolicy: QueueDropPolicy; + droppedCount: number; + summaryLines: string[]; + lastRun?: FollowupRun["run"]; +}; + +const DEFAULT_QUEUE_DEBOUNCE_MS = 1000; +const DEFAULT_QUEUE_CAP = 20; +const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize"; + +const FOLLOWUP_QUEUES = new Map(); + const BARE_SESSION_RESET_PROMPT = "A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. Do not mention internal steps, files, tools, or reasoning."; -type QueueMode = "queue" | "interrupt"; - export function extractThinkDirective(body?: string): { cleaned: string; thinkLevel?: ThinkLevel; @@ -128,39 +191,200 @@ export function extractVerboseDirective(body?: string): { function normalizeQueueMode(raw?: string): QueueMode | undefined { if (!raw) return undefined; const cleaned = raw.trim().toLowerCase(); - if (cleaned === "queue" || cleaned === "queued") return "queue"; - if ( - cleaned === "interrupt" || - cleaned === "interrupts" || - cleaned === "abort" - ) + if (cleaned === "queue" || cleaned === "queued") return "steer"; + if (cleaned === "interrupt" || cleaned === "interrupts" || cleaned === "abort") return "interrupt"; + if (cleaned === "steer" || cleaned === "steering") return "steer"; + if (cleaned === "followup" || cleaned === "follow-ups" || cleaned === "followups") + return "followup"; + if (cleaned === "collect" || cleaned === "coalesce") return "collect"; + if ( + cleaned === "steer+backlog" || + cleaned === "steer-backlog" || + cleaned === "steer_backlog" + ) + return "steer-backlog"; return undefined; } +function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined { + if (!raw) return undefined; + const cleaned = raw.trim().toLowerCase(); + if (cleaned === "old" || cleaned === "oldest") return "old"; + if (cleaned === "new" || cleaned === "newest") return "new"; + if (cleaned === "summarize" || cleaned === "summary") return "summarize"; + return undefined; +} + +function parseQueueDebounce(raw?: string): number | undefined { + if (!raw) return undefined; + const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" }); + if (!parsed || parsed < 0) return undefined; + return Math.round(parsed); +} + +function parseQueueCap(raw?: string): number | undefined { + if (!raw) return undefined; + const num = Number(raw); + if (!Number.isFinite(num)) return undefined; + const cap = Math.floor(num); + if (cap < 1) return undefined; + return cap; +} + +function parseQueueDirectiveArgs(raw: string): { + consumed: number; + queueMode?: QueueMode; + queueReset: boolean; + rawMode?: string; + debounceMs?: number; + cap?: number; + dropPolicy?: QueueDropPolicy; + rawDebounce?: string; + rawCap?: string; + rawDrop?: string; + hasOptions: boolean; +} { + let i = 0; + const len = raw.length; + while (i < len && /\s/.test(raw[i])) i += 1; + if (raw[i] === ":") { + i += 1; + while (i < len && /\s/.test(raw[i])) i += 1; + } + + let consumed = i; + let queueMode: QueueMode | undefined; + let queueReset = false; + let rawMode: string | undefined; + let debounceMs: number | undefined; + let cap: number | undefined; + let dropPolicy: QueueDropPolicy | undefined; + let rawDebounce: string | undefined; + let rawCap: string | undefined; + let rawDrop: string | undefined; + let hasOptions = false; + + const takeToken = (): string | null => { + if (i >= len) return null; + const start = i; + while (i < len && !/\s/.test(raw[i])) i += 1; + if (start === i) return null; + const token = raw.slice(start, i); + while (i < len && /\s/.test(raw[i])) i += 1; + return token; + }; + + while (i < len) { + const token = takeToken(); + if (!token) break; + const lowered = token.trim().toLowerCase(); + if (lowered === "default" || lowered === "reset" || lowered === "clear") { + queueReset = true; + consumed = i; + break; + } + + if (lowered.startsWith("debounce:") || lowered.startsWith("debounce=")) { + rawDebounce = token.split(/[:=]/)[1] ?? ""; + debounceMs = parseQueueDebounce(rawDebounce); + hasOptions = true; + consumed = i; + continue; + } + if (lowered.startsWith("cap:") || lowered.startsWith("cap=")) { + rawCap = token.split(/[:=]/)[1] ?? ""; + cap = parseQueueCap(rawCap); + hasOptions = true; + consumed = i; + continue; + } + if (lowered.startsWith("drop:") || lowered.startsWith("drop=")) { + rawDrop = token.split(/[:=]/)[1] ?? ""; + dropPolicy = normalizeQueueDropPolicy(rawDrop); + hasOptions = true; + consumed = i; + continue; + } + + const mode = normalizeQueueMode(token); + if (mode) { + queueMode = mode; + rawMode = token; + consumed = i; + continue; + } + + // Stop at first unrecognized token. + break; + } + + return { + consumed, + queueMode, + queueReset, + rawMode, + debounceMs, + cap, + dropPolicy, + rawDebounce, + rawCap, + rawDrop, + hasOptions, + }; +} + export function extractQueueDirective(body?: string): { cleaned: string; queueMode?: QueueMode; queueReset: boolean; rawMode?: string; hasDirective: boolean; + debounceMs?: number; + cap?: number; + dropPolicy?: QueueDropPolicy; + rawDebounce?: string; + rawCap?: string; + rawDrop?: string; + hasOptions: boolean; } { - if (!body) return { cleaned: "", hasDirective: false, queueReset: false }; - const match = body.match(/(?:^|\s)\/queue(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i); - const rawMode = match?.[1]; - const lowered = rawMode?.trim().toLowerCase(); - const queueReset = - lowered === "default" || lowered === "reset" || lowered === "clear"; - const queueMode = queueReset ? undefined : normalizeQueueMode(rawMode); - const cleaned = match - ? body.replace(match[0], "").replace(/\s+/g, " ").trim() - : body.trim(); + if (!body) + return { + cleaned: "", + hasDirective: false, + queueReset: false, + hasOptions: false, + }; + const re = /(?:^|\s)\/queue(?=$|\s|:)/i; + const match = re.exec(body); + if (!match) { + return { + cleaned: body.trim(), + hasDirective: false, + queueReset: false, + hasOptions: false, + }; + } + const start = match.index + match[0].indexOf("/queue"); + const argsStart = start + "/queue".length; + const args = body.slice(argsStart); + const parsed = parseQueueDirectiveArgs(args); + const cleanedRaw = + body.slice(0, start) + body.slice(argsStart + parsed.consumed); + const cleaned = cleanedRaw.replace(/\s+/g, " ").trim(); return { cleaned, - queueMode, - queueReset, - rawMode, - hasDirective: !!match, + queueMode: parsed.queueMode, + queueReset: parsed.queueReset, + rawMode: parsed.rawMode, + debounceMs: parsed.debounceMs, + cap: parsed.cap, + dropPolicy: parsed.dropPolicy, + rawDebounce: parsed.rawDebounce, + rawCap: parsed.rawCap, + rawDrop: parsed.rawDrop, + hasDirective: true, + hasOptions: parsed.hasOptions, }; } @@ -200,6 +424,184 @@ export function extractReplyToTag( return { cleaned, replyToId, hasTag }; } +function elideText(text: string, limit = 140): string { + if (text.length <= limit) return text; + return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}…`; +} + +function buildQueueSummaryLine(run: FollowupRun): string { + const base = run.summaryLine?.trim() || run.prompt.trim(); + const cleaned = base.replace(/\s+/g, " ").trim(); + return elideText(cleaned, 160); +} + +function getFollowupQueue( + key: string, + settings: QueueSettings, +): FollowupQueueState { + const existing = FOLLOWUP_QUEUES.get(key); + if (existing) { + existing.mode = settings.mode; + existing.debounceMs = + typeof settings.debounceMs === "number" + ? Math.max(0, settings.debounceMs) + : existing.debounceMs; + existing.cap = + typeof settings.cap === "number" && settings.cap > 0 + ? Math.floor(settings.cap) + : existing.cap; + existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy; + return existing; + } + const created: FollowupQueueState = { + items: [], + draining: false, + lastEnqueuedAt: 0, + mode: settings.mode, + debounceMs: + typeof settings.debounceMs === "number" + ? Math.max(0, settings.debounceMs) + : DEFAULT_QUEUE_DEBOUNCE_MS, + cap: + typeof settings.cap === "number" && settings.cap > 0 + ? Math.floor(settings.cap) + : DEFAULT_QUEUE_CAP, + dropPolicy: settings.dropPolicy ?? DEFAULT_QUEUE_DROP, + droppedCount: 0, + summaryLines: [], + }; + FOLLOWUP_QUEUES.set(key, created); + return created; +} + +function enqueueFollowupRun( + key: string, + run: FollowupRun, + settings: QueueSettings, +): boolean { + const queue = getFollowupQueue(key, settings); + queue.lastEnqueuedAt = Date.now(); + queue.lastRun = run.run; + + const cap = queue.cap; + if (cap > 0 && queue.items.length >= cap) { + if (queue.dropPolicy === "new") { + return false; + } + + const dropCount = queue.items.length - cap + 1; + const dropped = queue.items.splice(0, dropCount); + if (queue.dropPolicy === "summarize") { + for (const item of dropped) { + queue.droppedCount += 1; + queue.summaryLines.push(buildQueueSummaryLine(item)); + } + while (queue.summaryLines.length > cap) queue.summaryLines.shift(); + } + } + + queue.items.push(run); + return true; +} + +async function waitForQueueDebounce(queue: FollowupQueueState): Promise { + const debounceMs = Math.max(0, queue.debounceMs); + if (debounceMs <= 0) return; + while (true) { + const since = Date.now() - queue.lastEnqueuedAt; + if (since >= debounceMs) return; + await new Promise((resolve) => setTimeout(resolve, debounceMs - since)); + } +} + +function buildSummaryPrompt(queue: FollowupQueueState): string | undefined { + if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) { + return undefined; + } + const lines = [ + `[Queue overflow] Dropped ${queue.droppedCount} message${queue.droppedCount === 1 ? "" : "s"} due to cap.`, + ]; + if (queue.summaryLines.length > 0) { + lines.push("Summary:"); + for (const line of queue.summaryLines) { + lines.push(`- ${line}`); + } + } + queue.droppedCount = 0; + queue.summaryLines = []; + return lines.join("\n"); +} + +function buildCollectPrompt( + items: FollowupRun[], + summary?: string, +): string { + const blocks: string[] = ["[Queued messages while agent was busy]"]; + if (summary) { + blocks.push(summary); + } + items.forEach((item, idx) => { + blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim()); + }); + return blocks.join("\n\n"); +} + +function scheduleFollowupDrain( + key: string, + runFollowup: (run: FollowupRun) => Promise, +): void { + const queue = FOLLOWUP_QUEUES.get(key); + if (!queue || queue.draining) return; + queue.draining = true; + void (async () => { + try { + while (queue.items.length > 0 || queue.droppedCount > 0) { + await waitForQueueDebounce(queue); + if (queue.mode === "collect") { + const items = queue.items.splice(0, queue.items.length); + const summary = buildSummaryPrompt(queue); + const run = items.at(-1)?.run ?? queue.lastRun; + if (!run) break; + const prompt = buildCollectPrompt(items, summary); + await runFollowup({ + prompt, + run, + enqueuedAt: Date.now(), + }); + continue; + } + + const summaryPrompt = buildSummaryPrompt(queue); + if (summaryPrompt) { + const run = queue.lastRun; + if (!run) break; + await runFollowup({ + prompt: summaryPrompt, + run, + enqueuedAt: Date.now(), + }); + continue; + } + + const next = queue.items.shift(); + if (!next) break; + await runFollowup(next); + } + } catch (err) { + defaultRuntime.error?.( + `followup queue drain failed for ${key}: ${String(err)}`, + ); + } finally { + queue.draining = false; + if (queue.items.length === 0 && queue.droppedCount === 0) { + FOLLOWUP_QUEUES.delete(key); + } else { + scheduleFollowupDrain(key, runFollowup); + } + } + })(); +} + function isAbortTrigger(text?: string): boolean { if (!text) return false; const normalized = text.trim().toLowerCase(); @@ -251,32 +653,58 @@ function stripMentions( function defaultQueueModeForSurface(surface?: string): QueueMode { const normalized = surface?.trim().toLowerCase(); - if (normalized === "discord") return "queue"; - if (normalized === "webchat") return "queue"; - return "interrupt"; + if (normalized === "discord") return "collect"; + if (normalized === "webchat") return "collect"; + if (normalized === "whatsapp") return "collect"; + if (normalized === "telegram") return "collect"; + if (normalized === "imessage") return "collect"; + if (normalized === "signal") return "collect"; + return "collect"; } -function resolveQueueMode(params: { +function resolveQueueSettings(params: { cfg: ClawdisConfig; surface?: string; sessionEntry?: SessionEntry; inlineMode?: QueueMode; -}): QueueMode { + inlineOptions?: Partial; +}): QueueSettings { const surfaceKey = params.surface?.trim().toLowerCase(); const queueCfg = params.cfg.routing?.queue; - const surfaceMode = + const surfaceModeRaw = surfaceKey && queueCfg?.bySurface - ? (queueCfg.bySurface as Record)[ - surfaceKey - ] + ? (queueCfg.bySurface as Record)[surfaceKey] : undefined; - return ( + const resolvedMode = params.inlineMode ?? - params.sessionEntry?.queueMode ?? - surfaceMode ?? - queueCfg?.mode ?? - defaultQueueModeForSurface(surfaceKey) - ); + normalizeQueueMode(params.sessionEntry?.queueMode) ?? + normalizeQueueMode(surfaceModeRaw) ?? + normalizeQueueMode(queueCfg?.mode) ?? + defaultQueueModeForSurface(surfaceKey); + + const debounceRaw = + params.inlineOptions?.debounceMs ?? + params.sessionEntry?.queueDebounceMs ?? + queueCfg?.debounceMs ?? + DEFAULT_QUEUE_DEBOUNCE_MS; + const capRaw = + params.inlineOptions?.cap ?? + params.sessionEntry?.queueCap ?? + queueCfg?.cap ?? + DEFAULT_QUEUE_CAP; + const dropRaw = + params.inlineOptions?.dropPolicy ?? + params.sessionEntry?.queueDrop ?? + normalizeQueueDropPolicy(queueCfg?.drop) ?? + DEFAULT_QUEUE_DROP; + + return { + mode: resolvedMode, + debounceMs: + typeof debounceRaw === "number" ? Math.max(0, debounceRaw) : undefined, + cap: typeof capRaw === "number" ? Math.max(1, Math.floor(capRaw)) : undefined, + dropPolicy: dropRaw, + }; } export async function getReplyFromConfig( @@ -476,6 +904,9 @@ export async function getReplyFromConfig( modelOverride: persistedModelOverride ?? baseEntry?.modelOverride, providerOverride: persistedProviderOverride ?? baseEntry?.providerOverride, queueMode: baseEntry?.queueMode, + queueDebounceMs: baseEntry?.queueDebounceMs, + queueCap: baseEntry?.queueCap, + queueDrop: baseEntry?.queueDrop, displayName: baseEntry?.displayName, chatType: baseEntry?.chatType, surface: baseEntry?.surface, @@ -543,6 +974,12 @@ export async function getReplyFromConfig( queueMode: inlineQueueMode, queueReset: inlineQueueReset, rawMode: rawQueueMode, + debounceMs: inlineQueueDebounceMs, + cap: inlineQueueCap, + dropPolicy: inlineQueueDrop, + rawDebounce: rawQueueDebounce, + rawCap: rawQueueCap, + rawDrop: rawQueueDrop, hasDirective: hasQueueDirective, } = extractQueueDirective(modelCleaned); sessionCtx.Body = queueCleaned; @@ -757,11 +1194,50 @@ export async function getReplyFromConfig( text: `Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`, }; } - if (hasQueueDirective && !inlineQueueMode && !inlineQueueReset) { + const queueModeInvalid = + hasQueueDirective && + !inlineQueueMode && + !inlineQueueReset && + Boolean(rawQueueMode); + const queueDebounceInvalid = + hasQueueDirective && + rawQueueDebounce !== undefined && + typeof inlineQueueDebounceMs !== "number"; + const queueCapInvalid = + hasQueueDirective && + rawQueueCap !== undefined && + typeof inlineQueueCap !== "number"; + const queueDropInvalid = + hasQueueDirective && rawQueueDrop !== undefined && !inlineQueueDrop; + if ( + queueModeInvalid || + queueDebounceInvalid || + queueCapInvalid || + queueDropInvalid + ) { + const errors: string[] = []; + if (queueModeInvalid) { + errors.push( + `Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: steer, followup, collect, steer+backlog, interrupt.`, + ); + } + if (queueDebounceInvalid) { + errors.push( + `Invalid debounce "${rawQueueDebounce ?? ""}". Use ms/s/m (e.g. debounce:1500ms, debounce:2s).`, + ); + } + if (queueCapInvalid) { + errors.push( + `Invalid cap "${rawQueueCap ?? ""}". Use a positive integer (e.g. cap:10).`, + ); + } + if (queueDropInvalid) { + errors.push( + `Invalid drop policy "${rawQueueDrop ?? ""}". Use drop:old, drop:new, or drop:summarize.`, + ); + } cleanupTyping(); - return { - text: `Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: queue, interrupt.`, - }; + return { text: errors.join(" ") }; } let modelSelection: @@ -826,8 +1302,20 @@ export async function getReplyFromConfig( } if (hasQueueDirective && inlineQueueReset) { delete sessionEntry.queueMode; - } else if (hasQueueDirective && inlineQueueMode) { - sessionEntry.queueMode = inlineQueueMode; + delete sessionEntry.queueDebounceMs; + delete sessionEntry.queueCap; + delete sessionEntry.queueDrop; + } else if (hasQueueDirective) { + if (inlineQueueMode) sessionEntry.queueMode = inlineQueueMode; + if (typeof inlineQueueDebounceMs === "number") { + sessionEntry.queueDebounceMs = inlineQueueDebounceMs; + } + if (typeof inlineQueueCap === "number") { + sessionEntry.queueCap = inlineQueueCap; + } + if (inlineQueueDrop) { + sessionEntry.queueDrop = inlineQueueDrop; + } } sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; @@ -865,6 +1353,17 @@ export async function getReplyFromConfig( } else if (hasQueueDirective && inlineQueueReset) { parts.push(`${SYSTEM_MARK} Queue mode reset to default.`); } + if (hasQueueDirective && typeof inlineQueueDebounceMs === "number") { + parts.push( + `${SYSTEM_MARK} Queue debounce set to ${inlineQueueDebounceMs}ms.`, + ); + } + if (hasQueueDirective && typeof inlineQueueCap === "number") { + parts.push(`${SYSTEM_MARK} Queue cap set to ${inlineQueueCap}.`); + } + if (hasQueueDirective && inlineQueueDrop) { + parts.push(`${SYSTEM_MARK} Queue drop set to ${inlineQueueDrop}.`); + } const ack = parts.join(" ").trim(); cleanupTyping(); return { text: ack || "OK." }; @@ -927,6 +1426,9 @@ export async function getReplyFromConfig( } if (hasQueueDirective && inlineQueueReset) { delete sessionEntry.queueMode; + delete sessionEntry.queueDebounceMs; + delete sessionEntry.queueCap; + delete sessionEntry.queueDrop; updated = true; } if (updated) { @@ -937,6 +1439,14 @@ export async function getReplyFromConfig( } const perMessageQueueMode = hasQueueDirective && !inlineQueueReset ? inlineQueueMode : undefined; + const perMessageQueueOptions = + hasQueueDirective && !inlineQueueReset + ? { + debounceMs: inlineQueueDebounceMs, + cap: inlineQueueCap, + dropPolicy: inlineQueueDrop, + } + : undefined; const surface = (ctx.Surface ?? "").trim().toLowerCase(); const isWhatsAppSurface = @@ -1315,17 +1825,18 @@ export async function getReplyFromConfig( .trim() : queueBodyBase; - const resolvedQueueMode = resolveQueueMode({ + const resolvedQueue = resolveQueueSettings({ cfg, surface: sessionCtx.Surface, sessionEntry, inlineMode: perMessageQueueMode, + inlineOptions: perMessageQueueOptions, }); const sessionLaneKey = resolveEmbeddedSessionLane( sessionKey ?? sessionIdFinal, ); const laneSize = getQueueSize(sessionLaneKey); - if (resolvedQueueMode === "interrupt" && laneSize > 0) { + if (resolvedQueue.mode === "interrupt" && laneSize > 0) { const cleared = clearCommandLane(sessionLaneKey); const aborted = abortEmbeddedPiRun(sessionIdFinal); logVerbose( @@ -1333,10 +1844,54 @@ export async function getReplyFromConfig( ); } - if ( - resolvedQueueMode === "queue" && - queueEmbeddedPiMessage(sessionIdFinal, queuedBody) - ) { + const queueKey = sessionKey ?? sessionIdFinal; + const isActive = isEmbeddedPiRunActive(sessionIdFinal); + const isStreaming = isEmbeddedPiRunStreaming(sessionIdFinal); + const shouldSteer = + resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog"; + const shouldFollowup = + resolvedQueue.mode === "followup" || + resolvedQueue.mode === "collect" || + resolvedQueue.mode === "steer-backlog"; + + const followupRun: FollowupRun = { + prompt: queuedBody, + summaryLine: baseBodyTrimmedRaw, + enqueuedAt: Date.now(), + run: { + sessionId: sessionIdFinal, + sessionKey, + sessionFile, + workspaceDir, + config: cfg, + skillsSnapshot, + provider, + model, + thinkLevel: resolvedThinkLevel, + verboseLevel: resolvedVerboseLevel, + timeoutMs, + blockReplyBreak: resolvedBlockStreamingBreak, + ownerNumbers: ownerList.length > 0 ? ownerList : undefined, + extraSystemPrompt: groupIntro || undefined, + enforceFinalTag: provider === "ollama" ? true : undefined, + }, + }; + + if (shouldSteer && isStreaming) { + const steered = queueEmbeddedPiMessage(sessionIdFinal, queuedBody); + if (steered && !shouldFollowup) { + if (sessionEntry && sessionStore && sessionKey) { + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + } + cleanupTyping(); + return undefined; + } + } + + if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) { + enqueueFollowupRun(queueKey, followupRun, resolvedQueue); if (sessionEntry && sessionStore && sessionKey) { sessionEntry.updatedAt = Date.now(); sessionStore[sessionKey] = sessionEntry; @@ -1346,6 +1901,143 @@ export async function getReplyFromConfig( return undefined; } + const sendFollowupPayloads = async (payloads: ReplyPayload[]) => { + if (!opts?.onBlockReply) { + logVerbose("followup queue: no onBlockReply handler; dropping payloads"); + return; + } + for (const payload of payloads) { + if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) { + continue; + } + if ( + payload.text?.trim() === SILENT_REPLY_TOKEN && + !payload.mediaUrl && + !payload.mediaUrls?.length + ) { + continue; + } + await startTypingOnText(payload.text); + await opts.onBlockReply(payload); + } + }; + + const runFollowupTurn = async (queued: FollowupRun) => { + const runId = crypto.randomUUID(); + if (queued.run.sessionKey) { + registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey }); + } + let runResult: Awaited>; + try { + runResult = await runEmbeddedPiAgent({ + sessionId: queued.run.sessionId, + sessionKey: queued.run.sessionKey, + sessionFile: queued.run.sessionFile, + workspaceDir: queued.run.workspaceDir, + config: queued.run.config, + skillsSnapshot: queued.run.skillsSnapshot, + prompt: queued.prompt, + extraSystemPrompt: queued.run.extraSystemPrompt, + ownerNumbers: queued.run.ownerNumbers, + enforceFinalTag: queued.run.enforceFinalTag, + provider: queued.run.provider, + model: queued.run.model, + thinkLevel: queued.run.thinkLevel, + verboseLevel: queued.run.verboseLevel, + timeoutMs: queued.run.timeoutMs, + runId, + blockReplyBreak: queued.run.blockReplyBreak, + }); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + defaultRuntime.error?.( + `Followup agent failed before reply: ${message}`, + ); + return; + } + + const payloadArray = runResult.payloads ?? []; + if (payloadArray.length === 0) return; + const sanitizedPayloads = payloadArray.flatMap((payload) => { + const text = payload.text; + if (!text || !text.includes("HEARTBEAT_OK")) return [payload]; + const stripped = stripHeartbeatToken(text, { mode: "message" }); + const hasMedia = + Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + if (stripped.shouldSkip && !hasMedia) return []; + return [{ ...payload, text: stripped.text }]; + }); + + const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads + .map((payload) => { + const { cleaned, replyToId } = extractReplyToTag(payload.text); + return { + ...payload, + text: cleaned ? cleaned : undefined, + replyToId: replyToId ?? payload.replyToId, + }; + }) + .filter( + (payload) => + payload.text || + payload.mediaUrl || + (payload.mediaUrls && payload.mediaUrls.length > 0), + ); + + if (replyTaggedPayloads.length === 0) return; + + if (sessionStore && sessionKey) { + const usage = runResult.meta.agentMeta?.usage; + const modelUsed = runResult.meta.agentMeta?.model ?? defaultModel; + const contextTokensUsed = + agentCfg?.contextTokens ?? + lookupContextTokens(modelUsed) ?? + sessionEntry?.contextTokens ?? + DEFAULT_CONTEXT_TOKENS; + + if (usage) { + const entry = sessionStore[sessionKey]; + if (entry) { + const input = usage.input ?? 0; + const output = usage.output ?? 0; + const promptTokens = + input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); + sessionStore[sessionKey] = { + ...entry, + inputTokens: input, + outputTokens: output, + totalTokens: promptTokens > 0 ? promptTokens : usage.total ?? input, + model: modelUsed, + contextTokens: contextTokensUsed ?? entry.contextTokens, + updatedAt: Date.now(), + }; + if (storePath) { + await saveSessionStore(storePath, sessionStore); + } + } + } else if (modelUsed || contextTokensUsed) { + const entry = sessionStore[sessionKey]; + if (entry) { + sessionStore[sessionKey] = { + ...entry, + model: modelUsed ?? entry.model, + contextTokens: contextTokensUsed ?? entry.contextTokens, + }; + if (storePath) { + await saveSessionStore(storePath, sessionStore); + } + } + } + } + + await sendFollowupPayloads(replyTaggedPayloads); + }; + + const finalizeWithFollowup = (value: T): T => { + scheduleFollowupDrain(queueKey, runFollowupTurn); + return value; + }; + let didLogHeartbeatStrip = false; try { if (shouldEagerType) { @@ -1472,11 +2164,11 @@ export async function getReplyFromConfig( const isContextOverflow = /context.*overflow|too large|context window/i.test(message); defaultRuntime.error(`Embedded agent failed before reply: ${message}`); - return { + return finalizeWithFollowup({ text: isContextOverflow ? "⚠️ Context overflow - conversation too long. Starting fresh might help!" : "⚠️ Agent failed. Check gateway logs.", - }; + }); } if ( @@ -1493,7 +2185,7 @@ export async function getReplyFromConfig( } const payloadArray = runResult.payloads ?? []; - if (payloadArray.length === 0) return undefined; + if (payloadArray.length === 0) return finalizeWithFollowup(undefined); if (pendingBlockTasks.size > 0) { await Promise.allSettled(pendingBlockTasks); } @@ -1539,7 +2231,7 @@ export async function getReplyFromConfig( ) : replyTaggedPayloads; - if (filteredPayloads.length === 0) return undefined; + if (filteredPayloads.length === 0) return finalizeWithFollowup(undefined); const shouldSignalTyping = filteredPayloads.some((payload) => { const trimmed = payload.text?.trim(); @@ -1604,7 +2296,9 @@ export async function getReplyFromConfig( ]; } - return finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads; + return finalizeWithFollowup( + finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads, + ); } finally { cleanupTyping(); } diff --git a/src/config/config.ts b/src/config/config.ts index c7103ac94..dfa8c707a 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -309,7 +309,15 @@ export type IMessageConfig = { >; }; -export type QueueMode = "queue" | "interrupt"; +export type QueueMode = + | "steer" + | "followup" + | "collect" + | "steer-backlog" + | "steer+backlog" + | "queue" + | "interrupt"; +export type QueueDropPolicy = "old" | "new" | "summarize"; export type QueueModeBySurface = { whatsapp?: QueueMode; @@ -335,6 +343,9 @@ export type RoutingConfig = { queue?: { mode?: QueueMode; bySurface?: QueueModeBySurface; + debounceMs?: number; + cap?: number; + drop?: QueueDropPolicy; }; }; @@ -690,7 +701,20 @@ const GroupChatSchema = z }) .optional(); -const QueueModeSchema = z.union([z.literal("queue"), z.literal("interrupt")]); +const QueueModeSchema = z.union([ + z.literal("steer"), + z.literal("followup"), + z.literal("collect"), + z.literal("steer-backlog"), + z.literal("steer+backlog"), + z.literal("queue"), + z.literal("interrupt"), +]); +const QueueDropSchema = z.union([ + z.literal("old"), + z.literal("new"), + z.literal("summarize"), +]); const ReplyToModeSchema = z.union([ z.literal("off"), z.literal("first"), @@ -779,6 +803,9 @@ const RoutingSchema = z .object({ mode: QueueModeSchema.optional(), bySurface: QueueModeBySurfaceSchema, + debounceMs: z.number().int().nonnegative().optional(), + cap: z.number().int().positive().optional(), + drop: QueueDropSchema.optional(), }) .optional(), }) diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 7457bb0d2..fdc742663 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -34,7 +34,17 @@ export type SessionEntry = { modelOverride?: string; groupActivation?: "mention" | "always"; groupActivationNeedsSystemIntro?: boolean; - queueMode?: "queue" | "interrupt"; + queueMode?: + | "steer" + | "followup" + | "collect" + | "steer-backlog" + | "steer+backlog" + | "queue" + | "interrupt"; + queueDebounceMs?: number; + queueCap?: number; + queueDrop?: "old" | "new" | "summarize"; inputTokens?: number; outputTokens?: number; totalTokens?: number;