""" 视频合成服务 """ import os import subprocess import json import shlex from pathlib import Path from loguru import logger from typing import Optional class VideoService: def __init__(self): pass def _run_ffmpeg(self, cmd: list) -> bool: cmd_str = ' '.join(shlex.quote(str(c)) for c in cmd) logger.debug(f"FFmpeg CMD: {cmd_str}") try: # Synchronous call for BackgroundTasks compatibility result = subprocess.run( cmd, shell=False, capture_output=True, text=True, encoding='utf-8', ) if result.returncode != 0: logger.error(f"FFmpeg Error: {result.stderr}") return False return True except Exception as e: logger.error(f"FFmpeg Exception: {e}") return False def _get_duration(self, file_path: str) -> float: # Synchronous call for BackgroundTasks compatibility # 使用参数列表形式避免 shell=True 的命令注入风险 cmd = [ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', file_path ] try: result = subprocess.run( cmd, capture_output=True, text=True, ) return float(result.stdout.strip()) except Exception: return 0.0 def mix_audio( self, voice_path: str, bgm_path: str, output_path: str, bgm_volume: float = 0.2 ) -> str: """混合人声与背景音乐""" Path(output_path).parent.mkdir(parents=True, exist_ok=True) volume = max(0.0, min(float(bgm_volume), 1.0)) filter_complex = ( f"[0:a]volume=1.0[a0];" f"[1:a]volume={volume}[a1];" f"[a0][a1]amix=inputs=2:duration=first:dropout_transition=2:normalize=0[aout]" ) cmd = [ "ffmpeg", "-y", "-i", voice_path, "-stream_loop", "-1", "-i", bgm_path, "-filter_complex", filter_complex, "-map", "[aout]", "-c:a", "pcm_s16le", "-shortest", output_path, ] if self._run_ffmpeg(cmd): return output_path raise RuntimeError("FFmpeg audio mix failed") async def compose( self, video_path: str, audio_path: str, output_path: str, subtitle_path: Optional[str] = None ) -> str: """合成视频""" # Ensure output dir Path(output_path).parent.mkdir(parents=True, exist_ok=True) video_duration = self._get_duration(video_path) audio_duration = self._get_duration(audio_path) # Audio loop if needed loop_count = 1 if audio_duration > video_duration and video_duration > 0: loop_count = int(audio_duration / video_duration) + 1 cmd = ["ffmpeg", "-y"] # Input video (stream_loop must be before -i) if loop_count > 1: cmd.extend(["-stream_loop", str(loop_count)]) cmd.extend(["-i", video_path]) # Input audio cmd.extend(["-i", audio_path]) # Filter complex filter_complex = [] # Subtitles (skip for now to mimic previous state or implement basic) # Previous state: subtitles disabled due to font issues # if subtitle_path: ... # Audio map with high quality encoding cmd.extend([ "-c:v", "libx264", "-preset", "slow", # 慢速预设,更好的压缩效率 "-crf", "18", # 高质量(与 LatentSync 一致) "-c:a", "aac", "-b:a", "192k", # 音频比特率 "-shortest" ]) # Use audio from input 1 cmd.extend(["-map", "0:v", "-map", "1:a"]) cmd.append(output_path) if self._run_ffmpeg(cmd): return output_path else: raise RuntimeError("FFmpeg composition failed")