From 13be898c07e3df69e2118753b434ff9cecf7ab9c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 25 Nov 2025 04:40:49 +0100 Subject: [PATCH] feat: serialize command auto-replies with queue --- docs/queue.md | 22 ++++++++++++ src/auto-reply/reply.ts | 46 +++++++++++++++--------- src/index.core.test.ts | 49 ++++++++++++++++++++++++-- src/process/command-queue.test.ts | 58 +++++++++++++++++++++++++++++++ src/process/command-queue.ts | 58 +++++++++++++++++++++++++++++++ 5 files changed, 215 insertions(+), 18 deletions(-) create mode 100644 docs/queue.md create mode 100644 src/process/command-queue.test.ts create mode 100644 src/process/command-queue.ts diff --git a/docs/queue.md b/docs/queue.md new file mode 100644 index 000000000..811605728 --- /dev/null +++ b/docs/queue.md @@ -0,0 +1,22 @@ +# Command Queue (2025-11-25) + +We now serialize all command-based auto-replies (Twilio webhook + poller + WhatsApp Web listener) through a tiny in-process queue to prevent multiple commands from running at once. + +## Why +- Some auto-reply commands are expensive (LLM calls) and can collide when multiple inbound messages arrive close together. +- Serializing avoids competing for terminal/stdin, keeps logs readable, and reduces the chance of rate limits from upstream tools. + +## How it works +- `src/process/command-queue.ts` holds a single FIFO queue and drains it synchronously; only one task runs at a time. +- `getReplyFromConfig` wraps command execution with `enqueueCommand(...)`, so every config-driven command reply flows through the queue automatically. +- When verbose logging is enabled, queued commands emit a short notice if they waited more than ~2s before starting. +- Typing indicators (`onReplyStart`) still fire immediately on enqueue so user experience is unchanged while we wait our turn. + +## Scope and guarantees +- Applies only to config-driven command replies; plain text replies are unaffected. +- Queue is process-wide, so webhook handlers, Twilio polling, and the web inbox listener all respect the same lock. +- No external dependencies or background worker threads; pure TypeScript + promises. + +## Troubleshooting +- If commands seem stuck, enable verbose logs and look for “queued for …ms” lines to confirm the queue is draining. +- `enqueueCommand` exposes a lightweight `getQueueSize()` helper if you need to surface queue depth in future diagnostics. diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 88893e8f7..6c099f92c 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -18,9 +18,12 @@ import { } from "../config/sessions.js"; import { loadConfig, type WarelayConfig } from "../config/config.js"; import { info, isVerbose, logVerbose } from "../globals.js"; +import { enqueueCommand } from "../process/command-queue.js"; import { runCommandWithTimeout } from "../process/exec.js"; import { sendTypingIndicator } from "../twilio/typing.js"; +import type { TwilioRequester } from "../twilio/types.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { logError } from "../logger.js"; type GetReplyOptions = { onReplyStart?: () => Promise | void; @@ -46,17 +49,20 @@ function summarizeClaudeMetadata(payload: unknown): string | undefined { const usage = obj.usage; if (usage && typeof usage === "object") { - const serverToolUse = ( - usage as { server_tool_use?: Record } - ).server_tool_use; - if (serverToolUse && typeof serverToolUse === "object") { - const toolCalls = Object.values(serverToolUse).reduce((sum, val) => { - if (typeof val === "number") return sum + val; - return sum; - }, 0); - if (toolCalls > 0) parts.push(`tool_calls=${toolCalls}`); + const serverToolUse = ( + usage as { server_tool_use?: Record } + ).server_tool_use; + if (serverToolUse && typeof serverToolUse === "object") { + const toolCalls = Object.values(serverToolUse).reduce( + (sum, val) => { + if (typeof val === "number") return sum + val; + return sum; + }, + 0, + ); + if (toolCalls > 0) parts.push(`tool_calls=${toolCalls}`); + } } - } const modelUsage = obj.modelUsage; if (modelUsage && typeof modelUsage === "object") { @@ -248,9 +254,17 @@ export async function getReplyFromConfig( logVerbose(`Running command auto-reply: ${finalArgv.join(" ")}`); const started = Date.now(); try { - const { stdout, stderr, code, signal, killed } = await commandRunner( - finalArgv, - timeoutMs, + const { stdout, stderr, code, signal, killed } = await enqueueCommand( + () => commandRunner(finalArgv, timeoutMs), + { + onWait: (waitMs, queuedAhead) => { + if (isVerbose()) { + logVerbose( + `Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`, + ); + } + }, + }, ); const rawStdout = stdout.trim(); let trimmed = rawStdout; @@ -309,7 +323,7 @@ export async function getReplyFromConfig( `Command auto-reply timed out after ${elapsed}ms (limit ${timeoutMs}ms)`, ); } else { - logError("Command auto-reply failed after ms: " . String(err), runtime); + logError(`Command auto-reply failed after ${elapsed}ms: ${String(err)}`); } return undefined; } @@ -318,7 +332,7 @@ export async function getReplyFromConfig( return undefined; } -type TwilioLikeClient = { +type TwilioLikeClient = TwilioRequester & { messages: { create: (opts: { from?: string; @@ -345,7 +359,7 @@ export async function autoReplyIfConfigured( const replyText = await getReplyFromConfig( ctx, { - onReplyStart: () => sendTypingIndicator(client, message.sid, runtime), + onReplyStart: () => sendTypingIndicator(client, runtime, message.sid), }, configOverride, ); diff --git a/src/index.core.test.ts b/src/index.core.test.ts index 5694dce64..dd024e23a 100644 --- a/src/index.core.test.ts +++ b/src/index.core.test.ts @@ -237,6 +237,51 @@ describe("config and templating", () => { expect(result).toBe("Sure! What's up?"); }); + + it("serializes command auto-replies via the queue", async () => { + let active = 0; + let maxActive = 0; + const runSpy = vi.fn(async () => { + active += 1; + maxActive = Math.max(maxActive, active); + await new Promise((resolve) => setTimeout(resolve, 25)); + active -= 1; + return { + stdout: "ok", + stderr: "", + code: 0, + signal: null, + killed: false, + }; + }); + + const cfg = { + inbound: { + reply: { + mode: "command" as const, + command: ["echo", "{{Body}}"], + }, + }, + }; + + await Promise.all([ + index.getReplyFromConfig( + { Body: "first", From: "+1", To: "+2" }, + undefined, + cfg, + runSpy, + ), + index.getReplyFromConfig( + { Body: "second", From: "+3", To: "+4" }, + undefined, + cfg, + runSpy, + ), + ]); + + expect(runSpy).toHaveBeenCalledTimes(2); + expect(maxActive).toBe(1); + }); }); describe("twilio interactions", () => { @@ -267,10 +312,10 @@ describe("twilio interactions", () => { it("sendTypingIndicator skips missing messageSid and sends when present", async () => { const client = twilioFactory._createClient(); - await index.sendTypingIndicator(client, undefined); + await index.sendTypingIndicator(client, index.defaultRuntime, undefined); expect(client.request).not.toHaveBeenCalled(); - await index.sendTypingIndicator(client, "SM123"); + await index.sendTypingIndicator(client, index.defaultRuntime, "SM123"); expect(client.request).toHaveBeenCalledWith( expect.objectContaining({ method: "post" }), ); diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts new file mode 100644 index 000000000..9cd98edd7 --- /dev/null +++ b/src/process/command-queue.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, it } from "vitest"; + +import { enqueueCommand, getQueueSize } from "./command-queue.js"; + +describe("command queue", () => { + it("runs tasks one at a time in order", async () => { + let active = 0; + let maxActive = 0; + const calls: number[] = []; + + const makeTask = (id: number) => async () => { + active += 1; + maxActive = Math.max(maxActive, active); + calls.push(id); + await new Promise((resolve) => setTimeout(resolve, 15)); + active -= 1; + return id; + }; + + const results = await Promise.all([ + enqueueCommand(makeTask(1)), + enqueueCommand(makeTask(2)), + enqueueCommand(makeTask(3)), + ]); + + expect(results).toEqual([1, 2, 3]); + expect(calls).toEqual([1, 2, 3]); + expect(maxActive).toBe(1); + expect(getQueueSize()).toBe(0); + }); + + it("invokes onWait callback when a task waits past the threshold", async () => { + let waited: number | null = null; + let queuedAhead: number | null = null; + + // First task holds the queue long enough to trigger wait notice. + const first = enqueueCommand(async () => { + await new Promise((resolve) => setTimeout(resolve, 30)); + }); + + const second = enqueueCommand( + async () => {}, + { + warnAfterMs: 5, + onWait: (ms, ahead) => { + waited = ms; + queuedAhead = ahead; + }, + }, + ); + + await Promise.all([first, second]); + + expect(waited).not.toBeNull(); + expect((waited as number)).toBeGreaterThanOrEqual(5); + expect(queuedAhead).toBe(0); + }); +}); diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts new file mode 100644 index 000000000..07a0f1f92 --- /dev/null +++ b/src/process/command-queue.ts @@ -0,0 +1,58 @@ +// Minimal in-process queue to serialize command executions. +// Ensures only one command runs at a time across webhook, poller, and web inbox flows. + +type QueueEntry = { + task: () => Promise; + resolve: (value: unknown) => void; + reject: (reason?: unknown) => void; + enqueuedAt: number; + warnAfterMs: number; + onWait?: (waitMs: number, queuedAhead: number) => void; +}; + +const queue: QueueEntry[] = []; +let draining = false; + +async function drainQueue() { + if (draining) return; + draining = true; + while (queue.length) { + const entry = queue.shift() as QueueEntry; + const waitedMs = Date.now() - entry.enqueuedAt; + if (waitedMs >= entry.warnAfterMs) { + entry.onWait?.(waitedMs, queue.length); + } + try { + const result = await entry.task(); + entry.resolve(result); + } catch (err) { + entry.reject(err); + } + } + draining = false; +} + +export function enqueueCommand( + task: () => Promise, + opts?: { + warnAfterMs?: number; + onWait?: (waitMs: number, queuedAhead: number) => void; + }, +): Promise { + const warnAfterMs = opts?.warnAfterMs ?? 2_000; + return new Promise((resolve, reject) => { + queue.push({ + task: () => task(), + resolve: (value) => resolve(value as T), + reject, + enqueuedAt: Date.now(), + warnAfterMs, + onWait: opts?.onWait, + }); + void drainQueue(); + }); +} + +export function getQueueSize() { + return queue.length + (draining ? 1 : 0); +}