web: extract reconnect helpers and add tests
This commit is contained in:
@@ -61,7 +61,10 @@ describe("web auto-reply", () => {
|
||||
closeResolvers[0]?.();
|
||||
const waitForSecondCall = async () => {
|
||||
const started = Date.now();
|
||||
while (listenerFactory.mock.calls.length < 2 && Date.now() - started < 200) {
|
||||
while (
|
||||
listenerFactory.mock.calls.length < 2 &&
|
||||
Date.now() - started < 200
|
||||
) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,33 +1,23 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
|
||||
import { getReplyFromConfig } from "../auto-reply/reply.js";
|
||||
import { waitForever } from "../cli/wait.js";
|
||||
import { loadConfig, type WarelayConfig } from "../config/config.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { danger, isVerbose, logVerbose, success } from "../globals.js";
|
||||
import { logInfo } from "../logger.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { monitorWebInbox } from "./inbound.js";
|
||||
import { loadWebMedia } from "./media.js";
|
||||
import { getWebAuthAgeMs, newConnectionId } from "./session.js";
|
||||
import {
|
||||
computeBackoff,
|
||||
newConnectionId,
|
||||
type ReconnectPolicy,
|
||||
resolveHeartbeatSeconds,
|
||||
resolveReconnectPolicy,
|
||||
sleepWithAbort,
|
||||
} from "./reconnect.js";
|
||||
import { getWebAuthAgeMs } from "./session.js";
|
||||
|
||||
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
|
||||
const DEFAULT_HEARTBEAT_SECONDS = 60;
|
||||
const DEFAULT_RECONNECT_POLICY: ReconnectPolicy = {
|
||||
initialMs: 2_000,
|
||||
maxMs: 30_000,
|
||||
factor: 1.8,
|
||||
jitter: 0.25,
|
||||
maxAttempts: 12,
|
||||
};
|
||||
|
||||
type ReconnectPolicy = {
|
||||
initialMs: number;
|
||||
maxMs: number;
|
||||
factor: number;
|
||||
jitter: number;
|
||||
maxAttempts: number;
|
||||
};
|
||||
|
||||
export type WebMonitorTuning = {
|
||||
reconnect?: Partial<ReconnectPolicy>;
|
||||
@@ -38,68 +28,6 @@ export type WebMonitorTuning = {
|
||||
const formatDuration = (ms: number) =>
|
||||
ms >= 1000 ? `${(ms / 1000).toFixed(2)}s` : `${ms}ms`;
|
||||
|
||||
const clamp = (val: number, min: number, max: number) =>
|
||||
Math.max(min, Math.min(max, val));
|
||||
|
||||
function resolveHeartbeatSeconds(
|
||||
cfg: WarelayConfig,
|
||||
tuning?: WebMonitorTuning,
|
||||
): number {
|
||||
const candidate = tuning?.heartbeatSeconds ?? cfg.web?.heartbeatSeconds;
|
||||
if (typeof candidate === "number" && candidate > 0) return candidate;
|
||||
return DEFAULT_HEARTBEAT_SECONDS;
|
||||
}
|
||||
|
||||
function resolveReconnectPolicy(
|
||||
cfg: WarelayConfig,
|
||||
tuning?: WebMonitorTuning,
|
||||
): ReconnectPolicy {
|
||||
const merged = {
|
||||
...DEFAULT_RECONNECT_POLICY,
|
||||
...(cfg.web?.reconnect ?? {}),
|
||||
...(tuning?.reconnect ?? {}),
|
||||
} as ReconnectPolicy;
|
||||
|
||||
// Keep the values sane to avoid runaway retries.
|
||||
merged.initialMs = Math.max(250, merged.initialMs);
|
||||
merged.maxMs = Math.max(merged.initialMs, merged.maxMs);
|
||||
merged.factor = clamp(merged.factor, 1.1, 10);
|
||||
merged.jitter = clamp(merged.jitter, 0, 1);
|
||||
merged.maxAttempts = Math.max(0, Math.floor(merged.maxAttempts));
|
||||
return merged;
|
||||
}
|
||||
|
||||
function computeBackoff(policy: ReconnectPolicy, attempt: number) {
|
||||
// attempt is 1-based.
|
||||
const base = policy.initialMs * policy.factor ** (attempt - 1);
|
||||
const jitter = base * policy.jitter * Math.random();
|
||||
return Math.min(policy.maxMs, Math.round(base + jitter));
|
||||
}
|
||||
|
||||
function sleepWithAbort(ms: number, abortSignal?: AbortSignal) {
|
||||
if (ms <= 0) return Promise.resolve();
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
cleanup();
|
||||
resolve();
|
||||
}, ms);
|
||||
|
||||
const onAbort = () => {
|
||||
cleanup();
|
||||
reject(new Error("aborted"));
|
||||
};
|
||||
|
||||
const cleanup = () => {
|
||||
clearTimeout(timer);
|
||||
abortSignal?.removeEventListener("abort", onAbort);
|
||||
};
|
||||
|
||||
if (abortSignal) {
|
||||
abortSignal.addEventListener("abort", onAbort, { once: true });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export async function monitorWebProvider(
|
||||
verbose: boolean,
|
||||
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
|
||||
@@ -109,7 +37,7 @@ export async function monitorWebProvider(
|
||||
abortSignal?: AbortSignal,
|
||||
tuning: WebMonitorTuning = {},
|
||||
) {
|
||||
const runId = randomUUID();
|
||||
const runId = newConnectionId();
|
||||
const replyLogger = getChildLogger({ module: "web-auto-reply", runId });
|
||||
const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId });
|
||||
const cfg = loadConfig();
|
||||
@@ -118,9 +46,15 @@ export async function monitorWebProvider(
|
||||
typeof configuredMaxMb === "number" && configuredMaxMb > 0
|
||||
? configuredMaxMb * 1024 * 1024
|
||||
: DEFAULT_WEB_MEDIA_BYTES;
|
||||
const heartbeatSeconds = resolveHeartbeatSeconds(cfg, tuning);
|
||||
const reconnectPolicy = resolveReconnectPolicy(cfg, tuning);
|
||||
const sleep = tuning.sleep ?? ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal ?? abortSignal));
|
||||
const heartbeatSeconds = resolveHeartbeatSeconds(
|
||||
cfg,
|
||||
tuning.heartbeatSeconds,
|
||||
);
|
||||
const reconnectPolicy = resolveReconnectPolicy(cfg, tuning.reconnect);
|
||||
const sleep =
|
||||
tuning.sleep ??
|
||||
((ms: number, signal?: AbortSignal) =>
|
||||
sleepWithAbort(ms, signal ?? abortSignal));
|
||||
const stopRequested = () => abortSignal?.aborted === true;
|
||||
const abortPromise =
|
||||
abortSignal &&
|
||||
@@ -155,7 +89,7 @@ export async function monitorWebProvider(
|
||||
const ts = msg.timestamp
|
||||
? new Date(msg.timestamp).toISOString()
|
||||
: new Date().toISOString();
|
||||
const correlationId = msg.id ?? randomUUID();
|
||||
const correlationId = msg.id ?? newConnectionId();
|
||||
replyLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
|
||||
52
src/web/reconnect.test.ts
Normal file
52
src/web/reconnect.test.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import type { WarelayConfig } from "../config/config.js";
|
||||
import {
|
||||
computeBackoff,
|
||||
DEFAULT_HEARTBEAT_SECONDS,
|
||||
DEFAULT_RECONNECT_POLICY,
|
||||
resolveHeartbeatSeconds,
|
||||
resolveReconnectPolicy,
|
||||
sleepWithAbort,
|
||||
} from "./reconnect.js";
|
||||
|
||||
describe("web reconnect helpers", () => {
|
||||
const cfg: WarelayConfig = {};
|
||||
|
||||
it("resolves sane reconnect defaults with clamps", () => {
|
||||
const policy = resolveReconnectPolicy(cfg, {
|
||||
initialMs: 100,
|
||||
maxMs: 5,
|
||||
factor: 20,
|
||||
jitter: 2,
|
||||
maxAttempts: -1,
|
||||
});
|
||||
|
||||
expect(policy.initialMs).toBe(250); // clamped to minimum
|
||||
expect(policy.maxMs).toBeGreaterThanOrEqual(policy.initialMs);
|
||||
expect(policy.factor).toBeLessThanOrEqual(10);
|
||||
expect(policy.jitter).toBeLessThanOrEqual(1);
|
||||
expect(policy.maxAttempts).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
it("computes increasing backoff with jitter", () => {
|
||||
const policy = { ...DEFAULT_RECONNECT_POLICY, jitter: 0 };
|
||||
const first = computeBackoff(policy, 1);
|
||||
const second = computeBackoff(policy, 2);
|
||||
expect(first).toBe(policy.initialMs);
|
||||
expect(second).toBeGreaterThan(first);
|
||||
expect(second).toBeLessThanOrEqual(policy.maxMs);
|
||||
});
|
||||
|
||||
it("returns heartbeat default when unset", () => {
|
||||
expect(resolveHeartbeatSeconds(cfg)).toBe(DEFAULT_HEARTBEAT_SECONDS);
|
||||
expect(resolveHeartbeatSeconds(cfg, 5)).toBe(5);
|
||||
});
|
||||
|
||||
it("sleepWithAbort rejects on abort", async () => {
|
||||
const controller = new AbortController();
|
||||
const promise = sleepWithAbort(50, controller.signal);
|
||||
controller.abort();
|
||||
await expect(promise).rejects.toThrow("aborted");
|
||||
});
|
||||
});
|
||||
84
src/web/reconnect.ts
Normal file
84
src/web/reconnect.ts
Normal file
@@ -0,0 +1,84 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
|
||||
import type { WarelayConfig } from "../config/config.js";
|
||||
|
||||
export type ReconnectPolicy = {
|
||||
initialMs: number;
|
||||
maxMs: number;
|
||||
factor: number;
|
||||
jitter: number;
|
||||
maxAttempts: number;
|
||||
};
|
||||
|
||||
export const DEFAULT_HEARTBEAT_SECONDS = 60;
|
||||
export const DEFAULT_RECONNECT_POLICY: ReconnectPolicy = {
|
||||
initialMs: 2_000,
|
||||
maxMs: 30_000,
|
||||
factor: 1.8,
|
||||
jitter: 0.25,
|
||||
maxAttempts: 12,
|
||||
};
|
||||
|
||||
const clamp = (val: number, min: number, max: number) =>
|
||||
Math.max(min, Math.min(max, val));
|
||||
|
||||
export function resolveHeartbeatSeconds(
|
||||
cfg: WarelayConfig,
|
||||
overrideSeconds?: number,
|
||||
): number {
|
||||
const candidate = overrideSeconds ?? cfg.web?.heartbeatSeconds;
|
||||
if (typeof candidate === "number" && candidate > 0) return candidate;
|
||||
return DEFAULT_HEARTBEAT_SECONDS;
|
||||
}
|
||||
|
||||
export function resolveReconnectPolicy(
|
||||
cfg: WarelayConfig,
|
||||
overrides?: Partial<ReconnectPolicy>,
|
||||
): ReconnectPolicy {
|
||||
const merged = {
|
||||
...DEFAULT_RECONNECT_POLICY,
|
||||
...(cfg.web?.reconnect ?? {}),
|
||||
...(overrides ?? {}),
|
||||
} as ReconnectPolicy;
|
||||
|
||||
merged.initialMs = Math.max(250, merged.initialMs);
|
||||
merged.maxMs = Math.max(merged.initialMs, merged.maxMs);
|
||||
merged.factor = clamp(merged.factor, 1.1, 10);
|
||||
merged.jitter = clamp(merged.jitter, 0, 1);
|
||||
merged.maxAttempts = Math.max(0, Math.floor(merged.maxAttempts));
|
||||
return merged;
|
||||
}
|
||||
|
||||
export function computeBackoff(policy: ReconnectPolicy, attempt: number) {
|
||||
const base = policy.initialMs * policy.factor ** Math.max(attempt - 1, 0);
|
||||
const jitter = base * policy.jitter * Math.random();
|
||||
return Math.min(policy.maxMs, Math.round(base + jitter));
|
||||
}
|
||||
|
||||
export function sleepWithAbort(ms: number, abortSignal?: AbortSignal) {
|
||||
if (ms <= 0) return Promise.resolve();
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
cleanup();
|
||||
resolve();
|
||||
}, ms);
|
||||
|
||||
const onAbort = () => {
|
||||
cleanup();
|
||||
reject(new Error("aborted"));
|
||||
};
|
||||
|
||||
const cleanup = () => {
|
||||
clearTimeout(timer);
|
||||
abortSignal?.removeEventListener("abort", onAbort);
|
||||
};
|
||||
|
||||
if (abortSignal) {
|
||||
abortSignal.addEventListener("abort", onAbort, { once: true });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export function newConnectionId() {
|
||||
return randomUUID();
|
||||
}
|
||||
@@ -1,18 +1,17 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import fsSync from "node:fs";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { resetLogger, setLoggerOverride } from "../logging.js";
|
||||
import {
|
||||
baileys,
|
||||
getLastSocket,
|
||||
resetBaileysMocks,
|
||||
resetLoadConfigMock,
|
||||
} from "./test-helpers.js";
|
||||
import { resetLogger, setLoggerOverride } from "../logging.js";
|
||||
import {
|
||||
createWaSocket,
|
||||
logWebSelfId,
|
||||
waitForWaConnection,
|
||||
} from "./session.js";
|
||||
|
||||
const { createWaSocket, logWebSelfId, waitForWaConnection } = await import(
|
||||
"./session.js"
|
||||
);
|
||||
|
||||
describe("web session", () => {
|
||||
beforeEach(() => {
|
||||
|
||||
Reference in New Issue
Block a user