# -*- coding: utf-8 -*- """ 抖音视频发布器 参考: matrix/douyin_uploader/main.py """ import asyncio import os from datetime import datetime from .base import BasePublisher, PublishParams, PublishResult class DouyinPublisher(BasePublisher): """ 抖音视频发布器 使用 Playwright 自动化操作抖音创作者中心 """ platform_name = "douyin" login_url = "https://creator.douyin.com/" publish_url = "https://creator.douyin.com/creator-micro/content/upload" cookie_domain = ".douyin.com" async def set_schedule_time(self, publish_date: datetime): """设置定时发布""" if not self.page: return # 选择定时发布 label_element = self.page.locator("label.radio-d4zkru:has-text('定时发布')") await label_element.click() await asyncio.sleep(1) # 输入时间 publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M") await self.page.locator('.semi-input[placeholder="日期和时间"]').click() await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.type(str(publish_date_str)) await self.page.keyboard.press("Enter") await asyncio.sleep(1) async def handle_upload_error(self, video_path: str): """处理上传错误,重新上传""" if not self.page: return print(f"[{self.platform_name}] 视频出错了,重新上传中...") await self.page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(video_path) async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到抖音""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器 await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(10, "正在打开上传页面...") # 访问上传页面 await self.page.goto(self.publish_url) await self.page.wait_for_url(self.publish_url, timeout=30000) self.report_progress(15, "正在选择视频文件...") # 点击上传区域 upload_div = self.page.locator("div[class*='container-drag']").first async with self.page.expect_file_chooser() as fc_info: await upload_div.click() file_chooser = await fc_info.value await file_chooser.set_files(params.video_path) # 等待跳转到发布页面 self.report_progress(20, "等待进入发布页面...") for _ in range(60): try: await self.page.wait_for_url( "https://creator.douyin.com/creator-micro/content/post/video*", timeout=2000 ) break except: await asyncio.sleep(1) await asyncio.sleep(2) self.report_progress(30, "正在填充标题和话题...") # 填写标题 title_input = self.page.get_by_text('作品标题').locator("..").locator( "xpath=following-sibling::div[1]").locator("input") if await title_input.count(): await title_input.fill(params.title[:30]) else: # 备用方式 title_container = self.page.locator(".notranslate") await title_container.click() await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Delete") await self.page.keyboard.type(params.title) await self.page.keyboard.press("Enter") # 添加话题标签 if params.tags: css_selector = ".zone-container" for tag in params.tags: print(f"[{self.platform_name}] 添加话题: #{tag}") await self.page.type(css_selector, "#" + tag) await self.page.press(css_selector, "Space") self.report_progress(40, "等待视频上传完成...") # 等待视频上传完成 for _ in range(120): try: count = await self.page.locator("div").filter(has_text="重新上传").count() if count > 0: print(f"[{self.platform_name}] 视频上传完毕") break # 检查上传错误 if await self.page.locator('div.progress-div > div:has-text("上传失败")').count(): await self.handle_upload_error(params.video_path) await asyncio.sleep(3) except: await asyncio.sleep(3) self.report_progress(60, "处理视频设置...") # 关闭弹窗 known_btn = self.page.get_by_role("button", name="我知道了") if await known_btn.count() > 0: await known_btn.first.click() await asyncio.sleep(2) # 设置位置 try: await self.page.locator('div.semi-select span:has-text("输入地理位置")').click() await asyncio.sleep(1) await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Delete") await self.page.keyboard.type(params.location) await asyncio.sleep(1) await self.page.locator('div[role="listbox"] [role="option"]').first.click() except Exception as e: print(f"[{self.platform_name}] 设置位置失败: {e}") # 开启头条/西瓜同步 try: third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch' if await self.page.locator(third_part_element).count(): class_name = await self.page.eval_on_selector( third_part_element, 'div => div.className') if 'semi-switch-checked' not in class_name: await self.page.locator(third_part_element).locator( 'input.semi-switch-native-control').click() except: pass # 定时发布 if params.publish_date: self.report_progress(70, "设置定时发布...") await self.set_schedule_time(params.publish_date) self.report_progress(80, "正在发布...") print(f"[{self.platform_name}] 查找发布按钮...") # 点击发布 publish_clicked = False for i in range(30): try: publish_btn = self.page.get_by_role('button', name="发布", exact=True) btn_count = await publish_btn.count() print(f"[{self.platform_name}] 发布按钮数量: {btn_count}") if btn_count > 0: print(f"[{self.platform_name}] 点击发布按钮...") await publish_btn.click() publish_clicked = True await self.page.wait_for_url( "https://creator.douyin.com/creator-micro/content/manage", timeout=5000 ) self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 发布成功! 已跳转到内容管理页面") return PublishResult( success=True, platform=self.platform_name, message="发布成功" ) except Exception as e: current_url = self.page.url print(f"[{self.platform_name}] 尝试 {i+1}/30, 当前URL: {current_url}") if "content/manage" in current_url: self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 发布成功! 已在内容管理页面") return PublishResult( success=True, platform=self.platform_name, message="发布成功" ) # 检查是否有错误提示 try: error_toast = self.page.locator('[class*="toast"][class*="error"], [class*="error-tip"]') if await error_toast.count() > 0: error_text = await error_toast.first.text_content() if error_text: print(f"[{self.platform_name}] 检测到错误提示: {error_text}") raise Exception(f"发布失败: {error_text}") except: pass await asyncio.sleep(1) # 发布超时,保存截图 screenshot_path = f"debug_publish_timeout_{self.platform_name}.png" await self.page.screenshot(path=screenshot_path, full_page=True) print(f"[{self.platform_name}] 发布超时,截图保存到: {screenshot_path}") raise Exception(f"发布超时(截图: {screenshot_path})")