import type { GatewayWsClient } from "./server/ws-types.js"; import { MAX_BUFFERED_BYTES } from "./server-constants.js"; import { logWs, summarizeAgentEventForWsLog } from "./ws-log.js"; const ADMIN_SCOPE = "operator.admin"; const APPROVALS_SCOPE = "operator.approvals"; const PAIRING_SCOPE = "operator.pairing"; const EVENT_SCOPE_GUARDS: Record = { "exec.approval.requested": [APPROVALS_SCOPE], "exec.approval.resolved": [APPROVALS_SCOPE], "device.pair.requested": [PAIRING_SCOPE], "device.pair.resolved": [PAIRING_SCOPE], "node.pair.requested": [PAIRING_SCOPE], "node.pair.resolved": [PAIRING_SCOPE], }; function hasEventScope(client: GatewayWsClient, event: string): boolean { const required = EVENT_SCOPE_GUARDS[event]; if (!required) return true; const role = client.connect.role ?? "operator"; if (role !== "operator") return false; const scopes = Array.isArray(client.connect.scopes) ? client.connect.scopes : []; if (scopes.includes(ADMIN_SCOPE)) return true; return required.some((scope) => scopes.includes(scope)); } export function createGatewayBroadcaster(params: { clients: Set }) { let seq = 0; const broadcast = ( event: string, payload: unknown, opts?: { dropIfSlow?: boolean; stateVersion?: { presence?: number; health?: number }; }, ) => { const eventSeq = ++seq; const frame = JSON.stringify({ type: "event", event, payload, seq: eventSeq, stateVersion: opts?.stateVersion, }); const logMeta: Record = { event, seq: eventSeq, clients: params.clients.size, dropIfSlow: opts?.dropIfSlow, presenceVersion: opts?.stateVersion?.presence, healthVersion: opts?.stateVersion?.health, }; if (event === "agent") { Object.assign(logMeta, summarizeAgentEventForWsLog(payload)); } logWs("out", "event", logMeta); for (const c of params.clients) { if (!hasEventScope(c, event)) continue; const slow = c.socket.bufferedAmount > MAX_BUFFERED_BYTES; if (slow && opts?.dropIfSlow) continue; if (slow) { try { c.socket.close(1008, "slow consumer"); } catch { /* ignore */ } continue; } try { c.socket.send(frame); } catch { /* ignore */ } } }; return { broadcast }; }