Files
ViGent2/backend/app/services/whisper_service.py
Kevin Wong cb10da52fc 更新
2026-02-03 13:46:52 +08:00

289 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
字幕对齐服务
使用 faster-whisper 生成字级别时间戳
"""
import json
import re
import threading
from pathlib import Path
from typing import Optional, List

from loguru import logger
# Module-level cache for the loaded faster-whisper model; filled lazily by
# WhisperService._load_model and reused on every later call.
_whisper_model = None
# Punctuation marks that force a subtitle line break.
SENTENCE_PUNCTUATION = set('。!?,、;:,.!?;:')
# Maximum number of characters per subtitle line.
MAX_CHARS_PER_LINE = 12
def split_word_to_chars(word: str, start: float, end: float) -> list:
    """
    Break a word into display tokens and spread its time span evenly over them.

    Each non-whitespace character becomes its own token, except that runs of
    ASCII letters/digits are kept together as a single token. Whitespace is
    dropped and does not terminate an ASCII run.

    Args:
        word: Text of the word.
        start: Start time of the word.
        end: End time of the word.

    Returns:
        A list of dicts with ``word``/``start``/``end`` keys. Empty when the
        word has no non-whitespace characters. When there is exactly one
        token, the original ``start``/``end`` are returned unrounded;
        otherwise the interpolated times are rounded to 3 decimals.
    """
    pieces = []
    pending_ascii = []

    for ch in word:
        if ch.strip() == "":
            # Whitespace is skipped without flushing the pending ASCII run.
            continue
        if ch.isascii() and ch.isalnum():
            pending_ascii.append(ch)
        else:
            if pending_ascii:
                pieces.append("".join(pending_ascii))
                pending_ascii = []
            pieces.append(ch)

    if pending_ascii:
        pieces.append("".join(pending_ascii))

    count = len(pieces)
    if count == 0:
        return []
    if count == 1:
        return [{"word": pieces[0], "start": start, "end": end}]

    # Linear interpolation: every token gets an equal share of the duration.
    step = (end - start) / count
    return [
        {
            "word": piece,
            "start": round(start + idx * step, 3),
            "end": round(start + (idx + 1) * step, 3),
        }
        for idx, piece in enumerate(pieces)
    ]
def split_segment_to_lines(words: List[dict], max_chars: int = MAX_CHARS_PER_LINE) -> List[dict]:
    """
    Split a long run of tokens into subtitle lines by punctuation and length.

    A line is closed when the token just added is a sentence punctuation mark
    or when the accumulated text reaches ``max_chars`` characters. The break
    is deferred while the *next* token is punctuation, so a punctuation mark
    stays attached to the line it terminates instead of becoming a subtitle
    line of its own. A line may therefore exceed ``max_chars`` by the length
    of its trailing punctuation.

    Args:
        words: Token list; each item has ``word``/``start``/``end`` keys.
        max_chars: Maximum number of characters per line, measured on the
            concatenated token text.

    Returns:
        A list of segment dicts with ``text``, ``start`` (start of the first
        token), ``end`` (end of the last token) and ``words`` (the tokens in
        that line). Empty when ``words`` is empty.
    """
    if not words:
        return []

    segments = []
    current_words = []
    current_text = ""
    last_index = len(words) - 1

    for index, word_info in enumerate(words):
        token = word_info["word"]
        current_words.append(word_info)
        current_text += token

        # Break on sentence punctuation or once the line is full.
        should_break = token in SENTENCE_PUNCTUATION or len(current_text) >= max_chars
        if not should_break:
            continue

        # Keep trailing punctuation on this line: without this, a line that
        # fills up right before "。" would be followed by a line holding
        # only the punctuation mark.
        if index < last_index and words[index + 1]["word"] in SENTENCE_PUNCTUATION:
            continue

        segments.append({
            "text": current_text,
            "start": current_words[0]["start"],
            "end": current_words[-1]["end"],
            "words": current_words,
        })
        # Rebind to fresh objects so the stored list is never mutated again.
        current_words = []
        current_text = ""

    # Flush whatever is left after the last break.
    if current_words:
        segments.append({
            "text": current_text,
            "start": current_words[0]["start"],
            "end": current_words[-1]["end"],
            "words": current_words,
        })

    return segments
class WhisperService:
    """Subtitle alignment service backed by faster-whisper.

    The loaded model is cached in the module-level ``_whisper_model`` and is
    shared by every instance. NOTE(review): the cache is not keyed by
    ``model_size``/``device``/``compute_type``, so whichever instance loads
    first decides the model used by all others.
    """

    # Guards the first model load. align()/transcribe() call _load_model from
    # executor threads, so two concurrent first requests could otherwise each
    # construct a model.
    _model_lock = threading.Lock()

    def __init__(
        self,
        model_size: str = "large-v3",
        device: str = "cuda",
        compute_type: str = "float16",
    ):
        """Store the model configuration; the model itself is loaded lazily.

        Args:
            model_size: Model identifier passed to ``WhisperModel``.
            device: Device passed to ``WhisperModel``.
            compute_type: Compute type passed to ``WhisperModel``.
        """
        self.model_size = model_size
        self.device = device
        self.compute_type = compute_type

    def _load_model(self):
        """Return the shared faster-whisper model, loading it on first use.

        Safe to call from several threads: the load is serialized by a lock
        and the cache is re-checked once the lock is held.
        """
        global _whisper_model
        if _whisper_model is None:
            with self._model_lock:
                # Re-check: another thread may have finished loading while
                # this one was waiting for the lock.
                if _whisper_model is None:
                    from faster_whisper import WhisperModel
                    logger.info(f"Loading faster-whisper model: {self.model_size} on {self.device}")
                    _whisper_model = WhisperModel(
                        self.model_size,
                        device=self.device,
                        compute_type=self.compute_type
                    )
                    logger.info("faster-whisper model loaded")
        return _whisper_model

    async def align(
        self,
        audio_path: str,
        text: str,
        output_path: Optional[str] = None
    ) -> dict:
        """
        Transcribe the audio and produce character-level timestamps.

        Args:
            audio_path: Path of the audio file.
            text: Original script. Kept for reference only; it is not used,
                the result is built from the whisper transcription.
            output_path: Optional path of a JSON file to write the result to.
                Missing parent directories are created.

        Returns:
            ``{"segments": [...]}`` where each segment is a subtitle line
            produced by ``split_segment_to_lines``.
        """
        import asyncio

        def _do_transcribe():
            model = self._load_model()
            logger.info(f"Transcribing audio: {audio_path}")
            # Transcribe with word-level timestamps.
            segments_iter, info = model.transcribe(
                audio_path,
                language="zh",
                word_timestamps=True,  # enable word-level timestamps
                vad_filter=True,       # enable VAD filtering of silence
            )
            logger.info(f"Detected language: {info.language} (prob: {info.language_probability:.2f})")

            all_segments = []
            for segment in segments_iter:
                # Split every word into single-character tokens with
                # linearly interpolated timestamps.
                all_words = []
                for word_info in segment.words or []:
                    word_text = word_info.word.strip()
                    if word_text:
                        all_words.extend(
                            split_word_to_chars(word_text, word_info.start, word_info.end)
                        )
                # Split the segment into lines by punctuation and length.
                if all_words:
                    all_segments.extend(split_segment_to_lines(all_words, MAX_CHARS_PER_LINE))

            logger.info(f"Generated {len(all_segments)} subtitle segments")
            return {"segments": all_segments}

        # Run the blocking transcription in the default executor.
        # get_running_loop() is the preferred call inside a coroutine.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, _do_transcribe)

        # Optionally persist the result as JSON.
        if output_path:
            output_file = Path(output_path)
            output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            logger.info(f"Captions saved to: {output_path}")

        return result

    async def transcribe(self, audio_path: str) -> str:
        """
        Transcribe to plain text only (used to extract a script).

        Args:
            audio_path: Path of the audio/video file.

        Returns:
            The stripped text of all segments joined with single spaces.
        """
        import asyncio

        def _do_transcribe_text():
            model = self._load_model()
            logger.info(f"Extracting script from: {audio_path}")
            # Word-level timestamps are not needed here.
            segments_iter, _ = model.transcribe(
                audio_path,
                language="zh",
                word_timestamps=False,
                vad_filter=True,
            )
            full_text = " ".join(segment.text.strip() for segment in segments_iter)
            logger.info(f"Extracted text length: {len(full_text)}")
            return full_text

        # Run the blocking transcription in the default executor.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _do_transcribe_text)

    async def check_health(self) -> dict:
        """Report whether faster-whisper can be imported.

        Returns:
            ``{"ready": True, "model_size", "device", "backend"}`` when the
            import succeeds, otherwise ``{"ready": False, "error": ...}``.
            The model itself is not loaded by this check.
        """
        try:
            # Import is only an availability probe; the name is not used.
            from faster_whisper import WhisperModel  # noqa: F401
            return {
                "ready": True,
                "model_size": self.model_size,
                "device": self.device,
                "backend": "faster-whisper"
            }
        except ImportError:
            return {
                "ready": False,
                "error": "faster-whisper not installed"
            }
# Global service instance, constructed with the default model settings.
whisper_service = WhisperService()