cache trace mvp
Added a standalone cache tracing module and wired it into the embedded
runner so you can capture message flow and the exact context sent to
Anthropic in a separate JSONL file.
What changed
- New tracing module: src/agents/cache-trace.ts (self‑contained,
env‑gated, writes JSONL, computes per‑message digests).
- Hook points in src/agents/pi-embedded-runner/run/attempt.ts: logs
stage snapshots (loaded/sanitized/limited/prompt/stream/after) and wraps
the
stream fn to record the real context.messages at send time.
How to enable
- CLAWDBOT_CACHE_TRACE=1 enables tracing.
- CLAWDBOT_CACHE_TRACE_FILE=~/.clawdbot/logs/cache-trace.jsonl
overrides output (default is
$CLAWDBOT_STATE_DIR/logs/cache-trace.jsonl).
- Optional filters:
- CLAWDBOT_CACHE_TRACE_MESSAGES=0 to omit full messages (still
logs digests).
- CLAWDBOT_CACHE_TRACE_PROMPT=0 to omit prompt text.
- CLAWDBOT_CACHE_TRACE_SYSTEM=0 to omit system prompt.
What you’ll see
- One JSON object per line with stage, messagesDigest, per‑message
messageFingerprints, and the actual messages if enabled.
- The most important line is stage: "stream:context" — that is the
exact payload pi‑mono is sending. If this diverges from earlier stages,
you’ve
found the mutation point.
This commit is contained in:
committed by
Peter Steinberger
parent
63d017c3af
commit
5392fa0dfa
266
src/agents/cache-trace.ts
Normal file
266
src/agents/cache-trace.ts
Normal file
@@ -0,0 +1,266 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
|
||||
import type { AgentMessage, StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import type { Api, Model } from "@mariozechner/pi-ai";
|
||||
|
||||
import type { ClawdbotConfig } from "../config/config.js";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import { parseBooleanValue } from "../utils/boolean.js";
|
||||
|
||||
export type CacheTraceStage =
|
||||
| "session:loaded"
|
||||
| "session:sanitized"
|
||||
| "session:limited"
|
||||
| "prompt:before"
|
||||
| "prompt:images"
|
||||
| "stream:context"
|
||||
| "session:after";
|
||||
|
||||
export type CacheTraceEvent = {
|
||||
ts: string;
|
||||
seq: number;
|
||||
stage: CacheTraceStage;
|
||||
runId?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
provider?: string;
|
||||
modelId?: string;
|
||||
modelApi?: string | null;
|
||||
workspaceDir?: string;
|
||||
prompt?: string;
|
||||
system?: unknown;
|
||||
options?: Record<string, unknown>;
|
||||
model?: Record<string, unknown>;
|
||||
messages?: AgentMessage[];
|
||||
messageCount?: number;
|
||||
messageRoles?: Array<string | undefined>;
|
||||
messageFingerprints?: string[];
|
||||
messagesDigest?: string;
|
||||
systemDigest?: string;
|
||||
note?: string;
|
||||
error?: string;
|
||||
};
|
||||
|
||||
export type CacheTrace = {
|
||||
enabled: true;
|
||||
filePath: string;
|
||||
recordStage: (stage: CacheTraceStage, payload?: Partial<CacheTraceEvent>) => void;
|
||||
wrapStreamFn: (streamFn: StreamFn) => StreamFn;
|
||||
};
|
||||
|
||||
type CacheTraceInit = {
|
||||
cfg?: ClawdbotConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
runId?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
provider?: string;
|
||||
modelId?: string;
|
||||
modelApi?: string | null;
|
||||
workspaceDir?: string;
|
||||
};
|
||||
|
||||
type CacheTraceConfig = {
|
||||
enabled: boolean;
|
||||
filePath: string;
|
||||
includeMessages: boolean;
|
||||
includePrompt: boolean;
|
||||
includeSystem: boolean;
|
||||
};
|
||||
|
||||
type CacheTraceWriter = {
|
||||
filePath: string;
|
||||
write: (line: string) => void;
|
||||
};
|
||||
|
||||
const writers = new Map<string, CacheTraceWriter>();
|
||||
|
||||
function resolveCacheTraceConfig(params: CacheTraceInit): CacheTraceConfig {
|
||||
const env = params.env ?? process.env;
|
||||
const enabled = isTruthyEnvValue(env.CLAWDBOT_CACHE_TRACE);
|
||||
const filePath =
|
||||
env.CLAWDBOT_CACHE_TRACE_FILE?.trim() ||
|
||||
path.join(resolveStateDir(env), "logs", "cache-trace.jsonl");
|
||||
|
||||
const includeMessages = parseBooleanValue(env.CLAWDBOT_CACHE_TRACE_MESSAGES);
|
||||
const includePrompt = parseBooleanValue(env.CLAWDBOT_CACHE_TRACE_PROMPT);
|
||||
const includeSystem = parseBooleanValue(env.CLAWDBOT_CACHE_TRACE_SYSTEM);
|
||||
|
||||
return {
|
||||
enabled,
|
||||
filePath,
|
||||
includeMessages: includeMessages ?? true,
|
||||
includePrompt: includePrompt ?? true,
|
||||
includeSystem: includeSystem ?? true,
|
||||
};
|
||||
}
|
||||
|
||||
function getWriter(filePath: string): CacheTraceWriter {
|
||||
const existing = writers.get(filePath);
|
||||
if (existing) return existing;
|
||||
|
||||
const dir = path.dirname(filePath);
|
||||
const ready = fs.mkdir(dir, { recursive: true }).catch(() => undefined);
|
||||
let queue = Promise.resolve();
|
||||
|
||||
const writer: CacheTraceWriter = {
|
||||
filePath,
|
||||
write: (line: string) => {
|
||||
queue = queue
|
||||
.then(() => ready)
|
||||
.then(() => fs.appendFile(filePath, line, "utf8"))
|
||||
.catch(() => undefined);
|
||||
},
|
||||
};
|
||||
|
||||
writers.set(filePath, writer);
|
||||
return writer;
|
||||
}
|
||||
|
||||
function stableStringify(value: unknown): string {
|
||||
if (value === null || value === undefined) return String(value);
|
||||
if (typeof value === "number" && !Number.isFinite(value)) return JSON.stringify(String(value));
|
||||
if (typeof value === "bigint") return JSON.stringify(value.toString());
|
||||
if (typeof value !== "object") return JSON.stringify(value) ?? "null";
|
||||
if (value instanceof Error) {
|
||||
return stableStringify({
|
||||
name: value.name,
|
||||
message: value.message,
|
||||
stack: value.stack,
|
||||
});
|
||||
}
|
||||
if (value instanceof Uint8Array) {
|
||||
return stableStringify({
|
||||
type: "Uint8Array",
|
||||
data: Buffer.from(value).toString("base64"),
|
||||
});
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return `[${value.map((entry) => stableStringify(entry)).join(",")}]`;
|
||||
}
|
||||
const record = value as Record<string, unknown>;
|
||||
const keys = Object.keys(record).sort();
|
||||
const entries = keys.map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`);
|
||||
return `{${entries.join(",")}}`;
|
||||
}
|
||||
|
||||
function digest(value: unknown): string {
|
||||
const serialized = stableStringify(value);
|
||||
return crypto.createHash("sha256").update(serialized).digest("hex");
|
||||
}
|
||||
|
||||
function summarizeMessages(messages: AgentMessage[]): {
|
||||
messageCount: number;
|
||||
messageRoles: Array<string | undefined>;
|
||||
messageFingerprints: string[];
|
||||
messagesDigest: string;
|
||||
} {
|
||||
const messageFingerprints = messages.map((msg) => digest(msg));
|
||||
return {
|
||||
messageCount: messages.length,
|
||||
messageRoles: messages.map((msg) => (msg as { role?: string }).role),
|
||||
messageFingerprints,
|
||||
messagesDigest: digest(messageFingerprints.join("|")),
|
||||
};
|
||||
}
|
||||
|
||||
function safeJsonStringify(value: unknown): string | null {
|
||||
try {
|
||||
return JSON.stringify(value, (_key, val) => {
|
||||
if (typeof val === "bigint") return val.toString();
|
||||
if (typeof val === "function") return "[Function]";
|
||||
if (val instanceof Error) {
|
||||
return { name: val.name, message: val.message, stack: val.stack };
|
||||
}
|
||||
if (val instanceof Uint8Array) {
|
||||
return { type: "Uint8Array", data: Buffer.from(val).toString("base64") };
|
||||
}
|
||||
return val;
|
||||
});
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function createCacheTrace(params: CacheTraceInit): CacheTrace | null {
|
||||
const cfg = resolveCacheTraceConfig(params);
|
||||
if (!cfg.enabled) return null;
|
||||
|
||||
const writer = getWriter(cfg.filePath);
|
||||
let seq = 0;
|
||||
|
||||
const base: Omit<CacheTraceEvent, "ts" | "seq" | "stage"> = {
|
||||
runId: params.runId,
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
modelApi: params.modelApi,
|
||||
workspaceDir: params.workspaceDir,
|
||||
};
|
||||
|
||||
const recordStage: CacheTrace["recordStage"] = (stage, payload = {}) => {
|
||||
const event: CacheTraceEvent = {
|
||||
...base,
|
||||
ts: new Date().toISOString(),
|
||||
seq: (seq += 1),
|
||||
stage,
|
||||
};
|
||||
|
||||
if (payload.prompt && cfg.includePrompt) {
|
||||
event.prompt = payload.prompt;
|
||||
}
|
||||
if (payload.system && cfg.includeSystem) {
|
||||
event.system = payload.system;
|
||||
event.systemDigest = digest(payload.system);
|
||||
}
|
||||
if (payload.options) event.options = payload.options;
|
||||
if (payload.model) event.model = payload.model;
|
||||
|
||||
const messages = payload.messages;
|
||||
if (Array.isArray(messages)) {
|
||||
const summary = summarizeMessages(messages);
|
||||
event.messageCount = summary.messageCount;
|
||||
event.messageRoles = summary.messageRoles;
|
||||
event.messageFingerprints = summary.messageFingerprints;
|
||||
event.messagesDigest = summary.messagesDigest;
|
||||
if (cfg.includeMessages) {
|
||||
event.messages = messages;
|
||||
}
|
||||
}
|
||||
|
||||
if (payload.note) event.note = payload.note;
|
||||
if (payload.error) event.error = payload.error;
|
||||
|
||||
const line = safeJsonStringify(event);
|
||||
if (!line) return;
|
||||
writer.write(`${line}\n`);
|
||||
};
|
||||
|
||||
const wrapStreamFn: CacheTrace["wrapStreamFn"] = (streamFn) => {
|
||||
const wrapped: StreamFn = (model, context, options) => {
|
||||
recordStage("stream:context", {
|
||||
model: {
|
||||
id: (model as Model<Api>)?.id,
|
||||
provider: (model as Model<Api>)?.provider,
|
||||
api: (model as Model<Api>)?.api,
|
||||
},
|
||||
system: (context as { system?: unknown }).system,
|
||||
messages: (context as { messages?: AgentMessage[] }).messages ?? [],
|
||||
options: (options ?? {}) as Record<string, unknown>,
|
||||
});
|
||||
return streamFn(model, context, options);
|
||||
};
|
||||
return wrapped;
|
||||
};
|
||||
|
||||
return {
|
||||
enabled: true,
|
||||
filePath: cfg.filePath,
|
||||
recordStage,
|
||||
wrapStreamFn,
|
||||
};
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import { normalizeMessageChannel } from "../../../utils/message-channel.js";
|
||||
import { isReasoningTagProvider } from "../../../utils/provider-utils.js";
|
||||
import { isSubagentSessionKey } from "../../../routing/session-key.js";
|
||||
import { resolveUserPath } from "../../../utils.js";
|
||||
import { createCacheTrace } from "../../cache-trace.js";
|
||||
import { resolveClawdbotAgentDir } from "../../agent-paths.js";
|
||||
import { resolveSessionAgentIds } from "../../agent-scope.js";
|
||||
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js";
|
||||
@@ -367,6 +368,17 @@ export async function runEmbeddedAttempt(
|
||||
throw new Error("Embedded agent session missing");
|
||||
}
|
||||
const activeSession = session;
|
||||
const cacheTrace = createCacheTrace({
|
||||
cfg: params.config,
|
||||
env: process.env,
|
||||
runId: params.runId,
|
||||
sessionId: activeSession.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
provider: params.provider,
|
||||
modelId: params.modelId,
|
||||
modelApi: params.model.api,
|
||||
workspaceDir: params.workspaceDir,
|
||||
});
|
||||
|
||||
// Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai.
|
||||
activeSession.agent.streamFn = streamSimple;
|
||||
@@ -379,6 +391,15 @@ export async function runEmbeddedAttempt(
|
||||
params.streamParams,
|
||||
);
|
||||
|
||||
if (cacheTrace) {
|
||||
cacheTrace.recordStage("session:loaded", {
|
||||
messages: activeSession.messages,
|
||||
system: systemPrompt,
|
||||
note: "after session create",
|
||||
});
|
||||
activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn);
|
||||
}
|
||||
|
||||
try {
|
||||
const prior = await sanitizeSessionHistory({
|
||||
messages: activeSession.messages,
|
||||
@@ -388,12 +409,14 @@ export async function runEmbeddedAttempt(
|
||||
sessionManager,
|
||||
sessionId: params.sessionId,
|
||||
});
|
||||
cacheTrace?.recordStage("session:sanitized", { messages: prior });
|
||||
const validatedGemini = validateGeminiTurns(prior);
|
||||
const validated = validateAnthropicTurns(validatedGemini);
|
||||
const limited = limitHistoryTurns(
|
||||
validated,
|
||||
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
|
||||
);
|
||||
cacheTrace?.recordStage("session:limited", { messages: limited });
|
||||
if (limited.length > 0) {
|
||||
activeSession.agent.replaceMessages(limited);
|
||||
}
|
||||
@@ -564,6 +587,10 @@ export async function runEmbeddedAttempt(
|
||||
}
|
||||
|
||||
log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`);
|
||||
cacheTrace?.recordStage("prompt:before", {
|
||||
prompt: effectivePrompt,
|
||||
messages: activeSession.messages,
|
||||
});
|
||||
|
||||
// Repair orphaned trailing user messages so new prompts don't violate role ordering.
|
||||
const leafEntry = sessionManager.getLeafEntry();
|
||||
@@ -633,6 +660,12 @@ export async function runEmbeddedAttempt(
|
||||
}
|
||||
}
|
||||
|
||||
cacheTrace?.recordStage("prompt:images", {
|
||||
prompt: effectivePrompt,
|
||||
messages: activeSession.messages,
|
||||
note: `images: prompt=${imageResult.images.length} history=${imageResult.historyImagesByIndex.size}`,
|
||||
});
|
||||
|
||||
// Only pass images option if there are actually images to pass
|
||||
// This avoids potential issues with models that don't expect the images parameter
|
||||
if (imageResult.images.length > 0) {
|
||||
@@ -660,6 +693,10 @@ export async function runEmbeddedAttempt(
|
||||
|
||||
messagesSnapshot = activeSession.messages.slice();
|
||||
sessionIdUsed = activeSession.sessionId;
|
||||
cacheTrace?.recordStage("session:after", {
|
||||
messages: messagesSnapshot,
|
||||
note: promptError ? "prompt error" : undefined,
|
||||
});
|
||||
|
||||
// Run agent_end hooks to allow plugins to analyze the conversation
|
||||
// This is fire-and-forget, so we don't await
|
||||
|
||||
Reference in New Issue
Block a user