import { randomUUID } from "node:crypto"; import { WebSocket } from "ws"; import { rawDataToString } from "../infra/ws.js"; import { logDebug, logError } from "../logger.js"; import { type ConnectParams, type EventFrame, type HelloOk, PROTOCOL_VERSION, type RequestFrame, validateRequestFrame, } from "./protocol/index.js"; type Pending = { resolve: (value: unknown) => void; reject: (err: unknown) => void; expectFinal: boolean; }; export type GatewayClientOptions = { url?: string; // ws://127.0.0.1:18789 token?: string; password?: string; instanceId?: string; clientName?: string; clientVersion?: string; platform?: string; mode?: string; minProtocol?: number; maxProtocol?: number; onEvent?: (evt: EventFrame) => void; onHelloOk?: (hello: HelloOk) => void; onClose?: (code: number, reason: string) => void; onGap?: (info: { expected: number; received: number }) => void; }; export class GatewayClient { private ws: WebSocket | null = null; private opts: GatewayClientOptions; private pending = new Map(); private backoffMs = 1000; private closed = false; private lastSeq: number | null = null; // Track last tick to detect silent stalls. private lastTick: number | null = null; private tickIntervalMs = 30_000; private tickTimer: NodeJS.Timeout | null = null; constructor(opts: GatewayClientOptions) { this.opts = opts; } start() { if (this.closed) return; const url = this.opts.url ?? "ws://127.0.0.1:18789"; // Allow node screen snapshots and other large responses. this.ws = new WebSocket(url, { maxPayload: 25 * 1024 * 1024 }); this.ws.on("open", () => this.sendConnect()); this.ws.on("message", (data) => this.handleMessage(rawDataToString(data))); this.ws.on("close", (code, reason) => { const reasonText = rawDataToString(reason); this.ws = null; this.flushPendingErrors( new Error(`gateway closed (${code}): ${reasonText}`), ); this.scheduleReconnect(); this.opts.onClose?.(code, reasonText); }); this.ws.on("error", (err) => { logDebug(`gateway client error: ${String(err)}`); }); } stop() { this.closed = true; if (this.tickTimer) { clearInterval(this.tickTimer); this.tickTimer = null; } this.ws?.close(); this.ws = null; this.flushPendingErrors(new Error("gateway client stopped")); } private sendConnect() { const auth = this.opts.token || this.opts.password ? { token: this.opts.token, password: this.opts.password, } : undefined; const params: ConnectParams = { minProtocol: this.opts.minProtocol ?? PROTOCOL_VERSION, maxProtocol: this.opts.maxProtocol ?? PROTOCOL_VERSION, client: { name: this.opts.clientName ?? "gateway-client", version: this.opts.clientVersion ?? "dev", platform: this.opts.platform ?? process.platform, mode: this.opts.mode ?? "backend", instanceId: this.opts.instanceId, }, caps: [], auth, }; void this.request("connect", params) .then((helloOk) => { this.backoffMs = 1000; this.tickIntervalMs = typeof helloOk.policy?.tickIntervalMs === "number" ? helloOk.policy.tickIntervalMs : 30_000; this.lastTick = Date.now(); this.startTickWatch(); this.opts.onHelloOk?.(helloOk); }) .catch((err) => { logError(`gateway connect failed: ${String(err)}`); this.ws?.close(1008, "connect failed"); }); } private handleMessage(raw: string) { try { const parsed = JSON.parse(raw); if (parsed?.type === "event") { const evt = parsed as EventFrame; const seq = typeof evt.seq === "number" ? evt.seq : null; if (seq !== null) { if (this.lastSeq !== null && seq > this.lastSeq + 1) { this.opts.onGap?.({ expected: this.lastSeq + 1, received: seq }); } this.lastSeq = seq; } if (evt.event === "tick") { this.lastTick = Date.now(); } this.opts.onEvent?.(evt); return; } if (parsed?.type === "res") { const pending = this.pending.get(parsed.id); if (!pending) return; // If the payload is an ack with status accepted, keep waiting for final. const status = parsed.payload?.status; if (pending.expectFinal && status === "accepted") { return; } this.pending.delete(parsed.id); if (parsed.ok) pending.resolve(parsed.payload); else pending.reject(new Error(parsed.error?.message ?? "unknown error")); } } catch (err) { logDebug(`gateway client parse error: ${String(err)}`); } } private scheduleReconnect() { if (this.closed) return; if (this.tickTimer) { clearInterval(this.tickTimer); this.tickTimer = null; } const delay = this.backoffMs; this.backoffMs = Math.min(this.backoffMs * 2, 30_000); setTimeout(() => this.start(), delay).unref(); } private flushPendingErrors(err: Error) { for (const [, p] of this.pending) { p.reject(err); } this.pending.clear(); } private startTickWatch() { if (this.tickTimer) clearInterval(this.tickTimer); const interval = Math.max(this.tickIntervalMs, 1000); this.tickTimer = setInterval(() => { if (this.closed) return; if (!this.lastTick) return; const gap = Date.now() - this.lastTick; if (gap > this.tickIntervalMs * 2) { this.ws?.close(4000, "tick timeout"); } }, interval); } async request( method: string, params?: unknown, opts?: { expectFinal?: boolean }, ): Promise { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { throw new Error("gateway not connected"); } const id = randomUUID(); const frame: RequestFrame = { type: "req", id, method, params }; if (!validateRequestFrame(frame)) { throw new Error( `invalid request frame: ${JSON.stringify( validateRequestFrame.errors, null, 2, )}`, ); } const expectFinal = opts?.expectFinal === true; const p = new Promise((resolve, reject) => { this.pending.set(id, { resolve: (value) => resolve(value as T), reject, expectFinal, }); }); this.ws.send(JSON.stringify(frame)); return p; } }