feat: add heartbeat cli and relay trigger

This commit is contained in:
Peter Steinberger
2025-11-26 17:04:43 +01:00
parent c9e2d69bfb
commit 271004bf60
6 changed files with 576 additions and 108 deletions

View File

@@ -3,14 +3,110 @@ import fs from "node:fs/promises";
import sharp from "sharp";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { WarelayConfig } from "../config/config.js";
import { resetLogger, setLoggerOverride } from "../logging.js";
import { monitorWebProvider } from "./auto-reply.js";
import {
HEARTBEAT_TOKEN,
monitorWebProvider,
resolveReplyHeartbeatMinutes,
runWebHeartbeatOnce,
stripHeartbeatToken,
} from "./auto-reply.js";
import type { sendMessageWeb } from "./outbound.js";
import {
resetBaileysMocks,
resetLoadConfigMock,
setLoadConfigMock,
} from "./test-helpers.js";
describe("heartbeat helpers", () => {
it("strips heartbeat token and skips when only token", () => {
expect(stripHeartbeatToken(undefined)).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken(" ")).toEqual({
shouldSkip: true,
text: "",
});
expect(stripHeartbeatToken(HEARTBEAT_TOKEN)).toEqual({
shouldSkip: true,
text: "",
});
});
it("keeps content and removes token when mixed", () => {
expect(stripHeartbeatToken(`ALERT ${HEARTBEAT_TOKEN}`)).toEqual({
shouldSkip: false,
text: "ALERT",
});
expect(stripHeartbeatToken(`hello`)).toEqual({
shouldSkip: false,
text: "hello",
});
});
it("resolves heartbeat minutes with default and overrides", () => {
const cfgBase: WarelayConfig = {
inbound: {
reply: { mode: "command" as const },
},
};
expect(resolveReplyHeartbeatMinutes(cfgBase)).toBe(30);
expect(
resolveReplyHeartbeatMinutes({
inbound: { reply: { mode: "command", heartbeatMinutes: 5 } },
}),
).toBe(5);
expect(
resolveReplyHeartbeatMinutes({
inbound: { reply: { mode: "command", heartbeatMinutes: 0 } },
}),
).toBeNull();
expect(resolveReplyHeartbeatMinutes(cfgBase, 7)).toBe(7);
expect(
resolveReplyHeartbeatMinutes({
inbound: { reply: { mode: "text" } },
}),
).toBeNull();
});
});
describe("runWebHeartbeatOnce", () => {
it("skips when heartbeat token returned", async () => {
const sender: typeof sendMessageWeb = vi.fn();
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
expect(resolver).toHaveBeenCalled();
expect(sender).not.toHaveBeenCalled();
});
it("sends when alert text present", async () => {
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false });
});
});
describe("web auto-reply", () => {
beforeEach(() => {
vi.clearAllMocks();

View File

@@ -1,4 +1,5 @@
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { waitForever } from "../cli/wait.js";
import { loadConfig } from "../config/config.js";
import { danger, isVerbose, logVerbose, success } from "../globals.js";
@@ -7,6 +8,7 @@ import { getChildLogger } from "../logging.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { monitorWebInbox } from "./inbound.js";
import { loadWebMedia } from "./media.js";
import { sendMessageWeb } from "./outbound.js";
import {
computeBackoff,
newConnectionId,
@@ -18,16 +20,249 @@ import {
import { getWebAuthAgeMs } from "./session.js";
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
type WebInboundMsg = Parameters<
typeof monitorWebInbox
>[0]["onMessage"] extends (msg: infer M) => unknown
? M
: never;
export type WebMonitorTuning = {
reconnect?: Partial<ReconnectPolicy>;
heartbeatSeconds?: number;
replyHeartbeatMinutes?: number;
replyHeartbeatNow?: boolean;
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;
};
const formatDuration = (ms: number) =>
ms >= 1000 ? `${(ms / 1000).toFixed(2)}s` : `${ms}ms`;
const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30;
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
export const HEARTBEAT_PROMPT =
"HEARTBEAT ping — if nothing important happened, reply exactly HEARTBEAT_OK. Otherwise return a concise alert.";
export function resolveReplyHeartbeatMinutes(
cfg: ReturnType<typeof loadConfig>,
overrideMinutes?: number,
) {
const raw = overrideMinutes ?? cfg.inbound?.reply?.heartbeatMinutes;
if (raw === 0) return null;
if (typeof raw === "number" && raw > 0) return raw;
return cfg.inbound?.reply?.mode === "command"
? DEFAULT_REPLY_HEARTBEAT_MINUTES
: null;
}
export function stripHeartbeatToken(raw?: string) {
if (!raw) return { shouldSkip: true, text: "" };
const trimmed = raw.trim();
if (!trimmed) return { shouldSkip: true, text: "" };
if (trimmed === HEARTBEAT_TOKEN) return { shouldSkip: true, text: "" };
const withoutToken = trimmed.replaceAll(HEARTBEAT_TOKEN, "").trim();
return {
shouldSkip: withoutToken.length === 0,
text: withoutToken || trimmed,
};
}
export async function runWebHeartbeatOnce(opts: {
to: string;
verbose?: boolean;
replyResolver?: typeof getReplyFromConfig;
runtime?: RuntimeEnv;
sender?: typeof sendMessageWeb;
}) {
const { to, verbose = false } = opts;
const _runtime = opts.runtime ?? defaultRuntime;
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
const sender = opts.sender ?? sendMessageWeb;
const runId = newConnectionId();
const heartbeatLogger = getChildLogger({
module: "web-heartbeat",
runId,
to,
});
const cfg = loadConfig();
try {
const replyResult = await replyResolver(
{
Body: HEARTBEAT_PROMPT,
From: to,
To: to,
MessageSid: undefined,
},
undefined,
cfg,
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
heartbeatLogger.info({ to, reason: "empty-reply" }, "heartbeat skipped");
if (verbose) console.log(success("heartbeat: ok (empty reply)"));
return;
}
const hasMedia =
(replyResult.mediaUrl ?? replyResult.mediaUrls?.length ?? 0) > 0;
const stripped = stripHeartbeatToken(replyResult.text);
if (stripped.shouldSkip && !hasMedia) {
heartbeatLogger.info(
{ to, reason: "heartbeat-token", rawLength: replyResult.text?.length },
"heartbeat skipped",
);
console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
return;
}
if (hasMedia) {
heartbeatLogger.warn(
{ to },
"heartbeat reply contained media; sending text only",
);
}
const finalText = stripped.text || replyResult.text || "";
const sendResult = await sender(to, finalText, { verbose });
heartbeatLogger.info(
{ to, messageId: sendResult.messageId, chars: finalText.length },
"heartbeat sent",
);
console.log(success(`heartbeat: alert sent to ${to}`));
} catch (err) {
heartbeatLogger.warn({ to, error: String(err) }, "heartbeat failed");
console.log(danger(`heartbeat: failed - ${String(err)}`));
throw err;
}
}
async function deliverWebReply(params: {
replyResult: ReplyPayload;
msg: WebInboundMsg;
maxMediaBytes: number;
replyLogger: ReturnType<typeof getChildLogger>;
runtime: RuntimeEnv;
connectionId?: string;
skipLog?: boolean;
}) {
const {
replyResult,
msg,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
skipLog,
} = params;
const replyStarted = Date.now();
const mediaList = replyResult.mediaUrls?.length
? replyResult.mediaUrls
: replyResult.mediaUrl
? [replyResult.mediaUrl]
: [];
if (mediaList.length === 0 && replyResult.text) {
await msg.reply(replyResult.text || "");
if (!skipLog) {
logInfo(
`✅ Sent web reply to ${msg.from} (${(Date.now() - replyStarted).toFixed(0)}ms)`,
runtime,
);
}
replyLogger.info(
{
correlationId: msg.id ?? newConnectionId(),
connectionId: connectionId ?? null,
to: msg.from,
from: msg.to,
text: replyResult.text,
mediaUrl: null,
mediaSizeBytes: null,
mediaKind: null,
durationMs: Date.now() - replyStarted,
},
"auto-reply sent (text)",
);
return;
}
const cleanText = replyResult.text ?? undefined;
for (const [index, mediaUrl] of mediaList.entries()) {
try {
const media = await loadWebMedia(mediaUrl, maxMediaBytes);
if (isVerbose()) {
logVerbose(
`Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`,
);
logVerbose(
`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`,
);
}
const caption = index === 0 ? cleanText || undefined : undefined;
if (media.kind === "image") {
await msg.sendMedia({
image: media.buffer,
caption,
mimetype: media.contentType,
});
} else if (media.kind === "audio") {
await msg.sendMedia({
audio: media.buffer,
ptt: true,
mimetype: media.contentType,
caption,
});
} else if (media.kind === "video") {
await msg.sendMedia({
video: media.buffer,
caption,
mimetype: media.contentType,
});
} else {
const fileName = mediaUrl.split("/").pop() ?? "file";
const mimetype = media.contentType ?? "application/octet-stream";
await msg.sendMedia({
document: media.buffer,
fileName,
caption,
mimetype,
});
}
logInfo(
`✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`,
runtime,
);
replyLogger.info(
{
correlationId: msg.id ?? newConnectionId(),
connectionId: connectionId ?? null,
to: msg.from,
from: msg.to,
text: index === 0 ? (cleanText ?? null) : null,
mediaUrl,
mediaSizeBytes: media.buffer.length,
mediaKind: media.kind,
durationMs: Date.now() - replyStarted,
},
"auto-reply sent (media)",
);
} catch (err) {
console.error(
danger(`Failed sending web media to ${msg.from}: ${String(err)}`),
);
if (index === 0 && cleanText) {
console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`);
await msg.reply(cleanText || "");
}
}
}
}
export async function monitorWebProvider(
verbose: boolean,
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
@@ -51,6 +286,10 @@ export async function monitorWebProvider(
cfg,
tuning.heartbeatSeconds,
);
const replyHeartbeatMinutes = resolveReplyHeartbeatMinutes(
cfg,
tuning.replyHeartbeatMinutes,
);
const reconnectPolicy = resolveReconnectPolicy(cfg, tuning.reconnect);
const sleep =
tuning.sleep ??
@@ -79,8 +318,10 @@ export async function monitorWebProvider(
const connectionId = newConnectionId();
const startedAt = Date.now();
let heartbeat: NodeJS.Timeout | null = null;
let replyHeartbeatTimer: NodeJS.Timeout | null = null;
let lastMessageAt: number | null = null;
let handledMessages = 0;
let lastInboundMsg: WebInboundMsg | null = null;
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
@@ -106,7 +347,8 @@ export async function monitorWebProvider(
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
const replyStarted = Date.now();
lastInboundMsg = msg;
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: msg.body,
@@ -133,122 +375,27 @@ export async function monitorWebProvider(
return;
}
try {
const mediaList = replyResult.mediaUrls?.length
? replyResult.mediaUrls
: replyResult.mediaUrl
? [replyResult.mediaUrl]
: [];
if (mediaList.length > 0) {
logVerbose(
`Web auto-reply media detected: ${mediaList.filter(Boolean).join(", ")}`,
);
for (const [index, mediaUrl] of mediaList.entries()) {
try {
const media = await loadWebMedia(mediaUrl, maxMediaBytes);
if (isVerbose()) {
logVerbose(
`Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`,
);
logVerbose(
`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`,
);
}
const caption =
index === 0 ? replyResult.text || undefined : undefined;
if (media.kind === "image") {
await msg.sendMedia({
image: media.buffer,
caption,
mimetype: media.contentType,
});
} else if (media.kind === "audio") {
await msg.sendMedia({
audio: media.buffer,
ptt: true,
mimetype: media.contentType,
caption,
});
} else if (media.kind === "video") {
await msg.sendMedia({
video: media.buffer,
caption,
mimetype: media.contentType,
});
} else {
const fileName = mediaUrl.split("/").pop() ?? "file";
const mimetype =
media.contentType ?? "application/octet-stream";
await msg.sendMedia({
document: media.buffer,
fileName,
caption,
mimetype,
});
}
logInfo(
`✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`,
runtime,
);
replyLogger.info(
{
connectionId,
correlationId,
to: msg.from,
from: msg.to,
text: index === 0 ? (replyResult.text ?? null) : null,
mediaUrl,
mediaSizeBytes: media.buffer.length,
mediaKind: media.kind,
durationMs: Date.now() - replyStarted,
},
"auto-reply sent (media)",
);
} catch (err) {
console.error(
danger(
`Failed sending web media to ${msg.from}: ${String(err)}`,
),
);
if (index === 0 && replyResult.text) {
console.log(
`⚠️ Media skipped; sent text-only to ${msg.from}`,
);
await msg.reply(replyResult.text || "");
}
}
}
} else if (replyResult.text) {
await msg.reply(replyResult.text);
}
const durationMs = Date.now() - replyStarted;
const hasMedia = mediaList.length > 0;
await deliverWebReply({
replyResult,
msg,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${msg.from} (web, ${replyResult.text?.length ?? 0} chars${hasMedia ? ", media" : ""}, ${formatDuration(durationMs)})`,
`↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`,
),
);
} else {
console.log(
success(
`↩️ ${replyResult.text ?? "<media>"}${hasMedia ? " (media)" : ""}`,
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`,
),
);
}
replyLogger.info(
{
connectionId,
correlationId,
to: msg.from,
from: msg.to,
text: replyResult.text ?? null,
mediaUrl: mediaList[0] ?? null,
durationMs,
},
"auto-reply sent",
);
} catch (err) {
console.error(
danger(
@@ -261,6 +408,7 @@ export async function monitorWebProvider(
const closeListener = async () => {
if (heartbeat) clearInterval(heartbeat);
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
try {
await listener.close();
} catch (err) {
@@ -285,6 +433,135 @@ export async function monitorWebProvider(
}, heartbeatSeconds * 1000);
}
const runReplyHeartbeat = async () => {
if (!replyHeartbeatMinutes) return;
const tickStart = Date.now();
if (!lastInboundMsg) {
heartbeatLogger.info(
{
connectionId,
reason: "no-recent-inbound",
durationMs: Date.now() - tickStart,
},
"reply heartbeat skipped",
);
console.log(success("heartbeat: skipped (no recent inbound)"));
return;
}
try {
if (isVerbose()) {
heartbeatLogger.info(
{
connectionId,
to: lastInboundMsg.from,
intervalMinutes: replyHeartbeatMinutes,
},
"reply heartbeat start",
);
}
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: HEARTBEAT_PROMPT,
From: lastInboundMsg.from,
To: lastInboundMsg.to,
MessageSid: undefined,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: lastInboundMsg.sendComposing,
},
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
heartbeatLogger.info(
{
connectionId,
durationMs: Date.now() - tickStart,
reason: "empty-reply",
},
"reply heartbeat skipped",
);
console.log(success("heartbeat: ok (empty reply)"));
return;
}
const stripped = stripHeartbeatToken(replyResult.text);
const hasMedia =
(replyResult.mediaUrl ?? replyResult.mediaUrls?.length ?? 0) > 0;
if (stripped.shouldSkip && !hasMedia) {
heartbeatLogger.info(
{
connectionId,
durationMs: Date.now() - tickStart,
reason: "heartbeat-token",
rawLength: replyResult.text?.length ?? 0,
},
"reply heartbeat skipped",
);
console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
return;
}
const cleanedReply: ReplyPayload = {
...replyResult,
text: stripped.text,
};
await deliverWebReply({
replyResult: cleanedReply,
msg: lastInboundMsg,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
const durationMs = Date.now() - tickStart;
const summary = `heartbeat: alert sent (${formatDuration(durationMs)})`;
console.log(summary);
heartbeatLogger.info(
{
connectionId,
durationMs,
hasMedia,
chars: stripped.text?.length ?? 0,
},
"reply heartbeat sent",
);
} catch (err) {
const durationMs = Date.now() - tickStart;
heartbeatLogger.warn(
{
connectionId,
error: String(err),
durationMs,
},
"reply heartbeat failed",
);
console.log(
danger(`heartbeat: failed (${formatDuration(durationMs)})`),
);
}
};
if (replyHeartbeatMinutes && !replyHeartbeatTimer) {
const intervalMs = replyHeartbeatMinutes * 60_000;
replyHeartbeatTimer = setInterval(() => {
void runReplyHeartbeat();
}, intervalMs);
if (tuning.replyHeartbeatNow) {
void runReplyHeartbeat();
}
}
logInfo(
"📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.",
runtime,