gateway: cache health snapshot

This commit is contained in:
Peter Steinberger
2025-12-09 23:38:55 +00:00
parent e58d5a54b1
commit dd88345483

View File

@@ -6,7 +6,7 @@ import chalk from "chalk";
import { type WebSocket, WebSocketServer } from "ws";
import { createDefaultDeps } from "../cli/deps.js";
import { agentCommand } from "../commands/agent.js";
import { getHealthSnapshot } from "../commands/health.js";
import { getHealthSnapshot, type HealthSummary } from "../commands/health.js";
import { getStatusSummary } from "../commands/status.js";
import { loadConfig } from "../config/config.js";
import {
@@ -67,7 +67,7 @@ const METHODS = [
"chat.send",
];
const EVENTS = ["agent", "chat", "presence", "tick", "shutdown"];
const EVENTS = ["agent", "chat", "presence", "tick", "shutdown", "health"];
export type GatewayServer = {
close: () => Promise<void>;
@@ -76,6 +76,9 @@ export type GatewayServer = {
let presenceVersion = 1;
let healthVersion = 1;
let seq = 0;
let healthCache: HealthSummary | null = null;
let healthRefresh: Promise<HealthSummary> | null = null;
let broadcastHealthUpdate: ((snap: HealthSummary) => void) | null = null;
// Track per-run sequence to detect out-of-order/lost agent events.
const agentRunSeq = new Map<string, number>();
@@ -94,8 +97,9 @@ function buildSnapshot(): Snapshot {
const MAX_PAYLOAD_BYTES = 512 * 1024; // cap incoming frame size
const MAX_BUFFERED_BYTES = 1.5 * 1024 * 1024; // per-connection send buffer limit
const HANDSHAKE_TIMEOUT_MS = 3000;
const HANDSHAKE_TIMEOUT_MS = 10_000;
const TICK_INTERVAL_MS = 30_000;
const HEALTH_REFRESH_INTERVAL_MS = 60_000;
const DEDUPE_TTL_MS = 5 * 60_000;
const DEDUPE_MAX = 1000;
const LOG_VALUE_LIMIT = 240;
@@ -228,6 +232,24 @@ function formatError(err: unknown): string {
return JSON.stringify(err, null, 2);
}
async function refreshHealthSnapshot(opts?: { probe?: boolean }) {
if (!healthRefresh) {
healthRefresh = (async () => {
const snap = await getHealthSnapshot(undefined, opts);
healthCache = snap;
healthVersion += 1;
if (broadcastHealthUpdate) {
broadcastHealthUpdate(snap);
}
return snap;
})();
healthRefresh.finally(() => {
healthRefresh = null;
});
}
return healthRefresh;
}
export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
const releaseLock = await acquireGatewayLock().catch((err) => {
// Bubble known lock errors so callers can present a nice message.
@@ -332,11 +354,29 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
}
};
broadcastHealthUpdate = (snap: HealthSummary) => {
broadcast("health", snap, {
stateVersion: { presence: presenceVersion, health: healthVersion },
});
};
// periodic keepalive
const tickInterval = setInterval(() => {
broadcast("tick", { ts: Date.now() }, { dropIfSlow: true });
}, TICK_INTERVAL_MS);
// periodic health refresh to keep cached snapshot warm
const healthInterval = setInterval(() => {
void refreshHealthSnapshot({ probe: true }).catch((err) =>
logError(`health refresh failed: ${formatError(err)}`),
);
}, HEALTH_REFRESH_INTERVAL_MS);
// Prime cache so first client gets a snapshot without waiting.
void refreshHealthSnapshot({ probe: true }).catch((err) =>
logError(`initial health refresh failed: ${formatError(err)}`),
);
// dedupe cache cleanup
const dedupeCleanup = setInterval(() => {
const now = Date.now();
@@ -541,10 +581,10 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
});
presenceVersion += 1;
const snapshot = buildSnapshot();
// Fill health asynchronously for snapshot
const health = await getHealthSnapshot(undefined, { probe: false });
snapshot.health = health;
snapshot.stateVersion.health = ++healthVersion;
if (healthCache) {
snapshot.health = healthCache;
snapshot.stateVersion.health = healthVersion;
}
const helloOk = {
type: "hello-ok",
protocol: PROTOCOL_VERSION,
@@ -578,6 +618,10 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
});
send(helloOk);
clients.add(client);
// Kick a health refresh in the background to keep cache warm.
void refreshHealthSnapshot({ probe: true }).catch((err) =>
logError(`post-hello health refresh failed: ${formatError(err)}`),
);
return;
}
@@ -618,9 +662,25 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
switch (req.method) {
case "health": {
const health = await getHealthSnapshot();
healthVersion += 1;
respond(true, health, undefined);
const now = Date.now();
const cached = healthCache;
if (cached && now - cached.ts < HEALTH_REFRESH_INTERVAL_MS) {
respond(true, cached, undefined, { cached: true });
void refreshHealthSnapshot({ probe: true }).catch((err) =>
logError(`background health refresh failed: ${formatError(err)}`),
);
break;
}
try {
const snap = await refreshHealthSnapshot({ probe: true });
respond(true, snap, undefined);
} catch (err) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err)),
);
}
break;
}
case "chat.history": {
@@ -1047,6 +1107,7 @@ export async function startGatewayServer(port = 18789): Promise<GatewayServer> {
restartExpectedMs: null,
});
clearInterval(tickInterval);
clearInterval(healthInterval);
clearInterval(dedupeCleanup);
if (agentUnsub) {
try {