fix(macos): detect and reset stale SSH tunnels
This commit is contained in:
@@ -65,6 +65,7 @@
|
|||||||
- Session list polish: sleeping/disconnected/error states, usage bar restored, padding + bar sizing tuned, syncing menu removed, header hidden when disconnected.
|
- Session list polish: sleeping/disconnected/error states, usage bar restored, padding + bar sizing tuned, syncing menu removed, header hidden when disconnected.
|
||||||
- Chat UI polish: tool call cards + merged tool results, glass background, tighter composer spacing, visual effect host tweaks.
|
- Chat UI polish: tool call cards + merged tool results, glass background, tighter composer spacing, visual effect host tweaks.
|
||||||
- OAuth storage moved; legacy session syncing metadata removed.
|
- OAuth storage moved; legacy session syncing metadata removed.
|
||||||
|
- Remote SSH tunnels now get health checks; Debug → Ports highlights unhealthy tunnels and offers a "Reset SSH tunnel" button.
|
||||||
|
|
||||||
### Nodes & Canvas
|
### Nodes & Canvas
|
||||||
- Debug status overlay gated and toggleable on macOS/iOS/Android nodes.
|
- Debug status overlay gated and toggleable on macOS/iOS/Android nodes.
|
||||||
|
|||||||
@@ -108,6 +108,27 @@ enum DebugActions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tears down and rebuilds the remote SSH tunnel.
///
/// Stops every tunnel held by `RemoteTunnelManager`, shuts down the shared
/// `GatewayConnection`, then re-establishes the control tunnel and reconfigures
/// `ControlChannel` for remote mode using the current connection settings.
///
/// - Returns: `.success` with a confirmation message once the tunnel has been
///   rebuilt and health refreshed, or `.failure` when remote mode is not enabled
///   or any rebuild step throws.
static func resetGatewayTunnel() async -> Result<String, DebugActionError> {
    guard CommandResolver.connectionSettings().mode == .remote else {
        return .failure(.message("Remote mode is not enabled."))
    }

    await RemoteTunnelManager.shared.stopAll()
    await GatewayConnection.shared.shutdown()

    do {
        _ = try await RemoteTunnelManager.shared.ensureControlTunnel()
        // Settings are re-read after the tunnel is up, as in the original flow.
        let remote = CommandResolver.connectionSettings()
        try await ControlChannel.shared.configure(
            mode: .remote(target: remote.target, identity: remote.identity))
    } catch {
        // Health is refreshed in the background so the failure is reported right away.
        Task { await HealthStore.shared.refresh(onDemand: true) }
        return .failure(.message(error.localizedDescription))
    }

    await HealthStore.shared.refresh(onDemand: true)
    return .success("SSH tunnel reset.")
}
|
||||||
|
|
||||||
/// Path of the log file to show: the best log file `LogLocator` can find,
/// falling back to the launchd log path when there is none.
static func pinoLogPath() -> String {
    if let best = LogLocator.bestLogFile() {
        return best.path
    }
    return LogLocator.launchdLogPath
}
|
||||||
|
|||||||
@@ -23,6 +23,8 @@ struct DebugSettings: View {
|
|||||||
@State private var portCheckInFlight = false
|
@State private var portCheckInFlight = false
|
||||||
@State private var portReports: [DebugActions.PortReport] = []
|
@State private var portReports: [DebugActions.PortReport] = []
|
||||||
@State private var portKillStatus: String?
|
@State private var portKillStatus: String?
|
||||||
|
@State private var tunnelResetInFlight = false
|
||||||
|
@State private var tunnelResetStatus: String?
|
||||||
@State private var pendingKill: DebugActions.PortListener?
|
@State private var pendingKill: DebugActions.PortListener?
|
||||||
@AppStorage(attachExistingGatewayOnlyKey) private var attachExistingGatewayOnly: Bool = false
|
@AppStorage(attachExistingGatewayOnlyKey) private var attachExistingGatewayOnly: Bool = false
|
||||||
@AppStorage(debugFileLogEnabledKey) private var diagnosticsFileLogEnabled: Bool = false
|
@AppStorage(debugFileLogEnabledKey) private var diagnosticsFileLogEnabled: Bool = false
|
||||||
@@ -264,6 +266,11 @@ struct DebugSettings: View {
|
|||||||
}
|
}
|
||||||
.buttonStyle(.borderedProminent)
|
.buttonStyle(.borderedProminent)
|
||||||
.disabled(self.portCheckInFlight)
|
.disabled(self.portCheckInFlight)
|
||||||
|
Button("Reset SSH tunnel") {
|
||||||
|
Task { await self.resetGatewayTunnel() }
|
||||||
|
}
|
||||||
|
.buttonStyle(.bordered)
|
||||||
|
.disabled(self.tunnelResetInFlight || !self.isRemoteMode)
|
||||||
}
|
}
|
||||||
|
|
||||||
if let portKillStatus {
|
if let portKillStatus {
|
||||||
@@ -272,6 +279,12 @@ struct DebugSettings: View {
|
|||||||
.foregroundStyle(.secondary)
|
.foregroundStyle(.secondary)
|
||||||
.fixedSize(horizontal: false, vertical: true)
|
.fixedSize(horizontal: false, vertical: true)
|
||||||
}
|
}
|
||||||
|
if let tunnelResetStatus {
|
||||||
|
Text(tunnelResetStatus)
|
||||||
|
.font(.caption2)
|
||||||
|
.foregroundStyle(.secondary)
|
||||||
|
.fixedSize(horizontal: false, vertical: true)
|
||||||
|
}
|
||||||
|
|
||||||
if self.portReports.isEmpty, !self.portCheckInFlight {
|
if self.portReports.isEmpty, !self.portCheckInFlight {
|
||||||
Text("Check which process owns 18789 and suggest fixes.")
|
Text("Check which process owns 18789 and suggest fixes.")
|
||||||
@@ -593,6 +606,21 @@ struct DebugSettings: View {
|
|||||||
self.portCheckInFlight = false
|
self.portCheckInFlight = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Runs the SSH tunnel reset from the Debug UI.
///
/// Marks the reset as in flight, clears the previous status text, shows the
/// outcome message (success or error description), then re-runs the port check
/// before clearing the in-flight flag.
@MainActor
private func resetGatewayTunnel() async {
    self.tunnelResetInFlight = true
    // Cleared on exit, i.e. only after the follow-up port check has finished.
    defer { self.tunnelResetInFlight = false }
    self.tunnelResetStatus = nil

    switch await DebugActions.resetGatewayTunnel() {
    case .success(let text):
        self.tunnelResetStatus = text
    case .failure(let failure):
        self.tunnelResetStatus = failure.localizedDescription
    }

    await self.runPortCheck()
}
|
||||||
|
|
||||||
@MainActor
|
@MainActor
|
||||||
private func requestKill(_ listener: DebugActions.PortListener) {
|
private func requestKill(_ listener: DebugActions.PortListener) {
|
||||||
if listener.expected {
|
if listener.expected {
|
||||||
@@ -730,6 +758,10 @@ struct DebugSettings: View {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Whether the connection settings resolved by `CommandResolver` currently
/// select remote mode.
private var isRemoteMode: Bool {
    let settings = CommandResolver.connectionSettings()
    return settings.mode == .remote
}
|
||||||
|
|
||||||
private func configURL() -> URL {
|
private func configURL() -> URL {
|
||||||
FileManager.default.homeDirectoryForCurrentUser
|
FileManager.default.homeDirectoryForCurrentUser
|
||||||
.appendingPathComponent(".clawdis")
|
.appendingPathComponent(".clawdis")
|
||||||
|
|||||||
@@ -153,12 +153,37 @@ actor PortGuardian {
|
|||||||
|
|
||||||
for port in ports {
|
for port in ports {
|
||||||
let listeners = await self.listeners(on: port)
|
let listeners = await self.listeners(on: port)
|
||||||
reports.append(Self.buildReport(port: port, listeners: listeners, mode: mode))
|
let tunnelHealthy = await self.probeGatewayHealthIfNeeded(
|
||||||
|
port: port,
|
||||||
|
mode: mode,
|
||||||
|
listeners: listeners)
|
||||||
|
reports.append(Self.buildReport(
|
||||||
|
port: port,
|
||||||
|
listeners: listeners,
|
||||||
|
mode: mode,
|
||||||
|
tunnelHealthy: tunnelHealthy))
|
||||||
}
|
}
|
||||||
|
|
||||||
return reports
|
return reports
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Probes the local end of the gateway port with a single HTTP request to `/`.
///
/// Any HTTP response, whatever its status code, counts as healthy. A connection
/// failure, a timeout, or a response that is not an `HTTPURLResponse` counts as
/// unhealthy.
///
/// - Parameters:
///   - port: TCP port on `127.0.0.1` to probe.
///   - timeout: Request and resource timeout in seconds.
/// - Returns: `true` when an HTTP response was received, otherwise `false`.
func probeGatewayHealth(port: Int, timeout: TimeInterval = 2.0) async -> Bool {
    // Treat an unbuildable URL as "unhealthy" instead of crashing on a force unwrap.
    guard let url = URL(string: "http://127.0.0.1:\(port)/") else { return false }

    let config = URLSessionConfiguration.ephemeral
    config.timeoutIntervalForRequest = timeout
    config.timeoutIntervalForResource = timeout

    let session = URLSession(configuration: config)
    // This session is single-use; invalidate it on every exit path so repeated
    // probes do not accumulate live sessions.
    defer { session.finishTasksAndInvalidate() }

    var request = URLRequest(url: url)
    request.cachePolicy = .reloadIgnoringLocalCacheData
    request.timeoutInterval = timeout

    do {
        let (_, response) = try await session.data(for: request)
        return response is HTTPURLResponse
    } catch {
        // Refused connection, timeout, etc. all mean the tunnel is not serving.
        return false
    }
}
|
||||||
|
|
||||||
private func listeners(on port: Int) async -> [Listener] {
|
private func listeners(on port: Int) async -> [Listener] {
|
||||||
let res = await ShellExecutor.run(
|
let res = await ShellExecutor.run(
|
||||||
command: ["lsof", "-nP", "-iTCP:\(port)", "-sTCP:LISTEN", "-Fpcn"],
|
command: ["lsof", "-nP", "-iTCP:\(port)", "-sTCP:LISTEN", "-Fpcn"],
|
||||||
@@ -227,7 +252,8 @@ actor PortGuardian {
|
|||||||
private static func buildReport(
|
private static func buildReport(
|
||||||
port: Int,
|
port: Int,
|
||||||
listeners: [Listener],
|
listeners: [Listener],
|
||||||
mode: AppState.ConnectionMode) -> PortReport
|
mode: AppState.ConnectionMode,
|
||||||
|
tunnelHealthy: Bool?) -> PortReport
|
||||||
{
|
{
|
||||||
let expectedDesc: String
|
let expectedDesc: String
|
||||||
let okPredicate: (Listener) -> Bool
|
let okPredicate: (Listener) -> Bool
|
||||||
@@ -253,16 +279,28 @@ actor PortGuardian {
|
|||||||
return .init(port: port, expected: expectedDesc, status: .missing(text), listeners: [])
|
return .init(port: port, expected: expectedDesc, status: .missing(text), listeners: [])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let tunnelUnhealthy = mode == .remote && port == 18789 && tunnelHealthy == false
|
||||||
let reportListeners = listeners.map { listener in
|
let reportListeners = listeners.map { listener in
|
||||||
ReportListener(
|
var expected = okPredicate(listener)
|
||||||
|
if tunnelUnhealthy, expected { expected = false }
|
||||||
|
return ReportListener(
|
||||||
pid: listener.pid,
|
pid: listener.pid,
|
||||||
command: listener.command,
|
command: listener.command,
|
||||||
fullCommand: listener.fullCommand,
|
fullCommand: listener.fullCommand,
|
||||||
user: listener.user,
|
user: listener.user,
|
||||||
expected: okPredicate(listener))
|
expected: expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
let offenders = reportListeners.filter { !$0.expected }
|
let offenders = reportListeners.filter { !$0.expected }
|
||||||
|
if tunnelUnhealthy {
|
||||||
|
let list = listeners.map { "\($0.command) (\($0.pid))" }.joined(separator: ", ")
|
||||||
|
let reason = "Port \(port) is served by \(list), but the SSH tunnel is unhealthy."
|
||||||
|
return .init(
|
||||||
|
port: port,
|
||||||
|
expected: expectedDesc,
|
||||||
|
status: .interference(reason, offenders: offenders),
|
||||||
|
listeners: reportListeners)
|
||||||
|
}
|
||||||
if offenders.isEmpty {
|
if offenders.isEmpty {
|
||||||
let list = listeners.map { "\($0.command) (\($0.pid))" }.joined(separator: ", ")
|
let list = listeners.map { "\($0.command) (\($0.pid))" }.joined(separator: ", ")
|
||||||
let okText = "Port \(port) is served by \(list)."
|
let okText = "Port \(port) is served by \(list)."
|
||||||
@@ -318,6 +356,17 @@ actor PortGuardian {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Runs the gateway health probe only when it is relevant for this port.
///
/// The probe is attempted only in remote mode, for port 18789, and when at
/// least one listener's command name contains "ssh" (case-insensitive).
///
/// - Returns: The probe result, or `nil` when the probe was not applicable.
private func probeGatewayHealthIfNeeded(
    port: Int,
    mode: AppState.ConnectionMode,
    listeners: [Listener]) async -> Bool?
{
    let isRemoteGatewayPort = mode == .remote && port == 18789
    if !isRemoteGatewayPort || listeners.isEmpty { return nil }

    // No ssh-owned listener means there is no tunnel to judge.
    let noneAreSsh = listeners.allSatisfy { !$0.command.lowercased().contains("ssh") }
    if noneAreSsh { return nil }

    return await self.probeGatewayHealth(port: port)
}
|
||||||
|
|
||||||
private static func loadRecords(from url: URL) -> [Record] {
|
private static func loadRecords(from url: URL) -> [Record] {
|
||||||
guard let data = try? Data(contentsOf: url),
|
guard let data = try? Data(contentsOf: url),
|
||||||
let decoded = try? JSONDecoder().decode([Record].self, from: data)
|
let decoded = try? JSONDecoder().decode([Record].self, from: data)
|
||||||
@@ -352,7 +401,7 @@ extension PortGuardian {
|
|||||||
command: $0.command,
|
command: $0.command,
|
||||||
fullCommand: $0.fullCommand,
|
fullCommand: $0.fullCommand,
|
||||||
user: $0.user) }
|
user: $0.user) }
|
||||||
return Self.buildReport(port: port, listeners: mapped, mode: mode)
|
return Self.buildReport(port: port, listeners: mapped, mode: mode, tunnelHealthy: nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
import Foundation
|
import Foundation
|
||||||
|
import OSLog
|
||||||
|
|
||||||
/// Manages the SSH tunnel that forwards the remote gateway/control port to localhost.
|
/// Manages the SSH tunnel that forwards the remote gateway/control port to localhost.
|
||||||
actor RemoteTunnelManager {
|
actor RemoteTunnelManager {
|
||||||
static let shared = RemoteTunnelManager()
|
static let shared = RemoteTunnelManager()
|
||||||
|
|
||||||
|
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "remote-tunnel")
|
||||||
private var controlTunnel: RemotePortTunnel?
|
private var controlTunnel: RemotePortTunnel?
|
||||||
|
|
||||||
func controlTunnelPortIfRunning() async -> UInt16? {
|
func controlTunnelPortIfRunning() async -> UInt16? {
|
||||||
@@ -11,15 +13,19 @@ actor RemoteTunnelManager {
|
|||||||
tunnel.process.isRunning,
|
tunnel.process.isRunning,
|
||||||
let local = tunnel.localPort
|
let local = tunnel.localPort
|
||||||
{
|
{
|
||||||
return local
|
if await self.isTunnelHealthy(port: local) { return local }
|
||||||
|
self.logger.error("active SSH tunnel on port \(local, privacy: .public) is unhealthy; restarting")
|
||||||
|
tunnel.terminate()
|
||||||
|
self.controlTunnel = nil
|
||||||
}
|
}
|
||||||
// If a previous Clawdis run already has an SSH listener on the expected port (common after restarts),
|
// If a previous Clawdis run already has an SSH listener on the expected port (common after restarts),
|
||||||
// reuse it instead of spawning new ssh processes that immediately fail with "Address already in use".
|
// reuse it instead of spawning new ssh processes that immediately fail with "Address already in use".
|
||||||
let desiredPort = UInt16(GatewayEnvironment.gatewayPort())
|
let desiredPort = UInt16(GatewayEnvironment.gatewayPort())
|
||||||
if let desc = await PortGuardian.shared.describe(port: Int(desiredPort)),
|
if let desc = await PortGuardian.shared.describe(port: Int(desiredPort)),
|
||||||
desc.command.lowercased().contains("ssh")
|
self.isSshProcess(desc)
|
||||||
{
|
{
|
||||||
return desiredPort
|
if await self.isTunnelHealthy(port: desiredPort) { return desiredPort }
|
||||||
|
await self.cleanupStaleTunnel(desc: desc, port: desiredPort)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -49,4 +55,33 @@ actor RemoteTunnelManager {
|
|||||||
self.controlTunnel?.terminate()
|
self.controlTunnel?.terminate()
|
||||||
self.controlTunnel = nil
|
self.controlTunnel = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Asks `PortGuardian` to probe the gateway on the given local port and
/// returns its verdict.
private func isTunnelHealthy(port: UInt16) async -> Bool {
    let localPort = Int(port)
    return await PortGuardian.shared.probeGatewayHealth(port: localPort)
}
|
||||||
|
|
||||||
|
/// Heuristic check for whether a port descriptor belongs to an ssh process.
///
/// Matches when the lowercased command name contains "ssh", or when the
/// lowercased executable path (if present) contains "/ssh".
private func isSshProcess(_ desc: PortGuardian.Descriptor) -> Bool {
    guard !desc.command.lowercased().contains("ssh") else { return true }
    return desc.executablePath?.lowercased().contains("/ssh") ?? false
}
|
||||||
|
|
||||||
|
/// Terminates a stale SSH tunnel process and drops its `PortGuardian` record.
///
/// The record is removed whether or not the process could be terminated; a
/// failed termination is only logged.
///
/// - Parameters:
///   - desc: Descriptor of the process currently listening on `port`.
///   - port: Local port the stale tunnel occupies (used for logging only).
private func cleanupStaleTunnel(desc: PortGuardian.Descriptor, port: UInt16) async {
    let pid = desc.pid
    self.logger.error(
        "stale SSH tunnel detected on port \(port, privacy: .public) pid \(pid, privacy: .public)")

    let terminated = await self.kill(pid: pid)
    if terminated == false {
        self.logger.error("failed to terminate stale SSH tunnel pid \(pid, privacy: .public)")
    }

    await PortGuardian.shared.removeRecord(pid: pid)
}
|
||||||
|
|
||||||
|
/// Sends SIGTERM to `pid`, falling back to SIGKILL when the TERM command fails.
///
/// - Parameter pid: Process id to signal. Must be positive.
/// - Returns: `true` when one of the `kill` invocations reported success,
///   `false` otherwise (including when `pid` is not a positive process id).
private func kill(pid: Int32) async -> Bool {
    // `kill` treats 0 and negative ids as process-group targets (and a leading
    // "-" would be parsed as an option), so never forward them to the shell.
    guard pid > 0 else { return false }

    for signal in ["-TERM", "-KILL"] {
        let outcome = await ShellExecutor.run(
            command: ["kill", signal, "\(pid)"],
            cwd: nil,
            env: nil,
            timeout: 2)
        if outcome.ok { return true }
    }
    return false
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user