From 7e498ab94a4309e87c5d98eaad252f4d48194767 Mon Sep 17 00:00:00 2001 From: Andrii Date: Fri, 23 Jan 2026 11:47:08 +0100 Subject: [PATCH] anthropic-payload-log mvp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a dedicated Anthropic payload logger that writes exact request JSON (as sent) plus per‑run usage stats (input/output/cache read/write) to a standalone JSONL file, gated by an env flag. Changes - New logger: src/agents/anthropic-payload-log.ts (writes logs/anthropic-payload.jsonl under the state dir, optional override via env). - Hooked into embedded runs to wrap the stream function and record usage: src/agents/pi-embedded-runner/run/attempt.ts. How to enable - CLAWDBOT_ANTHROPIC_PAYLOAD_LOG=1 - Optional: CLAWDBOT_ANTHROPIC_PAYLOAD_LOG_FILE=/path/to/anthropic-payload.jsonl What you’ll get (JSONL) - stage: "request" with payload (exact Anthropic params) + payloadDigest - stage: "usage" with usage (input/output/cacheRead/cacheWrite/totalTokens/etc.) Notes - Usage is taken from the last assistant message in the run; if the run fails before usage is present, you’ll only see an error field. Files touched - src/agents/anthropic-payload-log.ts - src/agents/pi-embedded-runner/run/attempt.ts Tests not run. --- src/agents/anthropic-payload-log.ts | 202 +++++++++++++++++++ src/agents/pi-embedded-runner/run/attempt.ts | 17 ++ 2 files changed, 219 insertions(+) create mode 100644 src/agents/anthropic-payload-log.ts diff --git a/src/agents/anthropic-payload-log.ts b/src/agents/anthropic-payload-log.ts new file mode 100644 index 000000000..8b39d2f1a --- /dev/null +++ b/src/agents/anthropic-payload-log.ts @@ -0,0 +1,202 @@ +import crypto from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; + +import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core"; +import type { Api, Model } from "@mariozechner/pi-ai"; + +import { resolveStateDir } from "../config/paths.js"; +import { parseBooleanValue } from "../utils/boolean.js"; +import { resolveUserPath } from "../utils.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; + +type PayloadLogStage = "request" | "usage"; + +type PayloadLogEvent = { + ts: string; + stage: PayloadLogStage; + runId?: string; + sessionId?: string; + sessionKey?: string; + provider?: string; + modelId?: string; + modelApi?: string | null; + workspaceDir?: string; + payload?: unknown; + usage?: Record; + error?: string; + payloadDigest?: string; +}; + +type PayloadLogConfig = { + enabled: boolean; + filePath: string; +}; + +type PayloadLogWriter = { + filePath: string; + write: (line: string) => void; +}; + +const writers = new Map(); +const log = createSubsystemLogger("agent/anthropic-payload"); + +function resolvePayloadLogConfig(env: NodeJS.ProcessEnv): PayloadLogConfig { + const enabled = parseBooleanValue(env.CLAWDBOT_ANTHROPIC_PAYLOAD_LOG) ?? false; + const fileOverride = env.CLAWDBOT_ANTHROPIC_PAYLOAD_LOG_FILE?.trim(); + const filePath = fileOverride + ? resolveUserPath(fileOverride) + : path.join(resolveStateDir(env), "logs", "anthropic-payload.jsonl"); + return { enabled, filePath }; +} + +function getWriter(filePath: string): PayloadLogWriter { + const existing = writers.get(filePath); + if (existing) return existing; + + const dir = path.dirname(filePath); + const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined); + let queue = Promise.resolve(); + + const writer: PayloadLogWriter = { + filePath, + write: (line: string) => { + queue = queue + .then(() => ready) + .then(() => fs.appendFile(filePath, line, "utf8")) + .catch(() => undefined); + }, + }; + + writers.set(filePath, writer); + return writer; +} + +function safeJsonStringify(value: unknown): string | null { + try { + return JSON.stringify(value, (_key, val) => { + if (typeof val === "bigint") return val.toString(); + if (typeof val === "function") return "[Function]"; + if (val instanceof Error) { + return { name: val.name, message: val.message, stack: val.stack }; + } + if (val instanceof Uint8Array) { + return { type: "Uint8Array", data: Buffer.from(val).toString("base64") }; + } + return val; + }); + } catch { + return null; + } +} + +function digest(value: unknown): string | undefined { + const serialized = safeJsonStringify(value); + if (!serialized) return undefined; + return crypto.createHash("sha256").update(serialized).digest("hex"); +} + +function isAnthropicModel(model: Model | undefined | null): boolean { + return (model as { api?: unknown })?.api === "anthropic-messages"; +} + +function findLastAssistantUsage(messages: AgentMessage[]): Record | null { + for (let i = messages.length - 1; i >= 0; i -= 1) { + const msg = messages[i] as { role?: unknown; usage?: unknown }; + if (msg?.role === "assistant" && msg.usage && typeof msg.usage === "object") { + return msg.usage as Record; + } + } + return null; +} + +export type AnthropicPayloadLogger = { + enabled: true; + wrapStreamFn: (streamFn: StreamFn) => StreamFn; + recordUsage: (messages: AgentMessage[], error?: unknown) => void; +}; + +export function createAnthropicPayloadLogger(params: { + env?: NodeJS.ProcessEnv; + runId?: string; + sessionId?: string; + sessionKey?: string; + provider?: string; + modelId?: string; + modelApi?: string | null; + workspaceDir?: string; +}): AnthropicPayloadLogger | null { + const env = params.env ?? process.env; + const cfg = resolvePayloadLogConfig(env); + if (!cfg.enabled) return null; + + const writer = getWriter(cfg.filePath); + const base: Omit = { + runId: params.runId, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + provider: params.provider, + modelId: params.modelId, + modelApi: params.modelApi, + workspaceDir: params.workspaceDir, + }; + + const record = (event: PayloadLogEvent) => { + const line = safeJsonStringify(event); + if (!line) return; + writer.write(`${line}\n`); + }; + + const wrapStreamFn: AnthropicPayloadLogger["wrapStreamFn"] = (streamFn) => { + const wrapped: StreamFn = (model, context, options) => { + if (!isAnthropicModel(model as Model)) { + return streamFn(model, context, options); + } + const nextOnPayload = (payload: unknown) => { + record({ + ...base, + ts: new Date().toISOString(), + stage: "request", + payload, + payloadDigest: digest(payload), + }); + options?.onPayload?.(payload); + }; + return streamFn(model, context, { + ...(options ?? {}), + onPayload: nextOnPayload, + }); + }; + return wrapped; + }; + + const recordUsage: AnthropicPayloadLogger["recordUsage"] = (messages, error) => { + const usage = findLastAssistantUsage(messages); + if (!usage) { + if (error) { + record({ + ...base, + ts: new Date().toISOString(), + stage: "usage", + error: String(error), + }); + } + return; + } + record({ + ...base, + ts: new Date().toISOString(), + stage: "usage", + usage, + error: error ? String(error) : undefined, + }); + log.info("anthropic usage", { + runId: params.runId, + sessionId: params.sessionId, + usage, + }); + }; + + log.info("anthropic payload logger enabled", { filePath: writer.filePath }); + return { enabled: true, wrapStreamFn, recordUsage }; +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index f64578369..c121bb42b 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -20,6 +20,7 @@ import { isReasoningTagProvider } from "../../../utils/provider-utils.js"; import { isSubagentSessionKey } from "../../../routing/session-key.js"; import { resolveUserPath } from "../../../utils.js"; import { createCacheTrace } from "../../cache-trace.js"; +import { createAnthropicPayloadLogger } from "../../anthropic-payload-log.js"; import { resolveClawdbotAgentDir } from "../../agent-paths.js"; import { resolveSessionAgentIds } from "../../agent-scope.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js"; @@ -458,6 +459,16 @@ export async function runEmbeddedAttempt( modelApi: params.model.api, workspaceDir: params.workspaceDir, }); + const anthropicPayloadLogger = createAnthropicPayloadLogger({ + env: process.env, + runId: params.runId, + sessionId: activeSession.sessionId, + sessionKey: params.sessionKey, + provider: params.provider, + modelId: params.modelId, + modelApi: params.model.api, + workspaceDir: params.workspaceDir, + }); // Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai. activeSession.agent.streamFn = streamSimple; @@ -478,6 +489,11 @@ export async function runEmbeddedAttempt( }); activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn); } + if (anthropicPayloadLogger) { + activeSession.agent.streamFn = anthropicPayloadLogger.wrapStreamFn( + activeSession.agent.streamFn, + ); + } try { const prior = await sanitizeSessionHistory({ @@ -772,6 +788,7 @@ export async function runEmbeddedAttempt( messages: messagesSnapshot, note: promptError ? "prompt error" : undefined, }); + anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError); // Run agent_end hooks to allow plugins to analyze the conversation // This is fire-and-forget, so we don't await