fix(talk): harden playback, interrupts, and timeouts
This commit is contained in:
@@ -106,8 +106,13 @@ final class TalkAudioPlayer: NSObject, @preconcurrency AVAudioPlayerDelegate {
|
||||
}
|
||||
|
||||
/// Stops whatever is currently playing.
///
/// When a playback is in flight it is completed as *interrupted*: the player's
/// current time is captured and handed to `finish(playback:result:)` together
/// with `finished: false`. When nothing is in flight, the player is simply
/// stopped and released.
///
/// Previously `self.playback` was cleared before the `if let` below, so the
/// interruption branch could never run and no interrupted result was reported.
private func stopInternal() {
    if let playback = self.playback {
        // Read the position before tearing anything down so the caller learns
        // where playback was cut off.
        let interruptedAt = self.player?.currentTime
        // NOTE(review): assumes finish(playback:result:) cancels the watchdog,
        // stops the player and clears `self.playback` — confirm against its body.
        self.finish(
            playback: playback,
            result: TalkPlaybackResult(finished: false, interruptedAt: interruptedAt))
        return
    }
    self.player?.stop()
    self.player = nil
}
|
||||
|
||||
@@ -11,16 +11,37 @@ actor TalkModeRuntime {
|
||||
private let logger = Logger(subsystem: "com.steipete.clawdis", category: "talk.runtime")
|
||||
private let ttsLogger = Logger(subsystem: "com.steipete.clawdis", category: "talk.tts")
|
||||
|
||||
/// Lock-protected holder for the most recent RMS audio level.
///
/// All access to `latestRMS` goes through `lock`, which is what backs the
/// `@unchecked Sendable` conformance.
private final class RMSMeter: @unchecked Sendable {
    private let lock = NSLock()
    private var latestRMS: Double = 0

    /// Stores `rms` as the latest level.
    func set(_ rms: Double) {
        self.lock.lock()
        defer { self.lock.unlock() }
        self.latestRMS = rms
    }

    /// Returns the level most recently stored with `set(_:)` (initially `0`).
    func get() -> Double {
        self.lock.lock()
        defer { self.lock.unlock() }
        return self.latestRMS
    }
}
|
||||
|
||||
private var recognizer: SFSpeechRecognizer?
|
||||
private var audioEngine: AVAudioEngine?
|
||||
private var recognitionRequest: SFSpeechAudioBufferRecognitionRequest?
|
||||
private var recognitionTask: SFSpeechRecognitionTask?
|
||||
private var recognitionGeneration: Int = 0
|
||||
private var rmsTask: Task<Void, Never>?
|
||||
private let rmsMeter = RMSMeter()
|
||||
|
||||
private var captureTask: Task<Void, Never>?
|
||||
private var silenceTask: Task<Void, Never>?
|
||||
private var phase: TalkModePhase = .idle
|
||||
private var isEnabled = false
|
||||
private var lifecycleGeneration: Int = 0
|
||||
|
||||
private var lastHeard: Date?
|
||||
private var noiseFloorRMS: Double = 1e-4
|
||||
@@ -49,6 +70,7 @@ actor TalkModeRuntime {
|
||||
func setEnabled(_ enabled: Bool) async {
|
||||
guard enabled != self.isEnabled else { return }
|
||||
self.isEnabled = enabled
|
||||
self.lifecycleGeneration &+= 1
|
||||
if enabled {
|
||||
await self.start()
|
||||
} else {
|
||||
@@ -56,14 +78,21 @@ actor TalkModeRuntime {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reports whether work started under `generation` is still valid.
///
/// - Parameter generation: The `lifecycleGeneration` value captured when the work began.
/// - Returns: `true` only while the runtime is enabled and `lifecycleGeneration`
///   still equals `generation`.
private func isCurrent(_ generation: Int) -> Bool {
    guard self.isEnabled else { return false }
    return self.lifecycleGeneration == generation
}
|
||||
|
||||
private func start() async {
|
||||
let gen = self.lifecycleGeneration
|
||||
guard voiceWakeSupported else { return }
|
||||
guard PermissionManager.voiceWakePermissionsGranted() else {
|
||||
self.logger.debug("talk runtime not starting: permissions missing")
|
||||
return
|
||||
}
|
||||
await self.reloadConfig()
|
||||
guard self.isCurrent(gen) else { return }
|
||||
await self.startRecognition()
|
||||
guard self.isCurrent(gen) else { return }
|
||||
self.phase = .listening
|
||||
await MainActor.run { TalkModeController.shared.updatePhase(.listening) }
|
||||
self.startSilenceMonitor()
|
||||
@@ -74,12 +103,15 @@ actor TalkModeRuntime {
|
||||
self.captureTask = nil
|
||||
self.silenceTask?.cancel()
|
||||
self.silenceTask = nil
|
||||
|
||||
// Stop audio before changing phase (stopSpeaking is gated on .speaking).
|
||||
await self.stopSpeaking(reason: .manual)
|
||||
|
||||
self.lastTranscript = ""
|
||||
self.lastHeard = nil
|
||||
self.lastSpeechEnergyAt = nil
|
||||
self.phase = .idle
|
||||
await self.stopRecognition()
|
||||
await self.stopSpeaking(reason: .manual)
|
||||
await MainActor.run {
|
||||
TalkModeController.shared.updateLevel(0)
|
||||
TalkModeController.shared.updatePhase(.idle)
|
||||
@@ -120,12 +152,11 @@ actor TalkModeRuntime {
|
||||
let input = audioEngine.inputNode
|
||||
let format = input.outputFormat(forBus: 0)
|
||||
input.removeTap(onBus: 0)
|
||||
input.installTap(onBus: 0, bufferSize: 2048, format: format) { [weak self, weak request] buffer, _ in
|
||||
let meter = self.rmsMeter
|
||||
input.installTap(onBus: 0, bufferSize: 2048, format: format) { [weak request, meter] buffer, _ in
|
||||
request?.append(buffer)
|
||||
if let rms = Self.rmsLevel(buffer: buffer) {
|
||||
Task.detached { [weak self] in
|
||||
await self?.noteAudioLevel(rms: rms)
|
||||
}
|
||||
meter.set(rms)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,6 +168,8 @@ actor TalkModeRuntime {
|
||||
return
|
||||
}
|
||||
|
||||
self.startRMSTicker(meter: meter)
|
||||
|
||||
self.recognitionTask = recognizer.recognitionTask(with: request) { [weak self, generation] result, error in
|
||||
guard let self else { return }
|
||||
let segments = result?.bestTranscription.segments ?? []
|
||||
@@ -161,6 +194,19 @@ actor TalkModeRuntime {
|
||||
self.audioEngine?.stop()
|
||||
self.audioEngine = nil
|
||||
self.recognizer = nil
|
||||
self.rmsTask?.cancel()
|
||||
self.rmsTask = nil
|
||||
}
|
||||
|
||||
/// Replaces any running RMS ticker with a new one that polls `meter`.
///
/// The ticker sleeps 50 ms between polls, forwards each reading to
/// `noteAudioLevel(rms:)`, and ends when it is cancelled or when the weakly
/// captured runtime has gone away.
///
/// - Parameter meter: Source of the latest RMS value.
private func startRMSTicker(meter: RMSMeter) {
    self.rmsTask?.cancel()

    let ticker = Task { [weak self, meter] in
        while let runtime = self {
            try? await Task.sleep(nanoseconds: 50_000_000)
            // A cancelled sleep returns early; do not publish another level after that.
            guard !Task.isCancelled else { return }
            let level = meter.get()
            await runtime.noteAudioLevel(rms: level)
        }
    }
    self.rmsTask = ticker
}
|
||||
|
||||
private func handleRecognition(_ update: RecognitionUpdate) async {
|
||||
@@ -241,43 +287,42 @@ actor TalkModeRuntime {
|
||||
// MARK: - Gateway + TTS
|
||||
|
||||
private func sendAndSpeak(_ transcript: String) async {
|
||||
let gen = self.lifecycleGeneration
|
||||
await self.reloadConfig()
|
||||
guard self.isCurrent(gen) else { return }
|
||||
let prompt = self.buildPrompt(transcript: transcript)
|
||||
let sessionKey = await GatewayConnection.shared.mainSessionKey()
|
||||
let runId = UUID().uuidString
|
||||
let startedAt = Date().timeIntervalSince1970
|
||||
self.logger.info("talk send start runId=\(runId, privacy: .public) chars=\(prompt.count, privacy: .public)")
|
||||
self.logger.info(
|
||||
"talk send start runId=\(runId, privacy: .public) session=\(sessionKey, privacy: .public) chars=\(prompt.count, privacy: .public)")
|
||||
|
||||
do {
|
||||
let response = try await GatewayConnection.shared.chatSend(
|
||||
sessionKey: "main",
|
||||
sessionKey: sessionKey,
|
||||
message: prompt,
|
||||
thinking: "low",
|
||||
idempotencyKey: runId,
|
||||
attachments: [])
|
||||
self.logger.info("talk chat.send ok runId=\(response.runId, privacy: .public)")
|
||||
let completion = await self.waitForChatCompletion(
|
||||
runId: response.runId,
|
||||
timeoutSeconds: 120)
|
||||
self.logger.info("talk chat completion runId=\(response.runId, privacy: .public) state=\(String(describing: completion), privacy: .public)")
|
||||
guard completion == .final else {
|
||||
await self.startListening()
|
||||
await self.startRecognition()
|
||||
return
|
||||
}
|
||||
guard self.isCurrent(gen) else { return }
|
||||
self.logger.info(
|
||||
"talk chat.send ok runId=\(response.runId, privacy: .public) session=\(sessionKey, privacy: .public)")
|
||||
|
||||
guard let assistantText = await self.waitForAssistantText(
|
||||
sessionKey: "main",
|
||||
sessionKey: sessionKey,
|
||||
since: startedAt,
|
||||
timeoutSeconds: 12)
|
||||
timeoutSeconds: 45)
|
||||
else {
|
||||
self.logger.warning("talk assistant text missing after completion")
|
||||
self.logger.warning("talk assistant text missing after timeout")
|
||||
await self.startListening()
|
||||
await self.startRecognition()
|
||||
return
|
||||
}
|
||||
guard self.isCurrent(gen) else { return }
|
||||
|
||||
self.logger.info("talk assistant text len=\(assistantText.count, privacy: .public)")
|
||||
await self.playAssistant(text: assistantText)
|
||||
guard self.isCurrent(gen) else { return }
|
||||
await self.startListening()
|
||||
await self.startRecognition()
|
||||
return
|
||||
@@ -306,54 +351,6 @@ actor TalkModeRuntime {
|
||||
return lines.joined(separator: "\n")
|
||||
}
|
||||
|
||||
/// Terminal outcome of waiting on a chat run.
private enum ChatCompletionState: CustomStringConvertible {
    case final
    case aborted
    case error
    case timeout

    /// Lowercase label for the state, suitable for log output.
    var description: String {
        let label: String
        switch self {
        case .final:
            label = "final"
        case .aborted:
            label = "aborted"
        case .error:
            label = "error"
        case .timeout:
            label = "timeout"
        }
        return label
    }
}
|
||||
|
||||
/// Waits until the gateway reports a terminal state for `runId`, or the timeout elapses.
///
/// Subscribes to `GatewayConnection.shared` and races two child tasks: one
/// scanning pushed `"chat"` events for a matching `runId`, one sleeping for the
/// timeout. Whichever finishes first decides the result; the other is cancelled.
///
/// - Parameters:
///   - runId: Run identifier compared against each decoded chat event's `runId`.
///   - timeoutSeconds: Maximum time to wait, in seconds. Negative values are
///     treated as zero, and values too large to express in nanoseconds saturate
///     instead of trapping.
/// - Returns: `.final`, `.aborted` or `.error` for the first matching event that
///   carries one of those states; `.timeout` if the timer fires first or the
///   stream ends without one.
private func waitForChatCompletion(runId: String, timeoutSeconds: Int) async -> ChatCompletionState {
    let stream = await GatewayConnection.shared.subscribe()

    // `UInt64(negative)` and an overflowing multiply both trap at runtime, so
    // clamp and saturate before handing the value to Task.sleep.
    let clampedSeconds = UInt64(max(0, timeoutSeconds))
    let (nanoseconds, overflowed) = clampedSeconds.multipliedReportingOverflow(by: 1_000_000_000)
    let timeoutNanoseconds = overflowed ? UInt64.max : nanoseconds

    return await withTaskGroup(of: ChatCompletionState.self) { group in
        group.addTask { [runId] in
            // One encoder/decoder pair for the whole stream rather than one per event.
            let encoder = JSONEncoder()
            let decoder = JSONDecoder()
            for await push in stream {
                // Round-trip the payload through JSON to get a typed chat event;
                // anything that is not a decodable chat event for this run is skipped.
                guard case let .event(evt) = push,
                      evt.event == "chat",
                      let payload = evt.payload,
                      let chat = try? decoder.decode(
                          ClawdisChatEventPayload.self,
                          from: encoder.encode(payload)),
                      chat.runId == runId
                else { continue }

                switch chat.state {
                case .some("final"): return .final
                case .some("aborted"): return .aborted
                case .some("error"): return .error
                default: break // non-terminal state: keep listening
                }
            }
            // Stream ended without a terminal state.
            return .timeout
        }
        group.addTask {
            try? await Task.sleep(nanoseconds: timeoutNanoseconds)
            return .timeout
        }
        let result = await group.next() ?? .timeout
        group.cancelAll()
        return result
    }
}
|
||||
|
||||
private func waitForAssistantText(
|
||||
sessionKey: String,
|
||||
since: Double,
|
||||
@@ -394,10 +391,12 @@ actor TalkModeRuntime {
|
||||
}
|
||||
|
||||
private func playAssistant(text: String) async {
|
||||
let gen = self.lifecycleGeneration
|
||||
let parse = TalkDirectiveParser.parse(text)
|
||||
let directive = parse.directive
|
||||
let cleaned = parse.stripped.trimmingCharacters(in: .whitespacesAndNewlines)
|
||||
guard !cleaned.isEmpty else { return }
|
||||
guard self.isCurrent(gen) else { return }
|
||||
|
||||
if !parse.unknownKeys.isEmpty {
|
||||
self.logger.warning("talk directive ignored keys: \(parse.unknownKeys.joined(separator: ","), privacy: .public)")
|
||||
@@ -435,9 +434,11 @@ actor TalkModeRuntime {
|
||||
self.logger.error("talk missing voiceId; set talk.voiceId or ELEVENLABS_VOICE_ID")
|
||||
return
|
||||
}
|
||||
guard self.isCurrent(gen) else { return }
|
||||
self.ttsLogger.info("talk TTS request voiceId=\(voiceId, privacy: .public) chars=\(cleaned.count, privacy: .public)")
|
||||
|
||||
await self.startRecognition()
|
||||
guard self.isCurrent(gen) else { return }
|
||||
await MainActor.run { TalkModeController.shared.updatePhase(.speaking) }
|
||||
self.phase = .speaking
|
||||
self.lastSpokenText = cleaned
|
||||
@@ -450,7 +451,7 @@ actor TalkModeRuntime {
|
||||
let request = ElevenLabsRequest(
|
||||
text: cleaned,
|
||||
modelId: directive?.modelId ?? self.currentModelId ?? self.defaultModelId,
|
||||
outputFormat: directive?.outputFormat ?? self.defaultOutputFormat,
|
||||
outputFormat: Self.validatedOutputFormat(directive?.outputFormat ?? self.defaultOutputFormat, logger: self.logger),
|
||||
speed: resolvedSpeed,
|
||||
stability: Self.validatedUnit(directive?.stability, name: "stability", logger: self.logger),
|
||||
similarity: Self.validatedUnit(directive?.similarity, name: "similarity", logger: self.logger),
|
||||
@@ -479,6 +480,7 @@ actor TalkModeRuntime {
|
||||
group.cancelAll()
|
||||
return data
|
||||
}
|
||||
guard self.isCurrent(gen) else { return }
|
||||
self.ttsLogger.info("talk TTS response bytes=\(audio.count, privacy: .public)")
|
||||
let result = await TalkAudioPlayer.shared.play(data: audio)
|
||||
self.ttsLogger.info("talk audio result finished=\(result.finished, privacy: .public) interruptedAt=\(String(describing: result.interruptedAt), privacy: .public)")
|
||||
@@ -491,8 +493,10 @@ actor TalkModeRuntime {
|
||||
self.logger.error("talk TTS failed: \(error.localizedDescription, privacy: .public)")
|
||||
}
|
||||
|
||||
self.phase = .thinking
|
||||
await MainActor.run { TalkModeController.shared.updatePhase(.thinking) }
|
||||
if self.phase == .speaking {
|
||||
self.phase = .thinking
|
||||
await MainActor.run { TalkModeController.shared.updatePhase(.thinking) }
|
||||
}
|
||||
}
|
||||
|
||||
private func resolveVoiceId(preferred: String?, apiKey: String) async -> String? {
|
||||
@@ -523,11 +527,18 @@ actor TalkModeRuntime {
|
||||
}
|
||||
|
||||
/// Stops assistant audio playback and moves the runtime on according to `reason`.
///
/// Does nothing unless the runtime is in the `.speaking` phase. The phase is
/// checked again after the player has been stopped, because the call suspends
/// at `await` in between.
///
/// - Parameter reason: Why playback is being stopped.
///   - `.speech`: records where playback was interrupted, then resumes listening.
///   - `.userTap`: resumes listening.
///   - `.manual`: stops audio only; the phase is left for the caller to change.
///   - any other value: switches to `.thinking`.
func stopSpeaking(reason: TalkStopReason) async {
    if self.phase != .speaking { return }

    let stoppedAt = await MainActor.run { TalkAudioPlayer.shared.stop() }
    if self.phase != .speaking { return }

    if let stoppedAt, reason == .speech {
        self.lastInterruptedAtSeconds = stoppedAt
    }

    guard reason != .manual else { return }

    guard reason != .speech, reason != .userTap else {
        await self.startListening()
        return
    }

    self.phase = .thinking
    await MainActor.run { TalkModeController.shared.updatePhase(.thinking) }
}
|
||||
@@ -718,6 +729,16 @@ actor TalkModeRuntime {
|
||||
}
|
||||
return normalized
|
||||
}
|
||||
|
||||
/// Filters an output-format string down to one accepted for local playback.
///
/// - Parameters:
///   - value: Requested output format; may be `nil` or padded with whitespace.
///   - logger: Receives a warning when a non-empty format is rejected.
/// - Returns: The whitespace-trimmed format when it starts with `"mp3_"`;
///   `nil` when `value` is `nil`, blank, or has any other prefix.
private static func validatedOutputFormat(_ value: String?, logger: Logger) -> String? {
    guard let value else { return nil }

    let trimmed = value.trimmingCharacters(in: .whitespacesAndNewlines)
    if trimmed.isEmpty {
        return nil
    }
    if trimmed.hasPrefix("mp3_") {
        return trimmed
    }

    logger.warning("talk output_format unsupported for local playback: \(trimmed, privacy: .public)")
    return nil
}
|
||||
}
|
||||
|
||||
private struct ElevenLabsRequest {
|
||||
|
||||
Reference in New Issue
Block a user