Add auto-recovery from stuck WhatsApp sessions
Fixes issue where unauthorized messages from +212652169245 (5elements spa) triggered Bad MAC errors and silently killed the event emitter, preventing all future message processing. Changes: 1. Early allowFrom filtering in inbound.ts - blocks unauthorized senders before they trigger encryption errors 2. Message timeout watchdog - auto-restarts connection if no messages received for 10 minutes 3. Health monitoring in heartbeat - warns if >30 min without messages 4. Mock loadConfig in tests to handle new dependency Root cause: Event emitter stopped firing after Bad MAC errors from decryption attempts on messages from unauthorized senders. Connection stayed alive but all subsequent messages.upsert events silently failed.
This commit is contained in:
@@ -512,10 +512,15 @@ export async function monitorWebProvider(
|
||||
const startedAt = Date.now();
|
||||
let heartbeat: NodeJS.Timeout | null = null;
|
||||
let replyHeartbeatTimer: NodeJS.Timeout | null = null;
|
||||
let watchdogTimer: NodeJS.Timeout | null = null;
|
||||
let lastMessageAt: number | null = null;
|
||||
let handledMessages = 0;
|
||||
let lastInboundMsg: WebInboundMsg | null = null;
|
||||
|
||||
// Watchdog to detect stuck message processing (e.g., event emitter died)
|
||||
const MESSAGE_TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes without any messages
|
||||
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
|
||||
|
||||
const listener = await (listenerFactory ?? monitorWebInbox)({
|
||||
verbose,
|
||||
onMessage: async (msg) => {
|
||||
@@ -673,6 +678,7 @@ export async function monitorWebProvider(
|
||||
const closeListener = async () => {
|
||||
if (heartbeat) clearInterval(heartbeat);
|
||||
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
|
||||
if (watchdogTimer) clearInterval(watchdogTimer);
|
||||
try {
|
||||
await listener.close();
|
||||
} catch (err) {
|
||||
@@ -683,18 +689,52 @@ export async function monitorWebProvider(
|
||||
if (keepAlive) {
|
||||
heartbeat = setInterval(() => {
|
||||
const authAgeMs = getWebAuthAgeMs();
|
||||
heartbeatLogger.info(
|
||||
{
|
||||
connectionId,
|
||||
reconnectAttempts,
|
||||
messagesHandled: handledMessages,
|
||||
lastMessageAt,
|
||||
authAgeMs,
|
||||
uptimeMs: Date.now() - startedAt,
|
||||
},
|
||||
"web relay heartbeat",
|
||||
);
|
||||
const minutesSinceLastMessage = lastMessageAt
|
||||
? Math.floor((Date.now() - lastMessageAt) / 60000)
|
||||
: null;
|
||||
|
||||
const logData = {
|
||||
connectionId,
|
||||
reconnectAttempts,
|
||||
messagesHandled: handledMessages,
|
||||
lastMessageAt,
|
||||
authAgeMs,
|
||||
uptimeMs: Date.now() - startedAt,
|
||||
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
|
||||
? { minutesSinceLastMessage }
|
||||
: {}),
|
||||
};
|
||||
|
||||
// Warn if no messages in 30+ minutes
|
||||
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
|
||||
heartbeatLogger.warn(logData, "⚠️ web relay heartbeat - no messages in 30+ minutes");
|
||||
} else {
|
||||
heartbeatLogger.info(logData, "web relay heartbeat");
|
||||
}
|
||||
}, heartbeatSeconds * 1000);
|
||||
|
||||
// Watchdog: Auto-restart if no messages received for MESSAGE_TIMEOUT_MS
|
||||
watchdogTimer = setInterval(() => {
|
||||
if (lastMessageAt) {
|
||||
const timeSinceLastMessage = Date.now() - lastMessageAt;
|
||||
if (timeSinceLastMessage > MESSAGE_TIMEOUT_MS) {
|
||||
const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000);
|
||||
heartbeatLogger.warn(
|
||||
{
|
||||
connectionId,
|
||||
minutesSinceLastMessage,
|
||||
lastMessageAt: new Date(lastMessageAt),
|
||||
messagesHandled: handledMessages,
|
||||
},
|
||||
"Message timeout detected - forcing reconnect",
|
||||
);
|
||||
console.error(
|
||||
`⚠️ No messages received in ${minutesSinceLastMessage}m - restarting connection`,
|
||||
);
|
||||
closeListener(); // Trigger reconnect
|
||||
}
|
||||
}
|
||||
}, WATCHDOG_CHECK_MS);
|
||||
}
|
||||
|
||||
const runReplyHeartbeat = async () => {
|
||||
|
||||
@@ -5,6 +5,14 @@ import path from "node:path";
|
||||
|
||||
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: vi.fn().mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"], // Allow all in tests
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
const HOME = path.join(
|
||||
os.tmpdir(),
|
||||
`warelay-inbound-media-${crypto.randomUUID()}`,
|
||||
|
||||
@@ -8,10 +8,11 @@ import {
|
||||
downloadMediaMessage,
|
||||
} from "@whiskeysockets/baileys";
|
||||
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { isVerbose, logVerbose } from "../globals.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { saveMediaBuffer } from "../media/store.js";
|
||||
import { jidToE164 } from "../utils.js";
|
||||
import { jidToE164, normalizeE164 } from "../utils.js";
|
||||
import {
|
||||
createWaSocket,
|
||||
getStatusCode,
|
||||
@@ -94,6 +95,20 @@ export async function monitorWebInbox(options: {
|
||||
}
|
||||
const from = jidToE164(remoteJid);
|
||||
if (!from) continue;
|
||||
|
||||
// Filter unauthorized senders early to prevent wasted processing
|
||||
// and potential session corruption from Bad MAC errors
|
||||
const cfg = loadConfig();
|
||||
const allowFrom = cfg.inbound?.allowFrom;
|
||||
const isSamePhone = from === selfE164;
|
||||
|
||||
if (!isSamePhone && Array.isArray(allowFrom) && allowFrom.length > 0) {
|
||||
if (!allowFrom.includes("*") && !allowFrom.map(normalizeE164).includes(from)) {
|
||||
logVerbose(`Blocked unauthorized sender ${from} (not in allowFrom list)`);
|
||||
continue; // Skip processing entirely
|
||||
}
|
||||
}
|
||||
|
||||
let body = extractText(msg.message ?? undefined);
|
||||
if (!body) {
|
||||
body = extractMediaPlaceholder(msg.message ?? undefined);
|
||||
|
||||
@@ -9,6 +9,14 @@ vi.mock("../media/store.js", () => ({
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: vi.fn().mockReturnValue({
|
||||
inbound: {
|
||||
allowFrom: ["*"], // Allow all in tests
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("./session.js", () => {
|
||||
const { EventEmitter } = require("node:events");
|
||||
const ev = new EventEmitter();
|
||||
|
||||
Reference in New Issue
Block a user