fix(voice-call): prevent audio overlap with TTS queue (#1713)

* fix(voice-call): prevent audio overlap with TTS queue

Add a TTS queue to serialize audio playback and prevent overlapping
speech during voice calls. Previously, concurrent speak() calls could
send audio chunks simultaneously, causing garbled/choppy output.

Changes:
- Add queueTts() to MediaStreamHandler for sequential TTS playback
- Wrap playTtsViaStream() audio sending in the queue
- Clear queue on barge-in (when user starts speaking)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(voice-call): use iterative queue processing to prevent heap exhaustion

The recursive processQueue() pattern accumulated stack frames, causing
JavaScript heap out of memory errors on macOS CI. Convert to while loop
for constant stack usage regardless of queue depth.

* fix: prevent voice-call TTS overlap (#1713) (thanks @dguido)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Dan Guido
2026-01-25 07:02:17 -05:00
committed by GitHub
parent 875b018ea1
commit 101d0f451f
6 changed files with 259 additions and 11 deletions

View File

@@ -0,0 +1,97 @@
import { describe, expect, it } from "vitest";
import type {
OpenAIRealtimeSTTProvider,
RealtimeSTTSession,
} from "./providers/stt-openai-realtime.js";
import { MediaStreamHandler } from "./media-stream.js";
const createStubSession = (): RealtimeSTTSession => ({
connect: async () => {},
sendAudio: () => {},
waitForTranscript: async () => "",
onPartial: () => {},
onTranscript: () => {},
onSpeechStart: () => {},
close: () => {},
isConnected: () => true,
});
const createStubSttProvider = (): OpenAIRealtimeSTTProvider =>
({
createSession: () => createStubSession(),
}) as unknown as OpenAIRealtimeSTTProvider;
const flush = async (): Promise<void> => {
await new Promise((resolve) => setTimeout(resolve, 0));
};
const waitForAbort = (signal: AbortSignal): Promise<void> =>
new Promise((resolve) => {
if (signal.aborted) {
resolve();
return;
}
signal.addEventListener("abort", () => resolve(), { once: true });
});
describe("MediaStreamHandler TTS queue", () => {
it("serializes TTS playback and resolves in order", async () => {
const handler = new MediaStreamHandler({
sttProvider: createStubSttProvider(),
});
const started: number[] = [];
const finished: number[] = [];
let resolveFirst!: () => void;
const firstGate = new Promise<void>((resolve) => {
resolveFirst = resolve;
});
const first = handler.queueTts("stream-1", async () => {
started.push(1);
await firstGate;
finished.push(1);
});
const second = handler.queueTts("stream-1", async () => {
started.push(2);
finished.push(2);
});
await flush();
expect(started).toEqual([1]);
resolveFirst();
await first;
await second;
expect(started).toEqual([1, 2]);
expect(finished).toEqual([1, 2]);
});
it("cancels active playback and clears queued items", async () => {
const handler = new MediaStreamHandler({
sttProvider: createStubSttProvider(),
});
let queuedRan = false;
const started: string[] = [];
const active = handler.queueTts("stream-1", async (signal) => {
started.push("active");
await waitForAbort(signal);
});
void handler.queueTts("stream-1", async () => {
queuedRan = true;
});
await flush();
expect(started).toEqual(["active"]);
handler.clearTtsQueue("stream-1");
await active;
await flush();
expect(queuedRan).toBe(false);
});
});

View File

@@ -29,6 +29,8 @@ export interface MediaStreamConfig {
onPartialTranscript?: (callId: string, partial: string) => void;
/** Callback when stream connects */
onConnect?: (callId: string, streamSid: string) => void;
/** Callback when speech starts (barge-in) */
onSpeechStart?: (callId: string) => void;
/** Callback when stream disconnects */
onDisconnect?: (callId: string) => void;
}
@@ -43,6 +45,13 @@ interface StreamSession {
sttSession: RealtimeSTTSession;
}
type TtsQueueEntry = {
playFn: (signal: AbortSignal) => Promise<void>;
controller: AbortController;
resolve: () => void;
reject: (error: unknown) => void;
};
/**
* Manages WebSocket connections for Twilio media streams.
*/
@@ -50,6 +59,12 @@ export class MediaStreamHandler {
private wss: WebSocketServer | null = null;
private sessions = new Map<string, StreamSession>();
private config: MediaStreamConfig;
/** TTS playback queues per stream (serialize audio to prevent overlap) */
private ttsQueues = new Map<string, TtsQueueEntry[]>();
/** Whether TTS is currently playing per stream */
private ttsPlaying = new Map<string, boolean>();
/** Active TTS playback controllers per stream */
private ttsActiveControllers = new Map<string, AbortController>();
constructor(config: MediaStreamConfig) {
this.config = config;
@@ -148,6 +163,10 @@ export class MediaStreamHandler {
this.config.onTranscript?.(callSid, transcript);
});
sttSession.onSpeechStart(() => {
this.config.onSpeechStart?.(callSid);
});
const session: StreamSession = {
callId: callSid,
streamSid,
@@ -177,6 +196,7 @@ export class MediaStreamHandler {
private handleStop(session: StreamSession): void {
console.log(`[MediaStream] Stream stopped: ${session.streamSid}`);
this.clearTtsState(session.streamSid);
session.sttSession.close();
this.sessions.delete(session.streamSid);
this.config.onDisconnect?.(session.callId);
@@ -228,6 +248,46 @@ export class MediaStreamHandler {
this.sendToStream(streamSid, { event: "clear", streamSid });
}
/**
* Queue a TTS operation for sequential playback.
* Only one TTS operation plays at a time per stream to prevent overlap.
*/
async queueTts(
streamSid: string,
playFn: (signal: AbortSignal) => Promise<void>,
): Promise<void> {
const queue = this.getTtsQueue(streamSid);
let resolveEntry: () => void;
let rejectEntry: (error: unknown) => void;
const promise = new Promise<void>((resolve, reject) => {
resolveEntry = resolve;
rejectEntry = reject;
});
queue.push({
playFn,
controller: new AbortController(),
resolve: resolveEntry!,
reject: rejectEntry!,
});
if (!this.ttsPlaying.get(streamSid)) {
void this.processQueue(streamSid);
}
return promise;
}
/**
* Clear TTS queue and interrupt current playback (barge-in).
*/
clearTtsQueue(streamSid: string): void {
const queue = this.getTtsQueue(streamSid);
queue.length = 0;
this.ttsActiveControllers.get(streamSid)?.abort();
this.clearAudio(streamSid);
}
/**
* Get active session by call ID.
*/
@@ -242,11 +302,65 @@ export class MediaStreamHandler {
*/
closeAll(): void {
for (const session of this.sessions.values()) {
this.clearTtsState(session.streamSid);
session.sttSession.close();
session.ws.close();
}
this.sessions.clear();
}
private getTtsQueue(streamSid: string): TtsQueueEntry[] {
const existing = this.ttsQueues.get(streamSid);
if (existing) return existing;
const queue: TtsQueueEntry[] = [];
this.ttsQueues.set(streamSid, queue);
return queue;
}
/**
* Process the TTS queue for a stream.
* Uses iterative approach to avoid stack accumulation from recursion.
*/
private async processQueue(streamSid: string): Promise<void> {
this.ttsPlaying.set(streamSid, true);
while (true) {
const queue = this.ttsQueues.get(streamSid);
if (!queue || queue.length === 0) {
this.ttsPlaying.set(streamSid, false);
this.ttsActiveControllers.delete(streamSid);
return;
}
const entry = queue.shift()!;
this.ttsActiveControllers.set(streamSid, entry.controller);
try {
await entry.playFn(entry.controller.signal);
entry.resolve();
} catch (error) {
if (entry.controller.signal.aborted) {
entry.resolve();
} else {
console.error("[MediaStream] TTS playback error:", error);
entry.reject(error);
}
} finally {
if (this.ttsActiveControllers.get(streamSid) === entry.controller) {
this.ttsActiveControllers.delete(streamSid);
}
}
}
}
private clearTtsState(streamSid: string): void {
const queue = this.ttsQueues.get(streamSid);
if (queue) queue.length = 0;
this.ttsActiveControllers.get(streamSid)?.abort();
this.ttsActiveControllers.delete(streamSid);
this.ttsPlaying.delete(streamSid);
this.ttsQueues.delete(streamSid);
}
}
/**

View File

@@ -38,6 +38,8 @@ export interface RealtimeSTTSession {
onPartial(callback: (partial: string) => void): void;
/** Set callback for final transcripts */
onTranscript(callback: (transcript: string) => void): void;
/** Set callback when speech starts (VAD) */
onSpeechStart(callback: () => void): void;
/** Close the session */
close(): void;
/** Check if session is connected */
@@ -91,6 +93,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
private pendingTranscript = "";
private onTranscriptCallback: ((transcript: string) => void) | null = null;
private onPartialCallback: ((partial: string) => void) | null = null;
private onSpeechStartCallback: (() => void) | null = null;
constructor(
private readonly apiKey: string,
@@ -243,6 +246,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
case "input_audio_buffer.speech_started":
console.log("[RealtimeSTT] Speech started");
this.pendingTranscript = "";
this.onSpeechStartCallback?.();
break;
case "error":
@@ -273,6 +277,10 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
this.onTranscriptCallback = callback;
}
onSpeechStart(callback: () => void): void {
this.onSpeechStartCallback = callback;
}
async waitForTranscript(timeoutMs = 30000): Promise<string> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {

View File

@@ -135,6 +135,17 @@ export class TwilioProvider implements VoiceCallProvider {
this.callStreamMap.delete(callSid);
}
/**
* Clear TTS queue for a call (barge-in).
* Used when user starts speaking to interrupt current TTS playback.
*/
clearTtsQueue(callSid: string): void {
const streamSid = this.callStreamMap.get(callSid);
if (streamSid && this.mediaStreamHandler) {
this.mediaStreamHandler.clearTtsQueue(streamSid);
}
}
/**
* Make an authenticated request to the Twilio API.
*/
@@ -504,7 +515,7 @@ export class TwilioProvider implements VoiceCallProvider {
/**
* Play TTS via core TTS and Twilio Media Streams.
* Generates audio with core TTS, converts to mu-law, and streams via WebSocket.
* Uses a jitter buffer to smooth out timing variations.
* Uses a queue to serialize playback and prevent overlapping audio.
*/
private async playTtsViaStream(
text: string,
@@ -514,22 +525,29 @@ export class TwilioProvider implements VoiceCallProvider {
throw new Error("TTS provider and media stream handler required");
}
// Generate audio with core TTS (returns mu-law at 8kHz)
const muLawAudio = await this.ttsProvider.synthesizeForTelephony(text);
// Stream audio in 20ms chunks (160 bytes at 8kHz mu-law)
const CHUNK_SIZE = 160;
const CHUNK_DELAY_MS = 20;
for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
this.mediaStreamHandler.sendAudio(streamSid, chunk);
const handler = this.mediaStreamHandler;
const ttsProvider = this.ttsProvider;
await handler.queueTts(streamSid, async (signal) => {
// Generate audio with core TTS (returns mu-law at 8kHz)
const muLawAudio = await ttsProvider.synthesizeForTelephony(text);
for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
if (signal.aborted) break;
handler.sendAudio(streamSid, chunk);
// Pace the audio to match real-time playback
await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS));
}
// Pace the audio to match real-time playback
await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS));
if (signal.aborted) break;
}
// Send a mark to track when audio finishes
this.mediaStreamHandler.sendMark(streamSid, `tts-${Date.now()}`);
if (!signal.aborted) {
// Send a mark to track when audio finishes
handler.sendMark(streamSid, `tts-${Date.now()}`);
}
});
}
/**

View File

@@ -78,6 +78,11 @@ export class VoiceCallWebhookServer {
`[voice-call] Transcript for ${providerCallId}: ${transcript}`,
);
// Clear TTS queue on barge-in (user started speaking, interrupt current playback)
if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
}
// Look up our internal call ID from the provider call ID
const call = this.manager.getCallByProviderCallId(providerCallId);
if (!call) {
@@ -109,6 +114,11 @@ export class VoiceCallWebhookServer {
});
}
},
onSpeechStart: (providerCallId) => {
if (this.provider.name === "twilio") {
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
}
},
onPartialTranscript: (callId, partial) => {
console.log(`[voice-call] Partial for ${callId}: ${partial}`);
},