feat(web): emit provider status updates

This commit is contained in:
Peter Steinberger
2025-12-20 23:30:17 +01:00
parent df9e4bdd63
commit 873daf079c
3 changed files with 76 additions and 104 deletions

View File

@@ -3,7 +3,7 @@ import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import { callGateway } from "../gateway/call.js"; import { callGateway } from "../gateway/call.js";
import { info } from "../globals.js"; import { info } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
import { makeProxyFetch } from "../telegram/proxy.js"; import { probeTelegram, type TelegramProbe } from "../telegram/probe.js";
import { resolveHeartbeatSeconds } from "../web/reconnect.js"; import { resolveHeartbeatSeconds } from "../web/reconnect.js";
import { import {
getWebAuthAgeMs, getWebAuthAgeMs,
@@ -11,15 +11,6 @@ import {
webAuthExists, webAuthExists,
} from "../web/session.js"; } from "../web/session.js";
type TelegramProbe = {
ok: boolean;
status?: number | null;
error?: string | null;
elapsedMs: number;
bot?: { id?: number | null; username?: string | null };
webhook?: { url?: string | null; hasCustomCert?: boolean | null };
};
export type HealthSummary = { export type HealthSummary = {
/** /**
* Convenience top-level flag for UIs (e.g. WebChat) that only need a binary * Convenience top-level flag for UIs (e.g. WebChat) that only need a binary
@@ -56,94 +47,6 @@ export type HealthSummary = {
}; };
const DEFAULT_TIMEOUT_MS = 10_000; const DEFAULT_TIMEOUT_MS = 10_000;
const TELEGRAM_API_BASE = "https://api.telegram.org";
async function fetchWithTimeout(
url: string,
timeoutMs: number,
fetcher: typeof fetch,
): Promise<Response> {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
return await fetcher(url, { signal: controller.signal });
} finally {
clearTimeout(timer);
}
}
async function probeTelegram(
token: string,
timeoutMs: number,
proxyUrl?: string,
): Promise<TelegramProbe> {
const started = Date.now();
const fetcher = proxyUrl ? makeProxyFetch(proxyUrl) : fetch;
const base = `${TELEGRAM_API_BASE}/bot${token}`;
const result: TelegramProbe = {
ok: false,
status: null,
error: null,
elapsedMs: 0,
};
try {
const meRes = await fetchWithTimeout(`${base}/getMe`, timeoutMs, fetcher);
const meJson = (await meRes.json()) as {
ok?: boolean;
description?: string;
result?: { id?: number; username?: string };
};
if (!meRes.ok || !meJson?.ok) {
result.status = meRes.status;
result.error = meJson?.description ?? `getMe failed (${meRes.status})`;
return { ...result, elapsedMs: Date.now() - started };
}
result.bot = {
id: meJson.result?.id ?? null,
username: meJson.result?.username ?? null,
};
// Try to fetch webhook info, but don't fail health if it errors
try {
const webhookRes = await fetchWithTimeout(
`${base}/getWebhookInfo`,
timeoutMs,
fetcher,
);
const webhookJson = (await webhookRes.json()) as {
ok?: boolean;
result?: {
url?: string;
has_custom_certificate?: boolean;
};
};
if (webhookRes.ok && webhookJson?.ok) {
result.webhook = {
url: webhookJson.result?.url ?? null,
hasCustomCert: webhookJson.result?.has_custom_certificate ?? null,
};
}
} catch {
// ignore webhook errors for health
}
result.ok = true;
result.status = null;
result.error = null;
result.elapsedMs = Date.now() - started;
return result;
} catch (err) {
return {
...result,
status: err instanceof Response ? err.status : result.status,
error: err instanceof Error ? err.message : String(err),
elapsedMs: Date.now() - started,
};
}
}
export async function getHealthSnapshot( export async function getHealthSnapshot(
timeoutMs?: number, timeoutMs?: number,

View File

@@ -9,6 +9,7 @@ export {
runWebHeartbeatOnce, runWebHeartbeatOnce,
setHeartbeatsEnabled, setHeartbeatsEnabled,
type WebMonitorTuning, type WebMonitorTuning,
type WebProviderStatus,
} from "./web/auto-reply.js"; } from "./web/auto-reply.js";
export { export {
extractMediaPlaceholder, extractMediaPlaceholder,

View File

@@ -67,6 +67,7 @@ export type WebMonitorTuning = {
replyHeartbeatMinutes?: number; replyHeartbeatMinutes?: number;
replyHeartbeatNow?: boolean; replyHeartbeatNow?: boolean;
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>; sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;
statusSink?: (status: WebProviderStatus) => void;
}; };
const formatDuration = (ms: number) => const formatDuration = (ms: number) =>
@@ -76,6 +77,22 @@ const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30;
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK"; export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
export const HEARTBEAT_PROMPT = "HEARTBEAT"; export const HEARTBEAT_PROMPT = "HEARTBEAT";
export type WebProviderStatus = {
running: boolean;
connected: boolean;
reconnectAttempts: number;
lastConnectedAt?: number | null;
lastDisconnect?: {
at: number;
status?: number;
error?: string;
loggedOut?: boolean;
} | null;
lastMessageAt?: number | null;
lastEventAt?: number | null;
lastError?: string | null;
};
function elide(text?: string, limit = 400) { function elide(text?: string, limit = 400) {
if (!text) return text; if (!text) return text;
if (text.length <= limit) return text; if (text.length <= limit) return text;
@@ -702,6 +719,25 @@ export async function monitorWebProvider(
const replyLogger = getChildLogger({ module: "web-auto-reply", runId }); const replyLogger = getChildLogger({ module: "web-auto-reply", runId });
const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId }); const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId });
const reconnectLogger = getChildLogger({ module: "web-reconnect", runId }); const reconnectLogger = getChildLogger({ module: "web-reconnect", runId });
const status: WebProviderStatus = {
running: true,
connected: false,
reconnectAttempts: 0,
lastConnectedAt: null,
lastDisconnect: null,
lastMessageAt: null,
lastEventAt: null,
lastError: null,
};
const emitStatus = () => {
tuning.statusSink?.({
...status,
lastDisconnect: status.lastDisconnect
? { ...status.lastDisconnect }
: null,
});
};
emitStatus();
const cfg = loadConfig(); const cfg = loadConfig();
const configuredMaxMb = cfg.inbound?.agent?.mediaMaxMb; const configuredMaxMb = cfg.inbound?.agent?.mediaMaxMb;
const maxMediaBytes = const maxMediaBytes =
@@ -802,6 +838,9 @@ export async function monitorWebProvider(
}; };
const processMessage = async (msg: WebInboundMsg) => { const processMessage = async (msg: WebInboundMsg) => {
status.lastMessageAt = Date.now();
status.lastEventAt = status.lastMessageAt;
emitStatus();
const conversationId = msg.conversationId ?? msg.from; const conversationId = msg.conversationId ?? msg.from;
let combinedBody = buildLine(msg); let combinedBody = buildLine(msg);
@@ -1038,6 +1077,9 @@ export async function monitorWebProvider(
onMessage: async (msg) => { onMessage: async (msg) => {
handledMessages += 1; handledMessages += 1;
lastMessageAt = Date.now(); lastMessageAt = Date.now();
status.lastMessageAt = lastMessageAt;
status.lastEventAt = lastMessageAt;
emitStatus();
lastInboundMsg = msg; lastInboundMsg = msg;
const conversationId = msg.conversationId ?? msg.from; const conversationId = msg.conversationId ?? msg.from;
@@ -1090,6 +1132,12 @@ export async function monitorWebProvider(
}, },
}); });
status.connected = true;
status.lastConnectedAt = Date.now();
status.lastEventAt = status.lastConnectedAt;
status.lastError = null;
emitStatus();
// Surface a concise connection event for the next main-session turn/heartbeat. // Surface a concise connection event for the next main-session turn/heartbeat.
const { e164: selfE164 } = readWebSelfId(); const { e164: selfE164 } = readWebSelfId();
enqueueSystemEvent( enqueueSystemEvent(
@@ -1420,13 +1468,15 @@ export async function monitorWebProvider(
if (uptimeMs > heartbeatSeconds * 1000) { if (uptimeMs > heartbeatSeconds * 1000) {
reconnectAttempts = 0; // Healthy stretch; reset the backoff. reconnectAttempts = 0; // Healthy stretch; reset the backoff.
} }
status.reconnectAttempts = reconnectAttempts;
emitStatus();
if (stopRequested() || sigintStop || reason === "aborted") { if (stopRequested() || sigintStop || reason === "aborted") {
await closeListener(); await closeListener();
break; break;
} }
const status = const statusCode =
(typeof reason === "object" && reason && "status" in reason (typeof reason === "object" && reason && "status" in reason
? (reason as { status?: number }).status ? (reason as { status?: number }).status
: undefined) ?? "unknown"; : undefined) ?? "unknown";
@@ -1437,11 +1487,22 @@ export async function monitorWebProvider(
(reason as { isLoggedOut?: boolean }).isLoggedOut; (reason as { isLoggedOut?: boolean }).isLoggedOut;
const errorStr = formatError(reason); const errorStr = formatError(reason);
status.connected = false;
status.lastEventAt = Date.now();
status.lastDisconnect = {
at: status.lastEventAt,
status: typeof statusCode === "number" ? statusCode : undefined,
error: errorStr,
loggedOut: Boolean(loggedOut),
};
status.lastError = errorStr;
status.reconnectAttempts = reconnectAttempts;
emitStatus();
reconnectLogger.info( reconnectLogger.info(
{ {
connectionId, connectionId,
status, status: statusCode,
loggedOut, loggedOut,
reconnectAttempts, reconnectAttempts,
error: errorStr, error: errorStr,
@@ -1450,7 +1511,7 @@ export async function monitorWebProvider(
); );
enqueueSystemEvent( enqueueSystemEvent(
`WhatsApp gateway disconnected (status ${status ?? "unknown"})`, `WhatsApp gateway disconnected (status ${statusCode ?? "unknown"})`,
); );
if (loggedOut) { if (loggedOut) {
@@ -1464,6 +1525,8 @@ export async function monitorWebProvider(
} }
reconnectAttempts += 1; reconnectAttempts += 1;
status.reconnectAttempts = reconnectAttempts;
emitStatus();
if ( if (
reconnectPolicy.maxAttempts > 0 && reconnectPolicy.maxAttempts > 0 &&
reconnectAttempts >= reconnectPolicy.maxAttempts reconnectAttempts >= reconnectPolicy.maxAttempts
@@ -1471,7 +1534,7 @@ export async function monitorWebProvider(
reconnectLogger.warn( reconnectLogger.warn(
{ {
connectionId, connectionId,
status, status: statusCode,
reconnectAttempts, reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts, maxAttempts: reconnectPolicy.maxAttempts,
}, },
@@ -1490,7 +1553,7 @@ export async function monitorWebProvider(
reconnectLogger.info( reconnectLogger.info(
{ {
connectionId, connectionId,
status, status: statusCode,
reconnectAttempts, reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts || "unlimited", maxAttempts: reconnectPolicy.maxAttempts || "unlimited",
delayMs: delay, delayMs: delay,
@@ -1499,7 +1562,7 @@ export async function monitorWebProvider(
); );
runtime.error( runtime.error(
danger( danger(
`WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}… (${errorStr})`, `WhatsApp Web connection closed (status ${statusCode}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}… (${errorStr})`,
), ),
); );
await closeListener(); await closeListener();
@@ -1510,6 +1573,11 @@ export async function monitorWebProvider(
} }
} }
status.running = false;
status.connected = false;
status.lastEventAt = Date.now();
emitStatus();
process.removeListener("SIGINT", handleSigint); process.removeListener("SIGINT", handleSigint);
} }