"""
唇形同步服务 (Lip-sync service)

Runs LatentSync inference by invoking a dedicated conda environment via
subprocess. Configured to use GPU1 (CUDA:1).
"""
|
||
import asyncio
import json
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Optional

import httpx
from loguru import logger

from app.core.config import settings
|
||
|
||
|
||
class LipSyncService:
    """Lip-sync service — LatentSync 1.6 integration (subprocess based)."""

    def __init__(self):
        # Deployment-mode flags and endpoints, all sourced from app settings.
        self.use_local = settings.LATENTSYNC_LOCAL
        self.api_url = settings.LATENTSYNC_API_URL
        self.latentsync_dir = settings.LATENTSYNC_DIR
        self.gpu_id = settings.LATENTSYNC_GPU_ID
        self.use_server = settings.LATENTSYNC_USE_SERVER

        # Serializes GPU access: only one inference job runs at a time.
        self._lock = asyncio.Lock()

        # Python interpreter of the "latentsync" conda environment.
        # Adjust to match the actual server layout.
        self.conda_python = (
            Path.home()
            / "ProgramFiles"
            / "miniconda3"
            / "envs"
            / "latentsync"
            / "bin"
            / "python"
        )

        # Cached result of the weight-file check (None = not checked yet).
        self._weights_available: Optional[bool] = None
|
||
|
||
def _check_weights(self) -> bool:
|
||
"""检查模型权重是否存在"""
|
||
if self._weights_available is not None:
|
||
return self._weights_available
|
||
|
||
required_files = [
|
||
self.latentsync_dir / "checkpoints" / "latentsync_unet.pt",
|
||
self.latentsync_dir / "checkpoints" / "whisper" / "tiny.pt",
|
||
]
|
||
|
||
required_dirs = [
|
||
self.latentsync_dir / "configs" / "unet",
|
||
self.latentsync_dir / "latentsync",
|
||
self.latentsync_dir / "scripts",
|
||
]
|
||
|
||
files_ok = all(f.exists() for f in required_files)
|
||
dirs_ok = all(d.exists() for d in required_dirs)
|
||
|
||
self._weights_available = files_ok and dirs_ok
|
||
|
||
if self._weights_available:
|
||
logger.info("✅ LatentSync 权重文件已就绪")
|
||
else:
|
||
missing_files = [str(f) for f in required_files if not f.exists()]
|
||
missing_dirs = [str(d) for d in required_dirs if not d.exists()]
|
||
logger.warning(f"⚠️ 缺少文件: {missing_files}")
|
||
logger.warning(f"⚠️ 缺少目录: {missing_dirs}")
|
||
|
||
return self._weights_available
|
||
|
||
def _check_conda_env(self) -> bool:
|
||
"""检查 conda 环境是否可用"""
|
||
if not self.conda_python.exists():
|
||
logger.warning(f"⚠️ Conda Python 不存在: {self.conda_python}")
|
||
return False
|
||
return True
|
||
|
||
def _get_media_duration(self, media_path: str) -> Optional[float]:
|
||
"""获取音频或视频的时长(秒)"""
|
||
try:
|
||
cmd = [
|
||
"ffprobe", "-v", "error",
|
||
"-show_entries", "format=duration",
|
||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||
media_path
|
||
]
|
||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||
if result.returncode == 0:
|
||
return float(result.stdout.strip())
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ 获取媒体时长失败: {e}")
|
||
return None
|
||
|
||
def _loop_video_to_duration(self, video_path: str, output_path: str, target_duration: float) -> str:
|
||
"""
|
||
循环视频以匹配目标时长
|
||
使用 FFmpeg stream_loop 实现无缝循环
|
||
"""
|
||
try:
|
||
cmd = [
|
||
"ffmpeg", "-y",
|
||
"-stream_loop", "-1", # 无限循环
|
||
"-i", video_path,
|
||
"-t", str(target_duration), # 截取到目标时长
|
||
"-c:v", "libx264",
|
||
"-preset", "fast",
|
||
"-crf", "18",
|
||
"-an", # 去掉原音频
|
||
output_path
|
||
]
|
||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
|
||
if result.returncode == 0 and Path(output_path).exists():
|
||
logger.info(f"✅ 视频循环完成: {target_duration:.1f}s")
|
||
return output_path
|
||
else:
|
||
logger.warning(f"⚠️ 视频循环失败: {result.stderr[:200]}")
|
||
return video_path
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ 视频循环异常: {e}")
|
||
return video_path
|
||
|
||
def _preprocess_video(self, video_path: str, output_path: str, target_height: int = 720) -> str:
|
||
"""
|
||
视频预处理:压缩视频以加速后续处理
|
||
- 限制最大高度为 target_height (默认720p)
|
||
- 保持宽高比
|
||
- 使用快速编码预设
|
||
|
||
Returns: 预处理后的视频路径
|
||
"""
|
||
import subprocess
|
||
import json
|
||
|
||
# 获取视频信息 (使用 JSON 格式更可靠)
|
||
probe_cmd = [
|
||
"ffprobe", "-v", "error",
|
||
"-select_streams", "v:0",
|
||
"-show_entries", "stream=height,width",
|
||
"-of", "json",
|
||
video_path
|
||
]
|
||
try:
|
||
result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10)
|
||
if result.returncode != 0:
|
||
logger.warning(f"⚠️ ffprobe 失败: {result.stderr[:100]}")
|
||
return video_path
|
||
|
||
probe_data = json.loads(result.stdout)
|
||
streams = probe_data.get("streams", [])
|
||
if not streams:
|
||
logger.warning("⚠️ 无法获取视频流信息,跳过预处理")
|
||
return video_path
|
||
|
||
current_height = streams[0].get("height", 0)
|
||
current_width = streams[0].get("width", 0)
|
||
|
||
if current_height == 0:
|
||
logger.warning("⚠️ 视频高度为 0,跳过预处理")
|
||
return video_path
|
||
|
||
logger.info(f"📹 原始视频分辨率: {current_width}×{current_height}")
|
||
|
||
except json.JSONDecodeError as e:
|
||
logger.warning(f"⚠️ ffprobe 输出解析失败: {e}")
|
||
return video_path
|
||
except subprocess.TimeoutExpired:
|
||
logger.warning("⚠️ ffprobe 超时,跳过预处理")
|
||
return video_path
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ 获取视频信息失败: {e}")
|
||
return video_path
|
||
|
||
# 如果视频已经足够小,跳过压缩
|
||
if current_height <= target_height:
|
||
logger.info(f"📹 视频高度 {current_height}p <= {target_height}p,无需压缩")
|
||
return video_path
|
||
|
||
logger.info(f"📹 预处理视频: {current_height}p → {target_height}p")
|
||
|
||
# 使用 FFmpeg 压缩
|
||
compress_cmd = [
|
||
"ffmpeg", "-y",
|
||
"-i", video_path,
|
||
"-vf", f"scale=-2:{target_height}", # 保持宽高比,高度设为 target_height
|
||
"-c:v", "libx264",
|
||
"-preset", "ultrafast", # 最快编码速度
|
||
"-crf", "23", # 质量因子
|
||
"-c:a", "copy", # 音频直接复制
|
||
output_path
|
||
]
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
compress_cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=120 # 增加超时时间到2分钟
|
||
)
|
||
if result.returncode == 0 and Path(output_path).exists():
|
||
original_size = Path(video_path).stat().st_size / 1024 / 1024
|
||
new_size = Path(output_path).stat().st_size / 1024 / 1024
|
||
logger.info(f"✅ 视频压缩完成: {original_size:.1f}MB → {new_size:.1f}MB")
|
||
return output_path
|
||
else:
|
||
logger.warning(f"⚠️ 视频压缩失败: {result.stderr[:200]}")
|
||
return video_path
|
||
except subprocess.TimeoutExpired:
|
||
logger.warning("⚠️ 视频压缩超时,使用原始视频")
|
||
return video_path
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ 视频压缩异常: {e}")
|
||
return video_path
|
||
|
||
async def generate(
|
||
self,
|
||
video_path: str,
|
||
audio_path: str,
|
||
output_path: str,
|
||
fps: int = 25
|
||
) -> str:
|
||
"""生成唇形同步视频"""
|
||
logger.info(f"🎬 唇形同步任务: {Path(video_path).name} + {Path(audio_path).name}")
|
||
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
if self.use_local:
|
||
return await self._local_generate(video_path, audio_path, output_path, fps)
|
||
else:
|
||
return await self._remote_generate(video_path, audio_path, output_path, fps)
|
||
|
||
async def _local_generate(
|
||
self,
|
||
video_path: str,
|
||
audio_path: str,
|
||
output_path: str,
|
||
fps: int
|
||
) -> str:
|
||
"""使用 subprocess 调用 LatentSync conda 环境"""
|
||
|
||
# 检查前置条件
|
||
if not self._check_conda_env():
|
||
logger.warning("⚠️ Conda 环境不可用,使用 Fallback")
|
||
shutil.copy(video_path, output_path)
|
||
return output_path
|
||
|
||
if not self._check_weights():
|
||
logger.warning("⚠️ 模型权重不存在,使用 Fallback")
|
||
shutil.copy(video_path, output_path)
|
||
return output_path
|
||
|
||
logger.info("⏳ 等待 GPU 资源 (排队中)...")
|
||
async with self._lock:
|
||
# 使用临时目录存放中间文件
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
tmpdir = Path(tmpdir)
|
||
|
||
# 获取音频和视频时长
|
||
audio_duration = self._get_media_duration(audio_path)
|
||
video_duration = self._get_media_duration(video_path)
|
||
|
||
# 如果音频比视频长,循环视频以匹配音频长度
|
||
if audio_duration and video_duration and audio_duration > video_duration + 0.5:
|
||
logger.info(f"🔄 音频({audio_duration:.1f}s) > 视频({video_duration:.1f}s),循环视频...")
|
||
looped_video = tmpdir / "looped_input.mp4"
|
||
actual_video_path = self._loop_video_to_duration(
|
||
video_path,
|
||
str(looped_video),
|
||
audio_duration
|
||
)
|
||
else:
|
||
actual_video_path = video_path
|
||
|
||
if self.use_server:
|
||
# 模式 A: 调用常驻服务 (加速模式)
|
||
return await self._call_persistent_server(actual_video_path, audio_path, output_path)
|
||
|
||
logger.info("🔄 调用 LatentSync 推理 (subprocess)...")
|
||
|
||
temp_output = tmpdir / "output.mp4"
|
||
|
||
# 构建命令
|
||
cmd = [
|
||
str(self.conda_python),
|
||
"-m", "scripts.inference",
|
||
"--unet_config_path", "configs/unet/stage2_512.yaml",
|
||
"--inference_ckpt_path", "checkpoints/latentsync_unet.pt",
|
||
"--inference_steps", str(settings.LATENTSYNC_INFERENCE_STEPS),
|
||
"--guidance_scale", str(settings.LATENTSYNC_GUIDANCE_SCALE),
|
||
"--video_path", str(actual_video_path), # 使用预处理后的视频
|
||
"--audio_path", str(audio_path),
|
||
"--video_out_path", str(temp_output),
|
||
"--seed", str(settings.LATENTSYNC_SEED),
|
||
"--temp_dir", str(tmpdir / "cache"),
|
||
]
|
||
|
||
if settings.LATENTSYNC_ENABLE_DEEPCACHE:
|
||
cmd.append("--enable_deepcache")
|
||
|
||
# 设置环境变量
|
||
env = os.environ.copy()
|
||
env["CUDA_VISIBLE_DEVICES"] = str(self.gpu_id)
|
||
|
||
logger.info(f"🖥️ 执行命令: {' '.join(cmd[:8])}...")
|
||
logger.info(f"🖥️ GPU: CUDA_VISIBLE_DEVICES={self.gpu_id}")
|
||
|
||
try:
|
||
# 使用 asyncio subprocess 实现真正的异步执行
|
||
# 这样事件循环可以继续处理其他请求(如进度查询)
|
||
process = await asyncio.create_subprocess_exec(
|
||
*cmd,
|
||
cwd=str(self.latentsync_dir),
|
||
env=env,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
)
|
||
|
||
# 等待进程完成,带超时
|
||
try:
|
||
stdout, stderr = await asyncio.wait_for(
|
||
process.communicate(),
|
||
timeout=900 # 15分钟超时
|
||
)
|
||
except asyncio.TimeoutError:
|
||
process.kill()
|
||
await process.wait()
|
||
logger.error("⏰ LatentSync 推理超时 (15分钟)")
|
||
shutil.copy(video_path, output_path)
|
||
return output_path
|
||
|
||
stdout_text = stdout.decode() if stdout else ""
|
||
stderr_text = stderr.decode() if stderr else ""
|
||
|
||
if process.returncode != 0:
|
||
logger.error(f"LatentSync 推理失败:\n{stderr_text}")
|
||
logger.error(f"stdout:\n{stdout_text[-1000:] if stdout_text else 'N/A'}")
|
||
# Fallback
|
||
shutil.copy(video_path, output_path)
|
||
return output_path
|
||
|
||
logger.info(f"LatentSync 输出:\n{stdout_text[-500:] if stdout_text else 'N/A'}")
|
||
|
||
# 检查输出文件
|
||
if temp_output.exists():
|
||
shutil.copy(temp_output, output_path)
|
||
logger.info(f"✅ 唇形同步完成: {output_path}")
|
||
return output_path
|
||
else:
|
||
logger.warning("⚠️ 未找到输出文件,使用 Fallback")
|
||
shutil.copy(video_path, output_path)
|
||
return output_path
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 推理异常: {e}")
|
||
shutil.copy(video_path, output_path)
|
||
return output_path
|
||
|
||
async def _call_persistent_server(self, video_path: str, audio_path: str, output_path: str) -> str:
|
||
"""调用本地常驻服务 (server.py)"""
|
||
server_url = "http://localhost:8007"
|
||
logger.info(f"⚡ 调用常驻服务: {server_url}")
|
||
|
||
# 准备请求数据 (传递绝对路径)
|
||
payload = {
|
||
"video_path": str(Path(video_path).resolve()),
|
||
"audio_path": str(Path(audio_path).resolve()),
|
||
"video_out_path": str(Path(output_path).resolve()),
|
||
"inference_steps": settings.LATENTSYNC_INFERENCE_STEPS,
|
||
"guidance_scale": settings.LATENTSYNC_GUIDANCE_SCALE,
|
||
"seed": settings.LATENTSYNC_SEED,
|
||
"temp_dir": os.path.join(tempfile.gettempdir(), "latentsync_temp")
|
||
}
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=3600.0) as client:
|
||
# 先检查健康状态
|
||
try:
|
||
resp = await client.get(f"{server_url}/health", timeout=5.0)
|
||
if resp.status_code != 200:
|
||
logger.warning("⚠️ 常驻服务健康检查失败,回退到 subprocess")
|
||
return await self._local_generate_subprocess(video_path, audio_path, output_path)
|
||
except Exception:
|
||
logger.warning("⚠️ 无法连接常驻服务,回退到 subprocess")
|
||
return await self._local_generate_subprocess(video_path, audio_path, output_path)
|
||
|
||
# 发送生成请求
|
||
response = await client.post(f"{server_url}/lipsync", json=payload)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if Path(result["output_path"]).exists():
|
||
logger.info(f"✅ 常驻服务推理完成: {output_path}")
|
||
return output_path
|
||
|
||
logger.error(f"❌ 常驻服务报错: {response.text}")
|
||
raise RuntimeError(f"Server Error: {response.text}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 常驻服务调用失败: {e}")
|
||
# 这里可以选择回退,或者直接报错
|
||
raise e
|
||
|
||
async def _local_generate_subprocess(self, video_path: str, audio_path: str, output_path: str) -> str:
|
||
"""
|
||
原有的 subprocess 回退逻辑
|
||
|
||
注意:subprocess 回退已被禁用,原因如下:
|
||
1. subprocess 模式需要重新加载模型,消耗大量时间和显存
|
||
2. 如果常驻服务不可用,应该让用户知道并修复服务,而非静默回退
|
||
3. 避免双重资源消耗导致的 GPU OOM
|
||
|
||
如果常驻服务不可用,请检查:
|
||
- 服务是否启动: python scripts/server.py (在 models/LatentSync 目录)
|
||
- 端口是否被占用: lsof -i:8007
|
||
- GPU 显存是否充足: nvidia-smi
|
||
"""
|
||
raise RuntimeError(
|
||
"LatentSync 常驻服务不可用,无法进行唇形同步。"
|
||
"请确保 LatentSync 服务已启动 (cd models/LatentSync && python scripts/server.py)"
|
||
)
|
||
|
||
async def _remote_generate(
|
||
self,
|
||
video_path: str,
|
||
audio_path: str,
|
||
output_path: str,
|
||
fps: int
|
||
) -> str:
|
||
"""调用远程 LatentSync API 服务"""
|
||
logger.info(f"📡 调用远程 API: {self.api_url}")
|
||
|
||
try:
|
||
async with httpx.AsyncClient(timeout=600.0) as client:
|
||
with open(video_path, "rb") as vf, open(audio_path, "rb") as af:
|
||
files = {
|
||
"video": (Path(video_path).name, vf, "video/mp4"),
|
||
"audio": (Path(audio_path).name, af, "audio/mpeg"),
|
||
}
|
||
data = {"fps": fps}
|
||
|
||
response = await client.post(
|
||
f"{self.api_url}/lipsync",
|
||
files=files,
|
||
data=data
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
with open(output_path, "wb") as f:
|
||
f.write(response.content)
|
||
logger.info(f"✅ 远程推理完成: {output_path}")
|
||
return output_path
|
||
else:
|
||
raise RuntimeError(f"API 错误: {response.status_code}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"远程 API 调用失败: {e}")
|
||
shutil.copy(video_path, output_path)
|
||
return output_path
|
||
|
||
async def check_health(self) -> dict:
|
||
"""健康检查"""
|
||
conda_ok = self._check_conda_env()
|
||
weights_ok = self._check_weights()
|
||
|
||
# 检查 GPU
|
||
gpu_ok = False
|
||
gpu_name = "Unknown"
|
||
if conda_ok:
|
||
try:
|
||
result = subprocess.run(
|
||
[str(self.conda_python), "-c",
|
||
"import torch; print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'N/A')"],
|
||
capture_output=True,
|
||
text=True,
|
||
env={**os.environ, "CUDA_VISIBLE_DEVICES": str(self.gpu_id)},
|
||
timeout=30
|
||
)
|
||
gpu_name = result.stdout.strip()
|
||
gpu_ok = gpu_name != "N/A" and result.returncode == 0
|
||
except:
|
||
pass
|
||
|
||
return {
|
||
"model": "LatentSync 1.6",
|
||
"conda_env": conda_ok,
|
||
"weights": weights_ok,
|
||
"gpu": gpu_ok,
|
||
"gpu_name": gpu_name,
|
||
"gpu_id": self.gpu_id,
|
||
"inference_steps": settings.LATENTSYNC_INFERENCE_STEPS,
|
||
"guidance_scale": settings.LATENTSYNC_GUIDANCE_SCALE,
|
||
"ready": conda_ok and weights_ok and gpu_ok
|
||
}
|