refactor: harden log stream writes
This commit is contained in:
@@ -29,6 +29,7 @@ Docs: https://docs.clawd.bot
|
|||||||
- Diagnostics: export OTLP logs, correct queue depth tracking, and document message-flow telemetry.
|
- Diagnostics: export OTLP logs, correct queue depth tracking, and document message-flow telemetry.
|
||||||
- Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin.
|
- Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin.
|
||||||
- CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx.
|
- CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx.
|
||||||
|
- CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output.
|
||||||
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
||||||
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
||||||
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
||||||
|
|||||||
85
src/cli/logs-cli.test.ts
Normal file
85
src/cli/logs-cli.test.ts
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import { Command } from "commander";
|
||||||
|
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
const callGatewayFromCli = vi.fn();
|
||||||
|
|
||||||
|
vi.mock("./gateway-rpc.js", async () => {
|
||||||
|
const actual = await vi.importActual<typeof import("./gateway-rpc.js")>("./gateway-rpc.js");
|
||||||
|
return {
|
||||||
|
...actual,
|
||||||
|
callGatewayFromCli: (...args: unknown[]) => callGatewayFromCli(...args),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("logs cli", () => {
|
||||||
|
afterEach(() => {
|
||||||
|
callGatewayFromCli.mockReset();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("writes output directly to stdout/stderr", async () => {
|
||||||
|
callGatewayFromCli.mockResolvedValueOnce({
|
||||||
|
file: "/tmp/clawdbot.log",
|
||||||
|
cursor: 1,
|
||||||
|
size: 123,
|
||||||
|
lines: ["raw line"],
|
||||||
|
truncated: true,
|
||||||
|
reset: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const stdoutWrites: string[] = [];
|
||||||
|
const stderrWrites: string[] = [];
|
||||||
|
const stdoutSpy = vi.spyOn(process.stdout, "write").mockImplementation((chunk: unknown) => {
|
||||||
|
stdoutWrites.push(String(chunk));
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation((chunk: unknown) => {
|
||||||
|
stderrWrites.push(String(chunk));
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
const { registerLogsCli } = await import("./logs-cli.js");
|
||||||
|
const program = new Command();
|
||||||
|
program.exitOverride();
|
||||||
|
registerLogsCli(program);
|
||||||
|
|
||||||
|
await program.parseAsync(["logs"], { from: "user" });
|
||||||
|
|
||||||
|
stdoutSpy.mockRestore();
|
||||||
|
stderrSpy.mockRestore();
|
||||||
|
|
||||||
|
expect(stdoutWrites.join("")).toContain("Log file:");
|
||||||
|
expect(stdoutWrites.join("")).toContain("raw line");
|
||||||
|
expect(stderrWrites.join("")).toContain("Log tail truncated");
|
||||||
|
expect(stderrWrites.join("")).toContain("Log cursor reset");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("warns when the output pipe closes", async () => {
|
||||||
|
callGatewayFromCli.mockResolvedValueOnce({
|
||||||
|
file: "/tmp/clawdbot.log",
|
||||||
|
lines: ["line one"],
|
||||||
|
});
|
||||||
|
|
||||||
|
const stderrWrites: string[] = [];
|
||||||
|
const stdoutSpy = vi.spyOn(process.stdout, "write").mockImplementation(() => {
|
||||||
|
const err = new Error("EPIPE") as NodeJS.ErrnoException;
|
||||||
|
err.code = "EPIPE";
|
||||||
|
throw err;
|
||||||
|
});
|
||||||
|
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation((chunk: unknown) => {
|
||||||
|
stderrWrites.push(String(chunk));
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
const { registerLogsCli } = await import("./logs-cli.js");
|
||||||
|
const program = new Command();
|
||||||
|
program.exitOverride();
|
||||||
|
registerLogsCli(program);
|
||||||
|
|
||||||
|
await program.parseAsync(["logs"], { from: "user" });
|
||||||
|
|
||||||
|
stdoutSpy.mockRestore();
|
||||||
|
stderrSpy.mockRestore();
|
||||||
|
|
||||||
|
expect(stderrWrites.join("")).toContain("output stdout closed");
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -1,13 +1,13 @@
|
|||||||
import { setTimeout as delay } from "node:timers/promises";
|
import { setTimeout as delay } from "node:timers/promises";
|
||||||
import type { Command } from "commander";
|
import type { Command } from "commander";
|
||||||
import { buildGatewayConnectionDetails, callGateway } from "../gateway/call.js";
|
import { buildGatewayConnectionDetails } from "../gateway/call.js";
|
||||||
import { parseLogLine } from "../logging/parse-log-line.js";
|
import { parseLogLine } from "../logging/parse-log-line.js";
|
||||||
import { formatDocsLink } from "../terminal/links.js";
|
import { formatDocsLink } from "../terminal/links.js";
|
||||||
import { clearActiveProgressLine } from "../terminal/progress-line.js";
|
import { clearActiveProgressLine } from "../terminal/progress-line.js";
|
||||||
|
import { createSafeStreamWriter } from "../terminal/stream-writer.js";
|
||||||
import { colorize, isRich, theme } from "../terminal/theme.js";
|
import { colorize, isRich, theme } from "../terminal/theme.js";
|
||||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
|
|
||||||
import { formatCliCommand } from "./command-format.js";
|
import { formatCliCommand } from "./command-format.js";
|
||||||
import { addGatewayClientOptions } from "./gateway-rpc.js";
|
import { addGatewayClientOptions, callGatewayFromCli } from "./gateway-rpc.js";
|
||||||
|
|
||||||
type LogsTailPayload = {
|
type LogsTailPayload = {
|
||||||
file?: string;
|
file?: string;
|
||||||
@@ -44,15 +44,10 @@ async function fetchLogs(
|
|||||||
): Promise<LogsTailPayload> {
|
): Promise<LogsTailPayload> {
|
||||||
const limit = parsePositiveInt(opts.limit, 200);
|
const limit = parsePositiveInt(opts.limit, 200);
|
||||||
const maxBytes = parsePositiveInt(opts.maxBytes, 250_000);
|
const maxBytes = parsePositiveInt(opts.maxBytes, 250_000);
|
||||||
const payload = await callGateway<LogsTailPayload>({
|
const payload = await callGatewayFromCli("logs.tail", opts, {
|
||||||
url: opts.url,
|
cursor,
|
||||||
token: opts.token,
|
limit,
|
||||||
method: "logs.tail",
|
maxBytes,
|
||||||
params: { cursor, limit, maxBytes },
|
|
||||||
expectFinal: Boolean(opts.expectFinal),
|
|
||||||
timeoutMs: Number(opts.timeout ?? 10_000),
|
|
||||||
clientName: GATEWAY_CLIENT_NAMES.CLI,
|
|
||||||
mode: GATEWAY_CLIENT_MODES.CLI,
|
|
||||||
});
|
});
|
||||||
if (!payload || typeof payload !== "object") {
|
if (!payload || typeof payload !== "object") {
|
||||||
throw new Error("Unexpected logs.tail response");
|
throw new Error("Unexpected logs.tail response");
|
||||||
@@ -110,25 +105,28 @@ function formatLogLine(
|
|||||||
return [head, messageValue].filter(Boolean).join(" ").trim();
|
return [head, messageValue].filter(Boolean).join(" ").trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
function writeLine(text: string, stream: NodeJS.WriteStream) {
|
function createLogWriters() {
|
||||||
// Avoid feeding CLI output back into the log file via console capture.
|
const writer = createSafeStreamWriter({
|
||||||
clearActiveProgressLine();
|
beforeWrite: () => clearActiveProgressLine(),
|
||||||
stream.write(`${text}\n`);
|
onBrokenPipe: (err, stream) => {
|
||||||
}
|
const code = err.code ?? "EPIPE";
|
||||||
|
const target = stream === process.stdout ? "stdout" : "stderr";
|
||||||
|
const message = `clawdbot logs: output ${target} closed (${code}). Stopping tail.`;
|
||||||
|
try {
|
||||||
|
clearActiveProgressLine();
|
||||||
|
process.stderr.write(`${message}\n`);
|
||||||
|
} catch {
|
||||||
|
// ignore secondary failures while reporting the broken pipe
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
function logLine(text: string) {
|
return {
|
||||||
writeLine(text, process.stdout);
|
logLine: (text: string) => writer.writeLine(process.stdout, text),
|
||||||
}
|
errorLine: (text: string) => writer.writeLine(process.stderr, text),
|
||||||
|
emitJsonLine: (payload: Record<string, unknown>, toStdErr = false) =>
|
||||||
function errorLine(text: string) {
|
writer.write(toStdErr ? process.stderr : process.stdout, `${JSON.stringify(payload)}\n`),
|
||||||
writeLine(text, process.stderr);
|
};
|
||||||
}
|
|
||||||
|
|
||||||
function emitJsonLine(payload: Record<string, unknown>, toStdErr = false) {
|
|
||||||
const text = `${JSON.stringify(payload)}\n`;
|
|
||||||
clearActiveProgressLine();
|
|
||||||
if (toStdErr) process.stderr.write(text);
|
|
||||||
else process.stdout.write(text);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function emitGatewayError(
|
function emitGatewayError(
|
||||||
@@ -136,6 +134,8 @@ function emitGatewayError(
|
|||||||
opts: LogsCliOptions,
|
opts: LogsCliOptions,
|
||||||
mode: "json" | "text",
|
mode: "json" | "text",
|
||||||
rich: boolean,
|
rich: boolean,
|
||||||
|
emitJsonLine: (payload: Record<string, unknown>, toStdErr?: boolean) => boolean,
|
||||||
|
errorLine: (text: string) => boolean,
|
||||||
) {
|
) {
|
||||||
const details = buildGatewayConnectionDetails({ url: opts.url });
|
const details = buildGatewayConnectionDetails({ url: opts.url });
|
||||||
const message = "Gateway not reachable. Is it running and accessible?";
|
const message = "Gateway not reachable. Is it running and accessible?";
|
||||||
@@ -143,21 +143,25 @@ function emitGatewayError(
|
|||||||
const errorText = err instanceof Error ? err.message : String(err);
|
const errorText = err instanceof Error ? err.message : String(err);
|
||||||
|
|
||||||
if (mode === "json") {
|
if (mode === "json") {
|
||||||
emitJsonLine(
|
if (
|
||||||
{
|
!emitJsonLine(
|
||||||
type: "error",
|
{
|
||||||
message,
|
type: "error",
|
||||||
error: errorText,
|
message,
|
||||||
details,
|
error: errorText,
|
||||||
hint,
|
details,
|
||||||
},
|
hint,
|
||||||
true,
|
},
|
||||||
);
|
true,
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
errorLine(colorize(rich, theme.error, message));
|
if (!errorLine(colorize(rich, theme.error, message))) return;
|
||||||
errorLine(details.message);
|
if (!errorLine(details.message)) return;
|
||||||
errorLine(colorize(rich, theme.muted, hint));
|
errorLine(colorize(rich, theme.muted, hint));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,6 +184,7 @@ export function registerLogsCli(program: Command) {
|
|||||||
addGatewayClientOptions(logs);
|
addGatewayClientOptions(logs);
|
||||||
|
|
||||||
logs.action(async (opts: LogsCliOptions) => {
|
logs.action(async (opts: LogsCliOptions) => {
|
||||||
|
const { logLine, errorLine, emitJsonLine } = createLogWriters();
|
||||||
const interval = parsePositiveInt(opts.interval, 1000);
|
const interval = parsePositiveInt(opts.interval, 1000);
|
||||||
let cursor: number | undefined;
|
let cursor: number | undefined;
|
||||||
let first = true;
|
let first = true;
|
||||||
@@ -192,58 +197,84 @@ export function registerLogsCli(program: Command) {
|
|||||||
try {
|
try {
|
||||||
payload = await fetchLogs(opts, cursor);
|
payload = await fetchLogs(opts, cursor);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
emitGatewayError(err, opts, jsonMode ? "json" : "text", rich);
|
emitGatewayError(err, opts, jsonMode ? "json" : "text", rich, emitJsonLine, errorLine);
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const lines = Array.isArray(payload.lines) ? payload.lines : [];
|
const lines = Array.isArray(payload.lines) ? payload.lines : [];
|
||||||
if (jsonMode) {
|
if (jsonMode) {
|
||||||
if (first) {
|
if (first) {
|
||||||
emitJsonLine({
|
if (
|
||||||
type: "meta",
|
!emitJsonLine({
|
||||||
file: payload.file,
|
type: "meta",
|
||||||
cursor: payload.cursor,
|
file: payload.file,
|
||||||
size: payload.size,
|
cursor: payload.cursor,
|
||||||
});
|
size: payload.size,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
const parsed = parseLogLine(line);
|
const parsed = parseLogLine(line);
|
||||||
if (parsed) {
|
if (parsed) {
|
||||||
emitJsonLine({ type: "log", ...parsed });
|
if (!emitJsonLine({ type: "log", ...parsed })) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
emitJsonLine({ type: "raw", raw: line });
|
if (!emitJsonLine({ type: "raw", raw: line })) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (payload.truncated) {
|
if (payload.truncated) {
|
||||||
emitJsonLine({
|
if (
|
||||||
type: "notice",
|
!emitJsonLine({
|
||||||
message: "Log tail truncated (increase --max-bytes).",
|
type: "notice",
|
||||||
});
|
message: "Log tail truncated (increase --max-bytes).",
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (payload.reset) {
|
if (payload.reset) {
|
||||||
emitJsonLine({
|
if (
|
||||||
type: "notice",
|
!emitJsonLine({
|
||||||
message: "Log cursor reset (file rotated).",
|
type: "notice",
|
||||||
});
|
message: "Log cursor reset (file rotated).",
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (first && payload.file) {
|
if (first && payload.file) {
|
||||||
const prefix = pretty ? colorize(rich, theme.muted, "Log file:") : "Log file:";
|
const prefix = pretty ? colorize(rich, theme.muted, "Log file:") : "Log file:";
|
||||||
logLine(`${prefix} ${payload.file}`);
|
if (!logLine(`${prefix} ${payload.file}`)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (const line of lines) {
|
for (const line of lines) {
|
||||||
logLine(
|
if (
|
||||||
formatLogLine(line, {
|
!logLine(
|
||||||
pretty,
|
formatLogLine(line, {
|
||||||
rich,
|
pretty,
|
||||||
}),
|
rich,
|
||||||
);
|
}),
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (payload.truncated) {
|
if (payload.truncated) {
|
||||||
errorLine("Log tail truncated (increase --max-bytes).");
|
if (!errorLine("Log tail truncated (increase --max-bytes).")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (payload.reset) {
|
if (payload.reset) {
|
||||||
errorLine("Log cursor reset (file rotated).");
|
if (!errorLine("Log cursor reset (file rotated).")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cursor =
|
cursor =
|
||||||
|
|||||||
42
src/terminal/stream-writer.test.ts
Normal file
42
src/terminal/stream-writer.test.ts
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import { describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
import { createSafeStreamWriter } from "./stream-writer.js";
|
||||||
|
|
||||||
|
describe("createSafeStreamWriter", () => {
|
||||||
|
it("signals broken pipes and closes the writer", () => {
|
||||||
|
const onBrokenPipe = vi.fn();
|
||||||
|
const writer = createSafeStreamWriter({ onBrokenPipe });
|
||||||
|
const stream = {
|
||||||
|
write: vi.fn(() => {
|
||||||
|
const err = new Error("EPIPE") as NodeJS.ErrnoException;
|
||||||
|
err.code = "EPIPE";
|
||||||
|
throw err;
|
||||||
|
}),
|
||||||
|
} as unknown as NodeJS.WriteStream;
|
||||||
|
|
||||||
|
expect(writer.writeLine(stream, "hello")).toBe(false);
|
||||||
|
expect(writer.isClosed()).toBe(true);
|
||||||
|
expect(onBrokenPipe).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
onBrokenPipe.mockClear();
|
||||||
|
expect(writer.writeLine(stream, "again")).toBe(false);
|
||||||
|
expect(onBrokenPipe).toHaveBeenCalledTimes(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("treats broken pipes from beforeWrite as closed", () => {
|
||||||
|
const onBrokenPipe = vi.fn();
|
||||||
|
const writer = createSafeStreamWriter({
|
||||||
|
onBrokenPipe,
|
||||||
|
beforeWrite: () => {
|
||||||
|
const err = new Error("EIO") as NodeJS.ErrnoException;
|
||||||
|
err.code = "EIO";
|
||||||
|
throw err;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const stream = { write: vi.fn(() => true) } as unknown as NodeJS.WriteStream;
|
||||||
|
|
||||||
|
expect(writer.write(stream, "hi")).toBe(false);
|
||||||
|
expect(writer.isClosed()).toBe(true);
|
||||||
|
expect(onBrokenPipe).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
64
src/terminal/stream-writer.ts
Normal file
64
src/terminal/stream-writer.ts
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
export type SafeStreamWriterOptions = {
|
||||||
|
beforeWrite?: () => void;
|
||||||
|
onBrokenPipe?: (err: NodeJS.ErrnoException, stream: NodeJS.WriteStream) => void;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type SafeStreamWriter = {
|
||||||
|
write: (stream: NodeJS.WriteStream, text: string) => boolean;
|
||||||
|
writeLine: (stream: NodeJS.WriteStream, text: string) => boolean;
|
||||||
|
reset: () => void;
|
||||||
|
isClosed: () => boolean;
|
||||||
|
};
|
||||||
|
|
||||||
|
function isBrokenPipeError(err: unknown): err is NodeJS.ErrnoException {
|
||||||
|
const code = (err as NodeJS.ErrnoException)?.code;
|
||||||
|
return code === "EPIPE" || code === "EIO";
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createSafeStreamWriter(options: SafeStreamWriterOptions = {}): SafeStreamWriter {
|
||||||
|
let closed = false;
|
||||||
|
let notified = false;
|
||||||
|
|
||||||
|
const noteBrokenPipe = (err: NodeJS.ErrnoException, stream: NodeJS.WriteStream) => {
|
||||||
|
if (notified) return;
|
||||||
|
notified = true;
|
||||||
|
options.onBrokenPipe?.(err, stream);
|
||||||
|
};
|
||||||
|
|
||||||
|
const handleError = (err: unknown, stream: NodeJS.WriteStream): boolean => {
|
||||||
|
if (!isBrokenPipeError(err)) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
noteBrokenPipe(err, stream);
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
const write = (stream: NodeJS.WriteStream, text: string): boolean => {
|
||||||
|
if (closed) return false;
|
||||||
|
try {
|
||||||
|
options.beforeWrite?.();
|
||||||
|
} catch (err) {
|
||||||
|
return handleError(err, process.stderr);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
stream.write(text);
|
||||||
|
return !closed;
|
||||||
|
} catch (err) {
|
||||||
|
return handleError(err, stream);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const writeLine = (stream: NodeJS.WriteStream, text: string): boolean =>
|
||||||
|
write(stream, `${text}\n`);
|
||||||
|
|
||||||
|
return {
|
||||||
|
write,
|
||||||
|
writeLine,
|
||||||
|
reset: () => {
|
||||||
|
closed = false;
|
||||||
|
notified = false;
|
||||||
|
},
|
||||||
|
isClosed: () => closed,
|
||||||
|
};
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user