fix(macos): prevent control socket hangs
This commit is contained in:
@@ -8,11 +8,16 @@ enum ControlRequestHandler {
|
||||
notifier: NotificationManager = NotificationManager(),
|
||||
logger: Logger = Logger(subsystem: "com.steipete.clawdis", category: "control")) async throws -> Response
|
||||
{
|
||||
let paused = await MainActor.run { AppStateStore.isPausedFlag }
|
||||
// Keep `status` responsive even if the main actor is busy.
|
||||
let paused = UserDefaults.standard.bool(forKey: pauseDefaultsKey)
|
||||
if paused {
|
||||
return Response(ok: false, message: "clawdis paused")
|
||||
switch request {
|
||||
case .status:
|
||||
break
|
||||
default:
|
||||
return Response(ok: false, message: "clawdis paused")
|
||||
}
|
||||
}
|
||||
let canvasEnabled = await MainActor.run { AppStateStore.canvasEnabled }
|
||||
|
||||
switch request {
|
||||
case let .notify(title, body, sound, priority, delivery):
|
||||
@@ -47,7 +52,7 @@ enum ControlRequestHandler {
|
||||
return Response(ok: ok, message: msg)
|
||||
|
||||
case .status:
|
||||
return Response(ok: true, message: "ready")
|
||||
return paused ? Response(ok: false, message: "clawdis paused") : Response(ok: true, message: "ready")
|
||||
|
||||
case .rpcStatus:
|
||||
let result = await AgentRPC.shared.status()
|
||||
@@ -86,6 +91,7 @@ enum ControlRequestHandler {
|
||||
: Response(ok: false, message: rpcResult.error ?? "failed to send")
|
||||
|
||||
case let .canvasShow(session, path, placement):
|
||||
let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true
|
||||
guard canvasEnabled else {
|
||||
return Response(ok: false, message: "Canvas disabled by user")
|
||||
}
|
||||
@@ -104,6 +110,7 @@ enum ControlRequestHandler {
|
||||
return Response(ok: true)
|
||||
|
||||
case let .canvasGoto(session, path, placement):
|
||||
let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true
|
||||
guard canvasEnabled else {
|
||||
return Response(ok: false, message: "Canvas disabled by user")
|
||||
}
|
||||
@@ -118,6 +125,7 @@ enum ControlRequestHandler {
|
||||
}
|
||||
|
||||
case let .canvasEval(session, javaScript):
|
||||
let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true
|
||||
guard canvasEnabled else {
|
||||
return Response(ok: false, message: "Canvas disabled by user")
|
||||
}
|
||||
@@ -129,6 +137,7 @@ enum ControlRequestHandler {
|
||||
}
|
||||
|
||||
case let .canvasSnapshot(session, outPath):
|
||||
let canvasEnabled = UserDefaults.standard.object(forKey: canvasEnabledKey) as? Bool ?? true
|
||||
guard canvasEnabled else {
|
||||
return Response(ok: false, message: "Canvas disabled by user")
|
||||
}
|
||||
|
||||
@@ -1,16 +1,34 @@
|
||||
import ClawdisIPC
|
||||
import Foundation
|
||||
import Darwin
|
||||
import OSLog
|
||||
|
||||
/// Lightweight UNIX-domain socket server so `clawdis-mac` can talk to the app
|
||||
/// without a launchd MachService. Listens on `controlSocketPath`.
|
||||
final actor ControlSocketServer {
|
||||
private var listenFD: Int32 = -1
|
||||
private var source: DispatchSourceRead?
|
||||
private let maxRequestBytes = 512 * 1024
|
||||
private let allowedTeamIDs: Set<String> = ["Y5PE65HELJ"]
|
||||
nonisolated private static let logger = Logger(subsystem: "com.steipete.clawdis", category: "control.socket")
|
||||
|
||||
private func disableSigPipe(fd: Int32) {
|
||||
private var listenFD: Int32 = -1
|
||||
private var acceptTask: Task<Void, Never>?
|
||||
|
||||
private let socketPath: String
|
||||
private let maxRequestBytes: Int
|
||||
private let allowedTeamIDs: Set<String>
|
||||
private let requestTimeoutSec: TimeInterval
|
||||
|
||||
init(
|
||||
socketPath: String = controlSocketPath,
|
||||
maxRequestBytes: Int = 512 * 1024,
|
||||
allowedTeamIDs: Set<String> = ["Y5PE65HELJ"],
|
||||
requestTimeoutSec: TimeInterval = 5)
|
||||
{
|
||||
self.socketPath = socketPath
|
||||
self.maxRequestBytes = maxRequestBytes
|
||||
self.allowedTeamIDs = allowedTeamIDs
|
||||
self.requestTimeoutSec = requestTimeoutSec
|
||||
}
|
||||
|
||||
private static func disableSigPipe(fd: Int32) {
|
||||
var one: Int32 = 1
|
||||
_ = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, socklen_t(MemoryLayout.size(ofValue: one)))
|
||||
}
|
||||
@@ -19,7 +37,7 @@ final actor ControlSocketServer {
|
||||
// Already running
|
||||
guard self.listenFD == -1 else { return }
|
||||
|
||||
let path = controlSocketPath
|
||||
let path = self.socketPath
|
||||
let fm = FileManager.default
|
||||
// Ensure directory exists
|
||||
let dir = (path as NSString).deletingLastPathComponent
|
||||
@@ -53,79 +71,155 @@ final actor ControlSocketServer {
|
||||
return
|
||||
}
|
||||
|
||||
let src = DispatchSource.makeReadSource(fileDescriptor: fd, queue: .global(qos: .utility))
|
||||
src.setEventHandler { [weak self] in
|
||||
guard let self else { return }
|
||||
Task { await self.acceptConnection(listenFD: fd) }
|
||||
}
|
||||
src.setCancelHandler { close(fd) }
|
||||
src.resume()
|
||||
|
||||
self.listenFD = fd
|
||||
self.source = src
|
||||
|
||||
let allowedTeamIDs = self.allowedTeamIDs
|
||||
let maxRequestBytes = self.maxRequestBytes
|
||||
let requestTimeoutSec = self.requestTimeoutSec
|
||||
self.acceptTask = Task.detached(priority: .utility) {
|
||||
await Self.acceptLoop(
|
||||
listenFD: fd,
|
||||
allowedTeamIDs: allowedTeamIDs,
|
||||
maxRequestBytes: maxRequestBytes,
|
||||
requestTimeoutSec: requestTimeoutSec)
|
||||
}
|
||||
}
|
||||
|
||||
func stop() {
|
||||
self.source?.cancel()
|
||||
self.source = nil
|
||||
self.acceptTask?.cancel()
|
||||
self.acceptTask = nil
|
||||
if self.listenFD != -1 {
|
||||
close(self.listenFD)
|
||||
self.listenFD = -1
|
||||
}
|
||||
unlink(controlSocketPath)
|
||||
unlink(self.socketPath)
|
||||
}
|
||||
|
||||
private func acceptConnection(listenFD: Int32) {
|
||||
var addr = sockaddr()
|
||||
var len: socklen_t = socklen_t(MemoryLayout<sockaddr>.size)
|
||||
let client = accept(listenFD, &addr, &len)
|
||||
guard client >= 0 else { return }
|
||||
self.disableSigPipe(fd: client)
|
||||
Task.detached { [weak self] in
|
||||
defer { close(client) }
|
||||
guard let self else { return }
|
||||
await self.handleClient(fd: client)
|
||||
private nonisolated static func acceptLoop(
|
||||
listenFD: Int32,
|
||||
allowedTeamIDs: Set<String>,
|
||||
maxRequestBytes: Int,
|
||||
requestTimeoutSec: TimeInterval) async
|
||||
{
|
||||
while !Task.isCancelled {
|
||||
var addr = sockaddr()
|
||||
var len: socklen_t = socklen_t(MemoryLayout<sockaddr>.size)
|
||||
let client = accept(listenFD, &addr, &len)
|
||||
if client < 0 {
|
||||
if errno == EINTR { continue }
|
||||
// Socket was likely closed as part of stop().
|
||||
if errno == EBADF || errno == EINVAL { return }
|
||||
self.logger.error("accept failed: \(errno, privacy: .public)")
|
||||
try? await Task.sleep(nanoseconds: 50_000_000)
|
||||
continue
|
||||
}
|
||||
|
||||
Self.disableSigPipe(fd: client)
|
||||
Task.detached(priority: .utility) {
|
||||
defer { close(client) }
|
||||
await Self.handleClient(
|
||||
fd: client,
|
||||
allowedTeamIDs: allowedTeamIDs,
|
||||
maxRequestBytes: maxRequestBytes,
|
||||
requestTimeoutSec: requestTimeoutSec)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func handleClient(fd: Int32) async {
|
||||
guard self.isAllowed(fd: fd) else { return }
|
||||
|
||||
var data = Data()
|
||||
var buffer = [UInt8](repeating: 0, count: 16 * 1024)
|
||||
let bufSize = buffer.count
|
||||
while true {
|
||||
let readCount = buffer.withUnsafeMutableBytes {
|
||||
read(fd, $0.baseAddress!, bufSize)
|
||||
}
|
||||
if readCount > 0 {
|
||||
data.append(buffer, count: readCount)
|
||||
if data.count > self.maxRequestBytes { return }
|
||||
} else {
|
||||
break
|
||||
}
|
||||
private nonisolated static func handleClient(
|
||||
fd: Int32,
|
||||
allowedTeamIDs: Set<String>,
|
||||
maxRequestBytes: Int,
|
||||
requestTimeoutSec: TimeInterval) async
|
||||
{
|
||||
guard self.isAllowed(fd: fd, allowedTeamIDs: allowedTeamIDs) else {
|
||||
return
|
||||
}
|
||||
|
||||
guard !data.isEmpty else { return }
|
||||
|
||||
do {
|
||||
let request = try JSONDecoder().decode(Request.self, from: data)
|
||||
let response = try await ControlRequestHandler.process(request: request)
|
||||
let encoded = try JSONEncoder().encode(response)
|
||||
_ = encoded.withUnsafeBytes { ptr in
|
||||
write(fd, ptr.baseAddress!, encoded.count)
|
||||
guard let request = try self.readRequest(
|
||||
fd: fd,
|
||||
maxRequestBytes: maxRequestBytes,
|
||||
timeoutSec: requestTimeoutSec)
|
||||
else {
|
||||
return
|
||||
}
|
||||
|
||||
let response = try await ControlRequestHandler.process(request: request)
|
||||
try self.writeResponse(fd: fd, response: response)
|
||||
} catch {
|
||||
self.logger.error("socket request failed: \(error.localizedDescription, privacy: .public)")
|
||||
let resp = Response(ok: false, message: "socket error: \(error.localizedDescription)")
|
||||
if let encoded = try? JSONEncoder().encode(resp) {
|
||||
_ = encoded.withUnsafeBytes { ptr in
|
||||
write(fd, ptr.baseAddress!, encoded.count)
|
||||
try? self.writeResponse(fd: fd, response: resp)
|
||||
}
|
||||
}
|
||||
|
||||
private nonisolated static func readRequest(
|
||||
fd: Int32,
|
||||
maxRequestBytes: Int,
|
||||
timeoutSec: TimeInterval) throws -> Request?
|
||||
{
|
||||
let deadline = Date().addingTimeInterval(timeoutSec)
|
||||
var data = Data()
|
||||
var buffer = [UInt8](repeating: 0, count: 16 * 1024)
|
||||
let bufferSize = buffer.count
|
||||
let decoder = JSONDecoder()
|
||||
|
||||
while true {
|
||||
let remaining = deadline.timeIntervalSinceNow
|
||||
if remaining <= 0 {
|
||||
throw POSIXError(.ETIMEDOUT)
|
||||
}
|
||||
|
||||
var pfd = pollfd(fd: fd, events: Int16(POLLIN), revents: 0)
|
||||
let sliceMs = max(1.0, min(remaining, 0.25) * 1000.0)
|
||||
let polled = poll(&pfd, 1, Int32(sliceMs))
|
||||
if polled == 0 { continue }
|
||||
if polled < 0 {
|
||||
if errno == EINTR { continue }
|
||||
throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO)
|
||||
}
|
||||
|
||||
let n = buffer.withUnsafeMutableBytes { read(fd, $0.baseAddress!, bufferSize) }
|
||||
if n > 0 {
|
||||
data.append(buffer, count: n)
|
||||
if data.count > maxRequestBytes {
|
||||
throw POSIXError(.EMSGSIZE)
|
||||
}
|
||||
if let req = try? decoder.decode(Request.self, from: data) {
|
||||
return req
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
return data.isEmpty ? nil : try decoder.decode(Request.self, from: data)
|
||||
}
|
||||
|
||||
if errno == EINTR { continue }
|
||||
if errno == EAGAIN { continue }
|
||||
throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO)
|
||||
}
|
||||
}
|
||||
|
||||
private nonisolated static func writeResponse(fd: Int32, response: Response) throws {
|
||||
let encoded = try JSONEncoder().encode(response)
|
||||
try encoded.withUnsafeBytes { buf in
|
||||
guard let base = buf.baseAddress else { return }
|
||||
var written = 0
|
||||
while written < encoded.count {
|
||||
let n = write(fd, base.advanced(by: written), encoded.count - written)
|
||||
if n > 0 {
|
||||
written += n
|
||||
continue
|
||||
}
|
||||
if n == -1, errno == EINTR { continue }
|
||||
throw POSIXError(POSIXErrorCode(rawValue: errno) ?? .EIO)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func isAllowed(fd: Int32) -> Bool {
|
||||
private nonisolated static func isAllowed(fd: Int32, allowedTeamIDs: Set<String>) -> Bool {
|
||||
var pid: pid_t = 0
|
||||
var pidSize = socklen_t(MemoryLayout<pid_t>.size)
|
||||
let r = getsockopt(fd, SOL_LOCAL, LOCAL_PEERPID, &pid, &pidSize)
|
||||
@@ -136,10 +230,10 @@ final actor ControlSocketServer {
|
||||
return true
|
||||
}
|
||||
|
||||
return self.teamIDMatches(pid: pid)
|
||||
return self.teamIDMatches(pid: pid, allowedTeamIDs: allowedTeamIDs)
|
||||
}
|
||||
|
||||
private func uid(for pid: pid_t) -> uid_t? {
|
||||
private nonisolated static func uid(for pid: pid_t) -> uid_t? {
|
||||
var info = kinfo_proc()
|
||||
var size = MemoryLayout.size(ofValue: info)
|
||||
var mib: [Int32] = [CTL_KERN, KERN_PROC, KERN_PROC_PID, pid]
|
||||
@@ -149,7 +243,7 @@ final actor ControlSocketServer {
|
||||
return ok ? info.kp_eproc.e_ucred.cr_uid : nil
|
||||
}
|
||||
|
||||
private func teamIDMatches(pid: pid_t) -> Bool {
|
||||
private nonisolated static func teamIDMatches(pid: pid_t, allowedTeamIDs: Set<String>) -> Bool {
|
||||
let attrs: NSDictionary = [kSecGuestAttributePid: pid]
|
||||
var secCode: SecCode?
|
||||
guard SecCodeCopyGuestWithAttributes(nil, attrs, SecCSFlags(), &secCode) == errSecSuccess,
|
||||
@@ -167,6 +261,6 @@ final actor ControlSocketServer {
|
||||
return false
|
||||
}
|
||||
|
||||
return self.allowedTeamIDs.contains(teamID)
|
||||
return allowedTeamIDs.contains(teamID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,12 +53,16 @@ import Testing
|
||||
}
|
||||
|
||||
func receive() async throws -> URLSessionWebSocketTask.Message {
|
||||
let (delayMs, msg): (Int, URLSessionWebSocketTask.Message) = switch self.response {
|
||||
case let .helloOk(delayMs):
|
||||
let delayMs: Int
|
||||
let msg: URLSessionWebSocketTask.Message
|
||||
switch self.response {
|
||||
case let .helloOk(ms):
|
||||
delayMs = ms
|
||||
let id = self.connectRequestID.withLock { $0 } ?? "connect"
|
||||
(delayMs, .data(Self.connectOkData(id: id)))
|
||||
case let .invalid(delayMs):
|
||||
(delayMs, .string("not json"))
|
||||
msg = .data(Self.connectOkData(id: id))
|
||||
case let .invalid(ms):
|
||||
delayMs = ms
|
||||
msg = .string("not json")
|
||||
}
|
||||
try await Task.sleep(nanoseconds: UInt64(delayMs) * 1_000_000)
|
||||
return msg
|
||||
|
||||
Reference in New Issue
Block a user