web: isolate session fixtures and skip heartbeat when busy

This commit is contained in:
Peter Steinberger
2025-12-02 06:15:20 +00:00
parent 1b0e1edb08
commit 2fc3a822c8
3 changed files with 123 additions and 41 deletions

View File

@@ -22,6 +22,12 @@
### Changes
- **Manual heartbeat sends:** `warelay heartbeat` accepts `--message/--body` with `--provider web|twilio` to push real outbound messages through the same plumbing; `--dry-run` previews payloads without sending.
## Unreleased
### Changes
- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests.
- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs.
## 1.2.1 — 2025-11-28
### Changes

View File

@@ -13,7 +13,7 @@ import sharp from "sharp";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { WarelayConfig } from "../config/config.js";
import { resolveStorePath } from "../config/sessions.js";
import * as commandQueue from "../process/command-queue.js";
import { resetLogger, setLoggerOverride } from "../logging.js";
import {
HEARTBEAT_PROMPT,
@@ -26,6 +26,18 @@ import {
} from "./auto-reply.js";
import type { sendMessageWeb } from "./outbound.js";
const makeSessionStore = async (
entries: Record<string, unknown> = {},
): Promise<{ storePath: string; cleanup: () => Promise<void> }> => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-session-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify(entries));
return {
storePath,
cleanup: () => fs.rm(dir, { recursive: true, force: true }),
};
};
describe("heartbeat helpers", () => {
it("strips heartbeat token and skips when only token", () => {
expect(stripHeartbeatToken(undefined)).toEqual({
@@ -80,19 +92,9 @@ describe("heartbeat helpers", () => {
});
describe("resolveHeartbeatRecipients", () => {
const makeStore = async (entries: Record<string, { updatedAt: number }>) => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-heartbeat-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify(entries));
return {
storePath,
cleanup: async () => fs.rm(dir, { recursive: true, force: true }),
};
};
it("returns the sole session recipient", async () => {
const now = Date.now();
const store = await makeStore({ "+1000": { updatedAt: now } });
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["+1999"],
@@ -107,7 +109,7 @@ describe("resolveHeartbeatRecipients", () => {
it("surfaces ambiguity when multiple sessions exist", async () => {
const now = Date.now();
const store = await makeStore({
const store = await makeSessionStore({
"+1000": { updatedAt: now },
"+2000": { updatedAt: now - 10 },
});
@@ -124,7 +126,7 @@ describe("resolveHeartbeatRecipients", () => {
});
it("filters wildcard allowFrom when no sessions exist", async () => {
const store = await makeStore({});
const store = await makeSessionStore({});
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["*"],
@@ -139,7 +141,7 @@ describe("resolveHeartbeatRecipients", () => {
it("merges sessions and allowFrom when --all is set", async () => {
const now = Date.now();
const store = await makeStore({ "+1000": { updatedAt: now } });
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["+1999"],
@@ -155,12 +157,16 @@ describe("resolveHeartbeatRecipients", () => {
describe("runWebHeartbeatOnce", () => {
it("skips when heartbeat token returned", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi.fn();
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@@ -168,55 +174,58 @@ describe("runWebHeartbeatOnce", () => {
});
expect(resolver).toHaveBeenCalled();
expect(sender).not.toHaveBeenCalled();
await store.cleanup();
});
it("sends when alert text present", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false });
await store.cleanup();
});
it("falls back to most recent session when no to is provided", async () => {
// Use temp directory to avoid corrupting production sessions.json
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "warelay-fallback-session-"),
);
const storePath = path.join(tmpDir, "sessions.json");
const store = await makeSessionStore();
const storePath = store.storePath;
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
// Seed session store
const now = Date.now();
const store = {
const sessionEntries = {
"+1222": { sessionId: "s1", updatedAt: now - 1000 },
"+1333": { sessionId: "s2", updatedAt: now },
};
await fs.writeFile(storePath, JSON.stringify(store));
setLoadConfigMock({
inbound: {
allowFrom: ["+1999"],
reply: { mode: "command", session: { store: storePath } },
},
});
await fs.writeFile(storePath, JSON.stringify(sessionEntries));
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1999"],
reply: { mode: "command", session: { store: storePath } },
},
},
to: "+1999",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false });
await store.cleanup();
});
it("does not refresh updatedAt when heartbeat is skipped", async () => {
@@ -356,14 +365,18 @@ describe("runWebHeartbeatOnce", () => {
});
it("sends overrideBody directly and skips resolver", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn();
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@@ -374,15 +387,20 @@ describe("runWebHeartbeatOnce", () => {
verbose: false,
});
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
it("dry-run overrideBody prints and skips send", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi.fn();
const resolver = vi.fn();
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@@ -392,6 +410,7 @@ describe("runWebHeartbeatOnce", () => {
});
expect(sender).not.toHaveBeenCalled();
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
});
@@ -507,6 +526,53 @@ describe("web auto-reply", () => {
);
});
it("skips reply heartbeat when requests are running", async () => {
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "warelay-heartbeat-queue-"),
);
const storePath = path.join(tmpDir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify({}));
const queueSpy = vi
.spyOn(commandQueue, "getQueueSize")
.mockReturnValue(2);
const replyResolver = vi.fn();
const listenerFactory = vi.fn(async () => {
const onClose = new Promise<void>(() => {
// stay open until aborted
});
return { close: vi.fn(), onClose };
});
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: storePath } },
},
}));
const controller = new AbortController();
const run = monitorWebProvider(
false,
listenerFactory,
true,
replyResolver,
runtime,
controller.signal,
{ replyHeartbeatMinutes: 1, replyHeartbeatNow: true },
);
try {
await Promise.resolve();
controller.abort();
await run;
expect(replyResolver).not.toHaveBeenCalled();
} finally {
queueSpy.mockRestore();
}
});
it("falls back to text when media send fails", async () => {
const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
const reply = vi.fn().mockResolvedValue(undefined);

View File

@@ -17,6 +17,7 @@ import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
import { loadWebMedia } from "./media.js";
import { sendMessageWeb } from "./outbound.js";
import { getQueueSize } from "../process/command-queue.js";
import {
computeBackoff,
newConnectionId,
@@ -739,6 +740,15 @@ export async function monitorWebProvider(
}
const runReplyHeartbeat = async () => {
const queued = getQueueSize();
if (queued > 0) {
heartbeatLogger.info(
{ connectionId, reason: "requests-in-flight", queued },
"reply heartbeat skipped",
);
console.log(success("heartbeat: skipped (requests in flight)"));
return;
}
if (!replyHeartbeatMinutes) return;
const tickStart = Date.now();
if (!lastInboundMsg) {