586 lines
23 KiB
Python
586 lines
23 KiB
Python
"""
|
|
Douyin (抖音) uploader using Playwright
|
|
Based on social-auto-upload implementation
|
|
"""
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Any
|
|
import asyncio
|
|
import time
|
|
|
|
from playwright.async_api import Playwright, async_playwright
|
|
from loguru import logger
|
|
|
|
from .base_uploader import BaseUploader
|
|
from .cookie_utils import set_init_script
|
|
|
|
|
|
class DouyinUploader(BaseUploader):
    """Douyin video uploader using Playwright.

    Drives the Douyin creator console in a headless Chromium session:
    attaches the video file, fills in the title and hashtags, optionally
    picks a cover and a scheduled publish time, clicks the publish button
    and then tries to detect whether publishing succeeded.
    """

    # Timeout configuration (seconds)
    UPLOAD_TIMEOUT = 300  # maximum wait for the video upload to finish
    PUBLISH_TIMEOUT = 180  # maximum wait for a publish result to show up
    PAGE_REDIRECT_TIMEOUT = 60  # maximum wait for the redirect to the publish page
    POLL_INTERVAL = 2  # delay between two polls of the page state
    MAX_CLICK_RETRIES = 3  # attempts at clicking the publish button

    # Number of leading title characters typed into the form and searched
    # for on the content-management page.
    TITLE_MAX_LENGTH = 30

    def __init__(
        self,
        title: str,
        file_path: str,
        tags: List[str],
        publish_date: Optional[datetime] = None,
        account_file: Optional[str] = None,
        description: str = ""
    ):
        """Forward all arguments to ``BaseUploader`` and set the upload URL.

        Args:
            title: Video title; only the first ``TITLE_MAX_LENGTH``
                characters are entered on the publish page.
            file_path: Path of the video file to upload.
            tags: Hashtags (without the leading ``#``) to append.
            publish_date: Scheduled publish time. A falsy value (``None``
                or the legacy ``0``) means "publish immediately".
            account_file: Playwright storage-state file holding the login
                cookies; it is rewritten after a publish attempt.
            description: Passed through to ``BaseUploader``; not used
                directly by this class.
        """
        super().__init__(title, file_path, tags, publish_date, account_file, description)
        self.upload_url = "https://creator.douyin.com/creator-micro/content/upload"

    @staticmethod
    def _make_result(success: bool, message: str) -> Dict[str, Any]:
        """Build the result dict returned by :meth:`upload`."""
        return {"success": success, "message": message, "url": None}

    async def _is_text_visible(self, page, text: str, exact: bool = False) -> bool:
        """Return True if the first element matching ``text`` is visible.

        Any error raised while querying the page is treated as "not visible".
        """
        try:
            return await page.get_by_text(text, exact=exact).first.is_visible()
        except Exception:
            return False

    async def _first_visible_locator(self, locator, timeout: int = 1000):
        """Return ``locator.first`` if it exists and is visible, else None.

        Errors raised while querying the page are swallowed and reported
        as None so callers can simply move on to their next candidate.
        """
        try:
            if await locator.count() == 0:
                return None
            candidate = locator.first
            if await candidate.is_visible(timeout=timeout):
                return candidate
        except Exception:
            return None
        return None

    async def _click_first_visible(self, scopes, selectors) -> bool:
        """Click the first visible element matching any selector in any scope.

        Scopes are tried in order and, within a scope, selectors are tried
        in order. A candidate whose lookup or click raises is skipped.

        Returns:
            True as soon as one click succeeds, False if nothing was clicked.
        """
        for scope in scopes:
            for selector in selectors:
                try:
                    candidate = await self._first_visible_locator(scope.locator(selector))
                    if candidate:
                        await candidate.click()
                        return True
                except Exception:
                    continue
        return False

    async def _wait_for_publish_result(self, page, max_wait_time: int = 180) -> tuple:
        """Poll the page until a publish outcome can be determined.

        Each poll checks, in this order: whether the page was closed,
        whether the URL moved to the content-management page, whether a
        success text is visible, whether a failure text is visible. A
        "weak" text ("发布完成") is only remembered and does not end the
        wait on its own.

        Args:
            page: Playwright page showing the publish form.
            max_wait_time: Maximum time to poll, in seconds.

        Returns:
            ``(success, reason, is_timeout)``. ``is_timeout`` is True only
            when the wait expired without a conclusive signal; in that case
            ``success`` is False even if a weak text was seen.
        """
        success_texts = ["发布成功", "作品已发布", "再发一条", "查看作品", "审核中", "待审核"]
        weak_texts = ["发布完成"]
        failure_texts = ["发布失败", "发布异常", "发布出错", "请完善", "请补充", "请先上传"]
        weak_reason = None

        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + max_wait_time
        while time.monotonic() < deadline:
            if page.is_closed():
                return False, "页面已关闭", False

            current_url = page.url
            if "content/manage" in current_url:
                return True, f"已跳转到管理页面 (URL: {current_url})", False

            for text in success_texts:
                if await self._is_text_visible(page, text, exact=False):
                    return True, f"检测到成功提示: {text}", False

            for text in failure_texts:
                if await self._is_text_visible(page, text, exact=False):
                    return False, f"检测到失败提示: {text}", False

            for text in weak_texts:
                if await self._is_text_visible(page, text, exact=False):
                    weak_reason = text

            logger.info("[抖音] 视频正在发布中...")
            await asyncio.sleep(self.POLL_INTERVAL)

        if weak_reason:
            return False, f"检测到提示: {weak_reason}", True

        return False, "发布检测超时", True

    async def _fill_title(self, page, title: str) -> bool:
        """Type the (truncated) title into the description field.

        Several locator strategies are tried in order; the first one that
        matches at least one element and accepts ``fill`` wins.

        Returns:
            True if a field was filled, False if every candidate failed.
        """
        title_text = title[:self.TITLE_MAX_LENGTH]
        locator_candidates = []

        try:
            # Field located relative to its "作品描述" label.
            label_locator = page.get_by_text("作品描述").locator("..").locator("..").locator(
                "xpath=following-sibling::div[1]"
            ).locator("textarea, input, div[contenteditable='true']")
            locator_candidates.append(label_locator)
        except Exception:
            pass

        # Fallbacks, from most to least specific.
        locator_candidates.extend([
            page.locator("textarea[placeholder*='作品描述']"),
            page.locator("textarea[placeholder*='描述']"),
            page.locator("input[placeholder*='作品描述']"),
            page.locator("input[placeholder*='描述']"),
            page.locator("div[contenteditable='true']"),
        ])

        for locator in locator_candidates:
            try:
                if await locator.count() > 0:
                    await locator.first.fill(title_text)
                    return True
            except Exception:
                continue

        return False

    async def _select_cover_if_needed(self, page) -> bool:
        """Try to pick and confirm a cover image for the video.

        Steps: open the cover picker if its entry is visible, optionally
        switch to the landscape-cover tab, click the first visible cover
        candidate, then click the first visible confirm button.

        Returns:
            True if a confirm button was clicked; otherwise whether a cover
            candidate was clicked. False if an unexpected error occurred.
        """
        try:
            cover_button = page.get_by_text("选择封面", exact=False).first
            if await cover_button.is_visible():
                await cover_button.click()
                logger.info("[抖音] 尝试选择封面")
                await asyncio.sleep(0.5)

            # Search inside the last modal/dialog when one exists, otherwise
            # fall back to the whole page.
            dialog = page.locator(
                "div.dy-creator-content-modal-wrap, div[role='dialog'], "
                "div[class*='modal'], div[class*='dialog']"
            ).last
            scopes = [dialog] if await dialog.count() > 0 else [page]

            switched = await self._click_first_visible(scopes, [
                "button:has-text('设置横封面')",
                "div:has-text('设置横封面')",
                "span:has-text('设置横封面')",
            ])
            if switched:
                logger.info("[抖音] 已切换到横封面设置")
                await asyncio.sleep(0.5)

            selected = await self._click_first_visible(scopes, [
                "div[class*='cover'] img",
                "div[class*='cover']",
                "div[class*='frame'] img",
                "div[class*='frame']",
                "div[class*='preset']",
                "img",
            ])
            if selected:
                logger.info("[抖音] 已选择封面帧")

            confirm_selectors = [
                "button:has-text('完成')",
                "button:has-text('确定')",
                "button:has-text('保存')",
                "button:has-text('确认')",
            ]
            for selector in confirm_selectors:
                try:
                    button = await self._first_visible_locator(page.locator(selector))
                    if button:
                        if not await button.is_enabled():
                            # Give the button up to ~4 s to become enabled;
                            # it is clicked afterwards either way.
                            for _ in range(8):
                                if await button.is_enabled():
                                    break
                                await asyncio.sleep(0.5)
                        await button.click()
                        logger.info(f"[抖音] 封面已确认: {selector}")
                        await asyncio.sleep(0.5)
                        if await dialog.count() > 0:
                            try:
                                await dialog.wait_for(state="hidden", timeout=5000)
                            except Exception:
                                # Best effort: the dialog not closing in time
                                # does not undo the confirmation click.
                                pass
                        return True
                except Exception:
                    continue

            return selected
        except Exception as e:
            logger.warning(f"[抖音] 选择封面失败: {e}")
            return False

    async def _click_publish_confirm_modal(self, page):
        """Click a publish-confirmation button if one is visible.

        Returns:
            True if a confirmation button was clicked, False otherwise.
        """
        confirm_selectors = [
            "button:has-text('确认发布')",
            "button:has-text('继续发布')",
            "button:has-text('确定发布')",
            "button:has-text('发布确认')",
        ]
        for selector in confirm_selectors:
            try:
                button = page.locator(selector).first
                if await button.is_visible():
                    await button.click()
                    logger.info(f"[抖音] 点击了发布确认按钮: {selector}")
                    await asyncio.sleep(1)
                    return True
            except Exception:
                continue
        return False

    async def _dismiss_blocking_modal(self, page) -> bool:
        """Close one visible modal/dialog, if any.

        For each visible modal, a button with one of the known
        acknowledgement labels is tried first, then a generic close icon.

        Returns:
            True after the first successful click, False if nothing was
            dismissed (including when the modal lookup itself failed).
        """
        modal_locator = page.locator(
            "div.dy-creator-content-modal-wrap, div[role='dialog'], "
            "div[class*='modal'], div[class*='dialog']"
        )
        try:
            count = await modal_locator.count()
        except Exception:
            return False

        if count == 0:
            return False

        button_texts = [
            "我知道了",
            "知道了",
            "确定",
            "继续",
            "继续发布",
            "确认",
            "同意并继续",
            "完成",
            "好的",
            "明白了",
        ]
        close_selectors = [
            "button[class*='close']",
            "span[class*='close']",
            "i[class*='close']",
        ]

        for index in range(count):
            modal = modal_locator.nth(index)
            try:
                if not await modal.is_visible():
                    continue

                for text in button_texts:
                    try:
                        button = modal.get_by_role("button", name=text).first
                        if await button.is_visible():
                            await button.click()
                            logger.info(f"[抖音] 关闭弹窗: {text}")
                            await asyncio.sleep(0.5)
                            return True
                    except Exception:
                        continue

                for selector in close_selectors:
                    try:
                        close_button = modal.locator(selector).first
                        if await close_button.is_visible():
                            await close_button.click()
                            logger.info("[抖音] 关闭弹窗: close")
                            await asyncio.sleep(0.5)
                            return True
                    except Exception:
                        continue
            except Exception:
                continue

        return False

    async def _verify_publish_in_manage(self, page):
        """Open the content-management page and look for the new video.

        Returns:
            ``(found, reason)``. ``found`` is True when the truncated title
            or the text "审核中" is visible on the management page, and
            False when neither is visible or the page could not be checked.
        """
        manage_url = "https://creator.douyin.com/creator-micro/content/manage"
        try:
            await page.goto(manage_url)
            await page.wait_for_load_state("domcontentloaded")
            await asyncio.sleep(2)
            title_text = self.title[:self.TITLE_MAX_LENGTH]
            title_locator = page.get_by_text(title_text, exact=False).first
            if await title_locator.is_visible():
                return True, "内容管理中检测到新作品"
            if await self._is_text_visible(page, "审核中", exact=False):
                return True, "内容管理显示审核中"
        except Exception as e:
            return False, f"无法验证内容管理: {e}"
        return False, "内容管理中未找到视频"

    async def set_schedule_time(self, page, publish_date):
        """Set scheduled publish time.

        Selects the "定时发布" radio option and types ``publish_date``
        (formatted as ``YYYY-MM-DD HH:MM``) into the date-time input.
        Failures are logged and not re-raised.
        """
        try:
            # Click the "定时发布" (scheduled publish) radio button
            label_element = page.locator("[class^='radio']:has-text('定时发布')")
            await label_element.click()
            await asyncio.sleep(1)

            # Format time
            publish_date_hour = publish_date.strftime("%Y-%m-%d %H:%M")

            # Fill datetime input: select the existing value, then overwrite it
            await page.locator('.semi-input[placeholder="日期和时间"]').click()
            await page.keyboard.press("Control+KeyA")
            await page.keyboard.type(str(publish_date_hour))
            await page.keyboard.press("Enter")

            await asyncio.sleep(1)
            logger.info(f"[抖音] 已设置定时发布: {publish_date_hour}")

        except Exception as e:
            logger.error(f"[抖音] 设置定时发布失败: {e}")

    async def _attach_video_file(self, page) -> bool:
        """Hand the video file to the first usable file input on the page.

        Returns:
            True once ``set_input_files`` succeeded for one selector,
            False if every selector was missing or raised.
        """
        # The page structure varies, so several selectors are tried in order.
        selectors = [
            "div[class^='container'] input",  # Primary selector from SuperIPAgent
            "input[type='file']",  # Fallback selector
            "div[class^='upload'] input[type='file']",  # Alternative
        ]
        for selector in selectors:
            try:
                logger.info(f"[抖音] 尝试选择器: {selector}")
                locator = page.locator(selector).first
                if await locator.count() > 0:
                    await locator.set_input_files(str(self.file_path))
                    logger.info(f"[抖音] 文件上传成功使用选择器: {selector}")
                    return True
            except Exception as e:
                logger.warning(f"[抖音] 选择器 {selector} 失败: {e}")
        return False

    async def _wait_for_publish_page(self, page) -> bool:
        """Wait until the page URL indicates the publish form.

        Returns:
            True when the URL contains a publish-page fragment within
            ``PAGE_REDIRECT_TIMEOUT`` seconds, False on timeout.
        """
        deadline = time.monotonic() + self.PAGE_REDIRECT_TIMEOUT
        while time.monotonic() < deadline:
            current_url = page.url
            if "content/publish" in current_url or "content/post/video" in current_url:
                logger.info("[抖音] 成功进入发布页面")
                return True
            await asyncio.sleep(0.5)
        return False

    async def _wait_for_video_processed(self, page) -> bool:
        """Wait until the "重新上传" (re-upload) control appears.

        Its presence is used as the signal that the upload has finished.

        Returns:
            True when the control shows up within ``UPLOAD_TIMEOUT``
            seconds, False on timeout.
        """
        deadline = time.monotonic() + self.UPLOAD_TIMEOUT
        while time.monotonic() < deadline:
            try:
                number = await page.locator('[class^="long-card"] div:has-text("重新上传")').count()
                if number > 0:
                    logger.success("[抖音] 视频上传完毕")
                    return True
                logger.info("[抖音] 正在上传视频中...")
            except Exception:
                # A failed query is treated like "not finished yet": keep polling.
                pass
            await asyncio.sleep(self.POLL_INTERVAL)
        return False

    async def _click_publish_button(self, page, publish_label: str) -> Optional[str]:
        """Click the publish button, retrying and falling back as needed.

        The button is first looked up by its exact accessible name. If it
        never becomes visible or every click attempt fails, generic
        ``has-text`` selectors are tried instead. A visible but disabled
        button is reported as a failure right away, without any fallback.

        Returns:
            None when a click succeeded, otherwise the failure message to
            report to the caller.
        """
        try:
            publish_button = page.get_by_role('button', name=publish_label, exact=True)
            # Wait for the button to appear
            await publish_button.wait_for(state="visible", timeout=10000)
            if not await publish_button.is_enabled():
                logger.error("[抖音] 发布按钮不可点击,可能需要补充封面或确认信息")
                return "发布按钮不可点击,请检查封面/声明等必填项"
            await asyncio.sleep(1)  # extra wait so the button is interactive

            for attempt in range(self.MAX_CLICK_RETRIES):
                await self._dismiss_blocking_modal(page)
                try:
                    await publish_button.click(timeout=5000)
                    logger.info(f"[抖音] 点击了{publish_label}按钮")
                    return None
                except Exception as click_error:
                    logger.warning(f"[抖音] 点击发布按钮失败,重试 {attempt + 1}/{self.MAX_CLICK_RETRIES}: {click_error}")
                    try:
                        # Escape may close an overlay intercepting the click.
                        await page.keyboard.press("Escape")
                    except Exception:
                        pass
                    await asyncio.sleep(1)

            raise RuntimeError("点击发布按钮失败")
        except Exception as e:
            logger.error(f"[抖音] 点击发布按钮失败: {e}")

        # Fallback selectors
        for selector in ("button:has-text('发布')", "button:has-text('定时发布')"):
            try:
                await page.click(selector, timeout=5000)
                logger.info(f"[抖音] 使用备用选择器点击了按钮: {selector}")
                return None
            except Exception:
                continue

        return "无法点击发布按钮,请检查页面状态"

    async def upload(self, playwright: Playwright) -> dict:
        """Main upload logic with guaranteed resource cleanup.

        Args:
            playwright: Active Playwright instance used to launch Chromium.

        Returns:
            A dict with keys ``"success"``, ``"message"`` and ``"url"``
            (``"url"`` is always None). When the publish result cannot be
            determined before the detection timeout, ``"success"`` is True
            and the message asks the user to check the Douyin console.
            Unexpected exceptions are caught and reported as a failure.
        """
        browser = None
        context = None
        try:
            # Launch browser in headless mode for server deployment
            browser = await playwright.chromium.launch(headless=True)
            context = await browser.new_context(storage_state=self.account_file)
            context = await set_init_script(context)

            page = await context.new_page()

            # Go to upload page
            await page.goto(self.upload_url)
            await page.wait_for_load_state('domcontentloaded')
            await asyncio.sleep(2)

            logger.info(f"[抖音] 正在上传: {self.file_path.name}")

            # Check if redirected to login page (more reliable than text detection)
            current_url = page.url
            if "login" in current_url or "passport" in current_url:
                logger.error("[抖音] Cookie 已失效,被重定向到登录页")
                return self._make_result(False, "Cookie 已失效,请重新登录")

            # Ensure we're on the upload page
            if "content/upload" not in page.url:
                logger.info("[抖音] 当前不在上传页面,强制跳转...")
                await page.goto(self.upload_url)
                await asyncio.sleep(2)

            if not await self._attach_video_file(page):
                logger.error("[抖音] 所有选择器都失败,无法上传文件")
                return self._make_result(False, "无法找到上传按钮,页面可能已更新")

            # Wait for redirect to publish page (with timeout)
            if not await self._wait_for_publish_page(page):
                logger.error("[抖音] 等待发布页面超时")
                return self._make_result(False, "等待发布页面超时")

            # Fill title
            await asyncio.sleep(1)
            logger.info("[抖音] 正在填充标题和话题...")

            if not await self._fill_title(page, self.title):
                logger.warning("[抖音] 未找到作品描述输入框")

            # Add tags
            css_selector = ".zone-container"
            for tag in self.tags:
                await page.type(css_selector, "#" + tag)
                await page.press(css_selector, "Space")

            logger.info(f"[抖音] 总共添加 {len(self.tags)} 个话题")

            cover_selected = await self._select_cover_if_needed(page)
            if not cover_selected:
                logger.warning("[抖音] 未确认封面选择,可能影响发布")

            # Wait for upload to complete (with timeout)
            if not await self._wait_for_video_processed(page):
                logger.error("[抖音] 视频上传超时")
                return self._make_result(False, "视频上传超时")

            # Set scheduled publish time if needed. Truthiness treats both
            # None (the constructor default) and the legacy 0 as "publish
            # now"; comparing with `!= 0` would schedule on None.
            scheduled = bool(self.publish_date)
            if scheduled:
                await self.set_schedule_time(page, self.publish_date)

            # Click publish button
            publish_label = "定时发布" if scheduled else "发布"
            click_failure = await self._click_publish_button(page, publish_label)
            if click_failure is not None:
                return self._make_result(False, click_failure)

            await self._click_publish_confirm_modal(page)

            # Detect the publish outcome; on an inconclusive timeout, fall
            # back to checking the content-management page.
            publish_success, publish_reason, is_timeout = await self._wait_for_publish_result(
                page, self.PUBLISH_TIMEOUT
            )
            if not publish_success and is_timeout:
                verify_success, verify_reason = await self._verify_publish_in_manage(page)
                if verify_success:
                    publish_success = True
                    publish_reason = verify_reason
                else:
                    publish_reason = f"{publish_reason}; {verify_reason}"

            if publish_success:
                logger.success(f"[抖音] 发布成功: {publish_reason}")
            elif is_timeout:
                logger.warning("[抖音] 发布检测超时,但这不一定代表失败")
            else:
                logger.warning(f"[抖音] 发布未成功: {publish_reason}")

            # Save updated cookies
            await context.storage_state(path=self.account_file)
            logger.success("[抖音] Cookie 更新完毕")

            await asyncio.sleep(2)

            if publish_success:
                return self._make_result(True, "发布成功,待审核")
            if is_timeout:
                # Inconclusive: reported as success with a hint to verify manually.
                return self._make_result(True, "发布检测超时,请到抖音后台确认")
            return self._make_result(False, f"发布失败: {publish_reason}")

        except Exception as e:
            logger.exception(f"[抖音] 上传失败: {e}")
            return self._make_result(False, f"上传失败: {str(e)}")
        finally:
            # Always release browser resources; errors while closing are
            # ignored so they cannot mask the result being returned.
            if context:
                try:
                    await context.close()
                except Exception:
                    pass
            if browser:
                try:
                    await browser.close()
                except Exception:
                    pass

    async def main(self) -> Dict[str, Any]:
        """Execute upload inside a managed Playwright session."""
        async with async_playwright() as playwright:
            return await self.upload(playwright)
|