fix: serialize claude cli runs

This commit is contained in:
Peter Steinberger
2026-01-09 04:58:21 +00:00
parent aa5e75e853
commit 9114331218
8 changed files with 222 additions and 47 deletions

View File

@@ -4,6 +4,20 @@ import { runClaudeCliAgent } from "./claude-cli-runner.js";
const runCommandWithTimeoutMock = vi.fn(); const runCommandWithTimeoutMock = vi.fn();
/**
 * Creates a promise together with the functions that settle it, so a test can
 * decide from the outside exactly when (and how) the promise completes.
 *
 * @returns An object holding the pending `promise`, a `resolve` function that
 *   fulfils it with a value of type `T`, and a `reject` function that rejects
 *   it with an arbitrary error.
 */
function createDeferred<T>() {
  // The Promise executor runs synchronously, so both callbacks are assigned
  // before `new Promise` returns. The compiler cannot see assignments made
  // inside a closure, hence the definite-assignment assertions (`!`); without
  // them strict mode reports "used before being assigned" on the return below.
  let resolve!: (value: T) => void;
  let reject!: (error: unknown) => void;
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}
vi.mock("../process/exec.js", () => ({ vi.mock("../process/exec.js", () => ({
runCommandWithTimeout: (...args: unknown[]) => runCommandWithTimeoutMock(...args), runCommandWithTimeout: (...args: unknown[]) => runCommandWithTimeoutMock(...args),
})); }));
@@ -13,7 +27,7 @@ describe("runClaudeCliAgent", () => {
runCommandWithTimeoutMock.mockReset(); runCommandWithTimeoutMock.mockReset();
}); });
it("starts a new session without --session-id when no resume id", async () => { it("starts a new session with --session-id when none is provided", async () => {
runCommandWithTimeoutMock.mockResolvedValueOnce({ runCommandWithTimeoutMock.mockResolvedValueOnce({
stdout: JSON.stringify({ message: "ok", session_id: "sid-1" }), stdout: JSON.stringify({ message: "ok", session_id: "sid-1" }),
stderr: "", stderr: "",
@@ -35,11 +49,11 @@ describe("runClaudeCliAgent", () => {
expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1); expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1);
const argv = runCommandWithTimeoutMock.mock.calls[0]?.[0] as string[]; const argv = runCommandWithTimeoutMock.mock.calls[0]?.[0] as string[];
expect(argv).toContain("claude"); expect(argv).toContain("claude");
expect(argv).not.toContain("--session-id"); expect(argv).toContain("--session-id");
expect(argv).not.toContain("--resume"); expect(argv).toContain("hi");
}); });
it("uses --resume when a resume session id is provided", async () => { it("uses provided --session-id when a claude session id is provided", async () => {
runCommandWithTimeoutMock.mockResolvedValueOnce({ runCommandWithTimeoutMock.mockResolvedValueOnce({
stdout: JSON.stringify({ message: "ok", session_id: "sid-2" }), stdout: JSON.stringify({ message: "ok", session_id: "sid-2" }),
stderr: "", stderr: "",
@@ -56,13 +70,78 @@ describe("runClaudeCliAgent", () => {
model: "opus", model: "opus",
timeoutMs: 1_000, timeoutMs: 1_000,
runId: "run-2", runId: "run-2",
resumeSessionId: "sid-1", claudeSessionId: "c9d7b831-1c31-4d22-80b9-1e50ca207d4b",
}); });
expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1); expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1);
const argv = runCommandWithTimeoutMock.mock.calls[0]?.[0] as string[]; const argv = runCommandWithTimeoutMock.mock.calls[0]?.[0] as string[];
expect(argv).toContain("--resume"); expect(argv).toContain("--session-id");
expect(argv).toContain("sid-1"); expect(argv).toContain("c9d7b831-1c31-4d22-80b9-1e50ca207d4b");
expect(argv).not.toContain("--session-id"); expect(argv).toContain("hi");
});
it("serializes concurrent claude-cli runs", async () => {
const firstDeferred = createDeferred<{
stdout: string;
stderr: string;
code: number | null;
signal: NodeJS.Signals | null;
killed: boolean;
}>();
const secondDeferred = createDeferred<{
stdout: string;
stderr: string;
code: number | null;
signal: NodeJS.Signals | null;
killed: boolean;
}>();
runCommandWithTimeoutMock
.mockImplementationOnce(() => firstDeferred.promise)
.mockImplementationOnce(() => secondDeferred.promise);
const firstRun = runClaudeCliAgent({
sessionId: "s1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
prompt: "first",
model: "opus",
timeoutMs: 1_000,
runId: "run-1",
});
const secondRun = runClaudeCliAgent({
sessionId: "s2",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
prompt: "second",
model: "opus",
timeoutMs: 1_000,
runId: "run-2",
});
await Promise.resolve();
expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1);
firstDeferred.resolve({
stdout: JSON.stringify({ message: "ok", session_id: "sid-1" }),
stderr: "",
code: 0,
signal: null,
killed: false,
});
await Promise.resolve();
expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(2);
secondDeferred.resolve({
stdout: JSON.stringify({ message: "ok", session_id: "sid-2" }),
stderr: "",
code: 0,
signal: null,
killed: false,
});
await Promise.all([firstRun, secondRun]);
}); });
}); });

View File

@@ -1,3 +1,4 @@
import crypto from "node:crypto";
import os from "node:os"; import os from "node:os";
import type { AgentTool } from "@mariozechner/pi-agent-core"; import type { AgentTool } from "@mariozechner/pi-agent-core";
@@ -7,6 +8,7 @@ import type { ClawdbotConfig } from "../config/config.js";
import { createSubsystemLogger } from "../logging.js"; import { createSubsystemLogger } from "../logging.js";
import { runCommandWithTimeout } from "../process/exec.js"; import { runCommandWithTimeout } from "../process/exec.js";
import { resolveUserPath } from "../utils.js"; import { resolveUserPath } from "../utils.js";
import { shouldLogVerbose } from "../globals.js";
import { import {
buildBootstrapContextFiles, buildBootstrapContextFiles,
type EmbeddedContextFile, type EmbeddedContextFile,
@@ -16,6 +18,20 @@ import { buildAgentSystemPrompt } from "./system-prompt.js";
import { loadWorkspaceBootstrapFiles } from "./workspace.js"; import { loadWorkspaceBootstrapFiles } from "./workspace.js";
const log = createSubsystemLogger("agent/claude-cli"); const log = createSubsystemLogger("agent/claude-cli");
/** Queue key passed by `runClaudeCliAgent`; a single key means all runs share one queue. */
const CLAUDE_CLI_QUEUE_KEY = "global";

/**
 * Tail of each queue, keyed by queue name. The stored promise settles once the
 * most recently enqueued task has finished; it never rejects.
 */
const CLAUDE_CLI_RUN_QUEUE = new Map<string, Promise<unknown>>();

/**
 * Runs `task` only after every task previously enqueued under `key` has
 * settled, so tasks sharing a key execute one at a time in call order.
 *
 * A failure in an earlier task does not prevent later tasks from running.
 *
 * @param key - Name of the queue to serialize on.
 * @param task - Work to run once the queue is free.
 * @returns A promise that settles with the outcome of `task` (including its
 *   rejection, which the caller is expected to handle).
 */
function enqueueClaudeCliRun<T>(key: string, task: () => Promise<T>): Promise<T> {
  const prior = CLAUDE_CLI_RUN_QUEUE.get(key) ?? Promise.resolve();
  // Defensive: start the task regardless of how the previous entry settled.
  const chained = prior.catch(() => undefined).then(task);

  // Drop the map entry once this task is done, unless a newer task has
  // already replaced it as the tail of the queue.
  const release = (): void => {
    if (CLAUDE_CLI_RUN_QUEUE.get(key) === tail) {
      CLAUDE_CLI_RUN_QUEUE.delete(key);
    }
  };

  // The tail handles BOTH outcomes of `chained`. Deriving it with `.finally()`
  // would produce a second promise that rejects alongside `chained`; when the
  // failing task is the last one queued nothing ever observes that promise and
  // Node reports an unhandled rejection even though the caller handled the
  // error on `chained`.
  const tail: Promise<void> = chained.then(release, release);
  CLAUDE_CLI_RUN_QUEUE.set(key, tail);
  return chained;
}
type ClaudeCliUsage = { type ClaudeCliUsage = {
input?: number; input?: number;
@@ -31,6 +47,15 @@ type ClaudeCliOutput = {
usage?: ClaudeCliUsage; usage?: ClaudeCliUsage;
}; };
/** Matches a canonical 8-4-4-4-12 hexadecimal UUID, case-insensitively. */
const UUID_RE =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

/**
 * Produces the session id to hand to the Claude CLI.
 *
 * @param raw - Previously stored session id, if any.
 * @returns `raw` with surrounding whitespace removed when it is UUID-shaped;
 *   otherwise a freshly generated random UUID.
 */
function normalizeClaudeSessionId(raw?: string): string {
  const candidate = raw?.trim() ?? "";
  // An empty candidate can never match the pattern, so it falls through too.
  return UUID_RE.test(candidate) ? candidate : crypto.randomUUID();
}
function resolveUserTimezone(configured?: string): string { function resolveUserTimezone(configured?: string): string {
const trimmed = configured?.trim(); const trimmed = configured?.trim();
if (trimmed) { if (trimmed) {
@@ -207,7 +232,7 @@ async function runClaudeCliOnce(params: {
modelId: string; modelId: string;
systemPrompt: string; systemPrompt: string;
timeoutMs: number; timeoutMs: number;
resumeSessionId?: string; sessionId: string;
}): Promise<ClaudeCliOutput> { }): Promise<ClaudeCliOutput> {
const args = [ const args = [
"-p", "-p",
@@ -218,28 +243,74 @@ async function runClaudeCliOnce(params: {
"--append-system-prompt", "--append-system-prompt",
params.systemPrompt, params.systemPrompt,
"--dangerously-skip-permissions", "--dangerously-skip-permissions",
"--permission-mode", "--session-id",
"dontAsk", params.sessionId,
"--tools",
"",
]; ];
if (params.resumeSessionId) {
args.push("--resume", params.resumeSessionId);
}
args.push(params.prompt); args.push(params.prompt);
log.info(
`claude-cli exec: model=${normalizeClaudeCliModel(params.modelId)} promptChars=${params.prompt.length} systemPromptChars=${params.systemPrompt.length}`,
);
if (process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT === "1") {
const logArgs: string[] = [];
for (let i = 0; i < args.length; i += 1) {
const arg = args[i];
if (arg === "--append-system-prompt") {
logArgs.push(arg, `<systemPrompt:${params.systemPrompt.length} chars>`);
i += 1;
continue;
}
if (arg === "--session-id") {
logArgs.push(arg, args[i + 1] ?? "");
i += 1;
continue;
}
logArgs.push(arg);
}
const promptIndex = logArgs.indexOf(params.prompt);
if (promptIndex >= 0) {
logArgs[promptIndex] = `<prompt:${params.prompt.length} chars>`;
}
log.info(`claude-cli argv: claude ${logArgs.join(" ")}`);
}
const result = await runCommandWithTimeout(["claude", ...args], { const result = await runCommandWithTimeout(["claude", ...args], {
timeoutMs: params.timeoutMs, timeoutMs: params.timeoutMs,
cwd: params.workspaceDir, cwd: params.workspaceDir,
}); });
if (process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT === "1") {
const stdoutDump = result.stdout.trim();
const stderrDump = result.stderr.trim();
if (stdoutDump) {
log.info(`claude-cli stdout:\n${stdoutDump}`);
}
if (stderrDump) {
log.info(`claude-cli stderr:\n${stderrDump}`);
}
}
const stdout = result.stdout.trim(); const stdout = result.stdout.trim();
const logOutputText = process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT === "1";
if (shouldLogVerbose()) {
if (stdout) {
log.debug(`claude-cli stdout:\n${stdout}`);
}
if (result.stderr.trim()) {
log.debug(`claude-cli stderr:\n${result.stderr.trim()}`);
}
}
if (result.code !== 0) { if (result.code !== 0) {
const err = result.stderr.trim() || stdout || "Claude CLI failed."; const err = result.stderr.trim() || stdout || "Claude CLI failed.";
throw new Error(err); throw new Error(err);
} }
const parsed = parseClaudeCliJson(stdout); const parsed = parseClaudeCliJson(stdout);
if (parsed) return parsed; const output = parsed ?? { text: stdout };
return { text: stdout }; if (logOutputText) {
const text = output.text?.trim();
if (text) {
log.info(`claude-cli output:\n${text}`);
}
}
return output;
} }
export async function runClaudeCliAgent(params: { export async function runClaudeCliAgent(params: {
@@ -256,7 +327,7 @@ export async function runClaudeCliAgent(params: {
runId: string; runId: string;
extraSystemPrompt?: string; extraSystemPrompt?: string;
ownerNumbers?: string[]; ownerNumbers?: string[];
resumeSessionId?: string; claudeSessionId?: string;
}): Promise<EmbeddedPiRunResult> { }): Promise<EmbeddedPiRunResult> {
const started = Date.now(); const started = Date.now();
const resolvedWorkspace = resolveUserPath(params.workspaceDir); const resolvedWorkspace = resolveUserPath(params.workspaceDir);
@@ -285,29 +356,17 @@ export async function runClaudeCliAgent(params: {
modelDisplay, modelDisplay,
}); });
let output: ClaudeCliOutput; const claudeSessionId = normalizeClaudeSessionId(params.claudeSessionId);
try { const output = await enqueueClaudeCliRun(CLAUDE_CLI_QUEUE_KEY, () =>
output = await runClaudeCliOnce({ runClaudeCliOnce({
prompt: params.prompt, prompt: params.prompt,
workspaceDir, workspaceDir,
modelId, modelId,
systemPrompt, systemPrompt,
timeoutMs: params.timeoutMs, timeoutMs: params.timeoutMs,
resumeSessionId: params.resumeSessionId, sessionId: claudeSessionId,
}); }),
} catch (err) { );
if (!params.resumeSessionId) throw err;
log.warn(
`claude-cli resume failed for ${params.resumeSessionId}; retrying without resume`,
);
output = await runClaudeCliOnce({
prompt: params.prompt,
workspaceDir,
modelId,
systemPrompt,
timeoutMs: params.timeoutMs,
});
}
const text = output.text?.trim(); const text = output.text?.trim();
const payloads = text ? [{ text }] : undefined; const payloads = text ? [{ text }] : undefined;
@@ -317,7 +376,7 @@ export async function runClaudeCliAgent(params: {
meta: { meta: {
durationMs: Date.now() - started, durationMs: Date.now() - started,
agentMeta: { agentMeta: {
sessionId: output.sessionId ?? params.sessionId, sessionId: output.sessionId ?? claudeSessionId,
provider: params.provider ?? "claude-cli", provider: params.provider ?? "claude-cli",
model: modelId, model: modelId,
usage: output.usage, usage: output.usage,

View File

@@ -352,9 +352,9 @@ export async function runReplyAgent(params: {
runId, runId,
extraSystemPrompt: followupRun.run.extraSystemPrompt, extraSystemPrompt: followupRun.run.extraSystemPrompt,
ownerNumbers: followupRun.run.ownerNumbers, ownerNumbers: followupRun.run.ownerNumbers,
resumeSessionId: claudeSessionId:
sessionEntry?.claudeCliSessionId?.trim() || undefined, sessionEntry?.claudeCliSessionId?.trim() || undefined,
}) })
.then((result) => { .then((result) => {
emitAgentEvent({ emitAgentEvent({
runId, runId,

View File

@@ -24,7 +24,7 @@ import {
import { setVerbose } from "../globals.js"; import { setVerbose } from "../globals.js";
import { GatewayLockError } from "../infra/gateway-lock.js"; import { GatewayLockError } from "../infra/gateway-lock.js";
import { formatPortDiagnostics, inspectPortUsage } from "../infra/ports.js"; import { formatPortDiagnostics, inspectPortUsage } from "../infra/ports.js";
import { createSubsystemLogger } from "../logging.js"; import { createSubsystemLogger, setConsoleSubsystemFilter } from "../logging.js";
import { defaultRuntime } from "../runtime.js"; import { defaultRuntime } from "../runtime.js";
import { forceFreePortAndWait } from "./ports.js"; import { forceFreePortAndWait } from "./ports.js";
import { withProgress } from "./progress.js"; import { withProgress } from "./progress.js";
@@ -48,6 +48,7 @@ type GatewayRunOpts = {
allowUnconfigured?: boolean; allowUnconfigured?: boolean;
force?: boolean; force?: boolean;
verbose?: boolean; verbose?: boolean;
claudeCliLogs?: boolean;
wsLog?: unknown; wsLog?: unknown;
compact?: boolean; compact?: boolean;
rawStream?: boolean; rawStream?: boolean;
@@ -286,6 +287,10 @@ async function runGatewayCommand(
} }
setVerbose(Boolean(opts.verbose)); setVerbose(Boolean(opts.verbose));
if (opts.claudeCliLogs) {
setConsoleSubsystemFilter(["agent/claude-cli"]);
process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT = "1";
}
const wsLogRaw = (opts.compact ? "compact" : opts.wsLog) as const wsLogRaw = (opts.compact ? "compact" : opts.wsLog) as
| string | string
| undefined; | undefined;
@@ -569,6 +574,11 @@ function addGatewayRunCommand(
false, false,
) )
.option("--verbose", "Verbose logging to stdout/stderr", false) .option("--verbose", "Verbose logging to stdout/stderr", false)
.option(
"--claude-cli-logs",
"Only show claude-cli logs in the console (includes stdout/stderr)",
false,
)
.option( .option(
"--ws-log <style>", "--ws-log <style>",
'WebSocket log style ("auto"|"full"|"compact")', 'WebSocket log style ("auto"|"full"|"compact")',

View File

@@ -411,7 +411,7 @@ export async function agentCommand(
let result: Awaited<ReturnType<typeof runEmbeddedPiAgent>>; let result: Awaited<ReturnType<typeof runEmbeddedPiAgent>>;
let fallbackProvider = provider; let fallbackProvider = provider;
let fallbackModel = model; let fallbackModel = model;
const claudeResumeId = sessionEntry?.claudeCliSessionId?.trim(); const claudeSessionId = sessionEntry?.claudeCliSessionId?.trim();
try { try {
const messageProvider = resolveMessageProvider( const messageProvider = resolveMessageProvider(
opts.messageProvider, opts.messageProvider,
@@ -436,7 +436,7 @@ export async function agentCommand(
timeoutMs, timeoutMs,
runId, runId,
extraSystemPrompt: opts.extraSystemPrompt, extraSystemPrompt: opts.extraSystemPrompt,
resumeSessionId: claudeResumeId, claudeSessionId,
}); });
} }
return runEmbeddedPiAgent({ return runEmbeddedPiAgent({

View File

@@ -424,7 +424,7 @@ export async function runCronIsolatedAgentTurn(params: {
sessionKey: params.sessionKey, sessionKey: params.sessionKey,
}); });
const messageProvider = resolvedDelivery.provider; const messageProvider = resolvedDelivery.provider;
const claudeResumeId = cronSession.sessionEntry.claudeCliSessionId?.trim(); const claudeSessionId = cronSession.sessionEntry.claudeCliSessionId?.trim();
const fallbackResult = await runWithModelFallback({ const fallbackResult = await runWithModelFallback({
cfg: params.cfg, cfg: params.cfg,
provider, provider,
@@ -443,7 +443,7 @@ export async function runCronIsolatedAgentTurn(params: {
thinkLevel, thinkLevel,
timeoutMs, timeoutMs,
runId: cronSession.sessionEntry.sessionId, runId: cronSession.sessionEntry.sessionId,
resumeSessionId: claudeResumeId, claudeSessionId,
}); });
} }
return runEmbeddedPiAgent({ return runEmbeddedPiAgent({

View File

@@ -1,5 +1,6 @@
import chalk from "chalk"; import chalk from "chalk";
import { isVerbose } from "../globals.js"; import { isVerbose } from "../globals.js";
import { shouldLogSubsystemToConsole } from "../logging.js";
import { DEFAULT_WS_SLOW_MS, getGatewayWsLogStyle } from "./ws-logging.js"; import { DEFAULT_WS_SLOW_MS, getGatewayWsLogStyle } from "./ws-logging.js";
const LOG_VALUE_LIMIT = 240; const LOG_VALUE_LIMIT = 240;
@@ -140,6 +141,7 @@ export function logWs(
kind: string, kind: string,
meta?: Record<string, unknown>, meta?: Record<string, unknown>,
) { ) {
if (!shouldLogSubsystemToConsole("gateway/ws")) return;
const style = getGatewayWsLogStyle(); const style = getGatewayWsLogStyle();
if (!isVerbose()) { if (!isVerbose()) {
logWsOptimized(direction, kind, meta); logWsOptimized(direction, kind, meta);

View File

@@ -57,6 +57,7 @@ let cachedConsoleSettings: ConsoleSettings | null = null;
let overrideSettings: LoggerSettings | null = null; let overrideSettings: LoggerSettings | null = null;
let consolePatched = false; let consolePatched = false;
let forceConsoleToStderr = false; let forceConsoleToStderr = false;
let consoleSubsystemFilter: string[] | null = null;
let rawConsole: { let rawConsole: {
log: typeof console.log; log: typeof console.log;
info: typeof console.info; info: typeof console.info;
@@ -258,6 +259,27 @@ export function routeLogsToStderr(): void {
forceConsoleToStderr = true; forceConsoleToStderr = true;
} }
/**
 * Replaces the module-level console subsystem filter.
 *
 * Each entry is trimmed and blank entries are discarded. Passing nothing,
 * `null`, an empty list, or a list containing only blank entries clears the
 * filter (stores `null`).
 *
 * @param filters - Subsystem names to keep, or nothing to clear the filter.
 */
export function setConsoleSubsystemFilter(filters?: string[] | null): void {
  const cleaned: string[] = [];
  for (const entry of filters ?? []) {
    const name = entry.trim();
    if (name.length > 0) {
      cleaned.push(name);
    }
  }
  consoleSubsystemFilter = cleaned.length > 0 ? cleaned : null;
}
/**
 * Checks a subsystem name against the module-level console subsystem filter.
 *
 * @param subsystem - Subsystem name, with `/` separating nested levels.
 * @returns `true` when no filter is set, or when `subsystem` equals a filter
 *   entry or sits beneath one (i.e. starts with the entry followed by `/`);
 *   `false` otherwise.
 */
export function shouldLogSubsystemToConsole(subsystem: string): boolean {
  const allowed = consoleSubsystemFilter;
  if (!allowed?.length) {
    return true;
  }
  for (const prefix of allowed) {
    if (subsystem === prefix) return true;
    // Require the separator so "agent/claude-cli" does not match "agent/claude-cli2".
    if (subsystem.startsWith(`${prefix}/`)) return true;
  }
  return false;
}
const SUPPRESSED_CONSOLE_PREFIXES = [ const SUPPRESSED_CONSOLE_PREFIXES = [
"Closing session:", "Closing session:",
"Opening session:", "Opening session:",
@@ -536,6 +558,7 @@ export function createSubsystemLogger(subsystem: string): SubsystemLogger {
} }
logToFile(getFileLogger(), level, message, fileMeta); logToFile(getFileLogger(), level, message, fileMeta);
if (!shouldLogToConsole(level, consoleSettings)) return; if (!shouldLogToConsole(level, consoleSettings)) return;
if (!shouldLogSubsystemToConsole(subsystem)) return;
const line = formatConsoleLine({ const line = formatConsoleLine({
level, level,
subsystem, subsystem,
@@ -559,7 +582,9 @@ export function createSubsystemLogger(subsystem: string): SubsystemLogger {
fatal: (message, meta) => emit("fatal", message, meta), fatal: (message, meta) => emit("fatal", message, meta),
raw: (message) => { raw: (message) => {
logToFile(getFileLogger(), "info", message, { raw: true }); logToFile(getFileLogger(), "info", message, { raw: true });
writeConsoleLine("info", message); if (shouldLogSubsystemToConsole(subsystem)) {
writeConsoleLine("info", message);
}
}, },
child: (name) => createSubsystemLogger(`${subsystem}/${name}`), child: (name) => createSubsystemLogger(`${subsystem}/${name}`),
}; };