diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ebd02cb8..64c50dec2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Docs: https://docs.clawd.bot - Discovery: shorten Bonjour DNS-SD service type to `_clawdbot-gw._tcp` and update discovery clients/docs. - Agents: preserve subagent announce thread/topic routing + queued replies across channels. (#1241) — thanks @gnarco. - Agents: avoid treating timeout errors with "aborted" messages as user aborts, so model fallback still runs. +- Diagnostics: export OTLP logs, correct queue depth tracking, and document message-flow telemetry. - Doctor: clarify plugin auto-enable hint text in the startup banner. - Gateway: clarify unauthorized handshake responses with token/password mismatch guidance. - UI: keep config form enums typed, preserve empty strings, protect sensitive defaults, and deepen config search. (#1315) — thanks @MaudeBot. diff --git a/docs/logging.md b/docs/logging.md index 2159d877e..74774866b 100644 --- a/docs/logging.md +++ b/docs/logging.md @@ -138,17 +138,45 @@ Redaction affects **console output only** and does not alter file logs. ## Diagnostics + OpenTelemetry -Diagnostics are **opt-in** structured events for model runs (usage + cost + -context + duration). They do **not** replace logs; they exist to feed metrics, -traces, and other exporters. +Diagnostics are structured, machine-readable events for model runs **and** +message-flow telemetry (webhooks, queueing, session state). They do **not** +replace logs; they exist to feed metrics, traces, and other exporters. -Clawdbot currently emits a `model.usage` event after each agent run with: +Diagnostics events are emitted in-process, but exporters only attach when +diagnostics + the exporter plugin are enabled. -- Token counts (input/output/cache/prompt/total) -- Estimated cost (USD) -- Context window used/limit -- Duration (ms) -- Provider/channel/model + session identifiers +### OpenTelemetry vs OTLP + +- **OpenTelemetry (OTel)**: the data model + SDKs for traces, metrics, and logs. +- **OTLP**: the wire protocol used to export OTel data to a collector/backend. +- Clawdbot exports via **OTLP/HTTP (protobuf)** today. + +### Signals exported + +- **Metrics**: counters + histograms (token usage, message flow, queueing). +- **Traces**: spans for model usage + webhook/message processing. +- **Logs**: exported over OTLP when `diagnostics.otel.logs` is enabled. Log + volume can be high; keep `logging.level` and exporter filters in mind. + +### Diagnostic event catalog + +Model usage: +- `model.usage`: tokens, cost, duration, context, provider/model/channel, session ids. + +Message flow: +- `webhook.received`: webhook ingress per channel. +- `webhook.processed`: webhook handled + duration. +- `webhook.error`: webhook handler errors. +- `message.queued`: message enqueued for processing. +- `message.processed`: outcome + duration + optional error. + +Queue + session: +- `queue.lane.enqueue`: command queue lane enqueue + depth. +- `queue.lane.dequeue`: command queue lane dequeue + wait time. +- `session.state`: session state transition + reason. +- `session.stuck`: session stuck warning + age. +- `run.attempt`: run retry/attempt metadata. +- `diagnostic.heartbeat`: aggregate counters (webhooks/queue/session). ### Enable diagnostics (no exporter) @@ -186,6 +214,7 @@ works with any OpenTelemetry collector/backend that accepts OTLP/HTTP. "serviceName": "clawdbot-gateway", "traces": true, "metrics": true, + "logs": true, "sampleRate": 0.2, "flushIntervalMs": 60000 } @@ -195,13 +224,91 @@ works with any OpenTelemetry collector/backend that accepts OTLP/HTTP. Notes: - You can also enable the plugin with `clawdbot plugins enable diagnostics-otel`. -- `protocol` currently supports `http/protobuf`. -- Metrics include token usage, cost, context size, and run duration. -- Traces/metrics can be toggled with `traces` / `metrics` (default: on). +- `protocol` currently supports `http/protobuf` only. `grpc` is ignored. +- Metrics include token usage, cost, context size, run duration, and message-flow + counters/histograms (webhooks, queueing, session state, queue depth/wait). +- Traces/metrics can be toggled with `traces` / `metrics` (default: on). Traces + include model usage spans plus webhook/message processing spans when enabled. - Set `headers` when your collector requires auth. - Environment variables supported: `OTEL_EXPORTER_OTLP_ENDPOINT`, `OTEL_SERVICE_NAME`, `OTEL_EXPORTER_OTLP_PROTOCOL`. +### Exported metrics (names + types) + +Model usage: +- `clawdbot.tokens` (counter, attrs: `clawdbot.token`, `clawdbot.channel`, + `clawdbot.provider`, `clawdbot.model`) +- `clawdbot.cost.usd` (counter, attrs: `clawdbot.channel`, `clawdbot.provider`, + `clawdbot.model`) +- `clawdbot.run.duration_ms` (histogram, attrs: `clawdbot.channel`, + `clawdbot.provider`, `clawdbot.model`) +- `clawdbot.context.tokens` (histogram, attrs: `clawdbot.context`, + `clawdbot.channel`, `clawdbot.provider`, `clawdbot.model`) + +Message flow: +- `clawdbot.webhook.received` (counter, attrs: `clawdbot.channel`, + `clawdbot.webhook`) +- `clawdbot.webhook.error` (counter, attrs: `clawdbot.channel`, + `clawdbot.webhook`) +- `clawdbot.webhook.duration_ms` (histogram, attrs: `clawdbot.channel`, + `clawdbot.webhook`) +- `clawdbot.message.queued` (counter, attrs: `clawdbot.channel`, + `clawdbot.source`) +- `clawdbot.message.processed` (counter, attrs: `clawdbot.channel`, + `clawdbot.outcome`) +- `clawdbot.message.duration_ms` (histogram, attrs: `clawdbot.channel`, + `clawdbot.outcome`) + +Queues + sessions: +- `clawdbot.queue.lane.enqueue` (counter, attrs: `clawdbot.lane`) +- `clawdbot.queue.lane.dequeue` (counter, attrs: `clawdbot.lane`) +- `clawdbot.queue.depth` (histogram, attrs: `clawdbot.lane` or + `clawdbot.channel=heartbeat`) +- `clawdbot.queue.wait_ms` (histogram, attrs: `clawdbot.lane`) +- `clawdbot.session.state` (counter, attrs: `clawdbot.state`, `clawdbot.reason`) +- `clawdbot.session.stuck` (counter, attrs: `clawdbot.state`) +- `clawdbot.session.stuck_age_ms` (histogram, attrs: `clawdbot.state`) +- `clawdbot.run.attempt` (counter, attrs: `clawdbot.attempt`) + +### Exported spans (names + key attributes) + +- `clawdbot.model.usage` + - `clawdbot.channel`, `clawdbot.provider`, `clawdbot.model` + - `clawdbot.sessionKey`, `clawdbot.sessionId` + - `clawdbot.tokens.*` (input/output/cache_read/cache_write/total) +- `clawdbot.webhook.processed` + - `clawdbot.channel`, `clawdbot.webhook`, `clawdbot.chatId` +- `clawdbot.webhook.error` + - `clawdbot.channel`, `clawdbot.webhook`, `clawdbot.chatId`, + `clawdbot.error` +- `clawdbot.message.processed` + - `clawdbot.channel`, `clawdbot.outcome`, `clawdbot.chatId`, + `clawdbot.messageId`, `clawdbot.sessionKey`, `clawdbot.sessionId`, + `clawdbot.reason` +- `clawdbot.session.stuck` + - `clawdbot.state`, `clawdbot.ageMs`, `clawdbot.queueDepth`, + `clawdbot.sessionKey`, `clawdbot.sessionId` + +### Sampling + flushing + +- Trace sampling: `diagnostics.otel.sampleRate` (0.0–1.0, root spans only). +- Metric export interval: `diagnostics.otel.flushIntervalMs` (min 1000ms). + +### Protocol notes + +- OTLP/HTTP endpoints can be set via `diagnostics.otel.endpoint` or + `OTEL_EXPORTER_OTLP_ENDPOINT`. +- If the endpoint already contains `/v1/traces` or `/v1/metrics`, it is used as-is. +- If the endpoint already contains `/v1/logs`, it is used as-is for logs. +- `diagnostics.otel.logs` enables OTLP log export for the main logger output. + +### Log export behavior + +- OTLP logs use the same structured records written to `logging.file`. +- Respect `logging.level` (file log level). Console redaction does **not** apply + to OTLP logs. +- High-volume installs should prefer OTLP collector sampling/filtering. + ## Troubleshooting tips - **Gateway not reachable?** Run `clawdbot doctor` first. diff --git a/extensions/diagnostics-otel/package.json b/extensions/diagnostics-otel/package.json index 8c43def2c..063d2a01d 100644 --- a/extensions/diagnostics-otel/package.json +++ b/extensions/diagnostics-otel/package.json @@ -8,9 +8,12 @@ }, "dependencies": { "@opentelemetry/api": "^1.9.0", + "@opentelemetry/api-logs": "^0.210.0", + "@opentelemetry/exporter-logs-otlp-http": "^0.210.0", "@opentelemetry/exporter-metrics-otlp-http": "^0.210.0", "@opentelemetry/exporter-trace-otlp-http": "^0.210.0", "@opentelemetry/resources": "^2.4.0", + "@opentelemetry/sdk-logs": "^0.210.0", "@opentelemetry/sdk-metrics": "^2.4.0", "@opentelemetry/sdk-node": "^0.210.0", "@opentelemetry/sdk-trace-base": "^2.4.0", diff --git a/extensions/diagnostics-otel/src/service.test.ts b/extensions/diagnostics-otel/src/service.test.ts new file mode 100644 index 000000000..1a5e76a25 --- /dev/null +++ b/extensions/diagnostics-otel/src/service.test.ts @@ -0,0 +1,220 @@ +import { beforeEach, describe, expect, test, vi } from "vitest"; + +const registerLogTransportMock = vi.hoisted(() => vi.fn()); + +const telemetryState = vi.hoisted(() => { + const counters = new Map }>(); + const histograms = new Map }>(); + const tracer = { + startSpan: vi.fn((_name: string, _opts?: unknown) => ({ + end: vi.fn(), + setStatus: vi.fn(), + })), + }; + const meter = { + createCounter: vi.fn((name: string) => { + const counter = { add: vi.fn() }; + counters.set(name, counter); + return counter; + }), + createHistogram: vi.fn((name: string) => { + const histogram = { record: vi.fn() }; + histograms.set(name, histogram); + return histogram; + }), + }; + return { counters, histograms, tracer, meter }; +}); + +const sdkStart = vi.hoisted(() => vi.fn().mockResolvedValue(undefined)); +const sdkShutdown = vi.hoisted(() => vi.fn().mockResolvedValue(undefined)); +const logEmit = vi.hoisted(() => vi.fn()); +const logShutdown = vi.hoisted(() => vi.fn().mockResolvedValue(undefined)); + +vi.mock("@opentelemetry/api", () => ({ + metrics: { + getMeter: () => telemetryState.meter, + }, + trace: { + getTracer: () => telemetryState.tracer, + }, + SpanStatusCode: { + ERROR: 2, + }, +})); + +vi.mock("@opentelemetry/sdk-node", () => ({ + NodeSDK: class { + start = sdkStart; + shutdown = sdkShutdown; + }, +})); + +vi.mock("@opentelemetry/exporter-metrics-otlp-http", () => ({ + OTLPMetricExporter: class {}, +})); + +vi.mock("@opentelemetry/exporter-trace-otlp-http", () => ({ + OTLPTraceExporter: class {}, +})); + +vi.mock("@opentelemetry/exporter-logs-otlp-http", () => ({ + OTLPLogExporter: class {}, +})); + +vi.mock("@opentelemetry/sdk-logs", () => ({ + BatchLogRecordProcessor: class {}, + LoggerProvider: class { + addLogRecordProcessor = vi.fn(); + getLogger = vi.fn(() => ({ + emit: logEmit, + })); + shutdown = logShutdown; + }, +})); + +vi.mock("@opentelemetry/sdk-metrics", () => ({ + PeriodicExportingMetricReader: class {}, +})); + +vi.mock("@opentelemetry/sdk-trace-base", () => ({ + ParentBasedSampler: class {}, + TraceIdRatioBasedSampler: class {}, +})); + +vi.mock("@opentelemetry/resources", () => ({ + Resource: class { + // eslint-disable-next-line @typescript-eslint/no-useless-constructor + constructor(_value?: unknown) {} + }, +})); + +vi.mock("@opentelemetry/semantic-conventions", () => ({ + SemanticResourceAttributes: { + SERVICE_NAME: "service.name", + }, +})); + +vi.mock("clawdbot/plugin-sdk", async () => { + const actual = await vi.importActual("clawdbot/plugin-sdk"); + return { + ...actual, + registerLogTransport: registerLogTransportMock, + }; +}); + +import { createDiagnosticsOtelService } from "./service.js"; +import { emitDiagnosticEvent } from "clawdbot/plugin-sdk"; + +describe("diagnostics-otel service", () => { + beforeEach(() => { + telemetryState.counters.clear(); + telemetryState.histograms.clear(); + telemetryState.tracer.startSpan.mockClear(); + telemetryState.meter.createCounter.mockClear(); + telemetryState.meter.createHistogram.mockClear(); + sdkStart.mockClear(); + sdkShutdown.mockClear(); + logEmit.mockClear(); + logShutdown.mockClear(); + registerLogTransportMock.mockReset(); + }); + + test("records message-flow metrics and spans", async () => { + const registeredTransports: Array<(logObj: Record) => void> = []; + const stopTransport = vi.fn(); + registerLogTransportMock.mockImplementation((transport) => { + registeredTransports.push(transport); + return stopTransport; + }); + + const service = createDiagnosticsOtelService(); + await service.start({ + config: { + diagnostics: { + enabled: true, + otel: { + enabled: true, + endpoint: "http://otel-collector:4318", + protocol: "http/protobuf", + traces: true, + metrics: true, + logs: true, + }, + }, + }, + logger: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, + }); + + emitDiagnosticEvent({ + type: "webhook.received", + channel: "telegram", + updateType: "telegram-post", + }); + emitDiagnosticEvent({ + type: "webhook.processed", + channel: "telegram", + updateType: "telegram-post", + durationMs: 120, + }); + emitDiagnosticEvent({ + type: "message.queued", + channel: "telegram", + source: "telegram", + queueDepth: 2, + }); + emitDiagnosticEvent({ + type: "message.processed", + channel: "telegram", + outcome: "completed", + durationMs: 55, + }); + emitDiagnosticEvent({ + type: "queue.lane.dequeue", + lane: "main", + queueSize: 3, + waitMs: 10, + }); + emitDiagnosticEvent({ + type: "session.stuck", + state: "processing", + ageMs: 125_000, + }); + emitDiagnosticEvent({ + type: "run.attempt", + runId: "run-1", + attempt: 2, + }); + + expect(telemetryState.counters.get("clawdbot.webhook.received")?.add).toHaveBeenCalled(); + expect(telemetryState.histograms.get("clawdbot.webhook.duration_ms")?.record).toHaveBeenCalled(); + expect(telemetryState.counters.get("clawdbot.message.queued")?.add).toHaveBeenCalled(); + expect(telemetryState.counters.get("clawdbot.message.processed")?.add).toHaveBeenCalled(); + expect(telemetryState.histograms.get("clawdbot.message.duration_ms")?.record).toHaveBeenCalled(); + expect(telemetryState.histograms.get("clawdbot.queue.wait_ms")?.record).toHaveBeenCalled(); + expect(telemetryState.counters.get("clawdbot.session.stuck")?.add).toHaveBeenCalled(); + expect(telemetryState.histograms.get("clawdbot.session.stuck_age_ms")?.record).toHaveBeenCalled(); + expect(telemetryState.counters.get("clawdbot.run.attempt")?.add).toHaveBeenCalled(); + + const spanNames = telemetryState.tracer.startSpan.mock.calls.map((call) => call[0]); + expect(spanNames).toContain("clawdbot.webhook.processed"); + expect(spanNames).toContain("clawdbot.message.processed"); + expect(spanNames).toContain("clawdbot.session.stuck"); + + expect(registerLogTransportMock).toHaveBeenCalledTimes(1); + expect(registeredTransports).toHaveLength(1); + registeredTransports[0]?.({ + 0: "{\"subsystem\":\"diagnostic\"}", + 1: "hello", + _meta: { logLevelName: "INFO", date: new Date() }, + }); + expect(logEmit).toHaveBeenCalled(); + + await service.stop?.(); + }); +}); diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index 9d4e59e72..2c11271b7 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -1,17 +1,17 @@ -import { metrics, trace } from "@opentelemetry/api"; +import { metrics, trace, SpanStatusCode } from "@opentelemetry/api"; +import type { SeverityNumber } from "@opentelemetry/api-logs"; +import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http"; import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; import { Resource } from "@opentelemetry/resources"; +import { BatchLogRecordProcessor, LoggerProvider } from "@opentelemetry/sdk-logs"; import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics"; import { NodeSDK } from "@opentelemetry/sdk-node"; import { ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base"; import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions"; -import type { - ClawdbotPluginService, - DiagnosticUsageEvent, -} from "clawdbot/plugin-sdk"; -import { onDiagnosticEvent } from "clawdbot/plugin-sdk"; +import type { ClawdbotPluginService, DiagnosticEventPayload } from "clawdbot/plugin-sdk"; +import { onDiagnosticEvent, registerLogTransport } from "clawdbot/plugin-sdk"; const DEFAULT_SERVICE_NAME = "clawdbot"; @@ -34,6 +34,8 @@ function resolveSampleRate(value: number | undefined): number | undefined { export function createDiagnosticsOtelService(): ClawdbotPluginService { let sdk: NodeSDK | null = null; + let logProvider: LoggerProvider | null = null; + let stopLogTransport: (() => void) | null = null; let unsubscribe: (() => void) | null = null; return { @@ -57,7 +59,8 @@ export function createDiagnosticsOtelService(): ClawdbotPluginService { const tracesEnabled = otel.traces !== false; const metricsEnabled = otel.metrics !== false; - if (!tracesEnabled && !metricsEnabled) return; + const logsEnabled = otel.logs === true; + if (!tracesEnabled && !metricsEnabled && !logsEnabled) return; const resource = new Resource({ [SemanticResourceAttributes.SERVICE_NAME]: serviceName, @@ -65,6 +68,7 @@ export function createDiagnosticsOtelService(): ClawdbotPluginService { const traceUrl = resolveOtelUrl(endpoint, "v1/traces"); const metricUrl = resolveOtelUrl(endpoint, "v1/metrics"); + const logUrl = resolveOtelUrl(endpoint, "v1/logs"); const traceExporter = tracesEnabled ? new OTLPTraceExporter({ ...(traceUrl ? { url: traceUrl } : {}), @@ -88,20 +92,31 @@ export function createDiagnosticsOtelService(): ClawdbotPluginService { }) : undefined; - sdk = new NodeSDK({ - resource, - ...(traceExporter ? { traceExporter } : {}), - ...(metricReader ? { metricReader } : {}), - ...(sampleRate !== undefined - ? { - sampler: new ParentBasedSampler({ - root: new TraceIdRatioBasedSampler(sampleRate), - }), - } - : {}), - }); + if (tracesEnabled || metricsEnabled) { + sdk = new NodeSDK({ + resource, + ...(traceExporter ? { traceExporter } : {}), + ...(metricReader ? { metricReader } : {}), + ...(sampleRate !== undefined + ? { + sampler: new ParentBasedSampler({ + root: new TraceIdRatioBasedSampler(sampleRate), + }), + } + : {}), + }); - await sdk.start(); + await sdk.start(); + } + + const logSeverityMap: Record = { + TRACE: 1 as SeverityNumber, + DEBUG: 5 as SeverityNumber, + INFO: 9 as SeverityNumber, + WARN: 13 as SeverityNumber, + ERROR: 17 as SeverityNumber, + FATAL: 21 as SeverityNumber, + }; const meter = metrics.getMeter("clawdbot"); const tracer = trace.getTracer("clawdbot"); @@ -122,38 +137,210 @@ export function createDiagnosticsOtelService(): ClawdbotPluginService { unit: "1", description: "Context window size and usage", }); + const webhookReceivedCounter = meter.createCounter("clawdbot.webhook.received", { + unit: "1", + description: "Webhook requests received", + }); + const webhookErrorCounter = meter.createCounter("clawdbot.webhook.error", { + unit: "1", + description: "Webhook processing errors", + }); + const webhookDurationHistogram = meter.createHistogram("clawdbot.webhook.duration_ms", { + unit: "ms", + description: "Webhook processing duration", + }); + const messageQueuedCounter = meter.createCounter("clawdbot.message.queued", { + unit: "1", + description: "Messages queued for processing", + }); + const messageProcessedCounter = meter.createCounter("clawdbot.message.processed", { + unit: "1", + description: "Messages processed by outcome", + }); + const messageDurationHistogram = meter.createHistogram("clawdbot.message.duration_ms", { + unit: "ms", + description: "Message processing duration", + }); + const queueDepthHistogram = meter.createHistogram("clawdbot.queue.depth", { + unit: "1", + description: "Queue depth on enqueue/dequeue", + }); + const queueWaitHistogram = meter.createHistogram("clawdbot.queue.wait_ms", { + unit: "ms", + description: "Queue wait time before execution", + }); + const laneEnqueueCounter = meter.createCounter("clawdbot.queue.lane.enqueue", { + unit: "1", + description: "Command queue lane enqueue events", + }); + const laneDequeueCounter = meter.createCounter("clawdbot.queue.lane.dequeue", { + unit: "1", + description: "Command queue lane dequeue events", + }); + const sessionStateCounter = meter.createCounter("clawdbot.session.state", { + unit: "1", + description: "Session state transitions", + }); + const sessionStuckCounter = meter.createCounter("clawdbot.session.stuck", { + unit: "1", + description: "Sessions stuck in processing", + }); + const sessionStuckAgeHistogram = meter.createHistogram("clawdbot.session.stuck_age_ms", { + unit: "ms", + description: "Age of stuck sessions", + }); + const runAttemptCounter = meter.createCounter("clawdbot.run.attempt", { + unit: "1", + description: "Run attempts", + }); - unsubscribe = onDiagnosticEvent((evt) => { - if (evt.type !== "model.usage") return; - const usageEvent = evt as DiagnosticUsageEvent; + if (logsEnabled) { + const logExporter = new OTLPLogExporter({ + ...(logUrl ? { url: logUrl } : {}), + ...(headers ? { headers } : {}), + }); + logProvider = new LoggerProvider({ resource }); + logProvider.addLogRecordProcessor( + new BatchLogRecordProcessor(logExporter, { + ...(typeof otel.flushIntervalMs === "number" + ? { scheduledDelayMillis: Math.max(1000, otel.flushIntervalMs) } + : {}), + }), + ); + const otelLogger = logProvider.getLogger("clawdbot"); + + stopLogTransport = registerLogTransport((logObj) => { + const safeStringify = (value: unknown) => { + try { + return JSON.stringify(value); + } catch { + return String(value); + } + }; + const meta = (logObj as Record)._meta as + | { + logLevelName?: string; + date?: Date; + name?: string; + parentNames?: string[]; + path?: { + filePath?: string; + fileLine?: string; + fileColumn?: string; + filePathWithLine?: string; + method?: string; + }; + } + | undefined; + const logLevelName = meta?.logLevelName ?? "INFO"; + const severityNumber = logSeverityMap[logLevelName] ?? (9 as SeverityNumber); + + const numericArgs = Object.entries(logObj) + .filter(([key]) => /^\d+$/.test(key)) + .sort((a, b) => Number(a[0]) - Number(b[0])) + .map(([, value]) => value); + + let bindings: Record | undefined; + if (typeof numericArgs[0] === "string" && numericArgs[0].trim().startsWith("{")) { + try { + const parsed = JSON.parse(numericArgs[0]); + if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { + bindings = parsed as Record; + numericArgs.shift(); + } + } catch { + // ignore malformed json bindings + } + } + + let message = ""; + if (numericArgs.length > 0 && typeof numericArgs[numericArgs.length - 1] === "string") { + message = String(numericArgs.pop()); + } else if (numericArgs.length === 1) { + message = safeStringify(numericArgs[0]); + numericArgs.length = 0; + } + if (!message) { + message = "log"; + } + + const attributes: Record = { + "clawdbot.log.level": logLevelName, + }; + if (meta?.name) attributes["clawdbot.logger"] = meta.name; + if (meta?.parentNames?.length) { + attributes["clawdbot.logger.parents"] = meta.parentNames.join("."); + } + if (bindings) { + for (const [key, value] of Object.entries(bindings)) { + if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") { + attributes[`clawdbot.${key}`] = value; + } else if (value != null) { + attributes[`clawdbot.${key}`] = safeStringify(value); + } + } + } + if (numericArgs.length > 0) { + attributes["clawdbot.log.args"] = safeStringify(numericArgs); + } + if (meta?.path?.filePath) attributes["code.filepath"] = meta.path.filePath; + if (meta?.path?.fileLine) attributes["code.lineno"] = Number(meta.path.fileLine); + if (meta?.path?.method) attributes["code.function"] = meta.path.method; + if (meta?.path?.filePathWithLine) { + attributes["clawdbot.code.location"] = meta.path.filePathWithLine; + } + + otelLogger.emit({ + body: message, + severityText: logLevelName, + severityNumber, + attributes, + timestamp: meta?.date ?? new Date(), + }); + }); + } + + const spanWithDuration = ( + name: string, + attributes: Record, + durationMs?: number, + ) => { + const startTime = + typeof durationMs === "number" ? Date.now() - Math.max(0, durationMs) : undefined; + const span = tracer.startSpan(name, { + attributes, + ...(startTime ? { startTime } : {}), + }); + return span; + }; + + const recordModelUsage = (evt: Extract) => { const attrs = { - "clawdbot.channel": usageEvent.channel ?? "unknown", - "clawdbot.provider": usageEvent.provider ?? "unknown", - "clawdbot.model": usageEvent.model ?? "unknown", + "clawdbot.channel": evt.channel ?? "unknown", + "clawdbot.provider": evt.provider ?? "unknown", + "clawdbot.model": evt.model ?? "unknown", }; - const usage = usageEvent.usage; + const usage = evt.usage; if (usage.input) tokensCounter.add(usage.input, { ...attrs, "clawdbot.token": "input" }); - if (usage.output) - tokensCounter.add(usage.output, { ...attrs, "clawdbot.token": "output" }); + if (usage.output) tokensCounter.add(usage.output, { ...attrs, "clawdbot.token": "output" }); if (usage.cacheRead) tokensCounter.add(usage.cacheRead, { ...attrs, "clawdbot.token": "cache_read" }); if (usage.cacheWrite) tokensCounter.add(usage.cacheWrite, { ...attrs, "clawdbot.token": "cache_write" }); if (usage.promptTokens) tokensCounter.add(usage.promptTokens, { ...attrs, "clawdbot.token": "prompt" }); - if (usage.total) - tokensCounter.add(usage.total, { ...attrs, "clawdbot.token": "total" }); + if (usage.total) tokensCounter.add(usage.total, { ...attrs, "clawdbot.token": "total" }); - if (usageEvent.costUsd) costCounter.add(usageEvent.costUsd, attrs); - if (usageEvent.durationMs) durationHistogram.record(usageEvent.durationMs, attrs); - if (usageEvent.context?.limit) - contextHistogram.record(usageEvent.context.limit, { + if (evt.costUsd) costCounter.add(evt.costUsd, attrs); + if (evt.durationMs) durationHistogram.record(evt.durationMs, attrs); + if (evt.context?.limit) + contextHistogram.record(evt.context.limit, { ...attrs, "clawdbot.context": "limit", }); - if (usageEvent.context?.used) - contextHistogram.record(usageEvent.context.used, { + if (evt.context?.used) + contextHistogram.record(evt.context.used, { ...attrs, "clawdbot.context": "used", }); @@ -161,8 +348,8 @@ export function createDiagnosticsOtelService(): ClawdbotPluginService { if (!tracesEnabled) return; const spanAttrs: Record = { ...attrs, - "clawdbot.sessionKey": usageEvent.sessionKey ?? "", - "clawdbot.sessionId": usageEvent.sessionId ?? "", + "clawdbot.sessionKey": evt.sessionKey ?? "", + "clawdbot.sessionId": evt.sessionId ?? "", "clawdbot.tokens.input": usage.input ?? 0, "clawdbot.tokens.output": usage.output ?? 0, "clawdbot.tokens.cache_read": usage.cacheRead ?? 0, @@ -170,23 +357,206 @@ export function createDiagnosticsOtelService(): ClawdbotPluginService { "clawdbot.tokens.total": usage.total ?? 0, }; - const startTime = usageEvent.durationMs - ? Date.now() - Math.max(0, usageEvent.durationMs) - : undefined; - const span = tracer.startSpan("clawdbot.model.usage", { - attributes: spanAttrs, - ...(startTime ? { startTime } : {}), - }); + const span = spanWithDuration("clawdbot.model.usage", spanAttrs, evt.durationMs); span.end(); + }; + + const recordWebhookReceived = ( + evt: Extract, + ) => { + const attrs = { + "clawdbot.channel": evt.channel ?? "unknown", + "clawdbot.webhook": evt.updateType ?? "unknown", + }; + webhookReceivedCounter.add(1, attrs); + }; + + const recordWebhookProcessed = ( + evt: Extract, + ) => { + const attrs = { + "clawdbot.channel": evt.channel ?? "unknown", + "clawdbot.webhook": evt.updateType ?? "unknown", + }; + if (typeof evt.durationMs === "number") { + webhookDurationHistogram.record(evt.durationMs, attrs); + } + if (!tracesEnabled) return; + const spanAttrs: Record = { ...attrs }; + if (evt.chatId !== undefined) spanAttrs["clawdbot.chatId"] = String(evt.chatId); + const span = spanWithDuration("clawdbot.webhook.processed", spanAttrs, evt.durationMs); + span.end(); + }; + + const recordWebhookError = ( + evt: Extract, + ) => { + const attrs = { + "clawdbot.channel": evt.channel ?? "unknown", + "clawdbot.webhook": evt.updateType ?? "unknown", + }; + webhookErrorCounter.add(1, attrs); + if (!tracesEnabled) return; + const spanAttrs: Record = { + ...attrs, + "clawdbot.error": evt.error, + }; + if (evt.chatId !== undefined) spanAttrs["clawdbot.chatId"] = String(evt.chatId); + const span = tracer.startSpan("clawdbot.webhook.error", { + attributes: spanAttrs, + }); + span.setStatus({ code: SpanStatusCode.ERROR, message: evt.error }); + span.end(); + }; + + const recordMessageQueued = ( + evt: Extract, + ) => { + const attrs = { + "clawdbot.channel": evt.channel ?? "unknown", + "clawdbot.source": evt.source ?? "unknown", + }; + messageQueuedCounter.add(1, attrs); + if (typeof evt.queueDepth === "number") { + queueDepthHistogram.record(evt.queueDepth, attrs); + } + }; + + const recordMessageProcessed = ( + evt: Extract, + ) => { + const attrs = { + "clawdbot.channel": evt.channel ?? "unknown", + "clawdbot.outcome": evt.outcome ?? "unknown", + }; + messageProcessedCounter.add(1, attrs); + if (typeof evt.durationMs === "number") { + messageDurationHistogram.record(evt.durationMs, attrs); + } + if (!tracesEnabled) return; + const spanAttrs: Record = { ...attrs }; + if (evt.sessionKey) spanAttrs["clawdbot.sessionKey"] = evt.sessionKey; + if (evt.sessionId) spanAttrs["clawdbot.sessionId"] = evt.sessionId; + if (evt.chatId !== undefined) spanAttrs["clawdbot.chatId"] = String(evt.chatId); + if (evt.messageId !== undefined) spanAttrs["clawdbot.messageId"] = String(evt.messageId); + if (evt.reason) spanAttrs["clawdbot.reason"] = evt.reason; + const span = spanWithDuration("clawdbot.message.processed", spanAttrs, evt.durationMs); + if (evt.outcome === "error") { + span.setStatus({ code: SpanStatusCode.ERROR, message: evt.error }); + } + span.end(); + }; + + const recordLaneEnqueue = ( + evt: Extract, + ) => { + const attrs = { "clawdbot.lane": evt.lane }; + laneEnqueueCounter.add(1, attrs); + queueDepthHistogram.record(evt.queueSize, attrs); + }; + + const recordLaneDequeue = ( + evt: Extract, + ) => { + const attrs = { "clawdbot.lane": evt.lane }; + laneDequeueCounter.add(1, attrs); + queueDepthHistogram.record(evt.queueSize, attrs); + if (typeof evt.waitMs === "number") { + queueWaitHistogram.record(evt.waitMs, attrs); + } + }; + + const recordSessionState = ( + evt: Extract, + ) => { + const attrs: Record = { "clawdbot.state": evt.state }; + if (evt.reason) attrs["clawdbot.reason"] = evt.reason; + sessionStateCounter.add(1, attrs); + }; + + const recordSessionStuck = ( + evt: Extract, + ) => { + const attrs: Record = { "clawdbot.state": evt.state }; + sessionStuckCounter.add(1, attrs); + if (typeof evt.ageMs === "number") { + sessionStuckAgeHistogram.record(evt.ageMs, attrs); + } + if (!tracesEnabled) return; + const spanAttrs: Record = { ...attrs }; + if (evt.sessionKey) spanAttrs["clawdbot.sessionKey"] = evt.sessionKey; + if (evt.sessionId) spanAttrs["clawdbot.sessionId"] = evt.sessionId; + spanAttrs["clawdbot.queueDepth"] = evt.queueDepth ?? 0; + spanAttrs["clawdbot.ageMs"] = evt.ageMs; + const span = tracer.startSpan("clawdbot.session.stuck", { attributes: spanAttrs }); + span.setStatus({ code: SpanStatusCode.ERROR, message: "session stuck" }); + span.end(); + }; + + const recordRunAttempt = (evt: Extract) => { + runAttemptCounter.add(1, { "clawdbot.attempt": evt.attempt }); + }; + + const recordHeartbeat = ( + evt: Extract, + ) => { + queueDepthHistogram.record(evt.queued, { "clawdbot.channel": "heartbeat" }); + }; + + unsubscribe = onDiagnosticEvent((evt: DiagnosticEventPayload) => { + switch (evt.type) { + case "model.usage": + recordModelUsage(evt); + return; + case "webhook.received": + recordWebhookReceived(evt); + return; + case "webhook.processed": + recordWebhookProcessed(evt); + return; + case "webhook.error": + recordWebhookError(evt); + return; + case "message.queued": + recordMessageQueued(evt); + return; + case "message.processed": + recordMessageProcessed(evt); + return; + case "queue.lane.enqueue": + recordLaneEnqueue(evt); + return; + case "queue.lane.dequeue": + recordLaneDequeue(evt); + return; + case "session.state": + recordSessionState(evt); + return; + case "session.stuck": + recordSessionStuck(evt); + return; + case "run.attempt": + recordRunAttempt(evt); + return; + case "diagnostic.heartbeat": + recordHeartbeat(evt); + return; + } }); - if (otel.logs) { - ctx.logger.warn("diagnostics-otel: logs exporter not wired yet"); + if (logsEnabled) { + ctx.logger.info("diagnostics-otel: logs exporter enabled (OTLP/HTTP)"); } }, async stop() { unsubscribe?.(); unsubscribe = null; + stopLogTransport?.(); + stopLogTransport = null; + if (logProvider) { + await logProvider.shutdown().catch(() => undefined); + logProvider = null; + } if (sdk) { await sdk.shutdown().catch(() => undefined); sdk = null; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 49339d3c8..893c14dc3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -264,6 +264,12 @@ importers: '@opentelemetry/api': specifier: ^1.9.0 version: 1.9.0 + '@opentelemetry/api-logs': + specifier: ^0.210.0 + version: 0.210.0 + '@opentelemetry/exporter-logs-otlp-http': + specifier: ^0.210.0 + version: 0.210.0(@opentelemetry/api@1.9.0) '@opentelemetry/exporter-metrics-otlp-http': specifier: ^0.210.0 version: 0.210.0(@opentelemetry/api@1.9.0) @@ -273,6 +279,9 @@ importers: '@opentelemetry/resources': specifier: ^2.4.0 version: 2.4.0(@opentelemetry/api@1.9.0) + '@opentelemetry/sdk-logs': + specifier: ^0.210.0 + version: 0.210.0(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-metrics': specifier: ^2.4.0 version: 2.4.0(@opentelemetry/api@1.9.0) diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index abc207744..4fcefca12 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -1,3 +1,9 @@ +import { + diagnosticLogger as diag, + logMessageQueued, + logSessionStateChange, +} from "../../logging/diagnostic.js"; + type EmbeddedPiQueueHandle = { queueMessage: (text: string) => Promise; isStreaming: () => boolean; @@ -14,22 +20,40 @@ const EMBEDDED_RUN_WAITERS = new Map>(); export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean { const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); - if (!handle) return false; - if (!handle.isStreaming()) return false; - if (handle.isCompacting()) return false; + if (!handle) { + diag.debug(`queue message failed: sessionId=${sessionId} reason=no_active_run`); + return false; + } + if (!handle.isStreaming()) { + diag.debug(`queue message failed: sessionId=${sessionId} reason=not_streaming`); + return false; + } + if (handle.isCompacting()) { + diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`); + return false; + } + logMessageQueued({ sessionId, source: "pi-embedded-runner" }); void handle.queueMessage(text); return true; } export function abortEmbeddedPiRun(sessionId: string): boolean { const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); - if (!handle) return false; + if (!handle) { + diag.debug(`abort failed: sessionId=${sessionId} reason=no_active_run`); + return false; + } + diag.info(`aborting run: sessionId=${sessionId}`); handle.abort(); return true; } export function isEmbeddedPiRunActive(sessionId: string): boolean { - return ACTIVE_EMBEDDED_RUNS.has(sessionId); + const active = ACTIVE_EMBEDDED_RUNS.has(sessionId); + if (active) { + diag.debug(`run active check: sessionId=${sessionId} active=true`); + } + return active; } export function isEmbeddedPiRunStreaming(sessionId: string): boolean { @@ -40,6 +64,7 @@ export function isEmbeddedPiRunStreaming(sessionId: string): boolean { export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): Promise { if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId)) return Promise.resolve(true); + diag.debug(`waiting for run end: sessionId=${sessionId} timeoutMs=${timeoutMs}`); return new Promise((resolve) => { const waiters = EMBEDDED_RUN_WAITERS.get(sessionId) ?? new Set(); const waiter: EmbeddedRunWaiter = { @@ -48,6 +73,7 @@ export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): () => { waiters.delete(waiter); if (waiters.size === 0) EMBEDDED_RUN_WAITERS.delete(sessionId); + diag.warn(`wait timeout: sessionId=${sessionId} timeoutMs=${timeoutMs}`); resolve(false); }, Math.max(100, timeoutMs), @@ -68,6 +94,7 @@ function notifyEmbeddedRunEnded(sessionId: string) { const waiters = EMBEDDED_RUN_WAITERS.get(sessionId); if (!waiters || waiters.size === 0) return; EMBEDDED_RUN_WAITERS.delete(sessionId); + diag.debug(`notifying waiters: sessionId=${sessionId} waiterCount=${waiters.size}`); for (const waiter of waiters) { clearTimeout(waiter.timer); waiter.resolve(true); @@ -75,13 +102,24 @@ function notifyEmbeddedRunEnded(sessionId: string) { } export function setActiveEmbeddedRun(sessionId: string, handle: EmbeddedPiQueueHandle) { + const wasActive = ACTIVE_EMBEDDED_RUNS.has(sessionId); ACTIVE_EMBEDDED_RUNS.set(sessionId, handle); + logSessionStateChange({ + sessionId, + state: "processing", + reason: wasActive ? "run_replaced" : "run_started", + }); + diag.info(`run registered: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`); } export function clearActiveEmbeddedRun(sessionId: string, handle: EmbeddedPiQueueHandle) { if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) { ACTIVE_EMBEDDED_RUNS.delete(sessionId); + logSessionStateChange({ sessionId, state: "idle", reason: "run_completed" }); + diag.info(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`); notifyEmbeddedRunEnded(sessionId); + } else { + diag.debug(`run clear skipped: sessionId=${sessionId} reason=handle_mismatch`); } } diff --git a/src/infra/diagnostic-events.test.ts b/src/infra/diagnostic-events.test.ts index 6d6806418..2235c9c8a 100644 --- a/src/infra/diagnostic-events.test.ts +++ b/src/infra/diagnostic-events.test.ts @@ -25,4 +25,31 @@ describe("diagnostic-events", () => { expect(seqs).toEqual([1, 2]); }); + + test("emits message-flow events", async () => { + resetDiagnosticEventsForTest(); + const types: string[] = []; + const stop = onDiagnosticEvent((evt) => types.push(evt.type)); + + emitDiagnosticEvent({ + type: "webhook.received", + channel: "telegram", + updateType: "telegram-post", + }); + emitDiagnosticEvent({ + type: "message.queued", + channel: "telegram", + source: "telegram", + queueDepth: 1, + }); + emitDiagnosticEvent({ + type: "session.state", + state: "processing", + reason: "run_started", + }); + + stop(); + + expect(types).toEqual(["webhook.received", "message.queued", "session.state"]); + }); }); diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 9e9b6e99c..51edf7a3b 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -1,9 +1,14 @@ import type { ClawdbotConfig } from "../config/config.js"; -export type DiagnosticUsageEvent = { - type: "model.usage"; +export type DiagnosticSessionState = "idle" | "processing" | "waiting"; + +type DiagnosticBaseEvent = { ts: number; seq: number; +}; + +export type DiagnosticUsageEvent = DiagnosticBaseEvent & { + type: "model.usage"; sessionKey?: string; sessionId?: string; channel?: string; @@ -25,7 +30,116 @@ export type DiagnosticUsageEvent = { durationMs?: number; }; -export type DiagnosticEventPayload = DiagnosticUsageEvent; +export type DiagnosticWebhookReceivedEvent = DiagnosticBaseEvent & { + type: "webhook.received"; + channel: string; + updateType?: string; + chatId?: number | string; +}; + +export type DiagnosticWebhookProcessedEvent = DiagnosticBaseEvent & { + type: "webhook.processed"; + channel: string; + updateType?: string; + chatId?: number | string; + durationMs?: number; +}; + +export type DiagnosticWebhookErrorEvent = DiagnosticBaseEvent & { + type: "webhook.error"; + channel: string; + updateType?: string; + chatId?: number | string; + error: string; +}; + +export type DiagnosticMessageQueuedEvent = DiagnosticBaseEvent & { + type: "message.queued"; + sessionKey?: string; + sessionId?: string; + channel?: string; + source: string; + queueDepth?: number; +}; + +export type DiagnosticMessageProcessedEvent = DiagnosticBaseEvent & { + type: "message.processed"; + channel: string; + messageId?: number | string; + chatId?: number | string; + sessionKey?: string; + sessionId?: string; + durationMs?: number; + outcome: "completed" | "skipped" | "error"; + reason?: string; + error?: string; +}; + +export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & { + type: "session.state"; + sessionKey?: string; + sessionId?: string; + prevState?: DiagnosticSessionState; + state: DiagnosticSessionState; + reason?: string; + queueDepth?: number; +}; + +export type DiagnosticSessionStuckEvent = DiagnosticBaseEvent & { + type: "session.stuck"; + sessionKey?: string; + sessionId?: string; + state: DiagnosticSessionState; + ageMs: number; + queueDepth?: number; +}; + +export type DiagnosticLaneEnqueueEvent = DiagnosticBaseEvent & { + type: "queue.lane.enqueue"; + lane: string; + queueSize: number; +}; + +export type DiagnosticLaneDequeueEvent = DiagnosticBaseEvent & { + type: "queue.lane.dequeue"; + lane: string; + queueSize: number; + waitMs: number; +}; + +export type DiagnosticRunAttemptEvent = DiagnosticBaseEvent & { + type: "run.attempt"; + sessionKey?: string; + sessionId?: string; + runId: string; + attempt: number; +}; + +export type DiagnosticHeartbeatEvent = DiagnosticBaseEvent & { + type: "diagnostic.heartbeat"; + webhooks: { + received: number; + processed: number; + errors: number; + }; + active: number; + waiting: number; + queued: number; +}; + +export type DiagnosticEventPayload = + | DiagnosticUsageEvent + | DiagnosticWebhookReceivedEvent + | DiagnosticWebhookProcessedEvent + | DiagnosticWebhookErrorEvent + | DiagnosticMessageQueuedEvent + | DiagnosticMessageProcessedEvent + | DiagnosticSessionStateEvent + | DiagnosticSessionStuckEvent + | DiagnosticLaneEnqueueEvent + | DiagnosticLaneDequeueEvent + | DiagnosticRunAttemptEvent + | DiagnosticHeartbeatEvent; let seq = 0; const listeners = new Set<(evt: DiagnosticEventPayload) => void>(); diff --git a/src/logging/diagnostic.ts b/src/logging/diagnostic.ts new file mode 100644 index 000000000..3470acddc --- /dev/null +++ b/src/logging/diagnostic.ts @@ -0,0 +1,350 @@ +import { emitDiagnosticEvent } from "../infra/diagnostic-events.js"; +import { createSubsystemLogger } from "./subsystem.js"; + +const diag = createSubsystemLogger("diagnostic"); + +type SessionStateValue = "idle" | "processing" | "waiting"; + +type SessionState = { + sessionId?: string; + sessionKey?: string; + lastActivity: number; + state: SessionStateValue; + queueDepth: number; +}; + +type SessionRef = { + sessionId?: string; + sessionKey?: string; +}; + +const sessionStates = new Map(); + +const webhookStats = { + received: 0, + processed: 0, + errors: 0, + lastReceived: 0, +}; + +let lastActivityAt = 0; + +function markActivity() { + lastActivityAt = Date.now(); +} + +function resolveSessionKey({ sessionKey, sessionId }: SessionRef) { + return sessionKey ?? sessionId ?? "unknown"; +} + +function getSessionState(ref: SessionRef): SessionState { + const key = resolveSessionKey(ref); + const existing = sessionStates.get(key); + if (existing) { + if (ref.sessionId) existing.sessionId = ref.sessionId; + if (ref.sessionKey) existing.sessionKey = ref.sessionKey; + return existing; + } + const created: SessionState = { + sessionId: ref.sessionId, + sessionKey: ref.sessionKey, + lastActivity: Date.now(), + state: "idle", + queueDepth: 0, + }; + sessionStates.set(key, created); + return created; +} + +export function logWebhookReceived(params: { + channel: string; + updateType?: string; + chatId?: number | string; +}) { + webhookStats.received += 1; + webhookStats.lastReceived = Date.now(); + diag.info( + `webhook received: channel=${params.channel} type=${params.updateType ?? "unknown"} chatId=${ + params.chatId ?? "unknown" + } total=${webhookStats.received}`, + ); + emitDiagnosticEvent({ + type: "webhook.received", + channel: params.channel, + updateType: params.updateType, + chatId: params.chatId, + }); + markActivity(); +} + +export function logWebhookProcessed(params: { + channel: string; + updateType?: string; + chatId?: number | string; + durationMs?: number; +}) { + webhookStats.processed += 1; + diag.info( + `webhook processed: channel=${params.channel} type=${ + params.updateType ?? "unknown" + } chatId=${params.chatId ?? "unknown"} duration=${params.durationMs ?? 0}ms processed=${ + webhookStats.processed + }`, + ); + emitDiagnosticEvent({ + type: "webhook.processed", + channel: params.channel, + updateType: params.updateType, + chatId: params.chatId, + durationMs: params.durationMs, + }); + markActivity(); +} + +export function logWebhookError(params: { + channel: string; + updateType?: string; + chatId?: number | string; + error: string; +}) { + webhookStats.errors += 1; + diag.error( + `webhook error: channel=${params.channel} type=${params.updateType ?? "unknown"} chatId=${ + params.chatId ?? "unknown" + } error="${params.error}" errors=${webhookStats.errors}`, + ); + emitDiagnosticEvent({ + type: "webhook.error", + channel: params.channel, + updateType: params.updateType, + chatId: params.chatId, + error: params.error, + }); + markActivity(); +} + +export function logMessageQueued(params: { + sessionId?: string; + sessionKey?: string; + channel?: string; + source: string; +}) { + const state = getSessionState(params); + state.queueDepth += 1; + state.lastActivity = Date.now(); + diag.info( + `message queued: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ + state.sessionKey ?? "unknown" + } source=${params.source} queueDepth=${state.queueDepth} sessionState=${state.state}`, + ); + emitDiagnosticEvent({ + type: "message.queued", + sessionId: state.sessionId, + sessionKey: state.sessionKey, + channel: params.channel, + source: params.source, + queueDepth: state.queueDepth, + }); + markActivity(); +} + +export function logMessageProcessed(params: { + channel: string; + messageId?: number | string; + chatId?: number | string; + sessionId?: string; + sessionKey?: string; + durationMs?: number; + outcome: "completed" | "skipped" | "error"; + reason?: string; + error?: string; +}) { + const payload = `message processed: channel=${params.channel} chatId=${ + params.chatId ?? "unknown" + } messageId=${params.messageId ?? "unknown"} sessionId=${ + params.sessionId ?? "unknown" + } sessionKey=${params.sessionKey ?? "unknown"} outcome=${params.outcome} duration=${ + params.durationMs ?? 0 + }ms${params.reason ? ` reason=${params.reason}` : ""}${ + params.error ? ` error="${params.error}"` : "" + }`; + if (params.outcome === "error") { + diag.error(payload); + } else if (params.outcome === "skipped") { + diag.debug(payload); + } else { + diag.info(payload); + } + emitDiagnosticEvent({ + type: "message.processed", + channel: params.channel, + chatId: params.chatId, + messageId: params.messageId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + durationMs: params.durationMs, + outcome: params.outcome, + reason: params.reason, + error: params.error, + }); + markActivity(); +} + +export function logSessionStateChange( + params: SessionRef & { + state: SessionStateValue; + reason?: string; + }, +) { + const state = getSessionState(params); + const prevState = state.state; + state.state = params.state; + state.lastActivity = Date.now(); + if (params.state === "idle") state.queueDepth = Math.max(0, state.queueDepth - 1); + diag.info( + `session state: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ + state.sessionKey ?? "unknown" + } prev=${prevState} new=${params.state} reason="${params.reason ?? ""}" queueDepth=${ + state.queueDepth + }`, + ); + emitDiagnosticEvent({ + type: "session.state", + sessionId: state.sessionId, + sessionKey: state.sessionKey, + prevState, + state: params.state, + reason: params.reason, + queueDepth: state.queueDepth, + }); + markActivity(); +} + +export function logSessionStuck(params: SessionRef & { state: SessionStateValue; ageMs: number }) { + const state = getSessionState(params); + diag.warn( + `stuck session: sessionId=${state.sessionId ?? "unknown"} sessionKey=${ + state.sessionKey ?? "unknown" + } state=${params.state} age=${Math.round(params.ageMs / 1000)}s queueDepth=${state.queueDepth}`, + ); + emitDiagnosticEvent({ + type: "session.stuck", + sessionId: state.sessionId, + sessionKey: state.sessionKey, + state: params.state, + ageMs: params.ageMs, + queueDepth: state.queueDepth, + }); + markActivity(); +} + +export function logLaneEnqueue(lane: string, queueSize: number) { + diag.debug(`lane enqueue: lane=${lane} queueSize=${queueSize}`); + emitDiagnosticEvent({ + type: "queue.lane.enqueue", + lane, + queueSize, + }); + markActivity(); +} + +export function logLaneDequeue(lane: string, waitMs: number, queueSize: number) { + diag.debug(`lane dequeue: lane=${lane} waitMs=${waitMs} queueSize=${queueSize}`); + emitDiagnosticEvent({ + type: "queue.lane.dequeue", + lane, + queueSize, + waitMs, + }); + markActivity(); +} + +export function logRunAttempt(params: SessionRef & { runId: string; attempt: number }) { + diag.info( + `run attempt: sessionId=${params.sessionId ?? "unknown"} sessionKey=${ + params.sessionKey ?? "unknown" + } runId=${params.runId} attempt=${params.attempt}`, + ); + emitDiagnosticEvent({ + type: "run.attempt", + sessionId: params.sessionId, + sessionKey: params.sessionKey, + runId: params.runId, + attempt: params.attempt, + }); + markActivity(); +} + +export function logActiveRuns() { + const activeSessions = Array.from(sessionStates.entries()) + .filter(([, s]) => s.state === "processing") + .map( + ([id, s]) => + `${id}(q=${s.queueDepth},age=${Math.round((Date.now() - s.lastActivity) / 1000)}s)`, + ); + diag.info(`active runs: count=${activeSessions.length} sessions=[${activeSessions.join(", ")}]`); + markActivity(); +} + +let heartbeatInterval: NodeJS.Timeout | null = null; + +export function startDiagnosticHeartbeat() { + if (heartbeatInterval) return; + heartbeatInterval = setInterval(() => { + const now = Date.now(); + const activeCount = Array.from(sessionStates.values()).filter( + (s) => s.state === "processing", + ).length; + const waitingCount = Array.from(sessionStates.values()).filter( + (s) => s.state === "waiting", + ).length; + const totalQueued = Array.from(sessionStates.values()).reduce( + (sum, s) => sum + s.queueDepth, + 0, + ); + const hasActivity = + lastActivityAt > 0 || + webhookStats.received > 0 || + activeCount > 0 || + waitingCount > 0 || + totalQueued > 0; + if (!hasActivity) return; + if (now - lastActivityAt > 120_000 && activeCount === 0 && waitingCount === 0) return; + + diag.info( + `heartbeat: webhooks=${webhookStats.received}/${webhookStats.processed}/${webhookStats.errors} active=${activeCount} waiting=${waitingCount} queued=${totalQueued}`, + ); + emitDiagnosticEvent({ + type: "diagnostic.heartbeat", + webhooks: { + received: webhookStats.received, + processed: webhookStats.processed, + errors: webhookStats.errors, + }, + active: activeCount, + waiting: waitingCount, + queued: totalQueued, + }); + + for (const [, state] of sessionStates) { + const ageMs = now - state.lastActivity; + if (state.state === "processing" && ageMs > 120_000) { + logSessionStuck({ + sessionId: state.sessionId, + sessionKey: state.sessionKey, + state: state.state, + ageMs, + }); + } + } + }, 30_000); +} + +export function stopDiagnosticHeartbeat() { + if (heartbeatInterval) { + clearInterval(heartbeatInterval); + heartbeatInterval = null; + } +} + +export { diag as diagnosticLogger }; diff --git a/src/logging/logger.ts b/src/logging/logger.ts index 3bf729388..a8dc784a4 100644 --- a/src/logging/logger.ts +++ b/src/logging/logger.ts @@ -35,6 +35,21 @@ type ResolvedSettings = { file: string; }; export type LoggerResolvedSettings = ResolvedSettings; +export type LogTransportRecord = Record; +export type LogTransport = (logObj: LogTransportRecord) => void; + +const externalTransports = new Set(); + +function attachExternalTransport(logger: TsLogger, transport: LogTransport): void { + logger.attachTransport((logObj: LogObj) => { + if (!externalTransports.has(transport)) return; + try { + transport(logObj as LogTransportRecord); + } catch { + // never block on logging failures + } + }); +} function resolveSettings(): ResolvedSettings { let cfg: ClawdbotConfig["logging"] | undefined = @@ -87,6 +102,9 @@ function buildLogger(settings: ResolvedSettings): TsLogger { // never block on logging failures } }); + for (const transport of externalTransports) { + attachExternalTransport(logger, transport); + } return logger; } @@ -168,6 +186,17 @@ export function resetLogger() { loggingState.overrideSettings = null; } +export function registerLogTransport(transport: LogTransport): () => void { + externalTransports.add(transport); + const logger = loggingState.cachedLogger as TsLogger | null; + if (logger) { + attachExternalTransport(logger, transport); + } + return () => { + externalTransports.delete(transport); + }; +} + function defaultRollingPathForToday(): string { const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD return path.join(DEFAULT_LOG_DIR, `${LOG_PREFIX}-${today}${LOG_SUFFIX}`); diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 3814ae11c..254ed7c52 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -182,12 +182,29 @@ export { formatDocsLink } from "../terminal/links.js"; export type { HookEntry } from "../hooks/types.js"; export { normalizeE164 } from "../utils.js"; export { missingTargetError } from "../infra/outbound/target-errors.js"; +export { registerLogTransport } from "../logging/logger.js"; +export type { LogTransport, LogTransportRecord } from "../logging/logger.js"; export { emitDiagnosticEvent, isDiagnosticsEnabled, onDiagnosticEvent, } from "../infra/diagnostic-events.js"; -export type { DiagnosticEventPayload, DiagnosticUsageEvent } from "../infra/diagnostic-events.js"; +export type { + DiagnosticEventPayload, + DiagnosticHeartbeatEvent, + DiagnosticLaneDequeueEvent, + DiagnosticLaneEnqueueEvent, + DiagnosticMessageProcessedEvent, + DiagnosticMessageQueuedEvent, + DiagnosticRunAttemptEvent, + DiagnosticSessionState, + DiagnosticSessionStateEvent, + DiagnosticSessionStuckEvent, + DiagnosticUsageEvent, + DiagnosticWebhookErrorEvent, + DiagnosticWebhookProcessedEvent, + DiagnosticWebhookReceivedEvent, +} from "../infra/diagnostic-events.js"; // Channel: Discord export { diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index d71619aed..8ede381da 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -1,8 +1,32 @@ -import { describe, expect, it } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const diagnosticMocks = vi.hoisted(() => ({ + logLaneEnqueue: vi.fn(), + logLaneDequeue: vi.fn(), + diag: { + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +vi.mock("../logging/diagnostic.js", () => ({ + logLaneEnqueue: diagnosticMocks.logLaneEnqueue, + logLaneDequeue: diagnosticMocks.logLaneDequeue, + diagnosticLogger: diagnosticMocks.diag, +})); import { enqueueCommand, getQueueSize } from "./command-queue.js"; describe("command queue", () => { + beforeEach(() => { + diagnosticMocks.logLaneEnqueue.mockClear(); + diagnosticMocks.logLaneDequeue.mockClear(); + diagnosticMocks.diag.debug.mockClear(); + diagnosticMocks.diag.warn.mockClear(); + diagnosticMocks.diag.error.mockClear(); + }); + it("runs tasks one at a time in order", async () => { let active = 0; let maxActive = 0; @@ -29,6 +53,15 @@ describe("command queue", () => { expect(getQueueSize()).toBe(0); }); + it("logs enqueue depth after push", async () => { + const task = enqueueCommand(async () => {}); + + expect(diagnosticMocks.logLaneEnqueue).toHaveBeenCalledTimes(1); + expect(diagnosticMocks.logLaneEnqueue.mock.calls[0]?.[1]).toBe(1); + + await task; + }); + it("invokes onWait callback when a task waits past the threshold", async () => { let waited: number | null = null; let queuedAhead: number | null = null; diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 564ec5213..9b203c938 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -1,4 +1,5 @@ import { CommandLane } from "./lanes.js"; +import { diagnosticLogger as diag, logLaneDequeue, logLaneEnqueue } from "../logging/diagnostic.js"; // Minimal in-process queue to serialize command executions. // Default lane ("main") preserves the existing behavior. Additional lanes allow @@ -49,16 +50,27 @@ function drainLane(lane: string) { const waitedMs = Date.now() - entry.enqueuedAt; if (waitedMs >= entry.warnAfterMs) { entry.onWait?.(waitedMs, state.queue.length); + diag.warn( + `lane wait exceeded: lane=${lane} waitedMs=${waitedMs} queueAhead=${state.queue.length}`, + ); } + logLaneDequeue(lane, waitedMs, state.queue.length); state.active += 1; void (async () => { + const startTime = Date.now(); try { const result = await entry.task(); state.active -= 1; + diag.debug( + `lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`, + ); pump(); entry.resolve(result); } catch (err) { state.active -= 1; + diag.error( + `lane task error: lane=${lane} durationMs=${Date.now() - startTime} error="${String(err)}"`, + ); pump(); entry.reject(err); } @@ -97,6 +109,7 @@ export function enqueueCommandInLane( warnAfterMs, onWait: opts?.onWait, }); + logLaneEnqueue(cleaned, state.queue.length + state.active); drainLane(cleaned); }); } diff --git a/src/telegram/bot-message.test.ts b/src/telegram/bot-message.test.ts new file mode 100644 index 000000000..ce0099cec --- /dev/null +++ b/src/telegram/bot-message.test.ts @@ -0,0 +1,101 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const buildTelegramMessageContext = vi.hoisted(() => vi.fn()); +const dispatchTelegramMessage = vi.hoisted(() => vi.fn()); +const logMessageQueued = vi.hoisted(() => vi.fn()); +const logMessageProcessed = vi.hoisted(() => vi.fn()); +const logSessionStateChange = vi.hoisted(() => vi.fn()); +const diagnosticLogger = vi.hoisted(() => ({ + info: vi.fn(), + debug: vi.fn(), + error: vi.fn(), +})); + +vi.mock("./bot-message-context.js", () => ({ + buildTelegramMessageContext, +})); + +vi.mock("./bot-message-dispatch.js", () => ({ + dispatchTelegramMessage, +})); + +vi.mock("../logging/diagnostic.js", () => ({ + diagnosticLogger, + logMessageQueued, + logMessageProcessed, + logSessionStateChange, +})); + +import { createTelegramMessageProcessor } from "./bot-message.js"; + +describe("telegram bot message diagnostics", () => { + beforeEach(() => { + buildTelegramMessageContext.mockReset(); + dispatchTelegramMessage.mockReset(); + logMessageQueued.mockReset(); + logMessageProcessed.mockReset(); + logSessionStateChange.mockReset(); + diagnosticLogger.info.mockReset(); + diagnosticLogger.debug.mockReset(); + diagnosticLogger.error.mockReset(); + }); + + const baseDeps = { + bot: {}, + cfg: {}, + account: {}, + telegramCfg: {}, + historyLimit: 0, + groupHistories: {}, + dmPolicy: {}, + allowFrom: [], + groupAllowFrom: [], + ackReactionScope: "none", + logger: {}, + resolveGroupActivation: () => true, + resolveGroupRequireMention: () => false, + resolveTelegramGroupConfig: () => ({}), + runtime: {}, + replyToMode: "auto", + streamMode: "auto", + textLimit: 4096, + opts: {}, + resolveBotTopicsEnabled: () => false, + }; + + it("decrements queue depth after successful processing", async () => { + buildTelegramMessageContext.mockResolvedValue({ + route: { sessionKey: "agent:main:main" }, + }); + + const processMessage = createTelegramMessageProcessor(baseDeps); + await processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {}); + + expect(logMessageQueued).toHaveBeenCalledTimes(1); + expect(logSessionStateChange).toHaveBeenCalledWith({ + sessionKey: "agent:main:main", + state: "idle", + reason: "message_completed", + }); + }); + + it("decrements queue depth after processing error", async () => { + buildTelegramMessageContext.mockResolvedValue({ + route: { sessionKey: "agent:main:main" }, + }); + dispatchTelegramMessage.mockRejectedValue(new Error("boom")); + + const processMessage = createTelegramMessageProcessor(baseDeps); + + await expect( + processMessage({ message: { chat: { id: 123 }, message_id: 456 } }, [], [], {}), + ).rejects.toThrow("boom"); + + expect(logMessageQueued).toHaveBeenCalledTimes(1); + expect(logSessionStateChange).toHaveBeenCalledWith({ + sessionKey: "agent:main:main", + state: "idle", + reason: "message_error", + }); + }); +}); diff --git a/src/telegram/bot-message.ts b/src/telegram/bot-message.ts index 313296b1d..bfd89793d 100644 --- a/src/telegram/bot-message.ts +++ b/src/telegram/bot-message.ts @@ -1,6 +1,12 @@ // @ts-nocheck import { buildTelegramMessageContext } from "./bot-message-context.js"; import { dispatchTelegramMessage } from "./bot-message-dispatch.js"; +import { + diagnosticLogger as diag, + logMessageProcessed, + logMessageQueued, + logSessionStateChange, +} from "../logging/diagnostic.js"; export const createTelegramMessageProcessor = (deps) => { const { @@ -27,37 +33,122 @@ export const createTelegramMessageProcessor = (deps) => { } = deps; return async (primaryCtx, allMedia, storeAllowFrom, options) => { - const context = await buildTelegramMessageContext({ - primaryCtx, - allMedia, - storeAllowFrom, - options, - bot, - cfg, - account, - historyLimit, - groupHistories, - dmPolicy, - allowFrom, - groupAllowFrom, - ackReactionScope, - logger, - resolveGroupActivation, - resolveGroupRequireMention, - resolveTelegramGroupConfig, - }); - if (!context) return; - await dispatchTelegramMessage({ - context, - bot, - cfg, - runtime, - replyToMode, - streamMode, - textLimit, - telegramCfg, - opts, - resolveBotTopicsEnabled, - }); + const chatId = primaryCtx?.message?.chat?.id ?? primaryCtx?.chat?.id ?? "unknown"; + const messageId = primaryCtx?.message?.message_id ?? "unknown"; + const startTime = Date.now(); + + diag.info( + `process message start: channel=telegram chatId=${chatId} messageId=${messageId} mediaCount=${ + allMedia?.length ?? 0 + }`, + ); + + let sessionKey: string | undefined; + + try { + const context = await buildTelegramMessageContext({ + primaryCtx, + allMedia, + storeAllowFrom, + options, + bot, + cfg, + account, + historyLimit, + groupHistories, + dmPolicy, + allowFrom, + groupAllowFrom, + ackReactionScope, + logger, + resolveGroupActivation, + resolveGroupRequireMention, + resolveTelegramGroupConfig, + }); + if (!context) { + const durationMs = Date.now() - startTime; + diag.debug( + `process message skipped: channel=telegram chatId=${chatId} messageId=${messageId} reason=no_context`, + ); + logMessageProcessed({ + channel: "telegram", + chatId, + messageId, + durationMs, + outcome: "skipped", + reason: "no_context", + }); + return; + } + + sessionKey = context?.route?.sessionKey; + diag.info( + `process message dispatching: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${ + sessionKey ?? "unknown" + }`, + ); + if (sessionKey) { + logMessageQueued({ sessionKey, channel: "telegram", source: "telegram" }); + } + + await dispatchTelegramMessage({ + context, + bot, + cfg, + runtime, + replyToMode, + streamMode, + textLimit, + telegramCfg, + opts, + resolveBotTopicsEnabled, + }); + + const durationMs = Date.now() - startTime; + logMessageProcessed({ + channel: "telegram", + chatId, + messageId, + sessionKey, + durationMs, + outcome: "completed", + }); + if (sessionKey) { + logSessionStateChange({ + sessionKey, + state: "idle", + reason: "message_completed", + }); + } + diag.info( + `process message complete: channel=telegram chatId=${chatId} messageId=${messageId} sessionKey=${ + sessionKey ?? "unknown" + } durationMs=${durationMs}`, + ); + } catch (err) { + const durationMs = Date.now() - startTime; + logMessageProcessed({ + channel: "telegram", + chatId, + messageId, + sessionKey, + durationMs, + outcome: "error", + error: String(err), + }); + if (sessionKey) { + logSessionStateChange({ + sessionKey, + state: "idle", + reason: "message_error", + }); + } + diag.error( + `process message error: channel=telegram chatId=${chatId} messageId=${messageId} durationMs=${durationMs} error="${String( + err, + )}"`, + ); + throw err; + } }; }; diff --git a/src/telegram/webhook.ts b/src/telegram/webhook.ts index 6e2310b8b..1092722ef 100644 --- a/src/telegram/webhook.ts +++ b/src/telegram/webhook.ts @@ -5,6 +5,13 @@ import type { ClawdbotConfig } from "../config/config.js"; import { formatErrorMessage } from "../infra/errors.js"; import type { RuntimeEnv } from "../runtime.js"; import { defaultRuntime } from "../runtime.js"; +import { + logWebhookError, + logWebhookProcessed, + logWebhookReceived, + startDiagnosticHeartbeat, + stopDiagnosticHeartbeat, +} from "../logging/diagnostic.js"; import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; import { createTelegramBot } from "./bot.js"; @@ -38,6 +45,8 @@ export async function startTelegramWebhook(opts: { secretToken: opts.secret, }); + startDiagnosticHeartbeat(); + const server = createServer((req, res) => { if (req.url === healthPath) { res.writeHead(200); @@ -49,13 +58,29 @@ export async function startTelegramWebhook(opts: { res.end(); return; } + const startTime = Date.now(); + logWebhookReceived({ channel: "telegram", updateType: "telegram-post" }); const handled = handler(req, res); if (handled && typeof (handled as Promise).catch === "function") { - void (handled as Promise).catch((err) => { - runtime.log?.(`webhook handler failed: ${formatErrorMessage(err)}`); - if (!res.headersSent) res.writeHead(500); - res.end(); - }); + void (handled as Promise) + .then(() => { + logWebhookProcessed({ + channel: "telegram", + updateType: "telegram-post", + durationMs: Date.now() - startTime, + }); + }) + .catch((err) => { + const errMsg = formatErrorMessage(err); + logWebhookError({ + channel: "telegram", + updateType: "telegram-post", + error: errMsg, + }); + runtime.log?.(`webhook handler failed: ${errMsg}`); + if (!res.headersSent) res.writeHead(500); + res.end(); + }); } }); @@ -73,6 +98,7 @@ export async function startTelegramWebhook(opts: { const shutdown = () => { server.close(); void bot.stop(); + stopDiagnosticHeartbeat(); }; if (opts.abortSignal) { opts.abortSignal.addEventListener("abort", shutdown, { once: true });