import asyncio
import os
import re
import json
import time
import shutil
import subprocess
import traceback
from pathlib import Path
from typing import Optional, Any
from urllib.parse import unquote

import httpx
from loguru import logger

from app.services.whisper_service import whisper_service
from app.services.glm_service import glm_service

# Size cap (500MB) applied to both uploaded files and downloaded media.
_MAX_MEDIA_BYTES = 500 * 1024 * 1024

# Mobile Safari user agent used by the Douyin fallback requests.
_MOBILE_UA = "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15"


def _remove_quietly(path: Optional[Path]) -> None:
    """Best-effort removal of a temp file; failures are logged, never raised."""
    if not path or not path.exists():
        return
    try:
        os.remove(path)
        logger.info(f"Cleaned up temp file: {path}")
    except Exception as e:
        logger.warning(f"Failed to cleanup temp file {path}: {e}")


def _save_upload(src, dest: Path, max_bytes: int) -> None:
    """Copy a file-like object to *dest* in 1MB chunks (blocking).

    Raises:
        ValueError: if more than *max_bytes* would be written. The partial
            file is left on disk for the caller to clean up.
    """
    total_written = 0
    with open(dest, "wb") as dst:
        while True:
            chunk = src.read(1024 * 1024)
            if not chunk:
                break
            total_written += len(chunk)
            if total_written > max_bytes:
                raise ValueError("上传文件大小不能超过 500MB")
            dst.write(chunk)


async def extract_script(file=None, url: Optional[str] = None, rewrite: bool = True, custom_prompt: Optional[str] = None) -> dict:
    """Extract a script: uploaded file or video link -> Whisper -> (optional) GLM rewrite.

    Args:
        file: Uploaded file object exposing ``filename`` and a readable
            ``file`` attribute. Takes precedence over *url* when both are given.
        url: Video link, or free text containing an http(s) link.
        rewrite: When true, ask the GLM service to rewrite the transcript.
        custom_prompt: Passed through to ``glm_service.rewrite_script``.

    Returns:
        ``{"original_script": ..., "rewritten_script": ...}``; the rewritten
        value is ``None`` when rewriting is disabled, skipped or fails.

    Raises:
        ValueError: no input given, invalid filename, file too large,
            download failure, or the media could not be converted.
    """
    if not file and not url:
        raise ValueError("必须提供文件或视频链接")

    temp_path = None
    audio_path = None
    try:
        timestamp = int(time.time())
        temp_dir = Path("d:/tmp") if os.name == 'nt' else Path("/tmp")
        # Harmless when the directory already exists.
        temp_dir.mkdir(parents=True, exist_ok=True)

        loop = asyncio.get_running_loop()

        # 1. Obtain / save the source file
        if file:
            filename = file.filename
            if not filename:
                raise ValueError("文件名无效")
            # Strip any directory components supplied by the client.
            safe_filename = Path(filename).name.replace(" ", "_")
            temp_path = temp_dir / f"tool_extract_{timestamp}_{safe_filename}"
            # The copy is blocking I/O (up to 500MB); keep it off the event loop.
            # An oversized upload raises here and the partial file is removed
            # by the ``finally`` below.
            await loop.run_in_executor(None, _save_upload, file.file, temp_path, _MAX_MEDIA_BYTES)
            logger.info(f"Tool processing upload file: {temp_path}")
        else:
            temp_path = await _download_video(url, temp_dir, timestamp)
            if not temp_path or not temp_path.exists():
                raise ValueError("文件获取失败")

            # Size check for downloaded files (500MB cap); cleanup happens in ``finally``.
            file_size = temp_path.stat().st_size
            if file_size > _MAX_MEDIA_BYTES:
                raise ValueError(f"下载的文件过大({file_size / (1024*1024):.0f}MB),上限 500MB")

        # 1.5 Safe conversion: always transcode to 16k WAV
        audio_path = temp_dir / f"extract_audio_{timestamp}.wav"
        try:
            await loop.run_in_executor(None, _convert_to_wav, temp_path, audio_path)
            logger.info(f"Converted to WAV: {audio_path}")
        except ValueError as ve:
            if str(ve) == "HTML_DETECTED":
                raise ValueError("下载的文件是网页而非视频,请重试或手动上传。") from ve
            raise ValueError("下载的文件已损坏或格式无法识别。") from ve

        # 2. Transcribe (Whisper)
        script = await whisper_service.transcribe(str(audio_path))

        # 3. AI rewrite (GLM) - on failure degrade to returning the original only
        rewritten = None
        if rewrite and script and script.strip():
            logger.info("Rewriting script...")
            try:
                rewritten = await glm_service.rewrite_script(script, custom_prompt)
            except Exception as e:
                logger.warning(f"GLM rewrite failed, returning original script: {e}")
                rewritten = None

        return {
            "original_script": script,
            "rewritten_script": rewritten
        }
    finally:
        # Remove both the source media and the intermediate WAV.
        _remove_quietly(temp_path)
        _remove_quietly(audio_path)


def _convert_to_wav(input_path: Path, output_path: Path) -> None:
    """Convert *input_path* to 16kHz mono PCM WAV with FFmpeg (blocking).

    Raises:
        ValueError: ``"HTML_DETECTED"`` when FFmpeg fails and the input looks
            like an HTML page; any other message when FFmpeg fails otherwise.
    """
    convert_cmd = [
        'ffmpeg',
        '-i', str(input_path),
        '-vn',
        '-acodec', 'pcm_s16le',
        '-ar', '16000',
        '-ac', '1',
        '-y',
        str(output_path)
    ]
    try:
        subprocess.run(convert_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        error_log = e.stderr.decode('utf-8', errors='ignore') if e.stderr else str(e)
        logger.error(f"FFmpeg check/convert failed: {error_log}")

        # Sniff the first bytes to tell "got a web page" from "corrupt media".
        head = b""
        try:
            with open(input_path, 'rb') as f:
                head = f.read(100)
        except OSError:
            pass

        # NOTE(review): the original condition and the non-HTML error message were
        # lost in the source at hand; reconstructed from the caller, which only
        # distinguishes the "HTML_DETECTED" sentinel - confirm against history.
        lowered = head.lower()
        if b'<!doctype html' in lowered or b'<html' in lowered:
            raise ValueError("HTML_DETECTED") from e
        raise ValueError("CONVERT_FAILED") from e


async def _download_video(url: str, temp_dir: Path, timestamp: int) -> Path:
    """Download a video (yt-dlp first, manual parsing as a fallback).

    Raises:
        ValueError: when yt-dlp fails and no fallback produced a file.
    """
    url_value = url
    # Share text often wraps the link in other words; keep only the link.
    url_match = re.search(r'https?://[^\s]+', url_value)
    if url_match:
        extracted_url = url_match.group(0)
        logger.info(f"Extracted URL from text: {extracted_url}")
        url_value = extracted_url

    logger.info(f"Tool downloading URL: {url_value}")
    loop = asyncio.get_running_loop()

    # Try yt-dlp first
    try:
        temp_path = await loop.run_in_executor(None, _download_yt_dlp, url_value, temp_dir, timestamp)
        logger.info(f"yt-dlp downloaded to: {temp_path}")
        return temp_path
    except Exception as e:
        logger.warning(f"yt-dlp download failed: {e}. Trying manual fallback...")

        if "douyin" in url_value:
            manual_path = await _download_douyin_manual(url_value, temp_dir, timestamp)
            if manual_path:
                return manual_path
            raise ValueError(f"视频下载失败。yt-dlp 报错: {str(e)}") from e
        elif "bilibili" in url_value:
            manual_path = await _download_bilibili_manual(url_value, temp_dir, timestamp)
            if manual_path:
                return manual_path
            raise ValueError(f"视频下载失败。yt-dlp 报错: {str(e)}") from e
        else:
            raise ValueError(f"视频下载失败: {str(e)}") from e


def _download_yt_dlp(url_value: str, temp_dir: Path, timestamp: int) -> Path:
    """Download with yt-dlp (blocking call; run it in a thread pool).

    Raises:
        ValueError: when yt-dlp returns no metadata for the URL.
    """
    import yt_dlp

    logger.info("Attempting download with yt-dlp...")

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': str(temp_dir / f"tool_download_{timestamp}_%(id)s.%(ext)s"),
        'quiet': True,
        'no_warnings': True,
        'http_headers': {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            'Referer': 'https://www.douyin.com/',
        }
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url_value, download=True)

    if not info:
        raise ValueError("yt-dlp returned no video info")

    downloads = info.get('requested_downloads') or []
    if downloads:
        downloaded_file = downloads[0]['filepath']
    else:
        # Rebuild the path from the output template when yt-dlp did not report it.
        ext = info.get('ext', 'mp4')
        vid_id = info.get('id')
        downloaded_file = str(temp_dir / f"tool_download_{timestamp}_{vid_id}.{ext}")

    return Path(downloaded_file)


async def _download_douyin_manual(url: str, temp_dir: Path, timestamp: int) -> Optional[Path]:
    """Manually download a Douyin video (fallback) via the mobile share page.

    Returns:
        Path of the downloaded mp4, or ``None`` on any failure.
    """
    logger.info(f"[douyin-fallback] Starting download for: {url}")

    temp_path: Optional[Path] = None
    try:
        # 1. Resolve the short link and extract the video ID
        headers = {"user-agent": _MOBILE_UA}

        async with httpx.AsyncClient(follow_redirects=True, timeout=10.0) as client:
            resp = await client.get(url, headers=headers)
            final_url = str(resp.url)

        logger.info(f"[douyin-fallback] Final URL: {final_url}")

        match = re.search(r'/video/(\d+)', final_url)
        video_id = match.group(1) if match else None

        if not video_id:
            logger.error("[douyin-fallback] Could not extract video_id")
            return None

        logger.info(f"[douyin-fallback] Extracted video_id: {video_id}")

        # 2. Obtain a fresh ttwid cookie (best effort)
        ttwid = ""
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                ttwid_resp = await client.post(
                    "https://ttwid.bytedance.com/ttwid/union/register/",
                    json={
                        "region": "cn",
                        "aid": 6383,
                        "needFid": False,
                        "service": "www.douyin.com",
                        "migrate_info": {"ticket": "", "source": "node"},
                        "cbUrlProtocol": "https",
                        "union": True,
                    }
                )
                ttwid = ttwid_resp.cookies.get("ttwid", "")
                logger.info(f"[douyin-fallback] Got fresh ttwid (len={len(ttwid)})")
        except Exception as e:
            logger.warning(f"[douyin-fallback] Failed to get ttwid: {e}")

        # 3. Fetch the mobile share page to extract the play address
        page_headers = {"user-agent": _MOBILE_UA}
        if ttwid:
            # Only send a cookie header when there is an actual cookie.
            page_headers["cookie"] = f"ttwid={ttwid}"

        async with httpx.AsyncClient(follow_redirects=True, timeout=15.0) as client:
            page_resp = await client.get(
                f"https://m.douyin.com/share/video/{video_id}",
                headers=page_headers,
            )

        page_text = page_resp.text
        logger.info(f"[douyin-fallback] Mobile page length: {len(page_text)}")

        # 4. Extract play_addr
        addr_match = re.search(
            r'"play_addr":\{"uri":"([^"]+)","url_list":\["([^"]+)"',
            page_text,
        )
        if not addr_match:
            logger.error("[douyin-fallback] Could not find play_addr in mobile page")
            return None

        # Slashes are JSON-escaped as \u002F in the embedded page data.
        video_url = addr_match.group(2).replace(r"\u002F", "/")
        if video_url.startswith("//"):
            video_url = "https:" + video_url

        logger.info(f"[douyin-fallback] Found video URL: {video_url[:80]}...")

        # 5. Download the video
        download_headers = {
            "Referer": "https://www.douyin.com/",
            "User-Agent": _MOBILE_UA,
        }

        async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
            async with client.stream("GET", video_url, headers=download_headers) as dl_resp:
                if dl_resp.status_code != 200:
                    logger.error(f"[douyin-fallback] Download failed: {dl_resp.status_code}")
                    return None

                temp_path = temp_dir / f"douyin_manual_{timestamp}.mp4"
                with open(temp_path, "wb") as f:
                    async for chunk in dl_resp.aiter_bytes(chunk_size=8192):
                        f.write(chunk)

        logger.info(f"[douyin-fallback] Downloaded successfully: {temp_path}")
        return temp_path

    except Exception as e:
        logger.error(f"[douyin-fallback] Logic failed: {e}")
        # Do not leave a half-written file behind when the stream breaks.
        _remove_quietly(temp_path)
        return None


async def _download_bilibili_manual(url: str, temp_dir: Path, timestamp: int) -> Optional[Path]:
    """Manually download Bilibili audio (Playwright fallback).

    Returns:
        Path of the downloaded audio stream, or ``None`` on any failure
        (including Playwright not being installed).
    """
    logger.info(f"[Playwright] Starting Bilibili download for: {url}")

    playwright = None
    browser = None
    try:
        # Imported inside the try so a missing package counts as a failed
        # fallback instead of escaping as ImportError.
        from playwright.async_api import async_playwright

        playwright = await async_playwright().start()
        browser = await playwright.chromium.launch(headless=True, args=['--no-sandbox', '--disable-setuid-sandbox'])

        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )

        page = await context.new_page()

        logger.info("[Playwright] Navigating to Bilibili...")
        await page.goto(url, timeout=45000)

        try:
            await page.wait_for_selector('video', timeout=15000)
        except Exception:
            # Not fatal: the page data may be available without the element.
            logger.warning("[Playwright] Video selector timeout")

        playinfo = await page.evaluate("window.__playinfo__")

        audio_url = None
        dash = ((playinfo or {}).get("data") or {}).get("dash") or {}
        audio_streams = dash.get("audio") or []
        if audio_streams:
            audio_url = audio_streams[0].get("baseUrl")
        if audio_url:
            logger.info(f"[Playwright] Found audio stream in __playinfo__: {audio_url[:50]}...")

        if not audio_url:
            logger.warning("[Playwright] Could not find audio in __playinfo__")
            return None

        temp_path = temp_dir / f"bilibili_audio_{timestamp}.m4s"

        try:
            # Reuse the browser context's request client for the download.
            api_request = context.request
            headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Referer": "https://www.bilibili.com/"
            }

            logger.info(f"[Playwright] Downloading audio stream...")

            response = await api_request.get(audio_url, headers=headers)

            if response.status == 200:
                body = await response.body()
                with open(temp_path, 'wb') as f:
                    f.write(body)

                logger.info(f"[Playwright] Downloaded successfully: {temp_path}")
                return temp_path
            else:
                logger.error(f"[Playwright] API Request failed: {response.status}")
                return None

        except Exception as e:
            logger.error(f"[Playwright] Download logic error: {e}")
            return None

    except Exception as e:
        logger.error(f"[Playwright] Bilibili download failed: {e}")
        return None
    finally:
        # Always stop the driver, even if closing the browser fails.
        try:
            if browser:
                await browser.close()
        finally:
            if playwright:
                await playwright.stop()