From b5f7ba502de237a50bb6c1143b28788d27f19454 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 14 Jan 2026 05:40:19 +0000 Subject: [PATCH] refactor(voice-call): split manager --- extensions/voice-call/src/manager.ts | 746 +----------------- extensions/voice-call/src/manager/context.ts | 22 + extensions/voice-call/src/manager/events.ts | 178 +++++ extensions/voice-call/src/manager/lookup.ts | 34 + extensions/voice-call/src/manager/outbound.ts | 247 ++++++ extensions/voice-call/src/manager/state.ts | 51 ++ extensions/voice-call/src/manager/store.ts | 89 +++ extensions/voice-call/src/manager/timers.ts | 87 ++ extensions/voice-call/src/manager/twiml.ts | 10 + 9 files changed, 760 insertions(+), 704 deletions(-) create mode 100644 extensions/voice-call/src/manager/context.ts create mode 100644 extensions/voice-call/src/manager/events.ts create mode 100644 extensions/voice-call/src/manager/lookup.ts create mode 100644 extensions/voice-call/src/manager/outbound.ts create mode 100644 extensions/voice-call/src/manager/state.ts create mode 100644 extensions/voice-call/src/manager/store.ts create mode 100644 extensions/voice-call/src/manager/timers.ts create mode 100644 extensions/voice-call/src/manager/twiml.ts diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index 9020a5780..9173f76b5 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -1,23 +1,27 @@ -import crypto from "node:crypto"; import fs from "node:fs"; -import fsp from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { resolveUserPath } from "./utils.js"; -import type { CallMode, VoiceCallConfig } from "./config.js"; +import type { VoiceCallConfig } from "./config.js"; import type { VoiceCallProvider } from "./providers/base.js"; import { type CallId, type CallRecord, - CallRecordSchema, - type CallState, type NormalizedEvent, type OutboundCallOptions, - TerminalStates, - type 
TranscriptEntry, } from "./types.js"; -import { escapeXml, mapVoiceToPolly } from "./voice-mapping.js"; +import type { CallManagerContext } from "./manager/context.js"; +import { processEvent } from "./manager/events.js"; +import { getCallByProviderCallId } from "./manager/lookup.js"; +import { + continueCall, + endCall, + initiateCall, + speak, + speakInitialMessage, +} from "./manager/outbound.js"; +import { getCallHistoryFromStore, loadActiveCallsFromStore } from "./manager/store.js"; /** * Manages voice calls: state machine, persistence, and provider coordination. @@ -51,6 +55,20 @@ export class CallManager { this.storePath = resolveUserPath(rawPath); } + private buildContext(): CallManagerContext { + return { + activeCalls: this.activeCalls, + providerCallIdMap: this.providerCallIdMap, + processedEventIds: this.processedEventIds, + provider: this.provider, + config: this.config, + storePath: this.storePath, + webhookUrl: this.webhookUrl, + transcriptWaiters: this.transcriptWaiters, + maxDurationTimers: this.maxDurationTimers, + }; + } + /** * Initialize the call manager with a provider. */ @@ -62,7 +80,10 @@ export class CallManager { fs.mkdirSync(this.storePath, { recursive: true }); // Load any persisted active calls - this.loadActiveCalls(); + const restored = loadActiveCallsFromStore(this.storePath); + this.activeCalls = restored.activeCalls; + this.providerCallIdMap = restored.providerCallIdMap; + this.processedEventIds = restored.processedEventIds; } /** @@ -83,102 +104,7 @@ export class CallManager { sessionKey?: string, options?: OutboundCallOptions | string, ): Promise<{ callId: CallId; success: boolean; error?: string }> { - // Support legacy string argument for initialMessage - const opts: OutboundCallOptions = - typeof options === "string" ? { message: options } : (options ?? {}); - const initialMessage = opts.message; - const mode = opts.mode ?? 
this.config.outbound.defaultMode; - if (!this.provider) { - return { callId: "", success: false, error: "Provider not initialized" }; - } - - if (!this.webhookUrl) { - return { - callId: "", - success: false, - error: "Webhook URL not configured", - }; - } - - // Check concurrent call limit - const activeCalls = this.getActiveCalls(); - if (activeCalls.length >= this.config.maxConcurrentCalls) { - return { - callId: "", - success: false, - error: `Maximum concurrent calls (${this.config.maxConcurrentCalls}) reached`, - }; - } - - const callId = crypto.randomUUID(); - const from = - this.config.fromNumber || - (this.provider?.name === "mock" ? "+15550000000" : undefined); - if (!from) { - return { callId: "", success: false, error: "fromNumber not configured" }; - } - - // Create call record with mode in metadata - const callRecord: CallRecord = { - callId, - provider: this.provider.name, - direction: "outbound", - state: "initiated", - from, - to, - sessionKey, - startedAt: Date.now(), - transcript: [], - processedEventIds: [], - metadata: { - ...(initialMessage && { initialMessage }), - mode, - }, - }; - - this.activeCalls.set(callId, callRecord); - this.persistCallRecord(callRecord); - - try { - // For notify mode with a message, use inline TwiML with - let inlineTwiml: string | undefined; - if (mode === "notify" && initialMessage) { - const pollyVoice = mapVoiceToPolly(this.config.tts.voice); - inlineTwiml = this.generateNotifyTwiml(initialMessage, pollyVoice); - console.log( - `[voice-call] Using inline TwiML for notify mode (voice: ${pollyVoice})`, - ); - } - - const result = await this.provider.initiateCall({ - callId, - from, - to, - webhookUrl: this.webhookUrl, - inlineTwiml, - }); - - callRecord.providerCallId = result.providerCallId; - this.providerCallIdMap.set(result.providerCallId, callId); // Map providerCallId to internal callId - this.persistCallRecord(callRecord); - - return { callId, success: true }; - } catch (err) { - callRecord.state = 
"failed"; - callRecord.endedAt = Date.now(); - callRecord.endReason = "failed"; - this.persistCallRecord(callRecord); - this.activeCalls.delete(callId); - if (callRecord.providerCallId) { - this.providerCallIdMap.delete(callRecord.providerCallId); - } - - return { - callId, - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } + return await initiateCall(this.buildContext(), to, sessionKey, options); } /** @@ -188,42 +114,7 @@ export class CallManager { callId: CallId, text: string, ): Promise<{ success: boolean; error?: string }> { - const call = this.activeCalls.get(callId); - if (!call) { - return { success: false, error: "Call not found" }; - } - - if (!this.provider || !call.providerCallId) { - return { success: false, error: "Call not connected" }; - } - - if (TerminalStates.has(call.state)) { - return { success: false, error: "Call has ended" }; - } - - try { - // Update state - call.state = "speaking"; - this.persistCallRecord(call); - - // Add to transcript - this.addTranscriptEntry(call, "bot", text); - - // Play TTS - await this.provider.playTts({ - callId, - providerCallId: call.providerCallId, - text, - voice: this.config.tts.voice, - }); - - return { success: true }; - } catch (err) { - return { - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } + return await speak(this.buildContext(), callId, text); } /** @@ -232,135 +123,7 @@ export class CallManager { * In notify mode, auto-hangup after the message is delivered. */ async speakInitialMessage(providerCallId: string): Promise { - const call = this.getCallByProviderCallId(providerCallId); - if (!call) { - console.warn( - `[voice-call] speakInitialMessage: no call found for ${providerCallId}`, - ); - return; - } - - const initialMessage = call.metadata?.initialMessage as string | undefined; - const mode = (call.metadata?.mode as CallMode) ?? 
"conversation"; - - if (!initialMessage) { - console.log( - `[voice-call] speakInitialMessage: no initial message for ${call.callId}`, - ); - return; - } - - // Clear the initial message so we don't speak it again - if (call.metadata) { - delete call.metadata.initialMessage; - this.persistCallRecord(call); - } - - console.log( - `[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`, - ); - const result = await this.speak(call.callId, initialMessage); - if (!result.success) { - console.warn( - `[voice-call] Failed to speak initial message: ${result.error}`, - ); - return; - } - - // In notify mode, auto-hangup after delay - if (mode === "notify") { - const delaySec = this.config.outbound.notifyHangupDelaySec; - console.log( - `[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`, - ); - setTimeout(async () => { - const currentCall = this.getCall(call.callId); - if (currentCall && !TerminalStates.has(currentCall.state)) { - console.log( - `[voice-call] Notify mode: hanging up call ${call.callId}`, - ); - await this.endCall(call.callId); - } - }, delaySec * 1000); - } - } - - /** - * Start max duration timer for a call. - * Auto-hangup when maxDurationSeconds is reached. 
- */ - private startMaxDurationTimer(callId: CallId): void { - // Clear any existing timer - this.clearMaxDurationTimer(callId); - - const maxDurationMs = this.config.maxDurationSeconds * 1000; - console.log( - `[voice-call] Starting max duration timer (${this.config.maxDurationSeconds}s) for call ${callId}`, - ); - - const timer = setTimeout(async () => { - this.maxDurationTimers.delete(callId); - const call = this.getCall(callId); - if (call && !TerminalStates.has(call.state)) { - console.log( - `[voice-call] Max duration reached (${this.config.maxDurationSeconds}s), ending call ${callId}`, - ); - call.endReason = "timeout"; - this.persistCallRecord(call); - await this.endCall(callId); - } - }, maxDurationMs); - - this.maxDurationTimers.set(callId, timer); - } - - /** - * Clear max duration timer for a call. - */ - private clearMaxDurationTimer(callId: CallId): void { - const timer = this.maxDurationTimers.get(callId); - if (timer) { - clearTimeout(timer); - this.maxDurationTimers.delete(callId); - } - } - - private clearTranscriptWaiter(callId: CallId): void { - const waiter = this.transcriptWaiters.get(callId); - if (!waiter) return; - clearTimeout(waiter.timeout); - this.transcriptWaiters.delete(callId); - } - - private rejectTranscriptWaiter(callId: CallId, reason: string): void { - const waiter = this.transcriptWaiters.get(callId); - if (!waiter) return; - this.clearTranscriptWaiter(callId); - waiter.reject(new Error(reason)); - } - - private resolveTranscriptWaiter(callId: CallId, transcript: string): void { - const waiter = this.transcriptWaiters.get(callId); - if (!waiter) return; - this.clearTranscriptWaiter(callId); - waiter.resolve(transcript); - } - - private waitForFinalTranscript(callId: CallId): Promise { - // Only allow one in-flight waiter per call. 
- this.rejectTranscriptWaiter(callId, "Transcript waiter replaced"); - - const timeoutMs = this.config.transcriptTimeoutMs; - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - this.transcriptWaiters.delete(callId); - reject( - new Error(`Timed out waiting for transcript after ${timeoutMs}ms`), - ); - }, timeoutMs); - - this.transcriptWaiters.set(callId, { resolve, reject, timeout }); - }); + await speakInitialMessage(this.buildContext(), providerCallId); } /** @@ -370,287 +133,21 @@ export class CallManager { callId: CallId, prompt: string, ): Promise<{ success: boolean; transcript?: string; error?: string }> { - const call = this.activeCalls.get(callId); - if (!call) { - return { success: false, error: "Call not found" }; - } - - if (!this.provider || !call.providerCallId) { - return { success: false, error: "Call not connected" }; - } - - if (TerminalStates.has(call.state)) { - return { success: false, error: "Call has ended" }; - } - - try { - await this.speak(callId, prompt); - - call.state = "listening"; - this.persistCallRecord(call); - - await this.provider.startListening({ - callId, - providerCallId: call.providerCallId, - }); - - const transcript = await this.waitForFinalTranscript(callId); - - // Best-effort: stop listening after final transcript. - await this.provider.stopListening({ - callId, - providerCallId: call.providerCallId, - }); - - return { success: true, transcript }; - } catch (err) { - return { - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } finally { - this.clearTranscriptWaiter(callId); - } + return await continueCall(this.buildContext(), callId, prompt); } /** * End an active call. 
*/ async endCall(callId: CallId): Promise<{ success: boolean; error?: string }> { - const call = this.activeCalls.get(callId); - if (!call) { - return { success: false, error: "Call not found" }; - } - - if (!this.provider || !call.providerCallId) { - return { success: false, error: "Call not connected" }; - } - - if (TerminalStates.has(call.state)) { - return { success: true }; // Already ended - } - - try { - await this.provider.hangupCall({ - callId, - providerCallId: call.providerCallId, - reason: "hangup-bot", - }); - - call.state = "hangup-bot"; - call.endedAt = Date.now(); - call.endReason = "hangup-bot"; - this.persistCallRecord(call); - this.clearMaxDurationTimer(callId); - this.rejectTranscriptWaiter(callId, "Call ended: hangup-bot"); - this.activeCalls.delete(callId); - if (call.providerCallId) { - this.providerCallIdMap.delete(call.providerCallId); - } - - return { success: true }; - } catch (err) { - return { - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } - } - - /** - * Check if an inbound call should be accepted based on policy. - */ - private shouldAcceptInbound(from: string | undefined): boolean { - const { inboundPolicy: policy, allowFrom } = this.config; - - switch (policy) { - case "disabled": - console.log("[voice-call] Inbound call rejected: policy is disabled"); - return false; - - case "open": - console.log("[voice-call] Inbound call accepted: policy is open"); - return true; - - case "allowlist": - case "pairing": { - const normalized = from?.replace(/\D/g, "") || ""; - const allowed = (allowFrom || []).some((num) => { - const normalizedAllow = num.replace(/\D/g, ""); - return ( - normalized.endsWith(normalizedAllow) || - normalizedAllow.endsWith(normalized) - ); - }); - const status = allowed ? "accepted" : "rejected"; - console.log( - `[voice-call] Inbound call ${status}: ${from} ${allowed ? 
"is in" : "not in"} allowlist`, - ); - return allowed; - } - - default: - return false; - } - } - - /** - * Create a call record for an inbound call. - */ - private createInboundCall( - providerCallId: string, - from: string, - to: string, - ): CallRecord { - const callId = crypto.randomUUID(); - - const callRecord: CallRecord = { - callId, - providerCallId, - provider: this.provider?.name || "twilio", - direction: "inbound", - state: "ringing", - from, - to, - startedAt: Date.now(), - transcript: [], - processedEventIds: [], - metadata: { - initialMessage: - this.config.inboundGreeting || "Hello! How can I help you today?", - }, - }; - - this.activeCalls.set(callId, callRecord); - this.providerCallIdMap.set(providerCallId, callId); // Map providerCallId to internal callId - this.persistCallRecord(callRecord); - - console.log( - `[voice-call] Created inbound call record: ${callId} from ${from}`, - ); - return callRecord; - } - - /** - * Look up a call by either internal callId or providerCallId. - */ - private findCall(callIdOrProviderCallId: string): CallRecord | undefined { - // Try direct lookup by internal callId - const directCall = this.activeCalls.get(callIdOrProviderCallId); - if (directCall) return directCall; - - // Try lookup by providerCallId - return this.getCallByProviderCallId(callIdOrProviderCallId); + return await endCall(this.buildContext(), callId); } /** * Process a webhook event. 
*/ processEvent(event: NormalizedEvent): void { - // Idempotency check - if (this.processedEventIds.has(event.id)) { - return; - } - this.processedEventIds.add(event.id); - - let call = this.findCall(event.callId); - - // Handle inbound calls - create record if it doesn't exist - if (!call && event.direction === "inbound" && event.providerCallId) { - // Check if we should accept this inbound call - if (!this.shouldAcceptInbound(event.from)) { - // TODO: Could hang up the call here - return; - } - - // Create a new call record for this inbound call - call = this.createInboundCall( - event.providerCallId, - event.from || "unknown", - event.to || this.config.fromNumber || "unknown", - ); - - // Update the event's callId to use our internal ID - event.callId = call.callId; - } - - if (!call) { - // Still no call record - ignore event - return; - } - - // Update provider call ID if we got it - if (event.providerCallId && !call.providerCallId) { - call.providerCallId = event.providerCallId; - } - - // Track processed event - call.processedEventIds.push(event.id); - - // Process event based on type - switch (event.type) { - case "call.initiated": - this.transitionState(call, "initiated"); - break; - - case "call.ringing": - this.transitionState(call, "ringing"); - break; - - case "call.answered": - call.answeredAt = event.timestamp; - this.transitionState(call, "answered"); - // Start max duration timer when call is answered - this.startMaxDurationTimer(call.callId); - break; - - case "call.active": - this.transitionState(call, "active"); - break; - - case "call.speaking": - this.transitionState(call, "speaking"); - break; - - case "call.speech": - if (event.isFinal) { - this.addTranscriptEntry(call, "user", event.transcript); - this.resolveTranscriptWaiter(call.callId, event.transcript); - } - this.transitionState(call, "listening"); - break; - - case "call.ended": - call.endedAt = event.timestamp; - call.endReason = event.reason; - this.transitionState(call, 
event.reason as CallState); - this.clearMaxDurationTimer(call.callId); - this.rejectTranscriptWaiter(call.callId, `Call ended: ${event.reason}`); - this.activeCalls.delete(call.callId); - if (call.providerCallId) { - this.providerCallIdMap.delete(call.providerCallId); - } - break; - - case "call.error": - if (!event.retryable) { - call.endedAt = event.timestamp; - call.endReason = "error"; - this.transitionState(call, "error"); - this.clearMaxDurationTimer(call.callId); - this.rejectTranscriptWaiter( - call.callId, - `Call error: ${event.error}`, - ); - this.activeCalls.delete(call.callId); - if (call.providerCallId) { - this.providerCallIdMap.delete(call.providerCallId); - } - } - break; - } - - this.persistCallRecord(call); + processEvent(this.buildContext(), event); } /** @@ -664,20 +161,11 @@ export class CallManager { * Get an active call by provider call ID (e.g., Twilio CallSid). */ getCallByProviderCallId(providerCallId: string): CallRecord | undefined { - // Fast path: use the providerCallIdMap for O(1) lookup - const callId = this.providerCallIdMap.get(providerCallId); - if (callId) { - return this.activeCalls.get(callId); - } - - // Fallback: linear search for cases where map wasn't populated - // (e.g., providerCallId set directly on call record) - for (const call of this.activeCalls.values()) { - if (call.providerCallId === providerCallId) { - return call; - } - } - return undefined; + return getCallByProviderCallId({ + activeCalls: this.activeCalls, + providerCallIdMap: this.providerCallIdMap, + providerCallId, + }); } /** @@ -691,156 +179,6 @@ export class CallManager { * Get call history (from persisted logs). 
*/ async getCallHistory(limit = 50): Promise { - const logPath = path.join(this.storePath, "calls.jsonl"); - - try { - await fsp.access(logPath); - } catch { - return []; - } - - const content = await fsp.readFile(logPath, "utf-8"); - const lines = content.trim().split("\n").filter(Boolean); - const calls: CallRecord[] = []; - - // Parse last N lines - for (const line of lines.slice(-limit)) { - try { - const parsed = CallRecordSchema.parse(JSON.parse(line)); - calls.push(parsed); - } catch { - // Skip invalid lines - } - } - - return calls; - } - - // States that can cycle during multi-turn conversations - private static readonly ConversationStates = new Set([ - "speaking", - "listening", - ]); - - // Non-terminal state order for monotonic transitions - private static readonly StateOrder: readonly CallState[] = [ - "initiated", - "ringing", - "answered", - "active", - "speaking", - "listening", - ]; - - /** - * Transition call state with monotonic enforcement. - */ - private transitionState(call: CallRecord, newState: CallState): void { - // No-op for same state or already terminal - if (call.state === newState || TerminalStates.has(call.state)) return; - - // Terminal states can always be reached from non-terminal - if (TerminalStates.has(newState)) { - call.state = newState; - return; - } - - // Allow cycling between speaking and listening (multi-turn conversations) - if ( - CallManager.ConversationStates.has(call.state) && - CallManager.ConversationStates.has(newState) - ) { - call.state = newState; - return; - } - - // Only allow forward transitions in state order - const currentIndex = CallManager.StateOrder.indexOf(call.state); - const newIndex = CallManager.StateOrder.indexOf(newState); - - if (newIndex > currentIndex) { - call.state = newState; - } - } - - /** - * Add an entry to the call transcript. 
- */ - private addTranscriptEntry( - call: CallRecord, - speaker: "bot" | "user", - text: string, - ): void { - const entry: TranscriptEntry = { - timestamp: Date.now(), - speaker, - text, - isFinal: true, - }; - call.transcript.push(entry); - } - - /** - * Persist a call record to disk (fire-and-forget async). - */ - private persistCallRecord(call: CallRecord): void { - const logPath = path.join(this.storePath, "calls.jsonl"); - const line = `${JSON.stringify(call)}\n`; - // Fire-and-forget async write to avoid blocking event loop - fsp.appendFile(logPath, line).catch((err) => { - console.error("[voice-call] Failed to persist call record:", err); - }); - } - - /** - * Load active calls from persistence (for crash recovery). - * Uses streaming to handle large log files efficiently. - */ - private loadActiveCalls(): void { - const logPath = path.join(this.storePath, "calls.jsonl"); - if (!fs.existsSync(logPath)) return; - - // Read file synchronously and parse lines - const content = fs.readFileSync(logPath, "utf-8"); - const lines = content.split("\n"); - - // Build map of latest state per call - const callMap = new Map(); - - for (const line of lines) { - if (!line.trim()) continue; - try { - const call = CallRecordSchema.parse(JSON.parse(line)); - callMap.set(call.callId, call); - } catch { - // Skip invalid lines - } - } - - // Only keep non-terminal calls - for (const [callId, call] of callMap) { - if (!TerminalStates.has(call.state)) { - this.activeCalls.set(callId, call); - // Populate providerCallId mapping for lookups - if (call.providerCallId) { - this.providerCallIdMap.set(call.providerCallId, callId); - } - // Populate processed event IDs - for (const eventId of call.processedEventIds) { - this.processedEventIds.add(eventId); - } - } - } - } - - /** - * Generate TwiML for notify mode (speak message and hang up). 
- */ - private generateNotifyTwiml(message: string, voice: string): string { - return ` - - ${escapeXml(message)} - -`; + return await getCallHistoryFromStore(this.storePath, limit); } } diff --git a/extensions/voice-call/src/manager/context.ts b/extensions/voice-call/src/manager/context.ts new file mode 100644 index 000000000..38fc7aa1b --- /dev/null +++ b/extensions/voice-call/src/manager/context.ts @@ -0,0 +1,22 @@ +import type { CallId, CallRecord } from "../types.js"; +import type { VoiceCallConfig } from "../config.js"; +import type { VoiceCallProvider } from "../providers/base.js"; + +export type TranscriptWaiter = { + resolve: (text: string) => void; + reject: (err: Error) => void; + timeout: NodeJS.Timeout; +}; + +export type CallManagerContext = { + activeCalls: Map; + providerCallIdMap: Map; + processedEventIds: Set; + provider: VoiceCallProvider | null; + config: VoiceCallConfig; + storePath: string; + webhookUrl: string | null; + transcriptWaiters: Map; + maxDurationTimers: Map; +}; + diff --git a/extensions/voice-call/src/manager/events.ts b/extensions/voice-call/src/manager/events.ts new file mode 100644 index 000000000..8cf7ad5b4 --- /dev/null +++ b/extensions/voice-call/src/manager/events.ts @@ -0,0 +1,178 @@ +import crypto from "node:crypto"; + +import type { CallId, CallRecord, CallState, NormalizedEvent } from "../types.js"; +import { TerminalStates } from "../types.js"; +import type { CallManagerContext } from "./context.js"; +import { findCall } from "./lookup.js"; +import { addTranscriptEntry, transitionState } from "./state.js"; +import { persistCallRecord } from "./store.js"; +import { + clearMaxDurationTimer, + rejectTranscriptWaiter, + resolveTranscriptWaiter, + startMaxDurationTimer, +} from "./timers.js"; +import { endCall } from "./outbound.js"; + +function shouldAcceptInbound(config: CallManagerContext["config"], from: string | undefined): boolean { + const { inboundPolicy: policy, allowFrom } = config; + + switch (policy) { + case 
"disabled": + console.log("[voice-call] Inbound call rejected: policy is disabled"); + return false; + + case "open": + console.log("[voice-call] Inbound call accepted: policy is open"); + return true; + + case "allowlist": + case "pairing": { + const normalized = from?.replace(/\D/g, "") || ""; + const allowed = (allowFrom || []).some((num) => { + const normalizedAllow = num.replace(/\D/g, ""); + return normalized.endsWith(normalizedAllow) || normalizedAllow.endsWith(normalized); + }); + const status = allowed ? "accepted" : "rejected"; + console.log( + `[voice-call] Inbound call ${status}: ${from} ${allowed ? "is in" : "not in"} allowlist`, + ); + return allowed; + } + + default: + return false; + } +} + +function createInboundCall(params: { + ctx: CallManagerContext; + providerCallId: string; + from: string; + to: string; +}): CallRecord { + const callId = crypto.randomUUID(); + + const callRecord: CallRecord = { + callId, + providerCallId: params.providerCallId, + provider: params.ctx.provider?.name || "twilio", + direction: "inbound", + state: "ringing", + from: params.from, + to: params.to, + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: { + initialMessage: params.ctx.config.inboundGreeting || "Hello! 
How can I help you today?", + }, + }; + + params.ctx.activeCalls.set(callId, callRecord); + params.ctx.providerCallIdMap.set(params.providerCallId, callId); + persistCallRecord(params.ctx.storePath, callRecord); + + console.log(`[voice-call] Created inbound call record: ${callId} from ${params.from}`); + return callRecord; +} + +export function processEvent(ctx: CallManagerContext, event: NormalizedEvent): void { + if (ctx.processedEventIds.has(event.id)) return; + ctx.processedEventIds.add(event.id); + + let call = findCall({ + activeCalls: ctx.activeCalls, + providerCallIdMap: ctx.providerCallIdMap, + callIdOrProviderCallId: event.callId, + }); + + if (!call && event.direction === "inbound" && event.providerCallId) { + if (!shouldAcceptInbound(ctx.config, event.from)) { + // TODO: Could hang up the call here. + return; + } + + call = createInboundCall({ + ctx, + providerCallId: event.providerCallId, + from: event.from || "unknown", + to: event.to || ctx.config.fromNumber || "unknown", + }); + + // Normalize event to internal ID for downstream consumers. 
+ event.callId = call.callId; + } + + if (!call) return; + + if (event.providerCallId && !call.providerCallId) { + call.providerCallId = event.providerCallId; + ctx.providerCallIdMap.set(event.providerCallId, call.callId); + } + + call.processedEventIds.push(event.id); + + switch (event.type) { + case "call.initiated": + transitionState(call, "initiated"); + break; + + case "call.ringing": + transitionState(call, "ringing"); + break; + + case "call.answered": + call.answeredAt = event.timestamp; + transitionState(call, "answered"); + startMaxDurationTimer({ + ctx, + callId: call.callId, + onTimeout: async (callId) => { + await endCall(ctx, callId); + }, + }); + break; + + case "call.active": + transitionState(call, "active"); + break; + + case "call.speaking": + transitionState(call, "speaking"); + break; + + case "call.speech": + if (event.isFinal) { + addTranscriptEntry(call, "user", event.transcript); + resolveTranscriptWaiter(ctx, call.callId, event.transcript); + } + transitionState(call, "listening"); + break; + + case "call.ended": + call.endedAt = event.timestamp; + call.endReason = event.reason; + transitionState(call, event.reason as CallState); + clearMaxDurationTimer(ctx, call.callId); + rejectTranscriptWaiter(ctx, call.callId, `Call ended: ${event.reason}`); + ctx.activeCalls.delete(call.callId); + if (call.providerCallId) ctx.providerCallIdMap.delete(call.providerCallId); + break; + + case "call.error": + if (!event.retryable) { + call.endedAt = event.timestamp; + call.endReason = "error"; + transitionState(call, "error"); + clearMaxDurationTimer(ctx, call.callId); + rejectTranscriptWaiter(ctx, call.callId, `Call error: ${event.error}`); + ctx.activeCalls.delete(call.callId); + if (call.providerCallId) ctx.providerCallIdMap.delete(call.providerCallId); + } + break; + } + + persistCallRecord(ctx.storePath, call); +} + diff --git a/extensions/voice-call/src/manager/lookup.ts b/extensions/voice-call/src/manager/lookup.ts new file mode 100644 index 
000000000..bc0b5f8ee
--- /dev/null
+++ b/extensions/voice-call/src/manager/lookup.ts
@@ -0,0 +1,50 @@
+import type { CallId, CallRecord } from "../types.js";
+
+/**
+ * Resolve an active call from the provider-assigned call id.
+ *
+ * Checks `providerCallIdMap` first. When the id has no mapping, falls back to
+ * scanning `activeCalls` for a record whose `providerCallId` matches.
+ *
+ * @returns The matching call record, or `undefined` when none is found.
+ */
+export function getCallByProviderCallId(params: {
+  activeCalls: Map<CallId, CallRecord>;
+  providerCallIdMap: Map<string, CallId>;
+  providerCallId: string;
+}): CallRecord | undefined {
+  const callId = params.providerCallIdMap.get(params.providerCallId);
+  if (callId) {
+    // A mapped id is authoritative: return whatever activeCalls holds for it.
+    return params.activeCalls.get(callId);
+  }
+
+  for (const call of params.activeCalls.values()) {
+    if (call.providerCallId === params.providerCallId) {
+      return call;
+    }
+  }
+  return undefined;
+}
+
+/**
+ * Find an active call by either its internal call id or its provider call id.
+ *
+ * The internal id is tried first; the provider-id lookup is the fallback.
+ *
+ * @returns The matching call record, or `undefined` when none is found.
+ */
+export function findCall(params: {
+  activeCalls: Map<CallId, CallRecord>;
+  providerCallIdMap: Map<string, CallId>;
+  callIdOrProviderCallId: string;
+}): CallRecord | undefined {
+  const directCall = params.activeCalls.get(params.callIdOrProviderCallId);
+  if (directCall) return directCall;
+  return getCallByProviderCallId({
+    activeCalls: params.activeCalls,
+    providerCallIdMap: params.providerCallIdMap,
+    providerCallId: params.callIdOrProviderCallId,
+  });
+}
+
diff --git a/extensions/voice-call/src/manager/outbound.ts b/extensions/voice-call/src/manager/outbound.ts
new file mode 100644
index 000000000..6cb037252
--- /dev/null
+++ b/extensions/voice-call/src/manager/outbound.ts
@@ -0,0 +1,298 @@
+import crypto from "node:crypto";
+
+import { TerminalStates, type CallId, type CallRecord, type OutboundCallOptions } from "../types.js";
+import type { CallMode } from "../config.js";
+import { mapVoiceToPolly } from "../voice-mapping.js";
+import type { CallManagerContext } from "./context.js";
+import { getCallByProviderCallId } from "./lookup.js";
+import { generateNotifyTwiml } from "./twiml.js";
+import { addTranscriptEntry, transitionState } from "./state.js";
+import { persistCallRecord } from "./store.js";
+import { clearMaxDurationTimer, clearTranscriptWaiter, rejectTranscriptWaiter, waitForFinalTranscript } from "./timers.js";
+
+/**
+ * Start an outbound call through the configured provider.
+ *
+ * The call record is added to `ctx.activeCalls` and persisted before the
+ * provider is contacted. If the provider call throws, the record is marked
+ * `failed`, persisted again, and removed from the in-memory maps.
+ *
+ * @param ctx - Shared call-manager state.
+ * @param to - Destination passed to the provider and stored on the record.
+ * @param sessionKey - Optional session key stored on the call record.
+ * @param options - Call options; a bare string is treated as `{ message }`.
+ * @returns `success: false` with an empty `callId` when a precondition fails,
+ *   or with the generated `callId` when the provider call throws.
+ */
+export async function initiateCall(
+  ctx: CallManagerContext,
+  to: string,
+  sessionKey?: string,
+  options?: OutboundCallOptions | string,
+): Promise<{ callId: CallId; success: boolean; error?: string }> {
+  const opts: OutboundCallOptions =
+    typeof options === "string" ? { message: options } : (options ?? {});
+  const initialMessage = opts.message;
+  const mode = opts.mode ?? ctx.config.outbound.defaultMode;
+
+  if (!ctx.provider) {
+    return { callId: "", success: false, error: "Provider not initialized" };
+  }
+  if (!ctx.webhookUrl) {
+    return { callId: "", success: false, error: "Webhook URL not configured" };
+  }
+
+  if (ctx.activeCalls.size >= ctx.config.maxConcurrentCalls) {
+    return {
+      callId: "",
+      success: false,
+      error: `Maximum concurrent calls (${ctx.config.maxConcurrentCalls}) reached`,
+    };
+  }
+
+  const callId = crypto.randomUUID();
+  // The provider was checked above, so no optional chaining is needed here.
+  const from =
+    ctx.config.fromNumber ||
+    (ctx.provider.name === "mock" ? "+15550000000" : undefined);
+  if (!from) {
+    return { callId: "", success: false, error: "fromNumber not configured" };
+  }
+
+  const callRecord: CallRecord = {
+    callId,
+    provider: ctx.provider.name,
+    direction: "outbound",
+    state: "initiated",
+    from,
+    to,
+    sessionKey,
+    startedAt: Date.now(),
+    transcript: [],
+    processedEventIds: [],
+    metadata: {
+      ...(initialMessage && { initialMessage }),
+      mode,
+    },
+  };
+
+  ctx.activeCalls.set(callId, callRecord);
+  persistCallRecord(ctx.storePath, callRecord);
+
+  try {
+    // For notify mode with a message, use inline TwiML with <Say>.
+    let inlineTwiml: string | undefined;
+    if (mode === "notify" && initialMessage) {
+      const pollyVoice = mapVoiceToPolly(ctx.config.tts.voice);
+      inlineTwiml = generateNotifyTwiml(initialMessage, pollyVoice);
+      console.log(`[voice-call] Using inline TwiML for notify mode (voice: ${pollyVoice})`);
+    }
+
+    const result = await ctx.provider.initiateCall({
+      callId,
+      from,
+      to,
+      webhookUrl: ctx.webhookUrl,
+      inlineTwiml,
+    });
+
+    callRecord.providerCallId = result.providerCallId;
+    ctx.providerCallIdMap.set(result.providerCallId, callId);
+    persistCallRecord(ctx.storePath, callRecord);
+
+    return { callId, success: true };
+  } catch (err) {
+    callRecord.state = "failed";
+    callRecord.endedAt = Date.now();
+    callRecord.endReason = "failed";
+    persistCallRecord(ctx.storePath, callRecord);
+    ctx.activeCalls.delete(callId);
+    if (callRecord.providerCallId) {
+      ctx.providerCallIdMap.delete(callRecord.providerCallId);
+    }
+
+    return {
+      callId,
+      success: false,
+      error: err instanceof Error ? err.message : String(err),
+    };
+  }
+}
+
+/**
+ * Speak `text` on an active call via the provider's TTS.
+ *
+ * Moves the call to `speaking`, persists it, records the text as a bot
+ * transcript entry, then asks the provider to play it. Provider errors are
+ * caught and reported in the result rather than thrown.
+ */
+export async function speak(
+  ctx: CallManagerContext,
+  callId: CallId,
+  text: string,
+): Promise<{ success: boolean; error?: string }> {
+  const call = ctx.activeCalls.get(callId);
+  if (!call) return { success: false, error: "Call not found" };
+  if (!ctx.provider || !call.providerCallId) return { success: false, error: "Call not connected" };
+  if (TerminalStates.has(call.state)) return { success: false, error: "Call has ended" };
+
+  try {
+    transitionState(call, "speaking");
+    persistCallRecord(ctx.storePath, call);
+
+    addTranscriptEntry(call, "bot", text);
+
+    await ctx.provider.playTts({
+      callId,
+      providerCallId: call.providerCallId,
+      text,
+      voice: ctx.config.tts.voice,
+    });
+
+    return { success: true };
+  } catch (err) {
+    return { success: false, error: err instanceof Error ? err.message : String(err) };
+  }
+}
+
+/**
+ * Speak the initial message stored in the call's metadata, if any.
+ *
+ * The message is removed from the metadata (and the record persisted) before
+ * speaking. In `notify` mode, once the message was spoken, a hang-up is
+ * scheduled after `config.outbound.notifyHangupDelaySec` seconds.
+ */
+export async function speakInitialMessage(
+  ctx: CallManagerContext,
+  providerCallId: string,
+): Promise<void> {
+  const call = getCallByProviderCallId({
+    activeCalls: ctx.activeCalls,
+    providerCallIdMap: ctx.providerCallIdMap,
+    providerCallId,
+  });
+  if (!call) {
+    console.warn(`[voice-call] speakInitialMessage: no call found for ${providerCallId}`);
+    return;
+  }
+
+  const initialMessage = call.metadata?.initialMessage as string | undefined;
+  const mode = (call.metadata?.mode as CallMode) ?? "conversation";
+
+  if (!initialMessage) {
+    console.log(`[voice-call] speakInitialMessage: no initial message for ${call.callId}`);
+    return;
+  }
+
+  // Clear so we don't speak it again if the provider reconnects.
+  if (call.metadata) {
+    delete call.metadata.initialMessage;
+    persistCallRecord(ctx.storePath, call);
+  }
+
+  console.log(`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`);
+  const result = await speak(ctx, call.callId, initialMessage);
+  if (!result.success) {
+    console.warn(`[voice-call] Failed to speak initial message: ${result.error}`);
+    return;
+  }
+
+  if (mode === "notify") {
+    const delaySec = ctx.config.outbound.notifyHangupDelaySec;
+    console.log(`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`);
+    setTimeout(async () => {
+      const currentCall = ctx.activeCalls.get(call.callId);
+      if (currentCall && !TerminalStates.has(currentCall.state)) {
+        console.log(`[voice-call] Notify mode: hanging up call ${call.callId}`);
+        await endCall(ctx, call.callId);
+      }
+    }, delaySec * 1000);
+  }
+}
+
+/**
+ * Speak `prompt`, then listen and wait for the caller's final transcript.
+ *
+ * @returns `success: true` with the transcript, or `success: false` with an
+ *   error message; provider failures are reported in the result, not thrown.
+ */
+export async function continueCall(
+  ctx: CallManagerContext,
+  callId: CallId,
+  prompt: string,
+): Promise<{ success: boolean; transcript?: string; error?: string }> {
+  const call = ctx.activeCalls.get(callId);
+  if (!call) return { success: false, error: "Call not found" };
+  if (!ctx.provider || !call.providerCallId) return { success: false, error: "Call not connected" };
+  if (TerminalStates.has(call.state)) return { success: false, error: "Call has ended" };
+
+  // speak() reports failures through its result instead of throwing, so check
+  // it explicitly: listening for a reply to a prompt that never played would
+  // only end in a transcript timeout.
+  const spoken = await speak(ctx, callId, prompt);
+  if (!spoken.success) return { success: false, error: spoken.error };
+
+  try {
+    transitionState(call, "listening");
+    persistCallRecord(ctx.storePath, call);
+
+    await ctx.provider.startListening({ callId, providerCallId: call.providerCallId });
+
+    const transcript = await waitForFinalTranscript(ctx, callId);
+
+    // Best-effort: stop listening after final transcript. A failure here must
+    // not discard a transcript that was already received.
+    try {
+      await ctx.provider.stopListening({ callId, providerCallId: call.providerCallId });
+    } catch (err) {
+      console.warn(`[voice-call] Failed to stop listening for call ${callId}:`, err);
+    }
+
+    return { success: true, transcript };
+  } catch (err) {
+    return { success: false, error: err instanceof Error ? err.message : String(err) };
+  } finally {
+    clearTranscriptWaiter(ctx, callId);
+  }
+}
+
+/**
+ * Hang up an active call on behalf of the bot.
+ *
+ * After the provider hang-up succeeds the record is marked `hangup-bot`,
+ * persisted, its timer and transcript waiter are cleaned up, and it is removed
+ * from the in-memory maps. A call already in a terminal state reports success.
+ */
+export async function endCall(
+  ctx: CallManagerContext,
+  callId: CallId,
+): Promise<{ success: boolean; error?: string }> {
+  const call = ctx.activeCalls.get(callId);
+  if (!call) return { success: false, error: "Call not found" };
+  if (!ctx.provider || !call.providerCallId) return { success: false, error: "Call not connected" };
+  if (TerminalStates.has(call.state)) return { success: true };
+
+  try {
+    await ctx.provider.hangupCall({
+      callId,
+      providerCallId: call.providerCallId,
+      reason: "hangup-bot",
+    });
+
+    call.state = "hangup-bot";
+    call.endedAt = Date.now();
+    call.endReason = "hangup-bot";
+    persistCallRecord(ctx.storePath, call);
+
+    clearMaxDurationTimer(ctx, callId);
+    rejectTranscriptWaiter(ctx, callId, "Call ended: hangup-bot");
+
+    ctx.activeCalls.delete(callId);
+    if (call.providerCallId) ctx.providerCallIdMap.delete(call.providerCallId);
+
+    return { success: true };
+  } catch (err) {
+    return { success: false, error: err instanceof Error ? err.message : String(err) };
+  }
+}
+
diff --git a/extensions/voice-call/src/manager/state.ts b/extensions/voice-call/src/manager/state.ts
new file mode 100644
index 000000000..7131d6b7d
--- /dev/null
+++ b/extensions/voice-call/src/manager/state.ts
@@ -0,0 +1,60 @@
+import { TerminalStates, type CallRecord, type CallState, type TranscriptEntry } from "../types.js";
+
+/** States a call may move between freely during a multi-turn conversation. */
+const ConversationStates = new Set<CallState>(["speaking", "listening"]);
+
+/** Non-terminal states in forward order; used to reject backward transitions. */
+const StateOrder: readonly CallState[] = [
+  "initiated",
+  "ringing",
+  "answered",
+  "active",
+  "speaking",
+  "listening",
+];
+
+/**
+ * Move `call` to `newState` if the transition is allowed; otherwise do nothing.
+ *
+ * Allowed: any non-terminal state to a terminal state, switching between
+ * `speaking` and `listening`, and moving forward in `StateOrder`.
+ */
+export function transitionState(call: CallRecord, newState: CallState): void {
+  // No-op for same state or already terminal.
+  if (call.state === newState || TerminalStates.has(call.state)) return;
+
+  // Terminal states can always be reached from non-terminal.
+  if (TerminalStates.has(newState)) {
+    call.state = newState;
+    return;
+  }
+
+  // Allow cycling between speaking and listening (multi-turn conversations).
+  if (ConversationStates.has(call.state) && ConversationStates.has(newState)) {
+    call.state = newState;
+    return;
+  }
+
+  // Only allow forward transitions in state order.
+  const currentIndex = StateOrder.indexOf(call.state);
+  const newIndex = StateOrder.indexOf(newState);
+  if (newIndex > currentIndex) {
+    call.state = newState;
+  }
+}
+
+/** Append a final transcript entry, timestamped with `Date.now()`, to `call`. */
+export function addTranscriptEntry(
+  call: CallRecord,
+  speaker: "bot" | "user",
+  text: string,
+): void {
+  const entry: TranscriptEntry = {
+    timestamp: Date.now(),
+    speaker,
+    text,
+    isFinal: true,
+  };
+  call.transcript.push(entry);
+}
+
diff --git a/extensions/voice-call/src/manager/store.ts b/extensions/voice-call/src/manager/store.ts
new file mode 100644
index 000000000..96525479a
--- /dev/null
+++ b/extensions/voice-call/src/manager/store.ts
@@ -0,0 +1,114 @@
+import fs from "node:fs";
+import fsp from "node:fs/promises";
+import path from "node:path";
+
+import { CallRecordSchema, TerminalStates, type CallId, type CallRecord } from "../types.js";
+
+/**
+ * Append a JSON snapshot of `call` to `calls.jsonl` under `storePath`.
+ *
+ * The write is not awaited; failures are logged and never thrown.
+ */
+export function persistCallRecord(storePath: string, call: CallRecord): void {
+  const logPath = path.join(storePath, "calls.jsonl");
+  const line = `${JSON.stringify(call)}\n`;
+  // Fire-and-forget async write to avoid blocking event loop.
+  fsp.appendFile(logPath, line).catch((err) => {
+    console.error("[voice-call] Failed to persist call record:", err);
+  });
+}
+
+/**
+ * Rebuild in-memory call state from `calls.jsonl` under `storePath`.
+ *
+ * Later lines win for a given call id. Calls whose latest snapshot is in a
+ * terminal state are dropped; unparseable or invalid lines are skipped.
+ */
+export function loadActiveCallsFromStore(storePath: string): {
+  activeCalls: Map<CallId, CallRecord>;
+  providerCallIdMap: Map<string, CallId>;
+  processedEventIds: Set<string>;
+} {
+  const logPath = path.join(storePath, "calls.jsonl");
+  if (!fs.existsSync(logPath)) {
+    return {
+      activeCalls: new Map(),
+      providerCallIdMap: new Map(),
+      processedEventIds: new Set(),
+    };
+  }
+
+  const content = fs.readFileSync(logPath, "utf-8");
+  const lines = content.split("\n");
+
+  const callMap = new Map<CallId, CallRecord>();
+  for (const line of lines) {
+    if (!line.trim()) continue;
+    try {
+      const call = CallRecordSchema.parse(JSON.parse(line));
+      callMap.set(call.callId, call);
+    } catch {
+      // Skip invalid lines.
+    }
+  }
+
+  const activeCalls = new Map<CallId, CallRecord>();
+  const providerCallIdMap = new Map<string, CallId>();
+  const processedEventIds = new Set<string>();
+
+  for (const [callId, call] of callMap) {
+    if (TerminalStates.has(call.state)) continue;
+    activeCalls.set(callId, call);
+    if (call.providerCallId) {
+      providerCallIdMap.set(call.providerCallId, callId);
+    }
+    for (const eventId of call.processedEventIds) {
+      processedEventIds.add(eventId);
+    }
+  }
+
+  return { activeCalls, providerCallIdMap, processedEventIds };
+}
+
+/**
+ * Read the most recent entries of `calls.jsonl` under `storePath`.
+ *
+ * Each line is one persisted snapshot, so a call may appear more than once.
+ *
+ * @param limit - Maximum number of trailing lines to parse. Values that do
+ *   not floor to a positive number yield an empty list.
+ * @returns Valid records in file order; `[]` when the log does not exist.
+ */
+export async function getCallHistoryFromStore(
+  storePath: string,
+  limit = 50,
+): Promise<CallRecord[]> {
+  // slice(-0) behaves like slice(0) and would return the whole log, so
+  // reject limits that do not floor to a positive count (including NaN).
+  const count = Math.floor(limit);
+  if (!(count > 0)) return [];
+
+  const logPath = path.join(storePath, "calls.jsonl");
+
+  try {
+    await fsp.access(logPath);
+  } catch {
+    return [];
+  }
+
+  const content = await fsp.readFile(logPath, "utf-8");
+  const lines = content.trim().split("\n").filter(Boolean);
+  const calls: CallRecord[] = [];
+
+  for (const line of lines.slice(-count)) {
+    try {
+      const parsed = CallRecordSchema.parse(JSON.parse(line));
+      calls.push(parsed);
+    } catch {
+      // Skip invalid lines.
+    }
+  }
+
+  return calls;
+}
+
diff --git a/extensions/voice-call/src/manager/timers.ts b/extensions/voice-call/src/manager/timers.ts
new file mode 100644
index 000000000..d56a26fc7
--- /dev/null
+++ b/extensions/voice-call/src/manager/timers.ts
@@ -0,0 +1,114 @@
+import { TerminalStates, type CallId } from "../types.js";
+import type { CallManagerContext } from "./context.js";
+import { persistCallRecord } from "./store.js";
+
+/** Cancel and forget the max-duration timer for `callId`, if one is set. */
+export function clearMaxDurationTimer(ctx: CallManagerContext, callId: CallId): void {
+  const timer = ctx.maxDurationTimers.get(callId);
+  if (timer) {
+    clearTimeout(timer);
+    ctx.maxDurationTimers.delete(callId);
+  }
+}
+
+/**
+ * (Re)start the max-duration timer for a call.
+ *
+ * When `config.maxDurationSeconds` elapses and the call is still active and
+ * non-terminal, its `endReason` is set to `timeout`, the record is persisted,
+ * and `onTimeout` is invoked. Errors from `onTimeout` are logged, not thrown.
+ */
+export function startMaxDurationTimer(params: {
+  ctx: CallManagerContext;
+  callId: CallId;
+  onTimeout: (callId: CallId) => Promise<void>;
+}): void {
+  clearMaxDurationTimer(params.ctx, params.callId);
+
+  const maxDurationMs = params.ctx.config.maxDurationSeconds * 1000;
+  console.log(
+    `[voice-call] Starting max duration timer (${params.ctx.config.maxDurationSeconds}s) for call ${params.callId}`,
+  );
+
+  const timer = setTimeout(async () => {
+    params.ctx.maxDurationTimers.delete(params.callId);
+    const call = params.ctx.activeCalls.get(params.callId);
+    if (call && !TerminalStates.has(call.state)) {
+      console.log(
+        `[voice-call] Max duration reached (${params.ctx.config.maxDurationSeconds}s), ending call ${params.callId}`,
+      );
+      call.endReason = "timeout";
+      persistCallRecord(params.ctx.storePath, call);
+      // Nothing awaits this timer callback, so a rejection from onTimeout
+      // would otherwise surface as an unhandled promise rejection.
+      try {
+        await params.onTimeout(params.callId);
+      } catch (err) {
+        console.error(
+          `[voice-call] Max duration timeout handler failed for call ${params.callId}:`,
+          err,
+        );
+      }
+    }
+  }, maxDurationMs);
+
+  params.ctx.maxDurationTimers.set(params.callId, timer);
+}
+
+/** Cancel the pending transcript waiter for `callId` without settling it. */
+export function clearTranscriptWaiter(ctx: CallManagerContext, callId: CallId): void {
+  const waiter = ctx.transcriptWaiters.get(callId);
+  if (!waiter) return;
+  clearTimeout(waiter.timeout);
+  ctx.transcriptWaiters.delete(callId);
+}
+
+/** Remove the transcript waiter for `callId`, if any, and reject it with `reason`. */
+export function rejectTranscriptWaiter(
+  ctx: CallManagerContext,
+  callId: CallId,
+  reason: string,
+): void {
+  const waiter = ctx.transcriptWaiters.get(callId);
+  if (!waiter) return;
+  clearTranscriptWaiter(ctx, callId);
+  waiter.reject(new Error(reason));
+}
+
+/** Remove the transcript waiter for `callId`, if any, and resolve it with `transcript`. */
+export function resolveTranscriptWaiter(
+  ctx: CallManagerContext,
+  callId: CallId,
+  transcript: string,
+): void {
+  const waiter = ctx.transcriptWaiters.get(callId);
+  if (!waiter) return;
+  clearTranscriptWaiter(ctx, callId);
+  waiter.resolve(transcript);
+}
+
+/**
+ * Register a waiter for the next final transcript of `callId`.
+ *
+ * Any waiter already registered for the call is rejected first. The returned
+ * promise rejects if it is not settled within `config.transcriptTimeoutMs`
+ * milliseconds.
+ */
+export function waitForFinalTranscript(
+  ctx: CallManagerContext,
+  callId: CallId,
+): Promise<string> {
+  // Only allow one in-flight waiter per call.
+  rejectTranscriptWaiter(ctx, callId, "Transcript waiter replaced");
+
+  const timeoutMs = ctx.config.transcriptTimeoutMs;
+  return new Promise((resolve, reject) => {
+    const timeout = setTimeout(() => {
+      ctx.transcriptWaiters.delete(callId);
+      reject(new Error(`Timed out waiting for transcript after ${timeoutMs}ms`));
+    }, timeoutMs);
+
+    ctx.transcriptWaiters.set(callId, { resolve, reject, timeout });
+  });
+}
+
diff --git a/extensions/voice-call/src/manager/twiml.ts b/extensions/voice-call/src/manager/twiml.ts
new file mode 100644
index 000000000..d6c1dd038
--- /dev/null
+++ b/extensions/voice-call/src/manager/twiml.ts
@@ -0,0 +1,14 @@
+import { escapeXml } from "../voice-mapping.js";
+
+/**
+ * Build the inline TwiML for a notify-mode call: say `message` using `voice`,
+ * then hang up. `message` is XML-escaped; `voice` is interpolated as given.
+ */
+export function generateNotifyTwiml(message: string, voice: string): string {
+  return `<?xml version="1.0" encoding="UTF-8"?>
+<Response>
+  <Say voice="${voice}">${escapeXml(message)}</Say>
+  <Hangup/>
+</Response>`;
+}
+