分镜支持视频功能

This commit is contained in:
puke
2025-11-11 20:38:31 +08:00
parent cf9321feac
commit 0e2b6b17d0
17 changed files with 1225 additions and 321 deletions

View File

@@ -18,7 +18,7 @@ Core services providing atomic capabilities.
Services:
- LLMService: LLM text generation
- TTSService: Text-to-speech
- ImageService: Image generation
- MediaService: Media generation (image & video)
- VideoService: Video processing
- FrameProcessor: Frame processing orchestrator
- ComfyBaseService: Base class for ComfyUI-based services
@@ -27,15 +27,19 @@ Services:
from pixelle_video.services.comfy_base_service import ComfyBaseService
from pixelle_video.services.llm_service import LLMService
from pixelle_video.services.tts_service import TTSService
from pixelle_video.services.image import ImageService
from pixelle_video.services.media import MediaService
from pixelle_video.services.video import VideoService
from pixelle_video.services.frame_processor import FrameProcessor
# Backward compatibility alias
ImageService = MediaService
__all__ = [
"ComfyBaseService",
"LLMService",
"TTSService",
"ImageService",
"MediaService",
"ImageService", # Backward compatibility
"VideoService",
"FrameProcessor",
]

View File

@@ -84,7 +84,7 @@ class FrameProcessor:
))
await self._step_generate_audio(frame, config)
# Step 2: Generate image (conditional)
# Step 2: Generate media (image or video, conditional)
if needs_image:
if progress_callback:
progress_callback(ProgressEvent(
@@ -93,12 +93,13 @@ class FrameProcessor:
frame_current=frame_num,
frame_total=total_frames,
step=2,
action="image"
action="media"
))
await self._step_generate_image(frame, config)
await self._step_generate_media(frame, config)
else:
frame.image_path = None
logger.debug(f" 2/4: Skipped image generation (not required by template)")
frame.media_type = None
logger.debug(f" 2/4: Skipped media generation (not required by template)")
# Step 3: Compose frame (add subtitle)
if progress_callback:
@@ -176,27 +177,66 @@ class FrameProcessor:
logger.debug(f" ✓ Audio generated: {audio_path} ({frame.duration:.2f}s)")
async def _step_generate_media(
    self,
    frame: StoryboardFrame,
    config: StoryboardConfig
):
    """
    Step 2: Generate media (image or video) using ComfyKit

    The media type is derived from the configured workflow: a workflow whose
    file name starts with "video_" is treated as a video workflow, anything
    else (including no workflow at all) as an image workflow. The generated
    media is downloaded to a local file and recorded on the frame
    (image_path or video_path, together with media_type).

    Args:
        frame: Storyboard frame to generate media for (updated in place)
        config: Storyboard configuration (workflow, image size, task_id)

    Raises:
        ValueError: If the media result is neither an image nor a video
    """
    logger.debug(f" 2/4: Generating media for frame {frame.index}...")

    # Determine media type based on workflow.
    # Only the file name is inspected (the workflow may be given as
    # "source/name" or as a path), and "video_" must be a prefix: a plain
    # substring test would misclassify names such as "image_video_style.json".
    workflow_name = config.image_workflow or ""
    workflow_file = workflow_name.replace("\\", "/").rsplit("/", 1)[-1].lower()
    is_video_workflow = workflow_file.startswith("video_")
    media_type = "video" if is_video_workflow else "image"

    logger.debug(f" → Media type: {media_type} (workflow: {workflow_name})")

    # Call Media generation (with optional preset)
    media_result = await self.core.media(
        prompt=frame.image_prompt,
        workflow=config.image_workflow,  # Pass workflow from config (None = use default)
        media_type=media_type,
        width=config.image_width,
        height=config.image_height
    )

    # Store media type as reported by the media service
    frame.media_type = media_result.media_type

    if media_result.is_image:
        # Download image to local (pass task_id)
        local_path = await self._download_media(
            media_result.url,
            frame.index,
            config.task_id,
            media_type="image"
        )
        frame.image_path = local_path
        logger.debug(f" ✓ Image generated: {local_path}")

    elif media_result.is_video:
        # Download video to local (pass task_id)
        local_path = await self._download_media(
            media_result.url,
            frame.index,
            config.task_id,
            media_type="video"
        )
        frame.video_path = local_path

        # Prefer the duration reported with the result; otherwise read it
        # from the downloaded file
        if media_result.duration:
            frame.duration = media_result.duration
        else:
            frame.duration = await self._get_video_duration(local_path)
        logger.debug(f" ✓ Video generated: {local_path} (duration: {frame.duration:.2f}s)")

    else:
        raise ValueError(f"Unknown media type: {media_result.media_type}")
async def _step_compose_frame(
self,
@@ -211,7 +251,9 @@ class FrameProcessor:
from pixelle_video.utils.os_util import get_task_frame_path
output_path = get_task_frame_path(config.task_id, frame.index, "composed")
# Use HTML template to compose frame
# For video type: render HTML as transparent overlay image
# For image type: render HTML with image background
# In both cases, we need the composed image
composed_path = await self._compose_frame_html(frame, storyboard, config, output_path)
frame.composed_image_path = composed_path
@@ -264,23 +306,60 @@ class FrameProcessor:
frame: StoryboardFrame,
config: StoryboardConfig
):
"""Step 4: Create video segment from image + audio"""
"""Step 4: Create video segment from media + audio"""
logger.debug(f" 4/4: Creating video segment for frame {frame.index}...")
# Generate output path using task_id
from pixelle_video.utils.os_util import get_task_frame_path
output_path = get_task_frame_path(config.task_id, frame.index, "segment")
# Call video compositor to create video from image + audio
from pixelle_video.services.video import VideoService
video_service = VideoService()
segment_path = video_service.create_video_from_image(
image=frame.composed_image_path,
audio=frame.audio_path,
output=output_path,
fps=config.video_fps
)
# Branch based on media type
if frame.media_type == "video":
# Video workflow: overlay HTML template on video, then add audio
logger.debug(f" → Using video-based composition with HTML overlay")
# Step 1: Overlay transparent HTML image on video
# The composed_image_path contains the rendered HTML with transparent background
temp_video_with_overlay = get_task_frame_path(config.task_id, frame.index, "video") + "_overlay.mp4"
video_service.overlay_image_on_video(
video=frame.video_path,
overlay_image=frame.composed_image_path,
output=temp_video_with_overlay,
scale_mode="contain" # Scale video to fit template size (contain mode)
)
# Step 2: Add narration audio to the overlaid video
# Note: The video might have audio (replaced) or be silent (audio added)
segment_path = video_service.merge_audio_video(
video=temp_video_with_overlay,
audio=frame.audio_path,
output=output_path,
replace_audio=True, # Replace video audio with narration
audio_volume=1.0
)
# Clean up temp file
import os
if os.path.exists(temp_video_with_overlay):
os.unlink(temp_video_with_overlay)
elif frame.media_type == "image" or frame.media_type is None:
# Image workflow: create video from image + audio
logger.debug(f" → Using image-based composition")
segment_path = video_service.create_video_from_image(
image=frame.composed_image_path,
audio=frame.audio_path,
output=output_path,
fps=config.video_fps
)
else:
raise ValueError(f"Unknown media type: {frame.media_type}")
frame.video_segment_path = segment_path
@@ -303,10 +382,16 @@ class FrameProcessor:
estimated_duration = file_size / 2000
return max(1.0, estimated_duration) # At least 1 second
async def _download_image(self, url: str, frame_index: int, task_id: str) -> str:
"""Download image from URL to local file"""
async def _download_media(
self,
url: str,
frame_index: int,
task_id: str,
media_type: str
) -> str:
"""Download media (image or video) from URL to local file"""
from pixelle_video.utils.os_util import get_task_frame_path
output_path = get_task_frame_path(task_id, frame_index, "image")
output_path = get_task_frame_path(task_id, frame_index, media_type)
async with httpx.AsyncClient() as client:
response = await client.get(url)
@@ -316,4 +401,16 @@ class FrameProcessor:
f.write(response.content)
return output_path
async def _get_video_duration(self, video_path: str) -> float:
    """
    Get video duration in seconds

    Args:
        video_path: Path of the video file to probe

    Returns:
        Duration read from the container's format metadata, or 1.0 when
        the file cannot be probed (ffmpeg missing, unreadable file,
        missing duration field)
    """
    try:
        import ffmpeg
        probe = ffmpeg.probe(video_path)
        return float(probe['format']['duration'])
    except Exception as e:
        # Best-effort: a probe failure must not abort frame processing.
        # The message states the value actually returned (the previous text
        # claimed the audio duration was used, which this method never did).
        logger.warning(f"Failed to get video duration: {e}, defaulting to 1.0s")
        return 1.0  # Default to 1 second if unable to determine

View File

@@ -1,192 +0,0 @@
# Copyright (C) 2025 AIDC-AI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Image Generation Service - ComfyUI Workflow-based implementation
"""
from typing import Optional
from comfykit import ComfyKit
from loguru import logger
from pixelle_video.services.comfy_base_service import ComfyBaseService
class ImageService(ComfyBaseService):
    """
    Workflow-driven image generation service

    Executes an image workflow through ComfyKit and returns the first image
    the workflow produced.

    Usage:
        # Let the service resolve the workflow
        image_url = await pixelle_video.image(prompt="a cat")

        # Name the workflow explicitly
        image_url = await pixelle_video.image(
            prompt="a cat",
            workflow="image_flux.json"
        )

        # List available workflows
        workflows = pixelle_video.image.list_workflows()
    """

    WORKFLOW_PREFIX = "image_"
    DEFAULT_WORKFLOW = None  # No hardcoded default, must be configured
    WORKFLOWS_DIR = "workflows"

    def __init__(self, config: dict):
        """
        Initialize image service

        Args:
            config: Full application config dict
        """
        super().__init__(config, service_name="image")

    async def __call__(
        self,
        prompt: str,
        workflow: Optional[str] = None,
        # ComfyUI connection (optional overrides)
        comfyui_url: Optional[str] = None,
        runninghub_api_key: Optional[str] = None,
        # Common workflow parameters
        width: Optional[int] = None,
        height: Optional[int] = None,
        negative_prompt: Optional[str] = None,
        steps: Optional[int] = None,
        seed: Optional[int] = None,
        cfg: Optional[float] = None,
        sampler: Optional[str] = None,
        **params
    ) -> str:
        """
        Generate an image by running a workflow

        Args:
            prompt: Image generation prompt
            workflow: Workflow to run (resolved by _resolve_workflow; None
                lets the resolver pick)
            comfyui_url: ComfyUI URL (optional, overrides config)
            runninghub_api_key: RunningHub API key (optional, overrides config)
            width: Image width
            height: Image height
            negative_prompt: Negative prompt
            steps: Sampling steps
            seed: Random seed
            cfg: CFG scale
            sampler: Sampler name
            **params: Additional workflow parameters, forwarded as-is

        Returns:
            The first entry of the workflow result's images

        Raises:
            Exception: If the workflow does not complete or yields no images

        Examples:
            image_url = await pixelle_video.image(prompt="a beautiful cat")

            image_url = await pixelle_video.image(
                prompt="a cat",
                workflow="image_flux.json",
                width=1024,
                height=1024,
                steps=20,
                seed=42
            )

            image_url = await pixelle_video.image(
                prompt="a cat",
                comfyui_url="http://192.168.1.100:8188"
            )
        """
        # 1. Resolve workflow (returns structured info)
        workflow_info = self._resolve_workflow(workflow=workflow)

        # 2. Prepare ComfyKit config (supports both selfhost and runninghub)
        kit_config = self._prepare_comfykit_config(
            comfyui_url=comfyui_url,
            runninghub_api_key=runninghub_api_key
        )

        # 3. Build workflow parameters: the prompt, then every optional
        #    value the caller actually supplied, then the free-form extras
        optional_values = {
            "width": width,
            "height": height,
            "negative_prompt": negative_prompt,
            "steps": steps,
            "seed": seed,
            "cfg": cfg,
            "sampler": sampler,
        }
        workflow_params = {"prompt": prompt}
        workflow_params.update(
            {name: value for name, value in optional_values.items() if value is not None}
        )
        workflow_params.update(params)

        logger.debug(f"Workflow parameters: {workflow_params}")

        # 4. Execute workflow (ComfyKit auto-detects based on input type)
        try:
            kit = ComfyKit(**kit_config)

            # RunningHub workflows are addressed by id, selfhost ones by path
            use_runninghub = (
                workflow_info["source"] == "runninghub"
                and "workflow_id" in workflow_info
            )
            if use_runninghub:
                workflow_input = workflow_info["workflow_id"]
                logger.info(f"Executing RunningHub workflow: {workflow_input}")
            else:
                workflow_input = workflow_info["path"]
                logger.info(f"Executing selfhost workflow: {workflow_input}")

            result = await kit.execute(workflow_input, workflow_params)

            # 5. Handle result
            if result.status != "completed":
                error_msg = result.msg or "Unknown error"
                logger.error(f"Image generation failed: {error_msg}")
                raise Exception(f"Image generation failed: {error_msg}")

            if not result.images:
                logger.error("No images generated")
                raise Exception("No images generated")

            first_image = result.images[0]
            logger.info(f"✅ Generated image: {first_image}")
            return first_image

        except Exception as e:
            logger.error(f"Image generation error: {e}")
            raise

View File

@@ -0,0 +1,285 @@
# Copyright (C) 2025 AIDC-AI
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Media Generation Service - ComfyUI Workflow-based implementation
Supports both image and video generation workflows.
The caller selects the output type via the media_type argument; the matching output is read from ExecuteResult.
"""
from typing import Optional
from comfykit import ComfyKit
from loguru import logger
from pixelle_video.services.comfy_base_service import ComfyBaseService
from pixelle_video.models.media import MediaResult
class MediaService(ComfyBaseService):
    """
    Media generation service - Workflow-based

    Uses ComfyKit to execute image/video generation workflows.
    Supports both image_ and video_ workflow prefixes.

    Usage:
        # Image generation (media_type defaults to "image")
        media = await pixelle_video.media(prompt="a cat")
        if media.is_image:
            print(f"Generated image: {media.url}")

        # Video generation: media_type must be passed explicitly
        media = await pixelle_video.media(
            prompt="a cat running",
            workflow="video_example.json",
            media_type="video"
        )
        if media.is_video:
            print(f"Generated video: {media.url} ({media.duration}s)")

        # List available workflows
        workflows = pixelle_video.media.list_workflows()
    """

    # Left empty: this class overrides _scan_workflows and filters on both
    # the "image_" and "video_" prefixes itself.
    WORKFLOW_PREFIX = ""
    DEFAULT_WORKFLOW = None  # No hardcoded default, must be configured
    WORKFLOWS_DIR = "workflows"

    def __init__(self, config: dict):
        """
        Initialize media service

        Args:
            config: Full application config dict
        """
        super().__init__(config, service_name="image")  # Keep "image" for config compatibility

    def _scan_workflows(self):
        """
        Scan workflows for both image_ and video_ prefixes

        Override parent method to support multiple prefixes.

        Returns:
            List of parsed workflow info dicts, sorted by their "key"
            (empty when no workflow source directory exists). Files that
            fail to parse are logged and left out.
        """
        from pixelle_video.utils.os_util import list_resource_dirs, list_resource_files, get_resource_path
        from pathlib import Path

        workflows = []

        # Get all workflow source directories
        source_dirs = list_resource_dirs("workflows")

        if not source_dirs:
            logger.warning("No workflow source directories found")
            return workflows

        # Scan each source directory for workflow files
        for source_name in source_dirs:
            # Get all JSON files for this source
            workflow_files = list_resource_files("workflows", source_name)

            # Filter to only files matching image_ or video_ prefix
            matching_files = [
                f for f in workflow_files
                if f.startswith(("image_", "video_")) and f.endswith('.json')
            ]

            for filename in matching_files:
                try:
                    # Get actual file path
                    file_path = Path(get_resource_path("workflows", source_name, filename))
                    workflow_info = self._parse_workflow_file(file_path, source_name)
                    workflows.append(workflow_info)
                    logger.debug(f"Found workflow: {workflow_info['key']}")
                except Exception as e:
                    # One broken workflow file must not hide the others
                    logger.error(f"Failed to parse workflow {source_name}/{filename}: {e}")

        # Sort by key (source/name)
        return sorted(workflows, key=lambda w: w["key"])

    async def __call__(
        self,
        prompt: str,
        workflow: Optional[str] = None,
        # Media type specification (required for proper handling)
        media_type: str = "image",  # "image" or "video"
        # ComfyUI connection (optional overrides)
        comfyui_url: Optional[str] = None,
        runninghub_api_key: Optional[str] = None,
        # Common workflow parameters
        width: Optional[int] = None,
        height: Optional[int] = None,
        negative_prompt: Optional[str] = None,
        steps: Optional[int] = None,
        seed: Optional[int] = None,
        cfg: Optional[float] = None,
        sampler: Optional[str] = None,
        **params
    ) -> MediaResult:
        """
        Generate media (image or video) using workflow

        The output type is NOT detected from the workflow: media_type decides
        whether the result's videos or images are read. "video" selects the
        videos; any other value is handled as an image request.

        Args:
            prompt: Media generation prompt
            workflow: Workflow to run (resolved by _resolve_workflow; None
                lets the resolver pick)
            media_type: Type of media to generate - "image" or "video" (default: "image")
            comfyui_url: ComfyUI URL (optional, overrides config)
            runninghub_api_key: RunningHub API key (optional, overrides config)
            width: Media width
            height: Media height
            negative_prompt: Negative prompt
            steps: Sampling steps
            seed: Random seed
            cfg: CFG scale
            sampler: Sampler name
            **params: Additional workflow parameters, forwarded as-is

        Returns:
            MediaResult object with media_type ("image" or "video") and url;
            video results also carry duration when the execution result
            exposes one

        Raises:
            Exception: If the workflow does not complete, or returns no
                media of the requested type

        Examples:
            # Image with the resolver's default workflow
            media = await pixelle_video.media(prompt="a beautiful cat")
            if media.is_image:
                print(f"Image: {media.url}")

            # Use specific workflow
            media = await pixelle_video.media(
                prompt="a cat",
                workflow="image_flux.json"
            )

            # Video workflow (media_type is required to get a video back)
            media = await pixelle_video.media(
                prompt="a cat running",
                workflow="video_example.json",
                media_type="video"
            )
            if media.is_video:
                print(f"Video: {media.url}, duration: {media.duration}s")

            # With additional parameters
            media = await pixelle_video.media(
                prompt="a cat",
                workflow="image_flux.json",
                width=1024,
                height=1024,
                steps=20,
                seed=42
            )

            # With absolute path
            media = await pixelle_video.media(
                prompt="a cat",
                workflow="/path/to/custom.json"
            )

            # With custom ComfyUI server
            media = await pixelle_video.media(
                prompt="a cat",
                comfyui_url="http://192.168.1.100:8188"
            )
        """
        # 1. Resolve workflow (returns structured info)
        workflow_info = self._resolve_workflow(workflow=workflow)

        # 2. Prepare ComfyKit config (supports both selfhost and runninghub)
        kit_config = self._prepare_comfykit_config(
            comfyui_url=comfyui_url,
            runninghub_api_key=runninghub_api_key
        )

        # 3. Build workflow parameters
        workflow_params = {"prompt": prompt}

        # Add optional parameters
        if width is not None:
            workflow_params["width"] = width
        if height is not None:
            workflow_params["height"] = height
        if negative_prompt is not None:
            workflow_params["negative_prompt"] = negative_prompt
        if steps is not None:
            workflow_params["steps"] = steps
        if seed is not None:
            workflow_params["seed"] = seed
        if cfg is not None:
            workflow_params["cfg"] = cfg
        if sampler is not None:
            workflow_params["sampler"] = sampler

        # Add any additional parameters
        workflow_params.update(params)

        logger.debug(f"Workflow parameters: {workflow_params}")

        # 4. Execute workflow (ComfyKit auto-detects based on input type).
        #    Only the execution itself is wrapped: result failures below log
        #    their own specific message, so wrapping them too would log every
        #    failure twice.
        try:
            kit = ComfyKit(**kit_config)

            # Determine what to pass to ComfyKit based on source
            if workflow_info["source"] == "runninghub" and "workflow_id" in workflow_info:
                # RunningHub: pass workflow_id (ComfyKit will use runninghub backend)
                workflow_input = workflow_info["workflow_id"]
                logger.info(f"Executing RunningHub workflow: {workflow_input}")
            else:
                # Selfhost: pass file path (ComfyKit will use local ComfyUI)
                workflow_input = workflow_info["path"]
                logger.info(f"Executing selfhost workflow: {workflow_input}")

            result = await kit.execute(workflow_input, workflow_params)
        except Exception as e:
            logger.error(f"Media generation error: {e}")
            raise

        # 5. Handle result based on specified media_type
        if result.status != "completed":
            error_msg = result.msg or "Unknown error"
            logger.error(f"Media generation failed: {error_msg}")
            raise Exception(f"Media generation failed: {error_msg}")

        if media_type == "video":
            # Video workflow - get video from result
            if not result.videos:
                logger.error("No video generated (workflow returned no videos)")
                raise Exception("No video generated")

            video_url = result.videos[0]
            logger.info(f"✅ Generated video: {video_url}")

            # Duration is optional on the result; a missing or falsy value
            # is reported as None
            duration = getattr(result, 'duration', None) or None

            return MediaResult(
                media_type="video",
                url=video_url,
                duration=duration
            )

        # Image workflow - get image from result
        if not result.images:
            logger.error("No image generated (workflow returned no images)")
            raise Exception("No image generated")

        image_url = result.images[0]
        logger.info(f"✅ Generated image: {image_url}")

        return MediaResult(
            media_type="image",
            url=image_url
        )

View File

@@ -239,6 +239,51 @@ class VideoService:
logger.error(f"FFmpeg concat filter error: {error_msg}")
raise RuntimeError(f"Failed to concatenate videos: {error_msg}")
def _get_video_duration(self, video: str) -> float:
    """
    Read a video's duration from its probed format metadata

    Args:
        video: Video file path

    Returns:
        Duration in seconds, or 0.0 when the probe or the lookup fails
        (the failure is logged as a warning)
    """
    try:
        format_info = ffmpeg.probe(video)['format']
        return float(format_info['duration'])
    except Exception as exc:
        logger.warning(f"Failed to get video duration: {exc}")
    return 0.0
def _get_audio_duration(self, audio: str) -> float:
    """
    Read an audio file's duration from its probed format metadata

    Args:
        audio: Audio file path

    Returns:
        Duration in seconds. When probing fails, a rough estimate derived
        from the file size is returned instead, never below 1.0
    """
    try:
        format_info = ffmpeg.probe(audio)['format']
        return float(format_info['duration'])
    except Exception as exc:
        logger.warning(f"Failed to get audio duration: {exc}, using estimate")
        import os
        # Very rough fallback: treat the file as ~16kbps MP3, i.e. about
        # 2000 bytes per second of audio
        size_in_bytes = os.path.getsize(audio)
        return max(1.0, size_in_bytes / 2000)
def has_audio_stream(self, video: str) -> bool:
    """
    Tell whether a video file contains at least one audio stream

    Args:
        video: Video file path

    Returns:
        True when the probe lists an audio stream; False when it lists
        none or when probing fails (a failure is logged as a warning)
    """
    try:
        streams = ffmpeg.probe(video).get('streams', [])
        # Every stream entry is inspected before deciding
        audio_count = sum(1 for stream in streams if stream['codec_type'] == 'audio')
        has_audio = audio_count > 0
    except Exception as exc:
        logger.warning(f"Failed to probe video audio streams: {exc}, assuming no audio")
        return False
    logger.debug(f"Video {video} has_audio={has_audio}")
    return has_audio
def merge_audio_video(
self,
video: str,
@@ -247,9 +292,18 @@ class VideoService:
replace_audio: bool = True,
audio_volume: float = 1.0,
video_volume: float = 0.0,
pad_strategy: str = "freeze", # "freeze" (freeze last frame) or "black" (black screen)
) -> str:
"""
Merge audio with video
Merge audio with video, using the longer duration
The output video duration will be the maximum of video and audio duration.
If audio is longer than video, the video will be padded using the specified strategy.
Automatically handles videos with or without audio streams.
- If video has no audio: adds the audio track
- If video has audio and replace_audio=True: replaces with new audio
- If video has audio and replace_audio=False: mixes both audio tracks
Args:
video: Video file path
@@ -259,6 +313,9 @@ class VideoService:
audio_volume: Volume of the new audio (0.0 to 1.0+)
video_volume: Volume of original video audio (0.0 to 1.0+)
Only used when replace_audio=False
pad_strategy: Strategy to pad video if audio is longer
- "freeze": Freeze last frame (default)
- "black": Fill with black screen
Returns:
Path to the output video file
@@ -267,28 +324,110 @@ class VideoService:
RuntimeError: If FFmpeg execution fails
Note:
- When replace_audio=True, video's original audio is removed
- When replace_audio=False, original and new audio are mixed
- Audio is trimmed/extended to match video duration
- Uses the longer duration between video and audio
- When audio is longer, video is padded using pad_strategy
- When video is longer, audio is looped or extended
- Automatically detects if video has audio
- When video is silent, audio is added regardless of replace_audio
- When replace_audio=True and video has audio, original audio is removed
- When replace_audio=False and video has audio, original and new audio are mixed
"""
# Get durations of video and audio
video_duration = self._get_video_duration(video)
audio_duration = self._get_audio_duration(audio)
logger.info(f"Video duration: {video_duration:.2f}s, Audio duration: {audio_duration:.2f}s")
# Determine target duration (max of both)
target_duration = max(video_duration, audio_duration)
logger.info(f"Target output duration: {target_duration:.2f}s")
# Check if video has audio stream
video_has_audio = self.has_audio_stream(video)
# Prepare video stream (potentially with padding)
input_video = ffmpeg.input(video)
video_stream = input_video.video
# Pad video if audio is longer
if audio_duration > video_duration:
pad_duration = audio_duration - video_duration
logger.info(f"Audio is longer, padding video by {pad_duration:.2f}s using '{pad_strategy}' strategy")
if pad_strategy == "freeze":
# Freeze last frame: tpad filter
video_stream = video_stream.filter('tpad', stop_mode='clone', stop_duration=pad_duration)
else: # black
# Generate black frames for padding duration
from pixelle_video.utils.os_util import get_temp_path
import os
# Get video properties
probe = ffmpeg.probe(video)
video_info = next(s for s in probe['streams'] if s['codec_type'] == 'video')
width = int(video_info['width'])
height = int(video_info['height'])
fps_str = video_info['r_frame_rate']
fps_num, fps_den = map(int, fps_str.split('/'))
fps = fps_num / fps_den if fps_den != 0 else 30
# Create black video for padding
black_video_path = get_temp_path(f"black_pad_{os.path.basename(output)}")
black_input = ffmpeg.input(
f'color=c=black:s={width}x{height}:r={fps}',
f='lavfi',
t=pad_duration
)
# Concatenate original video with black padding
video_stream = ffmpeg.concat(video_stream, black_input.video, v=1, a=0)
# Prepare audio stream
input_audio = ffmpeg.input(audio)
audio_stream = input_audio.audio.filter('volume', audio_volume)
if not video_has_audio:
logger.info(f"Video has no audio stream, adding audio track")
# Video is silent, just add the audio
try:
(
ffmpeg
.output(
video_stream,
audio_stream,
output,
vcodec='libx264', # Re-encode video if padded
acodec='aac',
audio_bitrate='192k',
t=target_duration # Trim to target duration
)
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
logger.success(f"Audio added to silent video: {output}")
return output
except ffmpeg.Error as e:
error_msg = e.stderr.decode() if e.stderr else str(e)
logger.error(f"FFmpeg error adding audio to silent video: {error_msg}")
raise RuntimeError(f"Failed to add audio to video: {error_msg}")
# Video has audio, proceed with merging
logger.info(f"Merging audio with video (replace={replace_audio})")
try:
input_video = ffmpeg.input(video)
input_audio = ffmpeg.input(audio)
if replace_audio:
# Replace audio: use only new audio, ignore original
(
ffmpeg
.output(
input_video.video,
input_audio.audio.filter('volume', audio_volume),
video_stream,
audio_stream,
output,
vcodec='copy',
vcodec='libx264', # Re-encode video if padded
acodec='aac',
audio_bitrate='192k',
shortest=None
t=target_duration # Trim to target duration
)
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
@@ -298,22 +437,23 @@ class VideoService:
mixed_audio = ffmpeg.filter(
[
input_video.audio.filter('volume', video_volume),
input_audio.audio.filter('volume', audio_volume)
audio_stream
],
'amix',
inputs=2,
duration='first'
duration='longest' # Use longest audio
)
(
ffmpeg
.output(
input_video.video,
video_stream,
mixed_audio,
output,
vcodec='copy',
vcodec='libx264', # Re-encode video if padded
acodec='aac',
audio_bitrate='192k'
audio_bitrate='192k',
t=target_duration # Trim to target duration
)
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
@@ -326,6 +466,92 @@ class VideoService:
logger.error(f"FFmpeg merge error: {error_msg}")
raise RuntimeError(f"Failed to merge audio and video: {error_msg}")
def overlay_image_on_video(
    self,
    video: str,
    overlay_image: str,
    output: str,
    scale_mode: str = "contain"
) -> str:
    """
    Composite an image on top of a video

    The video is first resized to the overlay image's dimensions, then the
    image is drawn over it and the result is encoded with libx264.

    Args:
        video: Base video file path
        overlay_image: Overlay image path (e.g. rendered HTML with a
            transparent background)
        output: Output video file path
        scale_mode: How the base video is fitted to the overlay size
            - "contain": scale down keeping aspect ratio, pad with black, centered
            - "cover": scale up keeping aspect ratio, then crop
            - any other value: stretch to the exact overlay dimensions

    Returns:
        Path to the output video file

    Raises:
        RuntimeError: If FFmpeg (probe or encode) fails

    Note:
        - Only the composited video stream is written to the output
        - Final video size matches the overlay image size
    """
    logger.info(f"Overlaying image on video (scale_mode={scale_mode})")

    try:
        # The overlay image defines the target frame size
        image_streams = ffmpeg.probe(overlay_image)['streams']
        image_info = next(s for s in image_streams if s['codec_type'] == 'video')
        target_w = int(image_info['width'])
        target_h = int(image_info['height'])

        logger.debug(f"Overlay dimensions: {target_w}x{target_h}")

        base_input = ffmpeg.input(video)
        image_input = ffmpeg.input(overlay_image)

        # Fit the base video to the target frame
        if scale_mode == "contain":
            # Shrink to fit, then center on a black canvas (letterbox/pillarbox)
            shrunk = base_input.filter(
                'scale', target_w, target_h, force_original_aspect_ratio='decrease'
            )
            fitted = shrunk.filter(
                'pad', target_w, target_h, '(ow-iw)/2', '(oh-ih)/2', color='black'
            )
        elif scale_mode == "cover":
            # Grow to cover, then crop the overflow
            grown = base_input.filter(
                'scale', target_w, target_h, force_original_aspect_ratio='increase'
            )
            fitted = grown.filter('crop', target_w, target_h)
        else:  # stretch
            fitted = base_input.filter('scale', target_w, target_h)

        # Draw the image over the fitted video and encode
        composited = ffmpeg.overlay(fitted, image_input)
        encode_job = ffmpeg.output(
            composited,
            output,
            vcodec='libx264',
            pix_fmt='yuv420p',
            preset='medium',
            crf=23
        )
        encode_job.overwrite_output().run(capture_stdout=True, capture_stderr=True)

        logger.success(f"Image overlaid on video: {output}")
        return output

    except ffmpeg.Error as e:
        error_msg = e.stderr.decode() if e.stderr else str(e)
        logger.error(f"FFmpeg overlay error: {error_msg}")
        raise RuntimeError(f"Failed to overlay image on video: {error_msg}")
def create_video_from_image(
self,
image: str,