Files
clawdbot/src/gateway/server/ws-connection.ts
2026-01-19 00:15:44 +00:00

245 lines
7.7 KiB
TypeScript

import { randomUUID } from "node:crypto";
import type { WebSocket, WebSocketServer } from "ws";
import { resolveCanvasHostUrl } from "../../infra/canvas-host-url.js";
import { listSystemPresence, upsertPresence } from "../../infra/system-presence.js";
import type { createSubsystemLogger } from "../../logging/subsystem.js";
import { isWebchatClient } from "../../utils/message-channel.js";
import type { ResolvedGatewayAuth } from "../auth.js";
import { isLoopbackAddress } from "../net.js";
import { HANDSHAKE_TIMEOUT_MS } from "../server-constants.js";
import type { GatewayRequestContext, GatewayRequestHandlers } from "../server-methods/types.js";
import { formatError } from "../server-utils.js";
import { logWs } from "../ws-log.js";
import { getHealthVersion, getPresenceVersion, incrementPresenceVersion } from "./health-state.js";
import { attachGatewayWsMessageHandler } from "./ws-connection/message-handler.js";
import type { GatewayWsClient } from "./ws-types.js";
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
export function attachGatewayWsConnectionHandler(params: {
wss: WebSocketServer;
clients: Set<GatewayWsClient>;
port: number;
bridgeHost?: string;
canvasHostEnabled: boolean;
canvasHostServerPort?: number;
resolvedAuth: ResolvedGatewayAuth;
gatewayMethods: string[];
events: string[];
logGateway: SubsystemLogger;
logHealth: SubsystemLogger;
logWsControl: SubsystemLogger;
extraHandlers: GatewayRequestHandlers;
broadcast: (
event: string,
payload: unknown,
opts?: {
dropIfSlow?: boolean;
stateVersion?: { presence?: number; health?: number };
},
) => void;
buildRequestContext: () => GatewayRequestContext;
}) {
const {
wss,
clients,
port,
bridgeHost,
canvasHostEnabled,
canvasHostServerPort,
resolvedAuth,
gatewayMethods,
events,
logGateway,
logHealth,
logWsControl,
extraHandlers,
broadcast,
buildRequestContext,
} = params;
wss.on("connection", (socket, upgradeReq) => {
let client: GatewayWsClient | null = null;
let closed = false;
const openedAt = Date.now();
const connId = randomUUID();
const remoteAddr = (socket as WebSocket & { _socket?: { remoteAddress?: string } })._socket
?.remoteAddress;
const headerValue = (value: string | string[] | undefined) =>
Array.isArray(value) ? value[0] : value;
const requestHost = headerValue(upgradeReq.headers.host);
const requestOrigin = headerValue(upgradeReq.headers.origin);
const requestUserAgent = headerValue(upgradeReq.headers["user-agent"]);
const forwardedFor = headerValue(upgradeReq.headers["x-forwarded-for"]);
const canvasHostPortForWs = canvasHostServerPort ?? (canvasHostEnabled ? port : undefined);
const canvasHostOverride =
bridgeHost && bridgeHost !== "0.0.0.0" && bridgeHost !== "::" ? bridgeHost : undefined;
const canvasHostUrl = resolveCanvasHostUrl({
canvasPort: canvasHostPortForWs,
hostOverride: canvasHostServerPort ? canvasHostOverride : undefined,
requestHost: upgradeReq.headers.host,
forwardedProto: upgradeReq.headers["x-forwarded-proto"],
localAddress: upgradeReq.socket?.localAddress,
});
logWs("in", "open", { connId, remoteAddr });
let handshakeState: "pending" | "connected" | "failed" = "pending";
let closeCause: string | undefined;
let closeMeta: Record<string, unknown> = {};
let lastFrameType: string | undefined;
let lastFrameMethod: string | undefined;
let lastFrameId: string | undefined;
const setCloseCause = (cause: string, meta?: Record<string, unknown>) => {
if (!closeCause) closeCause = cause;
if (meta && Object.keys(meta).length > 0) {
closeMeta = { ...closeMeta, ...meta };
}
};
const setLastFrameMeta = (meta: { type?: string; method?: string; id?: string }) => {
if (meta.type || meta.method || meta.id) {
lastFrameType = meta.type ?? lastFrameType;
lastFrameMethod = meta.method ?? lastFrameMethod;
lastFrameId = meta.id ?? lastFrameId;
}
};
const send = (obj: unknown) => {
try {
socket.send(JSON.stringify(obj));
} catch {
/* ignore */
}
};
const close = (code = 1000, reason?: string) => {
if (closed) return;
closed = true;
clearTimeout(handshakeTimer);
if (client) clients.delete(client);
try {
socket.close(code, reason);
} catch {
/* ignore */
}
};
socket.once("error", (err) => {
logWsControl.warn(`error conn=${connId} remote=${remoteAddr ?? "?"}: ${formatError(err)}`);
close();
});
const isNoisySwiftPmHelperClose = (userAgent: string | undefined, remote: string | undefined) =>
Boolean(
userAgent?.toLowerCase().includes("swiftpm-testing-helper") && isLoopbackAddress(remote),
);
socket.once("close", (code, reason) => {
const durationMs = Date.now() - openedAt;
const closeContext = {
cause: closeCause,
handshake: handshakeState,
durationMs,
lastFrameType,
lastFrameMethod,
lastFrameId,
host: requestHost,
origin: requestOrigin,
userAgent: requestUserAgent,
forwardedFor,
...closeMeta,
};
if (!client) {
const logFn = isNoisySwiftPmHelperClose(requestUserAgent, remoteAddr)
? logWsControl.debug
: logWsControl.warn;
logFn(
`closed before connect conn=${connId} remote=${remoteAddr ?? "?"} fwd=${forwardedFor ?? "n/a"} origin=${requestOrigin ?? "n/a"} host=${requestHost ?? "n/a"} ua=${requestUserAgent ?? "n/a"} code=${code ?? "n/a"} reason=${reason?.toString() || "n/a"}`,
closeContext,
);
}
if (client && isWebchatClient(client.connect.client)) {
logWsControl.info(
`webchat disconnected code=${code} reason=${reason?.toString() || "n/a"} conn=${connId}`,
);
}
if (client?.presenceKey) {
upsertPresence(client.presenceKey, { reason: "disconnect" });
incrementPresenceVersion();
broadcast(
"presence",
{ presence: listSystemPresence() },
{
dropIfSlow: true,
stateVersion: {
presence: getPresenceVersion(),
health: getHealthVersion(),
},
},
);
}
logWs("out", "close", {
connId,
code,
reason: reason?.toString(),
durationMs,
cause: closeCause,
handshake: handshakeState,
lastFrameType,
lastFrameMethod,
lastFrameId,
});
close();
});
const handshakeTimer = setTimeout(() => {
if (!client) {
handshakeState = "failed";
setCloseCause("handshake-timeout", {
handshakeMs: Date.now() - openedAt,
});
logWsControl.warn(`handshake timeout conn=${connId} remote=${remoteAddr ?? "?"}`);
close();
}
}, HANDSHAKE_TIMEOUT_MS);
attachGatewayWsMessageHandler({
socket,
upgradeReq,
connId,
remoteAddr,
forwardedFor,
requestHost,
requestOrigin,
requestUserAgent,
canvasHostUrl,
resolvedAuth,
gatewayMethods,
events,
extraHandlers,
buildRequestContext,
send,
close,
isClosed: () => closed,
clearHandshakeTimer: () => clearTimeout(handshakeTimer),
getClient: () => client,
setClient: (next) => {
client = next;
clients.add(next);
},
setHandshakeState: (next) => {
handshakeState = next;
},
setCloseCause,
setLastFrameMeta,
logGateway,
logHealth,
logWsControl,
});
});
}