feat(macos): run bundled gateway via launchd
This commit is contained in:
@@ -1,16 +1,5 @@
|
||||
import Foundation
|
||||
import Network
|
||||
import Observation
|
||||
import OSLog
|
||||
import Subprocess
|
||||
#if canImport(Darwin)
|
||||
import Darwin
|
||||
#endif
|
||||
#if canImport(System)
|
||||
import System
|
||||
#else
|
||||
import SystemPackage
|
||||
#endif
|
||||
|
||||
@MainActor
|
||||
@Observable
|
||||
@@ -20,8 +9,7 @@ final class GatewayProcessManager {
|
||||
enum Status: Equatable {
|
||||
case stopped
|
||||
case starting
|
||||
case running(pid: Int32)
|
||||
case restarting
|
||||
case running(details: String?)
|
||||
case attachedExisting(details: String?)
|
||||
case failed(String)
|
||||
|
||||
@@ -29,8 +17,9 @@ final class GatewayProcessManager {
|
||||
switch self {
|
||||
case .stopped: return "Stopped"
|
||||
case .starting: return "Starting…"
|
||||
case let .running(pid): return "Running (pid \(pid))"
|
||||
case .restarting: return "Restarting…"
|
||||
case let .running(details):
|
||||
if let details, !details.isEmpty { return "Running (\(details))" }
|
||||
return "Running"
|
||||
case let .attachedExisting(details):
|
||||
if let details, !details.isEmpty {
|
||||
return "Using existing gateway (\(details))"
|
||||
@@ -43,41 +32,15 @@ final class GatewayProcessManager {
|
||||
|
||||
private(set) var status: Status = .stopped
|
||||
private(set) var log: String = ""
|
||||
private(set) var restartCount: Int = 0
|
||||
private(set) var environmentStatus: GatewayEnvironmentStatus = .checking
|
||||
private(set) var existingGatewayDetails: String?
|
||||
private(set) var lastFailureReason: String?
|
||||
private(set) var lastExitCode: Int32?
|
||||
private(set) var lastSubprocessError: String?
|
||||
|
||||
private var execution: Execution?
|
||||
private var lastPid: Int32?
|
||||
private var lastCommand: [String]?
|
||||
private var desiredActive = false
|
||||
private var stopping = false
|
||||
private var recentCrashes: [Date] = []
|
||||
private var environmentRefreshTask: Task<Void, Never>?
|
||||
private var lastEnvironmentRefresh: Date?
|
||||
private var logRefreshTask: Task<Void, Never>?
|
||||
|
||||
private final class GatewayLockHandle {
|
||||
private let fd: FileDescriptor
|
||||
private let path: String
|
||||
|
||||
init(fd: FileDescriptor, path: String) {
|
||||
self.fd = fd
|
||||
self.path = path
|
||||
}
|
||||
|
||||
func cancel() {
|
||||
try? self.fd.close()
|
||||
try? FileManager.default.removeItem(atPath: self.path)
|
||||
}
|
||||
}
|
||||
|
||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "gateway")
|
||||
private let logLimit = 20000 // characters to keep in-memory
|
||||
private let maxCrashes = 3
|
||||
private let crashWindow: TimeInterval = 120 // seconds
|
||||
private let environmentRefreshMinInterval: TimeInterval = 30
|
||||
|
||||
func setActive(_ active: Bool) {
|
||||
@@ -99,20 +62,13 @@ final class GatewayProcessManager {
|
||||
}
|
||||
|
||||
func startIfNeeded() {
|
||||
guard self.execution == nil, self.desiredActive else { return }
|
||||
guard self.desiredActive else { return }
|
||||
// Do not spawn in remote mode (the gateway should run on the remote host).
|
||||
guard !CommandResolver.connectionModeIsRemote() else {
|
||||
self.status = .stopped
|
||||
return
|
||||
}
|
||||
if self.shouldGiveUpAfterCrashes() {
|
||||
self.status = .failed("Too many crashes; giving up")
|
||||
return
|
||||
}
|
||||
|
||||
if self.status != .restarting {
|
||||
self.status = .starting
|
||||
}
|
||||
self.status = .starting
|
||||
|
||||
// First try to latch onto an already-running gateway to avoid spawning a duplicate.
|
||||
Task { [weak self] in
|
||||
@@ -128,26 +84,17 @@ final class GatewayProcessManager {
|
||||
}
|
||||
return
|
||||
}
|
||||
await self.spawnGateway()
|
||||
await self.enableLaunchdGateway()
|
||||
}
|
||||
}
|
||||
|
||||
func stop() {
|
||||
self.desiredActive = false
|
||||
self.stopping = true
|
||||
self.existingGatewayDetails = nil
|
||||
self.lastFailureReason = nil
|
||||
self.lastExitCode = nil
|
||||
self.lastSubprocessError = nil
|
||||
guard let execution else {
|
||||
self.status = .stopped
|
||||
return
|
||||
}
|
||||
self.status = .stopped
|
||||
Task {
|
||||
await execution.teardown(using: [.gracefulShutDown(allowedDurationToNextStep: .seconds(1))])
|
||||
}
|
||||
self.execution = nil
|
||||
let bundlePath = Bundle.main.bundleURL.path
|
||||
Task { _ = await GatewayLaunchAgentManager.set(enabled: false, bundlePath: bundlePath, port: GatewayEnvironment.gatewayPort()) }
|
||||
}
|
||||
|
||||
func refreshEnvironmentStatus(force: Bool = false) {
|
||||
@@ -173,6 +120,24 @@ final class GatewayProcessManager {
|
||||
}
|
||||
}
|
||||
|
||||
func refreshLog() {
|
||||
guard self.logRefreshTask == nil else { return }
|
||||
let path = LogLocator.launchdGatewayLogPath
|
||||
let limit = self.logLimit
|
||||
self.logRefreshTask = Task { [weak self] in
|
||||
let log = await Task.detached(priority: .utility) {
|
||||
Self.readGatewayLog(path: path, limit: limit)
|
||||
}.value
|
||||
await MainActor.run {
|
||||
guard let self else { return }
|
||||
if !log.isEmpty {
|
||||
self.log = log
|
||||
}
|
||||
self.logRefreshTask = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Internals
|
||||
|
||||
/// Attempt to connect to an already-running gateway on the configured port.
|
||||
@@ -204,6 +169,7 @@ final class GatewayProcessManager {
|
||||
self.existingGatewayDetails = details
|
||||
self.status = .attachedExisting(details: details)
|
||||
self.appendLog("[gateway] using existing instance: \(details)\n")
|
||||
self.refreshLog()
|
||||
return true
|
||||
} catch {
|
||||
// No reachable gateway (or token mismatch) — fall through to spawn.
|
||||
@@ -212,162 +178,47 @@ final class GatewayProcessManager {
|
||||
}
|
||||
}
|
||||
|
||||
private func spawnGateway() async {
|
||||
if self.status != .restarting {
|
||||
self.status = .starting
|
||||
}
|
||||
private func enableLaunchdGateway() async {
|
||||
self.existingGatewayDetails = nil
|
||||
let resolution = await Task.detached(priority: .utility) {
|
||||
GatewayEnvironment.resolveGatewayCommand()
|
||||
}.value
|
||||
await MainActor.run { self.environmentStatus = resolution.status }
|
||||
guard let command = resolution.command else {
|
||||
guard resolution.command != nil else {
|
||||
await MainActor.run {
|
||||
self.status = .failed(resolution.status.message)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
let cwd = self.defaultProjectRoot().path
|
||||
self.appendLog("[gateway] starting: \(command.joined(separator: " ")) (cwd: \(cwd))\n")
|
||||
self.lastCommand = command
|
||||
|
||||
do {
|
||||
// Acquire the same UDS lock the CLI uses to guarantee a single instance.
|
||||
let lockPath = FileManager.default.temporaryDirectory.appendingPathComponent("clawdis-gateway.lock").path
|
||||
let listener = try self.acquireGatewayLock(path: lockPath)
|
||||
|
||||
let result = try await run(
|
||||
.name(command.first ?? "clawdis"),
|
||||
arguments: Arguments(Array(command.dropFirst())),
|
||||
environment: self.makeEnvironment(),
|
||||
workingDirectory: FilePath(cwd))
|
||||
{ execution, stdin, stdout, stderr in
|
||||
self.didStart(execution)
|
||||
// Consume stdout/stderr eagerly so the gateway can't block on full pipes.
|
||||
async let out: Void = self.stream(output: stdout, label: "stdout")
|
||||
async let err: Void = self.stream(output: stderr, label: "stderr")
|
||||
try await stdin.finish()
|
||||
await out
|
||||
await err
|
||||
}
|
||||
|
||||
// Release the lock after the process exits.
|
||||
listener.cancel()
|
||||
|
||||
await self.handleTermination(status: result.terminationStatus)
|
||||
} catch {
|
||||
await self.handleError(error)
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal clone of the Node gateway lock: take an exclusive file lock.
|
||||
private func acquireGatewayLock(path: String) throws -> GatewayLockHandle {
|
||||
// Remove stale lock if needed (mirrors CLI behavior).
|
||||
try? FileManager.default.removeItem(atPath: path)
|
||||
let fd = try FileDescriptor.open(
|
||||
FilePath(path),
|
||||
.readWrite,
|
||||
options: [.create, .exclusiveCreate],
|
||||
permissions: [.ownerReadWrite])
|
||||
return GatewayLockHandle(fd: fd, path: path)
|
||||
}
|
||||
|
||||
private func didStart(_ execution: Execution) {
|
||||
self.execution = execution
|
||||
self.stopping = false
|
||||
self.lastFailureReason = nil
|
||||
self.lastExitCode = nil
|
||||
self.lastSubprocessError = nil
|
||||
self.status = .running(pid: execution.processIdentifier.value)
|
||||
self.lastPid = execution.processIdentifier.value
|
||||
self.logger.info("gateway started pid \(execution.processIdentifier.value)")
|
||||
Task {
|
||||
await PortGuardian.shared.record(
|
||||
port: GatewayEnvironment.gatewayPort(),
|
||||
pid: execution.processIdentifier.value,
|
||||
command: (self.lastCommand ?? []).joined(separator: " "),
|
||||
mode: AppStateStore.shared.connectionMode)
|
||||
}
|
||||
}
|
||||
|
||||
private func handleTermination(status: TerminationStatus) async {
|
||||
let code: Int32 = switch status {
|
||||
case let .exited(exitCode): exitCode
|
||||
case let .unhandledException(sig): -Int32(sig)
|
||||
}
|
||||
|
||||
self.execution = nil
|
||||
if let pid = self.lastPid {
|
||||
Task { await PortGuardian.shared.removeRecord(pid: pid) }
|
||||
}
|
||||
self.lastPid = nil
|
||||
self.lastCommand = nil
|
||||
if self.stopping || !self.desiredActive {
|
||||
self.status = .stopped
|
||||
self.stopping = false
|
||||
if let pid = self.lastPid {
|
||||
Task { await PortGuardian.shared.removeRecord(pid: pid) }
|
||||
}
|
||||
let bundlePath = Bundle.main.bundleURL.path
|
||||
let port = GatewayEnvironment.gatewayPort()
|
||||
self.appendLog("[gateway] enabling launchd job (\(gatewayLaunchdLabel)) on port \(port)\n")
|
||||
let err = await GatewayLaunchAgentManager.set(enabled: true, bundlePath: bundlePath, port: port)
|
||||
if let err {
|
||||
self.status = .failed(err)
|
||||
self.lastFailureReason = err
|
||||
return
|
||||
}
|
||||
|
||||
self.lastExitCode = code
|
||||
self.lastFailureReason = "Gateway exited (code \(code))."
|
||||
self.recentCrashes.append(Date())
|
||||
self.recentCrashes = self.recentCrashes.filter { Date().timeIntervalSince($0) < self.crashWindow }
|
||||
self.restartCount += 1
|
||||
self.appendLog("[gateway] exited (\(code)).\n")
|
||||
|
||||
if self.shouldGiveUpAfterCrashes() {
|
||||
self.status = .failed("Too many crashes; last exit code \(code).")
|
||||
self.logger.error("gateway crash loop detected; giving up")
|
||||
return
|
||||
}
|
||||
|
||||
self.status = .restarting
|
||||
self.logger.warning("gateway crashed (code \(code)); restarting")
|
||||
// Slight backoff to avoid hammering the system in case of immediate crash-on-start.
|
||||
try? await Task.sleep(nanoseconds: 750_000_000)
|
||||
self.startIfNeeded()
|
||||
}
|
||||
|
||||
private func handleError(_ error: any Error) async {
|
||||
self.execution = nil
|
||||
var message = error.localizedDescription
|
||||
if let sp = error as? SubprocessError {
|
||||
message = "SubprocessError \(sp.code.value): \(sp)"
|
||||
self.lastSubprocessError = message
|
||||
}
|
||||
self.lastFailureReason = message
|
||||
self.appendLog("[gateway] failed: \(message)\n")
|
||||
self.logger.error("gateway failed: \(message, privacy: .public)")
|
||||
if self.desiredActive, !self.shouldGiveUpAfterCrashes() {
|
||||
self.status = .restarting
|
||||
self.recentCrashes.append(Date())
|
||||
self.startIfNeeded()
|
||||
} else {
|
||||
self.status = .failed(error.localizedDescription)
|
||||
}
|
||||
}
|
||||
|
||||
private func shouldGiveUpAfterCrashes() -> Bool {
|
||||
self.recentCrashes = self.recentCrashes.filter { Date().timeIntervalSince($0) < self.crashWindow }
|
||||
return self.recentCrashes.count >= self.maxCrashes
|
||||
}
|
||||
|
||||
private func stream(output: AsyncBufferSequence, label: String) async {
|
||||
do {
|
||||
for try await line in output.lines() {
|
||||
await MainActor.run {
|
||||
self.appendLog(line + "\n")
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
await MainActor.run {
|
||||
self.appendLog("[gateway \(label)] stream error: \(error.localizedDescription)\n")
|
||||
// Best-effort: wait for the gateway to accept connections.
|
||||
let deadline = Date().addingTimeInterval(6)
|
||||
while Date() < deadline {
|
||||
if !self.desiredActive { return }
|
||||
do {
|
||||
_ = try await GatewayConnection.shared.requestRaw(method: .health, timeoutMs: 1500)
|
||||
let instance = await PortGuardian.shared.describe(port: port)
|
||||
let details = instance.map { "pid \($0.pid)" }
|
||||
self.status = .running(details: details)
|
||||
self.refreshLog()
|
||||
return
|
||||
} catch {
|
||||
try? await Task.sleep(nanoseconds: 400_000_000)
|
||||
}
|
||||
}
|
||||
|
||||
self.status = .failed("Gateway did not start in time")
|
||||
self.lastFailureReason = "launchd start timeout"
|
||||
}
|
||||
|
||||
private func appendLog(_ chunk: String) {
|
||||
@@ -379,20 +230,7 @@ final class GatewayProcessManager {
|
||||
|
||||
func clearLog() {
|
||||
self.log = ""
|
||||
}
|
||||
|
||||
private func makeEnvironment() -> Environment {
|
||||
let merged = CommandResolver.preferredPaths().joined(separator: ":")
|
||||
return .inherit.updating([
|
||||
"PATH": merged,
|
||||
"PNPM_HOME": FileManager.default.homeDirectoryForCurrentUser
|
||||
.appendingPathComponent("Library/pnpm").path,
|
||||
"CLAWDIS_PROJECT_ROOT": CommandResolver.projectRoot().path,
|
||||
])
|
||||
}
|
||||
|
||||
private func defaultProjectRoot() -> URL {
|
||||
CommandResolver.projectRoot()
|
||||
try? FileManager.default.removeItem(atPath: LogLocator.launchdGatewayLogPath)
|
||||
}
|
||||
|
||||
func setProjectRoot(path: String) {
|
||||
@@ -402,4 +240,12 @@ final class GatewayProcessManager {
|
||||
func projectRootPath() -> String {
|
||||
CommandResolver.projectRootPath()
|
||||
}
|
||||
|
||||
private static func readGatewayLog(path: String, limit: Int) -> String {
|
||||
guard FileManager.default.fileExists(atPath: path) else { return "" }
|
||||
guard let data = try? Data(contentsOf: URL(fileURLWithPath: path)) else { return "" }
|
||||
let text = String(data: data, encoding: .utf8) ?? ""
|
||||
if text.count <= limit { return text }
|
||||
return String(text.suffix(limit))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user