feat: serialize command auto-replies with queue

This commit is contained in:
Peter Steinberger
2025-11-25 04:40:49 +01:00
parent 6dd0d04206
commit 13be898c07
5 changed files with 215 additions and 18 deletions

22
docs/queue.md Normal file
View File

@@ -0,0 +1,22 @@
# Command Queue (2025-11-25)
We now serialize all command-based auto-replies (Twilio webhook + poller + WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once.
## Why
- Some auto-reply commands are expensive (LLM calls) and can collide when multiple inbound messages arrive close together.
- Serializing avoids competing for terminal/stdin, keeps logs readable, and reduces the chance of rate limits from upstream tools.
## How it works
- `src/process/command-queue.ts` holds a single FIFO queue and drains it synchronously; only one task runs at a time.
- `getReplyFromConfig` wraps command execution with `enqueueCommand(...)`, so every config-driven command reply flows through the queue automatically.
- When verbose logging is enabled, queued commands emit a short notice if they waited more than ~2s before starting.
- Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn.
## Scope and guarantees
- Applies only to config-driven command replies; plain text replies are unaffected.
- Queue is process-wide, so webhook handlers, Twilio polling, and the web inbox listener all respect the same lock.
- No external dependencies or background worker threads; pure TypeScript + promises.
## Troubleshooting
- If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining.
- `enqueueCommand` exposes a lightweight `getQueueSize()` helper if you need to surface queue depth in future diagnostics.

View File

@@ -18,9 +18,12 @@ import {
} from "../config/sessions.js";
import { loadConfig, type WarelayConfig } from "../config/config.js";
import { info, isVerbose, logVerbose } from "../globals.js";
import { enqueueCommand } from "../process/command-queue.js";
import { runCommandWithTimeout } from "../process/exec.js";
import { sendTypingIndicator } from "../twilio/typing.js";
import type { TwilioRequester } from "../twilio/types.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { logError } from "../logger.js";
type GetReplyOptions = {
onReplyStart?: () => Promise<void> | void;
@@ -46,17 +49,20 @@ function summarizeClaudeMetadata(payload: unknown): string | undefined {
const usage = obj.usage;
if (usage && typeof usage === "object") {
const serverToolUse = (
usage as { server_tool_use?: Record<string, unknown> }
).server_tool_use;
if (serverToolUse && typeof serverToolUse === "object") {
const toolCalls = Object.values(serverToolUse).reduce((sum, val) => {
if (typeof val === "number") return sum + val;
return sum;
}, 0);
if (toolCalls > 0) parts.push(`tool_calls=${toolCalls}`);
const serverToolUse = (
usage as { server_tool_use?: Record<string, unknown> }
).server_tool_use;
if (serverToolUse && typeof serverToolUse === "object") {
const toolCalls = Object.values(serverToolUse).reduce<number>(
(sum, val) => {
if (typeof val === "number") return sum + val;
return sum;
},
0,
);
if (toolCalls > 0) parts.push(`tool_calls=${toolCalls}`);
}
}
}
const modelUsage = obj.modelUsage;
if (modelUsage && typeof modelUsage === "object") {
@@ -248,9 +254,17 @@ export async function getReplyFromConfig(
logVerbose(`Running command auto-reply: ${finalArgv.join(" ")}`);
const started = Date.now();
try {
const { stdout, stderr, code, signal, killed } = await commandRunner(
finalArgv,
timeoutMs,
const { stdout, stderr, code, signal, killed } = await enqueueCommand(
() => commandRunner(finalArgv, timeoutMs),
{
onWait: (waitMs, queuedAhead) => {
if (isVerbose()) {
logVerbose(
`Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`,
);
}
},
},
);
const rawStdout = stdout.trim();
let trimmed = rawStdout;
@@ -309,7 +323,7 @@ export async function getReplyFromConfig(
`Command auto-reply timed out after ${elapsed}ms (limit ${timeoutMs}ms)`,
);
} else {
logError("Command auto-reply failed after ms: " . String(err), runtime);
logError(`Command auto-reply failed after ${elapsed}ms: ${String(err)}`);
}
return undefined;
}
@@ -318,7 +332,7 @@ export async function getReplyFromConfig(
return undefined;
}
type TwilioLikeClient = {
type TwilioLikeClient = TwilioRequester & {
messages: {
create: (opts: {
from?: string;
@@ -345,7 +359,7 @@ export async function autoReplyIfConfigured(
const replyText = await getReplyFromConfig(
ctx,
{
onReplyStart: () => sendTypingIndicator(client, message.sid, runtime),
onReplyStart: () => sendTypingIndicator(client, runtime, message.sid),
},
configOverride,
);

View File

@@ -237,6 +237,51 @@ describe("config and templating", () => {
expect(result).toBe("Sure! What's up?");
});
it("serializes command auto-replies via the queue", async () => {
let active = 0;
let maxActive = 0;
const runSpy = vi.fn(async () => {
active += 1;
maxActive = Math.max(maxActive, active);
await new Promise((resolve) => setTimeout(resolve, 25));
active -= 1;
return {
stdout: "ok",
stderr: "",
code: 0,
signal: null,
killed: false,
};
});
const cfg = {
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "{{Body}}"],
},
},
};
await Promise.all([
index.getReplyFromConfig(
{ Body: "first", From: "+1", To: "+2" },
undefined,
cfg,
runSpy,
),
index.getReplyFromConfig(
{ Body: "second", From: "+3", To: "+4" },
undefined,
cfg,
runSpy,
),
]);
expect(runSpy).toHaveBeenCalledTimes(2);
expect(maxActive).toBe(1);
});
});
describe("twilio interactions", () => {
@@ -267,10 +312,10 @@ describe("twilio interactions", () => {
it("sendTypingIndicator skips missing messageSid and sends when present", async () => {
const client = twilioFactory._createClient();
await index.sendTypingIndicator(client, undefined);
await index.sendTypingIndicator(client, index.defaultRuntime, undefined);
expect(client.request).not.toHaveBeenCalled();
await index.sendTypingIndicator(client, "SM123");
await index.sendTypingIndicator(client, index.defaultRuntime, "SM123");
expect(client.request).toHaveBeenCalledWith(
expect.objectContaining({ method: "post" }),
);

View File

@@ -0,0 +1,58 @@
import { describe, expect, it } from "vitest";
import { enqueueCommand, getQueueSize } from "./command-queue.js";
describe("command queue", () => {
it("runs tasks one at a time in order", async () => {
let active = 0;
let maxActive = 0;
const calls: number[] = [];
const makeTask = (id: number) => async () => {
active += 1;
maxActive = Math.max(maxActive, active);
calls.push(id);
await new Promise((resolve) => setTimeout(resolve, 15));
active -= 1;
return id;
};
const results = await Promise.all([
enqueueCommand(makeTask(1)),
enqueueCommand(makeTask(2)),
enqueueCommand(makeTask(3)),
]);
expect(results).toEqual([1, 2, 3]);
expect(calls).toEqual([1, 2, 3]);
expect(maxActive).toBe(1);
expect(getQueueSize()).toBe(0);
});
it("invokes onWait callback when a task waits past the threshold", async () => {
let waited: number | null = null;
let queuedAhead: number | null = null;
// First task holds the queue long enough to trigger wait notice.
const first = enqueueCommand(async () => {
await new Promise((resolve) => setTimeout(resolve, 30));
});
const second = enqueueCommand(
async () => {},
{
warnAfterMs: 5,
onWait: (ms, ahead) => {
waited = ms;
queuedAhead = ahead;
},
},
);
await Promise.all([first, second]);
expect(waited).not.toBeNull();
expect((waited as number)).toBeGreaterThanOrEqual(5);
expect(queuedAhead).toBe(0);
});
});

View File

@@ -0,0 +1,58 @@
// Minimal in-process queue to serialize command executions.
// Ensures only one command runs at a time across webhook, poller, and web inbox flows.
type QueueEntry = {
task: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason?: unknown) => void;
enqueuedAt: number;
warnAfterMs: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
};
const queue: QueueEntry[] = [];
let draining = false;
async function drainQueue() {
if (draining) return;
draining = true;
while (queue.length) {
const entry = queue.shift() as QueueEntry;
const waitedMs = Date.now() - entry.enqueuedAt;
if (waitedMs >= entry.warnAfterMs) {
entry.onWait?.(waitedMs, queue.length);
}
try {
const result = await entry.task();
entry.resolve(result);
} catch (err) {
entry.reject(err);
}
}
draining = false;
}
export function enqueueCommand<T>(
task: () => Promise<T>,
opts?: {
warnAfterMs?: number;
onWait?: (waitMs: number, queuedAhead: number) => void;
},
): Promise<T> {
const warnAfterMs = opts?.warnAfterMs ?? 2_000;
return new Promise<T>((resolve, reject) => {
queue.push({
task: () => task(),
resolve: (value) => resolve(value as T),
reject,
enqueuedAt: Date.now(),
warnAfterMs,
onWait: opts?.onWait,
});
void drainQueue();
});
}
export function getQueueSize() {
return queue.length + (draining ? 1 : 0);
}