/** * Media Stream Handler * * Handles bidirectional audio streaming between Twilio and the AI services. * - Receives mu-law audio from Twilio via WebSocket * - Forwards to OpenAI Realtime STT for transcription * - Sends TTS audio back to Twilio */ import type { IncomingMessage } from "node:http"; import type { Duplex } from "node:stream"; import { WebSocket, WebSocketServer } from "ws"; import type { OpenAIRealtimeSTTProvider, RealtimeSTTSession, } from "./providers/stt-openai-realtime.js"; /** * Configuration for the media stream handler. */ export interface MediaStreamConfig { /** STT provider for transcription */ sttProvider: OpenAIRealtimeSTTProvider; /** Callback when transcript is received */ onTranscript?: (callId: string, transcript: string) => void; /** Callback for partial transcripts (streaming UI) */ onPartialTranscript?: (callId: string, partial: string) => void; /** Callback when stream connects */ onConnect?: (callId: string, streamSid: string) => void; /** Callback when stream disconnects */ onDisconnect?: (callId: string) => void; } /** * Active media stream session. */ interface StreamSession { callId: string; streamSid: string; ws: WebSocket; sttSession: RealtimeSTTSession; } /** * Manages WebSocket connections for Twilio media streams. */ export class MediaStreamHandler { private wss: WebSocketServer | null = null; private sessions = new Map(); private config: MediaStreamConfig; constructor(config: MediaStreamConfig) { this.config = config; } /** * Handle WebSocket upgrade for media stream connections. */ handleUpgrade(request: IncomingMessage, socket: Duplex, head: Buffer): void { if (!this.wss) { this.wss = new WebSocketServer({ noServer: true }); this.wss.on("connection", (ws, req) => this.handleConnection(ws, req)); } this.wss.handleUpgrade(request, socket, head, (ws) => { this.wss?.emit("connection", ws, request); }); } /** * Handle new WebSocket connection from Twilio. */ private async handleConnection( ws: WebSocket, _request: IncomingMessage, ): Promise { let session: StreamSession | null = null; ws.on("message", async (data: Buffer) => { try { const message = JSON.parse(data.toString()) as TwilioMediaMessage; switch (message.event) { case "connected": console.log("[MediaStream] Twilio connected"); break; case "start": session = await this.handleStart(ws, message); break; case "media": if (session && message.media?.payload) { // Forward audio to STT const audioBuffer = Buffer.from(message.media.payload, "base64"); session.sttSession.sendAudio(audioBuffer); } break; case "stop": if (session) { this.handleStop(session); session = null; } break; } } catch (error) { console.error("[MediaStream] Error processing message:", error); } }); ws.on("close", () => { if (session) { this.handleStop(session); } }); ws.on("error", (error) => { console.error("[MediaStream] WebSocket error:", error); }); } /** * Handle stream start event. */ private async handleStart( ws: WebSocket, message: TwilioMediaMessage, ): Promise { const streamSid = message.streamSid || ""; const callSid = message.start?.callSid || ""; console.log( `[MediaStream] Stream started: ${streamSid} (call: ${callSid})`, ); // Create STT session const sttSession = this.config.sttProvider.createSession(); // Set up transcript callbacks sttSession.onPartial((partial) => { this.config.onPartialTranscript?.(callSid, partial); }); sttSession.onTranscript((transcript) => { this.config.onTranscript?.(callSid, transcript); }); const session: StreamSession = { callId: callSid, streamSid, ws, sttSession, }; this.sessions.set(streamSid, session); // Notify connection BEFORE STT connect so TTS can work even if STT fails this.config.onConnect?.(callSid, streamSid); // Connect to OpenAI STT (non-blocking, log errors but don't fail the call) sttSession.connect().catch((err) => { console.warn( `[MediaStream] STT connection failed (TTS still works):`, err.message, ); }); return session; } /** * Handle stream stop event. */ private handleStop(session: StreamSession): void { console.log(`[MediaStream] Stream stopped: ${session.streamSid}`); session.sttSession.close(); this.sessions.delete(session.streamSid); this.config.onDisconnect?.(session.callId); } /** * Get an active session with an open WebSocket, or undefined if unavailable. */ private getOpenSession(streamSid: string): StreamSession | undefined { const session = this.sessions.get(streamSid); return session?.ws.readyState === WebSocket.OPEN ? session : undefined; } /** * Send a message to a stream's WebSocket if available. */ private sendToStream(streamSid: string, message: unknown): void { const session = this.getOpenSession(streamSid); session?.ws.send(JSON.stringify(message)); } /** * Send audio to a specific stream (for TTS playback). * Audio should be mu-law encoded at 8kHz mono. */ sendAudio(streamSid: string, muLawAudio: Buffer): void { this.sendToStream(streamSid, { event: "media", streamSid, media: { payload: muLawAudio.toString("base64") }, }); } /** * Send a mark event to track audio playback position. */ sendMark(streamSid: string, name: string): void { this.sendToStream(streamSid, { event: "mark", streamSid, mark: { name }, }); } /** * Clear audio buffer (interrupt playback). */ clearAudio(streamSid: string): void { this.sendToStream(streamSid, { event: "clear", streamSid }); } /** * Get active session by call ID. */ getSessionByCallId(callId: string): StreamSession | undefined { return [...this.sessions.values()].find( (session) => session.callId === callId, ); } /** * Close all sessions. */ closeAll(): void { for (const session of this.sessions.values()) { session.sttSession.close(); session.ws.close(); } this.sessions.clear(); } } /** * Twilio Media Stream message format. */ interface TwilioMediaMessage { event: "connected" | "start" | "media" | "stop" | "mark" | "clear"; sequenceNumber?: string; streamSid?: string; start?: { streamSid: string; accountSid: string; callSid: string; tracks: string[]; mediaFormat: { encoding: string; sampleRate: number; channels: number; }; }; media?: { track?: string; chunk?: string; timestamp?: string; payload?: string; }; mark?: { name: string; }; }