更新
This commit is contained in:
@@ -13,8 +13,9 @@ DEFAULT_TTS_VOICE=zh-CN-YunxiNeural
|
||||
|
||||
# =============== LatentSync 配置 ===============
|
||||
# GPU 选择 (0=第一块GPU, 1=第二块GPU)
|
||||
LATENTSYNC_GPU_ID=0
|
||||
LATENTSYNC_GPU_ID=1
|
||||
|
||||
# 使用本地模式 (true) 或远程 API (false)
|
||||
# 使用本地模式 (true) 或远程 API (false)
|
||||
LATENTSYNC_LOCAL=true
|
||||
|
||||
@@ -34,7 +35,7 @@ LATENTSYNC_GUIDANCE_SCALE=1.5
|
||||
LATENTSYNC_ENABLE_DEEPCACHE=true
|
||||
|
||||
# 随机种子 (设为 -1 则随机)
|
||||
LATENTSYNC_SEED=-1
|
||||
LATENTSYNC_SEED=1247
|
||||
|
||||
# =============== 上传配置 ===============
|
||||
# 最大上传文件大小 (MB)
|
||||
@@ -46,16 +47,17 @@ MAX_UPLOAD_SIZE_MB=500
|
||||
|
||||
# =============== Supabase 配置 ===============
|
||||
# 从 Supabase 项目设置 > API 获取
|
||||
SUPABASE_URL=your_supabase_url_here
|
||||
SUPABASE_KEY=your_supabase_anon_key_here
|
||||
SUPABASE_URL=http://localhost:8008/
|
||||
SUPABASE_PUBLIC_URL=https://api.hbyrkj.top
|
||||
SUPABASE_KEY=eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJyb2xlIjogInNlcnZpY2Vfcm9sZSIsICJpc3MiOiAic3VwYWJhc2UiLCAiaWF0IjogMTc2OTQwNzU2NSwgImV4cCI6IDIwODQ3Njc1NjV9.LBPaimygpnM9o3mZ2Pi-iL8taJ90JjGbQ0HW6yFlmhg
|
||||
|
||||
# =============== JWT 配置 ===============
|
||||
# 用于签名 JWT Token 的密钥 (请更换为随机字符串)
|
||||
JWT_SECRET_KEY=generate_your_secure_random_key_here
|
||||
JWT_SECRET_KEY=F4MagRkf7nJsN-ag9AB7Q-30MbZRe7Iu4E9p9xRzyic
|
||||
JWT_ALGORITHM=HS256
|
||||
JWT_EXPIRE_HOURS=168
|
||||
|
||||
# =============== 管理员配置 ===============
|
||||
# 服务启动时自动创建的管理员账号
|
||||
ADMIN_EMAIL=admin@example.com
|
||||
ADMIN_PASSWORD=change_this_password_immediately
|
||||
ADMIN_EMAIL=lamnickdavid@gmail.com
|
||||
ADMIN_PASSWORD=lam1988324
|
||||
|
||||
@@ -1,100 +1,331 @@
|
||||
from fastapi import APIRouter, UploadFile, File, HTTPException
|
||||
from fastapi import APIRouter, UploadFile, File, HTTPException, Request, BackgroundTasks, Depends
|
||||
from app.core.config import settings
|
||||
import shutil
|
||||
from app.core.deps import get_current_user
|
||||
from app.services.storage import storage_service
|
||||
import re
|
||||
import time
|
||||
import traceback
|
||||
import os
|
||||
import aiofiles
|
||||
from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def sanitize_filename(filename: str) -> str:
|
||||
"""清理文件名,移除不安全字符"""
|
||||
# 移除路径分隔符和特殊字符
|
||||
safe_name = re.sub(r'[<>:"/\\|?*]', '_', filename)
|
||||
# 限制长度
|
||||
if len(safe_name) > 100:
|
||||
ext = Path(safe_name).suffix
|
||||
safe_name = safe_name[:100 - len(ext)] + ext
|
||||
return safe_name
|
||||
|
||||
async def process_and_upload(temp_file_path: str, original_filename: str, content_type: str, user_id: str):
|
||||
"""Background task to strip multipart headers and upload to Supabase"""
|
||||
try:
|
||||
logger.info(f"Processing raw upload: {temp_file_path} for user {user_id}")
|
||||
|
||||
# 1. Analyze file to find actual video content (strip multipart boundaries)
|
||||
# This is a simplified manual parser for a SINGLE file upload.
|
||||
# Structure:
|
||||
# --boundary
|
||||
# Content-Disposition: form-data; name="file"; filename="..."
|
||||
# Content-Type: video/mp4
|
||||
# \r\n\r\n
|
||||
# [DATA]
|
||||
# \r\n--boundary--
|
||||
|
||||
# We need to read the first few KB to find the header end
|
||||
start_offset = 0
|
||||
end_offset = 0
|
||||
boundary = b""
|
||||
|
||||
file_size = os.path.getsize(temp_file_path)
|
||||
|
||||
with open(temp_file_path, 'rb') as f:
|
||||
# Read first 4KB to find header
|
||||
head = f.read(4096)
|
||||
|
||||
# Find boundary
|
||||
first_line_end = head.find(b'\r\n')
|
||||
if first_line_end == -1:
|
||||
raise Exception("Could not find boundary in multipart body")
|
||||
|
||||
boundary = head[:first_line_end] # e.g. --boundary123
|
||||
logger.info(f"Detected boundary: {boundary}")
|
||||
|
||||
# Find end of headers (\r\n\r\n)
|
||||
header_end = head.find(b'\r\n\r\n')
|
||||
if header_end == -1:
|
||||
raise Exception("Could not find end of multipart headers")
|
||||
|
||||
start_offset = header_end + 4
|
||||
logger.info(f"Video data starts at offset: {start_offset}")
|
||||
|
||||
# Find end boundary (read from end of file)
|
||||
# It should be \r\n + boundary + -- + \r\n
|
||||
# We seek to end-200 bytes
|
||||
f.seek(max(0, file_size - 200))
|
||||
tail = f.read()
|
||||
|
||||
# The closing boundary is usually --boundary--
|
||||
# We look for the last occurrence of the boundary
|
||||
last_boundary_pos = tail.rfind(boundary)
|
||||
if last_boundary_pos != -1:
|
||||
# The data ends before \r\n + boundary
|
||||
# The tail buffer relative position needs to be converted to absolute
|
||||
end_pos_in_tail = last_boundary_pos
|
||||
# We also need to check for the preceding \r\n
|
||||
if end_pos_in_tail >= 2 and tail[end_pos_in_tail-2:end_pos_in_tail] == b'\r\n':
|
||||
end_pos_in_tail -= 2
|
||||
|
||||
# Absolute end offset
|
||||
end_offset = (file_size - 200) + last_boundary_pos
|
||||
# Correction for CRLF before boundary
|
||||
# Actually, simply: read until (file_size - len(tail) + last_boundary_pos) - 2
|
||||
end_offset = (max(0, file_size - 200) + last_boundary_pos) - 2
|
||||
else:
|
||||
logger.warning("Could not find closing boundary, assuming EOF")
|
||||
end_offset = file_size
|
||||
|
||||
logger.info(f"Video data ends at offset: {end_offset}. Total video size: {end_offset - start_offset}")
|
||||
|
||||
# 2. Extract and Upload to Supabase
|
||||
# Since we have the file on disk, we can just pass the file object (seeked) to upload_file?
|
||||
# Or if upload_file expects bytes/path, checking storage.py...
|
||||
# It takes `file_data` (bytes) or file-like?
|
||||
# supabase-py's `upload` method handles parsing if we pass a file object.
|
||||
# But we need to pass ONLY the video slice.
|
||||
# So we create a generator or a sliced file object?
|
||||
# Simpler: Read the slice into memory if < 1GB? Or copy to new temp file?
|
||||
# Copying to new temp file is safer for memory.
|
||||
|
||||
video_path = temp_file_path + "_video.mp4"
|
||||
with open(temp_file_path, 'rb') as src, open(video_path, 'wb') as dst:
|
||||
src.seek(start_offset)
|
||||
# Copy in chunks
|
||||
bytes_to_copy = end_offset - start_offset
|
||||
copied = 0
|
||||
while copied < bytes_to_copy:
|
||||
chunk_size = min(1024*1024*10, bytes_to_copy - copied) # 10MB chunks
|
||||
chunk = src.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
dst.write(chunk)
|
||||
copied += len(chunk)
|
||||
|
||||
logger.info(f"Extracted video content to {video_path}")
|
||||
|
||||
# 3. Upload to Supabase with user isolation
|
||||
timestamp = int(time.time())
|
||||
safe_name = re.sub(r'[^a-zA-Z0-9._-]', '', original_filename)
|
||||
# 使用 user_id 作为目录前缀实现隔离
|
||||
storage_path = f"{user_id}/{timestamp}_{safe_name}"
|
||||
|
||||
# Use storage service (this calls Supabase which might do its own http request)
|
||||
# We read the cleaned video file
|
||||
with open(video_path, 'rb') as f:
|
||||
file_content = f.read() # Still reading into memory for simple upload call, but server has 32GB RAM so ok for 500MB
|
||||
await storage_service.upload_file(
|
||||
bucket=storage_service.BUCKET_MATERIALS,
|
||||
path=storage_path,
|
||||
file_data=file_content,
|
||||
content_type=content_type
|
||||
)
|
||||
|
||||
logger.info(f"Upload to Supabase complete: {storage_path}")
|
||||
|
||||
# Cleanup
|
||||
os.remove(temp_file_path)
|
||||
os.remove(video_path)
|
||||
|
||||
return storage_path
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Background upload processing failed: {e}\n{traceback.format_exc()}")
|
||||
raise
|
||||
|
||||
|
||||
@router.post("")
|
||||
async def upload_material(file: UploadFile = File(...)):
|
||||
if not file.filename.lower().endswith(('.mp4', '.mov', '.avi')):
|
||||
raise HTTPException(400, "Invalid format")
|
||||
async def upload_material(
|
||||
request: Request,
|
||||
background_tasks: BackgroundTasks,
|
||||
current_user: dict = Depends(get_current_user)
|
||||
):
|
||||
user_id = current_user["id"]
|
||||
logger.info(f"ENTERED upload_material (Streaming Mode) for user {user_id}. Headers: {request.headers}")
|
||||
|
||||
# 使用时间戳+原始文件名(保留原始名称,避免冲突)
|
||||
filename = "unknown_video.mp4" # Fallback
|
||||
content_type = "video/mp4"
|
||||
|
||||
# Try to parse filename from header if possible (unreliable in raw stream)
|
||||
# We will rely on post-processing or client hint
|
||||
# Frontend sends standard multipart.
|
||||
|
||||
# Create temp file
|
||||
timestamp = int(time.time())
|
||||
safe_name = sanitize_filename(file.filename)
|
||||
save_path = settings.UPLOAD_DIR / "materials" / f"{timestamp}_{safe_name}"
|
||||
|
||||
# Save file
|
||||
with open(save_path, "wb") as buffer:
|
||||
shutil.copyfileobj(file.file, buffer)
|
||||
|
||||
# Calculate size
|
||||
size_mb = save_path.stat().st_size / (1024 * 1024)
|
||||
|
||||
# 提取显示名称(去掉时间戳前缀)
|
||||
display_name = safe_name
|
||||
temp_filename = f"upload_{timestamp}.raw"
|
||||
temp_path = os.path.join("/tmp", temp_filename) # Use /tmp on Linux
|
||||
# Ensure /tmp exists (it does) but verify paths
|
||||
if os.name == 'nt': # Local dev
|
||||
temp_path = f"d:/tmp/{temp_filename}"
|
||||
os.makedirs("d:/tmp", exist_ok=True)
|
||||
|
||||
try:
|
||||
total_size = 0
|
||||
last_log = 0
|
||||
|
||||
return {
|
||||
"id": save_path.stem,
|
||||
"name": display_name,
|
||||
"path": f"uploads/materials/{save_path.name}",
|
||||
"size_mb": size_mb,
|
||||
"type": "video"
|
||||
}
|
||||
async with aiofiles.open(temp_path, 'wb') as f:
|
||||
async for chunk in request.stream():
|
||||
await f.write(chunk)
|
||||
total_size += len(chunk)
|
||||
|
||||
# Log progress every 20MB
|
||||
if total_size - last_log > 20 * 1024 * 1024:
|
||||
logger.info(f"Receiving stream... Processed {total_size / (1024*1024):.2f} MB")
|
||||
last_log = total_size
|
||||
|
||||
logger.info(f"Stream reception complete. Total size: {total_size} bytes. Saved to {temp_path}")
|
||||
|
||||
if total_size == 0:
|
||||
raise HTTPException(400, "Received empty body")
|
||||
|
||||
# Attempt to extract filename from the saved file's first bytes?
|
||||
# Or just accept it as "uploaded_video.mp4" for now to prove it works.
|
||||
# We can try to regex the header in the file content we just wrote.
|
||||
# Implemented in background task to return success immediately.
|
||||
|
||||
# Wait, if we return immediately, the user's UI might not show the file yet?
|
||||
# The prompt says "Wait for upload".
|
||||
# But to avoid User Waiting Timeout, maybe returning early is better?
|
||||
# NO, user expects the file to be in the list.
|
||||
# So we Must await the processing.
|
||||
# But "Processing" (Strip + Upload to Supabase) takes time.
|
||||
# Receiving took time.
|
||||
# If we await Supabase upload, does it timeout?
|
||||
# Supabase upload is outgoing. Usually faster/stable.
|
||||
|
||||
# Let's await the processing to ensure "List Materials" shows it.
|
||||
# We need to extract the filename for the list.
|
||||
|
||||
# Quick extract filename from first 4kb
|
||||
with open(temp_path, 'rb') as f:
|
||||
head = f.read(4096).decode('utf-8', errors='ignore')
|
||||
match = re.search(r'filename="([^"]+)"', head)
|
||||
if match:
|
||||
filename = match.group(1)
|
||||
logger.info(f"Extracted filename from body: {filename}")
|
||||
|
||||
# Run processing sync (in await)
|
||||
storage_path = await process_and_upload(temp_path, filename, content_type, user_id)
|
||||
|
||||
# Get signed URL (it exists now)
|
||||
signed_url = await storage_service.get_signed_url(
|
||||
bucket=storage_service.BUCKET_MATERIALS,
|
||||
path=storage_path
|
||||
)
|
||||
|
||||
size_mb = total_size / (1024 * 1024) # Approximate (includes headers)
|
||||
|
||||
# 从 storage_path 提取显示名
|
||||
display_name = storage_path.split('/')[-1] # 去掉 user_id 前缀
|
||||
if '_' in display_name:
|
||||
parts = display_name.split('_', 1)
|
||||
if parts[0].isdigit():
|
||||
display_name = parts[1]
|
||||
|
||||
return {
|
||||
"id": storage_path,
|
||||
"name": display_name,
|
||||
"path": signed_url,
|
||||
"size_mb": size_mb,
|
||||
"type": "video"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Streaming upload failed: {str(e)}"
|
||||
detail_msg = f"Exception: {repr(e)}\nArgs: {e.args}\n{traceback.format_exc()}"
|
||||
logger.error(error_msg + "\n" + detail_msg)
|
||||
|
||||
# Write to debug file
|
||||
try:
|
||||
with open("debug_upload.log", "a") as logf:
|
||||
logf.write(f"\n--- Error at {time.ctime()} ---\n")
|
||||
logf.write(detail_msg)
|
||||
logf.write("\n-----------------------------\n")
|
||||
except:
|
||||
pass
|
||||
|
||||
if os.path.exists(temp_path):
|
||||
try:
|
||||
os.remove(temp_path)
|
||||
except:
|
||||
pass
|
||||
raise HTTPException(500, f"Upload failed. Check server logs. Error: {str(e)}")
|
||||
|
||||
|
||||
@router.get("")
|
||||
async def list_materials():
|
||||
materials_dir = settings.UPLOAD_DIR / "materials"
|
||||
files = []
|
||||
if materials_dir.exists():
|
||||
for f in materials_dir.glob("*"):
|
||||
try:
|
||||
stat = f.stat()
|
||||
# 提取显示名称:去掉时间戳前缀 (格式: {timestamp}_{原始文件名})
|
||||
display_name = f.name
|
||||
if '_' in f.name:
|
||||
parts = f.name.split('_', 1)
|
||||
if parts[0].isdigit():
|
||||
display_name = parts[1] # 原始文件名
|
||||
|
||||
files.append({
|
||||
"id": f.stem,
|
||||
"name": display_name,
|
||||
"path": f"uploads/materials/{f.name}",
|
||||
"size_mb": stat.st_size / (1024 * 1024),
|
||||
"type": "video",
|
||||
"created_at": stat.st_ctime
|
||||
})
|
||||
except Exception:
|
||||
continue
|
||||
# Sort by creation time desc
|
||||
files.sort(key=lambda x: x.get("created_at", 0), reverse=True)
|
||||
return {"materials": files}
|
||||
|
||||
|
||||
@router.delete("/{material_id}")
|
||||
async def delete_material(material_id: str):
|
||||
"""删除素材文件"""
|
||||
materials_dir = settings.UPLOAD_DIR / "materials"
|
||||
|
||||
# 查找匹配的文件(ID 是文件名不含扩展名)
|
||||
found = None
|
||||
for f in materials_dir.glob("*"):
|
||||
if f.stem == material_id:
|
||||
found = f
|
||||
break
|
||||
|
||||
if not found:
|
||||
raise HTTPException(404, "Material not found")
|
||||
|
||||
async def list_materials(current_user: dict = Depends(get_current_user)):
|
||||
user_id = current_user["id"]
|
||||
try:
|
||||
found.unlink()
|
||||
# 只列出当前用户目录下的文件
|
||||
files_obj = await storage_service.list_files(
|
||||
bucket=storage_service.BUCKET_MATERIALS,
|
||||
path=user_id
|
||||
)
|
||||
materials = []
|
||||
for f in files_obj:
|
||||
name = f.get('name')
|
||||
if not name or name == '.emptyFolderPlaceholder':
|
||||
continue
|
||||
display_name = name
|
||||
if '_' in name:
|
||||
parts = name.split('_', 1)
|
||||
if parts[0].isdigit():
|
||||
display_name = parts[1]
|
||||
# 完整路径包含 user_id
|
||||
full_path = f"{user_id}/{name}"
|
||||
signed_url = await storage_service.get_signed_url(
|
||||
bucket=storage_service.BUCKET_MATERIALS,
|
||||
path=full_path
|
||||
)
|
||||
metadata = f.get('metadata', {})
|
||||
size = metadata.get('size', 0)
|
||||
# created_at 在顶层,是 ISO 字符串
|
||||
created_at_str = f.get('created_at', '')
|
||||
created_at = 0
|
||||
if created_at_str:
|
||||
from datetime import datetime
|
||||
try:
|
||||
dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
|
||||
created_at = int(dt.timestamp())
|
||||
except:
|
||||
pass
|
||||
materials.append({
|
||||
"id": full_path, # ID 使用完整路径
|
||||
"name": display_name,
|
||||
"path": signed_url,
|
||||
"size_mb": size / (1024 * 1024),
|
||||
"type": "video",
|
||||
"created_at": created_at
|
||||
})
|
||||
materials.sort(key=lambda x: x['id'], reverse=True)
|
||||
return {"materials": materials}
|
||||
except Exception as e:
|
||||
logger.error(f"List materials failed: {e}")
|
||||
return {"materials": []}
|
||||
|
||||
|
||||
@router.delete("/{material_id:path}")
|
||||
async def delete_material(material_id: str, current_user: dict = Depends(get_current_user)):
|
||||
user_id = current_user["id"]
|
||||
# 验证 material_id 属于当前用户
|
||||
if not material_id.startswith(f"{user_id}/"):
|
||||
raise HTTPException(403, "无权删除此素材")
|
||||
try:
|
||||
await storage_service.delete_file(
|
||||
bucket=storage_service.BUCKET_MATERIALS,
|
||||
path=material_id
|
||||
)
|
||||
return {"success": True, "message": "素材已删除"}
|
||||
except Exception as e:
|
||||
raise HTTPException(500, f"删除失败: {str(e)}")
|
||||
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
||||
from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends, Request
|
||||
from pydantic import BaseModel
|
||||
from typing import Optional
|
||||
from pathlib import Path
|
||||
from loguru import logger
|
||||
import uuid
|
||||
import traceback
|
||||
import time
|
||||
import httpx
|
||||
import os
|
||||
from app.services.tts_service import TTSService
|
||||
from app.services.video_service import VideoService
|
||||
from app.services.lipsync_service import LipSyncService
|
||||
from app.services.storage import storage_service
|
||||
from app.core.config import settings
|
||||
from app.core.deps import get_current_user
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -47,42 +52,73 @@ async def _check_lipsync_ready(force: bool = False) -> bool:
|
||||
print(f"[LipSync] Health check: ready={_lipsync_ready}")
|
||||
return _lipsync_ready
|
||||
|
||||
async def _process_video_generation(task_id: str, req: GenerateRequest):
|
||||
async def _download_material(path_or_url: str, temp_path: Path):
|
||||
"""下载素材到临时文件 (流式下载,节省内存)"""
|
||||
if path_or_url.startswith("http"):
|
||||
# Download from URL
|
||||
timeout = httpx.Timeout(None) # Disable timeout for large files
|
||||
async with httpx.AsyncClient(timeout=timeout) as client:
|
||||
async with client.stream("GET", path_or_url) as resp:
|
||||
resp.raise_for_status()
|
||||
with open(temp_path, "wb") as f:
|
||||
async for chunk in resp.aiter_bytes():
|
||||
f.write(chunk)
|
||||
else:
|
||||
# Local file (legacy or absolute path)
|
||||
src = Path(path_or_url)
|
||||
if not src.is_absolute():
|
||||
src = settings.BASE_DIR.parent / path_or_url
|
||||
|
||||
if src.exists():
|
||||
import shutil
|
||||
shutil.copy(src, temp_path)
|
||||
else:
|
||||
raise FileNotFoundError(f"Material not found: {path_or_url}")
|
||||
|
||||
async def _process_video_generation(task_id: str, req: GenerateRequest, user_id: str):
|
||||
temp_files = [] # Track files to clean up
|
||||
try:
|
||||
start_time = time.time()
|
||||
|
||||
# Resolve path if it's relative
|
||||
input_material_path = Path(req.material_path)
|
||||
if not input_material_path.is_absolute():
|
||||
input_material_path = settings.BASE_DIR.parent / req.material_path
|
||||
|
||||
|
||||
tasks[task_id]["status"] = "processing"
|
||||
tasks[task_id]["progress"] = 5
|
||||
tasks[task_id]["message"] = "正在初始化..."
|
||||
|
||||
tasks[task_id]["message"] = "正在下载素材..."
|
||||
|
||||
# Prepare temp dir
|
||||
temp_dir = settings.UPLOAD_DIR / "temp"
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 0. Download Material
|
||||
input_material_path = temp_dir / f"{task_id}_input.mp4"
|
||||
temp_files.append(input_material_path)
|
||||
|
||||
await _download_material(req.material_path, input_material_path)
|
||||
|
||||
# 1. TTS - 进度 5% -> 25%
|
||||
tasks[task_id]["message"] = "正在生成语音 (TTS)..."
|
||||
tasks[task_id]["progress"] = 10
|
||||
|
||||
|
||||
tts = TTSService()
|
||||
audio_path = settings.OUTPUT_DIR / f"{task_id}_audio.mp3"
|
||||
audio_path = temp_dir / f"{task_id}_audio.mp3"
|
||||
temp_files.append(audio_path)
|
||||
await tts.generate_audio(req.text, req.voice, str(audio_path))
|
||||
|
||||
|
||||
tts_time = time.time() - start_time
|
||||
print(f"[Pipeline] TTS completed in {tts_time:.1f}s")
|
||||
tasks[task_id]["progress"] = 25
|
||||
|
||||
|
||||
# 2. LipSync - 进度 25% -> 85%
|
||||
tasks[task_id]["message"] = "正在合成唇形 (LatentSync)..."
|
||||
tasks[task_id]["progress"] = 30
|
||||
|
||||
|
||||
lipsync = _get_lipsync_service()
|
||||
lipsync_video_path = settings.OUTPUT_DIR / f"{task_id}_lipsync.mp4"
|
||||
|
||||
lipsync_video_path = temp_dir / f"{task_id}_lipsync.mp4"
|
||||
temp_files.append(lipsync_video_path)
|
||||
|
||||
# 使用缓存的健康检查结果
|
||||
lipsync_start = time.time()
|
||||
is_ready = await _check_lipsync_ready()
|
||||
|
||||
|
||||
if is_ready:
|
||||
print(f"[LipSync] Starting LatentSync inference...")
|
||||
tasks[task_id]["progress"] = 35
|
||||
@@ -98,34 +134,72 @@ async def _process_video_generation(task_id: str, req: GenerateRequest):
|
||||
lipsync_time = time.time() - lipsync_start
|
||||
print(f"[Pipeline] LipSync completed in {lipsync_time:.1f}s")
|
||||
tasks[task_id]["progress"] = 85
|
||||
|
||||
|
||||
# 3. Composition - 进度 85% -> 100%
|
||||
tasks[task_id]["message"] = "正在合成最终视频..."
|
||||
tasks[task_id]["progress"] = 90
|
||||
|
||||
|
||||
video = VideoService()
|
||||
final_output = settings.OUTPUT_DIR / f"{task_id}_output.mp4"
|
||||
await video.compose(str(lipsync_video_path), str(audio_path), str(final_output))
|
||||
|
||||
final_output_local_path = temp_dir / f"{task_id}_output.mp4"
|
||||
temp_files.append(final_output_local_path)
|
||||
|
||||
await video.compose(str(lipsync_video_path), str(audio_path), str(final_output_local_path))
|
||||
|
||||
total_time = time.time() - start_time
|
||||
|
||||
# 4. Upload to Supabase with user isolation
|
||||
tasks[task_id]["message"] = "正在上传结果..."
|
||||
tasks[task_id]["progress"] = 95
|
||||
|
||||
# 使用 user_id 作为目录前缀实现隔离
|
||||
storage_path = f"{user_id}/{task_id}_output.mp4"
|
||||
with open(final_output_local_path, "rb") as f:
|
||||
file_data = f.read()
|
||||
await storage_service.upload_file(
|
||||
bucket=storage_service.BUCKET_OUTPUTS,
|
||||
path=storage_path,
|
||||
file_data=file_data,
|
||||
content_type="video/mp4"
|
||||
)
|
||||
|
||||
# Get Signed URL
|
||||
signed_url = await storage_service.get_signed_url(
|
||||
bucket=storage_service.BUCKET_OUTPUTS,
|
||||
path=storage_path
|
||||
)
|
||||
|
||||
print(f"[Pipeline] Total generation time: {total_time:.1f}s")
|
||||
|
||||
|
||||
tasks[task_id]["status"] = "completed"
|
||||
tasks[task_id]["progress"] = 100
|
||||
tasks[task_id]["message"] = f"生成完成!耗时 {total_time:.0f} 秒"
|
||||
tasks[task_id]["output"] = str(final_output)
|
||||
tasks[task_id]["download_url"] = f"/outputs/{final_output.name}"
|
||||
tasks[task_id]["output"] = storage_path
|
||||
tasks[task_id]["download_url"] = signed_url
|
||||
|
||||
except Exception as e:
|
||||
tasks[task_id]["status"] = "failed"
|
||||
tasks[task_id]["message"] = f"错误: {str(e)}"
|
||||
tasks[task_id]["error"] = traceback.format_exc()
|
||||
logger.error(f"Generate video failed: {e}")
|
||||
finally:
|
||||
# Cleanup temp files
|
||||
for f in temp_files:
|
||||
try:
|
||||
if f.exists():
|
||||
f.unlink()
|
||||
except Exception as e:
|
||||
print(f"Error cleaning up {f}: {e}")
|
||||
|
||||
@router.post("/generate")
|
||||
async def generate_video(req: GenerateRequest, background_tasks: BackgroundTasks):
|
||||
async def generate_video(
|
||||
req: GenerateRequest,
|
||||
background_tasks: BackgroundTasks,
|
||||
current_user: dict = Depends(get_current_user)
|
||||
):
|
||||
user_id = current_user["id"]
|
||||
task_id = str(uuid.uuid4())
|
||||
tasks[task_id] = {"status": "pending", "task_id": task_id, "progress": 0}
|
||||
background_tasks.add_task(_process_video_generation, task_id, req)
|
||||
tasks[task_id] = {"status": "pending", "task_id": task_id, "progress": 0, "user_id": user_id}
|
||||
background_tasks.add_task(_process_video_generation, task_id, req, user_id)
|
||||
return {"task_id": task_id}
|
||||
|
||||
@router.get("/tasks/{task_id}")
|
||||
@@ -144,54 +218,81 @@ async def lipsync_health():
|
||||
|
||||
|
||||
@router.get("/generated")
|
||||
async def list_generated_videos():
|
||||
"""从文件系统读取生成的视频列表(持久化)"""
|
||||
output_dir = settings.OUTPUT_DIR
|
||||
videos = []
|
||||
|
||||
if output_dir.exists():
|
||||
for f in output_dir.glob("*_output.mp4"):
|
||||
try:
|
||||
stat = f.stat()
|
||||
videos.append({
|
||||
"id": f.stem,
|
||||
"name": f.name,
|
||||
"path": f"/outputs/{f.name}",
|
||||
"size_mb": stat.st_size / (1024 * 1024),
|
||||
"created_at": stat.st_ctime
|
||||
})
|
||||
except Exception:
|
||||
async def list_generated_videos(current_user: dict = Depends(get_current_user)):
|
||||
"""从 Storage 读取当前用户生成的视频列表"""
|
||||
user_id = current_user["id"]
|
||||
try:
|
||||
# 只列出当前用户目录下的文件
|
||||
files_obj = await storage_service.list_files(
|
||||
bucket=storage_service.BUCKET_OUTPUTS,
|
||||
path=user_id
|
||||
)
|
||||
|
||||
videos = []
|
||||
for f in files_obj:
|
||||
name = f.get('name')
|
||||
if not name or name == '.emptyFolderPlaceholder':
|
||||
continue
|
||||
|
||||
# Sort by creation time desc (newest first)
|
||||
videos.sort(key=lambda x: x.get("created_at", 0), reverse=True)
|
||||
return {"videos": videos}
|
||||
|
||||
# 过滤非 output.mp4 文件
|
||||
if not name.endswith("_output.mp4"):
|
||||
continue
|
||||
|
||||
# 获取 ID (即文件名去除后缀)
|
||||
video_id = Path(name).stem
|
||||
|
||||
# 完整路径包含 user_id
|
||||
full_path = f"{user_id}/{name}"
|
||||
|
||||
# 获取签名链接
|
||||
signed_url = await storage_service.get_signed_url(
|
||||
bucket=storage_service.BUCKET_OUTPUTS,
|
||||
path=full_path
|
||||
)
|
||||
|
||||
metadata = f.get('metadata', {})
|
||||
size = metadata.get('size', 0)
|
||||
# created_at 在顶层,是 ISO 字符串,转换为 Unix 时间戳
|
||||
created_at_str = f.get('created_at', '')
|
||||
created_at = 0
|
||||
if created_at_str:
|
||||
from datetime import datetime
|
||||
try:
|
||||
dt = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
|
||||
created_at = int(dt.timestamp())
|
||||
except:
|
||||
pass
|
||||
|
||||
videos.append({
|
||||
"id": video_id,
|
||||
"name": name,
|
||||
"path": signed_url, # Direct playable URL
|
||||
"size_mb": size / (1024 * 1024),
|
||||
"created_at": created_at
|
||||
})
|
||||
|
||||
# Sort by created_at desc (newest first)
|
||||
# Supabase API usually returns ISO string, simpler string sort works for ISO
|
||||
videos.sort(key=lambda x: x.get("created_at", ""), reverse=True)
|
||||
return {"videos": videos}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"List generated videos failed: {e}")
|
||||
return {"videos": []}
|
||||
|
||||
|
||||
@router.delete("/generated/{video_id}")
|
||||
async def delete_generated_video(video_id: str):
|
||||
async def delete_generated_video(video_id: str, current_user: dict = Depends(get_current_user)):
|
||||
"""删除生成的视频"""
|
||||
output_dir = settings.OUTPUT_DIR
|
||||
|
||||
# 查找匹配的文件
|
||||
found = None
|
||||
for f in output_dir.glob("*.mp4"):
|
||||
if f.stem == video_id:
|
||||
found = f
|
||||
break
|
||||
|
||||
if not found:
|
||||
raise HTTPException(404, "Video not found")
|
||||
|
||||
user_id = current_user["id"]
|
||||
try:
|
||||
found.unlink()
|
||||
# 同时删除相关的临时文件(如果存在)
|
||||
task_id = video_id.replace("_output", "")
|
||||
for suffix in ["_audio.mp3", "_lipsync.mp4"]:
|
||||
temp_file = output_dir / f"{task_id}{suffix}"
|
||||
if temp_file.exists():
|
||||
temp_file.unlink()
|
||||
|
||||
# video_id 通常是 uuid_output,完整路径需要加上 user_id
|
||||
storage_path = f"{user_id}/{video_id}.mp4"
|
||||
|
||||
await storage_service.delete_file(
|
||||
bucket=storage_service.BUCKET_OUTPUTS,
|
||||
path=storage_path
|
||||
)
|
||||
return {"success": True, "message": "视频已删除"}
|
||||
except Exception as e:
|
||||
raise HTTPException(500, f"删除失败: {str(e)}")
|
||||
|
||||
@@ -28,6 +28,7 @@ class Settings(BaseSettings):
|
||||
|
||||
# Supabase 配置
|
||||
SUPABASE_URL: str = ""
|
||||
SUPABASE_PUBLIC_URL: str = "" # 公网访问地址,用于生成前端可访问的 URL
|
||||
SUPABASE_KEY: str = ""
|
||||
|
||||
# JWT 配置
|
||||
|
||||
@@ -10,6 +10,28 @@ settings = config.settings
|
||||
|
||||
app = FastAPI(title="ViGent TalkingHead Agent")
|
||||
|
||||
from fastapi import Request
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
import time
|
||||
import traceback
|
||||
|
||||
class LoggingMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(self, request: Request, call_next):
|
||||
start_time = time.time()
|
||||
logger.info(f"START Request: {request.method} {request.url}")
|
||||
logger.info(f"HEADERS: {dict(request.headers)}")
|
||||
try:
|
||||
response = await call_next(request)
|
||||
process_time = time.time() - start_time
|
||||
logger.info(f"END Request: {request.method} {request.url} - Status: {response.status_code} - Duration: {process_time:.2f}s")
|
||||
return response
|
||||
except Exception as e:
|
||||
process_time = time.time() - start_time
|
||||
logger.error(f"EXCEPTION during request {request.method} {request.url}: {str(e)}\n{traceback.format_exc()}")
|
||||
raise e
|
||||
|
||||
app.add_middleware(LoggingMiddleware)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
|
||||
@@ -2,12 +2,17 @@
|
||||
发布服务 (支持用户隔离)
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
import httpx
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any
|
||||
from loguru import logger
|
||||
from app.core.config import settings
|
||||
from app.core.paths import get_user_cookie_dir, get_platform_cookie_path, get_legacy_cookie_dir, get_legacy_cookie_path
|
||||
from app.services.storage import storage_service
|
||||
|
||||
# Import platform uploaders
|
||||
from .uploader.bilibili_uploader import BilibiliUploader
|
||||
@@ -17,7 +22,7 @@ from .uploader.xiaohongshu_uploader import XiaohongshuUploader
|
||||
|
||||
class PublishService:
|
||||
"""Social media publishing service (with user isolation)"""
|
||||
|
||||
|
||||
# 支持的平台配置
|
||||
PLATFORMS: Dict[str, Dict[str, Any]] = {
|
||||
"bilibili": {"name": "B站", "url": "https://member.bilibili.com/platform/upload/video/frame", "enabled": True},
|
||||
@@ -113,13 +118,56 @@ class PublishService:
|
||||
logger.info(f"[发布] 视频: {video_path}")
|
||||
logger.info(f"[发布] 标题: {title}")
|
||||
logger.info(f"[发布] 用户: {user_id or 'legacy'}")
|
||||
|
||||
|
||||
temp_file = None
|
||||
try:
|
||||
# 处理视频路径
|
||||
if video_path.startswith('http://') or video_path.startswith('https://'):
|
||||
# 尝试从 URL 解析 bucket 和 path,直接使用本地文件
|
||||
local_video_path = None
|
||||
|
||||
# URL 格式: .../storage/v1/object/sign/{bucket}/{path}?token=...
|
||||
match = re.search(r'/storage/v1/object/sign/([^/]+)/(.+?)\?', video_path)
|
||||
if match:
|
||||
bucket = match.group(1)
|
||||
storage_path = match.group(2)
|
||||
logger.info(f"[发布] 解析 URL: bucket={bucket}, path={storage_path}")
|
||||
|
||||
# 尝试获取本地文件路径
|
||||
local_video_path = storage_service.get_local_file_path(bucket, storage_path)
|
||||
|
||||
if local_video_path and os.path.exists(local_video_path):
|
||||
logger.info(f"[发布] 直接使用本地文件: {local_video_path}")
|
||||
else:
|
||||
# 本地文件不存在,通过 HTTP 下载
|
||||
logger.info(f"[发布] 本地文件不存在,通过 HTTP 下载...")
|
||||
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
|
||||
temp_file.close()
|
||||
|
||||
# 将公网 URL 替换为内网 URL
|
||||
download_url = video_path
|
||||
if settings.SUPABASE_PUBLIC_URL and settings.SUPABASE_URL:
|
||||
public_url = settings.SUPABASE_PUBLIC_URL.rstrip('/')
|
||||
internal_url = settings.SUPABASE_URL.rstrip('/')
|
||||
download_url = video_path.replace(public_url, internal_url)
|
||||
|
||||
async with httpx.AsyncClient(timeout=httpx.Timeout(None)) as client:
|
||||
async with client.stream("GET", download_url) as resp:
|
||||
resp.raise_for_status()
|
||||
with open(temp_file.name, 'wb') as f:
|
||||
async for chunk in resp.aiter_bytes():
|
||||
f.write(chunk)
|
||||
local_video_path = temp_file.name
|
||||
logger.info(f"[发布] 视频已下载到: {local_video_path}")
|
||||
else:
|
||||
# 本地相对路径
|
||||
local_video_path = str(settings.BASE_DIR.parent / video_path)
|
||||
|
||||
# Select appropriate uploader
|
||||
if platform == "bilibili":
|
||||
uploader = BilibiliUploader(
|
||||
title=title,
|
||||
file_path=str(settings.BASE_DIR.parent / video_path),
|
||||
file_path=local_video_path,
|
||||
tags=tags,
|
||||
publish_date=publish_time,
|
||||
account_file=str(account_file),
|
||||
@@ -130,7 +178,7 @@ class PublishService:
|
||||
elif platform == "douyin":
|
||||
uploader = DouyinUploader(
|
||||
title=title,
|
||||
file_path=str(settings.BASE_DIR.parent / video_path),
|
||||
file_path=local_video_path,
|
||||
tags=tags,
|
||||
publish_date=publish_time,
|
||||
account_file=str(account_file),
|
||||
@@ -139,7 +187,7 @@ class PublishService:
|
||||
elif platform == "xiaohongshu":
|
||||
uploader = XiaohongshuUploader(
|
||||
title=title,
|
||||
file_path=str(settings.BASE_DIR.parent / video_path),
|
||||
file_path=local_video_path,
|
||||
tags=tags,
|
||||
publish_date=publish_time,
|
||||
account_file=str(account_file),
|
||||
@@ -157,7 +205,7 @@ class PublishService:
|
||||
result = await uploader.main()
|
||||
result['platform'] = platform
|
||||
return result
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"[发布] 上传异常: {e}")
|
||||
return {
|
||||
@@ -165,6 +213,14 @@ class PublishService:
|
||||
"message": f"上传异常: {str(e)}",
|
||||
"platform": platform
|
||||
}
|
||||
finally:
|
||||
# 清理临时文件
|
||||
if temp_file and os.path.exists(temp_file.name):
|
||||
try:
|
||||
os.remove(temp_file.name)
|
||||
logger.info(f"[发布] 已清理临时文件: {temp_file.name}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[发布] 清理临时文件失败: {e}")
|
||||
|
||||
async def login(self, platform: str, user_id: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
|
||||
148
backend/app/services/storage.py
Normal file
148
backend/app/services/storage.py
Normal file
@@ -0,0 +1,148 @@
|
||||
from supabase import Client
|
||||
from app.core.supabase import get_supabase
|
||||
from app.core.config import settings
|
||||
from loguru import logger
|
||||
from typing import Optional, Union, Dict, List, Any
|
||||
from pathlib import Path
|
||||
import asyncio
|
||||
import functools
|
||||
import os
|
||||
|
||||
# Supabase Storage 本地存储根目录
|
||||
SUPABASE_STORAGE_LOCAL_PATH = Path("/home/rongye/ProgramFiles/Supabase/volumes/storage/stub/stub")
|
||||
|
||||
class StorageService:
|
||||
def __init__(self):
|
||||
self.supabase: Client = get_supabase()
|
||||
self.BUCKET_MATERIALS = "materials"
|
||||
self.BUCKET_OUTPUTS = "outputs"
|
||||
|
||||
def _convert_to_public_url(self, url: str) -> str:
|
||||
"""将内部 URL 转换为公网可访问的 URL"""
|
||||
if settings.SUPABASE_PUBLIC_URL and settings.SUPABASE_URL:
|
||||
# 去掉末尾斜杠进行替换
|
||||
internal_url = settings.SUPABASE_URL.rstrip('/')
|
||||
public_url = settings.SUPABASE_PUBLIC_URL.rstrip('/')
|
||||
return url.replace(internal_url, public_url)
|
||||
return url
|
||||
|
||||
def get_local_file_path(self, bucket: str, path: str) -> Optional[str]:
|
||||
"""
|
||||
获取 Storage 文件的本地磁盘路径
|
||||
|
||||
Supabase Storage 文件存储结构:
|
||||
{STORAGE_ROOT}/{bucket}/{path}/{internal_uuid}
|
||||
|
||||
Returns:
|
||||
本地文件路径,如果不存在返回 None
|
||||
"""
|
||||
try:
|
||||
# 构建目录路径
|
||||
dir_path = SUPABASE_STORAGE_LOCAL_PATH / bucket / path
|
||||
|
||||
if not dir_path.exists():
|
||||
logger.warning(f"Storage 目录不存在: {dir_path}")
|
||||
return None
|
||||
|
||||
# 目录下只有一个文件(internal_uuid)
|
||||
files = list(dir_path.iterdir())
|
||||
if not files:
|
||||
logger.warning(f"Storage 目录为空: {dir_path}")
|
||||
return None
|
||||
|
||||
local_path = str(files[0])
|
||||
logger.info(f"获取本地文件路径: {local_path}")
|
||||
return local_path
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"获取本地文件路径失败: {e}")
|
||||
return None
|
||||
|
||||
async def upload_file(self, bucket: str, path: str, file_data: bytes, content_type: str) -> str:
|
||||
"""
|
||||
异步上传文件到 Supabase Storage
|
||||
"""
|
||||
try:
|
||||
# 运行在线程池中,避免阻塞事件循环
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(
|
||||
None,
|
||||
functools.partial(
|
||||
self.supabase.storage.from_(bucket).upload,
|
||||
path=path,
|
||||
file=file_data,
|
||||
file_options={"content-type": content_type, "upsert": "true"}
|
||||
)
|
||||
)
|
||||
logger.info(f"Storage upload success: {path}")
|
||||
return path
|
||||
except Exception as e:
|
||||
logger.error(f"Storage upload failed: {e}")
|
||||
raise e
|
||||
|
||||
async def get_signed_url(self, bucket: str, path: str, expires_in: int = 3600) -> str:
|
||||
"""异步获取签名访问链接"""
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
res = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.supabase.storage.from_(bucket).create_signed_url(path, expires_in)
|
||||
)
|
||||
|
||||
# 兼容处理
|
||||
url = ""
|
||||
if isinstance(res, dict) and "signedURL" in res:
|
||||
url = res["signedURL"]
|
||||
elif isinstance(res, str):
|
||||
url = res
|
||||
else:
|
||||
logger.warning(f"Unexpected signed_url response: {res}")
|
||||
url = res.get("signedURL", "") if isinstance(res, dict) else str(res)
|
||||
|
||||
# 转换为公网可访问的 URL
|
||||
return self._convert_to_public_url(url)
|
||||
except Exception as e:
|
||||
logger.error(f"Get signed URL failed: {e}")
|
||||
return ""
|
||||
|
||||
async def get_public_url(self, bucket: str, path: str) -> str:
|
||||
"""获取公开访问链接"""
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
res = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.supabase.storage.from_(bucket).get_public_url(path)
|
||||
)
|
||||
# 转换为公网可访问的 URL
|
||||
return self._convert_to_public_url(res)
|
||||
except Exception as e:
|
||||
logger.error(f"Get public URL failed: {e}")
|
||||
return ""
|
||||
|
||||
async def delete_file(self, bucket: str, path: str):
|
||||
"""异步删除文件"""
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.supabase.storage.from_(bucket).remove([path])
|
||||
)
|
||||
logger.info(f"Deleted file: {bucket}/{path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Delete file failed: {e}")
|
||||
pass
|
||||
|
||||
async def list_files(self, bucket: str, path: str) -> List[Any]:
|
||||
"""异步列出文件"""
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
res = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: self.supabase.storage.from_(bucket).list(path)
|
||||
)
|
||||
return res or []
|
||||
except Exception as e:
|
||||
logger.error(f"List files failed: {e}")
|
||||
return []
|
||||
|
||||
storage_service = StorageService()
|
||||
Reference in New Issue
Block a user