diff --git a/src/cli/program.ts b/src/cli/program.ts index e5521ca46..424d81dbc 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -1,17 +1,13 @@ import chalk from "chalk"; import { Command } from "commander"; import { agentCommand } from "../commands/agent.js"; -import { getHealthSnapshot, healthCommand, type HealthSummary } from "../commands/health.js"; +import { healthCommand } from "../commands/health.js"; import { sendCommand } from "../commands/send.js"; import { sessionsCommand } from "../commands/sessions.js"; -import { getStatusSummary, statusCommand, type StatusSummary } from "../commands/status.js"; +import { statusCommand } from "../commands/status.js"; import { loadConfig } from "../config/config.js"; import { danger, info, setVerbose } from "../globals.js"; import { startControlChannel } from "../infra/control-channel.js"; -import { - getLastHeartbeatEvent, - onHeartbeatEvent, -} from "../infra/heartbeat-events.js"; import { getResolvedLoggerSettings } from "../logging.js"; import { loginWeb, @@ -22,7 +18,8 @@ import { setHeartbeatsEnabled, type WebMonitorTuning, } from "../provider-web.js"; -import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { runRpcLoop } from "../rpc/loop.js"; +import { defaultRuntime } from "../runtime.js"; import { VERSION } from "../version.js"; import { resolveHeartbeatSeconds, @@ -33,12 +30,6 @@ import { startWebChatServer, } from "../webchat/server.js"; import { createDefaultDeps, logWebSelfId } from "./deps.js"; -import { onAgentEvent } from "../infra/agent-events.js"; -import { enqueueSystemEvent } from "../infra/system-events.js"; -import { - listSystemPresence, - updateSystemPresence, -} from "../infra/system-presence.js"; export function buildProgram() { const program = new Command(); @@ -241,148 +232,8 @@ Examples: .command("rpc") .description("Run stdin/stdout JSON RPC loop for agent sends") .action(async () => { - const { createInterface } = await import("node:readline"); - const rl = createInterface({ input: process.stdin, crlfDelay: Infinity }); - - const respond = (obj: unknown) => { - try { - console.log(JSON.stringify(obj)); - } catch (err) { - console.error(JSON.stringify({ type: "error", error: String(err) })); - } - }; - - const forwardHeartbeat = (payload: unknown) => { - respond({ type: "event", event: "heartbeat", payload }); - }; - const forwardAgent = (payload: unknown) => { - respond({ type: "event", event: "agent", payload }); - }; - - const latest = getLastHeartbeatEvent(); - if (latest) forwardHeartbeat(latest); - const stopBus = onHeartbeatEvent(forwardHeartbeat); - const stopAgentBus = onAgentEvent(forwardAgent); - - rl.on("line", async (line: string) => { - if (!line.trim()) return; - try { - const cmd = JSON.parse(line); - if (cmd.type === "status") { - respond({ type: "result", ok: true }); - return; - } - if (cmd.type === "set-heartbeats") { - setHeartbeatsEnabled(Boolean(cmd.enabled)); - respond({ type: "result", ok: true }); - return; - } - if (cmd.type === "control-request" && cmd.id && cmd.method) { - const id = String(cmd.id); - const method = String(cmd.method); - const params = (cmd.params ?? {}) as Record; - const controlRespond = (ok: boolean, payload?: unknown, error?: string) => - respond({ type: "control-response", id, ok, payload, error }); - try { - if (method === "health") { - const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : undefined; - const payload = await getHealthSnapshot(timeoutMs); - controlRespond(true, payload satisfies HealthSummary); - return; - } - if (method === "status") { - const payload = await getStatusSummary(); - controlRespond(true, payload satisfies StatusSummary); - return; - } - if (method === "last-heartbeat") { - controlRespond(true, getLastHeartbeatEvent()); - return; - } - if (method === "set-heartbeats") { - setHeartbeatsEnabled(Boolean(params.enabled)); - controlRespond(true, { ok: true }); - return; - } - if (method === "system-event") { - const text = String(params.text ?? "").trim(); - if (text) { - enqueueSystemEvent(text); - updateSystemPresence(text); - } - controlRespond(true, { ok: true }); - return; - } - if (method === "system-presence") { - controlRespond(true, listSystemPresence()); - return; - } - controlRespond(false, undefined, `unknown control method: ${method}`); - } catch (err) { - controlRespond(false, undefined, String(err)); - } - return; - } - if (cmd.type !== "send" || !cmd.text) { - respond({ type: "error", error: "unsupported command" }); - return; - } - - const logs: string[] = []; - const runtime: RuntimeEnv = { - log: (msg: string) => logs.push(String(msg)), - error: (msg: string) => logs.push(String(msg)), - exit: (_code: number): never => { - throw new Error("agentCommand requested exit"); - }, - }; - - const opts: { - message: string; - to?: string; - sessionId?: string; - thinking?: string; - deliver?: boolean; - json: boolean; - } = { - message: String(cmd.text), - to: cmd.to ? String(cmd.to) : undefined, - sessionId: cmd.session ? String(cmd.session) : undefined, - thinking: cmd.thinking ? String(cmd.thinking) : undefined, - deliver: Boolean(cmd.deliver), - json: true, - }; - - try { - await agentCommand(opts, runtime, createDefaultDeps()); - const payload = extractPayload(logs); - respond({ type: "result", ok: true, payload }); - } catch (err) { - respond({ type: "error", error: String(err) }); - } - } catch (err) { - respond({ type: "error", error: `parse error: ${String(err)}` }); - } - }); - - const extractPayload = (logs: string[]) => { - for (const entry of logs.slice().reverse()) { - try { - const parsed = JSON.parse(entry); - if (parsed && typeof parsed === "object" && "payloads" in parsed) { - return parsed; - } - } catch { - // non-JSON log, ignore - } - } - return null; - }; - - await new Promise(() => {}); - - stopBus(); - stopAgentBus(); + await runRpcLoop({ input: process.stdin, output: process.stdout }); + await new Promise(() => {}); }); program diff --git a/src/rpc/loop.test.ts b/src/rpc/loop.test.ts new file mode 100644 index 000000000..3880e4cae --- /dev/null +++ b/src/rpc/loop.test.ts @@ -0,0 +1,100 @@ +import { PassThrough } from "node:stream"; + +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { runRpcLoop } from "./loop.js"; + +vi.mock("../commands/health.js", () => ({ + getHealthSnapshot: vi.fn(async () => ({ heartbeatSeconds: 42 })), +})); + +vi.mock("../commands/status.js", () => ({ + getStatusSummary: vi.fn(async () => ({ providerSummary: "ok" })), +})); + +vi.mock("../infra/heartbeat-events.js", () => ({ + getLastHeartbeatEvent: vi.fn(() => ({ ts: 1, status: "sent" })), + onHeartbeatEvent: vi.fn((cb: (p: unknown) => void) => { + // return stopper + return () => void cb({}); + }), +})); + +vi.mock("../infra/agent-events.js", () => ({ + onAgentEvent: vi.fn((_cb: (p: unknown) => void) => () => {}), +})); + +vi.mock("../infra/system-presence.js", () => ({ + enqueueSystemEvent: vi.fn(), + updateSystemPresence: vi.fn(), + listSystemPresence: vi.fn(() => [{ text: "hi" }]), +})); + +vi.mock("../commands/agent.js", () => ({ + agentCommand: vi.fn( + async (_opts, runtime: { log: (msg: string) => void }) => { + // Emit a fake payload log entry the loop will pick up + runtime.log(JSON.stringify({ payloads: [{ text: "ok" }] })); + }, + ), +})); + +vi.mock("../cli/deps.js", () => ({ + createDefaultDeps: vi.fn(() => ({})), +})); + +describe("runRpcLoop", () => { + let input: PassThrough; + let output: PassThrough; + let lines: unknown[]; + + beforeEach(() => { + input = new PassThrough(); + output = new PassThrough(); + lines = []; + output.on("data", (chunk) => { + const str = chunk.toString(); + for (const line of str.split("\n").filter(Boolean)) { + lines.push(JSON.parse(line)); + } + }); + }); + + it("responds to control-request health", async () => { + const loop = await runRpcLoop({ input, output }); + input.write('{"type":"control-request","id":"1","method":"health"}\n'); + await new Promise((r) => setTimeout(r, 50)); + loop.close(); + expect( + lines.find((l) => l.type === "control-response" && l.id === "1"), + ).toMatchObject({ + ok: true, + }); + }); + + it("forwards initial heartbeat event", async () => { + const loop = await runRpcLoop({ input, output }); + await new Promise((r) => setTimeout(r, 20)); + loop.close(); + expect(lines[0]).toMatchObject({ type: "event", event: "heartbeat" }); + }); + + it("handles send via agentCommand", async () => { + const loop = await runRpcLoop({ input, output }); + input.write('{"type":"send","text":"hi"}\n'); + await new Promise((r) => setTimeout(r, 50)); + loop.close(); + expect(lines.find((l) => l.type === "result" && l.ok)).toBeTruthy(); + }); + + it("routes system-event", async () => { + const loop = await runRpcLoop({ input, output }); + input.write( + '{"type":"control-request","id":"sys","method":"system-event","params":{"text":"ping"}}\n', + ); + await new Promise((r) => setTimeout(r, 50)); + loop.close(); + const resp = lines.find((l) => l.id === "sys"); + expect(resp).toMatchObject({ ok: true, type: "control-response" }); + }); +}); diff --git a/src/rpc/loop.ts b/src/rpc/loop.ts new file mode 100644 index 000000000..a0dfb24df --- /dev/null +++ b/src/rpc/loop.ts @@ -0,0 +1,182 @@ +import { createInterface } from "node:readline"; +import type { Readable, Writable } from "node:stream"; + +import { createDefaultDeps } from "../cli/deps.js"; +import { agentCommand } from "../commands/agent.js"; +import { getHealthSnapshot, type HealthSummary } from "../commands/health.js"; +import { getStatusSummary, type StatusSummary } from "../commands/status.js"; +import { onAgentEvent } from "../infra/agent-events.js"; +import { + getLastHeartbeatEvent, + onHeartbeatEvent, +} from "../infra/heartbeat-events.js"; +import { + enqueueSystemEvent, + listSystemPresence, + updateSystemPresence, +} from "../infra/system-presence.js"; +import { setHeartbeatsEnabled } from "../provider-web.js"; + +export type RpcLoopHandles = { close: () => void }; + +/** + * Run the stdin/stdout RPC loop used by `clawdis rpc`. + * Exposed for testing and reuse. + */ +export async function runRpcLoop(io: { + input: Readable; + output: Writable; +}): Promise { + const rl = createInterface({ input: io.input, crlfDelay: Infinity }); + + const respond = (obj: unknown) => { + try { + io.output.write(`${JSON.stringify(obj)}\n`); + } catch (err) { + io.output.write( + `${JSON.stringify({ type: "error", error: String(err) })}\n`, + ); + } + }; + + const forwardHeartbeat = (payload: unknown) => { + respond({ type: "event", event: "heartbeat", payload }); + }; + const forwardAgent = (payload: unknown) => { + respond({ type: "event", event: "agent", payload }); + }; + + const latest = getLastHeartbeatEvent(); + if (latest) forwardHeartbeat(latest); + const stopHeartbeat = onHeartbeatEvent(forwardHeartbeat); + const stopAgent = onAgentEvent(forwardAgent); + + rl.on("line", async (line: string) => { + if (!line.trim()) return; + try { + const cmd = JSON.parse(line); + if (cmd.type === "status") { + respond({ type: "result", ok: true }); + return; + } + if (cmd.type === "set-heartbeats") { + setHeartbeatsEnabled(Boolean(cmd.enabled)); + respond({ type: "result", ok: true }); + return; + } + if (cmd.type === "control-request" && cmd.id && cmd.method) { + const id = String(cmd.id); + const method = String(cmd.method); + const params = (cmd.params ?? {}) as Record; + const controlRespond = ( + ok: boolean, + payload?: unknown, + error?: string, + ) => respond({ type: "control-response", id, ok, payload, error }); + try { + if (method === "health") { + const timeoutMs = + typeof params.timeoutMs === "number" + ? params.timeoutMs + : undefined; + const payload = await getHealthSnapshot(timeoutMs); + controlRespond(true, payload satisfies HealthSummary); + return; + } + if (method === "status") { + const payload = await getStatusSummary(); + controlRespond(true, payload satisfies StatusSummary); + return; + } + if (method === "last-heartbeat") { + controlRespond(true, getLastHeartbeatEvent()); + return; + } + if (method === "set-heartbeats") { + setHeartbeatsEnabled(Boolean(params.enabled)); + controlRespond(true, { ok: true }); + return; + } + if (method === "system-event") { + const text = String(params.text ?? "").trim(); + if (text) { + enqueueSystemEvent(text); + updateSystemPresence(text); + } + controlRespond(true, { ok: true }); + return; + } + if (method === "system-presence") { + controlRespond(true, listSystemPresence()); + return; + } + controlRespond(false, undefined, `unknown control method: ${method}`); + } catch (err) { + controlRespond(false, undefined, String(err)); + } + return; + } + if (cmd.type !== "send" || !cmd.text) { + respond({ type: "error", error: "unsupported command" }); + return; + } + + const logs: string[] = []; + const runtime: RuntimeEnv = { + log: (msg: string) => logs.push(String(msg)), + error: (msg: string) => logs.push(String(msg)), + exit: (_code: number): never => { + throw new Error("agentCommand requested exit"); + }, + }; + + const opts: { + message: string; + to?: string; + sessionId?: string; + thinking?: string; + deliver?: boolean; + json: boolean; + } = { + message: String(cmd.text), + to: cmd.to ? String(cmd.to) : undefined, + sessionId: cmd.session ? String(cmd.session) : undefined, + thinking: cmd.thinking ? String(cmd.thinking) : undefined, + deliver: Boolean(cmd.deliver), + json: true, + }; + + try { + await agentCommand(opts, runtime, createDefaultDeps()); + const payload = extractPayload(logs); + respond({ type: "result", ok: true, payload }); + } catch (err) { + respond({ type: "error", error: String(err) }); + } + } catch (err) { + respond({ type: "error", error: `parse error: ${String(err)}` }); + } + }); + + const extractPayload = (logs: string[]) => { + for (const entry of logs.slice().reverse()) { + try { + const parsed = JSON.parse(entry); + if (parsed && typeof parsed === "object" && "payloads" in parsed) { + return parsed; + } + } catch { + // non-JSON log, ignore + } + } + return null; + }; + + const close = () => { + stopHeartbeat(); + stopAgent(); + rl.close(); + }; + + return { close }; +}