fix: emit diagnostics across channels

This commit is contained in:
Peter Steinberger
2026-01-21 00:29:42 +00:00
parent e447233533
commit ec01e5c7e6
8 changed files with 318 additions and 275 deletions

View File

@@ -13,6 +13,11 @@ const mocks = vi.hoisted(() => ({
aborted: false,
})),
}));
const diagnosticMocks = vi.hoisted(() => ({
logMessageQueued: vi.fn(),
logMessageProcessed: vi.fn(),
logSessionStateChange: vi.fn(),
}));
vi.mock("./route-reply.js", () => ({
isRoutableChannel: (channel: string | undefined) =>
@@ -34,6 +39,12 @@ vi.mock("./abort.js", () => ({
},
}));
vi.mock("../../logging/diagnostic.js", () => ({
logMessageQueued: diagnosticMocks.logMessageQueued,
logMessageProcessed: diagnosticMocks.logMessageProcessed,
logSessionStateChange: diagnosticMocks.logSessionStateChange,
}));
const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js");
const { resetInboundDedupe } = await import("./inbound-dedupe.js");
@@ -50,6 +61,9 @@ function createDispatcher(): ReplyDispatcher {
describe("dispatchReplyFromConfig", () => {
beforeEach(() => {
resetInboundDedupe();
diagnosticMocks.logMessageQueued.mockReset();
diagnosticMocks.logMessageProcessed.mockReset();
diagnosticMocks.logSessionStateChange.mockReset();
});
it("does not route when Provider matches OriginatingChannel (even if Surface is missing)", async () => {
mocks.tryFastAbortFromMessage.mockResolvedValue({
@@ -186,4 +200,74 @@ describe("dispatchReplyFromConfig", () => {
expect(replyResolver).toHaveBeenCalledTimes(1);
});
it("emits diagnostics when enabled", async () => {
mocks.tryFastAbortFromMessage.mockResolvedValue({
handled: false,
aborted: false,
});
const cfg = { diagnostics: { enabled: true } } as ClawdbotConfig;
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
Provider: "slack",
Surface: "slack",
SessionKey: "agent:main:main",
MessageSid: "msg-1",
To: "slack:C123",
});
const replyResolver = async () => ({ text: "hi" }) satisfies ReplyPayload;
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
expect(diagnosticMocks.logMessageQueued).toHaveBeenCalledTimes(1);
expect(diagnosticMocks.logSessionStateChange).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
state: "processing",
reason: "message_start",
});
expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith(
expect.objectContaining({
channel: "slack",
outcome: "completed",
sessionKey: "agent:main:main",
}),
);
});
it("marks diagnostics skipped for duplicate inbound messages", async () => {
mocks.tryFastAbortFromMessage.mockResolvedValue({
handled: false,
aborted: false,
});
const cfg = { diagnostics: { enabled: true } } as ClawdbotConfig;
const ctx = buildTestCtx({
Provider: "whatsapp",
OriginatingChannel: "whatsapp",
OriginatingTo: "whatsapp:+15555550123",
MessageSid: "msg-dup",
});
const replyResolver = vi.fn(async () => ({ text: "hi" }) as ReplyPayload);
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher: createDispatcher(),
replyResolver,
});
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher: createDispatcher(),
replyResolver,
});
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith(
expect.objectContaining({
channel: "whatsapp",
outcome: "skipped",
reason: "duplicate",
}),
);
});
});

View File

@@ -1,5 +1,11 @@
import type { ClawdbotConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import { isDiagnosticsEnabled } from "../../infra/diagnostic-events.js";
import {
logMessageProcessed,
logMessageQueued,
logSessionStateChange,
} from "../../logging/diagnostic.js";
import { getReplyFromConfig } from "../reply.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
@@ -21,8 +27,55 @@ export async function dispatchReplyFromConfig(params: {
replyResolver?: typeof getReplyFromConfig;
}): Promise<DispatchFromConfigResult> {
const { ctx, cfg, dispatcher } = params;
const diagnosticsEnabled = isDiagnosticsEnabled(cfg);
const channel = String(ctx.Surface ?? ctx.Provider ?? "unknown").toLowerCase();
const chatId = ctx.To ?? ctx.From;
const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
const sessionKey = ctx.SessionKey;
const startTime = diagnosticsEnabled ? Date.now() : 0;
const canTrackSession = diagnosticsEnabled && Boolean(sessionKey);
const recordProcessed = (
outcome: "completed" | "skipped" | "error",
opts?: {
reason?: string;
error?: string;
},
) => {
if (!diagnosticsEnabled) return;
logMessageProcessed({
channel,
chatId,
messageId,
sessionKey,
durationMs: Date.now() - startTime,
outcome,
reason: opts?.reason,
error: opts?.error,
});
};
const markProcessing = () => {
if (!canTrackSession || !sessionKey) return;
logMessageQueued({ sessionKey, channel, source: "dispatch" });
logSessionStateChange({
sessionKey,
state: "processing",
reason: "message_start",
});
};
const markIdle = (reason: string) => {
if (!canTrackSession || !sessionKey) return;
logSessionStateChange({
sessionKey,
state: "idle",
reason,
});
};
if (shouldSkipDuplicateInbound(ctx)) {
recordProcessed("skipped", { reason: "duplicate" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
@@ -68,95 +121,107 @@ export async function dispatchReplyFromConfig(params: {
}
};
const fastAbort = await tryFastAbortFromMessage({ ctx, cfg });
if (fastAbort.handled) {
const payload = {
text: formatAbortReplyText(fastAbort.stoppedSubagents),
} satisfies ReplyPayload;
markProcessing();
try {
const fastAbort = await tryFastAbortFromMessage({ ctx, cfg });
if (fastAbort.handled) {
const payload = {
text: formatAbortReplyText(fastAbort.stoppedSubagents),
} satisfies ReplyPayload;
let queuedFinal = false;
let routedFinalCount = 0;
if (shouldRouteToOriginating && originatingChannel && originatingTo) {
const result = await routeReply({
payload,
channel: originatingChannel,
to: originatingTo,
sessionKey: ctx.SessionKey,
accountId: ctx.AccountId,
threadId: ctx.MessageThreadId,
cfg,
});
queuedFinal = result.ok;
if (result.ok) routedFinalCount += 1;
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
);
}
} else {
queuedFinal = dispatcher.sendFinalReply(payload);
}
await dispatcher.waitForIdle();
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
recordProcessed("completed", { reason: "fast_abort" });
markIdle("message_completed");
return { queuedFinal, counts };
}
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
ctx,
{
...params.replyOptions,
onToolResult: (payload: ReplyPayload) => {
if (shouldRouteToOriginating) {
// Fire-and-forget for streaming tool results when routing.
void sendPayloadAsync(payload);
} else {
// Synchronous dispatch to preserve callback timing.
dispatcher.sendToolResult(payload);
}
},
onBlockReply: (payload: ReplyPayload, context) => {
if (shouldRouteToOriginating) {
// Await routed sends so upstream can enforce ordering/timeouts.
return sendPayloadAsync(payload, context?.abortSignal);
} else {
// Synchronous dispatch to preserve callback timing.
dispatcher.sendBlockReply(payload);
}
},
},
cfg,
);
const replies = replyResult ? (Array.isArray(replyResult) ? replyResult : [replyResult]) : [];
let queuedFinal = false;
let routedFinalCount = 0;
if (shouldRouteToOriginating && originatingChannel && originatingTo) {
const result = await routeReply({
payload,
channel: originatingChannel,
to: originatingTo,
sessionKey: ctx.SessionKey,
accountId: ctx.AccountId,
threadId: ctx.MessageThreadId,
cfg,
});
queuedFinal = result.ok;
if (result.ok) routedFinalCount += 1;
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
);
for (const reply of replies) {
if (shouldRouteToOriginating && originatingChannel && originatingTo) {
// Route final reply to originating channel.
const result = await routeReply({
payload: reply,
channel: originatingChannel,
to: originatingTo,
sessionKey: ctx.SessionKey,
accountId: ctx.AccountId,
threadId: ctx.MessageThreadId,
cfg,
});
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`,
);
}
queuedFinal = result.ok || queuedFinal;
if (result.ok) routedFinalCount += 1;
} else {
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
}
} else {
queuedFinal = dispatcher.sendFinalReply(payload);
}
await dispatcher.waitForIdle();
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
recordProcessed("completed");
markIdle("message_completed");
return { queuedFinal, counts };
} catch (err) {
recordProcessed("error", { error: String(err) });
markIdle("message_error");
throw err;
}
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
ctx,
{
...params.replyOptions,
onToolResult: (payload: ReplyPayload) => {
if (shouldRouteToOriginating) {
// Fire-and-forget for streaming tool results when routing.
void sendPayloadAsync(payload);
} else {
// Synchronous dispatch to preserve callback timing.
dispatcher.sendToolResult(payload);
}
},
onBlockReply: (payload: ReplyPayload, context) => {
if (shouldRouteToOriginating) {
// Await routed sends so upstream can enforce ordering/timeouts.
return sendPayloadAsync(payload, context?.abortSignal);
} else {
// Synchronous dispatch to preserve callback timing.
dispatcher.sendBlockReply(payload);
}
},
},
cfg,
);
const replies = replyResult ? (Array.isArray(replyResult) ? replyResult : [replyResult]) : [];
let queuedFinal = false;
let routedFinalCount = 0;
for (const reply of replies) {
if (shouldRouteToOriginating && originatingChannel && originatingTo) {
// Route final reply to originating channel.
const result = await routeReply({
payload: reply,
channel: originatingChannel,
to: originatingTo,
sessionKey: ctx.SessionKey,
accountId: ctx.AccountId,
threadId: ctx.MessageThreadId,
cfg,
});
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (final) failed: ${result.error ?? "unknown error"}`,
);
}
queuedFinal = result.ok || queuedFinal;
if (result.ok) routedFinalCount += 1;
} else {
queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal;
}
}
await dispatcher.waitForIdle();
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
return { queuedFinal, counts };
}

View File

@@ -13,6 +13,7 @@ import {
readConfigFileSnapshot,
writeConfigFile,
} from "../config/config.js";
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js";
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
@@ -26,6 +27,7 @@ import {
} from "../infra/skills-remote.js";
import { scheduleGatewayUpdateCheck } from "../infra/update-startup.js";
import { setGatewaySigusr1RestartPolicy } from "../infra/restart.js";
import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js";
import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js";
import type { PluginServicesHandle } from "../plugins/services.js";
import type { RuntimeEnv } from "../runtime.js";
@@ -198,6 +200,10 @@ export async function startGatewayServer(
}
const cfgAtStart = loadConfig();
const diagnosticsEnabled = isDiagnosticsEnabled(cfgAtStart);
if (diagnosticsEnabled) {
startDiagnosticHeartbeat();
}
setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true });
initSubagentRegistry();
const defaultAgentId = resolveDefaultAgentId(cfgAtStart);
@@ -533,5 +539,12 @@ export async function startGatewayServer(
httpServer,
});
return { close };
return {
close: async (opts) => {
if (diagnosticsEnabled) {
stopDiagnosticHeartbeat();
}
await close(opts);
},
};
}

View File

@@ -141,9 +141,11 @@ export type DiagnosticEventPayload =
| DiagnosticRunAttemptEvent
| DiagnosticHeartbeatEvent;
type DiagnosticEventInput<T extends DiagnosticEventPayload = DiagnosticEventPayload> =
T extends DiagnosticEventPayload ? Omit<T, "seq" | "ts"> : never;
export type DiagnosticEventInput = DiagnosticEventPayload extends infer Event
? Event extends DiagnosticEventPayload
? Omit<Event, "seq" | "ts">
: never
: never;
let seq = 0;
const listeners = new Set<(evt: DiagnosticEventPayload) => void>();
@@ -151,14 +153,12 @@ export function isDiagnosticsEnabled(config?: ClawdbotConfig): boolean {
return config?.diagnostics?.enabled === true;
}
export function emitDiagnosticEvent<T extends DiagnosticEventPayload>(
event: DiagnosticEventInput<T>,
) {
export function emitDiagnosticEvent(event: DiagnosticEventInput) {
const enriched = {
...event,
seq: (seq += 1),
ts: Date.now(),
} as DiagnosticEventPayload;
} satisfies DiagnosticEventPayload;
for (const listener of listeners) {
try {
listener(enriched);

View File

@@ -338,6 +338,7 @@ export function startDiagnosticHeartbeat() {
}
}
}, 30_000);
heartbeatInterval.unref?.();
}
export function stopDiagnosticHeartbeat() {

View File

@@ -2,14 +2,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
const buildTelegramMessageContext = vi.hoisted(() => vi.fn());
const dispatchTelegramMessage = vi.hoisted(() => vi.fn());
const logMessageQueued = vi.hoisted(() => vi.fn());
const logMessageProcessed = vi.hoisted(() => vi.fn());
const logSessionStateChange = vi.hoisted(() => vi.fn());
const diagnosticLogger = vi.hoisted(() => ({
info: vi.fn(),
debug: vi.fn(),
error: vi.fn(),
}));
vi.mock("./bot-message-context.js", () => ({
buildTelegramMessageContext,
@@ -19,25 +11,12 @@ vi.mock("./bot-message-dispatch.js", () => ({
dispatchTelegramMessage,
}));
vi.mock("../logging/diagnostic.js", () => ({
diagnosticLogger,
logMessageQueued,
logMessageProcessed,
logSessionStateChange,
}));
import { createTelegramMessageProcessor } from "./bot-message.js";
describe("telegram bot message diagnostics", () => {
describe("telegram bot message processor", () => {
beforeEach(() => {
buildTelegramMessageContext.mockReset();
dispatchTelegramMessage.mockReset();
logMessageQueued.mockReset();
logMessageProcessed.mockReset();
logSessionStateChange.mockReset();
diagnosticLogger.info.mockReset();
diagnosticLogger.debug.mockReset();
diagnosticLogger.error.mockReset();
});
const baseDeps = {
@@ -63,39 +42,19 @@ describe("telegram bot message diagnostics", () => {
resolveBotTopicsEnabled: () => false,
};
it("decrements queue depth after successful processing", async () => {
buildTelegramMessageContext.mockResolvedValue({
route: { sessionKey: "agent:main:main" },
});
it("dispatches when context is available", async () => {
buildTelegramMessageContext.mockResolvedValue({ route: { sessionKey: "agent:main:main" } });
const processMessage = createTelegramMessageProcessor(baseDeps);
await processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {});
expect(logMessageQueued).toHaveBeenCalledTimes(1);
expect(logSessionStateChange).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
state: "idle",
reason: "message_completed",
});
expect(dispatchTelegramMessage).toHaveBeenCalledTimes(1);
});
it("decrements queue depth after processing error", async () => {
buildTelegramMessageContext.mockResolvedValue({
route: { sessionKey: "agent:main:main" },
});
dispatchTelegramMessage.mockRejectedValue(new Error("boom"));
it("skips dispatch when no context is produced", async () => {
buildTelegramMessageContext.mockResolvedValue(null);
const processMessage = createTelegramMessageProcessor(baseDeps);
await expect(
processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {}),
).rejects.toThrow("boom");
expect(logMessageQueued).toHaveBeenCalledTimes(1);
expect(logSessionStateChange).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
state: "idle",
reason: "message_error",
});
await processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {});
expect(dispatchTelegramMessage).not.toHaveBeenCalled();
});
});

View File

@@ -1,12 +1,6 @@
// @ts-nocheck
import { buildTelegramMessageContext } from "./bot-message-context.js";
import { dispatchTelegramMessage } from "./bot-message-dispatch.js";
import {
diagnosticLogger as diag,
logMessageProcessed,
logMessageQueued,
logSessionStateChange,
} from "../logging/diagnostic.js";
export const createTelegramMessageProcessor = (deps) => {
const {
@@ -33,122 +27,37 @@ export const createTelegramMessageProcessor = (deps) => {
} = deps;
return async (primaryCtx, allMedia, storeAllowFrom, options) => {
const chatId = primaryCtx?.message?.chat?.id ?? primaryCtx?.chat?.id ?? "unknown";
const messageId = primaryCtx?.message?.message_id ?? "unknown";
const startTime = Date.now();
diag.info(
`process message start: channel=telegram chatId=${chatId} messageId=${messageId} mediaCount=${
allMedia?.length ?? 0
}`,
);
let sessionKey: string | undefined;
try {
const context = await buildTelegramMessageContext({
primaryCtx,
allMedia,
storeAllowFrom,
options,
bot,
cfg,
account,
historyLimit,
groupHistories,
dmPolicy,
allowFrom,
groupAllowFrom,
ackReactionScope,
logger,
resolveGroupActivation,
resolveGroupRequireMention,
resolveTelegramGroupConfig,
});
if (!context) {
const durationMs = Date.now() - startTime;
diag.debug(
`process message skipped: channel=telegram chatId=${chatId} messageId=${messageId} reason=no_context`,
);
logMessageProcessed({
channel: "telegram",
chatId,
messageId,
durationMs,
outcome: "skipped",
reason: "no_context",
});
return;
}
sessionKey = context?.route?.sessionKey;
diag.info(
`process message dispatching: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${
sessionKey ?? "unknown"
}`,
);
if (sessionKey) {
logMessageQueued({ sessionKey, channel: "telegram", source: "telegram" });
}
await dispatchTelegramMessage({
context,
bot,
cfg,
runtime,
replyToMode,
streamMode,
textLimit,
telegramCfg,
opts,
resolveBotTopicsEnabled,
});
const durationMs = Date.now() - startTime;
logMessageProcessed({
channel: "telegram",
chatId,
messageId,
sessionKey,
durationMs,
outcome: "completed",
});
if (sessionKey) {
logSessionStateChange({
sessionKey,
state: "idle",
reason: "message_completed",
});
}
diag.info(
`process message complete: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${
sessionKey ?? "unknown"
} durationMs=${durationMs}`,
);
} catch (err) {
const durationMs = Date.now() - startTime;
logMessageProcessed({
channel: "telegram",
chatId,
messageId,
sessionKey,
durationMs,
outcome: "error",
error: String(err),
});
if (sessionKey) {
logSessionStateChange({
sessionKey,
state: "idle",
reason: "message_error",
});
}
diag.error(
`process message error: channel=telegram chatId=${chatId} messageId=${messageId} durationMs=${durationMs} error="${String(
err,
)}"`,
);
throw err;
}
const context = await buildTelegramMessageContext({
primaryCtx,
allMedia,
storeAllowFrom,
options,
bot,
cfg,
account,
historyLimit,
groupHistories,
dmPolicy,
allowFrom,
groupAllowFrom,
ackReactionScope,
logger,
resolveGroupActivation,
resolveGroupRequireMention,
resolveTelegramGroupConfig,
});
if (!context) return;
await dispatchTelegramMessage({
context,
bot,
cfg,
runtime,
replyToMode,
streamMode,
textLimit,
telegramCfg,
opts,
resolveBotTopicsEnabled,
});
};
};

View File

@@ -2,6 +2,7 @@ import { createServer } from "node:http";
import { webhookCallback } from "grammy";
import type { ClawdbotConfig } from "../config/config.js";
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
import { formatErrorMessage } from "../infra/errors.js";
import type { RuntimeEnv } from "../runtime.js";
import { defaultRuntime } from "../runtime.js";
@@ -34,6 +35,7 @@ export async function startTelegramWebhook(opts: {
const port = opts.port ?? 8787;
const host = opts.host ?? "0.0.0.0";
const runtime = opts.runtime ?? defaultRuntime;
const diagnosticsEnabled = isDiagnosticsEnabled(opts.config);
const bot = createTelegramBot({
token: opts.token,
runtime,
@@ -45,7 +47,9 @@ export async function startTelegramWebhook(opts: {
secretToken: opts.secret,
});
startDiagnosticHeartbeat();
if (diagnosticsEnabled) {
startDiagnosticHeartbeat();
}
const server = createServer((req, res) => {
if (req.url === healthPath) {
@@ -59,24 +63,30 @@ export async function startTelegramWebhook(opts: {
return;
}
const startTime = Date.now();
logWebhookReceived({ channel: "telegram", updateType: "telegram-post" });
if (diagnosticsEnabled) {
logWebhookReceived({ channel: "telegram", updateType: "telegram-post" });
}
const handled = handler(req, res);
if (handled && typeof (handled as Promise<void>).catch === "function") {
void (handled as Promise<void>)
.then(() => {
logWebhookProcessed({
channel: "telegram",
updateType: "telegram-post",
durationMs: Date.now() - startTime,
});
if (diagnosticsEnabled) {
logWebhookProcessed({
channel: "telegram",
updateType: "telegram-post",
durationMs: Date.now() - startTime,
});
}
})
.catch((err) => {
const errMsg = formatErrorMessage(err);
logWebhookError({
channel: "telegram",
updateType: "telegram-post",
error: errMsg,
});
if (diagnosticsEnabled) {
logWebhookError({
channel: "telegram",
updateType: "telegram-post",
error: errMsg,
});
}
runtime.log?.(`webhook handler failed: ${errMsg}`);
if (!res.headersSent) res.writeHead(500);
res.end();
@@ -98,7 +108,9 @@ export async function startTelegramWebhook(opts: {
const shutdown = () => {
server.close();
void bot.stop();
stopDiagnosticHeartbeat();
if (diagnosticsEnabled) {
stopDiagnosticHeartbeat();
}
};
if (opts.abortSignal) {
opts.abortSignal.addEventListener("abort", shutdown, { once: true });