diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ef37aef9..656abc4cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ Docs: https://docs.clawd.bot - Gateway: store lock files in the temp directory to avoid stale locks on persistent volumes. (#1676) - macOS: default direct-transport `ws://` URLs to port 18789; document `gateway.remote.transport`. (#1603) Thanks @ngutman. - Voice Call: return stream TwiML for outbound conversation calls on initial Twilio webhook. (#1634) +- Voice Call: serialize Twilio TTS playback and cancel on barge-in to prevent overlap. (#1713) Thanks @dguido. - Google Chat: tighten email allowlist matching, typing cleanup, media caps, and onboarding/docs/tests. (#1635) Thanks @iHildy. - Google Chat: normalize space targets without double `spaces/` prefix. - Messaging: keep newline chunking safe for fenced markdown blocks across channels. diff --git a/extensions/voice-call/src/media-stream.test.ts b/extensions/voice-call/src/media-stream.test.ts new file mode 100644 index 000000000..773445121 --- /dev/null +++ b/extensions/voice-call/src/media-stream.test.ts @@ -0,0 +1,97 @@ +import { describe, expect, it } from "vitest"; + +import type { + OpenAIRealtimeSTTProvider, + RealtimeSTTSession, +} from "./providers/stt-openai-realtime.js"; +import { MediaStreamHandler } from "./media-stream.js"; + +const createStubSession = (): RealtimeSTTSession => ({ + connect: async () => {}, + sendAudio: () => {}, + waitForTranscript: async () => "", + onPartial: () => {}, + onTranscript: () => {}, + onSpeechStart: () => {}, + close: () => {}, + isConnected: () => true, +}); + +const createStubSttProvider = (): OpenAIRealtimeSTTProvider => + ({ + createSession: () => createStubSession(), + }) as unknown as OpenAIRealtimeSTTProvider; + +const flush = async (): Promise => { + await new Promise((resolve) => setTimeout(resolve, 0)); +}; + +const waitForAbort = (signal: AbortSignal): Promise => + new Promise((resolve) => { + if (signal.aborted) { + resolve(); + return; + } + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + +describe("MediaStreamHandler TTS queue", () => { + it("serializes TTS playback and resolves in order", async () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + }); + const started: number[] = []; + const finished: number[] = []; + + let resolveFirst!: () => void; + const firstGate = new Promise((resolve) => { + resolveFirst = resolve; + }); + + const first = handler.queueTts("stream-1", async () => { + started.push(1); + await firstGate; + finished.push(1); + }); + const second = handler.queueTts("stream-1", async () => { + started.push(2); + finished.push(2); + }); + + await flush(); + expect(started).toEqual([1]); + + resolveFirst(); + await first; + await second; + + expect(started).toEqual([1, 2]); + expect(finished).toEqual([1, 2]); + }); + + it("cancels active playback and clears queued items", async () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + }); + + let queuedRan = false; + const started: string[] = []; + + const active = handler.queueTts("stream-1", async (signal) => { + started.push("active"); + await waitForAbort(signal); + }); + void handler.queueTts("stream-1", async () => { + queuedRan = true; + }); + + await flush(); + expect(started).toEqual(["active"]); + + handler.clearTtsQueue("stream-1"); + await active; + await flush(); + + expect(queuedRan).toBe(false); + }); +}); diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index 252b6b331..e14dc9137 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -29,6 +29,8 @@ export interface MediaStreamConfig { onPartialTranscript?: (callId: string, partial: string) => void; /** Callback when stream connects */ onConnect?: (callId: string, streamSid: string) => void; + /** Callback when speech starts (barge-in) */ + onSpeechStart?: (callId: string) => void; /** Callback when stream disconnects */ onDisconnect?: (callId: string) => void; } @@ -43,6 +45,13 @@ interface StreamSession { sttSession: RealtimeSTTSession; } +type TtsQueueEntry = { + playFn: (signal: AbortSignal) => Promise; + controller: AbortController; + resolve: () => void; + reject: (error: unknown) => void; +}; + /** * Manages WebSocket connections for Twilio media streams. */ @@ -50,6 +59,12 @@ export class MediaStreamHandler { private wss: WebSocketServer | null = null; private sessions = new Map(); private config: MediaStreamConfig; + /** TTS playback queues per stream (serialize audio to prevent overlap) */ + private ttsQueues = new Map(); + /** Whether TTS is currently playing per stream */ + private ttsPlaying = new Map(); + /** Active TTS playback controllers per stream */ + private ttsActiveControllers = new Map(); constructor(config: MediaStreamConfig) { this.config = config; @@ -148,6 +163,10 @@ export class MediaStreamHandler { this.config.onTranscript?.(callSid, transcript); }); + sttSession.onSpeechStart(() => { + this.config.onSpeechStart?.(callSid); + }); + const session: StreamSession = { callId: callSid, streamSid, @@ -177,6 +196,7 @@ export class MediaStreamHandler { private handleStop(session: StreamSession): void { console.log(`[MediaStream] Stream stopped: ${session.streamSid}`); + this.clearTtsState(session.streamSid); session.sttSession.close(); this.sessions.delete(session.streamSid); this.config.onDisconnect?.(session.callId); @@ -228,6 +248,46 @@ export class MediaStreamHandler { this.sendToStream(streamSid, { event: "clear", streamSid }); } + /** + * Queue a TTS operation for sequential playback. + * Only one TTS operation plays at a time per stream to prevent overlap. + */ + async queueTts( + streamSid: string, + playFn: (signal: AbortSignal) => Promise, + ): Promise { + const queue = this.getTtsQueue(streamSid); + let resolveEntry: () => void; + let rejectEntry: (error: unknown) => void; + const promise = new Promise((resolve, reject) => { + resolveEntry = resolve; + rejectEntry = reject; + }); + + queue.push({ + playFn, + controller: new AbortController(), + resolve: resolveEntry!, + reject: rejectEntry!, + }); + + if (!this.ttsPlaying.get(streamSid)) { + void this.processQueue(streamSid); + } + + return promise; + } + + /** + * Clear TTS queue and interrupt current playback (barge-in). + */ + clearTtsQueue(streamSid: string): void { + const queue = this.getTtsQueue(streamSid); + queue.length = 0; + this.ttsActiveControllers.get(streamSid)?.abort(); + this.clearAudio(streamSid); + } + /** * Get active session by call ID. */ @@ -242,11 +302,65 @@ export class MediaStreamHandler { */ closeAll(): void { for (const session of this.sessions.values()) { + this.clearTtsState(session.streamSid); session.sttSession.close(); session.ws.close(); } this.sessions.clear(); } + + private getTtsQueue(streamSid: string): TtsQueueEntry[] { + const existing = this.ttsQueues.get(streamSid); + if (existing) return existing; + const queue: TtsQueueEntry[] = []; + this.ttsQueues.set(streamSid, queue); + return queue; + } + + /** + * Process the TTS queue for a stream. + * Uses iterative approach to avoid stack accumulation from recursion. + */ + private async processQueue(streamSid: string): Promise { + this.ttsPlaying.set(streamSid, true); + + while (true) { + const queue = this.ttsQueues.get(streamSid); + if (!queue || queue.length === 0) { + this.ttsPlaying.set(streamSid, false); + this.ttsActiveControllers.delete(streamSid); + return; + } + + const entry = queue.shift()!; + this.ttsActiveControllers.set(streamSid, entry.controller); + + try { + await entry.playFn(entry.controller.signal); + entry.resolve(); + } catch (error) { + if (entry.controller.signal.aborted) { + entry.resolve(); + } else { + console.error("[MediaStream] TTS playback error:", error); + entry.reject(error); + } + } finally { + if (this.ttsActiveControllers.get(streamSid) === entry.controller) { + this.ttsActiveControllers.delete(streamSid); + } + } + } + } + + private clearTtsState(streamSid: string): void { + const queue = this.ttsQueues.get(streamSid); + if (queue) queue.length = 0; + this.ttsActiveControllers.get(streamSid)?.abort(); + this.ttsActiveControllers.delete(streamSid); + this.ttsPlaying.delete(streamSid); + this.ttsQueues.delete(streamSid); + } } /** diff --git a/extensions/voice-call/src/providers/stt-openai-realtime.ts b/extensions/voice-call/src/providers/stt-openai-realtime.ts index 01c698f21..5cd52658d 100644 --- a/extensions/voice-call/src/providers/stt-openai-realtime.ts +++ b/extensions/voice-call/src/providers/stt-openai-realtime.ts @@ -38,6 +38,8 @@ export interface RealtimeSTTSession { onPartial(callback: (partial: string) => void): void; /** Set callback for final transcripts */ onTranscript(callback: (transcript: string) => void): void; + /** Set callback when speech starts (VAD) */ + onSpeechStart(callback: () => void): void; /** Close the session */ close(): void; /** Check if session is connected */ @@ -91,6 +93,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession { private pendingTranscript = ""; private onTranscriptCallback: ((transcript: string) => void) | null = null; private onPartialCallback: ((partial: string) => void) | null = null; + private onSpeechStartCallback: (() => void) | null = null; constructor( private readonly apiKey: string, @@ -243,6 +246,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession { case "input_audio_buffer.speech_started": console.log("[RealtimeSTT] Speech started"); this.pendingTranscript = ""; + this.onSpeechStartCallback?.(); break; case "error": @@ -273,6 +277,10 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession { this.onTranscriptCallback = callback; } + onSpeechStart(callback: () => void): void { + this.onSpeechStartCallback = callback; + } + async waitForTranscript(timeoutMs = 30000): Promise { return new Promise((resolve, reject) => { const timeout = setTimeout(() => { diff --git a/extensions/voice-call/src/providers/twilio.ts b/extensions/voice-call/src/providers/twilio.ts index 8e400f82f..be9dd6eda 100644 --- a/extensions/voice-call/src/providers/twilio.ts +++ b/extensions/voice-call/src/providers/twilio.ts @@ -135,6 +135,17 @@ export class TwilioProvider implements VoiceCallProvider { this.callStreamMap.delete(callSid); } + /** + * Clear TTS queue for a call (barge-in). + * Used when user starts speaking to interrupt current TTS playback. + */ + clearTtsQueue(callSid: string): void { + const streamSid = this.callStreamMap.get(callSid); + if (streamSid && this.mediaStreamHandler) { + this.mediaStreamHandler.clearTtsQueue(streamSid); + } + } + /** * Make an authenticated request to the Twilio API. */ @@ -504,7 +515,7 @@ export class TwilioProvider implements VoiceCallProvider { /** * Play TTS via core TTS and Twilio Media Streams. * Generates audio with core TTS, converts to mu-law, and streams via WebSocket. - * Uses a jitter buffer to smooth out timing variations. + * Uses a queue to serialize playback and prevent overlapping audio. */ private async playTtsViaStream( text: string, @@ -514,22 +525,29 @@ export class TwilioProvider implements VoiceCallProvider { throw new Error("TTS provider and media stream handler required"); } - // Generate audio with core TTS (returns mu-law at 8kHz) - const muLawAudio = await this.ttsProvider.synthesizeForTelephony(text); - // Stream audio in 20ms chunks (160 bytes at 8kHz mu-law) const CHUNK_SIZE = 160; const CHUNK_DELAY_MS = 20; - for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) { - this.mediaStreamHandler.sendAudio(streamSid, chunk); + const handler = this.mediaStreamHandler; + const ttsProvider = this.ttsProvider; + await handler.queueTts(streamSid, async (signal) => { + // Generate audio with core TTS (returns mu-law at 8kHz) + const muLawAudio = await ttsProvider.synthesizeForTelephony(text); + for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) { + if (signal.aborted) break; + handler.sendAudio(streamSid, chunk); - // Pace the audio to match real-time playback - await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS)); - } + // Pace the audio to match real-time playback + await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS)); + if (signal.aborted) break; + } - // Send a mark to track when audio finishes - this.mediaStreamHandler.sendMark(streamSid, `tts-${Date.now()}`); + if (!signal.aborted) { + // Send a mark to track when audio finishes + handler.sendMark(streamSid, `tts-${Date.now()}`); + } + }); } /** diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index c69436d77..6ab4d0eed 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -78,6 +78,11 @@ export class VoiceCallWebhookServer { `[voice-call] Transcript for ${providerCallId}: ${transcript}`, ); + // Clear TTS queue on barge-in (user started speaking, interrupt current playback) + if (this.provider.name === "twilio") { + (this.provider as TwilioProvider).clearTtsQueue(providerCallId); + } + // Look up our internal call ID from the provider call ID const call = this.manager.getCallByProviderCallId(providerCallId); if (!call) { @@ -109,6 +114,11 @@ export class VoiceCallWebhookServer { }); } }, + onSpeechStart: (providerCallId) => { + if (this.provider.name === "twilio") { + (this.provider as TwilioProvider).clearTtsQueue(providerCallId); + } + }, onPartialTranscript: (callId, partial) => { console.log(`[voice-call] Partial for ${callId}: ${partial}`); },