592 lines
20 KiB
Python
592 lines
20 KiB
Python
# Copyright (C) 2025 AIDC-AI
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Video Processing Service
|
|
|
|
High-performance video composition service built on ffmpeg-python.
|
|
|
|
Features:
|
|
- Video concatenation
|
|
- Audio/video merging
|
|
- Background music addition
|
|
- Image to video conversion
|
|
|
|
Note: Requires FFmpeg to be installed on the system.
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import List, Literal, Optional
|
|
|
|
import ffmpeg
|
|
from loguru import logger
|
|
|
|
from pixelle_video.utils.os_util import (
|
|
get_resource_path,
|
|
list_resource_files,
|
|
resource_exists
|
|
)
|
|
|
|
|
|
def check_ffmpeg() -> None:
|
|
"""
|
|
Check if FFmpeg is installed on the system
|
|
|
|
Raises:
|
|
RuntimeError: If FFmpeg is not found
|
|
"""
|
|
if not shutil.which("ffmpeg"):
|
|
raise RuntimeError(
|
|
"FFmpeg not found. Please install it:\n"
|
|
" macOS: brew install ffmpeg\n"
|
|
" Ubuntu/Debian: apt-get install ffmpeg\n"
|
|
" Windows: https://ffmpeg.org/download.html"
|
|
)
|
|
|
|
|
|
# Check FFmpeg availability on module import
|
|
check_ffmpeg()
|
|
|
|
|
|
class VideoService:
|
|
"""
|
|
Video compositor for common video processing tasks
|
|
|
|
Uses ffmpeg-python for high-performance video processing.
|
|
All operations preserve video quality when possible (stream copy).
|
|
|
|
Examples:
|
|
>>> compositor = VideoCompositor()
|
|
>>>
|
|
>>> # Concatenate videos
|
|
>>> compositor.concat_videos(
|
|
... ["intro.mp4", "main.mp4", "outro.mp4"],
|
|
... "final.mp4"
|
|
... )
|
|
>>>
|
|
>>> # Add voiceover
|
|
>>> compositor.merge_audio_video(
|
|
... "visual.mp4",
|
|
... "voiceover.mp3",
|
|
... "final.mp4"
|
|
... )
|
|
>>>
|
|
>>> # Add background music
|
|
>>> compositor.add_bgm(
|
|
... "video.mp4",
|
|
... "music.mp3",
|
|
... "final.mp4",
|
|
... bgm_volume=0.3
|
|
... )
|
|
>>>
|
|
>>> # Create video from image + audio
|
|
>>> compositor.create_video_from_image(
|
|
... "frame.png",
|
|
... "narration.mp3",
|
|
... "segment.mp4"
|
|
... )
|
|
"""
|
|
|
|
def concat_videos(
|
|
self,
|
|
videos: List[str],
|
|
output: str,
|
|
method: Literal["demuxer", "filter"] = "demuxer",
|
|
bgm_path: Optional[str] = None,
|
|
bgm_volume: float = 0.2,
|
|
bgm_mode: Literal["once", "loop"] = "loop"
|
|
) -> str:
|
|
"""
|
|
Concatenate multiple videos into one
|
|
|
|
Args:
|
|
videos: List of video file paths to concatenate
|
|
output: Output video file path
|
|
method: Concatenation method
|
|
- "demuxer": Fast, no re-encoding (requires identical formats)
|
|
- "filter": Slower but handles different formats
|
|
bgm_path: Background music file path (optional)
|
|
- None: No BGM
|
|
- Filename (e.g., "default.mp3", "happy.mp3"): Use built-in BGM from bgm/ folder
|
|
- Custom path: Use custom BGM file
|
|
bgm_volume: BGM volume level (0.0-1.0), default 0.2
|
|
bgm_mode: BGM playback mode
|
|
- "once": Play BGM once
|
|
- "loop": Loop BGM to match video duration
|
|
|
|
Returns:
|
|
Path to the output video file
|
|
|
|
Raises:
|
|
ValueError: If videos list is empty
|
|
RuntimeError: If FFmpeg execution fails
|
|
|
|
Note:
|
|
- demuxer method requires all videos to have identical:
|
|
resolution, codec, fps, etc.
|
|
- filter method re-encodes videos, slower but more compatible
|
|
"""
|
|
if not videos:
|
|
raise ValueError("Videos list cannot be empty")
|
|
|
|
if len(videos) == 1:
|
|
logger.info(f"Only one video provided, copying to {output}")
|
|
shutil.copy(videos[0], output)
|
|
return output
|
|
|
|
logger.info(f"Concatenating {len(videos)} videos using {method} method")
|
|
|
|
# Step 1: Concatenate videos
|
|
if bgm_path:
|
|
# If BGM needed, concatenate to temp file first
|
|
temp_output = output.replace('.mp4', '_no_bgm.mp4')
|
|
concat_result = self._concat_demuxer(videos, temp_output) if method == "demuxer" else self._concat_filter(videos, temp_output)
|
|
|
|
# Step 2: Add BGM
|
|
logger.info(f"Adding BGM: {bgm_path} (volume={bgm_volume}, mode={bgm_mode})")
|
|
final_result = self._add_bgm_to_video(
|
|
video=concat_result,
|
|
bgm_path=bgm_path,
|
|
output=output,
|
|
volume=bgm_volume,
|
|
mode=bgm_mode
|
|
)
|
|
|
|
# Clean up temp file
|
|
if os.path.exists(temp_output):
|
|
os.unlink(temp_output)
|
|
|
|
return final_result
|
|
else:
|
|
# No BGM, direct concatenation
|
|
if method == "demuxer":
|
|
return self._concat_demuxer(videos, output)
|
|
else:
|
|
return self._concat_filter(videos, output)
|
|
|
|
def _concat_demuxer(self, videos: List[str], output: str) -> str:
|
|
"""
|
|
Concatenate using concat demuxer (fast, no re-encoding)
|
|
|
|
FFmpeg equivalent:
|
|
ffmpeg -f concat -safe 0 -i filelist.txt -c copy output.mp4
|
|
"""
|
|
# Create temporary file list
|
|
with tempfile.NamedTemporaryFile(
|
|
mode='w',
|
|
delete=False,
|
|
suffix='.txt',
|
|
encoding='utf-8'
|
|
) as f:
|
|
for video in videos:
|
|
abs_path = Path(video).absolute()
|
|
escaped_path = str(abs_path).replace("'", "'\\''")
|
|
f.write(f"file '{escaped_path}'\n")
|
|
filelist = f.name
|
|
|
|
try:
|
|
logger.debug(f"Created filelist: {filelist}")
|
|
(
|
|
ffmpeg
|
|
.input(filelist, format='concat', safe=0)
|
|
.output(output, c='copy')
|
|
.overwrite_output()
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
logger.success(f"Videos concatenated successfully: {output}")
|
|
return output
|
|
except ffmpeg.Error as e:
|
|
error_msg = e.stderr.decode() if e.stderr else str(e)
|
|
logger.error(f"FFmpeg concat error: {error_msg}")
|
|
raise RuntimeError(f"Failed to concatenate videos: {error_msg}")
|
|
finally:
|
|
if os.path.exists(filelist):
|
|
os.unlink(filelist)
|
|
|
|
def _concat_filter(self, videos: List[str], output: str) -> str:
|
|
"""
|
|
Concatenate using concat filter (slower but handles different formats)
|
|
|
|
FFmpeg equivalent:
|
|
ffmpeg -i v1.mp4 -i v2.mp4 -filter_complex "[0:v][0:a][1:v][1:a]concat=n=2:v=1:a=1[v][a]"
|
|
-map "[v]" -map "[a]" output.mp4
|
|
"""
|
|
try:
|
|
inputs = [ffmpeg.input(v) for v in videos]
|
|
(
|
|
ffmpeg
|
|
.concat(*inputs, v=1, a=1)
|
|
.output(output)
|
|
.overwrite_output()
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
logger.success(f"Videos concatenated successfully: {output}")
|
|
return output
|
|
except ffmpeg.Error as e:
|
|
error_msg = e.stderr.decode() if e.stderr else str(e)
|
|
logger.error(f"FFmpeg concat filter error: {error_msg}")
|
|
raise RuntimeError(f"Failed to concatenate videos: {error_msg}")
|
|
|
|
def merge_audio_video(
|
|
self,
|
|
video: str,
|
|
audio: str,
|
|
output: str,
|
|
replace_audio: bool = True,
|
|
audio_volume: float = 1.0,
|
|
video_volume: float = 0.0,
|
|
) -> str:
|
|
"""
|
|
Merge audio with video
|
|
|
|
Args:
|
|
video: Video file path
|
|
audio: Audio file path
|
|
output: Output video file path
|
|
replace_audio: If True, replace video's audio; if False, mix with original
|
|
audio_volume: Volume of the new audio (0.0 to 1.0+)
|
|
video_volume: Volume of original video audio (0.0 to 1.0+)
|
|
Only used when replace_audio=False
|
|
|
|
Returns:
|
|
Path to the output video file
|
|
|
|
Raises:
|
|
RuntimeError: If FFmpeg execution fails
|
|
|
|
Note:
|
|
- When replace_audio=True, video's original audio is removed
|
|
- When replace_audio=False, original and new audio are mixed
|
|
- Audio is trimmed/extended to match video duration
|
|
"""
|
|
logger.info(f"Merging audio with video (replace={replace_audio})")
|
|
|
|
try:
|
|
input_video = ffmpeg.input(video)
|
|
input_audio = ffmpeg.input(audio)
|
|
|
|
if replace_audio:
|
|
# Replace audio: use only new audio, ignore original
|
|
(
|
|
ffmpeg
|
|
.output(
|
|
input_video.video,
|
|
input_audio.audio.filter('volume', audio_volume),
|
|
output,
|
|
vcodec='copy',
|
|
acodec='aac',
|
|
audio_bitrate='192k',
|
|
shortest=None
|
|
)
|
|
.overwrite_output()
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
else:
|
|
# Mix audio: combine original and new audio
|
|
mixed_audio = ffmpeg.filter(
|
|
[
|
|
input_video.audio.filter('volume', video_volume),
|
|
input_audio.audio.filter('volume', audio_volume)
|
|
],
|
|
'amix',
|
|
inputs=2,
|
|
duration='first'
|
|
)
|
|
|
|
(
|
|
ffmpeg
|
|
.output(
|
|
input_video.video,
|
|
mixed_audio,
|
|
output,
|
|
vcodec='copy',
|
|
acodec='aac',
|
|
audio_bitrate='192k'
|
|
)
|
|
.overwrite_output()
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
|
|
logger.success(f"Audio merged successfully: {output}")
|
|
return output
|
|
except ffmpeg.Error as e:
|
|
error_msg = e.stderr.decode() if e.stderr else str(e)
|
|
logger.error(f"FFmpeg merge error: {error_msg}")
|
|
raise RuntimeError(f"Failed to merge audio and video: {error_msg}")
|
|
|
|
def create_video_from_image(
|
|
self,
|
|
image: str,
|
|
audio: str,
|
|
output: str,
|
|
fps: int = 30,
|
|
) -> str:
|
|
"""
|
|
Create video from static image and audio
|
|
|
|
Args:
|
|
image: Image file path
|
|
audio: Audio file path
|
|
output: Output video path
|
|
fps: Frames per second
|
|
|
|
Returns:
|
|
Path to the output video
|
|
|
|
Raises:
|
|
RuntimeError: If FFmpeg execution fails
|
|
|
|
Note:
|
|
- Image is displayed as static frame for the duration of audio
|
|
- Video duration matches audio duration
|
|
- Useful for creating video segments from storyboard frames
|
|
|
|
Example:
|
|
>>> compositor.create_video_from_image(
|
|
... "frame.png",
|
|
... "narration.mp3",
|
|
... "segment.mp4"
|
|
... )
|
|
"""
|
|
logger.info("Creating video from image and audio")
|
|
|
|
try:
|
|
# Get audio duration to ensure exact video duration match
|
|
probe = ffmpeg.probe(audio)
|
|
audio_duration = float(probe['format']['duration'])
|
|
logger.debug(f"Audio duration: {audio_duration:.3f}s")
|
|
|
|
# Input image with loop (loop=1 means loop indefinitely)
|
|
# Use framerate to set input framerate
|
|
input_image = ffmpeg.input(image, loop=1, framerate=fps)
|
|
input_audio = ffmpeg.input(audio)
|
|
|
|
# Combine image and audio
|
|
# Use -t to explicitly set video duration = audio duration
|
|
(
|
|
ffmpeg
|
|
.output(
|
|
input_image,
|
|
input_audio,
|
|
output,
|
|
t=audio_duration, # Force video duration to match audio exactly
|
|
vcodec='libx264',
|
|
acodec='aac',
|
|
pix_fmt='yuv420p',
|
|
audio_bitrate='192k',
|
|
preset='medium',
|
|
crf=23,
|
|
**{'b:v': '2M'} # Video bitrate
|
|
)
|
|
.overwrite_output()
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
|
|
logger.success(f"Video created from image: {output} (duration: {audio_duration:.3f}s)")
|
|
return output
|
|
except ffmpeg.Error as e:
|
|
error_msg = e.stderr.decode() if e.stderr else str(e)
|
|
logger.error(f"FFmpeg error creating video from image: {error_msg}")
|
|
raise RuntimeError(f"Failed to create video from image: {error_msg}")
|
|
|
|
def add_bgm(
|
|
self,
|
|
video: str,
|
|
bgm: str,
|
|
output: str,
|
|
bgm_volume: float = 0.3,
|
|
loop: bool = True,
|
|
fade_in: float = 0.0,
|
|
fade_out: float = 0.0,
|
|
) -> str:
|
|
"""
|
|
Add background music to video
|
|
|
|
Args:
|
|
video: Video file path
|
|
bgm: Background music file path
|
|
output: Output video file path
|
|
bgm_volume: BGM volume relative to original (0.0 to 1.0+)
|
|
loop: If True, loop BGM to match video duration
|
|
fade_in: BGM fade-in duration in seconds
|
|
fade_out: BGM fade-out duration in seconds (not yet implemented)
|
|
|
|
Returns:
|
|
Path to the output video file
|
|
|
|
Raises:
|
|
RuntimeError: If FFmpeg execution fails
|
|
|
|
Note:
|
|
- BGM is mixed with original video audio
|
|
- If loop=True, BGM repeats until video ends
|
|
- Fade effects are applied to BGM only
|
|
"""
|
|
logger.info(f"Adding BGM to video (volume={bgm_volume}, loop={loop})")
|
|
|
|
try:
|
|
input_video = ffmpeg.input(video)
|
|
|
|
# Configure BGM input with looping if needed
|
|
bgm_input = ffmpeg.input(
|
|
bgm,
|
|
stream_loop=-1 if loop else 0 # -1 = infinite loop
|
|
)
|
|
|
|
# Apply volume adjustment to BGM
|
|
bgm_audio = bgm_input.audio.filter('volume', bgm_volume)
|
|
|
|
# Apply fade effects if specified
|
|
if fade_in > 0:
|
|
bgm_audio = bgm_audio.filter('afade', type='in', duration=fade_in)
|
|
# Note: fade_out at the end requires knowing the duration, which is complex
|
|
# For now, we skip fade_out in this implementation
|
|
# A more advanced implementation would need to:
|
|
# 1. Get video duration
|
|
# 2. Calculate fade_out start time
|
|
# 3. Apply fade filter with specific start_time
|
|
|
|
# Mix original audio with BGM
|
|
mixed_audio = ffmpeg.filter(
|
|
[input_video.audio, bgm_audio],
|
|
'amix',
|
|
inputs=2,
|
|
duration='first' # Use video's duration
|
|
)
|
|
|
|
(
|
|
ffmpeg
|
|
.output(
|
|
input_video.video,
|
|
mixed_audio,
|
|
output,
|
|
vcodec='copy',
|
|
acodec='aac',
|
|
audio_bitrate='192k'
|
|
)
|
|
.overwrite_output()
|
|
.run(capture_stdout=True, capture_stderr=True)
|
|
)
|
|
|
|
logger.success(f"BGM added successfully: {output}")
|
|
return output
|
|
except ffmpeg.Error as e:
|
|
error_msg = e.stderr.decode() if e.stderr else str(e)
|
|
logger.error(f"FFmpeg BGM error: {error_msg}")
|
|
raise RuntimeError(f"Failed to add BGM: {error_msg}")
|
|
|
|
def _add_bgm_to_video(
|
|
self,
|
|
video: str,
|
|
bgm_path: str,
|
|
output: str,
|
|
volume: float = 0.2,
|
|
mode: Literal["once", "loop"] = "loop"
|
|
) -> str:
|
|
"""
|
|
Internal helper to add BGM to video with path resolution
|
|
|
|
Args:
|
|
video: Video file path
|
|
bgm_path: BGM path (can be preset name or custom path)
|
|
output: Output file path
|
|
volume: BGM volume (0.0-1.0)
|
|
mode: "once" or "loop"
|
|
|
|
Returns:
|
|
Path to output video
|
|
|
|
Raises:
|
|
FileNotFoundError: If BGM file not found
|
|
"""
|
|
# Resolve BGM path (raises FileNotFoundError if not found)
|
|
resolved_bgm = self._resolve_bgm_path(bgm_path)
|
|
|
|
# Add BGM using existing method
|
|
loop = (mode == "loop")
|
|
return self.add_bgm(
|
|
video=video,
|
|
bgm=resolved_bgm,
|
|
output=output,
|
|
bgm_volume=volume,
|
|
loop=loop,
|
|
fade_in=0.0
|
|
)
|
|
|
|
def _resolve_bgm_path(self, bgm_path: str) -> str:
|
|
"""
|
|
Resolve BGM path (filename or custom path) with custom override support
|
|
|
|
Search priority:
|
|
1. Direct path (absolute or relative)
|
|
2. data/bgm/{filename} (custom)
|
|
3. bgm/{filename} (default)
|
|
|
|
Args:
|
|
bgm_path: Can be:
|
|
- Filename with extension (e.g., "default.mp3", "happy.mp3"): auto-resolved from bgm/ or data/bgm/
|
|
- Custom file path (absolute or relative)
|
|
|
|
Returns:
|
|
Resolved absolute path
|
|
|
|
Raises:
|
|
FileNotFoundError: If BGM file not found
|
|
"""
|
|
# Try direct path first (absolute or relative)
|
|
if os.path.exists(bgm_path):
|
|
return os.path.abspath(bgm_path)
|
|
|
|
# Try as filename in resource directories (custom > default)
|
|
if resource_exists("bgm", bgm_path):
|
|
return get_resource_path("bgm", bgm_path)
|
|
|
|
# Not found - provide helpful error message
|
|
tried_paths = [
|
|
os.path.abspath(bgm_path),
|
|
f"data/bgm/{bgm_path} or bgm/{bgm_path}"
|
|
]
|
|
|
|
# List available BGM files
|
|
available_bgm = self._list_available_bgm()
|
|
available_msg = f"\n Available BGM files: {', '.join(available_bgm)}" if available_bgm else ""
|
|
|
|
raise FileNotFoundError(
|
|
f"BGM file not found: '{bgm_path}'\n"
|
|
f" Tried paths:\n"
|
|
f" 1. {tried_paths[0]}\n"
|
|
f" 2. {tried_paths[1]}"
|
|
f"{available_msg}"
|
|
)
|
|
|
|
def _list_available_bgm(self) -> list[str]:
|
|
"""
|
|
List available BGM files (merged from bgm/ and data/bgm/)
|
|
|
|
Returns:
|
|
List of filenames (with extensions), sorted
|
|
"""
|
|
try:
|
|
# Use resource API to get merged list
|
|
all_files = list_resource_files("bgm")
|
|
|
|
# Filter to audio files only
|
|
audio_extensions = ('.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac')
|
|
return sorted([f for f in all_files if f.lower().endswith(audio_extensions)])
|
|
except Exception as e:
|
|
logger.warning(f"Failed to list BGM files: {e}")
|
|
return []
|
|
|