Files
clawdbot/extensions/voice-call/src/webhook.ts
Dan Guido 101d0f451f fix(voice-call): prevent audio overlap with TTS queue (#1713)
* fix(voice-call): prevent audio overlap with TTS queue

Add a TTS queue to serialize audio playback and prevent overlapping
speech during voice calls. Previously, concurrent speak() calls could
send audio chunks simultaneously, causing garbled/choppy output.

Changes:
- Add queueTts() to MediaStreamHandler for sequential TTS playback
- Wrap playTtsViaStream() audio sending in the queue
- Clear queue on barge-in (when user starts speaking)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(voice-call): use iterative queue processing to prevent heap exhaustion

The recursive processQueue() pattern accumulated stack frames, causing
JavaScript heap out of memory errors on macOS CI. Convert to while loop
for constant stack usage regardless of queue depth.

* fix: prevent voice-call TTS overlap (#1713) (thanks @dguido)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Peter Steinberger <steipete@gmail.com>
2026-01-25 12:02:17 +00:00

491 lines
14 KiB
TypeScript

import { spawn } from "node:child_process";
import http from "node:http";
import { URL } from "node:url";
import type { VoiceCallConfig } from "./config.js";
import type { CoreConfig } from "./core-bridge.js";
import type { CallManager } from "./manager.js";
import type { MediaStreamConfig } from "./media-stream.js";
import { MediaStreamHandler } from "./media-stream.js";
import type { VoiceCallProvider } from "./providers/base.js";
import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js";
import type { TwilioProvider } from "./providers/twilio.js";
import type { NormalizedEvent, WebhookContext } from "./types.js";
/**
 * HTTP server for receiving voice call webhooks from providers.
 * Supports WebSocket upgrades for media streams when streaming is enabled.
 *
 * Webhook request flow: path/method check -> bounded body read -> provider
 * signature verification -> provider event parsing -> manager.processEvent()
 * for each event -> provider-supplied HTTP response.
 */
export class VoiceCallWebhookServer {
  /** Largest webhook body (in bytes) that is buffered; larger requests get a 413. */
  private static readonly MAX_BODY_BYTES = 1024 * 1024;

  private server: http.Server | null = null;
  private config: VoiceCallConfig;
  private manager: CallManager;
  private provider: VoiceCallProvider;
  private coreConfig: CoreConfig | null;
  /** Media stream handler for bidirectional audio (when streaming enabled) */
  private mediaStreamHandler: MediaStreamHandler | null = null;

  /**
   * @param config - Voice-call configuration (serve and streaming settings are read here).
   * @param manager - Call manager that receives normalized events and speaks responses.
   * @param provider - Provider used to verify and parse incoming webhooks.
   * @param coreConfig - Optional core config; auto-responses are skipped without it.
   */
  constructor(
    config: VoiceCallConfig,
    manager: CallManager,
    provider: VoiceCallProvider,
    coreConfig?: CoreConfig,
  ) {
    this.config = config;
    this.manager = manager;
    this.provider = provider;
    this.coreConfig = coreConfig ?? null;
    // Initialize media stream handler if streaming is enabled
    if (config.streaming?.enabled) {
      this.initializeMediaStreaming();
    }
  }

  /**
   * Get the media stream handler (for wiring to provider).
   */
  getMediaStreamHandler(): MediaStreamHandler | null {
    return this.mediaStreamHandler;
  }

  /**
   * Return the provider as a TwilioProvider when its name is "twilio",
   * otherwise null. Centralizes the name check + cast used by the
   * media-stream callbacks.
   */
  private getTwilioProvider(): TwilioProvider | null {
    return this.provider.name === "twilio"
      ? (this.provider as TwilioProvider)
      : null;
  }

  /**
   * Parse the request target against the Host header.
   * Returns null instead of throwing when the URL cannot be parsed; both
   * inputs are client-controlled, and the upgrade listener runs outside any
   * promise chain, so a thrown error there would be uncaught.
   */
  private parseRequestUrl(req: http.IncomingMessage): URL | null {
    try {
      return new URL(req.url || "/", `http://${req.headers.host}`);
    } catch {
      return null;
    }
  }

  /**
   * Initialize media streaming with OpenAI Realtime STT.
   * Leaves the handler unset (with a warning) when no API key is available.
   */
  private initializeMediaStreaming(): void {
    const apiKey =
      this.config.streaming?.openaiApiKey || process.env.OPENAI_API_KEY;
    if (!apiKey) {
      console.warn(
        "[voice-call] Streaming enabled but no OpenAI API key found",
      );
      return;
    }
    const sttProvider = new OpenAIRealtimeSTTProvider({
      apiKey,
      model: this.config.streaming?.sttModel,
      silenceDurationMs: this.config.streaming?.silenceDurationMs,
      vadThreshold: this.config.streaming?.vadThreshold,
    });
    const streamConfig: MediaStreamConfig = {
      sttProvider,
      onTranscript: (providerCallId, transcript) => {
        console.log(
          `[voice-call] Transcript for ${providerCallId}: ${transcript}`,
        );
        // Clear TTS queue on barge-in (user started speaking, interrupt current playback)
        this.getTwilioProvider()?.clearTtsQueue(providerCallId);
        // Look up our internal call ID from the provider call ID
        const call = this.manager.getCallByProviderCallId(providerCallId);
        if (!call) {
          console.warn(
            `[voice-call] No active call found for provider ID: ${providerCallId}`,
          );
          return;
        }
        // Create a speech event and process it through the manager
        const event: NormalizedEvent = {
          id: `stream-transcript-${Date.now()}`,
          type: "call.speech",
          callId: call.callId,
          providerCallId,
          timestamp: Date.now(),
          transcript,
          isFinal: true,
        };
        this.manager.processEvent(event);
        // Auto-respond in conversation mode (inbound always, outbound if mode is conversation)
        const callMode = call.metadata?.mode as string | undefined;
        const shouldRespond =
          call.direction === "inbound" || callMode === "conversation";
        if (shouldRespond) {
          this.handleInboundResponse(call.callId, transcript).catch((err) => {
            console.warn(`[voice-call] Failed to auto-respond:`, err);
          });
        }
      },
      onSpeechStart: (providerCallId) => {
        this.getTwilioProvider()?.clearTtsQueue(providerCallId);
      },
      onPartialTranscript: (callId, partial) => {
        console.log(`[voice-call] Partial for ${callId}: ${partial}`);
      },
      onConnect: (callId, streamSid) => {
        console.log(
          `[voice-call] Media stream connected: ${callId} -> ${streamSid}`,
        );
        // Register stream with provider for TTS routing
        this.getTwilioProvider()?.registerCallStream(callId, streamSid);
        // Speak initial message if one was provided when call was initiated
        // Use setTimeout to allow stream setup to complete
        setTimeout(() => {
          this.manager.speakInitialMessage(callId).catch((err) => {
            console.warn(`[voice-call] Failed to speak initial message:`, err);
          });
        }, 500);
      },
      onDisconnect: (callId) => {
        console.log(`[voice-call] Media stream disconnected: ${callId}`);
        this.getTwilioProvider()?.unregisterCallStream(callId);
      },
    };
    this.mediaStreamHandler = new MediaStreamHandler(streamConfig);
    console.log("[voice-call] Media streaming initialized");
  }

  /**
   * Start the webhook server.
   *
   * @returns The local URL (`http://<bind>:<port><path>`) once listening.
   *   The promise rejects if the server emits "error" (e.g. while binding).
   */
  async start(): Promise<string> {
    const { port, bind, path: webhookPath } = this.config.serve;
    const streamPath = this.config.streaming?.streamPath || "/voice/stream";
    return new Promise((resolve, reject) => {
      const server = http.createServer((req, res) => {
        this.handleRequest(req, res, webhookPath).catch((err) => {
          console.error("[voice-call] Webhook error:", err);
          // Only touch the parts of the response that are still writable.
          if (!res.headersSent) {
            res.statusCode = 500;
          }
          if (!res.writableEnded) {
            res.end("Internal Server Error");
          }
        });
      });
      this.server = server;
      // Handle WebSocket upgrades for media streams
      const streamHandler = this.mediaStreamHandler;
      if (streamHandler) {
        server.on("upgrade", (request, socket, head) => {
          // A null URL (unparseable target/Host) falls through to destroy().
          const url = this.parseRequestUrl(request);
          if (url?.pathname === streamPath) {
            console.log("[voice-call] WebSocket upgrade for media stream");
            streamHandler.handleUpgrade(request, socket, head);
          } else {
            socket.destroy();
          }
        });
      }
      server.on("error", reject);
      server.listen(port, bind, () => {
        const url = `http://${bind}:${port}${webhookPath}`;
        console.log(`[voice-call] Webhook server listening on ${url}`);
        if (this.mediaStreamHandler) {
          console.log(
            `[voice-call] Media stream WebSocket on ws://${bind}:${port}${streamPath}`,
          );
        }
        resolve(url);
      });
    });
  }

  /**
   * Stop the webhook server.
   * Resolves immediately when the server was never started.
   */
  async stop(): Promise<void> {
    return new Promise((resolve) => {
      if (this.server) {
        this.server.close(() => {
          this.server = null;
          resolve();
        });
      } else {
        resolve();
      }
    });
  }

  /**
   * Handle incoming HTTP request.
   *
   * Responds 400 for an unparseable URL, 404 when the path does not start
   * with `webhookPath`, 405 for non-POST methods, 413 for oversized bodies
   * and 401 when the provider rejects the signature. Otherwise every parsed
   * event is handed to the manager and the provider's response is sent.
   */
  private async handleRequest(
    req: http.IncomingMessage,
    res: http.ServerResponse,
    webhookPath: string,
  ): Promise<void> {
    const url = this.parseRequestUrl(req);
    if (!url) {
      res.statusCode = 400;
      res.end("Bad Request");
      return;
    }
    // Check path
    if (!url.pathname.startsWith(webhookPath)) {
      res.statusCode = 404;
      res.end("Not Found");
      return;
    }
    // Only accept POST
    if (req.method !== "POST") {
      res.statusCode = 405;
      res.end("Method Not Allowed");
      return;
    }
    // Read body (bounded: this runs before the signature is verified)
    const body = await this.readBody(req);
    if (body === null) {
      res.statusCode = 413;
      res.setHeader("Connection", "close");
      res.end("Payload Too Large");
      return;
    }
    // Build webhook context
    const ctx: WebhookContext = {
      headers: req.headers as Record<string, string | string[] | undefined>,
      rawBody: body,
      url: `http://${req.headers.host}${req.url}`,
      method: "POST",
      query: Object.fromEntries(url.searchParams),
    };
    // Verify signature
    const verification = this.provider.verifyWebhook(ctx);
    if (!verification.ok) {
      console.warn(
        `[voice-call] Webhook verification failed: ${verification.reason}`,
      );
      res.statusCode = 401;
      res.end("Unauthorized");
      return;
    }
    // Parse events
    const result = this.provider.parseWebhookEvent(ctx);
    // Process each event; one failing event must not block the others
    for (const event of result.events) {
      try {
        this.manager.processEvent(event);
      } catch (err) {
        console.error(
          `[voice-call] Error processing event ${event.type}:`,
          err,
        );
      }
    }
    // Send response
    res.statusCode = result.statusCode || 200;
    if (result.providerResponseHeaders) {
      for (const [key, value] of Object.entries(
        result.providerResponseHeaders,
      )) {
        res.setHeader(key, value);
      }
    }
    res.end(result.providerResponseBody || "OK");
  }

  /**
   * Read request body as a UTF-8 string.
   *
   * @returns The body, or null once more than MAX_BODY_BYTES have been
   *   received. Rejects if the request stream emits "error".
   */
  private readBody(req: http.IncomingMessage): Promise<string | null> {
    const limit = VoiceCallWebhookServer.MAX_BODY_BYTES;
    return new Promise((resolve, reject) => {
      const chunks: Buffer[] = [];
      let received = 0;
      const onData = (chunk: Buffer): void => {
        received += chunk.length;
        if (received > limit) {
          // Stop buffering and drop what was collected; keep the stream
          // flowing so the remaining bytes are discarded, not queued.
          req.off("data", onData);
          chunks.length = 0;
          req.resume();
          resolve(null);
          return;
        }
        chunks.push(chunk);
      };
      req.on("data", onData);
      req.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8")));
      req.on("error", reject);
    });
  }

  /**
   * Handle auto-response for inbound calls using the agent system.
   * Supports tool calling for richer voice interactions.
   *
   * Never throws: lookup failures, a missing core config and generation
   * errors are logged and the call simply gets no spoken reply.
   */
  private async handleInboundResponse(
    callId: string,
    userMessage: string,
  ): Promise<void> {
    console.log(
      `[voice-call] Auto-responding to inbound call ${callId}: "${userMessage}"`,
    );
    // Get call context for conversation history
    const call = this.manager.getCall(callId);
    if (!call) {
      console.warn(`[voice-call] Call ${callId} not found for auto-response`);
      return;
    }
    if (!this.coreConfig) {
      console.warn("[voice-call] Core config missing; skipping auto-response");
      return;
    }
    try {
      // Loaded lazily so the generator is only imported when a reply is needed
      const { generateVoiceResponse } = await import("./response-generator.js");
      const result = await generateVoiceResponse({
        voiceConfig: this.config,
        coreConfig: this.coreConfig,
        callId,
        from: call.from,
        transcript: call.transcript,
        userMessage,
      });
      if (result.error) {
        console.error(
          `[voice-call] Response generation error: ${result.error}`,
        );
        return;
      }
      if (result.text) {
        console.log(`[voice-call] AI response: "${result.text}"`);
        await this.manager.speak(callId, result.text);
      }
    } catch (err) {
      console.error(`[voice-call] Auto-response error:`, err);
    }
  }
}
/**
 * Identity of the local Tailscale node, as returned by getTailscaleSelfInfo()
 * from the `Self` entry of `tailscale status --json`.
 */
export type TailscaleSelfInfo = {
/** `Self.DNSName` with a single trailing dot removed, or null when missing/empty. */
dnsName: string | null;
/** `Self.ID`, or null when missing/empty. */
nodeId: string | null;
};
/**
 * Run a tailscale command with timeout, collecting stdout.
 *
 * The returned promise never rejects; every failure mode is reported through
 * the `code` field so callers only need to compare it against 0.
 *
 * @param args - Arguments passed to the `tailscale` binary.
 * @param timeoutMs - Milliseconds to wait before the process is killed with SIGKILL.
 * @returns The exit code and collected stdout. `code` is -1 with empty stdout
 *   when the process cannot be spawned or times out, and -1 when the process
 *   closes without an exit code.
 */
function runTailscaleCommand(
  args: string[],
  timeoutMs = 2500,
): Promise<{ code: number; stdout: string }> {
  return new Promise((resolve) => {
    // stderr is discarded rather than piped: nothing reads it, and an unread
    // pipe can fill up and stall the child.
    const proc = spawn("tailscale", args, {
      stdio: ["ignore", "pipe", "ignore"],
    });
    let stdout = "";
    let settled = false;

    // First outcome (close, error or timeout) wins and always clears the timer.
    const finish = (result: { code: number; stdout: string }): void => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      resolve(result);
    };

    proc.stdout.on("data", (data) => {
      stdout += data;
    });
    const timer = setTimeout(() => {
      proc.kill("SIGKILL");
      finish({ code: -1, stdout: "" });
    }, timeoutMs);
    // Without this listener a missing `tailscale` binary surfaces as an
    // unhandled "error" event and takes the whole process down.
    proc.on("error", () => {
      finish({ code: -1, stdout: "" });
    });
    proc.on("close", (code) => {
      finish({ code: code ?? -1, stdout });
    });
  });
}
/**
 * Query `tailscale status --json` and extract this node's DNS name and ID.
 *
 * @returns null when the command exits non-zero or its output cannot be
 *   read as the expected JSON; otherwise the `Self` fields, each null when
 *   missing or empty. A single trailing dot is removed from the DNS name.
 */
export async function getTailscaleSelfInfo(): Promise<TailscaleSelfInfo | null> {
  const result = await runTailscaleCommand(["status", "--json"]);
  if (result.code !== 0) {
    return null;
  }
  try {
    const self = JSON.parse(result.stdout).Self;
    const trimmedDnsName = self?.DNSName?.replace(/\.$/, "");
    return {
      dnsName: trimmedDnsName || null,
      nodeId: self?.ID || null,
    };
  } catch {
    return null;
  }
}
/**
 * Convenience wrapper returning only the DNS name from getTailscaleSelfInfo().
 *
 * @returns The DNS name, or null when no self info or no DNS name is available.
 */
export async function getTailscaleDnsName(): Promise<string | null> {
  const selfInfo = await getTailscaleSelfInfo();
  if (selfInfo == null) {
    return null;
  }
  return selfInfo.dnsName ?? null;
}
/**
 * Mount `opts.localUrl` at `opts.path` by running
 * `tailscale <mode> --bg --yes --set-path <path> <localUrl>`.
 *
 * @returns `https://<dns-name><path>` when the command exits with code 0;
 *   null (after logging a warning) when the DNS name cannot be resolved or
 *   the command fails.
 */
export async function setupTailscaleExposureRoute(opts: {
  mode: "serve" | "funnel";
  path: string;
  localUrl: string;
}): Promise<string | null> {
  const host = await getTailscaleDnsName();
  if (!host) {
    console.warn("[voice-call] Could not get Tailscale DNS name");
    return null;
  }

  const cliArgs = [
    opts.mode,
    "--bg",
    "--yes",
    "--set-path",
    opts.path,
    opts.localUrl,
  ];
  const outcome = await runTailscaleCommand(cliArgs);

  if (outcome.code !== 0) {
    console.warn(`[voice-call] Tailscale ${opts.mode} failed`);
    return null;
  }

  const publicUrl = `https://${host}${opts.path}`;
  console.log(`[voice-call] Tailscale ${opts.mode} active: ${publicUrl}`);
  return publicUrl;
}
/**
 * Remove a previously mounted route by running `tailscale <mode> off <path>`.
 * The command's exit code is not inspected.
 */
export async function cleanupTailscaleExposureRoute(opts: {
  mode: "serve" | "funnel";
  path: string;
}): Promise<void> {
  const offArgs = [opts.mode, "off", opts.path];
  await runTailscaleCommand(offArgs);
}
/**
 * Expose the webhook server through Tailscale according to `config.tailscale`.
 * Delegates to setupTailscaleExposureRoute(), which shells out to
 * `tailscale serve` or `tailscale funnel`.
 *
 * @returns null when the Tailscale mode is "off"; otherwise whatever
 *   setupTailscaleExposureRoute() resolves to.
 */
export async function setupTailscaleExposure(
  config: VoiceCallConfig,
): Promise<string | null> {
  const requestedMode = config.tailscale.mode;
  if (requestedMode === "off") {
    return null;
  }

  // Anything other than "funnel" is treated as "serve".
  const mode = requestedMode === "funnel" ? "funnel" : "serve";

  // Include the path suffix so tailscale forwards to the correct endpoint
  // (tailscale strips the mount path prefix when proxying)
  const { port, path: servePath } = config.serve;
  const localUrl = `http://127.0.0.1:${port}${servePath}`;

  return setupTailscaleExposureRoute({
    mode,
    path: config.tailscale.path,
    localUrl,
  });
}
/**
 * Tear down the Tailscale serve/funnel route described by `config.tailscale`.
 * Does nothing when the Tailscale mode is "off".
 */
export async function cleanupTailscaleExposure(
  config: VoiceCallConfig,
): Promise<void> {
  const requestedMode = config.tailscale.mode;
  if (requestedMode === "off") {
    return;
  }

  // Anything other than "funnel" is treated as "serve".
  const mode = requestedMode === "funnel" ? "funnel" : "serve";
  await cleanupTailscaleExposureRoute({ mode, path: config.tailscale.path });
}