更新代码

This commit is contained in:
Kevin Wong
2026-02-02 10:51:27 +08:00
parent cf679b34bf
commit 6801d3e8aa
38 changed files with 2234 additions and 293 deletions

View File

@@ -0,0 +1,102 @@
"""
GLM AI 服务
使用智谱 GLM-4.7-Flash 生成标题和标签
"""
import json
import re
import httpx
from loguru import logger
class GLMService:
"""GLM AI 服务"""
API_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
API_KEY = "5915240ea48d4e93b454bc2412d1cc54.e054ej4pPqi9G6rc"
async def generate_title_tags(self, text: str) -> dict:
"""
根据口播文案生成标题和标签
Args:
text: 口播文案
Returns:
{"title": "标题", "tags": ["标签1", "标签2", ...]}
"""
prompt = f"""根据以下口播文案生成一个吸引人的短视频标题和3个相关标签。
口播文案:
{text}
要求:
1. 标题要简洁有力能吸引观众点击不超过10个字
2. 标签要与内容相关便于搜索和推荐只要3个
请严格按以下JSON格式返回不要包含其他内容
{{"title": "标题", "tags": ["标签1", "标签2", "标签3"]}}"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
self.API_URL,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {self.API_KEY}"
},
json={
"model": "glm-4-flash",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 500,
"temperature": 0.7
}
)
response.raise_for_status()
data = response.json()
# 提取生成的内容
content = data["choices"][0]["message"]["content"]
logger.info(f"GLM response: {content}")
# 解析 JSON
result = self._parse_json_response(content)
return result
except httpx.HTTPError as e:
logger.error(f"GLM API request failed: {e}")
raise Exception(f"AI 服务请求失败: {str(e)}")
except Exception as e:
logger.error(f"GLM service error: {e}")
raise Exception(f"AI 生成失败: {str(e)}")
def _parse_json_response(self, content: str) -> dict:
"""解析 GLM 返回的 JSON 内容"""
# 尝试直接解析
try:
return json.loads(content)
except json.JSONDecodeError:
pass
# 尝试提取 JSON 块
json_match = re.search(r'\{[^{}]*"title"[^{}]*"tags"[^{}]*\}', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group())
except json.JSONDecodeError:
pass
# 尝试提取 ```json 代码块
code_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
if code_match:
try:
return json.loads(code_match.group(1))
except json.JSONDecodeError:
pass
logger.error(f"Failed to parse GLM response: {content}")
raise Exception("AI 返回格式解析失败")
# 全局服务实例
glm_service = GLMService()

View File

@@ -73,7 +73,51 @@ class LipSyncService:
logger.warning(f"⚠️ Conda Python 不存在: {self.conda_python}")
return False
return True
def _get_media_duration(self, media_path: str) -> Optional[float]:
"""获取音频或视频的时长(秒)"""
try:
cmd = [
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
media_path
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
return float(result.stdout.strip())
except Exception as e:
logger.warning(f"⚠️ 获取媒体时长失败: {e}")
return None
def _loop_video_to_duration(self, video_path: str, output_path: str, target_duration: float) -> str:
"""
循环视频以匹配目标时长
使用 FFmpeg stream_loop 实现无缝循环
"""
try:
cmd = [
"ffmpeg", "-y",
"-stream_loop", "-1", # 无限循环
"-i", video_path,
"-t", str(target_duration), # 截取到目标时长
"-c:v", "libx264",
"-preset", "fast",
"-crf", "18",
"-an", # 去掉原音频
output_path
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0 and Path(output_path).exists():
logger.info(f"✅ 视频循环完成: {target_duration:.1f}s")
return output_path
else:
logger.warning(f"⚠️ 视频循环失败: {result.stderr[:200]}")
return video_path
except Exception as e:
logger.warning(f"⚠️ 视频循环异常: {e}")
return video_path
def _preprocess_video(self, video_path: str, output_path: str, target_height: int = 720) -> str:
"""
视频预处理:压缩视频以加速后续处理
@@ -204,27 +248,34 @@ class LipSyncService:
logger.info("⏳ 等待 GPU 资源 (排队中)...")
async with self._lock:
if self.use_server:
# 模式 A: 调用常驻服务 (加速模式)
return await self._call_persistent_server(video_path, audio_path, output_path)
logger.info("🔄 调用 LatentSync 推理 (subprocess)...")
# 使用临时目录存放输出
# 使用临时目录存放中间文件
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
# 获取音频和视频时长
audio_duration = self._get_media_duration(audio_path)
video_duration = self._get_media_duration(video_path)
# 如果音频比视频长,循环视频以匹配音频长度
if audio_duration and video_duration and audio_duration > video_duration + 0.5:
logger.info(f"🔄 音频({audio_duration:.1f}s) > 视频({video_duration:.1f}s),循环视频...")
looped_video = tmpdir / "looped_input.mp4"
actual_video_path = self._loop_video_to_duration(
video_path,
str(looped_video),
audio_duration
)
else:
actual_video_path = video_path
if self.use_server:
# 模式 A: 调用常驻服务 (加速模式)
return await self._call_persistent_server(actual_video_path, audio_path, output_path)
logger.info("🔄 调用 LatentSync 推理 (subprocess)...")
temp_output = tmpdir / "output.mp4"
# 视频预处理:压缩高分辨率视频以加速处理
# preprocessed_video = tmpdir / "preprocessed_input.mp4"
# actual_video_path = self._preprocess_video(
# video_path,
# str(preprocessed_video),
# target_height=720
# )
# 暂时禁用预处理以保持原始分辨率
actual_video_path = video_path
# 构建命令
cmd = [
str(self.conda_python),
@@ -285,7 +336,7 @@ class LipSyncService:
return output_path
logger.info(f"LatentSync 输出:\n{stdout_text[-500:] if stdout_text else 'N/A'}")
# 检查输出文件
if temp_output.exists():
shutil.copy(temp_output, output_path)

View File

@@ -82,8 +82,15 @@ class VideoService:
# Previous state: subtitles disabled due to font issues
# if subtitle_path: ...
# Audio map
cmd.extend(["-c:v", "libx264", "-c:a", "aac", "-shortest"])
# Audio map with high quality encoding
cmd.extend([
"-c:v", "libx264",
"-preset", "slow", # 慢速预设,更好的压缩效率
"-crf", "18", # 高质量(与 LatentSync 一致)
"-c:a", "aac",
"-b:a", "192k", # 音频比特率
"-shortest"
])
# Use audio from input 1
cmd.extend(["-map", "0:v", "-map", "1:a"])

View File

@@ -3,6 +3,7 @@
通过 HTTP 调用 Qwen3-TTS 独立服务 (端口 8009)
"""
import httpx
import asyncio
from pathlib import Path
from typing import Optional
from loguru import logger
@@ -21,6 +22,8 @@ class VoiceCloneService:
# 健康状态缓存
self._health_cache: Optional[dict] = None
self._health_cache_time: float = 0
# GPU 并发锁 (Serial Queue)
self._lock = asyncio.Lock()
async def generate_audio(
self,
@@ -43,41 +46,43 @@ class VoiceCloneService:
Returns:
输出文件路径
"""
logger.info(f"🎤 Voice Clone: {text[:30]}...")
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
# 使用锁确保串行执行,避免 GPU 显存溢出
async with self._lock:
logger.info(f"🎤 Voice Clone: {text[:30]}...")
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
# 读取参考音频
with open(ref_audio_path, "rb") as f:
ref_audio_data = f.read()
# 读取参考音频
with open(ref_audio_path, "rb") as f:
ref_audio_data = f.read()
# 调用 Qwen3-TTS 服务
timeout = httpx.Timeout(300.0) # 5分钟超时
async with httpx.AsyncClient(timeout=timeout) as client:
try:
response = await client.post(
f"{self.base_url}/generate",
files={"ref_audio": ("ref.wav", ref_audio_data, "audio/wav")},
data={
"text": text,
"ref_text": ref_text,
"language": language
}
)
response.raise_for_status()
# 调用 Qwen3-TTS 服务
timeout = httpx.Timeout(300.0) # 5分钟超时
async with httpx.AsyncClient(timeout=timeout) as client:
try:
response = await client.post(
f"{self.base_url}/generate",
files={"ref_audio": ("ref.wav", ref_audio_data, "audio/wav")},
data={
"text": text,
"ref_text": ref_text,
"language": language
}
)
response.raise_for_status()
# 保存返回的音频
with open(output_path, "wb") as f:
f.write(response.content)
# 保存返回的音频
with open(output_path, "wb") as f:
f.write(response.content)
logger.info(f"✅ Voice clone saved: {output_path}")
return output_path
logger.info(f"✅ Voice clone saved: {output_path}")
return output_path
except httpx.HTTPStatusError as e:
logger.error(f"Qwen3-TTS API error: {e.response.status_code} - {e.response.text}")
raise RuntimeError(f"声音克隆服务错误: {e.response.text}")
except httpx.RequestError as e:
logger.error(f"Qwen3-TTS connection error: {e}")
raise RuntimeError("无法连接声音克隆服务,请检查服务是否启动")
except httpx.HTTPStatusError as e:
logger.error(f"Qwen3-TTS API error: {e.response.status_code} - {e.response.text}")
raise RuntimeError(f"声音克隆服务错误: {e.response.text}")
except httpx.RequestError as e:
logger.error(f"Qwen3-TTS connection error: {e}")
raise RuntimeError("无法连接声音克隆服务,请检查服务是否启动")
async def check_health(self) -> dict:
"""健康检查"""

View File

@@ -6,12 +6,17 @@
import json
import re
from pathlib import Path
from typing import Optional
from typing import Optional, List
from loguru import logger
# 模型缓存
_whisper_model = None
# 断句标点
SENTENCE_PUNCTUATION = set('。!?,、;:,.!?;:')
# 每行最大字数
MAX_CHARS_PER_LINE = 12
def split_word_to_chars(word: str, start: float, end: float) -> list:
"""
@@ -50,6 +55,61 @@ def split_word_to_chars(word: str, start: float, end: float) -> list:
return result
def split_segment_to_lines(words: List[dict], max_chars: int = MAX_CHARS_PER_LINE) -> List[dict]:
"""
将长段落按标点和字数拆分成多行
Args:
words: 字列表,每个包含 word/start/end
max_chars: 每行最大字数
Returns:
拆分后的 segment 列表
"""
if not words:
return []
segments = []
current_words = []
current_text = ""
for word_info in words:
char = word_info["word"]
current_words.append(word_info)
current_text += char
# 判断是否需要断句
should_break = False
# 1. 遇到断句标点
if char in SENTENCE_PUNCTUATION:
should_break = True
# 2. 达到最大字数
elif len(current_text) >= max_chars:
should_break = True
if should_break and current_words:
segments.append({
"text": current_text,
"start": current_words[0]["start"],
"end": current_words[-1]["end"],
"words": current_words.copy()
})
current_words = []
current_text = ""
# 处理剩余的字
if current_words:
segments.append({
"text": current_text,
"start": current_words[0]["start"],
"end": current_words[-1]["end"],
"words": current_words.copy()
})
return segments
class WhisperService:
"""字幕对齐服务(基于 faster-whisper"""
@@ -114,16 +174,10 @@ class WhisperService:
logger.info(f"Detected language: {info.language} (prob: {info.language_probability:.2f})")
segments = []
all_segments = []
for segment in segments_iter:
seg_data = {
"text": segment.text.strip(),
"start": segment.start,
"end": segment.end,
"words": []
}
# 提取每个字的时间戳,并拆分成单字
all_words = []
if segment.words:
for word_info in segment.words:
word_text = word_info.word.strip()
@@ -134,12 +188,15 @@ class WhisperService:
word_info.start,
word_info.end
)
seg_data["words"].extend(chars)
all_words.extend(chars)
if seg_data["words"]: # 只添加有内容的段落
segments.append(seg_data)
# 将长段落按标点和字数拆分成多行
if all_words:
line_segments = split_segment_to_lines(all_words, MAX_CHARS_PER_LINE)
all_segments.extend(line_segments)
return {"segments": segments}
logger.info(f"Generated {len(all_segments)} subtitle segments")
return {"segments": all_segments}
# 在线程池中执行
loop = asyncio.get_event_loop()