feat: restore voice-call plugin parity

This commit is contained in:
Peter Steinberger
2026-01-12 21:40:22 +00:00
parent 3467b0ba07
commit 42c17adb5e
27 changed files with 6036 additions and 516 deletions

View File

@@ -0,0 +1,297 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { Command } from "commander";
import type { VoiceCallConfig } from "./config.js";
import type { VoiceCallRuntime } from "./runtime.js";
import { resolveUserPath } from "./utils.js";
import {
cleanupTailscaleExposureRoute,
getTailscaleSelfInfo,
setupTailscaleExposureRoute,
} from "./webhook.js";
/** Minimal logging surface the CLI commands need (injected by the host). */
type Logger = {
  info: (message: string) => void;
  warn: (message: string) => void;
  error: (message: string) => void;
};
/**
 * Normalize a user-supplied exposure mode string.
 * Recognizes "off" and "serve" (case-insensitive, whitespace-tolerant);
 * anything else falls back to "funnel", matching the CLI default.
 */
function resolveMode(input: string): "off" | "serve" | "funnel" {
  const normalized = input.trim().toLowerCase();
  switch (normalized) {
    case "off":
    case "serve":
      return normalized;
    default:
      return "funnel";
  }
}
/**
 * Compute the default calls.jsonl location: the configured store directory
 * when set (and non-blank), otherwise ~/clawd/voice-calls, with user-path
 * expansion (e.g. "~") applied before appending the file name.
 */
function resolveDefaultStorePath(config: VoiceCallConfig): string {
  const configured = config.store?.trim();
  const baseDir =
    configured && configured.length > 0
      ? configured
      : path.join(os.homedir(), "clawd", "voice-calls");
  return path.join(resolveUserPath(baseDir), "calls.jsonl");
}
/** Resolve after approximately `ms` milliseconds (setTimeout wrapper). */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
 * Register the `voicecall` command tree on the given commander program.
 *
 * Subcommands: call/start (outbound dialing), continue/speak (mid-call TTS),
 * end, status, tail (follow the JSONL call log), expose (Tailscale
 * serve/funnel management for the webhook).
 *
 * `ensureRuntime` lazily constructs the provider/manager runtime, so merely
 * registering the CLI does not start any servers or touch providers.
 */
export function registerVoiceCallCli(params: {
  program: Command;
  config: VoiceCallConfig;
  ensureRuntime: () => Promise<VoiceCallRuntime>;
  logger: Logger;
}) {
  const { program, config, ensureRuntime, logger } = params;
  const root = program.command("voicecall").description("Voice call utilities");
  // voicecall call — primary outbound entry point; --to falls back to the
  // configured default toNumber.
  root
    .command("call")
    .description("Initiate an outbound voice call")
    .requiredOption(
      "-m, --message <text>",
      "Message to speak when call connects",
    )
    .option(
      "-t, --to <phone>",
      "Phone number to call (E.164 format, uses config toNumber if not set)",
    )
    .option(
      "--mode <mode>",
      "Call mode: notify (hangup after message) or conversation (stay open)",
      "conversation",
    )
    .action(
      async (options: { message: string; to?: string; mode?: string }) => {
        const rt = await ensureRuntime();
        const to = options.to ?? rt.config.toNumber;
        if (!to) {
          throw new Error("Missing --to and no toNumber configured");
        }
        // Unknown --mode values are passed as undefined so the manager
        // applies its configured default instead.
        const result = await rt.manager.initiateCall(to, undefined, {
          message: options.message,
          mode:
            options.mode === "notify" || options.mode === "conversation"
              ? options.mode
              : undefined,
        });
        if (!result.success) {
          throw new Error(result.error || "initiate failed");
        }
        // eslint-disable-next-line no-console
        console.log(JSON.stringify({ callId: result.callId }, null, 2));
      },
    );
  // voicecall start — same as call, but --to is required and --message
  // optional (kept for backwards compatibility).
  root
    .command("start")
    .description("Alias for voicecall call")
    .requiredOption("--to <phone>", "Phone number to call")
    .option("--message <text>", "Message to speak when call connects")
    .option(
      "--mode <mode>",
      "Call mode: notify (hangup after message) or conversation (stay open)",
      "conversation",
    )
    .action(
      async (options: { to: string; message?: string; mode?: string }) => {
        const rt = await ensureRuntime();
        const result = await rt.manager.initiateCall(options.to, undefined, {
          message: options.message,
          mode:
            options.mode === "notify" || options.mode === "conversation"
              ? options.mode
              : undefined,
        });
        if (!result.success) {
          throw new Error(result.error || "initiate failed");
        }
        // eslint-disable-next-line no-console
        console.log(JSON.stringify({ callId: result.callId }, null, 2));
      },
    );
  // voicecall continue — speak, then block until the callee responds.
  root
    .command("continue")
    .description("Speak a message and wait for a response")
    .requiredOption("--call-id <id>", "Call ID")
    .requiredOption("--message <text>", "Message to speak")
    .action(async (options: { callId: string; message: string }) => {
      const rt = await ensureRuntime();
      const result = await rt.manager.continueCall(
        options.callId,
        options.message,
      );
      if (!result.success) {
        throw new Error(result.error || "continue failed");
      }
      // eslint-disable-next-line no-console
      console.log(JSON.stringify(result, null, 2));
    });
  // voicecall speak — fire-and-forget TTS on an active call.
  root
    .command("speak")
    .description("Speak a message without waiting for response")
    .requiredOption("--call-id <id>", "Call ID")
    .requiredOption("--message <text>", "Message to speak")
    .action(async (options: { callId: string; message: string }) => {
      const rt = await ensureRuntime();
      const result = await rt.manager.speak(options.callId, options.message);
      if (!result.success) {
        throw new Error(result.error || "speak failed");
      }
      // eslint-disable-next-line no-console
      console.log(JSON.stringify(result, null, 2));
    });
  // voicecall end — hang up.
  root
    .command("end")
    .description("Hang up an active call")
    .requiredOption("--call-id <id>", "Call ID")
    .action(async (options: { callId: string }) => {
      const rt = await ensureRuntime();
      const result = await rt.manager.endCall(options.callId);
      if (!result.success) {
        throw new Error(result.error || "end failed");
      }
      // eslint-disable-next-line no-console
      console.log(JSON.stringify(result, null, 2));
    });
  // voicecall status — dump the call record (or { found: false }).
  root
    .command("status")
    .description("Show call status")
    .requiredOption("--call-id <id>", "Call ID")
    .action(async (options: { callId: string }) => {
      const rt = await ensureRuntime();
      const call = rt.manager.getCall(options.callId);
      // eslint-disable-next-line no-console
      console.log(JSON.stringify(call ?? { found: false }, null, 2));
    });
  // voicecall tail — poll-based `tail -f` over the JSONL store. Note the
  // --file default is resolved once, at registration time.
  root
    .command("tail")
    .description(
      "Tail voice-call JSONL logs (prints new lines; useful during provider tests)",
    )
    .option("--file <path>", "Path to calls.jsonl", resolveDefaultStorePath(config))
    .option("--since <n>", "Print last N lines first", "25")
    .option("--poll <ms>", "Poll interval in ms", "250")
    .action(
      async (options: { file: string; since?: string; poll?: string }) => {
        const file = options.file;
        // Clamp user input: never negative, and never poll faster than 50ms.
        const since = Math.max(0, Number(options.since ?? 0));
        const pollMs = Math.max(50, Number(options.poll ?? 250));
        if (!fs.existsSync(file)) {
          logger.error(`No log file at ${file}`);
          process.exit(1);
        }
        // Print the trailing `since` lines, then follow from EOF.
        const initial = fs.readFileSync(file, "utf8");
        const lines = initial.split("\n").filter(Boolean);
        for (const line of lines.slice(Math.max(0, lines.length - since))) {
          // eslint-disable-next-line no-console
          console.log(line);
        }
        let offset = Buffer.byteLength(initial, "utf8");
        // Infinite follow loop — the command runs until interrupted.
        for (;;) {
          try {
            const stat = fs.statSync(file);
            if (stat.size < offset) {
              // File was truncated/rotated; start over from the beginning.
              offset = 0;
            }
            if (stat.size > offset) {
              const fd = fs.openSync(file, "r");
              try {
                const buf = Buffer.alloc(stat.size - offset);
                fs.readSync(fd, buf, 0, buf.length, offset);
                offset = stat.size;
                const text = buf.toString("utf8");
                for (const line of text.split("\n").filter(Boolean)) {
                  // eslint-disable-next-line no-console
                  console.log(line);
                }
              } finally {
                fs.closeSync(fd);
              }
            }
          } catch {
            // ignore and retry
          }
          await sleep(pollMs);
        }
      },
    );
  // voicecall expose — manage Tailscale serve/funnel routes for the webhook.
  root
    .command("expose")
    .description("Enable/disable Tailscale serve/funnel for the webhook")
    .option("--mode <mode>", "off | serve (tailnet) | funnel (public)", "funnel")
    .option(
      "--path <path>",
      "Tailscale path to expose (recommend matching serve.path)",
    )
    .option("--port <port>", "Local webhook port")
    .option("--serve-path <path>", "Local webhook path")
    .action(
      async (options: {
        mode?: string;
        port?: string;
        path?: string;
        servePath?: string;
      }) => {
        const mode = resolveMode(options.mode ?? "funnel");
        // CLI flags win over config; config wins over hard-coded defaults.
        const servePort = Number(options.port ?? config.serve.port ?? 3334);
        const servePath = String(
          options.servePath ?? config.serve.path ?? "/voice/webhook",
        );
        const tsPath = String(
          options.path ?? config.tailscale?.path ?? servePath,
        );
        const localUrl = `http://127.0.0.1:${servePort}`;
        if (mode === "off") {
          // Tear down both route kinds so stale exposure cannot linger.
          await cleanupTailscaleExposureRoute({ mode: "serve", path: tsPath });
          await cleanupTailscaleExposureRoute({ mode: "funnel", path: tsPath });
          // eslint-disable-next-line no-console
          console.log(JSON.stringify({ ok: true, mode: "off", path: tsPath }, null, 2));
          return;
        }
        const publicUrl = await setupTailscaleExposureRoute({
          mode,
          path: tsPath,
          localUrl,
        });
        // When setup produced no public URL, fetch node info so we can
        // build an admin-console enable link as a hint.
        const tsInfo = publicUrl ? null : await getTailscaleSelfInfo();
        const enableUrl = tsInfo?.nodeId
          ? `https://login.tailscale.com/f/${mode}?node=${tsInfo.nodeId}`
          : null;
        // eslint-disable-next-line no-console
        console.log(
          JSON.stringify(
            {
              ok: Boolean(publicUrl),
              mode,
              path: tsPath,
              localUrl,
              publicUrl,
              hint: publicUrl
                ? undefined
                : {
                    note: "Tailscale serve/funnel may be disabled on this tailnet (or require admin enable).",
                    enableUrl,
                  },
            },
            null,
            2,
          ),
        );
      },
    );
}

View File

@@ -0,0 +1,355 @@
import { z } from "zod";
// -----------------------------------------------------------------------------
// Phone Number Validation
// -----------------------------------------------------------------------------
/**
 * E.164 phone number format: +[country code][number]
 * Examples use 555 prefix (reserved for fictional numbers).
 * Leading digit must be 1-9; at most 15 digits total per ITU-T E.164.
 */
export const E164Schema = z
  .string()
  .regex(/^\+[1-9]\d{1,14}$/, "Expected E.164 format, e.g. +15550001234");
// -----------------------------------------------------------------------------
// Inbound Policy
// -----------------------------------------------------------------------------
/**
 * Controls how inbound calls are handled:
 * - "disabled": Block all inbound calls (outbound only)
 * - "allowlist": Only accept calls from numbers in allowFrom
 * - "pairing": Unknown callers can request pairing (future)
 * - "open": Accept all inbound calls (dangerous!)
 */
export const InboundPolicySchema = z.enum([
  "disabled",
  "allowlist",
  "pairing",
  "open",
]);
export type InboundPolicy = z.infer<typeof InboundPolicySchema>;
// -----------------------------------------------------------------------------
// Provider-Specific Configuration
// -----------------------------------------------------------------------------
// Credentials are optional at the schema level; presence is enforced for the
// selected provider by validateProviderConfig() below.
export const TelnyxConfigSchema = z.object({
  /** Telnyx API v2 key */
  apiKey: z.string().min(1).optional(),
  /** Telnyx connection ID (from Call Control app) */
  connectionId: z.string().min(1).optional(),
  /** Public key for webhook signature verification */
  publicKey: z.string().min(1).optional(),
});
export type TelnyxConfig = z.infer<typeof TelnyxConfigSchema>;
export const TwilioConfigSchema = z.object({
  /** Twilio Account SID */
  accountSid: z.string().min(1).optional(),
  /** Twilio Auth Token */
  authToken: z.string().min(1).optional(),
});
export type TwilioConfig = z.infer<typeof TwilioConfigSchema>;
// -----------------------------------------------------------------------------
// STT/TTS Configuration
// -----------------------------------------------------------------------------
// NOTE: the outer `.default({...})` on these object schemas applies when the
// whole config key is omitted; per-field `.default(...)` applies when the key
// is present but the field is missing.
export const SttConfigSchema = z
  .object({
    /** STT provider (currently only OpenAI supported) */
    provider: z.literal("openai").default("openai"),
    /** Whisper model to use */
    model: z.string().min(1).default("whisper-1"),
  })
  .default({ provider: "openai", model: "whisper-1" });
export type SttConfig = z.infer<typeof SttConfigSchema>;
export const TtsConfigSchema = z
  .object({
    /** TTS provider (currently only OpenAI supported) */
    provider: z.literal("openai").default("openai"),
    /**
     * TTS model to use:
     * - gpt-4o-mini-tts: newest, supports instructions for tone/style control (recommended)
     * - tts-1: lower latency
     * - tts-1-hd: higher quality
     */
    model: z.string().min(1).default("gpt-4o-mini-tts"),
    /**
     * Voice ID. For best quality, use marin or cedar.
     * All voices: alloy, ash, ballad, coral, echo, fable, nova, onyx, sage, shimmer, verse, marin, cedar
     */
    voice: z.string().min(1).default("coral"),
    /**
     * Instructions for speech style (only works with gpt-4o-mini-tts).
     * Examples: "Speak in a cheerful tone", "Talk like a sympathetic customer service agent"
     */
    instructions: z.string().optional(),
  })
  .default({ provider: "openai", model: "gpt-4o-mini-tts", voice: "coral" });
export type TtsConfig = z.infer<typeof TtsConfigSchema>;
// -----------------------------------------------------------------------------
// Webhook Server Configuration
// -----------------------------------------------------------------------------
export const VoiceCallServeConfigSchema = z
  .object({
    /** Port to listen on */
    port: z.number().int().positive().default(3334),
    /** Bind address (loopback by default; exposure is handled via tunnels) */
    bind: z.string().default("127.0.0.1"),
    /** Webhook path */
    path: z.string().min(1).default("/voice/webhook"),
  })
  .default({ port: 3334, bind: "127.0.0.1", path: "/voice/webhook" });
export type VoiceCallServeConfig = z.infer<typeof VoiceCallServeConfigSchema>;
export const VoiceCallTailscaleConfigSchema = z
  .object({
    /**
     * Tailscale exposure mode:
     * - "off": No Tailscale exposure
     * - "serve": Tailscale serve (private to tailnet)
     * - "funnel": Tailscale funnel (public HTTPS)
     */
    mode: z.enum(["off", "serve", "funnel"]).default("off"),
    /** Path for Tailscale serve/funnel (should usually match serve.path) */
    path: z.string().min(1).default("/voice/webhook"),
  })
  .default({ mode: "off", path: "/voice/webhook" });
export type VoiceCallTailscaleConfig = z.infer<
  typeof VoiceCallTailscaleConfigSchema
>;
// -----------------------------------------------------------------------------
// Tunnel Configuration (unified ngrok/tailscale)
// -----------------------------------------------------------------------------
export const VoiceCallTunnelConfigSchema = z
  .object({
    /**
     * Tunnel provider:
     * - "none": No tunnel (use publicUrl if set, or manual setup)
     * - "ngrok": Use ngrok for public HTTPS tunnel
     * - "tailscale-serve": Tailscale serve (private to tailnet)
     * - "tailscale-funnel": Tailscale funnel (public HTTPS)
     */
    provider: z
      .enum(["none", "ngrok", "tailscale-serve", "tailscale-funnel"])
      .default("none"),
    /** ngrok auth token (optional, enables longer sessions and more features) */
    ngrokAuthToken: z.string().min(1).optional(),
    /** ngrok custom domain (paid feature, e.g., "myapp.ngrok.io") */
    ngrokDomain: z.string().min(1).optional(),
    /**
     * Allow ngrok free tier compatibility mode.
     * When true, signature verification failures on ngrok-free.app URLs
     * will be logged but allowed through. Less secure, but necessary
     * for ngrok free tier which may modify URLs.
     */
    allowNgrokFreeTier: z.boolean().default(true),
  })
  .default({ provider: "none", allowNgrokFreeTier: true });
export type VoiceCallTunnelConfig = z.infer<typeof VoiceCallTunnelConfigSchema>;
// -----------------------------------------------------------------------------
// Outbound Call Configuration
// -----------------------------------------------------------------------------
/**
 * Call mode determines how outbound calls behave:
 * - "notify": Deliver message and auto-hangup after delay (one-way notification)
 * - "conversation": Stay open for back-and-forth until explicit end or timeout
 */
export const CallModeSchema = z.enum(["notify", "conversation"]);
export type CallMode = z.infer<typeof CallModeSchema>;
export const OutboundConfigSchema = z
  .object({
    /** Default call mode for outbound calls */
    defaultMode: CallModeSchema.default("notify"),
    /** Seconds to wait after TTS before auto-hangup in notify mode */
    notifyHangupDelaySec: z.number().int().nonnegative().default(3),
  })
  .default({ defaultMode: "notify", notifyHangupDelaySec: 3 });
export type OutboundConfig = z.infer<typeof OutboundConfigSchema>;
// -----------------------------------------------------------------------------
// Streaming Configuration (OpenAI Realtime STT)
// -----------------------------------------------------------------------------
export const VoiceCallStreamingConfigSchema = z
  .object({
    /** Enable real-time audio streaming (requires WebSocket support) */
    enabled: z.boolean().default(false),
    /** STT provider for real-time transcription */
    sttProvider: z.enum(["openai-realtime"]).default("openai-realtime"),
    /** OpenAI API key for Realtime API (uses OPENAI_API_KEY env if not set) */
    openaiApiKey: z.string().min(1).optional(),
    /** OpenAI transcription model (default: gpt-4o-transcribe) */
    sttModel: z.string().min(1).default("gpt-4o-transcribe"),
    /** VAD silence duration in ms before considering speech ended */
    silenceDurationMs: z.number().int().positive().default(800),
    /** VAD threshold 0-1 (higher = less sensitive) */
    vadThreshold: z.number().min(0).max(1).default(0.5),
    /** WebSocket path for media stream connections */
    streamPath: z.string().min(1).default("/voice/stream"),
  })
  .default({
    enabled: false,
    sttProvider: "openai-realtime",
    sttModel: "gpt-4o-transcribe",
    silenceDurationMs: 800,
    vadThreshold: 0.5,
    streamPath: "/voice/stream",
  });
export type VoiceCallStreamingConfig = z.infer<
  typeof VoiceCallStreamingConfigSchema
>;
// -----------------------------------------------------------------------------
// Main Voice Call Configuration
// -----------------------------------------------------------------------------
// Top-level plugin config. Sub-schemas with `.default(...)` make their keys
// effectively optional; credential completeness per provider is checked
// separately by validateProviderConfig().
export const VoiceCallConfigSchema = z.object({
  /** Enable voice call functionality */
  enabled: z.boolean().default(false),
  /** Active provider (telnyx, twilio, or mock) */
  provider: z.enum(["telnyx", "twilio", "mock"]).optional(),
  /** Telnyx-specific configuration */
  telnyx: TelnyxConfigSchema.optional(),
  /** Twilio-specific configuration */
  twilio: TwilioConfigSchema.optional(),
  /** Phone number to call from (E.164) */
  fromNumber: E164Schema.optional(),
  /** Default phone number to call (E.164) */
  toNumber: E164Schema.optional(),
  /** Inbound call policy */
  inboundPolicy: InboundPolicySchema.default("disabled"),
  /** Allowlist of phone numbers for inbound calls (E.164) */
  allowFrom: z.array(E164Schema).default([]),
  /** Greeting message for inbound calls */
  inboundGreeting: z.string().optional(),
  /** Outbound call configuration */
  outbound: OutboundConfigSchema,
  /** Maximum call duration in seconds */
  maxDurationSeconds: z.number().int().positive().default(300),
  /** Silence timeout for end-of-speech detection (ms) */
  silenceTimeoutMs: z.number().int().positive().default(800),
  /** Timeout for user transcript (ms) */
  transcriptTimeoutMs: z.number().int().positive().default(180000),
  /** Ring timeout for outbound calls (ms) */
  ringTimeoutMs: z.number().int().positive().default(30000),
  /** Maximum concurrent calls */
  maxConcurrentCalls: z.number().int().positive().default(1),
  /** Webhook server configuration */
  serve: VoiceCallServeConfigSchema,
  /** Tailscale exposure configuration (legacy, prefer tunnel config) */
  tailscale: VoiceCallTailscaleConfigSchema,
  /** Tunnel configuration (unified ngrok/tailscale) */
  tunnel: VoiceCallTunnelConfigSchema,
  /** Real-time audio streaming configuration */
  streaming: VoiceCallStreamingConfigSchema,
  /** Public webhook URL override (if set, bypasses tunnel auto-detection) */
  publicUrl: z.string().url().optional(),
  /** Skip webhook signature verification (development only, NOT for production) */
  skipSignatureVerification: z.boolean().default(false),
  /** STT configuration */
  stt: SttConfigSchema,
  /** TTS configuration */
  tts: TtsConfigSchema,
  /** Store path for call logs */
  store: z.string().optional(),
  /** Model for generating voice responses (e.g., "anthropic/claude-sonnet-4", "openai/gpt-4o") */
  responseModel: z.string().default("openai/gpt-4o-mini"),
  /** System prompt for voice responses */
  responseSystemPrompt: z.string().optional(),
  /** Timeout for response generation in ms (default 30s) */
  responseTimeoutMs: z.number().int().positive().default(30000),
});
export type VoiceCallConfig = z.infer<typeof VoiceCallConfigSchema>;
// -----------------------------------------------------------------------------
// Configuration Helpers
// -----------------------------------------------------------------------------
/**
 * Validate that the configuration has all required fields for the selected
 * provider. A disabled config is always valid; the mock provider needs no
 * credentials or caller ID. Error strings name the offending config path.
 */
export function validateProviderConfig(config: VoiceCallConfig): {
  valid: boolean;
  errors: string[];
} {
  if (!config.enabled) {
    return { valid: true, errors: [] };
  }
  const errors: string[] = [];
  if (!config.provider) {
    errors.push("plugins.entries.voice-call.config.provider is required");
  }
  // Real providers need a caller ID; mock fabricates its own.
  if (config.provider !== "mock" && !config.fromNumber) {
    errors.push("plugins.entries.voice-call.config.fromNumber is required");
  }
  switch (config.provider) {
    case "telnyx": {
      if (!config.telnyx?.apiKey) {
        errors.push(
          "plugins.entries.voice-call.config.telnyx.apiKey is required (or set TELNYX_API_KEY env)",
        );
      }
      if (!config.telnyx?.connectionId) {
        errors.push(
          "plugins.entries.voice-call.config.telnyx.connectionId is required (or set TELNYX_CONNECTION_ID env)",
        );
      }
      break;
    }
    case "twilio": {
      if (!config.twilio?.accountSid) {
        errors.push(
          "plugins.entries.voice-call.config.twilio.accountSid is required (or set TWILIO_ACCOUNT_SID env)",
        );
      }
      if (!config.twilio?.authToken) {
        errors.push(
          "plugins.entries.voice-call.config.twilio.authToken is required (or set TWILIO_AUTH_TOKEN env)",
        );
      }
      break;
    }
    default:
      break;
  }
  return { valid: errors.length === 0, errors };
}

View File

@@ -0,0 +1,190 @@
import fs from "node:fs";
import path from "node:path";
import { fileURLToPath, pathToFileURL } from "node:url";
/** Minimal slice of the core Clawdbot config that this bridge needs. */
export type CoreConfig = {
  session?: {
    store?: string;
  };
};
/**
 * Functions and constants dynamically loaded from the compiled core package
 * (see loadCoreAgentDeps). Declared locally so the plugin stays decoupled
 * from core's own type declarations; shapes must track the dist/ modules.
 */
type CoreAgentDeps = {
  resolveAgentDir: (cfg: CoreConfig, agentId: string) => string;
  resolveAgentWorkspaceDir: (cfg: CoreConfig, agentId: string) => string;
  resolveAgentIdentity: (
    cfg: CoreConfig,
    agentId: string,
  ) => { name?: string | null } | null | undefined;
  resolveThinkingDefault: (params: {
    cfg: CoreConfig;
    provider?: string;
    model?: string;
  }) => string;
  runEmbeddedPiAgent: (params: {
    sessionId: string;
    sessionKey?: string;
    messageProvider?: string;
    sessionFile: string;
    workspaceDir: string;
    config?: CoreConfig;
    prompt: string;
    provider?: string;
    model?: string;
    thinkLevel?: string;
    verboseLevel?: string;
    timeoutMs: number;
    runId: string;
    lane?: string;
    extraSystemPrompt?: string;
    agentDir?: string;
  }) => Promise<{
    payloads?: Array<{ text?: string; isError?: boolean }>;
    meta?: { aborted?: boolean };
  }>;
  resolveAgentTimeoutMs: (opts: { cfg: CoreConfig }) => number;
  ensureAgentWorkspace: (params?: { dir: string }) => Promise<void>;
  resolveStorePath: (store?: string, opts?: { agentId?: string }) => string;
  loadSessionStore: (storePath: string) => Record<string, unknown>;
  saveSessionStore: (
    storePath: string,
    store: Record<string, unknown>,
  ) => Promise<void>;
  resolveSessionFilePath: (
    sessionId: string,
    entry: unknown,
    opts?: { agentId?: string },
  ) => string;
  DEFAULT_MODEL: string;
  DEFAULT_PROVIDER: string;
};
// Process-lifetime memoization: the package root and the loaded deps are
// resolved at most once.
let coreRootCache: string | null = null;
let coreDepsPromise: Promise<CoreAgentDeps> | null = null;
/**
 * Walk upward from `startDir` looking for a package.json whose "name" field
 * equals `name`. Returns the containing directory, or null once the
 * filesystem root is reached without a match.
 */
function findPackageRoot(startDir: string, name: string): string | null {
  let current = startDir;
  while (true) {
    const manifest = path.join(current, "package.json");
    try {
      if (fs.existsSync(manifest)) {
        const parsed = JSON.parse(fs.readFileSync(manifest, "utf8")) as {
          name?: string;
        };
        if (parsed.name === name) {
          return current;
        }
      }
    } catch {
      // Unreadable or malformed package.json — keep walking upward.
    }
    const next = path.dirname(current);
    if (next === current) {
      // path.dirname is a fixed point at the root: nothing left to search.
      return null;
    }
    current = next;
  }
}
/**
 * Locate the root directory of the clawdbot package.
 *
 * Resolution order: the CLAWDBOT_ROOT env override, then an upward search
 * for a package.json named "clawdbot" starting from the CLI entry script's
 * directory, the working directory, and this module's own location.
 * The result is memoized in `coreRootCache` for the process lifetime.
 */
function resolveClawdbotRoot(): string {
  if (coreRootCache) return coreRootCache;
  const override = process.env.CLAWDBOT_ROOT?.trim();
  if (override) {
    coreRootCache = override;
    return override;
  }
  // Gather de-duplicated starting points for the upward search.
  const starts = new Set<string>();
  const entryScript = process.argv[1];
  if (entryScript) {
    starts.add(path.dirname(entryScript));
  }
  starts.add(process.cwd());
  try {
    starts.add(path.dirname(fileURLToPath(import.meta.url)));
  } catch {
    // import.meta.url may not be a file: URL (e.g. bundled builds) — skip.
  }
  for (const candidate of starts) {
    const root = findPackageRoot(candidate, "clawdbot");
    if (root) {
      coreRootCache = root;
      return root;
    }
  }
  throw new Error(
    "Unable to resolve Clawdbot root. Set CLAWDBOT_ROOT to the package root.",
  );
}
/**
 * Dynamically import a compiled core module from the clawdbot dist tree.
 * Throws with a build hint when the compiled file does not exist.
 */
async function importCoreModule<T>(relativePath: string): Promise<T> {
  const distPath = path.join(resolveClawdbotRoot(), "dist", relativePath);
  if (!fs.existsSync(distPath)) {
    throw new Error(
      `Missing core module at ${distPath}. Run \`pnpm build\` or install the official package.`,
    );
  }
  const mod = await import(pathToFileURL(distPath).href);
  return mod as T;
}
/**
 * Load all core-package dependencies this plugin needs, in parallel, and
 * memoize the resulting promise so the dist modules are imported only once.
 *
 * NOTE: the destructuring order below must match the Promise.all array
 * order exactly — each position corresponds to one dist module.
 */
export async function loadCoreAgentDeps(): Promise<CoreAgentDeps> {
  if (coreDepsPromise) return coreDepsPromise;
  coreDepsPromise = (async () => {
    const [
      agentScope,
      defaults,
      identity,
      modelSelection,
      piEmbedded,
      timeout,
      workspace,
      sessions,
    ] = await Promise.all([
      importCoreModule<{
        resolveAgentDir: CoreAgentDeps["resolveAgentDir"];
        resolveAgentWorkspaceDir: CoreAgentDeps["resolveAgentWorkspaceDir"];
      }>("agents/agent-scope.js"),
      importCoreModule<{
        DEFAULT_MODEL: string;
        DEFAULT_PROVIDER: string;
      }>("agents/defaults.js"),
      importCoreModule<{
        resolveAgentIdentity: CoreAgentDeps["resolveAgentIdentity"];
      }>("agents/identity.js"),
      importCoreModule<{
        resolveThinkingDefault: CoreAgentDeps["resolveThinkingDefault"];
      }>("agents/model-selection.js"),
      importCoreModule<{
        runEmbeddedPiAgent: CoreAgentDeps["runEmbeddedPiAgent"];
      }>("agents/pi-embedded.js"),
      importCoreModule<{
        resolveAgentTimeoutMs: CoreAgentDeps["resolveAgentTimeoutMs"];
      }>("agents/timeout.js"),
      importCoreModule<{
        ensureAgentWorkspace: CoreAgentDeps["ensureAgentWorkspace"];
      }>("agents/workspace.js"),
      importCoreModule<{
        resolveStorePath: CoreAgentDeps["resolveStorePath"];
        loadSessionStore: CoreAgentDeps["loadSessionStore"];
        saveSessionStore: CoreAgentDeps["saveSessionStore"];
        resolveSessionFilePath: CoreAgentDeps["resolveSessionFilePath"];
      }>("config/sessions.js"),
    ]);
    // Flatten the per-module namespaces into one deps bag.
    return {
      resolveAgentDir: agentScope.resolveAgentDir,
      resolveAgentWorkspaceDir: agentScope.resolveAgentWorkspaceDir,
      resolveAgentIdentity: identity.resolveAgentIdentity,
      resolveThinkingDefault: modelSelection.resolveThinkingDefault,
      runEmbeddedPiAgent: piEmbedded.runEmbeddedPiAgent,
      resolveAgentTimeoutMs: timeout.resolveAgentTimeoutMs,
      ensureAgentWorkspace: workspace.ensureAgentWorkspace,
      resolveStorePath: sessions.resolveStorePath,
      loadSessionStore: sessions.loadSessionStore,
      saveSessionStore: sessions.saveSessionStore,
      resolveSessionFilePath: sessions.resolveSessionFilePath,
      DEFAULT_MODEL: defaults.DEFAULT_MODEL,
      DEFAULT_PROVIDER: defaults.DEFAULT_PROVIDER,
    };
  })();
  return coreDepsPromise;
}

View File

@@ -0,0 +1,846 @@
import crypto from "node:crypto";
import fs from "node:fs";
import fsp from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { resolveUserPath } from "./utils.js";
import type { CallMode, VoiceCallConfig } from "./config.js";
import type { VoiceCallProvider } from "./providers/base.js";
import {
type CallId,
type CallRecord,
CallRecordSchema,
type CallState,
type NormalizedEvent,
type OutboundCallOptions,
TerminalStates,
type TranscriptEntry,
} from "./types.js";
import { escapeXml, mapVoiceToPolly } from "./voice-mapping.js";
/**
 * Manages voice calls: state machine, persistence, and provider coordination.
 *
 * Call records live in memory while active and are persisted as JSONL under
 * `storePath`; the provider is injected later via initialize().
 */
export class CallManager {
  // In-memory records for calls that have not reached a terminal state.
  private activeCalls = new Map<CallId, CallRecord>();
  private providerCallIdMap = new Map<string, CallId>(); // providerCallId -> internal callId
  // Webhook event IDs already handled, for de-duplication.
  private processedEventIds = new Set<string>();
  private provider: VoiceCallProvider | null = null;
  private config: VoiceCallConfig;
  // Directory that holds persisted call records (see constructor).
  private storePath: string;
  private webhookUrl: string | null = null;
  // Pending waiters that resolve when a user transcript arrives for a call.
  private transcriptWaiters = new Map<
    CallId,
    {
      resolve: (text: string) => void;
      reject: (err: Error) => void;
      timeout: NodeJS.Timeout;
    }
  >();
  /** Max duration timers to auto-hangup calls after configured timeout */
  private maxDurationTimers = new Map<CallId, NodeJS.Timeout>();
  constructor(config: VoiceCallConfig, storePath?: string) {
    this.config = config;
    // Resolve store path with tilde expansion (like other config values).
    // Precedence: explicit argument > config.store > ~/clawd/voice-calls.
    const rawPath =
      storePath ||
      config.store ||
      path.join(os.homedir(), "clawd", "voice-calls");
    this.storePath = resolveUserPath(rawPath);
  }
/**
* Initialize the call manager with a provider.
*/
initialize(provider: VoiceCallProvider, webhookUrl: string): void {
this.provider = provider;
this.webhookUrl = webhookUrl;
// Ensure store directory exists
fs.mkdirSync(this.storePath, { recursive: true });
// Load any persisted active calls
this.loadActiveCalls();
}
  /**
   * Get the current provider, or null when initialize() has not run yet.
   */
  getProvider(): VoiceCallProvider | null {
    return this.provider;
  }
  /**
   * Initiate an outbound call.
   *
   * Creates and persists the call record first, then asks the provider to
   * place the call; on provider failure the record is persisted as "failed"
   * and removed from the active set.
   *
   * @param to - The phone number to call
   * @param sessionKey - Optional session key for context
   * @param options - Optional call options (message, mode); a plain string is
   *   accepted as a legacy alias for `{ message }`.
   * @returns callId plus success flag; callId is "" when the call was
   *   rejected before a record existed (no provider, no webhook, at limit,
   *   or no caller ID).
   */
  async initiateCall(
    to: string,
    sessionKey?: string,
    options?: OutboundCallOptions | string,
  ): Promise<{ callId: CallId; success: boolean; error?: string }> {
    // Support legacy string argument for initialMessage
    const opts: OutboundCallOptions =
      typeof options === "string" ? { message: options } : (options ?? {});
    const initialMessage = opts.message;
    const mode = opts.mode ?? this.config.outbound.defaultMode;
    if (!this.provider) {
      return { callId: "", success: false, error: "Provider not initialized" };
    }
    if (!this.webhookUrl) {
      return {
        callId: "",
        success: false,
        error: "Webhook URL not configured",
      };
    }
    // Check concurrent call limit
    const activeCalls = this.getActiveCalls();
    if (activeCalls.length >= this.config.maxConcurrentCalls) {
      return {
        callId: "",
        success: false,
        error: `Maximum concurrent calls (${this.config.maxConcurrentCalls}) reached`,
      };
    }
    const callId = crypto.randomUUID();
    // The mock provider gets a synthetic caller ID so it works unconfigured.
    const from =
      this.config.fromNumber ||
      (this.provider?.name === "mock" ? "+15550000000" : undefined);
    if (!from) {
      return { callId: "", success: false, error: "fromNumber not configured" };
    }
    // Create call record with mode in metadata
    const callRecord: CallRecord = {
      callId,
      provider: this.provider.name,
      direction: "outbound",
      state: "initiated",
      from,
      to,
      sessionKey,
      startedAt: Date.now(),
      transcript: [],
      processedEventIds: [],
      metadata: {
        ...(initialMessage && { initialMessage }),
        mode,
      },
    };
    // Persist before dialing so a crash mid-dial still leaves a record.
    this.activeCalls.set(callId, callRecord);
    this.persistCallRecord(callRecord);
    try {
      // For notify mode with a message, use inline TwiML with <Say>
      let inlineTwiml: string | undefined;
      if (mode === "notify" && initialMessage) {
        const pollyVoice = mapVoiceToPolly(this.config.tts.voice);
        inlineTwiml = this.generateNotifyTwiml(initialMessage, pollyVoice);
        console.log(
          `[voice-call] Using inline TwiML for notify mode (voice: ${pollyVoice})`,
        );
      }
      const result = await this.provider.initiateCall({
        callId,
        from,
        to,
        webhookUrl: this.webhookUrl,
        inlineTwiml,
      });
      callRecord.providerCallId = result.providerCallId;
      this.providerCallIdMap.set(result.providerCallId, callId); // Map providerCallId to internal callId
      this.persistCallRecord(callRecord);
      return { callId, success: true };
    } catch (err) {
      // Provider rejected the dial: persist the failure, then drop the
      // record from the in-memory active set and the reverse map.
      callRecord.state = "failed";
      callRecord.endedAt = Date.now();
      callRecord.endReason = "failed";
      this.persistCallRecord(callRecord);
      this.activeCalls.delete(callId);
      if (callRecord.providerCallId) {
        this.providerCallIdMap.delete(callRecord.providerCallId);
      }
      return {
        callId,
        success: false,
        error: err instanceof Error ? err.message : String(err),
      };
    }
  }
/**
* Speak to user in an active call.
*/
async speak(
callId: CallId,
text: string,
): Promise<{ success: boolean; error?: string }> {
const call = this.activeCalls.get(callId);
if (!call) {
return { success: false, error: "Call not found" };
}
if (!this.provider || !call.providerCallId) {
return { success: false, error: "Call not connected" };
}
if (TerminalStates.has(call.state)) {
return { success: false, error: "Call has ended" };
}
try {
// Update state
call.state = "speaking";
this.persistCallRecord(call);
// Add to transcript
this.addTranscriptEntry(call, "bot", text);
// Play TTS
await this.provider.playTts({
callId,
providerCallId: call.providerCallId,
text,
voice: this.config.tts.voice,
});
return { success: true };
} catch (err) {
return {
success: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
/**
* Speak the initial message for a call (called when media stream connects).
* This is used to auto-play the message passed to initiateCall.
* In notify mode, auto-hangup after the message is delivered.
*/
async speakInitialMessage(providerCallId: string): Promise<void> {
const call = this.getCallByProviderCallId(providerCallId);
if (!call) {
console.warn(
`[voice-call] speakInitialMessage: no call found for ${providerCallId}`,
);
return;
}
const initialMessage = call.metadata?.initialMessage as string | undefined;
const mode = (call.metadata?.mode as CallMode) ?? "conversation";
if (!initialMessage) {
console.log(
`[voice-call] speakInitialMessage: no initial message for ${call.callId}`,
);
return;
}
// Clear the initial message so we don't speak it again
if (call.metadata) {
delete call.metadata.initialMessage;
this.persistCallRecord(call);
}
console.log(
`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`,
);
const result = await this.speak(call.callId, initialMessage);
if (!result.success) {
console.warn(
`[voice-call] Failed to speak initial message: ${result.error}`,
);
return;
}
// In notify mode, auto-hangup after delay
if (mode === "notify") {
const delaySec = this.config.outbound.notifyHangupDelaySec;
console.log(
`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`,
);
setTimeout(async () => {
const currentCall = this.getCall(call.callId);
if (currentCall && !TerminalStates.has(currentCall.state)) {
console.log(
`[voice-call] Notify mode: hanging up call ${call.callId}`,
);
await this.endCall(call.callId);
}
}, delaySec * 1000);
}
}
/**
* Start max duration timer for a call.
* Auto-hangup when maxDurationSeconds is reached.
*/
private startMaxDurationTimer(callId: CallId): void {
// Clear any existing timer
this.clearMaxDurationTimer(callId);
const maxDurationMs = this.config.maxDurationSeconds * 1000;
console.log(
`[voice-call] Starting max duration timer (${this.config.maxDurationSeconds}s) for call ${callId}`,
);
const timer = setTimeout(async () => {
this.maxDurationTimers.delete(callId);
const call = this.getCall(callId);
if (call && !TerminalStates.has(call.state)) {
console.log(
`[voice-call] Max duration reached (${this.config.maxDurationSeconds}s), ending call ${callId}`,
);
call.endReason = "timeout";
this.persistCallRecord(call);
await this.endCall(callId);
}
}, maxDurationMs);
this.maxDurationTimers.set(callId, timer);
}
/**
* Clear max duration timer for a call.
*/
private clearMaxDurationTimer(callId: CallId): void {
const timer = this.maxDurationTimers.get(callId);
if (timer) {
clearTimeout(timer);
this.maxDurationTimers.delete(callId);
}
}
private clearTranscriptWaiter(callId: CallId): void {
const waiter = this.transcriptWaiters.get(callId);
if (!waiter) return;
clearTimeout(waiter.timeout);
this.transcriptWaiters.delete(callId);
}
private rejectTranscriptWaiter(callId: CallId, reason: string): void {
const waiter = this.transcriptWaiters.get(callId);
if (!waiter) return;
this.clearTranscriptWaiter(callId);
waiter.reject(new Error(reason));
}
private resolveTranscriptWaiter(callId: CallId, transcript: string): void {
const waiter = this.transcriptWaiters.get(callId);
if (!waiter) return;
this.clearTranscriptWaiter(callId);
waiter.resolve(transcript);
}
private waitForFinalTranscript(callId: CallId): Promise<string> {
// Only allow one in-flight waiter per call.
this.rejectTranscriptWaiter(callId, "Transcript waiter replaced");
const timeoutMs = this.config.transcriptTimeoutMs;
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
this.transcriptWaiters.delete(callId);
reject(
new Error(`Timed out waiting for transcript after ${timeoutMs}ms`),
);
}, timeoutMs);
this.transcriptWaiters.set(callId, { resolve, reject, timeout });
});
}
/**
* Continue call: speak prompt, then wait for user's final transcript.
*/
async continueCall(
callId: CallId,
prompt: string,
): Promise<{ success: boolean; transcript?: string; error?: string }> {
const call = this.activeCalls.get(callId);
if (!call) {
return { success: false, error: "Call not found" };
}
if (!this.provider || !call.providerCallId) {
return { success: false, error: "Call not connected" };
}
if (TerminalStates.has(call.state)) {
return { success: false, error: "Call has ended" };
}
try {
await this.speak(callId, prompt);
call.state = "listening";
this.persistCallRecord(call);
await this.provider.startListening({
callId,
providerCallId: call.providerCallId,
});
const transcript = await this.waitForFinalTranscript(callId);
// Best-effort: stop listening after final transcript.
await this.provider.stopListening({
callId,
providerCallId: call.providerCallId,
});
return { success: true, transcript };
} catch (err) {
return {
success: false,
error: err instanceof Error ? err.message : String(err),
};
} finally {
this.clearTranscriptWaiter(callId);
}
}
/**
* End an active call.
*/
async endCall(callId: CallId): Promise<{ success: boolean; error?: string }> {
const call = this.activeCalls.get(callId);
if (!call) {
return { success: false, error: "Call not found" };
}
if (!this.provider || !call.providerCallId) {
return { success: false, error: "Call not connected" };
}
if (TerminalStates.has(call.state)) {
return { success: true }; // Already ended
}
try {
await this.provider.hangupCall({
callId,
providerCallId: call.providerCallId,
reason: "hangup-bot",
});
call.state = "hangup-bot";
call.endedAt = Date.now();
call.endReason = "hangup-bot";
this.persistCallRecord(call);
this.clearMaxDurationTimer(callId);
this.rejectTranscriptWaiter(callId, "Call ended: hangup-bot");
this.activeCalls.delete(callId);
if (call.providerCallId) {
this.providerCallIdMap.delete(call.providerCallId);
}
return { success: true };
} catch (err) {
return {
success: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
/**
* Check if an inbound call should be accepted based on policy.
*/
private shouldAcceptInbound(from: string | undefined): boolean {
const { inboundPolicy: policy, allowFrom } = this.config;
switch (policy) {
case "disabled":
console.log("[voice-call] Inbound call rejected: policy is disabled");
return false;
case "open":
console.log("[voice-call] Inbound call accepted: policy is open");
return true;
case "allowlist":
case "pairing": {
const normalized = from?.replace(/\D/g, "") || "";
const allowed = (allowFrom || []).some((num) => {
const normalizedAllow = num.replace(/\D/g, "");
return (
normalized.endsWith(normalizedAllow) ||
normalizedAllow.endsWith(normalized)
);
});
const status = allowed ? "accepted" : "rejected";
console.log(
`[voice-call] Inbound call ${status}: ${from} ${allowed ? "is in" : "not in"} allowlist`,
);
return allowed;
}
default:
return false;
}
}
/**
* Create a call record for an inbound call.
*/
private createInboundCall(
providerCallId: string,
from: string,
to: string,
): CallRecord {
const callId = crypto.randomUUID();
const callRecord: CallRecord = {
callId,
providerCallId,
provider: this.provider?.name || "twilio",
direction: "inbound",
state: "ringing",
from,
to,
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {
initialMessage:
this.config.inboundGreeting || "Hello! How can I help you today?",
},
};
this.activeCalls.set(callId, callRecord);
this.providerCallIdMap.set(providerCallId, callId); // Map providerCallId to internal callId
this.persistCallRecord(callRecord);
console.log(
`[voice-call] Created inbound call record: ${callId} from ${from}`,
);
return callRecord;
}
/**
* Look up a call by either internal callId or providerCallId.
*/
private findCall(callIdOrProviderCallId: string): CallRecord | undefined {
// Try direct lookup by internal callId
const directCall = this.activeCalls.get(callIdOrProviderCallId);
if (directCall) return directCall;
// Try lookup by providerCallId
return this.getCallByProviderCallId(callIdOrProviderCallId);
}
  /**
   * Process a webhook event.
   *
   * Flow: dedupe by event id → resolve (or, for inbound calls, create)
   * the call record → apply the event to call state → persist. Events
   * for unknown calls that do not create an inbound call are dropped.
   */
  processEvent(event: NormalizedEvent): void {
    // Idempotency check
    // (providers may redeliver webhooks; each event id is handled once)
    if (this.processedEventIds.has(event.id)) {
      return;
    }
    this.processedEventIds.add(event.id);
    let call = this.findCall(event.callId);
    // Handle inbound calls - create record if it doesn't exist
    if (!call && event.direction === "inbound" && event.providerCallId) {
      // Check if we should accept this inbound call
      if (!this.shouldAcceptInbound(event.from)) {
        // TODO: Could hang up the call here
        return;
      }
      // Create a new call record for this inbound call
      call = this.createInboundCall(
        event.providerCallId,
        event.from || "unknown",
        event.to || this.config.fromNumber || "unknown",
      );
      // Update the event's callId to use our internal ID
      event.callId = call.callId;
    }
    if (!call) {
      // Still no call record - ignore event
      return;
    }
    // Update provider call ID if we got it
    if (event.providerCallId && !call.providerCallId) {
      call.providerCallId = event.providerCallId;
    }
    // Track processed event
    // (stored on the record so dedupe survives restart — see loadActiveCalls)
    call.processedEventIds.push(event.id);
    // Process event based on type
    switch (event.type) {
      case "call.initiated":
        this.transitionState(call, "initiated");
        break;
      case "call.ringing":
        this.transitionState(call, "ringing");
        break;
      case "call.answered":
        call.answeredAt = event.timestamp;
        this.transitionState(call, "answered");
        // Start max duration timer when call is answered
        this.startMaxDurationTimer(call.callId);
        break;
      case "call.active":
        this.transitionState(call, "active");
        break;
      case "call.speaking":
        this.transitionState(call, "speaking");
        break;
      case "call.speech":
        // Final transcripts are recorded and settle any pending waiter;
        // partials only move the state to "listening".
        if (event.isFinal) {
          this.addTranscriptEntry(call, "user", event.transcript);
          this.resolveTranscriptWaiter(call.callId, event.transcript);
        }
        this.transitionState(call, "listening");
        break;
      case "call.ended":
        call.endedAt = event.timestamp;
        call.endReason = event.reason;
        // The end reason doubles as the terminal CallState here.
        this.transitionState(call, event.reason as CallState);
        // Tear down per-call resources and lookup entries.
        this.clearMaxDurationTimer(call.callId);
        this.rejectTranscriptWaiter(call.callId, `Call ended: ${event.reason}`);
        this.activeCalls.delete(call.callId);
        if (call.providerCallId) {
          this.providerCallIdMap.delete(call.providerCallId);
        }
        break;
      case "call.error":
        // Retryable errors are ignored; non-retryable errors tear the
        // call down like call.ended.
        if (!event.retryable) {
          call.endedAt = event.timestamp;
          call.endReason = "error";
          this.transitionState(call, "error");
          this.clearMaxDurationTimer(call.callId);
          this.rejectTranscriptWaiter(
            call.callId,
            `Call error: ${event.error}`,
          );
          this.activeCalls.delete(call.callId);
          if (call.providerCallId) {
            this.providerCallIdMap.delete(call.providerCallId);
          }
        }
        break;
    }
    // Persist the (possibly now terminal) record after every handled event.
    this.persistCallRecord(call);
  }
/**
* Get an active call by ID.
*/
getCall(callId: CallId): CallRecord | undefined {
return this.activeCalls.get(callId);
}
/**
* Get an active call by provider call ID (e.g., Twilio CallSid).
*/
getCallByProviderCallId(providerCallId: string): CallRecord | undefined {
// Fast path: use the providerCallIdMap for O(1) lookup
const callId = this.providerCallIdMap.get(providerCallId);
if (callId) {
return this.activeCalls.get(callId);
}
// Fallback: linear search for cases where map wasn't populated
// (e.g., providerCallId set directly on call record)
for (const call of this.activeCalls.values()) {
if (call.providerCallId === providerCallId) {
return call;
}
}
return undefined;
}
/**
* Get all active calls.
*/
getActiveCalls(): CallRecord[] {
return Array.from(this.activeCalls.values());
}
/**
* Get call history (from persisted logs).
*/
async getCallHistory(limit = 50): Promise<CallRecord[]> {
const logPath = path.join(this.storePath, "calls.jsonl");
try {
await fsp.access(logPath);
} catch {
return [];
}
const content = await fsp.readFile(logPath, "utf-8");
const lines = content.trim().split("\n").filter(Boolean);
const calls: CallRecord[] = [];
// Parse last N lines
for (const line of lines.slice(-limit)) {
try {
const parsed = CallRecordSchema.parse(JSON.parse(line));
calls.push(parsed);
} catch {
// Skip invalid lines
}
}
return calls;
}
  // States that can cycle during multi-turn conversations
  // (speak → listen → speak …); transitions between these two are exempt
  // from the forward-only ordering enforced by transitionState.
  private static readonly ConversationStates = new Set<CallState>([
    "speaking",
    "listening",
  ]);
  // Non-terminal state order for monotonic transitions
  // (transitionState only ever moves forward through this list).
  private static readonly StateOrder: readonly CallState[] = [
    "initiated",
    "ringing",
    "answered",
    "active",
    "speaking",
    "listening",
  ];
/**
* Transition call state with monotonic enforcement.
*/
private transitionState(call: CallRecord, newState: CallState): void {
// No-op for same state or already terminal
if (call.state === newState || TerminalStates.has(call.state)) return;
// Terminal states can always be reached from non-terminal
if (TerminalStates.has(newState)) {
call.state = newState;
return;
}
// Allow cycling between speaking and listening (multi-turn conversations)
if (
CallManager.ConversationStates.has(call.state) &&
CallManager.ConversationStates.has(newState)
) {
call.state = newState;
return;
}
// Only allow forward transitions in state order
const currentIndex = CallManager.StateOrder.indexOf(call.state);
const newIndex = CallManager.StateOrder.indexOf(newState);
if (newIndex > currentIndex) {
call.state = newState;
}
}
/**
* Add an entry to the call transcript.
*/
private addTranscriptEntry(
call: CallRecord,
speaker: "bot" | "user",
text: string,
): void {
const entry: TranscriptEntry = {
timestamp: Date.now(),
speaker,
text,
isFinal: true,
};
call.transcript.push(entry);
}
/**
* Persist a call record to disk (fire-and-forget async).
*/
private persistCallRecord(call: CallRecord): void {
const logPath = path.join(this.storePath, "calls.jsonl");
const line = `${JSON.stringify(call)}\n`;
// Fire-and-forget async write to avoid blocking event loop
fsp.appendFile(logPath, line).catch((err) => {
console.error("[voice-call] Failed to persist call record:", err);
});
}
  /**
   * Load active calls from persistence (for crash recovery).
   *
   * Reads the whole calls.jsonl log synchronously and replays it: the
   * log is append-only, so the last line per callId wins. Non-terminal
   * calls are restored into activeCalls along with their providerCallId
   * mapping and processed-event ids (so webhook dedupe survives restart).
   *
   * NOTE(review): despite the earlier "streaming" claim, this reads the
   * entire file into memory via readFileSync — fine for modest logs,
   * revisit if the log grows large.
   */
  private loadActiveCalls(): void {
    const logPath = path.join(this.storePath, "calls.jsonl");
    if (!fs.existsSync(logPath)) return;
    // Read file synchronously and parse lines
    const content = fs.readFileSync(logPath, "utf-8");
    const lines = content.split("\n");
    // Build map of latest state per call
    const callMap = new Map<CallId, CallRecord>();
    for (const line of lines) {
      if (!line.trim()) continue;
      try {
        const call = CallRecordSchema.parse(JSON.parse(line));
        callMap.set(call.callId, call);
      } catch {
        // Skip invalid lines
      }
    }
    // Only keep non-terminal calls
    for (const [callId, call] of callMap) {
      if (!TerminalStates.has(call.state)) {
        this.activeCalls.set(callId, call);
        // Populate providerCallId mapping for lookups
        if (call.providerCallId) {
          this.providerCallIdMap.set(call.providerCallId, callId);
        }
        // Populate processed event IDs
        for (const eventId of call.processedEventIds) {
          this.processedEventIds.add(eventId);
        }
      }
    }
  }
/**
* Generate TwiML for notify mode (speak message and hang up).
*/
private generateNotifyTwiml(message: string, voice: string): string {
return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="${voice}">${escapeXml(message)}</Say>
<Hangup/>
</Response>`;
}
}

View File

@@ -0,0 +1,279 @@
/**
* Media Stream Handler
*
* Handles bidirectional audio streaming between Twilio and the AI services.
* - Receives mu-law audio from Twilio via WebSocket
* - Forwards to OpenAI Realtime STT for transcription
* - Sends TTS audio back to Twilio
*/
import type { IncomingMessage } from "node:http";
import type { Duplex } from "node:stream";
import { WebSocket, WebSocketServer } from "ws";
import type {
OpenAIRealtimeSTTProvider,
RealtimeSTTSession,
} from "./providers/stt-openai-realtime.js";
/**
 * Configuration for the media stream handler.
 *
 * The `callId` passed to every callback is the Twilio CallSid taken
 * from the stream's "start" message. All callbacks are optional.
 */
export interface MediaStreamConfig {
  /** STT provider for transcription */
  sttProvider: OpenAIRealtimeSTTProvider;
  /** Callback when a final transcript is received */
  onTranscript?: (callId: string, transcript: string) => void;
  /** Callback for partial transcripts (streaming UI) */
  onPartialTranscript?: (callId: string, partial: string) => void;
  /** Callback when stream connects (fired before STT connect begins) */
  onConnect?: (callId: string, streamSid: string) => void;
  /** Callback when stream disconnects */
  onDisconnect?: (callId: string) => void;
}
/**
 * Active media stream session.
 */
interface StreamSession {
  /** Twilio CallSid for the call backing this stream. */
  callId: string;
  /** Twilio stream identifier; key into the sessions map. */
  streamSid: string;
  /** Open WebSocket to Twilio for this stream. */
  ws: WebSocket;
  /** Per-stream realtime STT session receiving the caller's audio. */
  sttSession: RealtimeSTTSession;
}
/**
 * Manages WebSocket connections for Twilio media streams.
 *
 * Lifecycle: one StreamSession per Twilio stream, created on the "start"
 * message and torn down on "stop" or socket close. Inbound base64 mu-law
 * frames are forwarded to the per-stream STT session; outbound TTS audio
 * is pushed back to Twilio via sendAudio().
 */
export class MediaStreamHandler {
  // WebSocketServer in noServer mode; created lazily on first upgrade.
  private wss: WebSocketServer | null = null;
  // Active sessions keyed by Twilio streamSid.
  private sessions = new Map<string, StreamSession>();
  private config: MediaStreamConfig;
  constructor(config: MediaStreamConfig) {
    this.config = config;
  }
  /**
   * Handle WebSocket upgrade for media stream connections.
   */
  handleUpgrade(request: IncomingMessage, socket: Duplex, head: Buffer): void {
    if (!this.wss) {
      this.wss = new WebSocketServer({ noServer: true });
      this.wss.on("connection", (ws, req) => this.handleConnection(ws, req));
    }
    this.wss.handleUpgrade(request, socket, head, (ws) => {
      this.wss?.emit("connection", ws, request);
    });
  }
  /**
   * Handle new WebSocket connection from Twilio.
   *
   * Expected message order: "connected" → "start" (creates the session)
   * → repeated "media" frames → "stop".
   * NOTE(review): handleStart is awaited inside the message handler, so
   * "media" frames arriving before it resolves see session === null and
   * are dropped — confirm losing the first frames is acceptable.
   */
  private async handleConnection(
    ws: WebSocket,
    _request: IncomingMessage,
  ): Promise<void> {
    let session: StreamSession | null = null;
    ws.on("message", async (data: Buffer) => {
      try {
        const message = JSON.parse(data.toString()) as TwilioMediaMessage;
        switch (message.event) {
          case "connected":
            console.log("[MediaStream] Twilio connected");
            break;
          case "start":
            session = await this.handleStart(ws, message);
            break;
          case "media":
            if (session && message.media?.payload) {
              // Forward audio to STT
              const audioBuffer = Buffer.from(message.media.payload, "base64");
              session.sttSession.sendAudio(audioBuffer);
            }
            break;
          case "stop":
            if (session) {
              this.handleStop(session);
              session = null;
            }
            break;
        }
      } catch (error) {
        console.error("[MediaStream] Error processing message:", error);
      }
    });
    ws.on("close", () => {
      // Socket dropped without a "stop" message: clean up anyway.
      if (session) {
        this.handleStop(session);
      }
    });
    ws.on("error", (error) => {
      console.error("[MediaStream] WebSocket error:", error);
    });
  }
  /**
   * Handle stream start event.
   *
   * Registers the session, notifies onConnect, then connects STT in the
   * background — a failed STT connect does not fail the stream.
   */
  private async handleStart(
    ws: WebSocket,
    message: TwilioMediaMessage,
  ): Promise<StreamSession> {
    const streamSid = message.streamSid || "";
    const callSid = message.start?.callSid || "";
    console.log(
      `[MediaStream] Stream started: ${streamSid} (call: ${callSid})`,
    );
    // Create STT session
    const sttSession = this.config.sttProvider.createSession();
    // Set up transcript callbacks
    sttSession.onPartial((partial) => {
      this.config.onPartialTranscript?.(callSid, partial);
    });
    sttSession.onTranscript((transcript) => {
      this.config.onTranscript?.(callSid, transcript);
    });
    const session: StreamSession = {
      callId: callSid,
      streamSid,
      ws,
      sttSession,
    };
    this.sessions.set(streamSid, session);
    // Notify connection BEFORE STT connect so TTS can work even if STT fails
    this.config.onConnect?.(callSid, streamSid);
    // Connect to OpenAI STT (non-blocking, log errors but don't fail the call)
    sttSession.connect().catch((err) => {
      console.warn(
        `[MediaStream] STT connection failed (TTS still works):`,
        err.message,
      );
    });
    return session;
  }
  /**
   * Handle stream stop event: close STT, unregister, notify onDisconnect.
   */
  private handleStop(session: StreamSession): void {
    console.log(`[MediaStream] Stream stopped: ${session.streamSid}`);
    session.sttSession.close();
    this.sessions.delete(session.streamSid);
    this.config.onDisconnect?.(session.callId);
  }
  /**
   * Get an active session with an open WebSocket, or undefined if unavailable.
   */
  private getOpenSession(streamSid: string): StreamSession | undefined {
    const session = this.sessions.get(streamSid);
    return session?.ws.readyState === WebSocket.OPEN ? session : undefined;
  }
  /**
   * Send a message to a stream's WebSocket if available.
   * Silently drops the message when the stream is unknown or not open.
   */
  private sendToStream(streamSid: string, message: unknown): void {
    const session = this.getOpenSession(streamSid);
    session?.ws.send(JSON.stringify(message));
  }
  /**
   * Send audio to a specific stream (for TTS playback).
   * Audio should be mu-law encoded at 8kHz mono.
   */
  sendAudio(streamSid: string, muLawAudio: Buffer): void {
    this.sendToStream(streamSid, {
      event: "media",
      streamSid,
      media: { payload: muLawAudio.toString("base64") },
    });
  }
  /**
   * Send a mark event to track audio playback position.
   */
  sendMark(streamSid: string, name: string): void {
    this.sendToStream(streamSid, {
      event: "mark",
      streamSid,
      mark: { name },
    });
  }
  /**
   * Clear audio buffer (interrupt playback).
   */
  clearAudio(streamSid: string): void {
    this.sendToStream(streamSid, { event: "clear", streamSid });
  }
  /**
   * Get active session by call ID.
   * Linear scan — sessions are keyed by streamSid, not callId.
   */
  getSessionByCallId(callId: string): StreamSession | undefined {
    return [...this.sessions.values()].find(
      (session) => session.callId === callId,
    );
  }
  /**
   * Close all sessions (STT sessions and their WebSockets).
   */
  closeAll(): void {
    for (const session of this.sessions.values()) {
      session.sttSession.close();
      session.ws.close();
    }
    this.sessions.clear();
  }
}
/**
 * Twilio Media Stream message format.
 * Only the fields this handler reads are modeled.
 */
interface TwilioMediaMessage {
  /** Message discriminator. */
  event: "connected" | "start" | "media" | "stop" | "mark" | "clear";
  sequenceNumber?: string;
  streamSid?: string;
  /** Present on "start": stream/call identifiers and audio format. */
  start?: {
    streamSid: string;
    accountSid: string;
    callSid: string;
    tracks: string[];
    mediaFormat: {
      encoding: string;
      sampleRate: number;
      channels: number;
    };
  };
  /** Present on "media": one inbound audio frame. */
  media?: {
    track?: string;
    chunk?: string;
    timestamp?: string;
    /** Base64-encoded audio payload. */
    payload?: string;
  };
  /** Present on "mark" messages. */
  mark?: {
    name: string;
  };
}

View File

@@ -0,0 +1,67 @@
import type {
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
PlayTtsInput,
ProviderName,
ProviderWebhookParseResult,
StartListeningInput,
StopListeningInput,
WebhookContext,
WebhookVerificationResult,
} from "../types.js";
/**
 * Abstract base interface for voice call providers.
 *
 * Each provider (Telnyx, Twilio, etc.) implements this interface to provide
 * a consistent API for the call manager.
 *
 * Responsibilities:
 * - Webhook verification and event parsing
 * - Outbound call initiation and hangup
 * - Media control (TTS playback, STT listening)
 *
 * All async methods reject on provider/transport failure; callers (the
 * call manager) are responsible for catching and surfacing errors.
 */
export interface VoiceCallProvider {
  /** Provider identifier */
  readonly name: ProviderName;
  /**
   * Verify webhook signature/HMAC before processing.
   * Must be called before parseWebhookEvent.
   */
  verifyWebhook(ctx: WebhookContext): WebhookVerificationResult;
  /**
   * Parse provider-specific webhook payload into normalized events.
   * Returns events and optional response to send back to provider.
   */
  parseWebhookEvent(ctx: WebhookContext): ProviderWebhookParseResult;
  /**
   * Initiate an outbound call.
   * @returns Provider call ID and status
   */
  initiateCall(input: InitiateCallInput): Promise<InitiateCallResult>;
  /**
   * Hang up an active call.
   */
  hangupCall(input: HangupCallInput): Promise<void>;
  /**
   * Play TTS audio to the caller.
   * The provider should handle streaming if supported.
   */
  playTts(input: PlayTtsInput): Promise<void>;
  /**
   * Start listening for user speech (activate STT).
   */
  startListening(input: StartListeningInput): Promise<void>;
  /**
   * Stop listening for user speech (deactivate STT).
   */
  stopListening(input: StopListeningInput): Promise<void>;
}

View File

@@ -0,0 +1,9 @@
export type { VoiceCallProvider } from "./base.js";
export { MockProvider } from "./mock.js";
export {
OpenAIRealtimeSTTProvider,
type RealtimeSTTConfig,
type RealtimeSTTSession,
} from "./stt-openai-realtime.js";
export { TelnyxProvider } from "./telnyx.js";
export { TwilioProvider } from "./twilio.js";

View File

@@ -0,0 +1,168 @@
import crypto from "node:crypto";
import type {
EndReason,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
NormalizedEvent,
PlayTtsInput,
ProviderWebhookParseResult,
StartListeningInput,
StopListeningInput,
WebhookContext,
WebhookVerificationResult,
} from "../types.js";
import type { VoiceCallProvider } from "./base.js";
/**
 * Mock voice call provider for local testing.
 *
 * Events are driven via webhook POST with JSON body:
 * - { events: NormalizedEvent[] } for bulk events
 * - { event: NormalizedEvent } for single event
 *
 * All media-control methods are no-ops; webhook verification always
 * succeeds.
 */
export class MockProvider implements VoiceCallProvider {
  readonly name = "mock" as const;
  /** Mock webhooks are always considered authentic. */
  verifyWebhook(_ctx: WebhookContext): WebhookVerificationResult {
    return { ok: true };
  }
  /**
   * Parse the JSON body into normalized events (bulk or single form).
   * Returns 400 with no events when the body is not valid JSON.
   */
  parseWebhookEvent(ctx: WebhookContext): ProviderWebhookParseResult {
    try {
      const payload = JSON.parse(ctx.rawBody);
      const events: NormalizedEvent[] = [];
      if (Array.isArray(payload.events)) {
        for (const evt of payload.events) {
          const normalized = this.normalizeEvent(evt);
          if (normalized) events.push(normalized);
        }
      } else if (payload.event) {
        const normalized = this.normalizeEvent(payload.event);
        if (normalized) events.push(normalized);
      }
      return { events, statusCode: 200 };
    } catch {
      return { events: [], statusCode: 400 };
    }
  }
  /**
   * Fill in defaults for a partially-specified event and narrow it to a
   * concrete NormalizedEvent variant. Returns null for events missing a
   * type/callId or with an unrecognized type.
   */
  private normalizeEvent(
    evt: Partial<NormalizedEvent>,
  ): NormalizedEvent | null {
    if (!evt.type || !evt.callId) return null;
    // Shared fields; id and timestamp are generated when absent.
    const base = {
      id: evt.id || crypto.randomUUID(),
      callId: evt.callId,
      providerCallId: evt.providerCallId,
      timestamp: evt.timestamp || Date.now(),
    };
    switch (evt.type) {
      case "call.initiated":
      case "call.ringing":
      case "call.answered":
      case "call.active":
        // Lifecycle events carry no extra payload.
        return { ...base, type: evt.type };
      case "call.speaking": {
        const payload = evt as Partial<NormalizedEvent & { text?: string }>;
        return {
          ...base,
          type: evt.type,
          text: payload.text || "",
        };
      }
      case "call.speech": {
        const payload = evt as Partial<
          NormalizedEvent & {
            transcript?: string;
            isFinal?: boolean;
            confidence?: number;
          }
        >;
        return {
          ...base,
          type: evt.type,
          transcript: payload.transcript || "",
          isFinal: payload.isFinal ?? true,
          confidence: payload.confidence,
        };
      }
      case "call.silence": {
        const payload = evt as Partial<
          NormalizedEvent & { durationMs?: number }
        >;
        return {
          ...base,
          type: evt.type,
          durationMs: payload.durationMs || 0,
        };
      }
      case "call.dtmf": {
        const payload = evt as Partial<NormalizedEvent & { digits?: string }>;
        return {
          ...base,
          type: evt.type,
          digits: payload.digits || "",
        };
      }
      case "call.ended": {
        const payload = evt as Partial<
          NormalizedEvent & { reason?: EndReason }
        >;
        return {
          ...base,
          type: evt.type,
          reason: payload.reason || "completed",
        };
      }
      case "call.error": {
        const payload = evt as Partial<
          NormalizedEvent & { error?: string; retryable?: boolean }
        >;
        return {
          ...base,
          type: evt.type,
          error: payload.error || "unknown error",
          retryable: payload.retryable,
        };
      }
      default:
        return null;
    }
  }
  /** Pretend the call was placed; derive the provider id from the callId. */
  async initiateCall(input: InitiateCallInput): Promise<InitiateCallResult> {
    return {
      providerCallId: `mock-${input.callId}`,
      status: "initiated",
    };
  }
  async hangupCall(_input: HangupCallInput): Promise<void> {
    // No-op for mock
  }
  async playTts(_input: PlayTtsInput): Promise<void> {
    // No-op for mock
  }
  async startListening(_input: StartListeningInput): Promise<void> {
    // No-op for mock
  }
  async stopListening(_input: StopListeningInput): Promise<void> {
    // No-op for mock
  }
}

View File

@@ -0,0 +1,303 @@
/**
* OpenAI Realtime STT Provider
*
* Uses the OpenAI Realtime API for streaming transcription with:
* - Direct mu-law audio support (no conversion needed)
* - Built-in server-side VAD for turn detection
* - Low-latency streaming transcription
* - Partial transcript callbacks for real-time UI updates
*/
import WebSocket from "ws";
/**
 * Configuration for OpenAI Realtime STT.
 * Defaults are applied by OpenAIRealtimeSTTProvider's constructor.
 */
export interface RealtimeSTTConfig {
  /** OpenAI API key (required; the provider throws without it) */
  apiKey: string;
  /** Model to use (default: gpt-4o-transcribe) */
  model?: string;
  /** Silence duration in ms before considering speech ended (default: 800) */
  silenceDurationMs?: number;
  /** VAD threshold 0-1 (default: 0.5) */
  vadThreshold?: number;
}
/**
 * Session for streaming audio and receiving transcripts.
 *
 * Usage: connect(), then stream audio via sendAudio() and consume
 * transcripts either through the onPartial/onTranscript callbacks or by
 * awaiting waitForTranscript().
 */
export interface RealtimeSTTSession {
  /** Connect to the transcription service */
  connect(): Promise<void>;
  /** Send mu-law audio data (8kHz mono) */
  sendAudio(audio: Buffer): void;
  /** Wait for next complete transcript (after VAD detects end of speech) */
  waitForTranscript(timeoutMs?: number): Promise<string>;
  /** Set callback for partial transcripts (streaming) */
  onPartial(callback: (partial: string) => void): void;
  /** Set callback for final transcripts */
  onTranscript(callback: (transcript: string) => void): void;
  /** Close the session */
  close(): void;
  /** Check if session is connected */
  isConnected(): boolean;
}
/**
 * Provider factory for OpenAI Realtime STT sessions.
 *
 * Validates the API key up front and captures the session defaults that
 * every created session inherits.
 */
export class OpenAIRealtimeSTTProvider {
  readonly name = "openai-realtime";
  private apiKey: string;
  private model: string;
  private silenceDurationMs: number;
  private vadThreshold: number;
  /**
   * @param config API key (required) plus optional model/VAD settings.
   * @throws Error when no API key is provided.
   */
  constructor(config: RealtimeSTTConfig) {
    if (!config.apiKey) {
      throw new Error("OpenAI API key required for Realtime STT");
    }
    this.apiKey = config.apiKey;
    this.model = config.model || "gpt-4o-transcribe";
    // Bug fix: use ?? (not ||) for numeric options. The documented VAD
    // threshold range is 0-1, so `|| 0.5` silently replaced a valid
    // configured 0 (likewise silenceDurationMs: 0) with the default.
    this.silenceDurationMs = config.silenceDurationMs ?? 800;
    this.vadThreshold = config.vadThreshold ?? 0.5;
  }
  /**
   * Create a new realtime transcription session.
   */
  createSession(): RealtimeSTTSession {
    return new OpenAIRealtimeSTTSession(
      this.apiKey,
      this.model,
      this.silenceDurationMs,
      this.vadThreshold,
    );
  }
}
/**
* WebSocket-based session for real-time speech-to-text.
*/
class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
private static readonly MAX_RECONNECT_ATTEMPTS = 5;
private static readonly RECONNECT_DELAY_MS = 1000;
private ws: WebSocket | null = null;
private connected = false;
private closed = false;
private reconnectAttempts = 0;
private pendingTranscript = "";
private onTranscriptCallback: ((transcript: string) => void) | null = null;
private onPartialCallback: ((partial: string) => void) | null = null;
constructor(
private readonly apiKey: string,
private readonly model: string,
private readonly silenceDurationMs: number,
private readonly vadThreshold: number,
) {}
async connect(): Promise<void> {
this.closed = false;
this.reconnectAttempts = 0;
return this.doConnect();
}
private async doConnect(): Promise<void> {
return new Promise((resolve, reject) => {
const url = "wss://api.openai.com/v1/realtime?intent=transcription";
this.ws = new WebSocket(url, {
headers: {
Authorization: `Bearer ${this.apiKey}`,
"OpenAI-Beta": "realtime=v1",
},
});
this.ws.on("open", () => {
console.log("[RealtimeSTT] WebSocket connected");
this.connected = true;
this.reconnectAttempts = 0;
// Configure the transcription session
this.sendEvent({
type: "transcription_session.update",
session: {
input_audio_format: "g711_ulaw",
input_audio_transcription: {
model: this.model,
},
turn_detection: {
type: "server_vad",
threshold: this.vadThreshold,
prefix_padding_ms: 300,
silence_duration_ms: this.silenceDurationMs,
},
},
});
resolve();
});
this.ws.on("message", (data: Buffer) => {
try {
const event = JSON.parse(data.toString());
this.handleEvent(event);
} catch (e) {
console.error("[RealtimeSTT] Failed to parse event:", e);
}
});
this.ws.on("error", (error) => {
console.error("[RealtimeSTT] WebSocket error:", error);
if (!this.connected) reject(error);
});
this.ws.on("close", (code, reason) => {
console.log(
`[RealtimeSTT] WebSocket closed (code: ${code}, reason: ${reason?.toString() || "none"})`,
);
this.connected = false;
// Attempt reconnection if not intentionally closed
if (!this.closed) {
void this.attemptReconnect();
}
});
setTimeout(() => {
if (!this.connected) {
reject(new Error("Realtime STT connection timeout"));
}
}, 10000);
});
}
private async attemptReconnect(): Promise<void> {
if (this.closed) {
return;
}
if (
this.reconnectAttempts >= OpenAIRealtimeSTTSession.MAX_RECONNECT_ATTEMPTS
) {
console.error(
`[RealtimeSTT] Max reconnect attempts (${OpenAIRealtimeSTTSession.MAX_RECONNECT_ATTEMPTS}) reached`,
);
return;
}
this.reconnectAttempts++;
const delay =
OpenAIRealtimeSTTSession.RECONNECT_DELAY_MS *
2 ** (this.reconnectAttempts - 1);
console.log(
`[RealtimeSTT] Reconnecting ${this.reconnectAttempts}/${OpenAIRealtimeSTTSession.MAX_RECONNECT_ATTEMPTS} in ${delay}ms...`,
);
await new Promise((resolve) => setTimeout(resolve, delay));
if (this.closed) {
return;
}
try {
await this.doConnect();
console.log("[RealtimeSTT] Reconnected successfully");
} catch (error) {
console.error("[RealtimeSTT] Reconnect failed:", error);
}
}
private handleEvent(event: {
type: string;
delta?: string;
transcript?: string;
error?: unknown;
}): void {
switch (event.type) {
case "transcription_session.created":
case "transcription_session.updated":
case "input_audio_buffer.speech_stopped":
case "input_audio_buffer.committed":
console.log(`[RealtimeSTT] ${event.type}`);
break;
case "conversation.item.input_audio_transcription.delta":
if (event.delta) {
this.pendingTranscript += event.delta;
this.onPartialCallback?.(this.pendingTranscript);
}
break;
case "conversation.item.input_audio_transcription.completed":
if (event.transcript) {
console.log(`[RealtimeSTT] Transcript: ${event.transcript}`);
this.onTranscriptCallback?.(event.transcript);
}
this.pendingTranscript = "";
break;
case "input_audio_buffer.speech_started":
console.log("[RealtimeSTT] Speech started");
this.pendingTranscript = "";
break;
case "error":
console.error("[RealtimeSTT] Error:", event.error);
break;
}
}
private sendEvent(event: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(event));
}
}
sendAudio(muLawData: Buffer): void {
if (!this.connected) return;
this.sendEvent({
type: "input_audio_buffer.append",
audio: muLawData.toString("base64"),
});
}
  /** Register a callback invoked with the accumulated partial transcript on each delta. Replaces any previous callback. */
  onPartial(callback: (partial: string) => void): void {
    this.onPartialCallback = callback;
  }
  /** Register a callback invoked with each completed (final) transcript. Replaces any previous callback. */
  onTranscript(callback: (transcript: string) => void): void {
    this.onTranscriptCallback = callback;
  }
  /**
   * Resolve with the next completed transcript, or reject with
   * "Transcript timeout" after timeoutMs.
   *
   * NOTE(review): this installs itself as the transcript callback, so it
   * clobbers any handler registered via onTranscript() and clears the slot
   * when it settles — confirm callers never mix the two styles on one session.
   */
  async waitForTranscript(timeoutMs = 30000): Promise<string> {
    return new Promise((resolve, reject) => {
      const timeout = setTimeout(() => {
        this.onTranscriptCallback = null;
        reject(new Error("Transcript timeout"));
      }, timeoutMs);
      this.onTranscriptCallback = (transcript) => {
        clearTimeout(timeout);
        this.onTranscriptCallback = null;
        resolve(transcript);
      };
    });
  }
close(): void {
this.closed = true;
if (this.ws) {
this.ws.close();
this.ws = null;
}
this.connected = false;
}
  /** Whether the realtime socket is currently connected and usable. */
  isConnected(): boolean {
    return this.connected;
  }
}

View File

@@ -0,0 +1,364 @@
import crypto from "node:crypto";
import type { TelnyxConfig } from "../config.js";
import type {
EndReason,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
NormalizedEvent,
PlayTtsInput,
ProviderWebhookParseResult,
StartListeningInput,
StopListeningInput,
WebhookContext,
WebhookVerificationResult,
} from "../types.js";
import type { VoiceCallProvider } from "./base.js";
/**
 * Telnyx Voice API provider implementation.
 *
 * Uses Telnyx Call Control API v2 for managing calls. All call commands are
 * POSTs against https://api.telnyx.com/v2; webhook authenticity is checked
 * with Telnyx's Ed25519 signature scheme.
 *
 * @see https://developers.telnyx.com/docs/api/v2/call-control
 */
export class TelnyxProvider implements VoiceCallProvider {
  readonly name = "telnyx" as const;
  private readonly apiKey: string;
  private readonly connectionId: string;
  private readonly publicKey: string | undefined;
  private readonly baseUrl = "https://api.telnyx.com/v2";

  constructor(config: TelnyxConfig) {
    if (!config.apiKey) {
      throw new Error("Telnyx API key is required");
    }
    if (!config.connectionId) {
      throw new Error("Telnyx connection ID is required");
    }
    this.apiKey = config.apiKey;
    this.connectionId = config.connectionId;
    this.publicKey = config.publicKey;
  }

  /**
   * Make an authenticated POST request to the Telnyx API.
   *
   * @param endpoint - Path appended to the v2 base URL (e.g. "/calls").
   * @param body - JSON request body.
   * @param options - allowNotFound treats a 404 as success (returns
   *   undefined); used for commands against calls that may already be gone.
   * @throws Error with status and response text on any other non-2xx reply.
   */
  private async apiRequest<T = unknown>(
    endpoint: string,
    body: Record<string, unknown>,
    options?: { allowNotFound?: boolean },
  ): Promise<T> {
    const response = await fetch(`${this.baseUrl}${endpoint}`, {
      method: "POST",
      headers: {
        Authorization: `Bearer ${this.apiKey}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(body),
    });
    if (!response.ok) {
      if (options?.allowNotFound && response.status === 404) {
        return undefined as T;
      }
      const errorText = await response.text();
      throw new Error(`Telnyx API error: ${response.status} ${errorText}`);
    }
    // Some commands return 200 with an empty body; surface that as undefined.
    const text = await response.text();
    return text ? (JSON.parse(text) as T) : (undefined as T);
  }

  /**
   * Verify Telnyx webhook signature using Ed25519.
   *
   * Telnyx signs the string `${timestamp}|${rawBody}` and sends the
   * signature in `telnyx-signature-ed25519` plus the `telnyx-timestamp`
   * header. Events older than 5 minutes are rejected to limit replay.
   */
  verifyWebhook(ctx: WebhookContext): WebhookVerificationResult {
    if (!this.publicKey) {
      // No public key configured, skip verification (not recommended for production)
      return { ok: true };
    }
    const signature = ctx.headers["telnyx-signature-ed25519"];
    const timestamp = ctx.headers["telnyx-timestamp"];
    if (!signature || !timestamp) {
      return { ok: false, reason: "Missing signature or timestamp header" };
    }
    const signatureStr = Array.isArray(signature) ? signature[0] : signature;
    const timestampStr = Array.isArray(timestamp) ? timestamp[0] : timestamp;
    if (!signatureStr || !timestampStr) {
      return { ok: false, reason: "Empty signature or timestamp" };
    }
    try {
      const signedPayload = `${timestampStr}|${ctx.rawBody}`;
      const signatureBuffer = Buffer.from(signatureStr, "base64");
      const rawKey = Buffer.from(this.publicKey, "base64");
      // Bug fix: Telnyx distributes its webhook public key as base64 of the
      // RAW 32-byte Ed25519 key, not as a DER document. node:crypto can only
      // import DER/PEM, so passing the raw bytes as { format: "der", type:
      // "spki" } throws and every signed webhook was rejected. Wrap raw keys
      // in the fixed SPKI prefix for Ed25519 (RFC 8410); anything longer is
      // assumed to be DER already and passed through unchanged.
      const publicKeyBuffer =
        rawKey.length === 32
          ? Buffer.concat([
              Buffer.from("302a300506032b6570032100", "hex"),
              rawKey,
            ])
          : rawKey;
      const isValid = crypto.verify(
        null, // Ed25519 is a pure signature scheme; no pre-hash digest
        Buffer.from(signedPayload),
        {
          key: publicKeyBuffer,
          format: "der",
          type: "spki",
        },
        signatureBuffer,
      );
      if (!isValid) {
        return { ok: false, reason: "Invalid signature" };
      }
      // Check timestamp is within 5 minutes (replay protection). Guard
      // against non-numeric timestamps, which NaN-compare to "fresh".
      const eventTime = Number.parseInt(timestampStr, 10) * 1000;
      if (!Number.isFinite(eventTime)) {
        return { ok: false, reason: "Invalid timestamp" };
      }
      const now = Date.now();
      if (Math.abs(now - eventTime) > 5 * 60 * 1000) {
        return { ok: false, reason: "Timestamp too old" };
      }
      return { ok: true };
    } catch (err) {
      return {
        ok: false,
        reason: `Verification error: ${err instanceof Error ? err.message : String(err)}`,
      };
    }
  }

  /**
   * Parse Telnyx webhook event into normalized format.
   * Malformed JSON yields a 400; unknown/empty events yield a 200 with no
   * events so Telnyx does not retry.
   */
  parseWebhookEvent(ctx: WebhookContext): ProviderWebhookParseResult {
    try {
      const payload = JSON.parse(ctx.rawBody) as { data?: TelnyxEvent };
      const data = payload.data;
      if (!data || !data.event_type) {
        return { events: [], statusCode: 200 };
      }
      const event = this.normalizeEvent(data);
      return {
        events: event ? [event] : [],
        statusCode: 200,
      };
    } catch {
      return { events: [], statusCode: 400 };
    }
  }

  /**
   * Convert Telnyx event to normalized event format.
   * Returns null for event types this plugin does not consume.
   */
  private normalizeEvent(data: TelnyxEvent): NormalizedEvent | null {
    // Decode client_state from Base64 (we encode it in initiateCall)
    let callId = "";
    if (data.payload?.client_state) {
      try {
        callId = Buffer.from(data.payload.client_state, "base64").toString(
          "utf8",
        );
      } catch {
        // Fallback if not valid Base64
        callId = data.payload.client_state;
      }
    }
    if (!callId) {
      // Fall back to Telnyx's own call handle when no client state round-tripped.
      callId = data.payload?.call_control_id || "";
    }
    const baseEvent = {
      id: data.id || crypto.randomUUID(),
      callId,
      providerCallId: data.payload?.call_control_id,
      timestamp: Date.now(),
    };
    switch (data.event_type) {
      case "call.initiated":
        return { ...baseEvent, type: "call.initiated" };
      case "call.ringing":
        return { ...baseEvent, type: "call.ringing" };
      case "call.answered":
        return { ...baseEvent, type: "call.answered" };
      case "call.bridged":
        return { ...baseEvent, type: "call.active" };
      case "call.speak.started":
        return {
          ...baseEvent,
          type: "call.speaking",
          text: data.payload?.text || "",
        };
      case "call.transcription":
        return {
          ...baseEvent,
          type: "call.speech",
          transcript: data.payload?.transcription || "",
          isFinal: data.payload?.is_final ?? true,
          confidence: data.payload?.confidence,
        };
      case "call.hangup":
        return {
          ...baseEvent,
          type: "call.ended",
          reason: this.mapHangupCause(data.payload?.hangup_cause),
        };
      case "call.dtmf.received":
        return {
          ...baseEvent,
          type: "call.dtmf",
          digits: data.payload?.digit || "",
        };
      default:
        return null;
    }
  }

  /**
   * Map Telnyx hangup cause to normalized end reason.
   * @see https://developers.telnyx.com/docs/api/v2/call-control/Call-Commands#hangup-causes
   */
  private mapHangupCause(cause?: string): EndReason {
    switch (cause) {
      case "normal_clearing":
      case "normal_unspecified":
        return "completed";
      case "originator_cancel":
        return "hangup-bot";
      case "call_rejected":
      case "user_busy":
        return "busy";
      case "no_answer":
      case "no_user_response":
        return "no-answer";
      case "destination_out_of_order":
      case "network_out_of_order":
      case "service_unavailable":
      case "recovery_on_timer_expire":
        return "failed";
      case "machine_detected":
      case "fax_detected":
        return "voicemail";
      case "user_hangup":
      case "subscriber_absent":
        return "hangup-user";
      default:
        // Unknown cause - log it for debugging and return completed
        if (cause) {
          console.warn(`[telnyx] Unknown hangup cause: ${cause}`);
        }
        return "completed";
    }
  }

  /**
   * Initiate an outbound call via Telnyx API.
   * Our internal callId is round-tripped through client_state (Base64,
   * as Telnyx requires) so webhooks can be correlated back to the call.
   */
  async initiateCall(input: InitiateCallInput): Promise<InitiateCallResult> {
    const result = await this.apiRequest<TelnyxCallResponse>("/calls", {
      connection_id: this.connectionId,
      to: input.to,
      from: input.from,
      webhook_url: input.webhookUrl,
      webhook_url_method: "POST",
      client_state: Buffer.from(input.callId).toString("base64"),
      timeout_secs: 30,
    });
    return {
      providerCallId: result.data.call_control_id,
      status: "initiated",
    };
  }

  /**
   * Hang up a call via Telnyx API. A 404 (call already gone) is tolerated.
   */
  async hangupCall(input: HangupCallInput): Promise<void> {
    await this.apiRequest(
      `/calls/${input.providerCallId}/actions/hangup`,
      { command_id: crypto.randomUUID() },
      { allowNotFound: true },
    );
  }

  /**
   * Play TTS audio via Telnyx speak action.
   */
  async playTts(input: PlayTtsInput): Promise<void> {
    await this.apiRequest(`/calls/${input.providerCallId}/actions/speak`, {
      command_id: crypto.randomUUID(),
      payload: input.text,
      voice: input.voice || "female",
      language: input.locale || "en-US",
    });
  }

  /**
   * Start transcription (STT) via Telnyx.
   */
  async startListening(input: StartListeningInput): Promise<void> {
    await this.apiRequest(
      `/calls/${input.providerCallId}/actions/transcription_start`,
      {
        command_id: crypto.randomUUID(),
        language: input.language || "en",
      },
    );
  }

  /**
   * Stop transcription via Telnyx. A 404 (call already gone) is tolerated.
   */
  async stopListening(input: StopListeningInput): Promise<void> {
    await this.apiRequest(
      `/calls/${input.providerCallId}/actions/transcription_stop`,
      { command_id: crypto.randomUUID() },
      { allowNotFound: true },
    );
  }
}
// -----------------------------------------------------------------------------
// Telnyx-specific types
// -----------------------------------------------------------------------------
/**
 * Shape of a Telnyx Call Control webhook event (the `data` member of the
 * webhook envelope). Only the fields this provider reads are modeled.
 */
interface TelnyxEvent {
  // Unique event ID; may be absent, in which case a UUID is generated.
  id?: string;
  // Telnyx event type, e.g. "call.answered" or "call.hangup".
  event_type: string;
  payload?: {
    // Telnyx's handle for the call leg; used for follow-up API commands.
    call_control_id?: string;
    // Base64-encoded client state we set in initiateCall (our call ID).
    client_state?: string;
    // Text being spoken (call.speak.started).
    text?: string;
    // Transcribed speech (call.transcription).
    transcription?: string;
    // Whether the transcription chunk is final.
    is_final?: boolean;
    // Transcription confidence, when Telnyx provides one.
    confidence?: number;
    // Telephony hangup cause (call.hangup).
    hangup_cause?: string;
    // DTMF digit received (call.dtmf.received).
    digit?: string;
    [key: string]: unknown;
  };
}
/** Response envelope returned by Telnyx's POST /calls (Dial) command. */
interface TelnyxCallResponse {
  data: {
    // Handle used for all subsequent call-control commands on this call.
    call_control_id: string;
    call_leg_id: string;
    call_session_id: string;
    is_alive: boolean;
    record_type: string;
  };
}

View File

@@ -0,0 +1,264 @@
/**
* OpenAI TTS Provider
*
* Generates speech audio using OpenAI's text-to-speech API.
* Handles audio format conversion for telephony (mu-law 8kHz).
*
* Best practices from OpenAI docs:
* - Use gpt-4o-mini-tts for intelligent realtime applications (supports instructions)
* - Use tts-1 for lower latency, tts-1-hd for higher quality
* - Use marin or cedar voices for best quality
* - Use pcm or wav format for fastest response times
*
* @see https://platform.openai.com/docs/guides/text-to-speech
*/
/**
 * OpenAI TTS configuration. All fields are optional; see the provider
 * constructor for the defaults applied.
 */
export interface OpenAITTSConfig {
  /** OpenAI API key (uses OPENAI_API_KEY env if not set) */
  apiKey?: string;
  /**
   * TTS model (default: gpt-4o-mini-tts):
   * - gpt-4o-mini-tts: newest, supports instructions for tone/style control (recommended)
   * - tts-1: lower latency
   * - tts-1-hd: higher quality
   */
  model?: string;
  /**
   * Voice to use (default: coral). For best quality, use marin or cedar.
   * All 13 voices: alloy, ash, ballad, coral, echo, fable, nova, onyx, sage, shimmer, verse, marin, cedar
   * Note: tts-1/tts-1-hd only support: alloy, ash, coral, echo, fable, onyx, nova, sage, shimmer
   */
  voice?: string;
  /** Speed multiplier (0.25 to 4.0; default 1.0) */
  speed?: number;
  /**
   * Instructions for speech style (only works with gpt-4o-mini-tts model).
   * Examples: "Speak in a cheerful tone", "Talk like a sympathetic customer service agent"
   */
  instructions?: string;
}
/**
 * Supported OpenAI TTS voices (all 13 built-in voices).
 * For best quality, use marin or cedar.
 * Note: tts-1 and tts-1-hd support a smaller set.
 */
export const OPENAI_TTS_VOICES = [
  "alloy",
  "ash",
  "ballad",
  "coral",
  "echo",
  "fable",
  "nova",
  "onyx",
  "sage",
  "shimmer",
  "verse",
  "marin",
  "cedar",
] as const;
/** Union of the voice names above, derived from OPENAI_TTS_VOICES. */
export type OpenAITTSVoice = (typeof OPENAI_TTS_VOICES)[number];
/**
 * OpenAI TTS Provider for generating speech audio.
 * Wraps the /v1/audio/speech endpoint and offers a telephony helper that
 * downsamples and mu-law encodes the result for Twilio.
 */
export class OpenAITTSProvider {
  private apiKey: string;
  private model: string;
  private voice: OpenAITTSVoice;
  private speed: number;
  private instructions?: string;

  constructor(config: OpenAITTSConfig = {}) {
    // Prefer an explicit key, then the environment; fail fast if neither exists.
    this.apiKey = config.apiKey || process.env.OPENAI_API_KEY || "";
    if (!this.apiKey) {
      throw new Error(
        "OpenAI API key required (set OPENAI_API_KEY or pass apiKey)",
      );
    }
    // gpt-4o-mini-tts is the default: it supports style instructions.
    this.model = config.model || "gpt-4o-mini-tts";
    // coral: good balance of quality and natural tone.
    this.voice = (config.voice as OpenAITTSVoice) || "coral";
    this.speed = config.speed || 1.0;
    this.instructions = config.instructions;
  }

  /**
   * Generate speech audio from text.
   * Returns raw PCM audio data (24kHz, mono, 16-bit signed LE).
   *
   * @param text - Text to synthesize.
   * @param instructions - Per-call style override; falls back to the
   *   instance-level instructions. Only sent for gpt-4o-mini-tts models.
   * @throws Error with status and body text when the API call fails.
   */
  async synthesize(text: string, instructions?: string): Promise<Buffer> {
    const requestBody: Record<string, unknown> = {
      model: this.model,
      input: text,
      voice: this.voice,
      response_format: "pcm", // raw PCM: 24kHz, mono, 16-bit signed LE
      speed: this.speed,
    };
    // Style instructions are only honored by the gpt-4o-mini-tts family.
    const styleInstructions = instructions || this.instructions;
    if (styleInstructions && this.model.includes("gpt-4o-mini-tts")) {
      requestBody.instructions = styleInstructions;
    }
    const response = await fetch("https://api.openai.com/v1/audio/speech", {
      method: "POST",
      headers: {
        Authorization: `Bearer ${this.apiKey}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(requestBody),
    });
    if (!response.ok) {
      const error = await response.text();
      throw new Error(`OpenAI TTS failed: ${response.status} - ${error}`);
    }
    return Buffer.from(await response.arrayBuffer());
  }

  /**
   * Generate speech and convert it to 8kHz mono mu-law, the format Twilio
   * Media Streams expects.
   */
  async synthesizeForTwilio(text: string): Promise<Buffer> {
    const pcm24k = await this.synthesize(text); // 24kHz, 16-bit LE, mono
    const pcm8k = resample24kTo8k(pcm24k); // downsample to telephony rate
    return pcmToMulaw(pcm8k); // G.711 mu-law encode
  }
}
/**
 * Downsample 24kHz PCM to 8kHz.
 * Input/output: 16-bit signed little-endian mono.
 *
 * Because 24000/8000 is exactly 3, every output sample lands on an integer
 * input sample, so plain 3:1 decimation is exact. (The previous version
 * carried interpolation code whose fractional weight `srcPos % 1` was
 * always 0 — dead code that produced the same result as decimation.)
 *
 * NOTE(review): decimation without a low-pass filter can alias content
 * above 4kHz; acceptable for telephony speech, revisit if quality matters.
 */
function resample24kTo8k(input: Buffer): Buffer {
  // 2 bytes per sample, keep every 3rd sample.
  const outputSamples = Math.floor(input.length / 2 / 3);
  const output = Buffer.alloc(outputSamples * 2);
  for (let i = 0; i < outputSamples; i++) {
    output.writeInt16LE(input.readInt16LE(i * 6), i * 2);
  }
  return output;
}
/**
 * Clamp a value to the representable range of a 16-bit signed integer.
 */
function clamp16(value: number): number {
  if (value > 32767) return 32767;
  if (value < -32768) return -32768;
  return value;
}
/**
 * Convert a buffer of 16-bit little-endian PCM samples to 8-bit G.711
 * mu-law, one output byte per input sample.
 */
function pcmToMulaw(pcm: Buffer): Buffer {
  const sampleCount = pcm.length / 2;
  const encoded = Buffer.alloc(sampleCount);
  for (let i = 0; i < sampleCount; i++) {
    encoded[i] = linearToMulaw(pcm.readInt16LE(i * 2));
  }
  return encoded;
}
/**
 * Encode a single 16-bit linear PCM sample as 8-bit G.711 mu-law.
 * Implements ITU-T G.711 mu-law companding; the output byte is bit-inverted
 * for transmission, per the standard.
 */
function linearToMulaw(sample: number): number {
  const BIAS = 132; // 0x84, standard mu-law encoding bias
  const CLIP = 32635; // max magnitude before the bias is added

  // Split into sign bit and clipped magnitude.
  const sign = sample < 0 ? 0x80 : 0;
  let magnitude = sample < 0 ? -sample : sample;
  if (magnitude > CLIP) {
    magnitude = CLIP;
  }
  magnitude += BIAS;

  // Locate the segment: the highest set bit among bits 14..8.
  let exponent = 7;
  let segmentMask = 0x4000;
  while (exponent > 0 && (magnitude & segmentMask) === 0) {
    exponent--;
    segmentMask >>= 1;
  }

  // Four mantissa bits sit just below the segment bit.
  const mantissa = (magnitude >> (exponent + 3)) & 0x0f;
  // Combine and invert for transmission.
  return ~(sign | (exponent << 4) | mantissa) & 0xff;
}
/**
 * Decode one 8-bit G.711 mu-law byte to a 16-bit linear PCM sample.
 * Inverse of the encoder above; useful for decoding incoming audio.
 */
export function mulawToLinear(mulaw: number): number {
  // Bytes are transmitted bit-inverted per the standard.
  const byte = ~mulaw & 0xff;
  const sign = byte & 0x80;
  const exponent = (byte >> 4) & 0x07;
  const mantissa = byte & 0x0f;
  // Rebuild the biased magnitude, then remove the encoding bias.
  const magnitude = (((mantissa << 3) + 132) << exponent) - 132;
  return sign !== 0 ? -magnitude : magnitude;
}
/**
 * Chunk an audio buffer into fixed-size frames for streaming.
 * At 8kHz mono mu-law, 20ms = 160 samples = 160 bytes (the default).
 * The final chunk may be shorter than chunkSize. Chunks are views
 * (subarrays) into the input buffer, not copies.
 */
export function* chunkAudio(
  audio: Buffer,
  chunkSize = 160,
): Generator<Buffer, void, unknown> {
  // A generator function replaces the original IIFE-wrapped generator;
  // Buffer.subarray already clamps the end index, so no Math.min is needed.
  for (let offset = 0; offset < audio.length; offset += chunkSize) {
    yield audio.subarray(offset, offset + chunkSize);
  }
}

View File

@@ -0,0 +1,537 @@
import crypto from "node:crypto";
import type { TwilioConfig } from "../config.js";
import type { MediaStreamHandler } from "../media-stream.js";
import type {
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
NormalizedEvent,
PlayTtsInput,
ProviderWebhookParseResult,
StartListeningInput,
StopListeningInput,
WebhookContext,
WebhookVerificationResult,
} from "../types.js";
import { escapeXml, mapVoiceToPolly } from "../voice-mapping.js";
import { verifyTwilioWebhook } from "../webhook-security.js";
import type { VoiceCallProvider } from "./base.js";
import type { OpenAITTSProvider } from "./tts-openai.js";
import { chunkAudio } from "./tts-openai.js";
/**
* Twilio Voice API provider implementation.
*
* Uses Twilio Programmable Voice API with Media Streams for real-time
* bidirectional audio streaming.
*
* @see https://www.twilio.com/docs/voice
* @see https://www.twilio.com/docs/voice/media-streams
*/
/** Options tuning TwilioProvider webhook verification and media streaming. */
export interface TwilioProviderOptions {
  /** Allow ngrok free tier compatibility mode (less secure) */
  allowNgrokFreeTier?: boolean;
  /** Override public URL for signature verification */
  publicUrl?: string;
  /** Path for media stream WebSocket (e.g., /voice/stream) */
  streamPath?: string;
  /** Skip webhook signature verification (development only) */
  skipVerification?: boolean;
}
export class TwilioProvider implements VoiceCallProvider {
  readonly name = "twilio" as const;
  private readonly accountSid: string;
  private readonly authToken: string;
  private readonly baseUrl: string;
  // Maps provider call SID -> status-callback URL (carries our callId query
  // param). Populated in initiateCall, removed in hangupCall.
  // NOTE(review): entries for calls that end without an explicit
  // hangupCall() are never pruned here — confirm lifecycle upstream.
  private readonly callWebhookUrls = new Map<string, string>();
  private readonly options: TwilioProviderOptions;
  /** Current public webhook URL (set when tunnel starts or from config) */
  private currentPublicUrl: string | null = null;
  /** Optional OpenAI TTS provider for streaming TTS */
  private ttsProvider: OpenAITTSProvider | null = null;
  /** Optional media stream handler for sending audio */
  private mediaStreamHandler: MediaStreamHandler | null = null;
  /** Map of call SID to stream SID for media streams */
  private callStreamMap = new Map<string, string>();
  constructor(config: TwilioConfig, options: TwilioProviderOptions = {}) {
    if (!config.accountSid) {
      throw new Error("Twilio Account SID is required");
    }
    if (!config.authToken) {
      throw new Error("Twilio Auth Token is required");
    }
    this.accountSid = config.accountSid;
    this.authToken = config.authToken;
    this.baseUrl = `https://api.twilio.com/2010-04-01/Accounts/${this.accountSid}`;
    this.options = options;
    if (options.publicUrl) {
      this.currentPublicUrl = options.publicUrl;
    }
  }
  /**
   * Set the current public webhook URL (called when tunnel starts).
   */
  setPublicUrl(url: string): void {
    this.currentPublicUrl = url;
  }
  /**
   * Get the current public webhook URL.
   */
  getPublicUrl(): string | null {
    return this.currentPublicUrl;
  }
  /**
   * Set the OpenAI TTS provider for streaming TTS.
   * When set, playTts will use OpenAI audio via media streams.
   */
  setTTSProvider(provider: OpenAITTSProvider): void {
    this.ttsProvider = provider;
  }
  /**
   * Set the media stream handler for sending audio.
   */
  setMediaStreamHandler(handler: MediaStreamHandler): void {
    this.mediaStreamHandler = handler;
  }
  /**
   * Register a call's stream SID for audio routing.
   */
  registerCallStream(callSid: string, streamSid: string): void {
    this.callStreamMap.set(callSid, streamSid);
  }
  /**
   * Unregister a call's stream SID.
   */
  unregisterCallStream(callSid: string): void {
    this.callStreamMap.delete(callSid);
  }
  /**
   * Make an authenticated request to the Twilio API.
   * Form-encoded POST with HTTP Basic auth (SID:token), per Twilio's
   * 2010-04-01 REST API. With allowNotFound, a 404 resolves to undefined.
   */
  private async apiRequest<T = unknown>(
    endpoint: string,
    params: Record<string, string>,
    options?: { allowNotFound?: boolean },
  ): Promise<T> {
    const response = await fetch(`${this.baseUrl}${endpoint}`, {
      method: "POST",
      headers: {
        Authorization: `Basic ${Buffer.from(`${this.accountSid}:${this.authToken}`).toString("base64")}`,
        "Content-Type": "application/x-www-form-urlencoded",
      },
      body: new URLSearchParams(params),
    });
    if (!response.ok) {
      if (options?.allowNotFound && response.status === 404) {
        return undefined as T;
      }
      const errorText = await response.text();
      throw new Error(`Twilio API error: ${response.status} ${errorText}`);
    }
    // Some endpoints return an empty body; surface that as undefined.
    const text = await response.text();
    return text ? (JSON.parse(text) as T) : (undefined as T);
  }
  /**
   * Verify Twilio webhook signature using HMAC-SHA1.
   *
   * Handles reverse proxy scenarios (Tailscale, nginx, ngrok) by reconstructing
   * the public URL from forwarding headers.
   *
   * @see https://www.twilio.com/docs/usage/webhooks/webhooks-security
   */
  verifyWebhook(ctx: WebhookContext): WebhookVerificationResult {
    const result = verifyTwilioWebhook(ctx, this.authToken, {
      publicUrl: this.currentPublicUrl || undefined,
      allowNgrokFreeTier: this.options.allowNgrokFreeTier ?? true,
      skipVerification: this.options.skipVerification,
    });
    if (!result.ok) {
      console.warn(`[twilio] Webhook verification failed: ${result.reason}`);
      if (result.verificationUrl) {
        console.warn(`[twilio] Verification URL: ${result.verificationUrl}`);
      }
    }
    return {
      ok: result.ok,
      reason: result.reason,
    };
  }
  /**
   * Parse Twilio webhook event into normalized format.
   */
  parseWebhookEvent(ctx: WebhookContext): ProviderWebhookParseResult {
    try {
      // Twilio webhooks are form-encoded, not JSON.
      const params = new URLSearchParams(ctx.rawBody);
      // Our callId rides along as a query parameter on the callback URL.
      const callIdFromQuery =
        typeof ctx.query?.callId === "string" && ctx.query.callId.trim()
          ? ctx.query.callId.trim()
          : undefined;
      const event = this.normalizeEvent(params, callIdFromQuery);
      // For Twilio, we must return TwiML. Most actions are driven by Calls API updates,
      // so the webhook response is typically a pause to keep the call alive.
      const twiml = this.generateTwimlResponse(ctx);
      return {
        events: event ? [event] : [],
        providerResponseBody: twiml,
        providerResponseHeaders: { "Content-Type": "application/xml" },
        statusCode: 200,
      };
    } catch {
      return { events: [], statusCode: 400 };
    }
  }
  /**
   * Parse Twilio direction to normalized format.
   */
  private static parseDirection(
    direction: string | null,
  ): "inbound" | "outbound" | undefined {
    if (direction === "inbound") return "inbound";
    if (direction === "outbound-api" || direction === "outbound-dial")
      return "outbound";
    return undefined;
  }
  /**
   * Convert Twilio webhook params to normalized event format.
   * Returns null for call statuses this plugin does not consume.
   */
  private normalizeEvent(
    params: URLSearchParams,
    callIdOverride?: string,
  ): NormalizedEvent | null {
    const callSid = params.get("CallSid") || "";
    const baseEvent = {
      id: crypto.randomUUID(),
      callId: callIdOverride || callSid,
      providerCallId: callSid,
      timestamp: Date.now(),
      direction: TwilioProvider.parseDirection(params.get("Direction")),
      from: params.get("From") || undefined,
      to: params.get("To") || undefined,
    };
    // Handle speech result (from <Gather>)
    const speechResult = params.get("SpeechResult");
    if (speechResult) {
      return {
        ...baseEvent,
        type: "call.speech",
        transcript: speechResult,
        isFinal: true,
        // NOTE(review): 0.9 is a fabricated default when Twilio omits
        // Confidence — confirm downstream consumers tolerate this.
        confidence: parseFloat(params.get("Confidence") || "0.9"),
      };
    }
    // Handle DTMF
    const digits = params.get("Digits");
    if (digits) {
      return { ...baseEvent, type: "call.dtmf", digits };
    }
    // Handle call status changes
    const callStatus = params.get("CallStatus");
    switch (callStatus) {
      case "initiated":
        return { ...baseEvent, type: "call.initiated" };
      case "ringing":
        return { ...baseEvent, type: "call.ringing" };
      case "in-progress":
        return { ...baseEvent, type: "call.answered" };
      case "completed":
      case "busy":
      case "no-answer":
      case "failed":
        return { ...baseEvent, type: "call.ended", reason: callStatus };
      case "canceled":
        return { ...baseEvent, type: "call.ended", reason: "hangup-bot" };
      default:
        return null;
    }
  }
  // No-op TwiML: acknowledges the webhook without changing the call.
  private static readonly EMPTY_TWIML =
    '<?xml version="1.0" encoding="UTF-8"?><Response></Response>';
  // Keeps the call leg open for 30s while commands arrive via the REST API.
  private static readonly PAUSE_TWIML = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Pause length="30"/>
</Response>`;
  /**
   * Generate TwiML response for webhook.
   * When a call is answered, connects to media stream for bidirectional audio.
   */
  private generateTwimlResponse(ctx?: WebhookContext): string {
    if (!ctx) return TwilioProvider.EMPTY_TWIML;
    const params = new URLSearchParams(ctx.rawBody);
    const callStatus = params.get("CallStatus");
    const direction = params.get("Direction");
    console.log(
      `[voice-call] generateTwimlResponse: status=${callStatus} direction=${direction}`,
    );
    // For inbound calls, answer immediately with stream
    if (direction === "inbound") {
      const streamUrl = this.getStreamUrl();
      return streamUrl
        ? this.getStreamConnectXml(streamUrl)
        : TwilioProvider.PAUSE_TWIML;
    }
    // For outbound calls, only connect to stream when call is in-progress
    if (callStatus !== "in-progress") {
      return TwilioProvider.EMPTY_TWIML;
    }
    const streamUrl = this.getStreamUrl();
    return streamUrl
      ? this.getStreamConnectXml(streamUrl)
      : TwilioProvider.PAUSE_TWIML;
  }
  /**
   * Get the WebSocket URL for media streaming.
   * Derives from the public URL origin + stream path.
   * Returns null when no public URL or stream path is configured.
   */
  private getStreamUrl(): string | null {
    if (!this.currentPublicUrl || !this.options.streamPath) {
      return null;
    }
    // Extract just the origin (host) from the public URL, ignoring any path
    const url = new URL(this.currentPublicUrl);
    const origin = url.origin;
    // Convert https:// to wss:// for WebSocket
    const wsOrigin = origin
      .replace(/^https:\/\//, "wss://")
      .replace(/^http:\/\//, "ws://");
    // Append the stream path
    const path = this.options.streamPath.startsWith("/")
      ? this.options.streamPath
      : `/${this.options.streamPath}`;
    return `${wsOrigin}${path}`;
  }
  /**
   * Generate TwiML to connect a call to a WebSocket media stream.
   * This enables bidirectional audio streaming for real-time STT/TTS.
   *
   * @param streamUrl - WebSocket URL (wss://...) for the media stream
   */
  getStreamConnectXml(streamUrl: string): string {
    return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="${escapeXml(streamUrl)}" />
</Connect>
</Response>`;
  }
  /**
   * Initiate an outbound call via Twilio API.
   * If inlineTwiml is provided, uses that directly (for notify mode).
   * Otherwise, uses webhook URL for dynamic TwiML.
   */
  async initiateCall(input: InitiateCallInput): Promise<InitiateCallResult> {
    // Attach our callId so status callbacks can be correlated to the call.
    const url = new URL(input.webhookUrl);
    url.searchParams.set("callId", input.callId);
    // Build request params
    const params: Record<string, string> = {
      To: input.to,
      From: input.from,
      StatusCallback: url.toString(),
      StatusCallbackEvent: "initiated ringing answered completed",
      Timeout: "30",
    };
    // Use inline TwiML for notify mode (simpler, no webhook needed)
    if (input.inlineTwiml) {
      params.Twiml = input.inlineTwiml;
    } else {
      params.Url = url.toString();
    }
    const result = await this.apiRequest<TwilioCallResponse>(
      "/Calls.json",
      params,
    );
    // Remember the callback URL so later TwiML updates can reuse it.
    this.callWebhookUrls.set(result.sid, url.toString());
    return {
      providerCallId: result.sid,
      status: result.status === "queued" ? "queued" : "initiated",
    };
  }
  /**
   * Hang up a call via Twilio API.
   * A 404 (call already gone) is tolerated.
   */
  async hangupCall(input: HangupCallInput): Promise<void> {
    this.callWebhookUrls.delete(input.providerCallId);
    await this.apiRequest(
      `/Calls/${input.providerCallId}.json`,
      { Status: "completed" },
      { allowNotFound: true },
    );
  }
  /**
   * Play TTS audio via Twilio.
   *
   * Two modes:
   * 1. OpenAI TTS + Media Streams: If TTS provider and media stream are available,
   *    generates audio via OpenAI and streams it through WebSocket (preferred).
   * 2. TwiML <Say>: Falls back to Twilio's native TTS with Polly voices.
   *    Note: This may not work on all Twilio accounts.
   */
  async playTts(input: PlayTtsInput): Promise<void> {
    // Try OpenAI TTS via media stream first (if configured)
    const streamSid = this.callStreamMap.get(input.providerCallId);
    if (this.ttsProvider && this.mediaStreamHandler && streamSid) {
      try {
        await this.playTtsViaStream(input.text, streamSid);
        return;
      } catch (err) {
        console.warn(
          `[voice-call] OpenAI TTS failed, falling back to Twilio <Say>:`,
          err instanceof Error ? err.message : err,
        );
        // Fall through to TwiML <Say> fallback
      }
    }
    // Fall back to TwiML <Say> (may not work on all accounts)
    const webhookUrl = this.callWebhookUrls.get(input.providerCallId);
    if (!webhookUrl) {
      throw new Error(
        "Missing webhook URL for this call (provider state not initialized)",
      );
    }
    console.warn(
      "[voice-call] Using TwiML <Say> fallback - OpenAI TTS not configured or media stream not active",
    );
    const pollyVoice = mapVoiceToPolly(input.voice);
    // NOTE(review): the language attribute is interpolated without
    // escapeXml (locale comes from config, so risk is low) — confirm.
    const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="${pollyVoice}" language="${input.locale || "en-US"}">${escapeXml(input.text)}</Say>
<Gather input="speech" speechTimeout="auto" action="${escapeXml(webhookUrl)}" method="POST">
<Say>.</Say>
</Gather>
</Response>`;
    await this.apiRequest(`/Calls/${input.providerCallId}.json`, {
      Twiml: twiml,
    });
  }
  /**
   * Play TTS via OpenAI and Twilio Media Streams.
   * Generates audio with OpenAI TTS, converts to mu-law, and streams via WebSocket.
   * Uses a jitter buffer to smooth out timing variations.
   */
  private async playTtsViaStream(
    text: string,
    streamSid: string,
  ): Promise<void> {
    if (!this.ttsProvider || !this.mediaStreamHandler) {
      throw new Error("TTS provider and media stream handler required");
    }
    // Generate audio with OpenAI TTS (returns mu-law at 8kHz)
    const muLawAudio = await this.ttsProvider.synthesizeForTwilio(text);
    // Stream audio in 20ms chunks (160 bytes at 8kHz mu-law)
    const CHUNK_SIZE = 160;
    const CHUNK_DELAY_MS = 20;
    for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
      this.mediaStreamHandler.sendAudio(streamSid, chunk);
      // Pace the audio to match real-time playback
      await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS));
    }
    // Send a mark to track when audio finishes
    this.mediaStreamHandler.sendMark(streamSid, `tts-${Date.now()}`);
  }
  /**
   * Start listening for speech via Twilio <Gather>.
   * Results arrive on the stored webhook URL as SpeechResult params.
   */
  async startListening(input: StartListeningInput): Promise<void> {
    const webhookUrl = this.callWebhookUrls.get(input.providerCallId);
    if (!webhookUrl) {
      throw new Error(
        "Missing webhook URL for this call (provider state not initialized)",
      );
    }
    const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Gather input="speech" speechTimeout="auto" language="${input.language || "en-US"}" action="${escapeXml(webhookUrl)}" method="POST">
</Gather>
</Response>`;
    await this.apiRequest(`/Calls/${input.providerCallId}.json`, {
      Twiml: twiml,
    });
  }
  /**
   * Stop listening - for Twilio this is a no-op as <Gather> auto-ends.
   */
  async stopListening(_input: StopListeningInput): Promise<void> {
    // Twilio's <Gather> automatically stops on speech end
    // No explicit action needed
  }
}
// -----------------------------------------------------------------------------
// Twilio-specific types
// -----------------------------------------------------------------------------
/** Subset of Twilio's call resource returned by POST /Calls.json. */
interface TwilioCallResponse {
  // Call SID — Twilio's unique identifier for this call.
  sid: string;
  // Initial call status (e.g. "queued").
  status: string;
  direction: string;
  from: string;
  to: string;
  uri: string;
}

View File

@@ -0,0 +1,171 @@
/**
* Voice call response generator - uses the embedded Pi agent for tool support.
* Routes voice responses through the same agent infrastructure as messaging.
*/
import crypto from "node:crypto";
import { loadCoreAgentDeps, type CoreConfig } from "./core-bridge.js";
import type { VoiceCallConfig } from "./config.js";
/** Inputs for generating one voice-call reply via the embedded agent. */
export type VoiceResponseParams = {
  /** Voice call config */
  voiceConfig: VoiceCallConfig;
  /** Core Clawdbot config */
  coreConfig: CoreConfig;
  /** Call ID for session tracking */
  callId: string;
  /** Caller's phone number (also used to derive the per-caller session key) */
  from: string;
  /** Conversation transcript */
  transcript: Array<{ speaker: "user" | "bot"; text: string }>;
  /** Latest user message */
  userMessage: string;
};
/** Result of one voice-response generation attempt. */
export type VoiceResponseResult = {
  /** Reply text to speak, or null when generation failed. */
  text: string | null;
  /** Human-readable failure cause; set when text is null. */
  error?: string;
};
/** Per-caller record persisted in the session store (keyed by "voice:<digits>"). */
type SessionEntry = {
  // Stable session UUID used to locate the session file.
  sessionId: string;
  // Last-touched timestamp, milliseconds since epoch.
  updatedAt: number;
};
/**
 * Generate a voice response using the embedded Pi agent with full tool support.
 * Uses the same agent infrastructure as messaging for consistent behavior.
 *
 * Sessions are keyed by the caller's phone digits (`voice:<digits>`), so
 * repeat callers resume the same agent session across calls.
 */
export async function generateVoiceResponse(
  params: VoiceResponseParams,
): Promise<VoiceResponseResult> {
  const { voiceConfig, callId, from, transcript, userMessage, coreConfig } =
    params;
  if (!coreConfig) {
    return { text: null, error: "Core config unavailable for voice response" };
  }
  // Load the core agent bridge lazily so a broken install degrades to an
  // error result instead of crashing the webhook path.
  let deps: Awaited<ReturnType<typeof loadCoreAgentDeps>>;
  try {
    deps = await loadCoreAgentDeps();
  } catch (err) {
    return {
      text: null,
      error:
        err instanceof Error
          ? err.message
          : "Unable to load core agent dependencies",
    };
  }
  const cfg = coreConfig;
  // Build voice-specific session key based on phone number. Digits only, so
  // formatting variants of the same number share one session.
  const normalizedPhone = from.replace(/\D/g, "");
  const sessionKey = `voice:${normalizedPhone}`;
  const agentId = "main";
  // Resolve store/agent/workspace paths for the "main" agent.
  const storePath = deps.resolveStorePath(cfg.session?.store, { agentId });
  const agentDir = deps.resolveAgentDir(cfg, agentId);
  const workspaceDir = deps.resolveAgentWorkspaceDir(cfg, agentId);
  // Ensure workspace exists
  await deps.ensureAgentWorkspace({ dir: workspaceDir });
  // Load or create the per-caller session entry.
  const sessionStore = deps.loadSessionStore(storePath);
  const now = Date.now();
  let sessionEntry = sessionStore[sessionKey] as SessionEntry | undefined;
  if (!sessionEntry) {
    sessionEntry = {
      sessionId: crypto.randomUUID(),
      updatedAt: now,
    };
    sessionStore[sessionKey] = sessionEntry;
    await deps.saveSessionStore(storePath, sessionStore);
  }
  // NOTE(review): updatedAt is never refreshed for existing sessions —
  // confirm whether staleness tracking is expected here.
  const sessionId = sessionEntry.sessionId;
  const sessionFile = deps.resolveSessionFilePath(sessionId, sessionEntry, {
    agentId,
  });
  // Resolve model from config. "provider/model" form; a bare name means the
  // default provider.
  const modelRef =
    voiceConfig.responseModel ||
    `${deps.DEFAULT_PROVIDER}/${deps.DEFAULT_MODEL}`;
  const slashIndex = modelRef.indexOf("/");
  const provider =
    slashIndex === -1 ? deps.DEFAULT_PROVIDER : modelRef.slice(0, slashIndex);
  const model = slashIndex === -1 ? modelRef : modelRef.slice(slashIndex + 1);
  // Resolve thinking level
  const thinkLevel = deps.resolveThinkingDefault({ cfg, provider, model });
  // Resolve agent identity for personalized prompt
  const identity = deps.resolveAgentIdentity(cfg, agentId);
  const agentName = identity?.name?.trim() || "assistant";
  // Build system prompt with conversation history
  const basePrompt =
    voiceConfig.responseSystemPrompt ??
    `You are ${agentName}, a helpful voice assistant on a phone call. Keep responses brief and conversational (1-2 sentences max). Be natural and friendly. The caller's phone number is ${from}. You have access to tools - use them when helpful.`;
  let extraSystemPrompt = basePrompt;
  if (transcript.length > 0) {
    const history = transcript
      .map(
        (entry) =>
          `${entry.speaker === "bot" ? "You" : "Caller"}: ${entry.text}`,
      )
      .join("\n");
    extraSystemPrompt = `${basePrompt}\n\nConversation so far:\n${history}`;
  }
  // Resolve timeout
  const timeoutMs =
    voiceConfig.responseTimeoutMs ?? deps.resolveAgentTimeoutMs({ cfg });
  const runId = `voice:${callId}:${Date.now()}`;
  try {
    const result = await deps.runEmbeddedPiAgent({
      sessionId,
      sessionKey,
      messageProvider: "voice",
      sessionFile,
      workspaceDir,
      config: cfg,
      prompt: userMessage,
      provider,
      model,
      thinkLevel,
      verboseLevel: "off",
      timeoutMs,
      runId,
      lane: "voice",
      extraSystemPrompt,
      agentDir,
    });
    // Join all non-error text payloads into a single utterance.
    const texts = (result.payloads ?? [])
      .filter((p) => p.text && !p.isError)
      .map((p) => p.text?.trim())
      .filter(Boolean);
    const text = texts.join(" ") || null;
    if (!text && result.meta.aborted) {
      return { text: null, error: "Response generation was aborted" };
    }
    return { text };
  } catch (err) {
    console.error("[voice-call] Response generation failed:", err);
    // Normalize to a message string, matching the deps-load error path above.
    return {
      text: null,
      error: err instanceof Error ? err.message : String(err),
    };
  }
}

View File

@@ -0,0 +1,194 @@
import type { CoreConfig } from "./core-bridge.js";
import type { VoiceCallConfig } from "./config.js";
import { validateProviderConfig } from "./config.js";
import { CallManager } from "./manager.js";
import type { VoiceCallProvider } from "./providers/base.js";
import { MockProvider } from "./providers/mock.js";
import { TelnyxProvider } from "./providers/telnyx.js";
import { OpenAITTSProvider } from "./providers/tts-openai.js";
import { TwilioProvider } from "./providers/twilio.js";
import { startTunnel, type TunnelResult } from "./tunnel.js";
import {
cleanupTailscaleExposure,
setupTailscaleExposure,
VoiceCallWebhookServer,
} from "./webhook.js";
/** Live runtime handle returned by createVoiceCallRuntime(). */
export type VoiceCallRuntime = {
  config: VoiceCallConfig;
  provider: VoiceCallProvider;
  manager: CallManager;
  webhookServer: VoiceCallWebhookServer;
  /** URL providers post webhooks to (public when exposed, else local). */
  webhookUrl: string;
  /** Externally reachable base URL, or null when only bound locally. */
  publicUrl: string | null;
  /** Tears down the tunnel, Tailscale exposure, and the webhook server. */
  stop: () => Promise<void>;
};
/** Minimal logging surface so hosts can plug in their own logger. */
type Logger = {
  info: (message: string) => void;
  warn: (message: string) => void;
  error: (message: string) => void;
  debug: (message: string) => void;
};
/**
 * Construct the concrete VoiceCallProvider for the configured backend.
 * Secrets fall back to conventional environment variables when absent from
 * config. Throws for unrecognized provider names.
 */
function resolveProvider(config: VoiceCallConfig): VoiceCallProvider {
  const name = config.provider;
  if (name === "telnyx") {
    return new TelnyxProvider({
      apiKey: config.telnyx?.apiKey ?? process.env.TELNYX_API_KEY,
      connectionId:
        config.telnyx?.connectionId ?? process.env.TELNYX_CONNECTION_ID,
      publicKey: config.telnyx?.publicKey ?? process.env.TELNYX_PUBLIC_KEY,
    });
  }
  if (name === "twilio") {
    const credentials = {
      accountSid: config.twilio?.accountSid ?? process.env.TWILIO_ACCOUNT_SID,
      authToken: config.twilio?.authToken ?? process.env.TWILIO_AUTH_TOKEN,
    };
    const options = {
      allowNgrokFreeTier: config.tunnel?.allowNgrokFreeTier ?? true,
      publicUrl: config.publicUrl,
      skipVerification: config.skipSignatureVerification,
      streamPath: config.streaming?.enabled
        ? config.streaming.streamPath
        : undefined,
    };
    return new TwilioProvider(credentials, options);
  }
  if (name === "mock") {
    return new MockProvider();
  }
  throw new Error(`Unsupported voice-call provider: ${String(name)}`);
}
/**
 * Build and start the full voice-call runtime: provider, call manager,
 * webhook server, and (optionally) a public tunnel or Tailscale exposure.
 *
 * Throws when the plugin is disabled or provider config is invalid.
 * Callers must invoke the returned `stop()` to release resources.
 */
export async function createVoiceCallRuntime(params: {
  config: VoiceCallConfig;
  coreConfig: CoreConfig;
  logger?: Logger;
}): Promise<VoiceCallRuntime> {
  const { config, coreConfig, logger } = params;
  const log = logger ?? {
    info: console.log,
    warn: console.warn,
    error: console.error,
    debug: console.debug,
  };
  if (!config.enabled) {
    throw new Error(
      "Voice call disabled. Enable the plugin entry in config.",
    );
  }
  const validation = validateProviderConfig(config);
  if (!validation.valid) {
    throw new Error(`Invalid voice-call config: ${validation.errors.join("; ")}`);
  }
  const provider = resolveProvider(config);
  const manager = new CallManager(config);
  const webhookServer = new VoiceCallWebhookServer(
    config,
    manager,
    provider,
    coreConfig,
  );
  const localUrl = await webhookServer.start();
  // Determine public URL - priority: config.publicUrl > tunnel > legacy tailscale
  let publicUrl: string | null = config.publicUrl ?? null;
  let tunnelResult: TunnelResult | null = null;
  if (!publicUrl && config.tunnel?.provider && config.tunnel.provider !== "none") {
    try {
      tunnelResult = await startTunnel({
        provider: config.tunnel.provider,
        port: config.serve.port,
        path: config.serve.path,
        ngrokAuthToken:
          config.tunnel.ngrokAuthToken ?? process.env.NGROK_AUTHTOKEN,
        ngrokDomain: config.tunnel.ngrokDomain ?? process.env.NGROK_DOMAIN,
      });
      publicUrl = tunnelResult?.publicUrl ?? null;
    } catch (err) {
      // Tunnel failure is non-fatal: fall through to Tailscale or local URL.
      log.error(
        `[voice-call] Tunnel setup failed: ${
          err instanceof Error ? err.message : String(err)
        }`,
      );
    }
  }
  if (!publicUrl && config.tailscale?.mode !== "off") {
    publicUrl = await setupTailscaleExposure(config);
  }
  const webhookUrl = publicUrl ?? localUrl;
  // Twilio must know the public URL so it can build/verify webhook URLs.
  if (publicUrl && provider.name === "twilio") {
    (provider as TwilioProvider).setPublicUrl(publicUrl);
  }
  // Optional streaming pipeline (Twilio only): OpenAI TTS + media handler.
  if (provider.name === "twilio" && config.streaming?.enabled) {
    const twilioProvider = provider as TwilioProvider;
    const openaiApiKey =
      config.streaming.openaiApiKey || process.env.OPENAI_API_KEY;
    if (openaiApiKey) {
      try {
        const ttsProvider = new OpenAITTSProvider({
          apiKey: openaiApiKey,
          voice: config.tts.voice,
          model: config.tts.model,
          instructions: config.tts.instructions,
        });
        twilioProvider.setTTSProvider(ttsProvider);
        log.info("[voice-call] OpenAI TTS provider configured");
      } catch (err) {
        log.warn(
          `[voice-call] Failed to initialize OpenAI TTS: ${
            err instanceof Error ? err.message : String(err)
          }`,
        );
      }
    } else {
      log.warn("[voice-call] OpenAI TTS key missing; streaming TTS disabled");
    }
    const mediaHandler = webhookServer.getMediaStreamHandler();
    if (mediaHandler) {
      twilioProvider.setMediaStreamHandler(mediaHandler);
      log.info("[voice-call] Media stream handler wired to provider");
    }
  }
  manager.initialize(provider, webhookUrl);
  const stop = async () => {
    if (tunnelResult) {
      await tunnelResult.stop();
    }
    await cleanupTailscaleExposure(config);
    await webhookServer.stop();
  };
  log.info("[voice-call] Runtime initialized");
  log.info(`[voice-call] Webhook URL: ${webhookUrl}`);
  if (publicUrl) {
    log.info(`[voice-call] Public URL: ${publicUrl}`);
  }
  return {
    config,
    provider,
    manager,
    webhookServer,
    webhookUrl,
    publicUrl,
    stop,
  };
}

View File

@@ -0,0 +1,330 @@
import { spawn } from "node:child_process";
import { getTailscaleDnsName } from "./webhook.js";
/**
 * Tunnel configuration for exposing the webhook server.
 */
export interface TunnelConfig {
  /** Tunnel provider: ngrok, tailscale-serve, or tailscale-funnel */
  provider: "ngrok" | "tailscale-serve" | "tailscale-funnel" | "none";
  /** Local port to tunnel */
  port: number;
  /** Path prefix for the tunnel (e.g., /voice/webhook) */
  path: string;
  /** ngrok auth token (optional, enables longer sessions) */
  ngrokAuthToken?: string;
  /** ngrok custom domain (paid feature) */
  ngrokDomain?: string;
}
/**
 * Result of starting a tunnel.
 */
export interface TunnelResult {
  /** The public URL (already includes the configured path prefix) */
  publicUrl: string;
  /** Function to stop the tunnel */
  stop: () => Promise<void>;
  /** Tunnel provider name */
  provider: string;
}
/**
 * Start an ngrok tunnel to expose the local webhook server.
 *
 * Uses the ngrok CLI which must be installed: https://ngrok.com/download
 *
 * Resolves once ngrok reports a public URL; rejects on a 30s startup
 * timeout, an ERR_NGROK message on stderr, spawn failure, or early exit.
 *
 * @example
 * const tunnel = await startNgrokTunnel({ port: 3334, path: '/voice/webhook' });
 * console.log('Public URL:', tunnel.publicUrl);
 * // Later: await tunnel.stop();
 */
export async function startNgrokTunnel(config: {
  port: number;
  path: string;
  authToken?: string;
  domain?: string;
}): Promise<TunnelResult> {
  // Set auth token if provided
  if (config.authToken) {
    await runNgrokCommand(["config", "add-authtoken", config.authToken]);
  }
  // Build ngrok command args; JSON logs let us parse the tunnel URL below.
  const args = [
    "http",
    String(config.port),
    "--log",
    "stdout",
    "--log-format",
    "json",
  ];
  // Add custom domain if provided (paid ngrok feature)
  if (config.domain) {
    args.push("--domain", config.domain);
  }
  return new Promise((resolve, reject) => {
    const proc = spawn("ngrok", args, {
      stdio: ["ignore", "pipe", "pipe"],
    });
    // `resolved` guards against double settle across the competing paths
    // (URL found / timeout / stderr error / spawn error / early close).
    let resolved = false;
    let publicUrl: string | null = null;
    let outputBuffer = "";
    const timeout = setTimeout(() => {
      if (!resolved) {
        resolved = true;
        proc.kill("SIGTERM");
        reject(new Error("ngrok startup timed out (30s)"));
      }
    }, 30000);
    // Parse one JSON log line; settles the promise on the first URL seen.
    const processLine = (line: string) => {
      try {
        const log = JSON.parse(line);
        // ngrok logs the public URL in a 'started tunnel' message
        if (log.msg === "started tunnel" && log.url) {
          publicUrl = log.url;
        }
        // Also check for the URL field directly
        if (log.addr && log.url && !publicUrl) {
          publicUrl = log.url;
        }
        // Check for ready state
        if (publicUrl && !resolved) {
          resolved = true;
          clearTimeout(timeout);
          // Add path to the public URL
          const fullUrl = publicUrl + config.path;
          console.log(`[voice-call] ngrok tunnel active: ${fullUrl}`);
          resolve({
            publicUrl: fullUrl,
            provider: "ngrok",
            stop: async () => {
              proc.kill("SIGTERM");
              await new Promise<void>((res) => {
                proc.on("close", () => res());
                setTimeout(res, 2000); // Fallback timeout
              });
            },
          });
        }
      } catch {
        // Not JSON, might be startup message
      }
    };
    // Buffer stdout and hand only complete lines to the JSON parser.
    proc.stdout.on("data", (data: Buffer) => {
      outputBuffer += data.toString();
      const lines = outputBuffer.split("\n");
      outputBuffer = lines.pop() || "";
      for (const line of lines) {
        if (line.trim()) {
          processLine(line);
        }
      }
    });
    proc.stderr.on("data", (data: Buffer) => {
      const msg = data.toString();
      // Check for common errors
      if (msg.includes("ERR_NGROK")) {
        if (!resolved) {
          resolved = true;
          clearTimeout(timeout);
          reject(new Error(`ngrok error: ${msg}`));
        }
      }
    });
    proc.on("error", (err) => {
      if (!resolved) {
        resolved = true;
        clearTimeout(timeout);
        reject(new Error(`Failed to start ngrok: ${err.message}`));
      }
    });
    proc.on("close", (code) => {
      if (!resolved) {
        resolved = true;
        clearTimeout(timeout);
        reject(new Error(`ngrok exited unexpectedly with code ${code}`));
      }
    });
  });
}
/**
 * Run an ngrok CLI command and wait for it to finish.
 * Resolves with captured stdout on exit code 0; rejects otherwise (or when
 * the binary cannot be spawned at all).
 */
async function runNgrokCommand(args: string[]): Promise<string> {
  return new Promise((resolve, reject) => {
    const child = spawn("ngrok", args, {
      stdio: ["ignore", "pipe", "pipe"],
    });
    const outChunks: string[] = [];
    const errChunks: string[] = [];
    child.stdout.on("data", (chunk) => outChunks.push(String(chunk)));
    child.stderr.on("data", (chunk) => errChunks.push(String(chunk)));
    child.on("error", reject);
    child.on("close", (code) => {
      const stdout = outChunks.join("");
      if (code === 0) {
        resolve(stdout);
        return;
      }
      const stderr = errChunks.join("");
      reject(new Error(`ngrok command failed: ${stderr || stdout}`));
    });
  });
}
/**
 * Check if ngrok is installed and available.
 * Never rejects: spawn failures and non-zero exits both yield false.
 */
export async function isNgrokAvailable(): Promise<boolean> {
  return new Promise((resolve) => {
    const child = spawn("ngrok", ["version"], {
      stdio: ["ignore", "pipe", "pipe"],
    });
    child.once("error", () => resolve(false));
    child.once("close", (code) => resolve(code === 0));
  });
}
/**
 * Start a Tailscale serve/funnel tunnel.
 *
 * "serve" exposes within the tailnet; "funnel" exposes publicly. Requires
 * the tailscale CLI; rejects when the DNS name is unavailable, the CLI
 * exits non-zero, fails to spawn, or takes longer than 10s.
 */
export async function startTailscaleTunnel(config: {
  mode: "serve" | "funnel";
  port: number;
  path: string;
}): Promise<TunnelResult> {
  // Get Tailscale DNS name
  const dnsName = await getTailscaleDnsName();
  if (!dnsName) {
    throw new Error("Could not get Tailscale DNS name. Is Tailscale running?");
  }
  const localUrl = `http://127.0.0.1:${config.port}`;
  return new Promise((resolve, reject) => {
    // --bg keeps the route active after this CLI invocation returns.
    const proc = spawn(
      "tailscale",
      [config.mode, "--bg", "--yes", "--set-path", config.path, localUrl],
      { stdio: ["ignore", "pipe", "pipe"] },
    );
    const timeout = setTimeout(() => {
      proc.kill("SIGKILL");
      reject(new Error(`Tailscale ${config.mode} timed out`));
    }, 10000);
    proc.on("close", (code) => {
      clearTimeout(timeout);
      if (code === 0) {
        const publicUrl = `https://${dnsName}${config.path}`;
        console.log(
          `[voice-call] Tailscale ${config.mode} active: ${publicUrl}`,
        );
        resolve({
          publicUrl,
          provider: `tailscale-${config.mode}`,
          stop: async () => {
            await stopTailscaleTunnel(config.mode, config.path);
          },
        });
      } else {
        reject(new Error(`Tailscale ${config.mode} failed with code ${code}`));
      }
    });
    proc.on("error", (err) => {
      clearTimeout(timeout);
      reject(err);
    });
  });
}
/**
 * Stop a Tailscale serve/funnel tunnel.
 *
 * Best-effort: always resolves, even when the CLI is missing, errors out,
 * or hangs (a 5s timeout kills the process and resolves anyway).
 */
async function stopTailscaleTunnel(
  mode: "serve" | "funnel",
  path: string,
): Promise<void> {
  return new Promise((resolve) => {
    const proc = spawn("tailscale", [mode, "off", path], {
      stdio: "ignore",
    });
    const timeout = setTimeout(() => {
      proc.kill("SIGKILL");
      resolve();
    }, 5000);
    // Bug fix: without an "error" listener, a missing tailscale binary
    // (ENOENT) raises an uncaught exception and crashes the process.
    proc.on("error", () => {
      clearTimeout(timeout);
      resolve();
    });
    proc.on("close", () => {
      clearTimeout(timeout);
      resolve();
    });
  });
}
/**
 * Start a tunnel based on configuration.
 * Returns null for "none" (and any unknown provider value).
 */
export async function startTunnel(
  config: TunnelConfig,
): Promise<TunnelResult | null> {
  const { provider, port, path } = config;
  if (provider === "ngrok") {
    return startNgrokTunnel({
      port,
      path,
      authToken: config.ngrokAuthToken,
      domain: config.ngrokDomain,
    });
  }
  if (provider === "tailscale-serve" || provider === "tailscale-funnel") {
    const mode = provider === "tailscale-serve" ? "serve" : "funnel";
    return startTailscaleTunnel({ mode, port, path });
  }
  return null;
}

View File

@@ -0,0 +1,272 @@
import { z } from "zod";
import type { CallMode } from "./config.js";
// -----------------------------------------------------------------------------
// Provider Identifiers
// -----------------------------------------------------------------------------
/** Supported telephony backends; "mock" is the test double. */
export const ProviderNameSchema = z.enum(["telnyx", "twilio", "mock"]);
export type ProviderName = z.infer<typeof ProviderNameSchema>;
// -----------------------------------------------------------------------------
// Core Call Identifiers
// -----------------------------------------------------------------------------
/** Internal call identifier (UUID) */
export type CallId = string;
/** Provider-specific call identifier */
export type ProviderCallId = string;
// -----------------------------------------------------------------------------
// Call Lifecycle States
// -----------------------------------------------------------------------------
/** All call lifecycle states; the tail of the list are terminal. */
export const CallStateSchema = z.enum([
  // Non-terminal states
  "initiated",
  "ringing",
  "answered",
  "active",
  "speaking",
  "listening",
  // Terminal states
  "completed",
  "hangup-user",
  "hangup-bot",
  "timeout",
  "error",
  "failed",
  "no-answer",
  "busy",
  "voicemail",
]);
export type CallState = z.infer<typeof CallStateSchema>;
/** States after which a call receives no further transitions. */
export const TerminalStates = new Set<CallState>([
  "completed",
  "hangup-user",
  "hangup-bot",
  "timeout",
  "error",
  "failed",
  "no-answer",
  "busy",
  "voicemail",
]);
// Mirrors the terminal states above — keep the two lists in sync.
export const EndReasonSchema = z.enum([
  "completed",
  "hangup-user",
  "hangup-bot",
  "timeout",
  "error",
  "failed",
  "no-answer",
  "busy",
  "voicemail",
]);
export type EndReason = z.infer<typeof EndReasonSchema>;
// -----------------------------------------------------------------------------
// Normalized Call Events
// -----------------------------------------------------------------------------
/** Fields common to every normalized provider event. */
const BaseEventSchema = z.object({
  id: z.string(),
  callId: z.string(),
  providerCallId: z.string().optional(),
  timestamp: z.number(),
  // Optional fields for inbound call detection
  direction: z.enum(["inbound", "outbound"]).optional(),
  from: z.string().optional(),
  to: z.string().optional(),
});
/** Provider-agnostic call events, discriminated on `type`. */
export const NormalizedEventSchema = z.discriminatedUnion("type", [
  BaseEventSchema.extend({
    type: z.literal("call.initiated"),
  }),
  BaseEventSchema.extend({
    type: z.literal("call.ringing"),
  }),
  BaseEventSchema.extend({
    type: z.literal("call.answered"),
  }),
  BaseEventSchema.extend({
    type: z.literal("call.active"),
  }),
  BaseEventSchema.extend({
    type: z.literal("call.speaking"),
    text: z.string(),
  }),
  // Caller speech: carries the transcript and whether it is final.
  BaseEventSchema.extend({
    type: z.literal("call.speech"),
    transcript: z.string(),
    isFinal: z.boolean(),
    confidence: z.number().min(0).max(1).optional(),
  }),
  BaseEventSchema.extend({
    type: z.literal("call.silence"),
    durationMs: z.number(),
  }),
  BaseEventSchema.extend({
    type: z.literal("call.dtmf"),
    digits: z.string(),
  }),
  BaseEventSchema.extend({
    type: z.literal("call.ended"),
    reason: EndReasonSchema,
  }),
  BaseEventSchema.extend({
    type: z.literal("call.error"),
    error: z.string(),
    retryable: z.boolean().optional(),
  }),
]);
export type NormalizedEvent = z.infer<typeof NormalizedEventSchema>;
// -----------------------------------------------------------------------------
// Call Direction
// -----------------------------------------------------------------------------
export const CallDirectionSchema = z.enum(["outbound", "inbound"]);
export type CallDirection = z.infer<typeof CallDirectionSchema>;
// -----------------------------------------------------------------------------
// Call Record
// -----------------------------------------------------------------------------
/** One utterance within a call, from either party. */
export const TranscriptEntrySchema = z.object({
  timestamp: z.number(),
  speaker: z.enum(["bot", "user"]),
  text: z.string(),
  isFinal: z.boolean().default(true),
});
export type TranscriptEntry = z.infer<typeof TranscriptEntrySchema>;
/** Persisted record of a single call's lifecycle and transcript. */
export const CallRecordSchema = z.object({
  callId: z.string(),
  providerCallId: z.string().optional(),
  provider: ProviderNameSchema,
  direction: CallDirectionSchema,
  state: CallStateSchema,
  from: z.string(),
  to: z.string(),
  sessionKey: z.string().optional(),
  startedAt: z.number(),
  answeredAt: z.number().optional(),
  endedAt: z.number().optional(),
  endReason: EndReasonSchema.optional(),
  transcript: z.array(TranscriptEntrySchema).default([]),
  // presumably de-duplicates provider webhook retries — verify in CallManager
  processedEventIds: z.array(z.string()).default([]),
  metadata: z.record(z.string(), z.unknown()).optional(),
});
export type CallRecord = z.infer<typeof CallRecordSchema>;
// -----------------------------------------------------------------------------
// Webhook Types
// -----------------------------------------------------------------------------
export type WebhookVerificationResult = {
  ok: boolean;
  reason?: string;
};
/** Normalized inbound HTTP request handed to provider webhook parsers. */
export type WebhookContext = {
  headers: Record<string, string | string[] | undefined>;
  rawBody: string;
  url: string;
  method: "GET" | "POST" | "PUT" | "DELETE" | "PATCH";
  query?: Record<string, string | string[] | undefined>;
};
/** Parsed webhook: normalized events plus the raw response to send back. */
export type ProviderWebhookParseResult = {
  events: NormalizedEvent[];
  providerResponseBody?: string;
  providerResponseHeaders?: Record<string, string>;
  statusCode?: number;
};
// -----------------------------------------------------------------------------
// Provider Method Types
// -----------------------------------------------------------------------------
/** Arguments for initiating an outbound call via a provider. */
export type InitiateCallInput = {
  callId: CallId;
  from: string;
  to: string;
  webhookUrl: string;
  clientState?: Record<string, string>;
  /** Inline TwiML to execute (skips webhook, used for notify mode) */
  inlineTwiml?: string;
};
export type InitiateCallResult = {
  providerCallId: ProviderCallId;
  status: "initiated" | "queued";
};
export type HangupCallInput = {
  callId: CallId;
  providerCallId: ProviderCallId;
  reason: EndReason;
};
/** Arguments for speaking text into an active call. */
export type PlayTtsInput = {
  callId: CallId;
  providerCallId: ProviderCallId;
  text: string;
  voice?: string;
  locale?: string;
};
export type StartListeningInput = {
  callId: CallId;
  providerCallId: ProviderCallId;
  language?: string;
};
export type StopListeningInput = {
  callId: CallId;
  providerCallId: ProviderCallId;
};
// -----------------------------------------------------------------------------
// Outbound Call Options
// -----------------------------------------------------------------------------
export type OutboundCallOptions = {
  /** Message to speak when call connects */
  message?: string;
  /** Call mode (overrides config default) */
  mode?: CallMode;
};
// -----------------------------------------------------------------------------
// Tool Result Types
// -----------------------------------------------------------------------------
/** Result payloads returned to the agent tool layer. */
export type InitiateCallToolResult = {
  success: boolean;
  callId?: string;
  status?: "initiated" | "queued" | "no-answer" | "busy" | "failed";
  error?: string;
};
export type ContinueCallToolResult = {
  success: boolean;
  transcript?: string;
  error?: string;
};
export type SpeakToUserToolResult = {
  success: boolean;
  error?: string;
};
export type EndCallToolResult = {
  success: boolean;
  error?: string;
};

View File

@@ -0,0 +1,12 @@
import os from "node:os";
import path from "node:path";
/**
 * Expand a user-supplied path: trims whitespace, expands a leading "~"
 * (bare or followed by a separator) to the home directory, and resolves
 * to an absolute path. An empty/blank input yields "".
 */
export function resolveUserPath(input: string): string {
  const cleaned = input.trim();
  if (cleaned === "") return cleaned;
  const expanded = cleaned.startsWith("~")
    ? cleaned.replace(/^~(?=$|[\\/])/, os.homedir())
    : cleaned;
  return path.resolve(expanded);
}

View File

@@ -0,0 +1,65 @@
/**
* Voice mapping and XML utilities for voice call providers.
*/
/**
 * Escape XML special characters for TwiML and other XML responses.
 * Covers the five predefined XML entities: & < > " '.
 */
const XML_ENTITY_MAP: Record<string, string> = {
  "&": "&amp;",
  "<": "&lt;",
  ">": "&gt;",
  '"': "&quot;",
  "'": "&apos;",
};
export function escapeXml(text: string): string {
  // Single pass: each source character is escaped exactly once, so
  // already-present entities like "&amp;" are double-escaped (as before).
  return text.replace(/[&<>"']/g, (ch) => XML_ENTITY_MAP[ch] ?? ch);
}
/**
 * Map of OpenAI voice names to similar Twilio Polly voices.
 */
const OPENAI_TO_POLLY_MAP: Record<string, string> = {
  alloy: "Polly.Joanna", // neutral, warm
  echo: "Polly.Matthew", // male, warm
  fable: "Polly.Amy", // British, expressive
  onyx: "Polly.Brian", // deep male
  nova: "Polly.Salli", // female, friendly
  shimmer: "Polly.Kimberly", // female, clear
};
/**
 * Default Polly voice when no mapping is found.
 */
export const DEFAULT_POLLY_VOICE = "Polly.Joanna";
/**
 * Translate an OpenAI voice name into a Twilio-compatible Polly voice.
 * Names already in Polly/Google form pass through untouched; unknown or
 * missing names fall back to DEFAULT_POLLY_VOICE. Lookup is case-insensitive.
 */
export function mapVoiceToPolly(voice: string | undefined): string {
  if (!voice) return DEFAULT_POLLY_VOICE;
  const isProviderNative =
    voice.startsWith("Polly.") || voice.startsWith("Google.");
  if (isProviderNative) return voice;
  const mapped = OPENAI_TO_POLLY_MAP[voice.toLowerCase()];
  return mapped ?? DEFAULT_POLLY_VOICE;
}
/**
 * Check if a voice name is a known OpenAI voice (case-insensitive).
 */
export function isOpenAiVoice(voice: string): boolean {
  const key = voice.toLowerCase();
  return key in OPENAI_TO_POLLY_MAP;
}
/**
 * Get all supported OpenAI voice names.
 */
export function getOpenAiVoiceNames(): string[] {
  return Object.keys(OPENAI_TO_POLLY_MAP);
}

View File

@@ -0,0 +1,197 @@
import crypto from "node:crypto";
import type { WebhookContext } from "./types.js";
/**
 * Validate Twilio webhook signature using HMAC-SHA1.
 *
 * Twilio signs requests by concatenating the URL with sorted POST params,
 * then computing HMAC-SHA1 with the auth token.
 *
 * @see https://www.twilio.com/docs/usage/webhooks/webhooks-security
 */
export function validateTwilioSignature(
  authToken: string,
  signature: string | undefined,
  url: string,
  params: URLSearchParams,
): boolean {
  if (!signature) return false;
  // Canonical payload: URL followed by each param's key+value, keys sorted
  // alphabetically (Twilio's documented scheme).
  const sortedPairs = [...params.entries()].sort(([keyA], [keyB]) =>
    keyA.localeCompare(keyB),
  );
  const payload =
    url + sortedPairs.map(([key, value]) => key + value).join("");
  // HMAC-SHA1 over the payload with the auth token, base64-encoded.
  const expectedSignature = crypto
    .createHmac("sha1", authToken)
    .update(payload)
    .digest("base64");
  return timingSafeEqual(signature, expectedSignature);
}
/**
 * Timing-safe string comparison to prevent timing attacks.
 */
function timingSafeEqual(a: string, b: string): boolean {
  if (a.length !== b.length) {
    // Burn comparable time so unequal lengths are not a timing oracle.
    const filler = Buffer.from(a);
    crypto.timingSafeEqual(filler, filler);
    return false;
  }
  return crypto.timingSafeEqual(Buffer.from(a), Buffer.from(b));
}
/**
 * Reconstruct the public webhook URL from request headers.
 *
 * When behind a reverse proxy (Tailscale, nginx, ngrok), the original URL
 * used by Twilio differs from the local request URL. We use standard
 * forwarding headers to reconstruct it.
 *
 * Priority order:
 * 1. X-Forwarded-Proto + X-Forwarded-Host (standard proxy headers)
 * 2. X-Original-Host (nginx)
 * 3. Ngrok-Forwarded-Host (ngrok specific)
 * 4. Host header (direct connection)
 */
export function reconstructWebhookUrl(ctx: WebhookContext): string {
  const { headers } = ctx;
  const proto = getHeader(headers, "x-forwarded-proto") || "https";
  const forwardedHost =
    getHeader(headers, "x-forwarded-host") ||
    getHeader(headers, "x-original-host") ||
    getHeader(headers, "ngrok-forwarded-host") ||
    getHeader(headers, "host") ||
    "";
  // Extract path from the context URL (fallback to "/" on parse failure).
  // Bug fix: Node's req.url is normally a bare path ("/voice/webhook?x=1"),
  // which `new URL(url)` rejects — without a base the path silently
  // collapsed to "/" and broke signature verification. The base is ignored
  // when ctx.url is already absolute.
  let path = "/";
  try {
    const parsed = new URL(ctx.url, "http://placeholder.invalid");
    path = parsed.pathname + parsed.search;
  } catch {
    // URL parsing failed
  }
  // Remove port from host (ngrok URLs don't have ports)
  // NOTE(review): this drops any explicit port (e.g. "host:8080") and would
  // mangle bracketed IPv6 hosts — confirm no deployment relies on those.
  const host = forwardedHost.split(":")[0] || forwardedHost;
  return `${proto}://${host}${path}`;
}
/**
 * Get a header value, handling both string and string[] types.
 * Lookup uses the lowercased name (Node lowercases incoming header keys).
 */
function getHeader(
  headers: Record<string, string | string[] | undefined>,
  name: string,
): string | undefined {
  const value = headers[name.toLowerCase()];
  if (Array.isArray(value)) {
    return value[0];
  }
  return value;
}
/**
 * Result of Twilio webhook verification with detailed info.
 * `reason` explains failures and also annotates non-standard successes
 * (skipped verification, ngrok free-tier compatibility).
 */
export interface TwilioVerificationResult {
  ok: boolean;
  reason?: string;
  /** The URL that was used for verification (for debugging) */
  verificationUrl?: string;
  /** Whether we're running behind ngrok free tier */
  isNgrokFreeTier?: boolean;
}
/**
 * Verify Twilio webhook with full context and detailed result.
 *
 * Handles the special case of ngrok free tier where signature validation
 * may fail due to URL discrepancies (ngrok adds interstitial page handling).
 */
export function verifyTwilioWebhook(
  ctx: WebhookContext,
  authToken: string,
  options?: {
    /** Override the public URL (e.g., from config) */
    publicUrl?: string;
    /** Allow ngrok free tier compatibility mode (less secure) */
    allowNgrokFreeTier?: boolean;
    /** Skip verification entirely (only for development) */
    skipVerification?: boolean;
  },
): TwilioVerificationResult {
  // Dev/test escape hatch: trust everything.
  if (options?.skipVerification) {
    return { ok: true, reason: "verification skipped (dev mode)" };
  }
  const signature = getHeader(ctx.headers, "x-twilio-signature");
  if (!signature) {
    return { ok: false, reason: "Missing X-Twilio-Signature header" };
  }
  // Verify against the URL Twilio actually used (config override wins).
  const verificationUrl = options?.publicUrl || reconstructWebhookUrl(ctx);
  const params = new URLSearchParams(ctx.rawBody);
  const signatureValid = validateTwilioSignature(
    authToken,
    signature,
    verificationUrl,
    params,
  );
  if (signatureValid) {
    return { ok: true, verificationUrl };
  }
  // ngrok free-tier hosts may present a URL that breaks strict validation.
  const isNgrokFreeTier =
    verificationUrl.includes(".ngrok-free.app") ||
    verificationUrl.includes(".ngrok.io");
  if (isNgrokFreeTier && options?.allowNgrokFreeTier) {
    console.warn(
      "[voice-call] Twilio signature validation failed (proceeding for ngrok free tier compatibility)",
    );
    return {
      ok: true,
      reason: "ngrok free tier compatibility mode",
      verificationUrl,
      isNgrokFreeTier: true,
    };
  }
  return {
    ok: false,
    reason: `Invalid signature for URL: ${verificationUrl}`,
    verificationUrl,
    isNgrokFreeTier,
  };
}

View File

@@ -0,0 +1,480 @@
import { spawn } from "node:child_process";
import http from "node:http";
import { URL } from "node:url";
import type { VoiceCallConfig } from "./config.js";
import type { CoreConfig } from "./core-bridge.js";
import type { CallManager } from "./manager.js";
import type { MediaStreamConfig } from "./media-stream.js";
import { MediaStreamHandler } from "./media-stream.js";
import type { VoiceCallProvider } from "./providers/base.js";
import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js";
import type { TwilioProvider } from "./providers/twilio.js";
import type { NormalizedEvent, WebhookContext } from "./types.js";
/**
 * HTTP server for receiving voice call webhooks from providers.
 * Supports WebSocket upgrades for media streams when streaming is enabled.
 */
export class VoiceCallWebhookServer {
  private server: http.Server | null = null;
  private config: VoiceCallConfig;
  private manager: CallManager;
  private provider: VoiceCallProvider;
  // Null when the host did not supply a core config.
  private coreConfig: CoreConfig | null;
  /** Media stream handler for bidirectional audio (when streaming enabled) */
  private mediaStreamHandler: MediaStreamHandler | null = null;
  /**
   * @param coreConfig Optional core configuration; stored as null when
   *   omitted. Streaming support is initialized here when enabled.
   */
  constructor(
    config: VoiceCallConfig,
    manager: CallManager,
    provider: VoiceCallProvider,
    coreConfig?: CoreConfig,
  ) {
    this.config = config;
    this.manager = manager;
    this.provider = provider;
    this.coreConfig = coreConfig ?? null;
    // Initialize media stream handler if streaming is enabled
    if (config.streaming?.enabled) {
      this.initializeMediaStreaming();
    }
  }
  /**
   * Get the media stream handler (for wiring to provider).
   * Null when streaming is disabled or initialization did not complete.
   */
  getMediaStreamHandler(): MediaStreamHandler | null {
    return this.mediaStreamHandler;
  }
  /**
   * Initialize media streaming with OpenAI Realtime STT.
   *
   * No-op (with a warning) when no OpenAI API key is available; otherwise
   * builds the STT provider and installs this.mediaStreamHandler.
   */
  private initializeMediaStreaming(): void {
    const apiKey =
      this.config.streaming?.openaiApiKey || process.env.OPENAI_API_KEY;
    if (!apiKey) {
      console.warn(
        "[voice-call] Streaming enabled but no OpenAI API key found",
      );
      return;
    }
    const sttProvider = new OpenAIRealtimeSTTProvider({
      apiKey,
      model: this.config.streaming?.sttModel,
      silenceDurationMs: this.config.streaming?.silenceDurationMs,
      vadThreshold: this.config.streaming?.vadThreshold,
    });
    const streamConfig: MediaStreamConfig = {
      sttProvider,
      // Final transcript callback: record the event, maybe auto-respond.
      onTranscript: (providerCallId, transcript) => {
        console.log(
          `[voice-call] Transcript for ${providerCallId}: ${transcript}`,
        );
        // Look up our internal call ID from the provider call ID
        const call = this.manager.getCallByProviderCallId(providerCallId);
        if (!call) {
          console.warn(
            `[voice-call] No active call found for provider ID: ${providerCallId}`,
          );
          return;
        }
        // Create a speech event and process it through the manager
        const event: NormalizedEvent = {
          id: `stream-transcript-${Date.now()}`,
          type: "call.speech",
          callId: call.callId,
          providerCallId,
          timestamp: Date.now(),
          transcript,
          isFinal: true,
        };
        this.manager.processEvent(event);
        // Auto-respond in conversation mode (inbound always, outbound if mode is conversation)
        const callMode = call.metadata?.mode as string | undefined;
        const shouldRespond =
          call.direction === "inbound" || callMode === "conversation";
        if (shouldRespond) {
          // Fire-and-forget: response failures are logged, not fatal.
          this.handleInboundResponse(call.callId, transcript).catch((err) => {
            console.warn(`[voice-call] Failed to auto-respond:`, err);
          });
        }
      },
      onPartialTranscript: (callId, partial) => {
        console.log(`[voice-call] Partial for ${callId}: ${partial}`);
      },
      // Stream connected: wire TTS routing, then speak any queued greeting.
      onConnect: (callId, streamSid) => {
        console.log(
          `[voice-call] Media stream connected: ${callId} -> ${streamSid}`,
        );
        // Register stream with provider for TTS routing
        if (this.provider.name === "twilio") {
          (this.provider as TwilioProvider).registerCallStream(
            callId,
            streamSid,
          );
        }
        // Speak initial message if one was provided when call was initiated
        // Use setTimeout to allow stream setup to complete
        setTimeout(() => {
          this.manager.speakInitialMessage(callId).catch((err) => {
            console.warn(`[voice-call] Failed to speak initial message:`, err);
          });
        }, 500);
      },
      onDisconnect: (callId) => {
        console.log(`[voice-call] Media stream disconnected: ${callId}`);
        if (this.provider.name === "twilio") {
          (this.provider as TwilioProvider).unregisterCallStream(callId);
        }
      },
    };
    this.mediaStreamHandler = new MediaStreamHandler(streamConfig);
    console.log("[voice-call] Media streaming initialized");
  }
/**
* Start the webhook server.
*/
async start(): Promise<string> {
const { port, bind, path: webhookPath } = this.config.serve;
const streamPath = this.config.streaming?.streamPath || "/voice/stream";
return new Promise((resolve, reject) => {
this.server = http.createServer((req, res) => {
this.handleRequest(req, res, webhookPath).catch((err) => {
console.error("[voice-call] Webhook error:", err);
res.statusCode = 500;
res.end("Internal Server Error");
});
});
// Handle WebSocket upgrades for media streams
if (this.mediaStreamHandler) {
this.server.on("upgrade", (request, socket, head) => {
const url = new URL(
request.url || "/",
`http://${request.headers.host}`,
);
if (url.pathname === streamPath) {
console.log("[voice-call] WebSocket upgrade for media stream");
this.mediaStreamHandler?.handleUpgrade(request, socket, head);
} else {
socket.destroy();
}
});
}
this.server.on("error", reject);
this.server.listen(port, bind, () => {
const url = `http://${bind}:${port}${webhookPath}`;
console.log(`[voice-call] Webhook server listening on ${url}`);
if (this.mediaStreamHandler) {
console.log(
`[voice-call] Media stream WebSocket on ws://${bind}:${port}${streamPath}`,
);
}
resolve(url);
});
});
}
/**
* Stop the webhook server.
*/
async stop(): Promise<void> {
return new Promise((resolve) => {
if (this.server) {
this.server.close(() => {
this.server = null;
resolve();
});
} else {
resolve();
}
});
}
/**
* Handle incoming HTTP request.
*/
private async handleRequest(
req: http.IncomingMessage,
res: http.ServerResponse,
webhookPath: string,
): Promise<void> {
const url = new URL(req.url || "/", `http://${req.headers.host}`);
// Check path
if (!url.pathname.startsWith(webhookPath)) {
res.statusCode = 404;
res.end("Not Found");
return;
}
// Only accept POST
if (req.method !== "POST") {
res.statusCode = 405;
res.end("Method Not Allowed");
return;
}
// Read body
const body = await this.readBody(req);
// Build webhook context
const ctx: WebhookContext = {
headers: req.headers as Record<string, string | string[] | undefined>,
rawBody: body,
url: `http://${req.headers.host}${req.url}`,
method: "POST",
query: Object.fromEntries(url.searchParams),
};
// Verify signature
const verification = this.provider.verifyWebhook(ctx);
if (!verification.ok) {
console.warn(
`[voice-call] Webhook verification failed: ${verification.reason}`,
);
res.statusCode = 401;
res.end("Unauthorized");
return;
}
// Parse events
const result = this.provider.parseWebhookEvent(ctx);
// Process each event
for (const event of result.events) {
try {
this.manager.processEvent(event);
} catch (err) {
console.error(
`[voice-call] Error processing event ${event.type}:`,
err,
);
}
}
// Send response
res.statusCode = result.statusCode || 200;
if (result.providerResponseHeaders) {
for (const [key, value] of Object.entries(
result.providerResponseHeaders,
)) {
res.setHeader(key, value);
}
}
res.end(result.providerResponseBody || "OK");
}
/**
* Read request body as string.
*/
private readBody(req: http.IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
req.on("data", (chunk) => chunks.push(chunk));
req.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8")));
req.on("error", reject);
});
}
/**
* Handle auto-response for inbound calls using the agent system.
* Supports tool calling for richer voice interactions.
*/
private async handleInboundResponse(
callId: string,
userMessage: string,
): Promise<void> {
console.log(
`[voice-call] Auto-responding to inbound call ${callId}: "${userMessage}"`,
);
// Get call context for conversation history
const call = this.manager.getCall(callId);
if (!call) {
console.warn(`[voice-call] Call ${callId} not found for auto-response`);
return;
}
if (!this.coreConfig) {
console.warn("[voice-call] Core config missing; skipping auto-response");
return;
}
try {
const { generateVoiceResponse } = await import("./response-generator.js");
const result = await generateVoiceResponse({
voiceConfig: this.config,
coreConfig: this.coreConfig,
callId,
from: call.from,
transcript: call.transcript,
userMessage,
});
if (result.error) {
console.error(
`[voice-call] Response generation error: ${result.error}`,
);
return;
}
if (result.text) {
console.log(`[voice-call] AI response: "${result.text}"`);
await this.manager.speak(callId, result.text);
}
} catch (err) {
console.error(`[voice-call] Auto-response error:`, err);
}
}
}
/**
 * Identity of the local Tailscale node, as reported by `tailscale status --json`.
 * (The previous comment described a function, not this type.)
 */
export type TailscaleSelfInfo = {
  // MagicDNS name with the trailing dot stripped, or null if unavailable.
  dnsName: string | null;
  // Stable node ID from the status output, or null if unavailable.
  nodeId: string | null;
};
/**
 * Run a tailscale command with timeout, collecting stdout.
 *
 * Never rejects: resolves with code -1 and empty stdout when the command
 * times out or cannot be spawned (e.g. the `tailscale` binary is missing).
 *
 * @param args CLI arguments passed to the `tailscale` binary.
 * @param timeoutMs Hard deadline; the child is SIGKILLed when exceeded.
 * @returns Exit code and captured stdout of the command.
 */
function runTailscaleCommand(
  args: string[],
  timeoutMs = 2500,
): Promise<{ code: number; stdout: string }> {
  return new Promise((resolve) => {
    // stderr is ignored (not piped): an unread stderr pipe can fill its
    // buffer and block the child on chatty output.
    const proc = spawn("tailscale", args, {
      stdio: ["ignore", "pipe", "ignore"],
    });
    let stdout = "";
    let settled = false;
    // Resolve exactly once, whichever of timeout/error/close fires first.
    const finish = (result: { code: number; stdout: string }) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      resolve(result);
    };
    proc.stdout.on("data", (data) => {
      stdout += data;
    });
    const timer = setTimeout(() => {
      proc.kill("SIGKILL");
      finish({ code: -1, stdout: "" });
    }, timeoutMs);
    // Without this handler a missing binary (ENOENT) raises an uncaught
    // 'error' event on the ChildProcess and crashes the process.
    proc.on("error", () => {
      finish({ code: -1, stdout: "" });
    });
    proc.on("close", (code) => {
      finish({ code: code ?? -1, stdout });
    });
  });
}
/**
 * Query `tailscale status --json` for the local node's identity.
 * @returns Self info, or null when the CLI fails or output is unparseable.
 */
export async function getTailscaleSelfInfo(): Promise<TailscaleSelfInfo | null> {
  const result = await runTailscaleCommand(["status", "--json"]);
  if (result.code !== 0) return null;
  try {
    const status = JSON.parse(result.stdout);
    const self = status.Self;
    return {
      // Strip the trailing dot MagicDNS appends to fully-qualified names.
      dnsName: self?.DNSName?.replace(/\.$/, "") || null,
      nodeId: self?.ID || null,
    };
  } catch {
    return null;
  }
}
/** Resolve the local node's MagicDNS name, or null when unavailable. */
export async function getTailscaleDnsName(): Promise<string | null> {
  const info = await getTailscaleSelfInfo();
  if (!info) return null;
  return info.dnsName;
}
/**
 * Expose a local URL through Tailscale serve or funnel at the given path.
 * @returns The public HTTPS URL on success, null on any failure.
 */
export async function setupTailscaleExposureRoute(opts: {
  mode: "serve" | "funnel";
  path: string;
  localUrl: string;
}): Promise<string | null> {
  // The public hostname is the node's MagicDNS name.
  const dnsName = await getTailscaleDnsName();
  if (!dnsName) {
    console.warn("[voice-call] Could not get Tailscale DNS name");
    return null;
  }
  const exposureArgs = [
    opts.mode,
    "--bg",
    "--yes",
    "--set-path",
    opts.path,
    opts.localUrl,
  ];
  const result = await runTailscaleCommand(exposureArgs);
  if (result.code !== 0) {
    console.warn(`[voice-call] Tailscale ${opts.mode} failed`);
    return null;
  }
  const publicUrl = `https://${dnsName}${opts.path}`;
  console.log(`[voice-call] Tailscale ${opts.mode} active: ${publicUrl}`);
  return publicUrl;
}
/**
 * Tear down a previously configured serve/funnel route (best effort:
 * failures of the underlying command are ignored).
 */
export async function cleanupTailscaleExposureRoute(opts: {
  mode: "serve" | "funnel";
  path: string;
}): Promise<void> {
  const teardownArgs = [opts.mode, "off", opts.path];
  await runTailscaleCommand(teardownArgs);
}
/**
 * Setup Tailscale serve/funnel for the webhook server.
 * This is a helper that shells out to `tailscale serve` or `tailscale funnel`.
 * @returns The public URL, or null when disabled or setup failed.
 */
export async function setupTailscaleExposure(
  config: VoiceCallConfig,
): Promise<string | null> {
  const { tailscale, serve } = config;
  if (tailscale.mode === "off") {
    return null;
  }
  // Any mode other than explicit "funnel" falls back to tailnet-only serve.
  const mode = tailscale.mode === "funnel" ? "funnel" : "serve";
  // Include the path suffix so tailscale forwards to the correct endpoint
  // (tailscale strips the mount path prefix when proxying)
  const localUrl = `http://127.0.0.1:${serve.port}${serve.path}`;
  return setupTailscaleExposureRoute({ mode, path: tailscale.path, localUrl });
}
/**
 * Cleanup Tailscale serve/funnel.
 * No-op when exposure is configured off.
 */
export async function cleanupTailscaleExposure(
  config: VoiceCallConfig,
): Promise<void> {
  const currentMode = config.tailscale.mode;
  if (currentMode === "off") {
    return;
  }
  await cleanupTailscaleExposureRoute({
    mode: currentMode === "funnel" ? "funnel" : "serve",
    path: config.tailscale.path,
  });
}