From 9114331218c2b5b9e5a461bf2bf473a8a4a98cd1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 04:58:21 +0000 Subject: [PATCH] fix: serialize claude cli runs --- src/agents/claude-cli-runner.test.ts | 95 +++++++++++++++++++-- src/agents/claude-cli-runner.ts | 119 ++++++++++++++++++++------- src/auto-reply/reply/agent-runner.ts | 6 +- src/cli/gateway-cli.ts | 12 ++- src/commands/agent.ts | 4 +- src/cron/isolated-agent.ts | 4 +- src/gateway/ws-log.ts | 2 + src/logging.ts | 27 +++++- 8 files changed, 222 insertions(+), 47 deletions(-) diff --git a/src/agents/claude-cli-runner.test.ts b/src/agents/claude-cli-runner.test.ts index cfdccdd0e..4ffae66c8 100644 --- a/src/agents/claude-cli-runner.test.ts +++ b/src/agents/claude-cli-runner.test.ts @@ -4,6 +4,20 @@ import { runClaudeCliAgent } from "./claude-cli-runner.js"; const runCommandWithTimeoutMock = vi.fn(); +function createDeferred() { + let resolve: (value: T) => void; + let reject: (error: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, + resolve: resolve as (value: T) => void, + reject: reject as (error: unknown) => void, + }; +} + vi.mock("../process/exec.js", () => ({ runCommandWithTimeout: (...args: unknown[]) => runCommandWithTimeoutMock(...args), })); @@ -13,7 +27,7 @@ describe("runClaudeCliAgent", () => { runCommandWithTimeoutMock.mockReset(); }); - it("starts a new session without --session-id when no resume id", async () => { + it("starts a new session with --session-id when none is provided", async () => { runCommandWithTimeoutMock.mockResolvedValueOnce({ stdout: JSON.stringify({ message: "ok", session_id: "sid-1" }), stderr: "", @@ -35,11 +49,11 @@ describe("runClaudeCliAgent", () => { expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1); const argv = runCommandWithTimeoutMock.mock.calls[0]?.[0] as string[]; expect(argv).toContain("claude"); - expect(argv).not.toContain("--session-id"); - expect(argv).not.toContain("--resume"); + expect(argv).toContain("--session-id"); + expect(argv).toContain("hi"); }); - it("uses --resume when a resume session id is provided", async () => { + it("uses provided --session-id when a claude session id is provided", async () => { runCommandWithTimeoutMock.mockResolvedValueOnce({ stdout: JSON.stringify({ message: "ok", session_id: "sid-2" }), stderr: "", @@ -56,13 +70,78 @@ describe("runClaudeCliAgent", () => { model: "opus", timeoutMs: 1_000, runId: "run-2", - resumeSessionId: "sid-1", + claudeSessionId: "c9d7b831-1c31-4d22-80b9-1e50ca207d4b", }); expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1); const argv = runCommandWithTimeoutMock.mock.calls[0]?.[0] as string[]; - expect(argv).toContain("--resume"); - expect(argv).toContain("sid-1"); - expect(argv).not.toContain("--session-id"); + expect(argv).toContain("--session-id"); + expect(argv).toContain("c9d7b831-1c31-4d22-80b9-1e50ca207d4b"); + expect(argv).toContain("hi"); + }); + + it("serializes concurrent claude-cli runs", async () => { + const firstDeferred = createDeferred<{ + stdout: string; + stderr: string; + code: number | null; + signal: NodeJS.Signals | null; + killed: boolean; + }>(); + const secondDeferred = createDeferred<{ + stdout: string; + stderr: string; + code: number | null; + signal: NodeJS.Signals | null; + killed: boolean; + }>(); + + runCommandWithTimeoutMock + .mockImplementationOnce(() => firstDeferred.promise) + .mockImplementationOnce(() => secondDeferred.promise); + + const firstRun = runClaudeCliAgent({ + sessionId: "s1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + prompt: "first", + model: "opus", + timeoutMs: 1_000, + runId: "run-1", + }); + + const secondRun = runClaudeCliAgent({ + sessionId: "s2", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + prompt: "second", + model: "opus", + timeoutMs: 1_000, + runId: "run-2", + }); + + await Promise.resolve(); + expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(1); + + firstDeferred.resolve({ + stdout: JSON.stringify({ message: "ok", session_id: "sid-1" }), + stderr: "", + code: 0, + signal: null, + killed: false, + }); + + await Promise.resolve(); + expect(runCommandWithTimeoutMock).toHaveBeenCalledTimes(2); + + secondDeferred.resolve({ + stdout: JSON.stringify({ message: "ok", session_id: "sid-2" }), + stderr: "", + code: 0, + signal: null, + killed: false, + }); + + await Promise.all([firstRun, secondRun]); }); }); diff --git a/src/agents/claude-cli-runner.ts b/src/agents/claude-cli-runner.ts index 50b9081d2..cbed8c395 100644 --- a/src/agents/claude-cli-runner.ts +++ b/src/agents/claude-cli-runner.ts @@ -1,3 +1,4 @@ +import crypto from "node:crypto"; import os from "node:os"; import type { AgentTool } from "@mariozechner/pi-agent-core"; @@ -7,6 +8,7 @@ import type { ClawdbotConfig } from "../config/config.js"; import { createSubsystemLogger } from "../logging.js"; import { runCommandWithTimeout } from "../process/exec.js"; import { resolveUserPath } from "../utils.js"; +import { shouldLogVerbose } from "../globals.js"; import { buildBootstrapContextFiles, type EmbeddedContextFile, @@ -16,6 +18,20 @@ import { buildAgentSystemPrompt } from "./system-prompt.js"; import { loadWorkspaceBootstrapFiles } from "./workspace.js"; const log = createSubsystemLogger("agent/claude-cli"); +const CLAUDE_CLI_QUEUE_KEY = "global"; +const CLAUDE_CLI_RUN_QUEUE = new Map>(); + +function enqueueClaudeCliRun(key: string, task: () => Promise): Promise { + const prior = CLAUDE_CLI_RUN_QUEUE.get(key) ?? Promise.resolve(); + const chained = prior.catch(() => undefined).then(task); + const tracked = chained.finally(() => { + if (CLAUDE_CLI_RUN_QUEUE.get(key) === tracked) { + CLAUDE_CLI_RUN_QUEUE.delete(key); + } + }); + CLAUDE_CLI_RUN_QUEUE.set(key, tracked); + return chained; +} type ClaudeCliUsage = { input?: number; @@ -31,6 +47,15 @@ type ClaudeCliOutput = { usage?: ClaudeCliUsage; }; +const UUID_RE = + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +function normalizeClaudeSessionId(raw?: string): string { + const trimmed = raw?.trim(); + if (trimmed && UUID_RE.test(trimmed)) return trimmed; + return crypto.randomUUID(); +} + function resolveUserTimezone(configured?: string): string { const trimmed = configured?.trim(); if (trimmed) { @@ -207,7 +232,7 @@ async function runClaudeCliOnce(params: { modelId: string; systemPrompt: string; timeoutMs: number; - resumeSessionId?: string; + sessionId: string; }): Promise { const args = [ "-p", @@ -218,28 +243,74 @@ async function runClaudeCliOnce(params: { "--append-system-prompt", params.systemPrompt, "--dangerously-skip-permissions", - "--permission-mode", - "dontAsk", - "--tools", - "", + "--session-id", + params.sessionId, ]; - if (params.resumeSessionId) { - args.push("--resume", params.resumeSessionId); - } args.push(params.prompt); + log.info( + `claude-cli exec: model=${normalizeClaudeCliModel(params.modelId)} promptChars=${params.prompt.length} systemPromptChars=${params.systemPrompt.length}`, + ); + if (process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT === "1") { + const logArgs: string[] = []; + for (let i = 0; i < args.length; i += 1) { + const arg = args[i]; + if (arg === "--append-system-prompt") { + logArgs.push(arg, ``); + i += 1; + continue; + } + if (arg === "--session-id") { + logArgs.push(arg, args[i + 1] ?? ""); + i += 1; + continue; + } + logArgs.push(arg); + } + const promptIndex = logArgs.indexOf(params.prompt); + if (promptIndex >= 0) { + logArgs[promptIndex] = ``; + } + log.info(`claude-cli argv: claude ${logArgs.join(" ")}`); + } + const result = await runCommandWithTimeout(["claude", ...args], { timeoutMs: params.timeoutMs, cwd: params.workspaceDir, }); + if (process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT === "1") { + const stdoutDump = result.stdout.trim(); + const stderrDump = result.stderr.trim(); + if (stdoutDump) { + log.info(`claude-cli stdout:\n${stdoutDump}`); + } + if (stderrDump) { + log.info(`claude-cli stderr:\n${stderrDump}`); + } + } const stdout = result.stdout.trim(); + const logOutputText = process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT === "1"; + if (shouldLogVerbose()) { + if (stdout) { + log.debug(`claude-cli stdout:\n${stdout}`); + } + if (result.stderr.trim()) { + log.debug(`claude-cli stderr:\n${result.stderr.trim()}`); + } + } if (result.code !== 0) { const err = result.stderr.trim() || stdout || "Claude CLI failed."; throw new Error(err); } const parsed = parseClaudeCliJson(stdout); - if (parsed) return parsed; - return { text: stdout }; + const output = parsed ?? { text: stdout }; + if (logOutputText) { + const text = output.text?.trim(); + if (text) { + log.info(`claude-cli output:\n${text}`); + } + } + return output; } export async function runClaudeCliAgent(params: { @@ -256,7 +327,7 @@ export async function runClaudeCliAgent(params: { runId: string; extraSystemPrompt?: string; ownerNumbers?: string[]; - resumeSessionId?: string; + claudeSessionId?: string; }): Promise { const started = Date.now(); const resolvedWorkspace = resolveUserPath(params.workspaceDir); @@ -285,29 +356,17 @@ export async function runClaudeCliAgent(params: { modelDisplay, }); - let output: ClaudeCliOutput; - try { - output = await runClaudeCliOnce({ + const claudeSessionId = normalizeClaudeSessionId(params.claudeSessionId); + const output = await enqueueClaudeCliRun(CLAUDE_CLI_QUEUE_KEY, () => + runClaudeCliOnce({ prompt: params.prompt, workspaceDir, modelId, systemPrompt, timeoutMs: params.timeoutMs, - resumeSessionId: params.resumeSessionId, - }); - } catch (err) { - if (!params.resumeSessionId) throw err; - log.warn( - `claude-cli resume failed for ${params.resumeSessionId}; retrying without resume`, - ); - output = await runClaudeCliOnce({ - prompt: params.prompt, - workspaceDir, - modelId, - systemPrompt, - timeoutMs: params.timeoutMs, - }); - } + sessionId: claudeSessionId, + }), + ); const text = output.text?.trim(); const payloads = text ? [{ text }] : undefined; @@ -317,7 +376,7 @@ export async function runClaudeCliAgent(params: { meta: { durationMs: Date.now() - started, agentMeta: { - sessionId: output.sessionId ?? params.sessionId, + sessionId: output.sessionId ?? claudeSessionId, provider: params.provider ?? "claude-cli", model: modelId, usage: output.usage, diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 068cb2259..eca41f080 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -352,9 +352,9 @@ export async function runReplyAgent(params: { runId, extraSystemPrompt: followupRun.run.extraSystemPrompt, ownerNumbers: followupRun.run.ownerNumbers, - resumeSessionId: - sessionEntry?.claudeCliSessionId?.trim() || undefined, - }) + claudeSessionId: + sessionEntry?.claudeCliSessionId?.trim() || undefined, + }) .then((result) => { emitAgentEvent({ runId, diff --git a/src/cli/gateway-cli.ts b/src/cli/gateway-cli.ts index 5222af2b4..449b9235e 100644 --- a/src/cli/gateway-cli.ts +++ b/src/cli/gateway-cli.ts @@ -24,7 +24,7 @@ import { import { setVerbose } from "../globals.js"; import { GatewayLockError } from "../infra/gateway-lock.js"; import { formatPortDiagnostics, inspectPortUsage } from "../infra/ports.js"; -import { createSubsystemLogger } from "../logging.js"; +import { createSubsystemLogger, setConsoleSubsystemFilter } from "../logging.js"; import { defaultRuntime } from "../runtime.js"; import { forceFreePortAndWait } from "./ports.js"; import { withProgress } from "./progress.js"; @@ -48,6 +48,7 @@ type GatewayRunOpts = { allowUnconfigured?: boolean; force?: boolean; verbose?: boolean; + claudeCliLogs?: boolean; wsLog?: unknown; compact?: boolean; rawStream?: boolean; @@ -286,6 +287,10 @@ async function runGatewayCommand( } setVerbose(Boolean(opts.verbose)); + if (opts.claudeCliLogs) { + setConsoleSubsystemFilter(["agent/claude-cli"]); + process.env.CLAWDBOT_CLAUDE_CLI_LOG_OUTPUT = "1"; + } const wsLogRaw = (opts.compact ? "compact" : opts.wsLog) as | string | undefined; @@ -569,6 +574,11 @@ function addGatewayRunCommand( false, ) .option("--verbose", "Verbose logging to stdout/stderr", false) + .option( + "--claude-cli-logs", + "Only show claude-cli logs in the console (includes stdout/stderr)", + false, + ) .option( "--ws-log