diff --git a/CHANGELOG.md b/CHANGELOG.md index 696203b9f..c66c5f794 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,12 @@ ### Changes - **Manual heartbeat sends:** `warelay heartbeat` accepts `--message/--body` with `--provider web|twilio` to push real outbound messages through the same plumbing; `--dry-run` previews payloads without sending. +## Unreleased + +### Changes +- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests. +- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs. + ## 1.2.1 — 2025-11-28 ### Changes diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index ff59e05d9..059c6e570 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -13,7 +13,7 @@ import sharp from "sharp"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { WarelayConfig } from "../config/config.js"; -import { resolveStorePath } from "../config/sessions.js"; +import * as commandQueue from "../process/command-queue.js"; import { resetLogger, setLoggerOverride } from "../logging.js"; import { HEARTBEAT_PROMPT, @@ -26,6 +26,18 @@ import { } from "./auto-reply.js"; import type { sendMessageWeb } from "./outbound.js"; +const makeSessionStore = async ( + entries: Record = {}, +): Promise<{ storePath: string; cleanup: () => Promise }> => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-session-")); + const storePath = path.join(dir, "sessions.json"); + await fs.writeFile(storePath, JSON.stringify(entries)); + return { + storePath, + cleanup: () => fs.rm(dir, { recursive: true, force: true }), + }; +}; + describe("heartbeat helpers", () => { it("strips heartbeat token and skips when only token", () => { expect(stripHeartbeatToken(undefined)).toEqual({ @@ -80,19 +92,9 @@ describe("heartbeat helpers", () => { }); describe("resolveHeartbeatRecipients", () => { - const makeStore = async (entries: Record) => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-heartbeat-")); - const storePath = path.join(dir, "sessions.json"); - await fs.writeFile(storePath, JSON.stringify(entries)); - return { - storePath, - cleanup: async () => fs.rm(dir, { recursive: true, force: true }), - }; - }; - it("returns the sole session recipient", async () => { const now = Date.now(); - const store = await makeStore({ "+1000": { updatedAt: now } }); + const store = await makeSessionStore({ "+1000": { updatedAt: now } }); const cfg: WarelayConfig = { inbound: { allowFrom: ["+1999"], @@ -107,7 +109,7 @@ describe("resolveHeartbeatRecipients", () => { it("surfaces ambiguity when multiple sessions exist", async () => { const now = Date.now(); - const store = await makeStore({ + const store = await makeSessionStore({ "+1000": { updatedAt: now }, "+2000": { updatedAt: now - 10 }, }); @@ -124,7 +126,7 @@ describe("resolveHeartbeatRecipients", () => { }); it("filters wildcard allowFrom when no sessions exist", async () => { - const store = await makeStore({}); + const store = await makeSessionStore({}); const cfg: WarelayConfig = { inbound: { allowFrom: ["*"], @@ -139,7 +141,7 @@ describe("resolveHeartbeatRecipients", () => { it("merges sessions and allowFrom when --all is set", async () => { const now = Date.now(); - const store = await makeStore({ "+1000": { updatedAt: now } }); + const store = await makeSessionStore({ "+1000": { updatedAt: now } }); const cfg: WarelayConfig = { inbound: { allowFrom: ["+1999"], @@ -155,12 +157,16 @@ describe("resolveHeartbeatRecipients", () => { describe("runWebHeartbeatOnce", () => { it("skips when heartbeat token returned", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi.fn(); const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN })); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, @@ -168,55 +174,58 @@ describe("runWebHeartbeatOnce", () => { }); expect(resolver).toHaveBeenCalled(); expect(sender).not.toHaveBeenCalled(); + await store.cleanup(); }); it("sends when alert text present", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi .fn() .mockResolvedValue({ messageId: "m1", toJid: "jid" }); const resolver = vi.fn(async () => ({ text: "ALERT" })); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, replyResolver: resolver, }); expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false }); + await store.cleanup(); }); it("falls back to most recent session when no to is provided", async () => { - // Use temp directory to avoid corrupting production sessions.json - const tmpDir = await fs.mkdtemp( - path.join(os.tmpdir(), "warelay-fallback-session-"), - ); - const storePath = path.join(tmpDir, "sessions.json"); + const store = await makeSessionStore(); + const storePath = store.storePath; const sender: typeof sendMessageWeb = vi .fn() .mockResolvedValue({ messageId: "m1", toJid: "jid" }); const resolver = vi.fn(async () => ({ text: "ALERT" })); - // Seed session store const now = Date.now(); - const store = { + const sessionEntries = { "+1222": { sessionId: "s1", updatedAt: now - 1000 }, "+1333": { sessionId: "s2", updatedAt: now }, }; - await fs.writeFile(storePath, JSON.stringify(store)); - setLoadConfigMock({ - inbound: { - allowFrom: ["+1999"], - reply: { mode: "command", session: { store: storePath } }, - }, - }); + await fs.writeFile(storePath, JSON.stringify(sessionEntries)); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1999"], + reply: { mode: "command", session: { store: storePath } }, + }, + }, to: "+1999", verbose: false, sender, replyResolver: resolver, }); expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false }); + await store.cleanup(); }); it("does not refresh updatedAt when heartbeat is skipped", async () => { @@ -356,14 +365,18 @@ describe("runWebHeartbeatOnce", () => { }); it("sends overrideBody directly and skips resolver", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi .fn() .mockResolvedValue({ messageId: "m1", toJid: "jid" }); const resolver = vi.fn(); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, @@ -374,15 +387,20 @@ describe("runWebHeartbeatOnce", () => { verbose: false, }); expect(resolver).not.toHaveBeenCalled(); + await store.cleanup(); }); it("dry-run overrideBody prints and skips send", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi.fn(); const resolver = vi.fn(); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, @@ -392,6 +410,7 @@ describe("runWebHeartbeatOnce", () => { }); expect(sender).not.toHaveBeenCalled(); expect(resolver).not.toHaveBeenCalled(); + await store.cleanup(); }); }); @@ -507,6 +526,53 @@ describe("web auto-reply", () => { ); }); + it("skips reply heartbeat when requests are running", async () => { + const tmpDir = await fs.mkdtemp( + path.join(os.tmpdir(), "warelay-heartbeat-queue-"), + ); + const storePath = path.join(tmpDir, "sessions.json"); + await fs.writeFile(storePath, JSON.stringify({})); + + const queueSpy = vi + .spyOn(commandQueue, "getQueueSize") + .mockReturnValue(2); + const replyResolver = vi.fn(); + const listenerFactory = vi.fn(async () => { + const onClose = new Promise(() => { + // stay open until aborted + }); + return { close: vi.fn(), onClose }; + }); + const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never; + + setLoadConfigMock(() => ({ + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: storePath } }, + }, + })); + + const controller = new AbortController(); + const run = monitorWebProvider( + false, + listenerFactory, + true, + replyResolver, + runtime, + controller.signal, + { replyHeartbeatMinutes: 1, replyHeartbeatNow: true }, + ); + + try { + await Promise.resolve(); + controller.abort(); + await run; + expect(replyResolver).not.toHaveBeenCalled(); + } finally { + queueSpy.mockRestore(); + } + }); + it("falls back to text when media send fails", async () => { const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const reply = vi.fn().mockResolvedValue(undefined); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 858a3f7a8..5fdcbbe10 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -17,6 +17,7 @@ import { normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWeb } from "./outbound.js"; +import { getQueueSize } from "../process/command-queue.js"; import { computeBackoff, newConnectionId, @@ -739,6 +740,15 @@ export async function monitorWebProvider( } const runReplyHeartbeat = async () => { + const queued = getQueueSize(); + if (queued > 0) { + heartbeatLogger.info( + { connectionId, reason: "requests-in-flight", queued }, + "reply heartbeat skipped", + ); + console.log(success("heartbeat: skipped (requests in flight)")); + return; + } if (!replyHeartbeatMinutes) return; const tickStart = Date.now(); if (!lastInboundMsg) {