RPC: extract stdio loop and tests

This commit is contained in:
Peter Steinberger
2025-12-09 02:37:04 +00:00
parent c568284f1b
commit 59a2cbefcb
3 changed files with 288 additions and 155 deletions

View File

@@ -1,17 +1,13 @@
import chalk from "chalk";
import { Command } from "commander";
import { agentCommand } from "../commands/agent.js";
import { getHealthSnapshot, healthCommand, type HealthSummary } from "../commands/health.js";
import { healthCommand } from "../commands/health.js";
import { sendCommand } from "../commands/send.js";
import { sessionsCommand } from "../commands/sessions.js";
import { getStatusSummary, statusCommand, type StatusSummary } from "../commands/status.js";
import { statusCommand } from "../commands/status.js";
import { loadConfig } from "../config/config.js";
import { danger, info, setVerbose } from "../globals.js";
import { startControlChannel } from "../infra/control-channel.js";
import {
getLastHeartbeatEvent,
onHeartbeatEvent,
} from "../infra/heartbeat-events.js";
import { getResolvedLoggerSettings } from "../logging.js";
import {
loginWeb,
@@ -22,7 +18,8 @@ import {
setHeartbeatsEnabled,
type WebMonitorTuning,
} from "../provider-web.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { runRpcLoop } from "../rpc/loop.js";
import { defaultRuntime } from "../runtime.js";
import { VERSION } from "../version.js";
import {
resolveHeartbeatSeconds,
@@ -33,12 +30,6 @@ import {
startWebChatServer,
} from "../webchat/server.js";
import { createDefaultDeps, logWebSelfId } from "./deps.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import {
listSystemPresence,
updateSystemPresence,
} from "../infra/system-presence.js";
export function buildProgram() {
const program = new Command();
@@ -241,148 +232,8 @@ Examples:
.command("rpc")
.description("Run stdin/stdout JSON RPC loop for agent sends")
.action(async () => {
const { createInterface } = await import("node:readline");
const rl = createInterface({ input: process.stdin, crlfDelay: Infinity });
const respond = (obj: unknown) => {
try {
console.log(JSON.stringify(obj));
} catch (err) {
console.error(JSON.stringify({ type: "error", error: String(err) }));
}
};
const forwardHeartbeat = (payload: unknown) => {
respond({ type: "event", event: "heartbeat", payload });
};
const forwardAgent = (payload: unknown) => {
respond({ type: "event", event: "agent", payload });
};
const latest = getLastHeartbeatEvent();
if (latest) forwardHeartbeat(latest);
const stopBus = onHeartbeatEvent(forwardHeartbeat);
const stopAgentBus = onAgentEvent(forwardAgent);
rl.on("line", async (line: string) => {
if (!line.trim()) return;
try {
const cmd = JSON.parse(line);
if (cmd.type === "status") {
respond({ type: "result", ok: true });
return;
}
if (cmd.type === "set-heartbeats") {
setHeartbeatsEnabled(Boolean(cmd.enabled));
respond({ type: "result", ok: true });
return;
}
if (cmd.type === "control-request" && cmd.id && cmd.method) {
const id = String(cmd.id);
const method = String(cmd.method);
const params = (cmd.params ?? {}) as Record<string, unknown>;
const controlRespond = (ok: boolean, payload?: unknown, error?: string) =>
respond({ type: "control-response", id, ok, payload, error });
try {
if (method === "health") {
const timeoutMs = typeof params.timeoutMs === "number" ? params.timeoutMs : undefined;
const payload = await getHealthSnapshot(timeoutMs);
controlRespond(true, payload satisfies HealthSummary);
return;
}
if (method === "status") {
const payload = await getStatusSummary();
controlRespond(true, payload satisfies StatusSummary);
return;
}
if (method === "last-heartbeat") {
controlRespond(true, getLastHeartbeatEvent());
return;
}
if (method === "set-heartbeats") {
setHeartbeatsEnabled(Boolean(params.enabled));
controlRespond(true, { ok: true });
return;
}
if (method === "system-event") {
const text = String(params.text ?? "").trim();
if (text) {
enqueueSystemEvent(text);
updateSystemPresence(text);
}
controlRespond(true, { ok: true });
return;
}
if (method === "system-presence") {
controlRespond(true, listSystemPresence());
return;
}
controlRespond(false, undefined, `unknown control method: ${method}`);
} catch (err) {
controlRespond(false, undefined, String(err));
}
return;
}
if (cmd.type !== "send" || !cmd.text) {
respond({ type: "error", error: "unsupported command" });
return;
}
const logs: string[] = [];
const runtime: RuntimeEnv = {
log: (msg: string) => logs.push(String(msg)),
error: (msg: string) => logs.push(String(msg)),
exit: (_code: number): never => {
throw new Error("agentCommand requested exit");
},
};
const opts: {
message: string;
to?: string;
sessionId?: string;
thinking?: string;
deliver?: boolean;
json: boolean;
} = {
message: String(cmd.text),
to: cmd.to ? String(cmd.to) : undefined,
sessionId: cmd.session ? String(cmd.session) : undefined,
thinking: cmd.thinking ? String(cmd.thinking) : undefined,
deliver: Boolean(cmd.deliver),
json: true,
};
try {
await agentCommand(opts, runtime, createDefaultDeps());
const payload = extractPayload(logs);
respond({ type: "result", ok: true, payload });
} catch (err) {
respond({ type: "error", error: String(err) });
}
} catch (err) {
respond({ type: "error", error: `parse error: ${String(err)}` });
}
});
const extractPayload = (logs: string[]) => {
for (const entry of logs.slice().reverse()) {
try {
const parsed = JSON.parse(entry);
if (parsed && typeof parsed === "object" && "payloads" in parsed) {
return parsed;
}
} catch {
// non-JSON log, ignore
}
}
return null;
};
await new Promise(() => {});
stopBus();
stopAgentBus();
await runRpcLoop({ input: process.stdin, output: process.stdout });
await new Promise<never>(() => {});
});
program

100
src/rpc/loop.test.ts Normal file
View File

@@ -0,0 +1,100 @@
import { PassThrough } from "node:stream";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { runRpcLoop } from "./loop.js";
vi.mock("../commands/health.js", () => ({
getHealthSnapshot: vi.fn(async () => ({ heartbeatSeconds: 42 })),
}));
vi.mock("../commands/status.js", () => ({
getStatusSummary: vi.fn(async () => ({ providerSummary: "ok" })),
}));
vi.mock("../infra/heartbeat-events.js", () => ({
getLastHeartbeatEvent: vi.fn(() => ({ ts: 1, status: "sent" })),
onHeartbeatEvent: vi.fn((cb: (p: unknown) => void) => {
// return stopper
return () => void cb({});
}),
}));
vi.mock("../infra/agent-events.js", () => ({
onAgentEvent: vi.fn((_cb: (p: unknown) => void) => () => {}),
}));
vi.mock("../infra/system-presence.js", () => ({
enqueueSystemEvent: vi.fn(),
updateSystemPresence: vi.fn(),
listSystemPresence: vi.fn(() => [{ text: "hi" }]),
}));
vi.mock("../commands/agent.js", () => ({
agentCommand: vi.fn(
async (_opts, runtime: { log: (msg: string) => void }) => {
// Emit a fake payload log entry the loop will pick up
runtime.log(JSON.stringify({ payloads: [{ text: "ok" }] }));
},
),
}));
vi.mock("../cli/deps.js", () => ({
createDefaultDeps: vi.fn(() => ({})),
}));
describe("runRpcLoop", () => {
let input: PassThrough;
let output: PassThrough;
let lines: unknown[];
beforeEach(() => {
input = new PassThrough();
output = new PassThrough();
lines = [];
output.on("data", (chunk) => {
const str = chunk.toString();
for (const line of str.split("\n").filter(Boolean)) {
lines.push(JSON.parse(line));
}
});
});
it("responds to control-request health", async () => {
const loop = await runRpcLoop({ input, output });
input.write('{"type":"control-request","id":"1","method":"health"}\n');
await new Promise((r) => setTimeout(r, 50));
loop.close();
expect(
lines.find((l) => l.type === "control-response" && l.id === "1"),
).toMatchObject({
ok: true,
});
});
it("forwards initial heartbeat event", async () => {
const loop = await runRpcLoop({ input, output });
await new Promise((r) => setTimeout(r, 20));
loop.close();
expect(lines[0]).toMatchObject({ type: "event", event: "heartbeat" });
});
it("handles send via agentCommand", async () => {
const loop = await runRpcLoop({ input, output });
input.write('{"type":"send","text":"hi"}\n');
await new Promise((r) => setTimeout(r, 50));
loop.close();
expect(lines.find((l) => l.type === "result" && l.ok)).toBeTruthy();
});
it("routes system-event", async () => {
const loop = await runRpcLoop({ input, output });
input.write(
'{"type":"control-request","id":"sys","method":"system-event","params":{"text":"ping"}}\n',
);
await new Promise((r) => setTimeout(r, 50));
loop.close();
const resp = lines.find((l) => l.id === "sys");
expect(resp).toMatchObject({ ok: true, type: "control-response" });
});
});

182
src/rpc/loop.ts Normal file
View File

@@ -0,0 +1,182 @@
import { createInterface } from "node:readline";
import type { Readable, Writable } from "node:stream";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { getHealthSnapshot, type HealthSummary } from "../commands/health.js";
import { getStatusSummary, type StatusSummary } from "../commands/status.js";
import { onAgentEvent } from "../infra/agent-events.js";
import {
getLastHeartbeatEvent,
onHeartbeatEvent,
} from "../infra/heartbeat-events.js";
import {
enqueueSystemEvent,
listSystemPresence,
updateSystemPresence,
} from "../infra/system-presence.js";
import { setHeartbeatsEnabled } from "../provider-web.js";
export type RpcLoopHandles = { close: () => void };
/**
* Run the stdin/stdout RPC loop used by `clawdis rpc`.
* Exposed for testing and reuse.
*/
export async function runRpcLoop(io: {
input: Readable;
output: Writable;
}): Promise<RpcLoopHandles> {
const rl = createInterface({ input: io.input, crlfDelay: Infinity });
const respond = (obj: unknown) => {
try {
io.output.write(`${JSON.stringify(obj)}\n`);
} catch (err) {
io.output.write(
`${JSON.stringify({ type: "error", error: String(err) })}\n`,
);
}
};
const forwardHeartbeat = (payload: unknown) => {
respond({ type: "event", event: "heartbeat", payload });
};
const forwardAgent = (payload: unknown) => {
respond({ type: "event", event: "agent", payload });
};
const latest = getLastHeartbeatEvent();
if (latest) forwardHeartbeat(latest);
const stopHeartbeat = onHeartbeatEvent(forwardHeartbeat);
const stopAgent = onAgentEvent(forwardAgent);
rl.on("line", async (line: string) => {
if (!line.trim()) return;
try {
const cmd = JSON.parse(line);
if (cmd.type === "status") {
respond({ type: "result", ok: true });
return;
}
if (cmd.type === "set-heartbeats") {
setHeartbeatsEnabled(Boolean(cmd.enabled));
respond({ type: "result", ok: true });
return;
}
if (cmd.type === "control-request" && cmd.id && cmd.method) {
const id = String(cmd.id);
const method = String(cmd.method);
const params = (cmd.params ?? {}) as Record<string, unknown>;
const controlRespond = (
ok: boolean,
payload?: unknown,
error?: string,
) => respond({ type: "control-response", id, ok, payload, error });
try {
if (method === "health") {
const timeoutMs =
typeof params.timeoutMs === "number"
? params.timeoutMs
: undefined;
const payload = await getHealthSnapshot(timeoutMs);
controlRespond(true, payload satisfies HealthSummary);
return;
}
if (method === "status") {
const payload = await getStatusSummary();
controlRespond(true, payload satisfies StatusSummary);
return;
}
if (method === "last-heartbeat") {
controlRespond(true, getLastHeartbeatEvent());
return;
}
if (method === "set-heartbeats") {
setHeartbeatsEnabled(Boolean(params.enabled));
controlRespond(true, { ok: true });
return;
}
if (method === "system-event") {
const text = String(params.text ?? "").trim();
if (text) {
enqueueSystemEvent(text);
updateSystemPresence(text);
}
controlRespond(true, { ok: true });
return;
}
if (method === "system-presence") {
controlRespond(true, listSystemPresence());
return;
}
controlRespond(false, undefined, `unknown control method: ${method}`);
} catch (err) {
controlRespond(false, undefined, String(err));
}
return;
}
if (cmd.type !== "send" || !cmd.text) {
respond({ type: "error", error: "unsupported command" });
return;
}
const logs: string[] = [];
const runtime: RuntimeEnv = {
log: (msg: string) => logs.push(String(msg)),
error: (msg: string) => logs.push(String(msg)),
exit: (_code: number): never => {
throw new Error("agentCommand requested exit");
},
};
const opts: {
message: string;
to?: string;
sessionId?: string;
thinking?: string;
deliver?: boolean;
json: boolean;
} = {
message: String(cmd.text),
to: cmd.to ? String(cmd.to) : undefined,
sessionId: cmd.session ? String(cmd.session) : undefined,
thinking: cmd.thinking ? String(cmd.thinking) : undefined,
deliver: Boolean(cmd.deliver),
json: true,
};
try {
await agentCommand(opts, runtime, createDefaultDeps());
const payload = extractPayload(logs);
respond({ type: "result", ok: true, payload });
} catch (err) {
respond({ type: "error", error: String(err) });
}
} catch (err) {
respond({ type: "error", error: `parse error: ${String(err)}` });
}
});
const extractPayload = (logs: string[]) => {
for (const entry of logs.slice().reverse()) {
try {
const parsed = JSON.parse(entry);
if (parsed && typeof parsed === "object" && "payloads" in parsed) {
return parsed;
}
} catch {
// non-JSON log, ignore
}
}
return null;
};
const close = () => {
stopHeartbeat();
stopAgent();
rl.close();
};
return { close };
}