"""Lip-sync service (LatentSync 1.6 integration).

Local mode runs inference inside the LatentSync conda environment, either by
calling a persistent local server (``settings.LATENTSYNC_USE_SERVER``) or by
spawning ``scripts.inference`` as a one-shot subprocess. Remote mode uploads
the inputs to an HTTP API (``settings.LATENTSYNC_API_URL``).

The GPU used for local inference is selected through
``settings.LATENTSYNC_GPU_ID`` (exported as ``CUDA_VISIBLE_DEVICES``); the
original deployment note targets GPU1 (CUDA:1).
"""
import os
import json
import shutil
import subprocess
import tempfile
import asyncio
import httpx
from pathlib import Path
from loguru import logger
from typing import Optional

from app.core.config import settings


class LipSyncService:
    """Lip-sync service - LatentSync 1.6 integration.

    All local GPU work is serialized through one ``asyncio.Lock`` so only a
    single inference job runs at a time. Most failure paths degrade to
    copying the input video unchanged to the output path (no lip-sync);
    the persistent-server path instead raises so the operator notices.
    """

    # Address of the persistent LatentSync server
    # (started with `python scripts/server.py` inside models/LatentSync).
    PERSISTENT_SERVER_URL = "http://localhost:8007"

    # Hard limit for one subprocess inference run, in seconds (15 minutes).
    # NOTE: the timeout log message below hard-codes "15分钟"; keep in sync.
    SUBPROCESS_TIMEOUT_S = 900

    def __init__(self):
        self.use_local = settings.LATENTSYNC_LOCAL
        self.api_url = settings.LATENTSYNC_API_URL
        self.latentsync_dir = settings.LATENTSYNC_DIR
        self.gpu_id = settings.LATENTSYNC_GPU_ID
        self.use_server = settings.LATENTSYNC_USE_SERVER

        # GPU concurrency lock: local jobs are queued and run one at a time.
        self._lock = asyncio.Lock()

        # Python interpreter of the LatentSync conda environment.
        # Adjust to the actual layout of the deployment host.
        self.conda_python = Path.home() / "ProgramFiles" / "miniconda3" / "envs" / "latentsync" / "bin" / "python"

        # Result of the weights check; only a positive result is cached so
        # weights installed after startup are picked up without a restart.
        self._weights_available: Optional[bool] = None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _run_blocking(func, *args):
        """Run a blocking callable in the default executor and await its result.

        Keeps ffprobe/ffmpeg/`subprocess.run` calls from stalling the event
        loop (e.g. progress queries) while a job holds the GPU lock.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, *args)

    @staticmethod
    def _fallback_copy(video_path: str, output_path: str) -> str:
        """Fallback: copy the input video unchanged to ``output_path`` and return it."""
        shutil.copy(video_path, output_path)
        return output_path

    # ------------------------------------------------------------------
    # Environment checks
    # ------------------------------------------------------------------

    def _check_weights(self) -> bool:
        """Return True if the model checkpoints and LatentSync source tree exist.

        A positive result is cached; a negative one is re-evaluated on the
        next call (and the missing paths are logged each time).
        """
        if self._weights_available:
            return True

        required_files = [
            self.latentsync_dir / "checkpoints" / "latentsync_unet.pt",
            self.latentsync_dir / "checkpoints" / "whisper" / "tiny.pt",
        ]
        required_dirs = [
            self.latentsync_dir / "configs" / "unet",
            self.latentsync_dir / "latentsync",
            self.latentsync_dir / "scripts",
        ]

        missing_files = [str(f) for f in required_files if not f.exists()]
        missing_dirs = [str(d) for d in required_dirs if not d.exists()]
        self._weights_available = not missing_files and not missing_dirs

        if self._weights_available:
            logger.info("✅ LatentSync 权重文件已就绪")
        else:
            logger.warning(f"⚠️ 缺少文件: {missing_files}")
            logger.warning(f"⚠️ 缺少目录: {missing_dirs}")

        return self._weights_available

    def _check_conda_env(self) -> bool:
        """Return True if the conda environment's Python interpreter exists."""
        if not self.conda_python.exists():
            logger.warning(f"⚠️ Conda Python 不存在: {self.conda_python}")
            return False
        return True

    # ------------------------------------------------------------------
    # FFmpeg helpers (blocking; call via _run_blocking from async code)
    # ------------------------------------------------------------------

    def _get_media_duration(self, media_path: str) -> Optional[float]:
        """Return the container duration of an audio/video file in seconds.

        Uses ffprobe; returns None if ffprobe fails, times out (10 s), or
        prints something that is not a float (e.g. "N/A").
        """
        try:
            cmd = [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                media_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                return float(result.stdout.strip())
        except Exception as e:
            logger.warning(f"⚠️ 获取媒体时长失败: {e}")
        return None

    def _loop_video_to_duration(self, video_path: str, output_path: str, target_duration: float) -> str:
        """Loop ``video_path`` until it lasts ``target_duration`` seconds.

        Uses ffmpeg ``-stream_loop -1`` and trims with ``-t``; the original
        audio track is dropped (``-an``) and video is re-encoded with libx264.

        Returns:
            ``output_path`` on success, otherwise the original ``video_path``.
        """
        try:
            cmd = [
                "ffmpeg", "-y",
                "-stream_loop", "-1",           # loop input indefinitely
                "-i", video_path,
                "-t", str(target_duration),     # cut at the target duration
                "-c:v", "libx264",
                "-preset", "fast",
                "-crf", "18",
                "-an",                          # drop the original audio
                output_path,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            if result.returncode == 0 and Path(output_path).exists():
                logger.info(f"✅ 视频循环完成: {target_duration:.1f}s")
                return output_path
            # ffmpeg prints its banner first; the actual error is at the end.
            logger.warning(f"⚠️ 视频循环失败: {result.stderr[-200:]}")
            return video_path
        except Exception as e:
            logger.warning(f"⚠️ 视频循环异常: {e}")
            return video_path

    def _preprocess_video(self, video_path: str, output_path: str, target_height: int = 720) -> str:
        """Downscale a video to speed up later processing.

        - Caps the height at ``target_height`` (default 720p).
        - Keeps the aspect ratio (width rounded to an even number).
        - Uses the fastest x264 preset; the audio stream is copied as-is.

        Returns:
            ``output_path`` if a compressed file was produced, otherwise the
            original ``video_path`` (probe failure, already small enough, or
            ffmpeg failure/timeout).
        """
        # Probe resolution (JSON output is more reliable to parse).
        probe_cmd = [
            "ffprobe", "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=height,width",
            "-of", "json",
            video_path,
        ]

        try:
            result = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=10)
            if result.returncode != 0:
                logger.warning(f"⚠️ ffprobe 失败: {result.stderr[:100]}")
                return video_path

            probe_data = json.loads(result.stdout)
            streams = probe_data.get("streams", [])
            if not streams:
                logger.warning("⚠️ 无法获取视频流信息,跳过预处理")
                return video_path

            current_height = streams[0].get("height", 0)
            current_width = streams[0].get("width", 0)

            if current_height == 0:
                logger.warning("⚠️ 视频高度为 0,跳过预处理")
                return video_path

            logger.info(f"📹 原始视频分辨率: {current_width}×{current_height}")

        except json.JSONDecodeError as e:
            logger.warning(f"⚠️ ffprobe 输出解析失败: {e}")
            return video_path
        except subprocess.TimeoutExpired:
            logger.warning("⚠️ ffprobe 超时,跳过预处理")
            return video_path
        except Exception as e:
            logger.warning(f"⚠️ 获取视频信息失败: {e}")
            return video_path

        # Already small enough: nothing to do.
        if current_height <= target_height:
            logger.info(f"📹 视频高度 {current_height}p <= {target_height}p,无需压缩")
            return video_path

        logger.info(f"📹 预处理视频: {current_height}p → {target_height}p")

        compress_cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-vf", f"scale=-2:{target_height}",  # keep aspect ratio, even width
            "-c:v", "libx264",
            "-preset", "ultrafast",               # fastest encode
            "-crf", "23",
            "-c:a", "copy",                       # copy audio untouched
            output_path,
        ]

        try:
            result = subprocess.run(
                compress_cmd,
                capture_output=True,
                text=True,
                timeout=120,  # 2 minutes
            )
            if result.returncode == 0 and Path(output_path).exists():
                original_size = Path(video_path).stat().st_size / 1024 / 1024
                new_size = Path(output_path).stat().st_size / 1024 / 1024
                logger.info(f"✅ 视频压缩完成: {original_size:.1f}MB → {new_size:.1f}MB")
                return output_path
            # ffmpeg prints its banner first; the actual error is at the end.
            logger.warning(f"⚠️ 视频压缩失败: {result.stderr[-200:]}")
            return video_path
        except subprocess.TimeoutExpired:
            logger.warning("⚠️ 视频压缩超时,使用原始视频")
            return video_path
        except Exception as e:
            logger.warning(f"⚠️ 视频压缩异常: {e}")
            return video_path

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    async def generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int = 25
    ) -> str:
        """Generate a lip-synced video and return ``output_path``.

        Args:
            video_path: Source video containing the face.
            audio_path: Driving audio.
            output_path: Destination file; parent directories are created.
            fps: Forwarded to the remote API only; the local pipeline does
                not pass it to LatentSync.

        Returns:
            ``output_path``. Depending on the mode and failure, the file may
            be an unmodified copy of ``video_path`` (fallback).

        Raises:
            RuntimeError: in persistent-server mode when the server is
                unavailable or reports an error.
        """
        logger.info(f"🎬 唇形同步任务: {Path(video_path).name} + {Path(audio_path).name}")

        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        if self.use_local:
            return await self._local_generate(video_path, audio_path, output_path, fps)
        return await self._remote_generate(video_path, audio_path, output_path, fps)

    # ------------------------------------------------------------------
    # Local inference
    # ------------------------------------------------------------------

    async def _local_generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int
    ) -> str:
        """Run LatentSync locally (persistent server or one-shot subprocess).

        Falls back to copying the input video when the conda environment or
        the weights are missing. Jobs are serialized through ``self._lock``.
        ``fps`` is accepted for interface parity but not used here.
        """
        if not self._check_conda_env():
            logger.warning("⚠️ Conda 环境不可用,使用 Fallback")
            return self._fallback_copy(video_path, output_path)

        if not self._check_weights():
            logger.warning("⚠️ 模型权重不存在,使用 Fallback")
            return self._fallback_copy(video_path, output_path)

        logger.info("⏳ 等待 GPU 资源 (排队中)...")

        async with self._lock:
            # Intermediate files (looped video, model output, cache) live here
            # and are removed when the job finishes.
            with tempfile.TemporaryDirectory() as tmp_name:
                tmp_dir = Path(tmp_name)

                input_video = await self._match_video_to_audio(video_path, audio_path, tmp_dir)

                if self.use_server:
                    # Mode A: persistent server (model already loaded; faster).
                    return await self._call_persistent_server(input_video, audio_path, output_path)

                # Mode B: one-shot subprocess in the conda environment.
                return await self._run_inference_subprocess(
                    video_path, input_video, audio_path, output_path, tmp_dir
                )

    async def _match_video_to_audio(self, video_path: str, audio_path: str, tmp_dir: Path) -> str:
        """Return a video at least as long as the audio.

        If the audio is more than 0.5 s longer than the video, a looped copy is
        written into ``tmp_dir``; otherwise (or if looping fails) the original
        ``video_path`` is returned. ffprobe/ffmpeg run off the event loop.
        """
        audio_duration = await self._run_blocking(self._get_media_duration, audio_path)
        video_duration = await self._run_blocking(self._get_media_duration, video_path)

        if audio_duration and video_duration and audio_duration > video_duration + 0.5:
            logger.info(f"🔄 音频({audio_duration:.1f}s) > 视频({video_duration:.1f}s),循环视频...")
            looped_video = tmp_dir / "looped_input.mp4"
            return await self._run_blocking(
                self._loop_video_to_duration, video_path, str(looped_video), audio_duration
            )
        return video_path

    async def _run_inference_subprocess(
        self,
        video_path: str,
        input_video_path: str,
        audio_path: str,
        output_path: str,
        tmp_dir: Path,
    ) -> str:
        """Run ``scripts.inference`` as an async subprocess and copy its output.

        Args:
            video_path: The caller's original video, used for fallbacks.
            input_video_path: The video actually fed to the model (possibly a
                looped copy inside ``tmp_dir``).
            audio_path: Driving audio.
            output_path: Final destination.
            tmp_dir: Scratch directory for the model output and cache.

        Returns:
            ``output_path``; on timeout, non-zero exit, missing output or any
            exception it holds a copy of ``video_path`` instead.
        """
        logger.info("🔄 调用 LatentSync 推理 (subprocess)...")

        temp_output = tmp_dir / "output.mp4"

        cmd = [
            str(self.conda_python), "-m", "scripts.inference",
            "--unet_config_path", "configs/unet/stage2_512.yaml",
            "--inference_ckpt_path", "checkpoints/latentsync_unet.pt",
            "--inference_steps", str(settings.LATENTSYNC_INFERENCE_STEPS),
            "--guidance_scale", str(settings.LATENTSYNC_GUIDANCE_SCALE),
            "--video_path", str(input_video_path),  # original or looped video
            "--audio_path", str(audio_path),
            "--video_out_path", str(temp_output),
            "--seed", str(settings.LATENTSYNC_SEED),
            "--temp_dir", str(tmp_dir / "cache"),
        ]
        if settings.LATENTSYNC_ENABLE_DEEPCACHE:
            cmd.append("--enable_deepcache")

        # Pin the child process to the configured GPU.
        env = os.environ.copy()
        env["CUDA_VISIBLE_DEVICES"] = str(self.gpu_id)

        logger.info(f"🖥️ 执行命令: {' '.join(cmd[:8])}...")
        logger.info(f"🖥️ GPU: CUDA_VISIBLE_DEVICES={self.gpu_id}")

        try:
            # asyncio subprocess keeps the event loop free for other requests
            # (e.g. progress queries) while inference runs.
            process = await asyncio.create_subprocess_exec(
                *cmd,
                cwd=str(self.latentsync_dir),
                env=env,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self.SUBPROCESS_TIMEOUT_S,
                )
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
                logger.error("⏰ LatentSync 推理超时 (15分钟)")
                return self._fallback_copy(video_path, output_path)
            except asyncio.CancelledError:
                # The awaiting task was cancelled: don't leave the GPU job
                # running orphaned in the background.
                if process.returncode is None:
                    process.kill()
                raise

            # Tool output may not be valid UTF-8; never fail on decoding logs.
            stdout_text = stdout.decode(errors="replace") if stdout else ""
            stderr_text = stderr.decode(errors="replace") if stderr else ""

            if process.returncode != 0:
                logger.error(f"LatentSync 推理失败:\n{stderr_text}")
                logger.error(f"stdout:\n{stdout_text[-1000:] if stdout_text else 'N/A'}")
                return self._fallback_copy(video_path, output_path)

            logger.info(f"LatentSync 输出:\n{stdout_text[-500:] if stdout_text else 'N/A'}")

            if temp_output.exists():
                shutil.copy(temp_output, output_path)
                logger.info(f"✅ 唇形同步完成: {output_path}")
                return output_path

            logger.warning("⚠️ 未找到输出文件,使用 Fallback")
            return self._fallback_copy(video_path, output_path)

        except Exception as e:
            logger.error(f"❌ 推理异常: {e}")
            return self._fallback_copy(video_path, output_path)

    async def _call_persistent_server(self, video_path: str, audio_path: str, output_path: str) -> str:
        """Run inference through the local persistent server (server.py).

        Paths are sent as absolute paths, so the server must share this
        host's filesystem.

        Returns:
            ``output_path`` once the server reports an existing output file.

        Raises:
            RuntimeError: the server is unreachable/unhealthy (subprocess
                fallback is disabled) or reports an error.
            Exception: any other HTTP/JSON error is logged and re-raised.
        """
        server_url = self.PERSISTENT_SERVER_URL
        logger.info(f"⚡ 调用常驻服务: {server_url}")

        payload = {
            "video_path": str(Path(video_path).resolve()),
            "audio_path": str(Path(audio_path).resolve()),
            "video_out_path": str(Path(output_path).resolve()),
            "inference_steps": settings.LATENTSYNC_INFERENCE_STEPS,
            "guidance_scale": settings.LATENTSYNC_GUIDANCE_SCALE,
            "seed": settings.LATENTSYNC_SEED,
            "temp_dir": os.path.join(tempfile.gettempdir(), "latentsync_temp"),
        }

        try:
            async with httpx.AsyncClient(timeout=1200.0) as client:
                # Health check. Only the request itself is guarded, so the
                # fallback's own RuntimeError is not mistaken for a
                # connection failure (which used to log twice).
                try:
                    health = await client.get(f"{server_url}/health", timeout=5.0)
                except Exception:
                    logger.warning("⚠️ 无法连接常驻服务,回退到 subprocess")
                    return await self._local_generate_subprocess(video_path, audio_path, output_path)

                if health.status_code != 200:
                    logger.warning("⚠️ 常驻服务健康检查失败,回退到 subprocess")
                    return await self._local_generate_subprocess(video_path, audio_path, output_path)

                response = await client.post(f"{server_url}/lipsync", json=payload)

                if response.status_code == 200:
                    reported_path = response.json().get("output_path")
                    if reported_path and Path(reported_path).exists():
                        logger.info(f"✅ 常驻服务推理完成: {output_path}")
                        return output_path

                logger.error(f"❌ 常驻服务报错: {response.text}")
                raise RuntimeError(f"Server Error: {response.text}")

        except Exception as e:
            logger.error(f"❌ 常驻服务调用失败: {e}")
            # Surface the failure to the caller (no silent fallback here).
            raise

    async def _local_generate_subprocess(self, video_path: str, audio_path: str, output_path: str) -> str:
        """Former subprocess fallback for persistent-server mode; now always raises.

        Disabled on purpose:
        1. Subprocess mode reloads the model, costing a lot of time and VRAM.
        2. If the persistent server is down, the user should be told and fix
           it rather than get a silent fallback.
        3. Avoids double resource use that could OOM the GPU.

        If the persistent server is unavailable, check:
        - that it is running: python scripts/server.py (in models/LatentSync)
        - whether the port is taken: lsof -i:8007
        - whether there is enough GPU memory: nvidia-smi

        Raises:
            RuntimeError: always.
        """
        raise RuntimeError(
            "LatentSync 常驻服务不可用,无法进行唇形同步。"
            "请确保 LatentSync 服务已启动 (cd models/LatentSync && python scripts/server.py)"
        )

    # ------------------------------------------------------------------
    # Remote inference
    # ------------------------------------------------------------------

    async def _remote_generate(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        fps: int
    ) -> str:
        """Upload video and audio to the remote LatentSync API.

        Writes the response body to ``output_path`` on HTTP 200. On any
        error (non-200 or exception) it logs and falls back to copying the
        input video.
        """
        logger.info(f"📡 调用远程 API: {self.api_url}")

        try:
            async with httpx.AsyncClient(timeout=600.0) as client:
                with open(video_path, "rb") as vf, open(audio_path, "rb") as af:
                    files = {
                        "video": (Path(video_path).name, vf, "video/mp4"),
                        "audio": (Path(audio_path).name, af, "audio/mpeg"),
                    }
                    # Multipart form values are sent as strings.
                    data = {"fps": str(fps)}

                    response = await client.post(
                        f"{self.api_url}/lipsync",
                        files=files,
                        data=data
                    )

                if response.status_code != 200:
                    raise RuntimeError(f"API 错误: {response.status_code}")

                with open(output_path, "wb") as f:
                    f.write(response.content)
                logger.info(f"✅ 远程推理完成: {output_path}")
                return output_path

        except Exception as e:
            logger.error(f"远程 API 调用失败: {e}")
            return self._fallback_copy(video_path, output_path)

    # ------------------------------------------------------------------
    # Health
    # ------------------------------------------------------------------

    def _probe_gpu_name(self) -> subprocess.CompletedProcess:
        """Ask torch (inside the conda env) for the name of the configured GPU.

        Blocking (up to 30 s); prints 'N/A' when CUDA is not available.
        """
        return subprocess.run(
            [str(self.conda_python), "-c",
             "import torch; print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'N/A')"],
            capture_output=True,
            text=True,
            env={**os.environ, "CUDA_VISIBLE_DEVICES": str(self.gpu_id)},
            timeout=30,
        )

    async def check_health(self) -> dict:
        """Report whether the local pipeline is ready.

        Returns:
            dict with conda/weights/GPU status, the GPU name and id, the
            inference settings, and ``ready`` (all three checks passed).
        """
        conda_ok = self._check_conda_env()
        weights_ok = self._check_weights()

        gpu_ok = False
        gpu_name = "Unknown"

        if conda_ok:
            try:
                # Run off the event loop: spawning torch can take seconds.
                result = await self._run_blocking(self._probe_gpu_name)
            except Exception as e:
                # Best-effort probe: report "not ready" instead of failing.
                logger.warning(f"⚠️ GPU 检测失败: {e}")
            else:
                gpu_name = result.stdout.strip()
                gpu_ok = gpu_name != "N/A" and result.returncode == 0

        return {
            "model": "LatentSync 1.6",
            "conda_env": conda_ok,
            "weights": weights_ok,
            "gpu": gpu_ok,
            "gpu_name": gpu_name,
            "gpu_id": self.gpu_id,
            "inference_steps": settings.LATENTSYNC_INFERENCE_STEPS,
            "guidance_scale": settings.LATENTSYNC_GUIDANCE_SCALE,
            "ready": conda_ok and weights_ok and gpu_ok
        }