"""Video composition service: thin synchronous wrappers around ffmpeg/ffprobe."""

import os
import subprocess
import json
import shlex
from pathlib import Path
from typing import Optional

try:
    from loguru import logger
except ImportError:  # keep the module importable where loguru is not installed
    import logging

    logger = logging.getLogger(__name__)


class VideoService:
    """Collection of ffmpeg/ffprobe helpers for probing, cutting and composing media."""

    def __init__(self):
        pass

    def get_video_metadata(self, file_path: str) -> dict:
        """Return size and rotation info for the first video stream of *file_path*.

        The result always has the keys ``width``, ``height``, ``rotation``,
        ``effective_width`` and ``effective_height``. ``rotation`` is normalized
        into the range (-180, 180]; the effective size has width/height swapped
        when the rotation is +/-90 degrees. Any failure (ffprobe missing,
        non-zero exit, timeout, unparsable output) yields all-zero values.
        """
        cmd = [
            "ffprobe",
            "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=width,height:stream_side_data=rotation",
            "-of", "json",
            file_path,
        ]
        default_info = {
            "width": 0,
            "height": 0,
            "rotation": 0,
            "effective_width": 0,
            "effective_height": 0,
        }

        try:
            # errors="replace": undecodable bytes in ffprobe output must not
            # abort the probe.
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                errors="replace",
                timeout=10,
            )
            if result.returncode != 0:
                return default_info

            payload = json.loads(result.stdout or "{}")
            streams = payload.get("streams") or []
            if not streams:
                return default_info

            stream = streams[0]
            width = int(stream.get("width") or 0)
            height = int(stream.get("height") or 0)

            # Use the first side-data entry that carries a rotation value.
            rotation = 0
            for side_data in stream.get("side_data_list") or []:
                if not isinstance(side_data, dict):
                    continue
                raw_rotation = side_data.get("rotation")
                if raw_rotation is None:
                    continue
                try:
                    rotation = int(round(float(str(raw_rotation))))
                except (TypeError, ValueError, OverflowError):
                    rotation = 0
                break

            # Map into (-180, 180] so that 270 and -90 compare equal.
            norm_rotation = rotation % 360
            if norm_rotation > 180:
                norm_rotation -= 360

            swap_wh = abs(norm_rotation) == 90
            effective_width = height if swap_wh else width
            effective_height = width if swap_wh else height

            return {
                "width": width,
                "height": height,
                "rotation": norm_rotation,
                "effective_width": effective_width,
                "effective_height": effective_height,
            }
        except Exception as e:
            # Deliberate best-effort: probing must never break the caller.
            logger.warning(f"获取视频元信息失败: {e}")
            return default_info

    def normalize_orientation(self, video_path: str, output_path: str) -> str:
        """Re-encode a video that carries rotation metadata.

        Returns *video_path* unchanged when no rotation is reported, or when
        the re-encode fails; otherwise returns *output_path*.
        NOTE(review): presumably relies on ffmpeg applying the rotation while
        transcoding (its default autorotate behaviour) - confirm.
        """
        info = self.get_video_metadata(video_path)
        rotation = int(info.get("rotation") or 0)
        if rotation == 0:
            return video_path

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        logger.info(
            f"检测到旋转元数据 rotation={rotation},归一化方向: "
            f"{info.get('effective_width', 0)}x{info.get('effective_height', 0)}"
        )

        cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-map", "0:v:0",
            "-map", "0:a?",
            "-c:v", "libx264",
            "-preset", "fast",
            "-crf", "23",
            "-c:a", "copy",
            "-movflags", "+faststart",
            output_path,
        ]

        if self._run_ffmpeg(cmd):
            normalized = self.get_video_metadata(output_path)
            logger.info(
                "视频方向归一化完成: "
                f"coded={normalized.get('width', 0)}x{normalized.get('height', 0)}, "
                f"rotation={normalized.get('rotation', 0)}"
            )
            return output_path

        logger.warning("视频方向归一化失败,回退使用原视频")
        return video_path

    def _run_ffmpeg(self, cmd: list) -> bool:
        """Run *cmd* synchronously; return True only when it exits with code 0.

        Failures (non-zero exit or any exception while launching) are logged
        and reported as False rather than raised.
        """
        cmd_str = ' '.join(shlex.quote(str(c)) for c in cmd)
        logger.debug(f"FFmpeg CMD: {cmd_str}")
        try:
            # Synchronous call for BackgroundTasks compatibility.
            # errors="replace": stderr may contain bytes that are not valid
            # UTF-8 (e.g. file names); a decode error must not turn a
            # successful run into a reported failure.
            result = subprocess.run(
                cmd,
                shell=False,
                capture_output=True,
                text=True,
                encoding='utf-8',
                errors='replace',
            )
            if result.returncode != 0:
                logger.error(f"FFmpeg Error: {result.stderr}")
                return False
            return True
        except Exception as e:
            logger.error(f"FFmpeg Exception: {e}")
            return False

    def _get_duration(self, file_path: str) -> float:
        """Return the container duration in seconds, or 0.0 when it cannot be read."""
        # Synchronous call for BackgroundTasks compatibility.
        # Argument-list form avoids the command-injection risk of shell=True.
        cmd = [
            'ffprobe', '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'default=noprint_wrappers=1:nokey=1',
            file_path
        ]
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                errors='replace',
            )
            return float(result.stdout.strip())
        except Exception:
            # Deliberate best-effort: callers treat 0.0 as "unknown duration".
            return 0.0

    def mix_audio(
        self,
        voice_path: str,
        bgm_path: str,
        output_path: str,
        bgm_volume: float = 0.2
    ) -> str:
        """Mix a voice track with looping background music.

        *bgm_volume* is clamped to [0.0, 1.0]. The music input is looped
        indefinitely and the mix ends with the voice track
        (``amix duration=first`` plus ``-shortest``). Output is encoded as
        ``pcm_s16le``.

        Returns *output_path*; raises RuntimeError when ffmpeg fails.
        """
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        volume = max(0.0, min(float(bgm_volume), 1.0))
        filter_complex = (
            f"[0:a]volume=1.0[a0];"
            f"[1:a]volume={volume}[a1];"
            f"[a0][a1]amix=inputs=2:duration=first:dropout_transition=2:normalize=0[aout]"
        )

        cmd = [
            "ffmpeg", "-y",
            "-i", voice_path,
            "-stream_loop", "-1", "-i", bgm_path,
            "-filter_complex", filter_complex,
            "-map", "[aout]",
            "-c:a", "pcm_s16le",
            "-shortest",
            output_path,
        ]

        if self._run_ffmpeg(cmd):
            return output_path
        raise RuntimeError("FFmpeg audio mix failed")

    async def compose(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        subtitle_path: Optional[str] = None
    ) -> str:
        """Mux the video stream of *video_path* with the audio of *audio_path*.

        When the audio is longer than the video, the video input is looped so
        it covers the audio; ``-shortest`` then trims the output.
        *subtitle_path* is accepted but currently ignored.

        NOTE: although declared ``async``, this runs ffmpeg through the
        blocking ``_run_ffmpeg`` and therefore does not yield while encoding.

        Returns *output_path*; raises RuntimeError when ffmpeg fails.
        """
        # Ensure output dir
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        video_duration = self._get_duration(video_path)
        audio_duration = self._get_duration(audio_path)

        # Loop the video when the audio outlasts it.
        loop_count = 1
        if audio_duration > video_duration and video_duration > 0:
            loop_count = int(audio_duration / video_duration) + 1

        cmd = ["ffmpeg", "-y"]

        # Input video (stream_loop must be placed before its -i)
        if loop_count > 1:
            cmd.extend(["-stream_loop", str(loop_count)])
        cmd.extend(["-i", video_path])

        # Input audio
        cmd.extend(["-i", audio_path])

        # Subtitles are intentionally not burned in here
        # (previously disabled because of font issues).

        # High-quality encode settings
        cmd.extend([
            "-c:v", "libx264",
            "-preset", "medium",   # balance between speed and compression
            "-crf", "20",          # final output: high quality
            "-c:a", "aac",
            "-b:a", "192k",        # audio bitrate
            "-shortest"
        ])
        # Video from input 0, audio from input 1
        cmd.extend(["-map", "0:v", "-map", "1:a"])

        cmd.append(output_path)

        if self._run_ffmpeg(cmd):
            return output_path
        raise RuntimeError("FFmpeg composition failed")

    @staticmethod
    def _concat_list_entry(video_path) -> str:
        """Build one ``file '...'`` line for an ffmpeg concat list.

        A single quote inside the path is written as ``'\\''`` (close quote,
        escaped quote, reopen quote) so the list stays parseable.
        """
        escaped = str(video_path).replace("'", "'\\''")
        return f"file '{escaped}'\n"

    def concat_videos(self, video_paths: list, output_path: str, target_fps: int = 25) -> str:
        """Concatenate video segments with the ffmpeg concat demuxer (audio dropped).

        The segments are re-encoded with libx264 at a constant *target_fps*.
        A temporary list file is written next to *output_path* and removed
        afterwards, whether or not ffmpeg succeeds.

        Returns *output_path*. Raises ValueError for an empty *video_paths*
        and RuntimeError when ffmpeg fails.
        """
        if not video_paths:
            raise ValueError("No video segments to concat")

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        list_path = Path(output_path).parent / f"{Path(output_path).stem}_concat.txt"

        cmd = [
            "ffmpeg", "-y",
            "-f", "concat",
            "-safe", "0",
            "-fflags", "+genpts",
            "-i", str(list_path),
            "-an",
            "-vsync", "cfr",
            "-r", str(target_fps),
            "-c:v", "libx264",
            "-preset", "fast",
            "-crf", "23",
            "-pix_fmt", "yuv420p",
            "-movflags", "+faststart",
            output_path,
        ]

        try:
            # Write the concat list inside the try so it is removed even when
            # writing itself fails half-way.
            with open(list_path, "w", encoding="utf-8") as f:
                f.writelines(self._concat_list_entry(vp) for vp in video_paths)

            if self._run_ffmpeg(cmd):
                return output_path
            raise RuntimeError("FFmpeg concat failed")
        finally:
            try:
                list_path.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup; a leftover list file is harmless.
                pass

    def split_audio(self, audio_path: str, start: float, end: float, output_path: str) -> str:
        """Cut the ``[start, end)`` range (seconds) out of an audio file via stream copy.

        Returns *output_path*. Raises ValueError when ``end <= start`` and
        RuntimeError when ffmpeg fails.
        """
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        duration = end - start
        if duration <= 0:
            raise ValueError(f"Invalid audio split range: start={start}, end={end}, duration={duration}")

        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start),
            "-t", str(duration),
            "-i", audio_path,
            "-c", "copy",
            output_path,
        ]

        if self._run_ffmpeg(cmd):
            return output_path
        raise RuntimeError(f"FFmpeg audio split failed: {start}-{end}")

    def get_resolution(self, file_path: str) -> tuple[int, int]:
        """Return the effective (rotation-aware) ``(width, height)``; ``(0, 0)`` if unknown."""
        info = self.get_video_metadata(file_path)
        return (
            int(info.get("effective_width") or 0),
            int(info.get("effective_height") or 0),
        )

    def prepare_segment(self, video_path: str, target_duration: float, output_path: str,
                        target_resolution: Optional[tuple] = None, source_start: float = 0.0,
                        source_end: Optional[float] = None, target_fps: Optional[int] = None) -> str:
        """Trim or loop a source clip to exactly *target_duration* seconds (no audio).

        target_resolution: ``(width, height)`` to scale-and-pad to; ``None``
            keeps the source resolution.
        source_start: start offset in the source, in seconds (default 0).
        source_end: end offset in the source, in seconds; ``None`` or an
            unusable value means "until the end of the clip".
        target_fps: optional output frame rate, used to unify the time base
            before concatenating several clips.

        Returns *output_path*; raises RuntimeError when an ffmpeg step fails.
        """
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        video_dur = self._get_duration(video_path)
        if video_dur <= 0:
            # Unknown duration: assume the clip is as long as requested.
            video_dur = target_duration

        clip_end = video_dur
        if source_end is not None:
            try:
                source_end_value = float(source_end)
                if source_end_value > source_start:
                    clip_end = min(source_end_value, video_dur)
            except (TypeError, ValueError, OverflowError):
                # Unusable source_end: fall back to the end of the clip.
                pass

        # Usable length of the selected range (never below 0.1s).
        available = max(clip_end - source_start, 0.1)
        needs_loop = target_duration > available
        needs_scale = target_resolution is not None
        needs_fps = bool(target_fps and target_fps > 0)
        has_source_end = clip_end < video_dur

        actual_input = video_path
        trim_temp = None

        try:
            # When looping a sub-range, cut the range into a temp file first:
            # -stream_loop would otherwise loop the whole source instead of
            # the selected range.
            if needs_loop and (source_start > 0 or has_source_end):
                trim_temp = str(Path(output_path).parent / (Path(output_path).stem + "_trim_tmp.mp4"))
                trim_cmd = [
                    "ffmpeg", "-y",
                    "-ss", str(source_start),
                    "-i", video_path,
                    "-t", str(available),
                    "-an",
                    "-c:v", "libx264", "-preset", "fast", "-crf", "23",
                    trim_temp,
                ]
                if not self._run_ffmpeg(trim_cmd):
                    raise RuntimeError(f"FFmpeg trim for loop failed: {video_path}")
                actual_input = trim_temp
                source_start = 0.0  # already trimmed, no further seek needed
                # Recompute the loop basis from the trimmed file.
                available = self._get_duration(trim_temp) or available

            loop_count = int(target_duration / available) + 1 if needs_loop else 0

            cmd = ["ffmpeg", "-y"]
            if needs_loop:
                cmd.extend(["-stream_loop", str(loop_count)])
            if source_start > 0:
                cmd.extend(["-ss", str(source_start)])
            cmd.extend(["-i", actual_input, "-t", str(target_duration), "-an"])

            filters = []
            if needs_fps:
                filters.append(f"fps={int(target_fps)}")
            if needs_scale:
                w, h = target_resolution
                filters.append(f"scale={w}:{h}:force_original_aspect_ratio=decrease,pad={w}:{h}:(ow-iw)/2:(oh-ih)/2")

            if filters:
                cmd.extend(["-vf", ",".join(filters)])
            if needs_fps:
                cmd.extend(["-vsync", "cfr", "-r", str(int(target_fps))])

            # Looping, scaling, seeking, range cuts and fps changes all need a
            # re-encode; otherwise stream-copy to keep the original quality.
            if needs_loop or needs_scale or source_start > 0 or has_source_end or needs_fps:
                cmd.extend(["-c:v", "libx264", "-preset", "fast", "-crf", "23"])
            else:
                cmd.extend(["-c:v", "copy"])

            cmd.append(output_path)

            if self._run_ffmpeg(cmd):
                return output_path
            raise RuntimeError(f"FFmpeg prepare_segment failed: {video_path}")
        finally:
            # Remove the trim temp file on every exit path, including a
            # failed trim that may have left a partial file behind.
            if trim_temp:
                try:
                    Path(trim_temp).unlink(missing_ok=True)
                except OSError:
                    pass