158 lines
4.5 KiB
Python
158 lines
4.5 KiB
Python
"""
|
|
MuseTalk API 服务
|
|
|
|
这个脚本将 MuseTalk 封装为 FastAPI 服务,
|
|
可以独立部署在 GPU 服务器上。
|
|
|
|
用法:
|
|
python musetalk_api.py --port 8001
|
|
"""
|
|
|
|
import argparse
import os
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Optional

import uvicorn
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
|
|
|
|
# 添加 MuseTalk 路径
|
|
MUSETALK_DIR = Path(__file__).parent
|
|
sys.path.insert(0, str(MUSETALK_DIR))
|
|
|
|
app = FastAPI(
|
|
title="MuseTalk API",
|
|
description="唇形同步推理服务",
|
|
version="0.1.0"
|
|
)
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# 全局模型实例 (懒加载)
|
|
_model = None
|
|
|
|
|
|
def get_model():
|
|
"""懒加载 MuseTalk 模型"""
|
|
global _model
|
|
if _model is None:
|
|
print("🔄 加载 MuseTalk 模型...")
|
|
# TODO: 根据 MuseTalk 实际 API 调整
|
|
# from musetalk.inference import MuseTalkInference
|
|
# _model = MuseTalkInference()
|
|
print("✅ MuseTalk 模型加载完成")
|
|
return _model
|
|
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return {"name": "MuseTalk API", "status": "ok"}
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
"""健康检查"""
|
|
return {"status": "healthy", "gpu": True}
|
|
|
|
|
|
@app.post("/lipsync")
|
|
async def lipsync(
|
|
video: UploadFile = File(..., description="输入视频文件"),
|
|
audio: UploadFile = File(..., description="音频文件"),
|
|
fps: int = Form(25, description="输出帧率")
|
|
):
|
|
"""
|
|
唇形同步推理
|
|
|
|
Args:
|
|
video: 输入视频 (静态人物)
|
|
audio: 驱动音频
|
|
fps: 输出帧率
|
|
|
|
Returns:
|
|
生成的视频文件
|
|
"""
|
|
# 创建临时目录
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
tmpdir = Path(tmpdir)
|
|
|
|
# 保存上传的文件
|
|
video_path = tmpdir / "input_video.mp4"
|
|
audio_path = tmpdir / "input_audio.wav"
|
|
output_path = tmpdir / "output.mp4"
|
|
|
|
with open(video_path, "wb") as f:
|
|
shutil.copyfileobj(video.file, f)
|
|
with open(audio_path, "wb") as f:
|
|
shutil.copyfileobj(audio.file, f)
|
|
|
|
try:
|
|
# 执行唇形同步
|
|
model = get_model()
|
|
|
|
# TODO: 调用实际的 MuseTalk 推理
|
|
# result = model.inference(
|
|
# source_video=str(video_path),
|
|
# driving_audio=str(audio_path),
|
|
# output_path=str(output_path),
|
|
# fps=fps
|
|
# )
|
|
|
|
# 临时: 使用 subprocess 调用 MuseTalk CLI
|
|
import subprocess
|
|
cmd = [
|
|
sys.executable, "-m", "scripts.inference",
|
|
"--video_path", str(video_path),
|
|
"--audio_path", str(audio_path),
|
|
"--output_path", str(output_path),
|
|
]
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
cwd=str(MUSETALK_DIR),
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
raise RuntimeError(f"MuseTalk 推理失败: {result.stderr}")
|
|
|
|
if not output_path.exists():
|
|
raise RuntimeError("输出文件不存在")
|
|
|
|
# 返回生成的视频
|
|
# 需要先复制到持久化位置
|
|
final_output = Path("outputs") / f"lipsync_{video.filename}"
|
|
final_output.parent.mkdir(exist_ok=True)
|
|
shutil.copy(output_path, final_output)
|
|
|
|
return FileResponse(
|
|
final_output,
|
|
media_type="video/mp4",
|
|
filename=f"lipsync_{video.filename}"
|
|
)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--port", type=int, default=8001)
|
|
parser.add_argument("--host", type=str, default="0.0.0.0")
|
|
args = parser.parse_args()
|
|
|
|
print(f"🚀 MuseTalk API 启动在 http://{args.host}:{args.port}")
|
|
uvicorn.run(app, host=args.host, port=args.port)
|