refactor(queue): remove drop mode

This commit is contained in:
Peter Steinberger
2025-12-26 14:29:28 +01:00
parent 3e4fc7ff7f
commit a678c3f53e
5 changed files with 6 additions and 19 deletions

View File

@@ -93,7 +93,7 @@ Controls how inbound messages behave when an agent run is already active.
{ {
routing: { routing: {
queue: { queue: {
mode: "interrupt", // global default: queue | interrupt | drop mode: "interrupt", // global default: queue | interrupt
bySurface: { bySurface: {
whatsapp: "interrupt", whatsapp: "interrupt",
telegram: "interrupt", telegram: "interrupt",

View File

@@ -22,7 +22,6 @@ We now serialize command-based auto-replies (WhatsApp Web listener) through a ti
Inbound messages can either queue or interrupt when a run is already active: Inbound messages can either queue or interrupt when a run is already active:
- `queue`: serialize per session; if the agent is streaming, the new message is appended to the current run. - `queue`: serialize per session; if the agent is streaming, the new message is appended to the current run.
- `interrupt`: abort the active run for that session, then run the newest message. - `interrupt`: abort the active run for that session, then run the newest message.
- `drop`: ignore the message if the session lane is busy.
Defaults (when unset in config): Defaults (when unset in config):
- WhatsApp + Telegram → `interrupt` - WhatsApp + Telegram → `interrupt`

View File

@@ -70,7 +70,7 @@ const SYSTEM_MARK = "⚙️";
const BARE_SESSION_RESET_PROMPT = const BARE_SESSION_RESET_PROMPT =
"A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. Do not mention internal steps, files, tools, or reasoning."; "A new session was started via /new or /reset. Say hi briefly (1-2 sentences) and ask what the user wants to do next. Do not mention internal steps, files, tools, or reasoning.";
type QueueMode = "queue" | "interrupt" | "drop"; type QueueMode = "queue" | "interrupt";
export function extractThinkDirective(body?: string): { export function extractThinkDirective(body?: string): {
cleaned: string; cleaned: string;
@@ -123,7 +123,6 @@ function normalizeQueueMode(raw?: string): QueueMode | undefined {
if (cleaned === "queue" || cleaned === "queued") return "queue"; if (cleaned === "queue" || cleaned === "queued") return "queue";
if (cleaned === "interrupt" || cleaned === "interrupts" || cleaned === "abort") if (cleaned === "interrupt" || cleaned === "interrupts" || cleaned === "abort")
return "interrupt"; return "interrupt";
if (cleaned === "drop" || cleaned === "discard") return "drop";
return undefined; return undefined;
} }
@@ -589,7 +588,7 @@ export async function getReplyFromConfig(
if (hasQueueDirective && !inlineQueueMode && !inlineQueueReset) { if (hasQueueDirective && !inlineQueueMode && !inlineQueueReset) {
cleanupTyping(); cleanupTyping();
return { return {
text: `Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: queue, interrupt, drop.`, text: `Unrecognized queue mode "${rawQueueMode ?? ""}". Valid modes: queue, interrupt.`,
}; };
} }
@@ -1107,13 +1106,6 @@ export async function getReplyFromConfig(
sessionKey ?? sessionIdFinal, sessionKey ?? sessionIdFinal,
); );
const laneSize = getQueueSize(sessionLaneKey); const laneSize = getQueueSize(sessionLaneKey);
if (resolvedQueueMode === "drop" && laneSize > 0) {
logVerbose(
`Dropping inbound message for ${sessionLaneKey} (queue busy, mode=drop)`,
);
cleanupTyping();
return undefined;
}
if (resolvedQueueMode === "interrupt" && laneSize > 0) { if (resolvedQueueMode === "interrupt" && laneSize > 0) {
const cleared = clearCommandLane(sessionLaneKey); const cleared = clearCommandLane(sessionLaneKey);
const aborted = abortEmbeddedPiRun(sessionIdFinal); const aborted = abortEmbeddedPiRun(sessionIdFinal);

View File

@@ -147,7 +147,7 @@ export type DiscordConfig = {
mediaMaxMb?: number; mediaMaxMb?: number;
}; };
export type QueueMode = "queue" | "interrupt" | "drop"; export type QueueMode = "queue" | "interrupt";
export type QueueModeBySurface = { export type QueueModeBySurface = {
whatsapp?: QueueMode; whatsapp?: QueueMode;
@@ -454,11 +454,7 @@ const GroupChatSchema = z
}) })
.optional(); .optional();
const QueueModeSchema = z.union([ const QueueModeSchema = z.union([z.literal("queue"), z.literal("interrupt")]);
z.literal("queue"),
z.literal("interrupt"),
z.literal("drop"),
]);
const QueueModeBySurfaceSchema = z const QueueModeBySurfaceSchema = z
.object({ .object({

View File

@@ -21,7 +21,7 @@ export type SessionEntry = {
modelOverride?: string; modelOverride?: string;
groupActivation?: "mention" | "always"; groupActivation?: "mention" | "always";
groupActivationNeedsSystemIntro?: boolean; groupActivationNeedsSystemIntro?: boolean;
queueMode?: "queue" | "interrupt" | "drop"; queueMode?: "queue" | "interrupt";
inputTokens?: number; inputTokens?: number;
outputTokens?: number; outputTokens?: number;
totalTokens?: number; totalTokens?: number;