diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fd00edb2..b3be2f1ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ Docs: https://docs.clawd.bot - Diagnostics: export OTLP logs, correct queue depth tracking, and document message-flow telemetry. - Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin. - CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx. +- CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output. - Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk. - Doctor: clarify plugin auto-enable hint text in the startup banner. - Gateway: clarify unauthorized handshake responses with token/password mismatch guidance. diff --git a/src/cli/logs-cli.test.ts b/src/cli/logs-cli.test.ts new file mode 100644 index 000000000..f6d08f9dc --- /dev/null +++ b/src/cli/logs-cli.test.ts @@ -0,0 +1,85 @@ +import { Command } from "commander"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +const callGatewayFromCli = vi.fn(); + +vi.mock("./gateway-rpc.js", async () => { + const actual = await vi.importActual("./gateway-rpc.js"); + return { + ...actual, + callGatewayFromCli: (...args: unknown[]) => callGatewayFromCli(...args), + }; +}); + +describe("logs cli", () => { + afterEach(() => { + callGatewayFromCli.mockReset(); + }); + + it("writes output directly to stdout/stderr", async () => { + callGatewayFromCli.mockResolvedValueOnce({ + file: "/tmp/clawdbot.log", + cursor: 1, + size: 123, + lines: ["raw line"], + truncated: true, + reset: true, + }); + + const stdoutWrites: string[] = []; + const stderrWrites: string[] = []; + const stdoutSpy = vi.spyOn(process.stdout, "write").mockImplementation((chunk: unknown) => { + stdoutWrites.push(String(chunk)); + return true; + }); + const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation((chunk: unknown) => { + stderrWrites.push(String(chunk)); + return true; + }); + + const { registerLogsCli } = await import("./logs-cli.js"); + const program = new Command(); + program.exitOverride(); + registerLogsCli(program); + + await program.parseAsync(["logs"], { from: "user" }); + + stdoutSpy.mockRestore(); + stderrSpy.mockRestore(); + + expect(stdoutWrites.join("")).toContain("Log file:"); + expect(stdoutWrites.join("")).toContain("raw line"); + expect(stderrWrites.join("")).toContain("Log tail truncated"); + expect(stderrWrites.join("")).toContain("Log cursor reset"); + }); + + it("warns when the output pipe closes", async () => { + callGatewayFromCli.mockResolvedValueOnce({ + file: "/tmp/clawdbot.log", + lines: ["line one"], + }); + + const stderrWrites: string[] = []; + const stdoutSpy = vi.spyOn(process.stdout, "write").mockImplementation(() => { + const err = new Error("EPIPE") as NodeJS.ErrnoException; + err.code = "EPIPE"; + throw err; + }); + const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation((chunk: unknown) => { + stderrWrites.push(String(chunk)); + return true; + }); + + const { registerLogsCli } = await import("./logs-cli.js"); + const program = new Command(); + program.exitOverride(); + registerLogsCli(program); + + await program.parseAsync(["logs"], { from: "user" }); + + stdoutSpy.mockRestore(); + stderrSpy.mockRestore(); + + expect(stderrWrites.join("")).toContain("output stdout closed"); + }); +}); diff --git a/src/cli/logs-cli.ts b/src/cli/logs-cli.ts index c97f07961..df9a5c435 100644 --- a/src/cli/logs-cli.ts +++ b/src/cli/logs-cli.ts @@ -1,13 +1,13 @@ import { setTimeout as delay } from "node:timers/promises"; import type { Command } from "commander"; -import { buildGatewayConnectionDetails, callGateway } from "../gateway/call.js"; +import { buildGatewayConnectionDetails } from "../gateway/call.js"; import { parseLogLine } from "../logging/parse-log-line.js"; import { formatDocsLink } from "../terminal/links.js"; import { clearActiveProgressLine } from "../terminal/progress-line.js"; +import { createSafeStreamWriter } from "../terminal/stream-writer.js"; import { colorize, isRich, theme } from "../terminal/theme.js"; -import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { formatCliCommand } from "./command-format.js"; -import { addGatewayClientOptions } from "./gateway-rpc.js"; +import { addGatewayClientOptions, callGatewayFromCli } from "./gateway-rpc.js"; type LogsTailPayload = { file?: string; @@ -44,15 +44,10 @@ async function fetchLogs( ): Promise { const limit = parsePositiveInt(opts.limit, 200); const maxBytes = parsePositiveInt(opts.maxBytes, 250_000); - const payload = await callGateway({ - url: opts.url, - token: opts.token, - method: "logs.tail", - params: { cursor, limit, maxBytes }, - expectFinal: Boolean(opts.expectFinal), - timeoutMs: Number(opts.timeout ?? 10_000), - clientName: GATEWAY_CLIENT_NAMES.CLI, - mode: GATEWAY_CLIENT_MODES.CLI, + const payload = await callGatewayFromCli("logs.tail", opts, { + cursor, + limit, + maxBytes, }); if (!payload || typeof payload !== "object") { throw new Error("Unexpected logs.tail response"); @@ -110,25 +105,28 @@ function formatLogLine( return [head, messageValue].filter(Boolean).join(" ").trim(); } -function writeLine(text: string, stream: NodeJS.WriteStream) { - // Avoid feeding CLI output back into the log file via console capture. - clearActiveProgressLine(); - stream.write(`${text}\n`); -} +function createLogWriters() { + const writer = createSafeStreamWriter({ + beforeWrite: () => clearActiveProgressLine(), + onBrokenPipe: (err, stream) => { + const code = err.code ?? "EPIPE"; + const target = stream === process.stdout ? "stdout" : "stderr"; + const message = `clawdbot logs: output ${target} closed (${code}). Stopping tail.`; + try { + clearActiveProgressLine(); + process.stderr.write(`${message}\n`); + } catch { + // ignore secondary failures while reporting the broken pipe + } + }, + }); -function logLine(text: string) { - writeLine(text, process.stdout); -} - -function errorLine(text: string) { - writeLine(text, process.stderr); -} - -function emitJsonLine(payload: Record, toStdErr = false) { - const text = `${JSON.stringify(payload)}\n`; - clearActiveProgressLine(); - if (toStdErr) process.stderr.write(text); - else process.stdout.write(text); + return { + logLine: (text: string) => writer.writeLine(process.stdout, text), + errorLine: (text: string) => writer.writeLine(process.stderr, text), + emitJsonLine: (payload: Record, toStdErr = false) => + writer.write(toStdErr ? process.stderr : process.stdout, `${JSON.stringify(payload)}\n`), + }; } function emitGatewayError( @@ -136,6 +134,8 @@ function emitGatewayError( opts: LogsCliOptions, mode: "json" | "text", rich: boolean, + emitJsonLine: (payload: Record, toStdErr?: boolean) => boolean, + errorLine: (text: string) => boolean, ) { const details = buildGatewayConnectionDetails({ url: opts.url }); const message = "Gateway not reachable. Is it running and accessible?"; @@ -143,21 +143,25 @@ function emitGatewayError( const errorText = err instanceof Error ? err.message : String(err); if (mode === "json") { - emitJsonLine( - { - type: "error", - message, - error: errorText, - details, - hint, - }, - true, - ); + if ( + !emitJsonLine( + { + type: "error", + message, + error: errorText, + details, + hint, + }, + true, + ) + ) { + return; + } return; } - errorLine(colorize(rich, theme.error, message)); - errorLine(details.message); + if (!errorLine(colorize(rich, theme.error, message))) return; + if (!errorLine(details.message)) return; errorLine(colorize(rich, theme.muted, hint)); } @@ -180,6 +184,7 @@ export function registerLogsCli(program: Command) { addGatewayClientOptions(logs); logs.action(async (opts: LogsCliOptions) => { + const { logLine, errorLine, emitJsonLine } = createLogWriters(); const interval = parsePositiveInt(opts.interval, 1000); let cursor: number | undefined; let first = true; @@ -192,58 +197,84 @@ export function registerLogsCli(program: Command) { try { payload = await fetchLogs(opts, cursor); } catch (err) { - emitGatewayError(err, opts, jsonMode ? "json" : "text", rich); + emitGatewayError(err, opts, jsonMode ? "json" : "text", rich, emitJsonLine, errorLine); process.exit(1); return; } const lines = Array.isArray(payload.lines) ? payload.lines : []; if (jsonMode) { if (first) { - emitJsonLine({ - type: "meta", - file: payload.file, - cursor: payload.cursor, - size: payload.size, - }); + if ( + !emitJsonLine({ + type: "meta", + file: payload.file, + cursor: payload.cursor, + size: payload.size, + }) + ) { + return; + } } for (const line of lines) { const parsed = parseLogLine(line); if (parsed) { - emitJsonLine({ type: "log", ...parsed }); + if (!emitJsonLine({ type: "log", ...parsed })) { + return; + } } else { - emitJsonLine({ type: "raw", raw: line }); + if (!emitJsonLine({ type: "raw", raw: line })) { + return; + } } } if (payload.truncated) { - emitJsonLine({ - type: "notice", - message: "Log tail truncated (increase --max-bytes).", - }); + if ( + !emitJsonLine({ + type: "notice", + message: "Log tail truncated (increase --max-bytes).", + }) + ) { + return; + } } if (payload.reset) { - emitJsonLine({ - type: "notice", - message: "Log cursor reset (file rotated).", - }); + if ( + !emitJsonLine({ + type: "notice", + message: "Log cursor reset (file rotated).", + }) + ) { + return; + } } } else { if (first && payload.file) { const prefix = pretty ? colorize(rich, theme.muted, "Log file:") : "Log file:"; - logLine(`${prefix} ${payload.file}`); + if (!logLine(`${prefix} ${payload.file}`)) { + return; + } } for (const line of lines) { - logLine( - formatLogLine(line, { - pretty, - rich, - }), - ); + if ( + !logLine( + formatLogLine(line, { + pretty, + rich, + }), + ) + ) { + return; + } } if (payload.truncated) { - errorLine("Log tail truncated (increase --max-bytes)."); + if (!errorLine("Log tail truncated (increase --max-bytes).")) { + return; + } } if (payload.reset) { - errorLine("Log cursor reset (file rotated)."); + if (!errorLine("Log cursor reset (file rotated).")) { + return; + } } } cursor = diff --git a/src/terminal/stream-writer.test.ts b/src/terminal/stream-writer.test.ts new file mode 100644 index 000000000..429199a83 --- /dev/null +++ b/src/terminal/stream-writer.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it, vi } from "vitest"; + +import { createSafeStreamWriter } from "./stream-writer.js"; + +describe("createSafeStreamWriter", () => { + it("signals broken pipes and closes the writer", () => { + const onBrokenPipe = vi.fn(); + const writer = createSafeStreamWriter({ onBrokenPipe }); + const stream = { + write: vi.fn(() => { + const err = new Error("EPIPE") as NodeJS.ErrnoException; + err.code = "EPIPE"; + throw err; + }), + } as unknown as NodeJS.WriteStream; + + expect(writer.writeLine(stream, "hello")).toBe(false); + expect(writer.isClosed()).toBe(true); + expect(onBrokenPipe).toHaveBeenCalledTimes(1); + + onBrokenPipe.mockClear(); + expect(writer.writeLine(stream, "again")).toBe(false); + expect(onBrokenPipe).toHaveBeenCalledTimes(0); + }); + + it("treats broken pipes from beforeWrite as closed", () => { + const onBrokenPipe = vi.fn(); + const writer = createSafeStreamWriter({ + onBrokenPipe, + beforeWrite: () => { + const err = new Error("EIO") as NodeJS.ErrnoException; + err.code = "EIO"; + throw err; + }, + }); + const stream = { write: vi.fn(() => true) } as unknown as NodeJS.WriteStream; + + expect(writer.write(stream, "hi")).toBe(false); + expect(writer.isClosed()).toBe(true); + expect(onBrokenPipe).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/terminal/stream-writer.ts b/src/terminal/stream-writer.ts new file mode 100644 index 000000000..cd471ea5f --- /dev/null +++ b/src/terminal/stream-writer.ts @@ -0,0 +1,64 @@ +export type SafeStreamWriterOptions = { + beforeWrite?: () => void; + onBrokenPipe?: (err: NodeJS.ErrnoException, stream: NodeJS.WriteStream) => void; +}; + +export type SafeStreamWriter = { + write: (stream: NodeJS.WriteStream, text: string) => boolean; + writeLine: (stream: NodeJS.WriteStream, text: string) => boolean; + reset: () => void; + isClosed: () => boolean; +}; + +function isBrokenPipeError(err: unknown): err is NodeJS.ErrnoException { + const code = (err as NodeJS.ErrnoException)?.code; + return code === "EPIPE" || code === "EIO"; +} + +export function createSafeStreamWriter(options: SafeStreamWriterOptions = {}): SafeStreamWriter { + let closed = false; + let notified = false; + + const noteBrokenPipe = (err: NodeJS.ErrnoException, stream: NodeJS.WriteStream) => { + if (notified) return; + notified = true; + options.onBrokenPipe?.(err, stream); + }; + + const handleError = (err: unknown, stream: NodeJS.WriteStream): boolean => { + if (!isBrokenPipeError(err)) { + throw err; + } + closed = true; + noteBrokenPipe(err, stream); + return false; + }; + + const write = (stream: NodeJS.WriteStream, text: string): boolean => { + if (closed) return false; + try { + options.beforeWrite?.(); + } catch (err) { + return handleError(err, process.stderr); + } + try { + stream.write(text); + return !closed; + } catch (err) { + return handleError(err, stream); + } + }; + + const writeLine = (stream: NodeJS.WriteStream, text: string): boolean => + write(stream, `${text}\n`); + + return { + write, + writeLine, + reset: () => { + closed = false; + notified = false; + }, + isClosed: () => closed, + }; +}