test: cover new queue modes
This commit is contained in:
@@ -14,6 +14,8 @@ vi.mock("../agents/pi-embedded.js", () => ({
|
|||||||
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
||||||
resolveEmbeddedSessionLane: (key: string) =>
|
resolveEmbeddedSessionLane: (key: string) =>
|
||||||
`session:${key.trim() || "main"}`,
|
`session:${key.trim() || "main"}`,
|
||||||
|
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
||||||
|
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
||||||
}));
|
}));
|
||||||
vi.mock("../agents/model-catalog.js", () => ({
|
vi.mock("../agents/model-catalog.js", () => ({
|
||||||
loadModelCatalog: vi.fn(),
|
loadModelCatalog: vi.fn(),
|
||||||
|
|||||||
@@ -26,6 +26,8 @@ vi.mock("../agents/pi-embedded.js", () => ({
|
|||||||
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
||||||
resolveEmbeddedSessionLane: (key: string) =>
|
resolveEmbeddedSessionLane: (key: string) =>
|
||||||
`session:${key.trim() || "main"}`,
|
`session:${key.trim() || "main"}`,
|
||||||
|
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
||||||
|
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
||||||
}));
|
}));
|
||||||
vi.mock("../agents/model-catalog.js", () => ({
|
vi.mock("../agents/model-catalog.js", () => ({
|
||||||
loadModelCatalog: vi.fn(),
|
loadModelCatalog: vi.fn(),
|
||||||
@@ -109,6 +111,31 @@ describe("directive parsing", () => {
|
|||||||
expect(res.cleaned).toBe("please now");
|
expect(res.cleaned).toBe("please now");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("errors on invalid queue options", async () => {
|
||||||
|
await withTempHome(async (home) => {
|
||||||
|
vi.mocked(runEmbeddedPiAgent).mockReset();
|
||||||
|
|
||||||
|
const res = await getReplyFromConfig(
|
||||||
|
{ Body: "/queue collect debounce:bogus cap:zero drop:maybe", From: "+1222", To: "+1222" },
|
||||||
|
{},
|
||||||
|
{
|
||||||
|
agent: {
|
||||||
|
model: "anthropic/claude-opus-4-5",
|
||||||
|
workspace: path.join(home, "clawd"),
|
||||||
|
},
|
||||||
|
whatsapp: { allowFrom: ["*"] },
|
||||||
|
session: { store: path.join(home, "sessions.json") },
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
const text = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||||
|
expect(text).toContain("Invalid debounce");
|
||||||
|
expect(text).toContain("Invalid cap");
|
||||||
|
expect(text).toContain("Invalid drop policy");
|
||||||
|
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it("extracts reply_to_current tag", () => {
|
it("extracts reply_to_current tag", () => {
|
||||||
const res = extractReplyToTag("ok [[reply_to_current]]", "msg-1");
|
const res = extractReplyToTag("ok [[reply_to_current]]", "msg-1");
|
||||||
expect(res.replyToId).toBe("msg-1");
|
expect(res.replyToId).toBe("msg-1");
|
||||||
|
|||||||
168
src/auto-reply/reply.queue.test.ts
Normal file
168
src/auto-reply/reply.queue.test.ts
Normal file
@@ -0,0 +1,168 @@
|
|||||||
|
import fs from "node:fs/promises";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
import {
|
||||||
|
isEmbeddedPiRunActive,
|
||||||
|
isEmbeddedPiRunStreaming,
|
||||||
|
runEmbeddedPiAgent,
|
||||||
|
} from "../agents/pi-embedded.js";
|
||||||
|
import { getReplyFromConfig } from "./reply.js";
|
||||||
|
|
||||||
|
vi.mock("../agents/pi-embedded.js", () => ({
|
||||||
|
abortEmbeddedPiRun: vi.fn().mockReturnValue(false),
|
||||||
|
runEmbeddedPiAgent: vi.fn(),
|
||||||
|
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
||||||
|
resolveEmbeddedSessionLane: (key: string) =>
|
||||||
|
`session:${key.trim() || "main"}`,
|
||||||
|
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
||||||
|
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
||||||
|
}));
|
||||||
|
|
||||||
|
function makeResult(text: string) {
|
||||||
|
return {
|
||||||
|
payloads: [{ text }],
|
||||||
|
meta: {
|
||||||
|
durationMs: 5,
|
||||||
|
agentMeta: { sessionId: "s", provider: "p", model: "m" },
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
|
||||||
|
const base = await fs.mkdtemp(path.join(os.tmpdir(), "clawdis-queue-"));
|
||||||
|
const previousHome = process.env.HOME;
|
||||||
|
process.env.HOME = base;
|
||||||
|
try {
|
||||||
|
vi.mocked(runEmbeddedPiAgent).mockReset();
|
||||||
|
return await fn(base);
|
||||||
|
} finally {
|
||||||
|
process.env.HOME = previousHome;
|
||||||
|
try {
|
||||||
|
await fs.rm(base, { recursive: true, force: true });
|
||||||
|
} catch (err) {
|
||||||
|
const msg = err instanceof Error ? err.message : String(err);
|
||||||
|
if (!msg.includes("ENOTEMPTY")) throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeCfg(home: string, queue?: Record<string, unknown>) {
|
||||||
|
return {
|
||||||
|
agent: {
|
||||||
|
model: "anthropic/claude-opus-4-5",
|
||||||
|
workspace: path.join(home, "clawd"),
|
||||||
|
},
|
||||||
|
whatsapp: { allowFrom: ["*"] },
|
||||||
|
session: { store: path.join(home, "sessions.json") },
|
||||||
|
routing: queue ? { queue } : undefined,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("queue followups", () => {
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("collects queued messages and drains after run completes", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
await withTempHome(async (home) => {
|
||||||
|
const prompts: string[] = [];
|
||||||
|
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
|
||||||
|
prompts.push(params.prompt);
|
||||||
|
if (params.prompt.includes("[Queued messages while agent was busy]")) {
|
||||||
|
return makeResult("followup");
|
||||||
|
}
|
||||||
|
return makeResult("main");
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true);
|
||||||
|
vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(true);
|
||||||
|
|
||||||
|
const cfg = makeCfg(home, {
|
||||||
|
mode: "collect",
|
||||||
|
debounceMs: 200,
|
||||||
|
cap: 10,
|
||||||
|
drop: "summarize",
|
||||||
|
});
|
||||||
|
|
||||||
|
const first = await getReplyFromConfig(
|
||||||
|
{ Body: "first", From: "+1001", To: "+2000" },
|
||||||
|
{},
|
||||||
|
cfg,
|
||||||
|
);
|
||||||
|
expect(first).toBeUndefined();
|
||||||
|
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false);
|
||||||
|
vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false);
|
||||||
|
|
||||||
|
const second = await getReplyFromConfig(
|
||||||
|
{ Body: "second", From: "+1001", To: "+2000" },
|
||||||
|
{},
|
||||||
|
cfg,
|
||||||
|
);
|
||||||
|
|
||||||
|
const secondText = Array.isArray(second)
|
||||||
|
? second[0]?.text
|
||||||
|
: second?.text;
|
||||||
|
expect(secondText).toBe("main");
|
||||||
|
|
||||||
|
await vi.runAllTimersAsync();
|
||||||
|
await Promise.resolve();
|
||||||
|
|
||||||
|
expect(runEmbeddedPiAgent).toHaveBeenCalledTimes(2);
|
||||||
|
expect(
|
||||||
|
prompts.some((p) =>
|
||||||
|
p.includes("[Queued messages while agent was busy]"),
|
||||||
|
),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("summarizes dropped followups when cap is exceeded", async () => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
await withTempHome(async (home) => {
|
||||||
|
const prompts: string[] = [];
|
||||||
|
vi.mocked(runEmbeddedPiAgent).mockImplementation(async (params) => {
|
||||||
|
prompts.push(params.prompt);
|
||||||
|
return makeResult("ok");
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(true);
|
||||||
|
vi.mocked(isEmbeddedPiRunStreaming).mockReturnValue(false);
|
||||||
|
|
||||||
|
const cfg = makeCfg(home, {
|
||||||
|
mode: "followup",
|
||||||
|
debounceMs: 0,
|
||||||
|
cap: 1,
|
||||||
|
drop: "summarize",
|
||||||
|
});
|
||||||
|
|
||||||
|
await getReplyFromConfig(
|
||||||
|
{ Body: "one", From: "+1002", To: "+2000" },
|
||||||
|
{},
|
||||||
|
cfg,
|
||||||
|
);
|
||||||
|
await getReplyFromConfig(
|
||||||
|
{ Body: "two", From: "+1002", To: "+2000" },
|
||||||
|
{},
|
||||||
|
cfg,
|
||||||
|
);
|
||||||
|
|
||||||
|
vi.mocked(isEmbeddedPiRunActive).mockReturnValue(false);
|
||||||
|
await getReplyFromConfig(
|
||||||
|
{ Body: "three", From: "+1002", To: "+2000" },
|
||||||
|
{},
|
||||||
|
cfg,
|
||||||
|
);
|
||||||
|
|
||||||
|
await vi.runAllTimersAsync();
|
||||||
|
await Promise.resolve();
|
||||||
|
|
||||||
|
expect(prompts.some((p) => p.includes("[Queue overflow]"))).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -9,6 +9,8 @@ vi.mock("../agents/pi-embedded.js", () => ({
|
|||||||
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
queueEmbeddedPiMessage: vi.fn().mockReturnValue(false),
|
||||||
resolveEmbeddedSessionLane: (key: string) =>
|
resolveEmbeddedSessionLane: (key: string) =>
|
||||||
`session:${key.trim() || "main"}`,
|
`session:${key.trim() || "main"}`,
|
||||||
|
isEmbeddedPiRunActive: vi.fn().mockReturnValue(false),
|
||||||
|
isEmbeddedPiRunStreaming: vi.fn().mockReturnValue(false),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
|
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
|
||||||
|
|||||||
@@ -218,9 +218,13 @@ function normalizeQueueDropPolicy(raw?: string): QueueDropPolicy | undefined {
|
|||||||
|
|
||||||
function parseQueueDebounce(raw?: string): number | undefined {
|
function parseQueueDebounce(raw?: string): number | undefined {
|
||||||
if (!raw) return undefined;
|
if (!raw) return undefined;
|
||||||
const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" });
|
try {
|
||||||
if (!parsed || parsed < 0) return undefined;
|
const parsed = parseDurationMs(raw.trim(), { defaultUnit: "ms" });
|
||||||
return Math.round(parsed);
|
if (!parsed || parsed < 0) return undefined;
|
||||||
|
return Math.round(parsed);
|
||||||
|
} catch {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function parseQueueCap(raw?: string): number | undefined {
|
function parseQueueCap(raw?: string): number | undefined {
|
||||||
|
|||||||
Reference in New Issue
Block a user