Files
ViGent2/backend/app/services/uploader/douyin_uploader.py
Kevin Wong 945262a7fc 更新
2026-02-06 16:02:58 +08:00

1542 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Douyin (抖音) uploader using Playwright
Based on social-auto-upload implementation
"""
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
import asyncio
import os
import re
import shutil
import time
from playwright.async_api import Playwright, async_playwright, TimeoutError as PlaywrightTimeoutError
from loguru import logger
from .base_uploader import BaseUploader
from .cookie_utils import set_init_script
from app.core.config import settings
class DouyinUploader(BaseUploader):
"""Douyin video uploader using Playwright"""
# Timeout configuration (seconds)
UPLOAD_TIMEOUT = 300 # video upload timeout
PUBLISH_TIMEOUT = 180 # publish-result detection timeout
PUBLISH_BUTTON_TIMEOUT = 60 # timeout waiting for the publish button to become clickable
POST_UPLOAD_STAGE_TIMEOUT = 60 # overall timeout from upload completion to publish result
PAGE_REDIRECT_TIMEOUT = 60 # page redirect timeout
POLL_INTERVAL = 2 # polling interval
MAX_CLICK_RETRIES = 3 # number of button-click retries
def __init__(
    self,
    title: str,
    file_path: str,
    tags: List[str],
    publish_date: Optional[datetime] = None,
    account_file: Optional[str] = None,
    description: str = "",
    user_id: Optional[str] = None,
):
    """Create an uploader for one video and reset its per-run state.

    The first six arguments are forwarded unchanged to ``BaseUploader``;
    ``user_id`` is stored on the instance.
    """
    super().__init__(
        title,
        file_path,
        tags,
        publish_date,
        account_file,
        description,
    )
    self.upload_url = "https://creator.douyin.com/creator-micro/content/upload"
    self.user_id = user_id

    # Flags and counters written by the network response listener.
    self._publish_api_error: Optional[str] = None
    self._video_upload_committed = False
    self._cover_generated = False
    self._cover_gen_count = 0

    # Temporary upload files registered for later cleanup.
    self._temp_upload_paths: List[Path] = []
def _resolve_headless_mode(self) -> str:
    """Return the configured headless mode, trimmed and lower-cased.

    Falls back to ``"headless-new"`` when the setting is empty.
    """
    configured = settings.DOUYIN_HEADLESS_MODE or ""
    normalized = configured.strip().lower()
    if normalized:
        return normalized
    return "headless-new"
def _build_launch_options(self) -> Dict[str, Any]:
    """Build the browser launch-options dictionary.

    The result always has ``headless`` and ``args``. ``executable_path`` is
    added when ``DOUYIN_CHROME_PATH`` points at an existing file; otherwise,
    if no path is configured, ``channel`` is added when
    ``DOUYIN_BROWSER_CHANNEL`` is set.
    """
    mode = self._resolve_headless_mode()
    run_headless = mode not in ("headful", "false", "0", "no")

    chromium_args = [
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-blink-features=AutomationControlled",
    ]
    if run_headless and mode in ("new", "headless-new", "headless_new"):
        chromium_args.append("--headless=new")
    # Software GL is used whenever headless, or when explicitly forced.
    if settings.DOUYIN_FORCE_SWIFTSHADER or run_headless:
        chromium_args += [
            "--enable-unsafe-swiftshader",
            "--use-gl=swiftshader",
        ]

    launch_options: Dict[str, Any] = {"headless": run_headless, "args": chromium_args}

    executable = (settings.DOUYIN_CHROME_PATH or "").strip()
    if not executable:
        # No explicit binary: optionally select a browser channel.
        browser_channel = (settings.DOUYIN_BROWSER_CHANNEL or "").strip()
        if browser_channel:
            launch_options["channel"] = browser_channel
    elif Path(executable).exists():
        launch_options["executable_path"] = executable
    else:
        logger.warning(f"[抖音] DOUYIN_CHROME_PATH 不存在: {executable}")
    return launch_options
def _debug_log_path(self) -> Path:
    """Return the network debug log path, creating its directory if needed."""
    three_levels_up = Path(__file__).parent.parent.parent
    screenshots_dir = three_levels_up / "debug_screenshots"
    screenshots_dir.mkdir(exist_ok=True)
    return screenshots_dir / "douyin_network.log"
def _debug_artifacts_enabled(self) -> bool:
    """Return True only when both ``DEBUG`` and ``DOUYIN_DEBUG_ARTIFACTS`` are truthy."""
    if not settings.DEBUG:
        return False
    return bool(settings.DOUYIN_DEBUG_ARTIFACTS)
def _record_video_enabled(self) -> bool:
    """Return True when debug artifacts are enabled and ``DOUYIN_RECORD_VIDEO`` is truthy."""
    if not self._debug_artifacts_enabled():
        return False
    return bool(settings.DOUYIN_RECORD_VIDEO)
def _append_debug_log(self, message: str) -> None:
    """Append a timestamped line to the debug log when debug artifacts are enabled.

    Best effort: any failure while writing is ignored.
    """
    if self._debug_artifacts_enabled():
        try:
            stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            with self._debug_log_path().open("a", encoding="utf-8") as log_file:
                log_file.write(f"[{stamp}] {message}\n")
        except Exception:
            # Logging must never break the upload flow.
            pass
def _video_record_dir(self) -> Path:
    """Return the directory for recorded session videos, creating it if needed."""
    three_levels_up = Path(__file__).parent.parent.parent
    videos_dir = three_levels_up / "debug_screenshots" / "videos"
    videos_dir.mkdir(parents=True, exist_ok=True)
    return videos_dir
async def _save_recorded_video(self, video, success: bool) -> Optional[Path]:
    """Persist the recorded session video and return its path.

    Args:
        video: Object exposing an awaitable ``save_as(path)``; may be falsy.
        success: Whether the publish run succeeded; selects the file-name
            suffix (``success`` / ``failed``).

    Returns:
        The saved file path, or ``None`` when recording is disabled, no
        video was supplied, saving failed, or a successful run's video was
        removed because ``DOUYIN_KEEP_SUCCESS_VIDEO`` is falsy.
    """
    if not self._record_video_enabled() or not video:
        return None
    try:
        timestamp = time.strftime("%Y%m%d_%H%M%S", time.localtime())
        status = "success" if success else "failed"
        file_path = self._video_record_dir() / f"douyin_{timestamp}_{status}.webm"
        await video.save_as(str(file_path))
        self._append_debug_log(f"[douyin][record] saved={file_path}")
        if success and not settings.DOUYIN_KEEP_SUCCESS_VIDEO:
            # ``missing_ok`` exists since Python 3.8 and this module already
            # needs 3.9+ (built-in generic annotations such as
            # ``tuple[bool, str]``), so no TypeError fallback is required.
            file_path.unlink(missing_ok=True)
            self._append_debug_log("[douyin][record] removed_success_video")
            return None
        return file_path
    except Exception as e:
        # Recording is a debugging aid; never let it fail the caller.
        logger.warning(f"[抖音] 保存录屏失败: {e}")
        return None
async def _save_debug_screenshot(self, page, name: str) -> None:
    """Save a full-page screenshot as ``douyin_<name>.png`` when debug artifacts are enabled.

    Failures are logged as warnings and not raised.
    """
    if self._debug_artifacts_enabled():
        try:
            target_dir = Path(__file__).parent.parent.parent / "debug_screenshots"
            target_dir.mkdir(exist_ok=True)
            image_path = target_dir / f"douyin_{name}.png"
            await page.screenshot(path=str(image_path), full_page=True)
        except Exception as e:
            logger.warning(f"[抖音] 保存截图失败: {e}")
def _publish_screenshot_dir(self) -> Path:
    """Return the per-user publish-screenshot directory, creating it if needed.

    The folder name is ``user_id`` with every character outside
    ``[A-Za-z0-9_-]`` replaced by ``_`` and cut to 64 characters; ``legacy``
    is used when no usable id remains.
    """
    raw_id = self.user_id or "legacy"
    sanitized = re.sub(r"[^A-Za-z0-9_-]", "_", raw_id)[:64]
    user_key = sanitized if sanitized else "legacy"
    directory = settings.PUBLISH_SCREENSHOT_DIR / user_key
    directory.mkdir(parents=True, exist_ok=True)
    return directory
async def _open_manage_review_tab(self, page) -> tuple[bool, str]:
    """Open the content-management page and click its "under review" tab.

    Returns:
        ``(True, selector)`` when a CSS-selector candidate was clicked,
        ``(True, "text")`` when the text-based fallback was clicked,
        ``(False, "review-tab-not-found")`` when nothing matched, or
        ``(False, "open-manage-error:<exception>")`` on an unexpected error.
    """
    manage_url = "https://creator.douyin.com/creator-micro/content/manage"
    try:
        # NOTE(review): the source lost its indentation; the load-state wait
        # and settle delay are assumed to belong to the navigation branch,
        # mirroring _open_upload_page.
        if "content/manage" not in page.url:
            await page.goto(manage_url)
            await page.wait_for_load_state("domcontentloaded")
            await asyncio.sleep(1.5)
        # Candidate tab elements for the "审核中" / "待审核" labels.
        review_selectors = [
            "button:has-text('审核中')",
            "div[role='tab']:has-text('审核中')",
            "span:has-text('审核中')",
            "button:has-text('待审核')",
            "div[role='tab']:has-text('待审核')",
            "span:has-text('待审核')",
        ]
        for selector in review_selectors:
            target = await self._first_visible_locator(page.locator(selector), timeout=1200)
            if not target:
                continue
            try:
                await target.scroll_into_view_if_needed(timeout=1000)
            except Exception:
                # Scrolling is best effort; still attempt the click.
                pass
            try:
                await target.click(timeout=2500)
            except Exception:
                # Fall back to a DOM-level click when the regular click fails.
                await target.evaluate("el => el.click()")
            await asyncio.sleep(1.0)
            self._append_debug_log(f"[douyin][manage] review_tab_clicked selector={selector}")
            return True, selector
        # Second pass: match the tab purely by its visible text.
        text_click_targets = [
            page.get_by_text("审核中", exact=False).first,
            page.get_by_text("待审核", exact=False).first,
        ]
        for target in text_click_targets:
            try:
                if not await target.is_visible(timeout=800):
                    continue
                try:
                    await target.scroll_into_view_if_needed(timeout=1000)
                except Exception:
                    pass
                try:
                    await target.click(timeout=2000)
                except Exception:
                    await target.evaluate("el => el.click()")
                await asyncio.sleep(1.0)
                self._append_debug_log("[douyin][manage] review_tab_clicked selector=text")
                return True, "text"
            except Exception:
                continue
        return False, "review-tab-not-found"
    except Exception as e:
        return False, f"open-manage-error:{e}"
async def _save_publish_success_screenshot(self, page) -> Optional[str]:
    """Screenshot the "under review" list after publishing and return its API path.

    Looks for the first 30 characters of the title in the review tab for up
    to 8 seconds, reloading and re-opening the tab between the two attempts.
    The screenshot is taken whether or not the title was found.

    Returns:
        ``/api/publish/screenshot/<filename>`` on success, ``None`` when the
        review tab could not be opened initially or any step raised.
    """

    async def title_shows_up(needle: str, seconds: float) -> bool:
        # Poll until the title text is visible or the window elapses.
        give_up_at = time.time() + seconds
        while time.time() < give_up_at:
            try:
                first_hit = page.get_by_text(needle, exact=False).first
                if await first_hit.is_visible(timeout=500):
                    return True
            except Exception:
                pass
            await asyncio.sleep(0.4)
        return False

    try:
        stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime())
        millis = int(time.time() * 1000) % 1000
        filename = f"douyin_success_review_{stamp}_{millis:03d}.png"
        file_path = self._publish_screenshot_dir() / filename

        opened, reason = await self._open_manage_review_tab(page)
        if not opened:
            self._append_debug_log(f"[douyin][manage] review_tab_open_failed reason={reason}")
            return None

        title_text = self.title[:30]
        title_found = False
        for _ in range(2):
            title_found = await title_shows_up(title_text, 8)
            if title_found:
                break
            # The review list updates with a delay; reload and look again.
            await page.reload(wait_until="domcontentloaded")
            await asyncio.sleep(1.0)
            opened, _ = await self._open_manage_review_tab(page)
            if not opened:
                break

        self._append_debug_log(
            f"[douyin][manage] review_screenshot title_found={title_found} title={title_text}"
        )
        await page.screenshot(path=str(file_path), full_page=False)
        return f"/api/publish/screenshot/{filename}"
    except Exception as e:
        logger.warning(f"[抖音] 保存发布成功截图失败: {e}")
        return None
def _track_temp_upload_path(self, path: Path) -> None:
if path not in self._temp_upload_paths:
self._temp_upload_paths.append(path)
def _prepare_upload_file(self) -> Path:
src = self.file_path
if src.suffix:
return src
parent_suffix = Path(src.parent.name).suffix
if not parent_suffix:
logger.warning(f"[抖音] 上传文件缺少扩展名,且无法从父目录推断后缀: {src}")
self._append_debug_log(f"[douyin][upload_file_prepare] no suffix source={src}")
return src
temp_dir = Path("/tmp/vigent_uploads")
temp_dir.mkdir(parents=True, exist_ok=True)
target = temp_dir / src.parent.name
try:
if target.exists():
target.unlink()
except Exception:
pass
try:
os.link(src, target)
except Exception:
shutil.copy2(src, target)
self._track_temp_upload_path(target)
logger.info(f"[抖音] 使用临时上传文件: {target}")
self._append_debug_log(f"[douyin][upload_file_prepare] source={src} prepared={target}")
return target
def _cleanup_upload_file(self) -> None:
if not self._temp_upload_paths:
return
paths = list(self._temp_upload_paths)
self._temp_upload_paths = []
for path in paths:
try:
if path.exists():
path.unlink()
except Exception as e:
logger.warning(f"[抖音] 清理临时上传文件失败: {e}")
def _should_log_request(self, request) -> bool:
try:
if request.resource_type not in ("xhr", "fetch"):
return False
if request.method not in ("POST", "PUT"):
return False
return True
except Exception:
return False
def _attach_debug_listeners(self, page) -> None:
    """Register console, page-error and network listeners on ``page``.

    All handlers swallow their own exceptions. Logging is gated on
    ``_debug_artifacts_enabled()``; the response handler additionally
    updates upload/publish signal attributes regardless of that gate.
    """
    def log_console(msg):
        # Log console messages of type "error" or "warning".
        try:
            if not self._debug_artifacts_enabled():
                return
            if msg.type in ("error", "warning"):
                text = f"[douyin][console] {msg.type}: {msg.text}"
                logger.warning(text)
                self._append_debug_log(text)
        except Exception:
            pass
    def log_page_error(err):
        # Log page errors reported through the "pageerror" event.
        try:
            if not self._debug_artifacts_enabled():
                return
            text = f"[douyin][pageerror] {err}"
            logger.warning(text)
            self._append_debug_log(text)
        except Exception:
            pass
    def log_request_failed(request):
        # Log failed requests that pass _should_log_request.
        try:
            if not self._debug_artifacts_enabled():
                return
            failure = request.failure
            error_text = failure.error_text if failure else "unknown"
            if self._should_log_request(request):
                text = f"[douyin][requestfailed] {request.method} {request.url} -> {error_text}"
                logger.warning(text)
                self._append_debug_log(text)
        except Exception:
            pass
    def log_request(request):
        # Record outgoing requests that pass _should_log_request (debug log only).
        try:
            if not self._debug_artifacts_enabled():
                return
            if self._should_log_request(request):
                text = f"[douyin][request] {request.method} {request.url}"
                self._append_debug_log(text)
        except Exception:
            pass
    def log_response(response):
        # Log selected responses and derive upload/publish signals from URLs.
        try:
            request = response.request
            if self._debug_artifacts_enabled() and (self._should_log_request(request) or response.status >= 400):
                text = f"[douyin][response] {response.status} {request.method} {response.url}"
                logger.warning(text)
                self._append_debug_log(text)
            if response.status < 400:
                url = response.url
                # A successful CommitUploadInner response marks the video upload as committed.
                if "vod.bytedanceapi.com/" in url and "Action=CommitUploadInner" in url:
                    self._video_upload_committed = True
                    self._append_debug_log("[douyin][upload_signal] commit_upload_inner_ok")
                # Each successful cover-generation response bumps the counter.
                if "/aweme/v1/cover/gen/post" in url:
                    self._cover_generated = True
                    self._cover_gen_count += 1
                    self._append_debug_log("[douyin][upload_signal] cover_gen_ok")
            else:
                url = response.url
                # An HTTP error from the create_v2 endpoint is stored as the publish error.
                if "/web/api/media/aweme/create_v2/" in url or "/aweme/create_v2/" in url:
                    self._publish_api_error = f"发布请求失败HTTP {response.status}"
                    self._append_debug_log(
                        f"[douyin][publish_api_error] status={response.status} endpoint=create_v2"
                    )
        except Exception:
            pass
    page.on("console", log_console)
    page.on("pageerror", log_page_error)
    page.on("requestfailed", log_request_failed)
    page.on("request", log_request)
    page.on("response", log_response)
async def _is_text_visible(self, page, text: str, exact: bool = False) -> bool:
try:
return await page.get_by_text(text, exact=exact).first.is_visible()
except Exception:
return False
async def _first_visible_locator(self, locator, timeout: int = 1000):
try:
if await locator.count() == 0:
return None
candidate = locator.first
if await candidate.is_visible(timeout=timeout):
return candidate
except Exception:
return None
return None
async def _open_upload_page(self, page) -> None:
    """Reach the upload page, preferring the "高清发布" entry on the creator home page.

    The home page is opened first and the first visible entry button is
    clicked. If the browser is still not on an upload URL afterwards
    (including when the home page failed to load), the upload URL is opened
    directly.
    """
    home_url = "https://creator.douyin.com/creator-micro/home"
    entry_selectors = (
        "button:has-text('高清发布')",
        "a:has-text('高清发布')",
        "div[role='button']:has-text('高清发布')",
    )
    entry_clicked = False
    try:
        await page.goto(home_url)
        await page.wait_for_load_state("domcontentloaded")
        await asyncio.sleep(1)
        for css in entry_selectors:
            try:
                entry = await self._first_visible_locator(page.locator(css), timeout=1200)
                if entry:
                    await entry.click()
                    logger.info(f"[抖音] 已点击上传入口: {css}")
                    entry_clicked = True
                    await asyncio.sleep(1.5)
                    break
            except Exception:
                continue
    except Exception as e:
        logger.warning(f"[抖音] 打开首页失败,将直接进入上传页: {e}")

    on_upload_page = "content/upload" in page.url
    if not on_upload_page:
        await page.goto(self.upload_url)
        await page.wait_for_load_state("domcontentloaded")
        await asyncio.sleep(1)
    elif entry_clicked:
        # Already redirected by the entry click; allow the page to settle.
        await asyncio.sleep(1)
async def _is_login_page(self, page) -> bool:
try:
current_url = page.url or ""
lower_url = current_url.lower()
if any(token in lower_url for token in ("passport", "login", "check_qrconnect", "sso")):
return True
text_hints = [
"扫码登录",
"验证码登录",
"密码登录",
"立即登录",
"登录后",
"抖音APP扫码登录",
"创作者登录",
"我想MCN机构",
]
for text in text_hints:
if await self._is_text_visible(page, text, exact=False):
return True
selector_hints = [
"input[placeholder*='手机号']",
"input[placeholder*='验证码']",
"button:has-text('立即登录')",
"button:has-text('登录')",
]
for selector in selector_hints:
try:
target = await self._first_visible_locator(page.locator(selector), timeout=600)
if target:
return True
except Exception:
continue
except Exception:
return False
return False
async def _handle_unfinished_draft(self, page) -> None:
if not await self._is_text_visible(page, "你还有上次未发布的视频", exact=False):
return
logger.info("[抖音] 检测到未发布草稿,尝试放弃后重新上传")
clicked = False
for selector in [
"button:has-text('放弃')",
"a:has-text('放弃')",
"span:has-text('放弃')",
]:
try:
target = await self._first_visible_locator(page.locator(selector), timeout=1200)
if not target:
continue
await target.click()
clicked = True
logger.info(f"[抖音] 已点击草稿放弃按钮: {selector}")
await asyncio.sleep(0.8)
break
except Exception:
continue
if not clicked:
return
for selector in [
"button:has-text('确认放弃')",
"button:has-text('确认')",
"button:has-text('确定')",
]:
try:
confirm_button = await self._first_visible_locator(page.locator(selector), timeout=800)
if confirm_button:
await confirm_button.click()
logger.info(f"[抖音] 已确认放弃草稿: {selector}")
await asyncio.sleep(0.6)
break
except Exception:
continue
async def _is_publish_form_ready(self, page) -> bool:
current_url = page.url
if "content/publish" in current_url or "content/post/video" in current_url:
return True
for text in ["基础信息", "作品描述", "发布设置", "重新上传"]:
if await self._is_text_visible(page, text, exact=False):
return True
for selector in [
"button:has-text('发布')",
"button:has-text('定时发布')",
]:
try:
button = await self._first_visible_locator(page.locator(selector), timeout=600)
if button:
return True
except Exception:
continue
return False
async def _wait_for_publish_form_ready(self, page, timeout: int = 60) -> bool:
start_time = time.time()
while time.time() - start_time < timeout:
if await self._is_publish_form_ready(page):
return True
await asyncio.sleep(0.5)
return False
async def _is_upload_completed(self, page) -> bool:
    """Decide from page texts and network signals whether the video upload finished.

    Combines the visibility of "重新上传" and the preview labels with the
    ``_video_upload_committed`` signal and the in-progress heuristics, and
    writes a debug-log tag naming the rule that matched.
    """
    in_progress = await self._is_upload_in_progress(page)
    reupload = await self._is_text_visible(page, "重新上传", exact=False)
    preview_video = await self._is_text_visible(page, "预览视频", exact=False)
    preview_cover = await self._is_text_visible(page, "预览封面/标题", exact=False)
    preview = preview_video or preview_cover
    committed = self._video_upload_committed

    if reupload:
        # "Re-upload" plus a preview plus the commit signal wins even when
        # progress texts linger, so the cover step is not delayed.
        if preview and committed:
            self._append_debug_log("[douyin][upload_ready] visual_done_reupload_preview_commit")
            return True
        if not in_progress:
            if preview:
                self._append_debug_log("[douyin][upload_ready] visual_done_reupload_preview")
            else:
                self._append_debug_log("[douyin][upload_ready] visual_done_reupload")
            return True

    if preview and committed and not in_progress:
        if self._cover_generated:
            self._append_debug_log("[douyin][upload_ready] visual_done_commit_and_cover")
        else:
            self._append_debug_log("[douyin][upload_ready] visual_done_commit")
        return True
    return False
async def _is_upload_in_progress(self, page) -> bool:
if await self._is_text_visible(page, "上传过程中请不要刷新", exact=False):
return True
if await self._is_text_visible(page, "取消上传", exact=False):
return True
progress_hints = ["已上传", "当前速度", "剩余时间", "上传中", "处理中", "转码中", "封面生成"]
hit = 0
for text in progress_hints:
if await self._is_text_visible(page, text, exact=False):
hit += 1
return hit >= 2
async def _wait_for_upload_completion(self, page, timeout: int) -> bool:
start_time = time.time()
while time.time() - start_time < timeout:
try:
if await self._is_upload_completed(page):
return True
elapsed = int(time.time() - start_time)
if await self._is_upload_in_progress(page):
logger.info(f"[抖音] 视频上传进行中...({elapsed}s)")
self._append_debug_log(f"[douyin][upload_wait] in_progress elapsed={elapsed}")
else:
self._append_debug_log(f"[douyin][upload_wait] waiting_signals elapsed={elapsed}")
except Exception:
pass
await asyncio.sleep(1)
return False
async def _find_publish_button(self, page, publish_label: str):
    """Return the first visible publish-button locator for ``publish_label``, or ``None``.

    Role-based lookups (exact, then loose) are tried before text-based CSS
    selectors. A candidate whose visibility check raises is skipped.
    """
    candidates = [
        page.get_by_role("button", name=publish_label, exact=True).first,
        page.get_by_role("button", name=publish_label, exact=False).first,
        page.locator(f"button:has-text('{publish_label}')").first,
    ]
    if publish_label != "发布":
        candidates.append(page.locator("button:has-text('定时发布')").first)
    else:
        candidates += [
            page.locator("button:has-text('发布')").first,
            page.locator("button:has-text('立即发布')").first,
        ]

    for candidate in candidates:
        try:
            visible = await candidate.is_visible(timeout=600)
        except Exception:
            continue
        if visible:
            return candidate
    return None
async def _click_publish_button(self, page, publish_label: str, timeout: Optional[float] = None) -> tuple[bool, str]:
    """Repeatedly try to click the publish button until it succeeds or time runs out.

    Args:
        page: Page object to operate on.
        publish_label: Button label handed to ``_find_publish_button``.
        timeout: Seconds to keep trying; defaults to ``PUBLISH_BUTTON_TIMEOUT``.

    Returns:
        ``(True, "")`` after a successful click, otherwise ``(False, reason)``
        where ``reason`` is the last click error or a fixed timeout message.
    """
    start_time = time.time()
    last_error = ""
    wait_timeout = timeout if timeout is not None else self.PUBLISH_BUTTON_TIMEOUT
    while time.time() - start_time < wait_timeout:
        # Clear any modal first so it cannot intercept the click.
        await self._dismiss_blocking_modal(page)
        button = await self._find_publish_button(page, publish_label)
        if button:
            try:
                if await button.is_enabled():
                    self._append_debug_log("[douyin][publish_click] try=primary")
                    await button.click(timeout=5000)
                    self._append_debug_log(f"[douyin][publish_click] clicked label={publish_label}")
                    logger.info(f"[抖音] 点击了{publish_label}按钮")
                    return True, ""
                self._append_debug_log(f"[douyin][publish_wait] disabled label={publish_label}")
            except Exception as e:
                last_error = str(e)
        # NOTE(review): the source lost its indentation; the fallback pass is
        # assumed to run on every loop iteration rather than only after a
        # primary-click failure — confirm against the original file.
        fallback_selectors = ["button:has-text('发布')", "button:has-text('定时发布')"]
        for selector in fallback_selectors:
            try:
                target = await self._first_visible_locator(page.locator(selector), timeout=500)
                if not target:
                    continue
                if not await target.is_enabled():
                    continue
                self._append_debug_log(f"[douyin][publish_click] try=fallback selector={selector}")
                await target.click(timeout=3000)
                self._append_debug_log(f"[douyin][publish_click] clicked selector={selector}")
                logger.info(f"[抖音] 使用备用选择器点击了按钮: {selector}")
                return True, ""
            except Exception as e:
                last_error = str(e)
                continue
        elapsed = int(time.time() - start_time)
        logger.info(f"[抖音] 等待{publish_label}按钮可点击...({elapsed}s)")
        self._append_debug_log(f"[douyin][publish_wait] label={publish_label} elapsed={elapsed}")
        await asyncio.sleep(1)
    return False, last_error or "发布按钮未进入可点击状态"
async def _set_input_file_and_log(self, locator, upload_path: Path) -> None:
    """Attach ``upload_path`` to a file input and log what the browser received.

    Logs the on-disk path, size and suffix, then reads back the name, size
    and type of the input element's first file from the DOM.
    """
    await locator.set_input_files(str(upload_path))

    byte_size = upload_path.stat().st_size
    disk_line = f"[douyin][upload_file] path={upload_path} size={byte_size} suffix={upload_path.suffix}"
    logger.info(disk_line)
    self._append_debug_log(disk_line)

    picked = await locator.evaluate(
        """
        (input) => {
            const file = input && input.files ? input.files[0] : null;
            if (!file) return null;
            return { name: file.name, size: file.size, type: file.type };
        }
        """
    )
    if not picked:
        dom_line = "[douyin][file_input] empty"
    else:
        dom_line = f"[douyin][file_input] name={picked.get('name')} size={picked.get('size')} type={picked.get('type')}"
    logger.info(dom_line)
    self._append_debug_log(dom_line)
async def _try_upload_via_file_chooser(self, page, upload_path: Path) -> bool:
    """Upload through the native file chooser opened by an "上传视频" trigger.

    Each trigger selector is tried in turn. Returns True once the publish
    form becomes ready within 20 s after the file is supplied, and False
    when every trigger fails or times out.
    """
    trigger_selectors = (
        "button:has-text('上传视频')",
        "span:has-text('上传视频')",
        "div:has-text('上传视频')",
    )
    for css in trigger_selectors:
        try:
            trigger = await self._first_visible_locator(page.locator(css), timeout=1200)
            if not trigger:
                continue
            self._append_debug_log(f"[douyin][upload_trigger] filechooser selector={css}")
            async with page.expect_file_chooser(timeout=5000) as pending_chooser:
                await trigger.click()
            chooser = await pending_chooser.value
            await chooser.set_files(str(upload_path))
            byte_size = upload_path.stat().st_size
            self._append_debug_log(
                f"[douyin][upload_file] path={upload_path} size={byte_size} suffix={upload_path.suffix} mode=filechooser"
            )
            form_ready = await self._wait_for_publish_form_ready(page, timeout=20)
            if form_ready:
                return True
        except Exception:
            # Playwright timeouts and every other failure move on to the next trigger.
            continue
    return False
async def _wait_for_publish_result(self, page, max_wait_time: int = 180):
success_texts = ["发布成功", "作品已发布", "再发一条", "查看作品", "审核中", "待审核"]
weak_texts = ["发布完成"]
failure_texts = [
"发布失败",
"发布异常",
"发布出错",
"请完善",
"请补充",
"请先上传",
"网络不佳",
"网络异常",
"网络开小差",
"请稍后重试",
]
start_time = time.time()
poll_interval = 2
weak_reason = None
while time.time() - start_time < max_wait_time:
if page.is_closed():
return False, "页面已关闭", False
if self._publish_api_error:
return False, self._publish_api_error, False
current_url = page.url
if "content/manage" in current_url:
return True, f"已跳转到管理页面 (URL: {current_url})", False
for text in success_texts:
if await self._is_text_visible(page, text, exact=False):
return True, f"检测到成功提示: {text}", False
for text in failure_texts:
if await self._is_text_visible(page, text, exact=False):
return False, f"检测到失败提示: {text}", False
for text in weak_texts:
if await self._is_text_visible(page, text, exact=False):
weak_reason = text
logger.info("[抖音] 视频正在发布中...")
await asyncio.sleep(poll_interval)
if weak_reason:
return False, f"检测到提示: {weak_reason}", True
return False, "发布检测超时", True
async def _is_cover_required(self, page) -> bool:
try:
if await page.get_by_text("设置封面", exact=False).count() == 0:
return False
# 老版页面通常会明确展示 "必填"
if await page.get_by_text("必填", exact=False).count() > 0:
self._append_debug_log("[douyin][cover] required_by_text")
return True
return False
except Exception:
return False
async def _confirm_cover_selection(self, page, wait_enabled_timeout: float = 6.0) -> bool:
confirm_selectors = [
"button:has-text('完成')",
"button:has-text('确定')",
"button:has-text('保存')",
"button:has-text('确认')",
]
for selector in confirm_selectors:
try:
button = await self._first_visible_locator(page.locator(selector), timeout=600)
if not button:
continue
deadline = time.time() + wait_enabled_timeout
while time.time() < deadline:
try:
if await button.is_enabled():
break
except Exception:
break
await asyncio.sleep(0.2)
try:
if not await button.is_enabled():
continue
except Exception:
continue
await button.click()
logger.info(f"[抖音] 封面已确认: {selector}")
self._append_debug_log(f"[douyin][cover] confirmed selector={selector}")
await asyncio.sleep(0.3)
return True
except Exception:
continue
return False
async def _switch_to_horizontal_cover(self, scopes, timeout: float = 5.0) -> tuple[bool, bool]:
selectors = [
"button:has-text('设置横封面')",
"div:has-text('设置横封面')",
"span:has-text('设置横封面')",
"button:has-text('横封面')",
"div[role='tab']:has-text('横封面')",
"span:has-text('横封面')",
"button:has-text('横版封面')",
"span:has-text('横版封面')",
]
available = False
deadline = time.time() + timeout
while time.time() < deadline:
for scope in scopes:
for selector in selectors:
try:
target = await self._first_visible_locator(scope.locator(selector), timeout=700)
if not target:
continue
available = True
try:
await target.scroll_into_view_if_needed(timeout=1000)
except Exception:
pass
try:
await target.click(timeout=2000)
except Exception:
await target.evaluate("el => el.click()")
logger.info(f"[抖音] 已切换到横封面设置: {selector}")
self._append_debug_log(f"[douyin][cover] switched_horizontal selector={selector}")
await asyncio.sleep(0.2)
return True, True
except Exception:
continue
await asyncio.sleep(0.25)
return False, available
async def _wait_for_cover_effect_pass(self, page, baseline_cover_count: int, timeout: float = 12.0) -> bool:
pass_texts = [
"封面效果检测通过",
"封面效果检测已通过",
"封面检测通过",
"检测通过",
]
deadline = time.time() + timeout
while time.time() < deadline:
if self._cover_gen_count > baseline_cover_count:
self._append_debug_log("[douyin][cover] effect_passed_by_cover_gen")
return True
for text in pass_texts:
if await self._is_text_visible(page, text, exact=False):
self._append_debug_log(f"[douyin][cover] effect_passed_by_text={text}")
return True
await asyncio.sleep(0.25)
self._append_debug_log("[douyin][cover] effect_check_timeout")
return False
async def _is_cover_configured_on_page(self, page) -> bool:
try:
if await page.get_by_text("设置封面", exact=False).count() == 0:
return False
has_horizontal_label = await self._is_text_visible(page, "横封面", exact=False)
has_vertical_label = await self._is_text_visible(page, "竖封面", exact=False)
if not has_horizontal_label and not has_vertical_label:
return False
selectors = [
"div:has-text('竖封面') img",
"div:has-text('横封面') img",
"div[class*='cover'] img",
]
for selector in selectors:
try:
locator = page.locator(selector)
count = await locator.count()
for idx in range(min(count, 6)):
try:
if await locator.nth(idx).is_visible(timeout=400):
self._append_debug_log(f"[douyin][cover] configured_visual selector={selector}")
return True
except Exception:
continue
except Exception:
continue
# 某些页面不会给图片节点明显 class兜底以标签+无“选择封面”判断
has_choose_cover = await page.get_by_text("选择封面", exact=False).count() > 0
if not has_choose_cover and has_horizontal_label and has_vertical_label:
self._append_debug_log("[douyin][cover] configured_by_labels_no_choose_button")
return True
except Exception:
return False
return False
async def _fill_title(self, page, title: str) -> bool:
    """Type the title (cut to 30 characters) into the first matching field.

    Known input/textarea placeholders are tried first; the first
    content-editable ``div`` is the last resort.

    Returns:
        True when some field was filled, otherwise False.
    """
    clipped = title[:30]
    field_selectors = (
        "input[placeholder*='填写作品标题']",
        "input[placeholder*='作品标题']",
        "textarea[placeholder*='填写作品标题']",
        "textarea[placeholder*='作品描述']",
        "input[placeholder*='作品描述']",
        "textarea[placeholder*='描述']",
        "input[placeholder*='描述']",
    )
    fields = [page.locator(css) for css in field_selectors]
    for field in fields:
        try:
            if await field.count() > 0:
                await field.first.fill(clipped)
                return True
        except Exception:
            continue

    try:
        rich_editor = page.locator("div[contenteditable='true']").first
        if await rich_editor.count() > 0:
            await rich_editor.click()
            # Select all existing content and delete it before typing.
            await page.keyboard.press("Control+KeyA")
            await page.keyboard.press("Delete")
            await page.keyboard.type(clipped)
            return True
    except Exception:
        pass
    return False
async def _fill_description_with_tags(self, page) -> bool:
tags = [tag.strip().lstrip("#") for tag in self.tags if tag and tag.strip()]
if not tags:
return True
tag_text = " ".join(f"#{tag}" for tag in tags)
locator_candidates = [
page.locator("textarea[placeholder*='添加作品简介']"),
page.locator("textarea[placeholder*='作品简介']"),
page.locator("textarea[placeholder*='简介']"),
page.locator("div[contenteditable='true'][data-placeholder*='添加作品简介']"),
page.locator("div[contenteditable='true'][placeholder*='添加作品简介']"),
page.locator(".zone-container"),
]
for locator in locator_candidates:
try:
if await locator.count() == 0:
continue
target = locator.first
await target.click()
try:
current_value = await target.input_value()
value = (current_value or "").strip()
merged = f"{value} {tag_text}".strip()
await target.fill(merged)
except Exception:
await page.keyboard.type(tag_text + " ")
logger.info(f"[抖音] 已添加话题: {tag_text}")
return True
except Exception:
continue
return False
async def _select_cover_if_needed(self, page, require_horizontal: bool = False) -> bool:
    """Open the cover picker, switch to the horizontal cover if offered, and confirm.

    Args:
        page: Page object to operate on.
        require_horizontal: When True, fail unless the horizontal-cover
            entry was switched to.

    Returns:
        True when a cover is considered configured/confirmed, False
        otherwise (including on any unexpected error).
    """
    try:
        started_at = time.time()
        # Snapshot of the cover-generation counter; a later increase is used
        # as a "cover check passed" signal.
        baseline_cover_count = self._cover_gen_count
        cover_button = page.get_by_text("选择封面", exact=False).first
        if not await cover_button.is_visible():
            # No "choose cover" entry: accept the page if a cover already appears set.
            if await self._is_cover_configured_on_page(page):
                self._append_debug_log("[douyin][cover] already_configured_no_choose_button")
                if require_horizontal:
                    await self._wait_for_cover_effect_pass(page, baseline_cover_count, timeout=3.0)
                return True
            self._append_debug_log("[douyin][cover] no_cover_button")
            return False
        await cover_button.click()
        logger.info("[抖音] 尝试选择封面")
        await asyncio.sleep(0.3)
        dialog = page.locator(
            "div.dy-creator-content-modal-wrap, div[role='dialog'], "
            "div[class*='modal'], div[class*='dialog']"
        ).last
        # Search inside the dialog when one exists, otherwise the whole page.
        scopes = [dialog] if await dialog.count() > 0 else [page]
        await self._dismiss_blocking_modal(page)
        switched, horizontal_available = await self._switch_to_horizontal_cover(scopes)
        if horizontal_available and not switched:
            # A popup may be covering the horizontal-cover entry; try once more.
            await self._dismiss_blocking_modal(page)
            switched, _ = await self._switch_to_horizontal_cover(scopes)
        if horizontal_available and not switched:
            if await self._is_cover_configured_on_page(page):
                self._append_debug_log("[douyin][cover] horizontal_missed_but_already_configured")
                await self._wait_for_cover_effect_pass(page, baseline_cover_count, timeout=3.0)
                return True
            self._append_debug_log("[douyin][cover] horizontal_switch_missed")
            return False
        if require_horizontal and not switched:
            self._append_debug_log("[douyin][cover] required_horizontal_not_switched")
            return False
        # Some older layouts pre-select a frame after switching to the
        # horizontal cover, so confirmation can happen immediately.
        if switched and await self._confirm_cover_selection(page, wait_enabled_timeout=1.5):
            if await dialog.count() > 0:
                try:
                    await dialog.wait_for(state="hidden", timeout=4000)
                except Exception:
                    pass
            if not await self._wait_for_cover_effect_pass(page, baseline_cover_count, timeout=12.0):
                return False
            elapsed = int(time.time() - started_at)
            self._append_debug_log(f"[douyin][cover] fast_confirm_after_switch elapsed={elapsed}")
            return True
        # Without a horizontal-cover entry, try confirming directly (other page layouts).
        if not require_horizontal and not horizontal_available and await self._confirm_cover_selection(page, wait_enabled_timeout=1.5):
            if await dialog.count() > 0:
                try:
                    await dialog.wait_for(state="hidden", timeout=4000)
                except Exception:
                    pass
            elapsed = int(time.time() - started_at)
            self._append_debug_log(f"[douyin][cover] fast_confirm elapsed={elapsed}")
            return True
        # Otherwise pick the first visible cover/frame candidate manually.
        selected = False
        for scope in scopes:
            for selector in [
                "div[class*='cover'] img",
                "div[class*='cover']",
                "div[class*='frame'] img",
                "div[class*='frame']",
                "div[class*='preset']",
            ]:
                try:
                    candidate = await self._first_visible_locator(scope.locator(selector))
                    if candidate:
                        await candidate.click()
                        logger.info("[抖音] 已选择封面帧")
                        selected = True
                        break
                except Exception:
                    continue
            if selected:
                break
        if not selected:
            self._append_debug_log("[douyin][cover] no_cover_candidate")
            return False
        confirmed = await self._confirm_cover_selection(page, wait_enabled_timeout=8.0)
        if confirmed and await dialog.count() > 0:
            try:
                await dialog.wait_for(state="hidden", timeout=5000)
            except Exception:
                pass
        # For horizontal covers, also wait for the cover check to pass.
        if confirmed and (switched or require_horizontal):
            if not await self._wait_for_cover_effect_pass(page, baseline_cover_count, timeout=12.0):
                return False
        elapsed = int(time.time() - started_at)
        self._append_debug_log(
            f"[douyin][cover] selected={selected} confirmed={confirmed} elapsed={elapsed}"
        )
        return confirmed
    except Exception as e:
        logger.warning(f"[抖音] 选择封面失败: {e}")
        return False
async def _click_publish_confirm_modal(self, page):
confirm_selectors = [
"button:has-text('确认发布')",
"button:has-text('继续发布')",
"button:has-text('确定发布')",
"button:has-text('发布确认')",
]
for selector in confirm_selectors:
try:
button = page.locator(selector).first
if await button.is_visible():
await button.click()
logger.info(f"[抖音] 点击了发布确认按钮: {selector}")
await asyncio.sleep(1)
return True
except Exception:
continue
return False
async def _dismiss_blocking_modal(self, page) -> bool:
modal_locator = page.locator(
"div.dy-creator-content-modal-wrap, div[role='dialog'], "
"div[class*='modal'], div[class*='dialog']"
)
try:
count = await modal_locator.count()
except Exception:
return False
if count == 0:
return False
button_texts = [
"我知道了",
"知道了",
"确定",
"继续",
"继续发布",
"确认",
"同意并继续",
"完成",
"暂不设置",
"好的",
"明白了",
]
close_selectors = [
"button[class*='close']",
"span[class*='close']",
"i[class*='close']",
]
for index in range(count):
modal = modal_locator.nth(index)
try:
if not await modal.is_visible():
continue
for text in button_texts:
try:
button = modal.get_by_role("button", name=text).first
if await button.is_visible():
await button.click()
logger.info(f"[抖音] 关闭弹窗: {text}")
await asyncio.sleep(0.5)
return True
except Exception:
continue
for selector in close_selectors:
try:
close_button = modal.locator(selector).first
if await close_button.is_visible():
await close_button.click()
logger.info("[抖音] 关闭弹窗: close")
await asyncio.sleep(0.5)
return True
except Exception:
continue
except Exception:
continue
return False
async def _verify_publish_in_manage(self, page):
try:
opened, reason = await self._open_manage_review_tab(page)
if not opened:
self._append_debug_log(f"[douyin][manage] verify_open_review_failed reason={reason}")
title_text = self.title[:30]
title_locator = page.get_by_text(title_text, exact=False).first
if await title_locator.is_visible():
return True, "审核中列表检测到新作品"
except Exception as e:
return False, f"无法验证内容管理: {e}"
return False, "审核中列表未找到视频"
async def set_schedule_time(self, page, publish_date):
    """Switch the form to scheduled publishing and type in the publish time.

    Best-effort: any exception raised along the way is logged as an error
    and swallowed, so the method always returns ``None``.

    Args:
        page: Playwright page showing the publish form.
        publish_date: Object exposing ``strftime``; formatted as
            ``YYYY-MM-DD HH:MM`` before being typed.
    """
    try:
        # Select the "定时发布" (scheduled publish) radio option.
        schedule_radio = page.locator("[class^='radio']:has-text('定时发布')")
        await schedule_radio.click()
        await asyncio.sleep(1)

        when_text = publish_date.strftime("%Y-%m-%d %H:%M")

        # Focus the datetime input, select its current content, then
        # overwrite it with the formatted time and confirm with Enter.
        datetime_input = page.locator('.semi-input[placeholder="日期和时间"]')
        await datetime_input.click()
        await page.keyboard.press("Control+KeyA")
        await page.keyboard.type(str(when_text))
        await page.keyboard.press("Enter")
        await asyncio.sleep(1)
    except Exception as e:
        logger.error(f"[抖音] 设置定时发布失败: {e}")
    else:
        logger.info(f"[抖音] 已设置定时发布: {when_text}")
async def upload(self, playwright: Playwright) -> dict:
    """Run the whole Douyin publish flow in a fresh Chromium context.

    Flow: launch the browser with the stored cookies, open the upload page,
    check that the session is still logged in, hand the video to the file
    chooser (retried once), wait for the publish form, fill title and
    description, optionally set a schedule, wait for the upload to finish,
    handle the cover, click publish, detect the result and persist cookies.

    Args:
        playwright: A started Playwright driver.

    Returns:
        A dict with ``success`` (bool), ``message`` (str) and ``url``
        (always ``None``). On success it also carries ``screenshot_url``.
        Exceptions raised inside the flow are logged and converted into a
        failure result instead of propagating.
    """
    browser = None
    context = None
    page = None
    recorded_video = None
    final_success = False
    try:
        launch_options = self._build_launch_options()
        browser = await playwright.chromium.launch(**launch_options)
        context_kwargs = {
            "storage_state": self.account_file,
            "viewport": {"width": 1920, "height": 1080},
            "user_agent": settings.DOUYIN_USER_AGENT,
            "locale": settings.DOUYIN_LOCALE,
            "timezone_id": settings.DOUYIN_TIMEZONE_ID,
        }
        if self._record_video_enabled():
            context_kwargs["record_video_dir"] = str(self._video_record_dir())
            context_kwargs["record_video_size"] = {
                "width": settings.DOUYIN_RECORD_VIDEO_WIDTH,
                "height": settings.DOUYIN_RECORD_VIDEO_HEIGHT,
            }
        context = await browser.new_context(
            **context_kwargs,
        )
        context = await set_init_script(context)
        page = await context.new_page()
        recorded_video = page.video
        self._attach_debug_listeners(page)
        await self._open_upload_page(page)
        upload_path = self._prepare_upload_file()
        logger.info(f"[抖音] 正在上传: {upload_path.name}")

        # An expired cookie shows up as a redirect to a login/passport URL
        # or as a login form rendered in place.
        current_url = page.url
        if "login" in current_url or "passport" in current_url or await self._is_login_page(page):
            logger.error("[抖音] Cookie 已失效,被重定向到登录页")
            self._append_debug_log(f"[douyin][auth] login_required url={current_url}")
            await self._save_debug_screenshot(page, "login_redirect")
            return {
                "success": False,
                "message": "Cookie 已失效,请重新登录",
                "url": None
            }

        await self._handle_unfinished_draft(page)
        self._video_upload_committed = False
        self._cover_generated = False
        upload_ok = await self._try_upload_via_file_chooser(page, upload_path)
        if not upload_ok:
            # First attempt failed: reopen the upload page and try once more.
            self._append_debug_log("[douyin][upload_flow] filechooser_not_triggered_first_try")
            logger.error("[抖音] 文件已选择但页面未进入发布态,按首页流程重试一次")
            await self._save_debug_screenshot(page, "upload_not_accepted_before_retry")
            await self._open_upload_page(page)
            await self._handle_unfinished_draft(page)
            self._video_upload_committed = False
            self._cover_generated = False
            upload_ok = await self._try_upload_via_file_chooser(page, upload_path)
            if not upload_ok:
                self._append_debug_log("[douyin][upload_flow] filechooser_not_triggered_second_try")
        if not upload_ok:
            if await self._is_login_page(page):
                logger.error("[抖音] 上传阶段检测到登录页Cookie 已失效")
                self._append_debug_log(f"[douyin][auth] login_required_after_upload_retry url={page.url}")
                await self._save_debug_screenshot(page, "login_redirect")
                return {
                    "success": False,
                    "message": "Cookie 已失效,请重新登录",
                    "url": None
                }
            logger.error("[抖音] 上传未生效,页面结构可能已变化")
            await self._save_debug_screenshot(page, "publish_page_timeout")
            return {
                "success": False,
                "message": "未触发上传视频文件选择弹窗,无法进入发布页面",
                "url": None
            }

        if not await self._wait_for_publish_form_ready(page, timeout=self.PAGE_REDIRECT_TIMEOUT):
            logger.error("[抖音] 等待发布页面超时")
            await self._save_debug_screenshot(page, "publish_page_timeout")
            return {
                "success": False,
                "message": "等待发布页面超时",
                "url": None
            }

        # Fill title and description/tags; a missing input is only a warning.
        await asyncio.sleep(1)
        logger.info("[抖音] 正在填充标题和话题...")
        if not await self._fill_title(page, self.title):
            logger.warning("[抖音] 未找到作品描述输入框")
        if not await self._fill_description_with_tags(page):
            logger.warning("[抖音] 未找到简介输入框,无法自动填充话题")

        # Schedule only when a publish date is actually set. Truthiness
        # treats both 0 and None as "publish immediately"; the previous
        # `!= 0` comparison classified None as a scheduled publish.
        is_scheduled = bool(self.publish_date)
        if is_scheduled:
            await self.set_schedule_time(page, self.publish_date)

        if not await self._wait_for_upload_completion(page, timeout=self.UPLOAD_TIMEOUT):
            logger.error("[抖音] 视频上传完成检测超时")
            await self._save_debug_screenshot(page, "upload_timeout")
            return {
                "success": False,
                "message": "视频上传未完成(超时),未执行发布",
                "url": None
            }

        # Everything from here to the publish click shares one time budget.
        post_upload_deadline = time.time() + self.POST_UPLOAD_STAGE_TIMEOUT
        self._append_debug_log(
            f"[douyin][post_upload] stage_timeout={self.POST_UPLOAD_STAGE_TIMEOUT}s"
        )

        cover_required = await self._is_cover_required(page)
        if cover_required:
            logger.info("[抖音] 当前页面封面为必填,开始设置封面")
            cover_selected = False
            for attempt in range(2):
                cover_selected = await self._select_cover_if_needed(page, require_horizontal=True)
                if cover_selected:
                    break
                await asyncio.sleep(1)
            if not cover_selected and await self._is_cover_configured_on_page(page):
                logger.warning("[抖音] 封面流程返回失败,但页面已显示封面配置,继续发布")
                self._append_debug_log("[douyin][cover] configured_fallback_continue")
                cover_selected = True
            if cover_selected:
                logger.info("[抖音] 封面设置完成")
                self._append_debug_log("[douyin][cover] selected")
            else:
                logger.error("[抖音] 封面设置未完成,停止发布")
                self._append_debug_log("[douyin][cover] not_selected")
                await self._save_debug_screenshot(page, "cover_not_selected")
                return {
                    "success": False,
                    "message": "封面为必填但未设置成功,已停止发布",
                    "url": None
                }
        else:
            logger.info("[抖音] 当前页面封面非必填,跳过封面设置")
            self._append_debug_log("[douyin][cover] optional_skip")

        logger.info("[抖音] 检测到上传完成等待2秒后再点击发布")
        self._append_debug_log("[douyin][upload_ready] wait_before_publish=2s")
        await asyncio.sleep(2)

        # Nothing has been submitted yet, so an exhausted budget can safely
        # be reported as a plain failure.
        remaining = post_upload_deadline - time.time()
        if remaining <= 0:
            return {
                "success": False,
                "message": "上传完成后发布阶段超时超过60秒",
                "url": None
            }

        # Click the publish button.
        publish_label = "定时发布" if is_scheduled else "发布"
        self._publish_api_error = None
        click_timeout = max(5, min(self.PUBLISH_BUTTON_TIMEOUT, remaining))
        clicked, reason = await self._click_publish_button(page, publish_label, timeout=click_timeout)
        if not clicked and not cover_required:
            # The button may be blocked by a missing cover even when the page
            # did not flag it as required: set one and retry the click.
            logger.warning("[抖音] 首次点击发布失败,尝试先设置封面后重试")
            self._append_debug_log("[douyin][cover] retry_select_before_publish")
            for _ in range(2):
                if await self._select_cover_if_needed(page, require_horizontal=False):
                    await asyncio.sleep(1)
                    remaining = post_upload_deadline - time.time()
                    if remaining <= 0:
                        break
                    click_timeout = max(5, min(self.PUBLISH_BUTTON_TIMEOUT, remaining))
                    clicked, reason = await self._click_publish_button(
                        page,
                        publish_label,
                        timeout=click_timeout,
                    )
                    if clicked:
                        self._append_debug_log("[douyin][cover] retry_click_success")
                        break
                await asyncio.sleep(1)
        if not clicked:
            logger.error(f"[抖音] 点击发布按钮失败: {reason}")
            await self._save_debug_screenshot(page, "publish_button_disabled")
            return {
                "success": False,
                "message": "发布按钮长时间不可点击或点击失败,请检查声明/权限/页面弹窗",
                "url": None
            }

        await self._click_publish_confirm_modal(page)

        # Detect the publish result. The click has already been issued, so
        # the outcome is always checked for at least 5 seconds even when the
        # stage budget has run out: bailing out here with a hard failure
        # could misreport a video that was in fact published and invite a
        # duplicate post on retry.
        remaining = post_upload_deadline - time.time()
        publish_wait_timeout = max(5, min(self.PUBLISH_TIMEOUT, int(remaining)))
        publish_success, publish_reason, is_timeout = await self._wait_for_publish_result(
            page,
            max_wait_time=publish_wait_timeout,
        )
        if not publish_success and is_timeout:
            # The manage-page lookup only enriches the message; it does not
            # flip the result to success.
            verify_success, verify_reason = await self._verify_publish_in_manage(page)
            if verify_success:
                publish_reason = f"{publish_reason}; 管理页检测到同名作品: {verify_reason}"
            else:
                publish_reason = f"{publish_reason}; {verify_reason}"

        if publish_success:
            logger.success(f"[抖音] 发布成功: {publish_reason}")
        elif is_timeout:
            logger.warning("[抖音] 发布检测超时,但这不一定代表失败")
        else:
            logger.warning(f"[抖音] 发布未成功: {publish_reason}")

        # Persist refreshed cookies. Best-effort: a failure to write them
        # must not override the publish outcome determined above.
        try:
            await context.storage_state(path=self.account_file)
            logger.success("[抖音] Cookie 更新完毕")
        except Exception as e:
            logger.warning(f"[抖音] Cookie 更新失败: {e}")

        if publish_success:
            final_success = True
            try:
                await page.wait_for_load_state("domcontentloaded", timeout=5000)
            except Exception:
                pass
            await asyncio.sleep(3)
            screenshot_url = await self._save_publish_success_screenshot(page)
            return {
                "success": True,
                "message": "发布成功,待审核",
                "url": None,
                "screenshot_url": screenshot_url,
            }
        if is_timeout:
            return {
                "success": False,
                "message": f"发布状态未知(检测超时),请到抖音后台确认: {publish_reason}",
                "url": None
            }
        return {
            "success": False,
            "message": f"发布失败: {publish_reason}",
            "url": None
        }
    except Exception as e:
        logger.exception(f"[抖音] 上传失败: {e}")
        return {
            "success": False,
            "message": f"上传失败: {str(e)}",
            "url": None
        }
    finally:
        # Release resources; every step is isolated so that one failure
        # cannot prevent the following ones from running.
        video_path = None
        if page:
            try:
                if not page.is_closed():
                    await page.close()
            except Exception:
                pass
            try:
                video_path = await self._save_recorded_video(recorded_video, final_success)
            except Exception as e:
                # Previously an error here skipped context/browser close.
                logger.warning(f"[抖音] 调试录屏保存失败: {e}")
        if video_path:
            logger.info(f"[抖音] 调试录屏已保存: {video_path}")
        if context:
            try:
                await context.close()
            except Exception:
                pass
        if browser:
            try:
                await browser.close()
            except Exception:
                pass
        self._cleanup_upload_file()
async def main(self) -> Dict[str, Any]:
    """Start a Playwright driver, run ``upload`` with it and return its result."""
    async with async_playwright() as driver:
        outcome = await self.upload(driver)
    return outcome