macOS: fold agent control into GatewayConnection

This commit is contained in:
Peter Steinberger
2025-12-17 17:10:30 +01:00
parent 557ffdbe35
commit 17a27fd312
19 changed files with 443 additions and 201 deletions

View File

@@ -332,14 +332,7 @@ final class AppState {
self.voiceWakeGlobalSyncTask?.cancel() self.voiceWakeGlobalSyncTask?.cancel()
self.voiceWakeGlobalSyncTask = Task { [sanitized] in self.voiceWakeGlobalSyncTask = Task { [sanitized] in
try? await Task.sleep(nanoseconds: 650_000_000) try? await Task.sleep(nanoseconds: 650_000_000)
do { await GatewayConnection.shared.voiceWakeSetTriggers(sanitized)
_ = try await GatewayConnection.shared.request(
method: "voicewake.set",
params: ["triggers": AnyCodable(sanitized)],
timeoutMs: 10000)
} catch {
// Best-effort only.
}
} }
} }

View File

@@ -167,13 +167,13 @@ actor BridgeServer {
let sessionKey = payload.sessionKey?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty let sessionKey = payload.sessionKey?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty
?? "node-\(nodeId)" ?? "node-\(nodeId)"
_ = await GatewayConnection.shared.sendAgent( _ = await GatewayConnection.shared.sendAgent(GatewayAgentInvocation(
message: text, message: text,
thinking: "low",
sessionKey: sessionKey, sessionKey: sessionKey,
thinking: "low",
deliver: false, deliver: false,
to: nil, to: nil,
channel: "last") channel: .last))
case "agent.request": case "agent.request":
guard let json = evt.payloadJSON, let data = json.data(using: .utf8) else { guard let json = evt.payloadJSON, let data = json.data(using: .utf8) else {
@@ -191,15 +191,15 @@ actor BridgeServer {
?? "node-\(nodeId)" ?? "node-\(nodeId)"
let thinking = link.thinking?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty let thinking = link.thinking?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty
let to = link.to?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty let to = link.to?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty
let channel = link.channel?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty let channel = GatewayAgentChannel(raw: link.channel)
_ = await GatewayConnection.shared.sendAgent( _ = await GatewayConnection.shared.sendAgent(GatewayAgentInvocation(
message: message, message: message,
thinking: thinking,
sessionKey: sessionKey, sessionKey: sessionKey,
thinking: thinking,
deliver: link.deliver, deliver: link.deliver,
to: to, to: to,
channel: channel ?? "last") channel: channel))
default: default:
break break
@@ -347,17 +347,17 @@ actor BridgeServer {
"reason \(reason)", "reason \(reason)",
].compactMap(\.self).joined(separator: " · ") ].compactMap(\.self).joined(separator: " · ")
var params: [String: Any] = [ var params: [String: AnyCodable] = [
"text": summary, "text": AnyCodable(summary),
"instanceId": nodeId, "instanceId": AnyCodable(nodeId),
"host": host, "host": AnyCodable(host),
"mode": "node", "mode": AnyCodable("node"),
"reason": reason, "reason": AnyCodable(reason),
"tags": tags, "tags": AnyCodable(tags),
] ]
if let ip { params["ip"] = ip } if let ip { params["ip"] = AnyCodable(ip) }
if let version { params["version"] = version } if let version { params["version"] = AnyCodable(version) }
_ = try await GatewayConnection.shared.controlRequest(method: "system-event", params: params) await GatewayConnection.shared.sendSystemEvent(params)
} catch { } catch {
// Best-effort only. // Best-effort only.
} }

View File

@@ -550,13 +550,17 @@ private final class CanvasA2UIActionMessageHandler: NSObject, WKScriptMessageHan
: "A2UI action: \(name)\n\n```json\n\(json)\n```" : "A2UI action: \(name)\n\n```json\n\(json)\n```"
Task { Task {
let result = await GatewayConnection.shared.sendAgent( if AppStateStore.shared.connectionMode == .local {
GatewayProcessManager.shared.setActive(true)
}
let result = await GatewayConnection.shared.sendAgent(GatewayAgentInvocation(
message: text, message: text,
thinking: nil,
sessionKey: self.sessionKey, sessionKey: self.sessionKey,
thinking: "low",
deliver: false, deliver: false,
to: nil, to: nil,
channel: "webchat") channel: .last))
if !result.ok { if !result.ok {
canvasWindowLogger.error( canvasWindowLogger.error(
"A2UI action send failed name=\(name, privacy: .public) error=\(result.error ?? "unknown", privacy: .public)") "A2UI action send failed name=\(name, privacy: .public) error=\(result.error ?? "unknown", privacy: .public)")
@@ -678,11 +682,11 @@ private final class HoverChromeContainerView: NSView {
v.state = .active v.state = .active
v.appearance = NSAppearance(named: .vibrantDark) v.appearance = NSAppearance(named: .vibrantDark)
v.wantsLayer = true v.wantsLayer = true
v.layer?.cornerRadius = 10 v.layer?.cornerRadius = 11
v.layer?.masksToBounds = true v.layer?.masksToBounds = true
v.layer?.borderWidth = 1 v.layer?.borderWidth = 1
v.layer?.borderColor = NSColor.white.withAlphaComponent(0.18).cgColor v.layer?.borderColor = NSColor.white.withAlphaComponent(0.22).cgColor
v.layer?.backgroundColor = NSColor.black.withAlphaComponent(0.22).cgColor v.layer?.backgroundColor = NSColor.black.withAlphaComponent(0.28).cgColor
v.layer?.shadowColor = NSColor.black.withAlphaComponent(0.35).cgColor v.layer?.shadowColor = NSColor.black.withAlphaComponent(0.35).cgColor
v.layer?.shadowOpacity = 0.35 v.layer?.shadowOpacity = 0.35
v.layer?.shadowRadius = 8 v.layer?.shadowRadius = 8
@@ -691,7 +695,7 @@ private final class HoverChromeContainerView: NSView {
}() }()
private let closeButton: NSButton = { private let closeButton: NSButton = {
let cfg = NSImage.SymbolConfiguration(pointSize: 10, weight: .semibold) let cfg = NSImage.SymbolConfiguration(pointSize: 9, weight: .semibold)
let img = NSImage(systemSymbolName: "xmark", accessibilityDescription: "Close")? let img = NSImage(systemSymbolName: "xmark", accessibilityDescription: "Close")?
.withSymbolConfiguration(cfg) .withSymbolConfiguration(cfg)
?? NSImage(size: NSSize(width: 18, height: 18)) ?? NSImage(size: NSSize(width: 18, height: 18))
@@ -699,7 +703,7 @@ private final class HoverChromeContainerView: NSView {
btn.isBordered = false btn.isBordered = false
btn.bezelStyle = .regularSquare btn.bezelStyle = .regularSquare
btn.imageScaling = .scaleProportionallyDown btn.imageScaling = .scaleProportionallyDown
btn.contentTintColor = NSColor.labelColor btn.contentTintColor = NSColor.white.withAlphaComponent(0.92)
btn.toolTip = "Close" btn.toolTip = "Close"
return btn return btn
}() }()
@@ -740,13 +744,13 @@ private final class HoverChromeContainerView: NSView {
self.closeBackground.centerXAnchor.constraint(equalTo: self.closeButton.centerXAnchor), self.closeBackground.centerXAnchor.constraint(equalTo: self.closeButton.centerXAnchor),
self.closeBackground.centerYAnchor.constraint(equalTo: self.closeButton.centerYAnchor), self.closeBackground.centerYAnchor.constraint(equalTo: self.closeButton.centerYAnchor),
self.closeBackground.widthAnchor.constraint(equalToConstant: 20), self.closeBackground.widthAnchor.constraint(equalToConstant: 22),
self.closeBackground.heightAnchor.constraint(equalToConstant: 20), self.closeBackground.heightAnchor.constraint(equalToConstant: 22),
self.closeButton.trailingAnchor.constraint(equalTo: self.trailingAnchor, constant: -8), self.closeButton.trailingAnchor.constraint(equalTo: self.trailingAnchor, constant: -9),
self.closeButton.topAnchor.constraint(equalTo: self.topAnchor, constant: 8), self.closeButton.topAnchor.constraint(equalTo: self.topAnchor, constant: 9),
self.closeButton.widthAnchor.constraint(equalToConstant: 20), self.closeButton.widthAnchor.constraint(equalToConstant: 18),
self.closeButton.heightAnchor.constraint(equalToConstant: 20), self.closeButton.heightAnchor.constraint(equalToConstant: 18),
self.resizeHandle.trailingAnchor.constraint(equalTo: self.trailingAnchor), self.resizeHandle.trailingAnchor.constraint(equalTo: self.trailingAnchor),
self.resizeHandle.bottomAnchor.constraint(equalTo: self.bottomAnchor), self.resizeHandle.bottomAnchor.constraint(equalTo: self.bottomAnchor),

View File

@@ -169,16 +169,15 @@ enum ControlRequestHandler {
let trimmed = message.trimmingCharacters(in: .whitespacesAndNewlines) let trimmed = message.trimmingCharacters(in: .whitespacesAndNewlines)
guard !trimmed.isEmpty else { return Response(ok: false, message: "message empty") } guard !trimmed.isEmpty else { return Response(ok: false, message: "message empty") }
let sessionKey = session ?? "main" let sessionKey = session ?? "main"
let rpcResult = await GatewayConnection.shared.sendAgent( let invocation = GatewayAgentInvocation(
message: trimmed, message: trimmed,
thinking: thinking,
sessionKey: sessionKey, sessionKey: sessionKey,
thinking: thinking,
deliver: deliver, deliver: deliver,
to: to, to: to,
channel: nil) channel: .last)
return rpcResult.ok let rpcResult = await GatewayConnection.shared.sendAgent(invocation)
? Response(ok: true, message: "sent") return rpcResult.ok ? Response(ok: true, message: "sent") : Response(ok: false, message: rpcResult.error)
: Response(ok: false, message: rpcResult.error ?? "failed to send")
} }
private static func canvasEnabled() -> Bool { private static func canvasEnabled() -> Bool {

View File

@@ -67,16 +67,12 @@ final class CronJobsStore {
defer { self.isLoadingJobs = false } defer { self.isLoadingJobs = false }
do { do {
if let status = try? await self.fetchCronStatus() { if let status = try? await GatewayConnection.shared.cronStatus() {
self.schedulerEnabled = status.enabled self.schedulerEnabled = status.enabled
self.schedulerStorePath = status.storePath self.schedulerStorePath = status.storePath
self.schedulerNextWakeAtMs = status.nextWakeAtMs self.schedulerNextWakeAtMs = status.nextWakeAtMs
} }
let data = try await self.request( self.jobs = try await GatewayConnection.shared.cronList(includeDisabled: true)
method: "cron.list",
params: ["includeDisabled": true])
let res = try JSONDecoder().decode(CronListResponse.self, from: data)
self.jobs = res.jobs
if self.jobs.isEmpty { if self.jobs.isEmpty {
self.statusMessage = "No cron jobs yet." self.statusMessage = "No cron jobs yet."
} }
@@ -92,11 +88,7 @@ final class CronJobsStore {
defer { self.isLoadingRuns = false } defer { self.isLoadingRuns = false }
do { do {
let data = try await self.request( self.runEntries = try await GatewayConnection.shared.cronRuns(jobId: jobId, limit: limit)
method: "cron.runs",
params: ["id": jobId, "limit": limit])
let res = try JSONDecoder().decode(CronRunsResponse.self, from: data)
self.runEntries = res.entries
} catch { } catch {
self.logger.error("cron.runs failed \(error.localizedDescription, privacy: .public)") self.logger.error("cron.runs failed \(error.localizedDescription, privacy: .public)")
self.lastError = error.localizedDescription self.lastError = error.localizedDescription
@@ -105,10 +97,7 @@ final class CronJobsStore {
func runJob(id: String, force: Bool = true) async { func runJob(id: String, force: Bool = true) async {
do { do {
_ = try await self.request( try await GatewayConnection.shared.cronRun(jobId: id, force: force)
method: "cron.run",
params: ["id": id, "mode": force ? "force" : "due"],
timeoutMs: 20000)
} catch { } catch {
self.lastError = error.localizedDescription self.lastError = error.localizedDescription
} }
@@ -116,7 +105,7 @@ final class CronJobsStore {
func removeJob(id: String) async { func removeJob(id: String) async {
do { do {
_ = try await self.request(method: "cron.remove", params: ["id": id]) try await GatewayConnection.shared.cronRemove(jobId: id)
await self.refreshJobs() await self.refreshJobs()
if self.selectedJobId == id { if self.selectedJobId == id {
self.selectedJobId = nil self.selectedJobId = nil
@@ -129,9 +118,7 @@ final class CronJobsStore {
func setJobEnabled(id: String, enabled: Bool) async { func setJobEnabled(id: String, enabled: Bool) async {
do { do {
_ = try await self.request( try await GatewayConnection.shared.cronUpdate(jobId: id, patch: ["enabled": enabled])
method: "cron.update",
params: ["id": id, "patch": ["enabled": enabled]])
await self.refreshJobs() await self.refreshJobs()
} catch { } catch {
self.lastError = error.localizedDescription self.lastError = error.localizedDescription
@@ -143,9 +130,9 @@ final class CronJobsStore {
payload: [String: Any]) async throws payload: [String: Any]) async throws
{ {
if let id { if let id {
_ = try await self.request(method: "cron.update", params: ["id": id, "patch": payload]) try await GatewayConnection.shared.cronUpdate(jobId: id, patch: payload)
} else { } else {
_ = try await self.request(method: "cron.add", params: payload) try await GatewayConnection.shared.cronAdd(payload: payload)
} }
await self.refreshJobs() await self.refreshJobs()
} }
@@ -206,26 +193,5 @@ final class CronJobsStore {
} }
} }
// MARK: - RPC // MARK: - (no additional RPC helpers)
private func request(
method: String,
params: [String: Any]?,
timeoutMs: Double? = nil) async throws -> Data
{
let rawParams = params?.reduce(into: [String: AnyCodable]()) { $0[$1.key] = AnyCodable($1.value) }
return try await GatewayConnection.shared.request(method: method, params: rawParams, timeoutMs: timeoutMs)
}
private func fetchCronStatus() async throws -> CronStatusResponse {
let data = try await self.request(method: "cron.status", params: nil)
return try JSONDecoder().decode(CronStatusResponse.self, from: data)
}
}
private struct CronStatusResponse: Decodable {
let enabled: Bool
let storePath: String
let jobs: Int
let nextWakeAtMs: Int?
} }

View File

@@ -530,7 +530,7 @@ struct CronJobEditor: View {
@State private var systemEventText: String = "" @State private var systemEventText: String = ""
@State private var agentMessage: String = "" @State private var agentMessage: String = ""
@State private var deliver: Bool = false @State private var deliver: Bool = false
@State private var channel: String = "last" @State private var channel: GatewayAgentChannel = .last
@State private var to: String = "" @State private var to: String = ""
@State private var thinking: String = "" @State private var thinking: String = ""
@State private var timeoutSeconds: String = "" @State private var timeoutSeconds: String = ""
@@ -801,9 +801,9 @@ struct CronJobEditor: View {
GridRow { GridRow {
self.gridLabel("Channel") self.gridLabel("Channel")
Picker("", selection: self.$channel) { Picker("", selection: self.$channel) {
Text("last").tag("last") Text("last").tag(GatewayAgentChannel.last)
Text("whatsapp").tag("whatsapp") Text("whatsapp").tag(GatewayAgentChannel.whatsapp)
Text("telegram").tag("telegram") Text("telegram").tag(GatewayAgentChannel.telegram)
} }
.labelsHidden() .labelsHidden()
.pickerStyle(.segmented) .pickerStyle(.segmented)
@@ -861,7 +861,7 @@ struct CronJobEditor: View {
self.thinking = thinking ?? "" self.thinking = thinking ?? ""
self.timeoutSeconds = timeoutSeconds.map(String.init) ?? "" self.timeoutSeconds = timeoutSeconds.map(String.init) ?? ""
self.deliver = deliver ?? false self.deliver = deliver ?? false
self.channel = channel ?? "last" self.channel = GatewayAgentChannel(raw: channel)
self.to = to ?? "" self.to = to ?? ""
self.bestEffortDeliver = bestEffortDeliver ?? false self.bestEffortDeliver = bestEffortDeliver ?? false
} }
@@ -980,7 +980,7 @@ struct CronJobEditor: View {
if let n = Int(self.timeoutSeconds), n > 0 { payload["timeoutSeconds"] = n } if let n = Int(self.timeoutSeconds), n > 0 { payload["timeoutSeconds"] = n }
payload["deliver"] = self.deliver payload["deliver"] = self.deliver
if self.deliver { if self.deliver {
payload["channel"] = self.channel payload["channel"] = self.channel.rawValue
let to = self.to.trimmingCharacters(in: .whitespacesAndNewlines) let to = self.to.trimmingCharacters(in: .whitespacesAndNewlines)
if !to.isEmpty { payload["to"] = to } if !to.isEmpty { payload["to"] = to }
payload["bestEffortDeliver"] = self.bestEffortDeliver payload["bestEffortDeliver"] = self.bestEffortDeliver

View File

@@ -54,18 +54,24 @@ final class DeepLinkHandler {
} }
do { do {
var params: [String: AnyCodable] = [ let channel = GatewayAgentChannel(raw: link.channel)
"message": AnyCodable(messagePreview), let invocation = GatewayAgentInvocation(
"idempotencyKey": AnyCodable(UUID().uuidString), message: messagePreview,
] sessionKey: link.sessionKey?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty ?? "main",
if let sessionKey = link.sessionKey, !sessionKey.isEmpty { params["sessionKey"] = AnyCodable(sessionKey) } thinking: link.thinking?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty,
if let thinking = link.thinking, !thinking.isEmpty { params["thinking"] = AnyCodable(thinking) } deliver: channel.shouldDeliver(link.deliver),
if let to = link.to, !to.isEmpty { params["to"] = AnyCodable(to) } to: link.to?.trimmingCharacters(in: .whitespacesAndNewlines).nonEmpty,
if let channel = link.channel, !channel.isEmpty { params["channel"] = AnyCodable(channel) } channel: channel,
if let timeout = link.timeoutSeconds { params["timeout"] = AnyCodable(timeout) } timeoutSeconds: link.timeoutSeconds,
params["deliver"] = AnyCodable(link.deliver) idempotencyKey: UUID().uuidString)
_ = try await GatewayConnection.shared.request(method: "agent", params: params) let res = await GatewayConnection.shared.sendAgent(invocation)
if !res.ok {
throw NSError(
domain: "DeepLink",
code: 1,
userInfo: [NSLocalizedDescriptionKey: res.error ?? "agent request failed"])
}
} catch { } catch {
self.presentAlert(title: "Agent request failed", message: error.localizedDescription) self.presentAlert(title: "Agent request failed", message: error.localizedDescription)
} }

View File

@@ -423,8 +423,12 @@ actor GatewayChannelActor {
throw NSError(domain: "Gateway", code: 2, userInfo: [NSLocalizedDescriptionKey: "unexpected frame"]) throw NSError(domain: "Gateway", code: 2, userInfo: [NSLocalizedDescriptionKey: "unexpected frame"])
} }
if res.ok == false { if res.ok == false {
let msg = (res.error?["message"]?.value as? String) ?? "gateway error" let code = res.error?["code"]?.value as? String
throw NSError(domain: "Gateway", code: 3, userInfo: [NSLocalizedDescriptionKey: msg]) let msg = res.error?["message"]?.value as? String
let details: [String: AnyCodable] = (res.error ?? [:]).reduce(into: [:]) { acc, pair in
acc[pair.key] = AnyCodable(pair.value.value)
}
throw GatewayResponseError(method: method, code: code, message: msg, details: details)
} }
if let payload = res.payload { if let payload = res.payload {
// Encode back to JSON with Swift's encoder to preserve types and avoid ObjC bridging exceptions. // Encode back to JSON with Swift's encoder to preserve types and avoid ObjC bridging exceptions.

View File

@@ -1,7 +1,37 @@
import ClawdisChatUI
import ClawdisProtocol import ClawdisProtocol
import Foundation import Foundation
import OSLog import OSLog
private let gatewayConnectionLogger = Logger(subsystem: "com.steipete.clawdis", category: "gateway.connection")
/// Channel an agent invocation is addressed to.
///
/// The raw value of each case is the string sent as the `channel` request parameter.
enum GatewayAgentChannel: String, Codable, CaseIterable, Sendable {
    case last
    case whatsapp
    case telegram
    case webchat

    /// Parses a loosely formatted channel name.
    ///
    /// Surrounding whitespace and letter case are ignored. `nil`, blank, or
    /// unrecognized input falls back to `.last`.
    init(raw: String?) {
        guard let raw else {
            self = .last
            return
        }
        let key = raw.trimmingCharacters(in: .whitespacesAndNewlines).lowercased()
        if let known = Self(rawValue: key) {
            self = known
        } else {
            self = .last
        }
    }

    /// `true` only for `.whatsapp` and `.telegram`.
    var isDeliverable: Bool {
        switch self {
        case .whatsapp, .telegram:
            return true
        case .last, .webchat:
            return false
        }
    }

    /// Returns `deliver` unchanged for deliverable channels and `false` otherwise.
    func shouldDeliver(_ deliver: Bool) -> Bool {
        guard deliver else { return false }
        return self.isDeliverable
    }
}
/// Parameters for one `agent` request, consumed by `GatewayConnection.sendAgent(_:)`.
///
/// Property order defines the memberwise initializer's argument order, so keep it stable.
struct GatewayAgentInvocation: Sendable {
/// Message text; `sendAgent(_:)` trims it and refuses to send when the result is empty.
var message: String
/// Sent as the `sessionKey` parameter.
var sessionKey: String = "main"
/// Sent as the `thinking` parameter; `sendAgent(_:)` substitutes "default" when nil.
var thinking: String?
/// Sent as the `deliver` parameter.
var deliver: Bool = false
/// Sent as the `to` parameter; `sendAgent(_:)` substitutes an empty string when nil.
var to: String?
/// Sent as the `channel` parameter using the enum's raw value.
var channel: GatewayAgentChannel = .last
/// Added to the request as the `timeout` parameter only when non-nil.
var timeoutSeconds: Int?
/// Sent as the `idempotencyKey` parameter; defaults to a freshly generated UUID string.
var idempotencyKey: String = UUID().uuidString
}
/// Single, shared Gateway websocket connection for the whole app. /// Single, shared Gateway websocket connection for the whole app.
/// ///
/// This owns exactly one `GatewayChannelActor` and reuses it across all callers /// This owns exactly one `GatewayChannelActor` and reuses it across all callers
@@ -11,8 +41,31 @@ actor GatewayConnection {
typealias Config = (url: URL, token: String?) typealias Config = (url: URL, token: String?)
/// Gateway method names used by the typed request helpers.
///
/// A raw value is spelled out only where the wire name differs from the case name.
enum Method: String, Sendable {
    // Agent and general control
    case agent
    case status
    case setHeartbeats = "set-heartbeats"
    case systemEvent = "system-event"
    case health

    // Chat
    case chatHistory = "chat.history"
    case chatSend = "chat.send"
    case chatAbort = "chat.abort"

    // Voice wake
    case voicewakeGet = "voicewake.get"
    case voicewakeSet = "voicewake.set"

    // Node pairing
    case nodePairApprove = "node.pair.approve"
    case nodePairReject = "node.pair.reject"

    // Cron
    case cronList = "cron.list"
    case cronRuns = "cron.runs"
    case cronRun = "cron.run"
    case cronRemove = "cron.remove"
    case cronUpdate = "cron.update"
    case cronAdd = "cron.add"
    case cronStatus = "cron.status"
}
private let configProvider: @Sendable () async throws -> Config private let configProvider: @Sendable () async throws -> Config
private let sessionBox: WebSocketSessionBox? private let sessionBox: WebSocketSessionBox?
private let decoder = JSONDecoder()
private var client: GatewayChannelActor? private var client: GatewayChannelActor?
private var configuredURL: URL? private var configuredURL: URL?
@@ -29,6 +82,8 @@ actor GatewayConnection {
self.sessionBox = sessionBox self.sessionBox = sessionBox
} }
// MARK: - Low-level request
func request( func request(
method: String, method: String,
params: [String: AnyCodable]?, params: [String: AnyCodable]?,
@@ -42,6 +97,43 @@ actor GatewayConnection {
return try await client.request(method: method, params: params, timeoutMs: timeoutMs) return try await client.request(method: method, params: params, timeoutMs: timeoutMs)
} }
/// Sends a request identified by a typed `Method` and returns the raw response data.
///
/// Forwards to `request(method:params:timeoutMs:)` using the method's raw value.
func requestRaw(
    method: Method,
    params: [String: AnyCodable]? = nil,
    timeoutMs: Double? = nil) async throws -> Data
{
    let wireName = method.rawValue
    return try await self.request(method: wireName, params: params, timeoutMs: timeoutMs)
}
/// Sends a request identified by a plain method string and returns the raw response data.
///
/// Same as `request(method:params:timeoutMs:)`, but `params` defaults to `nil`.
func requestRaw(
    method: String,
    params: [String: AnyCodable]? = nil,
    timeoutMs: Double? = nil) async throws -> Data
{
    let responseData = try await self.request(method: method, params: params, timeoutMs: timeoutMs)
    return responseData
}
/// Sends a request and decodes the response data as `T` with the connection's JSON decoder.
///
/// - Throws: Whatever `requestRaw` throws, or `GatewayDecodingError` (carrying the
///   method name and the decoder's localized description) when decoding fails.
func requestDecoded<T: Decodable>(
    method: Method,
    params: [String: AnyCodable]? = nil,
    timeoutMs: Double? = nil) async throws -> T
{
    let body = try await self.requestRaw(method: method, params: params, timeoutMs: timeoutMs)
    let decoded: T
    do {
        decoded = try self.decoder.decode(T.self, from: body)
    } catch let decodeFailure {
        // Re-wrap so the failing gateway method is part of the error text.
        throw GatewayDecodingError(method: method.rawValue, message: decodeFailure.localizedDescription)
    }
    return decoded
}
/// Sends a request and discards the response data; only success or failure matters.
func requestVoid(
    method: Method,
    params: [String: AnyCodable]? = nil,
    timeoutMs: Double? = nil) async throws
{
    let _: Data = try await self.requestRaw(method: method, params: params, timeoutMs: timeoutMs)
}
/// Ensure the underlying socket is configured (and replaced if config changed). /// Ensure the underlying socket is configured (and replaced if config changed).
func refresh() async throws { func refresh() async throws {
let cfg = try await self.configProvider() let cfg = try await self.configProvider()
@@ -114,33 +206,13 @@ actor GatewayConnection {
} }
} }
private let gatewayControlLogger = Logger(subsystem: "com.steipete.clawdis", category: "gateway.control") // MARK: - Typed gateway API
extension GatewayConnection { extension GatewayConnection {
private static func wrapParams(_ raw: [String: Any]?) -> [String: AnyCodable]? {
guard let raw else { return nil }
return raw.reduce(into: [String: AnyCodable]()) { acc, pair in
acc[pair.key] = AnyCodable(pair.value)
}
}
func controlRequest(
method: String,
params: [String: Any]? = nil,
timeoutMs: Double? = nil) async throws -> Data
{
try await self.request(method: method, params: Self.wrapParams(params), timeoutMs: timeoutMs)
}
func status() async -> (ok: Bool, error: String?) { func status() async -> (ok: Bool, error: String?) {
do { do {
let data = try await self.controlRequest(method: "status") _ = try await self.requestRaw(method: .status)
if let obj = try? JSONSerialization.jsonObject(with: data) as? [String: Any], return (true, nil)
(obj["ok"] as? Bool) ?? true
{
return (true, nil)
}
return (false, "status error")
} catch { } catch {
return (false, error.localizedDescription) return (false, error.localizedDescription)
} }
@@ -148,39 +220,211 @@ extension GatewayConnection {
func setHeartbeatsEnabled(_ enabled: Bool) async -> Bool { func setHeartbeatsEnabled(_ enabled: Bool) async -> Bool {
do { do {
_ = try await self.controlRequest(method: "set-heartbeats", params: ["enabled": enabled]) try await self.requestVoid(method: .setHeartbeats, params: ["enabled": AnyCodable(enabled)])
return true return true
} catch { } catch {
gatewayControlLogger.error("setHeartbeatsEnabled failed \(error.localizedDescription, privacy: .public)") gatewayConnectionLogger.error("setHeartbeatsEnabled failed \(error.localizedDescription, privacy: .public)")
return false return false
} }
} }
/// Sends an `agent` request built from `invocation`.
///
/// The message is trimmed first; an empty result is rejected without contacting the
/// gateway. A nil `thinking` is sent as "default" and a nil `to` as an empty string.
///
/// - Returns: `(true, nil)` on success, otherwise `(false, <error text>)`.
func sendAgent(_ invocation: GatewayAgentInvocation) async -> (ok: Bool, error: String?) {
    let text = invocation.message.trimmingCharacters(in: .whitespacesAndNewlines)
    if text.isEmpty {
        return (false, "message empty")
    }

    var params: [String: AnyCodable] = [:]
    params["message"] = AnyCodable(text)
    params["sessionKey"] = AnyCodable(invocation.sessionKey)
    params["thinking"] = AnyCodable(invocation.thinking ?? "default")
    params["deliver"] = AnyCodable(invocation.deliver)
    params["to"] = AnyCodable(invocation.to ?? "")
    params["channel"] = AnyCodable(invocation.channel.rawValue)
    params["idempotencyKey"] = AnyCodable(invocation.idempotencyKey)
    // `timeout` is optional on the wire: include it only when the caller set one.
    if let seconds = invocation.timeoutSeconds {
        params["timeout"] = AnyCodable(seconds)
    }

    do {
        try await self.requestVoid(method: .agent, params: params)
    } catch {
        return (false, error.localizedDescription)
    }
    return (true, nil)
}
func sendAgent( func sendAgent(
message: String, message: String,
thinking: String?, thinking: String?,
sessionKey: String, sessionKey: String,
deliver: Bool, deliver: Bool,
to: String?, to: String?,
channel: String? = nil, channel: GatewayAgentChannel = .last,
timeoutSeconds: Int? = nil,
idempotencyKey: String = UUID().uuidString) async -> (ok: Bool, error: String?) idempotencyKey: String = UUID().uuidString) async -> (ok: Bool, error: String?)
{ {
let trimmed = message.trimmingCharacters(in: .whitespacesAndNewlines) await self.sendAgent(GatewayAgentInvocation(
guard !trimmed.isEmpty else { return (false, "message empty") } message: message,
sessionKey: sessionKey,
thinking: thinking,
deliver: deliver,
to: to,
channel: channel,
timeoutSeconds: timeoutSeconds,
idempotencyKey: idempotencyKey))
}
func sendSystemEvent(_ params: [String: AnyCodable]) async {
do { do {
let params: [String: Any] = [ try await self.requestVoid(method: .systemEvent, params: params)
"message": trimmed,
"sessionKey": sessionKey,
"thinking": thinking ?? "default",
"deliver": deliver,
"to": to ?? "",
"channel": channel ?? "",
"idempotencyKey": idempotencyKey,
]
_ = try await self.controlRequest(method: "agent", params: params)
return (true, nil)
} catch { } catch {
return (false, error.localizedDescription) // Best-effort only.
} }
} }
// MARK: - Health
/// Requests `health` and decodes the response with `decodeHealthSnapshot(from:)`.
///
/// - Throws: The request error, or `GatewayDecodingError` when no snapshot can be decoded.
func healthSnapshot(timeoutMs: Double? = nil) async throws -> HealthSnapshot {
    let body = try await self.requestRaw(method: .health, timeoutMs: timeoutMs)
    guard let snapshot = decodeHealthSnapshot(from: body) else {
        throw GatewayDecodingError(method: Method.health.rawValue, message: "failed to decode health snapshot")
    }
    return snapshot
}
/// Requests `health` and reports the `ok` flag of the response.
///
/// Lenient by design: if the response cannot be decoded, or carries no `ok` value,
/// the result is `true`. Only a failing request throws.
func healthOK(timeoutMs: Int = 8000) async throws -> Bool {
    let body = try await self.requestRaw(method: .health, timeoutMs: Double(timeoutMs))
    guard let parsed = try? self.decoder.decode(ClawdisGatewayHealthOK.self, from: body) else {
        return true
    }
    let flag: Bool? = parsed.ok
    return flag ?? true
}
// MARK: - Chat
/// Requests `chat.history` for `sessionKey` and decodes the response payload.
func chatHistory(sessionKey: String) async throws -> ClawdisChatHistoryPayload {
    let params: [String: AnyCodable] = ["sessionKey": AnyCodable(sessionKey)]
    let history: ClawdisChatHistoryPayload = try await self.requestDecoded(
        method: .chatHistory,
        params: params)
    return history
}
/// Sends a `chat.send` request and decodes the gateway's response.
///
/// `timeoutMs` travels as a request parameter; it is not passed as the client-side
/// request timeout. The `attachments` key is omitted when the list is empty.
func chatSend(
    sessionKey: String,
    message: String,
    thinking: String,
    idempotencyKey: String,
    attachments: [ClawdisChatAttachmentPayload],
    timeoutMs: Int = 30000) async throws -> ClawdisChatSendResponse
{
    // Flatten each attachment into a plain dictionary before wrapping it.
    let encodedAttachments = attachments.map { item in
        [
            "type": item.type,
            "mimeType": item.mimeType,
            "fileName": item.fileName,
            "content": item.content,
        ]
    }

    var params: [String: AnyCodable] = [:]
    params["sessionKey"] = AnyCodable(sessionKey)
    params["message"] = AnyCodable(message)
    params["thinking"] = AnyCodable(thinking)
    params["idempotencyKey"] = AnyCodable(idempotencyKey)
    params["timeoutMs"] = AnyCodable(timeoutMs)
    if !encodedAttachments.isEmpty {
        params["attachments"] = AnyCodable(encodedAttachments)
    }

    let response: ClawdisChatSendResponse = try await self.requestDecoded(method: .chatSend, params: params)
    return response
}
/// Sends `chat.abort` for the given session and run.
///
/// - Returns: The response's `aborted` flag, or `false` when the flag is absent.
func chatAbort(sessionKey: String, runId: String) async throws -> Bool {
    struct AbortPayload: Decodable {
        let ok: Bool?
        let aborted: Bool?
    }

    let params: [String: AnyCodable] = [
        "sessionKey": AnyCodable(sessionKey),
        "runId": AnyCodable(runId),
    ]
    let payload: AbortPayload = try await self.requestDecoded(method: .chatAbort, params: params)
    guard let aborted = payload.aborted else { return false }
    return aborted
}
// MARK: - VoiceWake
/// Requests `voicewake.get` and returns the `triggers` array from the response.
func voiceWakeGetTriggers() async throws -> [String] {
    struct TriggerList: Decodable {
        let triggers: [String]
    }

    let decoded: TriggerList = try await self.requestDecoded(method: .voicewakeGet)
    return decoded.triggers
}
/// Pushes the voice-wake trigger phrases to the gateway via `voicewake.set`.
///
/// Best-effort: a failure is logged and otherwise ignored, so callers never see an error.
/// The request uses a 10-second timeout.
func voiceWakeSetTriggers(_ triggers: [String]) async {
    do {
        try await self.requestVoid(
            method: .voicewakeSet,
            params: ["triggers": AnyCodable(triggers)],
            timeoutMs: 10000)
    } catch {
        // Best-effort only: record the failure for diagnostics instead of dropping it
        // silently, matching how `setHeartbeatsEnabled` reports its errors.
        gatewayConnectionLogger.error("voiceWakeSetTriggers failed \(error.localizedDescription, privacy: .public)")
    }
}
// MARK: - Node pairing
/// Sends `node.pair.approve` for `requestId` with a 10-second timeout.
func nodePairApprove(requestId: String) async throws {
    let params: [String: AnyCodable] = ["requestId": AnyCodable(requestId)]
    try await self.requestVoid(method: .nodePairApprove, params: params, timeoutMs: 10000)
}
/// Sends `node.pair.reject` for `requestId` with a 10-second timeout.
func nodePairReject(requestId: String) async throws {
    let params: [String: AnyCodable] = ["requestId": AnyCodable(requestId)]
    try await self.requestVoid(method: .nodePairReject, params: params, timeoutMs: 10000)
}
// MARK: - Cron
/// Decoded payload of the `cron.status` request, as returned by `cronStatus()`.
///
/// Property names must match the response's JSON keys, since `Decodable` is synthesized.
struct CronSchedulerStatus: Decodable, Sendable {
/// Scheduler enabled flag as reported by the gateway.
let enabled: Bool
/// Presumably the path of the scheduler's job store — NOTE(review): confirm against the gateway.
let storePath: String
/// Presumably the number of known jobs — NOTE(review): confirm against the gateway.
let jobs: Int
/// Presumably the next scheduled wake time in epoch milliseconds, absent when nothing is scheduled — TODO confirm.
let nextWakeAtMs: Int?
}
/// Requests `cron.status` and decodes the response as `CronSchedulerStatus`.
func cronStatus() async throws -> CronSchedulerStatus {
    let status: CronSchedulerStatus = try await self.requestDecoded(method: .cronStatus)
    return status
}
/// Requests `cron.list` and returns the `jobs` array of the response.
///
/// - Parameter includeDisabled: Forwarded as the `includeDisabled` request parameter.
func cronList(includeDisabled: Bool = true) async throws -> [CronJob] {
    let params: [String: AnyCodable] = ["includeDisabled": AnyCodable(includeDisabled)]
    let listing: CronListResponse = try await self.requestDecoded(method: .cronList, params: params)
    return listing.jobs
}
/// Requests `cron.runs` for a job and returns the `entries` array of the response.
///
/// - Parameters:
///   - jobId: Sent as the `id` request parameter.
///   - limit: Sent as the `limit` request parameter.
func cronRuns(jobId: String, limit: Int = 200) async throws -> [CronRunLogEntry] {
    let params: [String: AnyCodable] = [
        "id": AnyCodable(jobId),
        "limit": AnyCodable(limit),
    ]
    let log: CronRunsResponse = try await self.requestDecoded(method: .cronRuns, params: params)
    return log.entries
}
/// Sends `cron.run` for a job with a 20-second timeout.
///
/// - Parameter force: Selects the `mode` parameter: "force" when `true`, "due" otherwise.
func cronRun(jobId: String, force: Bool = true) async throws {
    let mode: String
    if force {
        mode = "force"
    } else {
        mode = "due"
    }
    let params: [String: AnyCodable] = [
        "id": AnyCodable(jobId),
        "mode": AnyCodable(mode),
    ]
    try await self.requestVoid(method: .cronRun, params: params, timeoutMs: 20000)
}
/// Sends `cron.remove` with the job id as the `id` parameter.
func cronRemove(jobId: String) async throws {
    let params: [String: AnyCodable] = ["id": AnyCodable(jobId)]
    try await self.requestVoid(method: .cronRemove, params: params)
}
/// Sends `cron.update` with the job id as `id` and the whole `patch` dictionary as `patch`.
func cronUpdate(jobId: String, patch: [String: Any]) async throws {
    var params: [String: AnyCodable] = [:]
    params["id"] = AnyCodable(jobId)
    params["patch"] = AnyCodable(patch)
    try await self.requestVoid(method: .cronUpdate, params: params)
}
/// Sends `cron.add`, wrapping each top-level payload value in `AnyCodable`.
func cronAdd(payload: [String: Any]) async throws {
    var params: [String: AnyCodable] = [:]
    for (key, value) in payload {
        params[key] = AnyCodable(value)
    }
    try await self.requestVoid(method: .cronAdd, params: params)
}
} }

View File

@@ -0,0 +1,34 @@
import ClawdisProtocol
import Foundation
/// Structured error surfaced when the gateway responds with `{ ok: false }`.
///
/// `code` and `message` are stored trimmed; a missing or blank value is replaced by
/// "GATEWAY_ERROR" and "gateway error" respectively.
struct GatewayResponseError: LocalizedError, @unchecked Sendable {
    /// Code stored when the gateway supplies none; also suppresses the `[code]` prefix.
    private static let fallbackCode = "GATEWAY_ERROR"
    /// Message stored when the gateway supplies none.
    private static let fallbackMessage = "gateway error"

    /// Gateway method whose request failed.
    let method: String
    /// Trimmed error code, or "GATEWAY_ERROR" when none was supplied.
    let code: String
    /// Trimmed error message, or "gateway error" when none was supplied.
    let message: String
    /// Error details as passed by the caller; empty when `nil` was given.
    let details: [String: AnyCodable]

    init(method: String, code: String?, message: String?, details: [String: AnyCodable]?) {
        self.method = method
        self.code = Self.trimmedNonEmpty(code) ?? Self.fallbackCode
        self.message = Self.trimmedNonEmpty(message) ?? Self.fallbackMessage
        self.details = details ?? [:]
    }

    /// "<method>: <message>" for the fallback code, otherwise "<method>: [<code>] <message>".
    var errorDescription: String? {
        if self.code == Self.fallbackCode { return "\(self.method): \(self.message)" }
        return "\(self.method): [\(self.code)] \(self.message)"
    }

    /// Returns `value` trimmed of whitespace and newlines, or `nil` when it is absent or blank.
    private static func trimmedNonEmpty(_ value: String?) -> String? {
        guard let trimmed = value?.trimmingCharacters(in: .whitespacesAndNewlines), !trimmed.isEmpty else {
            return nil
        }
        return trimmed
    }
}
/// Error describing a gateway response that could not be decoded.
struct GatewayDecodingError: LocalizedError, Sendable {
    /// Gateway method whose response failed to decode.
    let method: String
    /// Human-readable reason for the failure.
    let message: String

    /// "<method>: <message>".
    var errorDescription: String? {
        [self.method, self.message].joined(separator: ": ")
    }
}

View File

@@ -152,9 +152,8 @@ final class GatewayProcessManager {
private func attachExistingGatewayIfAvailable() async -> Bool { private func attachExistingGatewayIfAvailable() async -> Bool {
let port = GatewayEnvironment.gatewayPort() let port = GatewayEnvironment.gatewayPort()
do { do {
let data = try await GatewayConnection.shared.request(method: "health", params: nil)
let details: String let details: String
if let snap = decodeHealthSnapshot(from: data) { if let snap = try? await GatewayConnection.shared.healthSnapshot() {
let linked = snap.web.linked ? "linked" : "not linked" let linked = snap.web.linked ? "linked" : "not linked"
let authAge = snap.web.authAgeMs.flatMap(msToAge) ?? "unknown age" let authAge = snap.web.authAgeMs.flatMap(msToAge) ?? "unknown age"
let instance = await PortGuardian.shared.describe(port: port) let instance = await PortGuardian.shared.describe(port: port)

View File

@@ -320,10 +320,7 @@ final class NodePairingApprovalPrompter {
private func approve(requestId: String) async { private func approve(requestId: String) async {
do { do {
_ = try await GatewayConnection.shared.request( try await GatewayConnection.shared.nodePairApprove(requestId: requestId)
method: "node.pair.approve",
params: ["requestId": AnyCodable(requestId)],
timeoutMs: 10000)
self.logger.info("approved node pairing requestId=\(requestId, privacy: .public)") self.logger.info("approved node pairing requestId=\(requestId, privacy: .public)")
} catch { } catch {
self.logger.error("approve failed requestId=\(requestId, privacy: .public)") self.logger.error("approve failed requestId=\(requestId, privacy: .public)")
@@ -333,10 +330,7 @@ final class NodePairingApprovalPrompter {
private func reject(requestId: String) async { private func reject(requestId: String) async {
do { do {
_ = try await GatewayConnection.shared.request( try await GatewayConnection.shared.nodePairReject(requestId: requestId)
method: "node.pair.reject",
params: ["requestId": AnyCodable(requestId)],
timeoutMs: 10000)
self.logger.info("rejected node pairing requestId=\(requestId, privacy: .public)") self.logger.info("rejected node pairing requestId=\(requestId, privacy: .public)")
} catch { } catch {
self.logger.error("reject failed requestId=\(requestId, privacy: .public)") self.logger.error("reject failed requestId=\(requestId, privacy: .public)")

View File

@@ -37,7 +37,7 @@ enum VoiceWakeForwarder {
var thinking: String = "low" var thinking: String = "low"
var deliver: Bool = true var deliver: Bool = true
var to: String? var to: String?
var channel: String = "last" var channel: GatewayAgentChannel = .last
} }
@discardableResult @discardableResult
@@ -46,15 +46,14 @@ enum VoiceWakeForwarder {
options: ForwardOptions = ForwardOptions()) async -> Result<Void, VoiceWakeForwardError> options: ForwardOptions = ForwardOptions()) async -> Result<Void, VoiceWakeForwardError>
{ {
let payload = Self.prefixedTranscript(transcript) let payload = Self.prefixedTranscript(transcript)
let channel = options.channel.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() let deliver = options.channel.shouldDeliver(options.deliver)
let deliver = options.deliver && channel != "webchat" let result = await GatewayConnection.shared.sendAgent(GatewayAgentInvocation(
let result = await GatewayConnection.shared.sendAgent(
message: payload, message: payload,
thinking: options.thinking,
sessionKey: options.sessionKey, sessionKey: options.sessionKey,
thinking: options.thinking,
deliver: deliver, deliver: deliver,
to: options.to, to: options.to,
channel: channel) channel: options.channel))
if result.ok { if result.ok {
self.logger.info("voice wake forward ok") self.logger.info("voice wake forward ok")

View File

@@ -44,9 +44,8 @@ final class VoiceWakeGlobalSettingsSync {
private func refreshFromGateway() async { private func refreshFromGateway() async {
do { do {
let data = try await GatewayConnection.shared.request(method: "voicewake.get", params: nil, timeoutMs: 8000) let triggers = try await GatewayConnection.shared.voiceWakeGetTriggers()
let payload = try JSONDecoder().decode(VoiceWakePayload.self, from: data) AppStateStore.shared.applyGlobalVoiceWakeTriggers(triggers)
AppStateStore.shared.applyGlobalVoiceWakeTriggers(payload.triggers)
} catch { } catch {
// Best-effort only. // Best-effort only.
} }

View File

@@ -15,10 +15,7 @@ private enum WebChatSwiftUILayout {
struct MacGatewayChatTransport: ClawdisChatTransport, Sendable { struct MacGatewayChatTransport: ClawdisChatTransport, Sendable {
func requestHistory(sessionKey: String) async throws -> ClawdisChatHistoryPayload { func requestHistory(sessionKey: String) async throws -> ClawdisChatHistoryPayload {
let data = try await GatewayConnection.shared.request( try await GatewayConnection.shared.chatHistory(sessionKey: sessionKey)
method: "chat.history",
params: ["sessionKey": AnyCodable(sessionKey)])
return try JSONDecoder().decode(ClawdisChatHistoryPayload.self, from: data)
} }
func sendMessage( func sendMessage(
@@ -28,36 +25,16 @@ struct MacGatewayChatTransport: ClawdisChatTransport, Sendable {
idempotencyKey: String, idempotencyKey: String,
attachments: [ClawdisChatAttachmentPayload]) async throws -> ClawdisChatSendResponse attachments: [ClawdisChatAttachmentPayload]) async throws -> ClawdisChatSendResponse
{ {
var params: [String: AnyCodable] = [ try await GatewayConnection.shared.chatSend(
"sessionKey": AnyCodable(sessionKey), sessionKey: sessionKey,
"message": AnyCodable(message), message: message,
"thinking": AnyCodable(thinking), thinking: thinking,
"idempotencyKey": AnyCodable(idempotencyKey), idempotencyKey: idempotencyKey,
"timeoutMs": AnyCodable(30000), attachments: attachments)
]
if !attachments.isEmpty {
let encoded = attachments.map { att in
[
"type": att.type,
"mimeType": att.mimeType,
"fileName": att.fileName,
"content": att.content,
]
}
params["attachments"] = AnyCodable(encoded)
}
let data = try await GatewayConnection.shared.request(method: "chat.send", params: params)
return try JSONDecoder().decode(ClawdisChatSendResponse.self, from: data)
} }
func requestHealth(timeoutMs: Int) async throws -> Bool { func requestHealth(timeoutMs: Int) async throws -> Bool {
let data = try await GatewayConnection.shared.request( try await GatewayConnection.shared.healthOK(timeoutMs: timeoutMs)
method: "health",
params: nil,
timeoutMs: Double(timeoutMs))
return (try? JSONDecoder().decode(ClawdisGatewayHealthOK.self, from: data))?.ok ?? true
} }
func events() -> AsyncStream<ClawdisChatTransportEvent> { func events() -> AsyncStream<ClawdisChatTransportEvent> {

View File

@@ -0,0 +1,24 @@
# Gateway Client Refactor (Dec 2025)
Goal: remove stringly-typed gateway calls from the macOS app, centralize routing/channel semantics, and improve error handling.
## Progress
- [x] Fold legacy “AgentRPC” into `GatewayConnection` (single layer; no separate client object).
- [x] Typed gateway API: `GatewayConnection.Method` + `requestDecoded/requestVoid` + typed helpers (status/agent/chat/cron/etc).
- [x] Centralize agent routing/channel semantics via `GatewayAgentChannel` + `GatewayAgentInvocation`.
- [x] Improve gateway error model (structured `GatewayResponseError` + decoding errors include method).
- [x] Migrate mac call sites to typed helpers (leave only intentionally dynamic forwarding paths).
- [x] Convert remaining UI raw channel strings to `GatewayAgentChannel` (Cron editor).
- [x] Cleanup naming: rename remaining tests/docs that still reference “RPC/AgentRPC”.
### Notes
- Intentionally string-based (these call sites keep raw method strings on purpose):
  - `BridgeServer` dynamic request forwarding (method is data-driven).
  - `ControlChannel` request wrapper (generic escape hatch).
## Notes / Non-goals
- No functional behavior changes intended (beyond better errors and removing “magic strings”).
- Keep changes incremental: introduce typed APIs first, then migrate call sites, then remove old helpers.

View File

@@ -45,7 +45,7 @@ Goal: enforce the invariant **“one gateway websocket per app process (per effe
Key elements: Key elements:
- `GatewayConnection.shared` owns the one websocket and is the *only* supported entry point for app code that needs gateway RPC. - `GatewayConnection.shared` owns the one websocket and is the *only* supported entry point for app code that needs gateway RPC.
- Consumers (e.g. Control UI, Agent RPC, SwiftUI WebChat) call `GatewayConnection.shared.request(...)` and do not create their own sockets. - Consumers (e.g. Control UI, agent invocations, SwiftUI WebChat) call `GatewayConnection.shared.request(...)` and do not create their own sockets.
- If the effective connection config changes (local ↔ remote tunnel port, token change), `GatewayConnection` replaces the underlying connection. - If the effective connection config changes (local ↔ remote tunnel port, token change), `GatewayConnection` replaces the underlying connection.
- The transport (`GatewayChannelActor`) is an internal detail and forwards push frames back into `GatewayConnection`. - The transport (`GatewayChannelActor`) is an internal detail and forwards push frames back into `GatewayConnection`.
- Server-push frames are delivered via `GatewayConnection.shared.subscribe(...) -> AsyncStream<GatewayPush>` (in-process event bus). - Server-push frames are delivered via `GatewayConnection.shared.subscribe(...) -> AsyncStream<GatewayPush>` (in-process event bus).
@@ -84,7 +84,7 @@ Minimum invariants:
- Config changes (token / endpoint) cancel the old socket and reconnect once. - Config changes (token / endpoint) cancel the old socket and reconnect once.
Nice-to-have integration coverage: Nice-to-have integration coverage:
- Multiple “consumers” (Control UI + Agent RPC + SwiftUI WebChat) all call through the shared connection and still produce only one websocket. - Multiple “consumers” (Control UI + agent invocations + SwiftUI WebChat) all call through the shared connection and still produce only one websocket.
Additional coverage added (macOS): Additional coverage added (macOS):
- Subscribing after connect replays the latest snapshot. - Subscribing after connect replays the latest snapshot.

View File

@@ -105,7 +105,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway,
- Add lightweight WS client helper for `status/health/send/agent` when Gateway is up. ✅ `gateway` subcommands use the Gateway over WS. - Add lightweight WS client helper for `status/health/send/agent` when Gateway is up. ✅ `gateway` subcommands use the Gateway over WS.
- Consider a “local only” flag to avoid accidental remote connects. (optional; not needed with tunnel-first model.) - Consider a “local only” flag to avoid accidental remote connects. (optional; not needed with tunnel-first model.)
- **WebChat backend**: - **WebChat backend**:
- Single WS to Gateway; seed UI from snapshot; forward `presence/tick/agent` to browser. ✅ implemented via `GatewayClient` in `webchat/server.ts`. - Single WS to Gateway; seed UI from snapshot; forward `presence/tick/agent` to browser. ✅ implemented via the WebChat gateway client in `webchat/server.ts`.
- Fail fast if handshake fails; no fallback transports. ✅ (webchat returns gateway unavailable) - Fail fast if handshake fails; no fallback transports. ✅ (webchat returns gateway unavailable)
## Phase 6 — Send/agent path hardening ## Phase 6 — Send/agent path hardening
@@ -148,7 +148,7 @@ Goal: replace legacy gateway/stdin/TCP control with a single WebSocket Gateway,
- Mac app smoke: presence/health render from snapshot; reconnect on tick loss. (Manual: open Instances tab, verify snapshot after connect, induce seq gap by toggling wifi, ensure UI refreshes.) - Mac app smoke: presence/health render from snapshot; reconnect on tick loss. (Manual: open Instances tab, verify snapshot after connect, induce seq gap by toggling wifi, ensure UI refreshes.)
- WebChat smoke: snapshot seed + event updates; tunnel scenario. ✅ Offline snapshot harness in `src/webchat/server.test.ts` (mock gateway) now passes; live tunnel still recommended for manual. - WebChat smoke: snapshot seed + event updates; tunnel scenario. ✅ Offline snapshot harness in `src/webchat/server.test.ts` (mock gateway) now passes; live tunnel still recommended for manual.
- Idempotency tests: retry send/agent with same key after forced disconnect; expect deduped result. ✅ send + agent dedupe + reconnect retry covered in gateway tests. - Idempotency tests: retry send/agent with same key after forced disconnect; expect deduped result. ✅ send + agent dedupe + reconnect retry covered in gateway tests.
- Seq-gap handling: ✅ clients now detect seq gaps (GatewayClient + mac GatewayChannel) and refresh health/presence (webchat) or trigger UI refresh (mac). Load-test still optional. - Seq-gap handling: ✅ clients now detect seq gaps (WebChat gateway client + mac `GatewayConnection/GatewayChannel`) and refresh health/presence (webchat) or trigger UI refresh (mac). Load-test still optional.
## Phase 10 — Rollout ## Phase 10 — Rollout
- Version bump; release notes: breaking change to control plane (WS only). - Version bump; release notes: breaking change to control plane (WS only).