From 872d54a2dd2276d7947c0e50b24f68aca469b323 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 10 Dec 2025 01:04:29 +0100 Subject: [PATCH] mac: guard ports and sweep stale tunnels --- .../Clawdis/GatewayProcessManager.swift | 19 +++ apps/macos/Sources/Clawdis/MenuBar.swift | 4 + apps/macos/Sources/Clawdis/PortGuardian.swift | 146 ++++++++++++++++++ .../macos/Sources/Clawdis/WebChatWindow.swift | 12 ++ 4 files changed, 181 insertions(+) create mode 100644 apps/macos/Sources/Clawdis/PortGuardian.swift diff --git a/apps/macos/Sources/Clawdis/GatewayProcessManager.swift b/apps/macos/Sources/Clawdis/GatewayProcessManager.swift index 54bee6e87..604edb68b 100644 --- a/apps/macos/Sources/Clawdis/GatewayProcessManager.swift +++ b/apps/macos/Sources/Clawdis/GatewayProcessManager.swift @@ -46,6 +46,8 @@ final class GatewayProcessManager: ObservableObject { @Published private(set) var existingGatewayDetails: String? private var execution: Execution? + private var lastPid: Int32? + private var lastCommand: [String]? private var desiredActive = false private var stopping = false private var recentCrashes: [Date] = [] @@ -167,6 +169,7 @@ final class GatewayProcessManager: ObservableObject { let cwd = self.defaultProjectRoot().path self.appendLog("[gateway] starting: \(command.joined(separator: " ")) (cwd: \(cwd))\n") + self.lastCommand = command do { // Acquire the same UDS lock the CLI uses to guarantee a single instance. @@ -214,7 +217,15 @@ final class GatewayProcessManager: ObservableObject { self.execution = execution self.stopping = false self.status = .running(pid: execution.processIdentifier.value) + self.lastPid = execution.processIdentifier.value self.logger.info("gateway started pid \(execution.processIdentifier.value)") + Task { + await PortGuardian.shared.record( + port: GatewayEnvironment.gatewayPort(), + pid: execution.processIdentifier.value, + command: (self.lastCommand ?? []).joined(separator: " "), + mode: AppStateStore.shared.connectionMode) + } } private func handleTermination(status: TerminationStatus) async { @@ -224,9 +235,17 @@ final class GatewayProcessManager: ObservableObject { } self.execution = nil + if let pid = self.lastPid { + Task { await PortGuardian.shared.removeRecord(pid: pid) } + } + self.lastPid = nil + self.lastCommand = nil if self.stopping || !self.desiredActive { self.status = .stopped self.stopping = false + if let pid = self.lastPid { + Task { await PortGuardian.shared.removeRecord(pid: pid) } + } return } diff --git a/apps/macos/Sources/Clawdis/MenuBar.swift b/apps/macos/Sources/Clawdis/MenuBar.swift index e513ae5ec..90af16ee9 100644 --- a/apps/macos/Sources/Clawdis/MenuBar.swift +++ b/apps/macos/Sources/Clawdis/MenuBar.swift @@ -168,6 +168,10 @@ final class AppDelegate: NSObject, NSApplicationDelegate, NSXPCListenerDelegate PresenceReporter.shared.start() } Task { await HealthStore.shared.refresh(onDemand: true) } + Task { + let mode = AppStateStore.shared.connectionMode + await PortGuardian.shared.sweep(mode: mode) + } self.startListener() self.scheduleFirstRunOnboardingIfNeeded() diff --git a/apps/macos/Sources/Clawdis/PortGuardian.swift b/apps/macos/Sources/Clawdis/PortGuardian.swift new file mode 100644 index 000000000..da9ec3e77 --- /dev/null +++ b/apps/macos/Sources/Clawdis/PortGuardian.swift @@ -0,0 +1,146 @@ +import Foundation +import OSLog + +actor PortGuardian { + static let shared = PortGuardian() + + struct Record: Codable { + let port: Int + let pid: Int32 + let command: String + let mode: String + let timestamp: TimeInterval + } + + private var records: [Record] = [] + private let logger = Logger(subsystem: "com.steipete.clawdis", category: "portguard") + nonisolated private static let appSupportDir: URL = { + let base = FileManager.default.urls(for: .applicationSupportDirectory, in: .userDomainMask).first! + return base.appendingPathComponent("Clawdis", isDirectory: true) + }() + nonisolated private static var recordPath: URL { + self.appSupportDir.appendingPathComponent("port-guard.json", isDirectory: false) + } + + init() { + self.records = Self.loadRecords(from: Self.recordPath) + } + + func sweep(mode: AppState.ConnectionMode) async { + self.logger.info("port sweep starting (mode=\(mode.rawValue, privacy: .public))") + let ports = [18788, 18789] + for port in ports { + let listeners = await self.listeners(on: port) + guard !listeners.isEmpty else { continue } + for listener in listeners { + if self.isExpected(listener, port: port, mode: mode) { + self.logger.info("port \(port, privacy: .public) already served by expected \(listener.command, privacy: .public) (pid \(listener.pid)) — keeping") + continue + } + let killed = await self.kill(listener.pid) + if killed { + self.logger.error("port \(port, privacy: .public) was held by \(listener.command, privacy: .public) (pid \(listener.pid)); terminated") + } else { + self.logger.error("failed to terminate pid \(listener.pid) on port \(port, privacy: .public)") + } + } + } + self.logger.info("port sweep done") + } + + func record(port: Int, pid: Int32, command: String, mode: AppState.ConnectionMode) async { + try? FileManager.default.createDirectory(at: Self.appSupportDir, withIntermediateDirectories: true) + self.records.removeAll { $0.pid == pid } + self.records.append(Record(port: port, pid: pid, command: command, mode: mode.rawValue, timestamp: Date().timeIntervalSince1970)) + self.save() + } + + func removeRecord(pid: Int32) { + let before = self.records.count + self.records.removeAll { $0.pid == pid } + if self.records.count != before { + self.save() + } + } + + // MARK: - Internals + + private struct Listener { + let pid: Int32 + let command: String + let user: String? + } + + private func listeners(on port: Int) async -> [Listener] { + let res = await ShellExecutor.run( + command: ["lsof", "-nP", "-iTCP:\(port)", "-sTCP:LISTEN", "-Fpcn"], + cwd: nil, + env: nil, + timeout: 5) + guard res.ok, let data = res.payload, !data.isEmpty else { return [] } + let text = String(data: data, encoding: .utf8) ?? "" + var listeners: [Listener] = [] + var currentPid: Int32? + var currentCmd: String? + var currentUser: String? + + func flush() { + if let pid = currentPid, let cmd = currentCmd { + listeners.append(Listener(pid: pid, command: cmd, user: currentUser)) + } + currentPid = nil + currentCmd = nil + currentUser = nil + } + + for line in text.split(separator: "\n") { + guard let prefix = line.first else { continue } + let value = String(line.dropFirst()) + switch prefix { + case "p": + flush() + currentPid = Int32(value) ?? 0 + case "c": + currentCmd = value + case "u": + currentUser = value + default: + continue + } + } + flush() + return listeners + } + + private func kill(_ pid: Int32) async -> Bool { + let term = await ShellExecutor.run(command: ["kill", "-TERM", "\(pid)"], cwd: nil, env: nil, timeout: 2) + if term.ok { return true } + let sigkill = await ShellExecutor.run(command: ["kill", "-KILL", "\(pid)"], cwd: nil, env: nil, timeout: 2) + return sigkill.ok + } + + private func isExpected(_ listener: Listener, port: Int, mode: AppState.ConnectionMode) -> Bool { + let cmd = listener.command.lowercased() + switch mode { + case .remote: + if port == 18788 { + return cmd.contains("ssh") && cmd.contains("18788") + } + return false + case .local: + return cmd.contains("node") || cmd.contains("clawdis") || cmd.contains("tsx") + } + } + + private static func loadRecords(from url: URL) -> [Record] { + guard let data = try? Data(contentsOf: url), + let decoded = try? JSONDecoder().decode([Record].self, from: data) + else { return [] } + return decoded + } + + private func save() { + guard let data = try? JSONEncoder().encode(self.records) else { return } + try? data.write(to: Self.recordPath, options: [.atomic]) + } +} diff --git a/apps/macos/Sources/Clawdis/WebChatWindow.swift b/apps/macos/Sources/Clawdis/WebChatWindow.swift index b2592ba8a..0edef9bbe 100644 --- a/apps/macos/Sources/Clawdis/WebChatWindow.swift +++ b/apps/macos/Sources/Clawdis/WebChatWindow.swift @@ -541,14 +541,18 @@ final class WebChatTunnel { } deinit { + let pid = self.process.processIdentifier self.process.terminate() + Task { await PortGuardian.shared.removeRecord(pid: pid) } } func terminate() { + let pid = self.process.processIdentifier if self.process.isRunning { self.process.terminate() self.process.waitUntilExit() } + Task { await PortGuardian.shared.removeRecord(pid: pid) } } static func create(remotePort: Int, preferredLocalPort: UInt16? = nil) async throws -> WebChatTunnel { @@ -588,6 +592,14 @@ final class WebChatTunnel { webChatLogger.error("webchat tunnel stderr: \(line, privacy: .public)") } try process.run() + // Track tunnel so we can clean up stale listeners on restart. + Task { + await PortGuardian.shared.record( + port: Int(localPort), + pid: process.processIdentifier, + command: process.executableURL?.path ?? "ssh", + mode: CommandResolver.connectionSettings().mode) + } return WebChatTunnel(process: process, localPort: localPort) }