From 946b0229e8a45c0b1cd11cae9e743a7c0cd49f5f Mon Sep 17 00:00:00 2001 From: vrknetha Date: Tue, 13 Jan 2026 17:16:02 +0530 Subject: [PATCH] Voice Call: add Plivo provider --- docs/plugins/voice-call.md | 244 ++---- extensions/voice-call/README.md | 12 +- extensions/voice-call/index.ts | 2 +- extensions/voice-call/src/config.ts | 28 +- extensions/voice-call/src/manager.test.ts | 73 ++ extensions/voice-call/src/manager.ts | 802 +++++++++++++++++- extensions/voice-call/src/providers/index.ts | 1 + .../voice-call/src/providers/plivo.test.ts | 29 + extensions/voice-call/src/providers/plivo.ts | 504 +++++++++++ extensions/voice-call/src/runtime.ts | 13 + extensions/voice-call/src/types.ts | 2 +- .../voice-call/src/webhook-security.test.ts | 156 ++++ extensions/voice-call/src/webhook-security.ts | 230 +++++ skills/voice-call/SKILL.md | 3 +- vitest.config.ts | 6 +- 15 files changed, 1861 insertions(+), 244 deletions(-) create mode 100644 extensions/voice-call/src/manager.test.ts create mode 100644 extensions/voice-call/src/providers/plivo.test.ts create mode 100644 extensions/voice-call/src/providers/plivo.ts create mode 100644 extensions/voice-call/src/webhook-security.test.ts diff --git a/docs/plugins/voice-call.md b/docs/plugins/voice-call.md index 775a7677a..5c55cec88 100644 --- a/docs/plugins/voice-call.md +++ b/docs/plugins/voice-call.md @@ -1,5 +1,5 @@ --- -summary: "Voice Call plugin: outbound and inbound calls via Twilio/Telnyx, with CLI, tools, and streaming" +summary: "Voice Call plugin: outbound + inbound calls via Twilio/Telnyx/Plivo (plugin install + config + CLI)" read_when: - You want to place an outbound voice call from Clawdbot - You are configuring or developing the voice-call plugin @@ -7,34 +7,26 @@ read_when: # Voice Call (plugin) -Voice calls for Clawdbot. Use it to place outbound notifications, run multi-turn -phone conversations, and accept inbound calls with an explicit policy. +Voice calls for Clawdbot via a plugin. 
Supports outbound notifications, +multi-turn conversations, and inbound calls gated by an explicit policy. Current providers: - `twilio` (Programmable Voice + Media Streams) - `telnyx` (Call Control v2) +- `plivo` (Voice API + XML transfer + GetInput speech) - `mock` (dev/no network) -What you get: -- Outbound calls in notify or conversation mode -- Inbound calls with allowlist or open policies -- Provider webhooks with signature verification -- Optional streaming (Twilio Media Streams + OpenAI Realtime STT) -- CLI commands, a tool surface, and JSONL call logs - Quick mental model: -1. Install plugin -2. Restart Gateway -3. Configure `plugins.entries.voice-call.config` -4. Expose a public webhook URL -5. Call via `clawdbot voicecall ...` or the `voice_call` tool +- Install plugin +- Restart Gateway +- Configure under `plugins.entries.voice-call.config` +- Use `clawdbot voicecall ...` or the `voice_call` tool ## Where it runs (local vs remote) -The Voice Call plugin runs inside the Gateway process. +The Voice Call plugin runs **inside the Gateway process**. -If you use a remote Gateway, install and configure the plugin on the machine -running the Gateway, then restart the Gateway to load it. +If you use a remote Gateway, install/configure the plugin on the **machine running the Gateway**, then restart the Gateway to load it. ## Install @@ -55,15 +47,9 @@ cd ./extensions/voice-call && pnpm install Restart the Gateway afterwards. -Note: use `pnpm` for repo work. Bun is not recommended and can cause issues in -other Clawdbot channels (especially WhatsApp and Telegram). +## Config -## Config overview - -All config lives under `plugins.entries.voice-call.config`. Phone numbers must -be in E.164 format (`+15550001234`). 
- -Minimal example (Twilio outbound only): +Set config under `plugins.entries.voice-call.config`: ```json5 { @@ -72,16 +58,39 @@ Minimal example (Twilio outbound only): "voice-call": { enabled: true, config: { - provider: "twilio", + provider: "twilio", // or "telnyx" | "plivo" | "mock" fromNumber: "+15550001234", toNumber: "+15550005678", + twilio: { accountSid: "ACxxxxxxxx", authToken: "..." }, - serve: { port: 3334, bind: "127.0.0.1", path: "/voice/webhook" }, - publicUrl: "https://example.ngrok.app/voice/webhook", - outbound: { defaultMode: "notify", notifyHangupDelaySec: 3 } + + plivo: { + authId: "MAxxxxxxxxxxxxxxxxxxxx", + authToken: "..." + }, + + // Webhook server + serve: { + port: 3334, + path: "/voice/webhook" + }, + + // Public exposure (pick one) + // publicUrl: "https://example.ngrok.app/voice/webhook", + // tunnel: { provider: "ngrok" }, + // tailscale: { mode: "funnel", path: "/voice/webhook" } + + outbound: { + defaultMode: "notify" // notify | conversation + }, + + streaming: { + enabled: true, + streamPath: "/voice/stream" + } } } } @@ -90,178 +99,27 @@ Minimal example (Twilio outbound only): ``` Notes: -- Twilio/Telnyx require a publicly reachable webhook URL. +- Twilio/Telnyx require a **publicly reachable** webhook URL. +- Plivo requires a **publicly reachable** webhook URL. - `mock` is a local dev provider (no network calls). - `skipSignatureVerification` is for local testing only. -## Public URL and webhook exposure +## Inbound calls -Providers send webhooks from the public internet. Your `serve.path` must be -reachable from them. - -You have three options: -- `publicUrl`: you already have a public HTTPS URL pointing at the Gateway host. -- `tunnel`: use ngrok or Tailscale (recommended for quick setup). -- `tailscale`: legacy Tailscale serve/funnel config (still supported, but - `tunnel` is preferred). - -Example using ngrok: +Inbound policy defaults to `disabled`. 
To enable inbound calls, set: ```json5 { - tunnel: { - provider: "ngrok", - ngrokAuthToken: "..." - } + inboundPolicy: "allowlist", + allowFrom: ["+15550001234"], + inboundGreeting: "Hello! How can I help?" } ``` -Example using Tailscale Funnel: - -```json5 -{ - tunnel: { provider: "tailscale-funnel" } -} -``` - -CLI helper (Tailscale only): - -```bash -clawdbot voicecall expose --mode funnel -``` - -If you use Tailscale Serve without Funnel, the URL is private to your tailnet, -so Twilio/Telnyx will not be able to reach it. - -## Providers - -### Twilio - -Twilio uses Programmable Voice with optional Media Streams for real-time audio. - -Required config: -- `twilio.accountSid` and `twilio.authToken` -- (or `TWILIO_ACCOUNT_SID` / `TWILIO_AUTH_TOKEN`) -- A Twilio phone number that can reach your webhook - -Inbound setup: -- In the Twilio Console for your phone number, set the Voice webhook to your - public `serve.path` URL (HTTP POST). - -Outbound setup: -- Outbound calls are created via Twilio API; the plugin supplies the webhook URL - per call. - -Streaming (optional, Twilio only): -- Enable `streaming.enabled` and set `streaming.streamPath` -- Provide `OPENAI_API_KEY` or `streaming.openaiApiKey` -- The stream WebSocket URL is derived from your `publicUrl` host + `streamPath` - (https -> wss) - -Signature verification: -- Webhooks are verified by default. -- If you are using ngrok free tier, leave `tunnel.allowNgrokFreeTier` as `true` - so URL rewriting does not break verification. -- Use `skipSignatureVerification` only for local dev. - -### Telnyx - -Telnyx uses Call Control v2. - -Required config: -- `telnyx.apiKey` and `telnyx.connectionId` -- (or `TELNYX_API_KEY` / `TELNYX_CONNECTION_ID`) - -Inbound setup: -- In your Telnyx Call Control App, set the webhook URL to your public - `serve.path`. - -Signature verification: -- Set `telnyx.publicKey` to enable Ed25519 signature verification. 
-- If you do not set a public key, webhooks are accepted without verification - (not recommended for production). - -Transcription: -- Telnyx uses its own transcription events for `continue` responses. - -### Mock (dev) - -`mock` is for local testing and does not make network calls. - -## Call modes - -Outbound calls support two modes: -- `notify`: speak a message and auto-hangup after `notifyHangupDelaySec`. -- `conversation`: keep the call open and allow back-and-forth. - -Examples: - -```bash -clawdbot voicecall call --to "+15555550123" --message "Hello" --mode notify -clawdbot voicecall call --to "+15555550123" --message "Ready to talk?" --mode conversation -``` - -## Inbound calls and policy - -Inbound calls are blocked by default. - -Policies: -- `disabled`: block all inbound calls -- `allowlist`: allow only numbers in `allowFrom` -- `pairing`: currently behaves like `allowlist` -- `open`: accept all inbound calls - -Inbound greeting: -- `inboundGreeting` controls the first message spoken when a call is accepted. - -## Auto-responses and models - -When a caller speaks, the plugin can auto-respond using the embedded Clawdbot -agent. - -Key settings: -- `responseModel`: model reference for voice responses (default `openai/gpt-4o-mini`) -- `responseSystemPrompt`: optional override for the voice system prompt -- `responseTimeoutMs`: response generation timeout - -Responses use the same agent system as messaging, including tool access. -The default system prompt keeps replies short and conversational (about 1-2 sentences). - -## Streaming (Twilio only) - -When `streaming.enabled` is on: -- The webhook server also accepts WebSocket upgrades at `streaming.streamPath`. -- Audio is forwarded to OpenAI Realtime STT. -- Final transcripts are fed into the call manager and used by `continue` and - auto-responses. - -Required: -- A public HTTPS URL for the Gateway (used to derive `wss://...`). -- `OPENAI_API_KEY` or `streaming.openaiApiKey`. 
- -If no OpenAI key is available, streaming does not start and real-time transcripts -will not arrive. - -## Limits and timeouts - -These settings are enforced by the call manager: -- `maxDurationSeconds`: auto-hangup after this many seconds (starts when answered). -- `maxConcurrentCalls`: max simultaneous active calls. -- `transcriptTimeoutMs`: how long `continue` waits for a final transcript. - -## Logs and debugging - -Calls are appended as JSONL to: -- `${store}/calls.jsonl`, or -- `~/clawd/voice-calls/calls.jsonl` by default - -Set `store` if you want a different base directory for call logs. - -Use: - -```bash -clawdbot voicecall tail -``` +Auto-responses use the agent system. Tune with: +- `responseModel` +- `responseSystemPrompt` +- `responseTimeoutMs` ## CLI @@ -286,7 +144,7 @@ Actions: - `end_call` (callId) - `get_status` (callId) -If you want a ready-made skill entry, grab it from [ClawdHub.com](https://ClawdHub.com). +This repo ships a matching skill doc at `skills/voice-call/SKILL.md`. ## Gateway RPC diff --git a/extensions/voice-call/README.md b/extensions/voice-call/README.md index c17b71c93..11ff8324a 100644 --- a/extensions/voice-call/README.md +++ b/extensions/voice-call/README.md @@ -5,6 +5,7 @@ Official Voice Call plugin for **Clawdbot**. 
Providers: - **Twilio** (Programmable Voice + Media Streams) - **Telnyx** (Call Control v2) +- **Plivo** (Voice API + XML transfer + GetInput speech) - **Mock** (dev/no network) Docs: `https://docs.clawd.bot/plugins/voice-call` @@ -34,7 +35,7 @@ Put under `plugins.entries.voice-call.config`: ```json5 { - provider: "twilio", // or "telnyx" | "mock" + provider: "twilio", // or "telnyx" | "plivo" | "mock" fromNumber: "+15550001234", toNumber: "+15550005678", @@ -43,6 +44,11 @@ Put under `plugins.entries.voice-call.config`: authToken: "your_token" }, + plivo: { + authId: "MAxxxxxxxxxxxxxxxxxxxx", + authToken: "your_token" + }, + // Webhook server serve: { port: 3334, @@ -66,7 +72,7 @@ Put under `plugins.entries.voice-call.config`: ``` Notes: -- Twilio/Telnyx require a **publicly reachable** webhook URL. +- Twilio/Telnyx/Plivo require a **publicly reachable** webhook URL. - `mock` is a local dev provider (no network calls). ## CLI @@ -102,6 +108,6 @@ Actions: ## Notes -- Uses webhook signature verification for Twilio/Telnyx. +- Uses webhook signature verification for Twilio/Telnyx/Plivo. - `responseModel` / `responseSystemPrompt` control AI auto-responses. - Media streaming requires `ws` and OpenAI Realtime API key. 
diff --git a/extensions/voice-call/index.ts b/extensions/voice-call/index.ts index 755242459..f0fc8e3ad 100644 --- a/extensions/voice-call/index.ts +++ b/extensions/voice-call/index.ts @@ -125,7 +125,7 @@ const VoiceCallToolSchema = Type.Union([ const voiceCallPlugin = { id: "voice-call", name: "Voice Call", - description: "Voice-call plugin with Telnyx/Twilio providers", + description: "Voice-call plugin with Telnyx/Twilio/Plivo providers", configSchema: voiceCallConfigSchema, register(api) { const cfg = voiceCallConfigSchema.parse(api.pluginConfig); diff --git a/extensions/voice-call/src/config.ts b/extensions/voice-call/src/config.ts index 83879a7d2..ede197ac9 100644 --- a/extensions/voice-call/src/config.ts +++ b/extensions/voice-call/src/config.ts @@ -53,6 +53,14 @@ export const TwilioConfigSchema = z.object({ }); export type TwilioConfig = z.infer; +export const PlivoConfigSchema = z.object({ + /** Plivo Auth ID (starts with MA/SA) */ + authId: z.string().min(1).optional(), + /** Plivo Auth Token */ + authToken: z.string().min(1).optional(), +}); +export type PlivoConfig = z.infer; + // ----------------------------------------------------------------------------- // STT/TTS Configuration // ----------------------------------------------------------------------------- @@ -219,8 +227,8 @@ export const VoiceCallConfigSchema = z.object({ /** Enable voice call functionality */ enabled: z.boolean().default(false), - /** Active provider (telnyx, twilio, or mock) */ - provider: z.enum(["telnyx", "twilio", "mock"]).optional(), + /** Active provider (telnyx, twilio, plivo, or mock) */ + provider: z.enum(["telnyx", "twilio", "plivo", "mock"]).optional(), /** Telnyx-specific configuration */ telnyx: TelnyxConfigSchema.optional(), @@ -228,6 +236,9 @@ export const VoiceCallConfigSchema = z.object({ /** Twilio-specific configuration */ twilio: TwilioConfigSchema.optional(), + /** Plivo-specific configuration */ + plivo: PlivoConfigSchema.optional(), + /** Phone number to 
call from (E.164) */ fromNumber: E164Schema.optional(), @@ -351,5 +362,18 @@ export function validateProviderConfig(config: VoiceCallConfig): { } } + if (config.provider === "plivo") { + if (!config.plivo?.authId) { + errors.push( + "plugins.entries.voice-call.config.plivo.authId is required (or set PLIVO_AUTH_ID env)", + ); + } + if (!config.plivo?.authToken) { + errors.push( + "plugins.entries.voice-call.config.plivo.authToken is required (or set PLIVO_AUTH_TOKEN env)", + ); + } + } + return { valid: errors.length === 0, errors }; } diff --git a/extensions/voice-call/src/manager.test.ts b/extensions/voice-call/src/manager.test.ts new file mode 100644 index 000000000..71522dd50 --- /dev/null +++ b/extensions/voice-call/src/manager.test.ts @@ -0,0 +1,73 @@ +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import { VoiceCallConfigSchema } from "./config.js"; +import { CallManager } from "./manager.js"; +import type { + HangupCallInput, + InitiateCallInput, + InitiateCallResult, + PlayTtsInput, + ProviderWebhookParseResult, + StartListeningInput, + StopListeningInput, + WebhookContext, + WebhookVerificationResult, +} from "./types.js"; +import type { VoiceCallProvider } from "./providers/base.js"; + +class FakeProvider implements VoiceCallProvider { + readonly name = "plivo" as const; + + verifyWebhook(_ctx: WebhookContext): WebhookVerificationResult { + return { ok: true }; + } + parseWebhookEvent(_ctx: WebhookContext): ProviderWebhookParseResult { + return { events: [], statusCode: 200 }; + } + async initiateCall(_input: InitiateCallInput): Promise { + return { providerCallId: "request-uuid", status: "initiated" }; + } + async hangupCall(_input: HangupCallInput): Promise {} + async playTts(_input: PlayTtsInput): Promise {} + async startListening(_input: StartListeningInput): Promise {} + async stopListening(_input: StopListeningInput): Promise {} +} + +describe("CallManager", () => { + it("upgrades 
providerCallId mapping when provider ID changes", async () => { + const config = VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + }); + + const storePath = path.join(os.tmpdir(), `clawdbot-voice-call-test-${Date.now()}`); + const manager = new CallManager(config, storePath); + manager.initialize(new FakeProvider(), "https://example.com/voice/webhook"); + + const { callId, success, error } = await manager.initiateCall("+15550000001"); + expect(success).toBe(true); + expect(error).toBeUndefined(); + + // The provider returned a request UUID as the initial providerCallId. + expect(manager.getCall(callId)?.providerCallId).toBe("request-uuid"); + expect(manager.getCallByProviderCallId("request-uuid")?.callId).toBe(callId); + + // Provider later reports the actual call UUID. + manager.processEvent({ + id: "evt-1", + type: "call.answered", + callId, + providerCallId: "call-uuid", + timestamp: Date.now(), + }); + + expect(manager.getCall(callId)?.providerCallId).toBe("call-uuid"); + expect(manager.getCallByProviderCallId("call-uuid")?.callId).toBe(callId); + expect(manager.getCallByProviderCallId("request-uuid")).toBeUndefined(); + }); +}); + diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index 9173f76b5..60f553f06 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -1,27 +1,23 @@ +import crypto from "node:crypto"; import fs from "node:fs"; +import fsp from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { resolveUserPath } from "./utils.js"; -import type { VoiceCallConfig } from "./config.js"; +import type { CallMode, VoiceCallConfig } from "./config.js"; import type { VoiceCallProvider } from "./providers/base.js"; import { type CallId, type CallRecord, + CallRecordSchema, + type CallState, type NormalizedEvent, type OutboundCallOptions, + TerminalStates, + type TranscriptEntry, } from "./types.js"; 
-import type { CallManagerContext } from "./manager/context.js"; -import { processEvent } from "./manager/events.js"; -import { getCallByProviderCallId } from "./manager/lookup.js"; -import { - continueCall, - endCall, - initiateCall, - speak, - speakInitialMessage, -} from "./manager/outbound.js"; -import { getCallHistoryFromStore, loadActiveCallsFromStore } from "./manager/store.js"; +import { escapeXml, mapVoiceToPolly } from "./voice-mapping.js"; /** * Manages voice calls: state machine, persistence, and provider coordination. @@ -55,20 +51,6 @@ export class CallManager { this.storePath = resolveUserPath(rawPath); } - private buildContext(): CallManagerContext { - return { - activeCalls: this.activeCalls, - providerCallIdMap: this.providerCallIdMap, - processedEventIds: this.processedEventIds, - provider: this.provider, - config: this.config, - storePath: this.storePath, - webhookUrl: this.webhookUrl, - transcriptWaiters: this.transcriptWaiters, - maxDurationTimers: this.maxDurationTimers, - }; - } - /** * Initialize the call manager with a provider. */ @@ -80,10 +62,7 @@ export class CallManager { fs.mkdirSync(this.storePath, { recursive: true }); // Load any persisted active calls - const restored = loadActiveCallsFromStore(this.storePath); - this.activeCalls = restored.activeCalls; - this.providerCallIdMap = restored.providerCallIdMap; - this.processedEventIds = restored.processedEventIds; + this.loadActiveCalls(); } /** @@ -104,7 +83,102 @@ export class CallManager { sessionKey?: string, options?: OutboundCallOptions | string, ): Promise<{ callId: CallId; success: boolean; error?: string }> { - return await initiateCall(this.buildContext(), to, sessionKey, options); + // Support legacy string argument for initialMessage + const opts: OutboundCallOptions = + typeof options === "string" ? { message: options } : (options ?? {}); + const initialMessage = opts.message; + const mode = opts.mode ?? 
this.config.outbound.defaultMode; + if (!this.provider) { + return { callId: "", success: false, error: "Provider not initialized" }; + } + + if (!this.webhookUrl) { + return { + callId: "", + success: false, + error: "Webhook URL not configured", + }; + } + + // Check concurrent call limit + const activeCalls = this.getActiveCalls(); + if (activeCalls.length >= this.config.maxConcurrentCalls) { + return { + callId: "", + success: false, + error: `Maximum concurrent calls (${this.config.maxConcurrentCalls}) reached`, + }; + } + + const callId = crypto.randomUUID(); + const from = + this.config.fromNumber || + (this.provider?.name === "mock" ? "+15550000000" : undefined); + if (!from) { + return { callId: "", success: false, error: "fromNumber not configured" }; + } + + // Create call record with mode in metadata + const callRecord: CallRecord = { + callId, + provider: this.provider.name, + direction: "outbound", + state: "initiated", + from, + to, + sessionKey, + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: { + ...(initialMessage && { initialMessage }), + mode, + }, + }; + + this.activeCalls.set(callId, callRecord); + this.persistCallRecord(callRecord); + + try { + // For notify mode with a message, use inline TwiML with + let inlineTwiml: string | undefined; + if (mode === "notify" && initialMessage) { + const pollyVoice = mapVoiceToPolly(this.config.tts.voice); + inlineTwiml = this.generateNotifyTwiml(initialMessage, pollyVoice); + console.log( + `[voice-call] Using inline TwiML for notify mode (voice: ${pollyVoice})`, + ); + } + + const result = await this.provider.initiateCall({ + callId, + from, + to, + webhookUrl: this.webhookUrl, + inlineTwiml, + }); + + callRecord.providerCallId = result.providerCallId; + this.providerCallIdMap.set(result.providerCallId, callId); // Map providerCallId to internal callId + this.persistCallRecord(callRecord); + + return { callId, success: true }; + } catch (err) { + callRecord.state = 
"failed"; + callRecord.endedAt = Date.now(); + callRecord.endReason = "failed"; + this.persistCallRecord(callRecord); + this.activeCalls.delete(callId); + if (callRecord.providerCallId) { + this.providerCallIdMap.delete(callRecord.providerCallId); + } + + return { + callId, + success: false, + error: err instanceof Error ? err.message : String(err), + }; + } } /** @@ -114,7 +188,42 @@ export class CallManager { callId: CallId, text: string, ): Promise<{ success: boolean; error?: string }> { - return await speak(this.buildContext(), callId, text); + const call = this.activeCalls.get(callId); + if (!call) { + return { success: false, error: "Call not found" }; + } + + if (!this.provider || !call.providerCallId) { + return { success: false, error: "Call not connected" }; + } + + if (TerminalStates.has(call.state)) { + return { success: false, error: "Call has ended" }; + } + + try { + // Update state + call.state = "speaking"; + this.persistCallRecord(call); + + // Add to transcript + this.addTranscriptEntry(call, "bot", text); + + // Play TTS + await this.provider.playTts({ + callId, + providerCallId: call.providerCallId, + text, + voice: this.config.tts.voice, + }); + + return { success: true }; + } catch (err) { + return { + success: false, + error: err instanceof Error ? err.message : String(err), + }; + } } /** @@ -123,7 +232,135 @@ export class CallManager { * In notify mode, auto-hangup after the message is delivered. */ async speakInitialMessage(providerCallId: string): Promise { - await speakInitialMessage(this.buildContext(), providerCallId); + const call = this.getCallByProviderCallId(providerCallId); + if (!call) { + console.warn( + `[voice-call] speakInitialMessage: no call found for ${providerCallId}`, + ); + return; + } + + const initialMessage = call.metadata?.initialMessage as string | undefined; + const mode = (call.metadata?.mode as CallMode) ?? 
"conversation"; + + if (!initialMessage) { + console.log( + `[voice-call] speakInitialMessage: no initial message for ${call.callId}`, + ); + return; + } + + // Clear the initial message so we don't speak it again + if (call.metadata) { + delete call.metadata.initialMessage; + this.persistCallRecord(call); + } + + console.log( + `[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`, + ); + const result = await this.speak(call.callId, initialMessage); + if (!result.success) { + console.warn( + `[voice-call] Failed to speak initial message: ${result.error}`, + ); + return; + } + + // In notify mode, auto-hangup after delay + if (mode === "notify") { + const delaySec = this.config.outbound.notifyHangupDelaySec; + console.log( + `[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`, + ); + setTimeout(async () => { + const currentCall = this.getCall(call.callId); + if (currentCall && !TerminalStates.has(currentCall.state)) { + console.log( + `[voice-call] Notify mode: hanging up call ${call.callId}`, + ); + await this.endCall(call.callId); + } + }, delaySec * 1000); + } + } + + /** + * Start max duration timer for a call. + * Auto-hangup when maxDurationSeconds is reached. 
+ */ + private startMaxDurationTimer(callId: CallId): void { + // Clear any existing timer + this.clearMaxDurationTimer(callId); + + const maxDurationMs = this.config.maxDurationSeconds * 1000; + console.log( + `[voice-call] Starting max duration timer (${this.config.maxDurationSeconds}s) for call ${callId}`, + ); + + const timer = setTimeout(async () => { + this.maxDurationTimers.delete(callId); + const call = this.getCall(callId); + if (call && !TerminalStates.has(call.state)) { + console.log( + `[voice-call] Max duration reached (${this.config.maxDurationSeconds}s), ending call ${callId}`, + ); + call.endReason = "timeout"; + this.persistCallRecord(call); + await this.endCall(callId); + } + }, maxDurationMs); + + this.maxDurationTimers.set(callId, timer); + } + + /** + * Clear max duration timer for a call. + */ + private clearMaxDurationTimer(callId: CallId): void { + const timer = this.maxDurationTimers.get(callId); + if (timer) { + clearTimeout(timer); + this.maxDurationTimers.delete(callId); + } + } + + private clearTranscriptWaiter(callId: CallId): void { + const waiter = this.transcriptWaiters.get(callId); + if (!waiter) return; + clearTimeout(waiter.timeout); + this.transcriptWaiters.delete(callId); + } + + private rejectTranscriptWaiter(callId: CallId, reason: string): void { + const waiter = this.transcriptWaiters.get(callId); + if (!waiter) return; + this.clearTranscriptWaiter(callId); + waiter.reject(new Error(reason)); + } + + private resolveTranscriptWaiter(callId: CallId, transcript: string): void { + const waiter = this.transcriptWaiters.get(callId); + if (!waiter) return; + this.clearTranscriptWaiter(callId); + waiter.resolve(transcript); + } + + private waitForFinalTranscript(callId: CallId): Promise { + // Only allow one in-flight waiter per call. 
+ this.rejectTranscriptWaiter(callId, "Transcript waiter replaced"); + + const timeoutMs = this.config.transcriptTimeoutMs; + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + this.transcriptWaiters.delete(callId); + reject( + new Error(`Timed out waiting for transcript after ${timeoutMs}ms`), + ); + }, timeoutMs); + + this.transcriptWaiters.set(callId, { resolve, reject, timeout }); + }); } /** @@ -133,21 +370,343 @@ export class CallManager { callId: CallId, prompt: string, ): Promise<{ success: boolean; transcript?: string; error?: string }> { - return await continueCall(this.buildContext(), callId, prompt); + const call = this.activeCalls.get(callId); + if (!call) { + return { success: false, error: "Call not found" }; + } + + if (!this.provider || !call.providerCallId) { + return { success: false, error: "Call not connected" }; + } + + if (TerminalStates.has(call.state)) { + return { success: false, error: "Call has ended" }; + } + + try { + await this.speak(callId, prompt); + + call.state = "listening"; + this.persistCallRecord(call); + + await this.provider.startListening({ + callId, + providerCallId: call.providerCallId, + }); + + const transcript = await this.waitForFinalTranscript(callId); + + // Best-effort: stop listening after final transcript. + await this.provider.stopListening({ + callId, + providerCallId: call.providerCallId, + }); + + return { success: true, transcript }; + } catch (err) { + return { + success: false, + error: err instanceof Error ? err.message : String(err), + }; + } finally { + this.clearTranscriptWaiter(callId); + } } /** * End an active call. 
*/ async endCall(callId: CallId): Promise<{ success: boolean; error?: string }> { - return await endCall(this.buildContext(), callId); + const call = this.activeCalls.get(callId); + if (!call) { + return { success: false, error: "Call not found" }; + } + + if (!this.provider || !call.providerCallId) { + return { success: false, error: "Call not connected" }; + } + + if (TerminalStates.has(call.state)) { + return { success: true }; // Already ended + } + + try { + await this.provider.hangupCall({ + callId, + providerCallId: call.providerCallId, + reason: "hangup-bot", + }); + + call.state = "hangup-bot"; + call.endedAt = Date.now(); + call.endReason = "hangup-bot"; + this.persistCallRecord(call); + this.clearMaxDurationTimer(callId); + this.rejectTranscriptWaiter(callId, "Call ended: hangup-bot"); + this.activeCalls.delete(callId); + if (call.providerCallId) { + this.providerCallIdMap.delete(call.providerCallId); + } + + return { success: true }; + } catch (err) { + return { + success: false, + error: err instanceof Error ? err.message : String(err), + }; + } + } + + /** + * Check if an inbound call should be accepted based on policy. + */ + private shouldAcceptInbound(from: string | undefined): boolean { + const { inboundPolicy: policy, allowFrom } = this.config; + + switch (policy) { + case "disabled": + console.log("[voice-call] Inbound call rejected: policy is disabled"); + return false; + + case "open": + console.log("[voice-call] Inbound call accepted: policy is open"); + return true; + + case "allowlist": + case "pairing": { + const normalized = from?.replace(/\D/g, "") || ""; + const allowed = (allowFrom || []).some((num) => { + const normalizedAllow = num.replace(/\D/g, ""); + return ( + normalized.endsWith(normalizedAllow) || + normalizedAllow.endsWith(normalized) + ); + }); + const status = allowed ? "accepted" : "rejected"; + console.log( + `[voice-call] Inbound call ${status}: ${from} ${allowed ? 
"is in" : "not in"} allowlist`, + ); + return allowed; + } + + default: + return false; + } + } + + /** + * Create a call record for an inbound call. + */ + private createInboundCall( + providerCallId: string, + from: string, + to: string, + ): CallRecord { + const callId = crypto.randomUUID(); + + const callRecord: CallRecord = { + callId, + providerCallId, + provider: this.provider?.name || "twilio", + direction: "inbound", + state: "ringing", + from, + to, + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: { + initialMessage: + this.config.inboundGreeting || "Hello! How can I help you today?", + }, + }; + + this.activeCalls.set(callId, callRecord); + this.providerCallIdMap.set(providerCallId, callId); // Map providerCallId to internal callId + this.persistCallRecord(callRecord); + + console.log( + `[voice-call] Created inbound call record: ${callId} from ${from}`, + ); + return callRecord; + } + + /** + * Look up a call by either internal callId or providerCallId. + */ + private findCall(callIdOrProviderCallId: string): CallRecord | undefined { + // Try direct lookup by internal callId + const directCall = this.activeCalls.get(callIdOrProviderCallId); + if (directCall) return directCall; + + // Try lookup by providerCallId + return this.getCallByProviderCallId(callIdOrProviderCallId); } /** * Process a webhook event. 
*/ processEvent(event: NormalizedEvent): void { - processEvent(this.buildContext(), event); + // Idempotency check + if (this.processedEventIds.has(event.id)) { + return; + } + this.processedEventIds.add(event.id); + + let call = this.findCall(event.callId); + + // Handle inbound calls - create record if it doesn't exist + if (!call && event.direction === "inbound" && event.providerCallId) { + // Check if we should accept this inbound call + if (!this.shouldAcceptInbound(event.from)) { + // TODO: Could hang up the call here + return; + } + + // Create a new call record for this inbound call + call = this.createInboundCall( + event.providerCallId, + event.from || "unknown", + event.to || this.config.fromNumber || "unknown", + ); + + // Update the event's callId to use our internal ID + event.callId = call.callId; + } + + if (!call) { + // Still no call record - ignore event + return; + } + + // Update provider call ID if we got it + if (event.providerCallId && event.providerCallId !== call.providerCallId) { + const previousProviderCallId = call.providerCallId; + call.providerCallId = event.providerCallId; + this.providerCallIdMap.set(event.providerCallId, call.callId); + if (previousProviderCallId) { + const mapped = this.providerCallIdMap.get(previousProviderCallId); + if (mapped === call.callId) { + this.providerCallIdMap.delete(previousProviderCallId); + } + } + } + + // Track processed event + call.processedEventIds.push(event.id); + + // Process event based on type + switch (event.type) { + case "call.initiated": + this.transitionState(call, "initiated"); + break; + + case "call.ringing": + this.transitionState(call, "ringing"); + break; + + case "call.answered": + call.answeredAt = event.timestamp; + this.transitionState(call, "answered"); + // Start max duration timer when call is answered + this.startMaxDurationTimer(call.callId); + // Best-effort: speak initial message (for inbound greetings and outbound + // conversation mode) once the call is answered. 
+ this.maybeSpeakInitialMessageOnAnswered(call); + break; + + case "call.active": + this.transitionState(call, "active"); + break; + + case "call.speaking": + this.transitionState(call, "speaking"); + break; + + case "call.speech": + if (event.isFinal) { + this.addTranscriptEntry(call, "user", event.transcript); + this.resolveTranscriptWaiter(call.callId, event.transcript); + } + this.transitionState(call, "listening"); + break; + + case "call.ended": + call.endedAt = event.timestamp; + call.endReason = event.reason; + this.transitionState(call, event.reason as CallState); + this.clearMaxDurationTimer(call.callId); + this.rejectTranscriptWaiter(call.callId, `Call ended: ${event.reason}`); + this.activeCalls.delete(call.callId); + if (call.providerCallId) { + this.providerCallIdMap.delete(call.providerCallId); + } + break; + + case "call.error": + if (!event.retryable) { + call.endedAt = event.timestamp; + call.endReason = "error"; + this.transitionState(call, "error"); + this.clearMaxDurationTimer(call.callId); + this.rejectTranscriptWaiter( + call.callId, + `Call error: ${event.error}`, + ); + this.activeCalls.delete(call.callId); + if (call.providerCallId) { + this.providerCallIdMap.delete(call.providerCallId); + } + } + break; + } + + this.persistCallRecord(call); + } + + private maybeSpeakInitialMessageOnAnswered(call: CallRecord): void { + const initialMessage = + typeof call.metadata?.initialMessage === "string" + ? call.metadata.initialMessage.trim() + : ""; + + if (!initialMessage) return; + + // For outbound notify mode, we already use inline TwiML (provider-specific) to + // deliver the message and hang up; do not double-speak. + const mode = call.metadata?.mode as CallMode | undefined; + if (call.direction === "outbound" && mode === "notify") return; + + if (!this.provider || !call.providerCallId) return; + + // Twilio has provider-specific state for speaking ( fallback) and can + // fail for inbound calls; keep existing Twilio behavior unchanged. 
+ if (this.provider.name === "twilio") return; + + // Clear the initial message so it only plays once. + if (call.metadata) { + delete call.metadata.initialMessage; + } + this.persistCallRecord(call); + + void this.provider + .playTts({ + callId: call.callId, + providerCallId: call.providerCallId, + text: initialMessage, + voice: this.config.tts.voice, + }) + .then(() => { + this.addTranscriptEntry(call, "bot", initialMessage); + this.persistCallRecord(call); + }) + .catch((err) => { + console.warn( + `[voice-call] Failed to speak initial message on answered: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + }); } /** @@ -161,11 +720,20 @@ export class CallManager { * Get an active call by provider call ID (e.g., Twilio CallSid). */ getCallByProviderCallId(providerCallId: string): CallRecord | undefined { - return getCallByProviderCallId({ - activeCalls: this.activeCalls, - providerCallIdMap: this.providerCallIdMap, - providerCallId, - }); + // Fast path: use the providerCallIdMap for O(1) lookup + const callId = this.providerCallIdMap.get(providerCallId); + if (callId) { + return this.activeCalls.get(callId); + } + + // Fallback: linear search for cases where map wasn't populated + // (e.g., providerCallId set directly on call record) + for (const call of this.activeCalls.values()) { + if (call.providerCallId === providerCallId) { + return call; + } + } + return undefined; } /** @@ -179,6 +747,156 @@ export class CallManager { * Get call history (from persisted logs). 
*/ async getCallHistory(limit = 50): Promise { - return await getCallHistoryFromStore(this.storePath, limit); + const logPath = path.join(this.storePath, "calls.jsonl"); + + try { + await fsp.access(logPath); + } catch { + return []; + } + + const content = await fsp.readFile(logPath, "utf-8"); + const lines = content.trim().split("\n").filter(Boolean); + const calls: CallRecord[] = []; + + // Parse last N lines + for (const line of lines.slice(-limit)) { + try { + const parsed = CallRecordSchema.parse(JSON.parse(line)); + calls.push(parsed); + } catch { + // Skip invalid lines + } + } + + return calls; + } + + // States that can cycle during multi-turn conversations + private static readonly ConversationStates = new Set([ + "speaking", + "listening", + ]); + + // Non-terminal state order for monotonic transitions + private static readonly StateOrder: readonly CallState[] = [ + "initiated", + "ringing", + "answered", + "active", + "speaking", + "listening", + ]; + + /** + * Transition call state with monotonic enforcement. + */ + private transitionState(call: CallRecord, newState: CallState): void { + // No-op for same state or already terminal + if (call.state === newState || TerminalStates.has(call.state)) return; + + // Terminal states can always be reached from non-terminal + if (TerminalStates.has(newState)) { + call.state = newState; + return; + } + + // Allow cycling between speaking and listening (multi-turn conversations) + if ( + CallManager.ConversationStates.has(call.state) && + CallManager.ConversationStates.has(newState) + ) { + call.state = newState; + return; + } + + // Only allow forward transitions in state order + const currentIndex = CallManager.StateOrder.indexOf(call.state); + const newIndex = CallManager.StateOrder.indexOf(newState); + + if (newIndex > currentIndex) { + call.state = newState; + } + } + + /** + * Add an entry to the call transcript. 
+ */ + private addTranscriptEntry( + call: CallRecord, + speaker: "bot" | "user", + text: string, + ): void { + const entry: TranscriptEntry = { + timestamp: Date.now(), + speaker, + text, + isFinal: true, + }; + call.transcript.push(entry); + } + + /** + * Persist a call record to disk (fire-and-forget async). + */ + private persistCallRecord(call: CallRecord): void { + const logPath = path.join(this.storePath, "calls.jsonl"); + const line = `${JSON.stringify(call)}\n`; + // Fire-and-forget async write to avoid blocking event loop + fsp.appendFile(logPath, line).catch((err) => { + console.error("[voice-call] Failed to persist call record:", err); + }); + } + + /** + * Load active calls from persistence (for crash recovery). + * Uses streaming to handle large log files efficiently. + */ + private loadActiveCalls(): void { + const logPath = path.join(this.storePath, "calls.jsonl"); + if (!fs.existsSync(logPath)) return; + + // Read file synchronously and parse lines + const content = fs.readFileSync(logPath, "utf-8"); + const lines = content.split("\n"); + + // Build map of latest state per call + const callMap = new Map(); + + for (const line of lines) { + if (!line.trim()) continue; + try { + const call = CallRecordSchema.parse(JSON.parse(line)); + callMap.set(call.callId, call); + } catch { + // Skip invalid lines + } + } + + // Only keep non-terminal calls + for (const [callId, call] of callMap) { + if (!TerminalStates.has(call.state)) { + this.activeCalls.set(callId, call); + // Populate providerCallId mapping for lookups + if (call.providerCallId) { + this.providerCallIdMap.set(call.providerCallId, callId); + } + // Populate processed event IDs + for (const eventId of call.processedEventIds) { + this.processedEventIds.add(eventId); + } + } + } + } + + /** + * Generate TwiML for notify mode (speak message and hang up). 
+ */ + private generateNotifyTwiml(message: string, voice: string): string { + return ` + + ${escapeXml(message)} + +`; } } diff --git a/extensions/voice-call/src/providers/index.ts b/extensions/voice-call/src/providers/index.ts index d322174ab..c8183622e 100644 --- a/extensions/voice-call/src/providers/index.ts +++ b/extensions/voice-call/src/providers/index.ts @@ -7,3 +7,4 @@ export { } from "./stt-openai-realtime.js"; export { TelnyxProvider } from "./telnyx.js"; export { TwilioProvider } from "./twilio.js"; +export { PlivoProvider } from "./plivo.js"; diff --git a/extensions/voice-call/src/providers/plivo.test.ts b/extensions/voice-call/src/providers/plivo.test.ts new file mode 100644 index 000000000..0674a7dd2 --- /dev/null +++ b/extensions/voice-call/src/providers/plivo.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, it } from "vitest"; + +import { PlivoProvider } from "./plivo.js"; + +describe("PlivoProvider", () => { + it("parses answer callback into call.answered and returns keep-alive XML", () => { + const provider = new PlivoProvider({ + authId: "MA000000000000000000", + authToken: "test-token", + }); + + const result = provider.parseWebhookEvent({ + headers: { host: "example.com" }, + rawBody: + "CallUUID=call-uuid&CallStatus=in-progress&Direction=outbound&From=%2B15550000000&To=%2B15550000001&Event=StartApp", + url: "https://example.com/voice/webhook?provider=plivo&flow=answer&callId=internal-call-id", + method: "POST", + query: { provider: "plivo", flow: "answer", callId: "internal-call-id" }, + }); + + expect(result.events).toHaveLength(1); + expect(result.events[0]?.type).toBe("call.answered"); + expect(result.events[0]?.callId).toBe("internal-call-id"); + expect(result.events[0]?.providerCallId).toBe("call-uuid"); + expect(result.providerResponseBody).toContain("(); + + // Used for transfer URLs and GetInput action URLs. 
+ private callIdToWebhookUrl = new Map(); + private callUuidToWebhookUrl = new Map(); + + private pendingSpeakByCallId = new Map(); + private pendingListenByCallId = new Map(); + + constructor(config: PlivoConfig, options: PlivoProviderOptions = {}) { + if (!config.authId) { + throw new Error("Plivo Auth ID is required"); + } + if (!config.authToken) { + throw new Error("Plivo Auth Token is required"); + } + + this.authId = config.authId; + this.authToken = config.authToken; + this.baseUrl = `https://api.plivo.com/v1/Account/${this.authId}`; + this.options = options; + } + + private async apiRequest(params: { + method: "GET" | "POST" | "DELETE"; + endpoint: string; + body?: Record; + allowNotFound?: boolean; + }): Promise { + const { method, endpoint, body, allowNotFound } = params; + const response = await fetch(`${this.baseUrl}${endpoint}`, { + method, + headers: { + Authorization: `Basic ${Buffer.from(`${this.authId}:${this.authToken}`).toString("base64")}`, + "Content-Type": "application/json", + }, + body: body ? JSON.stringify(body) : undefined, + }); + + if (!response.ok) { + if (allowNotFound && response.status === 404) { + return undefined as T; + } + const errorText = await response.text(); + throw new Error(`Plivo API error: ${response.status} ${errorText}`); + } + + const text = await response.text(); + return text ? (JSON.parse(text) as T) : (undefined as T); + } + + verifyWebhook(ctx: WebhookContext): WebhookVerificationResult { + const result = verifyPlivoWebhook(ctx, this.authToken, { + publicUrl: this.options.publicUrl, + skipVerification: this.options.skipVerification, + }); + + if (!result.ok) { + console.warn(`[plivo] Webhook verification failed: ${result.reason}`); + } + + return { ok: result.ok, reason: result.reason }; + } + + parseWebhookEvent(ctx: WebhookContext): ProviderWebhookParseResult { + const flow = + typeof ctx.query?.flow === "string" ? 
ctx.query.flow.trim() : ""; + + const parsed = this.parseBody(ctx.rawBody); + if (!parsed) { + return { events: [], statusCode: 400 }; + } + + // Keep providerCallId mapping for later call control. + const callUuid = parsed.get("CallUUID") || undefined; + if (callUuid) { + const webhookBase = PlivoProvider.baseWebhookUrlFromCtx(ctx); + if (webhookBase) { + this.callUuidToWebhookUrl.set(callUuid, webhookBase); + } + } + + // Special flows that exist only to return Plivo XML (no events). + if (flow === "xml-speak") { + const callId = this.getCallIdFromQuery(ctx); + const pending = callId ? this.pendingSpeakByCallId.get(callId) : undefined; + if (callId) this.pendingSpeakByCallId.delete(callId); + + const xml = pending + ? PlivoProvider.xmlSpeak(pending.text, pending.locale) + : PlivoProvider.xmlKeepAlive(); + return { + events: [], + providerResponseBody: xml, + providerResponseHeaders: { "Content-Type": "text/xml" }, + statusCode: 200, + }; + } + + if (flow === "xml-listen") { + const callId = this.getCallIdFromQuery(ctx); + const pending = callId + ? this.pendingListenByCallId.get(callId) + : undefined; + if (callId) this.pendingListenByCallId.delete(callId); + + const actionUrl = this.buildActionUrl(ctx, { + flow: "getinput", + callId, + }); + + const xml = + actionUrl && callId + ? PlivoProvider.xmlGetInputSpeech({ + actionUrl, + language: pending?.language, + }) + : PlivoProvider.xmlKeepAlive(); + + return { + events: [], + providerResponseBody: xml, + providerResponseHeaders: { "Content-Type": "text/xml" }, + statusCode: 200, + }; + } + + // Normal events. + const callIdFromQuery = this.getCallIdFromQuery(ctx); + const event = this.normalizeEvent(parsed, callIdFromQuery); + + return { + events: event ? [event] : [], + providerResponseBody: + flow === "answer" || flow === "getinput" + ? 
PlivoProvider.xmlKeepAlive() + : PlivoProvider.xmlEmpty(), + providerResponseHeaders: { "Content-Type": "text/xml" }, + statusCode: 200, + }; + } + + private normalizeEvent( + params: URLSearchParams, + callIdOverride?: string, + ): NormalizedEvent | null { + const callUuid = params.get("CallUUID") || ""; + const requestUuid = params.get("RequestUUID") || ""; + + if (requestUuid && callUuid) { + this.requestUuidToCallUuid.set(requestUuid, callUuid); + } + + const direction = params.get("Direction"); + const from = params.get("From") || undefined; + const to = params.get("To") || undefined; + const callStatus = params.get("CallStatus"); + + const baseEvent = { + id: crypto.randomUUID(), + callId: callIdOverride || callUuid || requestUuid, + providerCallId: callUuid || requestUuid || undefined, + timestamp: Date.now(), + direction: + direction === "inbound" + ? ("inbound" as const) + : direction === "outbound" + ? ("outbound" as const) + : undefined, + from, + to, + }; + + const digits = params.get("Digits"); + if (digits) { + return { ...baseEvent, type: "call.dtmf", digits }; + } + + const transcript = PlivoProvider.extractTranscript(params); + if (transcript) { + return { + ...baseEvent, + type: "call.speech", + transcript, + isFinal: true, + }; + } + + // Call lifecycle. + if (callStatus === "ringing") { + return { ...baseEvent, type: "call.ringing" }; + } + + if (callStatus === "in-progress") { + return { ...baseEvent, type: "call.answered" }; + } + + if ( + callStatus === "completed" || + callStatus === "busy" || + callStatus === "no-answer" || + callStatus === "failed" + ) { + return { + ...baseEvent, + type: "call.ended", + reason: + callStatus === "completed" + ? "completed" + : callStatus === "busy" + ? "busy" + : callStatus === "no-answer" + ? "no-answer" + : "failed", + }; + } + + // Plivo will call our answer_url when the call is answered; if we don't have + // a CallStatus for some reason, treat it as answered so the call can proceed. 
+ if (params.get("Event") === "StartApp" && callUuid) { + return { ...baseEvent, type: "call.answered" }; + } + + return null; + } + + async initiateCall(input: InitiateCallInput): Promise { + const webhookUrl = new URL(input.webhookUrl); + webhookUrl.searchParams.set("provider", "plivo"); + webhookUrl.searchParams.set("callId", input.callId); + + const answerUrl = new URL(webhookUrl); + answerUrl.searchParams.set("flow", "answer"); + + const hangupUrl = new URL(webhookUrl); + hangupUrl.searchParams.set("flow", "hangup"); + + this.callIdToWebhookUrl.set(input.callId, input.webhookUrl); + + const ringTimeoutSec = this.options.ringTimeoutSec ?? 30; + + const result = await this.apiRequest({ + method: "POST", + endpoint: "/Call/", + body: { + from: PlivoProvider.normalizeNumber(input.from), + to: PlivoProvider.normalizeNumber(input.to), + answer_url: answerUrl.toString(), + answer_method: "POST", + hangup_url: hangupUrl.toString(), + hangup_method: "POST", + // Plivo's API uses `hangup_on_ring` for outbound ring timeout. + hangup_on_ring: ringTimeoutSec, + }, + }); + + const requestUuid = Array.isArray(result.request_uuid) + ? result.request_uuid[0] + : result.request_uuid; + if (!requestUuid) { + throw new Error("Plivo call create returned no request_uuid"); + } + + return { providerCallId: requestUuid, status: "initiated" }; + } + + async hangupCall(input: HangupCallInput): Promise { + const callUuid = this.requestUuidToCallUuid.get(input.providerCallId); + if (callUuid) { + await this.apiRequest({ + method: "DELETE", + endpoint: `/Call/${callUuid}/`, + allowNotFound: true, + }); + return; + } + + // Best-effort: try hangup (call UUID), then cancel (request UUID). 
+ await this.apiRequest({ + method: "DELETE", + endpoint: `/Call/${input.providerCallId}/`, + allowNotFound: true, + }); + await this.apiRequest({ + method: "DELETE", + endpoint: `/Request/${input.providerCallId}/`, + allowNotFound: true, + }); + } + + async playTts(input: PlayTtsInput): Promise { + const callUuid = this.requestUuidToCallUuid.get(input.providerCallId) ?? + input.providerCallId; + const webhookBase = + this.callUuidToWebhookUrl.get(callUuid) || + this.callIdToWebhookUrl.get(input.callId); + if (!webhookBase) { + throw new Error("Missing webhook URL for this call (provider state missing)"); + } + + if (!callUuid) { + throw new Error("Missing Plivo CallUUID for playTts"); + } + + const transferUrl = new URL(webhookBase); + transferUrl.searchParams.set("provider", "plivo"); + transferUrl.searchParams.set("flow", "xml-speak"); + transferUrl.searchParams.set("callId", input.callId); + + this.pendingSpeakByCallId.set(input.callId, { + text: input.text, + locale: input.locale, + }); + + await this.apiRequest({ + method: "POST", + endpoint: `/Call/${callUuid}/`, + body: { + legs: "aleg", + aleg_url: transferUrl.toString(), + aleg_method: "POST", + }, + }); + } + + async startListening(input: StartListeningInput): Promise { + const callUuid = this.requestUuidToCallUuid.get(input.providerCallId) ?? 
+ input.providerCallId; + const webhookBase = + this.callUuidToWebhookUrl.get(callUuid) || + this.callIdToWebhookUrl.get(input.callId); + if (!webhookBase) { + throw new Error("Missing webhook URL for this call (provider state missing)"); + } + + if (!callUuid) { + throw new Error("Missing Plivo CallUUID for startListening"); + } + + const transferUrl = new URL(webhookBase); + transferUrl.searchParams.set("provider", "plivo"); + transferUrl.searchParams.set("flow", "xml-listen"); + transferUrl.searchParams.set("callId", input.callId); + + this.pendingListenByCallId.set(input.callId, { + language: input.language, + }); + + await this.apiRequest({ + method: "POST", + endpoint: `/Call/${callUuid}/`, + body: { + legs: "aleg", + aleg_url: transferUrl.toString(), + aleg_method: "POST", + }, + }); + } + + async stopListening(_input: StopListeningInput): Promise { + // GetInput ends automatically when speech ends. + } + + private static normalizeNumber(numberOrSip: string): string { + const trimmed = numberOrSip.trim(); + if (trimmed.toLowerCase().startsWith("sip:")) return trimmed; + return trimmed.startsWith("+") ? trimmed.slice(1) : trimmed; + } + + private static xmlEmpty(): string { + return ``; + } + + private static xmlKeepAlive(): string { + return ` + + +`; + } + + private static xmlSpeak(text: string, locale?: string): string { + const language = locale || "en-US"; + return ` + + ${escapeXml(text)} + +`; + } + + private static xmlGetInputSpeech(params: { + actionUrl: string; + language?: string; + }): string { + const language = params.language || "en-US"; + return ` + + + + +`; + } + + private getCallIdFromQuery(ctx: WebhookContext): string | undefined { + const callId = + typeof ctx.query?.callId === "string" && ctx.query.callId.trim() + ? 
ctx.query.callId.trim() + : undefined; + return callId || undefined; + } + + private buildActionUrl( + ctx: WebhookContext, + opts: { flow: string; callId?: string }, + ): string | null { + const base = PlivoProvider.baseWebhookUrlFromCtx(ctx); + if (!base) return null; + + const u = new URL(base); + u.searchParams.set("provider", "plivo"); + u.searchParams.set("flow", opts.flow); + if (opts.callId) u.searchParams.set("callId", opts.callId); + return u.toString(); + } + + private static baseWebhookUrlFromCtx(ctx: WebhookContext): string | null { + try { + const u = new URL(reconstructWebhookUrl(ctx)); + return `${u.origin}${u.pathname}`; + } catch { + return null; + } + } + + private parseBody(rawBody: string): URLSearchParams | null { + try { + return new URLSearchParams(rawBody); + } catch { + return null; + } + } + + private static extractTranscript(params: URLSearchParams): string | null { + const candidates = [ + "Speech", + "Transcription", + "TranscriptionText", + "SpeechResult", + "RecognizedSpeech", + "Text", + ] as const; + + for (const key of candidates) { + const value = params.get(key); + if (value && value.trim()) return value.trim(); + } + return null; + } +} + +type PlivoCreateCallResponse = { + api_id?: string; + message?: string; + request_uuid?: string | string[]; +}; diff --git a/extensions/voice-call/src/runtime.ts b/extensions/voice-call/src/runtime.ts index 35e140252..08e7e5de2 100644 --- a/extensions/voice-call/src/runtime.ts +++ b/extensions/voice-call/src/runtime.ts @@ -4,6 +4,7 @@ import { validateProviderConfig } from "./config.js"; import { CallManager } from "./manager.js"; import type { VoiceCallProvider } from "./providers/base.js"; import { MockProvider } from "./providers/mock.js"; +import { PlivoProvider } from "./providers/plivo.js"; import { TelnyxProvider } from "./providers/telnyx.js"; import { OpenAITTSProvider } from "./providers/tts-openai.js"; import { TwilioProvider } from "./providers/twilio.js"; @@ -56,6 +57,18 @@ 
function resolveProvider(config: VoiceCallConfig): VoiceCallProvider { : undefined, }, ); + case "plivo": + return new PlivoProvider( + { + authId: config.plivo?.authId ?? process.env.PLIVO_AUTH_ID, + authToken: config.plivo?.authToken ?? process.env.PLIVO_AUTH_TOKEN, + }, + { + publicUrl: config.publicUrl, + skipVerification: config.skipSignatureVerification, + ringTimeoutSec: Math.max(1, Math.floor(config.ringTimeoutMs / 1000)), + }, + ); case "mock": return new MockProvider(); default: diff --git a/extensions/voice-call/src/types.ts b/extensions/voice-call/src/types.ts index 8c124091f..7f3928778 100644 --- a/extensions/voice-call/src/types.ts +++ b/extensions/voice-call/src/types.ts @@ -6,7 +6,7 @@ import type { CallMode } from "./config.js"; // Provider Identifiers // ----------------------------------------------------------------------------- -export const ProviderNameSchema = z.enum(["telnyx", "twilio", "mock"]); +export const ProviderNameSchema = z.enum(["telnyx", "twilio", "plivo", "mock"]); export type ProviderName = z.infer; // ----------------------------------------------------------------------------- diff --git a/extensions/voice-call/src/webhook-security.test.ts b/extensions/voice-call/src/webhook-security.test.ts new file mode 100644 index 000000000..058a760d3 --- /dev/null +++ b/extensions/voice-call/src/webhook-security.test.ts @@ -0,0 +1,156 @@ +import crypto from "node:crypto"; + +import { describe, expect, it } from "vitest"; + +import { verifyPlivoWebhook } from "./webhook-security.js"; + +function canonicalizeBase64(input: string): string { + return Buffer.from(input, "base64").toString("base64"); +} + +function plivoV2Signature(params: { + authToken: string; + urlNoQuery: string; + nonce: string; +}): string { + const digest = crypto + .createHmac("sha256", params.authToken) + .update(params.urlNoQuery + params.nonce) + .digest("base64"); + return canonicalizeBase64(digest); +} + +function plivoV3Signature(params: { + authToken: string; + 
urlWithQuery: string; + postBody: string; + nonce: string; +}): string { + const u = new URL(params.urlWithQuery); + const baseNoQuery = `${u.protocol}//${u.host}${u.pathname}`; + const queryPairs: Array<[string, string]> = []; + for (const [k, v] of u.searchParams.entries()) queryPairs.push([k, v]); + + const queryMap = new Map(); + for (const [k, v] of queryPairs) { + queryMap.set(k, (queryMap.get(k) ?? []).concat(v)); + } + + const sortedQuery = Array.from(queryMap.keys()) + .sort() + .flatMap((k) => + [...(queryMap.get(k) ?? [])].sort().map((v) => `${k}=${v}`), + ) + .join("&"); + + const postParams = new URLSearchParams(params.postBody); + const postMap = new Map(); + for (const [k, v] of postParams.entries()) { + postMap.set(k, (postMap.get(k) ?? []).concat(v)); + } + + const sortedPost = Array.from(postMap.keys()) + .sort() + .flatMap((k) => [...(postMap.get(k) ?? [])].sort().map((v) => `${k}${v}`)) + .join(""); + + const hasPost = sortedPost.length > 0; + let baseUrl = baseNoQuery; + if (sortedQuery.length > 0 || hasPost) { + baseUrl = `${baseNoQuery}?${sortedQuery}`; + } + if (sortedQuery.length > 0 && hasPost) { + baseUrl = `${baseUrl}.`; + } + baseUrl = `${baseUrl}${sortedPost}`; + + const digest = crypto + .createHmac("sha256", params.authToken) + .update(`${baseUrl}.${params.nonce}`) + .digest("base64"); + return canonicalizeBase64(digest); +} + +describe("verifyPlivoWebhook", () => { + it("accepts valid V2 signature", () => { + const authToken = "test-auth-token"; + const nonce = "nonce-123"; + + const ctxUrl = "http://local/voice/webhook?flow=answer&callId=abc"; + const verificationUrl = "https://example.com/voice/webhook"; + const signature = plivoV2Signature({ + authToken, + urlNoQuery: verificationUrl, + nonce, + }); + + const result = verifyPlivoWebhook( + { + headers: { + host: "example.com", + "x-forwarded-proto": "https", + "x-plivo-signature-v2": signature, + "x-plivo-signature-v2-nonce": nonce, + }, + rawBody: 
"CallUUID=uuid&CallStatus=in-progress", + url: ctxUrl, + method: "POST", + query: { flow: "answer", callId: "abc" }, + }, + authToken, + ); + + expect(result.ok).toBe(true); + expect(result.version).toBe("v2"); + }); + + it("accepts valid V3 signature (including multi-signature header)", () => { + const authToken = "test-auth-token"; + const nonce = "nonce-456"; + + const urlWithQuery = "https://example.com/voice/webhook?flow=answer&callId=abc"; + const postBody = "CallUUID=uuid&CallStatus=in-progress&From=%2B15550000000"; + + const good = plivoV3Signature({ + authToken, + urlWithQuery, + postBody, + nonce, + }); + + const result = verifyPlivoWebhook( + { + headers: { + host: "example.com", + "x-forwarded-proto": "https", + "x-plivo-signature-v3": `bad, ${good}`, + "x-plivo-signature-v3-nonce": nonce, + }, + rawBody: postBody, + url: urlWithQuery, + method: "POST", + query: { flow: "answer", callId: "abc" }, + }, + authToken, + ); + + expect(result.ok).toBe(true); + expect(result.version).toBe("v3"); + }); + + it("rejects missing signatures", () => { + const result = verifyPlivoWebhook( + { + headers: { host: "example.com", "x-forwarded-proto": "https" }, + rawBody: "", + url: "https://example.com/voice/webhook", + method: "POST", + }, + "token", + ); + + expect(result.ok).toBe(false); + expect(result.reason).toMatch(/Missing Plivo signature headers/); + }); +}); + diff --git a/extensions/voice-call/src/webhook-security.ts b/extensions/voice-call/src/webhook-security.ts index 0cfc491de..880eace79 100644 --- a/extensions/voice-call/src/webhook-security.ts +++ b/extensions/voice-call/src/webhook-security.ts @@ -195,3 +195,233 @@ export function verifyTwilioWebhook( isNgrokFreeTier, }; } + +// ----------------------------------------------------------------------------- +// Plivo webhook verification +// ----------------------------------------------------------------------------- + +/** + * Result of Plivo webhook verification with detailed info. 
+ */ +export interface PlivoVerificationResult { + ok: boolean; + reason?: string; + verificationUrl?: string; + /** Signature version used for verification */ + version?: "v3" | "v2"; +} + +function normalizeSignatureBase64(input: string): string { + // Canonicalize base64 to match Plivo SDK behavior (decode then re-encode). + return Buffer.from(input, "base64").toString("base64"); +} + +function getBaseUrlNoQuery(url: string): string { + const u = new URL(url); + return `${u.protocol}//${u.host}${u.pathname}`; +} + +function timingSafeEqualString(a: string, b: string): boolean { + if (a.length !== b.length) { + const dummy = Buffer.from(a); + crypto.timingSafeEqual(dummy, dummy); + return false; + } + return crypto.timingSafeEqual(Buffer.from(a), Buffer.from(b)); +} + +function validatePlivoV2Signature(params: { + authToken: string; + signature: string; + nonce: string; + url: string; +}): boolean { + const baseUrl = getBaseUrlNoQuery(params.url); + const digest = crypto + .createHmac("sha256", params.authToken) + .update(baseUrl + params.nonce) + .digest("base64"); + const expected = normalizeSignatureBase64(digest); + const provided = normalizeSignatureBase64(params.signature); + return timingSafeEqualString(expected, provided); +} + +type PlivoParamMap = Record; + +function toParamMapFromSearchParams(sp: URLSearchParams): PlivoParamMap { + const map: PlivoParamMap = {}; + for (const [key, value] of sp.entries()) { + if (!map[key]) map[key] = []; + map[key].push(value); + } + return map; +} + +function sortedQueryString(params: PlivoParamMap): string { + const parts: string[] = []; + for (const key of Object.keys(params).sort()) { + const values = [...params[key]].sort(); + for (const value of values) { + parts.push(`${key}=${value}`); + } + } + return parts.join("&"); +} + +function sortedParamsString(params: PlivoParamMap): string { + const parts: string[] = []; + for (const key of Object.keys(params).sort()) { + const values = [...params[key]].sort(); + for 
(const value of values) { + parts.push(`${key}${value}`); + } + } + return parts.join(""); +} + +function constructPlivoV3BaseUrl(params: { + method: "GET" | "POST"; + url: string; + postParams: PlivoParamMap; +}): string { + const hasPostParams = Object.keys(params.postParams).length > 0; + const u = new URL(params.url); + const baseNoQuery = `${u.protocol}//${u.host}${u.pathname}`; + + const queryMap = toParamMapFromSearchParams(u.searchParams); + const queryString = sortedQueryString(queryMap); + + // In the Plivo V3 algorithm, the query portion is always sorted, and if we + // have POST params we add a '.' separator after the query string. + let baseUrl = baseNoQuery; + if (queryString.length > 0 || hasPostParams) { + baseUrl = `${baseNoQuery}?${queryString}`; + } + if (queryString.length > 0 && hasPostParams) { + baseUrl = `${baseUrl}.`; + } + + if (params.method === "GET") { + return baseUrl; + } + + return baseUrl + sortedParamsString(params.postParams); +} + +function validatePlivoV3Signature(params: { + authToken: string; + signatureHeader: string; + nonce: string; + method: "GET" | "POST"; + url: string; + postParams: PlivoParamMap; +}): boolean { + const baseUrl = constructPlivoV3BaseUrl({ + method: params.method, + url: params.url, + postParams: params.postParams, + }); + + const hmacBase = `${baseUrl}.${params.nonce}`; + const digest = crypto + .createHmac("sha256", params.authToken) + .update(hmacBase) + .digest("base64"); + const expected = normalizeSignatureBase64(digest); + + // Header can contain multiple signatures separated by commas. + const provided = params.signatureHeader + .split(",") + .map((s) => s.trim()) + .filter(Boolean) + .map((s) => normalizeSignatureBase64(s)); + + for (const sig of provided) { + if (timingSafeEqualString(expected, sig)) return true; + } + return false; +} + +/** + * Verify Plivo webhooks using V3 signature if present; fall back to V2. 
+ * + * Header names (case-insensitive; Node provides lower-case keys): + * - V3: X-Plivo-Signature-V3 / X-Plivo-Signature-V3-Nonce + * - V2: X-Plivo-Signature-V2 / X-Plivo-Signature-V2-Nonce + */ +export function verifyPlivoWebhook( + ctx: WebhookContext, + authToken: string, + options?: { + /** Override the public URL origin (host) used for verification */ + publicUrl?: string; + /** Skip verification entirely (only for development) */ + skipVerification?: boolean; + }, +): PlivoVerificationResult { + if (options?.skipVerification) { + return { ok: true, reason: "verification skipped (dev mode)" }; + } + + const signatureV3 = getHeader(ctx.headers, "x-plivo-signature-v3"); + const nonceV3 = getHeader(ctx.headers, "x-plivo-signature-v3-nonce"); + const signatureV2 = getHeader(ctx.headers, "x-plivo-signature-v2"); + const nonceV2 = getHeader(ctx.headers, "x-plivo-signature-v2-nonce"); + + const reconstructed = reconstructWebhookUrl(ctx); + let verificationUrl = reconstructed; + if (options?.publicUrl) { + try { + const req = new URL(reconstructed); + const base = new URL(options.publicUrl); + base.pathname = req.pathname; + base.search = req.search; + verificationUrl = base.toString(); + } catch { + verificationUrl = reconstructed; + } + } + + if (signatureV3 && nonceV3) { + const postParams = toParamMapFromSearchParams(new URLSearchParams(ctx.rawBody)); + const ok = validatePlivoV3Signature({ + authToken, + signatureHeader: signatureV3, + nonce: nonceV3, + method: ctx.method, + url: verificationUrl, + postParams, + }); + return ok + ? { ok: true, version: "v3", verificationUrl } + : { + ok: false, + version: "v3", + verificationUrl, + reason: "Invalid Plivo V3 signature", + }; + } + + if (signatureV2 && nonceV2) { + const ok = validatePlivoV2Signature({ + authToken, + signature: signatureV2, + nonce: nonceV2, + url: verificationUrl, + }); + return ok + ? 
{ ok: true, version: "v2", verificationUrl } + : { + ok: false, + version: "v2", + verificationUrl, + reason: "Invalid Plivo V2 signature", + }; + } + + return { + ok: false, + reason: "Missing Plivo signature headers (V3 or V2)", + verificationUrl, + }; +} diff --git a/skills/voice-call/SKILL.md b/skills/voice-call/SKILL.md index a15ba8d69..6f24f50a8 100644 --- a/skills/voice-call/SKILL.md +++ b/skills/voice-call/SKILL.md @@ -6,7 +6,7 @@ metadata: {"clawdbot":{"emoji":"📞","skillKey":"voice-call","requires":{"confi # Voice Call -Use the voice-call plugin to start or inspect calls (Twilio, Telnyx, or mock). +Use the voice-call plugin to start or inspect calls (Twilio, Telnyx, Plivo, or mock). ## CLI @@ -31,4 +31,5 @@ Notes: - Plugin config lives under `plugins.entries.voice-call.config`. - Twilio config: `provider: "twilio"` + `twilio.accountSid/authToken` + `fromNumber`. - Telnyx config: `provider: "telnyx"` + `telnyx.apiKey/connectionId` + `fromNumber`. +- Plivo config: `provider: "plivo"` + `plivo.authId/authToken` + `fromNumber`. - Dev fallback: `provider: "mock"` (no network). diff --git a/vitest.config.ts b/vitest.config.ts index d06c620c6..f540e33f9 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -2,7 +2,11 @@ import { defineConfig } from "vitest/config"; export default defineConfig({ test: { - include: ["src/**/*.test.ts", "test/format-error.test.ts"], + include: [ + "src/**/*.test.ts", + "extensions/**/*.test.ts", + "test/format-error.test.ts", + ], setupFiles: ["test/setup.ts"], exclude: [ "dist/**",