From 22996854f79c248e94d51608c4018f690c008bc8 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 8 Dec 2025 21:50:24 +0100 Subject: [PATCH] relay: add control channel and heartbeat stream --- src/cli/program.ts | 35 ++++-- src/commands/health.ts | 57 ++++++---- src/commands/status.ts | 67 ++++++----- src/infra/control-channel.ts | 189 ++++++++++++++++++++++++++++++++ src/infra/heartbeat-events.ts | 35 ++++++ src/process/heartbeat-events.ts | 73 ------------ src/web/auto-reply.ts | 9 +- 7 files changed, 330 insertions(+), 135 deletions(-) create mode 100644 src/infra/control-channel.ts create mode 100644 src/infra/heartbeat-events.ts delete mode 100644 src/process/heartbeat-events.ts diff --git a/src/cli/program.ts b/src/cli/program.ts index d45d5da7c..0e5f1a99d 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -7,11 +7,12 @@ import { sessionsCommand } from "../commands/sessions.js"; import { statusCommand } from "../commands/status.js"; import { loadConfig } from "../config/config.js"; import { danger, info, setVerbose } from "../globals.js"; -import { getResolvedLoggerSettings } from "../logging.js"; +import { startControlChannel } from "../infra/control-channel.js"; import { - readLatestHeartbeat, - tailHeartbeatEvents, -} from "../process/heartbeat-events.js"; + getLastHeartbeatEvent, + onHeartbeatEvent, +} from "../infra/heartbeat-events.js"; +import { getResolvedLoggerSettings } from "../logging.js"; import { loginWeb, logoutWeb, @@ -249,9 +250,9 @@ Examples: respond({ type: "event", event: "heartbeat", payload }); }; - const latest = readLatestHeartbeat(); + const latest = getLastHeartbeatEvent(); if (latest) forwardHeartbeat(latest); - const stopTail = tailHeartbeatEvents(forwardHeartbeat); + const stopBus = onHeartbeatEvent(forwardHeartbeat); rl.on("line", async (line: string) => { if (!line.trim()) return; @@ -324,7 +325,7 @@ Examples: await new Promise(() => {}); - stopTail(); + stopBus(); }); program @@ -576,6 +577,24 @@ Examples: const runners: Array> = []; + let control = null as Awaited< + ReturnType + > | null; + try { + control = await startControlChannel( + { + setHeartbeats: async (enabled: boolean) => { + setHeartbeatsEnabled(enabled); + }, + }, + { runtime: defaultRuntime }, + ); + } catch (err) { + defaultRuntime.error( + danger(`Control channel failed to start: ${String(err)}`), + ); + } + if (startWeb) { const webTuning: WebMonitorTuning = {}; if (webHeartbeat !== undefined) @@ -661,6 +680,8 @@ Examples: } catch (err) { defaultRuntime.error(danger(`Relay failed: ${String(err)}`)); defaultRuntime.exit(1); + } finally { + if (control) await control.close(); } }); diff --git a/src/commands/health.ts b/src/commands/health.ts index 9539ce052..f51deadbc 100644 --- a/src/commands/health.ts +++ b/src/commands/health.ts @@ -22,7 +22,7 @@ type HealthConnect = { elapsedMs: number; }; -type HealthSummary = { +export type HealthSummary = { ts: number; durationMs: number; web: { @@ -77,10 +77,9 @@ async function probeWebConnect(timeoutMs: number): Promise { } } -export async function healthCommand( - opts: { json?: boolean; timeoutMs?: number }, - runtime: RuntimeEnv, -) { +export async function getHealthSnapshot( + timeoutMs?: number, +): Promise { const cfg = loadConfig(); const linked = await webAuthExists(); const authAgeMs = getWebAuthAgeMs(); @@ -101,8 +100,8 @@ export async function healthCommand( const ipcExists = Boolean(ipcPath) && fs.existsSync(ipcPath); const start = Date.now(); - const timeoutMs = Math.max(1000, opts.timeoutMs ?? DEFAULT_TIMEOUT_MS); - const connect = linked ? await probeWebConnect(timeoutMs) : undefined; + const cappedTimeout = Math.max(1000, timeoutMs ?? DEFAULT_TIMEOUT_MS); + const connect = linked ? await probeWebConnect(cappedTimeout) : undefined; const summary: HealthSummary = { ts: Date.now(), @@ -117,39 +116,55 @@ export async function healthCommand( ipc: { path: ipcPath, exists: ipcExists }, }; - const fatal = !linked || (connect && !connect.ok); + return summary; +} + +export async function healthCommand( + opts: { json?: boolean; timeoutMs?: number }, + runtime: RuntimeEnv, +) { + const summary = await getHealthSnapshot(opts.timeoutMs); + const fatal = + !summary.web.linked || (summary.web.connect && !summary.web.connect.ok); if (opts.json) { runtime.log(JSON.stringify(summary, null, 2)); } else { runtime.log( - linked - ? `Web: linked (auth age ${authAgeMs ? `${Math.round(authAgeMs / 60000)}m` : "unknown"})` + summary.web.linked + ? `Web: linked (auth age ${summary.web.authAgeMs ? `${Math.round(summary.web.authAgeMs / 60000)}m` : "unknown"})` : "Web: not linked (run clawdis login)", ); - if (linked) { + if (summary.web.linked) { logWebSelfId(runtime, true); } - if (connect) { - const base = connect.ok - ? info(`Connect: ok (${connect.elapsedMs}ms)`) - : `Connect: failed (${connect.status ?? "unknown"})`; - runtime.log(base + (connect.error ? ` - ${connect.error}` : "")); + if (summary.web.connect) { + const base = summary.web.connect.ok + ? info(`Connect: ok (${summary.web.connect.elapsedMs}ms)`) + : `Connect: failed (${summary.web.connect.status ?? "unknown"})`; + runtime.log( + base + + (summary.web.connect.error ? ` - ${summary.web.connect.error}` : ""), + ); } - runtime.log(info(`Heartbeat interval: ${heartbeatSeconds}s`)); + runtime.log(info(`Heartbeat interval: ${summary.heartbeatSeconds}s`)); runtime.log( - info(`Session store: ${storePath} (${sessions.length} entries)`), + info( + `Session store: ${summary.sessions.path} (${summary.sessions.count} entries)`, + ), ); - if (recent.length > 0) { + if (summary.sessions.recent.length > 0) { runtime.log("Recent sessions:"); - for (const r of recent) { + for (const r of summary.sessions.recent) { runtime.log( `- ${r.key} (${r.updatedAt ? `${Math.round((Date.now() - r.updatedAt) / 60000)}m ago` : "no activity"})`, ); } } runtime.log( - info(`IPC socket: ${ipcExists ? "present" : "missing"} (${ipcPath})`), + info( + `IPC socket: ${summary.ipc.exists ? "present" : "missing"} (${summary.ipc.path})`, + ), ); } diff --git a/src/commands/status.ts b/src/commands/status.ts index c7b52f1bc..fd2826bd2 100644 --- a/src/commands/status.ts +++ b/src/commands/status.ts @@ -9,21 +9,21 @@ import { webAuthExists, } from "../web/session.js"; -const formatAge = (ms: number | null | undefined) => { - if (!ms || ms < 0) return "unknown"; - const minutes = Math.round(ms / 60_000); - if (minutes < 1) return "just now"; - if (minutes < 60) return `${minutes}m ago`; - const hours = Math.round(minutes / 60); - if (hours < 48) return `${hours}h ago`; - const days = Math.round(hours / 24); - return `${days}d ago`; +export type StatusSummary = { + web: { linked: boolean; authAgeMs: number | null }; + heartbeatSeconds: number; + sessions: { + path: string; + count: number; + recent: Array<{ + key: string; + updatedAt: number | null; + age: number | null; + }>; + }; }; -export async function statusCommand( - opts: { json?: boolean }, - runtime: RuntimeEnv, -) { +export async function getStatusSummary(): Promise { const cfg = loadConfig(); const linked = await webAuthExists(); const authAgeMs = getWebAuthAgeMs(); @@ -41,18 +41,33 @@ export async function statusCommand( age: s.updatedAt ? Date.now() - s.updatedAt : null, })); - const summary = { - web: { - linked, - authAgeMs, - }, + return { + web: { linked, authAgeMs }, heartbeatSeconds, sessions: { path: storePath, count: sessions.length, recent, }, - } as const; + }; +} + +const formatAge = (ms: number | null | undefined) => { + if (!ms || ms < 0) return "unknown"; + const minutes = Math.round(ms / 60_000); + if (minutes < 1) return "just now"; + if (minutes < 60) return `${minutes}m ago`; + const hours = Math.round(minutes / 60); + if (hours < 48) return `${hours}h ago`; + const days = Math.round(hours / 24); + return `${days}d ago`; +}; + +export async function statusCommand( + opts: { json?: boolean }, + runtime: RuntimeEnv, +) { + const summary = await getStatusSummary(); if (opts.json) { runtime.log(JSON.stringify(summary, null, 2)); @@ -60,17 +75,17 @@ export async function statusCommand( } runtime.log( - `Web session: ${linked ? "linked" : "not linked"}${linked ? ` (last refreshed ${formatAge(authAgeMs)})` : ""}`, + `Web session: ${summary.web.linked ? "linked" : "not linked"}${summary.web.linked ? ` (last refreshed ${formatAge(summary.web.authAgeMs)})` : ""}`, ); - if (linked) { + if (summary.web.linked) { logWebSelfId(runtime, true); } - runtime.log(info(`Heartbeat: ${heartbeatSeconds}s`)); - runtime.log(info(`Session store: ${storePath}`)); - runtime.log(info(`Active sessions: ${sessions.length}`)); - if (recent.length > 0) { + runtime.log(info(`Heartbeat: ${summary.heartbeatSeconds}s`)); + runtime.log(info(`Session store: ${summary.sessions.path}`)); + runtime.log(info(`Active sessions: ${summary.sessions.count}`)); + if (summary.sessions.recent.length > 0) { runtime.log("Recent sessions:"); - for (const r of recent) { + for (const r of summary.sessions.recent) { runtime.log( `- ${r.key} (${r.updatedAt ? formatAge(Date.now() - r.updatedAt) : "no activity"})`, ); diff --git a/src/infra/control-channel.ts b/src/infra/control-channel.ts new file mode 100644 index 000000000..b05f52627 --- /dev/null +++ b/src/infra/control-channel.ts @@ -0,0 +1,189 @@ +import net from "node:net"; + +import { getHealthSnapshot, type HealthSummary } from "../commands/health.js"; +import { getStatusSummary, type StatusSummary } from "../commands/status.js"; +import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { + emitHeartbeatEvent, + getLastHeartbeatEvent, + type HeartbeatEventPayload, + onHeartbeatEvent, +} from "./heartbeat-events.js"; + +type ControlRequest = { + type: "request"; + id: string; + method: string; + params?: Record; +}; + +type ControlResponse = { + type: "response"; + id: string; + ok: boolean; + payload?: unknown; + error?: string; +}; + +type ControlEvent = { + type: "event"; + event: string; + payload: unknown; +}; + +type Handlers = { + setHeartbeats?: (enabled: boolean) => Promise | void; +}; + +type ControlServer = { + close: () => Promise; + broadcastHeartbeat: (evt: HeartbeatEventPayload) => void; +}; + +const DEFAULT_PORT = 18789; + +export async function startControlChannel( + handlers: Handlers = {}, + opts: { port?: number; runtime?: RuntimeEnv } = {}, +): Promise { + const port = opts.port ?? DEFAULT_PORT; + const runtime = opts.runtime ?? defaultRuntime; + + const clients = new Set(); + + const server = net.createServer((socket) => { + socket.setEncoding("utf8"); + clients.add(socket); + let buffer = ""; + + socket.on("data", (chunk) => { + buffer += chunk; + const lines = buffer.split(/\r?\n/); + buffer = lines.pop() ?? ""; + for (const line of lines) { + handleLine(socket, line.trim()); + } + }); + + socket.on("error", () => { + /* ignore */ + }); + + socket.on("close", () => { + clients.delete(socket); + }); + }); + + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(port, "127.0.0.1", () => resolve()); + }); + + const stopHeartbeat = onHeartbeatEvent((evt) => broadcast("heartbeat", evt)); + + const handleLine = async (socket: net.Socket, line: string) => { + if (!line) return; + let parsed: ControlRequest; + try { + parsed = JSON.parse(line) as ControlRequest; + } catch (err) { + return write(socket, { + type: "response", + id: "", + ok: false, + error: `parse error: ${String(err)}`, + }); + } + + if (parsed.type !== "request" || !parsed.id) { + return write(socket, { + type: "response", + id: parsed.id ?? "", + ok: false, + error: "unsupported frame", + }); + } + + const respond = (payload: unknown, ok = true, error?: string) => + write(socket, { + type: "response", + id: parsed.id, + ok, + payload: ok ? payload : undefined, + error: ok ? undefined : error, + }); + + try { + switch (parsed.method) { + case "ping": { + respond({ pong: true, ts: Date.now() }); + break; + } + case "health": { + const summary = await getHealthSnapshot(); + respond(summary satisfies HealthSummary); + break; + } + case "status": { + const summary = await getStatusSummary(); + respond(summary satisfies StatusSummary); + break; + } + case "last-heartbeat": { + respond(getLastHeartbeatEvent()); + break; + } + case "set-heartbeats": { + const enabled = Boolean(parsed.params?.enabled); + if (handlers.setHeartbeats) await handlers.setHeartbeats(enabled); + respond({ ok: true }); + break; + } + default: + respond(undefined, false, `unknown method: ${parsed.method}`); + break; + } + } catch (err) { + respond(undefined, false, String(err)); + } + }; + + const write = (socket: net.Socket, frame: ControlResponse | ControlEvent) => { + try { + socket.write(`${JSON.stringify(frame)}\n`); + } catch { + // ignore + } + }; + + const broadcast = (event: string, payload: unknown) => { + const frame: ControlEvent = { type: "event", event, payload }; + const line = `${JSON.stringify(frame)}\n`; + for (const client of [...clients]) { + try { + client.write(line); + } catch { + clients.delete(client); + } + } + }; + + runtime.log?.(`control channel listening on 127.0.0.1:${port}`); + + return { + close: async () => { + stopHeartbeat(); + await new Promise((resolve) => server.close(() => resolve())); + for (const client of [...clients]) { + client.destroy(); + } + clients.clear(); + }, + broadcastHeartbeat: (evt: HeartbeatEventPayload) => { + emitHeartbeatEvent(evt); + broadcast("heartbeat", evt); + }, + }; +} + +export { HeartbeatEventPayload } from "./heartbeat-events.js"; diff --git a/src/infra/heartbeat-events.ts b/src/infra/heartbeat-events.ts new file mode 100644 index 000000000..4e42384cb --- /dev/null +++ b/src/infra/heartbeat-events.ts @@ -0,0 +1,35 @@ +export type HeartbeatEventPayload = { + ts: number; + status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed"; + to?: string; + preview?: string; + durationMs?: number; + hasMedia?: boolean; + reason?: string; +}; + +let lastHeartbeat: HeartbeatEventPayload | null = null; +const listeners = new Set<(evt: HeartbeatEventPayload) => void>(); + +export function emitHeartbeatEvent(evt: Omit) { + const enriched: HeartbeatEventPayload = { ts: Date.now(), ...evt }; + lastHeartbeat = enriched; + for (const listener of listeners) { + try { + listener(enriched); + } catch { + /* ignore */ + } + } +} + +export function onHeartbeatEvent( + listener: (evt: HeartbeatEventPayload) => void, +): () => void { + listeners.add(listener); + return () => listeners.delete(listener); +} + +export function getLastHeartbeatEvent(): HeartbeatEventPayload | null { + return lastHeartbeat; +} diff --git a/src/process/heartbeat-events.ts b/src/process/heartbeat-events.ts deleted file mode 100644 index 9307a37bb..000000000 --- a/src/process/heartbeat-events.ts +++ /dev/null @@ -1,73 +0,0 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; -import readline from "node:readline"; - -export type HeartbeatEvent = { - type: "heartbeat"; - ts: number; // epoch ms - status: "sent" | "ok-empty" | "ok-token" | "skipped" | "failed"; - to?: string; - preview?: string; - durationMs?: number; - hasMedia?: boolean; - reason?: string; -}; - -const EVENT_FILENAME = "heartbeat-events.jsonl"; -const STATE_FILENAME = "heartbeat-state.json"; - -function baseDir() { - const dir = path.join(os.homedir(), ".clawdis"); - if (!fs.existsSync(dir)) fs.mkdirSync(dir, { recursive: true }); - return dir; -} - -export function heartbeatEventsPath() { - return path.join(baseDir(), EVENT_FILENAME); -} - -export function heartbeatStatePath() { - return path.join(baseDir(), STATE_FILENAME); -} - -export function writeHeartbeatEvent(evt: HeartbeatEvent) { - const line = JSON.stringify(evt); - fs.appendFileSync(heartbeatEventsPath(), `${line}\n`, { encoding: "utf8" }); - fs.writeFileSync(heartbeatStatePath(), line, { encoding: "utf8" }); -} - -export function readLatestHeartbeat(): HeartbeatEvent | null { - try { - const txt = fs.readFileSync(heartbeatStatePath(), "utf8"); - return JSON.parse(txt) as HeartbeatEvent; - } catch { - return null; - } -} - -// Tail the events file and invoke the callback for every new parsed event. -export function tailHeartbeatEvents(onEvent: (evt: HeartbeatEvent) => void) { - const file = heartbeatEventsPath(); - if (!fs.existsSync(file)) { - fs.writeFileSync(file, "", { encoding: "utf8" }); - } - - const stream = fs.createReadStream(file, { encoding: "utf8", flags: "a+" }); - const rl = readline.createInterface({ input: stream }); - rl.on("line", (line) => { - const trimmed = line.trim(); - if (!trimmed) return; - try { - const parsed = JSON.parse(trimmed) as HeartbeatEvent; - if (parsed?.type === "heartbeat") onEvent(parsed); - } catch { - // ignore malformed - } - }); - - return () => { - rl.close(); - stream.close(); - }; -} diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 031be5740..90d40cb09 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -11,13 +11,10 @@ import { saveSessionStore, } from "../config/sessions.js"; import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; +import { emitHeartbeatEvent } from "../infra/heartbeat-events.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; import { getQueueSize } from "../process/command-queue.js"; -import { - type HeartbeatEvent, - writeHeartbeatEvent, -} from "../process/heartbeat-events.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { jidToE164, normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; @@ -82,10 +79,6 @@ const formatDuration = (ms: number) => const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30; export const HEARTBEAT_TOKEN = "HEARTBEAT_OK"; - -function emitHeartbeatEvent(evt: Omit) { - writeHeartbeatEvent({ type: "heartbeat", ts: Date.now(), ...evt }); -} export const HEARTBEAT_PROMPT = "HEARTBEAT /think:high"; function elide(text?: string, limit = 400) {