| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- # -*- coding: utf-8 -*-
- """
- 抖音视频发布器
- 参考: matrix/douyin_uploader/main.py
- """
- import asyncio
- import os
- from datetime import datetime
- from .base import BasePublisher, PublishParams, PublishResult
class DouyinPublisher(BasePublisher):
    """Publish videos to Douyin through the creator-center web UI.

    Drives the browser page held in ``self.page`` (Playwright-style
    locator/keyboard API).  Browser start-up, cookie parsing/injection and
    progress reporting come from ``BasePublisher``.
    """

    platform_name = "douyin"
    login_url = "https://creator.douyin.com/"
    publish_url = "https://creator.douyin.com/creator-micro/content/upload"
    cookie_domain = ".douyin.com"

    # URL the site moves to once a video file has been chosen.
    _POST_PAGE_URL = "https://creator.douyin.com/creator-micro/content/post/video*"
    # URL the site moves to after a successful publish.
    _MANAGE_URL = "https://creator.douyin.com/creator-micro/content/manage"

    # Retry budgets for the polling loops below.
    _POST_PAGE_ATTEMPTS = 60
    _UPLOAD_POLL_ATTEMPTS = 120
    _PUBLISH_ATTEMPTS = 30

    async def set_schedule_time(self, publish_date: datetime):
        """Switch the form to scheduled publishing and enter the publish time.

        Does nothing when no page is open.

        Args:
            publish_date: Moment the video should go live; typed into the
                date field formatted as ``YYYY-MM-DD HH:MM``.
        """
        if not self.page:
            return

        # Pick the "scheduled publish" radio option.
        await self.page.locator("label.radio-d4zkru:has-text('定时发布')").click()
        await asyncio.sleep(1)

        # Overwrite whatever the date field currently holds.
        publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M")
        await self.page.locator('.semi-input[placeholder="日期和时间"]').click()
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(publish_date_str)
        await self.page.keyboard.press("Enter")
        await asyncio.sleep(1)

    async def handle_upload_error(self, video_path: str):
        """Re-submit the video file after the page reports a failed upload.

        Does nothing when no page is open.

        Args:
            video_path: Path of the video file to hand to the re-upload input.
        """
        if not self.page:
            return

        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator(
            'div.progress-div [class^="upload-btn-input"]'
        ).set_input_files(video_path)

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Upload and publish one video to Douyin.

        Reads ``video_path``, ``title``, ``tags``, ``location`` and
        ``publish_date`` from *params*.

        Args:
            cookies: Serialized login cookies; decoded by ``parse_cookies``.
            params: Description of the video to publish.

        Returns:
            A successful ``PublishResult`` once the browser reaches the
            content-management page.

        Raises:
            Exception: If the video file does not exist, the page was not
                initialized, the site shows an error toast after clicking
                publish, or the content-management page is not reached
                within the retry budget (a full-page screenshot is saved
                first in that case).
        """
        print(f"\n{'=' * 60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'=' * 60}")

        # Fail fast: no point starting a browser for a file that is not there.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")
        print(
            f"[{self.platform_name}] 视频文件存在,大小: "
            f"{os.path.getsize(params.video_path)} bytes"
        )

        self.report_progress(5, "正在初始化浏览器...")
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")
        page = self.page

        self.report_progress(10, "正在打开上传页面...")
        await page.goto(self.publish_url)
        await page.wait_for_url(self.publish_url, timeout=30000)

        self.report_progress(15, "正在选择视频文件...")
        await self._choose_video_file(page, params.video_path)

        self.report_progress(20, "等待进入发布页面...")
        if not await self._wait_for_post_page(page):
            # Best effort: keep going, later steps will surface the problem.
            print(f"[{self.platform_name}] 未检测到发布页面跳转,继续尝试后续步骤")
        await asyncio.sleep(2)

        self.report_progress(30, "正在填充标题和话题...")
        await self._fill_title(page, params.title)
        await self._add_tags(page, params.tags)

        self.report_progress(40, "等待视频上传完成...")
        if not await self._wait_for_upload(page, params.video_path):
            print(f"[{self.platform_name}] 等待视频上传超时,继续尝试后续步骤")

        self.report_progress(60, "处理视频设置...")
        await self._dismiss_known_popup(page)
        await asyncio.sleep(2)
        await self._set_location(page, params.location)
        await self._enable_third_party_sync(page)

        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            await self.set_schedule_time(params.publish_date)

        self.report_progress(80, "正在发布...")
        print(f"[{self.platform_name}] 查找发布按钮...")
        return await self._click_publish(page)

    async def _choose_video_file(self, page, video_path):
        """Click the drag-and-drop area and feed *video_path* to the file chooser."""
        upload_div = page.locator("div[class*='container-drag']").first
        async with page.expect_file_chooser() as fc_info:
            await upload_div.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(video_path)

    async def _wait_for_post_page(self, page) -> bool:
        """Poll until the browser lands on the post-editing page; True if it did."""
        for _ in range(self._POST_PAGE_ATTEMPTS):
            try:
                await page.wait_for_url(self._POST_PAGE_URL, timeout=2000)
                return True
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that task
                # cancellation (a BaseException) still interrupts the loop.
                await asyncio.sleep(1)
        return False

    async def _fill_title(self, page, title):
        """Enter the title, preferring the dedicated input over the rich editor."""
        title_input = (
            page.get_by_text('作品标题')
            .locator("..")
            .locator("xpath=following-sibling::div[1]")
            .locator("input")
        )
        if await title_input.count():
            # The dedicated title input receives at most 30 characters.
            await title_input.fill(title[:30])
            return

        # Fallback: type the title into the ".notranslate" editor instead.
        await page.locator(".notranslate").click()
        await page.keyboard.press("Control+KeyA")
        await page.keyboard.press("Delete")
        await page.keyboard.type(title)
        await page.keyboard.press("Enter")

    async def _add_tags(self, page, tags):
        """Type each tag as ``#tag`` followed by a space into the editor zone."""
        if not tags:
            return
        css_selector = ".zone-container"
        for tag in tags:
            print(f"[{self.platform_name}] 添加话题: #{tag}")
            await page.type(css_selector, "#" + tag)
            await page.press(css_selector, "Space")

    async def _wait_for_upload(self, page, video_path) -> bool:
        """Poll until the page offers "re-upload", retrying failed uploads.

        Returns True once the upload finished, False when the retry budget
        ran out first.
        """
        for _ in range(self._UPLOAD_POLL_ATTEMPTS):
            try:
                # The "重新上传" (re-upload) control only shows up after the
                # upload has completed.
                if await page.locator("div").filter(has_text="重新上传").count() > 0:
                    print(f"[{self.platform_name}] 视频上传完毕")
                    return True

                if await page.locator(
                    'div.progress-div > div:has-text("上传失败")'
                ).count():
                    await self.handle_upload_error(video_path)
            except Exception as exc:
                print(f"[{self.platform_name}] 检查上传状态出错: {exc}")
            await asyncio.sleep(3)
        return False

    async def _dismiss_known_popup(self, page):
        """Close the "我知道了" (got it) dialog when it is present."""
        known_btn = page.get_by_role("button", name="我知道了")
        if await known_btn.count() > 0:
            await known_btn.first.click()

    async def _set_location(self, page, location):
        """Pick the first suggestion for *location*; failures are only logged."""
        if not location:
            # Nothing requested: do not open the picker, otherwise an
            # arbitrary first suggestion would get selected.
            return
        try:
            await page.locator(
                'div.semi-select span:has-text("输入地理位置")'
            ).click()
            await asyncio.sleep(1)
            await page.keyboard.press("Control+KeyA")
            await page.keyboard.press("Delete")
            await page.keyboard.type(location)
            await asyncio.sleep(1)
            await page.locator('div[role="listbox"] [role="option"]').first.click()
        except Exception as e:
            print(f"[{self.platform_name}] 设置位置失败: {e}")

    async def _enable_third_party_sync(self, page):
        """Turn on the Toutiao/Xigua sync switch if present and currently off."""
        third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch'
        try:
            if not await page.locator(third_part_element).count():
                return
            class_name = await page.eval_on_selector(
                third_part_element, 'div => div.className')
            if 'semi-switch-checked' not in class_name:
                await page.locator(third_part_element).locator(
                    'input.semi-switch-native-control').click()
        except Exception as exc:
            # Optional step: report the problem but do not abort the publish.
            print(f"[{self.platform_name}] 开启头条/西瓜同步失败: {exc}")

    async def _read_error_toast(self, page):
        """Return the text of a visible error toast/tip, or None if there is none."""
        try:
            error_toast = page.locator(
                '[class*="toast"][class*="error"], [class*="error-tip"]')
            if await error_toast.count() > 0:
                return await error_toast.first.text_content()
        except Exception as exc:
            print(f"[{self.platform_name}] 读取错误提示失败: {exc}")
        return None

    def _success_result(self, log_message) -> PublishResult:
        """Report 100% progress, log *log_message* and build the success result."""
        self.report_progress(100, "发布成功")
        print(f"[{self.platform_name}] {log_message}")
        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布成功"
        )

    async def _click_publish(self, page) -> PublishResult:
        """Click the publish button until the content-management page appears.

        Raises:
            Exception: When the site shows an error toast, or when the
                retry budget is exhausted (after saving a screenshot).
        """
        for attempt in range(1, self._PUBLISH_ATTEMPTS + 1):
            try:
                publish_btn = page.get_by_role('button', name="发布", exact=True)
                btn_count = await publish_btn.count()
                print(f"[{self.platform_name}] 发布按钮数量: {btn_count}")

                if btn_count > 0:
                    print(f"[{self.platform_name}] 点击发布按钮...")
                    await publish_btn.click()

                # Wait even when nothing was clicked this round: a click from
                # an earlier round may still be navigating.
                await page.wait_for_url(self._MANAGE_URL, timeout=5000)
                return self._success_result("发布成功! 已跳转到内容管理页面")
            except Exception:
                current_url = page.url
                print(
                    f"[{self.platform_name}] 尝试 {attempt}/{self._PUBLISH_ATTEMPTS}, "
                    f"当前URL: {current_url}"
                )

            # The exact-URL wait can miss a manage URL carrying extra
            # parameters, so fall back to a substring check.
            if "content/manage" in current_url:
                return self._success_result("发布成功! 已在内容管理页面")

            # Raised outside any try block so the failure actually propagates.
            error_text = await self._read_error_toast(page)
            if error_text:
                print(f"[{self.platform_name}] 检测到错误提示: {error_text}")
                raise Exception(f"发布失败: {error_text}")

            await asyncio.sleep(1)

        # Out of attempts: keep a screenshot for debugging, then fail.
        screenshot_path = f"debug_publish_timeout_{self.platform_name}.png"
        await page.screenshot(path=screenshot_path, full_page=True)
        print(f"[{self.platform_name}] 发布超时,截图保存到: {screenshot_path}")
        raise Exception(f"发布超时(截图: {screenshot_path})")
|