feat: add raw stream logging flags

This commit is contained in:
Peter Steinberger
2026-01-09 03:45:14 +00:00
parent 69546d563d
commit 8e27ea7371
8 changed files with 149 additions and 8 deletions

View File

@@ -1,8 +1,11 @@
import fs from "node:fs";
import path from "node:path";
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import type { AgentSession } from "@mariozechner/pi-coding-agent";
import type { ReasoningLevel } from "../auto-reply/thinking.js";
import { formatToolAggregate } from "../auto-reply/tool-meta.js";
import { resolveStateDir } from "../config/paths.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { createSubsystemLogger } from "../logging.js";
import { splitMediaFromOutput } from "../media/parse.js";
@@ -23,6 +26,31 @@ const THINKING_OPEN_GLOBAL_RE = /<\s*think(?:ing)?\s*>/gi;
const THINKING_CLOSE_GLOBAL_RE = /<\s*\/\s*think(?:ing)?\s*>/gi;
const TOOL_RESULT_MAX_CHARS = 8000;
const log = createSubsystemLogger("agent/embedded");
const RAW_STREAM_ENABLED = process.env.CLAWDBOT_RAW_STREAM === "1";
const RAW_STREAM_PATH =
process.env.CLAWDBOT_RAW_STREAM_PATH?.trim() ||
path.join(resolveStateDir(), "logs", "raw-stream.jsonl");
let rawStreamReady = false;
const appendRawStream = (payload: Record<string, unknown>) => {
if (!RAW_STREAM_ENABLED) return;
if (!rawStreamReady) {
rawStreamReady = true;
try {
fs.mkdirSync(path.dirname(RAW_STREAM_PATH), { recursive: true });
} catch {
// ignore raw stream mkdir failures
}
}
try {
void fs.promises.appendFile(
RAW_STREAM_PATH,
`${JSON.stringify(payload)}\n`,
);
} catch {
// ignore raw stream write failures
}
};
export type { BlockReplyChunking } from "./pi-embedded-block-chunker.js";
@@ -664,6 +692,15 @@ export function subscribeEmbeddedPiSession(params: {
typeof assistantRecord?.content === "string"
? assistantRecord.content
: "";
appendRawStream({
ts: Date.now(),
event: "assistant_text_stream",
runId: params.runId,
sessionId: (params.session as { id?: string }).id,
evtType,
delta,
content,
});
let chunk = "";
if (evtType === "text_delta") {
chunk = delta;
@@ -756,6 +793,14 @@ export function subscribeEmbeddedPiSession(params: {
if (msg?.role === "assistant") {
const assistantMessage = msg as AssistantMessage;
const rawText = extractAssistantText(assistantMessage);
appendRawStream({
ts: Date.now(),
event: "assistant_message_end",
runId: params.runId,
sessionId: (params.session as { id?: string }).id,
rawText,
rawThinking: extractAssistantThinking(assistantMessage),
});
const cleaned = params.enforceFinalTag
? stripThinkingSegments(stripUnpairedThinkingTags(rawText))
: stripThinkingSegments(rawText);

View File

@@ -599,7 +599,7 @@ describe("directive behavior", () => {
const text = Array.isArray(res) ? res[0]?.text : res?.text;
expect(text).toContain("Elevated mode disabled.");
expect(text).toContain("status agent:main:main");
expect(text).toContain("Session: agent:main:main");
expect(runEmbeddedPiAgent).not.toHaveBeenCalled();
});
});

View File

@@ -555,7 +555,6 @@ export async function handleCommands(params: {
const reply = await buildStatusReply({
cfg,
command,
provider: command.provider,
sessionEntry,
sessionKey,
sessionScope,

View File

@@ -65,7 +65,6 @@ describe("buildStatusMessage", () => {
},
},
},
},
} as ClawdbotConfig,
agent: {
model: "anthropic/pi:opus",
@@ -248,7 +247,6 @@ describe("buildStatusMessage", () => {
},
},
},
},
} as ClawdbotConfig,
agent: { model: "anthropic/claude-opus-4-5" },
sessionEntry: { sessionId: "c1", updatedAt: 0, inputTokens: 10 },

View File

@@ -296,7 +296,10 @@ export function buildStatusMessage(args: StatusArgs): string {
const activationLine = activationParts.filter(Boolean).join(" · ");
const authMode = resolveModelAuthMode(provider, args.config);
const showCost = authMode === "api-key";
const authLabelValue =
args.modelAuth ??
(authMode && authMode !== "unknown" ? authMode : undefined);
const showCost = authLabelValue === "api-key" || authLabelValue === "mixed";
const costConfig = showCost
? resolveModelCostConfig({
provider,
@@ -319,9 +322,6 @@ export function buildStatusMessage(args: StatusArgs): string {
const costLabel = showCost && hasUsage ? formatUsd(cost) : undefined;
const modelLabel = model ? `${provider}/${model}` : "unknown";
const authLabelValue =
args.modelAuth ??
(authMode && authMode !== "unknown" ? authMode : undefined);
const authLabel = authLabelValue ? ` · 🔑 ${authLabelValue}` : "";
const modelLine = `🧠 Model: ${modelLabel}${authLabel}`;
const commit = resolveCommitHash();

View File

@@ -50,6 +50,8 @@ type GatewayRunOpts = {
verbose?: boolean;
wsLog?: unknown;
compact?: boolean;
rawStream?: boolean;
rawStreamPath?: unknown;
};
type GatewayRunParams = {
@@ -300,6 +302,14 @@ async function runGatewayCommand(
}
setGatewayWsLogStyle(wsLogStyle);
if (opts.rawStream) {
process.env.CLAWDBOT_RAW_STREAM = "1";
}
const rawStreamPath = toOptionString(opts.rawStreamPath);
if (rawStreamPath) {
process.env.CLAWDBOT_RAW_STREAM_PATH = rawStreamPath;
}
const cfg = loadConfig();
const portOverride = parsePort(opts.port);
if (opts.port !== undefined && portOverride === null) {
@@ -565,6 +575,8 @@ function addGatewayRunCommand(
"auto",
)
.option("--compact", 'Alias for "--ws-log compact"', false)
.option("--raw-stream", "Log raw model stream events to jsonl", false)
.option("--raw-stream-path <path>", "Raw stream jsonl path")
.action(async (opts) => {
await runGatewayCommand(opts, params);
});