From 27adfb76fa61c3ca1bb52677cb5d004c4a598b21 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 30 Dec 2025 12:17:40 +0100 Subject: [PATCH] fix: stream elevenlabs tts playback --- CHANGELOG.md | 3 + .../node/voice/StreamingMediaDataSource.kt | 98 ++++ .../clawdis/node/voice/TalkModeManager.kt | 172 ++++++- apps/ios/Sources/Voice/TalkModeManager.swift | 130 ++++-- .../Sources/Clawdis/TalkModeRuntime.swift | 47 +- .../Sources/ClawdisKit/ElevenLabsTTS.swift | 102 ++++- .../ClawdisKit/PCMStreamingAudioPlayer.swift | 144 ++++++ .../ClawdisKit/StreamingAudioPlayer.swift | 429 ++++++++++++++++++ .../ClawdisKit/TalkTTSValidation.swift | 26 +- .../TalkTTSValidationTests.swift | 21 + docs/talk.md | 10 +- 11 files changed, 1091 insertions(+), 91 deletions(-) create mode 100644 apps/android/app/src/main/java/com/steipete/clawdis/node/voice/StreamingMediaDataSource.kt create mode 100644 apps/shared/ClawdisKit/Sources/ClawdisKit/PCMStreamingAudioPlayer.swift create mode 100644 apps/shared/ClawdisKit/Sources/ClawdisKit/StreamingAudioPlayer.swift diff --git a/CHANGELOG.md b/CHANGELOG.md index bf0fcb59b..d5d05b5ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ - macOS Talk Mode: throttle audio-level updates (avoid per-buffer task creation) to reduce CPU/task churn. - macOS Talk Mode: increase overlay window size so wave rings don’t clip; close button is hover-only and closer to the orb. - Talk Mode: fall back to system TTS when ElevenLabs is unavailable, returns non-audio, or playback fails (macOS/iOS/Android). +- Talk Mode: stream PCM on macOS/iOS for lower latency (incremental playback); Android continues MP3 streaming. +- Talk Mode: validate ElevenLabs v3 stability and latency tier directives before sending requests. +- iOS/Android Talk Mode: auto-select the first ElevenLabs voice when none is configured. - ElevenLabs: add retry/backoff for 429/5xx and include content-type in errors for debugging. - Talk Mode: align to the gateway’s main session key and fall back to history polling when chat events drop (prevents stuck “thinking” / missing messages). - Talk Mode: treat history timestamps as seconds or milliseconds to avoid stale assistant picks (macOS/iOS/Android). 
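Reviewer note: the macOS/iOS playback path below composes `ElevenLabsTTSClient.streamSynthesize` with the two new streaming players. A minimal sketch of how the pieces fit together, using only APIs introduced in this patch (error handling elided; the function name is illustrative):

```swift
import ClawdisKit

// Sketch: stream ElevenLabs TTS and play it incrementally, mirroring
// what TalkModeManager/TalkModeRuntime do in this patch.
func speakStreaming(text: String, voiceId: String, apiKey: String) async {
    let request = ElevenLabsTTSRequest(
        text: text,
        modelId: "eleven_v3",      // patch default when no model is configured
        outputFormat: "pcm_44100", // pcm_* enables the low-latency PCM path
        latencyTier: TalkTTSValidation.validatedLatencyTier(2))

    let stream = ElevenLabsTTSClient(apiKey: apiKey)
        .streamSynthesize(voiceId: voiceId, request: request)

    // pcm_* formats carry their sample rate in the suffix; anything else
    // (mp3_*) goes through the AudioQueue-based StreamingAudioPlayer.
    if let rate = TalkTTSValidation.pcmSampleRate(from: request.outputFormat) {
        let result = await PCMStreamingAudioPlayer.shared.play(stream: stream, sampleRate: rate)
        print("pcm playback finished=\(result.finished)")
    } else {
        let result = await StreamingAudioPlayer.shared.play(stream: stream)
        print("mp3 playback finished=\(result.finished)")
    }
}
```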
diff --git a/apps/android/app/src/main/java/com/steipete/clawdis/node/voice/StreamingMediaDataSource.kt b/apps/android/app/src/main/java/com/steipete/clawdis/node/voice/StreamingMediaDataSource.kt new file mode 100644 index 000000000..0be4a016c --- /dev/null +++ b/apps/android/app/src/main/java/com/steipete/clawdis/node/voice/StreamingMediaDataSource.kt @@ -0,0 +1,98 @@ +package com.steipete.clawdis.node.voice + +import android.media.MediaDataSource +import kotlin.math.min + +internal class StreamingMediaDataSource : MediaDataSource() { + private data class Chunk(val start: Long, val data: ByteArray) + + private val lock = Object() + private val chunks = ArrayList<Chunk>() + private var totalSize: Long = 0 + private var closed = false + private var finished = false + private var lastReadIndex = 0 + + fun append(data: ByteArray) { + if (data.isEmpty()) return + synchronized(lock) { + if (closed || finished) return + val chunk = Chunk(totalSize, data) + chunks.add(chunk) + totalSize += data.size.toLong() + lock.notifyAll() + } + } + + fun finish() { + synchronized(lock) { + if (closed) return + finished = true + lock.notifyAll() + } + } + + fun fail() { + synchronized(lock) { + closed = true + lock.notifyAll() + } + } + + override fun readAt(position: Long, buffer: ByteArray, offset: Int, size: Int): Int { + if (position < 0) return -1 + synchronized(lock) { + while (!closed && !finished && position >= totalSize) { + lock.wait() + } + if (closed) return -1 + if (position >= totalSize && finished) return -1 + + val available = (totalSize - position).toInt() + val toRead = min(size, available) + var remaining = toRead + var destOffset = offset + var pos = position + + var index = findChunkIndex(pos) + while (remaining > 0 && index < chunks.size) { + val chunk = chunks[index] + val inChunkOffset = (pos - chunk.start).toInt() + if (inChunkOffset >= chunk.data.size) { + index++ + continue + } + val copyLen = min(remaining, chunk.data.size - inChunkOffset) + System.arraycopy(chunk.data, inChunkOffset, buffer, destOffset, copyLen) + remaining -= copyLen + destOffset += copyLen + pos += copyLen + if (inChunkOffset + copyLen >= chunk.data.size) { + index++ + } + } + + return toRead - remaining + } + } + + override fun getSize(): Long = -1 + + override fun close() { + synchronized(lock) { + closed = true + lock.notifyAll() + } + } + + private fun findChunkIndex(position: Long): Int { + var index = lastReadIndex + while (index < chunks.size) { + val chunk = chunks[index] + if (position < chunk.start + chunk.data.size) break + index++ + } + lastReadIndex = index + return index + } +} diff --git a/apps/android/app/src/main/java/com/steipete/clawdis/node/voice/TalkModeManager.kt b/apps/android/app/src/main/java/com/steipete/clawdis/node/voice/TalkModeManager.kt index e72b07651..42536d359 100644 --- a/apps/android/app/src/main/java/com/steipete/clawdis/node/voice/TalkModeManager.kt +++ b/apps/android/app/src/main/java/com/steipete/clawdis/node/voice/TalkModeManager.kt @@ -18,7 +18,6 @@ import android.speech.tts.UtteranceProgressListener import android.util.Log import androidx.core.content.ContextCompat import com.steipete.clawdis.node.bridge.BridgeSession -import java.io.File import java.net.HttpURLConnection import java.net.URL import java.util.UUID @@ -44,6 +43,7 @@ class TalkModeManager( ) { companion object { private const val tag = "TalkMode" + private const val defaultModelIdFallback = "eleven_v3" } private val mainHandler = Handler(Looper.getMainLooper()) @@ -81,6 +81,7 @@ class TalkModeManager( private
var defaultVoiceId: String? = null private var currentVoiceId: String? = null + private var fallbackVoiceId: String? = null private var defaultModelId: String? = null private var currentModelId: String? = null private var defaultOutputFormat: String? = null @@ -97,7 +98,7 @@ class TalkModeManager( private var chatSubscribedSessionKey: String? = null private var player: MediaPlayer? = null - private var currentAudioFile: File? = null + private var streamingSource: StreamingMediaDataSource? = null private var systemTts: TextToSpeech? = null private var systemTtsPending: CompletableDeferred<Unit>? = null private var systemTtsPendingId: String? = null @@ -464,7 +465,13 @@ class TalkModeManager( val apiKey = apiKey?.trim()?.takeIf { it.isNotEmpty() } ?: System.getenv("ELEVENLABS_API_KEY")?.trim() - val voiceId = resolvedVoice ?: currentVoiceId ?: defaultVoiceId + val preferredVoice = resolvedVoice ?: currentVoiceId ?: defaultVoiceId + val voiceId = + if (!apiKey.isNullOrEmpty()) { + resolveVoiceId(preferredVoice, apiKey) + } else { + null + } _statusText.value = "Speaking…" _isSpeaking.value = true @@ -486,24 +493,25 @@ } else { _usingFallbackTts.value = false val ttsStarted = SystemClock.elapsedRealtime() + val modelId = directive?.modelId ?: currentModelId ?: defaultModelId val request = ElevenLabsRequest( text = cleaned, - modelId = directive?.modelId ?: currentModelId ?: defaultModelId, + modelId = modelId, outputFormat = TalkModeRuntime.validatedOutputFormat(directive?.outputFormat ?: defaultOutputFormat), speed = TalkModeRuntime.resolveSpeed(directive?.speed, directive?.rateWpm), - stability = TalkModeRuntime.validatedUnit(directive?.stability), + stability = TalkModeRuntime.validatedStability(directive?.stability, modelId), similarity = TalkModeRuntime.validatedUnit(directive?.similarity), style = TalkModeRuntime.validatedUnit(directive?.style), speakerBoost = directive?.speakerBoost, seed = TalkModeRuntime.validatedSeed(directive?.seed), normalize = TalkModeRuntime.validatedNormalize(directive?.normalize), language = TalkModeRuntime.validatedLanguage(directive?.language), + latencyTier = TalkModeRuntime.validatedLatencyTier(directive?.latencyTier), ) - val audio = synthesize(voiceId = voiceId!!, apiKey = apiKey!!, request = request) - Log.d(tag, "elevenlabs ok bytes=${audio.size} durMs=${SystemClock.elapsedRealtime() - ttsStarted}") - playAudio(audio) + streamAndPlay(voiceId = voiceId!!, apiKey = apiKey!!, request = request) + Log.d(tag, "elevenlabs stream ok durMs=${SystemClock.elapsedRealtime() - ttsStarted}") } } catch (err: Throwable) { Log.w(tag, "speak failed: ${err.message ?: err::class.simpleName}; falling back to system voice") @@ -520,22 +528,28 @@ _isSpeaking.value = false } - private suspend fun playAudio(data: ByteArray) { + private suspend fun streamAndPlay(voiceId: String, apiKey: String, request: ElevenLabsRequest) { stopSpeaking(resetInterrupt = false) - val file = File.createTempFile("talk-", ".mp3", context.cacheDir) - file.writeBytes(data) - currentAudioFile = file + + val dataSource = StreamingMediaDataSource() + streamingSource = dataSource val player = MediaPlayer() this.player = player + val prepared = CompletableDeferred<Unit>() val finished = CompletableDeferred<Unit>() + player.setAudioAttributes( AudioAttributes.Builder() .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH) .setUsage(AudioAttributes.USAGE_ASSISTANT) .build(), ) + player.setOnPreparedListener { + it.start() + prepared.complete(Unit) + }
player.setOnCompletionListener { finished.complete(Unit) } @@ -544,16 +558,30 @@ true } - player.setDataSource(file.absolutePath) + player.setDataSource(dataSource) withContext(Dispatchers.Main) { - player.setOnPreparedListener { it.start() } player.prepareAsync() } + val fetchError = CompletableDeferred<Throwable?>() + val fetchJob = + scope.launch(Dispatchers.IO) { + try { + streamTts(voiceId = voiceId, apiKey = apiKey, request = request, sink = dataSource) + fetchError.complete(null) + } catch (err: Throwable) { + dataSource.fail() + fetchError.complete(err) + } + } + Log.d(tag, "play start") try { + prepared.await() finished.await() + fetchError.await()?.let { throw it } } finally { + fetchJob.cancel() cleanupPlayer() } Log.d(tag, "play done") @@ -674,8 +702,8 @@ player?.stop() player?.release() player = null - currentAudioFile?.delete() - currentAudioFile = null + streamingSource?.close() + streamingSource = null } private fun shouldInterrupt(transcript: String): Boolean { @@ -713,13 +741,15 @@ defaultVoiceId = voice ?: envVoice?.takeIf { it.isNotEmpty() } ?: sagVoice?.takeIf { it.isNotEmpty() } voiceAliases = aliases if (!voiceOverrideActive) currentVoiceId = defaultVoiceId - defaultModelId = model + defaultModelId = model ?: defaultModelIdFallback if (!modelOverrideActive) currentModelId = defaultModelId defaultOutputFormat = outputFormat apiKey = key ?: envKey?.takeIf { it.isNotEmpty() } if (interrupt != null) interruptOnSpeech = interrupt } catch (_: Throwable) { defaultVoiceId = envVoice?.takeIf { it.isNotEmpty() } ?: sagVoice?.takeIf { it.isNotEmpty() } + defaultModelId = defaultModelIdFallback + if (!modelOverrideActive) currentModelId = defaultModelId apiKey = envKey?.takeIf { it.isNotEmpty() } voiceAliases = emptyMap() } @@ -730,9 +760,21 @@ return obj["runId"].asStringOrNull() } - private suspend fun synthesize(voiceId: String, apiKey: String, request: ElevenLabsRequest): ByteArray { - return withContext(Dispatchers.IO) { - val url = URL("https://api.elevenlabs.io/v1/text-to-speech/$voiceId") + private suspend fun streamTts( + voiceId: String, + apiKey: String, + request: ElevenLabsRequest, + sink: StreamingMediaDataSource, + ) { + withContext(Dispatchers.IO) { + val baseUrl = "https://api.elevenlabs.io/v1/text-to-speech/$voiceId/stream" + val latencyTier = request.latencyTier + val url = + if (latencyTier != null) { + URL("$baseUrl?optimize_streaming_latency=$latencyTier") + } else { + URL(baseUrl) + } val conn = url.openConnection() as HttpURLConnection conn.requestMethod = "POST" conn.connectTimeout = 30_000 @@ -746,13 +788,21 @@ conn.outputStream.use { it.write(payload.toByteArray()) } val code = conn.responseCode - val stream = if (code >= 400) conn.errorStream else conn.inputStream - val data = stream.readBytes() if (code >= 400) { - val message = String(data) + val message = conn.errorStream?.readBytes()?.toString(Charsets.UTF_8) ?: "" + sink.fail() throw IllegalStateException("ElevenLabs failed: $code $message") } - data + + val buffer = ByteArray(8 * 1024) + conn.inputStream.use { input -> + while (true) { + val read = input.read(buffer) + if (read <= 0) break + sink.append(buffer.copyOf(read)) + } + } + sink.finish() } } @@ -794,6 +844,7 @@ val seed: Long?, val normalize: String?, val language: String?, + val latencyTier: Int?, ) private object TalkModeRuntime { @@ -816,6 +867,15 @@ return value } + fun
validatedStability(value: Double?, modelId: String?): Double? { + if (value == null) return null + val normalized = modelId?.trim()?.lowercase() + if (normalized == "eleven_v3") { + return if (value == 0.0 || value == 0.5 || value == 1.0) value else null + } + return validatedUnit(value) + } + fun validatedSeed(value: Long?): Long? { if (value == null) return null if (value < 0 || value > 4294967295L) return null @@ -840,6 +900,12 @@ return if (trimmed.startsWith("mp3_")) trimmed else null } + fun validatedLatencyTier(value: Int?): Int? { + if (value == null) return null + if (value < 0 || value > 4) return null + return value + } + fun isMessageTimestampAfter(timestamp: Double, sinceSeconds: Double): Boolean { val sinceMs = sinceSeconds * 1000 return if (timestamp > 10_000_000_000) { @@ -876,6 +942,62 @@ return if (isLikelyVoiceId(trimmed)) trimmed else null } + private suspend fun resolveVoiceId(preferred: String?, apiKey: String): String? { + val trimmed = preferred?.trim().orEmpty() + if (trimmed.isNotEmpty()) { + val resolved = resolveVoiceAlias(trimmed) + if (resolved != null) return resolved + Log.w(tag, "unknown voice alias $trimmed") + } + fallbackVoiceId?.let { return it } + + return try { + val voices = listVoices(apiKey) + val first = voices.firstOrNull() ?: return null + fallbackVoiceId = first.voiceId + if (defaultVoiceId.isNullOrBlank()) { + defaultVoiceId = first.voiceId + } + if (!voiceOverrideActive) { + currentVoiceId = first.voiceId + } + val name = first.name ?: "unknown" + Log.d(tag, "default voice selected $name (${first.voiceId})") + first.voiceId + } catch (err: Throwable) { + Log.w(tag, "list voices failed: ${err.message ?: err::class.simpleName}") + null + } + } + + private suspend fun listVoices(apiKey: String): List<ElevenLabsVoice> { + return withContext(Dispatchers.IO) { + val url = URL("https://api.elevenlabs.io/v1/voices") + val conn = url.openConnection() as HttpURLConnection + conn.requestMethod = "GET" + conn.connectTimeout = 15_000 + conn.readTimeout = 15_000 + conn.setRequestProperty("xi-api-key", apiKey) + + val code = conn.responseCode + val stream = if (code >= 400) conn.errorStream else conn.inputStream + val data = stream.readBytes() + if (code >= 400) { + val message = data.toString(Charsets.UTF_8) + throw IllegalStateException("ElevenLabs voices failed: $code $message") + } + + val root = json.parseToJsonElement(data.toString(Charsets.UTF_8)).asObjectOrNull() + val voices = (root?.get("voices") as? JsonArray) ?: JsonArray(emptyList()) + voices.mapNotNull { entry -> + val obj = entry.asObjectOrNull() ?: return@mapNotNull null + val voiceId = obj["voice_id"].asStringOrNull() ?: return@mapNotNull null + val name = obj["name"].asStringOrNull() + ElevenLabsVoice(voiceId, name) + } + } + } + private fun isLikelyVoiceId(value: String): Boolean { if (value.length < 10) return false return value.all { it.isLetterOrDigit() || it == '-' || it == '_' } @@ -884,6 +1006,8 @@ private fun normalizeAliasKey(value: String): String = value.trim().lowercase() + private data class ElevenLabsVoice(val voiceId: String, val name: String?) + private val listener = object : RecognitionListener { override fun onReadyForSpeech(params: Bundle?)
{ diff --git a/apps/ios/Sources/Voice/TalkModeManager.swift b/apps/ios/Sources/Voice/TalkModeManager.swift index a84a7b4a4..e93bce006 100644 --- a/apps/ios/Sources/Voice/TalkModeManager.swift +++ b/apps/ios/Sources/Voice/TalkModeManager.swift @@ -9,6 +9,7 @@ import Speech @Observable final class TalkModeManager: NSObject { private typealias SpeechRequest = SFSpeechAudioBufferRecognitionRequest + private static let defaultModelIdFallback = "eleven_v3" var isEnabled: Bool = false var isListening: Bool = false var isSpeaking: Bool = false @@ -36,11 +37,12 @@ final class TalkModeManager: NSObject { private var voiceAliases: [String: String] = [:] private var interruptOnSpeech: Bool = true private var mainSessionKey: String = "main" + private var fallbackVoiceId: String? + private var lastPlaybackWasPCM: Bool = false private var bridge: BridgeSession? private let silenceWindow: TimeInterval = 0.7 - private var player: AVAudioPlayer? private var chatSubscribedSessionKeys = Set<String>() private let logger = Logger(subsystem: "com.steipete.clawdis", category: "TalkMode") @@ -446,43 +448,43 @@ final class TalkModeManager: NSObject { let started = Date() let language = ElevenLabsTTSClient.validatedLanguage(directive?.language) - let voiceId = resolvedVoice ?? self.currentVoiceId ?? self.defaultVoiceId let resolvedKey = (self.apiKey?.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty == false ? self.apiKey : nil) ?? ProcessInfo.processInfo.environment["ELEVENLABS_API_KEY"] let apiKey = resolvedKey?.trimmingCharacters(in: .whitespacesAndNewlines) + let preferredVoice = resolvedVoice ?? self.currentVoiceId ?? self.defaultVoiceId + let voiceId: String? = if let apiKey, !apiKey.isEmpty { + await self.resolveVoiceId(preferred: preferredVoice, apiKey: apiKey) + } else { + nil + } let canUseElevenLabs = (voiceId?.isEmpty == false) && (apiKey?.isEmpty == false) if canUseElevenLabs, let voiceId, let apiKey { - let desiredOutputFormat = directive?.outputFormat ?? self.defaultOutputFormat + let desiredOutputFormat = directive?.outputFormat ?? self.defaultOutputFormat ?? "pcm_44100" let outputFormat = ElevenLabsTTSClient.validatedOutputFormat(desiredOutputFormat) - if outputFormat == nil, let desiredOutputFormat, !desiredOutputFormat.isEmpty { + if outputFormat == nil, !desiredOutputFormat.isEmpty { self.logger.warning( "talk output_format unsupported for local playback: \(desiredOutputFormat, privacy: .public)") } + let modelId = directive?.modelId ?? self.currentModelId ?? self.defaultModelId let request = ElevenLabsTTSRequest( text: cleaned, - modelId: directive?.modelId ?? self.currentModelId ??
self.defaultModelId, + modelId: modelId, outputFormat: outputFormat, speed: TalkTTSValidation.resolveSpeed(speed: directive?.speed, rateWPM: directive?.rateWPM), - stability: TalkTTSValidation.validatedUnit(directive?.stability), + stability: TalkTTSValidation.validatedStability(directive?.stability, modelId: modelId), similarity: TalkTTSValidation.validatedUnit(directive?.similarity), style: TalkTTSValidation.validatedUnit(directive?.style), speakerBoost: directive?.speakerBoost, seed: TalkTTSValidation.validatedSeed(directive?.seed), normalize: ElevenLabsTTSClient.validatedNormalize(directive?.normalize), - language: language) + language: language, + latencyTier: TalkTTSValidation.validatedLatencyTier(directive?.latencyTier)) - let synthTimeoutSeconds = max(20.0, min(90.0, Double(cleaned.count) * 0.12)) let client = ElevenLabsTTSClient(apiKey: apiKey) - let audio = try await client.synthesizeWithHardTimeout( - voiceId: voiceId, - request: request, - hardTimeoutSeconds: synthTimeoutSeconds) - self.logger - .info( - "elevenlabs ok bytes=\(audio.count, privacy: .public) dur=\(Date().timeIntervalSince(started), privacy: .public)s") + let stream = client.streamSynthesize(voiceId: voiceId, request: request) if self.interruptOnSpeech { do { @@ -494,7 +496,21 @@ final class TalkModeManager: NSObject { } self.statusText = "Speaking…" - try await self.playAudio(data: audio) + let sampleRate = TalkTTSValidation.pcmSampleRate(from: outputFormat) + let result: StreamingPlaybackResult + if let sampleRate { + self.lastPlaybackWasPCM = true + result = await PCMStreamingAudioPlayer.shared.play(stream: stream, sampleRate: sampleRate) + } else { + self.lastPlaybackWasPCM = false + result = await StreamingAudioPlayer.shared.play(stream: stream) + } + self.logger + .info( + "elevenlabs stream finished=\(result.finished, privacy: .public) dur=\(Date().timeIntervalSince(started), privacy: .public)s") + if !result.finished, let interruptedAt = result.interruptedAt { + self.lastInterruptedAtSeconds = interruptedAt + } } else { self.logger.warning("tts unavailable; falling back to system voice (missing key or voiceId)") if self.interruptOnSpeech { @@ -533,30 +549,17 @@ final class TalkModeManager: NSObject { self.isSpeaking = false } - private func playAudio(data: Data) async throws { - self.player?.stop() - let player = try AVAudioPlayer(data: data) - self.player = player - player.prepareToPlay() - self.logger.info("play start") - guard player.play() else { - throw NSError(domain: "TalkMode", code: 2, userInfo: [ - NSLocalizedDescriptionKey: "audio player refused to play", - ]) - } - while player.isPlaying { - try? await Task.sleep(nanoseconds: 120_000_000) - } - self.logger.info("play done") - } - private func stopSpeaking(storeInterruption: Bool = true) { guard self.isSpeaking else { return } + let interruptedAt = self.lastPlaybackWasPCM + ? PCMStreamingAudioPlayer.shared.stop() + : StreamingAudioPlayer.shared.stop() if storeInterruption { - self.lastInterruptedAtSeconds = self.player?.currentTime + self.lastInterruptedAtSeconds = interruptedAt } - self.player?.stop() - self.player = nil + _ = self.lastPlaybackWasPCM + ? StreamingAudioPlayer.shared.stop() + : PCMStreamingAudioPlayer.shared.stop() TalkSystemSpeechSynthesizer.shared.stop() self.isSpeaking = false } @@ -581,6 +584,37 @@ final class TalkModeManager: NSObject { return Self.isLikelyVoiceId(trimmed) ? trimmed : nil } + private func resolveVoiceId(preferred: String?, apiKey: String) async -> String? 
{ + let trimmed = preferred?.trimmingCharacters(in: .whitespacesAndNewlines) ?? "" + if !trimmed.isEmpty { + if let resolved = self.resolveVoiceAlias(trimmed) { return resolved } + self.logger.warning("unknown voice alias \(trimmed, privacy: .public)") + } + if let fallbackVoiceId { return fallbackVoiceId } + + do { + let voices = try await ElevenLabsTTSClient(apiKey: apiKey).listVoices() + guard let first = voices.first else { + self.logger.warning("elevenlabs voices list empty") + return nil + } + self.fallbackVoiceId = first.voiceId + if self.defaultVoiceId == nil { + self.defaultVoiceId = first.voiceId + } + if !self.voiceOverrideActive { + self.currentVoiceId = first.voiceId + } + let name = first.name ?? "unknown" + self.logger + .info("default voice selected \(name, privacy: .public) (\(first.voiceId, privacy: .public))") + return first.voiceId + } catch { + self.logger.error("elevenlabs list voices failed: \(error.localizedDescription, privacy: .public)") + return nil + } + } + private static func isLikelyVoiceId(_ value: String) -> Bool { guard value.count >= 10 else { return false } return value.allSatisfy { $0.isLetter || $0.isNumber || $0 == "-" || $0 == "_" } @@ -598,22 +632,23 @@ final class TalkModeManager: NSObject { self.mainSessionKey = rawMainKey.isEmpty ? "main" : rawMainKey self.defaultVoiceId = (talk?["voiceId"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) if let aliases = talk?["voiceAliases"] as? [String: Any] { - self.voiceAliases = - aliases.compactMap { key, value in - guard let id = value as? String else { return nil } - let normalizedKey = key.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() - let trimmedId = id.trimmingCharacters(in: .whitespacesAndNewlines) - guard !normalizedKey.isEmpty, !trimmedId.isEmpty else { return nil } - return (normalizedKey, trimmedId) - } - .reduce(into: [:]) { $0[$1.0] = $1.1 } + var resolved: [String: String] = [:] + for (key, value) in aliases { + guard let id = value as? String else { continue } + let normalizedKey = key.trimmingCharacters(in: .whitespacesAndNewlines).lowercased() + let trimmedId = id.trimmingCharacters(in: .whitespacesAndNewlines) + guard !normalizedKey.isEmpty, !trimmedId.isEmpty else { continue } + resolved[normalizedKey] = trimmedId + } + self.voiceAliases = resolved } else { self.voiceAliases = [:] } if !self.voiceOverrideActive { self.currentVoiceId = self.defaultVoiceId } - self.defaultModelId = (talk?["modelId"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) + let model = (talk?["modelId"] as? String)?.trimmingCharacters(in: .whitespacesAndNewlines) + self.defaultModelId = (model?.isEmpty == false) ? 
model : Self.defaultModelIdFallback if !self.modelOverrideActive { self.currentModelId = self.defaultModelId } @@ -624,7 +659,10 @@ } } catch { - // ignore + self.defaultModelId = Self.defaultModelIdFallback + if !self.modelOverrideActive { + self.currentModelId = self.defaultModelId + } } } diff --git a/apps/macos/Sources/Clawdis/TalkModeRuntime.swift b/apps/macos/Sources/Clawdis/TalkModeRuntime.swift index 9baf17707..140c338b8 100644 --- a/apps/macos/Sources/Clawdis/TalkModeRuntime.swift +++ b/apps/macos/Sources/Clawdis/TalkModeRuntime.swift @@ -10,6 +10,7 @@ actor TalkModeRuntime { private let logger = Logger(subsystem: "com.steipete.clawdis", category: "talk.runtime") private let ttsLogger = Logger(subsystem: "com.steipete.clawdis", category: "talk.tts") + private static let defaultModelIdFallback = "eleven_v3" private final class RMSMeter: @unchecked Sendable { private let lock = NSLock() @@ -62,6 +63,7 @@ private var lastSpokenText: String? private var apiKey: String? private var fallbackVoiceId: String? + private var lastPlaybackWasPCM: Bool = false private let silenceWindow: TimeInterval = 0.7 private let minSpeechRMS: Double = 1e-3 @@ -496,7 +498,7 @@ do { if let apiKey, !apiKey.isEmpty, let voiceId { - let desiredOutputFormat = directive?.outputFormat ?? self.defaultOutputFormat + let desiredOutputFormat = directive?.outputFormat ?? self.defaultOutputFormat ?? "pcm_44100" let outputFormat = ElevenLabsTTSClient.validatedOutputFormat(desiredOutputFormat) - if outputFormat == nil, let desiredOutputFormat, !desiredOutputFormat.isEmpty { + if outputFormat == nil, !desiredOutputFormat.isEmpty { self.logger @@ -504,27 +506,25 @@ .warning( "talk output_format unsupported for local playback: \(desiredOutputFormat, privacy: .public)") } + let modelId = directive?.modelId ?? self.currentModelId ?? self.defaultModelId let request = ElevenLabsTTSRequest( text: cleaned, - modelId: directive?.modelId ?? self.currentModelId ??
self.defaultModelId, + modelId: modelId, outputFormat: outputFormat, speed: TalkTTSValidation.resolveSpeed(speed: directive?.speed, rateWPM: directive?.rateWPM), - stability: TalkTTSValidation.validatedUnit(directive?.stability), + stability: TalkTTSValidation.validatedStability(directive?.stability, modelId: modelId), similarity: TalkTTSValidation.validatedUnit(directive?.similarity), style: TalkTTSValidation.validatedUnit(directive?.style), speakerBoost: directive?.speakerBoost, seed: TalkTTSValidation.validatedSeed(directive?.seed), normalize: ElevenLabsTTSClient.validatedNormalize(directive?.normalize), - language: language) + language: language, + latencyTier: TalkTTSValidation.validatedLatencyTier(directive?.latencyTier)) self.ttsLogger.info("talk TTS synth timeout=\(synthTimeoutSeconds, privacy: .public)s") let client = ElevenLabsTTSClient(apiKey: apiKey) - let audio = try await client.synthesizeWithHardTimeout( - voiceId: voiceId, - request: request, - hardTimeoutSeconds: synthTimeoutSeconds) + let stream = client.streamSynthesize(voiceId: voiceId, request: request) guard self.isCurrent(gen) else { return } - self.ttsLogger.info("talk TTS response bytes=\(audio.count, privacy: .public)") if self.interruptOnSpeech { await self.startRecognition() @@ -534,12 +534,20 @@ actor TalkModeRuntime { await MainActor.run { TalkModeController.shared.updatePhase(.speaking) } self.phase = .speaking - let result = await TalkAudioPlayer.shared.play(data: audio) + let sampleRate = TalkTTSValidation.pcmSampleRate(from: outputFormat) + let result: StreamingPlaybackResult + if let sampleRate { + self.lastPlaybackWasPCM = true + result = await PCMStreamingAudioPlayer.shared.play(stream: stream, sampleRate: sampleRate) + } else { + self.lastPlaybackWasPCM = false + result = await StreamingAudioPlayer.shared.play(stream: stream) + } self.ttsLogger .info( "talk audio result finished=\(result.finished, privacy: .public) interruptedAt=\(String(describing: result.interruptedAt), privacy: .public)") if !result.finished, result.interruptedAt == nil { - throw NSError(domain: "TalkAudioPlayer", code: 1, userInfo: [ + throw NSError(domain: "StreamingAudioPlayer", code: 1, userInfo: [ NSLocalizedDescriptionKey: "audio playback failed", ]) } @@ -631,7 +639,15 @@ actor TalkModeRuntime { } func stopSpeaking(reason: TalkStopReason) async { - let interruptedAt = await MainActor.run { TalkAudioPlayer.shared.stop() } + let interruptedAt = await MainActor.run { + let primary = self.lastPlaybackWasPCM + ? PCMStreamingAudioPlayer.shared.stop() + : StreamingAudioPlayer.shared.stop() + _ = self.lastPlaybackWasPCM + ? StreamingAudioPlayer.shared.stop() + : PCMStreamingAudioPlayer.shared.stop() + return primary + } await TalkSystemSpeechSynthesizer.shared.stop() guard self.phase == .speaking else { return } if reason == .speech, let interruptedAt { @@ -707,7 +723,8 @@ actor TalkModeRuntime { guard !key.isEmpty, !value.isEmpty else { return } acc[key] = value } ?? [:] - let model = talk?["modelId"]?.stringValue + let model = talk?["modelId"]?.stringValue?.trimmingCharacters(in: .whitespacesAndNewlines) + let resolvedModel = (model?.isEmpty == false) ? model! 
: Self.defaultModelIdFallback let outputFormat = talk?["outputFormat"]?.stringValue let interrupt = talk?["interruptOnSpeech"]?.boolValue let apiKey = talk?["apiKey"]?.stringValue @@ -721,7 +738,7 @@ return TalkRuntimeConfig( voiceId: resolvedVoice, voiceAliases: resolvedAliases, - modelId: model, + modelId: resolvedModel, outputFormat: outputFormat, interruptOnSpeech: interrupt ?? true, apiKey: resolvedApiKey) @@ -733,7 +750,7 @@ return TalkRuntimeConfig( voiceId: resolvedVoice, voiceAliases: [:], - modelId: nil, + modelId: Self.defaultModelIdFallback, outputFormat: nil, interruptOnSpeech: true, apiKey: resolvedApiKey) diff --git a/apps/shared/ClawdisKit/Sources/ClawdisKit/ElevenLabsTTS.swift b/apps/shared/ClawdisKit/Sources/ClawdisKit/ElevenLabsTTS.swift index c4b1e7999..38e602094 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisKit/ElevenLabsTTS.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisKit/ElevenLabsTTS.swift @@ -22,6 +22,7 @@ public struct ElevenLabsTTSRequest: Sendable { public var seed: UInt32? public var normalize: String? public var language: String? + public var latencyTier: Int? public init( text: String, @@ -34,7 +35,8 @@ speakerBoost: Bool? = nil, seed: UInt32? = nil, normalize: String? = nil, - language: String? = nil) + language: String? = nil, + latencyTier: Int? = nil) { self.text = text self.modelId = modelId @@ -47,6 +49,7 @@ self.seed = seed self.normalize = normalize self.language = language + self.latencyTier = latencyTier } } @@ -155,6 +158,72 @@ ]) } + public func streamSynthesize( + voiceId: String, + request: ElevenLabsTTSRequest) -> AsyncThrowingStream<Data, Error> + { + AsyncThrowingStream { continuation in + let task = Task { + do { + let url = Self.streamingURL( + baseUrl: self.baseUrl, + voiceId: voiceId, + latencyTier: request.latencyTier) + let body = try JSONSerialization.data(withJSONObject: Self.buildPayload(request), options: []) + + var req = URLRequest(url: url) + req.httpMethod = "POST" + req.httpBody = body + req.timeoutInterval = self.requestTimeoutSeconds + req.setValue("application/json", forHTTPHeaderField: "Content-Type") + req.setValue("audio/mpeg", forHTTPHeaderField: "Accept") + req.setValue(self.apiKey, forHTTPHeaderField: "xi-api-key") + + let (bytes, response) = try await URLSession.shared.bytes(for: req) + guard let http = response as? HTTPURLResponse else { + throw NSError(domain: "ElevenLabsTTS", code: 1, userInfo: [ + NSLocalizedDescriptionKey: "ElevenLabs invalid response", + ]) + } + + let contentType = (http.value(forHTTPHeaderField: "Content-Type") ??
"unknown").lowercased() + if http.statusCode >= 400 { + let message = try await Self.readErrorBody(bytes: bytes) + throw NSError(domain: "ElevenLabsTTS", code: http.statusCode, userInfo: [ + NSLocalizedDescriptionKey: "ElevenLabs failed: \(http.statusCode) ct=\(contentType) \(message)", + ]) + } + if !contentType.contains("audio") { + let message = try await Self.readErrorBody(bytes: bytes) + throw NSError(domain: "ElevenLabsTTS", code: 415, userInfo: [ + NSLocalizedDescriptionKey: "ElevenLabs returned non-audio ct=\(contentType) \(message)", + ]) + } + + var buffer = Data() + buffer.reserveCapacity(16_384) + for try await byte in bytes { + buffer.append(byte) + if buffer.count >= 8_192 { + continuation.yield(buffer) + buffer.removeAll(keepingCapacity: true) + } + } + if !buffer.isEmpty { + continuation.yield(buffer) + } + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + } + + continuation.onTermination = { _ in + task.cancel() + } + } + } + public func listVoices() async throws -> [ElevenLabsVoice] { var url = self.baseUrl url.appendPathComponent("v1") @@ -180,7 +249,7 @@ public struct ElevenLabsTTSClient: Sendable { public static func validatedOutputFormat(_ value: String?) -> String? { let trimmed = (value ?? "").trimmingCharacters(in: .whitespacesAndNewlines) guard !trimmed.isEmpty else { return nil } - guard trimmed.hasPrefix("mp3_") else { return nil } + guard trimmed.hasPrefix("mp3_") || trimmed.hasPrefix("pcm_") else { return nil } return trimmed } @@ -230,4 +299,33 @@ public struct ElevenLabsTTSClient: Sendable { let raw = String(data: data.prefix(4096), encoding: .utf8) ?? "unknown" return raw.replacingOccurrences(of: "\n", with: " ").replacingOccurrences(of: "\r", with: " ") } + + private static func streamingURL(baseUrl: URL, voiceId: String, latencyTier: Int?) -> URL { + var url = baseUrl + url.appendPathComponent("v1") + url.appendPathComponent("text-to-speech") + url.appendPathComponent(voiceId) + url.appendPathComponent("stream") + + guard let latencyTier else { return url } + let latencyItem = URLQueryItem( + name: "optimize_streaming_latency", + value: "\(latencyTier)") + guard var components = URLComponents(url: url, resolvingAgainstBaseURL: false) else { + return url + } + var items = components.queryItems ?? [] + items.append(latencyItem) + components.queryItems = items + return components.url ?? url + } + + private static func readErrorBody(bytes: URLSession.AsyncBytes) async throws -> String { + var data = Data() + for try await byte in bytes { + data.append(byte) + if data.count >= 4096 { break } + } + return truncatedErrorBody(data) + } } diff --git a/apps/shared/ClawdisKit/Sources/ClawdisKit/PCMStreamingAudioPlayer.swift b/apps/shared/ClawdisKit/Sources/ClawdisKit/PCMStreamingAudioPlayer.swift new file mode 100644 index 000000000..66238cd5c --- /dev/null +++ b/apps/shared/ClawdisKit/Sources/ClawdisKit/PCMStreamingAudioPlayer.swift @@ -0,0 +1,144 @@ +import AVFoundation +import Foundation +import OSLog + +@MainActor +public final class PCMStreamingAudioPlayer { + public static let shared = PCMStreamingAudioPlayer() + + private let logger = Logger(subsystem: "com.steipete.clawdis", category: "talk.tts.pcm") + private var engine = AVAudioEngine() + private var player = AVAudioPlayerNode() + private var format: AVAudioFormat? + private var pendingBuffers: Int = 0 + private var inputFinished = false + private var continuation: CheckedContinuation? 
+ + public init() { + self.engine.attach(self.player) + } + + public func play(stream: AsyncThrowingStream<Data, Error>, sampleRate: Double) async -> StreamingPlaybackResult { + self.stopInternal() + + let format = AVAudioFormat( + commonFormat: .pcmFormatInt16, + sampleRate: sampleRate, + channels: 1, + interleaved: true) + + guard let format else { + return StreamingPlaybackResult(finished: false, interruptedAt: nil) + } + self.configure(format: format) + + return await withCheckedContinuation { continuation in + self.continuation = continuation + self.pendingBuffers = 0 + self.inputFinished = false + + Task.detached { [weak self] in + guard let self else { return } + do { + for try await chunk in stream { + await self.enqueuePCM(chunk, format: format) + } + await self.finishInput() + } catch { + await self.fail(error) + } + } + } + } + + public func stop() -> Double? { + let interruptedAt = self.currentTimeSeconds() + self.stopInternal() + self.finish(StreamingPlaybackResult(finished: false, interruptedAt: interruptedAt)) + return interruptedAt + } + + private func configure(format: AVAudioFormat) { + if self.format?.sampleRate != format.sampleRate || self.format?.commonFormat != format.commonFormat { + self.engine.stop() + self.engine = AVAudioEngine() + self.player = AVAudioPlayerNode() + self.engine.attach(self.player) + } + self.format = format + if self.engine.attachedNodes.contains(self.player) { + self.engine.connect(self.player, to: self.engine.mainMixerNode, format: format) + } + } + + private func enqueuePCM(_ data: Data, format: AVAudioFormat) async { + guard !data.isEmpty else { return } + let frameCount = data.count / MemoryLayout<Int16>.size + guard frameCount > 0 else { return } + guard let buffer = AVAudioPCMBuffer(pcmFormat: format, frameCapacity: AVAudioFrameCount(frameCount)) else { + return + } + buffer.frameLength = AVAudioFrameCount(frameCount) + + data.withUnsafeBytes { raw in + guard let src = raw.baseAddress else { return } + let audioBuffer = buffer.audioBufferList.pointee.mBuffers + if let dst = audioBuffer.mData { + memcpy(dst, src, frameCount * MemoryLayout<Int16>.size) + } + } + + self.pendingBuffers += 1 + self.player.scheduleBuffer(buffer) { [weak self] in + Task { @MainActor in + guard let self else { return } + self.pendingBuffers = max(0, self.pendingBuffers - 1) + if self.inputFinished && self.pendingBuffers == 0 { + self.finish(StreamingPlaybackResult(finished: true, interruptedAt: nil)) + } + } + } + + if !self.player.isPlaying { + do { + try self.engine.start() + self.player.play() + } catch { + self.logger.error("pcm engine start failed: \(error.localizedDescription, privacy: .public)") + self.fail(error) + } + } + } + + private func finishInput() { + self.inputFinished = true + if self.pendingBuffers == 0 { + self.finish(StreamingPlaybackResult(finished: true, interruptedAt: nil)) + } + } + + private func fail(_ error: Error) { + self.logger.error("pcm stream failed: \(error.localizedDescription, privacy: .public)") + self.finish(StreamingPlaybackResult(finished: false, interruptedAt: nil)) + } + + private func stopInternal() { + self.player.stop() + self.engine.stop() + self.pendingBuffers = 0 + self.inputFinished = false + } + + private func finish(_ result: StreamingPlaybackResult) { + let continuation = self.continuation + self.continuation = nil + continuation?.resume(returning: result) + } + + private func currentTimeSeconds() -> Double?
{ + guard let nodeTime = self.player.lastRenderTime, + let playerTime = self.player.playerTime(forNodeTime: nodeTime) + else { return nil } + return Double(playerTime.sampleTime) / playerTime.sampleRate + } +} diff --git a/apps/shared/ClawdisKit/Sources/ClawdisKit/StreamingAudioPlayer.swift b/apps/shared/ClawdisKit/Sources/ClawdisKit/StreamingAudioPlayer.swift new file mode 100644 index 000000000..776c59100 --- /dev/null +++ b/apps/shared/ClawdisKit/Sources/ClawdisKit/StreamingAudioPlayer.swift @@ -0,0 +1,429 @@ +import AudioToolbox +import Foundation +import OSLog + +public struct StreamingPlaybackResult: Sendable { + public let finished: Bool + public let interruptedAt: Double? + + public init(finished: Bool, interruptedAt: Double?) { + self.finished = finished + self.interruptedAt = interruptedAt + } +} + +@MainActor +public final class StreamingAudioPlayer: NSObject { + public static let shared = StreamingAudioPlayer() + + private let logger = Logger(subsystem: "com.steipete.clawdis", category: "talk.tts.stream") + private var playback: Playback? + + public func play(stream: AsyncThrowingStream<Data, Error>) async -> StreamingPlaybackResult { + self.stopInternal() + + let playback = Playback(logger: self.logger) + self.playback = playback + + return await withCheckedContinuation { continuation in + playback.setContinuation(continuation) + playback.start() + + Task.detached { + do { + for try await chunk in stream { + playback.append(chunk) + } + playback.finishInput() + } catch { + playback.fail(error) + } + } + } + } + + public func stop() -> Double? { + guard let playback else { return nil } + let interruptedAt = playback.stop(immediate: true) + self.finish(playback: playback, result: StreamingPlaybackResult(finished: false, interruptedAt: interruptedAt)) + return interruptedAt + } + + private func stopInternal() { + guard let playback else { return } + let interruptedAt = playback.stop(immediate: true) + self.finish(playback: playback, result: StreamingPlaybackResult(finished: false, interruptedAt: interruptedAt)) + } + + private func finish(playback: Playback, result: StreamingPlaybackResult) { + playback.finish(result) + guard self.playback === playback else { return } + self.playback = nil + } +} + +private final class Playback: @unchecked Sendable { + private static let bufferCount: Int = 3 + private static let bufferSize: Int = 32 * 1024 + + private let logger: Logger + private let lock = NSLock() + private let parseQueue = DispatchQueue(label: "talk.stream.parse") + fileprivate let bufferLock = NSLock() + fileprivate let bufferSemaphore = DispatchSemaphore(value: bufferCount) + + private var continuation: CheckedContinuation<StreamingPlaybackResult, Never>? + private var finished = false + + private var audioFileStream: AudioFileStreamID? + private var audioQueue: AudioQueueRef? + fileprivate var audioFormat: AudioStreamBasicDescription? + fileprivate var maxPacketSize: UInt32 = 0 + + fileprivate var availableBuffers: [AudioQueueBufferRef] = [] + private var currentBuffer: AudioQueueBufferRef?
+ private var currentBufferSize: Int = 0 + private var currentPacketDescs: [AudioStreamPacketDescription] = [] + + private var isRunning = false + fileprivate var inputFinished = false + private var startRequested = false + + private var sampleRate: Double = 0 + + init(logger: Logger) { + self.logger = logger + } + + func setContinuation(_ continuation: CheckedContinuation<StreamingPlaybackResult, Never>) { + self.lock.lock() + self.continuation = continuation + self.lock.unlock() + } + + func start() { + let selfPtr = Unmanaged.passUnretained(self).toOpaque() + let status = AudioFileStreamOpen( + selfPtr, + propertyListenerProc, + packetsProc, + kAudioFileMP3Type, + &self.audioFileStream) + if status != noErr { + self.logger.error("talk stream open failed: \(status)") + self.finish(StreamingPlaybackResult(finished: false, interruptedAt: nil)) + } + } + + func append(_ data: Data) { + guard !data.isEmpty else { return } + self.parseQueue.async { [weak self] in + guard let self else { return } + guard let audioFileStream = self.audioFileStream else { return } + let status = data.withUnsafeBytes { bytes in + AudioFileStreamParseBytes( + audioFileStream, + UInt32(bytes.count), + bytes.baseAddress, + []) + } + if status != noErr { + self.logger.error("talk stream parse failed: \(status)") + self.fail(NSError(domain: "StreamingAudio", code: Int(status))) + } + } + } + + func finishInput() { + self.parseQueue.async { [weak self] in + guard let self else { return } + self.inputFinished = true + if self.audioQueue == nil { + self.finish(StreamingPlaybackResult(finished: false, interruptedAt: nil)) + return + } + self.enqueueCurrentBuffer(flushOnly: true) + _ = self.stop(immediate: false) + } + } + + func fail(_ error: Error) { + self.logger.error("talk stream failed: \(error.localizedDescription, privacy: .public)") + _ = self.stop(immediate: true) + self.finish(StreamingPlaybackResult(finished: false, interruptedAt: nil)) + } + + func stop(immediate: Bool) -> Double? { + guard let audioQueue else { return nil } + let interruptedAt = self.currentTimeSeconds() + AudioQueueStop(audioQueue, immediate) + return interruptedAt + } + + fileprivate func finish(_ result: StreamingPlaybackResult) { + let continuation: CheckedContinuation<StreamingPlaybackResult, Never>?
+ self.lock.lock() + if self.finished { + continuation = nil + } else { + self.finished = true + continuation = self.continuation + self.continuation = nil + } + self.lock.unlock() + + continuation?.resume(returning: result) + self.teardown() + } + + private func teardown() { + if let audioQueue { + AudioQueueDispose(audioQueue, true) + self.audioQueue = nil + } + if let audioFileStream { + AudioFileStreamClose(audioFileStream) + self.audioFileStream = nil + } + self.bufferLock.lock() + self.availableBuffers.removeAll() + self.bufferLock.unlock() + self.currentBuffer = nil + self.currentPacketDescs.removeAll() + } + + fileprivate func setupQueueIfNeeded(_ asbd: AudioStreamBasicDescription) { + guard self.audioQueue == nil else { return } + + var format = asbd + self.audioFormat = format + self.sampleRate = format.mSampleRate + + let selfPtr = Unmanaged.passUnretained(self).toOpaque() + let status = AudioQueueNewOutput( + &format, + outputCallbackProc, + selfPtr, + nil, + nil, + 0, + &self.audioQueue) + if status != noErr { + self.logger.error("talk queue create failed: \(status)") + self.finish(StreamingPlaybackResult(finished: false, interruptedAt: nil)) + return + } + + if let audioQueue { + AudioQueueAddPropertyListener(audioQueue, kAudioQueueProperty_IsRunning, isRunningCallbackProc, selfPtr) + } + + if let audioFileStream { + var cookieSize: UInt32 = 0 + var writable: DarwinBoolean = false + let cookieStatus = AudioFileStreamGetPropertyInfo( + audioFileStream, + kAudioFileStreamProperty_MagicCookieData, + &cookieSize, + &writable) + if cookieStatus == noErr, cookieSize > 0, let audioQueue { + var cookie = [UInt8](repeating: 0, count: Int(cookieSize)) + let readStatus = AudioFileStreamGetProperty( + audioFileStream, + kAudioFileStreamProperty_MagicCookieData, + &cookieSize, + &cookie) + if readStatus == noErr { + AudioQueueSetProperty(audioQueue, kAudioQueueProperty_MagicCookie, cookie, cookieSize) + } + } + } + + if let audioQueue { + for _ in 0..<Self.bufferCount { + // Pre-allocate the reusable output buffers the queue cycles through. + var bufferRef: AudioQueueBufferRef? + if AudioQueueAllocateBuffer(audioQueue, UInt32(Self.bufferSize), &bufferRef) == noErr, let bufferRef { + self.availableBuffers.append(bufferRef) + } + } + } + } + + fileprivate func enqueueCurrentBuffer(flushOnly: Bool = false) { + guard let audioQueue else { return } + guard let buffer = self.currentBuffer, self.currentBufferSize > 0 else { return } + + buffer.pointee.mAudioDataByteSize = UInt32(self.currentBufferSize) + let packetCount = UInt32(self.currentPacketDescs.count) + + let status = self.currentPacketDescs.withUnsafeBufferPointer { descPtr in + AudioQueueEnqueueBuffer(audioQueue, buffer, packetCount, descPtr.baseAddress) + } + if status != noErr { + self.logger.error("talk queue enqueue failed: \(status)") + } else { + if !self.startRequested { + self.startRequested = true + let startStatus = AudioQueueStart(audioQueue, nil) + if startStatus != noErr { + self.logger.error("talk queue start failed: \(startStatus)") + } + } + } + + self.currentBuffer = nil + self.currentBufferSize = 0 + self.currentPacketDescs.removeAll(keepingCapacity: true) + if !flushOnly { + self.bufferSemaphore.wait() + self.bufferLock.lock() + let next = self.availableBuffers.popLast() + self.bufferLock.unlock() + if let next { self.currentBuffer = next } + } + } + + fileprivate func handlePackets( + numberBytes: UInt32, + numberPackets: UInt32, + inputData: UnsafeRawPointer, + packetDescriptions: UnsafeMutablePointer<AudioStreamPacketDescription>?)
+ { + if self.audioQueue == nil, let format = self.audioFormat { + self.setupQueueIfNeeded(format) + } + + if self.audioQueue == nil { + return + } + + if self.currentBuffer == nil { + self.bufferSemaphore.wait() + self.bufferLock.lock() + self.currentBuffer = self.availableBuffers.popLast() + self.bufferLock.unlock() + self.currentBufferSize = 0 + self.currentPacketDescs.removeAll(keepingCapacity: true) + } + + let bytes = inputData.assumingMemoryBound(to: UInt8.self) + let packetCount = Int(numberPackets) + for index in 0..<packetCount { + // VBR streams (MP3) provide per-packet descriptions; fall back to an even split otherwise. + let packetOffset: Int + let packetSize: Int + if let packetDescriptions { + packetOffset = Int(packetDescriptions[index].mStartOffset) + packetSize = Int(packetDescriptions[index].mDataByteSize) + } else { + packetSize = Int(numberBytes) / packetCount + packetOffset = index * packetSize + } + if packetSize > Self.bufferSize { + continue + } + + if self.currentBufferSize + packetSize > Self.bufferSize { + self.enqueueCurrentBuffer() + } + + guard let buffer = self.currentBuffer else { continue } + let dest = buffer.pointee.mAudioData.advanced(by: self.currentBufferSize) + memcpy(dest, bytes.advanced(by: packetOffset), packetSize) + + let desc = AudioStreamPacketDescription( + mStartOffset: Int64(self.currentBufferSize), + mVariableFramesInPacket: 0, + mDataByteSize: UInt32(packetSize)) + self.currentPacketDescs.append(desc) + self.currentBufferSize += packetSize + } + } + + private func currentTimeSeconds() -> Double? { + guard let audioQueue, sampleRate > 0 else { return nil } + var timeStamp = AudioTimeStamp() + let status = AudioQueueGetCurrentTime(audioQueue, nil, &timeStamp, nil) + if status != noErr { return nil } + if timeStamp.mSampleTime.isNaN { return nil } + return timeStamp.mSampleTime / sampleRate + } +} + +private func propertyListenerProc( + inClientData: UnsafeMutableRawPointer, + inAudioFileStream: AudioFileStreamID, + inPropertyID: AudioFileStreamPropertyID, + ioFlags: UnsafeMutablePointer<AudioFileStreamPropertyFlags>) +{ + let playback = Unmanaged<Playback>.fromOpaque(inClientData).takeUnretainedValue() + + if inPropertyID == kAudioFileStreamProperty_DataFormat { + var format = AudioStreamBasicDescription() + var size = UInt32(MemoryLayout<AudioStreamBasicDescription>.size) + let status = AudioFileStreamGetProperty(inAudioFileStream, inPropertyID, &size, &format) + if status == noErr { + playback.audioFormat = format + playback.setupQueueIfNeeded(format) + } + } else if inPropertyID == kAudioFileStreamProperty_PacketSizeUpperBound { + var maxPacketSize: UInt32 = 0 + var size = UInt32(MemoryLayout<UInt32>.size) + let status = AudioFileStreamGetProperty(inAudioFileStream, inPropertyID, &size, &maxPacketSize) + if status == noErr { + playback.maxPacketSize = maxPacketSize + } + } +} + +private func packetsProc( + inClientData: UnsafeMutableRawPointer, + inNumberBytes: UInt32, + inNumberPackets: UInt32, + inInputData: UnsafeRawPointer, + inPacketDescriptions: UnsafeMutablePointer<AudioStreamPacketDescription>?)
+{ + let playback = Unmanaged<Playback>.fromOpaque(inClientData).takeUnretainedValue() + playback.handlePackets( + numberBytes: inNumberBytes, + numberPackets: inNumberPackets, + inputData: inInputData, + packetDescriptions: inPacketDescriptions) +} + +private func outputCallbackProc( + inUserData: UnsafeMutableRawPointer?, + inAQ: AudioQueueRef, + inBuffer: AudioQueueBufferRef) +{ + guard let inUserData else { return } + let playback = Unmanaged<Playback>.fromOpaque(inUserData).takeUnretainedValue() + playback.bufferLock.lock() + playback.availableBuffers.append(inBuffer) + playback.bufferLock.unlock() + playback.bufferSemaphore.signal() +} + +private func isRunningCallbackProc( + inUserData: UnsafeMutableRawPointer?, + inAQ: AudioQueueRef, + inID: AudioQueuePropertyID) +{ + guard let inUserData else { return } + guard inID == kAudioQueueProperty_IsRunning else { return } + + let playback = Unmanaged<Playback>.fromOpaque(inUserData).takeUnretainedValue() + var running: UInt32 = 0 + var size = UInt32(MemoryLayout<UInt32>.size) + let status = AudioQueueGetProperty(inAQ, kAudioQueueProperty_IsRunning, &running, &size) + if status != noErr { return } + + if running == 0, playback.inputFinished { + playback.finish(StreamingPlaybackResult(finished: true, interruptedAt: nil)) + } +} diff --git a/apps/shared/ClawdisKit/Sources/ClawdisKit/TalkTTSValidation.swift b/apps/shared/ClawdisKit/Sources/ClawdisKit/TalkTTSValidation.swift index 8137998ac..e010f9197 100644 --- a/apps/shared/ClawdisKit/Sources/ClawdisKit/TalkTTSValidation.swift +++ b/apps/shared/ClawdisKit/Sources/ClawdisKit/TalkTTSValidation.swift @@ -1,4 +1,6 @@ public enum TalkTTSValidation: Sendable { + private static let v3StabilityValues: Set<Double> = [0.0, 0.5, 1.0] + public static func resolveSpeed(speed: Double?, rateWPM: Int?) -> Double? { if let rateWPM, rateWPM > 0 { let resolved = Double(rateWPM) / 175.0 @@ -18,10 +20,32 @@ return value } + public static func validatedStability(_ value: Double?, modelId: String?) -> Double? { + guard let value else { return nil } + let normalizedModel = (modelId ?? "").trimmingCharacters(in: .whitespacesAndNewlines).lowercased() + if normalizedModel == "eleven_v3" { + return v3StabilityValues.contains(value) ? value : nil + } + return validatedUnit(value) + } + public static func validatedSeed(_ value: Int?) -> UInt32? { guard let value else { return nil } if value < 0 || value > 4294967295 { return nil } return UInt32(value) } -} + public static func validatedLatencyTier(_ value: Int?) -> Int? { + guard let value else { return nil } + if value < 0 || value > 4 { return nil } + return value + } + + public static func pcmSampleRate(from outputFormat: String?) -> Double? { + let trimmed = (outputFormat ??
"").trimmingCharacters(in: .whitespacesAndNewlines).lowercased() + guard trimmed.hasPrefix("pcm_") else { return nil } + let parts = trimmed.split(separator: "_", maxSplits: 1) + guard parts.count == 2, let rate = Double(parts[1]), rate > 0 else { return nil } + return rate + } +} diff --git a/apps/shared/ClawdisKit/Tests/ClawdisKitTests/TalkTTSValidationTests.swift b/apps/shared/ClawdisKit/Tests/ClawdisKitTests/TalkTTSValidationTests.swift index f2d7c3c9e..d3426d768 100644 --- a/apps/shared/ClawdisKit/Tests/ClawdisKitTests/TalkTTSValidationTests.swift +++ b/apps/shared/ClawdisKit/Tests/ClawdisKitTests/TalkTTSValidationTests.swift @@ -16,9 +16,30 @@ final class TalkTTSValidationTests: XCTestCase { XCTAssertNil(TalkTTSValidation.validatedUnit(1.01)) } + func testValidatedStability() { + XCTAssertEqual(TalkTTSValidation.validatedStability(0, modelId: "eleven_v3"), 0) + XCTAssertEqual(TalkTTSValidation.validatedStability(0.5, modelId: "eleven_v3"), 0.5) + XCTAssertEqual(TalkTTSValidation.validatedStability(1, modelId: "eleven_v3"), 1) + XCTAssertNil(TalkTTSValidation.validatedStability(0.7, modelId: "eleven_v3")) + XCTAssertEqual(TalkTTSValidation.validatedStability(0.7, modelId: "eleven_multilingual_v2"), 0.7) + } + func testValidatedSeedBounds() { XCTAssertEqual(TalkTTSValidation.validatedSeed(0), 0) XCTAssertEqual(TalkTTSValidation.validatedSeed(1234), 1234) XCTAssertNil(TalkTTSValidation.validatedSeed(-1)) } + + func testValidatedLatencyTier() { + XCTAssertEqual(TalkTTSValidation.validatedLatencyTier(0), 0) + XCTAssertEqual(TalkTTSValidation.validatedLatencyTier(4), 4) + XCTAssertNil(TalkTTSValidation.validatedLatencyTier(-1)) + XCTAssertNil(TalkTTSValidation.validatedLatencyTier(5)) + } + + func testPcmSampleRateParse() { + XCTAssertEqual(TalkTTSValidation.pcmSampleRate(from: "pcm_44100"), 44100) + XCTAssertNil(TalkTTSValidation.pcmSampleRate(from: "mp3_44100_128")) + XCTAssertNil(TalkTTSValidation.pcmSampleRate(from: "pcm_bad")) + } } diff --git a/docs/talk.md b/docs/talk.md index 41f8239a4..e271d0c68 100644 --- a/docs/talk.md +++ b/docs/talk.md @@ -10,7 +10,7 @@ Talk mode is a continuous voice conversation loop: 1) Listen for speech 2) Send transcript to the model (main session, chat.send) 3) Wait for the response -4) Speak it via ElevenLabs +4) Speak it via ElevenLabs (streaming playback) ## Behavior (macOS) - **Always-on overlay** while Talk mode is enabled. @@ -55,8 +55,10 @@ Supported keys: Defaults: - `interruptOnSpeech`: true -- `voiceId`: falls back to `ELEVENLABS_VOICE_ID` / `SAG_VOICE_ID` +- `voiceId`: falls back to `ELEVENLABS_VOICE_ID` / `SAG_VOICE_ID` (or first ElevenLabs voice when API key is available) +- `modelId`: defaults to `eleven_v3` when unset - `apiKey`: falls back to `ELEVENLABS_API_KEY` (or gateway shell profile if available) +- `outputFormat`: defaults to `pcm_44100` on macOS/iOS for faster streaming playback (Android stays on MP3) ## macOS UI - Menu bar toggle: **Talk** @@ -71,4 +73,6 @@ Defaults: ## Notes - Requires Speech + Microphone permissions. - Uses `chat.send` against session key `main`. -- TTS uses ElevenLabs API with `ELEVENLABS_API_KEY`. +- TTS uses ElevenLabs streaming API with `ELEVENLABS_API_KEY` and incremental playback on macOS/iOS/Android for lower latency. +- `stability` for `eleven_v3` is validated to `0.0`, `0.5`, or `1.0`; other models accept `0..1`. +- `latency_tier` is validated to `0..4` when set.