feat: expand queue modes and followup backlog

This commit is contained in:
Peter Steinberger
2026-01-03 04:26:36 +01:00
parent 6160521f2f
commit ac36eba822
7 changed files with 884 additions and 73 deletions

View File

@@ -70,7 +70,16 @@ Legacy Pi/Tau session folders are **not** read.
## Steering while streaming
Incoming user messages are queued while the agent is streaming. The queue is checked **after each tool call**. If a queued message is present, remaining tool calls from the current assistant message are skipped (error tool results with "Skipped due to queued user message."), then the queued user message is injected before the next assistant response.
When queue mode is `steer`, inbound messages are injected into the current run.
The queue is checked **after each tool call**; if a queued message is present,
remaining tool calls from the current assistant message are skipped (error tool
results with "Skipped due to queued user message."), then the queued user
message is injected before the next assistant response.
When queue mode is `followup` or `collect`, inbound messages are held until the
current turn ends, then a new agent turn starts with the queued payloads. See
`docs/queue.md` for mode + debounce/cap behavior.
Block streaming sends completed assistant blocks as soon as they finish; disable
via `agent.blockStreamingDefault: "off"` if you only want the final response.
Tune the boundary via `agent.blockStreamingBreak` (`text_end` vs `message_end`).

View File

@@ -156,13 +156,16 @@ Controls how inbound messages behave when an agent run is already active.
{
routing: {
queue: {
mode: "interrupt", // global default: queue | interrupt
mode: "collect", // steer | followup | collect | steer-backlog (steer+backlog ok) | interrupt (queue=steer legacy)
debounceMs: 1000,
cap: 20,
drop: "summarize", // old | new | summarize
bySurface: {
whatsapp: "interrupt",
telegram: "interrupt",
discord: "queue",
imessage: "interrupt",
webchat: "queue"
whatsapp: "collect",
telegram: "collect",
discord: "steer-backlog",
imessage: "collect",
webchat: "collect"
}
}
}

View File

@@ -3,7 +3,7 @@ summary: "Command queue design that serializes auto-reply command execution"
read_when:
- Changing auto-reply execution or concurrency
---
# Command Queue (2025-11-25)
# Command Queue (2026-01-03)
We now serialize command-based auto-replies (WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once, while allowing safe parallelism across sessions.
@@ -19,13 +19,16 @@ We now serialize command-based auto-replies (WhatsApp Web listener) through a ti
- Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn.
## Queue modes (per surface)
Inbound messages can either queue or interrupt when a run is already active:
- `queue`: serialize per session; if the agent is streaming, the new message is appended to the current run.
- `interrupt`: abort the active run for that session, then run the newest message.
Inbound messages can steer the current run, wait for a followup turn, or do both:
- `steer`: inject immediately into the current run (cancels pending tool calls after the next tool boundary). If not streaming, falls back to followup.
- `followup`: enqueue for the next agent turn after the current run ends.
- `collect`: coalesce all queued messages into a **single** followup turn (default).
- `steer-backlog` (aka `steer+backlog`): steer now **and** preserve the message for a followup turn.
- `interrupt` (legacy): abort the active run for that session, then run the newest message.
- `queue` (legacy alias): same as `steer`.
Defaults (when unset in config):
- WhatsApp + Telegram → `interrupt`
- Discord + WebChat → `queue`
- All surfaces → `collect`
Configure globally or per surface via `routing.queue`:
@@ -33,16 +36,29 @@ Configure globally or per surface via `routing.queue`:
{
routing: {
queue: {
mode: "interrupt",
bySurface: { discord: "queue", telegram: "interrupt" }
mode: "collect",
debounceMs: 1000,
cap: 20,
drop: "summarize",
bySurface: { discord: "steer-backlog" }
}
}
}
```
## Queue options
Options apply to `followup`, `collect`, and `steer-backlog` (and to `steer` when it falls back to followup):
- `debounceMs`: wait for quiet before starting a followup turn (prevents “continue, continue”).
- `cap`: max queued messages per session.
- `drop`: overflow policy (`old`, `new`, `summarize`).
Summarize keeps a short bullet list of dropped messages and injects it as a synthetic followup prompt.
Defaults: `debounceMs: 1000`, `cap: 20`, `drop: summarize`.
## Per-session overrides
- `/queue <mode>` as a standalone command stores the mode for the current session.
- `/queue <mode>` embedded in a message applies **once** (no persistence).
- Options can be combined: `/queue collect debounce:2s cap:25 drop:summarize`
- `/queue default` or `/queue reset` clears the session override.
## Scope and guarantees

View File

@@ -97,6 +97,18 @@ describe("directive parsing", () => {
expect(res.cleaned).toBe("please now");
});
it("parses queue options and modes", () => {
const res = extractQueueDirective(
"please /queue steer+backlog debounce:2s cap:5 drop:summarize now",
);
expect(res.hasDirective).toBe(true);
expect(res.queueMode).toBe("steer-backlog");
expect(res.debounceMs).toBe(2000);
expect(res.cap).toBe(5);
expect(res.dropPolicy).toBe("summarize");
expect(res.cleaned).toBe("please now");
});
it("extracts reply_to_current tag", () => {
const res = extractReplyToTag("ok [[reply_to_current]]", "msg-1");
expect(res.replyToId).toBe("msg-1");
@@ -276,6 +288,43 @@ describe("directive parsing", () => {
});
});
it("persists queue options when directive is standalone", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();
const storePath = path.join(home, "sessions.json");
const res = await getReplyFromConfig(
{
Body: "/queue collect debounce:2s cap:5 drop:old",
From: "+1222",
To: "+1222",
},
{},
{
agent: {
model: "anthropic/claude-opus-4-5",
workspace: path.join(home, "clawd"),
},
whatsapp: { allowFrom: ["*"] },
session: { store: storePath },
},
);
const text = Array.isArray(res) ? res[0]?.text : res?.text;
expect(text).toMatch(/^⚙️ Queue mode set to collect\./);
expect(text).toMatch(/Queue debounce set to 2000ms/);
expect(text).toMatch(/Queue cap set to 5/);
expect(text).toMatch(/Queue drop set to old/);
const store = loadSessionStore(storePath);
const entry = Object.values(store)[0];
expect(entry?.queueMode).toBe("collect");
expect(entry?.queueDebounceMs).toBe(2000);
expect(entry?.queueCap).toBe(5);
expect(entry?.queueDrop).toBe("old");
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
});
});
it("resets queue mode to default", async () => {
await withTempHome(async (home) => {
vi.mocked(runEmbeddedPiAgent).mockReset();
@@ -312,6 +361,9 @@ describe("directive parsing", () => {
const store = loadSessionStore(storePath);
const entry = Object.values(store)[0];
expect(entry?.queueMode).toBeUndefined();
expect(entry?.queueDebounceMs).toBeUndefined();
expect(entry?.queueCap).toBeUndefined();
expect(entry?.queueDrop).toBeUndefined();
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
});
});

View File

@@ -16,11 +16,16 @@ import {
} from "../agents/model-selection.js";
import {
abortEmbeddedPiRun,
isEmbeddedPiRunActive,
isEmbeddedPiRunStreaming,
queueEmbeddedPiMessage,
resolveEmbeddedSessionLane,
runEmbeddedPiAgent,
} from "../agents/pi-embedded.js";
import { buildWorkspaceSkillSnapshot } from "../agents/skills.js";
import {
buildWorkspaceSkillSnapshot,
type SkillSnapshot,
} from "../agents/skills.js";
import {
DEFAULT_AGENT_WORKSPACE_DIR,
ensureAgentWorkspace,
@@ -49,6 +54,7 @@ import {
import { clearCommandLane, getQueueSize } from "../process/command-queue.js";
import { defaultRuntime } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { parseDurationMs } from "../cli/parse-duration.js";
import { resolveHeartbeatSeconds } from "../web/reconnect.js";
import { getWebAuthAgeMs, webAuthExists } from "../web/session.js";
import {
@@ -75,11 +81,68 @@ const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
const ABORT_MEMORY = new Map<string, boolean>();
const SYSTEM_MARK = "⚙️";
type QueueMode =
| "steer"
| "followup"
| "collect"
| "steer-backlog"
| "interrupt"
| "queue";
type QueueDropPolicy = "old" | "new" | "summarize";
type QueueSettings = {
mode: QueueMode;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
};
type FollowupRun = {
prompt: string;
summaryLine?: string;
enqueuedAt: number;
run: {
sessionId: string;
sessionKey?: string;
sessionFile: string;
workspaceDir: string;
config: ClawdisConfig;
skillsSnapshot?: SkillSnapshot;
provider: string;
model: string;
thinkLevel?: ThinkLevel;
verboseLevel?: VerboseLevel;
timeoutMs: number;
blockReplyBreak: "text_end" | "message_end";
ownerNumbers?: string[];
extraSystemPrompt?: string;
enforceFinalTag?: boolean;
};
};
type FollowupQueueState = {
items: FollowupRun[];
draining: boolean;
lastEnqueuedAt: number;
mode: QueueMode;
debounceMs: number;
cap: number;
dropPolicy: QueueDropPolicy;
droppedCount: number;
summaryLines: string[];
lastRun?: FollowupRun["run"];
};
const DEFAULT_QUEUE_DEBOUNCE_MS = 1000;
const DEFAULT_QUEUE_CAP = 20;
const DEFAULT_QUEUE_DROP: QueueDropPolicy = "summarize";
const FOLLOWUP_QUEUES = new Map<string, FollowupQueueState>();
const BARE_SESSION_RESET_PROMPT =
"A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. Do not mention internal steps, files, tools, or reasoning.";
type QueueMode = "queue" | "interrupt";
export function extractThinkDirective(body?: string): {
cleaned: string;
thinkLevel?: ThinkLevel;
@@ -128,39 +191,200 @@ export function extractVerboseDirective(body?: string): {
function normalizeQueueMode(raw?: string): QueueMode | undefined {
if (!raw) return undefined;
const cleaned = raw.trim().toLowerCase();
if (cleaned === "queue" || cleaned === "queued") return "queue";
if (
cleaned === "interrupt" ||
cleaned === "interrupts" ||
cleaned === "abort"
)
if (cleaned === "queue" || cleaned === "queued") return "steer";
if (cleaned === "interrupt" || cleaned === "interrupts" || cleaned === "abort")
return "interrupt";
if (cleaned === "steer" || cleaned === "steering") return "steer";
if (cleaned === "followup" || cleaned === "follow-ups" || cleaned === "followups")
return "followup";
if (cleaned === "collect" || cleaned === "coalesce") return "collect";
if (
cleaned === "steer+backlog" ||
cleaned === "steer-backlog" ||
cleaned === "steer_backlog"
)
return "steer-backlog";
return undefined;
}
function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined {
if (!raw) return undefined;
const cleaned = raw.trim().toLowerCase();
if (cleaned === "old" || cleaned === "oldest") return "old";
if (cleaned === "new" || cleaned === "newest") return "new";
if (cleaned === "summarize" || cleaned === "summary") return "summarize";
return undefined;
}
function parseQueueDebounce(raw?: string): number | undefined {
if (!raw) return undefined;
const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" });
if (!parsed || parsed < 0) return undefined;
return Math.round(parsed);
}
function parseQueueCap(raw?: string): number | undefined {
if (!raw) return undefined;
const num = Number(raw);
if (!Number.isFinite(num)) return undefined;
const cap = Math.floor(num);
if (cap < 1) return undefined;
return cap;
}
function parseQueueDirectiveArgs(raw: string): {
consumed: number;
queueMode?: QueueMode;
queueReset: boolean;
rawMode?: string;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
rawDebounce?: string;
rawCap?: string;
rawDrop?: string;
hasOptions: boolean;
} {
let i = 0;
const len = raw.length;
while (i < len && /\s/.test(raw[i])) i += 1;
if (raw[i] === ":") {
i += 1;
while (i < len && /\s/.test(raw[i])) i += 1;
}
let consumed = i;
let queueMode: QueueMode | undefined;
let queueReset = false;
let rawMode: string | undefined;
let debounceMs: number | undefined;
let cap: number | undefined;
let dropPolicy: QueueDropPolicy | undefined;
let rawDebounce: string | undefined;
let rawCap: string | undefined;
let rawDrop: string | undefined;
let hasOptions = false;
const takeToken = (): string | null => {
if (i >= len) return null;
const start = i;
while (i < len && !/\s/.test(raw[i])) i += 1;
if (start === i) return null;
const token = raw.slice(start, i);
while (i < len && /\s/.test(raw[i])) i += 1;
return token;
};
while (i < len) {
const token = takeToken();
if (!token) break;
const lowered = token.trim().toLowerCase();
if (lowered === "default" || lowered === "reset" || lowered === "clear") {
queueReset = true;
consumed = i;
break;
}
if (lowered.startsWith("debounce:") || lowered.startsWith("debounce=")) {
rawDebounce = token.split(/[:=]/)[1] ?? "";
debounceMs = parseQueueDebounce(rawDebounce);
hasOptions = true;
consumed = i;
continue;
}
if (lowered.startsWith("cap:") || lowered.startsWith("cap=")) {
rawCap = token.split(/[:=]/)[1] ?? "";
cap = parseQueueCap(rawCap);
hasOptions = true;
consumed = i;
continue;
}
if (lowered.startsWith("drop:") || lowered.startsWith("drop=")) {
rawDrop = token.split(/[:=]/)[1] ?? "";
dropPolicy = normalizeQueueDropPolicy(rawDrop);
hasOptions = true;
consumed = i;
continue;
}
const mode = normalizeQueueMode(token);
if (mode) {
queueMode = mode;
rawMode = token;
consumed = i;
continue;
}
// Stop at first unrecognized token.
break;
}
return {
consumed,
queueMode,
queueReset,
rawMode,
debounceMs,
cap,
dropPolicy,
rawDebounce,
rawCap,
rawDrop,
hasOptions,
};
}
export function extractQueueDirective(body?: string): {
cleaned: string;
queueMode?: QueueMode;
queueReset: boolean;
rawMode?: string;
hasDirective: boolean;
debounceMs?: number;
cap?: number;
dropPolicy?: QueueDropPolicy;
rawDebounce?: string;
rawCap?: string;
rawDrop?: string;
hasOptions: boolean;
} {
if (!body) return { cleaned: "", hasDirective: false, queueReset: false };
const match = body.match(/(?:^|\s)\/queue(?=$|\s|:)\s*:?\s*([a-zA-Z-]+)\b/i);
const rawMode = match?.[1];
const lowered = rawMode?.trim().toLowerCase();
const queueReset =
lowered === "default" || lowered === "reset" || lowered === "clear";
const queueMode = queueReset ? undefined : normalizeQueueMode(rawMode);
const cleaned = match
? body.replace(match[0], "").replace(/\s+/g, " ").trim()
: body.trim();
if (!body)
return {
cleaned: "",
hasDirective: false,
queueReset: false,
hasOptions: false,
};
const re = /(?:^|\s)\/queue(?=$|\s|:)/i;
const match = re.exec(body);
if (!match) {
return {
cleaned: body.trim(),
hasDirective: false,
queueReset: false,
hasOptions: false,
};
}
const start = match.index + match[0].indexOf("/queue");
const argsStart = start + "/queue".length;
const args = body.slice(argsStart);
const parsed = parseQueueDirectiveArgs(args);
const cleanedRaw =
body.slice(0, start) + body.slice(argsStart + parsed.consumed);
const cleaned = cleanedRaw.replace(/\s+/g, " ").trim();
return {
cleaned,
queueMode,
queueReset,
rawMode,
hasDirective: !!match,
queueMode: parsed.queueMode,
queueReset: parsed.queueReset,
rawMode: parsed.rawMode,
debounceMs: parsed.debounceMs,
cap: parsed.cap,
dropPolicy: parsed.dropPolicy,
rawDebounce: parsed.rawDebounce,
rawCap: parsed.rawCap,
rawDrop: parsed.rawDrop,
hasDirective: true,
hasOptions: parsed.hasOptions,
};
}
@@ -200,6 +424,184 @@ export function extractReplyToTag(
return { cleaned, replyToId, hasTag };
}
function elideText(text: string, limit = 140): string {
if (text.length <= limit) return text;
return `${text.slice(0, Math.max(0, limit - 1)).trimEnd()}`;
}
function buildQueueSummaryLine(run: FollowupRun): string {
const base = run.summaryLine?.trim() || run.prompt.trim();
const cleaned = base.replace(/\s+/g, " ").trim();
return elideText(cleaned, 160);
}
function getFollowupQueue(
key: string,
settings: QueueSettings,
): FollowupQueueState {
const existing = FOLLOWUP_QUEUES.get(key);
if (existing) {
existing.mode = settings.mode;
existing.debounceMs =
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: existing.debounceMs;
existing.cap =
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: existing.cap;
existing.dropPolicy = settings.dropPolicy ?? existing.dropPolicy;
return existing;
}
const created: FollowupQueueState = {
items: [],
draining: false,
lastEnqueuedAt: 0,
mode: settings.mode,
debounceMs:
typeof settings.debounceMs === "number"
? Math.max(0, settings.debounceMs)
: DEFAULT_QUEUE_DEBOUNCE_MS,
cap:
typeof settings.cap === "number" && settings.cap > 0
? Math.floor(settings.cap)
: DEFAULT_QUEUE_CAP,
dropPolicy: settings.dropPolicy ?? DEFAULT_QUEUE_DROP,
droppedCount: 0,
summaryLines: [],
};
FOLLOWUP_QUEUES.set(key, created);
return created;
}
function enqueueFollowupRun(
key: string,
run: FollowupRun,
settings: QueueSettings,
): boolean {
const queue = getFollowupQueue(key, settings);
queue.lastEnqueuedAt = Date.now();
queue.lastRun = run.run;
const cap = queue.cap;
if (cap > 0 && queue.items.length >= cap) {
if (queue.dropPolicy === "new") {
return false;
}
const dropCount = queue.items.length - cap + 1;
const dropped = queue.items.splice(0, dropCount);
if (queue.dropPolicy === "summarize") {
for (const item of dropped) {
queue.droppedCount += 1;
queue.summaryLines.push(buildQueueSummaryLine(item));
}
while (queue.summaryLines.length > cap) queue.summaryLines.shift();
}
}
queue.items.push(run);
return true;
}
async function waitForQueueDebounce(queue: FollowupQueueState): Promise<void> {
const debounceMs = Math.max(0, queue.debounceMs);
if (debounceMs <= 0) return;
while (true) {
const since = Date.now() - queue.lastEnqueuedAt;
if (since >= debounceMs) return;
await new Promise((resolve) => setTimeout(resolve, debounceMs - since));
}
}
function buildSummaryPrompt(queue: FollowupQueueState): string | undefined {
if (queue.dropPolicy !== "summarize" || queue.droppedCount <= 0) {
return undefined;
}
const lines = [
`[Queue overflow] Dropped ${queue.droppedCount} message${queue.droppedCount === 1 ? "" : "s"} due to cap.`,
];
if (queue.summaryLines.length > 0) {
lines.push("Summary:");
for (const line of queue.summaryLines) {
lines.push(`- ${line}`);
}
}
queue.droppedCount = 0;
queue.summaryLines = [];
return lines.join("\n");
}
function buildCollectPrompt(
items: FollowupRun[],
summary?: string,
): string {
const blocks: string[] = ["[Queued messages while agent was busy]"];
if (summary) {
blocks.push(summary);
}
items.forEach((item, idx) => {
blocks.push(`---\nQueued #${idx + 1}\n${item.prompt}`.trim());
});
return blocks.join("\n\n");
}
function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
): void {
const queue = FOLLOWUP_QUEUES.get(key);
if (!queue || queue.draining) return;
queue.draining = true;
void (async () => {
try {
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForQueueDebounce(queue);
if (queue.mode === "collect") {
const items = queue.items.splice(0, queue.items.length);
const summary = buildSummaryPrompt(queue);
const run = items.at(-1)?.run ?? queue.lastRun;
if (!run) break;
const prompt = buildCollectPrompt(items, summary);
await runFollowup({
prompt,
run,
enqueuedAt: Date.now(),
});
continue;
}
const summaryPrompt = buildSummaryPrompt(queue);
if (summaryPrompt) {
const run = queue.lastRun;
if (!run) break;
await runFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
});
continue;
}
const next = queue.items.shift();
if (!next) break;
await runFollowup(next);
}
} catch (err) {
defaultRuntime.error?.(
`followup queue drain failed for ${key}: ${String(err)}`,
);
} finally {
queue.draining = false;
if (queue.items.length === 0 && queue.droppedCount === 0) {
FOLLOWUP_QUEUES.delete(key);
} else {
scheduleFollowupDrain(key, runFollowup);
}
}
})();
}
function isAbortTrigger(text?: string): boolean {
if (!text) return false;
const normalized = text.trim().toLowerCase();
@@ -251,32 +653,58 @@ function stripMentions(
function defaultQueueModeForSurface(surface?: string): QueueMode {
const normalized = surface?.trim().toLowerCase();
if (normalized === "discord") return "queue";
if (normalized === "webchat") return "queue";
return "interrupt";
if (normalized === "discord") return "collect";
if (normalized === "webchat") return "collect";
if (normalized === "whatsapp") return "collect";
if (normalized === "telegram") return "collect";
if (normalized === "imessage") return "collect";
if (normalized === "signal") return "collect";
return "collect";
}
function resolveQueueMode(params: {
function resolveQueueSettings(params: {
cfg: ClawdisConfig;
surface?: string;
sessionEntry?: SessionEntry;
inlineMode?: QueueMode;
}): QueueMode {
inlineOptions?: Partial<QueueSettings>;
}): QueueSettings {
const surfaceKey = params.surface?.trim().toLowerCase();
const queueCfg = params.cfg.routing?.queue;
const surfaceMode =
const surfaceModeRaw =
surfaceKey && queueCfg?.bySurface
? (queueCfg.bySurface as Record<string, QueueMode | undefined>)[
surfaceKey
]
? (queueCfg.bySurface as Record<string, string | undefined>)[surfaceKey]
: undefined;
return (
const resolvedMode =
params.inlineMode ??
params.sessionEntry?.queueMode ??
surfaceMode ??
queueCfg?.mode ??
defaultQueueModeForSurface(surfaceKey)
);
normalizeQueueMode(params.sessionEntry?.queueMode) ??
normalizeQueueMode(surfaceModeRaw) ??
normalizeQueueMode(queueCfg?.mode) ??
defaultQueueModeForSurface(surfaceKey);
const debounceRaw =
params.inlineOptions?.debounceMs ??
params.sessionEntry?.queueDebounceMs ??
queueCfg?.debounceMs ??
DEFAULT_QUEUE_DEBOUNCE_MS;
const capRaw =
params.inlineOptions?.cap ??
params.sessionEntry?.queueCap ??
queueCfg?.cap ??
DEFAULT_QUEUE_CAP;
const dropRaw =
params.inlineOptions?.dropPolicy ??
params.sessionEntry?.queueDrop ??
normalizeQueueDropPolicy(queueCfg?.drop) ??
DEFAULT_QUEUE_DROP;
return {
mode: resolvedMode,
debounceMs:
typeof debounceRaw === "number" ? Math.max(0, debounceRaw) : undefined,
cap: typeof capRaw === "number" ? Math.max(1, Math.floor(capRaw)) : undefined,
dropPolicy: dropRaw,
};
}
export async function getReplyFromConfig(
@@ -476,6 +904,9 @@ export async function getReplyFromConfig(
modelOverride: persistedModelOverride ?? baseEntry?.modelOverride,
providerOverride: persistedProviderOverride ?? baseEntry?.providerOverride,
queueMode: baseEntry?.queueMode,
queueDebounceMs: baseEntry?.queueDebounceMs,
queueCap: baseEntry?.queueCap,
queueDrop: baseEntry?.queueDrop,
displayName: baseEntry?.displayName,
chatType: baseEntry?.chatType,
surface: baseEntry?.surface,
@@ -543,6 +974,12 @@ export async function getReplyFromConfig(
queueMode: inlineQueueMode,
queueReset: inlineQueueReset,
rawMode: rawQueueMode,
debounceMs: inlineQueueDebounceMs,
cap: inlineQueueCap,
dropPolicy: inlineQueueDrop,
rawDebounce: rawQueueDebounce,
rawCap: rawQueueCap,
rawDrop: rawQueueDrop,
hasDirective: hasQueueDirective,
} = extractQueueDirective(modelCleaned);
sessionCtx.Body = queueCleaned;
@@ -757,11 +1194,50 @@ export async function getReplyFromConfig(
text: `Unrecognized verbose level "${rawVerboseLevel ?? ""}". Valid levels: off, on.`,
};
}
if (hasQueueDirective && !inlineQueueMode && !inlineQueueReset) {
const queueModeInvalid =
hasQueueDirective &&
!inlineQueueMode &&
!inlineQueueReset &&
Boolean(rawQueueMode);
const queueDebounceInvalid =
hasQueueDirective &&
rawQueueDebounce !== undefined &&
typeof inlineQueueDebounceMs !== "number";
const queueCapInvalid =
hasQueueDirective &&
rawQueueCap !== undefined &&
typeof inlineQueueCap !== "number";
const queueDropInvalid =
hasQueueDirective && rawQueueDrop !== undefined && !inlineQueueDrop;
if (
queueModeInvalid ||
queueDebounceInvalid ||
queueCapInvalid ||
queueDropInvalid
) {
const errors: string[] = [];
if (queueModeInvalid) {
errors.push(
`Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: steer, followup, collect, steer+backlog, interrupt.`,
);
}
if (queueDebounceInvalid) {
errors.push(
`Invalid debounce "${rawQueueDebounce ?? ""}". Use ms/s/m (e.g. debounce:1500ms, debounce:2s).`,
);
}
if (queueCapInvalid) {
errors.push(
`Invalid cap "${rawQueueCap ?? ""}". Use a positive integer (e.g. cap:10).`,
);
}
if (queueDropInvalid) {
errors.push(
`Invalid drop policy "${rawQueueDrop ?? ""}". Use drop:old, drop:new, or drop:summarize.`,
);
}
cleanupTyping();
return {
text: `Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: queue, interrupt.`,
};
return { text: errors.join(" ") };
}
let modelSelection:
@@ -826,8 +1302,20 @@ export async function getReplyFromConfig(
}
if (hasQueueDirective && inlineQueueReset) {
delete sessionEntry.queueMode;
} else if (hasQueueDirective && inlineQueueMode) {
sessionEntry.queueMode = inlineQueueMode;
delete sessionEntry.queueDebounceMs;
delete sessionEntry.queueCap;
delete sessionEntry.queueDrop;
} else if (hasQueueDirective) {
if (inlineQueueMode) sessionEntry.queueMode = inlineQueueMode;
if (typeof inlineQueueDebounceMs === "number") {
sessionEntry.queueDebounceMs = inlineQueueDebounceMs;
}
if (typeof inlineQueueCap === "number") {
sessionEntry.queueCap = inlineQueueCap;
}
if (inlineQueueDrop) {
sessionEntry.queueDrop = inlineQueueDrop;
}
}
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
@@ -865,6 +1353,17 @@ export async function getReplyFromConfig(
} else if (hasQueueDirective && inlineQueueReset) {
parts.push(`${SYSTEM_MARK} Queue mode reset to default.`);
}
if (hasQueueDirective && typeof inlineQueueDebounceMs === "number") {
parts.push(
`${SYSTEM_MARK} Queue debounce set to ${inlineQueueDebounceMs}ms.`,
);
}
if (hasQueueDirective && typeof inlineQueueCap === "number") {
parts.push(`${SYSTEM_MARK} Queue cap set to ${inlineQueueCap}.`);
}
if (hasQueueDirective && inlineQueueDrop) {
parts.push(`${SYSTEM_MARK} Queue drop set to ${inlineQueueDrop}.`);
}
const ack = parts.join(" ").trim();
cleanupTyping();
return { text: ack || "OK." };
@@ -927,6 +1426,9 @@ export async function getReplyFromConfig(
}
if (hasQueueDirective && inlineQueueReset) {
delete sessionEntry.queueMode;
delete sessionEntry.queueDebounceMs;
delete sessionEntry.queueCap;
delete sessionEntry.queueDrop;
updated = true;
}
if (updated) {
@@ -937,6 +1439,14 @@ export async function getReplyFromConfig(
}
const perMessageQueueMode =
hasQueueDirective && !inlineQueueReset ? inlineQueueMode : undefined;
const perMessageQueueOptions =
hasQueueDirective && !inlineQueueReset
? {
debounceMs: inlineQueueDebounceMs,
cap: inlineQueueCap,
dropPolicy: inlineQueueDrop,
}
: undefined;
const surface = (ctx.Surface ?? "").trim().toLowerCase();
const isWhatsAppSurface =
@@ -1315,17 +1825,18 @@ export async function getReplyFromConfig(
.trim()
: queueBodyBase;
const resolvedQueueMode = resolveQueueMode({
const resolvedQueue = resolveQueueSettings({
cfg,
surface: sessionCtx.Surface,
sessionEntry,
inlineMode: perMessageQueueMode,
inlineOptions: perMessageQueueOptions,
});
const sessionLaneKey = resolveEmbeddedSessionLane(
sessionKey ?? sessionIdFinal,
);
const laneSize = getQueueSize(sessionLaneKey);
if (resolvedQueueMode === "interrupt" && laneSize > 0) {
if (resolvedQueue.mode === "interrupt" && laneSize > 0) {
const cleared = clearCommandLane(sessionLaneKey);
const aborted = abortEmbeddedPiRun(sessionIdFinal);
logVerbose(
@@ -1333,10 +1844,54 @@ export async function getReplyFromConfig(
);
}
if (
resolvedQueueMode === "queue" &&
queueEmbeddedPiMessage(sessionIdFinal, queuedBody)
) {
const queueKey = sessionKey ?? sessionIdFinal;
const isActive = isEmbeddedPiRunActive(sessionIdFinal);
const isStreaming = isEmbeddedPiRunStreaming(sessionIdFinal);
const shouldSteer =
resolvedQueue.mode === "steer" || resolvedQueue.mode === "steer-backlog";
const shouldFollowup =
resolvedQueue.mode === "followup" ||
resolvedQueue.mode === "collect" ||
resolvedQueue.mode === "steer-backlog";
const followupRun: FollowupRun = {
prompt: queuedBody,
summaryLine: baseBodyTrimmedRaw,
enqueuedAt: Date.now(),
run: {
sessionId: sessionIdFinal,
sessionKey,
sessionFile,
workspaceDir,
config: cfg,
skillsSnapshot,
provider,
model,
thinkLevel: resolvedThinkLevel,
verboseLevel: resolvedVerboseLevel,
timeoutMs,
blockReplyBreak: resolvedBlockStreamingBreak,
ownerNumbers: ownerList.length > 0 ? ownerList : undefined,
extraSystemPrompt: groupIntro || undefined,
enforceFinalTag: provider === "ollama" ? true : undefined,
},
};
if (shouldSteer && isStreaming) {
const steered = queueEmbeddedPiMessage(sessionIdFinal, queuedBody);
if (steered && !shouldFollowup) {
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
}
cleanupTyping();
return undefined;
}
}
if (isActive && (shouldFollowup || resolvedQueue.mode === "steer")) {
enqueueFollowupRun(queueKey, followupRun, resolvedQueue);
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
@@ -1346,6 +1901,143 @@ export async function getReplyFromConfig(
return undefined;
}
const sendFollowupPayloads = async (payloads: ReplyPayload[]) => {
if (!opts?.onBlockReply) {
logVerbose("followup queue: no onBlockReply handler; dropping payloads");
return;
}
for (const payload of payloads) {
if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) {
continue;
}
if (
payload.text?.trim() === SILENT_REPLY_TOKEN &&
!payload.mediaUrl &&
!payload.mediaUrls?.length
) {
continue;
}
await startTypingOnText(payload.text);
await opts.onBlockReply(payload);
}
};
const runFollowupTurn = async (queued: FollowupRun) => {
const runId = crypto.randomUUID();
if (queued.run.sessionKey) {
registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey });
}
let runResult: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
try {
runResult = await runEmbeddedPiAgent({
sessionId: queued.run.sessionId,
sessionKey: queued.run.sessionKey,
sessionFile: queued.run.sessionFile,
workspaceDir: queued.run.workspaceDir,
config: queued.run.config,
skillsSnapshot: queued.run.skillsSnapshot,
prompt: queued.prompt,
extraSystemPrompt: queued.run.extraSystemPrompt,
ownerNumbers: queued.run.ownerNumbers,
enforceFinalTag: queued.run.enforceFinalTag,
provider: queued.run.provider,
model: queued.run.model,
thinkLevel: queued.run.thinkLevel,
verboseLevel: queued.run.verboseLevel,
timeoutMs: queued.run.timeoutMs,
runId,
blockReplyBreak: queued.run.blockReplyBreak,
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
defaultRuntime.error?.(
`Followup agent failed before reply: ${message}`,
);
return;
}
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return;
const sanitizedPayloads = payloadArray.flatMap((payload) => {
const text = payload.text;
if (!text || !text.includes("HEARTBEAT_OK")) return [payload];
const stripped = stripHeartbeatToken(text, { mode: "message" });
const hasMedia =
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) return [];
return [{ ...payload, text: stripped.text }];
});
const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads
.map((payload) => {
const { cleaned, replyToId } = extractReplyToTag(payload.text);
return {
...payload,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? payload.replyToId,
};
})
.filter(
(payload) =>
payload.text ||
payload.mediaUrl ||
(payload.mediaUrls && payload.mediaUrls.length > 0),
);
if (replyTaggedPayloads.length === 0) return;
if (sessionStore && sessionKey) {
const usage = runResult.meta.agentMeta?.usage;
const modelUsed = runResult.meta.agentMeta?.model ?? defaultModel;
const contextTokensUsed =
agentCfg?.contextTokens ??
lookupContextTokens(modelUsed) ??
sessionEntry?.contextTokens ??
DEFAULT_CONTEXT_TOKENS;
if (usage) {
const entry = sessionStore[sessionKey];
if (entry) {
const input = usage.input ?? 0;
const output = usage.output ?? 0;
const promptTokens =
input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0);
sessionStore[sessionKey] = {
...entry,
inputTokens: input,
outputTokens: output,
totalTokens: promptTokens > 0 ? promptTokens : usage.total ?? input,
model: modelUsed,
contextTokens: contextTokensUsed ?? entry.contextTokens,
updatedAt: Date.now(),
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
} else if (modelUsed || contextTokensUsed) {
const entry = sessionStore[sessionKey];
if (entry) {
sessionStore[sessionKey] = {
...entry,
model: modelUsed ?? entry.model,
contextTokens: contextTokensUsed ?? entry.contextTokens,
};
if (storePath) {
await saveSessionStore(storePath, sessionStore);
}
}
}
}
await sendFollowupPayloads(replyTaggedPayloads);
};
const finalizeWithFollowup = <T>(value: T): T => {
scheduleFollowupDrain(queueKey, runFollowupTurn);
return value;
};
let didLogHeartbeatStrip = false;
try {
if (shouldEagerType) {
@@ -1472,11 +2164,11 @@ export async function getReplyFromConfig(
const isContextOverflow =
/context.*overflow|too large|context window/i.test(message);
defaultRuntime.error(`Embedded agent failed before reply: ${message}`);
return {
return finalizeWithFollowup({
text: isContextOverflow
? "⚠️ Context overflow - conversation too long. Starting fresh might help!"
: "⚠️ Agent failed. Check gateway logs.",
};
});
}
if (
@@ -1493,7 +2185,7 @@ export async function getReplyFromConfig(
}
const payloadArray = runResult.payloads ?? [];
if (payloadArray.length === 0) return undefined;
if (payloadArray.length === 0) return finalizeWithFollowup(undefined);
if (pendingBlockTasks.size > 0) {
await Promise.allSettled(pendingBlockTasks);
}
@@ -1539,7 +2231,7 @@ export async function getReplyFromConfig(
)
: replyTaggedPayloads;
if (filteredPayloads.length === 0) return undefined;
if (filteredPayloads.length === 0) return finalizeWithFollowup(undefined);
const shouldSignalTyping = filteredPayloads.some((payload) => {
const trimmed = payload.text?.trim();
@@ -1604,7 +2296,9 @@ export async function getReplyFromConfig(
];
}
return finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads;
return finalizeWithFollowup(
finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads,
);
} finally {
cleanupTyping();
}

View File

@@ -309,7 +309,15 @@ export type IMessageConfig = {
>;
};
export type QueueMode = "queue" | "interrupt";
export type QueueMode =
| "steer"
| "followup"
| "collect"
| "steer-backlog"
| "steer+backlog"
| "queue"
| "interrupt";
export type QueueDropPolicy = "old" | "new" | "summarize";
export type QueueModeBySurface = {
whatsapp?: QueueMode;
@@ -335,6 +343,9 @@ export type RoutingConfig = {
queue?: {
mode?: QueueMode;
bySurface?: QueueModeBySurface;
debounceMs?: number;
cap?: number;
drop?: QueueDropPolicy;
};
};
@@ -690,7 +701,20 @@ const GroupChatSchema = z
})
.optional();
const QueueModeSchema = z.union([z.literal("queue"), z.literal("interrupt")]);
const QueueModeSchema = z.union([
z.literal("steer"),
z.literal("followup"),
z.literal("collect"),
z.literal("steer-backlog"),
z.literal("steer+backlog"),
z.literal("queue"),
z.literal("interrupt"),
]);
const QueueDropSchema = z.union([
z.literal("old"),
z.literal("new"),
z.literal("summarize"),
]);
const ReplyToModeSchema = z.union([
z.literal("off"),
z.literal("first"),
@@ -779,6 +803,9 @@ const RoutingSchema = z
.object({
mode: QueueModeSchema.optional(),
bySurface: QueueModeBySurfaceSchema,
debounceMs: z.number().int().nonnegative().optional(),
cap: z.number().int().positive().optional(),
drop: QueueDropSchema.optional(),
})
.optional(),
})

View File

@@ -34,7 +34,17 @@ export type SessionEntry = {
modelOverride?: string;
groupActivation?: "mention" | "always";
groupActivationNeedsSystemIntro?: boolean;
queueMode?: "queue" | "interrupt";
queueMode?:
| "steer"
| "followup"
| "collect"
| "steer-backlog"
| "steer+backlog"
| "queue"
| "interrupt";
queueDebounceMs?: number;
queueCap?: number;
queueDrop?: "old" | "new" | "summarize";
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;