fix: wire OTLP logs for diagnostics

This commit is contained in:
Peter Steinberger
2026-01-20 22:51:47 +00:00
parent e12abf3114
commit 6734f2d71c
17 changed files with 1656 additions and 107 deletions

View File

@@ -1,3 +1,9 @@
import {
diagnosticLogger as diag,
logMessageQueued,
logSessionStateChange,
} from "../../logging/diagnostic.js";
type EmbeddedPiQueueHandle = {
queueMessage: (text: string) => Promise<void>;
isStreaming: () => boolean;
@@ -14,22 +20,40 @@ const EMBEDDED_RUN_WAITERS = new Map<string, Set<EmbeddedRunWaiter>>();
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) return false;
if (!handle.isStreaming()) return false;
if (handle.isCompacting()) return false;
if (!handle) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=no_active_run`);
return false;
}
if (!handle.isStreaming()) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=not_streaming`);
return false;
}
if (handle.isCompacting()) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`);
return false;
}
logMessageQueued({ sessionId, source: "pi-embedded-runner" });
void handle.queueMessage(text);
return true;
}
export function abortEmbeddedPiRun(sessionId: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) return false;
if (!handle) {
diag.debug(`abort failed: sessionId=${sessionId} reason=no_active_run`);
return false;
}
diag.info(`aborting run: sessionId=${sessionId}`);
handle.abort();
return true;
}
export function isEmbeddedPiRunActive(sessionId: string): boolean {
return ACTIVE_EMBEDDED_RUNS.has(sessionId);
const active = ACTIVE_EMBEDDED_RUNS.has(sessionId);
if (active) {
diag.debug(`run active check: sessionId=${sessionId} active=true`);
}
return active;
}
export function isEmbeddedPiRunStreaming(sessionId: string): boolean {
@@ -40,6 +64,7 @@ export function isEmbeddedPiRunStreaming(sessionId: string): boolean {
export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): Promise<boolean> {
if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId)) return Promise.resolve(true);
diag.debug(`waiting for run end: sessionId=${sessionId} timeoutMs=${timeoutMs}`);
return new Promise((resolve) => {
const waiters = EMBEDDED_RUN_WAITERS.get(sessionId) ?? new Set();
const waiter: EmbeddedRunWaiter = {
@@ -48,6 +73,7 @@ export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000):
() => {
waiters.delete(waiter);
if (waiters.size === 0) EMBEDDED_RUN_WAITERS.delete(sessionId);
diag.warn(`wait timeout: sessionId=${sessionId} timeoutMs=${timeoutMs}`);
resolve(false);
},
Math.max(100, timeoutMs),
@@ -68,6 +94,7 @@ function notifyEmbeddedRunEnded(sessionId: string) {
const waiters = EMBEDDED_RUN_WAITERS.get(sessionId);
if (!waiters || waiters.size === 0) return;
EMBEDDED_RUN_WAITERS.delete(sessionId);
diag.debug(`notifying waiters: sessionId=${sessionId} waiterCount=${waiters.size}`);
for (const waiter of waiters) {
clearTimeout(waiter.timer);
waiter.resolve(true);
@@ -75,13 +102,24 @@ function notifyEmbeddedRunEnded(sessionId: string) {
}
export function setActiveEmbeddedRun(sessionId: string, handle: EmbeddedPiQueueHandle) {
const wasActive = ACTIVE_EMBEDDED_RUNS.has(sessionId);
ACTIVE_EMBEDDED_RUNS.set(sessionId, handle);
logSessionStateChange({
sessionId,
state: "processing",
reason: wasActive ? "run_replaced" : "run_started",
});
diag.info(`run registered: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
}
export function clearActiveEmbeddedRun(sessionId: string, handle: EmbeddedPiQueueHandle) {
if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) {
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
logSessionStateChange({ sessionId, state: "idle", reason: "run_completed" });
diag.info(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
notifyEmbeddedRunEnded(sessionId);
} else {
diag.debug(`run clear skipped: sessionId=${sessionId} reason=handle_mismatch`);
}
}

View File

@@ -25,4 +25,31 @@ describe("diagnostic-events", () => {
expect(seqs).toEqual([1, 2]);
});
test("emits message-flow events", async () => {
resetDiagnosticEventsForTest();
const types: string[] = [];
const stop = onDiagnosticEvent((evt) => types.push(evt.type));
emitDiagnosticEvent({
type: "webhook.received",
channel: "telegram",
updateType: "telegram-post",
});
emitDiagnosticEvent({
type: "message.queued",
channel: "telegram",
source: "telegram",
queueDepth: 1,
});
emitDiagnosticEvent({
type: "session.state",
state: "processing",
reason: "run_started",
});
stop();
expect(types).toEqual(["webhook.received", "message.queued", "session.state"]);
});
});

View File

@@ -1,9 +1,14 @@
import type { ClawdbotConfig } from "../config/config.js";
export type DiagnosticUsageEvent = {
type: "model.usage";
export type DiagnosticSessionState = "idle" | "processing" | "waiting";
type DiagnosticBaseEvent = {
ts: number;
seq: number;
};
export type DiagnosticUsageEvent = DiagnosticBaseEvent & {
type: "model.usage";
sessionKey?: string;
sessionId?: string;
channel?: string;
@@ -25,7 +30,116 @@ export type DiagnosticUsageEvent = {
durationMs?: number;
};
export type DiagnosticEventPayload = DiagnosticUsageEvent;
export type DiagnosticWebhookReceivedEvent = DiagnosticBaseEvent & {
type: "webhook.received";
channel: string;
updateType?: string;
chatId?: number | string;
};
export type DiagnosticWebhookProcessedEvent = DiagnosticBaseEvent & {
type: "webhook.processed";
channel: string;
updateType?: string;
chatId?: number | string;
durationMs?: number;
};
export type DiagnosticWebhookErrorEvent = DiagnosticBaseEvent & {
type: "webhook.error";
channel: string;
updateType?: string;
chatId?: number | string;
error: string;
};
export type DiagnosticMessageQueuedEvent = DiagnosticBaseEvent & {
type: "message.queued";
sessionKey?: string;
sessionId?: string;
channel?: string;
source: string;
queueDepth?: number;
};
export type DiagnosticMessageProcessedEvent = DiagnosticBaseEvent & {
type: "message.processed";
channel: string;
messageId?: number | string;
chatId?: number | string;
sessionKey?: string;
sessionId?: string;
durationMs?: number;
outcome: "completed" | "skipped" | "error";
reason?: string;
error?: string;
};
export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & {
type: "session.state";
sessionKey?: string;
sessionId?: string;
prevState?: DiagnosticSessionState;
state: DiagnosticSessionState;
reason?: string;
queueDepth?: number;
};
export type DiagnosticSessionStuckEvent = DiagnosticBaseEvent & {
type: "session.stuck";
sessionKey?: string;
sessionId?: string;
state: DiagnosticSessionState;
ageMs: number;
queueDepth?: number;
};
export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & {
type: "queue.lane.enqueue";
lane: string;
queueSize: number;
};
export type DiagnosticLaneDequeueEvent = DiagnosticBaseEvent & {
type: "queue.lane.dequeue";
lane: string;
queueSize: number;
waitMs: number;
};
export type DiagnosticRunAttemptEvent = DiagnosticBaseEvent & {
type: "run.attempt";
sessionKey?: string;
sessionId?: string;
runId: string;
attempt: number;
};
export type DiagnosticHeartbeatEvent = DiagnosticBaseEvent & {
type: "diagnostic.heartbeat";
webhooks: {
received: number;
processed: number;
errors: number;
};
active: number;
waiting: number;
queued: number;
};
export type DiagnosticEventPayload =
| DiagnosticUsageEvent
| DiagnosticWebhookReceivedEvent
| DiagnosticWebhookProcessedEvent
| DiagnosticWebhookErrorEvent
| DiagnosticMessageQueuedEvent
| DiagnosticMessageProcessedEvent
| DiagnosticSessionStateEvent
| DiagnosticSessionStuckEvent
| DiagnosticLaneEnqueueEvent
| DiagnosticLaneDequeueEvent
| DiagnosticRunAttemptEvent
| DiagnosticHeartbeatEvent;
let seq = 0;
const listeners = new Set<(evt: DiagnosticEventPayload) => void>();

350
src/logging/diagnostic.ts Normal file
View File

@@ -0,0 +1,350 @@
import { emitDiagnosticEvent } from "../infra/diagnostic-events.js";
import { createSubsystemLogger } from "./subsystem.js";
const diag = createSubsystemLogger("diagnostic");
type SessionStateValue = "idle" | "processing" | "waiting";
type SessionState = {
sessionId?: string;
sessionKey?: string;
lastActivity: number;
state: SessionStateValue;
queueDepth: number;
};
type SessionRef = {
sessionId?: string;
sessionKey?: string;
};
const sessionStates = new Map<string, SessionState>();
const webhookStats = {
received: 0,
processed: 0,
errors: 0,
lastReceived: 0,
};
let lastActivityAt = 0;
function markActivity() {
lastActivityAt = Date.now();
}
function resolveSessionKey({ sessionKey, sessionId }: SessionRef) {
return sessionKey ?? sessionId ?? "unknown";
}
function getSessionState(ref: SessionRef): SessionState {
const key = resolveSessionKey(ref);
const existing = sessionStates.get(key);
if (existing) {
if (ref.sessionId) existing.sessionId = ref.sessionId;
if (ref.sessionKey) existing.sessionKey = ref.sessionKey;
return existing;
}
const created: SessionState = {
sessionId: ref.sessionId,
sessionKey: ref.sessionKey,
lastActivity: Date.now(),
state: "idle",
queueDepth: 0,
};
sessionStates.set(key, created);
return created;
}
export function logWebhookReceived(params: {
channel: string;
updateType?: string;
chatId?: number | string;
}) {
webhookStats.received += 1;
webhookStats.lastReceived = Date.now();
diag.info(
`webhook received: channel=${params.channel} type=${params.updateType ?? "unknown"} chatId=${
params.chatId ?? "unknown"
} total=${webhookStats.received}`,
);
emitDiagnosticEvent({
type: "webhook.received",
channel: params.channel,
updateType: params.updateType,
chatId: params.chatId,
});
markActivity();
}
export function logWebhookProcessed(params: {
channel: string;
updateType?: string;
chatId?: number | string;
durationMs?: number;
}) {
webhookStats.processed += 1;
diag.info(
`webhook processed: channel=${params.channel} type=${
params.updateType ?? "unknown"
} chatId=${params.chatId ?? "unknown"} duration=${params.durationMs ?? 0}ms processed=${
webhookStats.processed
}`,
);
emitDiagnosticEvent({
type: "webhook.processed",
channel: params.channel,
updateType: params.updateType,
chatId: params.chatId,
durationMs: params.durationMs,
});
markActivity();
}
export function logWebhookError(params: {
channel: string;
updateType?: string;
chatId?: number | string;
error: string;
}) {
webhookStats.errors += 1;
diag.error(
`webhook error: channel=${params.channel} type=${params.updateType ?? "unknown"} chatId=${
params.chatId ?? "unknown"
} error="${params.error}" errors=${webhookStats.errors}`,
);
emitDiagnosticEvent({
type: "webhook.error",
channel: params.channel,
updateType: params.updateType,
chatId: params.chatId,
error: params.error,
});
markActivity();
}
export function logMessageQueued(params: {
sessionId?: string;
sessionKey?: string;
channel?: string;
source: string;
}) {
const state = getSessionState(params);
state.queueDepth += 1;
state.lastActivity = Date.now();
diag.info(
`message queued: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
state.sessionKey ?? "unknown"
} source=${params.source} queueDepth=${state.queueDepth} sessionState=${state.state}`,
);
emitDiagnosticEvent({
type: "message.queued",
sessionId: state.sessionId,
sessionKey: state.sessionKey,
channel: params.channel,
source: params.source,
queueDepth: state.queueDepth,
});
markActivity();
}
export function logMessageProcessed(params: {
channel: string;
messageId?: number | string;
chatId?: number | string;
sessionId?: string;
sessionKey?: string;
durationMs?: number;
outcome: "completed" | "skipped" | "error";
reason?: string;
error?: string;
}) {
const payload = `message processed: channel=${params.channel} chatId=${
params.chatId ?? "unknown"
} messageId=${params.messageId ?? "unknown"} sessionId=${
params.sessionId ?? "unknown"
} sessionKey=${params.sessionKey ?? "unknown"} outcome=${params.outcome} duration=${
params.durationMs ?? 0
}ms${params.reason ? ` reason=${params.reason}` : ""}${
params.error ? ` error="${params.error}"` : ""
}`;
if (params.outcome === "error") {
diag.error(payload);
} else if (params.outcome === "skipped") {
diag.debug(payload);
} else {
diag.info(payload);
}
emitDiagnosticEvent({
type: "message.processed",
channel: params.channel,
chatId: params.chatId,
messageId: params.messageId,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
durationMs: params.durationMs,
outcome: params.outcome,
reason: params.reason,
error: params.error,
});
markActivity();
}
export function logSessionStateChange(
params: SessionRef & {
state: SessionStateValue;
reason?: string;
},
) {
const state = getSessionState(params);
const prevState = state.state;
state.state = params.state;
state.lastActivity = Date.now();
if (params.state === "idle") state.queueDepth = Math.max(0, state.queueDepth - 1);
diag.info(
`session state: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
state.sessionKey ?? "unknown"
} prev=${prevState} new=${params.state} reason="${params.reason ?? ""}" queueDepth=${
state.queueDepth
}`,
);
emitDiagnosticEvent({
type: "session.state",
sessionId: state.sessionId,
sessionKey: state.sessionKey,
prevState,
state: params.state,
reason: params.reason,
queueDepth: state.queueDepth,
});
markActivity();
}
export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) {
const state = getSessionState(params);
diag.warn(
`stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${
state.sessionKey ?? "unknown"
} state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${state.queueDepth}`,
);
emitDiagnosticEvent({
type: "session.stuck",
sessionId: state.sessionId,
sessionKey: state.sessionKey,
state: params.state,
ageMs: params.ageMs,
queueDepth: state.queueDepth,
});
markActivity();
}
export function logLaneEnqueue(lane: string, queueSize: number) {
diag.debug(`lane enqueue: lane=${lane} queueSize=${queueSize}`);
emitDiagnosticEvent({
type: "queue.lane.enqueue",
lane,
queueSize,
});
markActivity();
}
export function logLaneDequeue(lane: string, waitMs: number, queueSize: number) {
diag.debug(`lane dequeue: lane=${lane} waitMs=${waitMs} queueSize=${queueSize}`);
emitDiagnosticEvent({
type: "queue.lane.dequeue",
lane,
queueSize,
waitMs,
});
markActivity();
}
export function logRunAttempt(params: SessionRef & { runId: string; attempt: number }) {
diag.info(
`run attempt: sessionId=${params.sessionId ?? "unknown"} sessionKey=${
params.sessionKey ?? "unknown"
} runId=${params.runId} attempt=${params.attempt}`,
);
emitDiagnosticEvent({
type: "run.attempt",
sessionId: params.sessionId,
sessionKey: params.sessionKey,
runId: params.runId,
attempt: params.attempt,
});
markActivity();
}
export function logActiveRuns() {
const activeSessions = Array.from(sessionStates.entries())
.filter(([, s]) => s.state === "processing")
.map(
([id, s]) =>
`${id}(q=${s.queueDepth},age=${Math.round((Date.now() - s.lastActivity) / 1000)}s)`,
);
diag.info(`active runs: count=${activeSessions.length} sessions=[${activeSessions.join(", ")}]`);
markActivity();
}
let heartbeatInterval: NodeJS.Timeout | null = null;
export function startDiagnosticHeartbeat() {
if (heartbeatInterval) return;
heartbeatInterval = setInterval(() => {
const now = Date.now();
const activeCount = Array.from(sessionStates.values()).filter(
(s) => s.state === "processing",
).length;
const waitingCount = Array.from(sessionStates.values()).filter(
(s) => s.state === "waiting",
).length;
const totalQueued = Array.from(sessionStates.values()).reduce(
(sum, s) => sum + s.queueDepth,
0,
);
const hasActivity =
lastActivityAt > 0 ||
webhookStats.received > 0 ||
activeCount > 0 ||
waitingCount > 0 ||
totalQueued > 0;
if (!hasActivity) return;
if (now - lastActivityAt > 120_000 && activeCount === 0 && waitingCount === 0) return;
diag.info(
`heartbeat: webhooks=${webhookStats.received}/${webhookStats.processed}/${webhookStats.errors} active=${activeCount} waiting=${waitingCount} queued=${totalQueued}`,
);
emitDiagnosticEvent({
type: "diagnostic.heartbeat",
webhooks: {
received: webhookStats.received,
processed: webhookStats.processed,
errors: webhookStats.errors,
},
active: activeCount,
waiting: waitingCount,
queued: totalQueued,
});
for (const [, state] of sessionStates) {
const ageMs = now - state.lastActivity;
if (state.state === "processing" && ageMs > 120_000) {
logSessionStuck({
sessionId: state.sessionId,
sessionKey: state.sessionKey,
state: state.state,
ageMs,
});
}
}
}, 30_000);
}
export function stopDiagnosticHeartbeat() {
if (heartbeatInterval) {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
}
}
export { diag as diagnosticLogger };

View File

@@ -35,6 +35,21 @@ type ResolvedSettings = {
file: string;
};
export type LoggerResolvedSettings = ResolvedSettings;
export type LogTransportRecord = Record<string, unknown>;
export type LogTransport = (logObj: LogTransportRecord) => void;
const externalTransports = new Set<LogTransport>();
function attachExternalTransport(logger: TsLogger<LogObj>, transport: LogTransport): void {
logger.attachTransport((logObj: LogObj) => {
if (!externalTransports.has(transport)) return;
try {
transport(logObj as LogTransportRecord);
} catch {
// never block on logging failures
}
});
}
function resolveSettings(): ResolvedSettings {
let cfg: ClawdbotConfig["logging"] | undefined =
@@ -87,6 +102,9 @@ function buildLogger(settings: ResolvedSettings): TsLogger<LogObj> {
// never block on logging failures
}
});
for (const transport of externalTransports) {
attachExternalTransport(logger, transport);
}
return logger;
}
@@ -168,6 +186,17 @@ export function resetLogger() {
loggingState.overrideSettings = null;
}
export function registerLogTransport(transport: LogTransport): () => void {
externalTransports.add(transport);
const logger = loggingState.cachedLogger as TsLogger<LogObj> | null;
if (logger) {
attachExternalTransport(logger, transport);
}
return () => {
externalTransports.delete(transport);
};
}
function defaultRollingPathForToday(): string {
const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD
return path.join(DEFAULT_LOG_DIR, `${LOG_PREFIX}-${today}${LOG_SUFFIX}`);

View File

@@ -182,12 +182,29 @@ export { formatDocsLink } from "../terminal/links.js";
export type { HookEntry } from "../hooks/types.js";
export { normalizeE164 } from "../utils.js";
export { missingTargetError } from "../infra/outbound/target-errors.js";
export { registerLogTransport } from "../logging/logger.js";
export type { LogTransport, LogTransportRecord } from "../logging/logger.js";
export {
emitDiagnosticEvent,
isDiagnosticsEnabled,
onDiagnosticEvent,
} from "../infra/diagnostic-events.js";
export type { DiagnosticEventPayload, DiagnosticUsageEvent } from "../infra/diagnostic-events.js";
export type {
DiagnosticEventPayload,
DiagnosticHeartbeatEvent,
DiagnosticLaneDequeueEvent,
DiagnosticLaneEnqueueEvent,
DiagnosticMessageProcessedEvent,
DiagnosticMessageQueuedEvent,
DiagnosticRunAttemptEvent,
DiagnosticSessionState,
DiagnosticSessionStateEvent,
DiagnosticSessionStuckEvent,
DiagnosticUsageEvent,
DiagnosticWebhookErrorEvent,
DiagnosticWebhookProcessedEvent,
DiagnosticWebhookReceivedEvent,
} from "../infra/diagnostic-events.js";
// Channel: Discord
export {

View File

@@ -1,8 +1,32 @@
import { describe, expect, it } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
const diagnosticMocks = vi.hoisted(() => ({
logLaneEnqueue: vi.fn(),
logLaneDequeue: vi.fn(),
diag: {
debug: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
},
}));
vi.mock("../logging/diagnostic.js", () => ({
logLaneEnqueue: diagnosticMocks.logLaneEnqueue,
logLaneDequeue: diagnosticMocks.logLaneDequeue,
diagnosticLogger: diagnosticMocks.diag,
}));
import { enqueueCommand, getQueueSize } from "./command-queue.js";
describe("command queue", () => {
beforeEach(() => {
diagnosticMocks.logLaneEnqueue.mockClear();
diagnosticMocks.logLaneDequeue.mockClear();
diagnosticMocks.diag.debug.mockClear();
diagnosticMocks.diag.warn.mockClear();
diagnosticMocks.diag.error.mockClear();
});
it("runs tasks one at a time in order", async () => {
let active = 0;
let maxActive = 0;
@@ -29,6 +53,15 @@ describe("command queue", () => {
expect(getQueueSize()).toBe(0);
});
it("logs enqueue depth after push", async () => {
const task = enqueueCommand(async () => {});
expect(diagnosticMocks.logLaneEnqueue).toHaveBeenCalledTimes(1);
expect(diagnosticMocks.logLaneEnqueue.mock.calls[0]?.[1]).toBe(1);
await task;
});
it("invokes onWait callback when a task waits past the threshold", async () => {
let waited: number | null = null;
let queuedAhead: number | null = null;

View File

@@ -1,4 +1,5 @@
import { CommandLane } from "./lanes.js";
import { diagnosticLogger as diag, logLaneDequeue, logLaneEnqueue } from "../logging/diagnostic.js";
// Minimal in-process queue to serialize command executions.
// Default lane ("main") preserves the existing behavior. Additional lanes allow
@@ -49,16 +50,27 @@ function drainLane(lane: string) {
const waitedMs = Date.now() - entry.enqueuedAt;
if (waitedMs >= entry.warnAfterMs) {
entry.onWait?.(waitedMs, state.queue.length);
diag.warn(
`lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`,
);
}
logLaneDequeue(lane, waitedMs, state.queue.length);
state.active += 1;
void (async () => {
const startTime = Date.now();
try {
const result = await entry.task();
state.active -= 1;
diag.debug(
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`,
);
pump();
entry.resolve(result);
} catch (err) {
state.active -= 1;
diag.error(
`lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`,
);
pump();
entry.reject(err);
}
@@ -97,6 +109,7 @@ export function enqueueCommandInLane<T>(
warnAfterMs,
onWait: opts?.onWait,
});
logLaneEnqueue(cleaned, state.queue.length + state.active);
drainLane(cleaned);
});
}

View File

@@ -0,0 +1,101 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const buildTelegramMessageContext = vi.hoisted(() => vi.fn());
const dispatchTelegramMessage = vi.hoisted(() => vi.fn());
const logMessageQueued = vi.hoisted(() => vi.fn());
const logMessageProcessed = vi.hoisted(() => vi.fn());
const logSessionStateChange = vi.hoisted(() => vi.fn());
const diagnosticLogger = vi.hoisted(() => ({
info: vi.fn(),
debug: vi.fn(),
error: vi.fn(),
}));
vi.mock("./bot-message-context.js", () => ({
buildTelegramMessageContext,
}));
vi.mock("./bot-message-dispatch.js", () => ({
dispatchTelegramMessage,
}));
vi.mock("../logging/diagnostic.js", () => ({
diagnosticLogger,
logMessageQueued,
logMessageProcessed,
logSessionStateChange,
}));
import { createTelegramMessageProcessor } from "./bot-message.js";
describe("telegram bot message diagnostics", () => {
beforeEach(() => {
buildTelegramMessageContext.mockReset();
dispatchTelegramMessage.mockReset();
logMessageQueued.mockReset();
logMessageProcessed.mockReset();
logSessionStateChange.mockReset();
diagnosticLogger.info.mockReset();
diagnosticLogger.debug.mockReset();
diagnosticLogger.error.mockReset();
});
const baseDeps = {
bot: {},
cfg: {},
account: {},
telegramCfg: {},
historyLimit: 0,
groupHistories: {},
dmPolicy: {},
allowFrom: [],
groupAllowFrom: [],
ackReactionScope: "none",
logger: {},
resolveGroupActivation: () => true,
resolveGroupRequireMention: () => false,
resolveTelegramGroupConfig: () => ({}),
runtime: {},
replyToMode: "auto",
streamMode: "auto",
textLimit: 4096,
opts: {},
resolveBotTopicsEnabled: () => false,
};
it("decrements queue depth after successful processing", async () => {
buildTelegramMessageContext.mockResolvedValue({
route: { sessionKey: "agent:main:main" },
});
const processMessage = createTelegramMessageProcessor(baseDeps);
await processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {});
expect(logMessageQueued).toHaveBeenCalledTimes(1);
expect(logSessionStateChange).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
state: "idle",
reason: "message_completed",
});
});
it("decrements queue depth after processing error", async () => {
buildTelegramMessageContext.mockResolvedValue({
route: { sessionKey: "agent:main:main" },
});
dispatchTelegramMessage.mockRejectedValue(new Error("boom"));
const processMessage = createTelegramMessageProcessor(baseDeps);
await expect(
processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {}),
).rejects.toThrow("boom");
expect(logMessageQueued).toHaveBeenCalledTimes(1);
expect(logSessionStateChange).toHaveBeenCalledWith({
sessionKey: "agent:main:main",
state: "idle",
reason: "message_error",
});
});
});

View File

@@ -1,6 +1,12 @@
// @ts-nocheck
import { buildTelegramMessageContext } from "./bot-message-context.js";
import { dispatchTelegramMessage } from "./bot-message-dispatch.js";
import {
diagnosticLogger as diag,
logMessageProcessed,
logMessageQueued,
logSessionStateChange,
} from "../logging/diagnostic.js";
export const createTelegramMessageProcessor = (deps) => {
const {
@@ -27,37 +33,122 @@ export const createTelegramMessageProcessor = (deps) => {
} = deps;
return async (primaryCtx, allMedia, storeAllowFrom, options) => {
const context = await buildTelegramMessageContext({
primaryCtx,
allMedia,
storeAllowFrom,
options,
bot,
cfg,
account,
historyLimit,
groupHistories,
dmPolicy,
allowFrom,
groupAllowFrom,
ackReactionScope,
logger,
resolveGroupActivation,
resolveGroupRequireMention,
resolveTelegramGroupConfig,
});
if (!context) return;
await dispatchTelegramMessage({
context,
bot,
cfg,
runtime,
replyToMode,
streamMode,
textLimit,
telegramCfg,
opts,
resolveBotTopicsEnabled,
});
const chatId = primaryCtx?.message?.chat?.id ?? primaryCtx?.chat?.id ?? "unknown";
const messageId = primaryCtx?.message?.message_id ?? "unknown";
const startTime = Date.now();
diag.info(
`process message start: channel=telegram chatId=${chatId} messageId=${messageId} mediaCount=${
allMedia?.length ?? 0
}`,
);
let sessionKey: string | undefined;
try {
const context = await buildTelegramMessageContext({
primaryCtx,
allMedia,
storeAllowFrom,
options,
bot,
cfg,
account,
historyLimit,
groupHistories,
dmPolicy,
allowFrom,
groupAllowFrom,
ackReactionScope,
logger,
resolveGroupActivation,
resolveGroupRequireMention,
resolveTelegramGroupConfig,
});
if (!context) {
const durationMs = Date.now() - startTime;
diag.debug(
`process message skipped: channel=telegram chatId=${chatId} messageId=${messageId} reason=no_context`,
);
logMessageProcessed({
channel: "telegram",
chatId,
messageId,
durationMs,
outcome: "skipped",
reason: "no_context",
});
return;
}
sessionKey = context?.route?.sessionKey;
diag.info(
`process message dispatching: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${
sessionKey ?? "unknown"
}`,
);
if (sessionKey) {
logMessageQueued({ sessionKey, channel: "telegram", source: "telegram" });
}
await dispatchTelegramMessage({
context,
bot,
cfg,
runtime,
replyToMode,
streamMode,
textLimit,
telegramCfg,
opts,
resolveBotTopicsEnabled,
});
const durationMs = Date.now() - startTime;
logMessageProcessed({
channel: "telegram",
chatId,
messageId,
sessionKey,
durationMs,
outcome: "completed",
});
if (sessionKey) {
logSessionStateChange({
sessionKey,
state: "idle",
reason: "message_completed",
});
}
diag.info(
`process message complete: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${
sessionKey ?? "unknown"
} durationMs=${durationMs}`,
);
} catch (err) {
const durationMs = Date.now() - startTime;
logMessageProcessed({
channel: "telegram",
chatId,
messageId,
sessionKey,
durationMs,
outcome: "error",
error: String(err),
});
if (sessionKey) {
logSessionStateChange({
sessionKey,
state: "idle",
reason: "message_error",
});
}
diag.error(
`process message error: channel=telegram chatId=${chatId} messageId=${messageId} durationMs=${durationMs} error="${String(
err,
)}"`,
);
throw err;
}
};
};

View File

@@ -5,6 +5,13 @@ import type { ClawdbotConfig } from "../config/config.js";
import { formatErrorMessage } from "../infra/errors.js";
import type { RuntimeEnv } from "../runtime.js";
import { defaultRuntime } from "../runtime.js";
import {
logWebhookError,
logWebhookProcessed,
logWebhookReceived,
startDiagnosticHeartbeat,
stopDiagnosticHeartbeat,
} from "../logging/diagnostic.js";
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
import { createTelegramBot } from "./bot.js";
@@ -38,6 +45,8 @@ export async function startTelegramWebhook(opts: {
secretToken: opts.secret,
});
startDiagnosticHeartbeat();
const server = createServer((req, res) => {
if (req.url === healthPath) {
res.writeHead(200);
@@ -49,13 +58,29 @@ export async function startTelegramWebhook(opts: {
res.end();
return;
}
const startTime = Date.now();
logWebhookReceived({ channel: "telegram", updateType: "telegram-post" });
const handled = handler(req, res);
if (handled && typeof (handled as Promise<void>).catch === "function") {
void (handled as Promise<void>).catch((err) => {
runtime.log?.(`webhook handler failed: ${formatErrorMessage(err)}`);
if (!res.headersSent) res.writeHead(500);
res.end();
});
void (handled as Promise<void>)
.then(() => {
logWebhookProcessed({
channel: "telegram",
updateType: "telegram-post",
durationMs: Date.now() - startTime,
});
})
.catch((err) => {
const errMsg = formatErrorMessage(err);
logWebhookError({
channel: "telegram",
updateType: "telegram-post",
error: errMsg,
});
runtime.log?.(`webhook handler failed: ${errMsg}`);
if (!res.headersSent) res.writeHead(500);
res.end();
});
}
});
@@ -73,6 +98,7 @@ export async function startTelegramWebhook(opts: {
const shutdown = () => {
server.close();
void bot.stop();
stopDiagnosticHeartbeat();
};
if (opts.abortSignal) {
opts.abortSignal.addEventListener("abort", shutdown, { once: true });