""" QR码自动登录服务 后端Playwright无头模式获取二维码,前端扫码后自动保存Cookie """ import asyncio import time import base64 import json from pathlib import Path from typing import Optional, Dict, Any, List, Sequence, Mapping, Union from playwright.async_api import async_playwright, Page, Frame, BrowserContext, Browser, Playwright as PW from loguru import logger from app.core.config import settings class QRLoginService: """QR码登录服务""" # 登录监控超时 (秒) LOGIN_TIMEOUT = 120 def __init__(self, platform: str, cookies_dir: Path) -> None: self.platform = platform self.cookies_dir = cookies_dir self.qr_code_image: Optional[str] = None self.login_success: bool = False self.cookies_data: Optional[Dict[str, Any]] = None # Playwright 资源 (手动管理生命周期) self.playwright: Optional[PW] = None self.browser: Optional[Browser] = None self.context: Optional[BrowserContext] = None # 每个平台使用多个选择器 (使用逗号分隔,Playwright会同时等待它们) self.platform_configs = { "bilibili": { "url": "https://passport.bilibili.com/login", "qr_selectors": [ "div[class*='qrcode'] canvas", # 常见canvas二维码 "div[class*='qrcode'] img", # 常见图片二维码 ".qrcode-img img", # 旧版 ".login-scan-box img", # 扫码框 "div[class*='scan'] img" ], "success_indicator": "https://www.bilibili.com/" }, "douyin": { "url": "https://creator.douyin.com/", "qr_selectors": [ ".qrcode img", # 优先尝试 "img[alt='qrcode']", "canvas[class*='qr']", "img[src*='qr']" ], "success_indicator": "https://creator.douyin.com/creator-micro" }, "xiaohongshu": { "url": "https://creator.xiaohongshu.com/", "qr_selectors": [ ".qrcode img", "img[alt*='二维码']", "canvas.qr-code", "img[class*='qr']" ], "success_indicator": "https://creator.xiaohongshu.com/publish" }, "weixin": { "url": "https://channels.weixin.qq.com/platform/", "qr_selectors": [ "div[class*='qrcode'] img", "img[alt*='二维码']", "img[src*='qr']", "canvas", "svg", "img[class*='qr']" ], "success_indicator": "https://channels.weixin.qq.com/platform" } } def _resolve_headless_mode(self) -> str: if self.platform != "weixin": return "headless" mode = (settings.WEIXIN_HEADLESS_MODE or "").strip().lower() return mode or "headful" def _is_square_bbox(self, bbox: Optional[Dict[str, float]], min_side: int = 100) -> bool: if not bbox: return False width = bbox.get("width", 0) height = bbox.get("height", 0) if width < min_side or height < min_side: return False if height == 0: return False ratio = width / height return 0.75 <= ratio <= 1.33 async def _pick_best_candidate(self, locator, min_side: int = 100): best = None best_area = 0 try: count = await locator.count() except Exception: return None for i in range(count): try: candidate = locator.nth(i) if not await candidate.is_visible(): continue bbox = await candidate.bounding_box() if not self._is_square_bbox(bbox, min_side=min_side): continue area = bbox["width"] * bbox["height"] if area > best_area: best = candidate best_area = area except Exception: continue return best async def _find_qr_in_frames(self, page: Page, selectors: List[str], min_side: int): combined_selector = ", ".join(selectors) for frame in page.frames: if frame == page.main_frame: continue try: locator = frame.locator(combined_selector) candidate = await self._pick_best_candidate(locator, min_side=min_side) if candidate: return candidate except Exception: continue return None async def _scan_qr_candidates(self, page: Page, selectors: List[str], min_side: int): combined_selector = ", ".join(selectors) try: locator = page.locator(combined_selector) candidate = await self._pick_best_candidate(locator, min_side=min_side) if candidate: return candidate except Exception: pass return await self._find_qr_in_frames(page, selectors, min_side=min_side) async def _try_text_strategy_in_frames(self, page: Page): for frame in page.frames: if frame == page.main_frame: continue try: candidate = await self._try_text_strategy(frame) if candidate: return candidate except Exception: continue return None async def start_login(self) -> Dict[str, Any]: """ 启动登录流程 Returns: dict: 包含二维码base64和状态 """ if self.platform not in self.platform_configs: return {"success": False, "message": "不支持的平台"} config = self.platform_configs[self.platform] try: # 1. 启动 Playwright (不使用 async with,手动管理生命周期) self.playwright = await async_playwright().start() mode = self._resolve_headless_mode() headless = mode not in ("headful", "false", "0", "no") launch_args = [ '--disable-blink-features=AutomationControlled', '--no-sandbox', '--disable-dev-shm-usage' ] if headless and mode in ("new", "headless-new", "headless_new"): launch_args.append("--headless=new") # Stealth模式启动浏览器 launch_options: Dict[str, Any] = { "headless": headless, "args": launch_args, } if self.platform == "weixin": chrome_path = (settings.WEIXIN_CHROME_PATH or "").strip() if chrome_path: if Path(chrome_path).exists(): launch_options["executable_path"] = chrome_path else: logger.warning(f"[weixin] WEIXIN_CHROME_PATH not found: {chrome_path}") else: channel = (settings.WEIXIN_BROWSER_CHANNEL or "").strip() if channel: launch_options["channel"] = channel self.browser = await self.playwright.chromium.launch(**launch_options) # 配置真实浏览器特征 self.context = await self.browser.new_context( viewport={'width': 1920, 'height': 1080}, user_agent=settings.WEIXIN_USER_AGENT, locale=settings.WEIXIN_LOCALE, timezone_id=settings.WEIXIN_TIMEZONE_ID ) page = await self.context.new_page() # 注入stealth.js stealth_path = Path(__file__).parent / 'uploader' / 'stealth.min.js' if stealth_path.exists(): await page.add_init_script(path=str(stealth_path)) logger.debug(f"[{self.platform}] Stealth模式已启用") urls_to_try = [config["url"]] if self.platform == "weixin": urls_to_try = [ "https://channels.weixin.qq.com/platform/", "https://channels.weixin.qq.com/", ] qr_image = None for url in urls_to_try: logger.info(f"[{self.platform}] 打开登录页: {url}") wait_until = "domcontentloaded" if self.platform == "weixin" else "networkidle" await page.goto(url, wait_until=wait_until) # 等待页面加载 await asyncio.sleep(1 if self.platform == "weixin" else 2) # 提取二维码 (并行策略) qr_image = await self._extract_qr_code(page, config["qr_selectors"]) if qr_image: break if not qr_image: await self._cleanup() return {"success": False, "message": "未找到二维码"} logger.info(f"[{self.platform}] 二维码已获取,等待扫码...") # 启动后台监控任务 (浏览器保持开启) asyncio.create_task( self._monitor_login_status(page, config["success_indicator"]) ) return { "success": True, "qr_code": qr_image, "message": "请扫码登录" } except Exception as e: logger.exception(f"[{self.platform}] 启动登录失败: {e}") await self._cleanup() return {"success": False, "message": f"启动失败: {str(e)}"} async def _extract_qr_code(self, page: Page, selectors: List[str]) -> Optional[str]: """ 提取二维码图片 (优化策略顺序) 根据日志分析:抖音和B站使用 Text 策略成功率最高 """ qr_element = None # 针对抖音和B站:优先使用 Text 策略 (成功率最高,速度最快) if self.platform in ("douyin", "bilibili"): # 尝试最多2次 (首次 + 1次重试) for attempt in range(2): if attempt > 0: logger.info(f"[{self.platform}] 等待页面加载后重试...") await asyncio.sleep(2) # 策略1: Text (优先,成功率最高) qr_element = await self._try_text_strategy(page) if qr_element: try: screenshot = await qr_element.screenshot() return base64.b64encode(screenshot).decode() except Exception as e: logger.warning(f"[{self.platform}] Text策略截图失败: {e}") qr_element = None # 策略2: CSS (备用) if not qr_element: try: combined_selector = ", ".join(selectors) logger.debug(f"[{self.platform}] 策略2(CSS): 开始等待...") # 增加超时到5秒,抖音页面加载较慢 el = await page.wait_for_selector(combined_selector, state="visible", timeout=5000) if el: logger.info(f"[{self.platform}] 策略2(CSS): 匹配成功") screenshot = await el.screenshot() return base64.b64encode(screenshot).decode() except Exception as e: logger.warning(f"[{self.platform}] 策略2(CSS) 失败: {e}") # 如果已成功,退出循环 if qr_element: break else: # 其他平台 (小红书/微信等):保持原顺序 CSS -> Text # 策略1: CSS 选择器 try: combined_selector = ", ".join(selectors) logger.debug(f"[{self.platform}] 策略1(CSS): 开始等待...") if self.platform == "weixin": min_side = 120 start_time = time.monotonic() while time.monotonic() - start_time < 12: qr_element = await self._scan_qr_candidates(page, selectors, min_side=min_side) if qr_element: logger.info(f"[{self.platform}] 策略1(CSS): 匹配成功") break await asyncio.sleep(0.5) else: await page.wait_for_selector(combined_selector, state="visible", timeout=5000) locator = page.locator(combined_selector) qr_element = await self._pick_best_candidate(locator, min_side=100) if qr_element: logger.info(f"[{self.platform}] 策略1(CSS): 匹配成功") except Exception as e: logger.warning(f"[{self.platform}] 策略1(CSS) 失败: {e}") # 策略2: Text if not qr_element: qr_element = await self._try_text_strategy(page) if not qr_element and self.platform == "weixin": qr_element = await self._try_text_strategy_in_frames(page) # 如果找到元素,截图返回 if qr_element: try: screenshot = await qr_element.screenshot() return base64.b64encode(screenshot).decode() except Exception as e: logger.error(f"[{self.platform}] 截图失败: {e}") # 所有策略失败 logger.error(f"[{self.platform}] 所有QR码提取策略失败") # 保存调试截图 debug_dir = Path(__file__).parent.parent.parent / 'debug_screenshots' debug_dir.mkdir(exist_ok=True) await page.screenshot(path=str(debug_dir / f"{self.platform}_debug.png")) return None async def _try_text_strategy(self, page: Union[Page, Frame]) -> Optional[Any]: """基于文本查找二维码图片""" try: logger.debug(f"[{self.platform}] 策略Text: 开始搜索...") keywords = [ "扫码登录", "二维码", "打开抖音", "抖音APP", "使用APP扫码", "微信扫码", "请使用微信扫码", "视频号" ] for kw in keywords: try: text_el = page.get_by_text(kw, exact=False).first await text_el.wait_for(state="visible", timeout=2000) # 向上查找图片 parent = text_el for _ in range(5): parent = parent.locator("..") candidates = parent.locator("img, canvas") min_side = 120 if self.platform == "weixin" else 100 best = await self._pick_best_candidate(candidates, min_side=min_side) if best: logger.info(f"[{self.platform}] 策略Text: 成功") return best except Exception: continue except Exception as e: logger.warning(f"[{self.platform}] 策略Text 失败: {e}") return None async def _monitor_login_status(self, page: Page, success_url: str): """监控登录状态""" try: logger.info(f"[{self.platform}] 开始监控登录状态...") key_cookies = { "bilibili": ["SESSDATA"], "douyin": ["sessionid"], "xiaohongshu": ["web_session"], "weixin": [ "wxuin", "wxsid", "pass_ticket", "webwx_data_ticket", "uin", "skey", "p_uin", "p_skey", "pac_uid", ], } target_cookies = key_cookies.get(self.platform, []) for i in range(self.LOGIN_TIMEOUT): await asyncio.sleep(1) try: if not self.context: break # 避免意外关闭 cookies = [dict(cookie) for cookie in await self.context.cookies()] current_url = page.url has_cookie = any((c.get('name') in target_cookies) for c in cookies) if target_cookies else False if i % 5 == 0: logger.debug(f"[{self.platform}] 等待登录... HasCookie: {has_cookie}") if success_url in current_url or has_cookie: logger.success(f"[{self.platform}] 登录成功!") self.login_success = True await asyncio.sleep(2) # 缓冲 # 保存Cookie final_cookies = [dict(cookie) for cookie in await self.context.cookies()] await self._save_cookies(final_cookies) break except Exception as e: logger.warning(f"[{self.platform}] 监控循环警告: {e}") break if not self.login_success: logger.warning(f"[{self.platform}] 登录超时") except Exception as e: logger.error(f"[{self.platform}] 监控异常: {e}") finally: await self._cleanup() async def _cleanup(self) -> None: """清理资源""" if self.context: try: await self.context.close() except Exception: pass self.context = None if self.browser: try: await self.browser.close() except Exception: pass self.browser = None if self.playwright: try: await self.playwright.stop() except Exception: pass self.playwright = None async def _save_cookies(self, cookies: Sequence[Mapping[str, Any]]) -> None: """保存Cookie到文件""" try: cookie_file = self.cookies_dir / f"{self.platform}_cookies.json" if self.platform == "bilibili": # Bilibili 使用简单格式 (biliup库需要) cookie_dict = {c.get('name'): c.get('value') for c in cookies if c.get('name')} required = ['SESSDATA', 'bili_jct', 'DedeUserID', 'DedeUserID__ckMd5'] cookie_dict = {k: v for k, v in cookie_dict.items() if k in required} with open(cookie_file, 'w', encoding='utf-8') as f: json.dump(cookie_dict, f, indent=2) self.cookies_data = cookie_dict else: # Douyin/Xiaohongshu 使用 Playwright storage_state 完整格式 # 这样可以直接用 browser.new_context(storage_state=file) storage_state = { "cookies": cookies, "origins": [] } with open(cookie_file, 'w', encoding='utf-8') as f: json.dump(storage_state, f, indent=2) self.cookies_data = storage_state logger.success(f"[{self.platform}] Cookie已保存") except Exception as e: logger.error(f"[{self.platform}] 保存Cookie失败: {e}") def get_login_status(self) -> Dict[str, Any]: """获取登录状态""" return { "success": self.login_success, "cookies_saved": self.cookies_data is not None }