支持runninghub并行执行,提高视频生成效率
This commit is contained in:
@@ -19,9 +19,10 @@ This is the default pipeline for general-purpose video generation.
|
|||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Callable, Literal, Dict, Any
|
from typing import Optional, Callable, Literal
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
import asyncio
|
||||||
|
|
||||||
from pixelle_video.pipelines.base import BasePipeline
|
from pixelle_video.pipelines.base import BasePipeline
|
||||||
from pixelle_video.models.progress import ProgressEvent
|
from pixelle_video.models.progress import ProgressEvent
|
||||||
@@ -40,6 +41,12 @@ from pixelle_video.utils.content_generators import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# Whether to enable parallel processing for RunningHub workflows
|
||||||
|
RUNNING_HUB_PARALLEL_ENABLED = True
|
||||||
|
# Parallel limit for RunningHub workflows
|
||||||
|
RUNNING_HUB_PARALLEL_LIMIT = 5
|
||||||
|
|
||||||
|
|
||||||
class StandardPipeline(BasePipeline):
|
class StandardPipeline(BasePipeline):
|
||||||
"""
|
"""
|
||||||
Standard video generation pipeline
|
Standard video generation pipeline
|
||||||
@@ -75,16 +82,13 @@ class StandardPipeline(BasePipeline):
|
|||||||
# === Basic Config ===
|
# === Basic Config ===
|
||||||
n_scenes: int = 5, # Only used in generate mode; ignored in fixed mode
|
n_scenes: int = 5, # Only used in generate mode; ignored in fixed mode
|
||||||
|
|
||||||
# === TTS Parameters ===
|
# === TTS Parameters (supports both old and new parameter names) ===
|
||||||
tts_inference_mode: Optional[str] = None, # "local" or "comfyui"
|
tts_inference_mode: Optional[str] = None, # "local" or "comfyui" (web UI)
|
||||||
tts_voice: Optional[str] = None, # For local mode: Edge TTS voice ID
|
voice_id: Optional[str] = None, # For backward compatibility (deprecated)
|
||||||
tts_speed: Optional[float] = None, # Speed multiplier (0.5-2.0)
|
tts_voice: Optional[str] = None, # Voice ID for local mode (web UI)
|
||||||
tts_workflow: Optional[str] = None, # For ComfyUI mode: workflow path
|
tts_workflow: Optional[str] = None,
|
||||||
ref_audio: Optional[str] = None, # For ComfyUI mode: reference audio
|
tts_speed: float = 1.2,
|
||||||
|
ref_audio: Optional[str] = None, # Reference audio for voice cloning
|
||||||
# Deprecated (kept for backward compatibility)
|
|
||||||
voice_id: Optional[str] = None,
|
|
||||||
|
|
||||||
output_path: Optional[str] = None,
|
output_path: Optional[str] = None,
|
||||||
|
|
||||||
# === LLM Parameters ===
|
# === LLM Parameters ===
|
||||||
@@ -94,7 +98,8 @@ class StandardPipeline(BasePipeline):
|
|||||||
max_image_prompt_words: int = 60,
|
max_image_prompt_words: int = 60,
|
||||||
|
|
||||||
# === Image Parameters ===
|
# === Image Parameters ===
|
||||||
# Note: image_width and image_height are now auto-determined from template meta tags
|
image_width: int = 1024,
|
||||||
|
image_height: int = 1024,
|
||||||
image_workflow: Optional[str] = None,
|
image_workflow: Optional[str] = None,
|
||||||
|
|
||||||
# === Video Parameters ===
|
# === Video Parameters ===
|
||||||
@@ -102,7 +107,9 @@ class StandardPipeline(BasePipeline):
|
|||||||
|
|
||||||
# === Frame Template (determines video size) ===
|
# === Frame Template (determines video size) ===
|
||||||
frame_template: Optional[str] = None,
|
frame_template: Optional[str] = None,
|
||||||
template_params: Optional[Dict[str, Any]] = None, # Custom template parameters
|
|
||||||
|
# === Template Custom Parameters ===
|
||||||
|
template_params: Optional[dict] = None, # Custom template parameters
|
||||||
|
|
||||||
# === Image Style ===
|
# === Image Style ===
|
||||||
prompt_prefix: Optional[str] = None,
|
prompt_prefix: Optional[str] = None,
|
||||||
@@ -150,8 +157,9 @@ class StandardPipeline(BasePipeline):
|
|||||||
min_image_prompt_words: Min image prompt length
|
min_image_prompt_words: Min image prompt length
|
||||||
max_image_prompt_words: Max image prompt length
|
max_image_prompt_words: Max image prompt length
|
||||||
|
|
||||||
|
image_width: Generated image width (default 1024)
|
||||||
|
image_height: Generated image height (default 1024)
|
||||||
image_workflow: Image workflow filename (e.g., "image_flux.json", None = use default)
|
image_workflow: Image workflow filename (e.g., "image_flux.json", None = use default)
|
||||||
Note: Image/video size is now auto-determined from template meta tags
|
|
||||||
|
|
||||||
video_fps: Video frame rate (default 30)
|
video_fps: Video frame rate (default 30)
|
||||||
|
|
||||||
@@ -159,6 +167,9 @@ class StandardPipeline(BasePipeline):
|
|||||||
Format: "SIZExSIZE/template.html" (e.g., "1080x1920/default.html", "1920x1080/modern.html")
|
Format: "SIZExSIZE/template.html" (e.g., "1080x1920/default.html", "1920x1080/modern.html")
|
||||||
Video size is automatically determined from template path
|
Video size is automatically determined from template path
|
||||||
|
|
||||||
|
template_params: Custom template parameters (optional dict)
|
||||||
|
e.g., {"accent_color": "#ff0000", "author": "John Doe"}
|
||||||
|
|
||||||
prompt_prefix: Image prompt prefix (overrides config.yaml if provided)
|
prompt_prefix: Image prompt prefix (overrides config.yaml if provided)
|
||||||
e.g., "anime style, vibrant colors" or "" for no prefix
|
e.g., "anime style, vibrant colors" or "" for no prefix
|
||||||
|
|
||||||
@@ -176,6 +187,29 @@ class StandardPipeline(BasePipeline):
|
|||||||
logger.info(f"🚀 Starting StandardPipeline in '{mode}' mode")
|
logger.info(f"🚀 Starting StandardPipeline in '{mode}' mode")
|
||||||
logger.info(f" Text length: {len(text)} chars")
|
logger.info(f" Text length: {len(text)} chars")
|
||||||
|
|
||||||
|
# === Handle TTS parameter compatibility ===
|
||||||
|
# Support both old API (voice_id) and new API (tts_inference_mode + tts_voice)
|
||||||
|
final_voice_id = None
|
||||||
|
final_tts_workflow = tts_workflow
|
||||||
|
|
||||||
|
if tts_inference_mode:
|
||||||
|
# New API from web UI
|
||||||
|
if tts_inference_mode == "local":
|
||||||
|
# Local Edge TTS mode - use tts_voice
|
||||||
|
final_voice_id = tts_voice or "zh-CN-YunjianNeural"
|
||||||
|
final_tts_workflow = None # Don't use workflow in local mode
|
||||||
|
logger.debug(f"TTS Mode: local (voice={final_voice_id})")
|
||||||
|
elif tts_inference_mode == "comfyui":
|
||||||
|
# ComfyUI workflow mode
|
||||||
|
final_voice_id = None # Don't use voice_id in ComfyUI mode
|
||||||
|
# tts_workflow already set from parameter
|
||||||
|
logger.debug(f"TTS Mode: comfyui (workflow={final_tts_workflow})")
|
||||||
|
else:
|
||||||
|
# Old API (backward compatibility)
|
||||||
|
final_voice_id = voice_id or tts_voice or "zh-CN-YunjianNeural"
|
||||||
|
# tts_workflow already set from parameter
|
||||||
|
logger.debug(f"TTS Mode: legacy (voice_id={final_voice_id}, workflow={final_tts_workflow})")
|
||||||
|
|
||||||
# Determine final title
|
# Determine final title
|
||||||
if title:
|
if title:
|
||||||
final_title = title
|
final_title = title
|
||||||
@@ -208,45 +242,6 @@ class StandardPipeline(BasePipeline):
|
|||||||
output_path = get_task_final_video_path(task_id)
|
output_path = get_task_final_video_path(task_id)
|
||||||
logger.info(f" Will copy final video to: {user_specified_output}")
|
logger.info(f" Will copy final video to: {user_specified_output}")
|
||||||
|
|
||||||
# Determine TTS inference mode and parameters
|
|
||||||
# Priority: explicit params > backward compatibility > config defaults
|
|
||||||
if tts_inference_mode is None:
|
|
||||||
# Check if user provided ComfyUI-specific params
|
|
||||||
if tts_workflow is not None or ref_audio is not None:
|
|
||||||
tts_inference_mode = "comfyui"
|
|
||||||
# Check if user provided old voice_id param (backward compatibility)
|
|
||||||
elif voice_id is not None:
|
|
||||||
tts_inference_mode = "comfyui"
|
|
||||||
if tts_voice is None:
|
|
||||||
tts_voice = voice_id
|
|
||||||
else:
|
|
||||||
# Use config default
|
|
||||||
tts_config = self.core.config.get("comfyui", {}).get("tts", {})
|
|
||||||
tts_inference_mode = tts_config.get("inference_mode", "local")
|
|
||||||
|
|
||||||
# Set voice_id based on mode for StoryboardConfig
|
|
||||||
final_voice_id = None
|
|
||||||
if tts_inference_mode == "local":
|
|
||||||
final_voice_id = tts_voice or voice_id
|
|
||||||
else: # comfyui
|
|
||||||
final_voice_id = voice_id # For ComfyUI, might be None
|
|
||||||
|
|
||||||
# Determine frame template
|
|
||||||
# Priority: explicit param > config default > hardcoded default
|
|
||||||
if frame_template is None:
|
|
||||||
template_config = self.core.config.get("template", {})
|
|
||||||
frame_template = template_config.get("default_template", "1080x1920/default.html")
|
|
||||||
|
|
||||||
# Read media size from template meta tags
|
|
||||||
from pixelle_video.services.frame_html import HTMLFrameGenerator
|
|
||||||
from pixelle_video.utils.template_util import resolve_template_path
|
|
||||||
|
|
||||||
template_path = resolve_template_path(frame_template)
|
|
||||||
temp_generator = HTMLFrameGenerator(template_path)
|
|
||||||
image_width, image_height = temp_generator.get_media_size()
|
|
||||||
|
|
||||||
logger.info(f"📐 Media size from template: {image_width}x{image_height}")
|
|
||||||
|
|
||||||
# Create storyboard config
|
# Create storyboard config
|
||||||
config = StoryboardConfig(
|
config = StoryboardConfig(
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
@@ -256,16 +251,15 @@ class StandardPipeline(BasePipeline):
|
|||||||
min_image_prompt_words=min_image_prompt_words,
|
min_image_prompt_words=min_image_prompt_words,
|
||||||
max_image_prompt_words=max_image_prompt_words,
|
max_image_prompt_words=max_image_prompt_words,
|
||||||
video_fps=video_fps,
|
video_fps=video_fps,
|
||||||
tts_inference_mode=tts_inference_mode,
|
voice_id=final_voice_id, # Use processed voice_id
|
||||||
voice_id=final_voice_id,
|
tts_workflow=final_tts_workflow, # Use processed workflow
|
||||||
tts_workflow=tts_workflow,
|
|
||||||
tts_speed=tts_speed,
|
tts_speed=tts_speed,
|
||||||
ref_audio=ref_audio,
|
ref_audio=ref_audio,
|
||||||
image_width=image_width,
|
image_width=image_width,
|
||||||
image_height=image_height,
|
image_height=image_height,
|
||||||
image_workflow=image_workflow,
|
image_workflow=image_workflow,
|
||||||
frame_template=frame_template,
|
frame_template=frame_template or "1080x1920/default.html",
|
||||||
template_params=template_params
|
template_params=template_params # Custom template parameters
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create storyboard
|
# Create storyboard
|
||||||
@@ -276,16 +270,6 @@ class StandardPipeline(BasePipeline):
|
|||||||
created_at=datetime.now()
|
created_at=datetime.now()
|
||||||
)
|
)
|
||||||
|
|
||||||
# ========== Step 0.8: Check template requirements ==========
|
|
||||||
template_media_type = self._check_template_media_type(config.frame_template)
|
|
||||||
if template_media_type == "video":
|
|
||||||
logger.info(f"🎬 Template requires video generation")
|
|
||||||
elif template_media_type == "image":
|
|
||||||
logger.info(f"📸 Template requires image generation")
|
|
||||||
else: # static
|
|
||||||
logger.info(f"⚡ Static template - skipping media generation pipeline")
|
|
||||||
logger.info(f" 💡 Benefits: Faster generation + Lower cost + No ComfyUI dependency")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# ========== Step 1: Generate/Split narrations ==========
|
# ========== Step 1: Generate/Split narrations ==========
|
||||||
if mode == "generate":
|
if mode == "generate":
|
||||||
@@ -304,115 +288,54 @@ class StandardPipeline(BasePipeline):
|
|||||||
logger.info(f"✅ Split script into {len(narrations)} segments (by lines)")
|
logger.info(f"✅ Split script into {len(narrations)} segments (by lines)")
|
||||||
logger.info(f" Note: n_scenes={n_scenes} is ignored in fixed mode")
|
logger.info(f" Note: n_scenes={n_scenes} is ignored in fixed mode")
|
||||||
|
|
||||||
# ========== Step 2: Generate media prompts (conditional) ==========
|
# ========== Step 2: Generate image prompts ==========
|
||||||
if template_media_type == "video":
|
self._report_progress(progress_callback, "generating_image_prompts", 0.15)
|
||||||
# Video template: generate video prompts
|
|
||||||
self._report_progress(progress_callback, "generating_video_prompts", 0.15)
|
|
||||||
|
|
||||||
from pixelle_video.utils.content_generators import generate_video_prompts
|
# Override prompt_prefix if provided
|
||||||
|
original_prefix = None
|
||||||
|
if prompt_prefix is not None:
|
||||||
|
image_config = self.core.config.get("comfyui", {}).get("image", {})
|
||||||
|
original_prefix = image_config.get("prompt_prefix")
|
||||||
|
image_config["prompt_prefix"] = prompt_prefix
|
||||||
|
logger.info(f"Using custom prompt_prefix: '{prompt_prefix}'")
|
||||||
|
|
||||||
# Override prompt_prefix if provided
|
try:
|
||||||
original_prefix = None
|
# Create progress callback wrapper for image prompt generation
|
||||||
if prompt_prefix is not None:
|
def image_prompt_progress(completed: int, total: int, message: str):
|
||||||
image_config = self.core.config.get("comfyui", {}).get("image", {})
|
batch_progress = completed / total if total > 0 else 0
|
||||||
original_prefix = image_config.get("prompt_prefix")
|
overall_progress = 0.15 + (batch_progress * 0.15)
|
||||||
image_config["prompt_prefix"] = prompt_prefix
|
self._report_progress(
|
||||||
logger.info(f"Using custom prompt_prefix: '{prompt_prefix}'")
|
progress_callback,
|
||||||
|
"generating_image_prompts",
|
||||||
try:
|
overall_progress,
|
||||||
# Create progress callback wrapper for video prompt generation
|
extra_info=message
|
||||||
def video_prompt_progress(completed: int, total: int, message: str):
|
|
||||||
batch_progress = completed / total if total > 0 else 0
|
|
||||||
overall_progress = 0.15 + (batch_progress * 0.15)
|
|
||||||
self._report_progress(
|
|
||||||
progress_callback,
|
|
||||||
"generating_video_prompts",
|
|
||||||
overall_progress,
|
|
||||||
extra_info=message
|
|
||||||
)
|
|
||||||
|
|
||||||
# Generate base video prompts
|
|
||||||
base_image_prompts = await generate_video_prompts(
|
|
||||||
self.llm,
|
|
||||||
narrations=narrations,
|
|
||||||
min_words=min_image_prompt_words,
|
|
||||||
max_words=max_image_prompt_words,
|
|
||||||
progress_callback=video_prompt_progress
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Apply prompt prefix
|
# Generate base image prompts
|
||||||
from pixelle_video.utils.prompt_helper import build_image_prompt
|
base_image_prompts = await generate_image_prompts(
|
||||||
image_config = self.core.config.get("comfyui", {}).get("image", {})
|
self.llm,
|
||||||
prompt_prefix_to_use = prompt_prefix if prompt_prefix is not None else image_config.get("prompt_prefix", "")
|
narrations=narrations,
|
||||||
|
min_words=min_image_prompt_words,
|
||||||
|
max_words=max_image_prompt_words,
|
||||||
|
progress_callback=image_prompt_progress
|
||||||
|
)
|
||||||
|
|
||||||
image_prompts = []
|
# Apply prompt prefix
|
||||||
for base_prompt in base_image_prompts:
|
from pixelle_video.utils.prompt_helper import build_image_prompt
|
||||||
final_prompt = build_image_prompt(base_prompt, prompt_prefix_to_use)
|
image_config = self.core.config.get("comfyui", {}).get("image", {})
|
||||||
image_prompts.append(final_prompt)
|
prompt_prefix_to_use = prompt_prefix if prompt_prefix is not None else image_config.get("prompt_prefix", "")
|
||||||
|
|
||||||
finally:
|
image_prompts = []
|
||||||
# Restore original prompt_prefix
|
for base_prompt in base_image_prompts:
|
||||||
if original_prefix is not None:
|
final_prompt = build_image_prompt(base_prompt, prompt_prefix_to_use)
|
||||||
image_config["prompt_prefix"] = original_prefix
|
image_prompts.append(final_prompt)
|
||||||
|
|
||||||
logger.info(f"✅ Generated {len(image_prompts)} video prompts")
|
finally:
|
||||||
|
# Restore original prompt_prefix
|
||||||
|
if original_prefix is not None:
|
||||||
|
image_config["prompt_prefix"] = original_prefix
|
||||||
|
|
||||||
elif template_media_type == "image":
|
logger.info(f"✅ Generated {len(image_prompts)} image prompts")
|
||||||
# Image template: generate image prompts
|
|
||||||
self._report_progress(progress_callback, "generating_image_prompts", 0.15)
|
|
||||||
|
|
||||||
# Override prompt_prefix if provided
|
|
||||||
original_prefix = None
|
|
||||||
if prompt_prefix is not None:
|
|
||||||
image_config = self.core.config.get("comfyui", {}).get("image", {})
|
|
||||||
original_prefix = image_config.get("prompt_prefix")
|
|
||||||
image_config["prompt_prefix"] = prompt_prefix
|
|
||||||
logger.info(f"Using custom prompt_prefix: '{prompt_prefix}'")
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Create progress callback wrapper for image prompt generation
|
|
||||||
def image_prompt_progress(completed: int, total: int, message: str):
|
|
||||||
batch_progress = completed / total if total > 0 else 0
|
|
||||||
overall_progress = 0.15 + (batch_progress * 0.15)
|
|
||||||
self._report_progress(
|
|
||||||
progress_callback,
|
|
||||||
"generating_image_prompts",
|
|
||||||
overall_progress,
|
|
||||||
extra_info=message
|
|
||||||
)
|
|
||||||
|
|
||||||
# Generate base image prompts
|
|
||||||
base_image_prompts = await generate_image_prompts(
|
|
||||||
self.llm,
|
|
||||||
narrations=narrations,
|
|
||||||
min_words=min_image_prompt_words,
|
|
||||||
max_words=max_image_prompt_words,
|
|
||||||
progress_callback=image_prompt_progress
|
|
||||||
)
|
|
||||||
|
|
||||||
# Apply prompt prefix
|
|
||||||
from pixelle_video.utils.prompt_helper import build_image_prompt
|
|
||||||
image_config = self.core.config.get("comfyui", {}).get("image", {})
|
|
||||||
prompt_prefix_to_use = prompt_prefix if prompt_prefix is not None else image_config.get("prompt_prefix", "")
|
|
||||||
|
|
||||||
image_prompts = []
|
|
||||||
for base_prompt in base_image_prompts:
|
|
||||||
final_prompt = build_image_prompt(base_prompt, prompt_prefix_to_use)
|
|
||||||
image_prompts.append(final_prompt)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# Restore original prompt_prefix
|
|
||||||
if original_prefix is not None:
|
|
||||||
image_config["prompt_prefix"] = original_prefix
|
|
||||||
|
|
||||||
logger.info(f"✅ Generated {len(image_prompts)} image prompts")
|
|
||||||
|
|
||||||
else: # text
|
|
||||||
# Text-only template: skip media prompt generation
|
|
||||||
image_prompts = [None] * len(narrations)
|
|
||||||
self._report_progress(progress_callback, "preparing_frames", 0.15)
|
|
||||||
logger.info(f"⚡ Skipped media prompt generation (text-only template)")
|
|
||||||
logger.info(f" 💡 Savings: {len(narrations)} LLM calls + {len(narrations)} media generations")
|
|
||||||
|
|
||||||
# ========== Step 3: Create frames ==========
|
# ========== Step 3: Create frames ==========
|
||||||
for i, (narration, image_prompt) in enumerate(zip(narrations, image_prompts)):
|
for i, (narration, image_prompt) in enumerate(zip(narrations, image_prompts)):
|
||||||
@@ -425,43 +348,114 @@ class StandardPipeline(BasePipeline):
|
|||||||
storyboard.frames.append(frame)
|
storyboard.frames.append(frame)
|
||||||
|
|
||||||
# ========== Step 4: Process each frame ==========
|
# ========== Step 4: Process each frame ==========
|
||||||
for i, frame in enumerate(storyboard.frames):
|
# Check if using RunningHub workflows for parallel processing
|
||||||
base_progress = 0.2
|
# Enable parallel if either TTS or Image uses RunningHub (most time-consuming parts)
|
||||||
frame_range = 0.6
|
is_runninghub = (
|
||||||
per_frame_progress = frame_range / len(storyboard.frames)
|
(config.tts_workflow and config.tts_workflow.startswith("runninghub/")) or
|
||||||
|
(config.image_workflow and config.image_workflow.startswith("runninghub/"))
|
||||||
|
)
|
||||||
|
|
||||||
# Create frame-specific progress callback
|
if is_runninghub and RUNNING_HUB_PARALLEL_ENABLED:
|
||||||
def frame_progress_callback(event: ProgressEvent):
|
logger.info(f"🚀 Using parallel processing for RunningHub workflows (max {RUNNING_HUB_PARALLEL_LIMIT} concurrent)")
|
||||||
overall_progress = base_progress + (per_frame_progress * i) + (per_frame_progress * event.progress)
|
logger.info(f" TTS: {'runninghub' if config.tts_workflow and config.tts_workflow.startswith('runninghub/') else 'local'}")
|
||||||
if progress_callback:
|
logger.info(f" Image: {'runninghub' if config.image_workflow and config.image_workflow.startswith('runninghub/') else 'local'}")
|
||||||
adjusted_event = ProgressEvent(
|
|
||||||
event_type=event.event_type,
|
semaphore = asyncio.Semaphore(RUNNING_HUB_PARALLEL_LIMIT)
|
||||||
progress=overall_progress,
|
completed_count = 0
|
||||||
frame_current=event.frame_current,
|
|
||||||
frame_total=event.frame_total,
|
async def process_frame_with_semaphore(i: int, frame: StoryboardFrame):
|
||||||
step=event.step,
|
nonlocal completed_count
|
||||||
action=event.action
|
async with semaphore:
|
||||||
|
base_progress = 0.2
|
||||||
|
frame_range = 0.6
|
||||||
|
per_frame_progress = frame_range / len(storyboard.frames)
|
||||||
|
|
||||||
|
# Create frame-specific progress callback
|
||||||
|
def frame_progress_callback(event: ProgressEvent):
|
||||||
|
overall_progress = base_progress + (per_frame_progress * completed_count) + (per_frame_progress * event.progress)
|
||||||
|
if progress_callback:
|
||||||
|
adjusted_event = ProgressEvent(
|
||||||
|
event_type=event.event_type,
|
||||||
|
progress=overall_progress,
|
||||||
|
frame_current=i+1,
|
||||||
|
frame_total=len(storyboard.frames),
|
||||||
|
step=event.step,
|
||||||
|
action=event.action
|
||||||
|
)
|
||||||
|
progress_callback(adjusted_event)
|
||||||
|
|
||||||
|
# Report frame start
|
||||||
|
self._report_progress(
|
||||||
|
progress_callback,
|
||||||
|
"processing_frame",
|
||||||
|
base_progress + (per_frame_progress * completed_count),
|
||||||
|
frame_current=i+1,
|
||||||
|
frame_total=len(storyboard.frames)
|
||||||
)
|
)
|
||||||
progress_callback(adjusted_event)
|
|
||||||
|
|
||||||
# Report frame start
|
processed_frame = await self.core.frame_processor(
|
||||||
self._report_progress(
|
frame=frame,
|
||||||
progress_callback,
|
storyboard=storyboard,
|
||||||
"processing_frame",
|
config=config,
|
||||||
base_progress + (per_frame_progress * i),
|
total_frames=len(storyboard.frames),
|
||||||
frame_current=i+1,
|
progress_callback=frame_progress_callback
|
||||||
frame_total=len(storyboard.frames)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
processed_frame = await self.core.frame_processor(
|
completed_count += 1
|
||||||
frame=frame,
|
logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s) [{completed_count}/{len(storyboard.frames)}]")
|
||||||
storyboard=storyboard,
|
return i, processed_frame
|
||||||
config=config,
|
|
||||||
total_frames=len(storyboard.frames),
|
# Create all tasks and execute in parallel
|
||||||
progress_callback=frame_progress_callback
|
tasks = [process_frame_with_semaphore(i, frame) for i, frame in enumerate(storyboard.frames)]
|
||||||
)
|
results = await asyncio.gather(*tasks)
|
||||||
storyboard.total_duration += processed_frame.duration
|
|
||||||
logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s)")
|
# Update frames in order and calculate total duration
|
||||||
|
for idx, processed_frame in sorted(results, key=lambda x: x[0]):
|
||||||
|
storyboard.frames[idx] = processed_frame
|
||||||
|
storyboard.total_duration += processed_frame.duration
|
||||||
|
|
||||||
|
logger.info(f"✅ All frames processed in parallel (total duration: {storyboard.total_duration:.2f}s)")
|
||||||
|
else:
|
||||||
|
# Serial processing for non-RunningHub workflows
|
||||||
|
logger.info("⚙️ Using serial processing (non-RunningHub workflow)")
|
||||||
|
|
||||||
|
for i, frame in enumerate(storyboard.frames):
|
||||||
|
base_progress = 0.2
|
||||||
|
frame_range = 0.6
|
||||||
|
per_frame_progress = frame_range / len(storyboard.frames)
|
||||||
|
|
||||||
|
# Create frame-specific progress callback
|
||||||
|
def frame_progress_callback(event: ProgressEvent):
|
||||||
|
overall_progress = base_progress + (per_frame_progress * i) + (per_frame_progress * event.progress)
|
||||||
|
if progress_callback:
|
||||||
|
adjusted_event = ProgressEvent(
|
||||||
|
event_type=event.event_type,
|
||||||
|
progress=overall_progress,
|
||||||
|
frame_current=event.frame_current,
|
||||||
|
frame_total=event.frame_total,
|
||||||
|
step=event.step,
|
||||||
|
action=event.action
|
||||||
|
)
|
||||||
|
progress_callback(adjusted_event)
|
||||||
|
|
||||||
|
# Report frame start
|
||||||
|
self._report_progress(
|
||||||
|
progress_callback,
|
||||||
|
"processing_frame",
|
||||||
|
base_progress + (per_frame_progress * i),
|
||||||
|
frame_current=i+1,
|
||||||
|
frame_total=len(storyboard.frames)
|
||||||
|
)
|
||||||
|
|
||||||
|
processed_frame = await self.core.frame_processor(
|
||||||
|
frame=frame,
|
||||||
|
storyboard=storyboard,
|
||||||
|
config=config,
|
||||||
|
total_frames=len(storyboard.frames),
|
||||||
|
progress_callback=frame_progress_callback
|
||||||
|
)
|
||||||
|
storyboard.total_duration += processed_frame.duration
|
||||||
|
logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s)")
|
||||||
|
|
||||||
# ========== Step 5: Concatenate videos ==========
|
# ========== Step 5: Concatenate videos ==========
|
||||||
self._report_progress(progress_callback, "concatenating", 0.85)
|
self._report_progress(progress_callback, "concatenating", 0.85)
|
||||||
@@ -516,32 +510,3 @@ class StandardPipeline(BasePipeline):
|
|||||||
logger.error(f"❌ Video generation failed: {e}")
|
logger.error(f"❌ Video generation failed: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _check_template_media_type(self, frame_template: str) -> str:
|
|
||||||
"""
|
|
||||||
Check template media type requirement
|
|
||||||
|
|
||||||
This is checked at pipeline level to avoid unnecessary:
|
|
||||||
- LLM calls (generating media prompts)
|
|
||||||
- Media generation API calls
|
|
||||||
- ComfyUI dependency
|
|
||||||
|
|
||||||
Template naming convention:
|
|
||||||
- static_*.html: Static style template (returns "static")
|
|
||||||
- image_*.html: Image template (returns "image")
|
|
||||||
- video_*.html: Video template (returns "video")
|
|
||||||
|
|
||||||
Args:
|
|
||||||
frame_template: Template path (e.g., "1080x1920/image_default.html" or "1080x1920/video_default.html")
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
"static", "image", or "video"
|
|
||||||
"""
|
|
||||||
from pixelle_video.utils.template_util import get_template_type
|
|
||||||
|
|
||||||
# Determine type by template filename prefix
|
|
||||||
template_name = Path(frame_template).name
|
|
||||||
template_type = get_template_type(template_name)
|
|
||||||
|
|
||||||
logger.debug(f"Template '{frame_template}' is {template_type} template")
|
|
||||||
return template_type
|
|
||||||
|
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ from aiohttp import WSServerHandshakeError, ClientResponseError
|
|||||||
_USE_CERTIFI_SSL = True
|
_USE_CERTIFI_SSL = True
|
||||||
|
|
||||||
# Retry configuration for Edge TTS (to handle 401 errors)
|
# Retry configuration for Edge TTS (to handle 401 errors)
|
||||||
_RETRY_COUNT = 5 # Default retry count (increased from 3 to 5)
|
_RETRY_COUNT = 10 # Default retry count (increased from 3 to 5)
|
||||||
_RETRY_BASE_DELAY = 1.0 # Base retry delay in seconds (for exponential backoff)
|
_RETRY_BASE_DELAY = 1.0 # Base retry delay in seconds (for exponential backoff)
|
||||||
_MAX_RETRY_DELAY = 10.0 # Maximum retry delay in seconds
|
_MAX_RETRY_DELAY = 10.0 # Maximum retry delay in seconds
|
||||||
|
|
||||||
@@ -38,8 +38,27 @@ _MAX_RETRY_DELAY = 10.0 # Maximum retry delay in seconds
|
|||||||
_REQUEST_DELAY = 0.5 # Minimum delay before each request (seconds)
|
_REQUEST_DELAY = 0.5 # Minimum delay before each request (seconds)
|
||||||
_MAX_CONCURRENT_REQUESTS = 3 # Maximum concurrent requests
|
_MAX_CONCURRENT_REQUESTS = 3 # Maximum concurrent requests
|
||||||
|
|
||||||
# Global semaphore for rate limiting
|
# Global semaphore for rate limiting (created per event loop)
|
||||||
_request_semaphore = asyncio.Semaphore(_MAX_CONCURRENT_REQUESTS)
|
_request_semaphore = None
|
||||||
|
_semaphore_loop = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_request_semaphore():
|
||||||
|
"""Get or create request semaphore for current event loop"""
|
||||||
|
global _request_semaphore, _semaphore_loop
|
||||||
|
|
||||||
|
try:
|
||||||
|
current_loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
# No running loop
|
||||||
|
return asyncio.Semaphore(_MAX_CONCURRENT_REQUESTS)
|
||||||
|
|
||||||
|
# If semaphore doesn't exist or belongs to different loop, create new one
|
||||||
|
if _request_semaphore is None or _semaphore_loop != current_loop:
|
||||||
|
_request_semaphore = asyncio.Semaphore(_MAX_CONCURRENT_REQUESTS)
|
||||||
|
_semaphore_loop = current_loop
|
||||||
|
|
||||||
|
return _request_semaphore
|
||||||
|
|
||||||
|
|
||||||
async def edge_tts(
|
async def edge_tts(
|
||||||
@@ -98,7 +117,8 @@ async def edge_tts(
|
|||||||
logger.debug(f"Calling Edge TTS with voice: {voice}, rate: {rate}, retry_count: {retry_count}")
|
logger.debug(f"Calling Edge TTS with voice: {voice}, rate: {rate}, retry_count: {retry_count}")
|
||||||
|
|
||||||
# Use semaphore to limit concurrent requests
|
# Use semaphore to limit concurrent requests
|
||||||
async with _request_semaphore:
|
request_semaphore = _get_request_semaphore()
|
||||||
|
async with request_semaphore:
|
||||||
# Add a small random delay before each request to avoid rate limiting
|
# Add a small random delay before each request to avoid rate limiting
|
||||||
pre_delay = _REQUEST_DELAY + random.uniform(0, 0.3)
|
pre_delay = _REQUEST_DELAY + random.uniform(0, 0.3)
|
||||||
logger.debug(f"Waiting {pre_delay:.2f}s before request (rate limiting)")
|
logger.debug(f"Waiting {pre_delay:.2f}s before request (rate limiting)")
|
||||||
@@ -118,20 +138,17 @@ async def edge_tts(
|
|||||||
logger.info(f"🔄 Retrying Edge TTS (attempt {attempt + 1}/{retry_count + 1}) after {retry_delay:.2f}s delay...")
|
logger.info(f"🔄 Retrying Edge TTS (attempt {attempt + 1}/{retry_count + 1}) after {retry_delay:.2f}s delay...")
|
||||||
await asyncio.sleep(retry_delay)
|
await asyncio.sleep(retry_delay)
|
||||||
|
|
||||||
# Use certifi SSL context for proper certificate verification
|
|
||||||
if _USE_CERTIFI_SSL:
|
|
||||||
if attempt == 0: # Only log info once
|
|
||||||
logger.debug("Using certifi SSL certificates for secure Edge TTS connection")
|
|
||||||
original_create_default_context = ssl.create_default_context
|
|
||||||
|
|
||||||
def create_certifi_context(*args, **kwargs):
|
|
||||||
# Build SSL context that uses certifi bundle (resolves Windows / missing CA issues)
|
|
||||||
return original_create_default_context(cafile=certifi.where())
|
|
||||||
|
|
||||||
# Temporarily replace the function
|
|
||||||
ssl.create_default_context = create_certifi_context
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Create communicate instance with certifi SSL context
|
||||||
|
if _USE_CERTIFI_SSL:
|
||||||
|
if attempt == 0: # Only log info once
|
||||||
|
logger.debug("Using certifi SSL certificates for secure Edge TTS connection")
|
||||||
|
# Create SSL context with certifi bundle
|
||||||
|
import certifi
|
||||||
|
ssl_context = ssl.create_default_context(cafile=certifi.where())
|
||||||
|
else:
|
||||||
|
ssl_context = None
|
||||||
|
|
||||||
# Create communicate instance
|
# Create communicate instance
|
||||||
communicate = edge_tts_sdk.Communicate(
|
communicate = edge_tts_sdk.Communicate(
|
||||||
text=text,
|
text=text,
|
||||||
@@ -187,11 +204,6 @@ async def edge_tts(
|
|||||||
logger.error(f"Edge TTS error (non-retryable): {type(e).__name__} - {e}")
|
logger.error(f"Edge TTS error (non-retryable): {type(e).__name__} - {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
|
||||||
# Restore original function if we patched it
|
|
||||||
if _USE_CERTIFI_SSL:
|
|
||||||
ssl.create_default_context = original_create_default_context
|
|
||||||
|
|
||||||
# Should not reach here, but just in case
|
# Should not reach here, but just in case
|
||||||
if last_error:
|
if last_error:
|
||||||
raise last_error
|
raise last_error
|
||||||
@@ -255,7 +267,8 @@ async def list_voices(locale: str = None, retry_count: int = _RETRY_COUNT, retry
|
|||||||
logger.debug(f"Fetching Edge TTS voices, locale filter: {locale}, retry_count: {retry_count}")
|
logger.debug(f"Fetching Edge TTS voices, locale filter: {locale}, retry_count: {retry_count}")
|
||||||
|
|
||||||
# Use semaphore to limit concurrent requests
|
# Use semaphore to limit concurrent requests
|
||||||
async with _request_semaphore:
|
request_semaphore = _get_request_semaphore()
|
||||||
|
async with request_semaphore:
|
||||||
# Add a small random delay before each request to avoid rate limiting
|
# Add a small random delay before each request to avoid rate limiting
|
||||||
pre_delay = _REQUEST_DELAY + random.uniform(0, 0.3)
|
pre_delay = _REQUEST_DELAY + random.uniform(0, 0.3)
|
||||||
logger.debug(f"Waiting {pre_delay:.2f}s before request (rate limiting)")
|
logger.debug(f"Waiting {pre_delay:.2f}s before request (rate limiting)")
|
||||||
@@ -274,20 +287,8 @@ async def list_voices(locale: str = None, retry_count: int = _RETRY_COUNT, retry
|
|||||||
logger.info(f"🔄 Retrying list voices (attempt {attempt + 1}/{retry_count + 1}) after {retry_delay:.2f}s delay...")
|
logger.info(f"🔄 Retrying list voices (attempt {attempt + 1}/{retry_count + 1}) after {retry_delay:.2f}s delay...")
|
||||||
await asyncio.sleep(retry_delay)
|
await asyncio.sleep(retry_delay)
|
||||||
|
|
||||||
# Use certifi SSL context for proper certificate verification
|
|
||||||
if _USE_CERTIFI_SSL:
|
|
||||||
if attempt == 0: # Only log info once
|
|
||||||
logger.debug("Using certifi SSL certificates for secure Edge TTS connection")
|
|
||||||
original_create_default_context = ssl.create_default_context
|
|
||||||
|
|
||||||
def create_certifi_context(*args, **kwargs):
|
|
||||||
# Build SSL context that uses certifi bundle (resolves Windows / missing CA issues)
|
|
||||||
return original_create_default_context(cafile=certifi.where())
|
|
||||||
|
|
||||||
ssl.create_default_context = create_certifi_context
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Get all voices
|
# Get all voices (edge-tts handles SSL internally)
|
||||||
voices = await edge_tts_sdk.list_voices()
|
voices = await edge_tts_sdk.list_voices()
|
||||||
|
|
||||||
# Filter by locale if specified
|
# Filter by locale if specified
|
||||||
@@ -326,11 +327,6 @@ async def list_voices(locale: str = None, retry_count: int = _RETRY_COUNT, retry
|
|||||||
logger.error(f"List voices error (non-retryable): {type(e).__name__} - {e}")
|
logger.error(f"List voices error (non-retryable): {type(e).__name__} - {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
|
||||||
# Restore original function if we patched it
|
|
||||||
if _USE_CERTIFI_SSL:
|
|
||||||
ssl.create_default_context = original_create_default_context
|
|
||||||
|
|
||||||
# Should not reach here, but just in case
|
# Should not reach here, but just in case
|
||||||
if last_error:
|
if last_error:
|
||||||
raise last_error
|
raise last_error
|
||||||
|
|||||||
Reference in New Issue
Block a user