From baf20af17fe428ad9cabfe6a0bcfd79e11b6dbb6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 26 Nov 2025 02:34:43 +0100 Subject: [PATCH] web: add heartbeat and bounded reconnect tuning --- docs/refactor/web-provider-split.md | 24 +++ src/auto-reply/command-reply.test.ts | 7 +- src/auto-reply/command-reply.ts | 30 +++- src/auto-reply/reply.ts | 14 +- src/auto-reply/transcription.test.ts | 6 +- src/cli/program.ts | 86 ++++++++++- src/cli/relay.e2e.test.ts | 3 +- src/config/config.ts | 28 ++++ src/index.core.test.ts | 11 +- src/provider-web.ts | 1 + src/web/auto-reply.test.ts | 106 ++++++++++++- src/web/auto-reply.ts | 216 ++++++++++++++++++++++++--- src/web/login.test.ts | 1 + src/web/monitor-inbox.test.ts | 20 +-- src/web/outbound.test.ts | 1 + src/web/outbound.ts | 15 ++ src/web/session.test.ts | 8 +- src/web/session.ts | 19 +++ tsconfig.json | 8 +- 19 files changed, 541 insertions(+), 63 deletions(-) create mode 100644 docs/refactor/web-provider-split.md diff --git a/docs/refactor/web-provider-split.md b/docs/refactor/web-provider-split.md new file mode 100644 index 000000000..5f84230dc --- /dev/null +++ b/docs/refactor/web-provider-split.md @@ -0,0 +1,24 @@ +# Web Provider Refactor (Nov 26, 2025) + +Context: `src/provider-web.ts` was a 900+ line ball of mud mixing session management, outbound sends, inbound handling, auto-replies, and media helpers. We split it into focused modules under `src/web/` and adjusted tests/CLI behavior. + +## What changed +- New modules: `session.ts`, `login.ts`, `outbound.ts`, `inbound.ts`, `auto-reply.ts`, `media.ts`; barrel remains `src/provider-web.ts`. +- CLI adds `warelay logout` to clear `~/.warelay/credentials`; tested in `src/web/logout.test.ts`. +- Relay now **exits instead of falling back to Twilio** when the web provider fails (even in `--provider auto`), so outages are visible. +- Tests split accordingly; all suites green. 
+- Structured logging + heartbeats: web relay now emits structured logs with `runId`/`connectionId` plus periodic heartbeats (default every 60s) that include auth age and message counts. +- Bounded reconnects: web relay uses capped exponential backoff (default 2s→30s, max 12 attempts). CLI knobs `--web-retries` (0 = unlimited), `--web-retry-initial`, `--web-retry-max`, `--web-heartbeat` and config `web.reconnect`/`web.heartbeatSeconds` tune the behavior. +- Backoff resets once a connection has stayed up longer than one heartbeat interval; a logged-out session still exits immediately without retrying. + +## How to use +- Link: `warelay login --provider web` +- Logout: `warelay logout` (deletes `~/.warelay/credentials`) +- Run relay web-only: `warelay relay --provider web --verbose` + +## Follow-ups worth doing +- Document the new module boundaries in README/docs; add a one-liner explaining the no-fallback behavior. +- Add bounded backoff/jitter in `monitorWebProvider` reconnect loop with clearer exit codes. ✅ +- Tighten config validation (`mediaMaxMb`, etc.) on load. ✅ (schema now includes `web.*` knobs) +- Emit structured logs for reconnect/close reasons to help ops triage (status, isLoggedOut). ✅ +- Add quick troubleshooting snippets (how to read logs, restart relay, rotate creds). 
diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index 0713184d1..f318be229 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -36,7 +36,10 @@ function makeRunner(result: RunnerResult, capture: ReplyPayload[] = []) { } const enqueueImmediate = vi.fn( - async (task: () => Promise, opts?: { onWait?: (ms: number, ahead: number) => void }) => { + async ( + task: () => Promise, + opts?: { onWait?: (ms: number, ahead: number) => void }, + ) => { opts?.onWait?.(25, 2); return task(); }, @@ -49,7 +52,7 @@ describe("summarizeClaudeMetadata", () => { num_turns: 3, total_cost_usd: 0.012345, usage: { server_tool_use: { a: 1, b: 2 } }, - modelUsage: { "claude-3": 2, "haiku": 1 }, + modelUsage: { "claude-3": 2, haiku: 1 }, }); expect(meta).toContain("duration=1200ms"); expect(meta).toContain("turns=3"); diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 75ede80db..d5f2e586b 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -6,7 +6,7 @@ import { isVerbose, logVerbose } from "../globals.js"; import { logError } from "../logger.js"; import { splitMediaFromOutput } from "../media/parse.js"; import { enqueueCommand } from "../process/command-queue.js"; -import { runCommandWithTimeout } from "../process/exec.js"; +import type { runCommandWithTimeout } from "../process/exec.js"; import { CLAUDE_BIN, CLAUDE_IDENTITY_PREFIX, @@ -116,6 +116,10 @@ export async function runCommandReply( enqueue = enqueueCommand, } = params; + if (!reply.command?.length) { + throw new Error("reply.command is required for mode=command"); + } + let argv = reply.command.map((part) => applyTemplate(part, templatingCtx)); const templatePrefix = reply.template && (!sendSystemOnce || isFirstTurnInSession || !systemSent) @@ -132,7 +136,8 @@ export async function runCommandReply( path.basename(argv[0]) === CLAUDE_BIN ) { const hasOutputFormat = argv.some( - (part) 
=> part === "--output-format" || part.startsWith("--output-format="), + (part) => + part === "--output-format" || part.startsWith("--output-format="), ); const insertBeforeBody = Math.max(argv.length - 1, 0); if (!hasOutputFormat) { @@ -211,12 +216,17 @@ export async function runCommandReply( logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); } let parsed: ClaudeJsonParseResult | undefined; - if (trimmed && (reply.claudeOutputFormat === "json" || isClaudeInvocation)) { + if ( + trimmed && + (reply.claudeOutputFormat === "json" || isClaudeInvocation) + ) { parsed = parseClaudeJson(trimmed); if (parsed?.parsed && isVerbose()) { const summary = summarizeClaudeMetadata(parsed.parsed); if (summary) logVerbose(`Claude JSON meta: ${summary}`); - logVerbose(`Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`); + logVerbose( + `Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`, + ); } if (parsed?.text) { logVerbose( @@ -256,7 +266,9 @@ export async function runCommandReply( exitCode: code, signal, killed, - claudeMeta: parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined, + claudeMeta: parsed + ? summarizeClaudeMetadata(parsed.parsed) + : undefined, }, }; } @@ -273,7 +285,9 @@ export async function runCommandReply( exitCode: code, signal, killed, - claudeMeta: parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined, + claudeMeta: parsed + ? summarizeClaudeMetadata(parsed.parsed) + : undefined, }, }; } @@ -344,7 +358,9 @@ export async function runCommandReply( `${timeoutSeconds}s${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}. Try a shorter prompt or split the request.`; const partial = errorObj.stdout?.trim(); const partialSnippet = - partial && partial.length > 800 ? `${partial.slice(0, 800)}...` : partial; + partial && partial.length > 800 + ? `${partial.slice(0, 800)}...` + : partial; const text = partialSnippet ? 
`${baseMsg}\n\nPartial output before timeout:\n${partialSnippet}` : baseMsg; diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index d204d987c..b32fa0415 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -17,14 +17,15 @@ import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import type { TwilioRequester } from "../twilio/types.js"; import { sendTypingIndicator } from "../twilio/typing.js"; import { runCommandReply } from "./command-reply.js"; -import { transcribeInboundAudio, isAudio } from "./transcription.js"; import { applyTemplate, type MsgContext, type TemplateContext, } from "./templating.js"; +import { isAudio, transcribeInboundAudio } from "./transcription.js"; import type { GetReplyOptions, ReplyPayload } from "./types.js"; -export type { ReplyPayload, GetReplyOptions } from "./types.js"; + +export type { GetReplyOptions, ReplyPayload } from "./types.js"; export async function getReplyFromConfig( ctx: MsgContext, @@ -238,11 +239,16 @@ export async function getReplyFromConfig( return result; } - if (reply.mode === "command" && reply.command?.length) { + if (reply && reply.mode === "command" && reply.command?.length) { await onReplyStart(); + const commandReply = { + ...reply, + command: reply.command, + mode: "command" as const, + }; try { const { payload, meta } = await runCommandReply({ - reply, + reply: commandReply, templatingCtx, sendSystemOnce, isNewSession, diff --git a/src/auto-reply/transcription.test.ts b/src/auto-reply/transcription.test.ts index 7d0e12702..4cdd94543 100644 --- a/src/auto-reply/transcription.test.ts +++ b/src/auto-reply/transcription.test.ts @@ -57,7 +57,11 @@ describe("transcribeInboundAudio", () => { }); it("returns undefined when no transcription command", async () => { - const res = await transcribeInboundAudio({ inbound: {} } as never, {} as never, runtime as never); + const res = await transcribeInboundAudio( + { inbound: {} } as never, + {} as never, + runtime as never, + ); 
expect(res).toBeUndefined(); }); }); diff --git a/src/cli/program.ts b/src/cli/program.ts index 90581b951..40ddb6ef5 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -10,6 +10,7 @@ import { logoutWeb, monitorWebProvider, pickProvider, + type WebMonitorTuning, } from "../provider-web.js"; import { defaultRuntime } from "../runtime.js"; import type { Provider } from "../utils.js"; @@ -178,6 +179,19 @@ Examples: "Initial lookback window for twilio mode", "5", ) + .option( + "--web-heartbeat ", + "Heartbeat interval for web relay health logs (seconds)", + ) + .option( + "--web-retries ", + "Max consecutive web reconnect attempts before exit (0 = unlimited)", + ) + .option( + "--web-retry-initial ", + "Initial reconnect backoff for web relay (ms)", + ) + .option("--web-retry-max ", "Max reconnect backoff for web relay (ms)") .option("--verbose", "Verbose logging", false) .addHelpText( "after", @@ -198,6 +212,22 @@ Examples: } const intervalSeconds = Number.parseInt(opts.interval, 10); const lookbackMinutes = Number.parseInt(opts.lookback, 10); + const webHeartbeat = + opts.webHeartbeat !== undefined + ? Number.parseInt(String(opts.webHeartbeat), 10) + : undefined; + const webRetries = + opts.webRetries !== undefined + ? Number.parseInt(String(opts.webRetries), 10) + : undefined; + const webRetryInitial = + opts.webRetryInitial !== undefined + ? Number.parseInt(String(opts.webRetryInitial), 10) + : undefined; + const webRetryMax = + opts.webRetryMax !== undefined + ? 
Number.parseInt(String(opts.webRetryMax), 10) + : undefined; if (Number.isNaN(intervalSeconds) || intervalSeconds <= 0) { defaultRuntime.error("Interval must be a positive integer"); defaultRuntime.exit(1); @@ -206,13 +236,67 @@ Examples: defaultRuntime.error("Lookback must be >= 0 minutes"); defaultRuntime.exit(1); } + if ( + webHeartbeat !== undefined && + (Number.isNaN(webHeartbeat) || webHeartbeat <= 0) + ) { + defaultRuntime.error("--web-heartbeat must be a positive integer"); + defaultRuntime.exit(1); + } + if ( + webRetries !== undefined && + (Number.isNaN(webRetries) || webRetries < 0) + ) { + defaultRuntime.error("--web-retries must be >= 0"); + defaultRuntime.exit(1); + } + if ( + webRetryInitial !== undefined && + (Number.isNaN(webRetryInitial) || webRetryInitial <= 0) + ) { + defaultRuntime.error("--web-retry-initial must be a positive integer"); + defaultRuntime.exit(1); + } + if ( + webRetryMax !== undefined && + (Number.isNaN(webRetryMax) || webRetryMax <= 0) + ) { + defaultRuntime.error("--web-retry-max must be a positive integer"); + defaultRuntime.exit(1); + } + if ( + webRetryMax !== undefined && + webRetryInitial !== undefined && + webRetryMax < webRetryInitial + ) { + defaultRuntime.error("--web-retry-max must be >= --web-retry-initial"); + defaultRuntime.exit(1); + } + + const webTuning: WebMonitorTuning = {}; + if (webHeartbeat !== undefined) webTuning.heartbeatSeconds = webHeartbeat; + const reconnect: WebMonitorTuning["reconnect"] = {}; + if (webRetries !== undefined) reconnect.maxAttempts = webRetries; + if (webRetryInitial !== undefined) reconnect.initialMs = webRetryInitial; + if (webRetryMax !== undefined) reconnect.maxMs = webRetryMax; + if (Object.keys(reconnect).length > 0) { + webTuning.reconnect = reconnect; + } const provider = await pickProvider(providerPref as Provider | "auto"); if (provider === "web") { logWebSelfId(defaultRuntime, true); try { - await monitorWebProvider(Boolean(opts.verbose)); + await monitorWebProvider( + 
Boolean(opts.verbose), + undefined, + true, + undefined, + defaultRuntime, + undefined, + webTuning, + ); return; } catch (err) { defaultRuntime.error( diff --git a/src/cli/relay.e2e.test.ts b/src/cli/relay.e2e.test.ts index 07cd981c9..0ceb22472 100644 --- a/src/cli/relay.e2e.test.ts +++ b/src/cli/relay.e2e.test.ts @@ -38,7 +38,8 @@ describe("CLI relay command (e2e-ish)", () => { expect(pickProvider).toHaveBeenCalledWith("web"); expect(logWebSelfId).toHaveBeenCalledTimes(1); - expect(monitorWebProvider).toHaveBeenCalledWith(false); + expect(monitorWebProvider).toHaveBeenCalledTimes(1); + expect(monitorWebProvider.mock.calls[0][0]).toBe(false); expect(monitorTwilio).not.toHaveBeenCalled(); }); }); diff --git a/src/config/config.ts b/src/config/config.ts index ca40a5149..7d8d167cd 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -27,6 +27,19 @@ export type LoggingConfig = { file?: string; }; +export type WebReconnectConfig = { + initialMs?: number; + maxMs?: number; + factor?: number; + jitter?: number; + maxAttempts?: number; // 0 = unlimited +}; + +export type WebConfig = { + heartbeatSeconds?: number; + reconnect?: WebReconnectConfig; +}; + export type WarelayConfig = { logging?: LoggingConfig; inbound?: { @@ -51,6 +64,7 @@ export type WarelayConfig = { typingIntervalSeconds?: number; // how often to refresh typing indicator while command runs }; }; + web?: WebConfig; }; export const CONFIG_PATH = path.join(os.homedir(), ".warelay", "warelay.json"); @@ -129,6 +143,20 @@ const WarelaySchema = z.object({ reply: ReplySchema.optional(), }) .optional(), + web: z + .object({ + heartbeatSeconds: z.number().int().positive().optional(), + reconnect: z + .object({ + initialMs: z.number().positive().optional(), + maxMs: z.number().positive().optional(), + factor: z.number().positive().optional(), + jitter: z.number().min(0).max(1).optional(), + maxAttempts: z.number().int().min(0).optional(), + }) + .optional(), + }) + .optional(), }); export function 
loadConfig(): WarelayConfig { diff --git a/src/index.core.test.ts b/src/index.core.test.ts index 2a228c0fa..f33ed2513 100644 --- a/src/index.core.test.ts +++ b/src/index.core.test.ts @@ -1248,11 +1248,12 @@ describe("monitoring", () => { const replySpy = vi.fn(); const sendMediaSpy = vi.fn(); const listenerFactory = vi.fn( - async ( - opts: Parameters[1] extends undefined - ? never - : NonNullable[1]>, - ) => { + async (opts: { + verbose: boolean; + onMessage: ( + msg: import("./web/inbound.js").WebInboundMessage, + ) => Promise; + }) => { await opts.onMessage({ body: "hello", from: "+1", diff --git a/src/provider-web.ts b/src/provider-web.ts index 548dce40f..6015a5c63 100644 --- a/src/provider-web.ts +++ b/src/provider-web.ts @@ -3,6 +3,7 @@ export { DEFAULT_WEB_MEDIA_BYTES, monitorWebProvider, + type WebMonitorTuning, } from "./web/auto-reply.js"; export { extractMediaPlaceholder, diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index d2072e270..f4cdef069 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -25,8 +25,8 @@ describe("web auto-reply", () => { }); it("reconnects after a connection close", async () => { - vi.useFakeTimers(); const closeResolvers: Array<() => void> = []; + const sleep = vi.fn(async () => {}); const listenerFactory = vi.fn(async () => { let _resolve!: () => void; const onClose = new Promise((res) => { @@ -48,25 +48,78 @@ describe("web auto-reply", () => { async () => ({ text: "ok" }), runtime as never, controller.signal, + { + heartbeatSeconds: 1, + reconnect: { initialMs: 10, maxMs: 10, maxAttempts: 3, factor: 1.1 }, + sleep, + }, ); await Promise.resolve(); expect(listenerFactory).toHaveBeenCalledTimes(1); closeResolvers[0]?.(); - await Promise.resolve(); - await vi.runOnlyPendingTimersAsync(); + const waitForSecondCall = async () => { + const started = Date.now(); + while (listenerFactory.mock.calls.length < 2 && Date.now() - started < 200) { + await new Promise((resolve) => 
setTimeout(resolve, 10)); + } + }; + await waitForSecondCall(); expect(listenerFactory).toHaveBeenCalledTimes(2); expect(runtime.error).toHaveBeenCalledWith( - expect.stringContaining("Reconnecting"), + expect.stringContaining("Retry 1"), ); controller.abort(); closeResolvers[1]?.(); - await vi.runAllTimersAsync(); + await new Promise((resolve) => setTimeout(resolve, 5)); await run; }); + it("stops after hitting max reconnect attempts", async () => { + const closeResolvers: Array<() => void> = []; + const sleep = vi.fn(async () => {}); + const listenerFactory = vi.fn(async () => { + const onClose = new Promise((res) => closeResolvers.push(res)); + return { close: vi.fn(), onClose }; + }); + const runtime = { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; + + const run = monitorWebProvider( + false, + listenerFactory, + true, + async () => ({ text: "ok" }), + runtime as never, + undefined, + { + heartbeatSeconds: 1, + reconnect: { initialMs: 5, maxMs: 5, maxAttempts: 2, factor: 1.1 }, + sleep, + }, + ); + + await Promise.resolve(); + expect(listenerFactory).toHaveBeenCalledTimes(1); + + closeResolvers.shift()?.(); + await new Promise((resolve) => setTimeout(resolve, 15)); + expect(listenerFactory).toHaveBeenCalledTimes(2); + + closeResolvers.shift()?.(); + await new Promise((resolve) => setTimeout(resolve, 15)); + await run; + + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("Reached max retries"), + ); + }); + it("falls back to text when media send fails", async () => { const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const reply = vi.fn().mockResolvedValue(undefined); @@ -431,6 +484,49 @@ describe("web auto-reply", () => { fetchMock.mockRestore(); }); + it("emits heartbeat logs with connection metadata", async () => { + vi.useFakeTimers(); + const logPath = `/tmp/warelay-heartbeat-${crypto.randomUUID()}.log`; + setLoggerOverride({ level: "trace", file: logPath }); + + const runtime = { + log: vi.fn(), + error: 
vi.fn(), + exit: vi.fn(), + }; + + const controller = new AbortController(); + const listenerFactory = vi.fn(async () => { + const onClose = new Promise(() => { + // never resolves; abort will short-circuit + }); + return { close: vi.fn(), onClose }; + }); + + const run = monitorWebProvider( + false, + listenerFactory, + true, + async () => ({ text: "ok" }), + runtime as never, + controller.signal, + { + heartbeatSeconds: 1, + reconnect: { initialMs: 5, maxMs: 5, maxAttempts: 1, factor: 1.1 }, + }, + ); + + await vi.advanceTimersByTimeAsync(1_000); + controller.abort(); + await vi.runAllTimersAsync(); + await run.catch(() => {}); + + const content = await fs.readFile(logPath, "utf-8"); + expect(content).toContain('"module":"web-heartbeat"'); + expect(content).toMatch(/connectionId/); + expect(content).toMatch(/messagesHandled/); + }); + it("logs outbound replies to file", async () => { const logPath = `/tmp/warelay-log-test-${crypto.randomUUID()}.log`; setLoggerOverride({ level: "trace", file: logPath }); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index d75f70998..7d875e58d 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1,33 +1,126 @@ +import { randomUUID } from "node:crypto"; + import { getReplyFromConfig } from "../auto-reply/reply.js"; import { waitForever } from "../cli/wait.js"; -import { loadConfig } from "../config/config.js"; +import { loadConfig, type WarelayConfig } from "../config/config.js"; import { danger, isVerbose, logVerbose, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { monitorWebInbox } from "./inbound.js"; import { loadWebMedia } from "./media.js"; +import { getWebAuthAgeMs, newConnectionId } from "./session.js"; const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024; +const DEFAULT_HEARTBEAT_SECONDS = 60; +const DEFAULT_RECONNECT_POLICY: ReconnectPolicy = { + initialMs: 
2_000, + maxMs: 30_000, + factor: 1.8, + jitter: 0.25, + maxAttempts: 12, +}; + +type ReconnectPolicy = { + initialMs: number; + maxMs: number; + factor: number; + jitter: number; + maxAttempts: number; +}; + +export type WebMonitorTuning = { + reconnect?: Partial; + heartbeatSeconds?: number; + sleep?: (ms: number, signal?: AbortSignal) => Promise; +}; const formatDuration = (ms: number) => ms >= 1000 ? `${(ms / 1000).toFixed(2)}s` : `${ms}ms`; +const clamp = (val: number, min: number, max: number) => + Math.max(min, Math.min(max, val)); + +function resolveHeartbeatSeconds( + cfg: WarelayConfig, + tuning?: WebMonitorTuning, +): number { + const candidate = tuning?.heartbeatSeconds ?? cfg.web?.heartbeatSeconds; + if (typeof candidate === "number" && candidate > 0) return candidate; + return DEFAULT_HEARTBEAT_SECONDS; +} + +function resolveReconnectPolicy( + cfg: WarelayConfig, + tuning?: WebMonitorTuning, +): ReconnectPolicy { + const merged = { + ...DEFAULT_RECONNECT_POLICY, + ...(cfg.web?.reconnect ?? {}), + ...(tuning?.reconnect ?? {}), + } as ReconnectPolicy; + + // Keep the values sane to avoid runaway retries. + merged.initialMs = Math.max(250, merged.initialMs); + merged.maxMs = Math.max(merged.initialMs, merged.maxMs); + merged.factor = clamp(merged.factor, 1.1, 10); + merged.jitter = clamp(merged.jitter, 0, 1); + merged.maxAttempts = Math.max(0, Math.floor(merged.maxAttempts)); + return merged; +} + +function computeBackoff(policy: ReconnectPolicy, attempt: number) { + // attempt is 1-based. 
+ const base = policy.initialMs * policy.factor ** (attempt - 1); + const jitter = base * policy.jitter * Math.random(); + return Math.min(policy.maxMs, Math.round(base + jitter)); +} + +function sleepWithAbort(ms: number, abortSignal?: AbortSignal) { + if (ms <= 0) return Promise.resolve(); + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + cleanup(); + resolve(); + }, ms); + + const onAbort = () => { + cleanup(); + reject(new Error("aborted")); + }; + + const cleanup = () => { + clearTimeout(timer); + abortSignal?.removeEventListener("abort", onAbort); + }; + + if (abortSignal) { + abortSignal.addEventListener("abort", onAbort, { once: true }); + } + }); +} + export async function monitorWebProvider( verbose: boolean, - listenerFactory = monitorWebInbox, + listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox, keepAlive = true, - replyResolver: typeof getReplyFromConfig = getReplyFromConfig, + replyResolver: typeof getReplyFromConfig | undefined = getReplyFromConfig, runtime: RuntimeEnv = defaultRuntime, abortSignal?: AbortSignal, + tuning: WebMonitorTuning = {}, ) { - const replyLogger = getChildLogger({ module: "web-auto-reply" }); + const runId = randomUUID(); + const replyLogger = getChildLogger({ module: "web-auto-reply", runId }); + const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId }); const cfg = loadConfig(); const configuredMaxMb = cfg.inbound?.reply?.mediaMaxMb; const maxMediaBytes = typeof configuredMaxMb === "number" && configuredMaxMb > 0 ? configuredMaxMb * 1024 * 1024 : DEFAULT_WEB_MEDIA_BYTES; + const heartbeatSeconds = resolveHeartbeatSeconds(cfg, tuning); + const reconnectPolicy = resolveReconnectPolicy(cfg, tuning); + const sleep = tuning.sleep ?? ((ms: number, signal?: AbortSignal) => sleepWithAbort(ms, signal ?? 
abortSignal)); const stopRequested = () => abortSignal?.aborted === true; const abortPromise = abortSignal && @@ -37,22 +130,49 @@ export async function monitorWebProvider( }), ); - const sleep = (ms: number) => - new Promise((resolve) => setTimeout(resolve, ms)); + let sigintStop = false; + const handleSigint = () => { + sigintStop = true; + }; + process.once("SIGINT", handleSigint); + + let reconnectAttempts = 0; while (true) { if (stopRequested()) break; - const listener = await listenerFactory({ + const connectionId = newConnectionId(); + const startedAt = Date.now(); + let heartbeat: NodeJS.Timeout | null = null; + let lastMessageAt: number | null = null; + let handledMessages = 0; + + const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, onMessage: async (msg) => { + handledMessages += 1; + lastMessageAt = Date.now(); const ts = msg.timestamp ? new Date(msg.timestamp).toISOString() : new Date().toISOString(); + const correlationId = msg.id ?? randomUUID(); + replyLogger.info( + { + connectionId, + correlationId, + from: msg.from, + to: msg.to, + body: msg.body, + mediaType: msg.mediaType ?? null, + mediaPath: msg.mediaPath ?? null, + }, + "inbound web message", + ); + console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`); const replyStarted = Date.now(); - const replyResult = await replyResolver( + const replyResult = await (replyResolver ?? getReplyFromConfig)( { Body: msg.body, From: msg.from, @@ -137,6 +257,8 @@ export async function monitorWebProvider( ); replyLogger.info( { + connectionId, + correlationId, to: msg.from, from: msg.to, text: index === 0 ? (replyResult.text ?? null) : null, @@ -182,6 +304,8 @@ export async function monitorWebProvider( } replyLogger.info( { + connectionId, + correlationId, to: msg.from, from: msg.to, text: replyResult.text ?? 
null, @@ -200,28 +324,54 @@ export async function monitorWebProvider( }, }); + const closeListener = async () => { + if (heartbeat) clearInterval(heartbeat); + try { + await listener.close(); + } catch (err) { + logVerbose(`Socket close failed: ${String(err)}`); + } + }; + + if (keepAlive) { + heartbeat = setInterval(() => { + const authAgeMs = getWebAuthAgeMs(); + heartbeatLogger.info( + { + connectionId, + reconnectAttempts, + messagesHandled: handledMessages, + lastMessageAt, + authAgeMs, + uptimeMs: Date.now() - startedAt, + }, + "web relay heartbeat", + ); + }, heartbeatSeconds * 1000); + } + logInfo( "📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.", runtime, ); - let stop = false; - process.on("SIGINT", () => { - stop = true; - void listener.close().finally(() => { - logInfo("👋 Web monitor stopped", runtime); - runtime.exit(0); - }); - }); - if (!keepAlive) return; + if (!keepAlive) { + await closeListener(); + return; + } const reason = await Promise.race([ listener.onClose ?? waitForever(), abortPromise ?? waitForever(), ]); - if (stopRequested() || stop || reason === "aborted") { - await listener.close(); + const uptimeMs = Date.now() - startedAt; + if (uptimeMs > heartbeatSeconds * 1000) { + reconnectAttempts = 0; // Healthy stretch; reset the backoff. + } + + if (stopRequested() || sigintStop || reason === "aborted") { + await closeListener(); break; } @@ -241,17 +391,39 @@ export async function monitorWebProvider( "WhatsApp session logged out. Run `warelay login --provider web` to relink.", ), ); + await closeListener(); break; } + reconnectAttempts += 1; + if ( + reconnectPolicy.maxAttempts > 0 && + reconnectAttempts >= reconnectPolicy.maxAttempts + ) { + runtime.error( + danger( + `WhatsApp Web connection closed (status ${status}). 
Reached max retries (${reconnectPolicy.maxAttempts}); exiting so you can relink.`, + ), + ); + await closeListener(); + break; + } + + const delay = computeBackoff(reconnectPolicy, reconnectAttempts); runtime.error( danger( - `WhatsApp Web connection closed (status ${status}). Reconnecting in 2s…`, + `WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}…`, ), ); - await listener.close(); - await sleep(2_000); + await closeListener(); + try { + await sleep(delay, abortSignal); + } catch { + break; + } } + + process.removeListener("SIGINT", handleSigint); } export { DEFAULT_WEB_MEDIA_BYTES }; diff --git a/src/web/login.test.ts b/src/web/login.test.ts index 54ff7692e..76c37aa0b 100644 --- a/src/web/login.test.ts +++ b/src/web/login.test.ts @@ -20,6 +20,7 @@ vi.mock("./session.js", () => { import { loginWeb } from "./login.js"; import type { waitForWaConnection } from "./session.js"; + const { createWaSocket } = await import("./session.js"); describe("web login", () => { diff --git a/src/web/monitor-inbox.test.ts b/src/web/monitor-inbox.test.ts index 32733555d..99f857189 100644 --- a/src/web/monitor-inbox.test.ts +++ b/src/web/monitor-inbox.test.ts @@ -1,11 +1,12 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; - -import { resetLogger, setLoggerOverride } from "../logging.js"; +import { vi } from "vitest"; vi.mock("../media/store.js", () => ({ - saveMediaBuffer: vi - .fn() - .mockResolvedValue({ id: "mid", path: "/tmp/mid", size: 1, contentType: "image/jpeg" }), + saveMediaBuffer: vi.fn().mockResolvedValue({ + id: "mid", + path: "/tmp/mid", + size: 1, + contentType: "image/jpeg", + }), })); vi.mock("./session.js", () => { @@ -28,15 +29,16 @@ vi.mock("./session.js", () => { }; }); -import { monitorWebInbox } from "./inbound.js"; const { createWaSocket } = await import("./session.js"); -const getSock = () => (createWaSocket as unknown as () => 
Promise>)(); +const _getSock = () => + (createWaSocket as unknown as () => Promise>)(); + import crypto from "node:crypto"; import fsSync from "node:fs"; import os from "node:os"; import path from "node:path"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { resetLogger, setLoggerOverride } from "../logging.js"; import { monitorWebInbox } from "./inbound.js"; diff --git a/src/web/outbound.test.ts b/src/web/outbound.test.ts index c2fcdf7c2..2e4abecd2 100644 --- a/src/web/outbound.test.ts +++ b/src/web/outbound.test.ts @@ -18,6 +18,7 @@ vi.mock("./session.js", () => { }); import { sendMessageWeb } from "./outbound.js"; + const { createWaSocket } = await import("./session.js"); describe("web outbound", () => { diff --git a/src/web/outbound.ts b/src/web/outbound.ts index 913f02444..ac17ea5b9 100644 --- a/src/web/outbound.ts +++ b/src/web/outbound.ts @@ -1,7 +1,10 @@ +import { randomUUID } from "node:crypto"; + import type { AnyMessageContent } from "@whiskeysockets/baileys"; import { logVerbose } from "../globals.js"; import { logInfo } from "../logger.js"; +import { getChildLogger } from "../logging.js"; import { toWhatsappJid } from "../utils.js"; import { loadWebMedia } from "./media.js"; import { createWaSocket, waitForWaConnection } from "./session.js"; @@ -11,9 +14,16 @@ export async function sendMessageWeb( body: string, options: { verbose: boolean; mediaUrl?: string }, ): Promise<{ messageId: string; toJid: string }> { + const correlationId = randomUUID(); const sock = await createWaSocket(false, options.verbose); + const logger = getChildLogger({ + module: "web-outbound", + correlationId, + to, + }); try { logInfo("🔌 Connecting to WhatsApp Web…"); + logger.info("connecting to whatsapp web"); await waitForWaConnection(sock); // waitForWaConnection sets up listeners and error handling; keep the presence update safe. 
const jid = toWhatsappJid(to); @@ -34,11 +44,16 @@ export async function sendMessageWeb( logInfo( `📤 Sending via web session -> ${jid}${options.mediaUrl ? " (media)" : ""}`, ); + logger.info( + { jid, hasMedia: Boolean(options.mediaUrl) }, + "sending message", + ); const result = await sock.sendMessage(jid, payload); const messageId = result?.key?.id ?? "unknown"; logInfo( `✅ Sent via web session. Message ID: ${messageId} -> ${jid}${options.mediaUrl ? " (media)" : ""}`, ); + logger.info({ jid, messageId }, "sent message"); return { messageId, toJid: jid }; } finally { try { diff --git a/src/web/session.test.ts b/src/web/session.test.ts index 257b58c11..cd3bf1e2e 100644 --- a/src/web/session.test.ts +++ b/src/web/session.test.ts @@ -1,14 +1,12 @@ +import { EventEmitter } from "node:events"; +import fsSync from "node:fs"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { baileys, getLastSocket, resetBaileysMocks, resetLoadConfigMock, } from "./test-helpers.js"; -import { EventEmitter } from "node:events"; -import fsSync from "node:fs"; - -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; - import { resetLogger, setLoggerOverride } from "../logging.js"; import { createWaSocket, diff --git a/src/web/session.ts b/src/web/session.ts index ed9b30559..f889dd241 100644 --- a/src/web/session.ts +++ b/src/web/session.ts @@ -1,3 +1,4 @@ +import { randomUUID } from "node:crypto"; import fsSync from "node:fs"; import fs from "node:fs/promises"; import os from "node:os"; @@ -165,6 +166,24 @@ function readWebSelfId() { } } +/** + * Return the age (in milliseconds) of the cached WhatsApp web auth state, or null when missing. + * Helpful for heartbeats/observability to spot stale credentials. 
+ */ +export function getWebAuthAgeMs(): number | null { + const credsPath = path.join(WA_WEB_AUTH_DIR, "creds.json"); + try { + const stats = fsSync.statSync(credsPath); + return Date.now() - stats.mtimeMs; + } catch { + return null; + } +} + +export function newConnectionId() { + return randomUUID(); +} + export function logWebSelfId( runtime: RuntimeEnv = defaultRuntime, includeProviderPrefix = false, diff --git a/tsconfig.json b/tsconfig.json index 8e9536d7c..8f82c611d 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -14,5 +14,11 @@ "allowSyntheticDefaultImports": true }, "include": ["src/**/*"], - "exclude": ["node_modules", "dist", "src/**/*.test.ts", "src/**/*.test.tsx"] + "exclude": [ + "node_modules", + "dist", + "src/**/*.test.ts", + "src/**/*.test.tsx", + "src/**/test-helpers.ts" + ] }