diff --git a/src/agents/bash-tools.exec.ts b/src/agents/bash-tools.exec.ts index d72524461..b9de81872 100644 --- a/src/agents/bash-tools.exec.ts +++ b/src/agents/bash-tools.exec.ts @@ -1,5 +1,5 @@ import crypto from "node:crypto"; -import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; +import type { ChildProcessWithoutNullStreams } from "node:child_process"; import path from "node:path"; import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core"; import { Type } from "@sinclair/typebox"; @@ -27,6 +27,7 @@ import { } from "../infra/shell-env.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { logInfo, logWarn } from "../logger.js"; +import { formatSpawnError, spawnWithFallback } from "../process/spawn-utils.js"; import { type ProcessSession, type SessionStdin, @@ -362,23 +363,38 @@ async function runExecProcess(opts: { let stdin: SessionStdin | undefined; if (opts.sandbox) { - child = spawn( - "docker", - buildDockerExecArgs({ - containerName: opts.sandbox.containerName, - command: opts.command, - workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir, - env: opts.env, - tty: opts.usePty, - }), - { + const { child: spawned } = await spawnWithFallback({ + argv: [ + "docker", + ...buildDockerExecArgs({ + containerName: opts.sandbox.containerName, + command: opts.command, + workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir, + env: opts.env, + tty: opts.usePty, + }), + ], + options: { cwd: opts.workdir, env: process.env, detached: process.platform !== "win32", stdio: ["pipe", "pipe", "pipe"], windowsHide: true, }, - ) as ChildProcessWithoutNullStreams; + fallbacks: [ + { + label: "no-detach", + options: { detached: false }, + }, + ], + onFallback: (err, fallback) => { + const errText = formatSpawnError(err); + const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`; + logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`); + opts.warnings.push(warning); + }, + }); + child = spawned as ChildProcessWithoutNullStreams; stdin = child.stdin; } else if (opts.usePty) { const { shell, args: shellArgs } = getShellConfig(); @@ -422,24 +438,56 @@ async function runExecProcess(opts: { const warning = `Warning: PTY spawn failed (${errText}); retrying without PTY for \`${opts.command}\`.`; logWarn(`exec: PTY spawn failed (${errText}); retrying without PTY for "${opts.command}".`); opts.warnings.push(warning); - child = spawn(shell, [...shellArgs, opts.command], { + const { child: spawned } = await spawnWithFallback({ + argv: [shell, ...shellArgs, opts.command], + options: { + cwd: opts.workdir, + env: opts.env, + detached: process.platform !== "win32", + stdio: ["pipe", "pipe", "pipe"], + windowsHide: true, + }, + fallbacks: [ + { + label: "no-detach", + options: { detached: false }, + }, + ], + onFallback: (fallbackErr, fallback) => { + const fallbackText = formatSpawnError(fallbackErr); + const fallbackWarning = `Warning: spawn failed (${fallbackText}); retrying with ${fallback.label}.`; + logWarn(`exec: spawn failed (${fallbackText}); retrying with ${fallback.label}.`); + opts.warnings.push(fallbackWarning); + }, + }); + child = spawned as ChildProcessWithoutNullStreams; + stdin = child.stdin; + } + } else { + const { shell, args: shellArgs } = getShellConfig(); + const { child: spawned } = await spawnWithFallback({ + argv: [shell, ...shellArgs, opts.command], + options: { cwd: opts.workdir, env: opts.env, detached: process.platform !== "win32", stdio: ["pipe", "pipe", "pipe"], windowsHide: true, - }) as ChildProcessWithoutNullStreams; - stdin = child.stdin; - } - } else { - const { shell, args: shellArgs } = getShellConfig(); - child = spawn(shell, [...shellArgs, opts.command], { - cwd: opts.workdir, - env: opts.env, - detached: process.platform !== "win32", - stdio: ["pipe", "pipe", "pipe"], - windowsHide: true, - }) as ChildProcessWithoutNullStreams; + }, + fallbacks: [ + { + label: "no-detach", + options: { detached: false }, + }, + ], + onFallback: (err, fallback) => { + const errText = formatSpawnError(err); + const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`; + logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`); + opts.warnings.push(warning); + }, + }); + child = spawned as ChildProcessWithoutNullStreams; stdin = child.stdin; } diff --git a/src/process/exec.ts b/src/process/exec.ts index 103612ffc..44f8b2ce0 100644 --- a/src/process/exec.ts +++ b/src/process/exec.ts @@ -4,6 +4,7 @@ import { promisify } from "node:util"; import { danger, shouldLogVerbose } from "../globals.js"; import { logDebug, logError } from "../logger.js"; +import { resolveCommandStdio } from "./spawn-utils.js"; const execFileAsync = promisify(execFile); @@ -78,19 +79,22 @@ export async function runCommandWithTimeout( if (resolvedEnv.npm_config_fund == null) resolvedEnv.npm_config_fund = "false"; } + const stdio = resolveCommandStdio({ hasInput, preferInherit: true }); + const child = spawn(argv[0], argv.slice(1), { + stdio, + cwd, + env: resolvedEnv, + windowsVerbatimArguments, + }); // Spawn with inherited stdin (TTY) so tools like `pi` stay interactive when needed. return await new Promise((resolve, reject) => { - const child = spawn(argv[0], argv.slice(1), { - stdio: [hasInput ? "pipe" : "inherit", "pipe", "pipe"], - cwd, - env: resolvedEnv, - windowsVerbatimArguments, - }); let stdout = ""; let stderr = ""; let settled = false; const timer = setTimeout(() => { - child.kill("SIGKILL"); + if (typeof child.kill === "function") { + child.kill("SIGKILL"); + } }, timeoutMs); if (hasInput && child.stdin) { diff --git a/src/process/spawn-utils.test.ts b/src/process/spawn-utils.test.ts new file mode 100644 index 000000000..d290d5938 --- /dev/null +++ b/src/process/spawn-utils.test.ts @@ -0,0 +1,64 @@ +import { EventEmitter } from "node:events"; +import { PassThrough } from "node:stream"; +import type { ChildProcess } from "node:child_process"; +import { describe, expect, it, vi } from "vitest"; + +import { spawnWithFallback } from "./spawn-utils.js"; + +function createStubChild() { + const child = new EventEmitter() as ChildProcess; + child.stdin = new PassThrough() as ChildProcess["stdin"]; + child.stdout = new PassThrough() as ChildProcess["stdout"]; + child.stderr = new PassThrough() as ChildProcess["stderr"]; + child.pid = 1234; + child.killed = false; + child.kill = vi.fn(() => true) as ChildProcess["kill"]; + queueMicrotask(() => { + child.emit("spawn"); + }); + return child; +} + +describe("spawnWithFallback", () => { + it("retries on EBADF using fallback options", async () => { + const spawnMock = vi + .fn() + .mockImplementationOnce(() => { + const err = new Error("spawn EBADF"); + (err as NodeJS.ErrnoException).code = "EBADF"; + throw err; + }) + .mockImplementationOnce(() => createStubChild()); + + const result = await spawnWithFallback({ + argv: ["echo", "ok"], + options: { stdio: ["pipe", "pipe", "pipe"] }, + fallbacks: [{ label: "safe-stdin", options: { stdio: ["ignore", "pipe", "pipe"] } }], + spawnImpl: spawnMock, + }); + + expect(result.usedFallback).toBe(true); + expect(result.fallbackLabel).toBe("safe-stdin"); + expect(spawnMock).toHaveBeenCalledTimes(2); + expect(spawnMock.mock.calls[0]?.[2]?.stdio).toEqual(["pipe", "pipe", "pipe"]); + expect(spawnMock.mock.calls[1]?.[2]?.stdio).toEqual(["ignore", "pipe", "pipe"]); + }); + + it("does not retry on non-EBADF errors", async () => { + const spawnMock = vi.fn().mockImplementationOnce(() => { + const err = new Error("spawn ENOENT"); + (err as NodeJS.ErrnoException).code = "ENOENT"; + throw err; + }); + + await expect( + spawnWithFallback({ + argv: ["missing"], + options: { stdio: ["pipe", "pipe", "pipe"] }, + fallbacks: [{ label: "safe-stdin", options: { stdio: ["ignore", "pipe", "pipe"] } }], + spawnImpl: spawnMock, + }), + ).rejects.toThrow(/ENOENT/); + expect(spawnMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/process/spawn-utils.ts b/src/process/spawn-utils.ts new file mode 100644 index 000000000..2d4604432 --- /dev/null +++ b/src/process/spawn-utils.ts @@ -0,0 +1,127 @@ +import type { ChildProcess, SpawnOptions } from "node:child_process"; +import { spawn } from "node:child_process"; + +export type SpawnFallback = { + label: string; + options: SpawnOptions; +}; + +export type SpawnWithFallbackResult = { + child: ChildProcess; + usedFallback: boolean; + fallbackLabel?: string; +}; + +type SpawnWithFallbackParams = { + argv: string[]; + options: SpawnOptions; + fallbacks?: SpawnFallback[]; + spawnImpl?: typeof spawn; + retryCodes?: string[]; + onFallback?: (err: unknown, fallback: SpawnFallback) => void; +}; + +const DEFAULT_RETRY_CODES = ["EBADF"]; + +export function resolveCommandStdio(params: { + hasInput: boolean; + preferInherit: boolean; +}): ["pipe" | "inherit" | "ignore", "pipe", "pipe"] { + const stdin = params.hasInput ? "pipe" : params.preferInherit ? "inherit" : "pipe"; + return [stdin, "pipe", "pipe"]; +} + +export function formatSpawnError(err: unknown): string { + if (!(err instanceof Error)) return String(err); + const details = err as NodeJS.ErrnoException; + const parts: string[] = []; + const message = err.message?.trim(); + if (message) parts.push(message); + if (details.code && !message?.includes(details.code)) parts.push(details.code); + if (details.syscall) parts.push(`syscall=${details.syscall}`); + if (typeof details.errno === "number") parts.push(`errno=${details.errno}`); + return parts.join(" "); +} + +function shouldRetry(err: unknown, codes: string[]): boolean { + const code = + err && typeof err === "object" && "code" in err ? String((err as { code?: unknown }).code) : ""; + return code.length > 0 && codes.includes(code); +} + +async function spawnAndWaitForSpawn( + spawnImpl: typeof spawn, + argv: string[], + options: SpawnOptions, +): Promise { + const child = spawnImpl(argv[0], argv.slice(1), options); + + return await new Promise((resolve, reject) => { + let settled = false; + const cleanup = () => { + child.removeListener("error", onError); + child.removeListener("spawn", onSpawn); + }; + const finishResolve = () => { + if (settled) return; + settled = true; + cleanup(); + resolve(child); + }; + const onError = (err: unknown) => { + if (settled) return; + settled = true; + cleanup(); + reject(err); + }; + const onSpawn = () => { + finishResolve(); + }; + child.once("error", onError); + child.once("spawn", onSpawn); + // Ensure mocked spawns that never emit "spawn" don't stall. + process.nextTick(() => { + if (typeof child.pid === "number") { + finishResolve(); + } + }); + }); +} + +export async function spawnWithFallback( + params: SpawnWithFallbackParams, +): Promise { + const spawnImpl = params.spawnImpl ?? spawn; + const retryCodes = params.retryCodes ?? DEFAULT_RETRY_CODES; + const baseOptions = { ...params.options }; + const fallbacks = params.fallbacks ?? []; + const attempts: Array<{ label?: string; options: SpawnOptions }> = [ + { options: baseOptions }, + ...fallbacks.map((fallback) => ({ + label: fallback.label, + options: { ...baseOptions, ...fallback.options }, + })), + ]; + + let lastError: unknown; + for (let index = 0; index < attempts.length; index += 1) { + const attempt = attempts[index]; + try { + const child = await spawnAndWaitForSpawn(spawnImpl, params.argv, attempt.options); + return { + child, + usedFallback: index > 0, + fallbackLabel: attempt.label, + }; + } catch (err) { + lastError = err; + const nextFallback = fallbacks[index]; + if (!nextFallback || !shouldRetry(err, retryCodes)) { + throw err; + } + params.onFallback?.(err, nextFallback); + } + } + + throw lastError; +}