feat(agent): use persistent rpc worker for agent sends
This commit is contained in:
127
apps/macos/Sources/Clawdis/AgentRPC.swift
Normal file
127
apps/macos/Sources/Clawdis/AgentRPC.swift
Normal file
@@ -0,0 +1,127 @@
|
||||
import Foundation
|
||||
import OSLog
|
||||
|
||||
actor AgentRPC {
|
||||
static let shared = AgentRPC()
|
||||
|
||||
private var process: Process?
|
||||
private var stdinHandle: FileHandle?
|
||||
private var stdoutHandle: FileHandle?
|
||||
private var buffer = Data()
|
||||
private var waiters: [CheckedContinuation<String, Error>] = []
|
||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "agent.rpc")
|
||||
|
||||
private struct RpcError: Error { let message: String }
|
||||
|
||||
func send(text: String, thinking: String?, session: String) async -> (ok: Bool, text: String?, error: String?) {
|
||||
do {
|
||||
try await ensureRunning()
|
||||
let payload: [String: Any] = [
|
||||
"type": "send",
|
||||
"text": text,
|
||||
"session": session,
|
||||
"thinking": thinking ?? "default",
|
||||
]
|
||||
let data = try JSONSerialization.data(withJSONObject: payload)
|
||||
guard let stdinHandle else { throw RpcError(message: "stdin missing") }
|
||||
stdinHandle.write(data)
|
||||
stdinHandle.write(Data([0x0A]))
|
||||
|
||||
let line = try await nextLine()
|
||||
let parsed = try JSONSerialization.jsonObject(with: Data(line.utf8)) as? [String: Any]
|
||||
guard let parsed else { throw RpcError(message: "invalid JSON") }
|
||||
|
||||
if let ok = parsed["ok"] as? Bool, let type = parsed["type"] as? String, type == "result" {
|
||||
if ok {
|
||||
if let payloadDict = parsed["payload"] as? [String: Any],
|
||||
let payloads = payloadDict["payloads"] as? [[String: Any]],
|
||||
let first = payloads.first,
|
||||
let txt = first["text"] as? String {
|
||||
return (true, txt, nil)
|
||||
}
|
||||
return (true, nil, nil)
|
||||
}
|
||||
}
|
||||
if let err = parsed["error"] as? String {
|
||||
return (false, nil, err)
|
||||
}
|
||||
return (false, nil, "rpc returned unexpected response")
|
||||
} catch {
|
||||
logger.error("rpc send failed: \(error.localizedDescription, privacy: .public)")
|
||||
await stop()
|
||||
return (false, nil, error.localizedDescription)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Process lifecycle
|
||||
|
||||
private func ensureRunning() async throws {
|
||||
if let process, process.isRunning { return }
|
||||
try await start()
|
||||
}
|
||||
|
||||
private func start() async throws {
|
||||
let process = Process()
|
||||
process.executableURL = URL(fileURLWithPath: "/usr/bin/env")
|
||||
process.arguments = ["pnpm", "clawdis", "rpc"]
|
||||
let projectRoot = await RelayProcessManager.shared.projectRootPath()
|
||||
process.currentDirectoryURL = URL(fileURLWithPath: projectRoot)
|
||||
|
||||
let stdinPipe = Pipe()
|
||||
let stdoutPipe = Pipe()
|
||||
process.standardInput = stdinPipe
|
||||
process.standardOutput = stdoutPipe
|
||||
process.standardError = Pipe()
|
||||
|
||||
try process.run()
|
||||
|
||||
self.process = process
|
||||
self.stdinHandle = stdinPipe.fileHandleForWriting
|
||||
self.stdoutHandle = stdoutPipe.fileHandleForReading
|
||||
|
||||
stdoutPipe.fileHandleForReading.readabilityHandler = { [weak self] handle in
|
||||
guard let self else { return }
|
||||
let data = handle.availableData
|
||||
if data.isEmpty { return }
|
||||
Task { await self.ingest(data: data) }
|
||||
}
|
||||
|
||||
Task.detached { [weak self] in
|
||||
process.waitUntilExit()
|
||||
await self?.stop()
|
||||
}
|
||||
}
|
||||
|
||||
private func stop() async {
|
||||
stdoutHandle?.readabilityHandler = nil
|
||||
process?.terminate()
|
||||
process = nil
|
||||
stdinHandle = nil
|
||||
stdoutHandle = nil
|
||||
buffer.removeAll(keepingCapacity: false)
|
||||
let waiters = self.waiters
|
||||
self.waiters.removeAll()
|
||||
for waiter in waiters {
|
||||
waiter.resume(throwing: RpcError(message: "rpc process stopped"))
|
||||
}
|
||||
}
|
||||
|
||||
private func ingest(data: Data) {
|
||||
buffer.append(data)
|
||||
while let range = buffer.firstRange(of: Data([0x0A])) {
|
||||
let lineData = buffer.subdata(in: buffer.startIndex..<range.lowerBound)
|
||||
buffer.removeSubrange(buffer.startIndex...range.lowerBound)
|
||||
guard let line = String(data: lineData, encoding: .utf8) else { continue }
|
||||
if let waiter = waiters.first {
|
||||
waiters.removeFirst()
|
||||
waiter.resume(returning: line)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func nextLine() async throws -> String {
|
||||
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<String, Error>) in
|
||||
waiters.append(cont)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,14 +71,24 @@ final class ClawdisXPCService: NSObject, ClawdisXPCProtocol {
|
||||
case let .agent(message, thinking, session):
|
||||
let trimmed = message.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !trimmed.isEmpty else { return Response(ok: false, message: "message empty") }
|
||||
let sessionKey = session ?? "main"
|
||||
|
||||
// Try RPC first for lower latency; fall back to one-shot CLI.
|
||||
let rpcResult = await AgentRPC.shared.send(
|
||||
text: trimmed,
|
||||
thinking: thinking,
|
||||
session: sessionKey)
|
||||
if rpcResult.ok {
|
||||
return Response(ok: true, message: rpcResult.text ?? "sent")
|
||||
}
|
||||
|
||||
let result = await self.runAgentCLI(
|
||||
message: trimmed,
|
||||
thinking: thinking,
|
||||
session: session ?? "main")
|
||||
session: sessionKey)
|
||||
return result.ok
|
||||
? Response(ok: true, message: result.text ?? "sent")
|
||||
: Response(ok: false, message: result.error ?? "failed to send")
|
||||
: Response(ok: false, message: result.error ?? rpcResult.error ?? "failed to send")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user