# AI-Video/pixelle_video/services/video_generator.py
"""
Video Generator Service
End-to-end service for generating short videos from content.
"""
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable, Literal
from loguru import logger
from pixelle_video.models.progress import ProgressEvent
from pixelle_video.models.storyboard import (
Storyboard,
StoryboardFrame,
StoryboardConfig,
ContentMetadata,
VideoGenerationResult
)

class VideoGeneratorService:
    """
    Video generation service

    Orchestrates the complete pipeline:
    1. Generate narrations (LLM)
    2. Generate image prompts (LLM)
    3. Process each frame (TTS + Image + Compose + Video)
    4. Concatenate all segments
    5. Add BGM (optional)
    """

    def __init__(self, pixelle_video_core):
        """
        Initialize video generator service

        Args:
            pixelle_video_core: PixelleVideoCore instance
        """
        self.core = pixelle_video_core
    async def __call__(
        self,
        # === Input ===
        text: str,
        # === Processing Mode ===
        mode: Literal["generate", "fixed"] = "generate",
        # === Optional Title ===
        title: Optional[str] = None,
        # === Basic Config ===
        n_scenes: int = 5,  # Only used in generate mode; ignored in fixed mode
        voice_id: str = "[Chinese] zh-CN Yunjian",
        tts_workflow: Optional[str] = None,
        tts_speed: float = 1.2,
        ref_audio: Optional[str] = None,  # Reference audio for voice cloning
        output_path: Optional[str] = None,
        # === LLM Parameters ===
        min_narration_words: int = 5,
        max_narration_words: int = 20,
        min_image_prompt_words: int = 30,
        max_image_prompt_words: int = 60,
        # === Image Parameters ===
        image_width: int = 1024,
        image_height: int = 1024,
        image_workflow: Optional[str] = None,
        # === Video Parameters ===
        video_fps: int = 30,
        # === Frame Template (determines video size) ===
        frame_template: Optional[str] = None,
        # === Image Style ===
        prompt_prefix: Optional[str] = None,
        # === BGM Parameters ===
        bgm_path: Optional[str] = None,
        bgm_volume: float = 0.2,
        bgm_mode: Literal["once", "loop"] = "loop",
        # === Advanced Options ===
        content_metadata: Optional[ContentMetadata] = None,
        progress_callback: Optional[Callable[[ProgressEvent], None]] = None,
    ) -> VideoGenerationResult:
"""
Generate short video from text input
Args:
text: Text input (required)
- For generate mode: topic/theme (e.g., "如何提高学习效率")
- For fixed mode: complete narration script (each line is a narration)
mode: Processing mode (default "generate")
- "generate": LLM generates narrations from topic/theme, creates n_scenes
- "fixed": Use existing script as-is, each line becomes a narration
Note: In fixed mode, n_scenes is ignored (uses actual line count)
title: Video title (optional)
- If provided, use it as the video title
- If not provided:
* generate mode → use text as title
* fixed mode → LLM generates title from script
n_scenes: Number of storyboard scenes (default 5)
Only effective in generate mode; ignored in fixed mode
voice_id: TTS voice ID (default "[Chinese] zh-CN Yunjian")
tts_workflow: TTS workflow filename (e.g., "tts_edge.json", None = use default)
tts_speed: TTS speed multiplier (1.0 = normal, 1.2 = 20% faster, default 1.2)
output_path: Output video path (auto-generated if None)
min_narration_words: Min narration length (generate mode only)
max_narration_words: Max narration length (generate mode only)
min_image_prompt_words: Min image prompt length
max_image_prompt_words: Max image prompt length
image_width: Generated image width (default 1024)
image_height: Generated image height (default 1024)
image_workflow: Image workflow filename (e.g., "image_flux.json", None = use default)
video_fps: Video frame rate (default 30)
frame_template: HTML template path with size (None = use default "1080x1920/default.html")
Format: "SIZExSIZE/template.html" (e.g., "1080x1920/default.html", "1920x1080/modern.html")
Video size is automatically determined from template path
prompt_prefix: Image prompt prefix (overrides config.yaml if provided)
e.g., "anime style, vibrant colors" or "" for no prefix
bgm_path: BGM path (filename like "default.mp3", custom path, or None)
bgm_volume: BGM volume 0.0-1.0 (default 0.2)
bgm_mode: BGM mode "once" or "loop" (default "loop")
content_metadata: Content metadata (optional, for display)
progress_callback: Progress callback function(message, progress)
Returns:
VideoGenerationResult with video path and metadata
Examples:
# Generate mode: LLM creates narrations from topic
>>> result = await pixelle_video.generate_video(
... text="如何在信息爆炸时代保持深度思考",
... mode="generate",
... n_scenes=5,
... bgm_path="default"
... )
# Fixed mode: Use existing script (each line is a narration)
>>> script = '''大家好,今天跟你分享三个学习技巧
... 第一个技巧是专注力训练每天冥想10分钟
... 第二个技巧是主动回忆,学完立即复述
... 第三个技巧是间隔重复,学习后定期复习'''
>>> result = await pixelle_video.generate_video(
... text=script,
... mode="fixed",
... title="三个学习技巧"
... )
>>> print(result.video_path)
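
            # Illustrative sketch: tracking pipeline progress via the callback
            # (assumes the same `pixelle_video.generate_video` entry point as above)
            >>> def on_progress(event: ProgressEvent):
            ...     print(f"{event.progress:.0%} {event.event_type}")
            >>> result = await pixelle_video.generate_video(
            ...     text="Three tips for better sleep",
            ...     progress_callback=on_progress
            ... )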
"""
        # ========== Step 0: Process text and determine title ==========
        logger.info(f"🚀 Starting video generation in '{mode}' mode")
        logger.info(f"   Text length: {len(text)} chars")

        # Determine final title (priority: user-specified > auto-generated)
        if title:
            # User specified title, use it directly
            final_title = title
            logger.info(f"   Title: '{title}' (user-specified)")
        else:
            # Auto-generate title using title_generator service
            self._report_progress(progress_callback, "generating_title", 0.01)
            if mode == "generate":
                # Auto strategy: decide based on content length
                final_title = await self.core.title_generator(text, strategy="auto")
                logger.info(f"   Title: '{final_title}' (auto-generated)")
            else:  # fixed
                # Force LLM strategy: always use LLM for script
                final_title = await self.core.title_generator(text, strategy="llm")
                logger.info(f"   Title: '{final_title}' (LLM-generated)")

        # ========== Step 0.5: Create isolated task directory ==========
        from pixelle_video.utils.os_util import (
            create_task_output_dir,
            get_task_final_video_path
        )

        # Create isolated task directory for this video generation
        task_dir, task_id = create_task_output_dir()
        logger.info(f"📁 Task directory created: {task_dir}")
        logger.info(f"   Task ID: {task_id}")

        # Determine final video path
        user_specified_output = None
        if output_path is None:
            # Use standardized path: output/{task_id}/final.mp4
            output_path = get_task_final_video_path(task_id)
        else:
            # User specified custom path: save it and use task path for generation
            user_specified_output = output_path
            output_path = get_task_final_video_path(task_id)
            logger.info(f"   Will copy final video to: {user_specified_output}")

        # Create storyboard config
        config = StoryboardConfig(
            task_id=task_id,  # Pass task_id for file isolation
            n_storyboard=n_scenes,
            min_narration_words=min_narration_words,
            max_narration_words=max_narration_words,
            min_image_prompt_words=min_image_prompt_words,
            max_image_prompt_words=max_image_prompt_words,
            video_fps=video_fps,
            voice_id=voice_id,
            tts_workflow=tts_workflow,
            tts_speed=tts_speed,
            ref_audio=ref_audio,
            image_width=image_width,
            image_height=image_height,
            image_workflow=image_workflow,
            frame_template=frame_template or "1080x1920/default.html"
        )

        # Create storyboard
        storyboard = Storyboard(
            title=final_title,  # Use final_title as video title
            config=config,
            content_metadata=content_metadata,
            created_at=datetime.now()
        )
        try:
            # ========== Step 1: Generate/Split narrations ==========
            if mode == "generate":
                # Generate narrations using LLM
                self._report_progress(progress_callback, "generating_narrations", 0.05)
                narrations = await self.core.narration_generator.generate_narrations(
                    config=config,
                    source_type="topic",
                    content_metadata=None,
                    topic=text,
                    content=None
                )
                logger.info(f"✅ Generated {len(narrations)} narrations")
            else:  # fixed
                # Split fixed script by lines (trust user input completely)
                self._report_progress(progress_callback, "splitting_script", 0.05)
                narrations = await self._split_narration_script(text, config)
                logger.info(f"✅ Split script into {len(narrations)} segments (by lines)")
                logger.info(f"   Note: n_scenes={n_scenes} is ignored in fixed mode")

            # Step 2: Generate image prompts
            self._report_progress(progress_callback, "generating_image_prompts", 0.15)

            # Override prompt_prefix if provided (temporarily modify config)
            original_prefix = None
            if prompt_prefix is not None:
                # Fix: image config is under comfyui.image, not directly under config
                image_config = self.core.config.get("comfyui", {}).get("image", {})
                original_prefix = image_config.get("prompt_prefix")
                image_config["prompt_prefix"] = prompt_prefix
                logger.info(f"Using custom prompt_prefix: '{prompt_prefix}'")

            try:
                # Create progress callback wrapper for image prompt generation (15%-30% range)
                def image_prompt_progress(completed: int, total: int, message: str):
                    # Map batch progress to the 15%-30% range
                    batch_progress = completed / total if total > 0 else 0
                    overall_progress = 0.15 + (batch_progress * 0.15)  # 15% -> 30%
                    self._report_progress(
                        progress_callback,
                        "generating_image_prompts",
                        overall_progress,
                        extra_info=message
                    )
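
                # Worked example (illustrative): with total=4 prompt batches and
                # completed=2, batch_progress = 0.5, so overall_progress =
                # 0.15 + 0.5 * 0.15 = 0.225, i.e. 22.5% of the whole pipeline.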
                image_prompts = await self.core.image_prompt_generator.generate_image_prompts(
                    narrations=narrations,
                    config=config,
                    progress_callback=image_prompt_progress
                )
            finally:
                # Restore the original prompt_prefix; drop the key if it was
                # absent before, so a temporary override never leaks into later runs
                if prompt_prefix is not None:
                    if original_prefix is not None:
                        image_config["prompt_prefix"] = original_prefix
                    else:
                        image_config.pop("prompt_prefix", None)
logger.info(f"✅ Generated {len(image_prompts)} image prompts")
# Step 3: Create frames
for i, (narration, image_prompt) in enumerate(zip(narrations, image_prompts)):
frame = StoryboardFrame(
index=i,
narration=narration,
image_prompt=image_prompt,
created_at=datetime.now()
)
storyboard.frames.append(frame)
# Step 4: Process each frame
for i, frame in enumerate(storyboard.frames):
# Calculate fine-grained progress for this frame
base_progress = 0.2 # Frames processing starts at 20%
frame_range = 0.6 # Frames processing takes 60% (20%-80%)
per_frame_progress = frame_range / len(storyboard.frames)
# Create frame-specific progress callback
def frame_progress_callback(event: ProgressEvent):
"""Report sub-step progress within current frame"""
# Calculate overall progress: base + previous frames + current frame progress
overall_progress = base_progress + (per_frame_progress * i) + (per_frame_progress * event.progress)
# Forward the event with adjusted overall progress
if progress_callback:
adjusted_event = ProgressEvent(
event_type=event.event_type,
progress=overall_progress,
frame_current=event.frame_current,
frame_total=event.frame_total,
step=event.step,
action=event.action
)
progress_callback(adjusted_event)
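
                # Worked example (illustrative): with 5 frames,
                # per_frame_progress = 0.6 / 5 = 0.12; for frame i=2 at
                # event.progress=0.5, overall = 0.2 + 0.24 + 0.06 = 0.50 (50%).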

                # Report frame start
                self._report_progress(
                    progress_callback,
                    "processing_frame",
                    base_progress + (per_frame_progress * i),
                    frame_current=i + 1,
                    frame_total=len(storyboard.frames)
                )

                processed_frame = await self.core.frame_processor(
                    frame=frame,
                    storyboard=storyboard,
                    config=config,
                    total_frames=len(storyboard.frames),
                    progress_callback=frame_progress_callback
                )
                storyboard.total_duration += processed_frame.duration
                logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s)")

            # Step 5: Concatenate videos
            self._report_progress(progress_callback, "concatenating", 0.85)
            segment_paths = [frame.video_segment_path for frame in storyboard.frames]

            from pixelle_video.services.video import VideoService
            video_service = VideoService()
            final_video_path = video_service.concat_videos(
                videos=segment_paths,
                output=output_path,
                bgm_path=bgm_path,
                bgm_volume=bgm_volume,
                bgm_mode=bgm_mode
            )
            storyboard.final_video_path = final_video_path
            storyboard.completed_at = datetime.now()

            # Copy to user-specified path if provided
            if user_specified_output:
                import shutil
                Path(user_specified_output).parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(final_video_path, user_specified_output)
                logger.info(f"📹 Final video copied to: {user_specified_output}")
                # Use user-specified path in result
                final_video_path = user_specified_output
                storyboard.final_video_path = user_specified_output

            logger.success(f"🎬 Video generation completed: {final_video_path}")

            # Step 6: Create result
            self._report_progress(progress_callback, "completed", 1.0)

            video_path_obj = Path(final_video_path)
            file_size = video_path_obj.stat().st_size

            result = VideoGenerationResult(
                video_path=final_video_path,
                storyboard=storyboard,
                duration=storyboard.total_duration,
                file_size=file_size
            )

            logger.info(f"✅ Generated video: {final_video_path}")
            logger.info(f"   Duration: {storyboard.total_duration:.2f}s")
            logger.info(f"   Size: {file_size / (1024*1024):.2f} MB")
            logger.info(f"   Frames: {len(storyboard.frames)}")

            return result

        except Exception as e:
            logger.error(f"❌ Video generation failed: {e}")
            raise

    def _report_progress(
        self,
        callback: Optional[Callable[[ProgressEvent], None]],
        event_type: str,
        progress: float,
        **kwargs
    ):
        """
        Report progress via callback

        Args:
            callback: Progress callback function
            event_type: Type of progress event
            progress: Progress value (0.0-1.0)
            **kwargs: Additional event-specific parameters (frame_current, frame_total, etc.)
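
        Example (illustrative):
            self._report_progress(progress_callback, "processing_frame", 0.35,
                                  frame_current=2, frame_total=5)
            # -> the callback receives ProgressEvent(event_type="processing_frame",
            #    progress=0.35, frame_current=2, frame_total=5)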
"""
if callback:
event = ProgressEvent(event_type=event_type, progress=progress, **kwargs)
callback(event)
logger.debug(f"Progress: {progress*100:.0f}% - {event_type}")
else:
logger.debug(f"Progress: {progress*100:.0f}% - {event_type}")

    def _parse_json(self, text: str) -> dict:
        """
        Parse JSON from text, with fallbacks that extract JSON from markdown
        code blocks or from surrounding prose

        Args:
            text: Text containing JSON

        Returns:
            Parsed JSON dict

        Raises:
            json.JSONDecodeError: If no valid JSON can be extracted
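
        Example (illustrative; assumes `svc` is a VideoGeneratorService instance):
            >>> svc._parse_json('```json {"narrations": ["intro"]} ```')
            {'narrations': ['intro']}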
"""
import json
import re
# Try direct parsing first
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Try to extract JSON from markdown code block
json_pattern = r'```(?:json)?\s*([\s\S]+?)\s*```'
match = re.search(json_pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
pass
# Try to find any JSON object in the text (flexible pattern for narrations)
json_pattern = r'\{[^{}]*"narrations"\s*:\s*\[[^\]]*\][^{}]*\}'
match = re.search(json_pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(0))
except json.JSONDecodeError:
pass
# If all fails, raise error
raise json.JSONDecodeError("No valid JSON found", text, 0)

    async def _split_narration_script(self, script: str, config: StoryboardConfig) -> list[str]:
        """
        Split a user-provided narration script into segments (trusting user input completely).

        Splits on newlines: each non-empty line becomes one narration segment;
        empty lines are filtered out.

        Args:
            script: Fixed narration script (each line is a narration)
            config: Storyboard configuration (unused, kept for interface compatibility)

        Returns:
            List of narration segments
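
        Example (illustrative; assumes `svc` is a VideoGeneratorService instance
        and an active event loop):
            >>> await svc._split_narration_script("a\\nb\\n\\nc", config)
            ['a', 'b', 'c']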
"""
logger.info(f"Splitting script by lines (length: {len(script)} chars)")
# Split by newline, filter empty lines
narrations = [line.strip() for line in script.split('\n') if line.strip()]
logger.info(f"✅ Split script into {len(narrations)} segments (by lines)")
# Log statistics
if narrations:
lengths = [len(s) for s in narrations]
logger.info(f" Min: {min(lengths)} chars, Max: {max(lengths)} chars, Avg: {sum(lengths)//len(lengths)} chars")
return narrations
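

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the service API).
# It assumes a `PixelleVideoCore` importable from `pixelle_video.core`;
# the actual bootstrap path in this codebase may differ.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo():
        from pixelle_video.core import PixelleVideoCore  # assumed import path

        core = PixelleVideoCore()
        generator = VideoGeneratorService(core)

        # Print each pipeline step as it is reported
        def on_progress(event: ProgressEvent):
            print(f"[{event.progress:.0%}] {event.event_type}")

        result = await generator(
            text="Three tips for staying focused",
            mode="generate",
            n_scenes=3,
            progress_callback=on_progress,
        )
        print(f"Video: {result.video_path} ({result.duration:.1f}s)")

    asyncio.run(_demo())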