Refactor capability layer

puke
2025-10-27 20:06:27 +08:00
committed by puke
parent c19710d5bd
commit 9937c0fffd
19 changed files with 818 additions and 1160 deletions

View File

@@ -46,8 +46,19 @@ llm:
model: "" # Model name
# ==================== TTS Configuration ====================
# TTS uses Edge TTS by default (free, no configuration needed)
# Optional: Override default voice in code using voice parameter
# TTS supports two modes:
# 1. Edge TTS (default) - Free local SDK, no setup needed
# 2. ComfyUI Workflow - Workflow-based, requires ComfyUI
#
# Configuration (optional):
tts:
default_workflow: "edge" # Default: "edge" (Edge TTS) or "tts_default.json" (ComfyUI workflow)
# comfyui_url: http://127.0.0.1:8188 # Only needed for ComfyUI workflows
# Usage in code:
# await reelforge.tts(text="hello") # Uses default (edge)
# await reelforge.tts(text="hello", workflow="edge") # Explicitly use Edge TTS
# await reelforge.tts(text="hello", workflow="tts_custom.json") # Use ComfyUI workflow
# ==================== Image Generation Configuration ====================
# Image generation uses ComfyUI workflows
@@ -76,8 +87,10 @@ image:
# - WebUI provides quick preset selection
#
# 2. TTS Configuration:
# - No configuration needed, uses Edge TTS by default (free)
# - Override voice in code: await reelforge.tts(text="...", voice="zh-CN-XiaoxiaoNeural")
# - Two modes: Edge TTS (default, free) or ComfyUI Workflow
# - Edge TTS: No setup needed, just use default
# - ComfyUI: Create workflow files in workflows/tts_*.json
# - Override in code: pass workflow="edge" or workflow="tts_xxx.json" to reelforge.tts(...)
#
# 3. Image Generation:
# - Add workflow files: workflows/image_*.json

View File

@@ -1,4 +0,0 @@
"""
Built-in capabilities for ReelForge
"""

View File

@@ -1,111 +0,0 @@
"""
LLM Capabilities using OpenAI SDK
All LLM providers that follow OpenAI SDK protocol are supported:
- OpenAI (gpt-4o, gpt-4o-mini, gpt-3.5-turbo)
- Alibaba Qwen (qwen-max, qwen-plus, qwen-turbo)
- Anthropic Claude (claude-sonnet-4-5, claude-opus-4, claude-haiku-4)
- DeepSeek (deepseek-chat)
- Moonshot Kimi (moonshot-v1-8k, moonshot-v1-32k, moonshot-v1-128k)
- Ollama (llama3.2, qwen2.5, mistral, codellama) - FREE & LOCAL!
- Any custom provider with OpenAI-compatible API
Convention: Unified llm_call tool for all providers
"""
from openai import AsyncOpenAI
from loguru import logger
from pydantic import Field
from reelforge.core.mcp_server import reelforge_mcp
@reelforge_mcp.tool(
description="Generate text using any OpenAI SDK compatible LLM",
meta={
"reelforge": {
"display_name": "LLM (OpenAI SDK)",
"description": "Unified interface for all OpenAI SDK compatible LLMs",
"is_default": True,
}
},
)
async def llm_call(
prompt: str = Field(description="The prompt to generate from"),
api_key: str = Field(description="API key for the LLM provider"),
base_url: str = Field(description="Base URL for the LLM API"),
model: str = Field(description="Model name to use"),
temperature: float = Field(default=0.7, description="Sampling temperature"),
max_tokens: int = Field(default=2000, description="Maximum tokens to generate"),
) -> str:
"""
Generate text using any OpenAI SDK compatible LLM
This is a unified interface that works with any LLM provider
following the OpenAI SDK protocol.
Example:
# OpenAI
result = await llm_call(
prompt="Explain quantum physics",
api_key="sk-xxx",
base_url="https://api.openai.com/v1",
model="gpt-4o"
)
# Qwen
result = await llm_call(
prompt="解释量子物理",
api_key="sk-xxx",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
model="qwen-max"
)
# Anthropic Claude
result = await llm_call(
prompt="Explain reasoning step by step",
api_key="sk-ant-xxx",
base_url="https://api.anthropic.com/v1/",
model="claude-sonnet-4-5"
)
# DeepSeek
result = await llm_call(
prompt="Explain AI",
api_key="sk-xxx",
base_url="https://api.deepseek.com",
model="deepseek-chat"
)
# Ollama (Local - FREE & PRIVATE!)
result = await llm_call(
prompt="Write a Python function to sort a list",
api_key="ollama", # Required but unused
base_url="http://localhost:11434/v1",
model="llama3.2"
)
"""
logger.debug(f"LLM call: model={model}, base_url={base_url}")
try:
client = AsyncOpenAI(
api_key=api_key,
base_url=base_url,
)
response = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=max_tokens,
)
result = response.choices[0].message.content
logger.debug(f"LLM response length: {len(result)} chars")
return result
except Exception as e:
logger.error(f"LLM call error (model={model}, base_url={base_url}): {e}")
raise

View File

@@ -1,4 +0,0 @@
"""
Core capability system for ReelForge
"""

View File

@@ -1,173 +0,0 @@
"""
Configuration Manager
Manages capability configuration and provides unified access to MCP tools.
Simplified from Router - only handles config injection, no external routing.
"""
from typing import Any
from fastmcp import Client
from loguru import logger
from reelforge.core.conventions import CapabilityInfo
class ConfigManager:
"""
Configuration manager for capabilities
Core responsibilities:
1. Manage active capability selection
2. Inject configuration into capability calls
3. Provide unified MCP client interface
No external MCP routing - all capabilities are builtin.
"""
def __init__(self, registry, config: dict):
"""
Initialize config manager
Args:
registry: CapabilityRegistry instance with registered capabilities
config: Application configuration dict
"""
self.registry = registry
self.config = config
self._active: dict[str, str] = {} # type -> id
# Create in-memory MCP client for calling builtin capabilities
self._local_client = Client(registry.local_mcp)
self._load_active_from_config()
def _load_active_from_config(self):
"""Load active capability selections from config"""
for cap_type in self.registry.capabilities.keys():
# Read active from flat structure: config[type]["default"]
cap_section = self.config.get(cap_type, {})
if isinstance(cap_section, dict):
configured_id = cap_section.get("default")
if configured_id and configured_id in self.registry.capabilities[cap_type]:
self._active[cap_type] = configured_id
continue
# Otherwise, auto-select default
self._auto_select_default(cap_type)
def _auto_select_default(self, cap_type: str):
"""Auto-select default capability"""
capabilities = self.registry.capabilities[cap_type]
# Find default
for cap_id, cap_info in capabilities.items():
if cap_info.is_default:
self._active[cap_type] = cap_id
logger.info(f"✓ Auto-selected default {cap_type}: {cap_id}")
return
# No default, use first
if capabilities:
first_id = next(iter(capabilities.keys()))
self._active[cap_type] = first_id
logger.info(f"✓ Auto-selected first {cap_type}: {first_id}")
def get_active(self, cap_type: str) -> str | None:
"""Get active capability ID for a type"""
return self._active.get(cap_type)
def get_available_ids(self, cap_type: str) -> list[str]:
"""Get available capability IDs for a type"""
return list(self.registry.capabilities.get(cap_type, {}).keys())
async def call(self, cap_type: str, cap_id: str | None = None, **kwargs) -> Any:
"""
Call a capability with config injection
Args:
cap_type: Capability type (e.g., "llm", "tts")
cap_id: Specific capability ID (defaults to active)
**kwargs: Arguments for the capability
Returns:
Result from the capability
"""
# Determine which capability to use
if cap_id is None:
cap_id = self.get_active(cap_type)
if not cap_id:
raise ValueError(f"No active capability for type: {cap_type}")
# Get capability info
cap_info = self.registry.capabilities[cap_type][cap_id]
logger.debug(f"Calling {cap_info.full_id} ({cap_info.display_label})")
# Prepare tool arguments with config injection
tool_arguments = self._inject_config(cap_info, kwargs)
# Call tool via MCP protocol using in-memory client
async with self._local_client:
result = await self._local_client.call_tool(
name=cap_info.tool_name,
arguments=tool_arguments
)
# Extract content from MCP result
return self._extract_content(result)
def _inject_config(self, cap_info: CapabilityInfo, kwargs: dict) -> dict:
"""
Inject configuration into tool arguments
This is the core value of ConfigManager - it handles config injection
so individual capabilities don't need to know about config.yaml
"""
tool_arguments = kwargs.copy()
# Handle LLM-specific configuration (direct top-level config)
if cap_info.type == "llm":
llm_config = self.config.get("llm", {})
# Add LLM credentials and settings (if not already provided)
tool_arguments.setdefault("api_key", llm_config.get("api_key", ""))
tool_arguments.setdefault("base_url", llm_config.get("base_url", ""))
tool_arguments.setdefault("model", llm_config.get("model", ""))
logger.debug(f"LLM using: {tool_arguments['model']} @ {tool_arguments['base_url']}")
# Handle other capability types (image, tts, etc.)
else:
# Read capability-specific config from flat structure: config[type][id]
cap_section = self.config.get(cap_info.type, {})
if isinstance(cap_section, dict):
cap_config = cap_section.get(cap_info.id, {})
if isinstance(cap_config, dict):
# Merge config (don't override kwargs)
for key, value in cap_config.items():
tool_arguments.setdefault(key, value)
return tool_arguments
def _extract_content(self, result: Any) -> Any:
"""
Extract content from MCP result
MCP returns a CallToolResult with content field
"""
if hasattr(result, 'content'):
# Handle different content types
if isinstance(result.content, list):
# Multiple content items, join them
return '\n'.join(
item.text if hasattr(item, 'text') else str(item)
for item in result.content
)
else:
return result.content
# Fallback: return the result as-is
return result

View File

@@ -1,115 +0,0 @@
"""
ReelForge Tool Naming Convention
================================
All capability tools MUST follow this naming pattern:
{type}_{id}
Where:
- type: MUST be one of the known capability types (llm, tts, image, etc.)
- id: Unique identifier for this specific capability
Parsing strategy (Fail Fast):
- Match against known capability types only
- If no match, return None (fail early to expose configuration errors)
- No guessing or fallback - explicit is better than implicit
Examples:
✅ llm_call → type: llm, id: call
✅ tts_edge → type: tts, id: edge
✅ image_comfykit → type: image, id: comfykit
❌ call_llm → Wrong order
❌ llm-call → Use underscore, not dash
❌ LLM_call → Use lowercase
"""
from typing import Any, Optional
from pydantic import BaseModel, Field
# Known capability types
CAPABILITY_TYPES = {
"llm",
"tts",
"image",
}
def parse_tool_name(tool_name: str) -> Optional[tuple[str, str]]:
"""
Parse tool name into (type, id) - Fail Fast approach
Only accepts tool names that match known capability types.
Returns None for unknown types to fail early and expose configuration errors.
Args:
tool_name: Tool name following convention (e.g., "llm_qwen", "tts_edge")
Returns:
(type, id) tuple if matches known type, None otherwise
Examples:
>>> parse_tool_name("llm_call")
('llm', 'call')
>>> parse_tool_name("tts_edge")
('tts', 'edge')
>>> parse_tool_name("unknown_type_id")
None # Fail fast - unknown type
"""
# Must contain at least one underscore
if "_" not in tool_name:
return None
# Only match against known capability types (sorted by length, longest first)
for cap_type in sorted(CAPABILITY_TYPES, key=len, reverse=True):
# Match pattern: {known_type}_{id}
if tool_name.startswith(cap_type + "_"):
capability_id = tool_name[len(cap_type) + 1:] # +1 for underscore
if capability_id: # Must have a non-empty id
return cap_type, capability_id
# No match found - return None to fail early
return None
class CapabilityInfo(BaseModel):
"""
Capability information
Required: type and id (parsed from tool_name)
Optional: everything else (from meta.reelforge)
"""
# Required (parsed from tool_name)
type: str = Field(description="Capability type (llm, tts, etc.)")
id: str = Field(description="Unique identifier for this capability")
# Optional (from meta.reelforge)
display_name: Optional[str] = Field(
default=None, description="Human-readable name for UI display"
)
description: Optional[str] = Field(default=None, description="Short description")
is_default: bool = Field(default=False, description="Whether this is the default for this type")
# Tool reference
tool_name: str = Field(description="Original tool name")
# Tool object reference
tool: Optional[Any] = Field(default=None, description="Tool object reference", exclude=True)
@property
def display_label(self) -> str:
"""Get display label for UI"""
if self.display_name:
return self.display_name
# Auto-generate from id: "my_custom_v2" → "My Custom V2"
return self.id.replace("_", " ").title()
@property
def full_id(self) -> str:
"""Get full identifier: type/id"""
return f"{self.type}/{self.id}"

View File

@@ -1,119 +0,0 @@
"""
Capability Registry System
Registers built-in capabilities from FastMCP tools.
"""
from typing import Any
from fastmcp import FastMCP
from loguru import logger
from reelforge.core.conventions import CapabilityInfo, parse_tool_name
class CapabilityRegistry:
"""
Built-in capability registry
Registers capabilities from local FastMCP tools
based on tool naming convention.
Simplified from CapabilityDiscovery - no external MCP support.
If you need custom capabilities, add them to reelforge/capabilities/
"""
def __init__(self, local_mcp: FastMCP):
self.local_mcp = local_mcp
self.capabilities: dict[str, dict[str, CapabilityInfo]] = {}
# Structure: {type: {id: CapabilityInfo}}
async def register_all(self):
"""Register all built-in capabilities"""
logger.info("📦 Registering built-in capabilities...")
# Get all tools from FastMCP server (returns dict: {name: FunctionTool})
tools_dict = await self.local_mcp.get_tools()
for tool_name, tool in tools_dict.items():
capability_info = self._parse_capability(tool_name=tool_name, tool=tool)
if capability_info:
self._register_capability(capability_info)
self._print_summary()
def _parse_capability(
self, tool_name: str, tool: Any
) -> CapabilityInfo | None:
"""
Parse a tool into CapabilityInfo
Key logic: Parse type and id from tool_name!
"""
# Parse tool name following convention
parsed = parse_tool_name(tool_name)
if not parsed:
# Not a capability tool (doesn't follow convention)
return None
capability_type, capability_id = parsed
# Extract optional metadata from meta.reelforge
reelforge_meta = {}
if hasattr(tool, "meta") and tool.meta:
reelforge_meta = tool.meta.get("reelforge", {})
# Build CapabilityInfo
return CapabilityInfo(
type=capability_type,
id=capability_id,
tool_name=tool_name,
display_name=reelforge_meta.get("display_name"),
description=reelforge_meta.get("description")
or (tool.description if hasattr(tool, "description") else None),
is_default=reelforge_meta.get("is_default", False),
tool=tool,
)
def _register_capability(self, capability_info: CapabilityInfo):
"""Register a capability"""
cap_type = capability_info.type
cap_id = capability_info.id
# Initialize type dict if needed
if cap_type not in self.capabilities:
self.capabilities[cap_type] = {}
# Check for duplicates
if cap_id in self.capabilities[cap_type]:
existing = self.capabilities[cap_type][cap_id]
logger.warning(
f" ⚠️ Duplicate capability: {cap_type}/{cap_id} - keeping first registration"
)
return
# Register
self.capabilities[cap_type][cap_id] = capability_info
# Log
logger.info(f"{cap_type}/{cap_id} ({capability_info.display_label})")
def _print_summary(self):
"""Print registration summary"""
logger.info("\n📊 Registered Capabilities:")
if not self.capabilities:
logger.info(" No capabilities registered")
return
for cap_type in sorted(self.capabilities.keys()):
logger.info(f"\n {cap_type}:")
for cap_id, cap_info in self.capabilities[cap_type].items():
default_marker = " [DEFAULT]" if cap_info.is_default else ""
logger.info(
f"{cap_id} ({cap_info.display_label}){default_marker}"
)

View File

@@ -1,9 +0,0 @@
"""
ReelForge MCP Server
"""
from fastmcp import FastMCP
# Initialize ReelForge MCP server
reelforge_mcp = FastMCP("ReelForge Capabilities 📚")

View File

@@ -1,7 +1,7 @@
"""
ReelForge Core - Capability Layer
ReelForge Core - Service Layer
Provides unified access to all capabilities (LLM, TTS, etc.)
Provides unified access to all capabilities (LLM, TTS, Image, etc.)
"""
from typing import Optional
@@ -9,19 +9,21 @@ from typing import Optional
from loguru import logger
from reelforge.config import load_config
from reelforge.core.discovery import CapabilityRegistry
from reelforge.core.mcp_server import reelforge_mcp
from reelforge.core.config_manager import ConfigManager
from reelforge.services import LLMService, TTSService
from reelforge.services.llm_service import LLMService
from reelforge.services.tts_service import TTSService
from reelforge.services.image import ImageService
from reelforge.services.narration_generator import NarrationGeneratorService
from reelforge.services.image_prompt_generator import ImagePromptGeneratorService
from reelforge.services.frame_composer import FrameComposerService
from reelforge.services.storyboard_processor import StoryboardProcessorService
from reelforge.services.video_generator import VideoGeneratorService
class ReelForgeCore:
"""
ReelForge Core - Capability Layer
ReelForge Core - Service Layer
Provides unified access to all capabilities.
This is the foundation layer, no business logic here.
Usage:
from reelforge import reelforge
@@ -32,18 +34,18 @@ class ReelForgeCore:
# Use capabilities directly
answer = await reelforge.llm("Explain atomic habits")
audio = await reelforge.tts("Hello world")
image = await reelforge.image(prompt="a cat")
# Check active capabilities
print(f"Using LLM: {reelforge.llm.active}")
print(f"Available: {reelforge.llm.available}")
print(f"Available TTS: {reelforge.tts.available}")
Architecture (Simplified):
ReelForgeCore (this class)
├── config (configuration)
├── config_manager (config injection + MCP calls)
├── llm (LLM service)
├── tts (TTS service)
└── image (Image service)
├── llm (LLM service - direct OpenAI SDK)
├── tts (TTS service - ComfyKit workflows)
└── image (Image service - ComfyKit workflows)
"""
def __init__(self, config_path: str = "config.yaml"):
@@ -54,32 +56,29 @@ class ReelForgeCore:
config_path: Path to configuration file
"""
self.config = load_config(config_path)
self.registry: Optional[CapabilityRegistry] = None
self.config_manager: Optional[ConfigManager] = None
self._initialized = False
# Services (initialized in initialize())
# Core services (initialized in initialize())
self.llm: Optional[LLMService] = None
self.tts: Optional[TTSService] = None
self.image: Optional[ImageService] = None
# Content generation services
self.narration_generator = None
self.image_prompt_generator = None
self.narration_generator: Optional[NarrationGeneratorService] = None
self.image_prompt_generator: Optional[ImagePromptGeneratorService] = None
# Frame processing services
self.frame_composer = None
self.storyboard_processor = None
self.frame_composer: Optional[FrameComposerService] = None
self.storyboard_processor: Optional[StoryboardProcessorService] = None
# Video generation service (named as verb for direct calling)
self.generate_video = None
self.generate_video: Optional[VideoGeneratorService] = None
async def initialize(self):
"""
Initialize core capabilities
This registers built-in capabilities and initializes services.
Must be called before using any capabilities.
This initializes all services and must be called before using any capabilities.
Example:
await reelforge.initialize()
@@ -90,52 +89,25 @@ class ReelForgeCore:
logger.info("🚀 Initializing ReelForge...")
# 1. Import all built-in capabilities (registers them with reelforge_mcp)
self._import_builtin_capabilities()
# 2. Register all built-in capabilities
self.registry = CapabilityRegistry(reelforge_mcp)
await self.registry.register_all()
# 3. Create config manager
self.config_manager = ConfigManager(self.registry, self.config)
# 4. Initialize capability-based services
self.llm = LLMService(self.config_manager)
self.tts = TTSService(self.config_manager)
# Initialize workflow-based services (no capability layer)
# 1. Initialize core services (no capability layer)
self.llm = LLMService(self.config)
self.tts = TTSService(self.config)
self.image = ImageService(self.config)
# 5. Initialize content generation services
from reelforge.services.narration_generator import NarrationGeneratorService
from reelforge.services.image_prompt_generator import ImagePromptGeneratorService
# 2. Initialize content generation services
self.narration_generator = NarrationGeneratorService(self)
self.image_prompt_generator = ImagePromptGeneratorService(self)
# 6. Initialize frame processing services
from reelforge.services.frame_composer import FrameComposerService
from reelforge.services.storyboard_processor import StoryboardProcessorService
# 3. Initialize frame processing services
self.frame_composer = FrameComposerService()
self.storyboard_processor = StoryboardProcessorService(self)
# 7. Initialize video generation service
from reelforge.services.video_generator import VideoGeneratorService
# 4. Initialize video generation service
self.generate_video = VideoGeneratorService(self)
self._initialized = True
logger.info("✅ ReelForge initialized successfully\n")
def _import_builtin_capabilities(self):
"""Import all built-in capability modules to register them"""
# This triggers the @reelforge_mcp.tool decorators
from reelforge.capabilities import llm # noqa: F401
from reelforge.capabilities import tts # noqa: F401
# Note: image no longer uses capability layer (workflow-based)
@property
def project_name(self) -> str:
"""Get project name from config"""
@@ -149,4 +121,3 @@ class ReelForgeCore:
# Global instance
reelforge = ReelForgeCore()

View File

@@ -4,9 +4,9 @@ ReelForge Services
Unified service layer providing simplified access to capabilities.
"""
from reelforge.services.base import BaseService
from reelforge.services.llm import LLMService
from reelforge.services.tts import TTSService
from reelforge.services.comfy_base_service import ComfyBaseService
from reelforge.services.llm_service import LLMService
from reelforge.services.tts_service import TTSService
from reelforge.services.image import ImageService
from reelforge.services.video import VideoService
from reelforge.services.narration_generator import NarrationGeneratorService
@@ -16,7 +16,7 @@ from reelforge.services.storyboard_processor import StoryboardProcessorService
from reelforge.services.video_generator import VideoGeneratorService
__all__ = [
"BaseService",
"ComfyBaseService",
"LLMService",
"TTSService",
"ImageService",

View File

@@ -1,85 +0,0 @@
"""
Base service class for all capability services
"""
from abc import ABC, abstractmethod
from typing import Any, Optional
from reelforge.core.config_manager import ConfigManager
class BaseService(ABC):
"""
Base service class for all capability services
Provides callable interface and basic properties:
- Direct call: result = await service(...)
- Active capability: service.active
- Available IDs: service.available
Usage:
result = await reelforge.llm("Hello world")
print(f"Using: {reelforge.llm.active}")
print(f"Available: {reelforge.llm.available}")
"""
def __init__(self, config_manager: ConfigManager, capability_type: str):
"""
Initialize service
Args:
config_manager: ConfigManager instance
capability_type: Type of capability (llm, tts, etc.)
"""
self._config_manager = config_manager
self._capability_type = capability_type
@abstractmethod
async def __call__(self, **kwargs) -> Any:
"""
Make service callable directly
This is the main entry point for using the service.
Subclasses MUST implement this with specific signatures.
Example:
answer = await reelforge.llm(prompt="Hello")
"""
pass
@property
def active(self) -> Optional[str]:
"""
Get active capability ID
Returns:
Active capability ID (e.g., "call") or None if not set
Example:
print(f"Using LLM: {reelforge.llm.active}")
"""
return self._config_manager.get_active(self._capability_type)
@property
def available(self) -> list[str]:
"""
List available capability IDs
Returns:
List of capability IDs
Example:
print(f"Available LLMs: {reelforge.llm.available}")
"""
return self._config_manager.get_available_ids(self._capability_type)
def __repr__(self) -> str:
"""String representation"""
active = self.active or "none"
available = ", ".join(self.available) if self.available else "none"
return (
f"<{self.__class__.__name__} "
f"active={active!r} "
f"available=[{available}]>"
)

View File

@@ -0,0 +1,191 @@
"""
ComfyUI Base Service - Common logic for ComfyUI-based services
"""
import os
from pathlib import Path
from typing import Optional, List, Dict, Any
from comfykit import ComfyKit
from loguru import logger
class ComfyBaseService:
"""
Base service for ComfyUI workflow-based capabilities
Provides common functionality for TTS, Image, and other ComfyUI-based services.
Subclasses should define:
- WORKFLOW_PREFIX: Prefix for workflow files (e.g., "image_", "tts_")
- DEFAULT_WORKFLOW: Default workflow filename (e.g., "image_default.json")
- WORKFLOWS_DIR: Directory containing workflows (default: "workflows")
"""
WORKFLOW_PREFIX: str = "" # Must be overridden by subclass
DEFAULT_WORKFLOW: str = "" # Must be overridden by subclass
WORKFLOWS_DIR: str = "workflows"
def __init__(self, config: dict, service_name: str):
"""
Initialize ComfyUI base service
Args:
config: Full application config dict
service_name: Service name in config (e.g., "tts", "image")
"""
self.config = config.get(service_name, {})
self.service_name = service_name
self._workflows_cache: Optional[List[str]] = None
def _scan_workflows(self) -> List[str]:
"""
Scan workflows/{prefix}*.json files
Returns:
List of workflow filenames
Example: ["image_default.json", "image_flux.json"]
"""
workflows = []
workflows_dir = Path(self.WORKFLOWS_DIR)
if not workflows_dir.exists():
logger.warning(f"Workflows directory not found: {workflows_dir}")
return workflows
# Scan for {prefix}*.json files
for file in workflows_dir.glob(f"{self.WORKFLOW_PREFIX}*.json"):
workflows.append(file.name)
logger.debug(f"Found {self.service_name} workflow: {file.name}")
return sorted(workflows)
def _get_default_workflow(self) -> str:
"""
Get default workflow name from config or use DEFAULT_WORKFLOW
Returns:
Default workflow filename
"""
return self.config.get("default_workflow", self.DEFAULT_WORKFLOW)
def _resolve_workflow(self, workflow: Optional[str] = None) -> str:
"""
Resolve workflow to actual workflow path
Args:
workflow: Workflow filename (e.g., "image_default.json")
Can also be:
- Absolute path: "/path/to/workflow.json"
- Relative path: "custom/workflow.json"
- URL: "http://..."
- RunningHub ID: "12345"
Returns:
Workflow file path or identifier
Raises:
ValueError: If workflow not found
"""
# 1. If not specified, use default
if workflow is None:
workflow = self._get_default_workflow()
# 2. If it's an absolute path, URL, or looks like RunningHub ID, use as-is
if (workflow.startswith("/") or
workflow.startswith("http://") or
workflow.startswith("https://") or
workflow.isdigit()):
logger.debug(f"Using workflow identifier: {workflow}")
return workflow
# 3. If it's just a filename, look in workflows/ directory
workflow_path = Path(self.WORKFLOWS_DIR) / workflow
if not workflow_path.exists():
# List available workflows for error message
available = self._scan_workflows()
available_str = ", ".join(available) if available else "none"
raise ValueError(
f"Workflow '{workflow}' not found at {workflow_path}. "
f"Available workflows: {available_str}\n"
f"Please create: {workflow_path}"
)
logger.info(f"🎬 Using {self.service_name} workflow: {workflow}")
return str(workflow_path)
def _prepare_comfykit_config(
self,
comfyui_url: Optional[str] = None,
runninghub_api_key: Optional[str] = None,
) -> Dict[str, Any]:
"""
Prepare ComfyKit configuration
Args:
comfyui_url: ComfyUI URL (optional, overrides config)
runninghub_api_key: RunningHub API key (optional, overrides config)
Returns:
ComfyKit configuration dict
"""
kit_config = {}
# ComfyUI URL (priority: param > config > env > default)
final_comfyui_url = (
comfyui_url
or self.config.get("comfyui_url")
or os.getenv("COMFYUI_BASE_URL")
or "http://127.0.0.1:8188"
)
kit_config["comfyui_url"] = final_comfyui_url
# RunningHub API key (priority: param > config > env)
final_rh_key = (
runninghub_api_key
or self.config.get("runninghub_api_key")
or os.getenv("RUNNINGHUB_API_KEY")
)
if final_rh_key:
kit_config["runninghub_api_key"] = final_rh_key
logger.debug(f"ComfyKit config: {kit_config}")
return kit_config
def list_workflows(self) -> List[str]:
"""
List all available workflows
Returns:
List of workflow filenames (sorted alphabetically)
Example:
workflows = service.list_workflows()
# ['image_default.json', 'image_flux.json']
"""
return self._scan_workflows()
@property
def available(self) -> List[str]:
"""
List available workflows
Returns:
List of available workflow filenames
Example:
print(f"Available workflows: {service.available}")
"""
return self.list_workflows()
def __repr__(self) -> str:
"""String representation"""
default = self._get_default_workflow()
available = ", ".join(self.available) if self.available else "none"
return (
f"<{self.__class__.__name__} "
f"default={default!r} "
f"available=[{available}]>"
)
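
For illustration, a minimal subclass of ComfyBaseService only needs the two class constants plus a __call__ that wires _resolve_workflow and _prepare_comfykit_config into ComfyKit. The sketch below assumes a hypothetical "music" service; the name, prefix, and workflow filenames are placeholders, not part of this commit.

# Hypothetical sketch - the "music" service does not exist in this commit.
from typing import Optional

from comfykit import ComfyKit

from reelforge.services.comfy_base_service import ComfyBaseService


class MusicService(ComfyBaseService):
    WORKFLOW_PREFIX = "music_"               # scans workflows/music_*.json
    DEFAULT_WORKFLOW = "music_default.json"  # used when workflow is None

    def __init__(self, config: dict):
        super().__init__(config, service_name="music")

    async def __call__(self, prompt: str, workflow: Optional[str] = None) -> str:
        # Filenames resolve against workflows/; absolute paths, URLs, and
        # RunningHub IDs pass through unchanged (see _resolve_workflow above).
        workflow_path = self._resolve_workflow(workflow=workflow)
        # Connection settings merge as: param > config > env > default.
        kit_config = self._prepare_comfykit_config()
        kit = ComfyKit(**kit_config)
        result = await kit.execute(workflow_path, {"prompt": prompt})
        if result.status != "completed":
            raise RuntimeError(f"music generation failed: {result.msg}")
        return result.files[0]  # result shape depends on the workflow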

View File

@@ -1,42 +1,37 @@
"""
Image Generation Service - Workflow-based, no capability layer
This service directly uses ComfyKit to execute workflows without going through
the capability abstraction layer. This is because workflow files themselves
already provide sufficient abstraction and flexibility.
Image Generation Service - ComfyUI Workflow-based implementation
"""
import os
from pathlib import Path
from typing import Optional, List, Dict
from typing import Optional
from comfykit import ComfyKit
from loguru import logger
from reelforge.services.comfy_base_service import ComfyBaseService
class ImageService:
class ImageService(ComfyBaseService):
"""
Image generation service - Workflow-based
Directly uses ComfyKit to execute workflows. No capability abstraction needed
since workflow itself is already the abstraction.
Uses ComfyKit to execute image generation workflows.
Usage:
# Use default preset (workflows/image_default.json)
# Use default workflow (workflows/image_default.json)
image_url = await reelforge.image(prompt="a cat")
# Use specific preset
image_url = await reelforge.image(preset="flux", prompt="a cat")
# Use specific workflow
image_url = await reelforge.image(
prompt="a cat",
workflow="image_flux.json"
)
# List available presets
presets = reelforge.image.list_presets()
# Get preset path
path = reelforge.image.get_preset_path("flux")
# List available workflows
workflows = reelforge.image.list_workflows()
"""
PRESET_PREFIX = "image_"
DEFAULT_PRESET = "default"
WORKFLOW_PREFIX = "image_"
DEFAULT_WORKFLOW = "image_default.json"
WORKFLOWS_DIR = "workflows"
def __init__(self, config: dict):
@@ -46,105 +41,11 @@ class ImageService:
Args:
config: Full application config dict
"""
self.config = config.get("image", {})
self._presets_cache: Optional[Dict[str, str]] = None
def _scan_presets(self) -> Dict[str, str]:
"""
Scan workflows/image_*.json files
Returns:
Dict mapping preset name to workflow path
Example: {"default": "workflows/image_default.json", "flux": "workflows/image_flux.json"}
"""
if self._presets_cache is not None:
return self._presets_cache
presets = {}
workflows_dir = Path(self.WORKFLOWS_DIR)
if not workflows_dir.exists():
logger.warning(f"Workflows directory not found: {workflows_dir}")
return presets
# Scan for image_*.json files
for file in workflows_dir.glob(f"{self.PRESET_PREFIX}*.json"):
# Extract preset name: "image_flux.json" -> "flux"
preset_name = file.stem.replace(self.PRESET_PREFIX, "")
presets[preset_name] = str(file)
logger.debug(f"Found image preset: {preset_name} -> {file}")
self._presets_cache = presets
return presets
def _get_default_preset(self) -> str:
"""
Get default preset name from config or use "default"
Priority:
1. config.yaml: image.default
2. "default"
"""
return self.config.get("default", self.DEFAULT_PRESET)
def _resolve_workflow(
self,
preset: Optional[str] = None,
workflow: Optional[str] = None
) -> str:
"""
Resolve preset/workflow to actual workflow path
Args:
preset: Preset name (e.g., "flux", "default")
workflow: Full workflow path (for backward compatibility)
Returns:
Workflow file path
Raises:
ValueError: If preset not found or no workflows available
"""
# 1. If explicit workflow path provided, use it
if workflow:
logger.debug(f"Using explicit workflow: {workflow}")
return workflow
# 2. Scan available presets
presets = self._scan_presets()
if not presets:
raise ValueError(
f"No workflow presets found in {self.WORKFLOWS_DIR}/ directory. "
f"Please create at least one workflow file: {self.WORKFLOWS_DIR}/{self.PRESET_PREFIX}default.json"
)
# 3. Determine which preset to use
if preset:
# Use specified preset
target_preset = preset
else:
# Use default preset
target_preset = self._get_default_preset()
# 4. Lookup preset
if target_preset not in presets:
available = ", ".join(sorted(presets.keys()))
raise ValueError(
f"Preset '{target_preset}' not found. "
f"Available presets: {available}\n"
f"Please create: {self.WORKFLOWS_DIR}/{self.PRESET_PREFIX}{target_preset}.json"
)
workflow_path = presets[target_preset]
logger.info(f"🎨 Using image preset: {target_preset} ({workflow_path})")
return workflow_path
super().__init__(config, service_name="image")
async def __call__(
self,
prompt: str,
preset: Optional[str] = None,
workflow: Optional[str] = None,
# ComfyUI connection (optional overrides)
comfyui_url: Optional[str] = None,
@@ -164,8 +65,7 @@ class ImageService:
Args:
prompt: Image generation prompt
preset: Preset name (default: from config or "default")
workflow: Full workflow path (backward compatible)
workflow: Workflow filename (default: from config or "image_default.json")
comfyui_url: ComfyUI URL (optional, overrides config)
runninghub_api_key: RunningHub API key (optional, overrides config)
width: Image width
@@ -181,26 +81,29 @@ class ImageService:
Generated image URL/path
Examples:
# Simplest: use default preset (workflows/image_default.json)
# Simplest: use default workflow (workflows/image_default.json)
image_url = await reelforge.image(prompt="a beautiful cat")
# Use specific preset
image_url = await reelforge.image(preset="flux", prompt="a cat")
# Use specific workflow
image_url = await reelforge.image(
prompt="a cat",
workflow="image_flux.json"
)
# With additional parameters
image_url = await reelforge.image(
preset="flux",
prompt="a cat",
workflow="image_flux.json",
width=1024,
height=1024,
steps=20,
seed=42
)
# Backward compatible: direct workflow path
# With absolute path
image_url = await reelforge.image(
workflow="workflows/custom.json",
prompt="a cat"
prompt="a cat",
workflow="/path/to/custom.json"
)
# With custom ComfyUI server
@@ -210,30 +113,13 @@ class ImageService:
)
"""
# 1. Resolve workflow path
workflow_path = self._resolve_workflow(preset=preset, workflow=workflow)
workflow_path = self._resolve_workflow(workflow=workflow)
# 2. Prepare ComfyKit config
kit_config = {}
# ComfyUI URL (priority: param > config > env > default)
final_comfyui_url = (
comfyui_url
or self.config.get("comfyui_url")
or os.getenv("COMFYUI_BASE_URL")
or "http://127.0.0.1:8188"
kit_config = self._prepare_comfykit_config(
comfyui_url=comfyui_url,
runninghub_api_key=runninghub_api_key
)
kit_config["comfyui_url"] = final_comfyui_url
# RunningHub API key (priority: param > config > env)
final_rh_key = (
runninghub_api_key
or self.config.get("runninghub_api_key")
or os.getenv("RUNNINGHUB_API_KEY")
)
if final_rh_key:
kit_config["runninghub_api_key"] = final_rh_key
logger.debug(f"ComfyKit config: {kit_config}")
# 3. Build workflow parameters
workflow_params = {"prompt": prompt}
@@ -283,74 +169,3 @@ class ImageService:
except Exception as e:
logger.error(f"Image generation error: {e}")
raise
def list_presets(self) -> List[str]:
"""
List all available image presets
Returns:
List of preset names (sorted alphabetically)
Example:
presets = reelforge.image.list_presets()
# ['anime', 'default', 'flux', 'sd15']
"""
return sorted(self._scan_presets().keys())
def get_preset_path(self, preset: str) -> Optional[str]:
"""
Get workflow path for a preset
Args:
preset: Preset name
Returns:
Workflow file path, or None if not found
Example:
path = reelforge.image.get_preset_path("flux")
# 'workflows/image_flux.json'
"""
return self._scan_presets().get(preset)
@property
def active(self) -> str:
"""
Get active preset name
This property is provided for compatibility with other services
that use the capability layer.
Returns:
Active preset name
Example:
print(f"Using preset: {reelforge.image.active}")
"""
return self._get_default_preset()
@property
def available(self) -> List[str]:
"""
List available presets
This property is provided for compatibility with other services
that use the capability layer.
Returns:
List of available preset names
Example:
print(f"Available presets: {reelforge.image.available}")
"""
return self.list_presets()
def __repr__(self) -> str:
"""String representation"""
active = self.active
available = ", ".join(self.available) if self.available else "none"
return (
f"<ImageService "
f"active={active!r} "
f"available=[{available}]>"
)

View File

@@ -1,97 +0,0 @@
"""
LLM (Large Language Model) Service
"""
from typing import Optional
from reelforge.services.base import BaseService
class LLMService(BaseService):
"""
LLM (Large Language Model) service
Provides unified access to various LLM providers (Qwen, OpenAI, DeepSeek, Ollama, etc.)
Usage:
# Direct call (recommended)
answer = await reelforge.llm("Explain atomic habits")
# With parameters
answer = await reelforge.llm(
prompt="Explain atomic habits in 3 sentences",
temperature=0.7,
max_tokens=2000
)
# Explicit call syntax
answer = await reelforge.llm.call(prompt="Hello")
# Check active LLM
print(f"Using: {reelforge.llm.active}")
# List available LLMs
print(f"Available: {reelforge.llm.available}")
"""
def __init__(self, router):
super().__init__(router, "llm")
async def __call__(
self,
prompt: str,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: Optional[str] = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
**kwargs
) -> str:
"""
Generate text using LLM
Args:
prompt: The prompt to generate from
api_key: API key (optional, uses config if not provided)
base_url: Base URL (optional, uses config if not provided)
model: Model name (optional, uses config if not provided)
temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
max_tokens: Maximum tokens to generate
**kwargs: Additional provider-specific parameters
Returns:
Generated text
Examples:
# Use config from config.yaml
answer = await reelforge.llm("Explain atomic habits")
# Override with custom parameters
answer = await reelforge.llm(
"Explain the concept of atomic habits in 3 sentences",
api_key="sk-custom-key",
base_url="https://api.custom.com/v1",
model="custom-model",
temperature=0.7,
max_tokens=500
)
"""
params = {"prompt": prompt}
# Add optional LLM parameters (will override config if provided)
if api_key is not None:
params["api_key"] = api_key
if base_url is not None:
params["base_url"] = base_url
if model is not None:
params["model"] = model
if temperature is not None:
params["temperature"] = temperature
if max_tokens is not None:
params["max_tokens"] = max_tokens
params.update(kwargs)
return await self._config_manager.call(self._capability_type, **params)

View File

@@ -0,0 +1,184 @@
"""
LLM (Large Language Model) Service - Direct OpenAI SDK implementation
"""
import os
from typing import Optional
from openai import AsyncOpenAI
from loguru import logger
class LLMService:
"""
LLM (Large Language Model) service
Direct implementation using OpenAI SDK. No capability layer needed.
Supports all OpenAI SDK compatible providers:
- OpenAI (gpt-4o, gpt-4o-mini, gpt-3.5-turbo)
- Alibaba Qwen (qwen-max, qwen-plus, qwen-turbo)
- Anthropic Claude (claude-sonnet-4-5, claude-opus-4, claude-haiku-4)
- DeepSeek (deepseek-chat)
- Moonshot Kimi (moonshot-v1-8k, moonshot-v1-32k, moonshot-v1-128k)
- Ollama (llama3.2, qwen2.5, mistral, codellama) - FREE & LOCAL!
- Any custom provider with OpenAI-compatible API
Usage:
# Direct call
answer = await reelforge.llm("Explain atomic habits")
# With parameters
answer = await reelforge.llm(
prompt="Explain atomic habits in 3 sentences",
temperature=0.7,
max_tokens=2000
)
"""
def __init__(self, config: dict):
"""
Initialize LLM service
Args:
config: Full application config dict
"""
self.config = config.get("llm", {})
self._client: Optional[AsyncOpenAI] = None
def _get_config_value(self, key: str, default=None):
"""
Get config value from config file
Args:
key: Config key name
default: Default value if not found
Returns:
Config value
"""
return self.config.get(key, default)
def _create_client(
self,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
) -> AsyncOpenAI:
"""
Create OpenAI client
Args:
api_key: API key (optional, uses config if not provided)
base_url: Base URL (optional, uses config if not provided)
Returns:
AsyncOpenAI client instance
"""
# Get API key (priority: parameter > config)
final_api_key = (
api_key
or self._get_config_value("api_key")
or "dummy-key" # Ollama doesn't need real key
)
# Get base URL (priority: parameter > config)
final_base_url = (
base_url
or self._get_config_value("base_url")
)
# Create client
client_kwargs = {"api_key": final_api_key}
if final_base_url:
client_kwargs["base_url"] = final_base_url
return AsyncOpenAI(**client_kwargs)
async def __call__(
self,
prompt: str,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
model: Optional[str] = None,
temperature: float = 0.7,
max_tokens: int = 2000,
**kwargs
) -> str:
"""
Generate text using LLM
Args:
prompt: The prompt to generate from
api_key: API key (optional, uses config if not provided)
base_url: Base URL (optional, uses config if not provided)
model: Model name (optional, uses config if not provided)
temperature: Sampling temperature (0.0-2.0). Lower is more deterministic.
max_tokens: Maximum tokens to generate
**kwargs: Additional provider-specific parameters
Returns:
Generated text
Examples:
# Use config from config.yaml
answer = await reelforge.llm("Explain atomic habits")
# Override with custom parameters
answer = await reelforge.llm(
prompt="Explain atomic habits in 3 sentences",
api_key="sk-custom-key",
base_url="https://api.custom.com/v1",
model="custom-model",
temperature=0.7,
max_tokens=500
)
"""
# Create client (new instance each time to support parameter overrides)
client = self._create_client(api_key=api_key, base_url=base_url)
# Get model (priority: parameter > config)
final_model = (
model
or self._get_config_value("model")
or "gpt-3.5-turbo" # Default fallback
)
logger.debug(f"LLM call: model={final_model}, base_url={client.base_url}")
try:
response = await client.chat.completions.create(
model=final_model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
result = response.choices[0].message.content or ""  # content may be None
logger.debug(f"LLM response length: {len(result)} chars")
return result
except Exception as e:
logger.error(f"LLM call error (model={final_model}, base_url={client.base_url}): {e}")
raise
@property
def active(self) -> str:
"""
Get active model name
Returns:
Active model name
Example:
print(f"Using model: {reelforge.llm.active}")
"""
return self._get_config_value("model", "gpt-3.5-turbo")
def __repr__(self) -> str:
"""String representation"""
model = self.active
base_url = self._get_config_value("base_url", "default")
return f"<LLMService model={model!r} base_url={base_url!r}>"

View File

@@ -123,7 +123,8 @@ class StoryboardProcessorService:
# Call TTS
audio_path = await self.core.tts(
text=frame.narration,
voice=config.voice_id
voice=config.voice_id,
rate="+20%",
)
frame.audio_path = audio_path

View File

@@ -1,103 +0,0 @@
"""
TTS (Text-to-Speech) Service
"""
import base64
import uuid
from typing import Optional
from reelforge.services.base import BaseService
from reelforge.utils.os_util import get_temp_path, save_bytes_to_file
class TTSService(BaseService):
"""
TTS (Text-to-Speech) service
Provides unified access to various TTS providers (Edge TTS, Azure TTS, etc.)
Returns path to saved audio file.
Usage:
# Direct call (auto-generate temp path)
audio_path = await reelforge.tts("Hello world")
# Returns: "temp/abc123def456.mp3"
# With voice parameter
audio_path = await reelforge.tts(
text="你好,世界",
voice="zh-CN-YunjianNeural"
)
# Specify custom output path
audio_path = await reelforge.tts(
text="Hello",
output_path="output/greeting.mp3"
)
# Check active TTS
print(f"Using: {reelforge.tts.active}")
"""
def __init__(self, router):
super().__init__(router, "tts")
async def __call__(
self,
text: str,
voice: Optional[str] = None,
rate: Optional[str] = None,
output_path: Optional[str] = None,
**kwargs
) -> str:
"""
Convert text to speech and save to file
Args:
text: Text to convert to speech
voice: Voice ID (uses default if not specified)
rate: Speech rate (e.g., "+0%", "+50%", "-20%")
output_path: Output file path (default: temp/<uuid>.mp3)
**kwargs: Additional provider-specific parameters
Returns:
Path to saved audio file (str)
Example:
# Auto-generate path
audio_path = await reelforge.tts("Hello world")
# Returns: "temp/abc123def456.mp3"
# Specify custom path
audio_path = await reelforge.tts(
"你好,世界",
voice="zh-CN-YunjianNeural",
output_path="output/greeting.mp3"
)
"""
params = {"text": text}
if voice is not None:
params["voice"] = voice
if rate is not None:
params["rate"] = rate
params.update(kwargs)
# Call capability and get base64-encoded audio
audio_base64 = await self._config_manager.call(self._capability_type, **params)
# Decode base64 to bytes
if isinstance(audio_base64, str):
audio_data = base64.b64decode(audio_base64)
else:
audio_data = audio_base64
# Generate output path if not specified
if output_path is None:
# Generate UUID without hyphens for filename
file_uuid = uuid.uuid4().hex
output_path = get_temp_path(f"{file_uuid}.mp3")
# Save to file
saved_path = save_bytes_to_file(audio_data, output_path)
return saved_path

View File

@@ -0,0 +1,311 @@
"""
TTS (Text-to-Speech) Service - Dual implementation (Edge TTS + ComfyUI)
"""
import uuid
from typing import Optional
from comfykit import ComfyKit
from loguru import logger
from reelforge.services.comfy_base_service import ComfyBaseService
from reelforge.utils.os_util import get_temp_path
class TTSService(ComfyBaseService):
"""
TTS (Text-to-Speech) service - Dual implementation
Supports two TTS methods:
1. Edge TTS (default) - Free, local SDK, no workflow needed
2. ComfyUI Workflow - Workflow-based, requires ComfyUI setup
Usage:
# Use default (edge-tts)
audio_path = await reelforge.tts(text="Hello, world!")
# Explicitly use edge-tts
audio_path = await reelforge.tts(
text="你好,世界!",
workflow="edge"
)
# Use ComfyUI workflow
audio_path = await reelforge.tts(
text="Hello",
workflow="tts_comfyui.json"
)
# List available workflows
workflows = reelforge.tts.list_workflows()
"""
WORKFLOW_PREFIX = "tts_"
DEFAULT_WORKFLOW = "edge" # Default to edge-tts
WORKFLOWS_DIR = "workflows"
# Built-in providers (not workflow files)
BUILTIN_PROVIDERS = ["edge", "edge-tts"]
def __init__(self, config: dict):
"""
Initialize TTS service
Args:
config: Full application config dict
"""
super().__init__(config, service_name="tts")
def _resolve_workflow(self, workflow: Optional[str] = None) -> str:
"""
Resolve workflow to actual workflow path or provider name
Args:
workflow: Workflow filename or provider name (e.g., "edge", "tts_default.json")
Returns:
Workflow file path or provider name
"""
# 1. If not specified, use default
if workflow is None:
workflow = self._get_default_workflow()
# 2. If it's a built-in provider, return as-is
if workflow in self.BUILTIN_PROVIDERS:
logger.debug(f"Using built-in TTS provider: {workflow}")
return workflow
# 3. Otherwise, treat as workflow file (use parent logic)
return super()._resolve_workflow(workflow)
async def __call__(
self,
text: str,
workflow: Optional[str] = None,
# ComfyUI connection (optional overrides, only for workflow mode)
comfyui_url: Optional[str] = None,
runninghub_api_key: Optional[str] = None,
# Common TTS parameters (work for both edge-tts and workflows)
voice: Optional[str] = None,
rate: Optional[str] = None,
volume: Optional[str] = None,
pitch: Optional[str] = None,
**params
) -> str:
"""
Generate speech using edge-tts or ComfyUI workflow
Args:
text: Text to convert to speech
workflow: Workflow filename or provider name (default: "edge")
- "edge" or "edge-tts": Use local edge-tts SDK
- "tts_xxx.json": Use ComfyUI workflow
- Absolute path/URL/RunningHub ID: Also supported
comfyui_url: ComfyUI URL (only for workflow mode)
runninghub_api_key: RunningHub API key (only for workflow mode)
voice: Voice ID
rate: Speech rate (e.g., "+0%", "+50%", "-20%")
volume: Speech volume (e.g., "+0%")
pitch: Speech pitch (e.g., "+0Hz")
**params: Additional parameters
Returns:
Generated audio file path
Examples:
# Simplest: use default (edge-tts)
audio_path = await reelforge.tts(text="Hello, world!")
# Explicitly use edge-tts with parameters
audio_path = await reelforge.tts(
text="你好,世界!",
workflow="edge",
voice="zh-CN-XiaoxiaoNeural",
rate="+20%"
)
# Use ComfyUI workflow
audio_path = await reelforge.tts(
text="Hello",
workflow="tts_default.json"
)
# With absolute path
audio_path = await reelforge.tts(
text="Hello",
workflow="/path/to/custom_tts.json"
)
"""
# 1. Resolve workflow path or provider
workflow_or_provider = self._resolve_workflow(workflow=workflow)
# 2. Determine execution path
if workflow_or_provider in self.BUILTIN_PROVIDERS:
# Use edge-tts
return await self._call_edge_tts(
text=text,
voice=voice,
rate=rate,
volume=volume,
pitch=pitch,
**params
)
else:
# Use ComfyUI workflow
return await self._call_comfyui_workflow(
workflow_path=workflow_or_provider,
text=text,
comfyui_url=comfyui_url,
runninghub_api_key=runninghub_api_key,
voice=voice,
rate=rate,
volume=volume,
pitch=pitch,
**params
)
async def _call_edge_tts(
self,
text: str,
voice: Optional[str] = None,
rate: Optional[str] = None,
volume: Optional[str] = None,
pitch: Optional[str] = None,
**params
) -> str:
"""
Generate speech using edge-tts SDK
Args:
text: Text to convert to speech
voice: Voice ID (default: zh-CN-YunjianNeural)
rate: Speech rate (default: +0%)
volume: Speech volume (default: +0%)
pitch: Speech pitch (default: +0Hz)
**params: Additional parameters (e.g., retry_count, retry_delay)
Returns:
Generated audio file path
"""
from reelforge.utils.tts_util import edge_tts
logger.info(f"🎙️ Using edge-tts (local SDK)")
# Generate temp file path
output_path = get_temp_path(f"{uuid.uuid4().hex}.mp3")
# Call edge-tts with output_path to save directly
try:
await edge_tts(
text=text,
voice=voice or "zh-CN-YunjianNeural",
rate=rate or "+0%",
volume=volume or "+0%",
pitch=pitch or "+0Hz",
output_path=output_path,
**params
)
logger.info(f"✅ Generated audio (edge-tts): {output_path}")
return output_path
except Exception as e:
logger.error(f"Edge TTS generation error: {e}")
raise
async def _call_comfyui_workflow(
self,
workflow_path: str,
text: str,
comfyui_url: Optional[str] = None,
runninghub_api_key: Optional[str] = None,
voice: Optional[str] = None,
rate: Optional[str] = None,
volume: Optional[str] = None,
pitch: Optional[str] = None,
**params
) -> str:
"""
Generate speech using ComfyUI workflow
Args:
workflow_path: Path to workflow file
text: Text to convert to speech
comfyui_url: ComfyUI URL
runninghub_api_key: RunningHub API key
voice: Voice ID (workflow-specific)
rate: Speech rate (workflow-specific)
volume: Speech volume (workflow-specific)
pitch: Speech pitch (workflow-specific)
**params: Additional workflow parameters
Returns:
Generated audio file path/URL
"""
logger.info(f"🎙️ Using ComfyUI workflow: {workflow_path}")
# 1. Prepare ComfyKit config
kit_config = self._prepare_comfykit_config(
comfyui_url=comfyui_url,
runninghub_api_key=runninghub_api_key
)
# 2. Build workflow parameters
workflow_params = {"text": text}
# Add optional TTS parameters
if voice is not None:
workflow_params["voice"] = voice
if rate is not None:
workflow_params["rate"] = rate
if volume is not None:
workflow_params["volume"] = volume
if pitch is not None:
workflow_params["pitch"] = pitch
# Add any additional parameters
workflow_params.update(params)
logger.debug(f"Workflow parameters: {workflow_params}")
# 3. Execute workflow
try:
kit = ComfyKit(**kit_config)
logger.info(f"Executing TTS workflow: {workflow_path}")
result = await kit.execute(workflow_path, workflow_params)
# 4. Handle result
if result.status != "completed":
error_msg = result.msg or "Unknown error"
logger.error(f"TTS generation failed: {error_msg}")
raise Exception(f"TTS generation failed: {error_msg}")
# ComfyKit result can have audio files in different output types
# Try to get audio file path from result
audio_path = None
# Check for audio files in result.audios (if available)
if hasattr(result, 'audios') and result.audios:
audio_path = result.audios[0]
# Check for files in result.files
elif hasattr(result, 'files') and result.files:
audio_path = result.files[0]
# Check in outputs dictionary
elif hasattr(result, 'outputs') and result.outputs:
# Try to find audio file in outputs
for key, value in result.outputs.items():
if isinstance(value, str) and any(value.endswith(ext) for ext in ['.mp3', '.wav', '.flac']):
audio_path = value
break
if not audio_path:
logger.error("No audio file generated")
raise Exception("No audio file generated by workflow")
logger.info(f"✅ Generated audio (ComfyUI): {audio_path}")
return audio_path
except Exception as e:
logger.error(f"TTS generation error: {e}")
raise

View File

@@ -1,25 +1,16 @@
"""
TTS Capabilities using edge-tts
Edge TTS Utility
edge-tts provides free access to Microsoft Edge's online text-to-speech service:
- 400+ voices across 100+ languages
- Natural sounding speech
- No API key required
- Free to use
Convention: Tool names must be tts_{id}
This is the original edge-tts implementation, now used by TTSService as the
built-in "edge" provider (the default). ComfyUI workflows are the alternative TTS path.
"""
import asyncio
import base64
import ssl
import certifi
import edge_tts
import edge_tts as edge_tts_sdk
from loguru import logger
from pydantic import Field
from aiohttp import WSServerHandshakeError, ClientResponseError
from reelforge.core.mcp_server import reelforge_mcp
# Global flag for SSL verification (set to False for development only)
_SSL_VERIFY_ENABLED = False
@@ -29,36 +20,40 @@ _RETRY_COUNT = 3 # Default retry count
_RETRY_DELAY = 2.0 # Retry delay in seconds
@reelforge_mcp.tool(
description="Convert text to speech using Microsoft Edge TTS - free and natural sounding",
meta={
"reelforge": {
"display_name": "Edge TTS",
"description": "Microsoft Edge Text-to-Speech - 免费且音质自然",
"is_default": True,
}
},
)
async def tts_edge(
text: str = Field(description="Text to convert to speech"),
voice: str = Field(default="zh-CN-YunjianNeural", description="Voice ID (e.g., zh-CN-YunjianNeural, en-US-JennyNeural)"),
rate: str = Field(default="+0%", description="Speech rate (e.g., +0%, +50%, -20%)"),
volume: str = Field(default="+0%", description="Speech volume (e.g., +0%, +50%, -20%)"),
pitch: str = Field(default="+0Hz", description="Speech pitch (e.g., +0Hz, +10Hz, -5Hz)"),
retry_count: int = Field(default=_RETRY_COUNT, description="Number of retries on failure (default: 3)"),
retry_delay: float = Field(default=_RETRY_DELAY, description="Delay between retries in seconds (default: 2.0)"),
) -> str:
async def edge_tts(
text: str,
voice: str = "zh-CN-YunjianNeural",
rate: str = "+0%",
volume: str = "+0%",
pitch: str = "+0Hz",
output_path: str | None = None,
retry_count: int = _RETRY_COUNT,
retry_delay: float = _RETRY_DELAY,
) -> bytes:
"""
Convert text to speech using Microsoft Edge TTS
This service is free and requires no API key.
Supports 400+ voices across 100+ languages.
Returns audio data as base64-encoded string (MP3 format).
Returns audio data as bytes (MP3 format).
Includes automatic retry mechanism to handle 401 authentication errors
and temporary network issues (default: 3 retries with 2s delay).
Args:
text: Text to convert to speech
voice: Voice ID (e.g., zh-CN-YunjianNeural, en-US-JennyNeural)
rate: Speech rate (e.g., +0%, +50%, -20%)
volume: Speech volume (e.g., +0%, +50%, -20%)
pitch: Speech pitch (e.g., +0Hz, +10Hz, -5Hz)
output_path: Optional output file path to save audio
retry_count: Number of retries on failure (default: 3)
retry_delay: Delay between retries in seconds (default: 2.0)
Returns:
Audio data as bytes (MP3 format)
Popular Chinese voices:
- zh-CN-YunjianNeural (male, default)
- zh-CN-XiaoxiaoNeural (female)
@@ -71,12 +66,11 @@ async def tts_edge(
- en-GB-SoniaNeural (female, British)
Example:
audio_base64 = await tts_edge(
audio_bytes = await edge_tts(
text="你好,世界!",
voice="zh-CN-YunjianNeural",
rate="+20%"
)
# Decode: audio_bytes = base64.b64decode(audio_base64)
"""
logger.debug(f"Calling Edge TTS with voice: {voice}, rate: {rate}, retry_count: {retry_count}")
@@ -106,7 +100,7 @@ async def tts_edge(
try:
# Create communicate instance
communicate = edge_tts.Communicate(
communicate = edge_tts_sdk.Communicate(
text=text,
voice=voice,
rate=rate,
@@ -127,9 +121,13 @@ async def tts_edge(
logger.info(f"Generated {len(audio_data)} bytes of audio data")
# Encode as base64 for JSON serialization
audio_base64 = base64.b64encode(audio_data).decode('utf-8')
return audio_base64
# Save to file if output_path is provided
if output_path:
with open(output_path, "wb") as f:
f.write(audio_data)
logger.info(f"Audio saved to: {output_path}")
return audio_data
finally:
# Restore original function if we patched it
@@ -160,21 +158,7 @@ async def tts_edge(
raise RuntimeError("Edge TTS failed without error (unexpected)")
@reelforge_mcp.tool(
description="List all available voices for Edge TTS",
meta={
"reelforge": {
"display_name": "List Edge TTS Voices",
"description": "获取所有可用的Edge TTS音色列表",
"is_default": False,
}
},
)
async def tts_edge_list_voices(
locale: str | None = Field(default=None, description="Filter by locale (e.g., zh-CN, en-US, ja-JP)"),
retry_count: int = Field(default=_RETRY_COUNT, description="Number of retries on failure (default: 3)"),
retry_delay: float = Field(default=_RETRY_DELAY, description="Delay between retries in seconds (default: 2.0)"),
) -> list[str]:
async def list_voices(locale: str | None = None, retry_count: int = _RETRY_COUNT, retry_delay: float = _RETRY_DELAY) -> list[str]:
"""
List all available voices for Edge TTS
@@ -184,13 +168,21 @@ async def tts_edge_list_voices(
Includes automatic retry mechanism to handle network errors
(default: 3 retries with 2s delay).
Args:
locale: Filter by locale (e.g., zh-CN, en-US, ja-JP)
retry_count: Number of retries on failure (default: 3)
retry_delay: Delay between retries in seconds (default: 2.0)
Returns:
List of voice IDs
Example:
# List all voices
voices = await tts_edge_list_voices()
voices = await list_voices()
# Returns: ['zh-CN-YunjianNeural', 'zh-CN-XiaoxiaoNeural', ...]
# List Chinese voices only
voices = await tts_edge_list_voices(locale="zh-CN")
voices = await list_voices(locale="zh-CN")
# Returns: ['zh-CN-YunjianNeural', 'zh-CN-XiaoxiaoNeural', ...]
"""
logger.debug(f"Fetching Edge TTS voices, locale filter: {locale}, retry_count: {retry_count}")
@@ -220,7 +212,7 @@ async def tts_edge_list_voices(
try:
# Get all voices
voices = await edge_tts.list_voices()
voices = await edge_tts_sdk.list_voices()
# Filter by locale if specified
if locale: