fix: harden exec spawn fallback
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import crypto from "node:crypto";
|
||||
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
|
||||
import type { ChildProcessWithoutNullStreams } from "node:child_process";
|
||||
import path from "node:path";
|
||||
import type { AgentTool, AgentToolResult } from "@mariozechner/pi-agent-core";
|
||||
import { Type } from "@sinclair/typebox";
|
||||
@@ -27,6 +27,7 @@ import {
|
||||
} from "../infra/shell-env.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { logInfo, logWarn } from "../logger.js";
|
||||
import { formatSpawnError, spawnWithFallback } from "../process/spawn-utils.js";
|
||||
import {
|
||||
type ProcessSession,
|
||||
type SessionStdin,
|
||||
@@ -362,23 +363,38 @@ async function runExecProcess(opts: {
|
||||
let stdin: SessionStdin | undefined;
|
||||
|
||||
if (opts.sandbox) {
|
||||
child = spawn(
|
||||
"docker",
|
||||
buildDockerExecArgs({
|
||||
containerName: opts.sandbox.containerName,
|
||||
command: opts.command,
|
||||
workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir,
|
||||
env: opts.env,
|
||||
tty: opts.usePty,
|
||||
}),
|
||||
{
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [
|
||||
"docker",
|
||||
...buildDockerExecArgs({
|
||||
containerName: opts.sandbox.containerName,
|
||||
command: opts.command,
|
||||
workdir: opts.containerWorkdir ?? opts.sandbox.containerWorkdir,
|
||||
env: opts.env,
|
||||
tty: opts.usePty,
|
||||
}),
|
||||
],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: process.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
) as ChildProcessWithoutNullStreams;
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (err, fallback) => {
|
||||
const errText = formatSpawnError(err);
|
||||
const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(warning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
} else if (opts.usePty) {
|
||||
const { shell, args: shellArgs } = getShellConfig();
|
||||
@@ -422,24 +438,56 @@ async function runExecProcess(opts: {
|
||||
const warning = `Warning: PTY spawn failed (${errText}); retrying without PTY for \`${opts.command}\`.`;
|
||||
logWarn(`exec: PTY spawn failed (${errText}); retrying without PTY for "${opts.command}".`);
|
||||
opts.warnings.push(warning);
|
||||
child = spawn(shell, [...shellArgs, opts.command], {
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [shell, ...shellArgs, opts.command],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (fallbackErr, fallback) => {
|
||||
const fallbackText = formatSpawnError(fallbackErr);
|
||||
const fallbackWarning = `Warning: spawn failed (${fallbackText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${fallbackText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(fallbackWarning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
}
|
||||
} else {
|
||||
const { shell, args: shellArgs } = getShellConfig();
|
||||
const { child: spawned } = await spawnWithFallback({
|
||||
argv: [shell, ...shellArgs, opts.command],
|
||||
options: {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
}) as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
}
|
||||
} else {
|
||||
const { shell, args: shellArgs } = getShellConfig();
|
||||
child = spawn(shell, [...shellArgs, opts.command], {
|
||||
cwd: opts.workdir,
|
||||
env: opts.env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
windowsHide: true,
|
||||
}) as ChildProcessWithoutNullStreams;
|
||||
},
|
||||
fallbacks: [
|
||||
{
|
||||
label: "no-detach",
|
||||
options: { detached: false },
|
||||
},
|
||||
],
|
||||
onFallback: (err, fallback) => {
|
||||
const errText = formatSpawnError(err);
|
||||
const warning = `Warning: spawn failed (${errText}); retrying with ${fallback.label}.`;
|
||||
logWarn(`exec: spawn failed (${errText}); retrying with ${fallback.label}.`);
|
||||
opts.warnings.push(warning);
|
||||
},
|
||||
});
|
||||
child = spawned as ChildProcessWithoutNullStreams;
|
||||
stdin = child.stdin;
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import { promisify } from "node:util";
|
||||
|
||||
import { danger, shouldLogVerbose } from "../globals.js";
|
||||
import { logDebug, logError } from "../logger.js";
|
||||
import { resolveCommandStdio } from "./spawn-utils.js";
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
@@ -78,19 +79,22 @@ export async function runCommandWithTimeout(
|
||||
if (resolvedEnv.npm_config_fund == null) resolvedEnv.npm_config_fund = "false";
|
||||
}
|
||||
|
||||
const stdio = resolveCommandStdio({ hasInput, preferInherit: true });
|
||||
const child = spawn(argv[0], argv.slice(1), {
|
||||
stdio,
|
||||
cwd,
|
||||
env: resolvedEnv,
|
||||
windowsVerbatimArguments,
|
||||
});
|
||||
// Spawn with inherited stdin (TTY) so tools like `pi` stay interactive when needed.
|
||||
return await new Promise((resolve, reject) => {
|
||||
const child = spawn(argv[0], argv.slice(1), {
|
||||
stdio: [hasInput ? "pipe" : "inherit", "pipe", "pipe"],
|
||||
cwd,
|
||||
env: resolvedEnv,
|
||||
windowsVerbatimArguments,
|
||||
});
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
let settled = false;
|
||||
const timer = setTimeout(() => {
|
||||
child.kill("SIGKILL");
|
||||
if (typeof child.kill === "function") {
|
||||
child.kill("SIGKILL");
|
||||
}
|
||||
}, timeoutMs);
|
||||
|
||||
if (hasInput && child.stdin) {
|
||||
|
||||
64
src/process/spawn-utils.test.ts
Normal file
64
src/process/spawn-utils.test.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import { PassThrough } from "node:stream";
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import { spawnWithFallback } from "./spawn-utils.js";
|
||||
|
||||
function createStubChild() {
|
||||
const child = new EventEmitter() as ChildProcess;
|
||||
child.stdin = new PassThrough() as ChildProcess["stdin"];
|
||||
child.stdout = new PassThrough() as ChildProcess["stdout"];
|
||||
child.stderr = new PassThrough() as ChildProcess["stderr"];
|
||||
child.pid = 1234;
|
||||
child.killed = false;
|
||||
child.kill = vi.fn(() => true) as ChildProcess["kill"];
|
||||
queueMicrotask(() => {
|
||||
child.emit("spawn");
|
||||
});
|
||||
return child;
|
||||
}
|
||||
|
||||
describe("spawnWithFallback", () => {
|
||||
it("retries on EBADF using fallback options", async () => {
|
||||
const spawnMock = vi
|
||||
.fn()
|
||||
.mockImplementationOnce(() => {
|
||||
const err = new Error("spawn EBADF");
|
||||
(err as NodeJS.ErrnoException).code = "EBADF";
|
||||
throw err;
|
||||
})
|
||||
.mockImplementationOnce(() => createStubChild());
|
||||
|
||||
const result = await spawnWithFallback({
|
||||
argv: ["echo", "ok"],
|
||||
options: { stdio: ["pipe", "pipe", "pipe"] },
|
||||
fallbacks: [{ label: "safe-stdin", options: { stdio: ["ignore", "pipe", "pipe"] } }],
|
||||
spawnImpl: spawnMock,
|
||||
});
|
||||
|
||||
expect(result.usedFallback).toBe(true);
|
||||
expect(result.fallbackLabel).toBe("safe-stdin");
|
||||
expect(spawnMock).toHaveBeenCalledTimes(2);
|
||||
expect(spawnMock.mock.calls[0]?.[2]?.stdio).toEqual(["pipe", "pipe", "pipe"]);
|
||||
expect(spawnMock.mock.calls[1]?.[2]?.stdio).toEqual(["ignore", "pipe", "pipe"]);
|
||||
});
|
||||
|
||||
it("does not retry on non-EBADF errors", async () => {
|
||||
const spawnMock = vi.fn().mockImplementationOnce(() => {
|
||||
const err = new Error("spawn ENOENT");
|
||||
(err as NodeJS.ErrnoException).code = "ENOENT";
|
||||
throw err;
|
||||
});
|
||||
|
||||
await expect(
|
||||
spawnWithFallback({
|
||||
argv: ["missing"],
|
||||
options: { stdio: ["pipe", "pipe", "pipe"] },
|
||||
fallbacks: [{ label: "safe-stdin", options: { stdio: ["ignore", "pipe", "pipe"] } }],
|
||||
spawnImpl: spawnMock,
|
||||
}),
|
||||
).rejects.toThrow(/ENOENT/);
|
||||
expect(spawnMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
127
src/process/spawn-utils.ts
Normal file
127
src/process/spawn-utils.ts
Normal file
@@ -0,0 +1,127 @@
|
||||
import type { ChildProcess, SpawnOptions } from "node:child_process";
|
||||
import { spawn } from "node:child_process";
|
||||
|
||||
export type SpawnFallback = {
|
||||
label: string;
|
||||
options: SpawnOptions;
|
||||
};
|
||||
|
||||
export type SpawnWithFallbackResult = {
|
||||
child: ChildProcess;
|
||||
usedFallback: boolean;
|
||||
fallbackLabel?: string;
|
||||
};
|
||||
|
||||
type SpawnWithFallbackParams = {
|
||||
argv: string[];
|
||||
options: SpawnOptions;
|
||||
fallbacks?: SpawnFallback[];
|
||||
spawnImpl?: typeof spawn;
|
||||
retryCodes?: string[];
|
||||
onFallback?: (err: unknown, fallback: SpawnFallback) => void;
|
||||
};
|
||||
|
||||
const DEFAULT_RETRY_CODES = ["EBADF"];
|
||||
|
||||
export function resolveCommandStdio(params: {
|
||||
hasInput: boolean;
|
||||
preferInherit: boolean;
|
||||
}): ["pipe" | "inherit" | "ignore", "pipe", "pipe"] {
|
||||
const stdin = params.hasInput ? "pipe" : params.preferInherit ? "inherit" : "pipe";
|
||||
return [stdin, "pipe", "pipe"];
|
||||
}
|
||||
|
||||
export function formatSpawnError(err: unknown): string {
|
||||
if (!(err instanceof Error)) return String(err);
|
||||
const details = err as NodeJS.ErrnoException;
|
||||
const parts: string[] = [];
|
||||
const message = err.message?.trim();
|
||||
if (message) parts.push(message);
|
||||
if (details.code && !message?.includes(details.code)) parts.push(details.code);
|
||||
if (details.syscall) parts.push(`syscall=${details.syscall}`);
|
||||
if (typeof details.errno === "number") parts.push(`errno=${details.errno}`);
|
||||
return parts.join(" ");
|
||||
}
|
||||
|
||||
function shouldRetry(err: unknown, codes: string[]): boolean {
|
||||
const code =
|
||||
err && typeof err === "object" && "code" in err ? String((err as { code?: unknown }).code) : "";
|
||||
return code.length > 0 && codes.includes(code);
|
||||
}
|
||||
|
||||
async function spawnAndWaitForSpawn(
|
||||
spawnImpl: typeof spawn,
|
||||
argv: string[],
|
||||
options: SpawnOptions,
|
||||
): Promise<ChildProcess> {
|
||||
const child = spawnImpl(argv[0], argv.slice(1), options);
|
||||
|
||||
return await new Promise((resolve, reject) => {
|
||||
let settled = false;
|
||||
const cleanup = () => {
|
||||
child.removeListener("error", onError);
|
||||
child.removeListener("spawn", onSpawn);
|
||||
};
|
||||
const finishResolve = () => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
cleanup();
|
||||
resolve(child);
|
||||
};
|
||||
const onError = (err: unknown) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
cleanup();
|
||||
reject(err);
|
||||
};
|
||||
const onSpawn = () => {
|
||||
finishResolve();
|
||||
};
|
||||
child.once("error", onError);
|
||||
child.once("spawn", onSpawn);
|
||||
// Ensure mocked spawns that never emit "spawn" don't stall.
|
||||
process.nextTick(() => {
|
||||
if (typeof child.pid === "number") {
|
||||
finishResolve();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export async function spawnWithFallback(
|
||||
params: SpawnWithFallbackParams,
|
||||
): Promise<SpawnWithFallbackResult> {
|
||||
const spawnImpl = params.spawnImpl ?? spawn;
|
||||
const retryCodes = params.retryCodes ?? DEFAULT_RETRY_CODES;
|
||||
const baseOptions = { ...params.options };
|
||||
const fallbacks = params.fallbacks ?? [];
|
||||
const attempts: Array<{ label?: string; options: SpawnOptions }> = [
|
||||
{ options: baseOptions },
|
||||
...fallbacks.map((fallback) => ({
|
||||
label: fallback.label,
|
||||
options: { ...baseOptions, ...fallback.options },
|
||||
})),
|
||||
];
|
||||
|
||||
let lastError: unknown;
|
||||
for (let index = 0; index < attempts.length; index += 1) {
|
||||
const attempt = attempts[index];
|
||||
try {
|
||||
const child = await spawnAndWaitForSpawn(spawnImpl, params.argv, attempt.options);
|
||||
return {
|
||||
child,
|
||||
usedFallback: index > 0,
|
||||
fallbackLabel: attempt.label,
|
||||
};
|
||||
} catch (err) {
|
||||
lastError = err;
|
||||
const nextFallback = fallbacks[index];
|
||||
if (!nextFallback || !shouldRetry(err, retryCodes)) {
|
||||
throw err;
|
||||
}
|
||||
params.onFallback?.(err, nextFallback);
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
}
|
||||
Reference in New Issue
Block a user