From e86b507da75d29be235aa99e59f7405ec0509dd0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 06:31:01 +0000 Subject: [PATCH] Add IPC to prevent Signal session corruption from concurrent connections When the relay is running, `warelay send` and `warelay heartbeat` now communicate via Unix socket IPC (~/.warelay/relay.sock) to send messages through the relay's existing WhatsApp connection. Previously, these commands created new Baileys sockets that wrote to the same auth state files, corrupting the Signal session ratchet and causing the relay's subsequent sends to fail silently. Changes: - Add src/web/ipc.ts with Unix socket server/client - Relay starts IPC server after connecting - send command tries IPC first, falls back to direct - heartbeat uses sendWithIpcFallback helper - inbound.ts exposes sendMessage on listener object - Messages sent via IPC are added to echo detection set --- CHANGELOG.md | 2 + src/commands/send.ts | 32 +++++- src/web/auto-reply.ts | 53 +++++++++- src/web/inbound.ts | 45 +++++++++ src/web/ipc.ts | 225 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 354 insertions(+), 3 deletions(-) create mode 100644 src/web/ipc.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c66c5f794..73f6caaa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,10 @@ - **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`. - **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses. - **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files. +- **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently. ### Changes +- **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise. - **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives. - **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter. - **Test isolation improvements:** Mock `loadConfig()` in all test files to prevent loading real user config (with emojis/prefixes) during tests. Default test config now has no prefixes/timestamps for cleaner assertions. diff --git a/src/commands/send.ts b/src/commands/send.ts index 193c407cf..45b770f74 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -1,7 +1,8 @@ import type { CliDeps } from "../cli/deps.js"; -import { info } from "../globals.js"; +import { info, success } from "../globals.js"; import type { RuntimeEnv } from "../runtime.js"; import type { Provider } from "../utils.js"; +import { sendViaIpc } from "../web/ipc.js"; export async function sendCommand( opts: { @@ -39,6 +40,34 @@ export async function sendCommand( if (waitSeconds !== 0) { runtime.log(info("Wait/poll are Twilio-only; ignored for provider=web.")); } + + // Try to send via IPC to running relay first (avoids Signal session corruption) + const ipcResult = await sendViaIpc(opts.to, opts.message, opts.media); + if (ipcResult) { + if (ipcResult.success) { + runtime.log(success(`✅ Sent via relay IPC. Message ID: ${ipcResult.messageId}`)); + if (opts.json) { + runtime.log( + JSON.stringify( + { + provider: "web", + via: "ipc", + to: opts.to, + messageId: ipcResult.messageId, + mediaUrl: opts.media ?? null, + }, + null, + 2, + ), + ); + } + return; + } + // IPC failed but relay is running - warn and fall back + runtime.log(info(`IPC send failed (${ipcResult.error}), falling back to direct connection`)); + } + + // Fall back to direct connection (creates new Baileys socket) const res = await deps .sendMessageWeb(opts.to, opts.message, { verbose: false, @@ -53,6 +82,7 @@ export async function sendCommand( JSON.stringify( { provider: "web", + via: "direct", to: opts.to, messageId: res.messageId, mediaUrl: opts.media ?? null, diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 5fdcbbe10..9020a21fd 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -9,12 +9,13 @@ import { resolveStorePath, saveSessionStore, } from "../config/sessions.js"; -import { danger, isVerbose, logVerbose, success } from "../globals.js"; +import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; +import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWeb } from "./outbound.js"; import { getQueueSize } from "../process/command-queue.js"; @@ -28,6 +29,26 @@ import { } from "./reconnect.js"; import { getWebAuthAgeMs } from "./session.js"; +/** + * Send a message via IPC if relay is running, otherwise fall back to direct. + * This avoids Signal session corruption from multiple Baileys connections. + */ +async function sendWithIpcFallback( + to: string, + message: string, + opts: { verbose: boolean; mediaUrl?: string }, +): Promise<{ messageId: string; toJid: string }> { + const ipcResult = await sendViaIpc(to, message, opts.mediaUrl); + if (ipcResult?.success && ipcResult.messageId) { + if (opts.verbose) { + console.log(info(`Sent via relay IPC (avoiding session corruption)`)); + } + return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` }; + } + // Fall back to direct send + return sendMessageWeb(to, message, opts); +} + const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024; type WebInboundMsg = Parameters< typeof monitorWebInbox @@ -95,7 +116,7 @@ export async function runWebHeartbeatOnce(opts: { } = opts; const _runtime = opts.runtime ?? defaultRuntime; const replyResolver = opts.replyResolver ?? getReplyFromConfig; - const sender = opts.sender ?? sendMessageWeb; + const sender = opts.sender ?? sendWithIpcFallback; const runId = newConnectionId(); const heartbeatLogger = getChildLogger({ module: "web-heartbeat", @@ -526,6 +547,8 @@ export async function monitorWebProvider( const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, onMessage: async (msg) => { + // Also add IPC-sent messages to echo detection + // (this is handled below in the IPC sendHandler) handledMessages += 1; lastMessageAt = Date.now(); const ts = msg.timestamp @@ -677,7 +700,33 @@ export async function monitorWebProvider( }, }); + // Start IPC server so `warelay send` can use this connection + // instead of creating a new one (which would corrupt Signal session) + if ("sendMessage" in listener) { + startIpcServer(async (to, message, mediaUrl) => { + let mediaBuffer: Buffer | undefined; + let mediaType: string | undefined; + if (mediaUrl) { + const media = await loadWebMedia(mediaUrl); + mediaBuffer = media.buffer; + mediaType = media.contentType; + } + const result = await listener.sendMessage(to, message, mediaBuffer, mediaType); + // Add to echo detection so we don't process our own message + if (message) { + recentlySent.add(message); + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } + } + logInfo(`📤 IPC send to ${to}: ${message.substring(0, 50)}...`, runtime); + return result; + }); + } + const closeListener = async () => { + stopIpcServer(); if (heartbeat) clearInterval(heartbeat); if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer); if (watchdogTimer) clearInterval(watchdogTimer); diff --git a/src/web/inbound.ts b/src/web/inbound.ts index 209824229..bafcac10d 100644 --- a/src/web/inbound.ts +++ b/src/web/inbound.ts @@ -212,6 +212,51 @@ export async function monitorWebInbox(options: { } }, onClose, + /** + * Send a message through this connection's socket. + * Used by IPC to avoid creating new connections. + */ + sendMessage: async ( + to: string, + text: string, + mediaBuffer?: Buffer, + mediaType?: string, + ): Promise<{ messageId: string }> => { + const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`; + let payload: AnyMessageContent; + if (mediaBuffer && mediaType) { + if (mediaType.startsWith("image/")) { + payload = { + image: mediaBuffer, + caption: text || undefined, + mimetype: mediaType, + }; + } else if (mediaType.startsWith("audio/")) { + payload = { + audio: mediaBuffer, + ptt: true, + mimetype: mediaType, + }; + } else if (mediaType.startsWith("video/")) { + payload = { + video: mediaBuffer, + caption: text || undefined, + mimetype: mediaType, + }; + } else { + payload = { + document: mediaBuffer, + fileName: "file", + caption: text || undefined, + mimetype: mediaType, + }; + } + } else { + payload = { text }; + } + const result = await sock.sendMessage(jid, payload); + return { messageId: result?.key?.id ?? "unknown" }; + }, } as const; } diff --git a/src/web/ipc.ts b/src/web/ipc.ts new file mode 100644 index 000000000..99e09e0f5 --- /dev/null +++ b/src/web/ipc.ts @@ -0,0 +1,225 @@ +/** + * IPC server for warelay relay. + * + * When the relay is running, it starts a Unix socket server that allows + * `warelay send` and `warelay heartbeat` to send messages through the + * existing WhatsApp connection instead of creating new ones. + * + * This prevents Signal session ratchet corruption from multiple connections. + */ + +import fs from "node:fs"; +import net from "node:net"; +import os from "node:os"; +import path from "node:path"; + +import { getChildLogger } from "../logging.js"; + +const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock"); + +export interface IpcSendRequest { + type: "send"; + to: string; + message: string; + mediaUrl?: string; +} + +export interface IpcSendResponse { + success: boolean; + messageId?: string; + error?: string; +} + +type SendHandler = ( + to: string, + message: string, + mediaUrl?: string, +) => Promise<{ messageId: string }>; + +let server: net.Server | null = null; + +/** + * Start the IPC server. Called by the relay when it starts. + */ +export function startIpcServer(sendHandler: SendHandler): void { + const logger = getChildLogger({ module: "ipc-server" }); + + // Clean up stale socket file + try { + fs.unlinkSync(SOCKET_PATH); + } catch { + // Ignore if doesn't exist + } + + server = net.createServer((conn) => { + let buffer = ""; + + conn.on("data", async (data) => { + buffer += data.toString(); + + // Try to parse complete JSON messages (newline-delimited) + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; // Keep incomplete line in buffer + + for (const line of lines) { + if (!line.trim()) continue; + + try { + const request = JSON.parse(line) as IpcSendRequest; + + if (request.type === "send") { + try { + const result = await sendHandler( + request.to, + request.message, + request.mediaUrl, + ); + const response: IpcSendResponse = { + success: true, + messageId: result.messageId, + }; + conn.write(JSON.stringify(response) + "\n"); + } catch (err) { + const response: IpcSendResponse = { + success: false, + error: String(err), + }; + conn.write(JSON.stringify(response) + "\n"); + } + } + } catch (err) { + logger.warn({ error: String(err) }, "failed to parse IPC request"); + const response: IpcSendResponse = { + success: false, + error: "Invalid request format", + }; + conn.write(JSON.stringify(response) + "\n"); + } + } + }); + + conn.on("error", (err) => { + logger.debug({ error: String(err) }, "IPC connection error"); + }); + }); + + server.listen(SOCKET_PATH, () => { + logger.info({ socketPath: SOCKET_PATH }, "IPC server started"); + // Make socket accessible + fs.chmodSync(SOCKET_PATH, 0o600); + }); + + server.on("error", (err) => { + logger.error({ error: String(err) }, "IPC server error"); + }); +} + +/** + * Stop the IPC server. Called when relay shuts down. + */ +export function stopIpcServer(): void { + if (server) { + server.close(); + server = null; + } + try { + fs.unlinkSync(SOCKET_PATH); + } catch { + // Ignore + } +} + +/** + * Check if the relay IPC server is running. + */ +export function isRelayRunning(): boolean { + try { + fs.accessSync(SOCKET_PATH); + return true; + } catch { + return false; + } +} + +/** + * Send a message through the running relay's IPC. + * Returns null if relay is not running. + */ +export async function sendViaIpc( + to: string, + message: string, + mediaUrl?: string, +): Promise { + if (!isRelayRunning()) { + return null; + } + + return new Promise((resolve) => { + const client = net.createConnection(SOCKET_PATH); + let buffer = ""; + let resolved = false; + + const timeout = setTimeout(() => { + if (!resolved) { + resolved = true; + client.destroy(); + resolve({ success: false, error: "IPC timeout" }); + } + }, 30000); // 30 second timeout + + client.on("connect", () => { + const request: IpcSendRequest = { + type: "send", + to, + message, + mediaUrl, + }; + client.write(JSON.stringify(request) + "\n"); + }); + + client.on("data", (data) => { + buffer += data.toString(); + const lines = buffer.split("\n"); + + for (const line of lines) { + if (!line.trim()) continue; + try { + const response = JSON.parse(line) as IpcSendResponse; + if (!resolved) { + resolved = true; + clearTimeout(timeout); + client.end(); + resolve(response); + } + return; + } catch { + // Keep reading + } + } + }); + + client.on("error", (err) => { + if (!resolved) { + resolved = true; + clearTimeout(timeout); + // Socket exists but can't connect - relay might have crashed + resolve(null); + } + }); + + client.on("close", () => { + if (!resolved) { + resolved = true; + clearTimeout(timeout); + resolve({ success: false, error: "Connection closed" }); + } + }); + }); +} + +/** + * Get the IPC socket path for debugging/status. + */ +export function getSocketPath(): string { + return SOCKET_PATH; +}