diff --git a/pixelle_video/__init__.py b/pixelle_video/__init__.py
index 30df08d..145afbb 100644
--- a/pixelle_video/__init__.py
+++ b/pixelle_video/__init__.py
@@ -13,8 +13,22 @@ Usage:
     answer = await pixelle_video.llm("Explain atomic habits")
     audio = await pixelle_video.tts("Hello world")
 
-    # Generate video
-    result = await pixelle_video.generate_video(topic="AI in 2024")
+    # Generate video with different pipelines
+    # Standard pipeline (default)
+    result = await pixelle_video.generate_video(
+        text="How to improve learning efficiency",
+        n_scenes=5
+    )
+
+    # Custom pipeline (template for your own logic)
+    result = await pixelle_video.generate_video(
+        text=your_content,
+        pipeline="custom",
+        custom_param_example="custom_value"
+    )
+
+    # Check available pipelines
+    print(pixelle_video.pipelines.keys())  # dict_keys(['standard', 'custom'])
 """
 
 from pixelle_video.service import PixelleVideoCore, pixelle_video
diff --git a/pixelle_video/pipelines/__init__.py b/pixelle_video/pipelines/__init__.py
new file mode 100644
index 0000000..5f9359f
--- /dev/null
+++ b/pixelle_video/pipelines/__init__.py
@@ -0,0 +1,17 @@
+"""
+Pixelle-Video Pipelines
+
+Video generation pipelines with different strategies and workflows.
+Each pipeline implements a specific video generation approach.
+"""
+
+from pixelle_video.pipelines.base import BasePipeline
+from pixelle_video.pipelines.standard import StandardPipeline
+from pixelle_video.pipelines.custom import CustomPipeline
+
+__all__ = [
+    "BasePipeline",
+    "StandardPipeline",
+    "CustomPipeline",
+]
+
diff --git a/pixelle_video/pipelines/base.py b/pixelle_video/pipelines/base.py
new file mode 100644
index 0000000..75f704a
--- /dev/null
+++ b/pixelle_video/pipelines/base.py
@@ -0,0 +1,102 @@
+"""
+Base Pipeline for Video Generation
+
+All custom pipelines should inherit from BasePipeline.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Optional, Callable
+
+from loguru import logger
+
+from pixelle_video.models.progress import ProgressEvent
+from pixelle_video.models.storyboard import VideoGenerationResult
+
+
+class BasePipeline(ABC):
+    """
+    Base pipeline for video generation
+
+    All custom pipelines should inherit from this class and implement __call__.
+
+    Design principles:
+    - Each pipeline represents a complete video generation workflow
+    - Pipelines are independent and can have completely different logic
+    - Pipelines have access to all core services via self.core
+    - Pipelines should report progress via progress_callback
+
+    Example:
+        >>> class MyPipeline(BasePipeline):
+        ...     async def __call__(self, text: str, **kwargs):
+        ...         # Step 1: Generate content
+        ...         narrations = await some_logic(text)
+        ...
+        ...         # Step 2: Process frames
+        ...         for narration in narrations:
+        ...             audio = await self.core.tts(narration)
+        ...             # ...
+        ...
+        ...         return VideoGenerationResult(...)
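+
+        A registered pipeline is then invoked through the core's generate_video
+        facade (a sketch; assumes an initialized pixelle_video instance):
+
+        >>> pixelle_video.pipelines["my"] = MyPipeline(pixelle_video)
+        >>> result = await pixelle_video.generate_video(text="...", pipeline="my")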
+ """ + + def __init__(self, pixelle_video_core): + """ + Initialize pipeline with core services + + Args: + pixelle_video_core: PixelleVideoCore instance (provides access to all services) + """ + self.core = pixelle_video_core + + # Quick access to services (convenience) + self.llm = pixelle_video_core.llm + self.tts = pixelle_video_core.tts + self.image = pixelle_video_core.image + self.video = pixelle_video_core.video + + @abstractmethod + async def __call__( + self, + text: str, + progress_callback: Optional[Callable[[ProgressEvent], None]] = None, + **kwargs + ) -> VideoGenerationResult: + """ + Execute the pipeline + + Args: + text: Input text (meaning varies by pipeline) + progress_callback: Optional callback for progress updates (receives ProgressEvent) + **kwargs: Pipeline-specific parameters + + Returns: + VideoGenerationResult with video path and metadata + + Raises: + Exception: Pipeline-specific exceptions + """ + pass + + def _report_progress( + self, + callback: Optional[Callable[[ProgressEvent], None]], + event_type: str, + progress: float, + **kwargs + ): + """ + Report progress via callback + + Args: + callback: Progress callback function + event_type: Type of progress event + progress: Progress value (0.0-1.0) + **kwargs: Additional event-specific parameters (frame_current, frame_total, etc.) + """ + if callback: + event = ProgressEvent(event_type=event_type, progress=progress, **kwargs) + callback(event) + logger.debug(f"Progress: {progress*100:.0f}% - {event_type}") + else: + logger.debug(f"Progress: {progress*100:.0f}% - {event_type}") + diff --git a/pixelle_video/pipelines/custom.py b/pixelle_video/pipelines/custom.py new file mode 100644 index 0000000..aa4d6fa --- /dev/null +++ b/pixelle_video/pipelines/custom.py @@ -0,0 +1,375 @@ +""" +Custom Video Generation Pipeline + +Template pipeline for creating your own custom video generation workflows. +This serves as a reference implementation showing how to extend BasePipeline. + +For real projects, copy this file and modify it according to your needs. +""" + +from datetime import datetime +from pathlib import Path +from typing import Optional, Callable + +from loguru import logger + +from pixelle_video.pipelines.base import BasePipeline +from pixelle_video.models.progress import ProgressEvent +from pixelle_video.models.storyboard import ( + Storyboard, + StoryboardFrame, + StoryboardConfig, + ContentMetadata, + VideoGenerationResult +) + + +class CustomPipeline(BasePipeline): + """ + Custom video generation pipeline template + + This is a template showing how to create your own pipeline with custom logic. + You can customize: + - Content processing logic + - Narration generation strategy + - Image prompt generation + - Frame composition + - Video assembly + + Example usage: + # 1. Create your own pipeline by copying this file + # 2. Modify the __call__ method with your custom logic + # 3. Register it in service.py or dynamically + + from pixelle_video.pipelines.custom import CustomPipeline + pixelle_video.pipelines["my_custom"] = CustomPipeline(pixelle_video) + + # 4. 
+        result = await pixelle_video.generate_video(
+            text=your_content,
+            pipeline="my_custom",
+            # Your custom parameters here
+        )
+    """
+
+    async def __call__(
+        self,
+        text: str,
+        # === Custom Parameters ===
+        # Add your own parameters here
+        custom_param_example: str = "default_value",
+
+        # === Standard Parameters (keep these for compatibility) ===
+        voice_id: str = "[Chinese] zh-CN Yunjian",
+        tts_workflow: Optional[str] = None,
+        tts_speed: float = 1.2,
+        ref_audio: Optional[str] = None,
+
+        image_workflow: Optional[str] = None,
+        image_width: int = 1024,
+        image_height: int = 1024,
+
+        frame_template: str = "1080x1920/default.html",
+        video_fps: int = 30,
+        output_path: Optional[str] = None,
+
+        bgm_path: Optional[str] = None,
+        bgm_volume: float = 0.2,
+
+        progress_callback: Optional[Callable[[ProgressEvent], None]] = None,
+    ) -> VideoGenerationResult:
+        """
+        Custom video generation workflow
+
+        Customize this method to implement your own logic.
+
+        Args:
+            text: Input text (customize meaning as needed)
+            custom_param_example: Your custom parameter
+            (other standard parameters...)
+
+        Returns:
+            VideoGenerationResult
+        """
+        logger.info("Starting CustomPipeline")
+        logger.info(f"Input text length: {len(text)} chars")
+        logger.info(f"Custom parameter: {custom_param_example}")
+
+        # ========== Step 0: Setup ==========
+        self._report_progress(progress_callback, "initializing", 0.05)
+
+        # Create task directory
+        from pixelle_video.utils.os_util import (
+            create_task_output_dir,
+            get_task_final_video_path
+        )
+
+        task_dir, task_id = create_task_output_dir()
+        logger.info(f"Task directory: {task_dir}")
+
+        user_specified_output = None
+        if output_path is None:
+            output_path = get_task_final_video_path(task_id)
+        else:
+            user_specified_output = output_path
+            output_path = get_task_final_video_path(task_id)
+
+        # ========== Step 1: Process content (CUSTOMIZE THIS) ==========
+        self._report_progress(progress_callback, "processing_content", 0.10)
+
+        # Example: Generate title using LLM
+        from pixelle_video.utils.content_generators import generate_title
+        title = await generate_title(self.llm, text, strategy="llm")
+        logger.info(f"Generated title: '{title}'")
+
+        # Example: Split or generate narrations
+        # Option A: Split by lines (for fixed script)
+        narrations = [line.strip() for line in text.split('\n') if line.strip()]
+
+        # Option B: Use LLM to generate narrations (uncomment to use)
+        # from pixelle_video.utils.content_generators import generate_narrations_from_topic
+        # narrations = await generate_narrations_from_topic(
+        #     self.llm,
+        #     topic=text,
+        #     n_scenes=5,
+        #     min_words=20,
+        #     max_words=80
+        # )
+
+        logger.info(f"Generated {len(narrations)} narrations")
+
+        # ========== Step 2: Generate image prompts (CUSTOMIZE THIS) ==========
+        self._report_progress(progress_callback, "generating_image_prompts", 0.25)
+
+        # Example: Generate image prompts using LLM
+        from pixelle_video.utils.content_generators import generate_image_prompts
+
+        image_prompts = await generate_image_prompts(
+            self.llm,
+            narrations=narrations,
+            min_words=30,
+            max_words=60
+        )
+
+        # Example: Apply custom prompt prefix
+        from pixelle_video.utils.prompt_helper import build_image_prompt
+        custom_prefix = "cinematic style, professional lighting"  # Customize this
+
+        final_image_prompts = []
+        for base_prompt in image_prompts:
+            final_prompt = build_image_prompt(base_prompt, custom_prefix)
+            final_image_prompts.append(final_prompt)
+
+        logger.info(f"Generated {len(final_image_prompts)} image prompts")
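+
+        # Optional: vary the prefix per frame instead of using one global prefix
+        # (a sketch; uncomment and adapt, the shot names are only examples):
+        # shot_styles = ["wide establishing shot", "medium shot", "close-up"]
+        # final_image_prompts = [
+        #     build_image_prompt(p, f"{custom_prefix}, {shot_styles[i % len(shot_styles)]}")
+        #     for i, p in enumerate(image_prompts)
+        # ]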
+
+        # ========== Step 3: Create storyboard ==========
+        config = StoryboardConfig(
+            task_id=task_id,
+            n_storyboard=len(narrations),
+            min_narration_words=20,
+            max_narration_words=80,
+            min_image_prompt_words=30,
+            max_image_prompt_words=60,
+            video_fps=video_fps,
+            voice_id=voice_id,
+            tts_workflow=tts_workflow,
+            tts_speed=tts_speed,
+            ref_audio=ref_audio,
+            image_width=image_width,
+            image_height=image_height,
+            image_workflow=image_workflow,
+            frame_template=frame_template
+        )
+
+        # Optional: Add custom metadata
+        content_metadata = ContentMetadata(
+            title=title,
+            subtitle="Custom Pipeline Output"
+        )
+
+        storyboard = Storyboard(
+            title=title,
+            config=config,
+            content_metadata=content_metadata,
+            created_at=datetime.now()
+        )
+
+        # Create frames
+        for i, (narration, image_prompt) in enumerate(zip(narrations, final_image_prompts)):
+            frame = StoryboardFrame(
+                index=i,
+                narration=narration,
+                image_prompt=image_prompt,
+                created_at=datetime.now()
+            )
+            storyboard.frames.append(frame)
+
+        try:
+            # ========== Step 4: Process each frame ==========
+            # This is the standard frame processing logic
+            # You can customize frame processing if needed
+
+            for i, frame in enumerate(storyboard.frames):
+                base_progress = 0.3
+                frame_range = 0.5
+                per_frame_progress = frame_range / len(storyboard.frames)
+
+                self._report_progress(
+                    progress_callback,
+                    "processing_frame",
+                    base_progress + (per_frame_progress * i),
+                    frame_current=i+1,
+                    frame_total=len(storyboard.frames)
+                )
+
+                # Use core frame processor (standard logic)
+                processed_frame = await self.core.frame_processor(
+                    frame=frame,
+                    storyboard=storyboard,
+                    config=config,
+                    total_frames=len(storyboard.frames),
+                    progress_callback=None
+                )
+                storyboard.total_duration += processed_frame.duration
+                logger.info(f"Frame {i+1} completed ({processed_frame.duration:.2f}s)")
+
+            # ========== Step 5: Concatenate videos ==========
+            self._report_progress(progress_callback, "concatenating", 0.85)
+            segment_paths = [frame.video_segment_path for frame in storyboard.frames]
+
+            from pixelle_video.services.video import VideoService
+            video_service = VideoService()
+
+            final_video_path = video_service.concat_videos(
+                videos=segment_paths,
+                output=output_path,
+                bgm_path=bgm_path,
+                bgm_volume=bgm_volume,
+                bgm_mode="loop"
+            )
+
+            storyboard.final_video_path = final_video_path
+            storyboard.completed_at = datetime.now()
+
+            # Copy to user-specified path if provided
+            if user_specified_output:
+                import shutil
+                Path(user_specified_output).parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy2(final_video_path, user_specified_output)
+                logger.info(f"Final video copied to: {user_specified_output}")
+                final_video_path = user_specified_output
+                storyboard.final_video_path = user_specified_output
+
+            logger.success(f"Custom pipeline video completed: {final_video_path}")
+
+            # ========== Step 6: Create result ==========
+            self._report_progress(progress_callback, "completed", 1.0)
+
+            video_path_obj = Path(final_video_path)
+            file_size = video_path_obj.stat().st_size
+
+            result = VideoGenerationResult(
+                video_path=final_video_path,
+                storyboard=storyboard,
+                duration=storyboard.total_duration,
+                file_size=file_size
+            )
+
+            logger.info("Custom pipeline completed")
+            logger.info(f"Title: {title}")
+            logger.info(f"Duration: {storyboard.total_duration:.2f}s")
+            logger.info(f"Size: {file_size / (1024*1024):.2f} MB")
+            logger.info(f"Frames: {len(storyboard.frames)}")
+
+            return result
+
+        except Exception as e:
+            logger.error(f"Custom pipeline failed: {e}")
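+            # Intermediate artifacts for this task (audio, images, segments)
+            # are typically left in task_dir, which helps when debugging.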
+            raise
+
+    # ==================== Custom Helper Methods ====================
+    # Add your own helper methods here
+
+    async def _custom_content_analysis(self, text: str) -> dict:
+        """
+        Example: Custom content analysis logic
+
+        You can add your own helper methods to process content,
+        extract metadata, or perform custom transformations.
+        """
+        # Your custom logic here
+        return {
+            "processed": text,
+            "metadata": {}
+        }
+
+    async def _custom_prompt_generation(self, context: str) -> str:
+        """
+        Example: Custom prompt generation logic
+
+        Create specialized prompts based on your use case.
+        """
+        prompt = f"Generate content based on: {context}"
+        response = await self.llm(prompt, temperature=0.7, max_tokens=500)
+        return response.strip()
+
+
+# ==================== Usage Examples ====================
+
+"""
+Example 1: Register and use a custom pipeline
+----------------------------------------
+from pixelle_video import pixelle_video
+from pixelle_video.pipelines.custom import CustomPipeline
+
+# Initialize
+await pixelle_video.initialize()
+
+# Register custom pipeline
+pixelle_video.pipelines["my_custom"] = CustomPipeline(pixelle_video)
+
+# Use it
+result = await pixelle_video.generate_video(
+    text="Your input content here",
+    pipeline="my_custom",
+    custom_param_example="custom_value"
+)
+
+
+Example 2: Create your own pipeline class
+----------------------------------------
+from pixelle_video.pipelines.custom import CustomPipeline
+
+class MySpecialPipeline(CustomPipeline):
+    async def __call__(self, text: str, **kwargs):
+        # Your completely custom logic
+        logger.info("Running my special pipeline")
+
+        # You can reuse parts from CustomPipeline or start from scratch
+        # ...
+
+        return result
+
+
+Example 3: Inline custom pipeline
+----------------------------------------
+from pixelle_video.pipelines.base import BasePipeline
+
+class QuickPipeline(BasePipeline):
+    async def __call__(self, text: str, **kwargs):
+        # Quick custom logic
+        narrations = text.split('\\n')
+
+        for narration in narrations:
+            audio = await self.tts(narration)
+            image = await self.image(prompt=f"illustration of {narration}")
+            # ... process frame
+
+        # ... concatenate and return
+        return result
+
+# Use immediately
+pixelle_video.pipelines["quick"] = QuickPipeline(pixelle_video)
+result = await pixelle_video.generate_video(text=content, pipeline="quick")
+"""
+
diff --git a/pixelle_video/pipelines/standard.py b/pixelle_video/pipelines/standard.py
new file mode 100644
index 0000000..d28abb7
--- /dev/null
+++ b/pixelle_video/pipelines/standard.py
@@ -0,0 +1,388 @@
+"""
+Standard Video Generation Pipeline
+
+Standard workflow for generating short videos from a topic or a fixed script.
+This is the default pipeline that replicates the original VideoGeneratorService logic.
+"""
+
+from datetime import datetime
+from pathlib import Path
+from typing import Optional, Callable, Literal
+
+from loguru import logger
+
+from pixelle_video.pipelines.base import BasePipeline
+from pixelle_video.models.progress import ProgressEvent
+from pixelle_video.models.storyboard import (
+    Storyboard,
+    StoryboardFrame,
+    StoryboardConfig,
+    ContentMetadata,
+    VideoGenerationResult
+)
+from pixelle_video.utils.content_generators import (
+    generate_title,
+    generate_narrations_from_topic,
+    split_narration_script,
+    generate_image_prompts,
+)
+
+
+class StandardPipeline(BasePipeline):
+    """
+    Standard video generation pipeline
+
+    Workflow:
+    1. Generate/determine title
+    2. Generate narrations (from topic or split fixed script)
+    3. Generate image prompts for each narration
+    4. For each frame:
+       - Generate audio (TTS)
+       - Generate image
+       - Compose frame with template
+       - Create video segment
+    5. Concatenate all segments
+    6. Add BGM (optional)
+
+    Supports two modes:
+    - "generate": LLM generates narrations from topic
+    - "fixed": Use provided script as-is (each line = one narration)
+    """
+
+    async def __call__(
+        self,
+        # === Input ===
+        text: str,
+
+        # === Processing Mode ===
+        mode: Literal["generate", "fixed"] = "generate",
+
+        # === Optional Title ===
+        title: Optional[str] = None,
+
+        # === Basic Config ===
+        n_scenes: int = 5,  # Only used in generate mode; ignored in fixed mode
+        voice_id: str = "[Chinese] zh-CN Yunjian",
+        tts_workflow: Optional[str] = None,
+        tts_speed: float = 1.2,
+        ref_audio: Optional[str] = None,  # Reference audio for voice cloning
+        output_path: Optional[str] = None,
+
+        # === LLM Parameters ===
+        min_narration_words: int = 5,
+        max_narration_words: int = 20,
+        min_image_prompt_words: int = 30,
+        max_image_prompt_words: int = 60,
+
+        # === Image Parameters ===
+        image_width: int = 1024,
+        image_height: int = 1024,
+        image_workflow: Optional[str] = None,
+
+        # === Video Parameters ===
+        video_fps: int = 30,
+
+        # === Frame Template (determines video size) ===
+        frame_template: Optional[str] = None,
+
+        # === Image Style ===
+        prompt_prefix: Optional[str] = None,
+
+        # === BGM Parameters ===
+        bgm_path: Optional[str] = None,
+        bgm_volume: float = 0.2,
+        bgm_mode: Literal["once", "loop"] = "loop",
+
+        # === Advanced Options ===
+        content_metadata: Optional[ContentMetadata] = None,
+        progress_callback: Optional[Callable[[ProgressEvent], None]] = None,
+    ) -> VideoGenerationResult:
+        """
+        Generate a short video from text input
+
+        Args:
+            text: Text input (required)
+                - For generate mode: topic/theme (e.g., "How to improve learning efficiency")
+                - For fixed mode: complete narration script (each line is a narration)
+
+            mode: Processing mode (default "generate")
+                - "generate": LLM generates narrations from topic, creates n_scenes
+                - "fixed": Use existing script as-is, each line becomes a narration
+
+                Note: In fixed mode, n_scenes is ignored (uses actual line count)
+
+            title: Video title (optional)
+                - If provided, use it as the video title
+                - If not provided:
+                  * generate mode → use text as title
+                  * fixed mode → LLM generates title from script
+
+            n_scenes: Number of storyboard scenes (default 5)
+                Only effective in generate mode; ignored in fixed mode
+
+            voice_id: TTS voice ID (default "[Chinese] zh-CN Yunjian")
+            tts_workflow: TTS workflow filename (e.g., "tts_edge.json", None = use default)
+            tts_speed: TTS speed multiplier (1.0 = normal, 1.2 = 20% faster, default 1.2)
+            ref_audio: Reference audio path for voice cloning (optional)
+            output_path: Output video path (auto-generated if None)
+
+            min_narration_words: Min narration length (generate mode only)
+            max_narration_words: Max narration length (generate mode only)
+            min_image_prompt_words: Min image prompt length
+            max_image_prompt_words: Max image prompt length
+
+            image_width: Generated image width (default 1024)
+            image_height: Generated image height (default 1024)
+            image_workflow: Image workflow filename (e.g., "image_flux.json", None = use default)
+
+            video_fps: Video frame rate (default 30)
+
+            frame_template: HTML template path with size (None = use default "1080x1920/default.html")
+                Format: "WIDTHxHEIGHT/template.html" (e.g., "1080x1920/default.html", "1920x1080/modern.html")
+                Video size is automatically determined from the template path
+
+            prompt_prefix: Image prompt prefix (overrides config.yaml if provided)
+                e.g., "anime style, vibrant colors" or "" for no prefix
+
+            bgm_path: BGM path (filename like "default.mp3", custom path, or None)
+            bgm_volume: BGM volume 0.0-1.0 (default 0.2)
+            bgm_mode: BGM mode "once" or "loop" (default "loop")
+
+            content_metadata: Content metadata (optional, for display)
+            progress_callback: Progress callback function(ProgressEvent)
+
+        Returns:
+            VideoGenerationResult with video path and metadata
+        """
+        # ========== Step 0: Process text and determine title ==========
+        logger.info(f"🚀 Starting StandardPipeline in '{mode}' mode")
+        logger.info(f"   Text length: {len(text)} chars")
+
+        # Determine final title
+        if title:
+            final_title = title
+            logger.info(f"   Title: '{title}' (user-specified)")
+        else:
+            self._report_progress(progress_callback, "generating_title", 0.01)
+            if mode == "generate":
+                final_title = await generate_title(self.llm, text, strategy="auto")
+                logger.info(f"   Title: '{final_title}' (auto-generated)")
+            else:  # fixed
+                final_title = await generate_title(self.llm, text, strategy="llm")
+                logger.info(f"   Title: '{final_title}' (LLM-generated)")
+
+        # ========== Step 0.5: Create isolated task directory ==========
+        from pixelle_video.utils.os_util import (
+            create_task_output_dir,
+            get_task_final_video_path
+        )
+
+        task_dir, task_id = create_task_output_dir()
+        logger.info(f"📁 Task directory created: {task_dir}")
+        logger.info(f"   Task ID: {task_id}")
+
+        # Determine final video path
+        user_specified_output = None
+        if output_path is None:
+            output_path = get_task_final_video_path(task_id)
+        else:
+            user_specified_output = output_path
+            output_path = get_task_final_video_path(task_id)
+            logger.info(f"   Will copy final video to: {user_specified_output}")
+
+        # Create storyboard config
+        config = StoryboardConfig(
+            task_id=task_id,
+            n_storyboard=n_scenes,
+            min_narration_words=min_narration_words,
+            max_narration_words=max_narration_words,
+            min_image_prompt_words=min_image_prompt_words,
+            max_image_prompt_words=max_image_prompt_words,
+            video_fps=video_fps,
+            voice_id=voice_id,
+            tts_workflow=tts_workflow,
+            tts_speed=tts_speed,
+            ref_audio=ref_audio,
+            image_width=image_width,
+            image_height=image_height,
+            image_workflow=image_workflow,
+            frame_template=frame_template or "1080x1920/default.html"
+        )
+
+        # Create storyboard
+        storyboard = Storyboard(
+            title=final_title,
+            config=config,
+            content_metadata=content_metadata,
+            created_at=datetime.now()
+        )
+
+        try:
+            # ========== Step 1: Generate/Split narrations ==========
+            if mode == "generate":
+                self._report_progress(progress_callback, "generating_narrations", 0.05)
+                narrations = await generate_narrations_from_topic(
+                    self.llm,
+                    topic=text,
+                    n_scenes=n_scenes,
+                    min_words=min_narration_words,
+                    max_words=max_narration_words
+                )
+                logger.info(f"✅ Generated {len(narrations)} narrations")
+            else:  # fixed
+                self._report_progress(progress_callback, "splitting_script", 0.05)
+                narrations = await split_narration_script(text)
+                logger.info(f"✅ Split script into {len(narrations)} segments (by lines)")
+                logger.info(f"   Note: n_scenes={n_scenes} is ignored in fixed mode")
+
+            # ========== Step 2: Generate image prompts ==========
+            self._report_progress(progress_callback, "generating_image_prompts", 0.15)
+
+            # Override prompt_prefix if provided
+            original_prefix = None
+            if prompt_prefix is not None:
+                image_config = self.core.config.get("comfyui", {}).get("image", {})
+                original_prefix = image_config.get("prompt_prefix")
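+                # Temporarily mutate the shared config dict so any reader of
+                # comfyui.image.prompt_prefix sees the override; the finally
+                # block below restores (or removes) it.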
+                image_config["prompt_prefix"] = prompt_prefix
+                logger.info(f"Using custom prompt_prefix: '{prompt_prefix}'")
+
+            try:
+                # Create progress callback wrapper for image prompt generation
+                def image_prompt_progress(completed: int, total: int, message: str):
+                    batch_progress = completed / total if total > 0 else 0
+                    overall_progress = 0.15 + (batch_progress * 0.15)
+                    self._report_progress(
+                        progress_callback,
+                        "generating_image_prompts",
+                        overall_progress,
+                        extra_info=message
+                    )
+
+                # Generate base image prompts
+                base_image_prompts = await generate_image_prompts(
+                    self.llm,
+                    narrations=narrations,
+                    min_words=min_image_prompt_words,
+                    max_words=max_image_prompt_words,
+                    progress_callback=image_prompt_progress
+                )
+
+                # Apply prompt prefix
+                from pixelle_video.utils.prompt_helper import build_image_prompt
+                image_config = self.core.config.get("comfyui", {}).get("image", {})
+                prompt_prefix_to_use = prompt_prefix if prompt_prefix is not None else image_config.get("prompt_prefix", "")
+
+                image_prompts = []
+                for base_prompt in base_image_prompts:
+                    final_prompt = build_image_prompt(base_prompt, prompt_prefix_to_use)
+                    image_prompts.append(final_prompt)
+
+            finally:
+                # Restore the original prompt_prefix (drop the key if it did not exist before)
+                if prompt_prefix is not None:
+                    if original_prefix is not None:
+                        image_config["prompt_prefix"] = original_prefix
+                    else:
+                        image_config.pop("prompt_prefix", None)
+
+            logger.info(f"✅ Generated {len(image_prompts)} image prompts")
+
+            # ========== Step 3: Create frames ==========
+            for i, (narration, image_prompt) in enumerate(zip(narrations, image_prompts)):
+                frame = StoryboardFrame(
+                    index=i,
+                    narration=narration,
+                    image_prompt=image_prompt,
+                    created_at=datetime.now()
+                )
+                storyboard.frames.append(frame)
+
+            # ========== Step 4: Process each frame ==========
+            for i, frame in enumerate(storyboard.frames):
+                base_progress = 0.2
+                frame_range = 0.6
+                per_frame_progress = frame_range / len(storyboard.frames)
+
+                # Create frame-specific progress callback
+                def frame_progress_callback(event: ProgressEvent):
+                    overall_progress = base_progress + (per_frame_progress * i) + (per_frame_progress * event.progress)
+                    if progress_callback:
+                        adjusted_event = ProgressEvent(
+                            event_type=event.event_type,
+                            progress=overall_progress,
+                            frame_current=event.frame_current,
+                            frame_total=event.frame_total,
+                            step=event.step,
+                            action=event.action
+                        )
+                        progress_callback(adjusted_event)
+
+                # Report frame start
+                self._report_progress(
+                    progress_callback,
+                    "processing_frame",
+                    base_progress + (per_frame_progress * i),
+                    frame_current=i+1,
+                    frame_total=len(storyboard.frames)
+                )
+
+                processed_frame = await self.core.frame_processor(
+                    frame=frame,
+                    storyboard=storyboard,
+                    config=config,
+                    total_frames=len(storyboard.frames),
+                    progress_callback=frame_progress_callback
+                )
+                storyboard.total_duration += processed_frame.duration
+                logger.info(f"✅ Frame {i+1} completed ({processed_frame.duration:.2f}s)")
+
+            # ========== Step 5: Concatenate videos ==========
+            self._report_progress(progress_callback, "concatenating", 0.85)
+            segment_paths = [frame.video_segment_path for frame in storyboard.frames]
+
+            from pixelle_video.services.video import VideoService
+            video_service = VideoService()
+
+            final_video_path = video_service.concat_videos(
+                videos=segment_paths,
+                output=output_path,
+                bgm_path=bgm_path,
+                bgm_volume=bgm_volume,
+                bgm_mode=bgm_mode
+            )
+
+            storyboard.final_video_path = final_video_path
+            storyboard.completed_at = datetime.now()
+
+            # Copy to user-specified path if provided
+            if user_specified_output:
+                import shutil
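+                # The canonical output stays in the task directory; the caller
+                # gets a copy at their requested path.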
+                Path(user_specified_output).parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy2(final_video_path, user_specified_output)
+                logger.info(f"📹 Final video copied to: {user_specified_output}")
+                final_video_path = user_specified_output
+                storyboard.final_video_path = user_specified_output
+
+            logger.success(f"🎬 Video generation completed: {final_video_path}")
+
+            # ========== Step 6: Create result ==========
+            self._report_progress(progress_callback, "completed", 1.0)
+
+            video_path_obj = Path(final_video_path)
+            file_size = video_path_obj.stat().st_size
+
+            result = VideoGenerationResult(
+                video_path=final_video_path,
+                storyboard=storyboard,
+                duration=storyboard.total_duration,
+                file_size=file_size
+            )
+
+            logger.info(f"✅ Generated video: {final_video_path}")
+            logger.info(f"   Duration: {storyboard.total_duration:.2f}s")
+            logger.info(f"   Size: {file_size / (1024*1024):.2f} MB")
+            logger.info(f"   Frames: {len(storyboard.frames)}")
+
+            return result
+
+        except Exception as e:
+            logger.error(f"❌ Video generation failed: {e}")
+            raise
+
diff --git a/pixelle_video/service.py b/pixelle_video/service.py
index f422b6a..307af9b 100644
--- a/pixelle_video/service.py
+++ b/pixelle_video/service.py
@@ -12,11 +12,13 @@ from pixelle_video.config import config_manager
 from pixelle_video.services.llm_service import LLMService
 from pixelle_video.services.tts_service import TTSService
 from pixelle_video.services.image import ImageService
+from pixelle_video.services.video import VideoService
 from pixelle_video.services.narration_generator import NarrationGeneratorService
 from pixelle_video.services.image_prompt_generator import ImagePromptGeneratorService
 from pixelle_video.services.title_generator import TitleGeneratorService
 from pixelle_video.services.frame_processor import FrameProcessor
-from pixelle_video.services.video_generator import VideoGeneratorService
+from pixelle_video.pipelines.standard import StandardPipeline
+from pixelle_video.pipelines.custom import CustomPipeline
 
 
 class PixelleVideoCore:
@@ -45,7 +47,11 @@
         ├── config (configuration)
         ├── llm (LLM service - direct OpenAI SDK)
         ├── tts (TTS service - ComfyKit workflows)
-        └── image (Image service - ComfyKit workflows)
+        ├── image (Image service - ComfyKit workflows)
+        └── pipelines (video generation pipelines)
+            ├── standard (standard workflow)
+            ├── custom (custom workflow template)
+            └── ... (extensible)
     """
 
     def __init__(self, config_path: str = "config.yaml"):
@@ -63,6 +69,7 @@
         self.llm: Optional[LLMService] = None
         self.tts: Optional[TTSService] = None
         self.image: Optional[ImageService] = None
+        self.video: Optional[VideoService] = None
 
         # Content generation services
         self.narration_generator: Optional[NarrationGeneratorService] = None
@@ -72,8 +79,11 @@
         # Frame processing services
         self.frame_processor: Optional[FrameProcessor] = None
 
-        # Video generation service (named as verb for direct calling)
-        self.generate_video: Optional[VideoGeneratorService] = None
+        # Video generation pipelines (dictionary of pipeline_name -> pipeline_instance)
+        self.pipelines = {}
+
+        # Default pipeline callable (for backward compatibility)
+        self.generate_video = None
 
     async def initialize(self):
         """
@@ -90,10 +100,11 @@
 
         logger.info("🚀 Initializing Pixelle-Video...")
 
-        # 1. Initialize core services (no capability layer)
+        # 1. Initialize core services
         self.llm = LLMService(self.config)
         self.tts = TTSService(self.config)
         self.image = ImageService(self.config)
+        self.video = VideoService()
 
         # 2. Initialize content generation services
         self.narration_generator = NarrationGeneratorService(self)
@@ -103,12 +114,67 @@
         # 3. Initialize frame processing services
         self.frame_processor = FrameProcessor(self)
 
-        # 4. Initialize video generation service
-        self.generate_video = VideoGeneratorService(self)
+        # 4. Register video generation pipelines
+        self.pipelines = {
+            "standard": StandardPipeline(self),
+            "custom": CustomPipeline(self),
+        }
+        logger.info(f"📹 Registered pipelines: {', '.join(self.pipelines.keys())}")
+
+        # 5. Set default pipeline callable (for backward compatibility)
+        self.generate_video = self._create_generate_video_wrapper()
 
         self._initialized = True
         logger.info("✅ Pixelle-Video initialized successfully\n")
 
+    def _create_generate_video_wrapper(self):
+        """
+        Create a wrapper function for generate_video that supports pipeline selection
+
+        This maintains backward compatibility while adding pipeline support.
+        """
+        async def generate_video_wrapper(
+            text: str,
+            pipeline: str = "standard",
+            **kwargs
+        ):
+            """
+            Generate video using the specified pipeline
+
+            Args:
+                text: Input text
+                pipeline: Pipeline name ("standard", "custom", or any registered name)
+                **kwargs: Pipeline-specific parameters
+
+            Returns:
+                VideoGenerationResult
+
+            Examples:
+                # Use standard pipeline (default)
+                result = await pixelle_video.generate_video(
+                    text="How to improve learning efficiency",
+                    n_scenes=5
+                )
+
+                # Use custom pipeline
+                result = await pixelle_video.generate_video(
+                    text=your_content,
+                    pipeline="custom",
+                    custom_param_example="custom_value"
+                )
+            """
+            if pipeline not in self.pipelines:
+                available = ", ".join(self.pipelines.keys())
+                raise ValueError(
+                    f"Unknown pipeline: '{pipeline}'. "
+                    f"Available pipelines: {available}"
+                )
+
+            pipeline_instance = self.pipelines[pipeline]
+            return await pipeline_instance(text=text, **kwargs)
+
+        return generate_video_wrapper
+
     @property
     def project_name(self) -> str:
         """Get project name from config"""
@@ -117,7 +183,8 @@
     def __repr__(self) -> str:
         """String representation"""
         status = "initialized" if self._initialized else "not initialized"
-        return f"<PixelleVideoCore {status}>"
+        pipelines = f"pipelines={list(self.pipelines.keys())}" if self._initialized else ""
+        return f"<PixelleVideoCore {status} {pipelines}>"
 
 
 # Global instance
diff --git a/pixelle_video/services/__init__.py b/pixelle_video/services/__init__.py
index 6a5091a..d4c29b9 100644
--- a/pixelle_video/services/__init__.py
+++ b/pixelle_video/services/__init__.py
@@ -1,7 +1,20 @@
 """
 Pixelle-Video Services
 
-Unified service layer providing simplified access to capabilities.
+Core services providing atomic capabilities.
+
+Core Services (Active):
+- LLMService: LLM text generation
+- TTSService: Text-to-speech
+- ImageService: Image generation
+- VideoService: Video processing
+
+Legacy Services (kept for backward compatibility):
+- NarrationGeneratorService: Use pipelines + utils.content_generators instead
+- ImagePromptGeneratorService: Use pipelines + utils.content_generators instead
+- TitleGeneratorService: Use pipelines + utils.content_generators instead
+- FrameProcessor: Use pipelines instead
+- VideoGeneratorService: Use pipelines.StandardPipeline instead
 """
 
 from pixelle_video.services.comfy_base_service import ComfyBaseService
@@ -9,6 +22,8 @@ from pixelle_video.services.llm_service import LLMService
 from pixelle_video.services.tts_service import TTSService
 from pixelle_video.services.image import ImageService
 from pixelle_video.services.video import VideoService
+
+# Legacy services (kept for backward compatibility)
 from pixelle_video.services.narration_generator import NarrationGeneratorService
 from pixelle_video.services.image_prompt_generator import ImagePromptGeneratorService
 from pixelle_video.services.title_generator import TitleGeneratorService
@@ -21,6 +36,7 @@
     "TTSService",
     "ImageService",
     "VideoService",
+    # Legacy (backward compatibility)
     "NarrationGeneratorService",
     "ImagePromptGeneratorService",
     "TitleGeneratorService",
diff --git a/pixelle_video/utils/__init__.py b/pixelle_video/utils/__init__.py
index 1884dbb..2fa943f 100644
--- a/pixelle_video/utils/__init__.py
+++ b/pixelle_video/utils/__init__.py
@@ -1,3 +1,5 @@
 """
-Pixelle-Video utilities
-"""
+Pixelle-Video Utilities
+
+Utility functions and helpers.
+"""
\ No newline at end of file
diff --git a/pixelle_video/utils/content_generators.py b/pixelle_video/utils/content_generators.py
new file mode 100644
index 0000000..fd8dc52
--- /dev/null
+++ b/pixelle_video/utils/content_generators.py
@@ -0,0 +1,351 @@
+"""
+Content generation utility functions
+
+Pure/stateless functions for generating content using LLM.
+These functions are reusable across different pipelines.
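+
+Typical usage (a sketch; assumes an initialized LLM service instance `llm`):
+
+    title = await generate_title(llm, script_text, strategy="llm")
+    narrations = await generate_narrations_from_topic(llm, topic="atomic habits", n_scenes=5)
+    prompts = await generate_image_prompts(llm, narrations=narrations)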
+""" + +import json +import re +from typing import List, Optional, Literal + +from loguru import logger + + +async def generate_title( + llm_service, + content: str, + strategy: Literal["auto", "direct", "llm"] = "auto", + max_length: int = 15 +) -> str: + """ + Generate title from content + + Args: + llm_service: LLM service instance + content: Source content (topic or script) + strategy: Generation strategy + - "auto": Auto-decide based on content length (default) + - "direct": Use content directly (truncated if needed) + - "llm": Always use LLM to generate title + max_length: Maximum title length (default: 15) + + Returns: + Generated title + """ + if strategy == "direct": + content = content.strip() + return content[:max_length] if len(content) > max_length else content + + if strategy == "auto": + if len(content.strip()) <= 15: + return content.strip() + # Fall through to LLM + + # Use LLM to generate title + from pixelle_video.prompts import build_title_generation_prompt + + prompt = build_title_generation_prompt(content, max_length=500) + response = await llm_service(prompt, temperature=0.7, max_tokens=50) + + # Clean up response + title = response.strip() + + # Remove quotes if present + if title.startswith('"') and title.endswith('"'): + title = title[1:-1] + if title.startswith("'") and title.endswith("'"): + title = title[1:-1] + + # Limit to max_length (safety) + if len(title) > max_length: + title = title[:max_length] + + logger.debug(f"Generated title: '{title}' (length: {len(title)})") + return title + + +async def generate_narrations_from_topic( + llm_service, + topic: str, + n_scenes: int = 5, + min_words: int = 5, + max_words: int = 20 +) -> List[str]: + """ + Generate narrations from topic using LLM + + Args: + llm_service: LLM service instance + topic: Topic/theme to generate narrations from + n_scenes: Number of narrations to generate + min_words: Minimum narration length + max_words: Maximum narration length + + Returns: + List of narration texts + """ + from pixelle_video.prompts import build_topic_narration_prompt + + logger.info(f"Generating {n_scenes} narrations from topic: {topic}") + + prompt = build_topic_narration_prompt( + topic=topic, + n_storyboard=n_scenes, + min_words=min_words, + max_words=max_words + ) + + response = await llm_service( + prompt=prompt, + temperature=0.8, + max_tokens=2000 + ) + + logger.debug(f"LLM response: {response[:200]}...") + + # Parse JSON + result = _parse_json(response) + + if "narrations" not in result: + raise ValueError("Invalid response format: missing 'narrations' key") + + narrations = result["narrations"] + + # Validate count + if len(narrations) > n_scenes: + logger.warning(f"Got {len(narrations)} narrations, taking first {n_scenes}") + narrations = narrations[:n_scenes] + elif len(narrations) < n_scenes: + raise ValueError(f"Expected {n_scenes} narrations, got only {len(narrations)}") + + logger.info(f"Generated {len(narrations)} narrations successfully") + return narrations + + +async def generate_narrations_from_content( + llm_service, + content: str, + n_scenes: int = 5, + min_words: int = 5, + max_words: int = 20 +) -> List[str]: + """ + Generate narrations from user-provided content using LLM + + Args: + llm_service: LLM service instance + content: User-provided content + n_scenes: Number of narrations to generate + min_words: Minimum narration length + max_words: Maximum narration length + + Returns: + List of narration texts + """ + from pixelle_video.prompts import build_content_narration_prompt + + 
logger.info(f"Generating {n_scenes} narrations from content ({len(content)} chars)") + + prompt = build_content_narration_prompt( + content=content, + n_storyboard=n_scenes, + min_words=min_words, + max_words=max_words + ) + + response = await llm_service( + prompt=prompt, + temperature=0.8, + max_tokens=2000 + ) + + # Parse JSON + result = _parse_json(response) + + if "narrations" not in result: + raise ValueError("Invalid response format: missing 'narrations' key") + + narrations = result["narrations"] + + # Validate count + if len(narrations) > n_scenes: + logger.warning(f"Got {len(narrations)} narrations, taking first {n_scenes}") + narrations = narrations[:n_scenes] + elif len(narrations) < n_scenes: + raise ValueError(f"Expected {n_scenes} narrations, got only {len(narrations)}") + + logger.info(f"Generated {len(narrations)} narrations successfully") + return narrations + + +async def split_narration_script( + script: str, +) -> List[str]: + """ + Split user-provided narration script into segments by lines + + Args: + script: Fixed narration script (each line is a narration) + + Returns: + List of narration segments + """ + logger.info(f"Splitting script by lines (length: {len(script)} chars)") + + # Split by newline, filter empty lines + narrations = [line.strip() for line in script.split('\n') if line.strip()] + + logger.info(f"✅ Split script into {len(narrations)} segments (by lines)") + + # Log statistics + if narrations: + lengths = [len(s) for s in narrations] + logger.info(f" Min: {min(lengths)} chars, Max: {max(lengths)} chars, Avg: {sum(lengths)//len(lengths)} chars") + + return narrations + + +async def generate_image_prompts( + llm_service, + narrations: List[str], + min_words: int = 30, + max_words: int = 60, + batch_size: int = 10, + max_retries: int = 3, + progress_callback: Optional[callable] = None +) -> List[str]: + """ + Generate image prompts from narrations (with batching and retry) + + Args: + llm_service: LLM service instance + narrations: List of narrations + min_words: Min image prompt length + max_words: Max image prompt length + batch_size: Max narrations per batch (default: 10) + max_retries: Max retry attempts per batch (default: 3) + progress_callback: Optional callback(completed, total, message) for progress updates + + Returns: + List of image prompts (base prompts, without prefix applied) + """ + from pixelle_video.prompts import build_image_prompt_prompt + + logger.info(f"Generating image prompts for {len(narrations)} narrations (batch_size={batch_size})") + + # Split narrations into batches + batches = [narrations[i:i + batch_size] for i in range(0, len(narrations), batch_size)] + logger.info(f"Split into {len(batches)} batches") + + all_prompts = [] + + # Process each batch + for batch_idx, batch_narrations in enumerate(batches, 1): + logger.info(f"Processing batch {batch_idx}/{len(batches)} ({len(batch_narrations)} narrations)") + + # Retry logic for this batch + for attempt in range(1, max_retries + 1): + try: + # Generate prompts for this batch + prompt = build_image_prompt_prompt( + narrations=batch_narrations, + min_words=min_words, + max_words=max_words + ) + + response = await llm_service( + prompt=prompt, + temperature=0.7, + max_tokens=8192 + ) + + logger.debug(f"Batch {batch_idx} attempt {attempt}: LLM response length: {len(response)} chars") + + # Parse JSON + result = _parse_json(response) + + if "image_prompts" not in result: + raise KeyError("Invalid response format: missing 'image_prompts'") + + batch_prompts = result["image_prompts"] 
+
+                # Validate count
+                if len(batch_prompts) != len(batch_narrations):
+                    error_msg = (
+                        f"Batch {batch_idx} prompt count mismatch (attempt {attempt}/{max_retries}):\n"
+                        f"  Expected: {len(batch_narrations)} prompts\n"
+                        f"  Got: {len(batch_prompts)} prompts"
+                    )
+                    logger.warning(error_msg)
+
+                    if attempt < max_retries:
+                        logger.info(f"Retrying batch {batch_idx}...")
+                        continue
+                    else:
+                        raise ValueError(error_msg)
+
+                # Success!
+                logger.info(f"✅ Batch {batch_idx} completed successfully ({len(batch_prompts)} prompts)")
+                all_prompts.extend(batch_prompts)
+
+                # Report progress
+                if progress_callback:
+                    progress_callback(
+                        len(all_prompts),
+                        len(narrations),
+                        f"Batch {batch_idx}/{len(batches)} completed"
+                    )
+
+                break
+
+            except (json.JSONDecodeError, KeyError) as e:
+                logger.error(f"Batch {batch_idx} parse error (attempt {attempt}/{max_retries}): {e}")
+                if attempt >= max_retries:
+                    raise
+                logger.info(f"Retrying batch {batch_idx}...")
+
+    logger.info(f"✅ Generated {len(all_prompts)} image prompts")
+    return all_prompts
+
+
+def _parse_json(text: str) -> dict:
+    """
+    Parse JSON from text, with fallback to extract JSON from markdown code blocks
+
+    Args:
+        text: Text containing JSON
+
+    Returns:
+        Parsed JSON dict
+
+    Raises:
+        json.JSONDecodeError: If no valid JSON found
+    """
+    # Try direct parsing first
+    try:
+        return json.loads(text)
+    except json.JSONDecodeError:
+        pass
+
+    # Try to extract JSON from a markdown code block
+    json_pattern = r'```(?:json)?\s*([\s\S]+?)\s*```'
+    match = re.search(json_pattern, text, re.DOTALL)
+    if match:
+        try:
+            return json.loads(match.group(1))
+        except json.JSONDecodeError:
+            pass
+
+    # Try to find any JSON object in the text
+    json_pattern = r'\{[^{}]*(?:"narrations"|"image_prompts")\s*:\s*\[[^\]]*\][^{}]*\}'
+    match = re.search(json_pattern, text, re.DOTALL)
+    if match:
+        try:
+            return json.loads(match.group(0))
+        except json.JSONDecodeError:
+            pass
+
+    # If all attempts fail, raise an error
+    raise json.JSONDecodeError("No valid JSON found", text, 0)
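For reference, a minimal sketch of how the `_parse_json` fallback chain behaves on a chatty LLM reply (standalone example; the `raw` string is fabricated):

```python
# Sketch: _parse_json recovers the JSON object embedded in prose.
# Assumes the content_generators module above is importable.
from pixelle_video.utils.content_generators import _parse_json

raw = 'Sure! Here is the storyboard: {"narrations": ["Scene one.", "Scene two."]} Enjoy!'

# Direct json.loads fails on the surrounding prose, the markdown-fence pattern
# finds nothing, and the keyed-object regex extracts the JSON payload.
result = _parse_json(raw)
assert result["narrations"] == ["Scene one.", "Scene two."]
```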