import Foundation import Observation @MainActor @Observable final class GatewayProcessManager { static let shared = GatewayProcessManager() enum Status: Equatable { case stopped case starting case running(details: String?) case attachedExisting(details: String?) case failed(String) var label: String { switch self { case .stopped: return "Stopped" case .starting: return "Starting…" case let .running(details): if let details, !details.isEmpty { return "Running (\(details))" } return "Running" case let .attachedExisting(details): if let details, !details.isEmpty { return "Using existing gateway (\(details))" } return "Using existing gateway" case let .failed(reason): return "Failed: \(reason)" } } } private(set) var status: Status = .stopped { didSet { CanvasManager.shared.refreshDebugStatus() } } private(set) var log: String = "" private(set) var environmentStatus: GatewayEnvironmentStatus = .checking private(set) var existingGatewayDetails: String? private(set) var lastFailureReason: String? private var desiredActive = false private var environmentRefreshTask: Task? private var lastEnvironmentRefresh: Date? private var logRefreshTask: Task? private let logger = Logger(subsystem: "com.clawdbot", category: "gateway.process") private let logLimit = 20000 // characters to keep in-memory private let environmentRefreshMinInterval: TimeInterval = 30 func setActive(_ active: Bool) { // Remote mode should never spawn a local gateway; treat as stopped. if CommandResolver.connectionModeIsRemote() { self.desiredActive = false self.stop() self.status = .stopped self.appendLog("[gateway] remote mode active; skipping local gateway\n") self.logger.info("gateway process skipped: remote mode active") return } self.logger.debug("gateway active requested active=\(active)") self.desiredActive = active self.refreshEnvironmentStatus() if active { self.startIfNeeded() } else { self.stop() } } func ensureLaunchAgentEnabledIfNeeded() async { guard !CommandResolver.connectionModeIsRemote() else { return } let enabled = await GatewayLaunchAgentManager.isLoaded() guard !enabled else { return } let bundlePath = Bundle.main.bundleURL.path let port = GatewayEnvironment.gatewayPort() self.appendLog("[gateway] auto-enabling launchd job (\(gatewayLaunchdLabel)) on port \(port)\n") let err = await GatewayLaunchAgentManager.set(enabled: true, bundlePath: bundlePath, port: port) if let err { self.appendLog("[gateway] launchd auto-enable failed: \(err)\n") } } func startIfNeeded() { guard self.desiredActive else { return } // Do not spawn in remote mode (the gateway should run on the remote host). guard !CommandResolver.connectionModeIsRemote() else { self.status = .stopped return } // Many surfaces can call `setActive(true)` in quick succession (startup, Canvas, health checks). // Avoid spawning multiple concurrent "start" tasks that can thrash launchd and flap the port. switch self.status { case .starting, .running, .attachedExisting: return case .stopped, .failed: break } self.status = .starting self.logger.debug("gateway start requested") // First try to latch onto an already-running gateway to avoid spawning a duplicate. Task { [weak self] in guard let self else { return } if await self.attachExistingGatewayIfAvailable() { return } await self.enableLaunchdGateway() } } func stop() { self.desiredActive = false self.existingGatewayDetails = nil self.lastFailureReason = nil self.status = .stopped self.logger.info("gateway stop requested") if CommandResolver.connectionModeIsRemote() { return } let bundlePath = Bundle.main.bundleURL.path Task { _ = await GatewayLaunchAgentManager.set( enabled: false, bundlePath: bundlePath, port: GatewayEnvironment.gatewayPort()) } } func refreshEnvironmentStatus(force: Bool = false) { let now = Date() if !force { if self.environmentRefreshTask != nil { return } if let last = self.lastEnvironmentRefresh, now.timeIntervalSince(last) < self.environmentRefreshMinInterval { return } } self.lastEnvironmentRefresh = now self.environmentRefreshTask = Task { [weak self] in let status = await Task.detached(priority: .utility) { GatewayEnvironment.check() }.value await MainActor.run { guard let self else { return } self.environmentStatus = status self.environmentRefreshTask = nil } } } func refreshLog() { guard self.logRefreshTask == nil else { return } let path = GatewayLaunchAgentManager.launchdGatewayLogPath() let limit = self.logLimit self.logRefreshTask = Task { [weak self] in let log = await Task.detached(priority: .utility) { Self.readGatewayLog(path: path, limit: limit) }.value await MainActor.run { guard let self else { return } if !log.isEmpty { self.log = log } self.logRefreshTask = nil } } } // MARK: - Internals /// Attempt to connect to an already-running gateway on the configured port. /// If successful, mark status as attached and skip spawning a new process. private func attachExistingGatewayIfAvailable() async -> Bool { let port = GatewayEnvironment.gatewayPort() let instance = await PortGuardian.shared.describe(port: port) let instanceText = instance.map { self.describe(instance: $0) } let hasListener = instance != nil let attemptAttach = { try await GatewayConnection.shared.requestRaw(method: .health, timeoutMs: 2000) } for attempt in 0..<(hasListener ? 3 : 1) { do { let data = try await attemptAttach() let snap = decodeHealthSnapshot(from: data) let details = self.describe(details: instanceText, port: port, snap: snap) self.existingGatewayDetails = details self.status = .attachedExisting(details: details) self.appendLog("[gateway] using existing instance: \(details)\n") self.logger.info("gateway using existing instance details=\(details)") self.refreshControlChannelIfNeeded(reason: "attach existing") self.refreshLog() return true } catch { if attempt < 2, hasListener { try? await Task.sleep(nanoseconds: 250_000_000) continue } if hasListener { let reason = self.describeAttachFailure(error, port: port, instance: instance) self.existingGatewayDetails = instanceText self.status = .failed(reason) self.lastFailureReason = reason self.appendLog("[gateway] existing listener on port \(port) but attach failed: \(reason)\n") self.logger.warning("gateway attach failed reason=\(reason)") return true } // No reachable gateway (and no listener) — fall through to spawn. self.existingGatewayDetails = nil return false } } self.existingGatewayDetails = nil return false } private func describe(details instance: String?, port: Int, snap: HealthSnapshot?) -> String { let instanceText = instance ?? "pid unknown" if let snap { let linkId = snap.channelOrder?.first(where: { if let summary = snap.channels[$0] { return summary.linked != nil } return false }) ?? snap.channels.keys.first(where: { if let summary = snap.channels[$0] { return summary.linked != nil } return false }) let linked = linkId.flatMap { snap.channels[$0]?.linked } ?? false let authAge = linkId.flatMap { snap.channels[$0]?.authAgeMs }.flatMap(msToAge) ?? "unknown age" let label = linkId.flatMap { snap.channelLabels?[$0] } ?? linkId?.capitalized ?? "channel" let linkText = linked ? "linked" : "not linked" return "port \(port), \(label) \(linkText), auth \(authAge), \(instanceText)" } return "port \(port), health probe succeeded, \(instanceText)" } private func describe(instance: PortGuardian.Descriptor) -> String { let path = instance.executablePath ?? "path unknown" return "pid \(instance.pid) \(instance.command) @ \(path)" } private func describeAttachFailure(_ error: Error, port: Int, instance: PortGuardian.Descriptor?) -> String { let ns = error as NSError let message = ns.localizedDescription.isEmpty ? "unknown error" : ns.localizedDescription let lower = message.lowercased() if self.isGatewayAuthFailure(error) { return """ Gateway on port \(port) rejected auth. Set gateway.auth.token (or CLAWDBOT_GATEWAY_TOKEN) \ to match the running gateway (or clear it on the gateway) and retry. """ } if lower.contains("protocol mismatch") { return "Gateway on port \(port) is incompatible (protocol mismatch). Update the app/gateway." } if lower.contains("unexpected response") || lower.contains("invalid response") { return "Port \(port) returned non-gateway data; another process is using it." } if let instance { let instanceText = self.describe(instance: instance) return "Gateway listener found on port \(port) (\(instanceText)) but health check failed: \(message)" } return "Gateway listener found on port \(port) but health check failed: \(message)" } private func isGatewayAuthFailure(_ error: Error) -> Bool { if let urlError = error as? URLError, urlError.code == .dataNotAllowed { return true } let ns = error as NSError if ns.domain == "Gateway", ns.code == 1008 { return true } let lower = ns.localizedDescription.lowercased() return lower.contains("unauthorized") || lower.contains("auth") } private func enableLaunchdGateway() async { self.existingGatewayDetails = nil let resolution = await Task.detached(priority: .utility) { GatewayEnvironment.resolveGatewayCommand() }.value await MainActor.run { self.environmentStatus = resolution.status } guard resolution.command != nil else { await MainActor.run { self.status = .failed(resolution.status.message) } self.logger.error("gateway command resolve failed: \(resolution.status.message)") return } let bundlePath = Bundle.main.bundleURL.path let port = GatewayEnvironment.gatewayPort() self.appendLog("[gateway] enabling launchd job (\(gatewayLaunchdLabel)) on port \(port)\n") self.logger.info("gateway enabling launchd port=\(port)") let err = await GatewayLaunchAgentManager.set(enabled: true, bundlePath: bundlePath, port: port) if let err { self.status = .failed(err) self.lastFailureReason = err self.logger.error("gateway launchd enable failed: \(err)") return } // Best-effort: wait for the gateway to accept connections. let deadline = Date().addingTimeInterval(6) while Date() < deadline { if !self.desiredActive { return } do { _ = try await GatewayConnection.shared.requestRaw(method: .health, timeoutMs: 1500) let instance = await PortGuardian.shared.describe(port: port) let details = instance.map { "pid \($0.pid)" } self.status = .running(details: details) self.logger.info("gateway started details=\(details ?? "ok")") self.refreshControlChannelIfNeeded(reason: "gateway started") self.refreshLog() return } catch { try? await Task.sleep(nanoseconds: 400_000_000) } } self.status = .failed("Gateway did not start in time") self.lastFailureReason = "launchd start timeout" self.logger.warning("gateway start timed out") } private func appendLog(_ chunk: String) { self.log.append(chunk) if self.log.count > self.logLimit { self.log = String(self.log.suffix(self.logLimit)) } } private func refreshControlChannelIfNeeded(reason: String) { switch ControlChannel.shared.state { case .connected, .connecting: return case .disconnected, .degraded: break } self.appendLog("[gateway] refreshing control channel (\(reason))\n") self.logger.debug("gateway control channel refresh reason=\(reason)") Task { await ControlChannel.shared.configure() } } func waitForGatewayReady(timeout: TimeInterval = 6) async -> Bool { let deadline = Date().addingTimeInterval(timeout) while Date() < deadline { if !self.desiredActive { return false } do { _ = try await GatewayConnection.shared.requestRaw(method: .health, timeoutMs: 1500) return true } catch { try? await Task.sleep(nanoseconds: 300_000_000) } } self.appendLog("[gateway] readiness wait timed out\n") self.logger.warning("gateway readiness wait timed out") return false } func clearLog() { self.log = "" try? FileManager.default.removeItem(atPath: GatewayLaunchAgentManager.launchdGatewayLogPath()) self.logger.debug("gateway log cleared") } func setProjectRoot(path: String) { CommandResolver.setProjectRoot(path) } func projectRootPath() -> String { CommandResolver.projectRootPath() } private nonisolated static func readGatewayLog(path: String, limit: Int) -> String { guard FileManager.default.fileExists(atPath: path) else { return "" } guard let data = try? Data(contentsOf: URL(fileURLWithPath: path)) else { return "" } let text = String(data: data, encoding: .utf8) ?? "" if text.count <= limit { return text } return String(text.suffix(limit)) } }