This commit is contained in:
Kevin Wong
2026-02-09 14:47:19 +08:00
parent e226224119
commit 3129d45b25
23 changed files with 1529 additions and 294 deletions

View File

@@ -24,6 +24,33 @@ class GenerateMetaResponse(BaseModel):
tags: list[str]
class TranslateRequest(BaseModel):
"""翻译请求"""
text: str
target_lang: str
@router.post("/translate")
async def translate_text(req: TranslateRequest):
"""
AI 翻译文案
将文案翻译为指定目标语言
"""
if not req.text or not req.text.strip():
raise HTTPException(status_code=400, detail="文案不能为空")
if not req.target_lang or not req.target_lang.strip():
raise HTTPException(status_code=400, detail="目标语言不能为空")
try:
logger.info(f"Translating text to {req.target_lang}: {req.text[:50]}...")
translated = await glm_service.translate_text(req.text.strip(), req.target_lang.strip())
return success_response({"translated_text": translated})
except Exception as e:
logger.error(f"Translate failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/generate-meta")
async def generate_meta(req: GenerateMetaRequest):
"""

View File

@@ -1,14 +1,16 @@
from pydantic import BaseModel
from typing import Optional
from typing import Optional, List
class GenerateRequest(BaseModel):
text: str
voice: str = "zh-CN-YunxiNeural"
material_path: str
material_paths: Optional[List[str]] = None
tts_mode: str = "edgetts"
ref_audio_id: Optional[str] = None
ref_text: Optional[str] = None
language: str = "zh-CN"
title: Optional[str] = None
enable_subtitles: bool = True
subtitle_style_id: Optional[str] = None

View File

@@ -1,4 +1,4 @@
from typing import Optional, Any
from typing import Optional, Any, List
from pathlib import Path
import time
import traceback
@@ -24,6 +24,17 @@ from .schemas import GenerateRequest
from .task_store import task_store
def _locale_to_whisper_lang(locale: str) -> str:
"""'en-US''en', 'zh-CN''zh'"""
return locale.split("-")[0] if "-" in locale else locale
def _locale_to_qwen_lang(locale: str) -> str:
"""'zh-CN''Chinese', 'en-US''English', 其他 → 'Auto'"""
mapping = {"zh": "Chinese", "en": "English"}
return mapping.get(locale.split("-")[0], "Auto")
_lipsync_service: Optional[LipSyncService] = None
_lipsync_ready: Optional[bool] = None
_lipsync_last_check: float = 0
@@ -79,19 +90,107 @@ def _update_task(task_id: str, **updates: Any) -> None:
task_store.update(task_id, updates)
# ── 多素材辅助函数 ──
def _split_equal(segments: List[dict], material_paths: List[str]) -> List[dict]:
"""按素材数量均分音频时长,对齐到最近的 Whisper 字边界。
Args:
segments: Whisper 产出的 segment 列表, 每个包含 words (字级时间戳)
material_paths: 素材路径列表
Returns:
[{"material_path": "...", "start": 0.0, "end": 5.2, "index": 0}, ...]
"""
# 展平所有 Whisper 字符
all_chars: List[dict] = []
for seg in segments:
for w in seg.get("words", []):
all_chars.append(w)
n = len(material_paths)
if not all_chars or n == 0:
return [{"material_path": material_paths[0] if material_paths else "",
"start": 0.0, "end": 99999.0, "index": 0}]
# 素材数不能超过字符数,否则边界会重复
if n > len(all_chars):
logger.warning(f"[MultiMat] 素材数({n}) > 字符数({len(all_chars)}),裁剪为 {len(all_chars)}")
n = len(all_chars)
total_start = all_chars[0]["start"]
total_end = all_chars[-1]["end"]
seg_dur = (total_end - total_start) / n
# 计算 N-1 个分割点,对齐到最近的字边界
boundaries = [0] # 第一段从第 0 个字开始
for i in range(1, n):
target_time = total_start + i * seg_dur
# 找到 start 时间最接近 target_time 的字
best_idx = boundaries[-1] + 1 # 至少比上一个边界后移 1
best_diff = float("inf")
for j in range(boundaries[-1] + 1, len(all_chars)):
diff = abs(all_chars[j]["start"] - target_time)
if diff < best_diff:
best_diff = diff
best_idx = j
elif diff > best_diff:
break # 时间递增,差值开始变大后可以停了
boundaries.append(min(best_idx, len(all_chars) - 1))
boundaries.append(len(all_chars)) # 最后一段到末尾
# 按边界生成分配结果
assignments: List[dict] = []
for i in range(n):
s_idx = boundaries[i]
e_idx = boundaries[i + 1]
if s_idx >= len(all_chars) or s_idx >= e_idx:
continue
assignments.append({
"material_path": material_paths[i],
"start": all_chars[s_idx]["start"],
"end": all_chars[e_idx - 1]["end"],
"text": "".join(c["word"] for c in all_chars[s_idx:e_idx]),
"index": len(assignments),
})
if not assignments:
return [{"material_path": material_paths[0], "start": 0.0, "end": 99999.0, "index": 0}]
logger.info(f"[MultiMat] 均分 {len(all_chars)} 字为 {len(assignments)}")
for a in assignments:
dur = a["end"] - a["start"]
logger.info(f"{a['index']}: [{a['start']:.2f}-{a['end']:.2f}s] ({dur:.1f}s) {a['text'][:20]}")
return assignments
async def process_video_generation(task_id: str, req: GenerateRequest, user_id: str):
temp_files = []
try:
start_time = time.time()
# ── 确定素材列表 ──
material_paths: List[str] = []
if req.material_paths and len(req.material_paths) > 1:
material_paths = req.material_paths
else:
material_paths = [req.material_path]
is_multi = len(material_paths) > 1
_update_task(task_id, status="processing", progress=5, message="正在下载素材...")
temp_dir = settings.UPLOAD_DIR / "temp"
temp_dir.mkdir(parents=True, exist_ok=True)
input_material_path = temp_dir / f"{task_id}_input.mp4"
temp_files.append(input_material_path)
await _download_material(req.material_path, input_material_path)
# 单素材模式:下载主素材
if not is_multi:
input_material_path = temp_dir / f"{task_id}_input.mp4"
temp_files.append(input_material_path)
await _download_material(material_paths[0], input_material_path)
_update_task(task_id, message="正在生成语音...", progress=10)
@@ -119,7 +218,7 @@ async def process_video_generation(task_id: str, req: GenerateRequest, user_id:
ref_audio_path=str(ref_audio_local),
ref_text=req.ref_text,
output_path=str(audio_path),
language="Chinese"
language=_locale_to_qwen_lang(req.language)
)
else:
_update_task(task_id, message="正在生成语音 (EdgeTTS)...")
@@ -128,52 +227,183 @@ async def process_video_generation(task_id: str, req: GenerateRequest, user_id:
tts_time = time.time() - start_time
print(f"[Pipeline] TTS completed in {tts_time:.1f}s")
_update_task(task_id, progress=25)
_update_task(task_id, message="正在合成唇形 (LatentSync)...", progress=30)
lipsync = _get_lipsync_service()
lipsync_video_path = temp_dir / f"{task_id}_lipsync.mp4"
temp_files.append(lipsync_video_path)
lipsync_start = time.time()
is_ready = await _check_lipsync_ready()
if is_ready:
print(f"[LipSync] Starting LatentSync inference...")
_update_task(task_id, progress=35, message="正在运行 LatentSync 推理...")
await lipsync.generate(str(input_material_path), str(audio_path), str(lipsync_video_path))
else:
print(f"[LipSync] LatentSync not ready, copying original video")
_update_task(task_id, message="唇形同步不可用,使用原始视频...")
import shutil
shutil.copy(str(input_material_path), lipsync_video_path)
lipsync_time = time.time() - lipsync_start
print(f"[Pipeline] LipSync completed in {lipsync_time:.1f}s")
_update_task(task_id, progress=80)
video = VideoService()
captions_path = None
if req.enable_subtitles:
_update_task(task_id, message="正在生成字幕 (Whisper)...", progress=82)
if is_multi:
# ══════════════════════════════════════
# 多素材流水线
# ══════════════════════════════════════
_update_task(task_id, progress=12, message="正在生成字幕 (Whisper)...")
captions_path = temp_dir / f"{task_id}_captions.json"
temp_files.append(captions_path)
try:
await whisper_service.align(
captions_data = await whisper_service.align(
audio_path=str(audio_path),
text=req.text,
output_path=str(captions_path)
output_path=str(captions_path),
language=_locale_to_whisper_lang(req.language),
)
print(f"[Pipeline] Whisper alignment completed")
print(f"[Pipeline] Whisper alignment completed (multi-material)")
except Exception as e:
logger.warning(f"Whisper alignment failed, skipping subtitles: {e}")
logger.warning(f"Whisper alignment failed: {e}")
captions_data = None
captions_path = None
_update_task(task_id, progress=15, message="正在分配素材...")
if captions_data and captions_data.get("segments"):
assignments = _split_equal(captions_data["segments"], material_paths)
else:
# Whisper 失败 → 按时长均分(不依赖字符对齐)
logger.warning("[MultiMat] Whisper 无数据,按时长均分")
audio_dur = video._get_duration(str(audio_path))
if audio_dur <= 0:
audio_dur = 30.0 # 安全兜底
seg_dur = audio_dur / len(material_paths)
assignments = [
{"material_path": material_paths[i], "start": i * seg_dur,
"end": (i + 1) * seg_dur, "index": i}
for i in range(len(material_paths))
]
# 扩展段覆盖完整音频范围首段从0开始末段到音频结尾
audio_duration = video._get_duration(str(audio_path))
if assignments and audio_duration > 0:
assignments[0]["start"] = 0.0
assignments[-1]["end"] = audio_duration
num_segments = len(assignments)
print(f"[Pipeline] Multi-material: {num_segments} segments, {len(material_paths)} materials")
if num_segments == 0:
raise RuntimeError("Multi-material: no valid segments after splitting")
lipsync_start = time.time()
# ── 第一步:下载所有素材并检测分辨率 ──
material_locals: List[Path] = []
resolutions = []
for i, assignment in enumerate(assignments):
material_local = temp_dir / f"{task_id}_material_{i}.mp4"
temp_files.append(material_local)
await _download_material(assignment["material_path"], material_local)
material_locals.append(material_local)
resolutions.append(video.get_resolution(str(material_local)))
# 分辨率不一致时,统一到第一个素材的分辨率
base_res = resolutions[0] if resolutions else (0, 0)
need_scale = any(r != base_res for r in resolutions) and base_res[0] > 0
if need_scale:
logger.info(f"[MultiMat] 素材分辨率不一致,统一到 {base_res[0]}x{base_res[1]}")
# ── 第二步:裁剪每段素材到对应时长 ──
prepared_segments: List[Path] = []
for i, assignment in enumerate(assignments):
seg_progress = 15 + int((i / num_segments) * 30) # 15% → 45%
seg_dur = assignment["end"] - assignment["start"]
_update_task(
task_id,
progress=seg_progress,
message=f"正在准备素材 {i+1}/{num_segments}..."
)
prepared_path = temp_dir / f"{task_id}_prepared_{i}.mp4"
temp_files.append(prepared_path)
video.prepare_segment(
str(material_locals[i]), seg_dur, str(prepared_path),
target_resolution=base_res if need_scale else None
)
prepared_segments.append(prepared_path)
# ── 第二步:拼接所有素材片段 ──
_update_task(task_id, progress=50, message="正在拼接素材片段...")
concat_path = temp_dir / f"{task_id}_concat.mp4"
temp_files.append(concat_path)
video.concat_videos(
[str(p) for p in prepared_segments],
str(concat_path)
)
# ── 第三步:一次 LatentSync 推理 ──
is_ready = await _check_lipsync_ready()
if is_ready:
_update_task(task_id, progress=55, message="正在合成唇形 (LatentSync)...")
print(f"[LipSync] Multi-material: single LatentSync on concatenated video")
try:
await lipsync.generate(str(concat_path), str(audio_path), str(lipsync_video_path))
except Exception as e:
logger.warning(f"[LipSync] Failed, fallback to concat without lipsync: {e}")
import shutil
shutil.copy(str(concat_path), str(lipsync_video_path))
else:
print(f"[LipSync] Not ready, using concatenated video without lipsync")
import shutil
shutil.copy(str(concat_path), str(lipsync_video_path))
lipsync_time = time.time() - lipsync_start
print(f"[Pipeline] Multi-material prepare + concat + LipSync completed in {lipsync_time:.1f}s")
_update_task(task_id, progress=80)
# 如果用户关闭了字幕,清除 captions_pathWhisper 仅用于句子切分)
if not req.enable_subtitles:
captions_path = None
else:
# ══════════════════════════════════════
# 单素材流水线(原有逻辑)
# ══════════════════════════════════════
_update_task(task_id, progress=25)
_update_task(task_id, message="正在合成唇形 (LatentSync)...", progress=30)
lipsync_start = time.time()
is_ready = await _check_lipsync_ready()
if is_ready:
print(f"[LipSync] Starting LatentSync inference...")
_update_task(task_id, progress=35, message="正在运行 LatentSync 推理...")
await lipsync.generate(str(input_material_path), str(audio_path), str(lipsync_video_path))
else:
print(f"[LipSync] LatentSync not ready, copying original video")
_update_task(task_id, message="唇形同步不可用,使用原始视频...")
import shutil
shutil.copy(str(input_material_path), lipsync_video_path)
lipsync_time = time.time() - lipsync_start
print(f"[Pipeline] LipSync completed in {lipsync_time:.1f}s")
_update_task(task_id, progress=80)
# 单素材模式Whisper 在 LatentSync 之后
if req.enable_subtitles:
_update_task(task_id, message="正在生成字幕 (Whisper)...", progress=82)
captions_path = temp_dir / f"{task_id}_captions.json"
temp_files.append(captions_path)
try:
await whisper_service.align(
audio_path=str(audio_path),
text=req.text,
output_path=str(captions_path),
language=_locale_to_whisper_lang(req.language),
)
print(f"[Pipeline] Whisper alignment completed")
except Exception as e:
logger.warning(f"Whisper alignment failed, skipping subtitles: {e}")
captions_path = None
_update_task(task_id, progress=85)
video = VideoService()
final_audio_path = audio_path
if req.bgm_id:
_update_task(task_id, message="正在合成背景音乐...", progress=86)

View File

@@ -43,6 +43,7 @@ class GLMService:
要求:
1. 标题要简洁有力能吸引观众点击不超过10个字
2. 标签要与内容相关便于搜索和推荐只要3个
3. 标题和标签必须使用与口播文案相同的语言(如文案是英文就用英文,日文就用日文)
请严格按以下JSON格式返回不要包含其他内容
{{"title": "标题", "tags": ["标签1", "标签2", "标签3"]}}"""
@@ -120,6 +121,49 @@ class GLMService:
async def translate_text(self, text: str, target_lang: str) -> str:
"""
将文案翻译为指定语言
Args:
text: 原始文案
target_lang: 目标语言(如 English, 日本語 等)
Returns:
翻译后的文案
"""
prompt = f"""请将以下文案翻译为{target_lang}
原文:
{text}
要求:
1. 只返回翻译后的文案,不要添加任何解释或说明
2. 保持原文的语气和风格
3. 翻译要自然流畅,符合目标语言的表达习惯"""
try:
client = self._get_client()
logger.info(f"Using GLM to translate text to {target_lang}")
import asyncio
response = await asyncio.to_thread(
client.chat.completions.create,
model=settings.GLM_MODEL,
messages=[{"role": "user", "content": prompt}],
thinking={"type": "disabled"},
max_tokens=2000,
temperature=0.3
)
content = response.choices[0].message.content
logger.info("GLM translation completed")
return content.strip()
except Exception as e:
logger.error(f"GLM translate error: {e}")
raise Exception(f"AI 翻译失败: {str(e)}")
def _parse_json_response(self, content: str) -> dict:
"""解析 GLM 返回的 JSON 内容"""
# 尝试直接解析

View File

@@ -138,3 +138,109 @@ class VideoService:
return output_path
else:
raise RuntimeError("FFmpeg composition failed")
def concat_videos(self, video_paths: list, output_path: str) -> str:
"""使用 FFmpeg concat demuxer 拼接多个视频片段"""
if not video_paths:
raise ValueError("No video segments to concat")
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
# 生成 concat list 文件
list_path = Path(output_path).parent / f"{Path(output_path).stem}_concat.txt"
with open(list_path, "w", encoding="utf-8") as f:
for vp in video_paths:
f.write(f"file '{vp}'\n")
cmd = [
"ffmpeg", "-y",
"-f", "concat",
"-safe", "0",
"-i", str(list_path),
"-c", "copy",
output_path,
]
try:
if self._run_ffmpeg(cmd):
return output_path
else:
raise RuntimeError("FFmpeg concat failed")
finally:
try:
list_path.unlink(missing_ok=True)
except Exception:
pass
def split_audio(self, audio_path: str, start: float, end: float, output_path: str) -> str:
"""用 FFmpeg 按时间范围切分音频"""
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
duration = end - start
if duration <= 0:
raise ValueError(f"Invalid audio split range: start={start}, end={end}, duration={duration}")
cmd = [
"ffmpeg", "-y",
"-ss", str(start),
"-t", str(duration),
"-i", audio_path,
"-c", "copy",
output_path,
]
if self._run_ffmpeg(cmd):
return output_path
raise RuntimeError(f"FFmpeg audio split failed: {start}-{end}")
def get_resolution(self, file_path: str) -> tuple:
"""获取视频分辨率,返回 (width, height)"""
cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height',
'-of', 'csv=p=0',
file_path
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
parts = result.stdout.strip().split(',')
return (int(parts[0]), int(parts[1]))
except Exception:
return (0, 0)
def prepare_segment(self, video_path: str, target_duration: float, output_path: str,
target_resolution: tuple = None) -> str:
"""将素材视频裁剪或循环到指定时长(无音频)。
target_resolution: (width, height) 如需统一分辨率则传入,否则保持原分辨率。
"""
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
video_dur = self._get_duration(video_path)
if video_dur <= 0:
video_dur = target_duration
needs_loop = target_duration > video_dur
needs_scale = target_resolution is not None
cmd = ["ffmpeg", "-y"]
if needs_loop:
loop_count = int(target_duration / video_dur) + 1
cmd.extend(["-stream_loop", str(loop_count)])
cmd.extend(["-i", video_path, "-t", str(target_duration), "-an"])
if needs_scale:
w, h = target_resolution
cmd.extend(["-vf", f"scale={w}:{h}:force_original_aspect_ratio=decrease,pad={w}:{h}:(ow-iw)/2:(oh-ih)/2"])
# 需要循环或缩放时必须重编码,否则用 stream copy 保持原画质
if needs_loop or needs_scale:
cmd.extend(["-c:v", "libx264", "-preset", "fast", "-crf", "18"])
else:
cmd.extend(["-c:v", "copy"])
cmd.append(output_path)
if self._run_ffmpeg(cmd):
return output_path
raise RuntimeError(f"FFmpeg prepare_segment failed: {video_path}")

View File

@@ -48,7 +48,7 @@ class VoiceCloneService:
"""
# 使用锁确保串行执行,避免 GPU 显存溢出
async with self._lock:
logger.info(f"🎤 Voice Clone: {text[:30]}...")
logger.info(f"🎤 Voice Clone: {text[:30]}... (language={language})")
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
# 读取参考音频

View File

@@ -20,16 +20,23 @@ MAX_CHARS_PER_LINE = 12
def split_word_to_chars(word: str, start: float, end: float) -> list:
"""
将词拆分成单个字符,时间戳线性插值
将词拆分成单个字符,时间戳线性插值
保留英文词前的空格Whisper 输出如 " Hello"),用于正确重建英文字幕。
Args:
word: 词文本
word: 词文本(可能含前导空格)
start: 词开始时间
end: 词结束时间
Returns:
单字符列表,每个包含 word/start/end
"""
# 保留前导空格(英文 Whisper 输出常见 " Hello" 形式)
leading_space = ""
if word and not word[0].strip():
leading_space = " "
word = word.lstrip()
tokens = []
ascii_buffer = ""
@@ -54,7 +61,8 @@ def split_word_to_chars(word: str, start: float, end: float) -> list:
return []
if len(tokens) == 1:
return [{"word": tokens[0], "start": start, "end": end}]
w = leading_space + tokens[0] if leading_space else tokens[0]
return [{"word": w, "start": start, "end": end}]
# 线性插值时间戳
duration = end - start
@@ -64,8 +72,11 @@ def split_word_to_chars(word: str, start: float, end: float) -> list:
for i, token in enumerate(tokens):
token_start = start + i * token_duration
token_end = start + (i + 1) * token_duration
w = token
if i == 0 and leading_space:
w = leading_space + w
result.append({
"word": token,
"word": w,
"start": round(token_start, 3),
"end": round(token_end, 3)
})
@@ -108,7 +119,7 @@ def split_segment_to_lines(words: List[dict], max_chars: int = MAX_CHARS_PER_LIN
if should_break and current_words:
segments.append({
"text": current_text,
"text": current_text.strip(),
"start": current_words[0]["start"],
"end": current_words[-1]["end"],
"words": current_words.copy()
@@ -119,7 +130,7 @@ def split_segment_to_lines(words: List[dict], max_chars: int = MAX_CHARS_PER_LIN
# 处理剩余的字
if current_words:
segments.append({
"text": current_text,
"text": current_text.strip(),
"start": current_words[0]["start"],
"end": current_words[-1]["end"],
"words": current_words.copy()
@@ -162,7 +173,8 @@ class WhisperService:
self,
audio_path: str,
text: str,
output_path: Optional[str] = None
output_path: Optional[str] = None,
language: str = "zh",
) -> dict:
"""
对音频进行转录,生成字级别时间戳
@@ -171,12 +183,16 @@ class WhisperService:
audio_path: 音频文件路径
text: 原始文本(用于参考,但实际使用 whisper 转录结果)
output_path: 可选,输出 JSON 文件路径
language: 语言代码 (zh/en 等)
Returns:
包含字级别时间戳的字典
"""
import asyncio
# 英文等西文需要更大的每行字数
max_chars = 40 if language != "zh" else MAX_CHARS_PER_LINE
def _do_transcribe():
model = self._load_model()
@@ -185,7 +201,7 @@ class WhisperService:
# 转录并获取字级别时间戳
segments_iter, info = model.transcribe(
audio_path,
language="zh",
language=language,
word_timestamps=True, # 启用字级别时间戳
vad_filter=True, # 启用 VAD 过滤静音
)
@@ -198,9 +214,10 @@ class WhisperService:
all_words = []
if segment.words:
for word_info in segment.words:
word_text = word_info.word.strip()
if word_text:
word_text = word_info.word
if word_text.strip():
# 将词拆分成单字,时间戳线性插值
# 保留前导空格用于英文词间距
chars = split_word_to_chars(
word_text,
word_info.start,
@@ -210,7 +227,7 @@ class WhisperService:
# 将长段落按标点和字数拆分成多行
if all_words:
line_segments = split_segment_to_lines(all_words, MAX_CHARS_PER_LINE)
line_segments = split_segment_to_lines(all_words, max_chars)
all_segments.extend(line_segments)
logger.info(f"Generated {len(all_segments)} subtitle segments")