From ea784e0d069bdf3005677f8124a06e43e88edadf Mon Sep 17 00:00:00 2001 From: puke <1129090915@qq.com> Date: Wed, 3 Dec 2025 20:11:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=9F=BA=E4=BA=8E=E5=9B=BE?= =?UTF-8?q?=E7=89=87=E7=B4=A0=E6=9D=90=E7=9A=84=E8=A7=86=E9=A2=91=E7=94=9F?= =?UTF-8?q?=E6=88=90=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pixelle_video/pipelines/__init__.py | 2 + pixelle_video/pipelines/asset_based.py | 661 ++++++++++++++++++++++ pixelle_video/service.py | 10 +- pixelle_video/services/frame_processor.py | 44 +- pixelle_video/services/image_analysis.py | 197 +++++++ pixelle_video/services/llm_service.py | 174 +++++- pixelle_video/utils/workflow_util.py | 67 +++ workflows/runninghub/analyse_image.json | 4 + workflows/selfhost/analyse_image.json | 61 ++ 9 files changed, 1180 insertions(+), 40 deletions(-) create mode 100644 pixelle_video/pipelines/asset_based.py create mode 100644 pixelle_video/services/image_analysis.py create mode 100644 pixelle_video/utils/workflow_util.py create mode 100644 workflows/runninghub/analyse_image.json create mode 100644 workflows/selfhost/analyse_image.json diff --git a/pixelle_video/pipelines/__init__.py b/pixelle_video/pipelines/__init__.py index 9a58c3e..9850d25 100644 --- a/pixelle_video/pipelines/__init__.py +++ b/pixelle_video/pipelines/__init__.py @@ -21,6 +21,7 @@ from pixelle_video.pipelines.base import BasePipeline from pixelle_video.pipelines.linear import LinearVideoPipeline, PipelineContext from pixelle_video.pipelines.standard import StandardPipeline from pixelle_video.pipelines.custom import CustomPipeline +from pixelle_video.pipelines.asset_based import AssetBasedPipeline __all__ = [ "BasePipeline", @@ -28,5 +29,6 @@ __all__ = [ "PipelineContext", "StandardPipeline", "CustomPipeline", + "AssetBasedPipeline", ] diff --git a/pixelle_video/pipelines/asset_based.py b/pixelle_video/pipelines/asset_based.py new file mode 100644 index 0000000..082ddb8 --- /dev/null +++ b/pixelle_video/pipelines/asset_based.py @@ -0,0 +1,661 @@ +# Copyright (C) 2025 AIDC-AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Asset-Based Video Pipeline + +Generates marketing videos from user-provided assets (images/videos) rather than +AI-generated media. Ideal for small businesses with existing media libraries. + +Workflow: +1. Analyze uploaded assets (images/videos) +2. Generate script based on user intent and available assets +3. Match assets to script scenes +4. Compose final video with narrations + +Example: + pipeline = AssetBasedPipeline(pixelle_video) + result = await pipeline( + assets=["/path/img1.jpg", "/path/img2.jpg"], + video_title="Pet Store Year-End Sale", + style="warm and friendly", + duration=30 + ) +""" + +from typing import List, Dict, Any, Optional +from pathlib import Path + +from loguru import logger +from pydantic import BaseModel, Field + +from pixelle_video.pipelines.linear import LinearVideoPipeline, PipelineContext +from pixelle_video.utils.os_util import ( + create_task_output_dir, + get_task_final_video_path +) + + +# ==================== Structured Output Models ==================== + +class SceneScript(BaseModel): + """Single scene in the video script""" + scene_number: int = Field(description="Scene number starting from 1") + asset_path: str = Field(description="Path to the asset file for this scene") + narrations: List[str] = Field(description="List of narration sentences for this scene (1-5 sentences)") + duration: int = Field(description="Estimated duration in seconds for this scene") + + +class VideoScript(BaseModel): + """Complete video script with scenes""" + scenes: List[SceneScript] = Field(description="List of scenes in the video") + + +class AssetBasedPipeline(LinearVideoPipeline): + """ + Asset-Based Video Pipeline + + Generates videos from user-provided assets instead of AI-generated media. + """ + + def __init__(self, core): + """ + Initialize pipeline + + Args: + core: PixelleVideoCore instance + """ + super().__init__(core) + self.asset_index: Dict[str, Any] = {} # In-memory asset metadata + + async def __call__( + self, + assets: List[str], + video_title: str = "", + intent: Optional[str] = None, + style: str = "professional and engaging", + duration: int = 30, + source: str = "runninghub", + bgm_path: Optional[str] = None, + bgm_volume: float = 0.2, + bgm_mode: str = "loop", + **kwargs + ) -> PipelineContext: + """ + Execute pipeline with user-provided assets + + Args: + assets: List of asset file paths + video_title: Video title + intent: Video intent/purpose (defaults to video_title) + style: Video style + duration: Target duration in seconds + source: Workflow source ("runninghub" or "selfhost") + bgm_path: Path to background music file (optional) + bgm_volume: BGM volume (0.0-1.0, default 0.2) + bgm_mode: BGM mode ("loop" or "once", default "loop") + **kwargs: Additional parameters + + Returns: + Pipeline context with generated video + """ + from pixelle_video.pipelines.linear import PipelineContext + + # Create custom context with asset-specific parameters + ctx = PipelineContext( + input_text=intent or video_title, # Use intent or title as input_text + params={ + "assets": assets, + "video_title": video_title, + "intent": intent or video_title, + "style": style, + "duration": duration, + "source": source, + "bgm_path": bgm_path, + "bgm_volume": bgm_volume, + "bgm_mode": bgm_mode, + **kwargs + } + ) + + # Store request parameters in context for easy access + ctx.request = ctx.params + + try: + # Execute pipeline lifecycle + await self.setup_environment(ctx) + await self.determine_title(ctx) + await self.generate_content(ctx) + await self.plan_visuals(ctx) + await self.initialize_storyboard(ctx) + await self.produce_assets(ctx) + await self.post_production(ctx) + await self.finalize(ctx) + + return ctx + + except Exception as e: + await self.handle_exception(ctx, e) + raise + + async def setup_environment(self, context: PipelineContext) -> PipelineContext: + """ + Analyze uploaded assets and build asset index + + Args: + context: Pipeline context with assets list + + Returns: + Updated context with asset_index + """ + # Create isolated task directory + task_dir, task_id = create_task_output_dir() + context.task_id = task_id + context.task_dir = Path(task_dir) # Convert to Path for easier usage + + # Determine final video path + context.final_video_path = get_task_final_video_path(task_id) + + logger.info(f"📁 Task directory created: {task_dir}") + logger.info("🔍 Analyzing uploaded assets...") + + assets: List[str] = context.request.get("assets", []) + if not assets: + raise ValueError("No assets provided. Please upload at least one image or video.") + + logger.info(f"Found {len(assets)} assets to analyze") + + self.asset_index = {} + + for i, asset_path in enumerate(assets, 1): + asset_path_obj = Path(asset_path) + + if not asset_path_obj.exists(): + logger.warning(f"Asset not found: {asset_path}") + continue + + logger.info(f"Analyzing asset {i}/{len(assets)}: {asset_path_obj.name}") + + # Determine asset type + asset_type = self._get_asset_type(asset_path_obj) + + if asset_type == "image": + # Analyze image using ImageAnalysisService + analysis_source = context.request.get("source", "runninghub") + description = await self.core.image_analysis(asset_path, source=analysis_source) + + self.asset_index[asset_path] = { + "path": asset_path, + "type": "image", + "name": asset_path_obj.name, + "description": description + } + + logger.info(f"✅ Image analyzed: {description[:50]}...") + + elif asset_type == "video": + # TODO: Extract keyframes and analyze + # For MVP, we'll skip video analysis and just record metadata + self.asset_index[asset_path] = { + "path": asset_path, + "type": "video", + "name": asset_path_obj.name, + "description": "Video asset" + } + + logger.info(f"⏭️ Video registered (analysis not yet implemented)") + + else: + logger.warning(f"Unknown asset type: {asset_path}") + + logger.success(f"✅ Asset analysis complete: {len(self.asset_index)} assets indexed") + + # Store asset index in context + context.asset_index = self.asset_index + + return context + + async def determine_title(self, context: PipelineContext) -> PipelineContext: + """ + Use user-provided title or generate one via LLM + + Args: + context: Pipeline context + + Returns: + Updated context with title + """ + from pixelle_video.utils.content_generators import generate_title + + title = context.request.get("video_title") + + if title: + context.title = title + logger.info(f"📝 Video title: {title} (user-specified)") + else: + # Generate title from intent using LLM + intent = context.request.get("intent", context.input_text) + context.title = await generate_title( + self.core.llm, + content=intent, + strategy="llm" + ) + logger.info(f"📝 Video title: {context.title} (LLM-generated)") + + return context + + async def generate_content(self, context: PipelineContext) -> PipelineContext: + """ + Generate video script using LLM with structured output + + LLM directly assigns assets to scenes - no complex matching logic needed. + + Args: + context: Pipeline context + + Returns: + Updated context with generated script (scenes already have asset_path assigned) + """ + logger.info("🤖 Generating video script with LLM...") + + # Build prompt for LLM + intent = context.request.get("intent", context.title) + style = context.request.get("style", "professional and engaging") + duration = context.request.get("duration", 30) + + # Prepare asset descriptions with full paths for LLM to reference + asset_info = [] + for asset_path, metadata in self.asset_index.items(): + asset_info.append(f"- Path: {asset_path}\n Description: {metadata['description']}") + + assets_text = "\n".join(asset_info) + + prompt = f"""You are a video script writer. Generate a {duration}-second video script. + +## Requirements +- Intent: {intent} +- Style: {style} +- Target Duration: {duration} seconds + +## Available Assets (use the exact path in your response) +{assets_text} + +## Instructions +1. Decide how many scenes are needed based on the target duration (typically 5-15 seconds per scene) +2. For each scene, directly assign ONE asset from the available assets above +3. Each scene can have 1-5 narration sentences +4. Try to use all available assets, but it's OK to reuse if needed +5. Total duration of all scenes should be approximately {duration} seconds + +## Output Requirements +For each scene, provide: +- scene_number: Sequential number starting from 1 +- asset_path: The EXACT path from the available assets list +- narrations: Array of 1-5 narration sentences +- duration: Estimated duration in seconds + +Generate the video script now:""" + + # Call LLM with structured output + script: VideoScript = await self.core.llm( + prompt=prompt, + response_type=VideoScript, + temperature=0.8, + max_tokens=4000 + ) + + # Convert to dict format for compatibility with downstream code + context.script = [scene.model_dump() for scene in script.scenes] + + # Validate asset paths exist + for scene in context.script: + asset_path = scene.get("asset_path") + if asset_path not in self.asset_index: + # Find closest match (in case LLM slightly modified the path) + matched = False + for known_path in self.asset_index.keys(): + if Path(known_path).name == Path(asset_path).name: + scene["asset_path"] = known_path + matched = True + logger.warning(f"Corrected asset path: {asset_path} -> {known_path}") + break + + if not matched: + # Fallback to first available asset + fallback_path = list(self.asset_index.keys())[0] + logger.warning(f"Unknown asset path '{asset_path}', using fallback: {fallback_path}") + scene["asset_path"] = fallback_path + + logger.success(f"✅ Generated script with {len(context.script)} scenes") + + # Log script preview + for scene in context.script: + narrations = scene.get("narrations", []) + if isinstance(narrations, str): + narrations = [narrations] + narration_preview = " | ".join([n[:30] + "..." if len(n) > 30 else n for n in narrations[:2]]) + asset_name = Path(scene.get("asset_path", "unknown")).name + logger.info(f"Scene {scene['scene_number']} [{asset_name}]: {narration_preview}") + + return context + + async def plan_visuals(self, context: PipelineContext) -> PipelineContext: + """ + Prepare matched scenes from LLM-generated script + + Since LLM already assigned asset_path in generate_content, this method + simply converts the script format to matched_scenes format. + + Args: + context: Pipeline context + + Returns: + Updated context with matched_scenes + """ + logger.info("🎯 Preparing scene-asset mapping...") + + # LLM already assigned asset_path to each scene in generate_content + # Just convert to matched_scenes format for downstream compatibility + context.matched_scenes = [ + { + **scene, + "matched_asset": scene["asset_path"] # Alias for compatibility + } + for scene in context.script + ] + + # Log asset usage summary + asset_usage = {} + for scene in context.matched_scenes: + asset = scene["matched_asset"] + asset_usage[asset] = asset_usage.get(asset, 0) + 1 + + logger.info(f"📊 Asset usage summary:") + for asset_path, count in asset_usage.items(): + logger.info(f" {Path(asset_path).name}: {count} scene(s)") + + return context + + async def initialize_storyboard(self, context: PipelineContext) -> PipelineContext: + """ + Initialize storyboard from matched scenes + + Args: + context: Pipeline context + + Returns: + Updated context with storyboard + """ + from pixelle_video.models.storyboard import ( + Storyboard, + StoryboardFrame, + StoryboardConfig + ) + from datetime import datetime + + # Extract all narrations in order for compatibility + all_narrations = [] + for scene in context.matched_scenes: + narrations = scene.get("narrations", [scene.get("narration", "")]) + if isinstance(narrations, str): + narrations = [narrations] + all_narrations.extend(narrations) + + context.narrations = all_narrations + + # Get template dimensions + template_name = context.params.get("frame_template", "1080x1920/image_default.html") + # Extract dimensions from template name (e.g., "1080x1920") + try: + dims = template_name.split("/")[0].split("x") + media_width = int(dims[0]) + media_height = int(dims[1]) + except: + # Default to 1080x1920 + media_width = 1080 + media_height = 1920 + + # Create StoryboardConfig + context.config = StoryboardConfig( + task_id=context.task_id, + n_storyboard=len(context.matched_scenes), # Number of scenes + min_narration_words=5, + max_narration_words=50, + video_fps=30, + tts_inference_mode="local", + voice_id=context.params.get("voice_id", "zh-CN-YunjianNeural"), + tts_speed=context.params.get("tts_speed", 1.2), + media_width=media_width, + media_height=media_height, + frame_template=template_name, + template_params=context.params.get("template_params") + ) + + # Create Storyboard + context.storyboard = Storyboard( + title=context.title, + config=context.config, + created_at=datetime.now() + ) + + # Create StoryboardFrames - one per scene + for i, scene in enumerate(context.matched_scenes): + # Get first narration for the frame (we'll combine audios later) + narrations = scene.get("narrations", [scene.get("narration", "")]) + if isinstance(narrations, str): + narrations = [narrations] + + # Use first narration as the main text (for subtitle) + # We'll combine all narrations in the audio + main_narration = " ".join(narrations) # Combine for subtitle display + + frame = StoryboardFrame( + index=i, + narration=main_narration, + image_prompt=None, # We're using user assets, not generating images + created_at=datetime.now() + ) + + # Store matched asset path in the frame + frame.image_path = scene["matched_asset"] + frame.media_type = "image" + + # Store scene info for later audio generation + frame._scene_data = scene # Temporary storage for multi-narration + + context.storyboard.frames.append(frame) + + logger.info(f"✅ Created storyboard with {len(context.storyboard.frames)} scenes") + + return context + + async def produce_assets(self, context: PipelineContext) -> PipelineContext: + """ + Generate scene videos using FrameProcessor (asset + multiple narrations + template) + + Args: + context: Pipeline context + + Returns: + Updated context with processed frames + """ + logger.info("🎬 Producing scene videos...") + + storyboard = context.storyboard + config = context.config + + for i, frame in enumerate(storyboard.frames, 1): + logger.info(f"Producing scene {i}/{len(storyboard.frames)}...") + + # Get scene data with narrations + scene = frame._scene_data + narrations = scene.get("narrations", [scene.get("narration", "")]) + if isinstance(narrations, str): + narrations = [narrations] + + logger.info(f"Scene {i} has {len(narrations)} narration(s)") + + # Step 1: Generate audio for each narration and combine + narration_audios = [] + for j, narration_text in enumerate(narrations, 1): + audio_path = Path(context.task_dir) / "frames" / f"{i:02d}_narration_{j}.mp3" + audio_path.parent.mkdir(parents=True, exist_ok=True) + + await self.core.tts( + text=narration_text, + output_path=str(audio_path), + voice_id=config.voice_id, + speed=config.tts_speed + ) + + narration_audios.append(str(audio_path)) + logger.debug(f" Narration {j}/{len(narrations)}: {narration_text[:30]}...") + + # Concatenate all narration audios for this scene + if len(narration_audios) > 1: + from pixelle_video.utils.os_util import get_task_frame_path + + combined_audio_path = Path(context.task_dir) / "frames" / f"{i:02d}_audio.mp3" + + # Use FFmpeg to concatenate audio files + import subprocess + + # Create a file list for FFmpeg concat + filelist_path = Path(context.task_dir) / "frames" / f"{i:02d}_audiolist.txt" + with open(filelist_path, 'w') as f: + for audio_file in narration_audios: + escaped_path = str(Path(audio_file).absolute()).replace("'", "'\\''") + f.write(f"file '{escaped_path}'\n") + + # Concatenate audio files + concat_cmd = [ + 'ffmpeg', + '-f', 'concat', + '-safe', '0', + '-i', str(filelist_path), + '-c', 'copy', + '-y', + str(combined_audio_path) + ] + + subprocess.run(concat_cmd, check=True, capture_output=True) + frame.audio_path = str(combined_audio_path) + + logger.info(f"✅ Combined {len(narration_audios)} narrations into one audio") + else: + frame.audio_path = narration_audios[0] + + # Step 2: Use FrameProcessor to generate composed frame and video + # FrameProcessor will handle: + # - Template rendering (with proper dimensions) + # - Subtitle composition + # - Video segment creation + # - Proper file naming in frames/ + + # Since we already have the audio and image, we bypass some steps + # by manually calling the composition steps + + # Get audio duration for frame duration + import subprocess + duration_cmd = [ + 'ffprobe', + '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', + frame.audio_path + ] + duration_result = subprocess.run(duration_cmd, capture_output=True, text=True, check=True) + frame.duration = float(duration_result.stdout.strip()) + + # Use FrameProcessor for proper composition + processed_frame = await self.core.frame_processor( + frame=frame, + storyboard=storyboard, + config=config, + total_frames=len(storyboard.frames) + ) + + logger.success(f"✅ Scene {i} complete") + + return context + + async def post_production(self, context: PipelineContext) -> PipelineContext: + """ + Concatenate scene videos and add BGM + + Args: + context: Pipeline context + + Returns: + Updated context with final video path + """ + logger.info("🎞️ Concatenating scenes...") + + # Collect video segments from storyboard frames + scene_videos = [frame.video_segment_path for frame in context.storyboard.frames] + + final_video_path = Path(context.task_dir) / f"{context.title}.mp4" + + # Get BGM parameters + bgm_path = context.request.get("bgm_path") + bgm_volume = context.request.get("bgm_volume", 0.2) + bgm_mode = context.request.get("bgm_mode", "loop") + + if bgm_path: + logger.info(f"🎵 Adding BGM: {bgm_path} (volume={bgm_volume}, mode={bgm_mode})") + + self.core.video.concat_videos( + videos=scene_videos, + output=str(final_video_path), + bgm_path=bgm_path, + bgm_volume=bgm_volume, + bgm_mode=bgm_mode + ) + + context.final_video_path = str(final_video_path) + context.storyboard.final_video_path = str(final_video_path) + + logger.success(f"✅ Final video: {final_video_path}") + + return context + + async def finalize(self, context: PipelineContext) -> PipelineContext: + """ + Finalize and return result + + Args: + context: Pipeline context + + Returns: + Final context + """ + logger.success(f"🎉 Asset-based video generation complete!") + logger.info(f"Video: {context.final_video_path}") + + return context + + # Helper methods + + def _get_asset_type(self, path: Path) -> str: + """Determine asset type from file extension""" + image_exts = {".jpg", ".jpeg", ".png", ".gif", ".webp"} + video_exts = {".mp4", ".mov", ".avi", ".mkv", ".webm"} + + ext = path.suffix.lower() + + if ext in image_exts: + return "image" + elif ext in video_exts: + return "video" + else: + return "unknown" + diff --git a/pixelle_video/service.py b/pixelle_video/service.py index 6de4f50..a006be4 100644 --- a/pixelle_video/service.py +++ b/pixelle_video/service.py @@ -27,12 +27,14 @@ from pixelle_video.config import config_manager from pixelle_video.services.llm_service import LLMService from pixelle_video.services.tts_service import TTSService from pixelle_video.services.media import MediaService +from pixelle_video.services.image_analysis import ImageAnalysisService from pixelle_video.services.video import VideoService from pixelle_video.services.frame_processor import FrameProcessor from pixelle_video.services.persistence import PersistenceService from pixelle_video.services.history_manager import HistoryManager from pixelle_video.pipelines.standard import StandardPipeline from pixelle_video.pipelines.custom import CustomPipeline +from pixelle_video.pipelines.asset_based import AssetBasedPipeline class PixelleVideoCore: @@ -184,9 +186,12 @@ class PixelleVideoCore: logger.info("🚀 Initializing Pixelle-Video...") # 1. Initialize core services (ComfyKit will be lazy-loaded later) + # Initialize services self.llm = LLMService(self.config) - self.tts = TTSService(self.config, self) - self.media = MediaService(self.config, self) + self.tts = TTSService(self.config, core=self) + self.media = MediaService(self.config, core=self) + self.image = self.media # Alias for backward compatibility + self.image_analysis = ImageAnalysisService(self.config, core=self) self.video = VideoService() self.frame_processor = FrameProcessor(self) self.persistence = PersistenceService(output_dir="output") @@ -196,6 +201,7 @@ class PixelleVideoCore: self.pipelines = { "standard": StandardPipeline(self), "custom": CustomPipeline(self), + "asset_based": AssetBasedPipeline(self), } logger.info(f"📹 Registered pipelines: {', '.join(self.pipelines.keys())}") diff --git a/pixelle_video/services/frame_processor.py b/pixelle_video/services/frame_processor.py index c26a9fb..30d16e0 100644 --- a/pixelle_video/services/frame_processor.py +++ b/pixelle_video/services/frame_processor.py @@ -73,23 +73,29 @@ class FrameProcessor: frame_num = frame.index + 1 # Determine if this frame needs image generation - needs_image = frame.image_prompt is not None + # If image_path is already set (e.g. asset-based pipeline), we consider it "needs image" but skip generation + has_existing_image = frame.image_path is not None + needs_generation = frame.image_prompt is not None try: # Step 1: Generate audio (TTS) - if progress_callback: - progress_callback(ProgressEvent( - event_type="frame_step", - progress=0.0, - frame_current=frame_num, - frame_total=total_frames, - step=1, - action="audio" - )) - await self._step_generate_audio(frame, config) + if not frame.audio_path: + if progress_callback: + progress_callback(ProgressEvent( + event_type="frame_step", + progress=0.0, + frame_current=frame_num, + frame_total=total_frames, + step=1, + action="audio" + )) + await self._step_generate_audio(frame, config) + else: + logger.debug(f" 1/4: Using existing audio: {frame.audio_path}") # Step 2: Generate media (image or video, conditional) - if needs_image: + # Step 2: Generate media (image or video, conditional) + if needs_generation: if progress_callback: progress_callback(ProgressEvent( event_type="frame_step", @@ -100,16 +106,18 @@ class FrameProcessor: action="media" )) await self._step_generate_media(frame, config) + elif has_existing_image: + logger.debug(f" 2/4: Using existing image: {frame.image_path}") else: frame.image_path = None frame.media_type = None logger.debug(f" 2/4: Skipped media generation (not required by template)") - + # Step 3: Compose frame (add subtitle) if progress_callback: progress_callback(ProgressEvent( event_type="frame_step", - progress=0.50 if needs_image else 0.33, + progress=0.50 if (needs_generation or has_existing_image) else 0.33, frame_current=frame_num, frame_total=total_frames, step=3, @@ -121,17 +129,18 @@ class FrameProcessor: if progress_callback: progress_callback(ProgressEvent( event_type="frame_step", - progress=0.75 if needs_image else 0.67, + progress=0.75 if (needs_generation or has_existing_image) else 0.67, frame_current=frame_num, frame_total=total_frames, step=4, action="video" )) + await self._step_create_video_segment(frame, config) logger.info(f"✅ Frame {frame.index} completed") return frame - + except Exception as e: logger.error(f"❌ Failed to process frame {frame.index}: {e}") raise @@ -303,6 +312,9 @@ class FrameProcessor: # Generate frame using HTML (size is auto-parsed from template path) generator = HTMLFrameGenerator(template_path) + + logger.debug(f"Generating frame with image: '{frame.image_path}' (type: {type(frame.image_path)})") + composed_path = await generator.generate_frame( title=storyboard.title, text=frame.narration, diff --git a/pixelle_video/services/image_analysis.py b/pixelle_video/services/image_analysis.py new file mode 100644 index 0000000..e87f06a --- /dev/null +++ b/pixelle_video/services/image_analysis.py @@ -0,0 +1,197 @@ +# Copyright (C) 2025 AIDC-AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Image Analysis Service - ComfyUI Workflow-based implementation + +Uses Florence-2 or other vision models to analyze images and generate descriptions. +""" + +from typing import Optional, Literal +from pathlib import Path + +from comfykit import ComfyKit +from loguru import logger + +from pixelle_video.services.comfy_base_service import ComfyBaseService + + +class ImageAnalysisService(ComfyBaseService): + """ + Image analysis service - Workflow-based + + Uses ComfyKit to execute image analysis workflows (e.g., Florence-2, BLIP, etc.). + Returns detailed textual descriptions of images. + + Convention: workflows follow {source}/analyse_image.json pattern + - runninghub/analyse_image.json (default, cloud-based) + - selfhost/analyse_image.json (local ComfyUI) + + Usage: + # Use default (runninghub cloud) + description = await pixelle_video.image_analysis("path/to/image.jpg") + + # Use local ComfyUI + description = await pixelle_video.image_analysis( + "path/to/image.jpg", + source="selfhost" + ) + + # List available workflows + workflows = pixelle_video.image_analysis.list_workflows() + """ + + WORKFLOW_PREFIX = "analyse_" + WORKFLOWS_DIR = "workflows" + + def __init__(self, config: dict, core=None): + """ + Initialize image analysis service + + Args: + config: Full application config dict + core: PixelleVideoCore instance (for accessing shared ComfyKit) + """ + super().__init__(config, service_name="image_analysis", core=core) + + async def __call__( + self, + image_path: str, + # Workflow source selection + source: Literal['runninghub', 'selfhost'] = 'runninghub', + workflow: Optional[str] = None, + # ComfyUI connection (optional overrides) + comfyui_url: Optional[str] = None, + runninghub_api_key: Optional[str] = None, + # Additional workflow parameters + **params + ) -> str: + """ + Analyze an image using workflow + + Args: + image_path: Path to the image file (local or URL) + source: Workflow source - 'runninghub' (cloud, default) or 'selfhost' (local ComfyUI) + workflow: Workflow filename (optional, overrides source-based resolution) + comfyui_url: ComfyUI URL (optional, overrides config) + runninghub_api_key: RunningHub API key (optional, overrides config) + **params: Additional workflow parameters + + Returns: + str: Text description of the image + + Examples: + # Simplest: use default (runninghub cloud) + description = await pixelle_video.image_analysis("temp/06.JPG") + + # Use local ComfyUI + description = await pixelle_video.image_analysis( + "temp/06.JPG", + source="selfhost" + ) + + # Use specific workflow (bypass source-based resolution) + description = await pixelle_video.image_analysis( + "temp/06.JPG", + workflow="selfhost/custom_analysis.json" + ) + """ + from pixelle_video.utils.workflow_util import resolve_workflow_path + + # 1. Validate image path + image_path_obj = Path(image_path) + if not image_path_obj.exists(): + raise FileNotFoundError(f"Image file not found: {image_path}") + + # 2. Resolve workflow path using convention + if workflow is None: + # Use standardized naming: {source}/analyse_image.json + workflow = resolve_workflow_path("analyse_image", source) + logger.info(f"Using {source} workflow: {workflow}") + + # 2. Resolve workflow (returns structured info) + workflow_info = self._resolve_workflow(workflow=workflow) + + # 3. Build workflow parameters + workflow_params = { + "image": str(image_path) # Pass image path to workflow + } + + # Add any additional parameters + workflow_params.update(params) + + logger.debug(f"Workflow parameters: {workflow_params}") + + # 4. Execute workflow using shared ComfyKit instance from core + try: + # Get shared ComfyKit instance (lazy initialization + config hot-reload) + kit = await self.core._get_or_create_comfykit() + + # Determine what to pass to ComfyKit based on source + if workflow_info["source"] == "runninghub" and "workflow_id" in workflow_info: + # RunningHub: pass workflow_id + workflow_input = workflow_info["workflow_id"] + logger.info(f"Executing RunningHub workflow: {workflow_input}") + else: + # Selfhost: pass file path + workflow_input = workflow_info["path"] + logger.info(f"Executing selfhost workflow: {workflow_input}") + + result = await kit.execute(workflow_input, workflow_params) + + # 5. Extract description from result + if result.status != "completed": + error_msg = result.msg or "Unknown error" + logger.error(f"Image analysis failed: {error_msg}") + raise Exception(f"Image analysis failed: {error_msg}") + + # Extract text description from result (format varies by source) + description = None + + # Try format 1: Selfhost outputs (direct text in outputs) + # Format: {'6': {'text': ['description text']}} + if result.outputs: + for node_id, node_output in result.outputs.items(): + if 'text' in node_output: + text_list = node_output['text'] + if text_list and len(text_list) > 0: + description = text_list[0] + break + + # Try format 2: RunningHub raw_data (text file URL) + # Format: {'raw_data': [{'fileUrl': 'https://...txt', 'fileType': 'txt', ...}]} + if not description and result.outputs and 'raw_data' in result.outputs: + raw_data = result.outputs['raw_data'] + if raw_data and len(raw_data) > 0: + # Find text file entry + for item in raw_data: + if item.get('fileType') == 'txt' and 'fileUrl' in item: + # Download text content from URL + import aiohttp + async with aiohttp.ClientSession() as session: + async with session.get(item['fileUrl']) as resp: + if resp.status == 200: + description = await resp.text() + description = description.strip() + break + + if not description: + logger.error(f"No text found in outputs: {result.outputs}") + raise Exception("No description generated") + + logger.info(f"✅ Image analyzed: {description[:100]}...") + + return description + + except Exception as e: + logger.error(f"Image analysis error: {e}") + raise diff --git a/pixelle_video/services/llm_service.py b/pixelle_video/services/llm_service.py index ece8708..30cd31c 100644 --- a/pixelle_video/services/llm_service.py +++ b/pixelle_video/services/llm_service.py @@ -12,15 +12,22 @@ """ LLM (Large Language Model) Service - Direct OpenAI SDK implementation + +Supports structured output via response_type parameter (Pydantic model). """ -import os -from typing import Optional +import json +import re +from typing import Optional, Type, TypeVar, Union from openai import AsyncOpenAI +from pydantic import BaseModel from loguru import logger +T = TypeVar("T", bound=BaseModel) + + class LLMService: """ LLM (Large Language Model) service @@ -114,8 +121,9 @@ class LLMService: model: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 2000, + response_type: Optional[Type[T]] = None, **kwargs - ) -> str: + ) -> Union[str, T]: """ Generate text using LLM @@ -126,24 +134,28 @@ class LLMService: model: Model name (optional, uses config if not provided) temperature: Sampling temperature (0.0-2.0). Lower is more deterministic. max_tokens: Maximum tokens to generate + response_type: Optional Pydantic model class for structured output. + If provided, returns parsed model instance instead of string. **kwargs: Additional provider-specific parameters Returns: - Generated text + Generated text (str) or parsed Pydantic model instance (if response_type provided) Examples: - # Use config from config.yaml + # Basic text generation answer = await pixelle_video.llm("Explain atomic habits") - # Override with custom parameters - answer = await pixelle_video.llm( - prompt="Explain atomic habits in 3 sentences", - api_key="sk-custom-key", - base_url="https://api.custom.com/v1", - model="custom-model", - temperature=0.7, - max_tokens=500 + # Structured output with Pydantic model + class MovieReview(BaseModel): + title: str + rating: int + summary: str + + review = await pixelle_video.llm( + prompt="Review the movie Inception", + response_type=MovieReview ) + print(review.title) # Structured access """ # Create client (new instance each time to support parameter overrides) client = self._create_client(api_key=api_key, base_url=base_url) @@ -155,25 +167,143 @@ class LLMService: or "gpt-3.5-turbo" # Default fallback ) - logger.debug(f"LLM call: model={final_model}, base_url={client.base_url}") + logger.debug(f"LLM call: model={final_model}, base_url={client.base_url}, response_type={response_type}") try: - response = await client.chat.completions.create( - model=final_model, + if response_type is not None: + # Structured output mode - try beta.chat.completions.parse first + return await self._call_with_structured_output( + client=client, + model=final_model, + prompt=prompt, + response_type=response_type, + temperature=temperature, + max_tokens=max_tokens, + **kwargs + ) + else: + # Standard text output mode + response = await client.chat.completions.create( + model=final_model, + messages=[{"role": "user", "content": prompt}], + temperature=temperature, + max_tokens=max_tokens, + **kwargs + ) + + result = response.choices[0].message.content + logger.debug(f"LLM response length: {len(result)} chars") + + return result + + except Exception as e: + logger.error(f"LLM call error (model={final_model}, base_url={client.base_url}): {e}") + raise + + async def _call_with_structured_output( + self, + client: AsyncOpenAI, + model: str, + prompt: str, + response_type: Type[T], + temperature: float, + max_tokens: int, + **kwargs + ) -> T: + """ + Call LLM with structured output support + + Tries OpenAI beta.chat.completions.parse first, falls back to JSON parsing + if the provider doesn't support structured outputs. + + Args: + client: OpenAI client + model: Model name + prompt: The prompt + response_type: Pydantic model class + temperature: Sampling temperature + max_tokens: Max tokens + **kwargs: Additional parameters + + Returns: + Parsed Pydantic model instance + """ + # Try OpenAI structured output API first (beta.chat.completions.parse) + try: + response = await client.beta.chat.completions.parse( + model=model, messages=[{"role": "user", "content": prompt}], + response_format=response_type, temperature=temperature, max_tokens=max_tokens, **kwargs ) - result = response.choices[0].message.content - logger.debug(f"LLM response length: {len(result)} chars") + parsed = response.choices[0].message.parsed + if parsed is not None: + logger.debug(f"Structured output parsed successfully via beta API") + return parsed + + # If parsed is None, fall through to fallback + logger.warning("Structured output API returned None, falling back to JSON parsing") + content = response.choices[0].message.content - return result - except Exception as e: - logger.error(f"LLM call error (model={final_model}, base_url={client.base_url}): {e}") - raise + # If beta API not supported, fall back to JSON mode + logger.debug(f"Structured output API not available ({e}), falling back to JSON parsing") + + response = await client.chat.completions.create( + model=model, + messages=[{"role": "user", "content": prompt}], + temperature=temperature, + max_tokens=max_tokens, + **kwargs + ) + content = response.choices[0].message.content + + # Fallback: Parse JSON from response content + return self._parse_response_as_model(content, response_type) + + def _parse_response_as_model(self, content: str, response_type: Type[T]) -> T: + """ + Parse LLM response content as Pydantic model + + Args: + content: Raw LLM response text + response_type: Target Pydantic model class + + Returns: + Parsed model instance + """ + # Try direct JSON parsing first + try: + data = json.loads(content) + return response_type.model_validate(data) + except json.JSONDecodeError: + pass + + # Try extracting from markdown code block + json_pattern = r'```(?:json)?\s*([\s\S]+?)\s*```' + match = re.search(json_pattern, content, re.DOTALL) + if match: + try: + data = json.loads(match.group(1)) + return response_type.model_validate(data) + except json.JSONDecodeError: + pass + + # Try to find any JSON object in the text + brace_start = content.find('{') + brace_end = content.rfind('}') + if brace_start != -1 and brace_end > brace_start: + try: + json_str = content[brace_start:brace_end + 1] + data = json.loads(json_str) + return response_type.model_validate(data) + except json.JSONDecodeError: + pass + + raise ValueError(f"Failed to parse LLM response as {response_type.__name__}: {content[:200]}...") @property def active(self) -> str: diff --git a/pixelle_video/utils/workflow_util.py b/pixelle_video/utils/workflow_util.py new file mode 100644 index 0000000..b4b816d --- /dev/null +++ b/pixelle_video/utils/workflow_util.py @@ -0,0 +1,67 @@ +# Copyright (C) 2025 AIDC-AI +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow Path Resolver + +Standardized workflow path resolution for all ComfyUI services. +Convention: {source}/{service}.json + +Examples: + - Image analysis: selfhost/analyse_image.json, runninghub/analyse_image.json + - Image generation: selfhost/image.json, runninghub/image.json + - Video generation: selfhost/video.json, runninghub/video.json + - TTS: selfhost/tts.json, runninghub/tts.json +""" + +from typing import Literal + +WorkflowSource = Literal['runninghub', 'selfhost'] + + +def resolve_workflow_path( + service_name: str, + source: WorkflowSource = 'runninghub' +) -> str: + """ + Resolve workflow path using standardized naming convention + + Convention: workflows/{source}/{service_name}.json + + Args: + service_name: Service identifier (e.g., "analyse_image", "image", "video", "tts") + source: Workflow source - 'runninghub' (default) or 'selfhost' + + Returns: + Workflow path in format: "{source}/{service_name}.json" + + Examples: + >>> resolve_workflow_path("analyse_image", "runninghub") + 'runninghub/analyse_image.json' + + >>> resolve_workflow_path("analyse_image", "selfhost") + 'selfhost/analyse_image.json' + + >>> resolve_workflow_path("image") # defaults to runninghub + 'runninghub/image.json' + """ + return f"{source}/{service_name}.json" + + +def get_default_source() -> WorkflowSource: + """ + Get default workflow source + + Returns: + 'runninghub' - Cloud-first approach, better for beginners + """ + return 'runninghub' diff --git a/workflows/runninghub/analyse_image.json b/workflows/runninghub/analyse_image.json new file mode 100644 index 0000000..76b3a43 --- /dev/null +++ b/workflows/runninghub/analyse_image.json @@ -0,0 +1,4 @@ +{ + "source": "runninghub", + "workflow_id": "1996069253201739777" +} \ No newline at end of file diff --git a/workflows/selfhost/analyse_image.json b/workflows/selfhost/analyse_image.json new file mode 100644 index 0000000..cafdd27 --- /dev/null +++ b/workflows/selfhost/analyse_image.json @@ -0,0 +1,61 @@ +{ + "3": { + "inputs": { + "model": "microsoft/Florence-2-large", + "precision": "fp16", + "attention": "sdpa", + "convert_to_safetensors": false + }, + "class_type": "DownloadAndLoadFlorence2Model", + "_meta": { + "title": "DownloadAndLoadFlorence2Model" + } + }, + "4": { + "inputs": { + "text_input": "", + "task": "more_detailed_caption", + "fill_mask": true, + "keep_model_loaded": false, + "max_new_tokens": 1024, + "num_beams": 3, + "do_sample": true, + "output_mask_select": "", + "seed": 853848678279928, + "image": [ + "5", + 0 + ], + "florence2_model": [ + "3", + 0 + ] + }, + "class_type": "Florence2Run", + "_meta": { + "title": "Florence2Run" + } + }, + "5": { + "inputs": { + "image": "06.JPG" + }, + "class_type": "LoadImage", + "_meta": { + "title": "$image.image" + } + }, + "6": { + "inputs": { + "text": "The image shows a white cat sitting on a black and white striped stool against a white wall. The cat is wearing a blue knitted sweater and is looking directly at the camera with a curious expression. Its ears are perked up and its eyes are wide open, giving it an alert and inquisitive look. The background is plain white, making the cat the focal point of the image.", + "anything": [ + "4", + 2 + ] + }, + "class_type": "easy showAnything", + "_meta": { + "title": "Show Any" + } + } +} \ No newline at end of file