import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import { createInterface, type Interface } from "node:readline";

import type { RuntimeEnv } from "../runtime.js";
import { resolveUserPath } from "../utils.js";

/** Time a request waits for its response when the caller gives no `timeoutMs`. */
const DEFAULT_REQUEST_TIMEOUT_MS = 10_000;

/** How long `stop()` waits for the child to close before sending SIGTERM. */
const STOP_GRACE_MS = 500;

/** Error member of a JSON-RPC response. */
export type IMessageRpcError = {
  code?: number;
  message?: string;
  data?: unknown;
};

/**
 * One decoded line from the child's stdout. A line carrying a non-null `id`
 * is treated as the reply to a request; a line without one but with a
 * `method` is treated as a notification.
 */
export type IMessageRpcResponse<T = unknown> = {
  jsonrpc?: string;
  id?: string | number | null;
  result?: T;
  error?: IMessageRpcError;
  method?: string;
  params?: unknown;
};

/** Message forwarded to `onNotification`. */
export type IMessageRpcNotification = {
  method: string;
  params?: unknown;
};

/** Construction options for {@link IMessageRpcClient}. */
export type IMessageRpcClientOptions = {
  /** Executable to spawn; blank or missing falls back to `"imsg"`. */
  cliPath?: string;
  /** When non-blank, resolved with `resolveUserPath` and passed as `--db`. */
  dbPath?: string;
  /** Optional runtime; its `error` hook receives stderr lines and parse failures. */
  runtime?: RuntimeEnv;
  /** Invoked for every id-less message that carries a `method`. */
  onNotification?: (msg: IMessageRpcNotification) => void;
};

/** Book-keeping for a request whose response has not arrived yet. */
type PendingRequest = {
  resolve: (value: unknown) => void;
  reject: (error: Error) => void;
  timer?: NodeJS.Timeout;
};

/**
 * Line-delimited JSON-RPC client that talks to a spawned `<cliPath> rpc`
 * child process over its stdin/stdout.
 */
export class IMessageRpcClient {
  private readonly cliPath: string;
  private readonly dbPath?: string;
  private readonly runtime?: RuntimeEnv;
  private readonly onNotification?: (msg: IMessageRpcNotification) => void;
  /** In-flight requests keyed by the stringified request id. */
  private readonly pending = new Map<string, PendingRequest>();
  /** Resolves once the child emits `error` or `close`. */
  private readonly closed: Promise<void>;
  private closedResolve: (() => void) | null = null;
  private child: ChildProcessWithoutNullStreams | null = null;
  private reader: Interface | null = null;
  private nextId = 1;

  constructor(opts: IMessageRpcClientOptions = {}) {
    // `||` is deliberate: a blank cliPath must fall back to the default.
    this.cliPath = opts.cliPath?.trim() || "imsg";
    this.dbPath = opts.dbPath?.trim() ? resolveUserPath(opts.dbPath) : undefined;
    this.runtime = opts.runtime;
    this.onNotification = opts.onNotification;
    this.closed = new Promise<void>((resolve) => {
      this.closedResolve = resolve;
    });
  }

  /**
   * Spawns the child process and wires up its streams. Does nothing when a
   * child is already attached.
   */
  async start(): Promise<void> {
    if (this.child) return;

    const args = ["rpc"];
    if (this.dbPath) {
      args.push("--db", this.dbPath);
    }

    const child = spawn(this.cliPath, args, {
      stdio: ["pipe", "pipe", "pipe"],
    });
    this.child = child;

    this.reader = createInterface({ input: child.stdout });
    this.reader.on("line", (line) => {
      const trimmed = line.trim();
      if (!trimmed) return;
      this.handleLine(trimmed);
    });

    child.stderr?.on("data", (chunk) => {
      const lines = chunk.toString().split(/\r?\n/);
      for (const line of lines) {
        if (!line.trim()) continue;
        this.runtime?.error?.(`imsg rpc: ${line.trim()}`);
      }
    });

    // Without a listener, a failed write to the child's stdin (e.g. EPIPE)
    // would surface as an uncaught exception.
    child.stdin?.on("error", (err) => {
      const error = err instanceof Error ? err : new Error(String(err));
      this.runtime?.error?.(`imsg rpc: stdin error: ${error.message}`);
      this.failAll(error);
    });

    child.on("error", (err) => {
      this.failAll(err instanceof Error ? err : new Error(String(err)));
      this.closedResolve?.();
    });

    child.on("close", (code, signal) => {
      // When a signal ends the process `code` is null, so the signal has to
      // be checked on its own rather than under the non-zero-code branch.
      if (signal) {
        this.failAll(new Error(`imsg rpc exited (signal ${signal})`));
      } else if (code !== 0 && code !== null) {
        this.failAll(new Error(`imsg rpc exited (code ${code})`));
      } else {
        this.failAll(new Error("imsg rpc closed"));
      }
      this.closedResolve?.();
    });
  }

  /**
   * Closes the line reader and the child's stdin, then waits for the child
   * to close. If it has not closed within the grace period it is sent
   * SIGTERM and this method returns without waiting further.
   */
  async stop(): Promise<void> {
    if (!this.child) return;

    this.reader?.close();
    this.reader = null;
    this.child.stdin?.end();
    const child = this.child;
    this.child = null;

    let killTimer: NodeJS.Timeout | undefined;
    const graceExpired = new Promise<void>((resolve) => {
      killTimer = setTimeout(() => {
        if (!child.killed) child.kill("SIGTERM");
        resolve();
      }, STOP_GRACE_MS);
    });

    try {
      await Promise.race([this.closed, graceExpired]);
    } finally {
      // Do not leave the kill timer armed once the child has already closed.
      clearTimeout(killTimer);
    }
  }

  /** Resolves once the child has emitted `error` or `close`. */
  async waitForClose(): Promise<void> {
    await this.closed;
  }

  /**
   * Sends one request and resolves with the `result` of the matching reply.
   *
   * @param method - RPC method name.
   * @param params - Request parameters; `{}` is sent when omitted.
   * @param opts - `timeoutMs` overrides the default timeout; a value `<= 0`
   *   disables the timeout.
   * @returns The reply's `result`, cast to `T` without validation.
   * @throws Error when no child is attached, when the reply carries an
   *   `error`, when the timeout elapses, when the write fails, or when the
   *   child fails or closes first.
   */
  async request<T = unknown>(
    method: string,
    params?: Record<string, unknown>,
    opts?: { timeoutMs?: number },
  ): Promise<T> {
    if (!this.child || !this.child.stdin) {
      throw new Error("imsg rpc not running");
    }

    const id = this.nextId++;
    const key = String(id);
    const payload = {
      jsonrpc: "2.0",
      id,
      method,
      params: params ?? {},
    };
    const line = `${JSON.stringify(payload)}\n`;
    const timeoutMs = opts?.timeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS;

    const response = new Promise<T>((resolve, reject) => {
      const timer =
        timeoutMs > 0
          ? setTimeout(() => {
              this.pending.delete(key);
              reject(new Error(`imsg rpc timeout (${method})`));
            }, timeoutMs)
          : undefined;
      this.pending.set(key, {
        resolve: (value) => resolve(value as T),
        reject,
        timer,
      });
    });

    this.child.stdin.write(line, (err) => {
      if (!err) return;
      // Reject right away instead of leaving the caller to wait for the timeout.
      this.takePending(key)?.reject(err);
    });

    return await response;
  }

  /**
   * Decodes one stdout line and routes it: replies settle the matching
   * pending request, id-less messages with a `method` go to `onNotification`.
   */
  private handleLine(line: string): void {
    let decoded: unknown;
    try {
      decoded = JSON.parse(line);
    } catch (err) {
      const detail = err instanceof Error ? err.message : String(err);
      this.runtime?.error?.(`imsg rpc: failed to parse ${line}: ${detail}`);
      return;
    }

    // Valid JSON need not be an object; `null` would throw on property access.
    if (typeof decoded !== "object" || decoded === null) {
      this.runtime?.error?.(`imsg rpc: ignoring non-object message ${line}`);
      return;
    }
    const parsed = decoded as IMessageRpcResponse<unknown>;

    if (parsed.id !== undefined && parsed.id !== null) {
      const pending = this.takePending(String(parsed.id));
      if (!pending) return;
      if (parsed.error) {
        const msg = parsed.error.message ?? "imsg rpc error";
        pending.reject(new Error(msg));
        return;
      }
      pending.resolve(parsed.result);
      return;
    }

    if (parsed.method) {
      this.onNotification?.({
        method: parsed.method,
        params: parsed.params,
      });
    }
  }

  /** Removes a pending request, cancelling its timeout, and returns it. */
  private takePending(key: string): PendingRequest | undefined {
    const entry = this.pending.get(key);
    if (!entry) return undefined;
    if (entry.timer) clearTimeout(entry.timer);
    this.pending.delete(key);
    return entry;
  }

  /** Rejects every pending request with `err` and empties the table. */
  private failAll(err: Error): void {
    for (const key of [...this.pending.keys()]) {
      this.takePending(key)?.reject(err);
    }
  }
}

/** Constructs an {@link IMessageRpcClient} and starts it before returning it. */
export async function createIMessageRpcClient(
  opts: IMessageRpcClientOptions = {},
): Promise<IMessageRpcClient> {
  const client = new IMessageRpcClient(opts);
  await client.start();
  return client;
}