diff --git a/CHANGELOG.md b/CHANGELOG.md
index b5271e2ed..cb3bdd616 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@ Docs: https://docs.clawd.bot
 - Nodes tool: include agent/node/gateway context in tool failure logs to speed approval debugging.
 - macOS: exec approvals now respect wildcard agent allowlists (`*`).
 - macOS: allow SSH agent auth when no identity file is set. (#1384) Thanks @ameno-.
+- Gateway: prevent multiple gateways from sharing the same config/state at once (singleton lock).
 - UI: remove the chat stop button and keep the composer aligned to the bottom edge.
 - Typing: start instant typing indicators at run start so DMs and mentions show immediately.
 - Configure: restrict the model allowlist picker to OAuth-compatible Anthropic models and preselect Opus 4.5.
diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts
index 21e32e91b..358d9a3cb 100644
--- a/src/cli/gateway-cli/run-loop.ts
+++ b/src/cli/gateway-cli/run-loop.ts
@@ -1,4 +1,5 @@
 import type { startGatewayServer } from "../../gateway/server.js";
+import { acquireGatewayLock } from "../../infra/gateway-lock.js";
 import {
   consumeGatewaySigusr1RestartAuthorization,
   isGatewaySigusr1RestartExternallyAllowed,
@@ -14,6 +15,7 @@ export async function runGatewayLoop(params: {
   start: () => Promise<Awaited<ReturnType<typeof startGatewayServer>>>;
   runtime: typeof defaultRuntime;
 }) {
+  const lock = await acquireGatewayLock();
   let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
   let shuttingDown = false;
   let restartResolver: (() => void) | null = null;
@@ -96,6 +98,7 @@ export async function runGatewayLoop(params: {
       });
     }
   } finally {
+    await lock?.release();
     cleanupSignals();
   }
 }
diff --git a/src/infra/gateway-lock.test.ts b/src/infra/gateway-lock.test.ts
new file mode 100644
index 000000000..1e4e90cac
--- /dev/null
+++ b/src/infra/gateway-lock.test.ts
@@ -0,0 +1,55 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+
+import { describe, expect, it } from "vitest";
+
+import { acquireGatewayLock, GatewayLockError } from "./gateway-lock.js";
+
+async function makeEnv() {
+  const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gateway-lock-"));
+  const configPath = path.join(dir, "clawdbot.json");
+  await fs.writeFile(configPath, "{}", "utf8");
+  return {
+    env: {
+      ...process.env,
+      CLAWDBOT_STATE_DIR: dir,
+      CLAWDBOT_CONFIG_PATH: configPath,
+    },
+    cleanup: async () => {
+      await fs.rm(dir, { recursive: true, force: true });
+    },
+  };
+}
+
+describe("gateway lock", () => {
+  it("blocks concurrent acquisition until release", async () => {
+    const { env, cleanup } = await makeEnv();
+    const lock = await acquireGatewayLock({
+      env,
+      allowInTests: true,
+      timeoutMs: 200,
+      pollIntervalMs: 20,
+    });
+    expect(lock).not.toBeNull();
+
+    await expect(
+      acquireGatewayLock({
+        env,
+        allowInTests: true,
+        timeoutMs: 200,
+        pollIntervalMs: 20,
+      }),
+    ).rejects.toBeInstanceOf(GatewayLockError);
+
+    await lock?.release();
+    const lock2 = await acquireGatewayLock({
+      env,
+      allowInTests: true,
+      timeoutMs: 200,
+      pollIntervalMs: 20,
+    });
+    await lock2?.release();
+    await cleanup();
+  });
+});
diff --git a/src/infra/gateway-lock.ts b/src/infra/gateway-lock.ts
index f77c7fcf0..bab197f36 100644
--- a/src/infra/gateway-lock.ts
+++ b/src/infra/gateway-lock.ts
@@ -1,3 +1,39 @@
+import { createHash } from "node:crypto";
+import fs from "node:fs/promises";
+import path from "node:path";
+
+import { resolveConfigPath, resolveStateDir } from "../config/paths.js";
+
+/** Default total time to keep retrying before giving up on the lock. */
+const DEFAULT_TIMEOUT_MS = 5000;
+/** Default delay between acquisition attempts while the lock is contended. */
+const DEFAULT_POLL_INTERVAL_MS = 100;
+/** Default age after which a lock with no identifiable owner is treated as stale. */
+const DEFAULT_STALE_MS = 30_000;
+
+/** JSON document the owning process writes into the lock file. */
+type LockPayload = {
+  pid: number;
+  createdAt: string;
+  configPath: string;
+};
+
+/** Handle returned to the lock owner; call `release()` on shutdown. */
+export type GatewayLockHandle = {
+  lockPath: string;
+  configPath: string;
+  release: () => Promise<void>;
+};
+
+/** Tuning knobs for `acquireGatewayLock`; every field is optional. */
+export type GatewayLockOptions = {
+  env?: NodeJS.ProcessEnv;
+  timeoutMs?: number;
+  pollIntervalMs?: number;
+  staleMs?: number;
+  allowInTests?: boolean;
+};
+
 export class GatewayLockError extends Error {
   constructor(
     message: string,
@@ -7,3 +43,163 @@ export class GatewayLockError extends Error {
     this.name = "GatewayLockError";
   }
 }
+
+/**
+ * Reports whether a process with the given pid currently exists.
+ *
+ * Signal 0 performs the existence/permission check without delivering a signal.
+ */
+function isAlive(pid: number): boolean {
+  if (!Number.isFinite(pid) || pid <= 0) return false;
+  try {
+    process.kill(pid, 0);
+    return true;
+  } catch (err) {
+    // EPERM: the process exists but is owned by another user, so it is still a
+    // live lock owner. Anything else (typically ESRCH) means it is gone.
+    return (err as { code?: unknown }).code === "EPERM";
+  }
+}
+
+/**
+ * Reads and validates the lock file payload.
+ *
+ * @returns the payload, or `null` when the file is missing, unreadable, not
+ *   valid JSON, or lacks any of the expected fields.
+ */
+async function readLockPayload(lockPath: string): Promise<LockPayload | null> {
+  try {
+    const raw = await fs.readFile(lockPath, "utf8");
+    const parsed = JSON.parse(raw) as Partial<LockPayload>;
+    if (typeof parsed.pid !== "number") return null;
+    if (typeof parsed.createdAt !== "string") return null;
+    if (typeof parsed.configPath !== "string") return null;
+    return {
+      pid: parsed.pid,
+      createdAt: parsed.createdAt,
+      configPath: parsed.configPath,
+    };
+  } catch {
+    return null;
+  }
+}
+
+/**
+ * Derives the lock file location: one lock per config path, stored in the
+ * state directory and keyed by a short hash of the config path.
+ */
+function resolveGatewayLockPath(env: NodeJS.ProcessEnv) {
+  const stateDir = resolveStateDir(env);
+  const configPath = resolveConfigPath(env, stateDir);
+  const hash = createHash("sha1").update(configPath).digest("hex").slice(0, 8);
+  const lockPath = path.join(stateDir, `gateway.${hash}.lock`);
+  return { lockPath, configPath };
+}
+
+/**
+ * Acquires the singleton gateway lock for the resolved config/state pair.
+ *
+ * The lock is a file created with the exclusive `wx` flag. While another live
+ * process owns it, acquisition is retried every `pollIntervalMs` until
+ * `timeoutMs` has elapsed; at least one attempt is always made. Locks whose
+ * owner pid is no longer running, or whose payload is unreadable and older
+ * than `staleMs`, are removed and retried.
+ *
+ * @returns a handle whose `release()` closes and removes the lock file, or
+ *   `null` when locking is disabled (`CLAWDBOT_ALLOW_MULTI_GATEWAY=1`, or a
+ *   test environment without `allowInTests`).
+ * @throws {GatewayLockError} when the lock is still held after `timeoutMs`, or
+ *   when creating/writing the lock file fails for any reason other than
+ *   `EEXIST`.
+ */
+export async function acquireGatewayLock(
+  opts: GatewayLockOptions = {},
+): Promise<GatewayLockHandle | null> {
+  const env = opts.env ?? process.env;
+  const allowInTests = opts.allowInTests === true;
+  if (
+    env.CLAWDBOT_ALLOW_MULTI_GATEWAY === "1" ||
+    (!allowInTests && (env.VITEST || env.NODE_ENV === "test"))
+  ) {
+    return null;
+  }
+
+  const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
+  const pollIntervalMs = opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
+  const staleMs = opts.staleMs ?? DEFAULT_STALE_MS;
+  const { lockPath, configPath } = resolveGatewayLockPath(env);
+  await fs.mkdir(path.dirname(lockPath), { recursive: true });
+
+  const startedAt = Date.now();
+  let lastPayload: LockPayload | null = null;
+
+  // do/while so that a zero (or negative) timeout still gets one attempt.
+  do {
+    try {
+      const handle = await fs.open(lockPath, "wx");
+      const payload: LockPayload = {
+        pid: process.pid,
+        createdAt: new Date().toISOString(),
+        configPath,
+      };
+      try {
+        await handle.writeFile(JSON.stringify(payload), "utf8");
+      } catch (writeErr) {
+        // We created the file but could not record ownership: undo it so the
+        // descriptor is not leaked and no ownerless lock blocks other gateways.
+        await handle.close().catch(() => undefined);
+        await fs.rm(lockPath, { force: true }).catch(() => undefined);
+        throw writeErr;
+      }
+      return {
+        lockPath,
+        configPath,
+        release: async () => {
+          await handle.close().catch(() => undefined);
+          await fs.rm(lockPath, { force: true });
+        },
+      };
+    } catch (err) {
+      const code = (err as { code?: unknown }).code;
+      if (code !== "EEXIST") {
+        throw new GatewayLockError(`failed to acquire gateway lock at ${lockPath}`, err);
+      }
+
+      lastPayload = await readLockPayload(lockPath);
+      const ownerPid = lastPayload?.pid;
+      const ownerAlive = ownerPid ? isAlive(ownerPid) : false;
+      if (!ownerAlive && ownerPid) {
+        // Recorded owner has exited without releasing: reclaim immediately.
+        await fs.rm(lockPath, { force: true });
+        continue;
+      }
+      if (!ownerAlive) {
+        // No usable pid (payload unreadable, or pid is 0): fall back to age
+        // checks, first on the recorded timestamp, then on the file's mtime.
+        let stale = false;
+        if (lastPayload?.createdAt) {
+          const createdAt = Date.parse(lastPayload.createdAt);
+          stale = Number.isFinite(createdAt) ? Date.now() - createdAt > staleMs : false;
+        }
+        if (!stale) {
+          try {
+            const st = await fs.stat(lockPath);
+            stale = Date.now() - st.mtimeMs > staleMs;
+          } catch {
+            // stat failed (e.g. the file was removed in the meantime): retry now.
+            stale = true;
+          }
+        }
+        if (stale) {
+          await fs.rm(lockPath, { force: true });
+          continue;
+        }
+      }
+
+      await new Promise((r) => setTimeout(r, pollIntervalMs));
+    }
+  } while (Date.now() - startedAt < timeoutMs);
+
+  const owner = lastPayload?.pid ? ` (pid ${lastPayload.pid})` : "";
+  throw new GatewayLockError(`gateway already running${owner}; lock timeout after ${timeoutMs}ms`);
+}
diff --git a/src/macos/gateway-daemon.ts b/src/macos/gateway-daemon.ts
index 03379a6df..0b045e254 100644
--- a/src/macos/gateway-daemon.ts
+++ b/src/macos/gateway-daemon.ts
@@ -45,6 +45,7 @@ async function main() {
     { startGatewayServer },
     { setGatewayWsLogStyle },
     { setVerbose },
+    { acquireGatewayLock, GatewayLockError },
     { consumeGatewaySigusr1RestartAuthorization, isGatewaySigusr1RestartExternallyAllowed },
     { defaultRuntime },
     { enableConsoleCapture, setConsoleTimestampPrefix },
@@ -53,6 +54,7 @@ async function main() {
     import("../gateway/server.js"),
     import("../gateway/ws-logging.js"),
     import("../globals.js"),
+    import("../infra/gateway-lock.js"),
     import("../infra/restart.js"),
     import("../runtime.js"),
     import("../logging.js"),
@@ -103,6 +105,7 @@ async function main() {
   if (token) process.env.CLAWDBOT_GATEWAY_TOKEN = token;
 
   let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
+  let lock: Awaited<ReturnType<typeof acquireGatewayLock>> | null = null;
   let shuttingDown = false;
   let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
   let restartResolver: (() => void) | null = null;
@@ -177,6 +180,15 @@ async function main() {
   process.on("SIGUSR1", onSigusr1);
 
   try {
+    try {
+      lock = await acquireGatewayLock();
+    } catch (err) {
+      if (err instanceof GatewayLockError) {
+        defaultRuntime.error(`Gateway start blocked: ${err.message}`);
+        process.exit(1);
+      }
+      throw err;
+    }
     // eslint-disable-next-line no-constant-condition
     while (true) {
       try {
@@ -191,6 +203,7 @@ async function main() {
       });
     }
   } finally {
+    await lock?.release();
     cleanupSignals();
   }
 }