diff --git a/apps/macos/Sources/Clawdis/AgentRPC.swift b/apps/macos/Sources/Clawdis/AgentRPC.swift new file mode 100644 index 000000000..b790f26b8 --- /dev/null +++ b/apps/macos/Sources/Clawdis/AgentRPC.swift @@ -0,0 +1,127 @@ +import Foundation +import OSLog + +actor AgentRPC { + static let shared = AgentRPC() + + private var process: Process? + private var stdinHandle: FileHandle? + private var stdoutHandle: FileHandle? + private var buffer = Data() + private var waiters: [CheckedContinuation] = [] + private let logger = Logger(subsystem: "com.steipete.clawdis", category: "agent.rpc") + + private struct RpcError: Error { let message: String } + + func send(text: String, thinking: String?, session: String) async -> (ok: Bool, text: String?, error: String?) { + do { + try await ensureRunning() + let payload: [String: Any] = [ + "type": "send", + "text": text, + "session": session, + "thinking": thinking ?? "default", + ] + let data = try JSONSerialization.data(withJSONObject: payload) + guard let stdinHandle else { throw RpcError(message: "stdin missing") } + stdinHandle.write(data) + stdinHandle.write(Data([0x0A])) + + let line = try await nextLine() + let parsed = try JSONSerialization.jsonObject(with: Data(line.utf8)) as? [String: Any] + guard let parsed else { throw RpcError(message: "invalid JSON") } + + if let ok = parsed["ok"] as? Bool, let type = parsed["type"] as? String, type == "result" { + if ok { + if let payloadDict = parsed["payload"] as? [String: Any], + let payloads = payloadDict["payloads"] as? [[String: Any]], + let first = payloads.first, + let txt = first["text"] as? String { + return (true, txt, nil) + } + return (true, nil, nil) + } + } + if let err = parsed["error"] as? String { + return (false, nil, err) + } + return (false, nil, "rpc returned unexpected response") + } catch { + logger.error("rpc send failed: \(error.localizedDescription, privacy: .public)") + await stop() + return (false, nil, error.localizedDescription) + } + } + + // MARK: - Process lifecycle + + private func ensureRunning() async throws { + if let process, process.isRunning { return } + try await start() + } + + private func start() async throws { + let process = Process() + process.executableURL = URL(fileURLWithPath: "/usr/bin/env") + process.arguments = ["pnpm", "clawdis", "rpc"] + let projectRoot = await RelayProcessManager.shared.projectRootPath() + process.currentDirectoryURL = URL(fileURLWithPath: projectRoot) + + let stdinPipe = Pipe() + let stdoutPipe = Pipe() + process.standardInput = stdinPipe + process.standardOutput = stdoutPipe + process.standardError = Pipe() + + try process.run() + + self.process = process + self.stdinHandle = stdinPipe.fileHandleForWriting + self.stdoutHandle = stdoutPipe.fileHandleForReading + + stdoutPipe.fileHandleForReading.readabilityHandler = { [weak self] handle in + guard let self else { return } + let data = handle.availableData + if data.isEmpty { return } + Task { await self.ingest(data: data) } + } + + Task.detached { [weak self] in + process.waitUntilExit() + await self?.stop() + } + } + + private func stop() async { + stdoutHandle?.readabilityHandler = nil + process?.terminate() + process = nil + stdinHandle = nil + stdoutHandle = nil + buffer.removeAll(keepingCapacity: false) + let waiters = self.waiters + self.waiters.removeAll() + for waiter in waiters { + waiter.resume(throwing: RpcError(message: "rpc process stopped")) + } + } + + private func ingest(data: Data) { + buffer.append(data) + while let range = buffer.firstRange(of: Data([0x0A])) { + let lineData = buffer.subdata(in: buffer.startIndex.. String { + try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in + waiters.append(cont) + } + } +} diff --git a/apps/macos/Sources/Clawdis/XPCService.swift b/apps/macos/Sources/Clawdis/XPCService.swift index d31e68de1..e715d8d8b 100644 --- a/apps/macos/Sources/Clawdis/XPCService.swift +++ b/apps/macos/Sources/Clawdis/XPCService.swift @@ -71,14 +71,24 @@ final class ClawdisXPCService: NSObject, ClawdisXPCProtocol { case let .agent(message, thinking, session): let trimmed = message.trimmingCharacters(in: .whitespacesAndNewlines) guard !trimmed.isEmpty else { return Response(ok: false, message: "message empty") } + let sessionKey = session ?? "main" + + // Try RPC first for lower latency; fall back to one-shot CLI. + let rpcResult = await AgentRPC.shared.send( + text: trimmed, + thinking: thinking, + session: sessionKey) + if rpcResult.ok { + return Response(ok: true, message: rpcResult.text ?? "sent") + } let result = await self.runAgentCLI( message: trimmed, thinking: thinking, - session: session ?? "main") + session: sessionKey) return result.ok ? Response(ok: true, message: result.text ?? "sent") - : Response(ok: false, message: result.error ?? "failed to send") + : Response(ok: false, message: result.error ?? rpcResult.error ?? "failed to send") } }