Files
ViGent2/backend/app/services/lipsync_service.py
Kevin Wong 1e52346eb4 更新
2026-02-07 14:29:57 +08:00

491 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
唇形同步服务
通过 subprocess 调用 LatentSync conda 环境进行推理
配置为使用 GPU1 (CUDA:1)
"""
import os
import shutil
import subprocess
import tempfile
import asyncio
import httpx
from pathlib import Path
from loguru import logger
from typing import Optional
from app.core.config import settings
class LipSyncService:
    """Lip-sync service — LatentSync 1.6 integration (subprocess based).

    Runs inference through one of three paths, chosen by settings:
    a persistent local server (``use_server``), a subprocess invoking the
    LatentSync conda environment (``use_local``), or a remote HTTP API.
    A single asyncio lock serializes GPU access so only one inference
    runs at a time. Configured for GPU selection via CUDA_VISIBLE_DEVICES.
    """

    def __init__(self):
        self.use_local = settings.LATENTSYNC_LOCAL
        self.api_url = settings.LATENTSYNC_API_URL
        self.latentsync_dir = settings.LATENTSYNC_DIR
        self.gpu_id = settings.LATENTSYNC_GPU_ID
        self.use_server = settings.LATENTSYNC_USE_SERVER
        # GPU concurrency lock (serial queue) — one inference at a time.
        self._lock = asyncio.Lock()
        # Python interpreter of the "latentsync" conda environment.
        # Adjust according to the actual server layout.
        self.conda_python = Path.home() / "ProgramFiles" / "miniconda3" / "envs" / "latentsync" / "bin" / "python"
        # Cached result of the runtime weight-availability check (None = not checked yet).
        self._weights_available: Optional[bool] = None

    def _check_weights(self) -> bool:
        """Check whether the model weight files and code directories exist.

        The result is cached on first call; subsequent calls return the
        cached value without re-touching the filesystem.
        """
        if self._weights_available is not None:
            return self._weights_available
        required_files = [
            self.latentsync_dir / "checkpoints" / "latentsync_unet.pt",
            self.latentsync_dir / "checkpoints" / "whisper" / "tiny.pt",
        ]
        required_dirs = [
            self.latentsync_dir / "configs" / "unet",
            self.latentsync_dir / "latentsync",
            self.latentsync_dir / "scripts",
        ]
        files_ok = all(f.exists() for f in required_files)
        dirs_ok = all(d.exists() for d in required_dirs)
        self._weights_available = files_ok and dirs_ok
        if self._weights_available:
            logger.info("✅ LatentSync 权重文件已就绪")
        else:
            missing_files = [str(f) for f in required_files if not f.exists()]
            missing_dirs = [str(d) for d in required_dirs if not d.exists()]
            logger.warning(f"⚠️ 缺少文件: {missing_files}")
            logger.warning(f"⚠️ 缺少目录: {missing_dirs}")
        return self._weights_available

    def _check_conda_env(self) -> bool:
        """Check whether the conda environment's Python interpreter exists."""
        if not self.conda_python.exists():
            logger.warning(f"⚠️ Conda Python 不存在: {self.conda_python}")
            return False
        return True

    def _get_media_duration(self, media_path: str) -> Optional[float]:
        """Return the duration in seconds of an audio or video file.

        Uses ffprobe; returns None on any failure (missing tool, timeout,
        unparsable output) so callers can skip duration-based logic.
        """
        try:
            cmd = [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                media_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                return float(result.stdout.strip())
        except Exception as e:
            logger.warning(f"⚠️ 获取媒体时长失败: {e}")
        return None

    def _loop_video_to_duration(self, video_path: str, output_path: str, target_duration: float) -> str:
        """Loop a video to match a target duration.

        Uses FFmpeg ``-stream_loop`` for a seamless loop, trimming to the
        target length and dropping the original audio track. On failure the
        original ``video_path`` is returned unchanged (best-effort).
        """
        try:
            cmd = [
                "ffmpeg", "-y",
                "-stream_loop", "-1",  # loop input indefinitely
                "-i", video_path,
                "-t", str(target_duration),  # trim to target duration
                "-c:v", "libx264",
                "-preset", "fast",
                "-crf", "18",
                "-an",  # drop the original audio
                output_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode == 0 and Path(output_path).exists():
                logger.info(f"✅ 视频循环完成: {target_duration:.1f}s")
                return output_path
            else:
                logger.warning(f"⚠️ 视频循环失败: {result.stderr[:200]}")
                return video_path
        except Exception as e:
            logger.warning(f"⚠️ 视频循环异常: {e}")
            return video_path

    def _preprocess_video(self, video_path: str, output_path: str, target_height: int = 720) -> str:
        """Pre-compress a video to speed up downstream processing.

        - Caps the height at ``target_height`` (default 720p), keeping aspect ratio.
        - Uses the fastest x264 preset; audio is copied untouched.

        Returns the path of the compressed video, or the original path when
        compression is unnecessary or fails (best-effort).
        """
        import json  # only needed here, for parsing ffprobe output

        # Probe the stream geometry (JSON output is the most reliable format).
        probe_cmd = [
            "ffprobe", "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=height,width",
            "-of", "json",
            video_path
        ]
        try:
            result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10)
            if result.returncode != 0:
                logger.warning(f"⚠️ ffprobe 失败: {result.stderr[:100]}")
                return video_path
            probe_data = json.loads(result.stdout)
            streams = probe_data.get("streams", [])
            if not streams:
                logger.warning("⚠️ 无法获取视频流信息,跳过预处理")
                return video_path
            current_height = streams[0].get("height", 0)
            current_width = streams[0].get("width", 0)
            if current_height == 0:
                logger.warning("⚠️ 视频高度为 0跳过预处理")
                return video_path
            logger.info(f"📹 原始视频分辨率: {current_width}×{current_height}")
        except json.JSONDecodeError as e:
            logger.warning(f"⚠️ ffprobe 输出解析失败: {e}")
            return video_path
        except subprocess.TimeoutExpired:
            logger.warning("⚠️ ffprobe 超时,跳过预处理")
            return video_path
        except Exception as e:
            logger.warning(f"⚠️ 获取视频信息失败: {e}")
            return video_path

        # Already small enough — skip compression entirely.
        if current_height <= target_height:
            logger.info(f"📹 视频高度 {current_height}p <= {target_height}p无需压缩")
            return video_path

        logger.info(f"📹 预处理视频: {current_height}p → {target_height}p")
        compress_cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-vf", f"scale=-2:{target_height}",  # keep aspect ratio, fix height
            "-c:v", "libx264",
            "-preset", "ultrafast",  # fastest encode
            "-crf", "23",  # quality factor
            "-c:a", "copy",  # copy audio as-is
            output_path
        ]
        try:
            result = subprocess.run(
                compress_cmd,
                capture_output=True,
                text=True,
                timeout=120  # allow up to 2 minutes
            )
            if result.returncode == 0 and Path(output_path).exists():
                original_size = Path(video_path).stat().st_size / 1024 / 1024
                new_size = Path(output_path).stat().st_size / 1024 / 1024
                logger.info(f"✅ 视频压缩完成: {original_size:.1f}MB → {new_size:.1f}MB")
                return output_path
            else:
                logger.warning(f"⚠️ 视频压缩失败: {result.stderr[:200]}")
                return video_path
        except subprocess.TimeoutExpired:
            logger.warning("⚠️ 视频压缩超时,使用原始视频")
            return video_path
        except Exception as e:
            logger.warning(f"⚠️ 视频压缩异常: {e}")
            return video_path

    async def generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int = 25
    ) -> str:
        """Generate a lip-synced video, dispatching to local or remote backend.

        Returns the path of the written output file.
        """
        logger.info(f"🎬 唇形同步任务: {Path(video_path).name} + {Path(audio_path).name}")
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        if self.use_local:
            return await self._local_generate(video_path, audio_path, output_path, fps)
        else:
            return await self._remote_generate(video_path, audio_path, output_path, fps)

    async def _local_generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int
    ) -> str:
        """Run inference locally via the LatentSync conda environment.

        Falls back to copying the input video to ``output_path`` whenever a
        precondition fails or inference errors out, so callers always get a
        playable file. ``fps`` is accepted for interface symmetry with the
        remote path but is not forwarded to the local inference script.
        """
        # Preconditions: conda interpreter and model weights must be present.
        if not self._check_conda_env():
            logger.warning("⚠️ Conda 环境不可用,使用 Fallback")
            shutil.copy(video_path, output_path)
            return output_path
        if not self._check_weights():
            logger.warning("⚠️ 模型权重不存在,使用 Fallback")
            shutil.copy(video_path, output_path)
            return output_path

        logger.info("⏳ 等待 GPU 资源 (排队中)...")
        async with self._lock:
            # Scratch directory for intermediate artifacts (auto-cleaned).
            with tempfile.TemporaryDirectory() as tmpdir:
                tmpdir = Path(tmpdir)
                audio_duration = self._get_media_duration(audio_path)
                video_duration = self._get_media_duration(video_path)
                # If the audio is longer than the video, loop the video to match.
                if audio_duration and video_duration and audio_duration > video_duration + 0.5:
                    logger.info(f"🔄 音频({audio_duration:.1f}s) > 视频({video_duration:.1f}s),循环视频...")
                    looped_video = tmpdir / "looped_input.mp4"
                    actual_video_path = self._loop_video_to_duration(
                        video_path,
                        str(looped_video),
                        audio_duration
                    )
                else:
                    actual_video_path = video_path

                if self.use_server:
                    # Mode A: persistent server (fast path, model stays loaded).
                    return await self._call_persistent_server(actual_video_path, audio_path, output_path)

                logger.info("🔄 调用 LatentSync 推理 (subprocess)...")
                temp_output = tmpdir / "output.mp4"
                cmd = [
                    str(self.conda_python),
                    "-m", "scripts.inference",
                    "--unet_config_path", "configs/unet/stage2_512.yaml",
                    "--inference_ckpt_path", "checkpoints/latentsync_unet.pt",
                    "--inference_steps", str(settings.LATENTSYNC_INFERENCE_STEPS),
                    "--guidance_scale", str(settings.LATENTSYNC_GUIDANCE_SCALE),
                    "--video_path", str(actual_video_path),  # possibly the looped video
                    "--audio_path", str(audio_path),
                    "--video_out_path", str(temp_output),
                    "--seed", str(settings.LATENTSYNC_SEED),
                    "--temp_dir", str(tmpdir / "cache"),
                ]
                if settings.LATENTSYNC_ENABLE_DEEPCACHE:
                    cmd.append("--enable_deepcache")

                # Pin the inference process to the configured GPU.
                env = os.environ.copy()
                env["CUDA_VISIBLE_DEVICES"] = str(self.gpu_id)
                logger.info(f"🖥️ 执行命令: {' '.join(cmd[:8])}...")
                logger.info(f"🖥️ GPU: CUDA_VISIBLE_DEVICES={self.gpu_id}")
                try:
                    # asyncio subprocess keeps the event loop free to serve
                    # other requests (e.g. progress polling) during inference.
                    process = await asyncio.create_subprocess_exec(
                        *cmd,
                        cwd=str(self.latentsync_dir),
                        env=env,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    try:
                        stdout, stderr = await asyncio.wait_for(
                            process.communicate(),
                            timeout=900  # 15-minute hard timeout
                        )
                    except asyncio.TimeoutError:
                        process.kill()
                        await process.wait()
                        logger.error("⏰ LatentSync 推理超时 (15分钟)")
                        shutil.copy(video_path, output_path)
                        return output_path

                    # errors="replace": tool output may not be valid UTF-8;
                    # never let logging of a failure raise UnicodeDecodeError.
                    stdout_text = stdout.decode(errors="replace") if stdout else ""
                    stderr_text = stderr.decode(errors="replace") if stderr else ""
                    if process.returncode != 0:
                        logger.error(f"LatentSync 推理失败:\n{stderr_text}")
                        logger.error(f"stdout:\n{stdout_text[-1000:] if stdout_text else 'N/A'}")
                        # Fallback: deliver the original (un-synced) video.
                        shutil.copy(video_path, output_path)
                        return output_path

                    logger.info(f"LatentSync 输出:\n{stdout_text[-500:] if stdout_text else 'N/A'}")
                    if temp_output.exists():
                        shutil.copy(temp_output, output_path)
                        logger.info(f"✅ 唇形同步完成: {output_path}")
                        return output_path
                    else:
                        logger.warning("⚠️ 未找到输出文件,使用 Fallback")
                        shutil.copy(video_path, output_path)
                        return output_path
                except Exception as e:
                    logger.error(f"❌ 推理异常: {e}")
                    shutil.copy(video_path, output_path)
                    return output_path

    async def _call_persistent_server(self, video_path: str, audio_path: str, output_path: str) -> str:
        """Call the local persistent inference server (server.py).

        Raises RuntimeError (via ``_local_generate_subprocess``) when the
        server is unreachable or unhealthy, and re-raises any error from the
        inference request itself.
        """
        server_url = "http://localhost:8007"
        logger.info(f"⚡ 调用常驻服务: {server_url}")
        # The server shares the filesystem, so absolute paths are passed.
        payload = {
            "video_path": str(Path(video_path).resolve()),
            "audio_path": str(Path(audio_path).resolve()),
            "video_out_path": str(Path(output_path).resolve()),
            "inference_steps": settings.LATENTSYNC_INFERENCE_STEPS,
            "guidance_scale": settings.LATENTSYNC_GUIDANCE_SCALE,
            "seed": settings.LATENTSYNC_SEED,
            "temp_dir": os.path.join(tempfile.gettempdir(), "latentsync_temp")
        }
        try:
            async with httpx.AsyncClient(timeout=1200.0) as client:
                # Health-check first so a dead server fails fast.
                try:
                    resp = await client.get(f"{server_url}/health", timeout=5.0)
                    if resp.status_code != 200:
                        logger.warning("⚠️ 常驻服务健康检查失败,回退到 subprocess")
                        return await self._local_generate_subprocess(video_path, audio_path, output_path)
                except Exception:
                    logger.warning("⚠️ 无法连接常驻服务,回退到 subprocess")
                    return await self._local_generate_subprocess(video_path, audio_path, output_path)

                response = await client.post(f"{server_url}/lipsync", json=payload)
                if response.status_code == 200:
                    result = response.json()
                    if Path(result["output_path"]).exists():
                        logger.info(f"✅ 常驻服务推理完成: {output_path}")
                        return output_path
                logger.error(f"❌ 常驻服务报错: {response.text}")
                raise RuntimeError(f"Server Error: {response.text}")
        except Exception as e:
            logger.error(f"❌ 常驻服务调用失败: {e}")
            # Bare raise preserves the original traceback (raise e would reset it).
            raise

    async def _local_generate_subprocess(self, video_path: str, audio_path: str, output_path: str) -> str:
        """Former subprocess fallback — deliberately disabled.

        The subprocess path reloads the model per request, which costs a lot
        of time and VRAM; and if the persistent server is down the operator
        should fix it rather than rely on a silent fallback (which risks a
        double GPU allocation / OOM).

        If the persistent server is unavailable, check:
        - Is it running: ``python scripts/server.py`` (in models/LatentSync)
        - Is the port taken: ``lsof -i:8007``
        - Is there enough VRAM: ``nvidia-smi``
        """
        raise RuntimeError(
            "LatentSync 常驻服务不可用,无法进行唇形同步。"
            "请确保 LatentSync 服务已启动 (cd models/LatentSync && python scripts/server.py)"
        )

    async def _remote_generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int
    ) -> str:
        """Call the remote LatentSync HTTP API.

        On any failure the input video is copied to ``output_path`` as a
        best-effort fallback so the pipeline keeps moving.
        """
        logger.info(f"📡 调用远程 API: {self.api_url}")
        try:
            async with httpx.AsyncClient(timeout=600.0) as client:
                with open(video_path, "rb") as vf, open(audio_path, "rb") as af:
                    files = {
                        "video": (Path(video_path).name, vf, "video/mp4"),
                        "audio": (Path(audio_path).name, af, "audio/mpeg"),
                    }
                    data = {"fps": fps}
                    response = await client.post(
                        f"{self.api_url}/lipsync",
                        files=files,
                        data=data
                    )
            if response.status_code == 200:
                with open(output_path, "wb") as f:
                    f.write(response.content)
                logger.info(f"✅ 远程推理完成: {output_path}")
                return output_path
            else:
                raise RuntimeError(f"API 错误: {response.status_code}")
        except Exception as e:
            logger.error(f"远程 API 调用失败: {e}")
            shutil.copy(video_path, output_path)
            return output_path

    async def check_health(self) -> dict:
        """Report readiness of the conda env, weights, and GPU as a dict."""
        conda_ok = self._check_conda_env()
        weights_ok = self._check_weights()
        gpu_ok = False
        gpu_name = "Unknown"
        if conda_ok:
            try:
                result = subprocess.run(
                    [str(self.conda_python), "-c",
                     "import torch; print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'N/A')"],
                    capture_output=True,
                    text=True,
                    env={**os.environ, "CUDA_VISIBLE_DEVICES": str(self.gpu_id)},
                    timeout=30
                )
                # Only trust stdout as the GPU name when the probe succeeded;
                # otherwise gpu_name stays "Unknown" instead of echoing garbage.
                if result.returncode == 0:
                    gpu_name = result.stdout.strip()
                    gpu_ok = gpu_name != "N/A"
            except Exception as e:
                # Narrowed from a bare except: log instead of silently swallowing.
                logger.warning(f"⚠️ GPU 检测失败: {e}")
        return {
            "model": "LatentSync 1.6",
            "conda_env": conda_ok,
            "weights": weights_ok,
            "gpu": gpu_ok,
            "gpu_name": gpu_name,
            "gpu_id": self.gpu_id,
            "inference_steps": settings.LATENTSYNC_INFERENCE_STEPS,
            "guidance_scale": settings.LATENTSYNC_GUIDANCE_SCALE,
            "ready": conda_ok and weights_ok and gpu_ok
        }