fix: cap pending process output
This commit is contained in:
@@ -33,6 +33,7 @@ When spawning long-running child processes outside the exec/process tools (for e
|
|||||||
Environment overrides:
|
Environment overrides:
|
||||||
- `PI_BASH_YIELD_MS`: default yield (ms)
|
- `PI_BASH_YIELD_MS`: default yield (ms)
|
||||||
- `PI_BASH_MAX_OUTPUT_CHARS`: in‑memory output cap (chars)
|
- `PI_BASH_MAX_OUTPUT_CHARS`: in‑memory output cap (chars)
|
||||||
|
- `CLAWDBOT_BASH_PENDING_MAX_OUTPUT_CHARS`: pending stdout/stderr cap per stream (chars)
|
||||||
- `PI_BASH_JOB_TTL_MS`: TTL for finished sessions (ms, bounded to 1m–3h)
|
- `PI_BASH_JOB_TTL_MS`: TTL for finished sessions (ms, bounded to 1m–3h)
|
||||||
|
|
||||||
Config (preferred):
|
Config (preferred):
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import type { ProcessSession } from "./bash-process-registry.js";
|
|||||||
import {
|
import {
|
||||||
addSession,
|
addSession,
|
||||||
appendOutput,
|
appendOutput,
|
||||||
|
drainSession,
|
||||||
listFinishedSessions,
|
listFinishedSessions,
|
||||||
markBackgrounded,
|
markBackgrounded,
|
||||||
markExited,
|
markExited,
|
||||||
@@ -23,9 +24,12 @@ describe("bash process registry", () => {
|
|||||||
startedAt: Date.now(),
|
startedAt: Date.now(),
|
||||||
cwd: "/tmp",
|
cwd: "/tmp",
|
||||||
maxOutputChars: 10,
|
maxOutputChars: 10,
|
||||||
|
pendingMaxOutputChars: 30_000,
|
||||||
totalOutputChars: 0,
|
totalOutputChars: 0,
|
||||||
pendingStdout: [],
|
pendingStdout: [],
|
||||||
pendingStderr: [],
|
pendingStderr: [],
|
||||||
|
pendingStdoutChars: 0,
|
||||||
|
pendingStderrChars: 0,
|
||||||
aggregated: "",
|
aggregated: "",
|
||||||
tail: "",
|
tail: "",
|
||||||
exited: false,
|
exited: false,
|
||||||
@@ -43,6 +47,105 @@ describe("bash process registry", () => {
|
|||||||
expect(session.truncated).toBe(true);
|
expect(session.truncated).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("caps pending output to avoid runaway polls", () => {
|
||||||
|
const session: ProcessSession = {
|
||||||
|
id: "sess",
|
||||||
|
command: "echo test",
|
||||||
|
child: { pid: 123 } as ChildProcessWithoutNullStreams,
|
||||||
|
startedAt: Date.now(),
|
||||||
|
cwd: "/tmp",
|
||||||
|
maxOutputChars: 100_000,
|
||||||
|
pendingMaxOutputChars: 20_000,
|
||||||
|
totalOutputChars: 0,
|
||||||
|
pendingStdout: [],
|
||||||
|
pendingStderr: [],
|
||||||
|
pendingStdoutChars: 0,
|
||||||
|
pendingStderrChars: 0,
|
||||||
|
aggregated: "",
|
||||||
|
tail: "",
|
||||||
|
exited: false,
|
||||||
|
exitCode: undefined,
|
||||||
|
exitSignal: undefined,
|
||||||
|
truncated: false,
|
||||||
|
backgrounded: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
addSession(session);
|
||||||
|
const payload = `${"a".repeat(70_000)}${"b".repeat(20_000)}`;
|
||||||
|
appendOutput(session, "stdout", payload);
|
||||||
|
|
||||||
|
const drained = drainSession(session);
|
||||||
|
expect(drained.stdout).toBe("b".repeat(20_000));
|
||||||
|
expect(session.pendingStdout).toHaveLength(0);
|
||||||
|
expect(session.pendingStdoutChars).toBe(0);
|
||||||
|
expect(session.truncated).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("respects max output cap when pending cap is larger", () => {
|
||||||
|
const session: ProcessSession = {
|
||||||
|
id: "sess",
|
||||||
|
command: "echo test",
|
||||||
|
child: { pid: 123 } as ChildProcessWithoutNullStreams,
|
||||||
|
startedAt: Date.now(),
|
||||||
|
cwd: "/tmp",
|
||||||
|
maxOutputChars: 5_000,
|
||||||
|
pendingMaxOutputChars: 30_000,
|
||||||
|
totalOutputChars: 0,
|
||||||
|
pendingStdout: [],
|
||||||
|
pendingStderr: [],
|
||||||
|
pendingStdoutChars: 0,
|
||||||
|
pendingStderrChars: 0,
|
||||||
|
aggregated: "",
|
||||||
|
tail: "",
|
||||||
|
exited: false,
|
||||||
|
exitCode: undefined,
|
||||||
|
exitSignal: undefined,
|
||||||
|
truncated: false,
|
||||||
|
backgrounded: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
addSession(session);
|
||||||
|
appendOutput(session, "stdout", "x".repeat(10_000));
|
||||||
|
|
||||||
|
const drained = drainSession(session);
|
||||||
|
expect(drained.stdout.length).toBe(5_000);
|
||||||
|
expect(session.truncated).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("caps stdout and stderr independently", () => {
|
||||||
|
const session: ProcessSession = {
|
||||||
|
id: "sess",
|
||||||
|
command: "echo test",
|
||||||
|
child: { pid: 123 } as ChildProcessWithoutNullStreams,
|
||||||
|
startedAt: Date.now(),
|
||||||
|
cwd: "/tmp",
|
||||||
|
maxOutputChars: 100,
|
||||||
|
pendingMaxOutputChars: 10,
|
||||||
|
totalOutputChars: 0,
|
||||||
|
pendingStdout: [],
|
||||||
|
pendingStderr: [],
|
||||||
|
pendingStdoutChars: 0,
|
||||||
|
pendingStderrChars: 0,
|
||||||
|
aggregated: "",
|
||||||
|
tail: "",
|
||||||
|
exited: false,
|
||||||
|
exitCode: undefined,
|
||||||
|
exitSignal: undefined,
|
||||||
|
truncated: false,
|
||||||
|
backgrounded: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
addSession(session);
|
||||||
|
appendOutput(session, "stdout", "a".repeat(6));
|
||||||
|
appendOutput(session, "stdout", "b".repeat(6));
|
||||||
|
appendOutput(session, "stderr", "c".repeat(12));
|
||||||
|
|
||||||
|
const drained = drainSession(session);
|
||||||
|
expect(drained.stdout).toBe("a".repeat(4) + "b".repeat(6));
|
||||||
|
expect(drained.stderr).toBe("c".repeat(10));
|
||||||
|
expect(session.truncated).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
it("only persists finished sessions when backgrounded", () => {
|
it("only persists finished sessions when backgrounded", () => {
|
||||||
const session: ProcessSession = {
|
const session: ProcessSession = {
|
||||||
id: "sess",
|
id: "sess",
|
||||||
@@ -51,9 +154,12 @@ describe("bash process registry", () => {
|
|||||||
startedAt: Date.now(),
|
startedAt: Date.now(),
|
||||||
cwd: "/tmp",
|
cwd: "/tmp",
|
||||||
maxOutputChars: 100,
|
maxOutputChars: 100,
|
||||||
|
pendingMaxOutputChars: 30_000,
|
||||||
totalOutputChars: 0,
|
totalOutputChars: 0,
|
||||||
pendingStdout: [],
|
pendingStdout: [],
|
||||||
pendingStderr: [],
|
pendingStderr: [],
|
||||||
|
pendingStdoutChars: 0,
|
||||||
|
pendingStderrChars: 0,
|
||||||
aggregated: "",
|
aggregated: "",
|
||||||
tail: "",
|
tail: "",
|
||||||
exited: false,
|
exited: false,
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import { createSessionSlug as createSessionSlugId } from "./session-slug.js";
|
|||||||
const DEFAULT_JOB_TTL_MS = 30 * 60 * 1000; // 30 minutes
|
const DEFAULT_JOB_TTL_MS = 30 * 60 * 1000; // 30 minutes
|
||||||
const MIN_JOB_TTL_MS = 60 * 1000; // 1 minute
|
const MIN_JOB_TTL_MS = 60 * 1000; // 1 minute
|
||||||
const MAX_JOB_TTL_MS = 3 * 60 * 60 * 1000; // 3 hours
|
const MAX_JOB_TTL_MS = 3 * 60 * 60 * 1000; // 3 hours
|
||||||
|
const DEFAULT_PENDING_OUTPUT_CHARS = 30_000;
|
||||||
|
|
||||||
function clampTtl(value: number | undefined) {
|
function clampTtl(value: number | undefined) {
|
||||||
if (!value || Number.isNaN(value)) return DEFAULT_JOB_TTL_MS;
|
if (!value || Number.isNaN(value)) return DEFAULT_JOB_TTL_MS;
|
||||||
@@ -33,9 +34,12 @@ export interface ProcessSession {
|
|||||||
startedAt: number;
|
startedAt: number;
|
||||||
cwd?: string;
|
cwd?: string;
|
||||||
maxOutputChars: number;
|
maxOutputChars: number;
|
||||||
|
pendingMaxOutputChars?: number;
|
||||||
totalOutputChars: number;
|
totalOutputChars: number;
|
||||||
pendingStdout: string[];
|
pendingStdout: string[];
|
||||||
pendingStderr: string[];
|
pendingStderr: string[];
|
||||||
|
pendingStdoutChars: number;
|
||||||
|
pendingStderrChars: number;
|
||||||
aggregated: string;
|
aggregated: string;
|
||||||
tail: string;
|
tail: string;
|
||||||
exitCode?: number | null;
|
exitCode?: number | null;
|
||||||
@@ -95,8 +99,25 @@ export function deleteSession(id: string) {
|
|||||||
export function appendOutput(session: ProcessSession, stream: "stdout" | "stderr", chunk: string) {
|
export function appendOutput(session: ProcessSession, stream: "stdout" | "stderr", chunk: string) {
|
||||||
session.pendingStdout ??= [];
|
session.pendingStdout ??= [];
|
||||||
session.pendingStderr ??= [];
|
session.pendingStderr ??= [];
|
||||||
|
session.pendingStdoutChars ??= sumPendingChars(session.pendingStdout);
|
||||||
|
session.pendingStderrChars ??= sumPendingChars(session.pendingStderr);
|
||||||
const buffer = stream === "stdout" ? session.pendingStdout : session.pendingStderr;
|
const buffer = stream === "stdout" ? session.pendingStdout : session.pendingStderr;
|
||||||
|
const bufferChars = stream === "stdout" ? session.pendingStdoutChars : session.pendingStderrChars;
|
||||||
|
const pendingCap = Math.min(
|
||||||
|
session.pendingMaxOutputChars ?? DEFAULT_PENDING_OUTPUT_CHARS,
|
||||||
|
session.maxOutputChars,
|
||||||
|
);
|
||||||
buffer.push(chunk);
|
buffer.push(chunk);
|
||||||
|
let pendingChars = bufferChars + chunk.length;
|
||||||
|
if (pendingChars > pendingCap) {
|
||||||
|
session.truncated = true;
|
||||||
|
pendingChars = capPendingBuffer(buffer, pendingChars, pendingCap);
|
||||||
|
}
|
||||||
|
if (stream === "stdout") {
|
||||||
|
session.pendingStdoutChars = pendingChars;
|
||||||
|
} else {
|
||||||
|
session.pendingStderrChars = pendingChars;
|
||||||
|
}
|
||||||
session.totalOutputChars += chunk.length;
|
session.totalOutputChars += chunk.length;
|
||||||
const aggregated = trimWithCap(session.aggregated + chunk, session.maxOutputChars);
|
const aggregated = trimWithCap(session.aggregated + chunk, session.maxOutputChars);
|
||||||
session.truncated =
|
session.truncated =
|
||||||
@@ -110,6 +131,8 @@ export function drainSession(session: ProcessSession) {
|
|||||||
const stderr = session.pendingStderr.join("");
|
const stderr = session.pendingStderr.join("");
|
||||||
session.pendingStdout = [];
|
session.pendingStdout = [];
|
||||||
session.pendingStderr = [];
|
session.pendingStderr = [];
|
||||||
|
session.pendingStdoutChars = 0;
|
||||||
|
session.pendingStderrChars = 0;
|
||||||
return { stdout, stderr };
|
return { stdout, stderr };
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,6 +178,32 @@ export function tail(text: string, max = 2000) {
|
|||||||
return text.slice(text.length - max);
|
return text.slice(text.length - max);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function sumPendingChars(buffer: string[]) {
|
||||||
|
let total = 0;
|
||||||
|
for (const chunk of buffer) total += chunk.length;
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
function capPendingBuffer(buffer: string[], pendingChars: number, cap: number) {
|
||||||
|
if (pendingChars <= cap) return pendingChars;
|
||||||
|
const last = buffer.at(-1);
|
||||||
|
if (last && last.length >= cap) {
|
||||||
|
buffer.length = 0;
|
||||||
|
buffer.push(last.slice(last.length - cap));
|
||||||
|
return cap;
|
||||||
|
}
|
||||||
|
while (buffer.length && pendingChars - buffer[0].length >= cap) {
|
||||||
|
pendingChars -= buffer[0].length;
|
||||||
|
buffer.shift();
|
||||||
|
}
|
||||||
|
if (buffer.length && pendingChars > cap) {
|
||||||
|
const overflow = pendingChars - cap;
|
||||||
|
buffer[0] = buffer[0].slice(overflow);
|
||||||
|
pendingChars = cap;
|
||||||
|
}
|
||||||
|
return pendingChars;
|
||||||
|
}
|
||||||
|
|
||||||
export function trimWithCap(text: string, max: number) {
|
export function trimWithCap(text: string, max: number) {
|
||||||
if (text.length <= max) return text;
|
if (text.length <= max) return text;
|
||||||
return text.slice(text.length - max);
|
return text.slice(text.length - max);
|
||||||
|
|||||||
@@ -37,6 +37,12 @@ const DEFAULT_MAX_OUTPUT = clampNumber(
|
|||||||
1_000,
|
1_000,
|
||||||
150_000,
|
150_000,
|
||||||
);
|
);
|
||||||
|
const DEFAULT_PENDING_MAX_OUTPUT = clampNumber(
|
||||||
|
readEnvInt("CLAWDBOT_BASH_PENDING_MAX_OUTPUT_CHARS"),
|
||||||
|
30_000,
|
||||||
|
1_000,
|
||||||
|
150_000,
|
||||||
|
);
|
||||||
const DEFAULT_PATH =
|
const DEFAULT_PATH =
|
||||||
process.env.PATH ?? "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
|
process.env.PATH ?? "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
|
||||||
const DEFAULT_NOTIFY_TAIL_CHARS = 400;
|
const DEFAULT_NOTIFY_TAIL_CHARS = 400;
|
||||||
@@ -189,6 +195,7 @@ export function createExecTool(
|
|||||||
}
|
}
|
||||||
|
|
||||||
const maxOutput = DEFAULT_MAX_OUTPUT;
|
const maxOutput = DEFAULT_MAX_OUTPUT;
|
||||||
|
const pendingMaxOutput = DEFAULT_PENDING_MAX_OUTPUT;
|
||||||
const startedAt = Date.now();
|
const startedAt = Date.now();
|
||||||
const sessionId = createSessionSlug();
|
const sessionId = createSessionSlug();
|
||||||
const warnings: string[] = [];
|
const warnings: string[] = [];
|
||||||
@@ -350,9 +357,12 @@ export function createExecTool(
|
|||||||
startedAt,
|
startedAt,
|
||||||
cwd: workdir,
|
cwd: workdir,
|
||||||
maxOutputChars: maxOutput,
|
maxOutputChars: maxOutput,
|
||||||
|
pendingMaxOutputChars: pendingMaxOutput,
|
||||||
totalOutputChars: 0,
|
totalOutputChars: 0,
|
||||||
pendingStdout: [],
|
pendingStdout: [],
|
||||||
pendingStderr: [],
|
pendingStderr: [],
|
||||||
|
pendingStdoutChars: 0,
|
||||||
|
pendingStderrChars: 0,
|
||||||
aggregated: "",
|
aggregated: "",
|
||||||
tail: "",
|
tail: "",
|
||||||
exited: false,
|
exited: false,
|
||||||
|
|||||||
Reference in New Issue
Block a user