fix: improve app restart and gateway logs
This commit is contained in:
@@ -197,6 +197,7 @@ struct DebugSettings: View {
|
||||
Button("Restart app") { self.relaunch() }
|
||||
Button("Reveal app in Finder") { self.revealApp() }
|
||||
Button("Restart Gateway") { DebugActions.restartGateway() }
|
||||
Button("Clear log") { GatewayProcessManager.shared.clearLog() }
|
||||
}
|
||||
.buttonStyle(.bordered)
|
||||
Spacer(minLength: 8)
|
||||
@@ -268,10 +269,13 @@ struct DebugSettings: View {
|
||||
/// Relaunches the app.
///
/// Spawns a detached `/bin/sh` helper that sleeps briefly and then runs `open -n` on the
/// app bundle, and terminates the current instance right away so the helper reopens the
/// app after this process has gone.
private func relaunch() {
    let url = Bundle.main.bundleURL
    let task = Process()
    task.executableURL = URL(fileURLWithPath: "/bin/sh")
    // The bundle path is handed to the script as positional parameter "$1" instead of being
    // interpolated into the script text, so quotes, `$`, or backticks in the path cannot
    // break (or inject into) the shell command. The "sh" argument fills "$0".
    task.arguments = ["-c", "sleep 0.3; open -n \"$1\"", "sh", url.path]
    task.standardOutput = nil
    task.standardError = nil
    task.standardInput = nil
    do {
        try task.run()
    } catch {
        // Without the helper nothing would reopen the app, so keep this instance alive.
        NSLog("relaunch failed: %@", error.localizedDescription)
        return
    }
    // Deliberately no waitUntilExit(): waiting would block the main thread for the whole
    // delay and start the new instance before this one has quit.
    // Terminate current instance; spawned shell re-opens after a short delay.
    NSApp.terminate(nil)
}
|
||||
|
||||
|
||||
196
apps/macos/Sources/Clawdis/GatewayEnvironment.swift
Normal file
196
apps/macos/Sources/Clawdis/GatewayEnvironment.swift
Normal file
@@ -0,0 +1,196 @@
|
||||
import ClawdisIPC
|
||||
import Foundation
|
||||
|
||||
/// Lightweight SemVer helper (major.minor.patch only) for gateway compatibility checks.
///
/// Pre-release and build metadata are not modelled; `parse` drops any such suffix from the
/// patch component.
struct Semver: Comparable, CustomStringConvertible, Sendable {
    let major: Int
    let minor: Int
    let patch: Int

    /// Dotted `major.minor.patch` rendering.
    var description: String { "\(self.major).\(self.minor).\(self.patch)" }

    /// Orders by major, then minor, then patch.
    static func < (lhs: Semver, rhs: Semver) -> Bool {
        if lhs.major != rhs.major { return lhs.major < rhs.major }
        if lhs.minor != rhs.minor { return lhs.minor < rhs.minor }
        return lhs.patch < rhs.patch
    }

    /// Parses a version string such as `"1.2.3"`, `"v1.2.3"` or `"1.2.3-beta.1"`.
    ///
    /// - Parameter raw: The text to parse; surrounding whitespace and one leading `v` are ignored.
    /// - Returns: The parsed version, or `nil` when `raw` is nil/empty, has fewer than three
    ///   dot-separated components, or its major/minor components are not integers.
    static func parse(_ raw: String?) -> Semver? {
        guard let raw, !raw.isEmpty else { return nil }
        let cleaned = raw.trimmingCharacters(in: .whitespacesAndNewlines)
            .replacingOccurrences(of: "^v", with: "", options: .regularExpression)
        let parts = cleaned.split(separator: ".")
        guard parts.count >= 3,
              let major = Int(parts[0]),
              let minor = Int(parts[1])
        else { return nil }

        // The patch component may carry a pre-release/build suffix ("3-beta", "3+build").
        // Previously such input silently became patch 0; now the leading digits are used.
        // Components with no leading digits still fall back to 0.
        let patchText = parts[2]
        let leadingDigits = patchText.prefix { ("0"..."9").contains($0) }
        let patch = Int(patchText) ?? Int(leadingDigits) ?? 0
        return Semver(major: major, minor: minor, patch: patch)
    }

    /// Returns `true` when this version has the same major as `required` and is not older than it.
    func compatible(with required: Semver) -> Bool {
        self.major == required.major && self >= required
    }
}
|
||||
|
||||
/// Category of a gateway environment check result.
enum GatewayEnvironmentKind: Equatable {
    /// No result yet; used by the placeholder status while a check is pending.
    case checking
    /// Runtime and gateway were found and no incompatibility was detected.
    case ok
    /// The runtime could not be resolved / neither a gateway binary nor a project entrypoint was found.
    case missingNode, missingGateway
    /// The installed gateway version does not satisfy the required version.
    case incompatible(found: String, required: String)
    /// Any other failure, carrying a description.
    /// NOTE(review): not produced anywhere in the visible source — confirm usage elsewhere.
    case error(String)
}
|
||||
|
||||
/// Snapshot of a gateway environment check: the outcome kind, the versions that were
/// discovered (when known), and a human-readable message.
struct GatewayEnvironmentStatus: Equatable {
    /// Outcome category.
    let kind: GatewayEnvironmentKind
    /// Version text of the resolved runtime, if one was found.
    let nodeVersion: String?
    /// Version text of the installed gateway, if it could be read.
    let gatewayVersion: String?
    /// Version the app expects, if it could be derived.
    let requiredGateway: String?
    /// Human-readable summary of the result.
    let message: String

    /// Placeholder value used before any check has produced a result.
    static var checking: Self {
        Self(
            kind: .checking,
            nodeVersion: nil,
            gatewayVersion: nil,
            requiredGateway: nil,
            message: "Checking…")
    }
}
|
||||
|
||||
/// Result of resolving how to launch the gateway.
struct GatewayCommandResolution {
    /// Environment check result the resolution was based on.
    let status: GatewayEnvironmentStatus
    /// Executable followed by its arguments, or `nil` when no launch command could be built.
    let command: [String]?
}
|
||||
|
||||
/// Namespace for locating the runtime and the clawdis gateway, checking version
/// compatibility, and building the command line used to launch the gateway.
enum GatewayEnvironment {
    /// Returns the port stored under the `gatewayPort` user default, or 18789 when the
    /// stored value is missing or not positive.
    static func gatewayPort() -> Int {
        let stored = UserDefaults.standard.integer(forKey: "gatewayPort")
        return stored > 0 ? stored : 18789
    }

    /// Returns the gateway version the app expects, parsed from the bundle's
    /// `CFBundleShortVersionString`; `nil` when that string is absent or unparsable.
    static func expectedGatewayVersion() -> Semver? {
        let bundleVersion = Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String
        return Semver.parse(bundleVersion)
    }

    /// Checks that a runtime and a gateway (global binary or project entrypoint) are
    /// available and that the installed gateway version is compatible with the app.
    ///
    /// Runs synchronously and may launch the gateway binary with `--version`.
    static func check() -> GatewayEnvironmentStatus {
        let expected = self.expectedGatewayVersion()
        let projectRoot = CommandResolver.projectRoot()
        let projectEntrypoint = CommandResolver.gatewayEntrypoint(in: projectRoot)

        switch RuntimeLocator.resolve(searchPaths: CommandResolver.preferredPaths()) {
        case let .failure(err):
            return GatewayEnvironmentStatus(
                kind: .missingNode,
                nodeVersion: nil,
                gatewayVersion: nil,
                requiredGateway: expected?.description,
                message: RuntimeLocator.describeFailure(err))
        case let .success(runtime):
            let gatewayBin = CommandResolver.clawdisExecutable()

            if gatewayBin == nil, projectEntrypoint == nil {
                return GatewayEnvironmentStatus(
                    kind: .missingGateway,
                    nodeVersion: runtime.version.description,
                    gatewayVersion: nil,
                    requiredGateway: expected?.description,
                    message: "clawdis CLI not found in PATH; install the global package.")
            }

            // Prefer the version reported by the global binary; fall back to the project's package.json.
            let installedGateway = gatewayBin.flatMap { self.readGatewayVersion(binary: $0) }
                ?? self.readLocalGatewayVersion(projectRoot: projectRoot)

            // Only a known-and-incompatible version is rejected; an unknown version passes.
            if let expected, let installed = installedGateway, !installed.compatible(with: expected) {
                return GatewayEnvironmentStatus(
                    kind: .incompatible(found: installed.description, required: expected.description),
                    nodeVersion: runtime.version.description,
                    gatewayVersion: installed.description,
                    requiredGateway: expected.description,
                    message: "Gateway version \(installed.description) is incompatible with app \(expected.description); install/update the global package.")
            }

            let gatewayLabel = gatewayBin != nil ? "global" : "local"
            let gatewayVersionText = installedGateway?.description ?? "unknown"
            return GatewayEnvironmentStatus(
                kind: .ok,
                nodeVersion: runtime.version.description,
                gatewayVersion: gatewayVersionText,
                requiredGateway: expected?.description,
                message: "Node \(runtime.version.description); gateway \(gatewayVersionText) (\(gatewayLabel))")
        }
    }

    /// Runs `check()` and, when the environment is OK, builds the gateway launch command.
    ///
    /// The global binary is preferred; otherwise the project entrypoint is run through the
    /// resolved runtime. `command` is `nil` when the check fails or nothing can be launched.
    static func resolveGatewayCommand() -> GatewayCommandResolution {
        let status = self.check()
        // Bail out before doing any further lookups when the environment is unusable.
        guard case .ok = status.kind else {
            return GatewayCommandResolution(status: status, command: nil)
        }

        let port = self.gatewayPort()
        if let gatewayBin = CommandResolver.clawdisExecutable() {
            let cmd = [gatewayBin, "gateway", "--port", "\(port)"]
            return GatewayCommandResolution(status: status, command: cmd)
        }

        let projectRoot = CommandResolver.projectRoot()
        if let entry = CommandResolver.gatewayEntrypoint(in: projectRoot),
           case let .success(resolvedRuntime) = RuntimeLocator.resolve(searchPaths: CommandResolver.preferredPaths())
        {
            let cmd = [resolvedRuntime.path, entry, "gateway", "--port", "\(port)"]
            return GatewayCommandResolution(status: status, command: cmd)
        }

        return GatewayCommandResolution(status: status, command: nil)
    }

    /// Installs the clawdis package globally via pnpm, reporting progress through `statusHandler`.
    ///
    /// - Parameters:
    ///   - version: Version to install; `nil` installs `latest`.
    ///   - statusHandler: Receives a start message and then either a success or a failure message.
    static func installGlobal(version: Semver?, statusHandler: @escaping @Sendable (String) -> Void) async {
        let preferred = CommandResolver.preferredPaths().joined(separator: ":")
        let target = version?.description ?? "latest"
        // Fall back to a bare "pnpm" when no executable was located.
        let pnpm = CommandResolver.findExecutable(named: "pnpm") ?? "pnpm"
        let cmd = [pnpm, "add", "-g", "clawdis@\(target)"]

        statusHandler("Installing clawdis@\(target) via pnpm…")
        let response = await ShellExecutor.run(command: cmd, cwd: nil, env: ["PATH": preferred], timeout: 300)
        if response.ok {
            statusHandler("Installed clawdis@\(target)")
        } else {
            let detail = response.message ?? "install failed"
            statusHandler("Install failed: \(detail)")
        }
    }

    // MARK: - Internals

    /// Runs `binary --version` and parses its combined stdout/stderr output as a version.
    ///
    /// - Returns: The parsed version, or `nil` when the process cannot be launched or the
    ///   output is not a parsable version.
    private static func readGatewayVersion(binary: String) -> Semver? {
        let process = Process()
        process.executableURL = URL(fileURLWithPath: binary)
        process.arguments = ["--version"]
        process.environment = ["PATH": CommandResolver.preferredPaths().joined(separator: ":")]

        let pipe = Pipe()
        process.standardOutput = pipe
        process.standardError = pipe
        do {
            try process.run()
            // Drain the pipe before waiting for exit: a child that writes more than the
            // pipe buffer holds would block on write, and waiting first would deadlock.
            let data = pipe.fileHandleForReading.readDataToEndOfFile()
            process.waitUntilExit()
            let raw = String(data: data, encoding: .utf8)?.trimmingCharacters(in: .whitespacesAndNewlines)
            return Semver.parse(raw)
        } catch {
            return nil
        }
    }

    /// Reads the `version` field of `package.json` in `projectRoot`.
    ///
    /// - Returns: The parsed version, or `nil` when the file is unreadable, is not a JSON
    ///   object, has no string `version`, or the version is unparsable.
    private static func readLocalGatewayVersion(projectRoot: URL) -> Semver? {
        let pkg = projectRoot.appendingPathComponent("package.json")
        guard let data = try? Data(contentsOf: pkg) else { return nil }
        guard
            let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any],
            let version = json["version"] as? String
        else { return nil }
        return Semver.parse(version)
    }
}
|
||||
234
apps/macos/Sources/Clawdis/GatewayProcessManager.swift
Normal file
234
apps/macos/Sources/Clawdis/GatewayProcessManager.swift
Normal file
@@ -0,0 +1,234 @@
|
||||
import Foundation
|
||||
import OSLog
|
||||
import Subprocess
|
||||
#if canImport(Darwin)
|
||||
import Darwin
|
||||
#endif
|
||||
#if canImport(System)
|
||||
import System
|
||||
#else
|
||||
import SystemPackage
|
||||
#endif
|
||||
|
||||
/// Supervises the gateway child process: starts it on demand, captures its output into an
/// in-memory log, and restarts it after unexpected exits until a crash limit is reached.
@MainActor
final class GatewayProcessManager: ObservableObject {
    static let shared = GatewayProcessManager()

    /// Lifecycle state of the supervised gateway process.
    enum Status: Equatable {
        case stopped
        case starting
        case running(pid: Int32)
        case restarting
        case failed(String)

        /// Short human-readable description of the state.
        var label: String {
            switch self {
            case .stopped: "Stopped"
            case .starting: "Starting…"
            case let .running(pid): "Running (pid \(pid))"
            case .restarting: "Restarting…"
            case let .failed(reason): "Failed: \(reason)"
            }
        }
    }

    @Published private(set) var status: Status = .stopped
    @Published private(set) var log: String = ""
    @Published private(set) var restartCount: Int = 0
    @Published private(set) var environmentStatus: GatewayEnvironmentStatus = .checking

    private var execution: Execution?
    private var desiredActive = false
    private var stopping = false
    /// True from the moment a spawn is scheduled until the process has started or the
    /// attempt has failed; prevents scheduling a second spawn in that window.
    private var spawnInFlight = false
    private var recentCrashes: [Date] = []

    private let logger = Logger(subsystem: "com.steipete.clawdis", category: "gateway")
    private let logLimit = 20000 // characters to keep in-memory
    private let maxCrashes = 3
    private let crashWindow: TimeInterval = 120 // seconds

    /// Records whether the gateway should be running, refreshes the environment status,
    /// and starts or stops the process accordingly.
    func setActive(_ active: Bool) {
        self.desiredActive = active
        self.refreshEnvironmentStatus()
        if active {
            self.startIfNeeded()
        } else {
            self.stop()
        }
    }

    /// Schedules a gateway spawn when one is wanted, none is running, none is already being
    /// spawned, and the crash limit has not been reached.
    func startIfNeeded() {
        guard self.execution == nil, self.desiredActive else { return }
        if self.shouldGiveUpAfterCrashes() {
            self.status = .failed("Too many crashes; giving up")
            return
        }
        self.status = self.status == .restarting ? .restarting : .starting
        // `execution` stays nil until the child has actually started, so without this
        // guard two quick calls would launch two gateways.
        guard !self.spawnInFlight else { return }
        self.spawnInFlight = true
        Task.detached { [weak self] in
            guard let self else { return }
            await self.spawnGateway()
        }
    }

    /// Marks the gateway as not wanted and tears down the running process, if any.
    func stop() {
        self.desiredActive = false
        self.stopping = true
        guard let execution else {
            self.status = .stopped
            return
        }
        self.status = .stopped
        Task {
            await execution.teardown(using: [.gracefulShutDown(allowedDurationToNextStep: .seconds(1))])
        }
        self.execution = nil
    }

    /// Re-runs the environment check and publishes the result.
    func refreshEnvironmentStatus() {
        self.environmentStatus = GatewayEnvironment.check()
    }

    // MARK: - Internals

    /// Resolves the launch command, runs the gateway, streams its output into the log, and
    /// routes the eventual exit or error to the matching handler.
    private func spawnGateway() async {
        // stop() may have been called between scheduling and running this task.
        guard self.desiredActive else {
            self.spawnInFlight = false
            return
        }

        let resolution = GatewayEnvironment.resolveGatewayCommand()
        await MainActor.run { self.environmentStatus = resolution.status }
        guard let command = resolution.command else {
            await MainActor.run {
                self.spawnInFlight = false
                self.status = .failed(resolution.status.message)
            }
            return
        }

        let cwd = self.defaultProjectRoot().path
        self.appendLog("[gateway] starting: \(command.joined(separator: " ")) (cwd: \(cwd))\n")

        do {
            let result = try await run(
                .name(command.first ?? "clawdis"),
                arguments: Arguments(Array(command.dropFirst())),
                environment: self.makeEnvironment(),
                workingDirectory: FilePath(cwd))
            { execution, stdin, stdout, stderr in
                self.didStart(execution)
                // Consume stdout/stderr eagerly so the gateway can't block on full pipes.
                async let out: Void = self.stream(output: stdout, label: "stdout")
                async let err: Void = self.stream(output: stderr, label: "stderr")
                try await stdin.finish()
                await out
                await err
            }

            await self.handleTermination(status: result.terminationStatus)
        } catch {
            await self.handleError(error)
        }
    }

    /// Records the freshly started process, or tears it down at once when a stop was
    /// requested while the spawn was still in flight.
    private func didStart(_ execution: Execution) {
        self.spawnInFlight = false
        guard self.desiredActive else {
            // stop() ran before the child existed and so had nothing to tear down.
            self.stopping = true
            self.status = .stopped
            Task {
                await execution.teardown(using: [.gracefulShutDown(allowedDurationToNextStep: .seconds(1))])
            }
            return
        }
        self.execution = execution
        self.stopping = false
        self.status = .running(pid: execution.processIdentifier.value)
        self.logger.info("gateway started pid \(execution.processIdentifier.value)")
    }

    /// Handles process exit: a requested stop ends in `.stopped`; anything else counts as a
    /// crash and triggers a delayed restart unless the crash limit has been reached.
    private func handleTermination(status: TerminationStatus) async {
        // Signals are reported as the negated signal number.
        let code: Int32 = switch status {
        case let .exited(exitCode): exitCode
        case let .unhandledException(sig): -Int32(sig)
        }

        self.execution = nil
        if self.stopping || !self.desiredActive {
            self.status = .stopped
            self.stopping = false
            return
        }

        self.recentCrashes.append(Date())
        self.recentCrashes = self.recentCrashes.filter { Date().timeIntervalSince($0) < self.crashWindow }
        self.restartCount += 1
        self.appendLog("[gateway] exited (\(code)).\n")

        if self.shouldGiveUpAfterCrashes() {
            self.status = .failed("Too many crashes; stopped auto-restart.")
            self.logger.error("gateway crash loop detected; giving up")
            return
        }

        self.status = .restarting
        self.logger.warning("gateway crashed (code \(code)); restarting")
        // Slight backoff to avoid hammering the system in case of immediate crash-on-start.
        try? await Task.sleep(nanoseconds: 750_000_000)
        self.startIfNeeded()
    }

    /// Handles a failure to launch or run the gateway: logs it, then retries if the gateway
    /// is still wanted and the crash limit allows, otherwise reports the failure.
    private func handleError(_ error: any Error) async {
        self.execution = nil
        // The error may have been thrown before didStart ran.
        self.spawnInFlight = false
        var message = error.localizedDescription
        if let sp = error as? SubprocessError {
            message = "SubprocessError \(sp.code.value): \(sp)"
        }
        self.appendLog("[gateway] failed: \(message)\n")
        self.logger.error("gateway failed: \(message, privacy: .public)")
        if self.desiredActive, !self.shouldGiveUpAfterCrashes() {
            self.status = .restarting
            self.recentCrashes.append(Date())
            self.startIfNeeded()
        } else {
            // Show the same enriched message that was written to the log.
            self.status = .failed(message)
        }
    }

    /// Drops crash timestamps older than `crashWindow` and reports whether the remaining
    /// count has reached `maxCrashes`.
    private func shouldGiveUpAfterCrashes() -> Bool {
        self.recentCrashes = self.recentCrashes.filter { Date().timeIntervalSince($0) < self.crashWindow }
        return self.recentCrashes.count >= self.maxCrashes
    }

    /// Appends each line of `output` to the log; a stream error is logged with `label`.
    private func stream(output: AsyncBufferSequence, label: String) async {
        do {
            for try await line in output.lines() {
                await MainActor.run {
                    self.appendLog(line + "\n")
                }
            }
        } catch {
            await MainActor.run {
                self.appendLog("[gateway \(label)] stream error: \(error.localizedDescription)\n")
            }
        }
    }

    /// Appends `chunk` to the in-memory log, keeping only the last `logLimit` characters.
    private func appendLog(_ chunk: String) {
        self.log.append(chunk)
        if self.log.count > self.logLimit {
            self.log = String(self.log.suffix(self.logLimit))
        }
    }

    /// Empties the in-memory log.
    func clearLog() {
        self.log = ""
    }

    /// Builds the child environment: the inherited environment with `PATH`, `PNPM_HOME`
    /// and `CLAWDIS_PROJECT_ROOT` overridden.
    private func makeEnvironment() -> Environment {
        let merged = CommandResolver.preferredPaths().joined(separator: ":")
        return .inherit.updating([
            "PATH": merged,
            "PNPM_HOME": FileManager.default.homeDirectoryForCurrentUser
                .appendingPathComponent("Library/pnpm").path,
            "CLAWDIS_PROJECT_ROOT": CommandResolver.projectRoot().path,
        ])
    }

    /// Project root used as the gateway's working directory.
    private func defaultProjectRoot() -> URL {
        CommandResolver.projectRoot()
    }

    /// Forwards the project root path to `CommandResolver`.
    func setProjectRoot(path: String) {
        CommandResolver.setProjectRoot(path)
    }

    /// Returns the project root path reported by `CommandResolver`.
    func projectRootPath() -> String {
        CommandResolver.projectRootPath()
    }
}
|
||||
37
docs/refactor/web-gateway-troubleshooting.md
Normal file
37
docs/refactor/web-gateway-troubleshooting.md
Normal file
@@ -0,0 +1,37 @@
|
||||
---
|
||||
summary: "Troubleshooting guide for the web gateway/Baileys stack"
|
||||
read_when:
|
||||
- Diagnosing web gateway socket or login issues
|
||||
---
|
||||
# Web Gateway Troubleshooting (Nov 26, 2025)
|
||||
|
||||
## Symptoms & quick fixes
|
||||
- **Stream Errored / Conflict / status 409–515:** WhatsApp closed the socket because another session is active or creds went stale. Run `clawdis logout` then `clawdis login --provider web` and restart the gateway.
|
||||
- **Logged out:** Console prints “session logged out”; re-link with `clawdis login --provider web`.
|
||||
- **Repeated retries then exit:** Reconnects are capped (default 12 attempts). Tune with `--web-retries`, `--web-retry-initial`, `--web-retry-max`, or config `web.reconnect`.
|
||||
- **No inbound messages:** Ensure the QR-linked account is online in WhatsApp, and check logs for `web-heartbeat` to confirm auth age/connection.
|
||||
- **Fast nuke:** From an allowed WhatsApp sender you can send `/restart` to kick `com.steipete.clawdis` via launchd; wait a few seconds for it to relink.
|
||||
|
||||
## Helpful commands
|
||||
- Start gateway web-only: `pnpm clawdis gateway --provider web --verbose`
|
||||
- Show who is linked: `pnpm clawdis gateway --provider web --verbose` (first line prints the linked E.164)
|
||||
- Logout (clear creds): `pnpm clawdis logout`
|
||||
- Relink: `pnpm clawdis login --provider web`
|
||||
- Tail logs (default): `tail -f /tmp/clawdis/clawdis.log`
|
||||
|
||||
## Reading the logs
|
||||
- `web-reconnect`: close reasons, retry/backoff, max-attempt exit.
|
||||
- `web-heartbeat`: connectionId, messagesHandled, authAgeMs, uptimeMs (every 60s by default).
|
||||
- `web-auto-reply`: inbound/outbound message records with correlation IDs.
|
||||
|
||||
## When to tweak knobs
|
||||
- High churn networks: increase `web.reconnect.maxAttempts` or `--web-retries`.
|
||||
- Slow links: raise `--web-retry-max` to give more headroom before bailing.
|
||||
- Chatty monitors: increase `--web-heartbeat` interval if log volume is high.
|
||||
|
||||
## If it keeps failing
|
||||
1) `clawdis logout` → `clawdis login --provider web` (fresh QR link).
|
||||
2) Ensure no other device/browser is using the same WA Web session.
|
||||
3) Check WhatsApp mobile app is online and not in low-power mode.
|
||||
4) If status is 515, let the client restart once after pairing (already handled automatically).
|
||||
5) Capture the last `web-reconnect` entry and the status code before escalating.
|
||||
34
src/infra/gateway-lock.test.ts
Normal file
34
src/infra/gateway-lock.test.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { acquireGatewayLock, GatewayLockError } from "./gateway-lock.js";
|
||||
|
||||
/** Builds a unique socket path in the OS temp directory for a single test run. */
function newLockPath(): string {
  const suffix = Math.random().toString(16).slice(2);
  const fileName = `clawdis-gateway-lock-test-${process.pid}-${suffix}.sock`;
  return path.join(os.tmpdir(), fileName);
}

describe("gateway-lock", () => {
  it("prevents concurrent gateway instances and releases cleanly", async () => {
    const lockPath = newLockPath();

    // The first acquisition succeeds and creates the socket file.
    const releaseFirst = await acquireGatewayLock(lockPath);
    expect(fs.existsSync(lockPath)).toBe(true);

    // A second acquisition on the same path is rejected while the first is held.
    const secondAttempt = acquireGatewayLock(lockPath);
    await expect(secondAttempt).rejects.toBeInstanceOf(GatewayLockError);

    // Releasing removes the socket file.
    await releaseFirst();
    expect(fs.existsSync(lockPath)).toBe(false);

    // After release, lock can be reacquired.
    const releaseAgain = await acquireGatewayLock(lockPath);
    await releaseAgain();
    expect(fs.existsSync(lockPath)).toBe(false);
  });
});
|
||||
102
src/infra/gateway-lock.ts
Normal file
102
src/infra/gateway-lock.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
import fs from "node:fs";
|
||||
import net from "node:net";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
/** Default lock location: a fixed file name inside the OS temp directory. */
const DEFAULT_LOCK_PATH = path.join(os.tmpdir(), "clawdis-gateway.lock");

/** Error type used for every failure to acquire the gateway lock. */
export class GatewayLockError extends Error {
  constructor(...args: ConstructorParameters<typeof Error>) {
    super(...args);
    // Report the real class name in logs and stack traces instead of the generic "Error".
    this.name = "GatewayLockError";
    // Keep `instanceof GatewayLockError` working when compiled to targets that
    // down-level class inheritance from built-ins.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

/** Releases a previously acquired lock. */
type ReleaseFn = () => Promise<void>;
|
||||
|
||||
/**
 * Acquire an exclusive single-instance lock for the gateway using a Unix domain socket.
 *
 * Why a socket? If the process crashes or is SIGKILLed, the socket file remains but
 * the next start will detect ECONNREFUSED when connecting and clean the stale path
 * before retrying. This keeps the lock self-healing without manual pidfile cleanup.
 *
 * While the lock is held, SIGINT/SIGTERM/SIGHUP release it and then exit the process
 * with code 0, and a plain process exit cleans up synchronously. These handlers are
 * removed again once the lock has been released.
 *
 * @param lockPath Socket path to bind; defaults to a fixed file in the OS temp directory.
 * @returns An idempotent function that closes the socket server and removes the socket file.
 * @throws GatewayLockError when another instance holds the lock, when listening or probing
 *   fails, or when a stale socket file cannot be cleaned up.
 */
export async function acquireGatewayLock(
  lockPath = DEFAULT_LOCK_PATH,
): Promise<ReleaseFn> {
  // Try to listen on the lock path. `allowStaleCleanup` bounds the stale-file retry to a
  // single attempt so a path that keeps failing cannot recurse forever.
  const attemptListen = (allowStaleCleanup: boolean): Promise<net.Server> =>
    new Promise((resolve, reject) => {
      const server = net.createServer();

      server.once("error", (err: NodeJS.ErrnoException) => {
        if (err.code !== "EADDRINUSE") {
          reject(new GatewayLockError(`lock listen failed: ${err.message}`));
          return;
        }

        // Something is already bound. Try to connect to see if it is alive.
        const client = net.connect({ path: lockPath });

        client.once("connect", () => {
          client.destroy();
          reject(
            new GatewayLockError("another gateway instance is already running"),
          );
        });

        client.once("error", (connErr: NodeJS.ErrnoException) => {
          const isStale =
            connErr.code === "ECONNREFUSED" || connErr.code === "ENOENT";
          if (!isStale) {
            reject(
              new GatewayLockError(
                `failed to connect to existing lock (${lockPath}): ${connErr.message}`,
              ),
            );
            return;
          }

          if (!allowStaleCleanup) {
            reject(
              new GatewayLockError(
                `lock path ${lockPath} is still unavailable after stale cleanup`,
              ),
            );
            return;
          }

          // Nothing is listening -> stale socket file. Remove and retry once.
          try {
            fs.rmSync(lockPath, { force: true });
          } catch (rmErr) {
            reject(
              new GatewayLockError(
                `failed to clean stale lock at ${lockPath}: ${String(rmErr)}`,
              ),
            );
            return;
          }
          attemptListen(false).then(resolve, reject);
        });
      });

      server.listen(lockPath, () => resolve(server));
    });

  const server = await attemptListen(true);

  const cleanupSignals: NodeJS.Signals[] = ["SIGINT", "SIGTERM", "SIGHUP"];
  let released = false;

  const removeLockFile = (): void => {
    try {
      fs.rmSync(lockPath, { force: true });
    } catch {
      /* ignore */
    }
  };

  // Signal path: release asynchronously, then exit.
  const onSignal = (): void => {
    void release().then(() => process.exit(0));
  };

  // 'exit' listeners must finish synchronously, so the async release() cannot be used
  // here; close the server and remove the file directly instead.
  const onExit = (): void => {
    if (released) return;
    released = true;
    try {
      server.close();
    } catch {
      /* ignore */
    }
    removeLockFile();
  };

  // Remove our process listeners so a released lock neither leaks listeners on repeated
  // acquire/release cycles nor forces an exit on a later signal.
  const detachHandlers = (): void => {
    for (const sig of cleanupSignals) {
      process.removeListener(sig, onSignal);
    }
    process.removeListener("exit", onExit);
  };

  async function release(): Promise<void> {
    if (released) return;
    released = true;
    detachHandlers();
    await new Promise<void>((resolve) => server.close(() => resolve()));
    removeLockFile();
  }

  for (const sig of cleanupSignals) {
    process.once(sig, onSignal);
  }
  process.once("exit", onExit);

  return release;
}
|
||||
Reference in New Issue
Block a user