web: add heartbeat and bounded reconnect tuning

This commit is contained in:
Peter Steinberger
2025-11-26 02:34:43 +01:00
parent e482e7768b
commit baf20af17f
19 changed files with 541 additions and 63 deletions

View File

@@ -25,8 +25,8 @@ describe("web auto-reply", () => {
});
it("reconnects after a connection close", async () => {
vi.useFakeTimers();
const closeResolvers: Array<() => void> = [];
const sleep = vi.fn(async () => {});
const listenerFactory = vi.fn(async () => {
let _resolve!: () => void;
const onClose = new Promise<void>((res) => {
@@ -48,25 +48,78 @@ describe("web auto-reply", () => {
async () => ({ text: "ok" }),
runtime as never,
controller.signal,
{
heartbeatSeconds: 1,
reconnect: { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 },
sleep,
},
);
await Promise.resolve();
expect(listenerFactory).toHaveBeenCalledTimes(1);
closeResolvers[0]?.();
await Promise.resolve();
await vi.runOnlyPendingTimersAsync();
const waitForSecondCall = async () => {
const started = Date.now();
while (listenerFactory.mock.calls.length < 2 && Date.now() - started < 200) {
await new Promise((resolve) => setTimeout(resolve, 10));
}
};
await waitForSecondCall();
expect(listenerFactory).toHaveBeenCalledTimes(2);
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("Reconnecting"),
expect.stringContaining("Retry 1"),
);
controller.abort();
closeResolvers[1]?.();
await vi.runAllTimersAsync();
await new Promise((resolve) => setTimeout(resolve, 5));
await run;
});
// Verifies the bounded-retry behaviour of monitorWebProvider: with
// maxAttempts = 2 the first close triggers one reconnect, and the second
// close makes the monitor give up and report "Reached max retries".
it("stops after hitting max reconnect attempts", async () => {
// Resolvers for each listener's onClose promise; calling one simulates a
// dropped connection for that listener.
const closeResolvers: Array<() => void> = [];
// Backoff sleep is stubbed to resolve immediately so no real delay is spent
// between reconnect attempts.
const sleep = vi.fn(async () => {});
const listenerFactory = vi.fn(async () => {
const onClose = new Promise<void>((res) => closeResolvers.push(res));
return { close: vi.fn(), onClose };
});
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
const run = monitorWebProvider(
false, // verbose
listenerFactory,
true, // keepAlive
async () => ({ text: "ok" }), // replyResolver
runtime as never,
undefined, // abortSignal: the run must end on its own via the retry limit
{
heartbeatSeconds: 1,
// resolveReconnectPolicy raises initialMs/maxMs to its 250 ms floor; that
// does not matter here because `sleep` is stubbed.
reconnect: { initialMs: 5, maxMs: 5, maxAttempts: 2, factor: 1.1 },
sleep,
},
);
await Promise.resolve();
expect(listenerFactory).toHaveBeenCalledTimes(1);
// First close: attempt 1 of 2, so a second listener is created.
closeResolvers.shift()?.();
await new Promise((resolve) => setTimeout(resolve, 15));
expect(listenerFactory).toHaveBeenCalledTimes(2);
// Second close: attempt count reaches maxAttempts and the loop exits.
closeResolvers.shift()?.();
await new Promise((resolve) => setTimeout(resolve, 15));
await run;
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("Reached max retries"),
);
});
it("falls back to text when media send fails", async () => {
const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
const reply = vi.fn().mockResolvedValue(undefined);
@@ -431,6 +484,49 @@ describe("web auto-reply", () => {
fetchMock.mockRestore();
});
// Verifies that a running monitor writes heartbeat entries (module
// "web-heartbeat") containing connection metadata to the log file.
it("emits heartbeat logs with connection metadata", async () => {
// Fake timers let the test jump past the heartbeat interval without waiting.
vi.useFakeTimers();
// Route logging to a unique temp file so the heartbeat output can be read back.
const logPath = `/tmp/warelay-heartbeat-${crypto.randomUUID()}.log`;
setLoggerOverride({ level: "trace", file: logPath });
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
const controller = new AbortController();
const listenerFactory = vi.fn(async () => {
const onClose = new Promise<void>(() => {
// never resolves; abort will short-circuit
});
return { close: vi.fn(), onClose };
});
const run = monitorWebProvider(
false, // verbose
listenerFactory,
true, // keepAlive: the heartbeat interval is only started when this is true
async () => ({ text: "ok" }), // replyResolver
runtime as never,
controller.signal,
{
heartbeatSeconds: 1,
reconnect: { initialMs: 5, maxMs: 5, maxAttempts: 1, factor: 1.1 },
},
);
// Advance one full heartbeat period (1 s) so the interval is expected to fire.
await vi.advanceTimersByTimeAsync(1_000);
// Abort to end the monitor loop, then flush any remaining timers.
controller.abort();
await vi.runAllTimersAsync();
// A rejection from the aborted run is tolerated; only the log content matters.
await run.catch(() => {});
const content = await fs.readFile(logPath, "utf-8");
expect(content).toContain('"module":"web-heartbeat"');
expect(content).toMatch(/connectionId/);
expect(content).toMatch(/messagesHandled/);
});
it("logs outbound replies to file", async () => {
const logPath = `/tmp/warelay-log-test-${crypto.randomUUID()}.log`;
setLoggerOverride({ level: "trace", file: logPath });

View File

@@ -1,33 +1,126 @@
import { randomUUID } from "node:crypto";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import { waitForever } from "../cli/wait.js";
import { loadConfig } from "../config/config.js";
import { loadConfig, type WarelayConfig } from "../config/config.js";
import { danger, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { monitorWebInbox } from "./inbound.js";
import { loadWebMedia } from "./media.js";
import { getWebAuthAgeMs, newConnectionId } from "./session.js";
/**
 * Media size limit in bytes (5 MiB) used by monitorWebProvider when
 * `cfg.inbound.reply.mediaMaxMb` is not a positive number.
 */
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
/** Heartbeat period in seconds used when neither tuning nor config supplies a positive value. */
const DEFAULT_HEARTBEAT_SECONDS = 60;
/**
 * Baseline reconnect policy. Values from `cfg.web.reconnect` and then
 * `tuning.reconnect` are layered on top of it by resolveReconnectPolicy.
 */
const DEFAULT_RECONNECT_POLICY: ReconnectPolicy = {
// Delay before the first retry: 2 s.
initialMs: 2_000,
// Upper bound for any single delay: 30 s.
maxMs: 30_000,
// Each further attempt multiplies the base delay by this factor.
factor: 1.8,
// Up to 25% of the base delay is added at random.
jitter: 0.25,
// Give up after this many consecutive reconnect attempts.
maxAttempts: 12,
};
/** Parameters of the exponential backoff used between reconnect attempts. */
type ReconnectPolicy = {
/** Base delay in milliseconds for the first attempt. */
initialMs: number;
/** Cap in milliseconds applied to every computed delay. */
maxMs: number;
/** Multiplier applied to the base delay for each subsequent attempt. */
factor: number;
/** Fraction of the base delay that may be added as random jitter. */
jitter: number;
/** Attempt limit; the monitor loop only enforces it when it is greater than 0. */
maxAttempts: number;
};
/**
 * Optional per-call overrides for monitorWebProvider. Values given here take
 * precedence over the corresponding `cfg.web` settings.
 */
export type WebMonitorTuning = {
/** Partial reconnect policy merged over the config and the defaults. */
reconnect?: Partial<ReconnectPolicy>;
/** Heartbeat period in seconds; ignored unless it is a positive number. */
heartbeatSeconds?: number;
/**
 * Replacement for the wait between reconnect attempts (tests pass a stub).
 * A rejection is treated by the monitor loop as a request to stop.
 */
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;
};
/**
 * Render a millisecond count for log messages: values of one second or more
 * are shown in seconds with two decimals, anything else as raw milliseconds.
 */
const formatDuration = (ms: number) => {
  if (ms >= 1000) {
    const seconds = ms / 1000;
    return `${seconds.toFixed(2)}s`;
  }
  return `${ms}ms`;
};
/**
 * Restrict `val` to the range [min, max]. The upper bound is applied first
 * and the lower bound second, so `min` wins if the bounds are inverted.
 */
const clamp = (val: number, min: number, max: number) => {
  const capped = Math.min(max, val);
  return Math.max(min, capped);
};
/**
 * Pick the heartbeat period in seconds.
 *
 * The tuning override is consulted first, then `cfg.web.heartbeatSeconds`.
 * The chosen value is only accepted when it is a number greater than zero;
 * otherwise DEFAULT_HEARTBEAT_SECONDS is returned.
 */
function resolveHeartbeatSeconds(
  cfg: WarelayConfig,
  tuning?: WebMonitorTuning,
): number {
  const requested = tuning?.heartbeatSeconds ?? cfg.web?.heartbeatSeconds;
  // `!(x > 0)` rather than `x <= 0` so that NaN also falls back to the default.
  if (typeof requested !== "number" || !(requested > 0)) {
    return DEFAULT_HEARTBEAT_SECONDS;
  }
  return requested;
}
/**
 * Build the effective reconnect policy.
 *
 * Layers, lowest to highest precedence: DEFAULT_RECONNECT_POLICY,
 * `cfg.web.reconnect`, `tuning.reconnect`. A field in a layer is only applied
 * when it is an actual number (not `undefined`, not `NaN`); otherwise the value
 * from the layer below is kept. Without this check an explicitly-undefined or
 * NaN override would turn the sanitised result into `NaN`, which disables the
 * retry limit and yields `NaN` backoff delays.
 *
 * @param cfg Loaded configuration; only `cfg.web.reconnect` is read.
 * @param tuning Optional per-call overrides; only `tuning.reconnect` is read.
 * @returns A complete policy with every field forced into a sane range.
 */
function resolveReconnectPolicy(
  cfg: WarelayConfig,
  tuning?: WebMonitorTuning,
): ReconnectPolicy {
  const merged: ReconnectPolicy = { ...DEFAULT_RECONNECT_POLICY };
  const fields = Object.keys(DEFAULT_RECONNECT_POLICY) as Array<
    keyof ReconnectPolicy
  >;
  const layers: Array<Partial<ReconnectPolicy> | undefined> = [
    cfg.web?.reconnect,
    tuning?.reconnect,
  ];
  for (const layer of layers) {
    if (!layer) continue;
    for (const field of fields) {
      const value = layer[field];
      if (typeof value === "number" && !Number.isNaN(value)) {
        merged[field] = value;
      }
    }
  }
  // Keep the values sane to avoid runaway retries.
  merged.initialMs = Math.max(250, merged.initialMs);
  merged.maxMs = Math.max(merged.initialMs, merged.maxMs);
  merged.factor = clamp(merged.factor, 1.1, 10);
  merged.jitter = clamp(merged.jitter, 0, 1);
  // Negative limits collapse to 0, which the monitor loop treats as "no limit".
  merged.maxAttempts = Math.max(0, Math.floor(merged.maxAttempts));
  return merged;
}
/**
 * Compute the delay in milliseconds before reconnect attempt `attempt`.
 *
 * The base delay grows geometrically (`initialMs * factor^(attempt - 1)`), a
 * random share of up to `jitter * base` is added, and the result is rounded
 * and capped at `maxMs`.
 *
 * @param policy Reconnect policy supplying initialMs, factor, jitter and maxMs.
 * @param attempt 1-based attempt counter.
 * @returns Delay in milliseconds, never above `policy.maxMs`.
 */
function computeBackoff(policy: ReconnectPolicy, attempt: number) {
  // attempt is 1-based.
  const uncapped = policy.initialMs * policy.factor ** (attempt - 1);
  // Cap before applying jitter: for very large attempt counts the power
  // overflows to Infinity, and `Infinity * 0` (jitter of 0) would be NaN.
  const base = Math.min(policy.maxMs, uncapped);
  const jitter = base * policy.jitter * Math.random();
  return Math.min(policy.maxMs, Math.round(base + jitter));
}
/**
 * Wait for `ms` milliseconds, ending early if `abortSignal` fires.
 *
 * @param ms Delay in milliseconds; values of 0 or less resolve immediately.
 * @param abortSignal Optional signal that cancels the wait.
 * @returns A promise that resolves when the delay has elapsed and rejects with
 *   `Error("aborted")` when the signal aborts first or was already aborted
 *   on entry.
 */
function sleepWithAbort(
  ms: number,
  abortSignal?: AbortSignal,
): Promise<void> {
  if (ms <= 0) return Promise.resolve();
  // An already-aborted signal never dispatches "abort" to listeners added
  // afterwards, so without this check the full delay would be slept.
  if (abortSignal?.aborted) return Promise.reject(new Error("aborted"));
  return new Promise<void>((resolve, reject) => {
    // Release both the timer and the listener, whichever path finishes first.
    const cleanup = () => {
      clearTimeout(timer);
      abortSignal?.removeEventListener("abort", onAbort);
    };
    const onAbort = () => {
      cleanup();
      reject(new Error("aborted"));
    };
    const timer = setTimeout(() => {
      cleanup();
      resolve();
    }, ms);
    abortSignal?.addEventListener("abort", onAbort, { once: true });
  });
}
export async function monitorWebProvider(
verbose: boolean,
listenerFactory = monitorWebInbox,
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
keepAlive = true,
replyResolver: typeof getReplyFromConfig = getReplyFromConfig,
replyResolver: typeof getReplyFromConfig | undefined = getReplyFromConfig,
runtime: RuntimeEnv = defaultRuntime,
abortSignal?: AbortSignal,
tuning: WebMonitorTuning = {},
) {
const replyLogger = getChildLogger({ module: "web-auto-reply" });
const runId = randomUUID();
const replyLogger = getChildLogger({ module: "web-auto-reply", runId });
const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId });
const cfg = loadConfig();
const configuredMaxMb = cfg.inbound?.reply?.mediaMaxMb;
const maxMediaBytes =
typeof configuredMaxMb === "number" && configuredMaxMb > 0
? configuredMaxMb * 1024 * 1024
: DEFAULT_WEB_MEDIA_BYTES;
const heartbeatSeconds = resolveHeartbeatSeconds(cfg, tuning);
const reconnectPolicy = resolveReconnectPolicy(cfg, tuning);
const sleep = tuning.sleep ?? ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal ?? abortSignal));
const stopRequested = () => abortSignal?.aborted === true;
const abortPromise =
abortSignal &&
@@ -37,22 +130,49 @@ export async function monitorWebProvider(
}),
);
const sleep = (ms: number) =>
new Promise<void>((resolve) => setTimeout(resolve, ms));
let sigintStop = false;
const handleSigint = () => {
sigintStop = true;
};
process.once("SIGINT", handleSigint);
let reconnectAttempts = 0;
while (true) {
if (stopRequested()) break;
const listener = await listenerFactory({
const connectionId = newConnectionId();
const startedAt = Date.now();
let heartbeat: NodeJS.Timeout | null = null;
let lastMessageAt: number | null = null;
let handledMessages = 0;
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
onMessage: async (msg) => {
handledMessages += 1;
lastMessageAt = Date.now();
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
const correlationId = msg.id ?? randomUUID();
replyLogger.info(
{
connectionId,
correlationId,
from: msg.from,
to: msg.to,
body: msg.body,
mediaType: msg.mediaType ?? null,
mediaPath: msg.mediaPath ?? null,
},
"inbound web message",
);
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
const replyStarted = Date.now();
const replyResult = await replyResolver(
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: msg.body,
From: msg.from,
@@ -137,6 +257,8 @@ export async function monitorWebProvider(
);
replyLogger.info(
{
connectionId,
correlationId,
to: msg.from,
from: msg.to,
text: index === 0 ? (replyResult.text ?? null) : null,
@@ -182,6 +304,8 @@ export async function monitorWebProvider(
}
replyLogger.info(
{
connectionId,
correlationId,
to: msg.from,
from: msg.to,
text: replyResult.text ?? null,
@@ -200,28 +324,54 @@ export async function monitorWebProvider(
},
});
const closeListener = async () => {
if (heartbeat) clearInterval(heartbeat);
try {
await listener.close();
} catch (err) {
logVerbose(`Socket close failed: ${String(err)}`);
}
};
if (keepAlive) {
heartbeat = setInterval(() => {
const authAgeMs = getWebAuthAgeMs();
heartbeatLogger.info(
{
connectionId,
reconnectAttempts,
messagesHandled: handledMessages,
lastMessageAt,
authAgeMs,
uptimeMs: Date.now() - startedAt,
},
"web relay heartbeat",
);
}, heartbeatSeconds * 1000);
}
logInfo(
"📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.",
runtime,
);
let stop = false;
process.on("SIGINT", () => {
stop = true;
void listener.close().finally(() => {
logInfo("👋 Web monitor stopped", runtime);
runtime.exit(0);
});
});
if (!keepAlive) return;
if (!keepAlive) {
await closeListener();
return;
}
const reason = await Promise.race([
listener.onClose ?? waitForever(),
abortPromise ?? waitForever(),
]);
if (stopRequested() || stop || reason === "aborted") {
await listener.close();
const uptimeMs = Date.now() - startedAt;
if (uptimeMs > heartbeatSeconds * 1000) {
reconnectAttempts = 0; // Healthy stretch; reset the backoff.
}
if (stopRequested() || sigintStop || reason === "aborted") {
await closeListener();
break;
}
@@ -241,17 +391,39 @@ export async function monitorWebProvider(
"WhatsApp session logged out. Run `warelay login --provider web` to relink.",
),
);
await closeListener();
break;
}
reconnectAttempts += 1;
if (
reconnectPolicy.maxAttempts > 0 &&
reconnectAttempts >= reconnectPolicy.maxAttempts
) {
runtime.error(
danger(
`WhatsApp Web connection closed (status ${status}). Reached max retries (${reconnectPolicy.maxAttempts}); exiting so you can relink.`,
),
);
await closeListener();
break;
}
const delay = computeBackoff(reconnectPolicy, reconnectAttempts);
runtime.error(
danger(
`WhatsApp Web connection closed (status ${status}). Reconnecting in 2s`,
`WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}`,
),
);
await listener.close();
await sleep(2_000);
await closeListener();
try {
await sleep(delay, abortSignal);
} catch {
break;
}
}
process.removeListener("SIGINT", handleSigint);
}
export { DEFAULT_WEB_MEDIA_BYTES };

View File

@@ -20,6 +20,7 @@ vi.mock("./session.js", () => {
import { loginWeb } from "./login.js";
import type { waitForWaConnection } from "./session.js";
const { createWaSocket } = await import("./session.js");
describe("web login", () => {

View File

@@ -1,11 +1,12 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetLogger, setLoggerOverride } from "../logging.js";
import { vi } from "vitest";
vi.mock("../media/store.js", () => ({
saveMediaBuffer: vi
.fn()
.mockResolvedValue({ id: "mid", path: "/tmp/mid", size: 1, contentType: "image/jpeg" }),
saveMediaBuffer: vi.fn().mockResolvedValue({
id: "mid",
path: "/tmp/mid",
size: 1,
contentType: "image/jpeg",
}),
}));
vi.mock("./session.js", () => {
@@ -28,15 +29,16 @@ vi.mock("./session.js", () => {
};
});
import { monitorWebInbox } from "./inbound.js";
const { createWaSocket } = await import("./session.js");
const getSock = () => (createWaSocket as unknown as () => Promise<ReturnType<typeof mockSock>>)();
const _getSock = () =>
(createWaSocket as unknown as () => Promise<ReturnType<typeof mockSock>>)();
import crypto from "node:crypto";
import fsSync from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { resetLogger, setLoggerOverride } from "../logging.js";
import { monitorWebInbox } from "./inbound.js";

View File

@@ -18,6 +18,7 @@ vi.mock("./session.js", () => {
});
import { sendMessageWeb } from "./outbound.js";
const { createWaSocket } = await import("./session.js");
describe("web outbound", () => {

View File

@@ -1,7 +1,10 @@
import { randomUUID } from "node:crypto";
import type { AnyMessageContent } from "@whiskeysockets/baileys";
import { logVerbose } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { toWhatsappJid } from "../utils.js";
import { loadWebMedia } from "./media.js";
import { createWaSocket, waitForWaConnection } from "./session.js";
@@ -11,9 +14,16 @@ export async function sendMessageWeb(
body: string,
options: { verbose: boolean; mediaUrl?: string },
): Promise<{ messageId: string; toJid: string }> {
const correlationId = randomUUID();
const sock = await createWaSocket(false, options.verbose);
const logger = getChildLogger({
module: "web-outbound",
correlationId,
to,
});
try {
logInfo("🔌 Connecting to WhatsApp Web…");
logger.info("connecting to whatsapp web");
await waitForWaConnection(sock);
// waitForWaConnection sets up listeners and error handling; keep the presence update safe.
const jid = toWhatsappJid(to);
@@ -34,11 +44,16 @@ export async function sendMessageWeb(
logInfo(
`📤 Sending via web session -> ${jid}${options.mediaUrl ? " (media)" : ""}`,
);
logger.info(
{ jid, hasMedia: Boolean(options.mediaUrl) },
"sending message",
);
const result = await sock.sendMessage(jid, payload);
const messageId = result?.key?.id ?? "unknown";
logInfo(
`✅ Sent via web session. Message ID: ${messageId} -> ${jid}${options.mediaUrl ? " (media)" : ""}`,
);
logger.info({ jid, messageId }, "sent message");
return { messageId, toJid: jid };
} finally {
try {

View File

@@ -1,14 +1,12 @@
import { EventEmitter } from "node:events";
import fsSync from "node:fs";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
baileys,
getLastSocket,
resetBaileysMocks,
resetLoadConfigMock,
} from "./test-helpers.js";
import { EventEmitter } from "node:events";
import fsSync from "node:fs";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetLogger, setLoggerOverride } from "../logging.js";
import {
createWaSocket,

View File

@@ -1,3 +1,4 @@
import { randomUUID } from "node:crypto";
import fsSync from "node:fs";
import fs from "node:fs/promises";
import os from "node:os";
@@ -165,6 +166,24 @@ function readWebSelfId() {
}
}
/**
 * Report how long ago (in milliseconds) the cached WhatsApp web credentials
 * file `creds.json` was last modified.
 *
 * @returns `Date.now()` minus the file's modification time, or `null` when the
 *   file cannot be stat'ed (for example because it does not exist).
 */
export function getWebAuthAgeMs(): number | null {
  const credentialsFile = path.join(WA_WEB_AUTH_DIR, "creds.json");
  let modifiedAtMs: number;
  try {
    modifiedAtMs = fsSync.statSync(credentialsFile).mtimeMs;
  } catch {
    // Any stat failure is reported as "no auth state".
    return null;
  }
  return Date.now() - modifiedAtMs;
}
/** Create a new random UUID to identify one web connection. */
export function newConnectionId() {
  const connectionId = randomUUID();
  return connectionId;
}
export function logWebSelfId(
runtime: RuntimeEnv = defaultRuntime,
includeProviderPrefix = false,