Files
ViGent2/backend/app/services/lipsync_service.py
2026-01-20 17:14:10 +08:00

365 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
唇形同步服务
通过 subprocess 调用 LatentSync conda 环境进行推理
配置为使用 GPU1 (CUDA:1)
"""
import asyncio
import json
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional

import httpx
from loguru import logger

from app.core.config import settings
class LipSyncService:
    """Lip-sync service — LatentSync 1.6 integration (subprocess-based).

    Inference is performed by invoking the Python interpreter of the
    ``latentsync`` conda environment as a child process, pinned to the
    configured GPU via ``CUDA_VISIBLE_DEVICES``. Every failure path
    (missing env, missing weights, timeout, crash) degrades gracefully by
    copying the input video to the output path, so callers always receive
    a playable file.
    """

    def __init__(self):
        # Deployment mode and endpoints come from application settings.
        self.use_local = settings.LATENTSYNC_LOCAL
        self.api_url = settings.LATENTSYNC_API_URL
        self.latentsync_dir = settings.LATENTSYNC_DIR
        self.gpu_id = settings.LATENTSYNC_GPU_ID
        # Interpreter of the "latentsync" conda env.
        # NOTE(review): hard-coded layout — adjust to the actual server.
        self.conda_python = Path.home() / "ProgramFiles" / "miniconda3" / "envs" / "latentsync" / "bin" / "python"
        # Cached result of the weights check; None means "not checked yet".
        self._weights_available: Optional[bool] = None

    def _check_weights(self) -> bool:
        """Check that the model weights and LatentSync code dirs exist.

        Only a *positive* result is cached: a negative result is re-checked
        on the next call so weights installed after startup are picked up
        (the original code cached failures permanently).
        """
        if self._weights_available:
            return True
        required_files = [
            self.latentsync_dir / "checkpoints" / "latentsync_unet.pt",
            self.latentsync_dir / "checkpoints" / "whisper" / "tiny.pt",
        ]
        required_dirs = [
            self.latentsync_dir / "configs" / "unet",
            self.latentsync_dir / "latentsync",
            self.latentsync_dir / "scripts",
        ]
        files_ok = all(f.exists() for f in required_files)
        dirs_ok = all(d.exists() for d in required_dirs)
        self._weights_available = files_ok and dirs_ok
        if self._weights_available:
            logger.info("✅ LatentSync 权重文件已就绪")
        else:
            missing_files = [str(f) for f in required_files if not f.exists()]
            missing_dirs = [str(d) for d in required_dirs if not d.exists()]
            logger.warning(f"⚠️ 缺少文件: {missing_files}")
            logger.warning(f"⚠️ 缺少目录: {missing_dirs}")
        return self._weights_available

    def _check_conda_env(self) -> bool:
        """Return True if the conda env's Python interpreter exists."""
        if not self.conda_python.exists():
            logger.warning(f"⚠️ Conda Python 不存在: {self.conda_python}")
            return False
        return True

    def _probe_video_size(self, video_path: str) -> Optional[tuple]:
        """Probe ``video_path`` with ffprobe.

        Returns:
            ``(width, height)`` of the first video stream, or ``None`` when
            the probe fails, times out, yields no stream, or reports a
            zero height. All failures are logged, never raised.
        """
        probe_cmd = [
            "ffprobe", "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=height,width",
            "-of", "json",  # JSON output is more robust to parse
            video_path,
        ]
        try:
            result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10)
            if result.returncode != 0:
                logger.warning(f"⚠️ ffprobe 失败: {result.stderr[:100]}")
                return None
            streams = json.loads(result.stdout).get("streams", [])
            if not streams:
                logger.warning("⚠️ 无法获取视频流信息,跳过预处理")
                return None
            height = streams[0].get("height", 0)
            width = streams[0].get("width", 0)
            if height == 0:
                logger.warning("⚠️ 视频高度为 0跳过预处理")
                return None
            logger.info(f"📹 原始视频分辨率: {width}×{height}")
            return width, height
        except json.JSONDecodeError as e:
            logger.warning(f"⚠️ ffprobe 输出解析失败: {e}")
            return None
        except subprocess.TimeoutExpired:
            logger.warning("⚠️ ffprobe 超时,跳过预处理")
            return None
        except Exception as e:
            logger.warning(f"⚠️ 获取视频信息失败: {e}")
            return None

    def _preprocess_video(self, video_path: str, output_path: str, target_height: int = 720) -> str:
        """Downscale the input video to speed up subsequent inference.

        Caps the height at ``target_height`` (default 720p) while keeping
        the aspect ratio, using the fastest x264 preset; audio is
        stream-copied. Every failure is non-fatal: the original
        ``video_path`` is returned unchanged.

        Returns:
            Path of the video to feed into inference — the compressed copy
            on success, otherwise the original file.
        """
        size = self._probe_video_size(video_path)
        if size is None:
            return video_path
        current_width, current_height = size
        # Already small enough — skip the re-encode entirely.
        if current_height <= target_height:
            logger.info(f"📹 视频高度 {current_height}p <= {target_height}p无需压缩")
            return video_path
        logger.info(f"📹 预处理视频: {current_height}p → {target_height}p")
        compress_cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-vf", f"scale=-2:{target_height}",  # -2 keeps the width even (x264 requirement)
            "-c:v", "libx264",
            "-preset", "ultrafast",  # fastest encode; quality governed by CRF
            "-crf", "23",
            "-c:a", "copy",  # pass audio through untouched
            output_path,
        ]
        try:
            result = subprocess.run(
                compress_cmd,
                capture_output=True,
                text=True,
                timeout=120,  # 2-minute cap on the re-encode
            )
            if result.returncode == 0 and Path(output_path).exists():
                original_size = Path(video_path).stat().st_size / 1024 / 1024
                new_size = Path(output_path).stat().st_size / 1024 / 1024
                logger.info(f"✅ 视频压缩完成: {original_size:.1f}MB → {new_size:.1f}MB")
                return output_path
            logger.warning(f"⚠️ 视频压缩失败: {result.stderr[:200]}")
            return video_path
        except subprocess.TimeoutExpired:
            logger.warning("⚠️ 视频压缩超时,使用原始视频")
            return video_path
        except Exception as e:
            logger.warning(f"⚠️ 视频压缩异常: {e}")
            return video_path

    async def generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int = 25
    ) -> str:
        """Generate a lip-synced video and return ``output_path``.

        Dispatches to local subprocess inference or the remote API
        depending on ``LATENTSYNC_LOCAL``; the parent directory of
        ``output_path`` is created if needed.
        """
        logger.info(f"🎬 唇形同步任务: {Path(video_path).name} + {Path(audio_path).name}")
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)
        if self.use_local:
            return await self._local_generate(video_path, audio_path, output_path, fps)
        return await self._remote_generate(video_path, audio_path, output_path, fps)

    async def _local_generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int
    ) -> str:
        """Run LatentSync inference by spawning the conda env's Python.

        Falls back to copying the input video to ``output_path`` whenever
        the environment, weights, or inference itself is unavailable, so
        the caller always gets a file. ``fps`` is currently unused by the
        local inference script (kept for interface parity with the remote
        path).
        """
        # Preconditions: without the env or the weights we cannot infer.
        if not self._check_conda_env():
            logger.warning("⚠️ Conda 环境不可用,使用 Fallback")
            shutil.copy(video_path, output_path)
            return output_path
        if not self._check_weights():
            logger.warning("⚠️ 模型权重不存在,使用 Fallback")
            shutil.copy(video_path, output_path)
            return output_path
        logger.info("🔄 调用 LatentSync 推理 (subprocess)...")
        # All intermediate artifacts live in a temp dir, auto-cleaned.
        with tempfile.TemporaryDirectory() as tmp:
            tmpdir = Path(tmp)
            temp_output = tmpdir / "output.mp4"
            # Downscale very large inputs first — big win on inference time.
            actual_video_path = self._preprocess_video(
                video_path,
                str(tmpdir / "preprocessed_input.mp4"),
                target_height=720,
            )
            cmd = [
                str(self.conda_python),
                "-m", "scripts.inference",
                "--unet_config_path", "configs/unet/stage2_512.yaml",
                "--inference_ckpt_path", "checkpoints/latentsync_unet.pt",
                "--inference_steps", str(settings.LATENTSYNC_INFERENCE_STEPS),
                "--guidance_scale", str(settings.LATENTSYNC_GUIDANCE_SCALE),
                "--video_path", str(actual_video_path),  # possibly the downscaled copy
                "--audio_path", str(audio_path),
                "--video_out_path", str(temp_output),
                "--seed", str(settings.LATENTSYNC_SEED),
                "--temp_dir", str(tmpdir / "cache"),
            ]
            if settings.LATENTSYNC_ENABLE_DEEPCACHE:
                cmd.append("--enable_deepcache")
            # Pin the child to the configured GPU; inside the child it is cuda:0.
            env = os.environ.copy()
            env["CUDA_VISIBLE_DEVICES"] = str(self.gpu_id)
            logger.info(f"🖥️ 执行命令: {' '.join(cmd[:8])}...")
            logger.info(f"🖥️ GPU: CUDA_VISIBLE_DEVICES={self.gpu_id}")
            try:
                # asyncio subprocess keeps the event loop free to serve other
                # requests (e.g. progress polling) while inference runs.
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    cwd=str(self.latentsync_dir),
                    env=env,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                try:
                    stdout, stderr = await asyncio.wait_for(
                        process.communicate(),
                        timeout=900,  # 15-minute hard limit
                    )
                except asyncio.TimeoutError:
                    process.kill()
                    await process.wait()
                    logger.error("⏰ LatentSync 推理超时 (15分钟)")
                    shutil.copy(video_path, output_path)
                    return output_path
                # errors="replace" so undecodable bytes never crash logging.
                stdout_text = stdout.decode(errors="replace") if stdout else ""
                stderr_text = stderr.decode(errors="replace") if stderr else ""
                if process.returncode != 0:
                    logger.error(f"LatentSync 推理失败:\n{stderr_text}")
                    logger.error(f"stdout:\n{stdout_text[-1000:] if stdout_text else 'N/A'}")
                    shutil.copy(video_path, output_path)
                    return output_path
                logger.info(f"LatentSync 输出:\n{stdout_text[-500:] if stdout_text else 'N/A'}")
                if temp_output.exists():
                    shutil.copy(temp_output, output_path)
                    logger.info(f"✅ 唇形同步完成: {output_path}")
                    return output_path
                logger.warning("⚠️ 未找到输出文件,使用 Fallback")
                shutil.copy(video_path, output_path)
                return output_path
            except Exception as e:
                logger.error(f"❌ 推理异常: {e}")
                shutil.copy(video_path, output_path)
                return output_path

    async def _remote_generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int
    ) -> str:
        """Call a remote LatentSync API service.

        Uploads the video/audio pair as multipart form data and writes the
        response body to ``output_path``. Any failure falls back to copying
        the input video.
        """
        logger.info(f"📡 调用远程 API: {self.api_url}")
        try:
            async with httpx.AsyncClient(timeout=600.0) as client:
                with open(video_path, "rb") as vf, open(audio_path, "rb") as af:
                    files = {
                        "video": (Path(video_path).name, vf, "video/mp4"),
                        "audio": (Path(audio_path).name, af, "audio/mpeg"),
                    }
                    data = {"fps": fps}
                    response = await client.post(
                        f"{self.api_url}/lipsync",
                        files=files,
                        data=data
                    )
            if response.status_code == 200:
                with open(output_path, "wb") as f:
                    f.write(response.content)
                logger.info(f"✅ 远程推理完成: {output_path}")
                return output_path
            raise RuntimeError(f"API 错误: {response.status_code}")
        except Exception as e:
            logger.error(f"远程 API 调用失败: {e}")
            shutil.copy(video_path, output_path)
            return output_path

    async def check_health(self) -> dict:
        """Report readiness of the conda env, model weights, and GPU."""
        conda_ok = self._check_conda_env()
        weights_ok = self._check_weights()
        gpu_ok = False
        gpu_name = "Unknown"
        if conda_ok:
            try:
                result = subprocess.run(
                    [str(self.conda_python), "-c",
                     "import torch; print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'N/A')"],
                    capture_output=True,
                    text=True,
                    env={**os.environ, "CUDA_VISIBLE_DEVICES": str(self.gpu_id)},
                    timeout=30
                )
                # Only trust stdout as a device name when the probe succeeded.
                if result.returncode == 0:
                    gpu_name = result.stdout.strip()
                    gpu_ok = gpu_name != "N/A"
            except (subprocess.SubprocessError, OSError) as e:
                # Narrowed from a bare ``except`` so real bugs still surface.
                logger.warning(f"GPU 探测失败: {e}")
        return {
            "model": "LatentSync 1.6",
            "conda_env": conda_ok,
            "weights": weights_ok,
            "gpu": gpu_ok,
            "gpu_name": gpu_name,
            "gpu_id": self.gpu_id,
            "inference_steps": settings.LATENTSYNC_INFERENCE_STEPS,
            "guidance_scale": settings.LATENTSYNC_GUIDANCE_SCALE,
            "ready": conda_ok and weights_ok and gpu_ok
        }