From f98ab2d0371f3deaaf6806e26f777fb773cd65dd Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 13 Dec 2025 00:51:39 +0000 Subject: [PATCH] fix(macos): prevent control socket hangs --- .../Clawdis/ControlRequestHandler.swift | 17 +- .../Sources/Clawdis/ControlSocketServer.swift | 212 +++++++++++++----- .../GatewayChannelConnectTests.swift | 14 +- 3 files changed, 175 insertions(+), 68 deletions(-) diff --git a/apps/macos/Sources/Clawdis/ControlRequestHandler.swift b/apps/macos/Sources/Clawdis/ControlRequestHandler.swift index f733ba369..c1ad3ec94 100644 --- a/apps/macos/Sources/Clawdis/ControlRequestHandler.swift +++ b/apps/macos/Sources/Clawdis/ControlRequestHandler.swift @@ -8,11 +8,16 @@ enum ControlRequestHandler { notifier: NotificationManager = NotificationManager(), logger: Logger = Logger(subsystem: "com.steipete.clawdis", category: "control")) async throws -> Response { - let paused = await MainActor.run { AppStateStore.isPausedFlag } + // Keep `status` responsive even if the main actor is busy. + let paused = UserDefaults.standard.bool(forKey: pauseDefaultsKey) if paused { - return Response(ok: false, message: "clawdis paused") + switch request { + case .status: + break + default: + return Response(ok: false, message: "clawdis paused") + } } - let canvasEnabled = await MainActor.run { AppStateStore.canvasEnabled } switch request { case let .notify(title, body, sound, priority, delivery): @@ -47,7 +52,7 @@ enum ControlRequestHandler { return Response(ok: ok, message: msg) case .status: - return Response(ok: true, message: "ready") + return paused ? Response(ok: false, message: "clawdis paused") : Response(ok: true, message: "ready") case .rpcStatus: let result = await AgentRPC.shared.status() @@ -86,6 +91,7 @@ enum ControlRequestHandler { : Response(ok: false, message: rpcResult.error ?? "failed to send") case let .canvasShow(session, path, placement): + let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true guard canvasEnabled else { return Response(ok: false, message: "Canvas disabled by user") } @@ -104,6 +110,7 @@ enum ControlRequestHandler { return Response(ok: true) case let .canvasGoto(session, path, placement): + let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true guard canvasEnabled else { return Response(ok: false, message: "Canvas disabled by user") } @@ -118,6 +125,7 @@ enum ControlRequestHandler { } case let .canvasEval(session, javaScript): + let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true guard canvasEnabled else { return Response(ok: false, message: "Canvas disabled by user") } @@ -129,6 +137,7 @@ enum ControlRequestHandler { } case let .canvasSnapshot(session, outPath): + let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true guard canvasEnabled else { return Response(ok: false, message: "Canvas disabled by user") } diff --git a/apps/macos/Sources/Clawdis/ControlSocketServer.swift b/apps/macos/Sources/Clawdis/ControlSocketServer.swift index a4e01eb79..5eed65d30 100644 --- a/apps/macos/Sources/Clawdis/ControlSocketServer.swift +++ b/apps/macos/Sources/Clawdis/ControlSocketServer.swift @@ -1,16 +1,34 @@ import ClawdisIPC import Foundation import Darwin +import OSLog /// Lightweight UNIX-domain socket server so `clawdis-mac` can talk to the app /// without a launchd MachService. Listens on `controlSocketPath`. final actor ControlSocketServer { - private var listenFD: Int32 = -1 - private var source: DispatchSourceRead? - private let maxRequestBytes = 512 * 1024 - private let allowedTeamIDs: Set = ["Y5PE65HELJ"] + nonisolated private static let logger = Logger(subsystem: "com.steipete.clawdis", category: "control.socket") - private func disableSigPipe(fd: Int32) { + private var listenFD: Int32 = -1 + private var acceptTask: Task? + + private let socketPath: String + private let maxRequestBytes: Int + private let allowedTeamIDs: Set + private let requestTimeoutSec: TimeInterval + + init( + socketPath: String = controlSocketPath, + maxRequestBytes: Int = 512 * 1024, + allowedTeamIDs: Set = ["Y5PE65HELJ"], + requestTimeoutSec: TimeInterval = 5) + { + self.socketPath = socketPath + self.maxRequestBytes = maxRequestBytes + self.allowedTeamIDs = allowedTeamIDs + self.requestTimeoutSec = requestTimeoutSec + } + + private static func disableSigPipe(fd: Int32) { var one: Int32 = 1 _ = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, socklen_t(MemoryLayout.size(ofValue: one))) } @@ -19,7 +37,7 @@ final actor ControlSocketServer { // Already running guard self.listenFD == -1 else { return } - let path = controlSocketPath + let path = self.socketPath let fm = FileManager.default // Ensure directory exists let dir = (path as NSString).deletingLastPathComponent @@ -53,79 +71,155 @@ final actor ControlSocketServer { return } - let src = DispatchSource.makeReadSource(fileDescriptor: fd, queue: .global(qos: .utility)) - src.setEventHandler { [weak self] in - guard let self else { return } - Task { await self.acceptConnection(listenFD: fd) } - } - src.setCancelHandler { close(fd) } - src.resume() - self.listenFD = fd - self.source = src + + let allowedTeamIDs = self.allowedTeamIDs + let maxRequestBytes = self.maxRequestBytes + let requestTimeoutSec = self.requestTimeoutSec + self.acceptTask = Task.detached(priority: .utility) { + await Self.acceptLoop( + listenFD: fd, + allowedTeamIDs: allowedTeamIDs, + maxRequestBytes: maxRequestBytes, + requestTimeoutSec: requestTimeoutSec) + } } func stop() { - self.source?.cancel() - self.source = nil + self.acceptTask?.cancel() + self.acceptTask = nil if self.listenFD != -1 { close(self.listenFD) self.listenFD = -1 } - unlink(controlSocketPath) + unlink(self.socketPath) } - private func acceptConnection(listenFD: Int32) { - var addr = sockaddr() - var len: socklen_t = socklen_t(MemoryLayout.size) - let client = accept(listenFD, &addr, &len) - guard client >= 0 else { return } - self.disableSigPipe(fd: client) - Task.detached { [weak self] in - defer { close(client) } - guard let self else { return } - await self.handleClient(fd: client) + private nonisolated static func acceptLoop( + listenFD: Int32, + allowedTeamIDs: Set, + maxRequestBytes: Int, + requestTimeoutSec: TimeInterval) async + { + while !Task.isCancelled { + var addr = sockaddr() + var len: socklen_t = socklen_t(MemoryLayout.size) + let client = accept(listenFD, &addr, &len) + if client < 0 { + if errno == EINTR { continue } + // Socket was likely closed as part of stop(). + if errno == EBADF || errno == EINVAL { return } + self.logger.error("accept failed: \(errno, privacy: .public)") + try? await Task.sleep(nanoseconds: 50_000_000) + continue + } + + Self.disableSigPipe(fd: client) + Task.detached(priority: .utility) { + defer { close(client) } + await Self.handleClient( + fd: client, + allowedTeamIDs: allowedTeamIDs, + maxRequestBytes: maxRequestBytes, + requestTimeoutSec: requestTimeoutSec) + } } } - private func handleClient(fd: Int32) async { - guard self.isAllowed(fd: fd) else { return } - - var data = Data() - var buffer = [UInt8](repeating: 0, count: 16 * 1024) - let bufSize = buffer.count - while true { - let readCount = buffer.withUnsafeMutableBytes { - read(fd, $0.baseAddress!, bufSize) - } - if readCount > 0 { - data.append(buffer, count: readCount) - if data.count > self.maxRequestBytes { return } - } else { - break - } + private nonisolated static func handleClient( + fd: Int32, + allowedTeamIDs: Set, + maxRequestBytes: Int, + requestTimeoutSec: TimeInterval) async + { + guard self.isAllowed(fd: fd, allowedTeamIDs: allowedTeamIDs) else { + return } - guard !data.isEmpty else { return } - do { - let request = try JSONDecoder().decode(Request.self, from: data) - let response = try await ControlRequestHandler.process(request: request) - let encoded = try JSONEncoder().encode(response) - _ = encoded.withUnsafeBytes { ptr in - write(fd, ptr.baseAddress!, encoded.count) + guard let request = try self.readRequest( + fd: fd, + maxRequestBytes: maxRequestBytes, + timeoutSec: requestTimeoutSec) + else { + return } + + let response = try await ControlRequestHandler.process(request: request) + try self.writeResponse(fd: fd, response: response) } catch { + self.logger.error("socket request failed: \(error.localizedDescription, privacy: .public)") let resp = Response(ok: false, message: "socket error: \(error.localizedDescription)") - if let encoded = try? JSONEncoder().encode(resp) { - _ = encoded.withUnsafeBytes { ptr in - write(fd, ptr.baseAddress!, encoded.count) + try? self.writeResponse(fd: fd, response: resp) + } + } + + private nonisolated static func readRequest( + fd: Int32, + maxRequestBytes: Int, + timeoutSec: TimeInterval) throws -> Request? + { + let deadline = Date().addingTimeInterval(timeoutSec) + var data = Data() + var buffer = [UInt8](repeating: 0, count: 16 * 1024) + let bufferSize = buffer.count + let decoder = JSONDecoder() + + while true { + let remaining = deadline.timeIntervalSinceNow + if remaining <= 0 { + throw POSIXError(.ETIMEDOUT) + } + + var pfd = pollfd(fd: fd, events: Int16(POLLIN), revents: 0) + let sliceMs = max(1.0, min(remaining, 0.25) * 1000.0) + let polled = poll(&pfd, 1, Int32(sliceMs)) + if polled == 0 { continue } + if polled < 0 { + if errno == EINTR { continue } + throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO) + } + + let n = buffer.withUnsafeMutableBytes { read(fd, $0.baseAddress!, bufferSize) } + if n > 0 { + data.append(buffer, count: n) + if data.count > maxRequestBytes { + throw POSIXError(.EMSGSIZE) } + if let req = try? decoder.decode(Request.self, from: data) { + return req + } + continue + } + + if n == 0 { + return data.isEmpty ? nil : try decoder.decode(Request.self, from: data) + } + + if errno == EINTR { continue } + if errno == EAGAIN { continue } + throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO) + } + } + + private nonisolated static func writeResponse(fd: Int32, response: Response) throws { + let encoded = try JSONEncoder().encode(response) + try encoded.withUnsafeBytes { buf in + guard let base = buf.baseAddress else { return } + var written = 0 + while written < encoded.count { + let n = write(fd, base.advanced(by: written), encoded.count - written) + if n > 0 { + written += n + continue + } + if n == -1, errno == EINTR { continue } + throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO) } } } - private func isAllowed(fd: Int32) -> Bool { + private nonisolated static func isAllowed(fd: Int32, allowedTeamIDs: Set) -> Bool { var pid: pid_t = 0 var pidSize = socklen_t(MemoryLayout.size) let r = getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &pidSize) @@ -136,10 +230,10 @@ final actor ControlSocketServer { return true } - return self.teamIDMatches(pid: pid) + return self.teamIDMatches(pid: pid, allowedTeamIDs: allowedTeamIDs) } - private func uid(for pid: pid_t) -> uid_t? { + private nonisolated static func uid(for pid: pid_t) -> uid_t? { var info = kinfo_proc() var size = MemoryLayout.size(ofValue: info) var mib: [Int32] = [CTL_KERN, KERN_PROC, KERN_PROC_PID, pid] @@ -149,7 +243,7 @@ final actor ControlSocketServer { return ok ? info.kp_eproc.e_ucred.cr_uid : nil } - private func teamIDMatches(pid: pid_t) -> Bool { + private nonisolated static func teamIDMatches(pid: pid_t, allowedTeamIDs: Set) -> Bool { let attrs: NSDictionary = [kSecGuestAttributePid: pid] var secCode: SecCode? guard SecCodeCopyGuestWithAttributes(nil, attrs, SecCSFlags(), &secCode) == errSecSuccess, @@ -167,6 +261,6 @@ final actor ControlSocketServer { return false } - return self.allowedTeamIDs.contains(teamID) + return allowedTeamIDs.contains(teamID) } } diff --git a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift index 8b589885a..4a016c7ec 100644 --- a/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift +++ b/apps/macos/Tests/ClawdisIPCTests/GatewayChannelConnectTests.swift @@ -53,12 +53,16 @@ import Testing } func receive() async throws -> URLSessionWebSocketTask.Message { - let (delayMs, msg): (Int, URLSessionWebSocketTask.Message) = switch self.response { - case let .helloOk(delayMs): + let delayMs: Int + let msg: URLSessionWebSocketTask.Message + switch self.response { + case let .helloOk(ms): + delayMs = ms let id = self.connectRequestID.withLock { $0 } ?? "connect" - (delayMs, .data(Self.connectOkData(id: id))) - case let .invalid(delayMs): - (delayMs, .string("not json")) + msg = .data(Self.connectOkData(id: id)) + case let .invalid(ms): + delayMs = ms + msg = .string("not json") } try await Task.sleep(nanoseconds: UInt64(delayMs) * 1_000_000) return msg