feat(gateway)!: switch handshake to req:connect (protocol v2)

This commit is contained in:
Peter Steinberger
2025-12-12 23:29:57 +00:00
parent e915ed182d
commit d5d80f4247
26 changed files with 586 additions and 955 deletions

View File

@@ -220,7 +220,7 @@ Examples:
)
.option(
"--token <token>",
"Shared token required in hello.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)",
"Shared token required in connect.params.auth.token (default: CLAWDIS_GATEWAY_TOKEN env if set)",
)
.option(
"--force",

View File

@@ -29,11 +29,13 @@ describe("GatewayClient", () => {
wss = new WebSocketServer({ port, host: "127.0.0.1" });
wss.on("connection", (socket) => {
socket.once("message", () => {
socket.once("message", (data) => {
const first = JSON.parse(String(data)) as { id?: string };
const id = first.id ?? "connect";
// Respond with tiny tick interval to trigger watchdog quickly.
const helloOk = {
type: "hello-ok",
protocol: 1,
protocol: 2,
server: { version: "dev", connId: "c1" },
features: { methods: [], events: [] },
snapshot: {
@@ -48,7 +50,9 @@ describe("GatewayClient", () => {
tickIntervalMs: 5,
},
};
socket.send(JSON.stringify(helloOk));
socket.send(
JSON.stringify({ type: "res", id, ok: true, payload: helloOk }),
);
});
});

View File

@@ -2,8 +2,8 @@ import { randomUUID } from "node:crypto";
import { WebSocket } from "ws";
import { logDebug, logError } from "../logger.js";
import {
type ConnectParams,
type EventFrame,
type Hello,
type HelloOk,
PROTOCOL_VERSION,
type RequestFrame,
@@ -53,7 +53,7 @@ export class GatewayClient {
const url = this.opts.url ?? "ws://127.0.0.1:18789";
this.ws = new WebSocket(url, { maxPayload: 512 * 1024 });
this.ws.on("open", () => this.sendHello());
this.ws.on("open", () => this.sendConnect());
this.ws.on("message", (data) => this.handleMessage(data.toString()));
this.ws.on("close", (code, reason) => {
this.ws = null;
@@ -79,9 +79,8 @@ export class GatewayClient {
this.flushPendingErrors(new Error("gateway client stopped"));
}
private sendHello() {
const hello: Hello = {
type: "hello",
private sendConnect() {
const params: ConnectParams = {
minProtocol: this.opts.minProtocol ?? PROTOCOL_VERSION,
maxProtocol: this.opts.maxProtocol ?? PROTOCOL_VERSION,
client: {
@@ -94,28 +93,27 @@ export class GatewayClient {
caps: [],
auth: this.opts.token ? { token: this.opts.token } : undefined,
};
this.ws?.send(JSON.stringify(hello));
void this.request<HelloOk>("connect", params)
.then((helloOk) => {
this.backoffMs = 1000;
this.tickIntervalMs =
typeof helloOk.policy?.tickIntervalMs === "number"
? helloOk.policy.tickIntervalMs
: 30_000;
this.lastTick = Date.now();
this.startTickWatch();
this.opts.onHelloOk?.(helloOk);
})
.catch((err) => {
logError(`gateway connect failed: ${String(err)}`);
this.ws?.close(1008, "connect failed");
});
}
private handleMessage(raw: string) {
try {
const parsed = JSON.parse(raw);
if (parsed?.type === "hello-ok") {
this.backoffMs = 1000;
this.tickIntervalMs =
typeof parsed.policy?.tickIntervalMs === "number"
? parsed.policy.tickIntervalMs
: 30_000;
this.lastTick = Date.now();
this.startTickWatch();
this.opts.onHelloOk?.(parsed as HelloOk);
return;
}
if (parsed?.type === "hello-error") {
logError(`gateway hello-error: ${parsed.reason}`);
this.ws?.close(1008, "hello-error");
return;
}
if (parsed?.type === "event") {
const evt = parsed as EventFrame;
const seq = typeof evt.seq === "number" ? evt.seq : null;

View File

@@ -7,6 +7,8 @@ import {
ChatEventSchema,
ChatHistoryParamsSchema,
ChatSendParamsSchema,
type ConnectParams,
ConnectParamsSchema,
ErrorCodes,
type ErrorShape,
ErrorShapeSchema,
@@ -15,12 +17,8 @@ import {
errorShape,
type GatewayFrame,
GatewayFrameSchema,
type Hello,
type HelloError,
HelloErrorSchema,
type HelloOk,
HelloOkSchema,
HelloSchema,
PROTOCOL_VERSION,
type PresenceEntry,
PresenceEntrySchema,
@@ -50,7 +48,8 @@ const ajv = new (
removeAdditional: false,
});
export const validateHello = ajv.compile<Hello>(HelloSchema);
export const validateConnectParams =
ajv.compile<ConnectParams>(ConnectParamsSchema);
export const validateRequestFrame =
ajv.compile<RequestFrame>(RequestFrameSchema);
export const validateSendParams = ajv.compile(SendParamsSchema);
@@ -67,9 +66,8 @@ export function formatValidationErrors(
}
export {
HelloSchema,
ConnectParamsSchema,
HelloOkSchema,
HelloErrorSchema,
RequestFrameSchema,
ResponseFrameSchema,
EventFrameSchema,
@@ -94,9 +92,8 @@ export {
export type {
GatewayFrame,
Hello,
ConnectParams,
HelloOk,
HelloError,
RequestFrame,
ResponseFrame,
EventFrame,

View File

@@ -53,9 +53,8 @@ export const ShutdownEventSchema = Type.Object(
{ additionalProperties: false },
);
export const HelloSchema = Type.Object(
export const ConnectParamsSchema = Type.Object(
{
type: Type.Literal("hello"),
minProtocol: Type.Integer({ minimum: 1 }),
maxProtocol: Type.Integer({ minimum: 1 }),
client: Type.Object(
@@ -116,16 +115,6 @@ export const HelloOkSchema = Type.Object(
{ additionalProperties: false },
);
export const HelloErrorSchema = Type.Object(
{
type: Type.Literal("hello-error"),
reason: NonEmptyString,
expectedProtocol: Type.Optional(Type.Integer({ minimum: 1 })),
minClient: Type.Optional(NonEmptyString),
},
{ additionalProperties: false },
);
export const ErrorShapeSchema = Type.Object(
{
code: NonEmptyString,
@@ -173,14 +162,7 @@ export const EventFrameSchema = Type.Object(
// downstream codegen (quicktype) produce tighter types instead of all-optional
// blobs.
export const GatewayFrameSchema = Type.Union(
[
HelloSchema,
HelloOkSchema,
HelloErrorSchema,
RequestFrameSchema,
ResponseFrameSchema,
EventFrameSchema,
],
[RequestFrameSchema, ResponseFrameSchema, EventFrameSchema],
{ discriminator: "type" },
);
@@ -261,9 +243,8 @@ export const ChatEventSchema = Type.Object(
);
export const ProtocolSchemas: Record<string, TSchema> = {
Hello: HelloSchema,
ConnectParams: ConnectParamsSchema,
HelloOk: HelloOkSchema,
HelloError: HelloErrorSchema,
RequestFrame: RequestFrameSchema,
ResponseFrame: ResponseFrameSchema,
EventFrame: EventFrameSchema,
@@ -282,11 +263,10 @@ export const ProtocolSchemas: Record<string, TSchema> = {
ShutdownEvent: ShutdownEventSchema,
};
export const PROTOCOL_VERSION = 1 as const;
export const PROTOCOL_VERSION = 2 as const;
export type Hello = Static<typeof HelloSchema>;
export type ConnectParams = Static<typeof ConnectParamsSchema>;
export type HelloOk = Static<typeof HelloOkSchema>;
export type HelloError = Static<typeof HelloErrorSchema>;
export type RequestFrame = Static<typeof RequestFrameSchema>;
export type ResponseFrame = Static<typeof ResponseFrameSchema>;
export type EventFrame = Static<typeof EventFrameSchema>;

View File

@@ -8,6 +8,7 @@ import { WebSocket } from "ws";
import { agentCommand } from "../commands/agent.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { GatewayLockError } from "../infra/gateway-lock.js";
import { PROTOCOL_VERSION } from "./protocol/index.js";
import { startGatewayServer } from "./server.js";
let testSessionStorePath: string | undefined;
@@ -109,6 +110,67 @@ async function startServerWithClient(token?: string) {
return { server, ws, port, prevToken: prev };
}
type ConnectResponse = {
type: "res";
id: string;
ok: boolean;
payload?: unknown;
error?: { message?: string };
};
async function connectReq(
ws: WebSocket,
opts?: {
token?: string;
minProtocol?: number;
maxProtocol?: number;
client?: {
name: string;
version: string;
platform: string;
mode: string;
instanceId?: string;
};
},
): Promise<ConnectResponse> {
const id = randomUUID();
ws.send(
JSON.stringify({
type: "req",
id,
method: "connect",
params: {
minProtocol: opts?.minProtocol ?? PROTOCOL_VERSION,
maxProtocol: opts?.maxProtocol ?? PROTOCOL_VERSION,
client: opts?.client ?? {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
auth: opts?.token ? { token: opts.token } : undefined,
},
}),
);
return await onceMessage<ConnectResponse>(
ws,
(o) => o.type === "res" && o.id === id,
);
}
async function connectOk(
ws: WebSocket,
opts?: Parameters<typeof connectReq>[1],
) {
const res = await connectReq(ws, opts);
expect(res.ok).toBe(true);
expect((res.payload as { type?: unknown } | undefined)?.type).toBe(
"hello-ok",
);
return res.payload as { type: "hello-ok" };
}
describe("gateway server", () => {
test("agent falls back to allowFrom when lastTo is stale", async () => {
testAllowFrom = ["+436769770569"];
@@ -132,16 +194,7 @@ describe("gateway server", () => {
);
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
ws.send(
JSON.stringify({
@@ -196,16 +249,7 @@ describe("gateway server", () => {
);
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
ws.send(
JSON.stringify({
@@ -260,16 +304,7 @@ describe("gateway server", () => {
);
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
ws.send(
JSON.stringify({
@@ -322,16 +357,7 @@ describe("gateway server", () => {
);
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
ws.send(
JSON.stringify({
@@ -364,18 +390,12 @@ describe("gateway server", () => {
test("rejects protocol mismatch", async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 2,
maxProtocol: 3,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
try {
const res = await onceMessage(ws, () => true, 2000);
expect(res.type).toBe("hello-error");
const res = await connectReq(ws, {
minProtocol: PROTOCOL_VERSION + 1,
maxProtocol: PROTOCOL_VERSION + 2,
});
expect(res.ok).toBe(false);
} catch {
// If the server closed before we saw the frame, that's acceptable for mismatch.
}
@@ -385,19 +405,9 @@ describe("gateway server", () => {
test("rejects invalid token", async () => {
const { server, ws, prevToken } = await startServerWithClient("secret");
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
auth: { token: "wrong" },
}),
);
const res = await onceMessage(ws, () => true);
expect(res.type).toBe("hello-error");
expect(res.reason).toContain("unauthorized");
const res = await connectReq(ws, { token: "wrong" });
expect(res.ok).toBe(false);
expect(res.error?.message ?? "").toContain("unauthorized");
ws.close();
await server.close();
process.env.CLAWDIS_GATEWAY_TOKEN = prevToken;
@@ -420,16 +430,17 @@ describe("gateway server", () => {
},
);
test(
"hello + health + presence + status succeed",
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
test("connect (req) handshake returns hello-ok payload", async () => {
const { server, ws } = await startServerWithClient();
const id = randomUUID();
ws.send(
JSON.stringify({
type: "req",
id,
method: "connect",
params: {
minProtocol: PROTOCOL_VERSION,
maxProtocol: PROTOCOL_VERSION,
client: {
name: "test",
version: "1.0.0",
@@ -437,9 +448,40 @@ describe("gateway server", () => {
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
},
}),
);
const res = await onceMessage<{ ok: boolean; payload?: unknown }>(
ws,
(o) => o.type === "res" && o.id === id,
);
expect(res.ok).toBe(true);
expect((res.payload as { type?: unknown } | undefined)?.type).toBe(
"hello-ok",
);
ws.close();
await server.close();
});
test("rejects non-connect first request", async () => {
const { server, ws } = await startServerWithClient();
ws.send(JSON.stringify({ type: "req", id: "h1", method: "health" }));
const res = await onceMessage<{ ok: boolean; error?: unknown }>(
ws,
(o) => o.type === "res" && o.id === "h1",
);
expect(res.ok).toBe(false);
await new Promise<void>((resolve) => ws.once("close", () => resolve()));
await server.close();
});
test(
"connect + health + presence + status succeed",
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const healthP = onceMessage(
ws,
@@ -478,21 +520,7 @@ describe("gateway server", () => {
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
const presenceEventP = onceMessage(
ws,
@@ -519,21 +547,7 @@ describe("gateway server", () => {
test("agent events stream with seq", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
// Emit a fake agent event directly through the shared emitter.
const evtPromise = onceMessage(
@@ -555,21 +569,7 @@ describe("gateway server", () => {
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
const ackP = onceMessage(
ws,
@@ -610,21 +610,7 @@ describe("gateway server", () => {
{ timeout: 8000 },
async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
const firstFinalP = onceMessage(
ws,
@@ -665,21 +651,7 @@ describe("gateway server", () => {
test("shutdown event is broadcast on close", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
const shutdownP = onceMessage(
ws,
@@ -700,21 +672,7 @@ describe("gateway server", () => {
const mkClient = async () => {
const c = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => c.once("open", resolve));
c.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(c, (o) => o.type === "hello-ok");
await connectOk(c);
return c;
};
@@ -742,21 +700,7 @@ describe("gateway server", () => {
test("send dedupes by idempotencyKey", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
const idem = "same-key";
const res1P = onceMessage(ws, (o) => o.type === "res" && o.id === "a1");
@@ -789,21 +733,7 @@ describe("gateway server", () => {
const dial = async () => {
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => ws.once("open", resolve));
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "test",
version: "1.0.0",
platform: "test",
mode: "test",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
return ws;
};
@@ -849,16 +779,7 @@ describe("gateway server", () => {
test("chat.send accepts image attachment", { timeout: 12000 }, async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
const reqId = "chat-img";
ws.send(
@@ -916,16 +837,7 @@ describe("gateway server", () => {
);
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: { name: "test", version: "1", platform: "test", mode: "test" },
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws);
const reqId = "chat-route";
ws.send(
@@ -961,22 +873,15 @@ describe("gateway server", () => {
test("presence includes client fingerprint", async () => {
const { server, ws } = await startServerWithClient();
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "fingerprint",
version: "9.9.9",
platform: "test",
mode: "ui",
instanceId: "abc",
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws, {
client: {
name: "fingerprint",
version: "9.9.9",
platform: "test",
mode: "ui",
instanceId: "abc",
},
});
const presenceP = onceMessage(
ws,
@@ -1005,22 +910,15 @@ describe("gateway server", () => {
test("cli connections are not tracked as instances", async () => {
const { server, ws } = await startServerWithClient();
const cliId = `cli-${randomUUID()}`;
ws.send(
JSON.stringify({
type: "hello",
minProtocol: 1,
maxProtocol: 1,
client: {
name: "cli",
version: "dev",
platform: "test",
mode: "cli",
instanceId: cliId,
},
caps: [],
}),
);
await onceMessage(ws, (o) => o.type === "hello-ok");
await connectOk(ws, {
client: {
name: "cli",
version: "dev",
platform: "test",
mode: "cli",
instanceId: cliId,
},
});
const presenceP = onceMessage(
ws,

View File

@@ -39,25 +39,25 @@ import { sendMessageWhatsApp } from "../web/outbound.js";
import { ensureWebChatServerFromConfig } from "../webchat/server.js";
import { buildMessageWithAttachments } from "./chat-attachments.js";
import {
type ConnectParams,
ErrorCodes,
type ErrorShape,
errorShape,
formatValidationErrors,
type Hello,
PROTOCOL_VERSION,
type RequestFrame,
type Snapshot,
validateAgentParams,
validateChatHistoryParams,
validateChatSendParams,
validateHello,
validateConnectParams,
validateRequestFrame,
validateSendParams,
} from "./protocol/index.js";
type Client = {
socket: WebSocket;
hello: Hello;
connect: ConnectParams;
connId: string;
presenceKey?: string;
};
@@ -502,13 +502,10 @@ export async function startGatewayServer(
const remoteAddr = (
socket as WebSocket & { _socket?: { remoteAddress?: string } }
)._socket?.remoteAddress;
logWs("in", "connect", { connId, remoteAddr });
const describeHello = (hello: Hello | null | undefined) =>
hello
? `${hello.client.name ?? "unknown"} ${hello.client.mode ?? "?"} v${hello.client.version ?? "?"}`
: "unknown";
const isWebchatHello = (hello: Hello | null | undefined) =>
hello?.client?.mode === "webchat" || hello?.client?.name === "webchat-ui";
logWs("in", "open", { connId, remoteAddr });
const isWebchatConnect = (params: ConnectParams | null | undefined) =>
params?.client?.mode === "webchat" ||
params?.client?.name === "webchat-ui";
const send = (obj: unknown) => {
try {
@@ -539,10 +536,10 @@ export async function startGatewayServer(
socket.once("close", (code, reason) => {
if (!client) {
logWarn(
`gateway/ws closed before hello conn=${connId} remote=${remoteAddr ?? "?"} code=${code ?? "n/a"} reason=${reason?.toString() || "n/a"}`,
`gateway/ws closed before connect conn=${connId} remote=${remoteAddr ?? "?"} code=${code ?? "n/a"} reason=${reason?.toString() || "n/a"}`,
);
}
if (client && isWebchatHello(client.hello)) {
if (client && isWebchatConnect(client.connect)) {
logInfo(
`webchat disconnected code=${code} reason=${reason?.toString() || "n/a"} conn=${connId}`,
);
@@ -585,91 +582,115 @@ export async function startGatewayServer(
try {
const parsed = JSON.parse(text);
if (!client) {
// Expect hello
if (!validateHello(parsed)) {
logWarn(
`gateway/ws invalid hello conn=${connId} remote=${remoteAddr ?? "?"}`,
);
send({
type: "hello-error",
reason: `invalid hello: ${formatValidationErrors(validateHello.errors)}`,
});
socket.close(1008, "invalid hello");
// Handshake must be a normal request:
// { type:"req", method:"connect", params: ConnectParams }.
if (
!validateRequestFrame(parsed) ||
(parsed as RequestFrame).method !== "connect" ||
!validateConnectParams((parsed as RequestFrame).params)
) {
if (validateRequestFrame(parsed)) {
const req = parsed as RequestFrame;
send({
type: "res",
id: req.id,
ok: false,
error: errorShape(
ErrorCodes.INVALID_REQUEST,
req.method === "connect"
? `invalid connect params: ${formatValidationErrors(validateConnectParams.errors)}`
: "invalid handshake: first request must be connect",
),
});
} else {
logWarn(
`gateway/ws invalid handshake conn=${connId} remote=${remoteAddr ?? "?"}`,
);
}
socket.close(1008, "invalid handshake");
close();
return;
}
const hello = parsed as Hello;
const req = parsed as RequestFrame;
const connectParams = req.params as ConnectParams;
// protocol negotiation
const { minProtocol, maxProtocol } = hello;
const { minProtocol, maxProtocol } = connectParams;
if (
maxProtocol < PROTOCOL_VERSION ||
minProtocol > PROTOCOL_VERSION
) {
logWarn(
`gateway/ws protocol mismatch conn=${connId} remote=${remoteAddr ?? "?"} client=${describeHello(hello)}`,
`gateway/ws protocol mismatch conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`,
);
logWs("out", "hello-error", {
connId,
reason: "protocol mismatch",
minProtocol,
maxProtocol,
expected: PROTOCOL_VERSION,
});
send({
type: "hello-error",
reason: "protocol mismatch",
expectedProtocol: PROTOCOL_VERSION,
type: "res",
id: req.id,
ok: false,
error: errorShape(
ErrorCodes.INVALID_REQUEST,
"protocol mismatch",
{
details: { expectedProtocol: PROTOCOL_VERSION },
},
),
});
socket.close(1002, "protocol mismatch");
close();
return;
}
// token auth if required
const token = getGatewayToken();
if (token && hello.auth?.token !== token) {
if (token && connectParams.auth?.token !== token) {
logWarn(
`gateway/ws unauthorized conn=${connId} remote=${remoteAddr ?? "?"} client=${describeHello(hello)}`,
`gateway/ws unauthorized conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`,
);
logWs("out", "hello-error", { connId, reason: "unauthorized" });
send({
type: "hello-error",
reason: "unauthorized",
type: "res",
id: req.id,
ok: false,
error: errorShape(ErrorCodes.INVALID_REQUEST, "unauthorized"),
});
socket.close(1008, "unauthorized");
close();
return;
}
const shouldTrackPresence = hello.client.mode !== "cli";
// synthesize presence entry for this connection (client fingerprint)
const shouldTrackPresence = connectParams.client.mode !== "cli";
const presenceKey = shouldTrackPresence
? hello.client.instanceId || connId
? connectParams.client.instanceId || connId
: undefined;
logWs("in", "hello", {
logWs("in", "connect", {
connId,
client: hello.client.name,
version: hello.client.version,
mode: hello.client.mode,
instanceId: hello.client.instanceId,
platform: hello.client.platform,
token: hello.auth?.token ? "set" : "none",
client: connectParams.client.name,
version: connectParams.client.version,
mode: connectParams.client.mode,
instanceId: connectParams.client.instanceId,
platform: connectParams.client.platform,
token: connectParams.auth?.token ? "set" : "none",
});
if (isWebchatHello(hello)) {
if (isWebchatConnect(connectParams)) {
logInfo(
`webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${describeHello(hello)}`,
`webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${connectParams.client.name} ${connectParams.client.mode} v${connectParams.client.version}`,
);
}
if (presenceKey) {
upsertPresence(presenceKey, {
host: hello.client.name || os.hostname(),
host: connectParams.client.name || os.hostname(),
ip: isLoopbackAddress(remoteAddr) ? undefined : remoteAddr,
version: hello.client.version,
mode: hello.client.mode,
instanceId: hello.client.instanceId,
version: connectParams.client.version,
mode: connectParams.client.mode,
instanceId: connectParams.client.instanceId,
reason: "connect",
});
presenceVersion += 1;
}
const snapshot = buildSnapshot();
if (healthCache) {
snapshot.health = healthCache;
@@ -695,10 +716,10 @@ export async function startGatewayServer(
tickIntervalMs: TICK_INTERVAL_MS,
},
};
clearTimeout(handshakeTimer);
// Add the client only after the hello response is ready so no tick/presence
// events reach it before the handshake completes.
client = { socket, hello, connId, presenceKey };
client = { socket, connect: connectParams, connId, presenceKey };
logWs("out", "hello-ok", {
connId,
methods: METHODS.length,
@@ -706,11 +727,12 @@ export async function startGatewayServer(
presence: snapshot.presence.length,
stateVersion: snapshot.stateVersion.presence,
});
send(helloOk);
send({ type: "res", id: req.id, ok: true, payload: helloOk });
clients.add(client);
// Kick a health refresh in the background to keep cache warm.
void refreshHealthSnapshot({ probe: true }).catch((err) =>
logError(`post-hello health refresh failed: ${formatError(err)}`),
logError(`post-connect health refresh failed: ${formatError(err)}`),
);
return;
}
@@ -751,6 +773,17 @@ export async function startGatewayServer(
};
switch (req.method) {
case "connect": {
respond(
false,
undefined,
errorShape(
ErrorCodes.INVALID_REQUEST,
"connect is only valid as the first request",
),
);
break;
}
case "health": {
const now = Date.now();
const cached = healthCache;