From 971b98c96d97e5a701e6e139fa93074b08ee116e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 3 Jan 2026 04:44:36 +0100 Subject: [PATCH] test: cover new queue modes --- src/auto-reply/reply.block-streaming.test.ts | 2 + src/auto-reply/reply.directive.test.ts | 27 +++ src/auto-reply/reply.queue.test.ts | 168 +++++++++++++++++++ src/auto-reply/reply.triggers.test.ts | 2 + src/auto-reply/reply.ts | 10 +- 5 files changed, 206 insertions(+), 3 deletions(-) create mode 100644 src/auto-reply/reply.queue.test.ts diff --git a/src/auto-reply/reply.block-streaming.test.ts b/src/auto-reply/reply.block-streaming.test.ts index ee9796748..a09f795da 100644 --- a/src/auto-reply/reply.block-streaming.test.ts +++ b/src/auto-reply/reply.block-streaming.test.ts @@ -14,6 +14,8 @@ vi.mock("../agents/pi-embedded.js", () => ({ queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, + isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), + isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), })); vi.mock("../agents/model-catalog.js", () => ({ loadModelCatalog: vi.fn(), diff --git a/src/auto-reply/reply.directive.test.ts b/src/auto-reply/reply.directive.test.ts index 7dbc8fcc9..5f471f095 100644 --- a/src/auto-reply/reply.directive.test.ts +++ b/src/auto-reply/reply.directive.test.ts @@ -26,6 +26,8 @@ vi.mock("../agents/pi-embedded.js", () => ({ queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, + isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), + isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), })); vi.mock("../agents/model-catalog.js", () => ({ loadModelCatalog: vi.fn(), @@ -109,6 +111,31 @@ describe("directive parsing", () => { expect(res.cleaned).toBe("please now"); }); + it("errors on invalid queue options", async () => { + await withTempHome(async (home) => { + vi.mocked(runEmbeddedPiAgent).mockReset(); + + const res = await getReplyFromConfig( + { Body: "/queue collect debounce:bogus cap:zero drop:maybe", From: "+1222", To: "+1222" }, + {}, + { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + }, + ); + + const text = Array.isArray(res) ? res[0]?.text : res?.text; + expect(text).toContain("Invalid debounce"); + expect(text).toContain("Invalid cap"); + expect(text).toContain("Invalid drop policy"); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + }); + }); + it("extracts reply_to_current tag", () => { const res = extractReplyToTag("ok [[reply_to_current]]", "msg-1"); expect(res.replyToId).toBe("msg-1"); diff --git a/src/auto-reply/reply.queue.test.ts b/src/auto-reply/reply.queue.test.ts new file mode 100644 index 000000000..6aafdeb71 --- /dev/null +++ b/src/auto-reply/reply.queue.test.ts @@ -0,0 +1,168 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { afterEach, describe, expect, it, vi } from "vitest"; + +import { + isEmbeddedPiRunActive, + isEmbeddedPiRunStreaming, + runEmbeddedPiAgent, +} from "../agents/pi-embedded.js"; +import { getReplyFromConfig } from "./reply.js"; + +vi.mock("../agents/pi-embedded.js", () => ({ + abortEmbeddedPiRun: vi.fn().mockReturnValue(false), + runEmbeddedPiAgent: vi.fn(), + queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), + resolveEmbeddedSessionLane: (key: string) => + `session:${key.trim() || "main"}`, + isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), + isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), +})); + +function makeResult(text: string) { + return { + payloads: [{ text }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }; +} + +async function withTempHome(fn: (home: string) => Promise): Promise { + const base = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-queue-")); + const previousHome = process.env.HOME; + process.env.HOME = base; + try { + vi.mocked(runEmbeddedPiAgent).mockReset(); + return await fn(base); + } finally { + process.env.HOME = previousHome; + try { + await fs.rm(base, { recursive: true, force: true }); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + if (!msg.includes("ENOTEMPTY")) throw err; + } + } +} + +function makeCfg(home: string, queue?: Record) { + return { + agent: { + model: "anthropic/claude-opus-4-5", + workspace: path.join(home, "clawd"), + }, + whatsapp: { allowFrom: ["*"] }, + session: { store: path.join(home, "sessions.json") }, + routing: queue ? { queue } : undefined, + }; +} + +describe("queue followups", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("collects queued messages and drains after run completes", async () => { + vi.useFakeTimers(); + await withTempHome(async (home) => { + const prompts: string[] = []; + vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + prompts.push(params.prompt); + if (params.prompt.includes("[Queued messages while agent was busy]")) { + return makeResult("followup"); + } + return makeResult("main"); + }); + + vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true); + vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(true); + + const cfg = makeCfg(home, { + mode: "collect", + debounceMs: 200, + cap: 10, + drop: "summarize", + }); + + const first = await getReplyFromConfig( + { Body: "first", From: "+1001", To: "+2000" }, + {}, + cfg, + ); + expect(first).toBeUndefined(); + expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); + + vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false); + vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false); + + const second = await getReplyFromConfig( + { Body: "second", From: "+1001", To: "+2000" }, + {}, + cfg, + ); + + const secondText = Array.isArray(second) + ? second[0]?.text + : second?.text; + expect(secondText).toBe("main"); + + await vi.runAllTimersAsync(); + await Promise.resolve(); + + expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2); + expect( + prompts.some((p) => + p.includes("[Queued messages while agent was busy]"), + ), + ).toBe(true); + }); + }); + + it("summarizes dropped followups when cap is exceeded", async () => { + vi.useFakeTimers(); + await withTempHome(async (home) => { + const prompts: string[] = []; + vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => { + prompts.push(params.prompt); + return makeResult("ok"); + }); + + vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true); + vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false); + + const cfg = makeCfg(home, { + mode: "followup", + debounceMs: 0, + cap: 1, + drop: "summarize", + }); + + await getReplyFromConfig( + { Body: "one", From: "+1002", To: "+2000" }, + {}, + cfg, + ); + await getReplyFromConfig( + { Body: "two", From: "+1002", To: "+2000" }, + {}, + cfg, + ); + + vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false); + await getReplyFromConfig( + { Body: "three", From: "+1002", To: "+2000" }, + {}, + cfg, + ); + + await vi.runAllTimersAsync(); + await Promise.resolve(); + + expect(prompts.some((p) => p.includes("[Queue overflow]"))).toBe(true); + }); + }); +}); diff --git a/src/auto-reply/reply.triggers.test.ts b/src/auto-reply/reply.triggers.test.ts index 2fea53288..d60b308da 100644 --- a/src/auto-reply/reply.triggers.test.ts +++ b/src/auto-reply/reply.triggers.test.ts @@ -9,6 +9,8 @@ vi.mock("../agents/pi-embedded.js", () => ({ queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`, + isEmbeddedPiRunActive: vi.fn().mockReturnValue(false), + isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false), })); import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 00adadb51..4d3ad67ba 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -218,9 +218,13 @@ function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined { function parseQueueDebounce(raw?: string): number | undefined { if (!raw) return undefined; - const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" }); - if (!parsed || parsed < 0) return undefined; - return Math.round(parsed); + try { + const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" }); + if (!parsed || parsed < 0) return undefined; + return Math.round(parsed); + } catch { + return undefined; + } } function parseQueueCap(raw?: string): number | undefined {