From 5392fa0dfab23872462f7e7c19d7fb1e1614f672 Mon Sep 17 00:00:00 2001
From: Andrii
Date: Wed, 21 Jan 2026 10:03:24 +0100
Subject: [PATCH] cache trace mvp
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Added a standalone cache tracing module and wired it into the embedded
runner so you can capture message flow and the exact context sent to
Anthropic in a separate JSONL file.

What changed
- New tracing module: src/agents/cache-trace.ts (self‑contained,
  env‑gated, writes JSONL, computes per‑message digests).
- Hook points in src/agents/pi-embedded-runner/run/attempt.ts: logs stage
  snapshots (session:loaded, session:sanitized, session:limited,
  prompt:before, prompt:images, stream:context, session:after) and wraps
  the stream fn to record the real context.messages at send time.

How to enable
- CLAWDBOT_CACHE_TRACE=1 enables tracing.
- CLAWDBOT_CACHE_TRACE_FILE=~/.clawdbot/logs/cache-trace.jsonl overrides
  output (default is $CLAWDBOT_STATE_DIR/logs/cache-trace.jsonl). The
  value is used as given after trimming; the module does not expand `~`
  itself, so rely on your shell to expand it or pass an absolute path.
- Optional filters:
  - CLAWDBOT_CACHE_TRACE_MESSAGES=0 to omit full messages (still logs
    digests).
  - CLAWDBOT_CACHE_TRACE_PROMPT=0 to omit prompt text.
  - CLAWDBOT_CACHE_TRACE_SYSTEM=0 to omit system prompt (unlike messages,
    this also omits systemDigest).

What you’ll see
- One JSON object per line with stage, messagesDigest, per‑message
  messageFingerprints, and the actual messages if enabled.
- The most important line is stage: "stream:context" — that is the exact
  payload pi‑mono is sending. If this diverges from earlier stages,
  you’ve found the mutation point.
--- src/agents/cache-trace.ts | 266 +++++++++++++++++++ src/agents/pi-embedded-runner/run/attempt.ts | 37 +++ 2 files changed, 303 insertions(+) create mode 100644 src/agents/cache-trace.ts diff --git a/src/agents/cache-trace.ts b/src/agents/cache-trace.ts new file mode 100644 index 000000000..b86c15a13 --- /dev/null +++ b/src/agents/cache-trace.ts @@ -0,0 +1,266 @@ +import crypto from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; + +import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core"; +import type { Api, Model } from "@mariozechner/pi-ai"; + +import type { ClawdbotConfig } from "../config/config.js"; +import { resolveStateDir } from "../config/paths.js"; +import { isTruthyEnvValue } from "../infra/env.js"; +import { parseBooleanValue } from "../utils/boolean.js"; + +export type CacheTraceStage = + | "session:loaded" + | "session:sanitized" + | "session:limited" + | "prompt:before" + | "prompt:images" + | "stream:context" + | "session:after"; + +export type CacheTraceEvent = { + ts: string; + seq: number; + stage: CacheTraceStage; + runId?: string; + sessionId?: string; + sessionKey?: string; + provider?: string; + modelId?: string; + modelApi?: string | null; + workspaceDir?: string; + prompt?: string; + system?: unknown; + options?: Record; + model?: Record; + messages?: AgentMessage[]; + messageCount?: number; + messageRoles?: Array; + messageFingerprints?: string[]; + messagesDigest?: string; + systemDigest?: string; + note?: string; + error?: string; +}; + +export type CacheTrace = { + enabled: true; + filePath: string; + recordStage: (stage: CacheTraceStage, payload?: Partial) => void; + wrapStreamFn: (streamFn: StreamFn) => StreamFn; +}; + +type CacheTraceInit = { + cfg?: ClawdbotConfig; + env?: NodeJS.ProcessEnv; + runId?: string; + sessionId?: string; + sessionKey?: string; + provider?: string; + modelId?: string; + modelApi?: string | null; + workspaceDir?: string; +}; + +type CacheTraceConfig = { 
+ enabled: boolean; + filePath: string; + includeMessages: boolean; + includePrompt: boolean; + includeSystem: boolean; +}; + +type CacheTraceWriter = { + filePath: string; + write: (line: string) => void; +}; + +const writers = new Map(); + +function resolveCacheTraceConfig(params: CacheTraceInit): CacheTraceConfig { + const env = params.env ?? process.env; + const enabled = isTruthyEnvValue(env.CLAWDBOT_CACHE_TRACE); + const filePath = + env.CLAWDBOT_CACHE_TRACE_FILE?.trim() || + path.join(resolveStateDir(env), "logs", "cache-trace.jsonl"); + + const includeMessages = parseBooleanValue(env.CLAWDBOT_CACHE_TRACE_MESSAGES); + const includePrompt = parseBooleanValue(env.CLAWDBOT_CACHE_TRACE_PROMPT); + const includeSystem = parseBooleanValue(env.CLAWDBOT_CACHE_TRACE_SYSTEM); + + return { + enabled, + filePath, + includeMessages: includeMessages ?? true, + includePrompt: includePrompt ?? true, + includeSystem: includeSystem ?? true, + }; +} + +function getWriter(filePath: string): CacheTraceWriter { + const existing = writers.get(filePath); + if (existing) return existing; + + const dir = path.dirname(filePath); + const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined); + let queue = Promise.resolve(); + + const writer: CacheTraceWriter = { + filePath, + write: (line: string) => { + queue = queue + .then(() => ready) + .then(() => fs.appendFile(filePath, line, "utf8")) + .catch(() => undefined); + }, + }; + + writers.set(filePath, writer); + return writer; +} + +function stableStringify(value: unknown): string { + if (value === null || value === undefined) return String(value); + if (typeof value === "number" && !Number.isFinite(value)) return JSON.stringify(String(value)); + if (typeof value === "bigint") return JSON.stringify(value.toString()); + if (typeof value !== "object") return JSON.stringify(value) ?? 
"null"; + if (value instanceof Error) { + return stableStringify({ + name: value.name, + message: value.message, + stack: value.stack, + }); + } + if (value instanceof Uint8Array) { + return stableStringify({ + type: "Uint8Array", + data: Buffer.from(value).toString("base64"), + }); + } + if (Array.isArray(value)) { + return `[${value.map((entry) => stableStringify(entry)).join(",")}]`; + } + const record = value as Record; + const keys = Object.keys(record).sort(); + const entries = keys.map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`); + return `{${entries.join(",")}}`; +} + +function digest(value: unknown): string { + const serialized = stableStringify(value); + return crypto.createHash("sha256").update(serialized).digest("hex"); +} + +function summarizeMessages(messages: AgentMessage[]): { + messageCount: number; + messageRoles: Array; + messageFingerprints: string[]; + messagesDigest: string; +} { + const messageFingerprints = messages.map((msg) => digest(msg)); + return { + messageCount: messages.length, + messageRoles: messages.map((msg) => (msg as { role?: string }).role), + messageFingerprints, + messagesDigest: digest(messageFingerprints.join("|")), + }; +} + +function safeJsonStringify(value: unknown): string | null { + try { + return JSON.stringify(value, (_key, val) => { + if (typeof val === "bigint") return val.toString(); + if (typeof val === "function") return "[Function]"; + if (val instanceof Error) { + return { name: val.name, message: val.message, stack: val.stack }; + } + if (val instanceof Uint8Array) { + return { type: "Uint8Array", data: Buffer.from(val).toString("base64") }; + } + return val; + }); + } catch { + return null; + } +} + +export function createCacheTrace(params: CacheTraceInit): CacheTrace | null { + const cfg = resolveCacheTraceConfig(params); + if (!cfg.enabled) return null; + + const writer = getWriter(cfg.filePath); + let seq = 0; + + const base: Omit = { + runId: params.runId, + sessionId: 
params.sessionId, + sessionKey: params.sessionKey, + provider: params.provider, + modelId: params.modelId, + modelApi: params.modelApi, + workspaceDir: params.workspaceDir, + }; + + const recordStage: CacheTrace["recordStage"] = (stage, payload = {}) => { + const event: CacheTraceEvent = { + ...base, + ts: new Date().toISOString(), + seq: (seq += 1), + stage, + }; + + if (payload.prompt && cfg.includePrompt) { + event.prompt = payload.prompt; + } + if (payload.system && cfg.includeSystem) { + event.system = payload.system; + event.systemDigest = digest(payload.system); + } + if (payload.options) event.options = payload.options; + if (payload.model) event.model = payload.model; + + const messages = payload.messages; + if (Array.isArray(messages)) { + const summary = summarizeMessages(messages); + event.messageCount = summary.messageCount; + event.messageRoles = summary.messageRoles; + event.messageFingerprints = summary.messageFingerprints; + event.messagesDigest = summary.messagesDigest; + if (cfg.includeMessages) { + event.messages = messages; + } + } + + if (payload.note) event.note = payload.note; + if (payload.error) event.error = payload.error; + + const line = safeJsonStringify(event); + if (!line) return; + writer.write(`${line}\n`); + }; + + const wrapStreamFn: CacheTrace["wrapStreamFn"] = (streamFn) => { + const wrapped: StreamFn = (model, context, options) => { + recordStage("stream:context", { + model: { + id: (model as Model)?.id, + provider: (model as Model)?.provider, + api: (model as Model)?.api, + }, + system: (context as { system?: unknown }).system, + messages: (context as { messages?: AgentMessage[] }).messages ?? [], + options: (options ?? 
{}) as Record, + }); + return streamFn(model, context, options); + }; + return wrapped; + }; + + return { + enabled: true, + filePath: cfg.filePath, + recordStage, + wrapStreamFn, + }; +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 941c24b09..6aa62cd39 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -16,6 +16,7 @@ import { normalizeMessageChannel } from "../../../utils/message-channel.js"; import { isReasoningTagProvider } from "../../../utils/provider-utils.js"; import { isSubagentSessionKey } from "../../../routing/session-key.js"; import { resolveUserPath } from "../../../utils.js"; +import { createCacheTrace } from "../../cache-trace.js"; import { resolveClawdbotAgentDir } from "../../agent-paths.js"; import { resolveSessionAgentIds } from "../../agent-scope.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js"; @@ -367,6 +368,17 @@ export async function runEmbeddedAttempt( throw new Error("Embedded agent session missing"); } const activeSession = session; + const cacheTrace = createCacheTrace({ + cfg: params.config, + env: process.env, + runId: params.runId, + sessionId: activeSession.sessionId, + sessionKey: params.sessionKey, + provider: params.provider, + modelId: params.modelId, + modelApi: params.model.api, + workspaceDir: params.workspaceDir, + }); // Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai. 
activeSession.agent.streamFn = streamSimple; @@ -379,6 +391,15 @@ export async function runEmbeddedAttempt( params.streamParams, ); + if (cacheTrace) { + cacheTrace.recordStage("session:loaded", { + messages: activeSession.messages, + system: systemPrompt, + note: "after session create", + }); + activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn); + } + try { const prior = await sanitizeSessionHistory({ messages: activeSession.messages, @@ -388,12 +409,14 @@ export async function runEmbeddedAttempt( sessionManager, sessionId: params.sessionId, }); + cacheTrace?.recordStage("session:sanitized", { messages: prior }); const validatedGemini = validateGeminiTurns(prior); const validated = validateAnthropicTurns(validatedGemini); const limited = limitHistoryTurns( validated, getDmHistoryLimitFromSessionKey(params.sessionKey, params.config), ); + cacheTrace?.recordStage("session:limited", { messages: limited }); if (limited.length > 0) { activeSession.agent.replaceMessages(limited); } @@ -564,6 +587,10 @@ export async function runEmbeddedAttempt( } log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`); + cacheTrace?.recordStage("prompt:before", { + prompt: effectivePrompt, + messages: activeSession.messages, + }); // Repair orphaned trailing user messages so new prompts don't violate role ordering. 
const leafEntry = sessionManager.getLeafEntry(); @@ -633,6 +660,12 @@ export async function runEmbeddedAttempt( } } + cacheTrace?.recordStage("prompt:images", { + prompt: effectivePrompt, + messages: activeSession.messages, + note: `images: prompt=${imageResult.images.length} history=${imageResult.historyImagesByIndex.size}`, + }); + // Only pass images option if there are actually images to pass // This avoids potential issues with models that don't expect the images parameter if (imageResult.images.length > 0) { @@ -660,6 +693,10 @@ export async function runEmbeddedAttempt( messagesSnapshot = activeSession.messages.slice(); sessionIdUsed = activeSession.sessionId; + cacheTrace?.recordStage("session:after", { + messages: messagesSnapshot, + note: promptError ? "prompt error" : undefined, + }); // Run agent_end hooks to allow plugins to analyze the conversation // This is fire-and-forget, so we don't await