From dda017df2330a332bf82936a03862b691e2e0445 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 25 Nov 2025 18:09:57 +0100 Subject: [PATCH] Web relay: auto-reconnect Baileys and test --- CHANGELOG.md | 1 + src/provider-web.test.ts | 60 ++++++++ src/provider-web.ts | 302 +++++++++++++++++++++++++-------------- 3 files changed, 253 insertions(+), 110 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d67c62382..e5d12ddf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Pending - Auto-replies now send a WhatsApp fallback message when a command/Claude run hits the timeout, including up to 800 chars of partial stdout so the user still sees progress. - Added tests covering the new timeout fallback behavior and partial-output truncation. +- Web relay auto-reconnects after Baileys/WebSocket drops (with log-out detection) and exposes close events for monitoring; added tests for close propagation and reconnect loop. ## 0.1.3 — 2025-11-25 diff --git a/src/provider-web.test.ts b/src/provider-web.test.ts index 8f73db051..0d197bbd8 100644 --- a/src/provider-web.test.ts +++ b/src/provider-web.test.ts @@ -236,6 +236,23 @@ describe("provider-web", () => { await listener.close(); }); + it("monitorWebInbox resolves onClose when the socket closes", async () => { + const listener = await monitorWebInbox({ + verbose: false, + onMessage: vi.fn(), + }); + const sock = getLastSocket(); + const reasonPromise = listener.onClose; + sock.ev.emit("connection.update", { + connection: "close", + lastDisconnect: { error: { output: { statusCode: 500 } } }, + }); + await expect(reasonPromise).resolves.toEqual( + expect.objectContaining({ status: 500, isLoggedOut: false }), + ); + await listener.close(); + }); + it("monitorWebInbox logs inbound bodies to file", async () => { const logPath = path.join( os.tmpdir(), @@ -300,6 +317,49 @@ describe("provider-web", () => { await listener.close(); }); + it("monitorWebProvider reconnects after a connection close", async () => { + vi.useFakeTimers(); + const closeResolvers: Array<() => void> = []; + const listenerFactory = vi.fn(async () => { + let resolve!: () => void; + const onClose = new Promise((res) => { + resolve = res; + closeResolvers.push(res); + }); + return { close: vi.fn(), onClose }; + }); + const runtime = { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; + const controller = new AbortController(); + const run = monitorWebProvider( + false, + listenerFactory, + true, + async () => ({ text: "ok" }), + runtime as never, + controller.signal, + ); + + await Promise.resolve(); + expect(listenerFactory).toHaveBeenCalledTimes(1); + + closeResolvers[0]?.(); + await Promise.resolve(); + await vi.runOnlyPendingTimersAsync(); + expect(listenerFactory).toHaveBeenCalledTimes(2); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("Reconnecting"), + ); + + controller.abort(); + closeResolvers[1]?.(); + await vi.runAllTimersAsync(); + await run; + }); + it("monitorWebProvider falls back to text when media send fails", async () => { const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const reply = vi.fn().mockResolvedValue(undefined); diff --git a/src/provider-web.ts b/src/provider-web.ts index 52cab6d5c..4bb945c46 100644 --- a/src/provider-web.ts +++ b/src/provider-web.ts @@ -229,6 +229,12 @@ export function webAuthExists() { .catch(() => false); } +type WebListenerCloseReason = { + status?: number; + isLoggedOut: boolean; + error?: unknown; +}; + export type WebInboundMessage = { id?: string; from: string; @@ -255,6 +261,10 @@ export async function monitorWebInbox(options: { const inboundLogger = getChildLogger({ module: "web-inbound" }); const sock = await createWaSocket(false, options.verbose); await waitForWaConnection(sock); + let onCloseResolve: ((reason: WebListenerCloseReason) => void) | null = null; + const onClose = new Promise((resolve) => { + onCloseResolve = resolve; + }); try { // Advertise that the relay is online right after connecting. await sock.sendPresenceUpdate("available"); @@ -373,6 +383,20 @@ export async function monitorWebInbox(options: { } }); + sock.ev.on( + "connection.update", + (update: Partial) => { + if (update.connection === "close") { + const status = getStatusCode(update.lastDisconnect?.error); + onCloseResolve?.({ + status, + isLoggedOut: status === DisconnectReason.loggedOut, + error: update.lastDisconnect?.error, + }); + } + }, + ); + return { close: async () => { try { @@ -381,6 +405,7 @@ export async function monitorWebInbox(options: { logVerbose(`Socket close failed: ${String(err)}`); } }, + onClose, }; } @@ -390,139 +415,196 @@ export async function monitorWebProvider( keepAlive = true, replyResolver: typeof getReplyFromConfig = getReplyFromConfig, runtime: RuntimeEnv = defaultRuntime, + abortSignal?: AbortSignal, ) { const replyLogger = getChildLogger({ module: "web-auto-reply" }); - // Listen for inbound personal WhatsApp Web messages and auto-reply if configured. - const listener = await listenerFactory({ - verbose, - onMessage: async (msg) => { - const ts = msg.timestamp - ? new Date(msg.timestamp).toISOString() - : new Date().toISOString(); - console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`); + const stopRequested = () => abortSignal?.aborted === true; + const abortPromise = + abortSignal && + new Promise<"aborted">((resolve) => + abortSignal.addEventListener("abort", () => resolve("aborted"), { + once: true, + }), + ); - const replyStarted = Date.now(); - const replyResult = await replyResolver( - { - Body: msg.body, - From: msg.from, - To: msg.to, - MessageSid: msg.id, - MediaPath: msg.mediaPath, - MediaUrl: msg.mediaUrl, - MediaType: msg.mediaType, - }, - { - onReplyStart: msg.sendComposing, - }, - ); - if (!replyResult || (!replyResult.text && !replyResult.mediaUrl)) { - logVerbose("Skipping auto-reply: no text/media returned from resolver"); - return; - } - try { - if (replyResult.mediaUrl) { - logVerbose(`Web auto-reply media detected: ${replyResult.mediaUrl}`); - try { - const media = await loadWebMedia(replyResult.mediaUrl); - if (isVerbose()) { - logVerbose( - `Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`, - ); - } - await msg.sendMedia({ - image: media.buffer, - caption: replyResult.text || undefined, - mimetype: media.contentType, - }); - logInfo( - `✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`, - runtime, + const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + + while (true) { + if (stopRequested()) break; + + const listener = await listenerFactory({ + verbose, + onMessage: async (msg) => { + const ts = msg.timestamp + ? new Date(msg.timestamp).toISOString() + : new Date().toISOString(); + console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`); + + const replyStarted = Date.now(); + const replyResult = await replyResolver( + { + Body: msg.body, + From: msg.from, + To: msg.to, + MessageSid: msg.id, + MediaPath: msg.mediaPath, + MediaUrl: msg.mediaUrl, + MediaType: msg.mediaType, + }, + { + onReplyStart: msg.sendComposing, + }, + ); + if (!replyResult || (!replyResult.text && !replyResult.mediaUrl)) { + logVerbose("Skipping auto-reply: no text/media returned from resolver"); + return; + } + try { + if (replyResult.mediaUrl) { + logVerbose( + `Web auto-reply media detected: ${replyResult.mediaUrl}`, ); - replyLogger.info( - { - to: msg.from, - from: msg.to, - text: replyResult.text ?? null, - mediaUrl: replyResult.mediaUrl, - mediaSizeBytes: media.buffer.length, - durationMs: Date.now() - replyStarted, - }, - "auto-reply sent (media)", - ); - } catch (err) { - console.error( - danger(`Failed sending web media to ${msg.from}: ${String(err)}`), - ); - if (replyResult.text) { - await msg.reply(replyResult.text); + try { + const media = await loadWebMedia(replyResult.mediaUrl); + if (isVerbose()) { + logVerbose( + `Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`, + ); + } + await msg.sendMedia({ + image: media.buffer, + caption: replyResult.text || undefined, + mimetype: media.contentType, + }); logInfo( - `⚠️ Media skipped; sent text-only to ${msg.from}`, + `✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`, runtime, ); replyLogger.info( { to: msg.from, from: msg.to, - text: replyResult.text, + text: replyResult.text ?? null, mediaUrl: replyResult.mediaUrl, + mediaSizeBytes: media.buffer.length, durationMs: Date.now() - replyStarted, - mediaSendFailed: true, }, - "auto-reply sent (text fallback)", + "auto-reply sent (media)", ); + } catch (err) { + console.error( + danger( + `Failed sending web media to ${msg.from}: ${String(err)}`, + ), + ); + if (replyResult.text) { + await msg.reply(replyResult.text); + logInfo( + `⚠️ Media skipped; sent text-only to ${msg.from}`, + runtime, + ); + replyLogger.info( + { + to: msg.from, + from: msg.to, + text: replyResult.text, + mediaUrl: replyResult.mediaUrl, + durationMs: Date.now() - replyStarted, + mediaSendFailed: true, + }, + "auto-reply sent (text fallback)", + ); + } } + } else { + await msg.reply(replyResult.text ?? ""); } - } else { - await msg.reply(replyResult.text ?? ""); - } - const durationMs = Date.now() - replyStarted; - if (isVerbose()) { - console.log( - success( - `↩️ Auto-replied to ${msg.from} (web, ${replyResult.text?.length ?? 0} chars${replyResult.mediaUrl ? ", media" : ""}, ${formatDuration(durationMs)})`, - ), + const durationMs = Date.now() - replyStarted; + if (isVerbose()) { + console.log( + success( + `↩️ Auto-replied to ${msg.from} (web, ${replyResult.text?.length ?? 0} chars${replyResult.mediaUrl ? ", media" : ""}, ${formatDuration(durationMs)})`, + ), + ); + } else { + console.log( + success( + `↩️ ${replyResult.text ?? ""}${replyResult.mediaUrl ? " (media)" : ""}`, + ), + ); + } + replyLogger.info( + { + to: msg.from, + from: msg.to, + text: replyResult.text ?? null, + mediaUrl: replyResult.mediaUrl, + durationMs, + }, + "auto-reply sent", ); - } else { - console.log( - success( - `↩️ ${replyResult.text ?? ""}${replyResult.mediaUrl ? " (media)" : ""}`, + } catch (err) { + console.error( + danger( + `Failed sending web auto-reply to ${msg.from}: ${String(err)}`, ), ); } - replyLogger.info( - { - to: msg.from, - from: msg.to, - text: replyResult.text ?? null, - mediaUrl: replyResult.mediaUrl, - durationMs, - }, - "auto-reply sent", - ); - } catch (err) { - console.error( - danger( - `Failed sending web auto-reply to ${msg.from}: ${String(err)}`, - ), - ); - } - }, - }); - - logInfo( - "📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.", - runtime, - ); - process.on("SIGINT", () => { - void listener.close().finally(() => { - logInfo("👋 Web monitor stopped", runtime); - runtime.exit(0); + }, }); - }); - if (keepAlive) { - await waitForever(); + logInfo( + "📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.", + runtime, + ); + let stop = false; + process.on("SIGINT", () => { + stop = true; + void listener.close().finally(() => { + logInfo("👋 Web monitor stopped", runtime); + runtime.exit(0); + }); + }); + + if (!keepAlive) return; + + const reason = await Promise.race([ + listener.onClose ?? waitForever(), + abortPromise ?? waitForever(), + ]); + + if (stopRequested() || stop || reason === "aborted") { + await listener.close(); + break; + } + + const status = + (typeof reason === "object" && reason && "status" in reason + ? (reason as WebListenerCloseReason).status + : undefined) ?? "unknown"; + const loggedOut = + typeof reason === "object" && + reason && + "isLoggedOut" in reason && + (reason as WebListenerCloseReason).isLoggedOut; + + if (loggedOut) { + runtime.error( + danger( + "WhatsApp session logged out. Run `warelay login --provider web` to relink.", + ), + ); + break; + } + + runtime.error( + danger( + `WhatsApp Web connection closed (status ${status}). Reconnecting in 2s…`, + ), + ); + await listener.close(); + await sleep(2_000); } }