refactor: migrate iOS gateway to unified ws
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import AVFAudio
|
||||
import ClawdbotKit
|
||||
import ClawdbotProtocol
|
||||
import Foundation
|
||||
import Observation
|
||||
import OSLog
|
||||
@@ -42,15 +43,15 @@ final class TalkModeManager: NSObject {
|
||||
var pcmPlayer: PCMStreamingAudioPlaying = PCMStreamingAudioPlayer.shared
|
||||
var mp3Player: StreamingAudioPlaying = StreamingAudioPlayer.shared
|
||||
|
||||
private var bridge: BridgeSession?
|
||||
private var gateway: GatewayNodeSession?
|
||||
private let silenceWindow: TimeInterval = 0.7
|
||||
|
||||
private var chatSubscribedSessionKeys = Set<String>()
|
||||
|
||||
private let logger = Logger(subsystem: "com.clawdbot", category: "TalkMode")
|
||||
|
||||
func attachBridge(_ bridge: BridgeSession) {
|
||||
self.bridge = bridge
|
||||
func attachGateway(_ gateway: GatewayNodeSession) {
|
||||
self.gateway = gateway
|
||||
}
|
||||
|
||||
func updateMainSessionKey(_ sessionKey: String?) {
|
||||
@@ -232,9 +233,9 @@ final class TalkModeManager: NSObject {
|
||||
|
||||
await self.reloadConfig()
|
||||
let prompt = self.buildPrompt(transcript: transcript)
|
||||
guard let bridge else {
|
||||
self.statusText = "Bridge not connected"
|
||||
self.logger.warning("finalize: bridge not connected")
|
||||
guard let gateway else {
|
||||
self.statusText = "Gateway not connected"
|
||||
self.logger.warning("finalize: gateway not connected")
|
||||
await self.start()
|
||||
return
|
||||
}
|
||||
@@ -245,9 +246,9 @@ final class TalkModeManager: NSObject {
|
||||
await self.subscribeChatIfNeeded(sessionKey: sessionKey)
|
||||
self.logger.info(
|
||||
"chat.send start sessionKey=\(sessionKey, privacy: .public) chars=\(prompt.count, privacy: .public)")
|
||||
let runId = try await self.sendChat(prompt, bridge: bridge)
|
||||
let runId = try await self.sendChat(prompt, gateway: gateway)
|
||||
self.logger.info("chat.send ok runId=\(runId, privacy: .public)")
|
||||
let completion = await self.waitForChatCompletion(runId: runId, bridge: bridge, timeoutSeconds: 120)
|
||||
let completion = await self.waitForChatCompletion(runId: runId, gateway: gateway, timeoutSeconds: 120)
|
||||
if completion == .timeout {
|
||||
self.logger.warning(
|
||||
"chat completion timeout runId=\(runId, privacy: .public); attempting history fallback")
|
||||
@@ -264,7 +265,7 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
guard let assistantText = try await self.waitForAssistantText(
|
||||
bridge: bridge,
|
||||
gateway: gateway,
|
||||
since: startedAt,
|
||||
timeoutSeconds: completion == .final ? 12 : 25)
|
||||
else {
|
||||
@@ -286,31 +287,22 @@ final class TalkModeManager: NSObject {
|
||||
private func subscribeChatIfNeeded(sessionKey: String) async {
|
||||
let key = sessionKey.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !key.isEmpty else { return }
|
||||
guard let bridge else { return }
|
||||
guard let gateway else { return }
|
||||
guard !self.chatSubscribedSessionKeys.contains(key) else { return }
|
||||
|
||||
do {
|
||||
let payload = "{\"sessionKey\":\"\(key)\"}"
|
||||
try await bridge.sendEvent(event: "chat.subscribe", payloadJSON: payload)
|
||||
self.chatSubscribedSessionKeys.insert(key)
|
||||
self.logger.info("chat.subscribe ok sessionKey=\(key, privacy: .public)")
|
||||
} catch {
|
||||
let err = error.localizedDescription
|
||||
self.logger.warning("chat.subscribe failed key=\(key, privacy: .public) err=\(err, privacy: .public)")
|
||||
}
|
||||
let payload = "{\"sessionKey\":\"\(key)\"}"
|
||||
await gateway.sendEvent(event: "chat.subscribe", payloadJSON: payload)
|
||||
self.chatSubscribedSessionKeys.insert(key)
|
||||
self.logger.info("chat.subscribe ok sessionKey=\(key, privacy: .public)")
|
||||
}
|
||||
|
||||
private func unsubscribeAllChats() async {
|
||||
guard let bridge else { return }
|
||||
guard let gateway else { return }
|
||||
let keys = self.chatSubscribedSessionKeys
|
||||
self.chatSubscribedSessionKeys.removeAll()
|
||||
for key in keys {
|
||||
do {
|
||||
let payload = "{\"sessionKey\":\"\(key)\"}"
|
||||
try await bridge.sendEvent(event: "chat.unsubscribe", payloadJSON: payload)
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
let payload = "{\"sessionKey\":\"\(key)\"}"
|
||||
await gateway.sendEvent(event: "chat.unsubscribe", payloadJSON: payload)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -336,7 +328,7 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
}
|
||||
|
||||
private func sendChat(_ message: String, bridge: BridgeSession) async throws -> String {
|
||||
private func sendChat(_ message: String, gateway: GatewayNodeSession) async throws -> String {
|
||||
struct SendResponse: Decodable { let runId: String }
|
||||
let payload: [String: Any] = [
|
||||
"sessionKey": self.mainSessionKey,
|
||||
@@ -352,26 +344,27 @@ final class TalkModeManager: NSObject {
|
||||
code: 1,
|
||||
userInfo: [NSLocalizedDescriptionKey: "Failed to encode chat payload"])
|
||||
}
|
||||
let res = try await bridge.request(method: "chat.send", paramsJSON: json, timeoutSeconds: 30)
|
||||
let res = try await gateway.request(method: "chat.send", paramsJSON: json, timeoutSeconds: 30)
|
||||
let decoded = try JSONDecoder().decode(SendResponse.self, from: res)
|
||||
return decoded.runId
|
||||
}
|
||||
|
||||
private func waitForChatCompletion(
|
||||
runId: String,
|
||||
bridge: BridgeSession,
|
||||
gateway: GatewayNodeSession,
|
||||
timeoutSeconds: Int = 120) async -> ChatCompletionState
|
||||
{
|
||||
let stream = await bridge.subscribeServerEvents(bufferingNewest: 200)
|
||||
let stream = await gateway.subscribeServerEvents(bufferingNewest: 200)
|
||||
return await withTaskGroup(of: ChatCompletionState.self) { group in
|
||||
group.addTask { [runId] in
|
||||
for await evt in stream {
|
||||
if Task.isCancelled { return .timeout }
|
||||
guard evt.event == "chat", let payload = evt.payloadJSON else { continue }
|
||||
guard let data = payload.data(using: .utf8) else { continue }
|
||||
guard let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { continue }
|
||||
if (json["runId"] as? String) != runId { continue }
|
||||
if let state = json["state"] as? String {
|
||||
guard evt.event == "chat", let payload = evt.payload else { continue }
|
||||
guard let chatEvent = try? GatewayPayloadDecoding.decode(payload, as: ChatEvent.self) else {
|
||||
continue
|
||||
}
|
||||
guard chatEvent.runid == runId else { continue }
|
||||
if let state = chatEvent.state.value as? String {
|
||||
switch state {
|
||||
case "final": return .final
|
||||
case "aborted": return .aborted
|
||||
@@ -393,13 +386,13 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
private func waitForAssistantText(
|
||||
bridge: BridgeSession,
|
||||
gateway: GatewayNodeSession,
|
||||
since: Double,
|
||||
timeoutSeconds: Int) async throws -> String?
|
||||
{
|
||||
let deadline = Date().addingTimeInterval(TimeInterval(timeoutSeconds))
|
||||
while Date() < deadline {
|
||||
if let text = try await self.fetchLatestAssistantText(bridge: bridge, since: since) {
|
||||
if let text = try await self.fetchLatestAssistantText(gateway: gateway, since: since) {
|
||||
return text
|
||||
}
|
||||
try? await Task.sleep(nanoseconds: 300_000_000)
|
||||
@@ -407,8 +400,8 @@ final class TalkModeManager: NSObject {
|
||||
return nil
|
||||
}
|
||||
|
||||
private func fetchLatestAssistantText(bridge: BridgeSession, since: Double? = nil) async throws -> String? {
|
||||
let res = try await bridge.request(
|
||||
private func fetchLatestAssistantText(gateway: GatewayNodeSession, since: Double? = nil) async throws -> String? {
|
||||
let res = try await gateway.request(
|
||||
method: "chat.history",
|
||||
paramsJSON: "{\"sessionKey\":\"\(self.mainSessionKey)\"}",
|
||||
timeoutSeconds: 15)
|
||||
@@ -649,9 +642,9 @@ final class TalkModeManager: NSObject {
|
||||
}
|
||||
|
||||
private func reloadConfig() async {
|
||||
guard let bridge else { return }
|
||||
guard let gateway else { return }
|
||||
do {
|
||||
let res = try await bridge.request(method: "config.get", paramsJSON: "{}", timeoutSeconds: 8)
|
||||
let res = try await gateway.request(method: "config.get", paramsJSON: "{}", timeoutSeconds: 8)
|
||||
guard let json = try JSONSerialization.jsonObject(with: res) as? [String: Any] else { return }
|
||||
guard let config = json["config"] as? [String: Any] else { return }
|
||||
let talk = config["talk"] as? [String: Any]
|
||||
|
||||
Reference in New Issue
Block a user