From ec01e5c7e671f25a27bb04f44bf0f96816bd0332 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 21 Jan 2026 00:29:42 +0000 Subject: [PATCH] fix: emit diagnostics across channels --- .../reply/dispatch-from-config.test.ts | 84 +++++++ src/auto-reply/reply/dispatch-from-config.ts | 229 +++++++++++------- src/gateway/server.impl.ts | 15 +- src/infra/diagnostic-events.ts | 14 +- src/logging/diagnostic.ts | 1 + src/telegram/bot-message.test.ts | 57 +---- src/telegram/bot-message.ts | 155 +++--------- src/telegram/webhook.ts | 38 ++- 8 files changed, 318 insertions(+), 275 deletions(-) diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index b395e2bc1..0e8905ccb 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -13,6 +13,11 @@ const mocks = vi.hoisted(() => ({ aborted: false, })), })); +const diagnosticMocks = vi.hoisted(() => ({ + logMessageQueued: vi.fn(), + logMessageProcessed: vi.fn(), + logSessionStateChange: vi.fn(), +})); vi.mock("./route-reply.js", () => ({ isRoutableChannel: (channel: string | undefined) => @@ -34,6 +39,12 @@ vi.mock("./abort.js", () => ({ }, })); +vi.mock("../../logging/diagnostic.js", () => ({ + logMessageQueued: diagnosticMocks.logMessageQueued, + logMessageProcessed: diagnosticMocks.logMessageProcessed, + logSessionStateChange: diagnosticMocks.logSessionStateChange, +})); + const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js"); const { resetInboundDedupe } = await import("./inbound-dedupe.js"); @@ -50,6 +61,9 @@ function createDispatcher(): ReplyDispatcher { describe("dispatchReplyFromConfig", () => { beforeEach(() => { resetInboundDedupe(); + diagnosticMocks.logMessageQueued.mockReset(); + diagnosticMocks.logMessageProcessed.mockReset(); + diagnosticMocks.logSessionStateChange.mockReset(); }); it("does not route when Provider matches OriginatingChannel (even if Surface is missing)", async () => { mocks.tryFastAbortFromMessage.mockResolvedValue({ @@ -186,4 +200,74 @@ describe("dispatchReplyFromConfig", () => { expect(replyResolver).toHaveBeenCalledTimes(1); }); + + it("emits diagnostics when enabled", async () => { + mocks.tryFastAbortFromMessage.mockResolvedValue({ + handled: false, + aborted: false, + }); + const cfg = { diagnostics: { enabled: true } } as ClawdbotConfig; + const dispatcher = createDispatcher(); + const ctx = buildTestCtx({ + Provider: "slack", + Surface: "slack", + SessionKey: "agent:main:main", + MessageSid: "msg-1", + To: "slack:C123", + }); + + const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload; + await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver }); + + expect(diagnosticMocks.logMessageQueued).toHaveBeenCalledTimes(1); + expect(diagnosticMocks.logSessionStateChange).toHaveBeenCalledWith({ + sessionKey: "agent:main:main", + state: "processing", + reason: "message_start", + }); + expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "slack", + outcome: "completed", + sessionKey: "agent:main:main", + }), + ); + }); + + it("marks diagnostics skipped for duplicate inbound messages", async () => { + mocks.tryFastAbortFromMessage.mockResolvedValue({ + handled: false, + aborted: false, + }); + const cfg = { diagnostics: { enabled: true } } as ClawdbotConfig; + const ctx = buildTestCtx({ + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "whatsapp:+15555550123", + MessageSid: "msg-dup", + }); + const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload); + + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }); + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }); + + expect(replyResolver).toHaveBeenCalledTimes(1); + expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "whatsapp", + outcome: "skipped", + reason: "duplicate", + }), + ); + }); }); diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 391ab6c1d..47989026c 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -1,5 +1,11 @@ import type { ClawdbotConfig } from "../../config/config.js"; import { logVerbose } from "../../globals.js"; +import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js"; +import { + logMessageProcessed, + logMessageQueued, + logSessionStateChange, +} from "../../logging/diagnostic.js"; import { getReplyFromConfig } from "../reply.js"; import type { FinalizedMsgContext } from "../templating.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; @@ -21,8 +27,55 @@ export async function dispatchReplyFromConfig(params: { replyResolver?: typeof getReplyFromConfig; }): Promise { const { ctx, cfg, dispatcher } = params; + const diagnosticsEnabled = isDiagnosticsEnabled(cfg); + const channel = String(ctx.Surface ?? ctx.Provider ?? "unknown").toLowerCase(); + const chatId = ctx.To ?? ctx.From; + const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; + const sessionKey = ctx.SessionKey; + const startTime = diagnosticsEnabled ? Date.now() : 0; + const canTrackSession = diagnosticsEnabled && Boolean(sessionKey); + + const recordProcessed = ( + outcome: "completed" | "skipped" | "error", + opts?: { + reason?: string; + error?: string; + }, + ) => { + if (!diagnosticsEnabled) return; + logMessageProcessed({ + channel, + chatId, + messageId, + sessionKey, + durationMs: Date.now() - startTime, + outcome, + reason: opts?.reason, + error: opts?.error, + }); + }; + + const markProcessing = () => { + if (!canTrackSession || !sessionKey) return; + logMessageQueued({ sessionKey, channel, source: "dispatch" }); + logSessionStateChange({ + sessionKey, + state: "processing", + reason: "message_start", + }); + }; + + const markIdle = (reason: string) => { + if (!canTrackSession || !sessionKey) return; + logSessionStateChange({ + sessionKey, + state: "idle", + reason, + }); + }; if (shouldSkipDuplicateInbound(ctx)) { + recordProcessed("skipped", { reason: "duplicate" }); return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; } @@ -68,95 +121,107 @@ export async function dispatchReplyFromConfig(params: { } }; - const fastAbort = await tryFastAbortFromMessage({ ctx, cfg }); - if (fastAbort.handled) { - const payload = { - text: formatAbortReplyText(fastAbort.stoppedSubagents), - } satisfies ReplyPayload; + markProcessing(); + + try { + const fastAbort = await tryFastAbortFromMessage({ ctx, cfg }); + if (fastAbort.handled) { + const payload = { + text: formatAbortReplyText(fastAbort.stoppedSubagents), + } satisfies ReplyPayload; + let queuedFinal = false; + let routedFinalCount = 0; + if (shouldRouteToOriginating && originatingChannel && originatingTo) { + const result = await routeReply({ + payload, + channel: originatingChannel, + to: originatingTo, + sessionKey: ctx.SessionKey, + accountId: ctx.AccountId, + threadId: ctx.MessageThreadId, + cfg, + }); + queuedFinal = result.ok; + if (result.ok) routedFinalCount += 1; + if (!result.ok) { + logVerbose( + `dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`, + ); + } + } else { + queuedFinal = dispatcher.sendFinalReply(payload); + } + await dispatcher.waitForIdle(); + const counts = dispatcher.getQueuedCounts(); + counts.final += routedFinalCount; + recordProcessed("completed", { reason: "fast_abort" }); + markIdle("message_completed"); + return { queuedFinal, counts }; + } + + const replyResult = await (params.replyResolver ?? getReplyFromConfig)( + ctx, + { + ...params.replyOptions, + onToolResult: (payload: ReplyPayload) => { + if (shouldRouteToOriginating) { + // Fire-and-forget for streaming tool results when routing. + void sendPayloadAsync(payload); + } else { + // Synchronous dispatch to preserve callback timing. + dispatcher.sendToolResult(payload); + } + }, + onBlockReply: (payload: ReplyPayload, context) => { + if (shouldRouteToOriginating) { + // Await routed sends so upstream can enforce ordering/timeouts. + return sendPayloadAsync(payload, context?.abortSignal); + } else { + // Synchronous dispatch to preserve callback timing. + dispatcher.sendBlockReply(payload); + } + }, + }, + cfg, + ); + + const replies = replyResult ? (Array.isArray(replyResult) ? replyResult : [replyResult]) : []; + let queuedFinal = false; let routedFinalCount = 0; - if (shouldRouteToOriginating && originatingChannel && originatingTo) { - const result = await routeReply({ - payload, - channel: originatingChannel, - to: originatingTo, - sessionKey: ctx.SessionKey, - accountId: ctx.AccountId, - threadId: ctx.MessageThreadId, - cfg, - }); - queuedFinal = result.ok; - if (result.ok) routedFinalCount += 1; - if (!result.ok) { - logVerbose( - `dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`, - ); + for (const reply of replies) { + if (shouldRouteToOriginating && originatingChannel && originatingTo) { + // Route final reply to originating channel. + const result = await routeReply({ + payload: reply, + channel: originatingChannel, + to: originatingTo, + sessionKey: ctx.SessionKey, + accountId: ctx.AccountId, + threadId: ctx.MessageThreadId, + cfg, + }); + if (!result.ok) { + logVerbose( + `dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`, + ); + } + queuedFinal = result.ok || queuedFinal; + if (result.ok) routedFinalCount += 1; + } else { + queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; } - } else { - queuedFinal = dispatcher.sendFinalReply(payload); } await dispatcher.waitForIdle(); + const counts = dispatcher.getQueuedCounts(); counts.final += routedFinalCount; + recordProcessed("completed"); + markIdle("message_completed"); return { queuedFinal, counts }; + } catch (err) { + recordProcessed("error", { error: String(err) }); + markIdle("message_error"); + throw err; } - - const replyResult = await (params.replyResolver ?? getReplyFromConfig)( - ctx, - { - ...params.replyOptions, - onToolResult: (payload: ReplyPayload) => { - if (shouldRouteToOriginating) { - // Fire-and-forget for streaming tool results when routing. - void sendPayloadAsync(payload); - } else { - // Synchronous dispatch to preserve callback timing. - dispatcher.sendToolResult(payload); - } - }, - onBlockReply: (payload: ReplyPayload, context) => { - if (shouldRouteToOriginating) { - // Await routed sends so upstream can enforce ordering/timeouts. - return sendPayloadAsync(payload, context?.abortSignal); - } else { - // Synchronous dispatch to preserve callback timing. - dispatcher.sendBlockReply(payload); - } - }, - }, - cfg, - ); - - const replies = replyResult ? (Array.isArray(replyResult) ? replyResult : [replyResult]) : []; - - let queuedFinal = false; - let routedFinalCount = 0; - for (const reply of replies) { - if (shouldRouteToOriginating && originatingChannel && originatingTo) { - // Route final reply to originating channel. - const result = await routeReply({ - payload: reply, - channel: originatingChannel, - to: originatingTo, - sessionKey: ctx.SessionKey, - accountId: ctx.AccountId, - threadId: ctx.MessageThreadId, - cfg, - }); - if (!result.ok) { - logVerbose( - `dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`, - ); - } - queuedFinal = result.ok || queuedFinal; - if (result.ok) routedFinalCount += 1; - } else { - queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; - } - } - await dispatcher.waitForIdle(); - - const counts = dispatcher.getQueuedCounts(); - counts.final += routedFinalCount; - return { queuedFinal, counts }; } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 898ed1ca7..33a0d9152 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -13,6 +13,7 @@ import { readConfigFileSnapshot, writeConfigFile, } from "../config/config.js"; +import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js"; import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js"; import { onHeartbeatEvent } from "../infra/heartbeat-events.js"; @@ -26,6 +27,7 @@ import { } from "../infra/skills-remote.js"; import { scheduleGatewayUpdateCheck } from "../infra/update-startup.js"; import { setGatewaySigusr1RestartPolicy } from "../infra/restart.js"; +import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js"; import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js"; import type { PluginServicesHandle } from "../plugins/services.js"; import type { RuntimeEnv } from "../runtime.js"; @@ -198,6 +200,10 @@ export async function startGatewayServer( } const cfgAtStart = loadConfig(); + const diagnosticsEnabled = isDiagnosticsEnabled(cfgAtStart); + if (diagnosticsEnabled) { + startDiagnosticHeartbeat(); + } setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true }); initSubagentRegistry(); const defaultAgentId = resolveDefaultAgentId(cfgAtStart); @@ -533,5 +539,12 @@ export async function startGatewayServer( httpServer, }); - return { close }; + return { + close: async (opts) => { + if (diagnosticsEnabled) { + stopDiagnosticHeartbeat(); + } + await close(opts); + }, + }; } diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index a5c6912db..732db4040 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -141,9 +141,11 @@ export type DiagnosticEventPayload = | DiagnosticRunAttemptEvent | DiagnosticHeartbeatEvent; -type DiagnosticEventInput = - T extends DiagnosticEventPayload ? Omit : never; - +export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event + ? Event extends DiagnosticEventPayload + ? Omit + : never + : never; let seq = 0; const listeners = new Set<(evt: DiagnosticEventPayload) => void>(); @@ -151,14 +153,12 @@ export function isDiagnosticsEnabled(config?: ClawdbotConfig): boolean { return config?.diagnostics?.enabled === true; } -export function emitDiagnosticEvent( - event: DiagnosticEventInput, -) { +export function emitDiagnosticEvent(event: DiagnosticEventInput) { const enriched = { ...event, seq: (seq += 1), ts: Date.now(), - } as DiagnosticEventPayload; + } satisfies DiagnosticEventPayload; for (const listener of listeners) { try { listener(enriched); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts index 3470acddc..ba6239184 100644 --- a/src/logging/diagnostic.ts +++ b/src/logging/diagnostic.ts @@ -338,6 +338,7 @@ export function startDiagnosticHeartbeat() { } } }, 30_000); + heartbeatInterval.unref?.(); } export function stopDiagnosticHeartbeat() { diff --git a/src/telegram/bot-message.test.ts b/src/telegram/bot-message.test.ts index ce0099cec..4e65c0fa1 100644 --- a/src/telegram/bot-message.test.ts +++ b/src/telegram/bot-message.test.ts @@ -2,14 +2,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const buildTelegramMessageContext = vi.hoisted(() => vi.fn()); const dispatchTelegramMessage = vi.hoisted(() => vi.fn()); -const logMessageQueued = vi.hoisted(() => vi.fn()); -const logMessageProcessed = vi.hoisted(() => vi.fn()); -const logSessionStateChange = vi.hoisted(() => vi.fn()); -const diagnosticLogger = vi.hoisted(() => ({ - info: vi.fn(), - debug: vi.fn(), - error: vi.fn(), -})); vi.mock("./bot-message-context.js", () => ({ buildTelegramMessageContext, @@ -19,25 +11,12 @@ vi.mock("./bot-message-dispatch.js", () => ({ dispatchTelegramMessage, })); -vi.mock("../logging/diagnostic.js", () => ({ - diagnosticLogger, - logMessageQueued, - logMessageProcessed, - logSessionStateChange, -})); - import { createTelegramMessageProcessor } from "./bot-message.js"; -describe("telegram bot message diagnostics", () => { +describe("telegram bot message processor", () => { beforeEach(() => { buildTelegramMessageContext.mockReset(); dispatchTelegramMessage.mockReset(); - logMessageQueued.mockReset(); - logMessageProcessed.mockReset(); - logSessionStateChange.mockReset(); - diagnosticLogger.info.mockReset(); - diagnosticLogger.debug.mockReset(); - diagnosticLogger.error.mockReset(); }); const baseDeps = { @@ -63,39 +42,19 @@ describe("telegram bot message diagnostics", () => { resolveBotTopicsEnabled: () => false, }; - it("decrements queue depth after successful processing", async () => { - buildTelegramMessageContext.mockResolvedValue({ - route: { sessionKey: "agent:main:main" }, - }); + it("dispatches when context is available", async () => { + buildTelegramMessageContext.mockResolvedValue({ route: { sessionKey: "agent:main:main" } }); const processMessage = createTelegramMessageProcessor(baseDeps); await processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {}); - expect(logMessageQueued).toHaveBeenCalledTimes(1); - expect(logSessionStateChange).toHaveBeenCalledWith({ - sessionKey: "agent:main:main", - state: "idle", - reason: "message_completed", - }); + expect(dispatchTelegramMessage).toHaveBeenCalledTimes(1); }); - it("decrements queue depth after processing error", async () => { - buildTelegramMessageContext.mockResolvedValue({ - route: { sessionKey: "agent:main:main" }, - }); - dispatchTelegramMessage.mockRejectedValue(new Error("boom")); - + it("skips dispatch when no context is produced", async () => { + buildTelegramMessageContext.mockResolvedValue(null); const processMessage = createTelegramMessageProcessor(baseDeps); - - await expect( - processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {}), - ).rejects.toThrow("boom"); - - expect(logMessageQueued).toHaveBeenCalledTimes(1); - expect(logSessionStateChange).toHaveBeenCalledWith({ - sessionKey: "agent:main:main", - state: "idle", - reason: "message_error", - }); + await processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {}); + expect(dispatchTelegramMessage).not.toHaveBeenCalled(); }); }); diff --git a/src/telegram/bot-message.ts b/src/telegram/bot-message.ts index bfd89793d..313296b1d 100644 --- a/src/telegram/bot-message.ts +++ b/src/telegram/bot-message.ts @@ -1,12 +1,6 @@ // @ts-nocheck import { buildTelegramMessageContext } from "./bot-message-context.js"; import { dispatchTelegramMessage } from "./bot-message-dispatch.js"; -import { - diagnosticLogger as diag, - logMessageProcessed, - logMessageQueued, - logSessionStateChange, -} from "../logging/diagnostic.js"; export const createTelegramMessageProcessor = (deps) => { const { @@ -33,122 +27,37 @@ export const createTelegramMessageProcessor = (deps) => { } = deps; return async (primaryCtx, allMedia, storeAllowFrom, options) => { - const chatId = primaryCtx?.message?.chat?.id ?? primaryCtx?.chat?.id ?? "unknown"; - const messageId = primaryCtx?.message?.message_id ?? "unknown"; - const startTime = Date.now(); - - diag.info( - `process message start: channel=telegram chatId=${chatId} messageId=${messageId} mediaCount=${ - allMedia?.length ?? 0 - }`, - ); - - let sessionKey: string | undefined; - - try { - const context = await buildTelegramMessageContext({ - primaryCtx, - allMedia, - storeAllowFrom, - options, - bot, - cfg, - account, - historyLimit, - groupHistories, - dmPolicy, - allowFrom, - groupAllowFrom, - ackReactionScope, - logger, - resolveGroupActivation, - resolveGroupRequireMention, - resolveTelegramGroupConfig, - }); - if (!context) { - const durationMs = Date.now() - startTime; - diag.debug( - `process message skipped: channel=telegram chatId=${chatId} messageId=${messageId} reason=no_context`, - ); - logMessageProcessed({ - channel: "telegram", - chatId, - messageId, - durationMs, - outcome: "skipped", - reason: "no_context", - }); - return; - } - - sessionKey = context?.route?.sessionKey; - diag.info( - `process message dispatching: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${ - sessionKey ?? "unknown" - }`, - ); - if (sessionKey) { - logMessageQueued({ sessionKey, channel: "telegram", source: "telegram" }); - } - - await dispatchTelegramMessage({ - context, - bot, - cfg, - runtime, - replyToMode, - streamMode, - textLimit, - telegramCfg, - opts, - resolveBotTopicsEnabled, - }); - - const durationMs = Date.now() - startTime; - logMessageProcessed({ - channel: "telegram", - chatId, - messageId, - sessionKey, - durationMs, - outcome: "completed", - }); - if (sessionKey) { - logSessionStateChange({ - sessionKey, - state: "idle", - reason: "message_completed", - }); - } - diag.info( - `process message complete: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${ - sessionKey ?? "unknown" - } durationMs=${durationMs}`, - ); - } catch (err) { - const durationMs = Date.now() - startTime; - logMessageProcessed({ - channel: "telegram", - chatId, - messageId, - sessionKey, - durationMs, - outcome: "error", - error: String(err), - }); - if (sessionKey) { - logSessionStateChange({ - sessionKey, - state: "idle", - reason: "message_error", - }); - } - diag.error( - `process message error: channel=telegram chatId=${chatId} messageId=${messageId} durationMs=${durationMs} error="${String( - err, - )}"`, - ); - throw err; - } + const context = await buildTelegramMessageContext({ + primaryCtx, + allMedia, + storeAllowFrom, + options, + bot, + cfg, + account, + historyLimit, + groupHistories, + dmPolicy, + allowFrom, + groupAllowFrom, + ackReactionScope, + logger, + resolveGroupActivation, + resolveGroupRequireMention, + resolveTelegramGroupConfig, + }); + if (!context) return; + await dispatchTelegramMessage({ + context, + bot, + cfg, + runtime, + replyToMode, + streamMode, + textLimit, + telegramCfg, + opts, + resolveBotTopicsEnabled, + }); }; }; diff --git a/src/telegram/webhook.ts b/src/telegram/webhook.ts index 1092722ef..4d341bb88 100644 --- a/src/telegram/webhook.ts +++ b/src/telegram/webhook.ts @@ -2,6 +2,7 @@ import { createServer } from "node:http"; import { webhookCallback } from "grammy"; import type { ClawdbotConfig } from "../config/config.js"; +import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js"; import { formatErrorMessage } from "../infra/errors.js"; import type { RuntimeEnv } from "../runtime.js"; import { defaultRuntime } from "../runtime.js"; @@ -34,6 +35,7 @@ export async function startTelegramWebhook(opts: { const port = opts.port ?? 8787; const host = opts.host ?? "0.0.0.0"; const runtime = opts.runtime ?? defaultRuntime; + const diagnosticsEnabled = isDiagnosticsEnabled(opts.config); const bot = createTelegramBot({ token: opts.token, runtime, @@ -45,7 +47,9 @@ export async function startTelegramWebhook(opts: { secretToken: opts.secret, }); - startDiagnosticHeartbeat(); + if (diagnosticsEnabled) { + startDiagnosticHeartbeat(); + } const server = createServer((req, res) => { if (req.url === healthPath) { @@ -59,24 +63,30 @@ export async function startTelegramWebhook(opts: { return; } const startTime = Date.now(); - logWebhookReceived({ channel: "telegram", updateType: "telegram-post" }); + if (diagnosticsEnabled) { + logWebhookReceived({ channel: "telegram", updateType: "telegram-post" }); + } const handled = handler(req, res); if (handled && typeof (handled as Promise).catch === "function") { void (handled as Promise) .then(() => { - logWebhookProcessed({ - channel: "telegram", - updateType: "telegram-post", - durationMs: Date.now() - startTime, - }); + if (diagnosticsEnabled) { + logWebhookProcessed({ + channel: "telegram", + updateType: "telegram-post", + durationMs: Date.now() - startTime, + }); + } }) .catch((err) => { const errMsg = formatErrorMessage(err); - logWebhookError({ - channel: "telegram", - updateType: "telegram-post", - error: errMsg, - }); + if (diagnosticsEnabled) { + logWebhookError({ + channel: "telegram", + updateType: "telegram-post", + error: errMsg, + }); + } runtime.log?.(`webhook handler failed: ${errMsg}`); if (!res.headersSent) res.writeHead(500); res.end(); @@ -98,7 +108,9 @@ export async function startTelegramWebhook(opts: { const shutdown = () => { server.close(); void bot.stop(); - stopDiagnosticHeartbeat(); + if (diagnosticsEnabled) { + stopDiagnosticHeartbeat(); + } }; if (opts.abortSignal) { opts.abortSignal.addEventListener("abort", shutdown, { once: true });