"""
QR-code automatic login service.

The backend drives Playwright in headless mode to obtain the login QR code;
after the user scans it from the front end, the session cookies are saved
automatically.
"""

import asyncio
import base64
import json
from pathlib import Path
from typing import Optional, Dict, Any, List

from loguru import logger
from playwright.async_api import async_playwright, Page, BrowserContext, Browser, Playwright as PW


class QRLoginService:
    """Drive a headless browser through a platform's QR-code login flow.

    Typical use: call :meth:`start_login` to open the platform login page and
    get the QR code as a base64 string, show it to the user, then poll
    :meth:`get_login_status`. A background task watches the browser session
    and writes the cookies to ``cookies_dir`` once the login is detected.

    The Playwright objects are managed manually (no ``async with``) because
    the browser has to outlive :meth:`start_login` while the user scans.
    """

    # Number of one-second polls performed while waiting for the scan (seconds).
    LOGIN_TIMEOUT = 120

    # Cookie whose presence marks an authenticated session, per platform.
    _SESSION_COOKIE_NAMES = {
        "bilibili": "SESSDATA",
        "douyin": "sessionid",
        "xiaohongshu": "web_session",
    }

    # Cookies kept in the flat Bilibili cookie file (format needed by biliup).
    _BILIBILI_REQUIRED_COOKIES = ("SESSDATA", "bili_jct", "DedeUserID", "DedeUserID__ckMd5")

    def __init__(self, platform: str, cookies_dir: Path) -> None:
        """Create a service for one platform.

        Args:
            platform: Key into ``platform_configs`` ("bilibili", "douyin",
                "xiaohongshu"). Unknown keys are rejected by ``start_login``.
            cookies_dir: Directory where ``<platform>_cookies.json`` is written.
        """
        self.platform = platform
        self.cookies_dir = cookies_dir
        self.qr_code_image: Optional[str] = None
        self.login_success: bool = False
        self.cookies_data: Optional[Dict[str, Any]] = None

        # Playwright resources (lifecycle managed manually, see _cleanup).
        self.playwright: Optional[PW] = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None

        # Strong reference to the background monitor; the event loop only keeps
        # weak references to tasks, so an unreferenced task may be collected.
        self._monitor_task: Optional["asyncio.Task[None]"] = None

        # Several candidate selectors per platform; they are joined with commas
        # so Playwright waits for whichever appears first.
        self.platform_configs = {
            "bilibili": {
                "url": "https://passport.bilibili.com/login",
                "qr_selectors": [
                    "div[class*='qrcode'] canvas",  # common canvas QR code
                    "div[class*='qrcode'] img",  # common image QR code
                    ".qrcode-img img",  # legacy layout
                    ".login-scan-box img",  # scan box
                    "div[class*='scan'] img",
                ],
                "success_indicator": "https://www.bilibili.com/",
            },
            "douyin": {
                "url": "https://creator.douyin.com/",
                "qr_selectors": [
                    ".qrcode img",  # tried first
                    "img[alt='qrcode']",
                    "canvas[class*='qr']",
                    "img[src*='qr']",
                ],
                "success_indicator": "https://creator.douyin.com/creator-micro",
            },
            "xiaohongshu": {
                "url": "https://creator.xiaohongshu.com/",
                "qr_selectors": [
                    ".qrcode img",
                    "img[alt*='二维码']",
                    "canvas.qr-code",
                    "img[class*='qr']",
                ],
                "success_indicator": "https://creator.xiaohongshu.com/publish",
            },
        }

    async def start_login(self) -> Dict[str, Any]:
        """Open the login page, capture the QR code and start monitoring.

        Returns:
            dict: ``{"success": True, "qr_code": <base64>, "message": ...}``
            when the QR code was captured, otherwise
            ``{"success": False, "message": ...}``. Errors are reported in the
            returned dict rather than raised.
        """
        config = self.platform_configs.get(self.platform)
        if config is None:
            return {"success": False, "message": "不支持的平台"}

        # A previous attempt may still own a browser and a monitor task; tear
        # both down first so they are neither leaked nor able to close the
        # resources of the attempt started below.
        await self._abort_previous_attempt()
        self.qr_code_image = None
        self.login_success = False
        self.cookies_data = None

        try:
            # Start Playwright without "async with": the browser must stay
            # open after this coroutine returns.
            self.playwright = await async_playwright().start()

            # Launch flags that hide the most obvious automation markers.
            self.browser = await self.playwright.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage',
                ],
            )

            # Present a realistic desktop browser profile.
            self.context = await self.browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                locale='zh-CN',
                timezone_id='Asia/Shanghai',
            )

            page = await self.context.new_page()

            # Inject stealth.js when it ships next to this module.
            stealth_path = Path(__file__).parent / 'uploader' / 'stealth.min.js'
            if stealth_path.exists():
                await page.add_init_script(path=str(stealth_path))
                logger.debug(f"[{self.platform}] Stealth模式已启用")

            logger.info(f"[{self.platform}] 打开登录页...")
            await page.goto(config["url"], wait_until='networkidle')

            # Short grace period for late-rendering login widgets.
            await asyncio.sleep(2)

            qr_image = await self._extract_qr_code(page, config["qr_selectors"])
            if not qr_image:
                await self._cleanup()
                return {"success": False, "message": "未找到二维码"}

            self.qr_code_image = qr_image
            logger.info(f"[{self.platform}] 二维码已获取,等待扫码...")

            # Monitor in the background while the browser stays open. Keep the
            # task referenced so it cannot be garbage-collected mid-run.
            self._monitor_task = asyncio.create_task(
                self._monitor_login_status(page, config["success_indicator"])
            )

            return {
                "success": True,
                "qr_code": qr_image,
                "message": "请扫码登录",
            }

        except Exception as e:
            logger.exception(f"[{self.platform}] 启动登录失败: {e}")
            await self._cleanup()
            return {"success": False, "message": f"启动失败: {str(e)}"}

    async def _abort_previous_attempt(self) -> None:
        """Cancel a still-running monitor task and release leftover resources."""
        task = getattr(self, "_monitor_task", None)
        self._monitor_task = None
        if task is not None and not task.done():
            task.cancel()
            try:
                # Wait for the task's own "finally" cleanup to finish before
                # new resources are created.
                await task
            except asyncio.CancelledError:
                pass  # expected: the task was cancelled just above
            except Exception as e:
                logger.debug(f"[{self.platform}] 旧监控任务结束时出错 (已忽略): {e}")
        await self._cleanup()

    async def _extract_qr_code(self, page: "Page", selectors: List[str]) -> Optional[str]:
        """Locate the QR code on ``page`` and return it base64-encoded.

        Strategies are tried in order: (1) accessible-role lookup (Douyin
        only), (2) the combined CSS ``selectors``, (3) a large image near a
        "scan to login" caption. There is deliberately no fallback to a
        full-page screenshot.

        Args:
            page: Page showing the platform login screen.
            selectors: Candidate CSS selectors for the QR element.

        Returns:
            Base64 image data, or ``None`` when every strategy failed (a debug
            screenshot of the page is then saved on a best-effort basis).
        """
        # Strategy 1: role-based lookup.
        if self.platform == "douyin":
            encoded = await self._qr_from_role(page)
            if encoded:
                return encoded

        # Strategy 2: CSS selectors; Strategy 3: image near a caption.
        qr_element = await self._qr_from_selectors(page, selectors)
        if qr_element is None:
            qr_element = await self._qr_from_nearby_text(page)

        if qr_element is not None:
            try:
                screenshot = await qr_element.screenshot()
                return base64.b64encode(screenshot).decode()
            except Exception as e:
                logger.error(f"[{self.platform}] 截图失败: {e}")

        logger.error(f"[{self.platform}] 所有QR码提取策略失败")
        await self._save_debug_screenshot(page)
        return None

    @staticmethod
    def _base64_from_data_uri(src: Optional[str]) -> Optional[str]:
        """Return the payload of a base64 ``data:image`` URI, else ``None``.

        Non-base64 data URIs (e.g. inline SVG text) and ordinary URLs yield
        ``None`` so the caller falls back to an element screenshot instead of
        handing non-base64 text to the client.
        """
        if not src or not src.startswith("data:image"):
            return None
        header, sep, payload = src.partition(",")
        if not sep or not header.lower().endswith(";base64"):
            return None
        return payload.strip() or None

    async def _qr_from_role(self, page: "Page") -> Optional[str]:
        """Strategy 1: find the QR image by accessible role; return base64 or None."""
        try:
            logger.debug(f"[{self.platform}] 策略1(Role): 尝试 get_by_role('img', name='二维码')...")
            # Locators are strict: without .first, several matching images
            # would make wait_for() raise instead of picking one.
            img = page.get_by_role("img", name="二维码").first
            await img.wait_for(state="visible", timeout=10000)

            # Use an inline base64 data URI directly; otherwise screenshot.
            encoded = self._base64_from_data_uri(await img.get_attribute("src"))
            if encoded:
                logger.info(f"[{self.platform}] 策略1(Role): 获取到 data URI")
                return encoded

            logger.info(f"[{self.platform}] 策略1(Role): 截图获取")
            screenshot = await img.screenshot()
            return base64.b64encode(screenshot).decode()
        except Exception as e:
            logger.warning(f"[{self.platform}] 策略1(Role) 失败: {e}")
            return None

    async def _qr_from_selectors(self, page: "Page", selectors: List[str]):
        """Strategy 2: wait for any of ``selectors``; return the element or None."""
        if not selectors:
            return None
        try:
            combined_selector = ", ".join(selectors)
            logger.debug(f"[{self.platform}] 策略2(CSS): 开始等待...")
            el = await page.wait_for_selector(combined_selector, state="visible", timeout=8000)
            if el:
                logger.info(f"[{self.platform}] 策略2(CSS): 匹配成功")
                return el
        except Exception as e:
            logger.warning(f"[{self.platform}] 策略2(CSS) 失败: {e}")
        return None

    async def _qr_from_nearby_text(self, page: "Page"):
        """Strategy 3: return a visible image wider than 100px near a login caption."""
        logger.debug(f"[{self.platform}] 策略3(Text): 开始搜索...")
        keywords = ["扫码登录", "二维码", "打开抖音", "抖音APP"]

        for kw in keywords:
            try:
                text_el = page.get_by_text(kw, exact=False).first
                await text_el.wait_for(state="visible", timeout=2000)

                # Walk up to five ancestors and inspect the images under each.
                parent = text_el
                for _ in range(5):
                    parent = parent.locator("..")
                    imgs = parent.locator("img")

                    for i in range(await imgs.count()):
                        img = imgs.nth(i)
                        if not await img.is_visible():
                            continue
                        bbox = await img.bounding_box()
                        if bbox and bbox['width'] > 100:
                            logger.info(f"[{self.platform}] 策略3(Text): 成功")
                            return img
            except Exception as e:
                # A missing caption is the normal case; try the next keyword.
                logger.debug(f"[{self.platform}] 策略3(Text) 关键词 '{kw}' 未命中: {e}")
                continue
        return None

    async def _save_debug_screenshot(self, page: "Page") -> None:
        """Best-effort full-page screenshot to help diagnose a missing QR code."""
        try:
            debug_dir = Path(__file__).parent.parent.parent / 'debug_screenshots'
            debug_dir.mkdir(parents=True, exist_ok=True)
            await page.screenshot(path=str(debug_dir / f"{self.platform}_debug.png"))
        except Exception as e:
            # Diagnostics must never change the outcome reported to the caller.
            logger.warning(f"[{self.platform}] 保存调试截图失败: {e}")

    @staticmethod
    def _has_session_cookie(cookies: List[Dict[str, Any]], name: str) -> bool:
        """Return True if ``cookies`` holds a non-empty cookie called ``name``."""
        if not name:
            return False
        return any(c.get('name') == name and c.get('value') for c in cookies)

    async def _monitor_login_status(self, page: "Page", success_url: str) -> None:
        """Poll once per second until login is detected or the timeout expires.

        Login counts as successful when the page URL contains ``success_url``
        or the platform's session cookie is present. On success the cookies
        are saved before ``login_success`` is set, so a poller never sees
        success ahead of the cookie file. Browser resources are always
        released on exit.
        """
        try:
            logger.info(f"[{self.platform}] 开始监控登录状态...")
            target_cookie = self._SESSION_COOKIE_NAMES.get(self.platform, "")

            for i in range(self.LOGIN_TIMEOUT):
                await asyncio.sleep(1)

                context = self.context
                if context is None:
                    # Resources were released elsewhere; nothing left to watch.
                    logger.warning(f"[{self.platform}] 浏览器上下文已关闭,停止监控")
                    break

                try:
                    cookies = await context.cookies()
                    has_cookie = self._has_session_cookie(cookies, target_cookie)

                    if i % 5 == 0:
                        logger.debug(f"[{self.platform}] 等待登录... HasCookie: {has_cookie}")

                    if success_url in page.url or has_cookie:
                        logger.success(f"[{self.platform}] 登录成功!")
                        await asyncio.sleep(2)  # let the site finish setting cookies

                        try:
                            final_cookies = await context.cookies()
                        except Exception as e:
                            # Fall back to the cookies already read.
                            logger.warning(f"[{self.platform}] 重新读取Cookie失败,使用已有Cookie: {e}")
                            final_cookies = cookies
                        await self._save_cookies(final_cookies)
                        self.login_success = True
                        break

                except Exception as e:
                    logger.warning(f"[{self.platform}] 监控循环警告: {e}")
                    break
            else:
                # Reached only when every poll completed without a login.
                logger.warning(f"[{self.platform}] 登录超时")

        except Exception as e:
            logger.error(f"[{self.platform}] 监控异常: {e}")
        finally:
            await self._cleanup()

    async def _cleanup(self) -> None:
        """Close context, browser and Playwright (in that order), ignoring errors."""
        for attr, closer in (("context", "close"), ("browser", "close"), ("playwright", "stop")):
            resource = getattr(self, attr, None)
            if resource is None:
                continue
            # Detach before closing so an overlapping cleanup skips it.
            setattr(self, attr, None)
            try:
                await getattr(resource, closer)()
            except Exception as e:
                # Best-effort teardown: record the failure and keep going.
                logger.debug(f"[{self.platform}] 关闭 {attr} 时出错 (已忽略): {e}")

    @classmethod
    def _build_cookie_payload(cls, platform: str, cookies: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the JSON document stored for ``platform``.

        Bilibili uses a flat ``{name: value}`` dict limited to the required
        cookies; other platforms use a Playwright ``storage_state``-shaped
        dict that can be passed to ``browser.new_context(storage_state=file)``.
        """
        if platform == "bilibili":
            return {
                c['name']: c['value']
                for c in cookies
                if c['name'] in cls._BILIBILI_REQUIRED_COOKIES
            }
        return {"cookies": cookies, "origins": []}

    async def _save_cookies(self, cookies: List[Dict[str, Any]]) -> None:
        """Write the cookies to ``<cookies_dir>/<platform>_cookies.json``.

        Failures are logged, not raised; ``cookies_data`` is only set after a
        successful write.
        """
        try:
            payload = self._build_cookie_payload(self.platform, cookies)

            if self.platform == "bilibili":
                missing = [n for n in self._BILIBILI_REQUIRED_COOKIES if n not in payload]
                if missing:
                    logger.warning(f"[{self.platform}] 缺少Cookie: {', '.join(missing)}")

            target_dir = Path(self.cookies_dir)
            target_dir.mkdir(parents=True, exist_ok=True)
            cookie_file = target_dir / f"{self.platform}_cookies.json"

            with open(cookie_file, 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2)
            self.cookies_data = payload

            logger.success(f"[{self.platform}] Cookie已保存")
        except Exception as e:
            logger.error(f"[{self.platform}] 保存Cookie失败: {e}")

    def get_login_status(self) -> Dict[str, Any]:
        """Return whether login succeeded and whether cookies were saved."""
        return {
            "success": self.login_success,
            "cookies_saved": self.cookies_data is not None,
        }