import Foundation
import OSLog
import Subprocess
#if canImport(Darwin)
import Darwin
#endif
#if canImport(System)
import System
#else
import SystemPackage
#endif

/// Supervises the relay child process: launches it, mirrors its output into an
/// in-memory log, and restarts it after unexpected exits until a crash budget
/// (`maxCrashes` within `crashWindow` seconds) is exhausted.
///
/// All state is main-actor-isolated; observers read `status`, `log`,
/// `restartCount` and `environmentStatus` through `@Published`.
@MainActor
final class RelayProcessManager: ObservableObject {
    static let shared = RelayProcessManager()

    /// Lifecycle state of the relay as presented to observers.
    enum Status: Equatable {
        case stopped
        case starting
        case running(pid: Int32)
        case restarting
        case failed(String)

        /// Human-readable description of the state.
        var label: String {
            switch self {
            case .stopped: "Stopped"
            case .starting: "Starting…"
            case let .running(pid): "Running (pid \(pid))"
            case .restarting: "Restarting…"
            case let .failed(reason): "Failed: \(reason)"
            }
        }
    }

    @Published private(set) var status: Status = .stopped
    @Published private(set) var log: String = ""
    @Published private(set) var restartCount: Int = 0
    @Published private(set) var environmentStatus: RelayEnvironmentStatus = .checking

    /// Handle of the currently adopted child process, if any.
    private var execution: Execution?
    /// Whether the caller currently wants the relay to be running.
    private var desiredActive = false
    /// Set by `stop()` so the next termination is treated as intentional, not a crash.
    private var stopping = false
    /// Timestamps of recent failures, pruned to `crashWindow` on every budget check.
    private var recentCrashes: [Date] = []
    private let logger = Logger(subsystem: "com.steipete.clawdis", category: "relay")
    private let logLimit = 20000 // characters to keep in-memory
    private let maxCrashes = 3
    private let crashWindow: TimeInterval = 120 // seconds
    /// Delay before any automatic restart, to avoid hammering the system in
    /// case of an immediate crash-on-start or a persistent launch error.
    private let restartBackoffNanoseconds: UInt64 = 750_000_000

    /// Records the desired state, refreshes the environment check, and starts
    /// or stops the relay accordingly.
    func setActive(_ active: Bool) {
        self.desiredActive = active
        self.refreshEnvironmentStatus()
        if active {
            self.startIfNeeded()
        } else {
            self.stop()
        }
    }

    /// Launches the relay unless one is already adopted, the relay is not
    /// wanted, or the crash budget is exhausted (in which case `status`
    /// becomes `.failed`).
    func startIfNeeded() {
        guard self.execution == nil, self.desiredActive else { return }
        if self.shouldGiveUpAfterCrashes() {
            self.status = .failed("Too many crashes; giving up")
            return
        }
        // Keep showing "Restarting…" when this launch follows a crash.
        self.status = self.status == .restarting ? .restarting : .starting
        Task.detached { [weak self] in
            guard let self else { return }
            await self.spawnRelay()
        }
    }

    /// Marks the relay as unwanted and asks the running process, if any, to
    /// shut down. `status` becomes `.stopped` immediately.
    func stop() {
        self.desiredActive = false
        self.stopping = true
        self.status = .stopped
        guard let running = self.execution else { return }
        self.execution = nil
        self.requestTeardown(of: running)
    }

    /// Re-runs the environment check and publishes the result.
    func refreshEnvironmentStatus() {
        self.environmentStatus = RelayEnvironment.check()
    }

    // MARK: - Internals

    /// Resolves the gateway command, runs it, and stays suspended for the
    /// lifetime of the child process; routes the outcome to
    /// `handleTermination` or `handleError`.
    private func spawnRelay() async {
        let resolution = RelayEnvironment.resolveGatewayCommand()
        // Already on the main actor (class isolation), so no explicit hop is needed.
        self.environmentStatus = resolution.status
        guard let command = resolution.command else {
            self.status = .failed(resolution.status.message)
            return
        }

        let cwd = self.defaultProjectRoot().path
        self.appendLog("[relay] starting: \(command.joined(separator: " ")) (cwd: \(cwd))\n")

        do {
            let result = try await run(
                .name(command.first ?? "clawdis"),
                arguments: Arguments(Array(command.dropFirst())),
                environment: self.makeEnvironment(),
                workingDirectory: FilePath(cwd))
            { execution, stdin, stdout, stderr in
                self.didStart(execution)
                // Consume stdout/stderr eagerly so the relay can't block on full pipes.
                async let out: Void = self.stream(output: stdout, label: "stdout")
                async let err: Void = self.stream(output: stderr, label: "stderr")
                try await stdin.finish()
                await out
                await err
            }
            await self.handleTermination(status: result.terminationStatus)
        } catch {
            await self.handleError(error)
        }
    }

    /// Adopts a freshly launched process, or tears it down right away when
    /// `stop()` was requested while the launch was still in flight.
    private func didStart(_ execution: Execution) {
        let pid = execution.processIdentifier.value
        guard self.desiredActive else {
            // `stop()` could not reach this process because `self.execution` was
            // still nil at the time. Leave `stopping` set so the resulting
            // termination is reported as `.stopped` rather than as a crash.
            self.logger.info("relay started pid \(pid) after stop was requested; terminating")
            self.requestTeardown(of: execution)
            return
        }
        self.execution = execution
        self.stopping = false
        self.status = .running(pid: pid)
        self.logger.info("relay started pid \(pid)")
    }

    /// Handles process exit: an intentional stop ends in `.stopped`; anything
    /// else counts as a crash and triggers a delayed restart unless the crash
    /// budget is exhausted.
    private func handleTermination(status: TerminationStatus) async {
        let code: Int32 = switch status {
        case let .exited(exitCode): exitCode
        case let .unhandledException(sig): -Int32(sig)
        }

        self.execution = nil
        if self.stopping || !self.desiredActive {
            self.status = .stopped
            self.stopping = false
            return
        }

        self.recentCrashes.append(Date())
        self.restartCount += 1
        self.appendLog("[relay] exited (\(code)).\n")

        // `shouldGiveUpAfterCrashes()` prunes entries older than the crash window.
        if self.shouldGiveUpAfterCrashes() {
            self.status = .failed("Too many crashes; stopped auto-restart.")
            self.logger.error("relay crash loop detected; giving up")
            return
        }

        self.status = .restarting
        self.logger.warning("relay crashed (code \(code)); restarting")
        try? await Task.sleep(nanoseconds: self.restartBackoffNanoseconds)
        self.startIfNeeded()
    }

    /// Handles an error thrown by `run` (for example a launch failure): logs
    /// it, counts it against the crash budget, and retries after the same
    /// backoff used for crashes.
    private func handleError(_ error: any Error) async {
        self.execution = nil
        var message = error.localizedDescription
        if let sp = error as? SubprocessError {
            message = "SubprocessError \(sp.code.value): \(sp)"
        }
        self.appendLog("[relay] failed: \(message)\n")
        self.logger.error("relay failed: \(message, privacy: .public)")

        guard self.desiredActive else {
            self.status = .failed(error.localizedDescription)
            return
        }

        // Record this failure before consulting the budget so it is part of the decision.
        self.recentCrashes.append(Date())
        if self.shouldGiveUpAfterCrashes() {
            self.status = .failed("Too many crashes; giving up")
            self.logger.error("relay crash loop detected; giving up")
            return
        }

        self.status = .restarting
        try? await Task.sleep(nanoseconds: self.restartBackoffNanoseconds)
        self.startIfNeeded()
    }

    /// Drops crash timestamps older than `crashWindow` and reports whether at
    /// least `maxCrashes` remain.
    private func shouldGiveUpAfterCrashes() -> Bool {
        let now = Date()
        self.recentCrashes.removeAll { now.timeIntervalSince($0) >= self.crashWindow }
        return self.recentCrashes.count >= self.maxCrashes
    }

    /// Copies one output stream of the child into the in-memory log, line by
    /// line; a stream error is logged with the given label and ends the copy.
    private func stream(output: AsyncBufferSequence, label: String) async {
        do {
            for try await line in output.lines() {
                self.appendLog(line + "\n")
            }
        } catch {
            self.appendLog("[relay \(label)] stream error: \(error.localizedDescription)\n")
        }
    }

    /// Appends text to the log, keeping only the trailing `logLimit` characters.
    private func appendLog(_ chunk: String) {
        self.log.append(chunk)
        if self.log.count > self.logLimit {
            self.log = String(self.log.suffix(self.logLimit))
        }
    }

    /// Asks the given process to shut down without blocking the caller.
    /// Uses a graceful-shutdown teardown step with a one-second allowance.
    private func requestTeardown(of execution: Execution) {
        Task {
            await execution.teardown(using: [.gracefulShutDown(allowedDurationToNextStep: .seconds(1))])
        }
    }

    /// Builds the child environment: the inherited environment with `PATH`,
    /// `PNPM_HOME` and `CLAWDIS_PROJECT_ROOT` overridden.
    private func makeEnvironment() -> Environment {
        let merged = CommandResolver.preferredPaths().joined(separator: ":")
        return .inherit.updating([
            "PATH": merged,
            "PNPM_HOME": FileManager.default.homeDirectoryForCurrentUser
                .appendingPathComponent("Library/pnpm").path,
            "CLAWDIS_PROJECT_ROOT": CommandResolver.projectRoot().path,
        ])
    }

    /// Working directory used for the relay process.
    private func defaultProjectRoot() -> URL {
        CommandResolver.projectRoot()
    }

    /// Forwards a new project root path to `CommandResolver`.
    func setProjectRoot(path: String) {
        CommandResolver.setProjectRoot(path)
    }

    /// Returns the project root path reported by `CommandResolver`.
    func projectRootPath() -> String {
        CommandResolver.projectRootPath()
    }
}