Voice Call: add Plivo provider

This commit is contained in:
vrknetha
2026-01-13 17:16:02 +05:30
committed by Peter Steinberger
parent 0a1eeedc10
commit 946b0229e8
15 changed files with 1861 additions and 244 deletions

View File

@@ -1,5 +1,5 @@
---
summary: "Voice Call plugin: outbound and inbound calls via Twilio/Telnyx, with CLI, tools, and streaming"
summary: "Voice Call plugin: outbound + inbound calls via Twilio/Telnyx/Plivo (plugin install + config + CLI)"
read_when:
- You want to place an outbound voice call from Clawdbot
- You are configuring or developing the voice-call plugin
@@ -7,34 +7,26 @@ read_when:
# Voice Call (plugin)
Voice calls for Clawdbot. Use it to place outbound notifications, run multi-turn
phone conversations, and accept inbound calls with an explicit policy.
Voice calls for Clawdbot via a plugin. Supports outbound notifications and
multi-turn conversations with inbound policies.
Current providers:
- `twilio` (Programmable Voice + Media Streams)
- `telnyx` (Call Control v2)
- `plivo` (Voice API + XML transfer + GetInput speech)
- `mock` (dev/no network)
What you get:
- Outbound calls in notify or conversation mode
- Inbound calls with allowlist or open policies
- Provider webhooks with signature verification
- Optional streaming (Twilio Media Streams + OpenAI Realtime STT)
- CLI commands, a tool surface, and JSONL call logs
Quick mental model:
1. Install plugin
2. Restart Gateway
3. Configure `plugins.entries.voice-call.config`
4. Expose a public webhook URL
5. Call via `clawdbot voicecall ...` or the `voice_call` tool
- Install plugin
- Restart Gateway
- Configure under `plugins.entries.voice-call.config`
- Use `clawdbot voicecall ...` or the `voice_call` tool
## Where it runs (local vs remote)
The Voice Call plugin runs inside the Gateway process.
The Voice Call plugin runs **inside the Gateway process**.
If you use a remote Gateway, install and configure the plugin on the machine
running the Gateway, then restart the Gateway to load it.
If you use a remote Gateway, install/configure the plugin on the **machine running the Gateway**, then restart the Gateway to load it.
## Install
@@ -55,15 +47,9 @@ cd ./extensions/voice-call && pnpm install
Restart the Gateway afterwards.
Note: use `pnpm` for repo work. Bun is not recommended and can cause issues in
other Clawdbot channels (especially WhatsApp and Telegram).
## Config
## Config overview
All config lives under `plugins.entries.voice-call.config`. Phone numbers must
be in E.164 format (`+15550001234`).
Minimal example (Twilio outbound only):
Set config under `plugins.entries.voice-call.config`:
```json5
{
@@ -72,16 +58,39 @@ Minimal example (Twilio outbound only):
"voice-call": {
enabled: true,
config: {
provider: "twilio",
provider: "twilio", // or "telnyx" | "plivo" | "mock"
fromNumber: "+15550001234",
toNumber: "+15550005678",
twilio: {
accountSid: "ACxxxxxxxx",
authToken: "..."
},
serve: { port: 3334, bind: "127.0.0.1", path: "/voice/webhook" },
publicUrl: "https://example.ngrok.app/voice/webhook",
outbound: { defaultMode: "notify", notifyHangupDelaySec: 3 }
plivo: {
authId: "MAxxxxxxxxxxxxxxxxxxxx",
authToken: "..."
},
// Webhook server
serve: {
port: 3334,
path: "/voice/webhook"
},
// Public exposure (pick one)
// publicUrl: "https://example.ngrok.app/voice/webhook",
// tunnel: { provider: "ngrok" },
// tailscale: { mode: "funnel", path: "/voice/webhook" }
outbound: {
defaultMode: "notify" // notify | conversation
},
streaming: {
enabled: true,
streamPath: "/voice/stream"
}
}
}
}
@@ -90,178 +99,27 @@ Minimal example (Twilio outbound only):
```
Notes:
- Twilio/Telnyx require a publicly reachable webhook URL.
- Twilio/Telnyx require a **publicly reachable** webhook URL.
- Plivo requires a **publicly reachable** webhook URL.
- `mock` is a local dev provider (no network calls).
- `skipSignatureVerification` is for local testing only.
## Public URL and webhook exposure
## Inbound calls
Providers send webhooks from the public internet. Your `serve.path` must be
reachable from them.
You have three options:
- `publicUrl`: you already have a public HTTPS URL pointing at the Gateway host.
- `tunnel`: use ngrok or Tailscale (recommended for quick setup).
- `tailscale`: legacy Tailscale serve/funnel config (still supported, but
`tunnel` is preferred).
Example using ngrok:
Inbound policy defaults to `disabled`. To enable inbound calls, set:
```json5
{
tunnel: {
provider: "ngrok",
ngrokAuthToken: "..."
}
inboundPolicy: "allowlist",
allowFrom: ["+15550001234"],
inboundGreeting: "Hello! How can I help?"
}
```
Example using Tailscale Funnel:
```json5
{
tunnel: { provider: "tailscale-funnel" }
}
```
CLI helper (Tailscale only):
```bash
clawdbot voicecall expose --mode funnel
```
If you use Tailscale Serve without Funnel, the URL is private to your tailnet,
so Twilio/Telnyx will not be able to reach it.
## Providers
### Twilio
Twilio uses Programmable Voice with optional Media Streams for real-time audio.
Required config:
- `twilio.accountSid` and `twilio.authToken`
- (or `TWILIO_ACCOUNT_SID` / `TWILIO_AUTH_TOKEN`)
- A Twilio phone number that can reach your webhook
Inbound setup:
- In the Twilio Console for your phone number, set the Voice webhook to your
public `serve.path` URL (HTTP POST).
Outbound setup:
- Outbound calls are created via Twilio API; the plugin supplies the webhook URL
per call.
Streaming (optional, Twilio only):
- Enable `streaming.enabled` and set `streaming.streamPath`
- Provide `OPENAI_API_KEY` or `streaming.openaiApiKey`
- The stream WebSocket URL is derived from your `publicUrl` host + `streamPath`
(https -> wss)
Signature verification:
- Webhooks are verified by default.
- If you are using ngrok free tier, leave `tunnel.allowNgrokFreeTier` as `true`
so URL rewriting does not break verification.
- Use `skipSignatureVerification` only for local dev.
### Telnyx
Telnyx uses Call Control v2.
Required config:
- `telnyx.apiKey` and `telnyx.connectionId`
- (or `TELNYX_API_KEY` / `TELNYX_CONNECTION_ID`)
Inbound setup:
- In your Telnyx Call Control App, set the webhook URL to your public
`serve.path`.
Signature verification:
- Set `telnyx.publicKey` to enable Ed25519 signature verification.
- If you do not set a public key, webhooks are accepted without verification
(not recommended for production).
Transcription:
- Telnyx uses its own transcription events for `continue` responses.
### Mock (dev)
`mock` is for local testing and does not make network calls.
## Call modes
Outbound calls support two modes:
- `notify`: speak a message and auto-hangup after `notifyHangupDelaySec`.
- `conversation`: keep the call open and allow back-and-forth.
Examples:
```bash
clawdbot voicecall call --to "+15555550123" --message "Hello" --mode notify
clawdbot voicecall call --to "+15555550123" --message "Ready to talk?" --mode conversation
```
## Inbound calls and policy
Inbound calls are blocked by default.
Policies:
- `disabled`: block all inbound calls
- `allowlist`: allow only numbers in `allowFrom`
- `pairing`: currently behaves like `allowlist`
- `open`: accept all inbound calls
Inbound greeting:
- `inboundGreeting` controls the first message spoken when a call is accepted.
## Auto-responses and models
When a caller speaks, the plugin can auto-respond using the embedded Clawdbot
agent.
Key settings:
- `responseModel`: model reference for voice responses (default `openai/gpt-4o-mini`)
- `responseSystemPrompt`: optional override for the voice system prompt
- `responseTimeoutMs`: response generation timeout
Responses use the same agent system as messaging, including tool access.
The default system prompt keeps replies short and conversational (about 1-2 sentences).
## Streaming (Twilio only)
When `streaming.enabled` is on:
- The webhook server also accepts WebSocket upgrades at `streaming.streamPath`.
- Audio is forwarded to OpenAI Realtime STT.
- Final transcripts are fed into the call manager and used by `continue` and
auto-responses.
Required:
- A public HTTPS URL for the Gateway (used to derive `wss://...`).
- `OPENAI_API_KEY` or `streaming.openaiApiKey`.
If no OpenAI key is available, streaming does not start and real-time transcripts
will not arrive.
## Limits and timeouts
These settings are enforced by the call manager:
- `maxDurationSeconds`: auto-hangup after this many seconds (starts when answered).
- `maxConcurrentCalls`: max simultaneous active calls.
- `transcriptTimeoutMs`: how long `continue` waits for a final transcript.
## Logs and debugging
Calls are appended as JSONL to:
- `${store}/calls.jsonl`, or
- `~/clawd/voice-calls/calls.jsonl` by default
Set `store` if you want a different base directory for call logs.
Use:
```bash
clawdbot voicecall tail
```
Auto-responses use the agent system. Tune with:
- `responseModel`
- `responseSystemPrompt`
- `responseTimeoutMs`
## CLI
@@ -286,7 +144,7 @@ Actions:
- `end_call` (callId)
- `get_status` (callId)
If you want a ready-made skill entry, grab it from [ClawdHub.com](https://ClawdHub.com).
This repo ships a matching skill doc at `skills/voice-call/SKILL.md`.
## Gateway RPC

View File

@@ -5,6 +5,7 @@ Official Voice Call plugin for **Clawdbot**.
Providers:
- **Twilio** (Programmable Voice + Media Streams)
- **Telnyx** (Call Control v2)
- **Plivo** (Voice API + XML transfer + GetInput speech)
- **Mock** (dev/no network)
Docs: `https://docs.clawd.bot/plugins/voice-call`
@@ -34,7 +35,7 @@ Put under `plugins.entries.voice-call.config`:
```json5
{
provider: "twilio", // or "telnyx" | "mock"
provider: "twilio", // or "telnyx" | "plivo" | "mock"
fromNumber: "+15550001234",
toNumber: "+15550005678",
@@ -43,6 +44,11 @@ Put under `plugins.entries.voice-call.config`:
authToken: "your_token"
},
plivo: {
authId: "MAxxxxxxxxxxxxxxxxxxxx",
authToken: "your_token"
},
// Webhook server
serve: {
port: 3334,
@@ -66,7 +72,7 @@ Put under `plugins.entries.voice-call.config`:
```
Notes:
- Twilio/Telnyx require a **publicly reachable** webhook URL.
- Twilio/Telnyx/Plivo require a **publicly reachable** webhook URL.
- `mock` is a local dev provider (no network calls).
## CLI
@@ -102,6 +108,6 @@ Actions:
## Notes
- Uses webhook signature verification for Twilio/Telnyx.
- Uses webhook signature verification for Twilio/Telnyx/Plivo.
- `responseModel` / `responseSystemPrompt` control AI auto-responses.
- Media streaming requires `ws` and OpenAI Realtime API key.

View File

@@ -125,7 +125,7 @@ const VoiceCallToolSchema = Type.Union([
const voiceCallPlugin = {
id: "voice-call",
name: "Voice Call",
description: "Voice-call plugin with Telnyx/Twilio providers",
description: "Voice-call plugin with Telnyx/Twilio/Plivo providers",
configSchema: voiceCallConfigSchema,
register(api) {
const cfg = voiceCallConfigSchema.parse(api.pluginConfig);

View File

@@ -53,6 +53,14 @@ export const TwilioConfigSchema = z.object({
});
export type TwilioConfig = z.infer<typeof TwilioConfigSchema>;
export const PlivoConfigSchema = z.object({
/** Plivo Auth ID (starts with MA/SA) */
authId: z.string().min(1).optional(),
/** Plivo Auth Token */
authToken: z.string().min(1).optional(),
});
export type PlivoConfig = z.infer<typeof PlivoConfigSchema>;
// -----------------------------------------------------------------------------
// STT/TTS Configuration
// -----------------------------------------------------------------------------
@@ -219,8 +227,8 @@ export const VoiceCallConfigSchema = z.object({
/** Enable voice call functionality */
enabled: z.boolean().default(false),
/** Active provider (telnyx, twilio, or mock) */
provider: z.enum(["telnyx", "twilio", "mock"]).optional(),
/** Active provider (telnyx, twilio, plivo, or mock) */
provider: z.enum(["telnyx", "twilio", "plivo", "mock"]).optional(),
/** Telnyx-specific configuration */
telnyx: TelnyxConfigSchema.optional(),
@@ -228,6 +236,9 @@ export const VoiceCallConfigSchema = z.object({
/** Twilio-specific configuration */
twilio: TwilioConfigSchema.optional(),
/** Plivo-specific configuration */
plivo: PlivoConfigSchema.optional(),
/** Phone number to call from (E.164) */
fromNumber: E164Schema.optional(),
@@ -351,5 +362,18 @@ export function validateProviderConfig(config: VoiceCallConfig): {
}
}
if (config.provider === "plivo") {
if (!config.plivo?.authId) {
errors.push(
"plugins.entries.voice-call.config.plivo.authId is required (or set PLIVO_AUTH_ID env)",
);
}
if (!config.plivo?.authToken) {
errors.push(
"plugins.entries.voice-call.config.plivo.authToken is required (or set PLIVO_AUTH_TOKEN env)",
);
}
}
return { valid: errors.length === 0, errors };
}

View File

@@ -0,0 +1,73 @@
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { VoiceCallConfigSchema } from "./config.js";
import { CallManager } from "./manager.js";
import type {
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
PlayTtsInput,
ProviderWebhookParseResult,
StartListeningInput,
StopListeningInput,
WebhookContext,
WebhookVerificationResult,
} from "./types.js";
import type { VoiceCallProvider } from "./providers/base.js";
class FakeProvider implements VoiceCallProvider {
readonly name = "plivo" as const;
verifyWebhook(_ctx: WebhookContext): WebhookVerificationResult {
return { ok: true };
}
parseWebhookEvent(_ctx: WebhookContext): ProviderWebhookParseResult {
return { events: [], statusCode: 200 };
}
async initiateCall(_input: InitiateCallInput): Promise<InitiateCallResult> {
return { providerCallId: "request-uuid", status: "initiated" };
}
async hangupCall(_input: HangupCallInput): Promise<void> {}
async playTts(_input: PlayTtsInput): Promise<void> {}
async startListening(_input: StartListeningInput): Promise<void> {}
async stopListening(_input: StopListeningInput): Promise<void> {}
}
describe("CallManager", () => {
it("upgrades providerCallId mapping when provider ID changes", async () => {
const config = VoiceCallConfigSchema.parse({
enabled: true,
provider: "plivo",
fromNumber: "+15550000000",
});
const storePath = path.join(os.tmpdir(), `clawdbot-voice-call-test-${Date.now()}`);
const manager = new CallManager(config, storePath);
manager.initialize(new FakeProvider(), "https://example.com/voice/webhook");
const { callId, success, error } = await manager.initiateCall("+15550000001");
expect(success).toBe(true);
expect(error).toBeUndefined();
// The provider returned a request UUID as the initial providerCallId.
expect(manager.getCall(callId)?.providerCallId).toBe("request-uuid");
expect(manager.getCallByProviderCallId("request-uuid")?.callId).toBe(callId);
// Provider later reports the actual call UUID.
manager.processEvent({
id: "evt-1",
type: "call.answered",
callId,
providerCallId: "call-uuid",
timestamp: Date.now(),
});
expect(manager.getCall(callId)?.providerCallId).toBe("call-uuid");
expect(manager.getCallByProviderCallId("call-uuid")?.callId).toBe(callId);
expect(manager.getCallByProviderCallId("request-uuid")).toBeUndefined();
});
});

View File

@@ -1,27 +1,23 @@
import crypto from "node:crypto";
import fs from "node:fs";
import fsp from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { resolveUserPath } from "./utils.js";
import type { VoiceCallConfig } from "./config.js";
import type { CallMode, VoiceCallConfig } from "./config.js";
import type { VoiceCallProvider } from "./providers/base.js";
import {
type CallId,
type CallRecord,
CallRecordSchema,
type CallState,
type NormalizedEvent,
type OutboundCallOptions,
TerminalStates,
type TranscriptEntry,
} from "./types.js";
import type { CallManagerContext } from "./manager/context.js";
import { processEvent } from "./manager/events.js";
import { getCallByProviderCallId } from "./manager/lookup.js";
import {
continueCall,
endCall,
initiateCall,
speak,
speakInitialMessage,
} from "./manager/outbound.js";
import { getCallHistoryFromStore, loadActiveCallsFromStore } from "./manager/store.js";
import { escapeXml, mapVoiceToPolly } from "./voice-mapping.js";
/**
* Manages voice calls: state machine, persistence, and provider coordination.
@@ -55,20 +51,6 @@ export class CallManager {
this.storePath = resolveUserPath(rawPath);
}
private buildContext(): CallManagerContext {
return {
activeCalls: this.activeCalls,
providerCallIdMap: this.providerCallIdMap,
processedEventIds: this.processedEventIds,
provider: this.provider,
config: this.config,
storePath: this.storePath,
webhookUrl: this.webhookUrl,
transcriptWaiters: this.transcriptWaiters,
maxDurationTimers: this.maxDurationTimers,
};
}
/**
* Initialize the call manager with a provider.
*/
@@ -80,10 +62,7 @@ export class CallManager {
fs.mkdirSync(this.storePath, { recursive: true });
// Load any persisted active calls
const restored = loadActiveCallsFromStore(this.storePath);
this.activeCalls = restored.activeCalls;
this.providerCallIdMap = restored.providerCallIdMap;
this.processedEventIds = restored.processedEventIds;
this.loadActiveCalls();
}
/**
@@ -104,7 +83,102 @@ export class CallManager {
sessionKey?: string,
options?: OutboundCallOptions | string,
): Promise<{ callId: CallId; success: boolean; error?: string }> {
return await initiateCall(this.buildContext(), to, sessionKey, options);
// Support legacy string argument for initialMessage
const opts: OutboundCallOptions =
typeof options === "string" ? { message: options } : (options ?? {});
const initialMessage = opts.message;
const mode = opts.mode ?? this.config.outbound.defaultMode;
if (!this.provider) {
return { callId: "", success: false, error: "Provider not initialized" };
}
if (!this.webhookUrl) {
return {
callId: "",
success: false,
error: "Webhook URL not configured",
};
}
// Check concurrent call limit
const activeCalls = this.getActiveCalls();
if (activeCalls.length >= this.config.maxConcurrentCalls) {
return {
callId: "",
success: false,
error: `Maximum concurrent calls (${this.config.maxConcurrentCalls}) reached`,
};
}
const callId = crypto.randomUUID();
const from =
this.config.fromNumber ||
(this.provider?.name === "mock" ? "+15550000000" : undefined);
if (!from) {
return { callId: "", success: false, error: "fromNumber not configured" };
}
// Create call record with mode in metadata
const callRecord: CallRecord = {
callId,
provider: this.provider.name,
direction: "outbound",
state: "initiated",
from,
to,
sessionKey,
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {
...(initialMessage && { initialMessage }),
mode,
},
};
this.activeCalls.set(callId, callRecord);
this.persistCallRecord(callRecord);
try {
// For notify mode with a message, use inline TwiML with <Say>
let inlineTwiml: string | undefined;
if (mode === "notify" && initialMessage) {
const pollyVoice = mapVoiceToPolly(this.config.tts.voice);
inlineTwiml = this.generateNotifyTwiml(initialMessage, pollyVoice);
console.log(
`[voice-call] Using inline TwiML for notify mode (voice: ${pollyVoice})`,
);
}
const result = await this.provider.initiateCall({
callId,
from,
to,
webhookUrl: this.webhookUrl,
inlineTwiml,
});
callRecord.providerCallId = result.providerCallId;
this.providerCallIdMap.set(result.providerCallId, callId); // Map providerCallId to internal callId
this.persistCallRecord(callRecord);
return { callId, success: true };
} catch (err) {
callRecord.state = "failed";
callRecord.endedAt = Date.now();
callRecord.endReason = "failed";
this.persistCallRecord(callRecord);
this.activeCalls.delete(callId);
if (callRecord.providerCallId) {
this.providerCallIdMap.delete(callRecord.providerCallId);
}
return {
callId,
success: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
/**
@@ -114,7 +188,42 @@ export class CallManager {
callId: CallId,
text: string,
): Promise<{ success: boolean; error?: string }> {
return await speak(this.buildContext(), callId, text);
const call = this.activeCalls.get(callId);
if (!call) {
return { success: false, error: "Call not found" };
}
if (!this.provider || !call.providerCallId) {
return { success: false, error: "Call not connected" };
}
if (TerminalStates.has(call.state)) {
return { success: false, error: "Call has ended" };
}
try {
// Update state
call.state = "speaking";
this.persistCallRecord(call);
// Add to transcript
this.addTranscriptEntry(call, "bot", text);
// Play TTS
await this.provider.playTts({
callId,
providerCallId: call.providerCallId,
text,
voice: this.config.tts.voice,
});
return { success: true };
} catch (err) {
return {
success: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
/**
@@ -123,7 +232,135 @@ export class CallManager {
* In notify mode, auto-hangup after the message is delivered.
*/
async speakInitialMessage(providerCallId: string): Promise<void> {
await speakInitialMessage(this.buildContext(), providerCallId);
const call = this.getCallByProviderCallId(providerCallId);
if (!call) {
console.warn(
`[voice-call] speakInitialMessage: no call found for ${providerCallId}`,
);
return;
}
const initialMessage = call.metadata?.initialMessage as string | undefined;
const mode = (call.metadata?.mode as CallMode) ?? "conversation";
if (!initialMessage) {
console.log(
`[voice-call] speakInitialMessage: no initial message for ${call.callId}`,
);
return;
}
// Clear the initial message so we don't speak it again
if (call.metadata) {
delete call.metadata.initialMessage;
this.persistCallRecord(call);
}
console.log(
`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`,
);
const result = await this.speak(call.callId, initialMessage);
if (!result.success) {
console.warn(
`[voice-call] Failed to speak initial message: ${result.error}`,
);
return;
}
// In notify mode, auto-hangup after delay
if (mode === "notify") {
const delaySec = this.config.outbound.notifyHangupDelaySec;
console.log(
`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`,
);
setTimeout(async () => {
const currentCall = this.getCall(call.callId);
if (currentCall && !TerminalStates.has(currentCall.state)) {
console.log(
`[voice-call] Notify mode: hanging up call ${call.callId}`,
);
await this.endCall(call.callId);
}
}, delaySec * 1000);
}
}
/**
* Start max duration timer for a call.
* Auto-hangup when maxDurationSeconds is reached.
*/
private startMaxDurationTimer(callId: CallId): void {
// Clear any existing timer
this.clearMaxDurationTimer(callId);
const maxDurationMs = this.config.maxDurationSeconds * 1000;
console.log(
`[voice-call] Starting max duration timer (${this.config.maxDurationSeconds}s) for call ${callId}`,
);
const timer = setTimeout(async () => {
this.maxDurationTimers.delete(callId);
const call = this.getCall(callId);
if (call && !TerminalStates.has(call.state)) {
console.log(
`[voice-call] Max duration reached (${this.config.maxDurationSeconds}s), ending call ${callId}`,
);
call.endReason = "timeout";
this.persistCallRecord(call);
await this.endCall(callId);
}
}, maxDurationMs);
this.maxDurationTimers.set(callId, timer);
}
/**
* Clear max duration timer for a call.
*/
private clearMaxDurationTimer(callId: CallId): void {
const timer = this.maxDurationTimers.get(callId);
if (timer) {
clearTimeout(timer);
this.maxDurationTimers.delete(callId);
}
}
private clearTranscriptWaiter(callId: CallId): void {
const waiter = this.transcriptWaiters.get(callId);
if (!waiter) return;
clearTimeout(waiter.timeout);
this.transcriptWaiters.delete(callId);
}
private rejectTranscriptWaiter(callId: CallId, reason: string): void {
const waiter = this.transcriptWaiters.get(callId);
if (!waiter) return;
this.clearTranscriptWaiter(callId);
waiter.reject(new Error(reason));
}
private resolveTranscriptWaiter(callId: CallId, transcript: string): void {
const waiter = this.transcriptWaiters.get(callId);
if (!waiter) return;
this.clearTranscriptWaiter(callId);
waiter.resolve(transcript);
}
private waitForFinalTranscript(callId: CallId): Promise<string> {
// Only allow one in-flight waiter per call.
this.rejectTranscriptWaiter(callId, "Transcript waiter replaced");
const timeoutMs = this.config.transcriptTimeoutMs;
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
this.transcriptWaiters.delete(callId);
reject(
new Error(`Timed out waiting for transcript after ${timeoutMs}ms`),
);
}, timeoutMs);
this.transcriptWaiters.set(callId, { resolve, reject, timeout });
});
}
/**
@@ -133,21 +370,343 @@ export class CallManager {
callId: CallId,
prompt: string,
): Promise<{ success: boolean; transcript?: string; error?: string }> {
return await continueCall(this.buildContext(), callId, prompt);
const call = this.activeCalls.get(callId);
if (!call) {
return { success: false, error: "Call not found" };
}
if (!this.provider || !call.providerCallId) {
return { success: false, error: "Call not connected" };
}
if (TerminalStates.has(call.state)) {
return { success: false, error: "Call has ended" };
}
try {
await this.speak(callId, prompt);
call.state = "listening";
this.persistCallRecord(call);
await this.provider.startListening({
callId,
providerCallId: call.providerCallId,
});
const transcript = await this.waitForFinalTranscript(callId);
// Best-effort: stop listening after final transcript.
await this.provider.stopListening({
callId,
providerCallId: call.providerCallId,
});
return { success: true, transcript };
} catch (err) {
return {
success: false,
error: err instanceof Error ? err.message : String(err),
};
} finally {
this.clearTranscriptWaiter(callId);
}
}
/**
* End an active call.
*/
async endCall(callId: CallId): Promise<{ success: boolean; error?: string }> {
return await endCall(this.buildContext(), callId);
const call = this.activeCalls.get(callId);
if (!call) {
return { success: false, error: "Call not found" };
}
if (!this.provider || !call.providerCallId) {
return { success: false, error: "Call not connected" };
}
if (TerminalStates.has(call.state)) {
return { success: true }; // Already ended
}
try {
await this.provider.hangupCall({
callId,
providerCallId: call.providerCallId,
reason: "hangup-bot",
});
call.state = "hangup-bot";
call.endedAt = Date.now();
call.endReason = "hangup-bot";
this.persistCallRecord(call);
this.clearMaxDurationTimer(callId);
this.rejectTranscriptWaiter(callId, "Call ended: hangup-bot");
this.activeCalls.delete(callId);
if (call.providerCallId) {
this.providerCallIdMap.delete(call.providerCallId);
}
return { success: true };
} catch (err) {
return {
success: false,
error: err instanceof Error ? err.message : String(err),
};
}
}
/**
* Check if an inbound call should be accepted based on policy.
*/
private shouldAcceptInbound(from: string | undefined): boolean {
const { inboundPolicy: policy, allowFrom } = this.config;
switch (policy) {
case "disabled":
console.log("[voice-call] Inbound call rejected: policy is disabled");
return false;
case "open":
console.log("[voice-call] Inbound call accepted: policy is open");
return true;
case "allowlist":
case "pairing": {
const normalized = from?.replace(/\D/g, "") || "";
const allowed = (allowFrom || []).some((num) => {
const normalizedAllow = num.replace(/\D/g, "");
return (
normalized.endsWith(normalizedAllow) ||
normalizedAllow.endsWith(normalized)
);
});
const status = allowed ? "accepted" : "rejected";
console.log(
`[voice-call] Inbound call ${status}: ${from} ${allowed ? "is in" : "not in"} allowlist`,
);
return allowed;
}
default:
return false;
}
}
/**
* Create a call record for an inbound call.
*/
private createInboundCall(
providerCallId: string,
from: string,
to: string,
): CallRecord {
const callId = crypto.randomUUID();
const callRecord: CallRecord = {
callId,
providerCallId,
provider: this.provider?.name || "twilio",
direction: "inbound",
state: "ringing",
from,
to,
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {
initialMessage:
this.config.inboundGreeting || "Hello! How can I help you today?",
},
};
this.activeCalls.set(callId, callRecord);
this.providerCallIdMap.set(providerCallId, callId); // Map providerCallId to internal callId
this.persistCallRecord(callRecord);
console.log(
`[voice-call] Created inbound call record: ${callId} from ${from}`,
);
return callRecord;
}
/**
* Look up a call by either internal callId or providerCallId.
*/
private findCall(callIdOrProviderCallId: string): CallRecord | undefined {
// Try direct lookup by internal callId
const directCall = this.activeCalls.get(callIdOrProviderCallId);
if (directCall) return directCall;
// Try lookup by providerCallId
return this.getCallByProviderCallId(callIdOrProviderCallId);
}
/**
* Process a webhook event.
*/
processEvent(event: NormalizedEvent): void {
processEvent(this.buildContext(), event);
// Idempotency check
if (this.processedEventIds.has(event.id)) {
return;
}
this.processedEventIds.add(event.id);
let call = this.findCall(event.callId);
// Handle inbound calls - create record if it doesn't exist
if (!call && event.direction === "inbound" && event.providerCallId) {
// Check if we should accept this inbound call
if (!this.shouldAcceptInbound(event.from)) {
// TODO: Could hang up the call here
return;
}
// Create a new call record for this inbound call
call = this.createInboundCall(
event.providerCallId,
event.from || "unknown",
event.to || this.config.fromNumber || "unknown",
);
// Update the event's callId to use our internal ID
event.callId = call.callId;
}
if (!call) {
// Still no call record - ignore event
return;
}
// Update provider call ID if we got it
if (event.providerCallId && event.providerCallId !== call.providerCallId) {
const previousProviderCallId = call.providerCallId;
call.providerCallId = event.providerCallId;
this.providerCallIdMap.set(event.providerCallId, call.callId);
if (previousProviderCallId) {
const mapped = this.providerCallIdMap.get(previousProviderCallId);
if (mapped === call.callId) {
this.providerCallIdMap.delete(previousProviderCallId);
}
}
}
// Track processed event
call.processedEventIds.push(event.id);
// Process event based on type
switch (event.type) {
case "call.initiated":
this.transitionState(call, "initiated");
break;
case "call.ringing":
this.transitionState(call, "ringing");
break;
case "call.answered":
call.answeredAt = event.timestamp;
this.transitionState(call, "answered");
// Start max duration timer when call is answered
this.startMaxDurationTimer(call.callId);
// Best-effort: speak initial message (for inbound greetings and outbound
// conversation mode) once the call is answered.
this.maybeSpeakInitialMessageOnAnswered(call);
break;
case "call.active":
this.transitionState(call, "active");
break;
case "call.speaking":
this.transitionState(call, "speaking");
break;
case "call.speech":
if (event.isFinal) {
this.addTranscriptEntry(call, "user", event.transcript);
this.resolveTranscriptWaiter(call.callId, event.transcript);
}
this.transitionState(call, "listening");
break;
case "call.ended":
call.endedAt = event.timestamp;
call.endReason = event.reason;
this.transitionState(call, event.reason as CallState);
this.clearMaxDurationTimer(call.callId);
this.rejectTranscriptWaiter(call.callId, `Call ended: ${event.reason}`);
this.activeCalls.delete(call.callId);
if (call.providerCallId) {
this.providerCallIdMap.delete(call.providerCallId);
}
break;
case "call.error":
if (!event.retryable) {
call.endedAt = event.timestamp;
call.endReason = "error";
this.transitionState(call, "error");
this.clearMaxDurationTimer(call.callId);
this.rejectTranscriptWaiter(
call.callId,
`Call error: ${event.error}`,
);
this.activeCalls.delete(call.callId);
if (call.providerCallId) {
this.providerCallIdMap.delete(call.providerCallId);
}
}
break;
}
this.persistCallRecord(call);
}
private maybeSpeakInitialMessageOnAnswered(call: CallRecord): void {
const initialMessage =
typeof call.metadata?.initialMessage === "string"
? call.metadata.initialMessage.trim()
: "";
if (!initialMessage) return;
// For outbound notify mode, we already use inline TwiML (provider-specific) to
// deliver the message and hang up; do not double-speak.
const mode = call.metadata?.mode as CallMode | undefined;
if (call.direction === "outbound" && mode === "notify") return;
if (!this.provider || !call.providerCallId) return;
// Twilio has provider-specific state for speaking (<Say> fallback) and can
// fail for inbound calls; keep existing Twilio behavior unchanged.
if (this.provider.name === "twilio") return;
// Clear the initial message so it only plays once.
if (call.metadata) {
delete call.metadata.initialMessage;
}
this.persistCallRecord(call);
void this.provider
.playTts({
callId: call.callId,
providerCallId: call.providerCallId,
text: initialMessage,
voice: this.config.tts.voice,
})
.then(() => {
this.addTranscriptEntry(call, "bot", initialMessage);
this.persistCallRecord(call);
})
.catch((err) => {
console.warn(
`[voice-call] Failed to speak initial message on answered: ${
err instanceof Error ? err.message : String(err)
}`,
);
});
}
/**
@@ -161,11 +720,20 @@ export class CallManager {
* Get an active call by provider call ID (e.g., Twilio CallSid).
*/
getCallByProviderCallId(providerCallId: string): CallRecord | undefined {
return getCallByProviderCallId({
activeCalls: this.activeCalls,
providerCallIdMap: this.providerCallIdMap,
providerCallId,
});
// Fast path: use the providerCallIdMap for O(1) lookup
const callId = this.providerCallIdMap.get(providerCallId);
if (callId) {
return this.activeCalls.get(callId);
}
// Fallback: linear search for cases where map wasn't populated
// (e.g., providerCallId set directly on call record)
for (const call of this.activeCalls.values()) {
if (call.providerCallId === providerCallId) {
return call;
}
}
return undefined;
}
/**
@@ -179,6 +747,156 @@ export class CallManager {
* Get call history (from persisted logs).
*/
async getCallHistory(limit = 50): Promise<CallRecord[]> {
return await getCallHistoryFromStore(this.storePath, limit);
const logPath = path.join(this.storePath, "calls.jsonl");
try {
await fsp.access(logPath);
} catch {
return [];
}
const content = await fsp.readFile(logPath, "utf-8");
const lines = content.trim().split("\n").filter(Boolean);
const calls: CallRecord[] = [];
// Parse last N lines
for (const line of lines.slice(-limit)) {
try {
const parsed = CallRecordSchema.parse(JSON.parse(line));
calls.push(parsed);
} catch {
// Skip invalid lines
}
}
return calls;
}
// States that can cycle during multi-turn conversations
private static readonly ConversationStates = new Set<CallState>([
"speaking",
"listening",
]);
// Non-terminal state order for monotonic transitions
private static readonly StateOrder: readonly CallState[] = [
"initiated",
"ringing",
"answered",
"active",
"speaking",
"listening",
];
/**
* Transition call state with monotonic enforcement.
*/
private transitionState(call: CallRecord, newState: CallState): void {
// No-op for same state or already terminal
if (call.state === newState || TerminalStates.has(call.state)) return;
// Terminal states can always be reached from non-terminal
if (TerminalStates.has(newState)) {
call.state = newState;
return;
}
// Allow cycling between speaking and listening (multi-turn conversations)
if (
CallManager.ConversationStates.has(call.state) &&
CallManager.ConversationStates.has(newState)
) {
call.state = newState;
return;
}
// Only allow forward transitions in state order
const currentIndex = CallManager.StateOrder.indexOf(call.state);
const newIndex = CallManager.StateOrder.indexOf(newState);
if (newIndex > currentIndex) {
call.state = newState;
}
}
/**
* Add an entry to the call transcript.
*/
private addTranscriptEntry(
call: CallRecord,
speaker: "bot" | "user",
text: string,
): void {
const entry: TranscriptEntry = {
timestamp: Date.now(),
speaker,
text,
isFinal: true,
};
call.transcript.push(entry);
}
/**
* Persist a call record to disk (fire-and-forget async).
*/
private persistCallRecord(call: CallRecord): void {
const logPath = path.join(this.storePath, "calls.jsonl");
const line = `${JSON.stringify(call)}\n`;
// Fire-and-forget async write to avoid blocking event loop
fsp.appendFile(logPath, line).catch((err) => {
console.error("[voice-call] Failed to persist call record:", err);
});
}
/**
* Load active calls from persistence (for crash recovery).
* Uses streaming to handle large log files efficiently.
*/
private loadActiveCalls(): void {
const logPath = path.join(this.storePath, "calls.jsonl");
if (!fs.existsSync(logPath)) return;
// Read file synchronously and parse lines
const content = fs.readFileSync(logPath, "utf-8");
const lines = content.split("\n");
// Build map of latest state per call
const callMap = new Map<CallId, CallRecord>();
for (const line of lines) {
if (!line.trim()) continue;
try {
const call = CallRecordSchema.parse(JSON.parse(line));
callMap.set(call.callId, call);
} catch {
// Skip invalid lines
}
}
// Only keep non-terminal calls
for (const [callId, call] of callMap) {
if (!TerminalStates.has(call.state)) {
this.activeCalls.set(callId, call);
// Populate providerCallId mapping for lookups
if (call.providerCallId) {
this.providerCallIdMap.set(call.providerCallId, callId);
}
// Populate processed event IDs
for (const eventId of call.processedEventIds) {
this.processedEventIds.add(eventId);
}
}
}
}
/**
* Generate TwiML for notify mode (speak message and hang up).
*/
private generateNotifyTwiml(message: string, voice: string): string {
return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="${voice}">${escapeXml(message)}</Say>
<Hangup/>
</Response>`;
}
}

View File

@@ -7,3 +7,4 @@ export {
} from "./stt-openai-realtime.js";
export { TelnyxProvider } from "./telnyx.js";
export { TwilioProvider } from "./twilio.js";
export { PlivoProvider } from "./plivo.js";

View File

@@ -0,0 +1,29 @@
import { describe, expect, it } from "vitest";
import { PlivoProvider } from "./plivo.js";
describe("PlivoProvider", () => {
it("parses answer callback into call.answered and returns keep-alive XML", () => {
const provider = new PlivoProvider({
authId: "MA000000000000000000",
authToken: "test-token",
});
const result = provider.parseWebhookEvent({
headers: { host: "example.com" },
rawBody:
"CallUUID=call-uuid&CallStatus=in-progress&Direction=outbound&From=%2B15550000000&To=%2B15550000001&Event=StartApp",
url: "https://example.com/voice/webhook?provider=plivo&flow=answer&callId=internal-call-id",
method: "POST",
query: { provider: "plivo", flow: "answer", callId: "internal-call-id" },
});
expect(result.events).toHaveLength(1);
expect(result.events[0]?.type).toBe("call.answered");
expect(result.events[0]?.callId).toBe("internal-call-id");
expect(result.events[0]?.providerCallId).toBe("call-uuid");
expect(result.providerResponseBody).toContain("<Wait");
expect(result.providerResponseBody).toContain('length="300"');
});
});

View File

@@ -0,0 +1,504 @@
import crypto from "node:crypto";
import type { PlivoConfig } from "../config.js";
import type {
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
NormalizedEvent,
PlayTtsInput,
ProviderWebhookParseResult,
StartListeningInput,
StopListeningInput,
WebhookContext,
WebhookVerificationResult,
} from "../types.js";
import { escapeXml } from "../voice-mapping.js";
import { reconstructWebhookUrl, verifyPlivoWebhook } from "../webhook-security.js";
import type { VoiceCallProvider } from "./base.js";
export interface PlivoProviderOptions {
/** Override public URL origin for signature verification */
publicUrl?: string;
/** Skip webhook signature verification (development only) */
skipVerification?: boolean;
/** Outbound ring timeout in seconds */
ringTimeoutSec?: number;
}
type PendingSpeak = { text: string; locale?: string };
type PendingListen = { language?: string };
export class PlivoProvider implements VoiceCallProvider {
readonly name = "plivo" as const;
private readonly authId: string;
private readonly authToken: string;
private readonly baseUrl: string;
private readonly options: PlivoProviderOptions;
// Best-effort mapping between create-call request UUID and call UUID.
private requestUuidToCallUuid = new Map<string, string>();
// Used for transfer URLs and GetInput action URLs.
private callIdToWebhookUrl = new Map<string, string>();
private callUuidToWebhookUrl = new Map<string, string>();
private pendingSpeakByCallId = new Map<string, PendingSpeak>();
private pendingListenByCallId = new Map<string, PendingListen>();
constructor(config: PlivoConfig, options: PlivoProviderOptions = {}) {
if (!config.authId) {
throw new Error("Plivo Auth ID is required");
}
if (!config.authToken) {
throw new Error("Plivo Auth Token is required");
}
this.authId = config.authId;
this.authToken = config.authToken;
this.baseUrl = `https://api.plivo.com/v1/Account/${this.authId}`;
this.options = options;
}
private async apiRequest<T = unknown>(params: {
method: "GET" | "POST" | "DELETE";
endpoint: string;
body?: Record<string, unknown>;
allowNotFound?: boolean;
}): Promise<T> {
const { method, endpoint, body, allowNotFound } = params;
const response = await fetch(`${this.baseUrl}${endpoint}`, {
method,
headers: {
Authorization: `Basic ${Buffer.from(`${this.authId}:${this.authToken}`).toString("base64")}`,
"Content-Type": "application/json",
},
body: body ? JSON.stringify(body) : undefined,
});
if (!response.ok) {
if (allowNotFound && response.status === 404) {
return undefined as T;
}
const errorText = await response.text();
throw new Error(`Plivo API error: ${response.status} ${errorText}`);
}
const text = await response.text();
return text ? (JSON.parse(text) as T) : (undefined as T);
}
verifyWebhook(ctx: WebhookContext): WebhookVerificationResult {
const result = verifyPlivoWebhook(ctx, this.authToken, {
publicUrl: this.options.publicUrl,
skipVerification: this.options.skipVerification,
});
if (!result.ok) {
console.warn(`[plivo] Webhook verification failed: ${result.reason}`);
}
return { ok: result.ok, reason: result.reason };
}
parseWebhookEvent(ctx: WebhookContext): ProviderWebhookParseResult {
const flow =
typeof ctx.query?.flow === "string" ? ctx.query.flow.trim() : "";
const parsed = this.parseBody(ctx.rawBody);
if (!parsed) {
return { events: [], statusCode: 400 };
}
// Keep providerCallId mapping for later call control.
const callUuid = parsed.get("CallUUID") || undefined;
if (callUuid) {
const webhookBase = PlivoProvider.baseWebhookUrlFromCtx(ctx);
if (webhookBase) {
this.callUuidToWebhookUrl.set(callUuid, webhookBase);
}
}
// Special flows that exist only to return Plivo XML (no events).
if (flow === "xml-speak") {
const callId = this.getCallIdFromQuery(ctx);
const pending = callId ? this.pendingSpeakByCallId.get(callId) : undefined;
if (callId) this.pendingSpeakByCallId.delete(callId);
const xml = pending
? PlivoProvider.xmlSpeak(pending.text, pending.locale)
: PlivoProvider.xmlKeepAlive();
return {
events: [],
providerResponseBody: xml,
providerResponseHeaders: { "Content-Type": "text/xml" },
statusCode: 200,
};
}
if (flow === "xml-listen") {
const callId = this.getCallIdFromQuery(ctx);
const pending = callId
? this.pendingListenByCallId.get(callId)
: undefined;
if (callId) this.pendingListenByCallId.delete(callId);
const actionUrl = this.buildActionUrl(ctx, {
flow: "getinput",
callId,
});
const xml =
actionUrl && callId
? PlivoProvider.xmlGetInputSpeech({
actionUrl,
language: pending?.language,
})
: PlivoProvider.xmlKeepAlive();
return {
events: [],
providerResponseBody: xml,
providerResponseHeaders: { "Content-Type": "text/xml" },
statusCode: 200,
};
}
// Normal events.
const callIdFromQuery = this.getCallIdFromQuery(ctx);
const event = this.normalizeEvent(parsed, callIdFromQuery);
return {
events: event ? [event] : [],
providerResponseBody:
flow === "answer" || flow === "getinput"
? PlivoProvider.xmlKeepAlive()
: PlivoProvider.xmlEmpty(),
providerResponseHeaders: { "Content-Type": "text/xml" },
statusCode: 200,
};
}
private normalizeEvent(
params: URLSearchParams,
callIdOverride?: string,
): NormalizedEvent | null {
const callUuid = params.get("CallUUID") || "";
const requestUuid = params.get("RequestUUID") || "";
if (requestUuid && callUuid) {
this.requestUuidToCallUuid.set(requestUuid, callUuid);
}
const direction = params.get("Direction");
const from = params.get("From") || undefined;
const to = params.get("To") || undefined;
const callStatus = params.get("CallStatus");
const baseEvent = {
id: crypto.randomUUID(),
callId: callIdOverride || callUuid || requestUuid,
providerCallId: callUuid || requestUuid || undefined,
timestamp: Date.now(),
direction:
direction === "inbound"
? ("inbound" as const)
: direction === "outbound"
? ("outbound" as const)
: undefined,
from,
to,
};
const digits = params.get("Digits");
if (digits) {
return { ...baseEvent, type: "call.dtmf", digits };
}
const transcript = PlivoProvider.extractTranscript(params);
if (transcript) {
return {
...baseEvent,
type: "call.speech",
transcript,
isFinal: true,
};
}
// Call lifecycle.
if (callStatus === "ringing") {
return { ...baseEvent, type: "call.ringing" };
}
if (callStatus === "in-progress") {
return { ...baseEvent, type: "call.answered" };
}
if (
callStatus === "completed" ||
callStatus === "busy" ||
callStatus === "no-answer" ||
callStatus === "failed"
) {
return {
...baseEvent,
type: "call.ended",
reason:
callStatus === "completed"
? "completed"
: callStatus === "busy"
? "busy"
: callStatus === "no-answer"
? "no-answer"
: "failed",
};
}
// Plivo will call our answer_url when the call is answered; if we don't have
// a CallStatus for some reason, treat it as answered so the call can proceed.
if (params.get("Event") === "StartApp" && callUuid) {
return { ...baseEvent, type: "call.answered" };
}
return null;
}
async initiateCall(input: InitiateCallInput): Promise<InitiateCallResult> {
const webhookUrl = new URL(input.webhookUrl);
webhookUrl.searchParams.set("provider", "plivo");
webhookUrl.searchParams.set("callId", input.callId);
const answerUrl = new URL(webhookUrl);
answerUrl.searchParams.set("flow", "answer");
const hangupUrl = new URL(webhookUrl);
hangupUrl.searchParams.set("flow", "hangup");
this.callIdToWebhookUrl.set(input.callId, input.webhookUrl);
const ringTimeoutSec = this.options.ringTimeoutSec ?? 30;
const result = await this.apiRequest<PlivoCreateCallResponse>({
method: "POST",
endpoint: "/Call/",
body: {
from: PlivoProvider.normalizeNumber(input.from),
to: PlivoProvider.normalizeNumber(input.to),
answer_url: answerUrl.toString(),
answer_method: "POST",
hangup_url: hangupUrl.toString(),
hangup_method: "POST",
// Plivo's API uses `hangup_on_ring` for outbound ring timeout.
hangup_on_ring: ringTimeoutSec,
},
});
const requestUuid = Array.isArray(result.request_uuid)
? result.request_uuid[0]
: result.request_uuid;
if (!requestUuid) {
throw new Error("Plivo call create returned no request_uuid");
}
return { providerCallId: requestUuid, status: "initiated" };
}
async hangupCall(input: HangupCallInput): Promise<void> {
const callUuid = this.requestUuidToCallUuid.get(input.providerCallId);
if (callUuid) {
await this.apiRequest({
method: "DELETE",
endpoint: `/Call/${callUuid}/`,
allowNotFound: true,
});
return;
}
// Best-effort: try hangup (call UUID), then cancel (request UUID).
await this.apiRequest({
method: "DELETE",
endpoint: `/Call/${input.providerCallId}/`,
allowNotFound: true,
});
await this.apiRequest({
method: "DELETE",
endpoint: `/Request/${input.providerCallId}/`,
allowNotFound: true,
});
}
async playTts(input: PlayTtsInput): Promise<void> {
const callUuid = this.requestUuidToCallUuid.get(input.providerCallId) ??
input.providerCallId;
const webhookBase =
this.callUuidToWebhookUrl.get(callUuid) ||
this.callIdToWebhookUrl.get(input.callId);
if (!webhookBase) {
throw new Error("Missing webhook URL for this call (provider state missing)");
}
if (!callUuid) {
throw new Error("Missing Plivo CallUUID for playTts");
}
const transferUrl = new URL(webhookBase);
transferUrl.searchParams.set("provider", "plivo");
transferUrl.searchParams.set("flow", "xml-speak");
transferUrl.searchParams.set("callId", input.callId);
this.pendingSpeakByCallId.set(input.callId, {
text: input.text,
locale: input.locale,
});
await this.apiRequest({
method: "POST",
endpoint: `/Call/${callUuid}/`,
body: {
legs: "aleg",
aleg_url: transferUrl.toString(),
aleg_method: "POST",
},
});
}
async startListening(input: StartListeningInput): Promise<void> {
const callUuid = this.requestUuidToCallUuid.get(input.providerCallId) ??
input.providerCallId;
const webhookBase =
this.callUuidToWebhookUrl.get(callUuid) ||
this.callIdToWebhookUrl.get(input.callId);
if (!webhookBase) {
throw new Error("Missing webhook URL for this call (provider state missing)");
}
if (!callUuid) {
throw new Error("Missing Plivo CallUUID for startListening");
}
const transferUrl = new URL(webhookBase);
transferUrl.searchParams.set("provider", "plivo");
transferUrl.searchParams.set("flow", "xml-listen");
transferUrl.searchParams.set("callId", input.callId);
this.pendingListenByCallId.set(input.callId, {
language: input.language,
});
await this.apiRequest({
method: "POST",
endpoint: `/Call/${callUuid}/`,
body: {
legs: "aleg",
aleg_url: transferUrl.toString(),
aleg_method: "POST",
},
});
}
async stopListening(_input: StopListeningInput): Promise<void> {
// GetInput ends automatically when speech ends.
}
private static normalizeNumber(numberOrSip: string): string {
const trimmed = numberOrSip.trim();
if (trimmed.toLowerCase().startsWith("sip:")) return trimmed;
return trimmed.startsWith("+") ? trimmed.slice(1) : trimmed;
}
private static xmlEmpty(): string {
return `<?xml version="1.0" encoding="UTF-8"?><Response></Response>`;
}
private static xmlKeepAlive(): string {
return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Wait length="300" />
</Response>`;
}
private static xmlSpeak(text: string, locale?: string): string {
const language = locale || "en-US";
return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Speak language="${escapeXml(language)}">${escapeXml(text)}</Speak>
<Wait length="300" />
</Response>`;
}
private static xmlGetInputSpeech(params: {
actionUrl: string;
language?: string;
}): string {
const language = params.language || "en-US";
return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<GetInput inputType="speech" method="POST" action="${escapeXml(params.actionUrl)}" language="${escapeXml(language)}" executionTimeout="30" speechEndTimeout="1" redirect="false">
</GetInput>
<Wait length="300" />
</Response>`;
}
private getCallIdFromQuery(ctx: WebhookContext): string | undefined {
const callId =
typeof ctx.query?.callId === "string" && ctx.query.callId.trim()
? ctx.query.callId.trim()
: undefined;
return callId || undefined;
}
private buildActionUrl(
ctx: WebhookContext,
opts: { flow: string; callId?: string },
): string | null {
const base = PlivoProvider.baseWebhookUrlFromCtx(ctx);
if (!base) return null;
const u = new URL(base);
u.searchParams.set("provider", "plivo");
u.searchParams.set("flow", opts.flow);
if (opts.callId) u.searchParams.set("callId", opts.callId);
return u.toString();
}
private static baseWebhookUrlFromCtx(ctx: WebhookContext): string | null {
try {
const u = new URL(reconstructWebhookUrl(ctx));
return `${u.origin}${u.pathname}`;
} catch {
return null;
}
}
private parseBody(rawBody: string): URLSearchParams | null {
try {
return new URLSearchParams(rawBody);
} catch {
return null;
}
}
private static extractTranscript(params: URLSearchParams): string | null {
const candidates = [
"Speech",
"Transcription",
"TranscriptionText",
"SpeechResult",
"RecognizedSpeech",
"Text",
] as const;
for (const key of candidates) {
const value = params.get(key);
if (value && value.trim()) return value.trim();
}
return null;
}
}
type PlivoCreateCallResponse = {
api_id?: string;
message?: string;
request_uuid?: string | string[];
};

View File

@@ -4,6 +4,7 @@ import { validateProviderConfig } from "./config.js";
import { CallManager } from "./manager.js";
import type { VoiceCallProvider } from "./providers/base.js";
import { MockProvider } from "./providers/mock.js";
import { PlivoProvider } from "./providers/plivo.js";
import { TelnyxProvider } from "./providers/telnyx.js";
import { OpenAITTSProvider } from "./providers/tts-openai.js";
import { TwilioProvider } from "./providers/twilio.js";
@@ -56,6 +57,18 @@ function resolveProvider(config: VoiceCallConfig): VoiceCallProvider {
: undefined,
},
);
case "plivo":
return new PlivoProvider(
{
authId: config.plivo?.authId ?? process.env.PLIVO_AUTH_ID,
authToken: config.plivo?.authToken ?? process.env.PLIVO_AUTH_TOKEN,
},
{
publicUrl: config.publicUrl,
skipVerification: config.skipSignatureVerification,
ringTimeoutSec: Math.max(1, Math.floor(config.ringTimeoutMs / 1000)),
},
);
case "mock":
return new MockProvider();
default:

View File

@@ -6,7 +6,7 @@ import type { CallMode } from "./config.js";
// Provider Identifiers
// -----------------------------------------------------------------------------
export const ProviderNameSchema = z.enum(["telnyx", "twilio", "mock"]);
export const ProviderNameSchema = z.enum(["telnyx", "twilio", "plivo", "mock"]);
export type ProviderName = z.infer<typeof ProviderNameSchema>;
// -----------------------------------------------------------------------------

View File

@@ -0,0 +1,156 @@
import crypto from "node:crypto";
import { describe, expect, it } from "vitest";
import { verifyPlivoWebhook } from "./webhook-security.js";
function canonicalizeBase64(input: string): string {
return Buffer.from(input, "base64").toString("base64");
}
function plivoV2Signature(params: {
authToken: string;
urlNoQuery: string;
nonce: string;
}): string {
const digest = crypto
.createHmac("sha256", params.authToken)
.update(params.urlNoQuery + params.nonce)
.digest("base64");
return canonicalizeBase64(digest);
}
function plivoV3Signature(params: {
authToken: string;
urlWithQuery: string;
postBody: string;
nonce: string;
}): string {
const u = new URL(params.urlWithQuery);
const baseNoQuery = `${u.protocol}//${u.host}${u.pathname}`;
const queryPairs: Array<[string, string]> = [];
for (const [k, v] of u.searchParams.entries()) queryPairs.push([k, v]);
const queryMap = new Map<string, string[]>();
for (const [k, v] of queryPairs) {
queryMap.set(k, (queryMap.get(k) ?? []).concat(v));
}
const sortedQuery = Array.from(queryMap.keys())
.sort()
.flatMap((k) =>
[...(queryMap.get(k) ?? [])].sort().map((v) => `${k}=${v}`),
)
.join("&");
const postParams = new URLSearchParams(params.postBody);
const postMap = new Map<string, string[]>();
for (const [k, v] of postParams.entries()) {
postMap.set(k, (postMap.get(k) ?? []).concat(v));
}
const sortedPost = Array.from(postMap.keys())
.sort()
.flatMap((k) => [...(postMap.get(k) ?? [])].sort().map((v) => `${k}${v}`))
.join("");
const hasPost = sortedPost.length > 0;
let baseUrl = baseNoQuery;
if (sortedQuery.length > 0 || hasPost) {
baseUrl = `${baseNoQuery}?${sortedQuery}`;
}
if (sortedQuery.length > 0 && hasPost) {
baseUrl = `${baseUrl}.`;
}
baseUrl = `${baseUrl}${sortedPost}`;
const digest = crypto
.createHmac("sha256", params.authToken)
.update(`${baseUrl}.${params.nonce}`)
.digest("base64");
return canonicalizeBase64(digest);
}
describe("verifyPlivoWebhook", () => {
it("accepts valid V2 signature", () => {
const authToken = "test-auth-token";
const nonce = "nonce-123";
const ctxUrl = "http://local/voice/webhook?flow=answer&callId=abc";
const verificationUrl = "https://example.com/voice/webhook";
const signature = plivoV2Signature({
authToken,
urlNoQuery: verificationUrl,
nonce,
});
const result = verifyPlivoWebhook(
{
headers: {
host: "example.com",
"x-forwarded-proto": "https",
"x-plivo-signature-v2": signature,
"x-plivo-signature-v2-nonce": nonce,
},
rawBody: "CallUUID=uuid&CallStatus=in-progress",
url: ctxUrl,
method: "POST",
query: { flow: "answer", callId: "abc" },
},
authToken,
);
expect(result.ok).toBe(true);
expect(result.version).toBe("v2");
});
it("accepts valid V3 signature (including multi-signature header)", () => {
const authToken = "test-auth-token";
const nonce = "nonce-456";
const urlWithQuery = "https://example.com/voice/webhook?flow=answer&callId=abc";
const postBody = "CallUUID=uuid&CallStatus=in-progress&From=%2B15550000000";
const good = plivoV3Signature({
authToken,
urlWithQuery,
postBody,
nonce,
});
const result = verifyPlivoWebhook(
{
headers: {
host: "example.com",
"x-forwarded-proto": "https",
"x-plivo-signature-v3": `bad, ${good}`,
"x-plivo-signature-v3-nonce": nonce,
},
rawBody: postBody,
url: urlWithQuery,
method: "POST",
query: { flow: "answer", callId: "abc" },
},
authToken,
);
expect(result.ok).toBe(true);
expect(result.version).toBe("v3");
});
it("rejects missing signatures", () => {
const result = verifyPlivoWebhook(
{
headers: { host: "example.com", "x-forwarded-proto": "https" },
rawBody: "",
url: "https://example.com/voice/webhook",
method: "POST",
},
"token",
);
expect(result.ok).toBe(false);
expect(result.reason).toMatch(/Missing Plivo signature headers/);
});
});

View File

@@ -195,3 +195,233 @@ export function verifyTwilioWebhook(
isNgrokFreeTier,
};
}
// -----------------------------------------------------------------------------
// Plivo webhook verification
// -----------------------------------------------------------------------------
/**
* Result of Plivo webhook verification with detailed info.
*/
export interface PlivoVerificationResult {
ok: boolean;
reason?: string;
verificationUrl?: string;
/** Signature version used for verification */
version?: "v3" | "v2";
}
function normalizeSignatureBase64(input: string): string {
// Canonicalize base64 to match Plivo SDK behavior (decode then re-encode).
return Buffer.from(input, "base64").toString("base64");
}
function getBaseUrlNoQuery(url: string): string {
const u = new URL(url);
return `${u.protocol}//${u.host}${u.pathname}`;
}
function timingSafeEqualString(a: string, b: string): boolean {
if (a.length !== b.length) {
const dummy = Buffer.from(a);
crypto.timingSafeEqual(dummy, dummy);
return false;
}
return crypto.timingSafeEqual(Buffer.from(a), Buffer.from(b));
}
function validatePlivoV2Signature(params: {
authToken: string;
signature: string;
nonce: string;
url: string;
}): boolean {
const baseUrl = getBaseUrlNoQuery(params.url);
const digest = crypto
.createHmac("sha256", params.authToken)
.update(baseUrl + params.nonce)
.digest("base64");
const expected = normalizeSignatureBase64(digest);
const provided = normalizeSignatureBase64(params.signature);
return timingSafeEqualString(expected, provided);
}
type PlivoParamMap = Record<string, string[]>;
function toParamMapFromSearchParams(sp: URLSearchParams): PlivoParamMap {
const map: PlivoParamMap = {};
for (const [key, value] of sp.entries()) {
if (!map[key]) map[key] = [];
map[key].push(value);
}
return map;
}
function sortedQueryString(params: PlivoParamMap): string {
const parts: string[] = [];
for (const key of Object.keys(params).sort()) {
const values = [...params[key]].sort();
for (const value of values) {
parts.push(`${key}=${value}`);
}
}
return parts.join("&");
}
function sortedParamsString(params: PlivoParamMap): string {
const parts: string[] = [];
for (const key of Object.keys(params).sort()) {
const values = [...params[key]].sort();
for (const value of values) {
parts.push(`${key}${value}`);
}
}
return parts.join("");
}
function constructPlivoV3BaseUrl(params: {
method: "GET" | "POST";
url: string;
postParams: PlivoParamMap;
}): string {
const hasPostParams = Object.keys(params.postParams).length > 0;
const u = new URL(params.url);
const baseNoQuery = `${u.protocol}//${u.host}${u.pathname}`;
const queryMap = toParamMapFromSearchParams(u.searchParams);
const queryString = sortedQueryString(queryMap);
// In the Plivo V3 algorithm, the query portion is always sorted, and if we
// have POST params we add a '.' separator after the query string.
let baseUrl = baseNoQuery;
if (queryString.length > 0 || hasPostParams) {
baseUrl = `${baseNoQuery}?${queryString}`;
}
if (queryString.length > 0 && hasPostParams) {
baseUrl = `${baseUrl}.`;
}
if (params.method === "GET") {
return baseUrl;
}
return baseUrl + sortedParamsString(params.postParams);
}
function validatePlivoV3Signature(params: {
authToken: string;
signatureHeader: string;
nonce: string;
method: "GET" | "POST";
url: string;
postParams: PlivoParamMap;
}): boolean {
const baseUrl = constructPlivoV3BaseUrl({
method: params.method,
url: params.url,
postParams: params.postParams,
});
const hmacBase = `${baseUrl}.${params.nonce}`;
const digest = crypto
.createHmac("sha256", params.authToken)
.update(hmacBase)
.digest("base64");
const expected = normalizeSignatureBase64(digest);
// Header can contain multiple signatures separated by commas.
const provided = params.signatureHeader
.split(",")
.map((s) => s.trim())
.filter(Boolean)
.map((s) => normalizeSignatureBase64(s));
for (const sig of provided) {
if (timingSafeEqualString(expected, sig)) return true;
}
return false;
}
/**
* Verify Plivo webhooks using V3 signature if present; fall back to V2.
*
* Header names (case-insensitive; Node provides lower-case keys):
* - V3: X-Plivo-Signature-V3 / X-Plivo-Signature-V3-Nonce
* - V2: X-Plivo-Signature-V2 / X-Plivo-Signature-V2-Nonce
*/
export function verifyPlivoWebhook(
ctx: WebhookContext,
authToken: string,
options?: {
/** Override the public URL origin (host) used for verification */
publicUrl?: string;
/** Skip verification entirely (only for development) */
skipVerification?: boolean;
},
): PlivoVerificationResult {
if (options?.skipVerification) {
return { ok: true, reason: "verification skipped (dev mode)" };
}
const signatureV3 = getHeader(ctx.headers, "x-plivo-signature-v3");
const nonceV3 = getHeader(ctx.headers, "x-plivo-signature-v3-nonce");
const signatureV2 = getHeader(ctx.headers, "x-plivo-signature-v2");
const nonceV2 = getHeader(ctx.headers, "x-plivo-signature-v2-nonce");
const reconstructed = reconstructWebhookUrl(ctx);
let verificationUrl = reconstructed;
if (options?.publicUrl) {
try {
const req = new URL(reconstructed);
const base = new URL(options.publicUrl);
base.pathname = req.pathname;
base.search = req.search;
verificationUrl = base.toString();
} catch {
verificationUrl = reconstructed;
}
}
if (signatureV3 && nonceV3) {
const postParams = toParamMapFromSearchParams(new URLSearchParams(ctx.rawBody));
const ok = validatePlivoV3Signature({
authToken,
signatureHeader: signatureV3,
nonce: nonceV3,
method: ctx.method,
url: verificationUrl,
postParams,
});
return ok
? { ok: true, version: "v3", verificationUrl }
: {
ok: false,
version: "v3",
verificationUrl,
reason: "Invalid Plivo V3 signature",
};
}
if (signatureV2 && nonceV2) {
const ok = validatePlivoV2Signature({
authToken,
signature: signatureV2,
nonce: nonceV2,
url: verificationUrl,
});
return ok
? { ok: true, version: "v2", verificationUrl }
: {
ok: false,
version: "v2",
verificationUrl,
reason: "Invalid Plivo V2 signature",
};
}
return {
ok: false,
reason: "Missing Plivo signature headers (V3 or V2)",
verificationUrl,
};
}

View File

@@ -6,7 +6,7 @@ metadata: {"clawdbot":{"emoji":"📞","skillKey":"voice-call","requires":{"confi
# Voice Call
Use the voice-call plugin to start or inspect calls (Twilio, Telnyx, or mock).
Use the voice-call plugin to start or inspect calls (Twilio, Telnyx, Plivo, or mock).
## CLI
@@ -31,4 +31,5 @@ Notes:
- Plugin config lives under `plugins.entries.voice-call.config`.
- Twilio config: `provider: "twilio"` + `twilio.accountSid/authToken` + `fromNumber`.
- Telnyx config: `provider: "telnyx"` + `telnyx.apiKey/connectionId` + `fromNumber`.
- Plivo config: `provider: "plivo"` + `plivo.authId/authToken` + `fromNumber`.
- Dev fallback: `provider: "mock"` (no network).

View File

@@ -2,7 +2,11 @@ import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
include: ["src/**/*.test.ts", "test/format-error.test.ts"],
include: [
"src/**/*.test.ts",
"extensions/**/*.test.ts",
"test/format-error.test.ts",
],
setupFiles: ["test/setup.ts"],
exclude: [
"dist/**",