283 lines
9.8 KiB
Python
283 lines
9.8 KiB
Python
"""
|
|
视频合成服务
|
|
"""
|
|
import os
|
|
import subprocess
|
|
import json
|
|
import shlex
|
|
from pathlib import Path
|
|
from loguru import logger
|
|
from typing import Optional
|
|
|
|
class VideoService:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def _run_ffmpeg(self, cmd: list) -> bool:
|
|
cmd_str = ' '.join(shlex.quote(str(c)) for c in cmd)
|
|
logger.debug(f"FFmpeg CMD: {cmd_str}")
|
|
try:
|
|
# Synchronous call for BackgroundTasks compatibility
|
|
result = subprocess.run(
|
|
cmd,
|
|
shell=False,
|
|
capture_output=True,
|
|
text=True,
|
|
encoding='utf-8',
|
|
)
|
|
if result.returncode != 0:
|
|
logger.error(f"FFmpeg Error: {result.stderr}")
|
|
return False
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"FFmpeg Exception: {e}")
|
|
return False
|
|
|
|
def _get_duration(self, file_path: str) -> float:
|
|
# Synchronous call for BackgroundTasks compatibility
|
|
# 使用参数列表形式避免 shell=True 的命令注入风险
|
|
cmd = [
|
|
'ffprobe', '-v', 'error',
|
|
'-show_entries', 'format=duration',
|
|
'-of', 'default=noprint_wrappers=1:nokey=1',
|
|
file_path
|
|
]
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
return float(result.stdout.strip())
|
|
except Exception:
|
|
return 0.0
|
|
|
|
def mix_audio(
|
|
self,
|
|
voice_path: str,
|
|
bgm_path: str,
|
|
output_path: str,
|
|
bgm_volume: float = 0.2
|
|
) -> str:
|
|
"""混合人声与背景音乐"""
|
|
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
volume = max(0.0, min(float(bgm_volume), 1.0))
|
|
filter_complex = (
|
|
f"[0:a]volume=1.0[a0];"
|
|
f"[1:a]volume={volume}[a1];"
|
|
f"[a0][a1]amix=inputs=2:duration=first:dropout_transition=2:normalize=0[aout]"
|
|
)
|
|
|
|
cmd = [
|
|
"ffmpeg", "-y",
|
|
"-i", voice_path,
|
|
"-stream_loop", "-1", "-i", bgm_path,
|
|
"-filter_complex", filter_complex,
|
|
"-map", "[aout]",
|
|
"-c:a", "pcm_s16le",
|
|
"-shortest",
|
|
output_path,
|
|
]
|
|
|
|
if self._run_ffmpeg(cmd):
|
|
return output_path
|
|
raise RuntimeError("FFmpeg audio mix failed")
|
|
|
|
async def compose(
|
|
self,
|
|
video_path: str,
|
|
audio_path: str,
|
|
output_path: str,
|
|
subtitle_path: Optional[str] = None
|
|
) -> str:
|
|
"""合成视频"""
|
|
# Ensure output dir
|
|
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
video_duration = self._get_duration(video_path)
|
|
audio_duration = self._get_duration(audio_path)
|
|
|
|
# Audio loop if needed
|
|
loop_count = 1
|
|
if audio_duration > video_duration and video_duration > 0:
|
|
loop_count = int(audio_duration / video_duration) + 1
|
|
|
|
cmd = ["ffmpeg", "-y"]
|
|
|
|
# Input video (stream_loop must be before -i)
|
|
if loop_count > 1:
|
|
cmd.extend(["-stream_loop", str(loop_count)])
|
|
cmd.extend(["-i", video_path])
|
|
|
|
# Input audio
|
|
cmd.extend(["-i", audio_path])
|
|
|
|
# Filter complex
|
|
filter_complex = []
|
|
|
|
# Subtitles (skip for now to mimic previous state or implement basic)
|
|
# Previous state: subtitles disabled due to font issues
|
|
# if subtitle_path: ...
|
|
|
|
# Audio map with high quality encoding
|
|
cmd.extend([
|
|
"-c:v", "libx264",
|
|
"-preset", "slow", # 慢速预设,更好的压缩效率
|
|
"-crf", "18", # 高质量(与 LatentSync 一致)
|
|
"-c:a", "aac",
|
|
"-b:a", "192k", # 音频比特率
|
|
"-shortest"
|
|
])
|
|
# Use audio from input 1
|
|
cmd.extend(["-map", "0:v", "-map", "1:a"])
|
|
|
|
cmd.append(output_path)
|
|
|
|
if self._run_ffmpeg(cmd):
|
|
return output_path
|
|
else:
|
|
raise RuntimeError("FFmpeg composition failed")
|
|
|
|
def concat_videos(self, video_paths: list, output_path: str) -> str:
|
|
"""使用 FFmpeg concat demuxer 拼接多个视频片段"""
|
|
if not video_paths:
|
|
raise ValueError("No video segments to concat")
|
|
|
|
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# 生成 concat list 文件
|
|
list_path = Path(output_path).parent / f"{Path(output_path).stem}_concat.txt"
|
|
with open(list_path, "w", encoding="utf-8") as f:
|
|
for vp in video_paths:
|
|
f.write(f"file '{vp}'\n")
|
|
|
|
cmd = [
|
|
"ffmpeg", "-y",
|
|
"-f", "concat",
|
|
"-safe", "0",
|
|
"-i", str(list_path),
|
|
"-c", "copy",
|
|
output_path,
|
|
]
|
|
|
|
try:
|
|
if self._run_ffmpeg(cmd):
|
|
return output_path
|
|
else:
|
|
raise RuntimeError("FFmpeg concat failed")
|
|
finally:
|
|
try:
|
|
list_path.unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|
|
|
|
def split_audio(self, audio_path: str, start: float, end: float, output_path: str) -> str:
|
|
"""用 FFmpeg 按时间范围切分音频"""
|
|
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
duration = end - start
|
|
if duration <= 0:
|
|
raise ValueError(f"Invalid audio split range: start={start}, end={end}, duration={duration}")
|
|
|
|
cmd = [
|
|
"ffmpeg", "-y",
|
|
"-ss", str(start),
|
|
"-t", str(duration),
|
|
"-i", audio_path,
|
|
"-c", "copy",
|
|
output_path,
|
|
]
|
|
|
|
if self._run_ffmpeg(cmd):
|
|
return output_path
|
|
raise RuntimeError(f"FFmpeg audio split failed: {start}-{end}")
|
|
|
|
def get_resolution(self, file_path: str) -> tuple:
|
|
"""获取视频分辨率,返回 (width, height)"""
|
|
cmd = [
|
|
'ffprobe', '-v', 'error',
|
|
'-select_streams', 'v:0',
|
|
'-show_entries', 'stream=width,height',
|
|
'-of', 'csv=p=0',
|
|
file_path
|
|
]
|
|
try:
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
|
parts = result.stdout.strip().split(',')
|
|
return (int(parts[0]), int(parts[1]))
|
|
except Exception:
|
|
return (0, 0)
|
|
|
|
def prepare_segment(self, video_path: str, target_duration: float, output_path: str,
|
|
target_resolution: tuple = None, source_start: float = 0.0) -> str:
|
|
"""将素材视频裁剪或循环到指定时长(无音频)。
|
|
target_resolution: (width, height) 如需统一分辨率则传入,否则保持原分辨率。
|
|
source_start: 源视频截取起点(秒),默认 0。
|
|
"""
|
|
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
video_dur = self._get_duration(video_path)
|
|
if video_dur <= 0:
|
|
video_dur = target_duration
|
|
|
|
# 可用时长 = 从 source_start 到视频结尾
|
|
available = max(video_dur - source_start, 0.1)
|
|
needs_loop = target_duration > available
|
|
needs_scale = target_resolution is not None
|
|
|
|
# 当需要循环且有 source_start 时,先裁剪出片段,再循环裁剪后的文件
|
|
# 避免 stream_loop 循环整个视频(而不是从 source_start 开始的片段)
|
|
actual_input = video_path
|
|
trim_temp = None
|
|
if needs_loop and source_start > 0:
|
|
trim_temp = str(Path(output_path).parent / (Path(output_path).stem + "_trim_tmp.mp4"))
|
|
trim_cmd = [
|
|
"ffmpeg", "-y",
|
|
"-ss", str(source_start),
|
|
"-i", video_path,
|
|
"-t", str(available),
|
|
"-an",
|
|
"-c:v", "libx264", "-preset", "fast", "-crf", "18",
|
|
trim_temp,
|
|
]
|
|
if not self._run_ffmpeg(trim_cmd):
|
|
raise RuntimeError(f"FFmpeg trim for loop failed: {video_path}")
|
|
actual_input = trim_temp
|
|
source_start = 0.0 # 已裁剪,不需要再 seek
|
|
# 重新计算循环次数(基于裁剪后文件)
|
|
available = self._get_duration(trim_temp) or available
|
|
|
|
loop_count = int(target_duration / available) + 1 if needs_loop else 0
|
|
|
|
cmd = ["ffmpeg", "-y"]
|
|
if needs_loop:
|
|
cmd.extend(["-stream_loop", str(loop_count)])
|
|
if source_start > 0:
|
|
cmd.extend(["-ss", str(source_start)])
|
|
cmd.extend(["-i", actual_input, "-t", str(target_duration), "-an"])
|
|
|
|
if needs_scale:
|
|
w, h = target_resolution
|
|
cmd.extend(["-vf", f"scale={w}:{h}:force_original_aspect_ratio=decrease,pad={w}:{h}:(ow-iw)/2:(oh-ih)/2"])
|
|
|
|
# 需要循环、缩放或指定起点时必须重编码,否则用 stream copy 保持原画质
|
|
if needs_loop or needs_scale or source_start > 0:
|
|
cmd.extend(["-c:v", "libx264", "-preset", "fast", "-crf", "18"])
|
|
else:
|
|
cmd.extend(["-c:v", "copy"])
|
|
|
|
cmd.append(output_path)
|
|
|
|
try:
|
|
if self._run_ffmpeg(cmd):
|
|
return output_path
|
|
raise RuntimeError(f"FFmpeg prepare_segment failed: {video_path}")
|
|
finally:
|
|
# 清理裁剪临时文件
|
|
if trim_temp:
|
|
try:
|
|
Path(trim_temp).unlink(missing_ok=True)
|
|
except Exception:
|
|
pass
|