test: stabilize gateway ports and timers

This commit is contained in:
Peter Steinberger
2026-01-18 04:29:52 +00:00
parent cf8b3ed988
commit 1a0d1cb7b2
7 changed files with 137 additions and 135 deletions

View File

@@ -1,11 +1,11 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import { createServer } from "node:net";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { getDeterministicFreePortBlock } from "../test-utils/ports.js";
import { GatewayClient } from "./client.js";
import { startGatewayServer } from "./server.js";
@@ -169,49 +169,8 @@ async function buildOpenAIResponsesSse(params: OpenAIResponsesParams): Promise<R
});
}
async function getFreePort(): Promise<number> {
return await new Promise((resolve, reject) => {
const srv = createServer();
srv.on("error", reject);
srv.listen(0, "127.0.0.1", () => {
const addr = srv.address();
if (!addr || typeof addr === "string") {
srv.close();
reject(new Error("failed to acquire free port"));
return;
}
const port = addr.port;
srv.close((err) => {
if (err) reject(err);
else resolve(port);
});
});
});
}
async function isPortFree(port: number): Promise<boolean> {
if (!Number.isFinite(port) || port <= 0 || port > 65535) return false;
return await new Promise((resolve) => {
const srv = createServer();
srv.once("error", () => resolve(false));
srv.listen(port, "127.0.0.1", () => {
srv.close(() => resolve(true));
});
});
}
async function getFreeGatewayPort(): Promise<number> {
// Gateway uses derived ports (bridge/browser/canvas). Avoid flaky collisions by
// ensuring the common derived offsets are free too.
for (let attempt = 0; attempt < 25; attempt += 1) {
const port = await getFreePort();
const candidates = [port, port + 1, port + 2, port + 4];
const ok = (await Promise.all(candidates.map((candidate) => isPortFree(candidate)))).every(
Boolean,
);
if (ok) return port;
}
throw new Error("failed to acquire a free gateway port block");
return await getDeterministicFreePortBlock({ offsets: [0, 1, 2, 3, 4] });
}
function extractPayloadText(result: unknown): string {
@@ -267,7 +226,8 @@ describe("gateway (mock openai): tool calling", () => {
};
const originalFetch = globalThis.fetch;
const openaiResponsesUrl = "https://api.openai.com/v1/responses";
const openaiBaseUrl = "https://api.openai.com/v1";
const openaiResponsesUrl = `${openaiBaseUrl}/responses`;
const isOpenAIResponsesRequest = (url: string) =>
url === openaiResponsesUrl ||
url.startsWith(`${openaiResponsesUrl}/`) ||
@@ -288,6 +248,9 @@ describe("gateway (mock openai): tool calling", () => {
const inputItems = Array.isArray(parsed.input) ? parsed.input : [];
return await buildOpenAIResponsesSse({ input: inputItems });
}
if (url.startsWith(openaiBaseUrl)) {
throw new Error(`unexpected OpenAI request in mock test: ${url}`);
}
if (!originalFetch) {
throw new Error(`fetch is not available (url=${url})`);
@@ -325,7 +288,7 @@ describe("gateway (mock openai): tool calling", () => {
mode: "replace",
providers: {
openai: {
baseUrl: "https://api.openai.com/v1",
baseUrl: openaiBaseUrl,
apiKey: "test",
api: "openai-responses",
models: [
@@ -404,6 +367,5 @@ describe("gateway (mock openai): tool calling", () => {
process.env.CLAWDBOT_SKIP_CANVAS_HOST = prev.skipCanvas;
}
},
30_000,
);
});

View File

@@ -1,6 +1,5 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import { createServer } from "node:net";
import os from "node:os";
import path from "node:path";
@@ -8,52 +7,12 @@ import { describe, expect, it } from "vitest";
import { WebSocket } from "ws";
import { rawDataToString } from "../infra/ws.js";
import { getDeterministicFreePortBlock } from "../test-utils/ports.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { PROTOCOL_VERSION } from "./protocol/index.js";
async function getFreePort(): Promise<number> {
return await new Promise((resolve, reject) => {
const srv = createServer();
srv.on("error", reject);
srv.listen(0, "127.0.0.1", () => {
const addr = srv.address();
if (!addr || typeof addr === "string") {
srv.close();
reject(new Error("failed to acquire free port"));
return;
}
const port = addr.port;
srv.close((err) => {
if (err) reject(err);
else resolve(port);
});
});
});
}
async function isPortFree(port: number): Promise<boolean> {
if (!Number.isFinite(port) || port <= 0 || port > 65535) return false;
return await new Promise((resolve) => {
const srv = createServer();
srv.once("error", () => resolve(false));
srv.listen(port, "127.0.0.1", () => {
srv.close(() => resolve(true));
});
});
}
async function getFreeGatewayPort(): Promise<number> {
// Gateway uses derived ports (bridge/browser/canvas). Avoid flaky collisions by
// ensuring the common derived offsets are free too.
for (let attempt = 0; attempt < 25; attempt += 1) {
const port = await getFreePort();
const candidates = [port, port + 1, port + 2, port + 4];
const ok = (await Promise.all(candidates.map((candidate) => isPortFree(candidate)))).every(
Boolean,
);
if (ok) return port;
}
throw new Error("failed to acquire a free gateway port block");
return await getDeterministicFreePortBlock({ offsets: [0, 1, 2, 3, 4] });
}
async function onceMessage<T = unknown>(

View File

@@ -1,6 +1,8 @@
import { describe, expect, test } from "vitest";
import { describe, expect, test, vi } from "vitest";
import fs from "node:fs/promises";
import { WebSocket } from "ws";
import { PROTOCOL_VERSION } from "./protocol/index.js";
import { HANDSHAKE_TIMEOUT_MS } from "./server-constants.js";
import {
connectReq,
getFreePort,
@@ -13,16 +15,21 @@ import {
installGatewayTestHooks();
async function waitForWsClose(ws: WebSocket, timeoutMs: number): Promise<boolean> {
const deadline = process.hrtime.bigint() + BigInt(timeoutMs) * 1_000_000n;
while (process.hrtime.bigint() < deadline) {
if (ws.readyState === WebSocket.CLOSED) return true;
// Yield to the event loop without relying on timers (fake timers can leak).
await fs.stat(process.cwd()).catch(() => {});
}
return ws.readyState === WebSocket.CLOSED;
}
describe("gateway server auth/connect", () => {
test("closes silent handshakes after timeout", { timeout: 30_000 }, async () => {
vi.useRealTimers();
const { server, ws } = await startServerWithClient();
const closed = await new Promise<boolean>((resolve) => {
const timer = setTimeout(() => resolve(false), 25_000);
ws.once("close", () => {
clearTimeout(timer);
resolve(true);
});
});
const closed = await waitForWsClose(ws, HANDSHAKE_TIMEOUT_MS + 2_000);
expect(closed).toBe(true);
await server.close();
});

View File

@@ -429,5 +429,5 @@ describe("gateway server cron", () => {
testState.cronStorePath = undefined;
await fs.rm(dir, { recursive: true, force: true });
}
}, 15_000);
}, 45_000);
});

View File

@@ -3,7 +3,7 @@ import { type AddressInfo, createServer } from "node:net";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, expect } from "vitest";
import { afterEach, beforeEach, expect, vi } from "vitest";
import { WebSocket } from "ws";
import { resolveMainSessionKeyFromConfig, type SessionEntry } from "../config/sessions.js";
@@ -12,6 +12,7 @@ import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
import { rawDataToString } from "../infra/ws.js";
import { resetLogger, setLoggerOverride } from "../logging.js";
import { DEFAULT_AGENT_ID, toAgentStoreSessionKey } from "../routing/session-key.js";
import { getDeterministicFreePortBlock } from "../test-utils/ports.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { PROTOCOL_VERSION } from "./protocol/index.js";
@@ -29,6 +30,9 @@ import {
} from "./test-helpers.mocks.js";
let previousHome: string | undefined;
let previousUserProfile: string | undefined;
let previousStateDir: string | undefined;
let previousConfigPath: string | undefined;
let tempHome: string | undefined;
let tempConfigRoot: string | undefined;
@@ -60,10 +64,18 @@ export async function writeSessionStore(params: {
export function installGatewayTestHooks() {
beforeEach(async () => {
// Some tests intentionally use fake timers; ensure they don't leak into gateway suites.
vi.useRealTimers();
setLoggerOverride({ level: "silent", consoleLevel: "silent" });
previousHome = process.env.HOME;
previousUserProfile = process.env.USERPROFILE;
previousStateDir = process.env.CLAWDBOT_STATE_DIR;
previousConfigPath = process.env.CLAWDBOT_CONFIG_PATH;
tempHome = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gateway-home-"));
process.env.HOME = tempHome;
process.env.USERPROFILE = tempHome;
process.env.CLAWDBOT_STATE_DIR = path.join(tempHome, ".clawdbot");
delete process.env.CLAWDBOT_CONFIG_PATH;
tempConfigRoot = path.join(tempHome, ".clawdbot-test");
setTestConfigRoot(tempConfigRoot);
sessionStoreSaveDelayMs.value = 0;
@@ -101,8 +113,16 @@ export function installGatewayTestHooks() {
}, 60_000);
afterEach(async () => {
vi.useRealTimers();
resetLogger();
process.env.HOME = previousHome;
if (previousHome === undefined) delete process.env.HOME;
else process.env.HOME = previousHome;
if (previousUserProfile === undefined) delete process.env.USERPROFILE;
else process.env.USERPROFILE = previousUserProfile;
if (previousStateDir === undefined) delete process.env.CLAWDBOT_STATE_DIR;
else process.env.CLAWDBOT_STATE_DIR = previousStateDir;
if (previousConfigPath === undefined) delete process.env.CLAWDBOT_CONFIG_PATH;
else process.env.CLAWDBOT_CONFIG_PATH = previousConfigPath;
if (tempHome) {
await fs.rm(tempHome, {
recursive: true,
@@ -116,42 +136,8 @@ export function installGatewayTestHooks() {
});
}
let nextTestPortOffset = 0;
export async function getFreePort(): Promise<number> {
const workerIdRaw = process.env.VITEST_WORKER_ID ?? process.env.VITEST_POOL_ID ?? "";
const workerId = Number.parseInt(workerIdRaw, 10);
const shard = Number.isFinite(workerId) ? Math.max(0, workerId) : Math.abs(process.pid);
// Avoid flaky "get a free port then bind later" races by allocating from a
// deterministic per-worker port range. Still probe for EADDRINUSE to avoid
// collisions with external processes.
const rangeSize = 1000;
const shardCount = 30;
const base = 30_000 + (Math.abs(shard) % shardCount) * rangeSize; // <= 59_999
for (let attempt = 0; attempt < rangeSize; attempt++) {
const port = base + (nextTestPortOffset++ % rangeSize);
// eslint-disable-next-line no-await-in-loop
const ok = await new Promise<boolean>((resolve) => {
const server = createServer();
server.once("error", () => resolve(false));
server.listen(port, "127.0.0.1", () => {
server.close(() => resolve(true));
});
});
if (ok) return port;
}
// Fallback: let the OS pick a port.
return await new Promise((resolve, reject) => {
const server = createServer();
server.once("error", reject);
server.listen(0, "127.0.0.1", () => {
const port = (server.address() as AddressInfo).port;
server.close((err) => (err ? reject(err) : resolve(port)));
});
});
return await getDeterministicFreePortBlock({ offsets: [0, 1, 2, 3, 4] });
}
export async function occupyPort(): Promise<{

82
src/test-utils/ports.ts Normal file
View File

@@ -0,0 +1,82 @@
import { type AddressInfo, createServer } from "node:net";
async function isPortFree(port: number): Promise<boolean> {
if (!Number.isFinite(port) || port <= 0 || port > 65535) return false;
return await new Promise((resolve) => {
const server = createServer();
server.once("error", () => resolve(false));
server.listen(port, "127.0.0.1", () => {
server.close(() => resolve(true));
});
});
}
async function getOsFreePort(): Promise<number> {
return await new Promise((resolve, reject) => {
const server = createServer();
server.once("error", reject);
server.listen(0, "127.0.0.1", () => {
const addr = server.address();
if (!addr || typeof addr === "string") {
server.close();
reject(new Error("failed to acquire free port"));
return;
}
const port = (addr as AddressInfo).port;
server.close((err) => (err ? reject(err) : resolve(port)));
});
});
}
let nextTestPortOffset = 0;
/**
* Allocate a deterministic per-worker port block.
*
* Motivation: many tests spin up gateway + related services that use derived ports
* (e.g. +1/+2/+3/+4). If each test just grabs an OS free port, parallel test runs
* can collide on derived ports and get flaky EADDRINUSE.
*/
export async function getDeterministicFreePortBlock(params?: {
offsets?: number[];
}): Promise<number> {
const offsets = params?.offsets ?? [0, 1, 2, 3, 4];
const maxOffset = Math.max(...offsets);
const workerIdRaw = process.env.VITEST_WORKER_ID ?? process.env.VITEST_POOL_ID ?? "";
const workerId = Number.parseInt(workerIdRaw, 10);
const shard = Number.isFinite(workerId) ? Math.max(0, workerId) : Math.abs(process.pid);
const rangeSize = 1000;
const shardCount = 30;
const base = 30_000 + (Math.abs(shard) % shardCount) * rangeSize; // <= 59_999
const usable = rangeSize - maxOffset;
// Allocate in blocks to avoid derived-port overlaps (e.g. port+3).
const blockSize = Math.max(maxOffset + 1, 8);
for (let attempt = 0; attempt < usable; attempt += 1) {
const start = base + ((nextTestPortOffset + attempt) % usable);
// eslint-disable-next-line no-await-in-loop
const ok = (
await Promise.all(offsets.map((offset) => isPortFree(start + offset)))
).every(Boolean);
if (!ok) continue;
nextTestPortOffset = (nextTestPortOffset + attempt + blockSize) % usable;
return start;
}
// Fallback: let the OS pick a port block (best effort).
for (let attempt = 0; attempt < 25; attempt += 1) {
// eslint-disable-next-line no-await-in-loop
const port = await getOsFreePort();
// eslint-disable-next-line no-await-in-loop
const ok = (await Promise.all(offsets.map((offset) => isPortFree(port + offset)))).every(
Boolean,
);
if (ok) return port;
}
throw new Error("failed to acquire a free port block");
}

View File

@@ -1,4 +1,10 @@
import { installTestEnv } from "./test-env";
import { afterEach, vi } from "vitest";
const { cleanup } = installTestEnv();
process.on("exit", cleanup);
afterEach(() => {
// Guard against leaked fake timers across test files/workers.
vi.useRealTimers();
});