# -*- coding: utf-8 -*-
"""
Douyin video publisher.
Reference: matrix/douyin_uploader/main.py
"""
import asyncio
import json
import os
import re
import traceback
from datetime import datetime
from typing import List

from .base import (
    BasePublisher,
    PublishParams,
    PublishResult,
    WorkItem,
    WorksResult,
    CommentItem,
    CommentsResult,
)
class DouyinPublisher(BasePublisher):
    """Douyin video publisher.

    Automates the Douyin creator center with Playwright: uploading and
    publishing a video, listing published works and collecting comments.

    Browser/page management, cookie handling and progress reporting
    (``init_browser``, ``close_browser``, ``parse_cookies``, ``set_cookies``,
    ``report_progress``, ``page``, ``headless``) are not defined in this class
    and are expected to be provided by ``BasePublisher``.
    """

    platform_name = "douyin"
    login_url = "https://creator.douyin.com/"
    publish_url = "https://creator.douyin.com/creator-micro/content/upload"
    cookie_domain = ".douyin.com"

    # URLs the publish flow waits for.
    _POST_PAGE_URL = "https://creator.douyin.com/creator-micro/content/post/video*"
    _MANAGE_PAGE_URL = "https://creator.douyin.com/creator-micro/content/manage"

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _format_timestamp(timestamp) -> str:
        """Format a Unix timestamp as local ``YYYY-MM-DD HH:MM:SS`` ('' if falsy)."""
        if not timestamp:
            return ''
        return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def _first_url(image) -> str:
        """Return the first entry of ``image['url_list']``, or '' when unavailable."""
        if not image:
            return ''
        url_list = image.get('url_list') or ['']
        return url_list[0]

    def _ensure_logged_in(self) -> None:
        """Raise when the current page URL looks like a login/passport redirect."""
        current_url = self.page.url
        if "login" in current_url or "passport" in current_url:
            raise Exception("Cookie 已过期,请重新登录")

    async def _open_session(self, cookies: str) -> None:
        """Start the browser, install the cookies and make sure a page exists."""
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

    def _to_comment_item(self, raw: dict, work_id: str, **extra) -> CommentItem:
        """Convert one raw comment/reply dict from the API into a ``CommentItem``."""
        author = raw.get('user') or {}
        return CommentItem(
            comment_id=str(raw.get('cid', '')),
            work_id=work_id,
            content=raw.get('text', ''),
            author_id=str(author.get('uid', '')),
            author_name=author.get('nickname', ''),
            author_avatar=self._first_url(author.get('avatar_thumb')),
            like_count=int(raw.get('digg_count') or 0),
            create_time=self._format_timestamp(raw.get('create_time')),
            is_author=raw.get('is_author', False),
            **extra
        )

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------

    async def set_schedule_time(self, publish_date: datetime):
        """Switch the post to scheduled publishing and enter the publish time.

        Args:
            publish_date: Moment the video should go live; typed into the
                form as ``YYYY-MM-DD HH:MM``.

        Does nothing when no page is open.
        """
        if not self.page:
            return

        # Pick the "scheduled publish" radio option.
        await self.page.locator("label.radio-d4zkru:has-text('定时发布')").click()
        await asyncio.sleep(1)

        # Replace whatever is in the date/time input with the requested time.
        publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M")
        await self.page.locator('.semi-input[placeholder="日期和时间"]').click()
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(publish_date_str)
        await self.page.keyboard.press("Enter")
        await asyncio.sleep(1)

    async def handle_upload_error(self, video_path: str):
        """Re-submit the video file after the page reported an upload failure.

        Args:
            video_path: Path of the video file to upload again.

        Does nothing when no page is open.
        """
        if not self.page:
            return

        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator(
            'div.progress-div [class^="upload-btn-input"]'
        ).set_input_files(video_path)

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Upload a video to Douyin and publish it.

        Args:
            cookies: Login cookies, in whatever format ``parse_cookies`` accepts.
            params: Publish parameters. This method reads ``video_path``,
                ``title``, ``tags``, ``location`` and ``publish_date``.

        Returns:
            A successful ``PublishResult`` once the browser reaches the
            content-management page.

        Raises:
            Exception: If the video file is missing, no page could be created,
                the site shows a publish error, or publishing times out (a
                full-page screenshot is saved in that last case).
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        # Fail fast: no point launching a browser for a file that is not there.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")
        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(5, "正在初始化浏览器...")
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        self.report_progress(10, "正在打开上传页面...")
        await self.page.goto(self.publish_url)
        await self.page.wait_for_url(self.publish_url, timeout=30000)

        self.report_progress(15, "正在选择视频文件...")
        await self._choose_video_file(params.video_path)

        self.report_progress(20, "等待进入发布页面...")
        await self._wait_for_post_page()
        await asyncio.sleep(2)

        self.report_progress(30, "正在填充标题和话题...")
        await self._fill_title_and_tags(params)

        self.report_progress(40, "等待视频上传完成...")
        await self._wait_for_upload(params.video_path)

        self.report_progress(60, "处理视频设置...")
        await self._dismiss_known_popup()
        await asyncio.sleep(2)
        await self._set_location(getattr(params, 'location', None))
        await self._enable_third_party_sync()

        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            await self.set_schedule_time(params.publish_date)

        self.report_progress(80, "正在发布...")
        return await self._submit_and_confirm()

    async def _choose_video_file(self, video_path: str) -> None:
        """Click the drag-and-drop area and hand the video to the file chooser."""
        upload_div = self.page.locator("div[class*='container-drag']").first
        async with self.page.expect_file_chooser() as fc_info:
            await upload_div.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(video_path)

    async def _wait_for_post_page(self) -> None:
        """Wait (best effort, 60 attempts) for the redirect to the post-editing page."""
        for _ in range(60):
            try:
                await self.page.wait_for_url(self._POST_PAGE_URL, timeout=2000)
                return
            except Exception:
                # Not there yet (typically a Playwright timeout); try again.
                await asyncio.sleep(1)

    async def _fill_title_and_tags(self, params: PublishParams) -> None:
        """Fill in the title, then append every tag as a ``#topic``."""
        title_input = self.page.get_by_text('作品标题').locator("..").locator(
            "xpath=following-sibling::div[1]").locator("input")
        if await title_input.count():
            # The dedicated title input only receives the first 30 characters.
            await title_input.fill(params.title[:30])
        else:
            # Fallback: type the title into the rich-text editor instead.
            await self.page.locator(".notranslate").click()
            await self.page.keyboard.press("Control+KeyA")
            await self.page.keyboard.press("Delete")
            await self.page.keyboard.type(params.title)
            await self.page.keyboard.press("Enter")

        css_selector = ".zone-container"
        for tag in params.tags or []:
            print(f"[{self.platform_name}] 添加话题: #{tag}")
            await self.page.type(css_selector, "#" + tag)
            await self.page.press(css_selector, "Space")

    async def _wait_for_upload(self, video_path: str) -> None:
        """Poll until the "re-upload" control appears, re-submitting failed uploads."""
        for _ in range(120):
            try:
                if await self.page.locator("div").filter(has_text="重新上传").count() > 0:
                    print(f"[{self.platform_name}] 视频上传完毕")
                    return

                if await self.page.locator('div.progress-div > div:has-text("上传失败")').count():
                    await self.handle_upload_error(video_path)
            except Exception:
                # Transient page errors while polling are ignored on purpose;
                # the next iteration simply checks again.
                pass
            await asyncio.sleep(3)

    async def _dismiss_known_popup(self) -> None:
        """Close the "我知道了" (got it) popup when it is present."""
        known_btn = self.page.get_by_role("button", name="我知道了")
        if await known_btn.count() > 0:
            await known_btn.first.click()

    async def _set_location(self, location) -> None:
        """Pick the first location suggestion for ``location`` (best effort)."""
        if not location:
            # Without a query the dropdown would offer an arbitrary first
            # suggestion, so leave the location untouched.
            return
        try:
            await self.page.locator('div.semi-select span:has-text("输入地理位置")').click()
            await asyncio.sleep(1)
            await self.page.keyboard.press("Control+KeyA")
            await self.page.keyboard.press("Delete")
            await self.page.keyboard.type(location)
            await asyncio.sleep(1)
            await self.page.locator('div[role="listbox"] [role="option"]').first.click()
        except Exception as e:
            print(f"[{self.platform_name}] 设置位置失败: {e}")

    async def _enable_third_party_sync(self) -> None:
        """Turn on the Toutiao/Xigua sync switch when it exists and is off (best effort)."""
        third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch'
        try:
            if await self.page.locator(third_part_element).count():
                class_name = await self.page.eval_on_selector(
                    third_part_element, 'div => div.className')
                if 'semi-switch-checked' not in class_name:
                    await self.page.locator(third_part_element).locator(
                        'input.semi-switch-native-control').click()
        except Exception as e:
            # Optional step: report the problem but never fail the publish.
            print(f"[{self.platform_name}] 开启头条/西瓜同步失败: {e}")

    async def _read_publish_error(self) -> str:
        """Return the text of a visible error toast/tip, or '' when there is none."""
        try:
            error_toast = self.page.locator(
                '[class*="toast"][class*="error"], [class*="error-tip"]')
            if await error_toast.count() > 0 and await error_toast.first.is_visible():
                return (await error_toast.first.text_content() or '').strip()
        except Exception:
            # Probing for an error message must not abort the retry loop.
            pass
        return ''

    async def _submit_and_confirm(self) -> PublishResult:
        """Click the publish button until the content-management page is reached."""
        print(f"[{self.platform_name}] 查找发布按钮...")

        for attempt in range(30):
            try:
                publish_btn = self.page.get_by_role('button', name="发布", exact=True)
                btn_count = await publish_btn.count()
                print(f"[{self.platform_name}] 发布按钮数量: {btn_count}")

                if btn_count > 0:
                    print(f"[{self.platform_name}] 点击发布按钮...")
                    await publish_btn.click()

                await self.page.wait_for_url(self._MANAGE_PAGE_URL, timeout=5000)
                self.report_progress(100, "发布成功")
                print(f"[{self.platform_name}] 发布成功! 已跳转到内容管理页面")
                return PublishResult(
                    success=True,
                    platform=self.platform_name,
                    message="发布成功"
                )
            except Exception:
                current_url = self.page.url
                print(f"[{self.platform_name}] 尝试 {attempt+1}/30, 当前URL: {current_url}")

                if "content/manage" in current_url:
                    self.report_progress(100, "发布成功")
                    print(f"[{self.platform_name}] 发布成功! 已在内容管理页面")
                    return PublishResult(
                        success=True,
                        platform=self.platform_name,
                        message="发布成功"
                    )

                # A visible error message from the site is final: surface it
                # instead of retrying until the timeout.
                error_text = await self._read_publish_error()
                if error_text:
                    print(f"[{self.platform_name}] 检测到错误提示: {error_text}")
                    raise Exception(f"发布失败: {error_text}")

                await asyncio.sleep(1)

        # Timed out: keep a screenshot for debugging.
        screenshot_path = f"debug_publish_timeout_{self.platform_name}.png"
        await self.page.screenshot(path=screenshot_path, full_page=True)
        print(f"[{self.platform_name}] 发布超时,截图保存到: {screenshot_path}")
        raise Exception(f"发布超时(截图: {screenshot_path})")

    # ------------------------------------------------------------------
    # Works
    # ------------------------------------------------------------------

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Fetch one page of the account's works through the creator-center API.

        Args:
            cookies: Login cookies, in whatever format ``parse_cookies`` accepts.
            page: Zero-based page index.
            page_size: Number of works requested per page.

        Returns:
            A ``WorksResult``; ``success`` is False and ``error`` carries the
            message when anything fails (errors are not raised). ``total`` is
            the number of works parsed from this page.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        has_more = False

        try:
            await self._open_session(cookies)

            # Visit the creator-center home page first so the login state is validated.
            await self.page.goto("https://creator.douyin.com/creator-micro/home")
            await asyncio.sleep(3)
            self._ensure_logged_in()

            # NOTE(review): max_cursor is computed as an offset (page * page_size);
            # confirm the endpoint really accepts offsets here.
            cursor = page * page_size
            api_url = f"https://creator.douyin.com/janus/douyin/creator/pc/work_list?scene=star_atlas&device_platform=android&count={page_size}&max_cursor={cursor}&cookie_enabled=true&browser_language=zh-CN&browser_platform=Win32&browser_name=Mozilla&browser_online=true&timezone_name=Asia%2FShanghai&aid=1128"

            # The URL is passed as an argument rather than spliced into the
            # JavaScript source, so it can never be interpreted as code.
            response = await self.page.evaluate(
                '''
                async (url) => {
                    const resp = await fetch(url, {
                        credentials: 'include',
                        headers: { 'Accept': 'application/json' }
                    });
                    return await resp.json();
                }
                ''',
                api_url,
            )

            aweme_list = response.get('aweme_list') or []
            has_more = bool(response.get('has_more', False))
            print(f"[{self.platform_name}] API 响应: has_more={response.get('has_more')}, aweme_list={len(aweme_list)}")

            for aweme in aweme_list:
                aweme_id = str(aweme.get('aweme_id') or '')
                if not aweme_id:
                    continue

                statistics = aweme.get('statistics') or {}
                video = aweme.get('video') or {}

                # Cover: prefer the top-level "Cover", fall back to the video cover.
                cover_url = self._first_url(aweme.get('Cover')) or self._first_url(video.get('cover'))

                # Title: explicit title, else the first line of the description.
                title = (
                    aweme.get('item_title', '')
                    or (aweme.get('desc') or '').split('\n')[0][:50]
                    or '无标题'
                )

                works.append(WorkItem(
                    work_id=aweme_id,
                    title=title,
                    cover_url=cover_url,
                    # The API value is divided by 1000 (milliseconds -> seconds).
                    duration=(video.get('duration') or 0) // 1000,
                    status='published',
                    publish_time=self._format_timestamp(aweme.get('create_time', 0)),
                    play_count=int(statistics.get('play_count') or 0),
                    like_count=int(statistics.get('digg_count') or 0),
                    comment_count=int(statistics.get('comment_count') or 0),
                    share_count=int(statistics.get('share_count') or 0),
                ))

            print(f"[{self.platform_name}] 获取到 {len(works)} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            # Release the browser on every path, like the comment getters do.
            await self.close_browser()

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=len(works),
            has_more=has_more
        )

    # ------------------------------------------------------------------
    # Comments
    # ------------------------------------------------------------------

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Fetch comments of one work by intercepting the video page's comment API.

        Args:
            cookies: Login cookies, in whatever format ``parse_cookies`` accepts.
            work_id: ID of the work whose detail page is opened.
            cursor: Only logged at the moment; it does not influence which
                comments are returned.

        Returns:
            A ``CommentsResult``; ``success`` is False and ``error`` carries the
            message when anything fails (errors are not raised). On success the
            API's next cursor is additionally stored under ``cursor`` in the
            result's ``__dict__``.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}, cursor={cursor}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False
        next_cursor = ""
        captured_data = {}

        try:
            await self._open_session(cookies)

            async def handle_response(response):
                """Keep the latest successful comment-list response for this work."""
                nonlocal captured_data
                url = response.url
                # The video page uses /aweme/v1/web/comment/list/ or /comment/list/.
                if '/comment/list' not in url or not ('aweme_id' in url or work_id in url):
                    return
                try:
                    json_data = await response.json()
                    print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)
                    if json_data.get('status_code') == 0 or json_data.get('comments'):
                        captured_data = json_data
                        comment_count = len(json_data.get('comments') or [])
                        print(f"[{self.platform_name}] 评论 API 响应成功: comments={comment_count}, has_more={json_data.get('has_more')}", flush=True)
                except Exception as e:
                    print(f"[{self.platform_name}] 解析评论响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册评论 API 响应监听器", flush=True)

            # Opening the detail page triggers the comment API request by itself.
            video_url = f"https://www.douyin.com/video/{work_id}"
            print(f"[{self.platform_name}] 访问视频详情页: {video_url}", flush=True)
            await self.page.goto(video_url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            self._ensure_logged_in()

            if not captured_data:
                print(f"[{self.platform_name}] 等待评论 API 响应...", flush=True)
                # Scrolling may trigger lazy loading of the comment section.
                await self.page.evaluate('window.scrollBy(0, 300)')
                await asyncio.sleep(3)

            if not captured_data:
                # Give the API a little more time.
                await asyncio.sleep(3)

            self.page.remove_listener('response', handle_response)

            if captured_data:
                comment_list = captured_data.get('comments') or []
                has_more = bool(captured_data.get('has_more'))
                next_cursor = str(captured_data.get('cursor', ''))
                total = captured_data.get('total', 0) or len(comment_list)

                print(f"[{self.platform_name}] 解析评论: total={total}, has_more={has_more}, comments={len(comment_list)}", flush=True)

                for comment in comment_list:
                    if not str(comment.get('cid', '')):
                        continue

                    replies = [
                        self._to_comment_item(reply, work_id)
                        for reply in comment.get('reply_comment') or []
                    ]
                    comments.append(self._to_comment_item(
                        comment,
                        work_id,
                        reply_count=int(comment.get('reply_comment_total') or 0),
                        replies=replies
                    ))

                print(f"[{self.platform_name}] 解析到 {len(comments)} 条评论", flush=True)
            else:
                print(f"[{self.platform_name}] 未捕获到评论 API 响应", flush=True)

        except Exception as e:
            traceback.print_exc()
            return CommentsResult(
                success=False,
                platform=self.platform_name,
                work_id=work_id,
                error=str(e)
            )
        finally:
            await self.close_browser()

        result = CommentsResult(
            success=True,
            platform=self.platform_name,
            work_id=work_id,
            comments=comments,
            total=total,
            has_more=has_more
        )
        # Written straight into __dict__ exactly as before; presumably
        # CommentsResult declares no ``cursor`` field -- TODO confirm.
        result.__dict__['cursor'] = next_cursor
        return result

    async def get_all_comments(self, cookies: str) -> dict:
        """Collect comments for the account's works via the comment-management page.

        Comment-list and work-list API responses are intercepted while the
        page is loaded, up to ten works are clicked through, and the page is
        scrolled; captured comments are then grouped by work.

        Args:
            cookies: Login cookies, in whatever format ``parse_cookies`` accepts.

        Returns:
            A dict with ``success`` and ``platform``; on success also
            ``work_comments`` (one entry per work) and ``total`` (number of
            works); on failure ``error`` and an empty ``work_comments``.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取所有作品评论")
        print(f"{'='*60}")

        all_work_comments = []
        captured_comments = []
        captured_works = {}  # work_id -> work_info

        try:
            await self._open_session(cookies)

            async def handle_response(response):
                """Record comments and work metadata from matching API responses."""
                url = response.url
                try:
                    # Comment list APIs: /comment/list/select/, /comment/read,
                    # /creator/comment/list ...
                    if '/comment/list' in url or '/comment/read' in url or 'comment_list' in url:
                        json_data = await response.json()
                        print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)

                        # Two known payload shapes: "comments" or "comment_info_list".
                        comments = json_data.get('comments') or json_data.get('comment_info_list') or []

                        if comments:
                            aweme_id_match = re.search(r'aweme_id=(\d+)', url)
                            aweme_id = aweme_id_match.group(1) if aweme_id_match else ''

                            for comment in comments:
                                # Tag the comment with the work it belongs to.
                                if aweme_id and 'aweme_id' not in comment:
                                    comment['aweme_id'] = aweme_id
                                captured_comments.append(comment)

                            print(f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (aweme_id={aweme_id}),总计: {len(captured_comments)}", flush=True)

                    # Work list APIs.
                    if '/work_list' in url or '/item/list' in url or '/creator/item' in url:
                        json_data = await response.json()
                        aweme_list = (
                            json_data.get('aweme_list')
                            or json_data.get('item_info_list')
                            or json_data.get('item_list')
                            or []
                        )
                        print(f"[{self.platform_name}] 捕获到作品列表 API: {len(aweme_list)} 个作品", flush=True)
                        for aweme in aweme_list:
                            aweme_id = str(aweme.get('aweme_id', '') or aweme.get('item_id', '') or aweme.get('item_id_plain', ''))
                            if not aweme_id:
                                continue

                            video = aweme.get('video') or {}
                            cover_url = (
                                self._first_url(aweme.get('Cover'))
                                or self._first_url(video.get('cover'))
                                or aweme.get('cover_image_url')
                                or ''
                            )

                            captured_works[aweme_id] = {
                                'title': aweme.get('item_title', '') or aweme.get('title', '') or aweme.get('desc', ''),
                                'cover': cover_url,
                                'comment_count': (aweme.get('statistics') or {}).get('comment_count', 0) or aweme.get('comment_count', 0),
                            }
                except Exception as e:
                    print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True)

            print(f"[{self.platform_name}] 访问评论管理页面...", flush=True)
            await self.page.goto("https://creator.douyin.com/creator-micro/interactive/comment", wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            self._ensure_logged_in()

            print(f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True)

            await self._click_through_works()

            # Scroll to load more comments.
            for _ in range(5):
                await self.page.evaluate('window.scrollBy(0, 500)')
                await asyncio.sleep(1)

            await asyncio.sleep(3)

            self.page.remove_listener('response', handle_response)

            print(f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True)

            all_work_comments = self._group_comments_by_work(captured_comments, captured_works)
            total_comments = sum(len(w['comments']) for w in all_work_comments)
            print(f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论", flush=True)

        except Exception as e:
            traceback.print_exc()
            return {
                'success': False,
                'platform': self.platform_name,
                'error': str(e),
                'work_comments': []
            }
        finally:
            await self.close_browser()

        return {
            'success': True,
            'platform': self.platform_name,
            'work_comments': all_work_comments,
            'total': len(all_work_comments)
        }

    async def _click_through_works(self) -> None:
        """Open the work picker and click up to ten works to load their comments (best effort)."""
        try:
            select_btn = await self.page.query_selector('text="选择作品"')
            if not select_btn:
                return

            print(f"[{self.platform_name}] 点击选择作品按钮...", flush=True)
            await select_btn.click()
            await asyncio.sleep(3)

            work_items = await self.page.query_selector_all('[class*="work-item"], [class*="video-item"], [class*="aweme-item"]')
            print(f"[{self.platform_name}] 找到 {len(work_items)} 个作品元素", flush=True)

            # At most ten works are processed.
            for i, item in enumerate(work_items[:10]):
                try:
                    await item.click()
                    await asyncio.sleep(2)
                    print(f"[{self.platform_name}] 已点击作品 {i+1}/{min(len(work_items), 10)}", flush=True)
                except Exception as e:
                    # One unclickable work must not stop the others.
                    print(f"[{self.platform_name}] 点击作品 {i+1} 失败: {e}", flush=True)

            # Close the work picker again.
            close_btn = await self.page.query_selector('[class*="close"], [class*="cancel"]')
            if close_btn:
                await close_btn.click()
                await asyncio.sleep(1)
        except Exception as e:
            print(f"[{self.platform_name}] 选择作品操作失败: {e}", flush=True)

    def _group_comments_by_work(self, captured_comments: list, captured_works: dict) -> list:
        """Group raw captured comments into one dict per work, in first-seen order."""
        work_comments_map = {}  # work_id -> work entry
        for comment in captured_comments:
            # Work info may be embedded in the comment itself.
            aweme = comment.get('aweme') or comment.get('item') or {}
            aweme_id = str(comment.get('aweme_id', '') or aweme.get('aweme_id', '') or aweme.get('item_id', ''))
            if not aweme_id:
                continue

            if aweme_id not in work_comments_map:
                work_info = captured_works.get(aweme_id, {})
                work_comments_map[aweme_id] = {
                    'work_id': aweme_id,
                    'title': aweme.get('title', '') or aweme.get('desc', '') or work_info.get('title', ''),
                    'cover_url': self._first_url(aweme.get('cover')) if aweme.get('cover') else work_info.get('cover', ''),
                    'comments': []
                }

            cid = str(comment.get('cid', ''))
            if not cid:
                continue

            user = comment.get('user') or {}
            work_comments_map[aweme_id]['comments'].append({
                'comment_id': cid,
                'author_id': str(user.get('uid', '')),
                'author_name': user.get('nickname', ''),
                'author_avatar': self._first_url(user.get('avatar_thumb')),
                'content': comment.get('text', ''),
                'like_count': int(comment.get('digg_count') or 0),
                'create_time': self._format_timestamp(comment.get('create_time')),
                'is_author': comment.get('is_author', False),
            })

        return list(work_comments_map.values())
|