重构pipeline的UI架构,支持后续pipeline的动态拓展

This commit is contained in:
puke
2025-12-03 11:33:18 +08:00
parent b58f529ce0
commit 6e99612a68
10 changed files with 784 additions and 562 deletions

View File

@@ -18,11 +18,14 @@ Each pipeline implements a specific video generation approach.
"""
from pixelle_video.pipelines.base import BasePipeline
from pixelle_video.pipelines.linear import LinearVideoPipeline, PipelineContext
from pixelle_video.pipelines.standard import StandardPipeline
from pixelle_video.pipelines.custom import CustomPipeline
__all__ = [
"BasePipeline",
"LinearVideoPipeline",
"PipelineContext",
"StandardPipeline",
"CustomPipeline",
]

View File

@@ -0,0 +1,161 @@
# Copyright (C) 2025 AIDC-AI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Linear Video Pipeline Base Class
This module defines the template method pattern for linear video generation workflows.
It introduces `PipelineContext` for state management and `LinearVideoPipeline` for
process orchestration.
"""
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Callable
from loguru import logger
from pixelle_video.pipelines.base import BasePipeline
from pixelle_video.models.storyboard import (
Storyboard,
VideoGenerationResult,
StoryboardConfig
)
from pixelle_video.models.progress import ProgressEvent
@dataclass
class PipelineContext:
    """
    Context object holding the state of a single pipeline execution.

    This object is passed between steps in the LinearVideoPipeline lifecycle.
    Each lifecycle step reads the fields produced by earlier steps and fills
    in its own outputs, so the context accumulates state as the run proceeds.
    """
    # === Input ===
    # Raw user input: a topic or a full script, depending on pipeline mode.
    input_text: str
    # Free-form pipeline parameters forwarded from the caller's **kwargs.
    params: Dict[str, Any]
    # Optional observer invoked with ProgressEvent updates during the run.
    progress_callback: Optional[Callable[[ProgressEvent], None]] = None
    # === Task State ===
    # Unique identifier for this run; set during setup_environment.
    task_id: Optional[str] = None
    # Isolated working directory for this run; set during setup_environment.
    task_dir: Optional[str] = None
    # === Content ===
    # Final video title; set during determine_title.
    title: Optional[str] = None
    # Per-scene narration texts; populated during generate_content.
    narrations: List[str] = field(default_factory=list)
    # === Visuals ===
    # One prompt per narration; entries may be None when a scene needs no media.
    image_prompts: List[Optional[str]] = field(default_factory=list)
    # === Configuration & Storyboard ===
    # Resolved storyboard configuration; built during initialize_storyboard.
    config: Optional[StoryboardConfig] = None
    # Storyboard with frames; built during initialize_storyboard and
    # processed during produce_assets.
    storyboard: Optional[Storyboard] = None
    # === Output ===
    # Path of the concatenated final video; set by post_production.
    final_video_path: Optional[str] = None
    # Final result object; set by finalize.
    result: Optional[VideoGenerationResult] = None
class LinearVideoPipeline(BasePipeline):
    """
    Base class for linear video generation pipelines using the Template Method pattern.

    This class orchestrates the video generation process into distinct lifecycle steps:
        1. setup_environment
        2. generate_content
        3. determine_title
        4. plan_visuals
        5. initialize_storyboard
        6. produce_assets
        7. post_production
        8. finalize

    Subclasses should override specific steps to customize behavior while maintaining
    the overall workflow structure.  Every step except ``finalize`` defaults to a
    no-op, so a subclass only needs to implement the phases it actually uses.
    """

    async def __call__(
        self,
        text: str,
        progress_callback: Optional[Callable[[ProgressEvent], None]] = None,
        **kwargs
    ) -> VideoGenerationResult:
        """
        Execute the pipeline using the template method.

        Args:
            text: Raw input text (topic or full script; interpretation is
                left to the subclass's lifecycle steps).
            progress_callback: Optional callback invoked with ProgressEvent
                updates as the pipeline advances.
            **kwargs: Pipeline-specific parameters, stored on the context
                as ``ctx.params`` for the steps to read.

        Returns:
            The VideoGenerationResult produced by ``finalize``.

        Raises:
            Exception: Any error raised by a lifecycle step is first passed
                to ``handle_exception`` and then re-raised to the caller.
        """
        # 1. Initialize the shared context threaded through every step.
        ctx = PipelineContext(
            input_text=text,
            params=kwargs,
            progress_callback=progress_callback
        )
        try:
            # === Phase 1: Preparation ===
            await self.setup_environment(ctx)
            # === Phase 2: Content Creation ===
            await self.generate_content(ctx)
            await self.determine_title(ctx)
            # === Phase 3: Visual Planning ===
            await self.plan_visuals(ctx)
            await self.initialize_storyboard(ctx)
            # === Phase 4: Asset Production ===
            await self.produce_assets(ctx)
            # === Phase 5: Post Production ===
            await self.post_production(ctx)
            # === Phase 6: Finalization ===
            return await self.finalize(ctx)
        except Exception as e:
            # Give the pipeline a chance to log/clean up, then propagate.
            await self.handle_exception(ctx, e)
            raise

    # ==================== Lifecycle Methods ====================

    async def setup_environment(self, ctx: PipelineContext):
        """Step 1: Setup task directory and environment. No-op by default."""
        pass

    async def generate_content(self, ctx: PipelineContext):
        """Step 2: Generate or process script/narrations. No-op by default."""
        pass

    async def determine_title(self, ctx: PipelineContext):
        """Step 3: Determine or generate video title. No-op by default."""
        pass

    async def plan_visuals(self, ctx: PipelineContext):
        """Step 4: Generate image prompts or visual descriptions. No-op by default."""
        pass

    async def initialize_storyboard(self, ctx: PipelineContext):
        """Step 5: Create Storyboard object and frames. No-op by default."""
        pass

    async def produce_assets(self, ctx: PipelineContext):
        """Step 6: Generate audio, images, and render frames (Core processing). No-op by default."""
        pass

    async def post_production(self, ctx: PipelineContext):
        """Step 7: Concatenate videos and add BGM. No-op by default."""
        pass

    async def finalize(self, ctx: PipelineContext) -> VideoGenerationResult:
        """Step 8: Create result object and persist metadata.

        Subclasses MUST implement this step; it produces the pipeline's
        return value.
        """
        raise NotImplementedError("finalize must be implemented by subclass")

    async def handle_exception(self, ctx: PipelineContext, error: Exception):
        """Handle exceptions during pipeline execution.

        Default implementation logs the error together with its traceback;
        subclasses may override to add cleanup (e.g. removing ``ctx.task_dir``).
        """
        # logger.opt(exception=error) attaches the full traceback to the log
        # record (loguru API); plain logger.error(f"...") would drop it and
        # leave only the exception message, making failures hard to diagnose.
        logger.opt(exception=error).error(f"Pipeline execution failed: {error}")

View File

@@ -15,16 +15,18 @@ Standard Video Generation Pipeline
Standard workflow for generating short videos from topic or fixed script.
This is the default pipeline for general-purpose video generation.
Refactored to use LinearVideoPipeline (Template Method Pattern).
"""
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable, Literal
from typing import Optional, Callable, Literal, List
import asyncio
import shutil
from loguru import logger
import asyncio
from pixelle_video.pipelines.base import BasePipeline
from pixelle_video.pipelines.linear import LinearVideoPipeline, PipelineContext
from pixelle_video.models.progress import ProgressEvent
from pixelle_video.models.storyboard import (
Storyboard,
@@ -39,13 +41,20 @@ from pixelle_video.utils.content_generators import (
split_narration_script,
generate_image_prompts,
)
from pixelle_video.utils.os_util import (
create_task_output_dir,
get_task_final_video_path
)
from pixelle_video.utils.template_util import get_template_type
from pixelle_video.utils.prompt_helper import build_image_prompt
from pixelle_video.services.video import VideoService
# Parallel limit for RunningHub workflows (Call by sequential if set to 1)
RUNNING_HUB_PARALLEL_LIMIT = 1
class StandardPipeline(BasePipeline):
class StandardPipeline(LinearVideoPipeline):
"""
Standard video generation pipeline
@@ -66,405 +75,265 @@ class StandardPipeline(BasePipeline):
- "fixed": Use provided script as-is (each line = one narration)
"""
async def __call__(
self,
# === Input (Required) ===
text: str,
media_width: int, # Required: Media width (from template)
media_height: int, # Required: Media height (from template)
# ==================== Lifecycle Methods ====================
async def setup_environment(self, ctx: PipelineContext):
"""Step 1: Setup task directory and environment."""
text = ctx.input_text
mode = ctx.params.get("mode", "generate")
# === Processing Mode ===
mode: Literal["generate", "fixed"] = "generate",
# === Optional Title ===
title: Optional[str] = None,
# === Basic Config ===
n_scenes: int = 5, # Only used in generate mode; ignored in fixed mode
# === TTS Parameters (supports both old and new parameter names) ===
tts_inference_mode: Optional[str] = None, # "local" or "comfyui" (web UI)
voice_id: Optional[str] = None, # For backward compatibility (deprecated)
tts_voice: Optional[str] = None, # Voice ID for local mode (web UI)
tts_workflow: Optional[str] = None,
tts_speed: float = 1.2,
ref_audio: Optional[str] = None, # Reference audio for voice cloning
output_path: Optional[str] = None,
# === LLM Parameters ===
min_narration_words: int = 5,
max_narration_words: int = 20,
min_image_prompt_words: int = 30,
max_image_prompt_words: int = 60,
# === Media Workflow ===
media_workflow: Optional[str] = None,
# === Video Parameters ===
video_fps: int = 30,
# === Frame Template (determines video size) ===
frame_template: Optional[str] = None,
# === Template Custom Parameters ===
template_params: Optional[dict] = None, # Custom template parameters
# === Image Style ===
prompt_prefix: Optional[str] = None,
# === BGM Parameters ===
bgm_path: Optional[str] = None,
bgm_volume: float = 0.2,
bgm_mode: Literal["once", "loop"] = "loop",
# === Advanced Options ===
content_metadata: Optional[ContentMetadata] = None,
progress_callback: Optional[Callable[[ProgressEvent], None]] = None,
) -> VideoGenerationResult:
"""
Generate short video from text input
Args:
text: Text input (required)
- For generate mode: topic/theme (e.g., "如何提高学习效率")
- For fixed mode: complete narration script (each line is a narration)
mode: Processing mode (default "generate")
- "generate": LLM generates narrations from topic, creates n_scenes
- "fixed": Use existing script as-is, each line becomes a narration
Note: In fixed mode, n_scenes is ignored (uses actual line count)
title: Video title (optional)
- If provided, use it as the video title
- If not provided:
* generate mode → use text as title
* fixed mode → LLM generates title from script
n_scenes: Number of storyboard scenes (default 5)
Only effective in generate mode; ignored in fixed mode
voice_id: TTS voice ID (default "[Chinese] zh-CN Yunjian")
tts_workflow: TTS workflow filename (e.g., "tts_edge.json", None = use default)
tts_speed: TTS speed multiplier (1.0 = normal, 1.2 = 20% faster, default 1.2)
ref_audio: Reference audio path for voice cloning (optional)
output_path: Output video path (auto-generated if None)
min_narration_words: Min narration length (generate mode only)
max_narration_words: Max narration length (generate mode only)
min_image_prompt_words: Min image prompt length
max_image_prompt_words: Max image prompt length
media_width: Media width (image or video, required)
media_height: Media height (image or video, required)
media_workflow: Media workflow filename (image or video, e.g., "image_flux.json", "video_wan.json", None = use default)
video_fps: Video frame rate (default 30)
frame_template: HTML template path with size (None = use default "1080x1920/default.html")
Format: "SIZExSIZE/template.html" (e.g., "1080x1920/default.html", "1920x1080/modern.html")
Video size is automatically determined from template path
template_params: Custom template parameters (optional dict)
e.g., {"accent_color": "#ff0000", "author": "John Doe"}
prompt_prefix: Image prompt prefix (overrides config.yaml if provided)
e.g., "anime style, vibrant colors" or "" for no prefix
bgm_path: BGM path (filename like "default.mp3", custom path, or None)
bgm_volume: BGM volume 0.0-1.0 (default 0.2)
bgm_mode: BGM mode "once" or "loop" (default "loop")
content_metadata: Content metadata (optional, for display)
progress_callback: Progress callback function(ProgressEvent)
Returns:
VideoGenerationResult with video path and metadata
"""
# ========== Step 0: Process text and determine title ==========
logger.info(f"🚀 Starting StandardPipeline in '{mode}' mode")
logger.info(f" Text length: {len(text)} chars")
# Create isolated task directory
task_dir, task_id = create_task_output_dir()
ctx.task_id = task_id
ctx.task_dir = task_dir
logger.info(f"📁 Task directory created: {task_dir}")
logger.info(f" Task ID: {task_id}")
# Determine final video path
output_path = ctx.params.get("output_path")
if output_path is None:
ctx.final_video_path = get_task_final_video_path(task_id)
else:
# We will copy to this path in finalize/post_production
# For internal processing, we still use the task dir path?
# Actually StandardPipeline logic used get_task_final_video_path as the target for concat
# and then copied. Let's stick to that.
ctx.final_video_path = get_task_final_video_path(task_id)
logger.info(f" Will copy final video to: {output_path}")
async def generate_content(self, ctx: PipelineContext):
"""Step 2: Generate or process script/narrations."""
mode = ctx.params.get("mode", "generate")
text = ctx.input_text
n_scenes = ctx.params.get("n_scenes", 5)
min_words = ctx.params.get("min_narration_words", 5)
max_words = ctx.params.get("max_narration_words", 20)
if mode == "generate":
self._report_progress(ctx.progress_callback, "generating_narrations", 0.05)
ctx.narrations = await generate_narrations_from_topic(
self.llm,
topic=text,
n_scenes=n_scenes,
min_words=min_words,
max_words=max_words
)
logger.info(f"✅ Generated {len(ctx.narrations)} narrations")
else: # fixed
self._report_progress(ctx.progress_callback, "splitting_script", 0.05)
ctx.narrations = await split_narration_script(text)
logger.info(f"✅ Split script into {len(ctx.narrations)} segments (by lines)")
logger.info(f" Note: n_scenes={n_scenes} is ignored in fixed mode")
async def determine_title(self, ctx: PipelineContext):
"""Step 3: Determine or generate video title."""
# Note: Swapped order with generate_content in base class call,
# but in StandardPipeline original code, title was determined BEFORE narrations.
# However, LinearVideoPipeline defines generate_content BEFORE determine_title.
# This is fine as they are independent in StandardPipeline logic.
title = ctx.params.get("title")
mode = ctx.params.get("mode", "generate")
text = ctx.input_text
if title:
ctx.title = title
logger.info(f" Title: '{title}' (user-specified)")
else:
self._report_progress(ctx.progress_callback, "generating_title", 0.01)
if mode == "generate":
ctx.title = await generate_title(self.llm, text, strategy="auto")
logger.info(f" Title: '{ctx.title}' (auto-generated)")
else: # fixed
ctx.title = await generate_title(self.llm, text, strategy="llm")
logger.info(f" Title: '{ctx.title}' (LLM-generated)")
async def plan_visuals(self, ctx: PipelineContext):
"""Step 4: Generate image prompts or visual descriptions."""
# Detect template type to determine if media generation is needed
frame_template = ctx.params.get("frame_template") or "1080x1920/default.html"
template_name = Path(frame_template).name
template_type = get_template_type(template_name)
template_requires_media = (template_type in ["image", "video"])
if template_type == "image":
logger.info(f"📸 Template requires image generation")
elif template_type == "video":
logger.info(f"🎬 Template requires video generation")
else: # static
logger.info(f"⚡ Static template - skipping media generation pipeline")
logger.info(f" 💡 Benefits: Faster generation + Lower cost + No ComfyUI dependency")
# Only generate image prompts if template requires media
if template_requires_media:
self._report_progress(ctx.progress_callback, "generating_image_prompts", 0.15)
prompt_prefix = ctx.params.get("prompt_prefix")
min_words = ctx.params.get("min_image_prompt_words", 30)
max_words = ctx.params.get("max_image_prompt_words", 60)
# Override prompt_prefix if provided
original_prefix = None
if prompt_prefix is not None:
image_config = self.core.config.get("comfyui", {}).get("image", {})
original_prefix = image_config.get("prompt_prefix")
image_config["prompt_prefix"] = prompt_prefix
logger.info(f"Using custom prompt_prefix: '{prompt_prefix}'")
try:
# Create progress callback wrapper for image prompt generation
def image_prompt_progress(completed: int, total: int, message: str):
batch_progress = completed / total if total > 0 else 0
overall_progress = 0.15 + (batch_progress * 0.15)
self._report_progress(
ctx.progress_callback,
"generating_image_prompts",
overall_progress,
extra_info=message
)
# Generate base image prompts
base_image_prompts = await generate_image_prompts(
self.llm,
narrations=ctx.narrations,
min_words=min_words,
max_words=max_words,
progress_callback=image_prompt_progress
)
# Apply prompt prefix
image_config = self.core.config.get("comfyui", {}).get("image", {})
prompt_prefix_to_use = prompt_prefix if prompt_prefix is not None else image_config.get("prompt_prefix", "")
ctx.image_prompts = []
for base_prompt in base_image_prompts:
final_prompt = build_image_prompt(base_prompt, prompt_prefix_to_use)
ctx.image_prompts.append(final_prompt)
finally:
# Restore original prompt_prefix
if original_prefix is not None:
image_config["prompt_prefix"] = original_prefix
logger.info(f"✅ Generated {len(ctx.image_prompts)} image prompts")
else:
# Static template - skip image prompt generation entirely
ctx.image_prompts = [None] * len(ctx.narrations)
logger.info(f"⚡ Skipped image prompt generation (static template)")
logger.info(f" 💡 Savings: {len(ctx.narrations)} LLM calls + {len(ctx.narrations)} media generations")
async def initialize_storyboard(self, ctx: PipelineContext):
"""Step 5: Create Storyboard object and frames."""
# === Handle TTS parameter compatibility ===
# Support both old API (voice_id) and new API (tts_inference_mode + tts_voice)
tts_inference_mode = ctx.params.get("tts_inference_mode")
tts_voice = ctx.params.get("tts_voice")
voice_id = ctx.params.get("voice_id")
tts_workflow = ctx.params.get("tts_workflow")
final_voice_id = None
final_tts_workflow = tts_workflow
if tts_inference_mode:
# New API from web UI
if tts_inference_mode == "local":
# Local Edge TTS mode - use tts_voice
final_voice_id = tts_voice or "zh-CN-YunjianNeural"
final_tts_workflow = None # Don't use workflow in local mode
final_tts_workflow = None
logger.debug(f"TTS Mode: local (voice={final_voice_id})")
elif tts_inference_mode == "comfyui":
# ComfyUI workflow mode
final_voice_id = None # Don't use voice_id in ComfyUI mode
# tts_workflow already set from parameter
final_voice_id = None
logger.debug(f"TTS Mode: comfyui (workflow={final_tts_workflow})")
else:
# Old API (backward compatibility)
# Old API
final_voice_id = voice_id or tts_voice or "zh-CN-YunjianNeural"
# tts_workflow already set from parameter
logger.debug(f"TTS Mode: legacy (voice_id={final_voice_id}, workflow={final_tts_workflow})")
# Determine final title
if title:
final_title = title
logger.info(f" Title: '{title}' (user-specified)")
else:
self._report_progress(progress_callback, "generating_title", 0.01)
if mode == "generate":
final_title = await generate_title(self.llm, text, strategy="auto")
logger.info(f" Title: '{final_title}' (auto-generated)")
else: # fixed
final_title = await generate_title(self.llm, text, strategy="llm")
logger.info(f" Title: '{final_title}' (LLM-generated)")
# ========== Step 0.5: Create isolated task directory ==========
from pixelle_video.utils.os_util import (
create_task_output_dir,
get_task_final_video_path
)
task_dir, task_id = create_task_output_dir()
logger.info(f"📁 Task directory created: {task_dir}")
logger.info(f" Task ID: {task_id}")
# Determine final video path
user_specified_output = None
if output_path is None:
output_path = get_task_final_video_path(task_id)
else:
user_specified_output = output_path
output_path = get_task_final_video_path(task_id)
logger.info(f" Will copy final video to: {user_specified_output}")
# Create storyboard config
config = StoryboardConfig(
task_id=task_id,
n_storyboard=n_scenes,
min_narration_words=min_narration_words,
max_narration_words=max_narration_words,
min_image_prompt_words=min_image_prompt_words,
max_image_prompt_words=max_image_prompt_words,
video_fps=video_fps,
tts_inference_mode=tts_inference_mode or "local", # TTS inference mode (CRITICAL FIX)
voice_id=final_voice_id, # Use processed voice_id
tts_workflow=final_tts_workflow, # Use processed workflow
tts_speed=tts_speed,
ref_audio=ref_audio,
media_width=media_width,
media_height=media_height,
media_workflow=media_workflow,
frame_template=frame_template or "1080x1920/default.html",
template_params=template_params # Custom template parameters
# Create config
ctx.config = StoryboardConfig(
task_id=ctx.task_id,
n_storyboard=len(ctx.narrations), # Use actual length
min_narration_words=ctx.params.get("min_narration_words", 5),
max_narration_words=ctx.params.get("max_narration_words", 20),
min_image_prompt_words=ctx.params.get("min_image_prompt_words", 30),
max_image_prompt_words=ctx.params.get("max_image_prompt_words", 60),
video_fps=ctx.params.get("video_fps", 30),
tts_inference_mode=tts_inference_mode or "local",
voice_id=final_voice_id,
tts_workflow=final_tts_workflow,
tts_speed=ctx.params.get("tts_speed", 1.2),
ref_audio=ctx.params.get("ref_audio"),
media_width=ctx.params.get("media_width"),
media_height=ctx.params.get("media_height"),
media_workflow=ctx.params.get("media_workflow"),
frame_template=ctx.params.get("frame_template") or "1080x1920/default.html",
template_params=ctx.params.get("template_params")
)
# Create storyboard
storyboard = Storyboard(
title=final_title,
config=config,
content_metadata=content_metadata,
ctx.storyboard = Storyboard(
title=ctx.title,
config=ctx.config,
content_metadata=ctx.params.get("content_metadata"),
created_at=datetime.now()
)
try:
# ========== Step 1: Generate/Split narrations ==========
if mode == "generate":
self._report_progress(progress_callback, "generating_narrations", 0.05)
narrations = await generate_narrations_from_topic(
self.llm,
topic=text,
n_scenes=n_scenes,
min_words=min_narration_words,
max_words=max_narration_words
)
logger.info(f"✅ Generated {len(narrations)} narrations")
else: # fixed
self._report_progress(progress_callback, "splitting_script", 0.05)
narrations = await split_narration_script(text)
logger.info(f"✅ Split script into {len(narrations)} segments (by lines)")
logger.info(f" Note: n_scenes={n_scenes} is ignored in fixed mode")
# ========== Step 2: Check template type and conditionally generate image prompts ==========
# Detect template type to determine if media generation is needed
from pathlib import Path
from pixelle_video.utils.template_util import get_template_type
template_name = Path(config.frame_template).name
template_type = get_template_type(template_name)
template_requires_media = (template_type in ["image", "video"])
if template_type == "image":
logger.info(f"📸 Template requires image generation")
elif template_type == "video":
logger.info(f"🎬 Template requires video generation")
else: # static
logger.info(f"⚡ Static template - skipping media generation pipeline")
logger.info(f" 💡 Benefits: Faster generation + Lower cost + No ComfyUI dependency")
# Only generate image prompts if template requires media
if template_requires_media:
self._report_progress(progress_callback, "generating_image_prompts", 0.15)
# Override prompt_prefix if provided
original_prefix = None
if prompt_prefix is not None:
image_config = self.core.config.get("comfyui", {}).get("image", {})
original_prefix = image_config.get("prompt_prefix")
image_config["prompt_prefix"] = prompt_prefix
logger.info(f"Using custom prompt_prefix: '{prompt_prefix}'")
try:
# Create progress callback wrapper for image prompt generation
def image_prompt_progress(completed: int, total: int, message: str):
batch_progress = completed / total if total > 0 else 0
overall_progress = 0.15 + (batch_progress * 0.15)
self._report_progress(
progress_callback,
"generating_image_prompts",
overall_progress,
extra_info=message
)
# Generate base image prompts
base_image_prompts = await generate_image_prompts(
self.llm,
narrations=narrations,
min_words=min_image_prompt_words,
max_words=max_image_prompt_words,
progress_callback=image_prompt_progress
)
# Apply prompt prefix
from pixelle_video.utils.prompt_helper import build_image_prompt
image_config = self.core.config.get("comfyui", {}).get("image", {})
prompt_prefix_to_use = prompt_prefix if prompt_prefix is not None else image_config.get("prompt_prefix", "")
image_prompts = []
for base_prompt in base_image_prompts:
final_prompt = build_image_prompt(base_prompt, prompt_prefix_to_use)
image_prompts.append(final_prompt)
finally:
# Restore original prompt_prefix
if original_prefix is not None:
image_config["prompt_prefix"] = original_prefix
logger.info(f"✅ Generated {len(image_prompts)} image prompts")
else:
# Static template - skip image prompt generation entirely
image_prompts = [None] * len(narrations)
logger.info(f"⚡ Skipped image prompt generation (static template)")
logger.info(f" 💡 Savings: {len(narrations)} LLM calls + {len(narrations)} media generations")
# ========== Step 3: Create frames ==========
for i, (narration, image_prompt) in enumerate(zip(narrations, image_prompts)):
frame = StoryboardFrame(
index=i,
narration=narration,
image_prompt=image_prompt,
created_at=datetime.now()
)
storyboard.frames.append(frame)
# ========== Step 4: Process each frame ==========
# Check if using RunningHub workflows for parallel processing
# Enable parallel if either TTS or Image uses RunningHub (most time-consuming parts)
is_runninghub = (
(config.tts_workflow and config.tts_workflow.startswith("runninghub/")) or
(config.media_workflow and config.media_workflow.startswith("runninghub/"))
# Create frames
for i, (narration, image_prompt) in enumerate(zip(ctx.narrations, ctx.image_prompts)):
frame = StoryboardFrame(
index=i,
narration=narration,
image_prompt=image_prompt,
created_at=datetime.now()
)
ctx.storyboard.frames.append(frame)
async def produce_assets(self, ctx: PipelineContext):
"""Step 6: Generate audio, images, and render frames (Core processing)."""
storyboard = ctx.storyboard
config = ctx.config
# Check if using RunningHub workflows for parallel processing
is_runninghub = (
(config.tts_workflow and config.tts_workflow.startswith("runninghub/")) or
(config.media_workflow and config.media_workflow.startswith("runninghub/"))
)
if is_runninghub and RUNNING_HUB_PARALLEL_LIMIT > 1:
logger.info(f"🚀 Using parallel processing for RunningHub workflows (max {RUNNING_HUB_PARALLEL_LIMIT} concurrent)")
if is_runninghub and RUNNING_HUB_PARALLEL_LIMIT > 1:
logger.info(f"🚀 Using parallel processing for RunningHub workflows (max {RUNNING_HUB_PARALLEL_LIMIT} concurrent)")
logger.info(f" TTS: {'runninghub' if config.tts_workflow and config.tts_workflow.startswith('runninghub/') else 'local'}")
logger.info(f" Media: {'runninghub' if config.media_workflow and config.media_workflow.startswith('runninghub/') else 'local'}")
semaphore = asyncio.Semaphore(RUNNING_HUB_PARALLEL_LIMIT)
completed_count = 0
async def process_frame_with_semaphore(i: int, frame: StoryboardFrame):
nonlocal completed_count
async with semaphore:
base_progress = 0.2
frame_range = 0.6
per_frame_progress = frame_range / len(storyboard.frames)
# Create frame-specific progress callback
def frame_progress_callback(event: ProgressEvent):
overall_progress = base_progress + (per_frame_progress * completed_count) + (per_frame_progress * event.progress)
if progress_callback:
adjusted_event = ProgressEvent(
event_type=event.event_type,
progress=overall_progress,
frame_current=i+1,
frame_total=len(storyboard.frames),
step=event.step,
action=event.action
)
progress_callback(adjusted_event)
# Report frame start
self._report_progress(
progress_callback,
"processing_frame",
base_progress + (per_frame_progress * completed_count),
frame_current=i+1,
frame_total=len(storyboard.frames)
)
processed_frame = await self.core.frame_processor(
frame=frame,
storyboard=storyboard,
config=config,
total_frames=len(storyboard.frames),
progress_callback=frame_progress_callback
)
completed_count += 1
logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s) [{completed_count}/{len(storyboard.frames)}]")
return i, processed_frame
# Create all tasks and execute in parallel
tasks = [process_frame_with_semaphore(i, frame) for i, frame in enumerate(storyboard.frames)]
results = await asyncio.gather(*tasks)
# Update frames in order and calculate total duration
for idx, processed_frame in sorted(results, key=lambda x: x[0]):
storyboard.frames[idx] = processed_frame
storyboard.total_duration += processed_frame.duration
logger.info(f"✅ All frames processed in parallel (total duration: {storyboard.total_duration:.2f}s)")
else:
# Serial processing for non-RunningHub workflows
logger.info("⚙️ Using serial processing (non-RunningHub workflow)")
for i, frame in enumerate(storyboard.frames):
semaphore = asyncio.Semaphore(RUNNING_HUB_PARALLEL_LIMIT)
completed_count = 0
async def process_frame_with_semaphore(i: int, frame: StoryboardFrame):
nonlocal completed_count
async with semaphore:
base_progress = 0.2
frame_range = 0.6
per_frame_progress = frame_range / len(storyboard.frames)
# Create frame-specific progress callback
def frame_progress_callback(event: ProgressEvent):
overall_progress = base_progress + (per_frame_progress * i) + (per_frame_progress * event.progress)
if progress_callback:
overall_progress = base_progress + (per_frame_progress * completed_count) + (per_frame_progress * event.progress)
if ctx.progress_callback:
adjusted_event = ProgressEvent(
event_type=event.event_type,
progress=overall_progress,
frame_current=event.frame_current,
frame_total=event.frame_total,
frame_current=i+1,
frame_total=len(storyboard.frames),
step=event.step,
action=event.action
)
progress_callback(adjusted_event)
ctx.progress_callback(adjusted_event)
# Report frame start
self._report_progress(
progress_callback,
ctx.progress_callback,
"processing_frame",
base_progress + (per_frame_progress * i),
base_progress + (per_frame_progress * completed_count),
frame_current=i+1,
frame_total=len(storyboard.frames)
)
@@ -476,110 +345,136 @@ class StandardPipeline(BasePipeline):
total_frames=len(storyboard.frames),
progress_callback=frame_progress_callback
)
storyboard.total_duration += processed_frame.duration
logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s)")
completed_count += 1
logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s) [{completed_count}/{len(storyboard.frames)}]")
return i, processed_frame
# ========== Step 5: Concatenate videos ==========
self._report_progress(progress_callback, "concatenating", 0.85)
segment_paths = [frame.video_segment_path for frame in storyboard.frames]
# Create all tasks and execute in parallel
tasks = [process_frame_with_semaphore(i, frame) for i, frame in enumerate(storyboard.frames)]
results = await asyncio.gather(*tasks)
from pixelle_video.services.video import VideoService
video_service = VideoService()
# Update frames in order and calculate total duration
for idx, processed_frame in sorted(results, key=lambda x: x[0]):
storyboard.frames[idx] = processed_frame
storyboard.total_duration += processed_frame.duration
final_video_path = video_service.concat_videos(
videos=segment_paths,
output=output_path,
bgm_path=bgm_path,
bgm_volume=bgm_volume,
bgm_mode=bgm_mode
)
logger.info(f"✅ All frames processed in parallel (total duration: {storyboard.total_duration:.2f}s)")
else:
# Serial processing for non-RunningHub workflows
logger.info("⚙️ Using serial processing (non-RunningHub workflow)")
storyboard.final_video_path = final_video_path
storyboard.completed_at = datetime.now()
# Copy to user-specified path if provided
if user_specified_output:
import shutil
Path(user_specified_output).parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(final_video_path, user_specified_output)
logger.info(f"📹 Final video copied to: {user_specified_output}")
final_video_path = user_specified_output
storyboard.final_video_path = user_specified_output
logger.success(f"🎬 Video generation completed: {final_video_path}")
# ========== Step 6: Create result ==========
self._report_progress(progress_callback, "completed", 1.0)
video_path_obj = Path(final_video_path)
file_size = video_path_obj.stat().st_size
result = VideoGenerationResult(
video_path=final_video_path,
storyboard=storyboard,
duration=storyboard.total_duration,
file_size=file_size
)
logger.info(f"✅ Generated video: {final_video_path}")
logger.info(f" Duration: {storyboard.total_duration:.2f}s")
logger.info(f" Size: {file_size / (1024*1024):.2f} MB")
logger.info(f" Frames: {len(storyboard.frames)}")
# ========== Step 7: Persist metadata and storyboard ==========
await self._persist_task_data(
storyboard=storyboard,
result=result,
input_params={
"text": text,
"mode": mode,
"title": title,
"n_scenes": n_scenes,
"tts_inference_mode": tts_inference_mode,
"tts_voice": tts_voice,
"voice_id": voice_id,
"tts_workflow": tts_workflow,
"tts_speed": tts_speed,
"ref_audio": ref_audio,
"media_workflow": media_workflow,
"prompt_prefix": prompt_prefix,
"frame_template": frame_template,
"template_params": template_params,
"bgm_path": bgm_path,
"bgm_volume": bgm_volume,
"bgm_mode": bgm_mode,
}
)
return result
except Exception as e:
logger.error(f"❌ Video generation failed: {e}")
raise
async def _persist_task_data(
    self,
    storyboard: Storyboard,
    result: VideoGenerationResult,
    input_params: dict
):
    """
    Named as the task-persistence step, but see the review note: the
    current body is a per-frame processing loop, not persistence logic.

    NOTE(review): this body looks like merge/refactor residue. It drives
    per-frame progress reporting and `self.core.frame_processor`, and it
    references names that are not in scope here (`ctx`, `config`), so it
    would raise NameError if executed. A later `_persist_task_data(self,
    ctx)` defined in this module shadows this definition, making it dead
    code — confirm and remove.
    """
    for i, frame in enumerate(storyboard.frames):
        # Progress bookkeeping: frame work is mapped into the 0.2..0.8
        # window of overall progress (0.2 base + 0.6 spread over frames).
        # These two values are loop-invariant and could be hoisted.
        base_progress = 0.2
        frame_range = 0.6
        per_frame_progress = frame_range / len(storyboard.frames)
        # Create frame-specific progress callback
        def frame_progress_callback(event: ProgressEvent):
            # Scale this frame's local progress into the overall window.
            overall_progress = base_progress + (per_frame_progress * i) + (per_frame_progress * event.progress)
            # NOTE(review): `ctx` is undefined in this function's scope —
            # NameError at the first progress event.
            if ctx.progress_callback:
                adjusted_event = ProgressEvent(
                    event_type=event.event_type,
                    progress=overall_progress,
                    frame_current=event.frame_current,
                    frame_total=event.frame_total,
                    step=event.step,
                    action=event.action
                )
                ctx.progress_callback(adjusted_event)
        # Report frame start
        self._report_progress(
            ctx.progress_callback,
            "processing_frame",
            base_progress + (per_frame_progress * i),
            frame_current=i+1,
            frame_total=len(storyboard.frames)
        )
        # Delegate the actual audio/video work for this frame.
        # NOTE(review): `config` is also undefined here — presumably it was
        # a parameter (or ctx attribute) in the original method this loop
        # was copied from; verify against the pre-refactor version.
        processed_frame = await self.core.frame_processor(
            frame=frame,
            storyboard=storyboard,
            config=config,
            total_frames=len(storyboard.frames),
            progress_callback=frame_progress_callback
        )
        # Accumulate the running total used by downstream steps.
        storyboard.total_duration += processed_frame.duration
        logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s)")
async def post_production(self, ctx: PipelineContext):
    """Step 7: Concatenate videos and add BGM.

    Joins all per-frame video segments into one file (mixing in
    background music when configured), stamps completion on the
    storyboard, and — if the caller supplied an explicit output
    path — copies the finished video there.
    """
    self._report_progress(ctx.progress_callback, "concatenating", 0.85)
    board = ctx.storyboard
    segments = [f.video_segment_path for f in board.frames]
    stitched_path = VideoService().concat_videos(
        videos=segments,
        output=ctx.final_video_path,
        bgm_path=ctx.params.get("bgm_path"),
        bgm_volume=ctx.params.get("bgm_volume", 0.2),
        bgm_mode=ctx.params.get("bgm_mode", "loop")
    )
    board.final_video_path = stitched_path
    board.completed_at = datetime.now()
    # Honor a user-specified destination, if one was provided.
    destination = ctx.params.get("output_path")
    if destination:
        Path(destination).parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(stitched_path, destination)
        logger.info(f"📹 Final video copied to: {destination}")
        ctx.final_video_path = destination
        board.final_video_path = destination
    logger.success(f"🎬 Video generation completed: {ctx.final_video_path}")
async def finalize(self, ctx: PipelineContext) -> VideoGenerationResult:
    """Step 8: Build the generation result and persist metadata.

    Reports 100% progress, measures the final video on disk, records a
    ``VideoGenerationResult`` on the context, logs a short summary, then
    writes the task metadata to storage.
    """
    self._report_progress(ctx.progress_callback, "completed", 1.0)
    # Size of the finished video file, in bytes.
    size_bytes = Path(ctx.final_video_path).stat().st_size
    outcome = VideoGenerationResult(
        video_path=ctx.final_video_path,
        storyboard=ctx.storyboard,
        duration=ctx.storyboard.total_duration,
        file_size=size_bytes
    )
    ctx.result = outcome
    logger.info(f"✅ Generated video: {ctx.final_video_path}")
    logger.info(f" Duration: {ctx.storyboard.total_duration:.2f}s")
    logger.info(f" Size: {size_bytes / (1024*1024):.2f} MB")
    logger.info(f" Frames: {len(ctx.storyboard.frames)}")
    # Persist metadata alongside the generated assets.
    await self._persist_task_data(ctx)
    return outcome
async def _persist_task_data(self, ctx: PipelineContext):
    """
    Persist task metadata and storyboard to the filesystem.

    Args:
        ctx: Pipeline execution context carrying the storyboard, the
            generation result, and the original input parameters.

    NOTE(review): the visible body is incomplete — a version-control diff
    hunk header sits where the actual persistence logic should be (see
    below). Restore the elided code before this module can run.
    """
    try:
        storyboard = ctx.storyboard
        # `result` is bound here but not used in the visible portion of
        # this function — presumably consumed by the elided code below.
        result = ctx.result
        task_id = storyboard.config.task_id
        if not task_id:
            # Without a task id there is no task directory to write into.
            logger.warning("No task_id in storyboard, skipping persistence")
            return
        # Build metadata
        # If user didn't provide a title, use the generated one from storyboard
        # NOTE(review): the next line references `input_params`, which is not
        # defined in this scope and would raise NameError; it is immediately
        # overwritten by the `ctx.params.copy()` line that follows, so it
        # looks like refactor residue — confirm and delete it.
        input_with_title = input_params.copy()
        input_with_title = ctx.params.copy()
        input_with_title["text"] = ctx.input_text  # Ensure text is included
        if not input_with_title.get("title"):
            input_with_title["title"] = storyboard.title
        # NOTE(review): the following line is diff residue (a hunk header),
        # not Python — the metadata-writing code was elided here.
        @@ -617,4 +512,3 @@ class StandardPipeline(BasePipeline):
    except Exception as e:
        logger.error(f"Failed to persist task data: {e}")
        # Don't raise - persistence failure shouldn't break video generation