chore(gateway): use ws bind as lock

This commit is contained in:
Peter Steinberger
2025-12-11 15:17:40 +00:00
parent 47a1f757a9
commit f417b51fb6
8 changed files with 73 additions and 198 deletions

View File

@@ -1,11 +1,9 @@
import { randomUUID } from "node:crypto";
import { type AddressInfo, createServer } from "node:net";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { emitAgentEvent } from "../infra/agent-events.js";
import { startGatewayServer } from "./server.js";
import { GatewayLockError } from "../infra/gateway-lock.js";
vi.mock("../commands/health.js", () => ({
getHealthSnapshot: vi.fn().mockResolvedValue({ ok: true, stub: true }),
@@ -27,19 +25,6 @@ vi.mock("../commands/agent.js", () => ({
process.env.CLAWDIS_SKIP_PROVIDERS = "1";
// Snapshot of the lock-path override that was in effect before this test file
// touched the environment; `undefined` means the variable was not set at all.
const originalLockPath = process.env.CLAWDIS_GATEWAY_LOCK_PATH;

// Point every test at its own uniquely named lock file under the OS temp dir.
beforeEach(() => {
  process.env.CLAWDIS_GATEWAY_LOCK_PATH = path.join(
    os.tmpdir(),
    `clawdis-gateway-${randomUUID()}.lock`,
  );
});

// Restore the environment exactly as it was found.
afterEach(() => {
  if (originalLockPath === undefined) {
    // Assigning `undefined` to a process.env key stores the string "undefined",
    // so the variable has to be deleted to really unset it.
    delete process.env.CLAWDIS_GATEWAY_LOCK_PATH;
  } else {
    process.env.CLAWDIS_GATEWAY_LOCK_PATH = originalLockPath;
  }
});
async function getFreePort(): Promise<number> {
return await new Promise((resolve, reject) => {
const server = createServer();
@@ -50,6 +35,17 @@ async function getFreePort(): Promise<number> {
});
}
/**
 * Binds a plain TCP server to an OS-assigned port on 127.0.0.1 and hands back
 * both the server and the port it ended up on. The caller owns the returned
 * server and is responsible for closing it.
 */
async function occupyPort(): Promise<{ server: ReturnType<typeof createServer>; port: number }> {
  const server = createServer();
  // Wait until the socket is bound; a bind failure rejects instead.
  await new Promise<void>((resolve, reject) => {
    server.once("error", reject);
    server.once("listening", resolve);
    server.listen(0, "127.0.0.1");
  });
  // Port 0 was requested, so the actual port must be read back from the server.
  const { port } = server.address() as AddressInfo;
  return { server, port };
}
function onceMessage<T = unknown>(
ws: WebSocket,
filter: (obj: unknown) => boolean,
@@ -654,4 +650,12 @@ describe("gateway server", () => {
ws.close();
await server.close();
});
// The gateway uses its listening socket as the single-instance lock, so a port
// that is already taken must surface as a GatewayLockError.
test("refuses to start when port already bound", async () => {
  const { server: blocker, port } = await occupyPort();
  try {
    await expect(startGatewayServer(port)).rejects.toBeInstanceOf(
      GatewayLockError,
    );
  } finally {
    // Always release the port, and wait for the close to finish, so a failed
    // expectation cannot leak the listener into later tests.
    await new Promise<void>((resolve) => blocker.close(() => resolve()));
  }
});
});

View File

@@ -2,6 +2,7 @@ import { randomUUID } from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { createServer as createHttpServer, type Server as HttpServer } from "node:http";
import chalk from "chalk";
import { type WebSocket, WebSocketServer } from "ws";
import { createDefaultDeps } from "../cli/deps.js";
@@ -17,7 +18,7 @@ import {
} from "../config/sessions.js";
import { isVerbose } from "../globals.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { acquireGatewayLock, GatewayLockError } from "../infra/gateway-lock.js";
import { GatewayLockError } from "../infra/gateway-lock.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import {
listSystemPresence,
@@ -255,15 +256,38 @@ export async function startGatewayServer(
port = 18789,
opts?: { webchatPort?: number },
): Promise<GatewayServer> {
const releaseLock = await acquireGatewayLock().catch((err) => {
// Bubble known lock errors so callers can present a nice message.
if (err instanceof GatewayLockError) throw err;
throw new GatewayLockError(String(err));
});
// The gateway only listens on loopback.
const host = "127.0.0.1";
// Bind a bare HTTP server first: a successful bind on host:port doubles as the
// single-instance lock for the gateway.
const httpServer: HttpServer = createHttpServer();
try {
// Wrap the event-based listen() in a promise that settles on whichever of
// "listening" / "error" fires first.
await new Promise<void>((resolve, reject) => {
const onError = (err: NodeJS.ErrnoException) => {
// Drop the sibling listener so only one outcome is ever reported.
httpServer.off("listening", onListening);
reject(err);
};
const onListening = () => {
// Bind succeeded; the startup error handler is no longer needed.
httpServer.off("error", onError);
resolve();
};
httpServer.once("error", onError);
httpServer.once("listening", onListening);
httpServer.listen(port, host);
});
} catch (err) {
const code = (err as NodeJS.ErrnoException).code;
// Address already in use: report it as another gateway instance holding the port.
if (code === "EADDRINUSE") {
throw new GatewayLockError(
`another gateway instance is already listening on ws://${host}:${port}`,
err,
);
}
// Any other bind failure is also surfaced as a GatewayLockError, with the
// original error attached as the cause.
throw new GatewayLockError(
`failed to bind gateway socket on ws://${host}:${port}: ${String(err)}`,
err,
);
}
const wss = new WebSocketServer({
port,
host: "127.0.0.1",
server: httpServer,
maxPayload: MAX_PAYLOAD_BYTES,
});
const providerAbort = new AbortController();
@@ -1183,7 +1207,6 @@ export async function startGatewayServer(
return {
close: async () => {
await releaseLock();
providerAbort.abort();
broadcast("shutdown", {
reason: "gateway stopping",
@@ -1211,6 +1234,9 @@ export async function startGatewayServer(
clients.clear();
await Promise.allSettled(providerTasks);
await new Promise<void>((resolve) => wss.close(() => resolve()));
await new Promise<void>((resolve, reject) =>
httpServer.close((err) => (err ? reject(err) : resolve())),
);
},
};
}

View File

@@ -1,34 +0,0 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { acquireGatewayLock, GatewayLockError } from "./gateway-lock.js";
/**
 * Builds a fresh lock path inside the OS temp directory. The file name embeds
 * the current PID plus a random hex suffix so separate calls do not collide.
 */
const newLockPath = () => {
  const randomSuffix = Math.random().toString(16).slice(2);
  const fileName = `clawdis-gateway-lock-test-${process.pid}-${randomSuffix}.sock`;
  return path.join(os.tmpdir(), fileName);
};
describe("gateway-lock", () => {
  it("prevents concurrent gateway instances and releases cleanly", async () => {
    const lockFile = newLockPath();
    const lockFileExists = () => fs.existsSync(lockFile);

    // The first holder acquires the lock and the lock file is present on disk.
    const releaseFirst = await acquireGatewayLock(lockFile);
    expect(lockFileExists()).toBe(true);

    // A second attempt on the same path must be rejected while the lock is held.
    const contender = acquireGatewayLock(lockFile);
    await expect(contender).rejects.toBeInstanceOf(GatewayLockError);

    // Releasing removes the lock file again.
    await releaseFirst();
    expect(lockFileExists()).toBe(false);

    // Once released, the same path can be locked and released a second time.
    const releaseSecond = await acquireGatewayLock(lockFile);
    await releaseSecond();
    expect(lockFileExists()).toBe(false);
  });
});

View File

@@ -1,84 +1,6 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { flockSync } from "fs-ext";
import { getLogger } from "../logging.js";
const defaultLockPath = () =>
process.env.CLAWDIS_GATEWAY_LOCK_PATH ??
path.join(os.tmpdir(), "clawdis-gateway.lock");
export class GatewayLockError extends Error {}
type ReleaseFn = () => Promise<void>;
const SIGNALS: NodeJS.Signals[] = ["SIGINT", "SIGTERM", "SIGHUP"];
/**
* Acquire an exclusive gateway lock using POSIX flock and write the PID into the same file.
*
* Kernel locks are released automatically when the process exits or is SIGKILLed, so the
* lock cannot become stale. A best-effort unlink on shutdown keeps the path clean, but
* correctness relies solely on the kernel lock.
*/
export async function acquireGatewayLock(
lockPath = defaultLockPath(),
): Promise<ReleaseFn> {
fs.mkdirSync(path.dirname(lockPath), { recursive: true });
const fd = fs.openSync(lockPath, "w+");
try {
flockSync(fd, "exnb");
} catch (err) {
fs.closeSync(fd);
const code = (err as NodeJS.ErrnoException).code;
if (code === "EWOULDBLOCK" || code === "EAGAIN") {
throw new GatewayLockError("another gateway instance is already running");
}
throw new GatewayLockError(
`failed to acquire gateway lock: ${(err as Error).message}`,
);
/**
 * Error type for failures to obtain the gateway's single-instance lock.
 *
 * @param message - Human-readable description of why the lock could not be taken.
 * @param cause - Optional underlying error or value, exposed as a public
 *   read-only `cause` property on the instance.
 */
export class GatewayLockError extends Error {
constructor(message: string, public readonly cause?: unknown) {
super(message);
// Set an explicit name so the error is reported as "GatewayLockError".
this.name = "GatewayLockError";
}
fs.ftruncateSync(fd, 0);
fs.writeSync(fd, `${process.pid}\n`, 0, "utf8");
fs.fsyncSync(fd);
getLogger().info({ pid: process.pid, lockPath }, "gateway lock acquired");
let released = false;
const release = async (): Promise<void> => {
if (released) return;
released = true;
try {
flockSync(fd, "un");
} catch {
/* ignore unlock errors */
}
try {
fs.closeSync(fd);
} catch {
/* ignore close errors */
}
try {
fs.rmSync(lockPath, { force: true });
} catch {
/* ignore unlink errors */
}
};
const handleSignal = () => {
void release();
process.exit(0);
};
for (const sig of SIGNALS) {
process.once(sig, handleSignal);
}
process.once("exit", () => {
void release();
});
return release;
}