init
This commit is contained in:
4
reelforge/capabilities/__init__.py
Normal file
4
reelforge/capabilities/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""
|
||||
Built-in capabilities for ReelForge
|
||||
"""
|
||||
|
||||
173
reelforge/capabilities/book_fetcher.py
Normal file
173
reelforge/capabilities/book_fetcher.py
Normal file
@@ -0,0 +1,173 @@
|
||||
"""
|
||||
Book Fetcher Capabilities
|
||||
|
||||
Fetch book information from various sources:
|
||||
- Google Books API (free, stable, English/Chinese books)
|
||||
- Douban (framework, requires custom implementation)
|
||||
|
||||
Convention: Tool names must be book_fetcher_{id}
|
||||
|
||||
Note: For LLM-based book info generation, use services/book_fetcher.py
|
||||
which combines LLM capability with book fetcher logic.
|
||||
"""
|
||||
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from pydantic import Field
|
||||
|
||||
from reelforge.core.mcp_server import reelforge_mcp
|
||||
|
||||
|
||||
@reelforge_mcp.tool(
    description="Fetch book information from Google Books API",
    meta={
        "reelforge": {
            "display_name": "Google Books",
            "description": "Fetch book info from Google Books (English/Chinese books)",
            "is_default": True,
        }
    },
)
async def book_fetcher_google(
    book_name: str = Field(description="Book name"),
    author: Optional[str] = Field(default=None, description="Author name (optional)"),
) -> str:
    """
    Fetch book information from the Google Books API.

    Free API, no key required. Works for both English and Chinese books.

    Args:
        book_name: Book name to search for.
        author: Author name (optional, narrows the search via ``inauthor:``).

    Returns:
        JSON string with book information:
        {
            "title": "Book title",
            "author": "Author name",
            "summary": "Book summary",
            "genre": "Category",
            "publication_year": "2018",
            "cover_url": "https://...",
            "isbn": "9781234567890"
        }

    Raises:
        ValueError: If the search returns no results.
        httpx.HTTPError: On network failures or non-2xx responses.

    Example:
        >>> info = await book_fetcher_google("Atomic Habits")
        >>> book = json.loads(info)
        >>> print(book['title'])
        Atomic Habits
    """
    logger.info(f"Fetching book info from Google Books: {book_name}")

    try:
        # Build search query. Use a SPACE before "inauthor:", not "+":
        # httpx percent-encodes a literal "+" in query params (-> %2B),
        # which corrupts the Google Books query syntax. Whitespace is the
        # documented term separator.
        query = book_name
        if author:
            query += f" inauthor:{author}"

        # Call Google Books API.
        async with httpx.AsyncClient() as client:
            url = "https://www.googleapis.com/books/v1/volumes"
            # NOTE: langRestrict is deliberately omitted. It accepts a single
            # two-letter ISO-639-1 code; the previous "zh-CN,en" value was
            # invalid and could cause the API to drop valid results.
            params = {
                "q": query,
                "maxResults": 1,
            }

            response = await client.get(url, params=params, timeout=10.0)
            response.raise_for_status()
            data = response.json()

        if "items" not in data or len(data["items"]) == 0:
            logger.warning(f"No results found for: {book_name}")
            raise ValueError(f"No results found for book: {book_name}")

        # Parse first result only (maxResults=1).
        item = data["items"][0]
        volume_info = item.get("volumeInfo", {})

        book_info = {
            "title": volume_info.get("title", book_name),
            "author": ", ".join(volume_info.get("authors", [author or "Unknown"])),
            "summary": volume_info.get("description", "No description available"),
            "genre": ", ".join(volume_info.get("categories", ["Uncategorized"])),
            # publishedDate may be "YYYY", "YYYY-MM" or "YYYY-MM-DD"; keep the year.
            "publication_year": volume_info.get("publishedDate", "")[:4] if volume_info.get("publishedDate") else "",
            "cover_url": volume_info.get("imageLinks", {}).get("thumbnail", ""),
            # Prefer whichever ISBN identifier appears first (13 or 10 digit).
            "isbn": next(
                (id_info["identifier"] for id_info in volume_info.get("industryIdentifiers", [])
                 if id_info["type"] in ["ISBN_13", "ISBN_10"]),
                ""
            ),
        }

        logger.info(f"✅ Successfully fetched from Google Books: {book_info['title']}")
        return json.dumps(book_info, ensure_ascii=False, indent=2)

    except httpx.HTTPError as e:
        logger.error(f"HTTP error fetching from Google Books: {e}")
        raise
    except Exception as e:
        logger.error(f"Error fetching from Google Books: {e}")
        raise
|
||||
|
||||
|
||||
@reelforge_mcp.tool(
    description="Fetch book information from Douban (requires custom implementation)",
    meta={
        "reelforge": {
            "display_name": "豆瓣读书 (Douban)",
            "description": "Fetch book info from Douban (best for Chinese books) - requires custom implementation",
            "is_default": False,
        }
    },
)
async def book_fetcher_douban(
    book_name: str = Field(description="Book name"),
    author: Optional[str] = Field(default=None, description="Author name (optional)"),
) -> str:
    """
    Fetch book information from Douban — placeholder, not implemented.

    Douban's official API is closed, so this tool is only a framework hook.
    Always raises NotImplementedError until a custom backend is wired in.

    Possible implementation strategies:
        1. A third-party Douban API service
        2. Web scraping (mind rate limits)
        3. A pre-built / cached book database

    Args:
        book_name: Book name.
        author: Author name (optional).

    Returns:
        JSON string with book information (once implemented).

    Raises:
        NotImplementedError: Always, until custom logic is added.

    Example implementation:
        ```python
        # Option 1: Use third-party API
        async with httpx.AsyncClient() as client:
            url = "https://your-douban-api-service.com/search"
            params = {"q": book_name}
            response = await client.get(url, params=params)
            data = response.json()
            return json.dumps(data, ensure_ascii=False)

        # Option 2: Web scraping
        # Use BeautifulSoup + httpx to scrape Douban pages

        # Option 3: Pre-built database
        # Query your own book database
        ```
    """
    # Surface both an error and a pointer to the file that must be edited.
    logger.error("book_fetcher_douban is not implemented")
    logger.info("To implement: Edit reelforge/capabilities/book_fetcher.py and add your logic")

    message = (
        "book_fetcher_douban requires custom implementation. "
        "Please edit reelforge/capabilities/book_fetcher.py to add your Douban API integration."
    )
    raise NotImplementedError(message)
|
||||
|
||||
141
reelforge/capabilities/image.py
Normal file
141
reelforge/capabilities/image.py
Normal file
@@ -0,0 +1,141 @@
|
||||
"""
|
||||
Image Generation Capabilities using ComfyKit
|
||||
|
||||
ComfyKit provides unified access to ComfyUI workflows:
|
||||
- Local ComfyUI execution
|
||||
- RunningHub cloud execution
|
||||
- Flexible workflow-based generation
|
||||
- Structured result handling
|
||||
|
||||
Convention: Tool names must be image_{id}
|
||||
"""
|
||||
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
from comfykit import ComfyKit
|
||||
from loguru import logger
|
||||
from pydantic import Field
|
||||
|
||||
from reelforge.core.mcp_server import reelforge_mcp
|
||||
|
||||
|
||||
@reelforge_mcp.tool(
    description="Generate images using ComfyKit (local ComfyUI or RunningHub cloud)",
    meta={
        "reelforge": {
            "display_name": "ComfyKit Image Generator",
            "description": "基于 ComfyKit 的图像生成 - 支持本地和云端",
            "is_default": True,
        }
    },
)
async def image_comfykit(
    workflow: str = Field(description="Workflow path, ID, or URL"),
    comfyui_url: str | None = Field(default=None, description="ComfyUI server URL (default: http://127.0.0.1:8188)"),
    runninghub_api_key: str | None = Field(default=None, description="RunningHub API key (for cloud execution)"),
    # Common workflow parameters
    prompt: str | None = Field(default=None, description="Image generation prompt"),
    width: int | None = Field(default=None, description="Image width"),
    height: int | None = Field(default=None, description="Image height"),
    negative_prompt: str | None = Field(default=None, description="Negative prompt"),
    steps: int | None = Field(default=None, description="Sampling steps"),
    seed: int | None = Field(default=None, description="Random seed"),
    cfg: float | None = Field(default=None, description="CFG scale"),
    sampler: str | None = Field(default=None, description="Sampler name"),
) -> str:
    """
    Generate an image using ComfyKit and return the first generated image URL.

    Supports both local ComfyUI and RunningHub cloud execution. Explicit
    arguments take precedence over environment variables.

    Environment variables (optional):
        - COMFYUI_BASE_URL: ComfyUI server URL
        - RUNNINGHUB_API_KEY: RunningHub API key

    Returns:
        URL string of the first generated image.

    Raises:
        RuntimeError: If the workflow fails or produces no images.

    Example:
        # Local ComfyUI (default)
        image_url = await image_comfykit(
            workflow="workflows/book_cover.json",
            prompt="a book cover for Atomic Habits",
        )

        # RunningHub cloud
        image_url = await image_comfykit(
            workflow="12345",  # RunningHub workflow ID
            runninghub_api_key="rh-key-xxx",
            prompt="a beautiful landscape",
        )

        # Custom ComfyUI server
        image_url = await image_comfykit(
            workflow="workflows/text2img.json",
            comfyui_url="http://192.168.1.100:8188",
            prompt="a cute cat",
        )
    """
    logger.debug(f"Generating image with workflow: {workflow}")

    try:
        # Resolve configuration: explicit argument wins, env var is fallback.
        kit_config = {}

        resolved_comfyui_url = comfyui_url or os.getenv("COMFYUI_BASE_URL")
        if resolved_comfyui_url:
            kit_config["comfyui_url"] = resolved_comfyui_url

        resolved_rh_key = runninghub_api_key or os.getenv("RUNNINGHUB_API_KEY")
        if resolved_rh_key:
            kit_config["runninghub_api_key"] = resolved_rh_key

        kit = ComfyKit(**kit_config)

        # Forward only the workflow parameters the caller actually provided,
        # so workflow defaults are not clobbered with None.
        candidate_params = {
            "prompt": prompt,
            "width": width,
            "height": height,
            "negative_prompt": negative_prompt,
            "steps": steps,
            "seed": seed,
            "cfg": cfg,
            "sampler": sampler,
        }
        workflow_params = {key: value for key, value in candidate_params.items() if value is not None}

        logger.debug(f"Workflow parameters: {workflow_params}")

        # Execute workflow (local or cloud, decided by ComfyKit config).
        result = await kit.execute(workflow, workflow_params)

        # Fail loudly on unsuccessful execution. RuntimeError (not bare
        # Exception) keeps `except Exception` callers working while being
        # a concrete, lintable type.
        if result.status != "completed":
            error_msg = result.msg or "Unknown error"
            logger.error(f"Image generation failed: {error_msg}")
            raise RuntimeError(f"Image generation failed: {error_msg}")

        if not result.images:
            logger.error("No images generated")
            raise RuntimeError("No images generated")

        image_url = result.images[0]
        logger.info(f"Generated image: {image_url}")
        return image_url

    except Exception as e:
        logger.error(f"ComfyKit error: {e}")
        raise
|
||||
|
||||
111
reelforge/capabilities/llm.py
Normal file
111
reelforge/capabilities/llm.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""
|
||||
LLM Capabilities using OpenAI SDK
|
||||
|
||||
All LLM providers that follow OpenAI SDK protocol are supported:
|
||||
- OpenAI (gpt-4o, gpt-4o-mini, gpt-3.5-turbo)
|
||||
- Alibaba Qwen (qwen-max, qwen-plus, qwen-turbo)
|
||||
- Anthropic Claude (claude-sonnet-4-5, claude-opus-4, claude-haiku-4)
|
||||
- DeepSeek (deepseek-chat)
|
||||
- Moonshot Kimi (moonshot-v1-8k, moonshot-v1-32k, moonshot-v1-128k)
|
||||
- Ollama (llama3.2, qwen2.5, mistral, codellama) - FREE & LOCAL!
|
||||
- Any custom provider with OpenAI-compatible API
|
||||
|
||||
Convention: Unified llm_call tool for all providers
|
||||
"""
|
||||
|
||||
from openai import AsyncOpenAI
|
||||
from loguru import logger
|
||||
from pydantic import Field
|
||||
|
||||
from reelforge.core.mcp_server import reelforge_mcp
|
||||
|
||||
|
||||
@reelforge_mcp.tool(
    description="Generate text using any OpenAI SDK compatible LLM",
    meta={
        "reelforge": {
            "display_name": "LLM (OpenAI SDK)",
            "description": "Unified interface for all OpenAI SDK compatible LLMs",
            "is_default": True,
        }
    },
)
async def llm_call(
    prompt: str = Field(description="The prompt to generate from"),
    api_key: str = Field(description="API key for the LLM provider"),
    base_url: str = Field(description="Base URL for the LLM API"),
    model: str = Field(description="Model name to use"),
    temperature: float = Field(default=0.7, description="Sampling temperature"),
    max_tokens: int = Field(default=2000, description="Maximum tokens to generate"),
) -> str:
    """
    Generate text using any OpenAI SDK compatible LLM.

    This is a unified interface that works with any LLM provider
    following the OpenAI SDK protocol.

    Returns:
        The generated text.

    Raises:
        ValueError: If the provider returns a response with no text content.
        Exception: Any error raised by the underlying OpenAI SDK client.

    Example:
        # OpenAI
        result = await llm_call(
            prompt="Explain quantum physics",
            api_key="sk-xxx",
            base_url="https://api.openai.com/v1",
            model="gpt-4o"
        )

        # Qwen
        result = await llm_call(
            prompt="解释量子物理",
            api_key="sk-xxx",
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
            model="qwen-max"
        )

        # Anthropic Claude
        result = await llm_call(
            prompt="Explain reasoning step by step",
            api_key="sk-ant-xxx",
            base_url="https://api.anthropic.com/v1/",
            model="claude-sonnet-4-5"
        )

        # DeepSeek
        result = await llm_call(
            prompt="Explain AI",
            api_key="sk-xxx",
            base_url="https://api.deepseek.com",
            model="deepseek-chat"
        )

        # Ollama (Local - FREE & PRIVATE!)
        result = await llm_call(
            prompt="Write a Python function to sort a list",
            api_key="ollama",  # Required but unused
            base_url="http://localhost:11434/v1",
            model="llama3.2"
        )
    """
    logger.debug(f"LLM call: model={model}, base_url={base_url}")

    try:
        client = AsyncOpenAI(
            api_key=api_key,
            base_url=base_url,
        )

        response = await client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature,
            max_tokens=max_tokens,
        )

        result = response.choices[0].message.content
        # message.content is Optional in the OpenAI SDK (e.g. content-filtered
        # or tool-call-only responses). Fail explicitly rather than returning
        # None despite the declared `-> str` (the old code crashed later with
        # TypeError at len(None)).
        if result is None:
            raise ValueError(f"LLM returned no text content (model={model})")

        logger.debug(f"LLM response length: {len(result)} chars")

        return result

    except Exception as e:
        logger.error(f"LLM call error (model={model}, base_url={base_url}): {e}")
        raise
|
||||
|
||||
263
reelforge/capabilities/tts.py
Normal file
263
reelforge/capabilities/tts.py
Normal file
@@ -0,0 +1,263 @@
|
||||
"""
|
||||
TTS Capabilities using edge-tts
|
||||
|
||||
edge-tts provides free access to Microsoft Edge's online text-to-speech service:
|
||||
- 400+ voices across 100+ languages
|
||||
- Natural sounding speech
|
||||
- No API key required
|
||||
- Free to use
|
||||
|
||||
Convention: Tool names must be tts_{id}
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import ssl
|
||||
import certifi
|
||||
import edge_tts
|
||||
from loguru import logger
|
||||
from pydantic import Field
|
||||
from aiohttp import WSServerHandshakeError, ClientResponseError
|
||||
|
||||
from reelforge.core.mcp_server import reelforge_mcp
|
||||
|
||||
# Global flag for SSL certificate verification (development convenience only).
# When False, the TTS tools below temporarily monkey-patch
# ssl.create_default_context to skip certificate checks while talking to the
# Edge TTS endpoint. NOT suitable for production.
_SSL_VERIFY_ENABLED = False

# Default retry configuration for Edge TTS calls. The service occasionally
# returns transient 401/handshake errors; a few retries with a short delay
# usually recovers. Both values can be overridden per call.
_RETRY_COUNT = 3  # Default retry count
_RETRY_DELAY = 2.0  # Retry delay in seconds
|
||||
|
||||
|
||||
@reelforge_mcp.tool(
    description="Convert text to speech using Microsoft Edge TTS - free and natural sounding",
    meta={
        "reelforge": {
            "display_name": "Edge TTS",
            "description": "Microsoft Edge Text-to-Speech - 免费且音质自然",
            "is_default": True,
        }
    },
)
async def tts_edge(
    text: str = Field(description="Text to convert to speech"),
    voice: str = Field(default="zh-CN-YunjianNeural", description="Voice ID (e.g., zh-CN-YunjianNeural, en-US-JennyNeural)"),
    rate: str = Field(default="+0%", description="Speech rate (e.g., +0%, +50%, -20%)"),
    volume: str = Field(default="+0%", description="Speech volume (e.g., +0%, +50%, -20%)"),
    pitch: str = Field(default="+0Hz", description="Speech pitch (e.g., +0Hz, +10Hz, -5Hz)"),
    retry_count: int = Field(default=_RETRY_COUNT, description="Number of retries on failure (default: 3)"),
    retry_delay: float = Field(default=_RETRY_DELAY, description="Delay between retries in seconds (default: 2.0)"),
) -> str:
    """
    Convert text to speech using Microsoft Edge TTS.

    This service is free and requires no API key.
    Supports 400+ voices across 100+ languages.

    Returns audio data as a base64-encoded string (MP3 format).

    Includes an automatic retry mechanism for handshake/HTTP errors such as
    transient 401 responses (default: 3 retries with 2s delay between tries).
    Other exception types are raised immediately without retrying.

    Args:
        text: Text to synthesize.
        voice: Voice ID (ShortName), e.g. zh-CN-YunjianNeural.
        rate: Relative speech rate, e.g. "+20%".
        volume: Relative volume, e.g. "-10%".
        pitch: Relative pitch, e.g. "+5Hz".
        retry_count: How many retries after the first failed attempt.
        retry_delay: Seconds to sleep before each retry.

    Raises:
        WSServerHandshakeError / ClientResponseError: When all retries fail.
        RuntimeError: Defensive fallback if the loop exits without a result.

    Popular Chinese voices:
        - zh-CN-YunjianNeural (male, default)
        - zh-CN-XiaoxiaoNeural (female)
        - zh-CN-YunxiNeural (male)
        - zh-CN-XiaoyiNeural (female)

    Popular English voices:
        - en-US-JennyNeural (female)
        - en-US-GuyNeural (male)
        - en-GB-SoniaNeural (female, British)

    Example:
        audio_base64 = await tts_edge(
            text="你好,世界!",
            voice="zh-CN-YunjianNeural",
            rate="+20%"
        )
        # Decode: audio_bytes = base64.b64decode(audio_base64)
    """
    logger.debug(f"Calling Edge TTS with voice: {voice}, rate: {rate}, retry_count: {retry_count}")

    last_error = None

    # Retry loop: attempt 0 is the initial try, so retry_count retries means
    # retry_count + 1 total attempts.
    for attempt in range(retry_count + 1):  # +1 because first attempt is not a retry
        try:
            if attempt > 0:
                logger.info(f"🔄 Retrying Edge TTS (attempt {attempt + 1}/{retry_count + 1}) after {retry_delay}s delay...")
                await asyncio.sleep(retry_delay)

            # Monkey patch ssl.create_default_context if SSL verification is
            # disabled. NOTE(review): this mutates process-global state; it is
            # restored in the finally below, but concurrent tasks in the same
            # process would observe the patched function while this runs.
            if not _SSL_VERIFY_ENABLED:
                if attempt == 0:  # Only log warning once
                    logger.warning("SSL verification is disabled for development. This is NOT recommended for production!")
                original_create_default_context = ssl.create_default_context

                def create_unverified_context(*args, **kwargs):
                    # Same context as the stdlib default, but with hostname and
                    # certificate checks turned off.
                    ctx = original_create_default_context(*args, **kwargs)
                    ctx.check_hostname = False
                    ctx.verify_mode = ssl.CERT_NONE
                    return ctx

                # Temporarily replace the function for the duration of this attempt.
                ssl.create_default_context = create_unverified_context

            try:
                # Create communicate instance for this synthesis request.
                communicate = edge_tts.Communicate(
                    text=text,
                    voice=voice,
                    rate=rate,
                    volume=volume,
                    pitch=pitch,
                )

                # Collect audio chunks from the stream; non-audio chunks
                # (metadata events) are skipped.
                audio_chunks = []
                async for chunk in communicate.stream():
                    if chunk["type"] == "audio":
                        audio_chunks.append(chunk["data"])

                audio_data = b"".join(audio_chunks)

                if attempt > 0:
                    logger.success(f"✅ Retry succeeded on attempt {attempt + 1}")

                logger.info(f"Generated {len(audio_data)} bytes of audio data")

                # Encode as base64 for JSON serialization over MCP.
                audio_base64 = base64.b64encode(audio_data).decode('utf-8')
                return audio_base64

            finally:
                # Restore the original function if we patched it — runs on
                # success, retryable failure, and non-retryable failure alike.
                if not _SSL_VERIFY_ENABLED:
                    ssl.create_default_context = original_create_default_context

        except (WSServerHandshakeError, ClientResponseError) as e:
            # Network/authentication errors (e.g. transient 401) - retry.
            last_error = e
            error_code = getattr(e, 'status', 'unknown')
            logger.warning(f"⚠️ Edge TTS error (attempt {attempt + 1}/{retry_count + 1}): {error_code} - {e}")

            if attempt >= retry_count:
                # Last attempt failed — re-raise the original exception.
                logger.error(f"❌ All {retry_count + 1} attempts failed. Giving up.")
                raise
            # Otherwise, continue to next retry

        except Exception as e:
            # Other errors - don't retry, raise immediately.
            logger.error(f"Edge TTS error (non-retryable): {e}")
            raise

    # Should not reach here (every path above returns or raises), but keep a
    # defensive fallback just in case.
    if last_error:
        raise last_error
    else:
        raise RuntimeError("Edge TTS failed without error (unexpected)")
|
||||
|
||||
|
||||
@reelforge_mcp.tool(
    description="List all available voices for Edge TTS",
    meta={
        "reelforge": {
            "display_name": "List Edge TTS Voices",
            "description": "获取所有可用的Edge TTS音色列表",
            "is_default": False,
        }
    },
)
async def tts_edge_list_voices(
    locale: str | None = Field(default=None, description="Filter by locale (e.g., zh-CN, en-US, ja-JP)"),
    retry_count: int = Field(default=_RETRY_COUNT, description="Number of retries on failure (default: 3)"),
    retry_delay: float = Field(default=_RETRY_DELAY, description="Delay between retries in seconds (default: 2.0)"),
) -> list[str]:
    """
    List all available voices for Edge TTS.

    Returns a list of voice IDs (ShortName), optionally filtered by locale
    prefix (e.g. locale="zh-CN" matches voices whose Locale starts with
    "zh-CN").

    Includes an automatic retry mechanism for handshake/HTTP errors
    (default: 3 retries with 2s delay); other exception types are raised
    immediately without retrying.

    Args:
        locale: Optional locale prefix filter.
        retry_count: How many retries after the first failed attempt.
        retry_delay: Seconds to sleep before each retry.

    Raises:
        WSServerHandshakeError / ClientResponseError: When all retries fail.
        RuntimeError: Defensive fallback if the loop exits without a result.

    Example:
        # List all voices
        voices = await tts_edge_list_voices()
        # Returns: ['zh-CN-YunjianNeural', 'zh-CN-XiaoxiaoNeural', ...]

        # List Chinese voices only
        voices = await tts_edge_list_voices(locale="zh-CN")
        # Returns: ['zh-CN-YunjianNeural', 'zh-CN-XiaoxiaoNeural', ...]
    """
    logger.debug(f"Fetching Edge TTS voices, locale filter: {locale}, retry_count: {retry_count}")

    last_error = None

    # Retry loop: attempt 0 is the initial try, so retry_count retries means
    # retry_count + 1 total attempts.
    for attempt in range(retry_count + 1):
        try:
            if attempt > 0:
                logger.info(f"🔄 Retrying list voices (attempt {attempt + 1}/{retry_count + 1}) after {retry_delay}s delay...")
                await asyncio.sleep(retry_delay)

            # Monkey patch SSL if verification is disabled — same mechanism
            # (and same process-global caveat) as in tts_edge above.
            if not _SSL_VERIFY_ENABLED:
                if attempt == 0:  # Only log warning once
                    logger.warning("SSL verification is disabled for development. This is NOT recommended for production!")
                original_create_default_context = ssl.create_default_context

                def create_unverified_context(*args, **kwargs):
                    # Default context with hostname/certificate checks disabled.
                    ctx = original_create_default_context(*args, **kwargs)
                    ctx.check_hostname = False
                    ctx.verify_mode = ssl.CERT_NONE
                    return ctx

                ssl.create_default_context = create_unverified_context

            try:
                # Get all voices from the Edge TTS service.
                voices = await edge_tts.list_voices()

                # Filter by locale prefix if specified.
                if locale:
                    voices = [v for v in voices if v["Locale"].startswith(locale)]

                # Extract voice IDs (ShortName).
                voice_ids = [voice["ShortName"] for voice in voices]

                if attempt > 0:
                    logger.success(f"✅ Retry succeeded on attempt {attempt + 1}")

                logger.info(f"Found {len(voice_ids)} voices" + (f" for locale '{locale}'" if locale else ""))
                return voice_ids

            finally:
                # Restore the original function if we patched it — runs on
                # every exit path of the inner try.
                if not _SSL_VERIFY_ENABLED:
                    ssl.create_default_context = original_create_default_context

        except (WSServerHandshakeError, ClientResponseError) as e:
            # Network/authentication errors - retry.
            last_error = e
            error_code = getattr(e, 'status', 'unknown')
            logger.warning(f"⚠️ List voices error (attempt {attempt + 1}/{retry_count + 1}): {error_code} - {e}")

            if attempt >= retry_count:
                logger.error(f"❌ All {retry_count + 1} attempts failed. Giving up.")
                raise

        except Exception as e:
            # Other errors - don't retry, raise immediately.
            logger.error(f"List voices error (non-retryable): {e}")
            raise

    # Should not reach here (every path above returns or raises), but keep a
    # defensive fallback just in case.
    if last_error:
        raise last_error
    else:
        raise RuntimeError("List voices failed without error (unexpected)")
|
||||
|
||||
Reference in New Issue
Block a user