Web relay: auto-reconnect Baileys and test

This commit is contained in:
Peter Steinberger
2025-11-25 18:09:57 +01:00
parent 46be5eac7d
commit dda017df23
3 changed files with 253 additions and 110 deletions

View File

@@ -236,6 +236,23 @@ describe("provider-web", () => {
await listener.close();
});
it("monitorWebInbox resolves onClose when the socket closes", async () => {
const listener = await monitorWebInbox({
verbose: false,
onMessage: vi.fn(),
});
const sock = getLastSocket();
const reasonPromise = listener.onClose;
sock.ev.emit("connection.update", {
connection: "close",
lastDisconnect: { error: { output: { statusCode: 500 } } },
});
await expect(reasonPromise).resolves.toEqual(
expect.objectContaining({ status: 500, isLoggedOut: false }),
);
await listener.close();
});
it("monitorWebInbox logs inbound bodies to file", async () => {
const logPath = path.join(
os.tmpdir(),
@@ -300,6 +317,49 @@ describe("provider-web", () => {
await listener.close();
});
it("monitorWebProvider reconnects after a connection close", async () => {
vi.useFakeTimers();
const closeResolvers: Array<() => void> = [];
const listenerFactory = vi.fn(async () => {
let resolve!: () => void;
const onClose = new Promise<void>((res) => {
resolve = res;
closeResolvers.push(res);
});
return { close: vi.fn(), onClose };
});
const runtime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
const controller = new AbortController();
const run = monitorWebProvider(
false,
listenerFactory,
true,
async () => ({ text: "ok" }),
runtime as never,
controller.signal,
);
await Promise.resolve();
expect(listenerFactory).toHaveBeenCalledTimes(1);
closeResolvers[0]?.();
await Promise.resolve();
await vi.runOnlyPendingTimersAsync();
expect(listenerFactory).toHaveBeenCalledTimes(2);
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("Reconnecting"),
);
controller.abort();
closeResolvers[1]?.();
await vi.runAllTimersAsync();
await run;
});
it("monitorWebProvider falls back to text when media send fails", async () => {
const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
const reply = vi.fn().mockResolvedValue(undefined);

View File

@@ -229,6 +229,12 @@ export function webAuthExists() {
.catch(() => false);
}
type WebListenerCloseReason = {
status?: number;
isLoggedOut: boolean;
error?: unknown;
};
export type WebInboundMessage = {
id?: string;
from: string;
@@ -255,6 +261,10 @@ export async function monitorWebInbox(options: {
const inboundLogger = getChildLogger({ module: "web-inbound" });
const sock = await createWaSocket(false, options.verbose);
await waitForWaConnection(sock);
let onCloseResolve: ((reason: WebListenerCloseReason) => void) | null = null;
const onClose = new Promise<WebListenerCloseReason>((resolve) => {
onCloseResolve = resolve;
});
try {
// Advertise that the relay is online right after connecting.
await sock.sendPresenceUpdate("available");
@@ -373,6 +383,20 @@ export async function monitorWebInbox(options: {
}
});
sock.ev.on(
"connection.update",
(update: Partial<import("@whiskeysockets/baileys").ConnectionState>) => {
if (update.connection === "close") {
const status = getStatusCode(update.lastDisconnect?.error);
onCloseResolve?.({
status,
isLoggedOut: status === DisconnectReason.loggedOut,
error: update.lastDisconnect?.error,
});
}
},
);
return {
close: async () => {
try {
@@ -381,6 +405,7 @@ export async function monitorWebInbox(options: {
logVerbose(`Socket close failed: ${String(err)}`);
}
},
onClose,
};
}
@@ -390,139 +415,196 @@ export async function monitorWebProvider(
keepAlive = true,
replyResolver: typeof getReplyFromConfig = getReplyFromConfig,
runtime: RuntimeEnv = defaultRuntime,
abortSignal?: AbortSignal,
) {
const replyLogger = getChildLogger({ module: "web-auto-reply" });
// Listen for inbound personal WhatsApp Web messages and auto-reply if configured.
const listener = await listenerFactory({
verbose,
onMessage: async (msg) => {
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
const stopRequested = () => abortSignal?.aborted === true;
const abortPromise =
abortSignal &&
new Promise<"aborted">((resolve) =>
abortSignal.addEventListener("abort", () => resolve("aborted"), {
once: true,
}),
);
const replyStarted = Date.now();
const replyResult = await replyResolver(
{
Body: msg.body,
From: msg.from,
To: msg.to,
MessageSid: msg.id,
MediaPath: msg.mediaPath,
MediaUrl: msg.mediaUrl,
MediaType: msg.mediaType,
},
{
onReplyStart: msg.sendComposing,
},
);
if (!replyResult || (!replyResult.text && !replyResult.mediaUrl)) {
logVerbose("Skipping auto-reply: no text/media returned from resolver");
return;
}
try {
if (replyResult.mediaUrl) {
logVerbose(`Web auto-reply media detected: ${replyResult.mediaUrl}`);
try {
const media = await loadWebMedia(replyResult.mediaUrl);
if (isVerbose()) {
logVerbose(
`Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`,
);
}
await msg.sendMedia({
image: media.buffer,
caption: replyResult.text || undefined,
mimetype: media.contentType,
});
logInfo(
`✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`,
runtime,
const sleep = (ms: number) =>
new Promise<void>((resolve) => setTimeout(resolve, ms));
while (true) {
if (stopRequested()) break;
const listener = await listenerFactory({
verbose,
onMessage: async (msg) => {
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
const replyStarted = Date.now();
const replyResult = await replyResolver(
{
Body: msg.body,
From: msg.from,
To: msg.to,
MessageSid: msg.id,
MediaPath: msg.mediaPath,
MediaUrl: msg.mediaUrl,
MediaType: msg.mediaType,
},
{
onReplyStart: msg.sendComposing,
},
);
if (!replyResult || (!replyResult.text && !replyResult.mediaUrl)) {
logVerbose("Skipping auto-reply: no text/media returned from resolver");
return;
}
try {
if (replyResult.mediaUrl) {
logVerbose(
`Web auto-reply media detected: ${replyResult.mediaUrl}`,
);
replyLogger.info(
{
to: msg.from,
from: msg.to,
text: replyResult.text ?? null,
mediaUrl: replyResult.mediaUrl,
mediaSizeBytes: media.buffer.length,
durationMs: Date.now() - replyStarted,
},
"auto-reply sent (media)",
);
} catch (err) {
console.error(
danger(`Failed sending web media to ${msg.from}: ${String(err)}`),
);
if (replyResult.text) {
await msg.reply(replyResult.text);
try {
const media = await loadWebMedia(replyResult.mediaUrl);
if (isVerbose()) {
logVerbose(
`Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`,
);
}
await msg.sendMedia({
image: media.buffer,
caption: replyResult.text || undefined,
mimetype: media.contentType,
});
logInfo(
`⚠️ Media skipped; sent text-only to ${msg.from}`,
`✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`,
runtime,
);
replyLogger.info(
{
to: msg.from,
from: msg.to,
text: replyResult.text,
text: replyResult.text ?? null,
mediaUrl: replyResult.mediaUrl,
mediaSizeBytes: media.buffer.length,
durationMs: Date.now() - replyStarted,
mediaSendFailed: true,
},
"auto-reply sent (text fallback)",
"auto-reply sent (media)",
);
} catch (err) {
console.error(
danger(
`Failed sending web media to ${msg.from}: ${String(err)}`,
),
);
if (replyResult.text) {
await msg.reply(replyResult.text);
logInfo(
`⚠️ Media skipped; sent text-only to ${msg.from}`,
runtime,
);
replyLogger.info(
{
to: msg.from,
from: msg.to,
text: replyResult.text,
mediaUrl: replyResult.mediaUrl,
durationMs: Date.now() - replyStarted,
mediaSendFailed: true,
},
"auto-reply sent (text fallback)",
);
}
}
} else {
await msg.reply(replyResult.text ?? "");
}
} else {
await msg.reply(replyResult.text ?? "");
}
const durationMs = Date.now() - replyStarted;
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${msg.from} (web, ${replyResult.text?.length ?? 0} chars${replyResult.mediaUrl ? ", media" : ""}, ${formatDuration(durationMs)})`,
),
const durationMs = Date.now() - replyStarted;
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${msg.from} (web, ${replyResult.text?.length ?? 0} chars${replyResult.mediaUrl ? ", media" : ""}, ${formatDuration(durationMs)})`,
),
);
} else {
console.log(
success(
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl ? " (media)" : ""}`,
),
);
}
replyLogger.info(
{
to: msg.from,
from: msg.to,
text: replyResult.text ?? null,
mediaUrl: replyResult.mediaUrl,
durationMs,
},
"auto-reply sent",
);
} else {
console.log(
success(
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl ? " (media)" : ""}`,
} catch (err) {
console.error(
danger(
`Failed sending web auto-reply to ${msg.from}: ${String(err)}`,
),
);
}
replyLogger.info(
{
to: msg.from,
from: msg.to,
text: replyResult.text ?? null,
mediaUrl: replyResult.mediaUrl,
durationMs,
},
"auto-reply sent",
);
} catch (err) {
console.error(
danger(
`Failed sending web auto-reply to ${msg.from}: ${String(err)}`,
),
);
}
},
});
logInfo(
"📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.",
runtime,
);
process.on("SIGINT", () => {
void listener.close().finally(() => {
logInfo("👋 Web monitor stopped", runtime);
runtime.exit(0);
},
});
});
if (keepAlive) {
await waitForever();
logInfo(
"📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.",
runtime,
);
let stop = false;
process.on("SIGINT", () => {
stop = true;
void listener.close().finally(() => {
logInfo("👋 Web monitor stopped", runtime);
runtime.exit(0);
});
});
if (!keepAlive) return;
const reason = await Promise.race([
listener.onClose ?? waitForever(),
abortPromise ?? waitForever(),
]);
if (stopRequested() || stop || reason === "aborted") {
await listener.close();
break;
}
const status =
(typeof reason === "object" && reason && "status" in reason
? (reason as WebListenerCloseReason).status
: undefined) ?? "unknown";
const loggedOut =
typeof reason === "object" &&
reason &&
"isLoggedOut" in reason &&
(reason as WebListenerCloseReason).isLoggedOut;
if (loggedOut) {
runtime.error(
danger(
"WhatsApp session logged out. Run `warelay login --provider web` to relink.",
),
);
break;
}
runtime.error(
danger(
`WhatsApp Web connection closed (status ${status}). Reconnecting in 2s…`,
),
);
await listener.close();
await sleep(2_000);
}
}