diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 62563ad0f..052add7b1 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -6,7 +6,7 @@ import chalk from "chalk"; import { type WebSocket, WebSocketServer } from "ws"; import { createDefaultDeps } from "../cli/deps.js"; import { agentCommand } from "../commands/agent.js"; -import { getHealthSnapshot } from "../commands/health.js"; +import { getHealthSnapshot, type HealthSummary } from "../commands/health.js"; import { getStatusSummary } from "../commands/status.js"; import { loadConfig } from "../config/config.js"; import { @@ -67,7 +67,7 @@ const METHODS = [ "chat.send", ]; -const EVENTS = ["agent", "chat", "presence", "tick", "shutdown"]; +const EVENTS = ["agent", "chat", "presence", "tick", "shutdown", "health"]; export type GatewayServer = { close: () => Promise; @@ -76,6 +76,9 @@ export type GatewayServer = { let presenceVersion = 1; let healthVersion = 1; let seq = 0; +let healthCache: HealthSummary | null = null; +let healthRefresh: Promise | null = null; +let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null; // Track per-run sequence to detect out-of-order/lost agent events. const agentRunSeq = new Map(); @@ -94,8 +97,9 @@ function buildSnapshot(): Snapshot { const MAX_PAYLOAD_BYTES = 512 * 1024; // cap incoming frame size const MAX_BUFFERED_BYTES = 1.5 * 1024 * 1024; // per-connection send buffer limit -const HANDSHAKE_TIMEOUT_MS = 3000; +const HANDSHAKE_TIMEOUT_MS = 10_000; const TICK_INTERVAL_MS = 30_000; +const HEALTH_REFRESH_INTERVAL_MS = 60_000; const DEDUPE_TTL_MS = 5 * 60_000; const DEDUPE_MAX = 1000; const LOG_VALUE_LIMIT = 240; @@ -228,6 +232,24 @@ function formatError(err: unknown): string { return JSON.stringify(err, null, 2); } +async function refreshHealthSnapshot(opts?: { probe?: boolean }) { + if (!healthRefresh) { + healthRefresh = (async () => { + const snap = await getHealthSnapshot(undefined, opts); + healthCache = snap; + healthVersion += 1; + if (broadcastHealthUpdate) { + broadcastHealthUpdate(snap); + } + return snap; + })(); + healthRefresh.finally(() => { + healthRefresh = null; + }); + } + return healthRefresh; +} + export async function startGatewayServer(port = 18789): Promise { const releaseLock = await acquireGatewayLock().catch((err) => { // Bubble known lock errors so callers can present a nice message. @@ -332,11 +354,29 @@ export async function startGatewayServer(port = 18789): Promise { } }; + broadcastHealthUpdate = (snap: HealthSummary) => { + broadcast("health", snap, { + stateVersion: { presence: presenceVersion, health: healthVersion }, + }); + }; + // periodic keepalive const tickInterval = setInterval(() => { broadcast("tick", { ts: Date.now() }, { dropIfSlow: true }); }, TICK_INTERVAL_MS); + // periodic health refresh to keep cached snapshot warm + const healthInterval = setInterval(() => { + void refreshHealthSnapshot({ probe: true }).catch((err) => + logError(`health refresh failed: ${formatError(err)}`), + ); + }, HEALTH_REFRESH_INTERVAL_MS); + + // Prime cache so first client gets a snapshot without waiting. + void refreshHealthSnapshot({ probe: true }).catch((err) => + logError(`initial health refresh failed: ${formatError(err)}`), + ); + // dedupe cache cleanup const dedupeCleanup = setInterval(() => { const now = Date.now(); @@ -541,10 +581,10 @@ export async function startGatewayServer(port = 18789): Promise { }); presenceVersion += 1; const snapshot = buildSnapshot(); - // Fill health asynchronously for snapshot - const health = await getHealthSnapshot(undefined, { probe: false }); - snapshot.health = health; - snapshot.stateVersion.health = ++healthVersion; + if (healthCache) { + snapshot.health = healthCache; + snapshot.stateVersion.health = healthVersion; + } const helloOk = { type: "hello-ok", protocol: PROTOCOL_VERSION, @@ -578,6 +618,10 @@ export async function startGatewayServer(port = 18789): Promise { }); send(helloOk); clients.add(client); + // Kick a health refresh in the background to keep cache warm. + void refreshHealthSnapshot({ probe: true }).catch((err) => + logError(`post-hello health refresh failed: ${formatError(err)}`), + ); return; } @@ -618,9 +662,25 @@ export async function startGatewayServer(port = 18789): Promise { switch (req.method) { case "health": { - const health = await getHealthSnapshot(); - healthVersion += 1; - respond(true, health, undefined); + const now = Date.now(); + const cached = healthCache; + if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) { + respond(true, cached, undefined, { cached: true }); + void refreshHealthSnapshot({ probe: true }).catch((err) => + logError(`background health refresh failed: ${formatError(err)}`), + ); + break; + } + try { + const snap = await refreshHealthSnapshot({ probe: true }); + respond(true, snap, undefined); + } catch (err) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)), + ); + } break; } case "chat.history": { @@ -1047,6 +1107,7 @@ export async function startGatewayServer(port = 18789): Promise { restartExpectedMs: null, }); clearInterval(tickInterval); + clearInterval(healthInterval); clearInterval(dedupeCleanup); if (agentUnsub) { try {