优化视频音频合并逻辑,新增智能时长调整功能

This commit is contained in:
puke
2025-11-21 00:56:24 +08:00
parent d8e380bdb5
commit 02ef878e3b

View File

@@ -27,6 +27,7 @@ Note: Requires FFmpeg to be installed on the system.
import os
import shutil
import tempfile
import uuid
from pathlib import Path
from typing import List, Literal, Optional
@@ -316,12 +317,16 @@ class VideoService:
audio_volume: float = 1.0,
video_volume: float = 0.0,
pad_strategy: str = "freeze", # "freeze" (freeze last frame) or "black" (black screen)
auto_adjust_duration: bool = True, # Automatically adjust video duration to match audio
duration_tolerance: float = 0.3, # Tolerance for video being longer than audio (seconds)
) -> str:
"""
Merge audio with video, using the longer duration
Merge audio with video with intelligent duration adjustment
The output video duration will be the maximum of video and audio duration.
If audio is longer than video, the video will be padded using the specified strategy.
Automatically handles duration mismatches between video and audio:
- If video < audio: Pad video to match audio (avoid black screen)
- If video > audio (within tolerance): Keep as-is (acceptable)
- If video > audio (exceeds tolerance): Trim video to match audio
Automatically handles videos with or without audio streams.
- If video has no audio: adds the audio track
@@ -339,6 +344,9 @@ class VideoService:
pad_strategy: Strategy to pad video if audio is longer
- "freeze": Freeze last frame (default)
- "black": Fill with black screen
auto_adjust_duration: Enable intelligent duration adjustment (default: True)
duration_tolerance: Tolerance for video being longer than audio in seconds (default: 0.3)
Videos within this tolerance won't be trimmed
Returns:
Path to the output video file
@@ -361,6 +369,28 @@ class VideoService:
logger.info(f"Video duration: {video_duration:.2f}s, Audio duration: {audio_duration:.2f}s")
# Intelligent duration adjustment (if enabled)
if auto_adjust_duration:
diff = video_duration - audio_duration
if diff < 0:
# Video shorter than audio → Must pad to avoid black screen
logger.warning(f"⚠️ Video shorter than audio by {abs(diff):.2f}s, padding required")
video = self._pad_video_to_duration(video, audio_duration, pad_strategy)
video_duration = audio_duration # Update duration after padding
logger.info(f"📌 Padded video to {audio_duration:.2f}s")
elif diff > duration_tolerance:
# Video significantly longer than audio → Trim
logger.info(f"⚠️ Video longer than audio by {diff:.2f}s (tolerance: {duration_tolerance}s)")
video = self._trim_video_to_duration(video, audio_duration)
video_duration = audio_duration # Update duration after trimming
logger.info(f"✂️ Trimmed video to {audio_duration:.2f}s")
else: # 0 <= diff <= duration_tolerance
# Video slightly longer but within tolerance → Keep as-is
logger.info(f"✅ Duration acceptable: video={video_duration:.2f}s, audio={audio_duration:.2f}s (diff={diff:.2f}s)")
# Determine target duration (max of both)
target_duration = max(video_duration, audio_duration)
logger.info(f"Target output duration: {target_duration:.2f}s")
@@ -382,9 +412,6 @@ class VideoService:
video_stream = video_stream.filter('tpad', stop_mode='clone', stop_duration=pad_duration)
else: # black
# Generate black frames for padding duration
from pixelle_video.utils.os_util import get_temp_path
import os
# Get video properties
probe = ffmpeg.probe(video)
video_info = next(s for s in probe['streams'] if s['codec_type'] == 'video')
@@ -395,7 +422,7 @@ class VideoService:
fps = fps_num / fps_den if fps_den != 0 else 30
# Create black video for padding
black_video_path = get_temp_path(f"black_pad_{os.path.basename(output)}")
black_video_path = self._get_unique_temp_path("black_pad", os.path.basename(output))
black_input = ffmpeg.input(
f'color=c=black:s={width}x{height}:r={fps}',
f='lavfi',
@@ -778,6 +805,26 @@ class VideoService:
fade_in=0.0
)
def _get_unique_temp_path(self, prefix: str, original_filename: str) -> str:
    """
    Build a temp file path that embeds a short random token

    The token keeps paths distinct when several operations target the same
    source filename at the same time.

    Args:
        prefix: Label placed at the start of the temp filename
            (e.g., "trimmed", "padded", "black_pad")
        original_filename: Filename appended after the random token

    Returns:
        Result of get_temp_path() for the name
        "{prefix}_{token}_{original_filename}", where token is the first
        8 hex characters of a UUID4

    Example:
        A call with ("trimmed", "video.mp4") yields a filename such as
        "trimmed_a3f2d8c1_video.mp4" inside the temp location chosen by
        get_temp_path().
    """
    # Imported locally, as in the rest of this class's temp-path handling
    from pixelle_video.utils.os_util import get_temp_path

    token = uuid.uuid4().hex[:8]
    temp_name = "_".join((prefix, token, original_filename))
    return get_temp_path(temp_name)
def _resolve_bgm_path(self, bgm_path: str) -> str:
"""
Resolve BGM path (filename or custom path) with custom override support
@@ -841,4 +888,120 @@ class VideoService:
except Exception as e:
logger.warning(f"Failed to list BGM files: {e}")
return []
def _trim_video_to_duration(self, video: str, target_duration: float) -> str:
    """
    Trim video to specified duration

    Keeps the first ``target_duration`` seconds of ``video`` and writes the
    result to a uniquely named temp file. Streams are copied, not re-encoded.

    Args:
        video: Input video file path
        target_duration: Target duration in seconds

    Returns:
        Path to trimmed video (temp file)

    Raises:
        RuntimeError: If FFmpeg execution fails (the original ffmpeg.Error is
            attached as the cause)
    """
    output = self._get_unique_temp_path("trimmed", os.path.basename(video))

    try:
        # Stream copy for fast trimming. The audio codec is always 'copy':
        # the previous conditional on has_audio_stream() chose 'copy' in both
        # branches, so the extra stream check had no effect on the command.
        (
            ffmpeg
            .input(video, t=target_duration)
            .output(output, vcodec='copy', acodec='copy')
            .overwrite_output()
            .run(capture_stdout=True, capture_stderr=True, quiet=True)
        )
    except ffmpeg.Error as e:
        # errors="replace" so undecodable bytes in FFmpeg's stderr cannot
        # raise a second exception and hide the real failure
        error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
        logger.error(f"FFmpeg error trimming video: {error_msg}")
        raise RuntimeError(f"Failed to trim video: {error_msg}") from e

    return output
def _pad_video_to_duration(self, video: str, target_duration: float, pad_strategy: str = "freeze") -> str:
    """
    Pad video to specified duration by extending the last frame or adding black frames

    Only the video stream is passed to the output, which is re-encoded with
    libx264 (preset "fast", crf 23).

    Args:
        video: Input video file path
        target_duration: Target duration in seconds
        pad_strategy: Padding strategy - "freeze" (freeze last frame); any other
            value is treated as "black" (black screen)

    Returns:
        Path to padded video (temp file), or the original ``video`` path
        unchanged when it is already at least ``target_duration`` long

    Raises:
        RuntimeError: If FFmpeg execution fails, or if the "black" strategy
            finds no video stream in the input
    """
    video_duration = self._get_video_duration(video)
    pad_duration = target_duration - video_duration

    if pad_duration <= 0:
        # No padding needed, return original
        return video

    # Temp path is only generated once we know an output file will be written
    output = self._get_unique_temp_path("padded", os.path.basename(video))

    try:
        input_video = ffmpeg.input(video)
        video_stream = input_video.video

        if pad_strategy == "freeze":
            # Freeze last frame using tpad filter
            video_stream = video_stream.filter('tpad', stop_mode='clone', stop_duration=pad_duration)
        else:  # black
            # The black clip must match the source's size and frame rate
            # so it can be concatenated with it
            probe = ffmpeg.probe(video)
            video_info = next(
                (s for s in probe['streams'] if s['codec_type'] == 'video'),
                None,
            )
            if video_info is None:
                # Previously surfaced as a bare StopIteration
                raise RuntimeError(f"Failed to pad video: no video stream found in {video}")

            width = int(video_info['width'])
            height = int(video_info['height'])
            # r_frame_rate is read as a "num/den" string; fall back to 30 fps
            # when the denominator is zero
            fps_num, fps_den = map(int, video_info['r_frame_rate'].split('/'))
            fps = fps_num / fps_den if fps_den != 0 else 30

            # Create black video for padding
            black_input = ffmpeg.input(
                f'color=c=black:s={width}x{height}:r={fps}',
                f='lavfi',
                t=pad_duration
            )

            # Concatenate original video with black padding
            video_stream = ffmpeg.concat(video_stream, black_input.video, v=1, a=0)

        # Both strategies build a filtered stream, so they share one
        # re-encoding output step
        (
            ffmpeg
            .output(
                video_stream,
                output,
                vcodec='libx264',
                preset='fast',
                crf=23
            )
            .overwrite_output()
            .run(capture_stdout=True, capture_stderr=True, quiet=True)
        )
    except ffmpeg.Error as e:
        # errors="replace" so undecodable bytes in FFmpeg's stderr cannot
        # raise a second exception and hide the real failure
        error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
        logger.error(f"FFmpeg error padding video: {error_msg}")
        raise RuntimeError(f"Failed to pad video: {error_msg}") from e

    return output