# -*- coding: utf-8 -*-
"""
Douyin video publisher.

Reference: matrix/douyin_uploader/main.py
"""
import asyncio
import os
import json
import re
from datetime import datetime
from typing import List

from .base import (
    BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult
)

# Text indicating that an SMS code has already been sent: a countdown
# ("60秒" / "60s"), "retry later", "resend" or "sent".
_SMS_SENT_PATTERN = re.compile(r"(\d+\s*秒)|(\d+\s*s)|后可重试|重新发送|已发送")


class DouyinPublisher(BasePublisher):
    """
    Douyin video publisher.

    Automates the Douyin creator center (creator.douyin.com) with Playwright.
    """

    platform_name = "douyin"
    login_url = "https://creator.douyin.com/"
    publish_url = "https://creator.douyin.com/creator-micro/content/upload"
    cookie_domain = ".douyin.com"

    # (captcha_type, log label, selectors), checked in this order by check_captcha().
    _CAPTCHA_SELECTORS = (
        ('phone', '检测到手机验证码', (
            'text="请输入验证码"',
            'text="输入手机验证码"',
            'text="获取验证码"',
            'text="手机号验证"',
            '[class*="captcha"][class*="phone"]',
            '[class*="verify"][class*="phone"]',
            '[class*="sms-code"]',
            'input[placeholder*="验证码"]',
        )),
        ('slider', '检测到滑块验证码', (
            '[class*="captcha"][class*="slider"]',
            '[class*="slide-verify"]',
            '[class*="drag-verify"]',
            'text="按住滑块"',
            'text="向右滑动"',
            'text="拖动滑块"',
        )),
        ('image', '检测到图片验证码', (
            '[class*="captcha"][class*="image"]',
            '[class*="verify-image"]',
            'text="点击图片"',
            'text="选择正确的"',
        )),
        # A login prompt means the cookies have expired.
        ('login', '检测到需要登录', (
            'text="请先登录"',
            'text="登录后继续"',
            '[class*="login-modal"]',
            '[class*="login-dialog"]',
        )),
    )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _first_url(image) -> str:
        """Return ``image['url_list'][0]``, or '' when *image* is not a dict or has no URLs."""
        if not isinstance(image, dict):
            return ''
        url_list = image.get('url_list') or []
        return url_list[0] if url_list else ''

    @staticmethod
    def _format_timestamp(timestamp) -> str:
        """Format a Unix timestamp (seconds) as local ``%Y-%m-%d %H:%M:%S``; '' when falsy."""
        if not timestamp:
            return ''
        return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

    async def _need_captcha_result(self, error: str, captcha_type: str, page_url: str) -> PublishResult:
        """Take a screenshot and build a failed ``PublishResult`` with status ``need_captcha``."""
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=error,
            need_captcha=True,
            captcha_type=captcha_type,
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_captcha',
        )

    # ------------------------------------------------------------------
    # Publish-form helpers
    # ------------------------------------------------------------------

    async def set_schedule_time(self, publish_date: datetime):
        """Switch the publish form to scheduled mode and enter *publish_date*.

        The date is typed as ``%Y-%m-%d %H:%M`` into the date-time input,
        replacing its current contents. Does nothing when no page is open.
        """
        if not self.page:
            return
        # Select the "scheduled publish" radio option
        label_element = self.page.locator("label.radio-d4zkru:has-text('定时发布')")
        await label_element.click()
        await asyncio.sleep(1)
        # Replace the contents of the date-time input
        publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M")
        await self.page.locator('.semi-input[placeholder="日期和时间"]').click()
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(publish_date_str)
        await self.page.keyboard.press("Enter")
        await asyncio.sleep(1)

    async def handle_upload_error(self, video_path: str):
        """Re-submit *video_path* through the upload input after an upload failure."""
        if not self.page:
            return
        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(video_path)

    async def check_captcha(self) -> dict:
        """
        Check whether the current page shows a captcha or a login prompt.

        Selector groups are probed in the order phone, slider, image, login;
        the first match wins.

        Returns:
            ``{'need_captcha': bool, 'captcha_type': str}`` where
            ``captcha_type`` is ``'phone'``, ``'slider'``, ``'image'``,
            ``'login'``, or ``''`` when nothing was detected.
        """
        if not self.page:
            return {'need_captcha': False, 'captcha_type': ''}
        try:
            for captcha_type, label, selectors in self._CAPTCHA_SELECTORS:
                for selector in selectors:
                    try:
                        if await self.page.locator(selector).count() > 0:
                            print(f"[{self.platform_name}] {label}: {selector}", flush=True)
                            return {'need_captcha': True, 'captcha_type': captcha_type}
                    except Exception:
                        # One failing selector must not abort the whole scan.
                        pass
        except Exception as e:
            print(f"[{self.platform_name}] 验证码检测异常: {e}", flush=True)
        return {'need_captcha': False, 'captcha_type': ''}

    async def handle_phone_captcha(self) -> bool:
        """Handle an SMS verification dialog end to end.

        Flow:
        1. Click the "get/send code" button if it is visible and enabled.
        2. Judge whether the SMS went out, using page heuristics plus
           ``ai_analyze_sms_send_state``; retry the click once if advised.
        3. Ask the frontend for the code via ``request_sms_code_from_frontend``,
           passing a status hint.
        4. Fill the code in and submit it.

        Returns:
            True once the code has been filled in and submitted; False on any
            failure (errors are logged, not raised).
        """
        if not self.page:
            return False
        try:
            try:
                body_text = await self.page.inner_text("body")
            except Exception:
                body_text = ""
            # Masked phone number as shown in the dialog, e.g. 138****1234
            phone_match = re.search(r"(1\d{2}\*{4}\d{4})", body_text or "")
            masked_phone = phone_match.group(1) if phone_match else ""

            async def _get_send_button():
                """Return the first visible "get/send code" button, or None."""
                candidates = [
                    self.page.get_by_role("button", name="获取验证码"),
                    self.page.get_by_role("button", name="发送验证码"),
                    self.page.locator('button:has-text("获取验证码")'),
                    self.page.locator('button:has-text("发送验证码")'),
                    self.page.locator('[role="button"]:has-text("获取验证码")'),
                    self.page.locator('[role="button"]:has-text("发送验证码")'),
                ]
                for candidate in candidates:
                    try:
                        if await candidate.count() > 0 and await candidate.first.is_visible():
                            return candidate.first
                    except Exception:
                        continue
                return None

            async def _confirm_sent() -> bool:
                """Heuristically confirm the SMS was sent.

                Looks for countdown/resend text on the page, a disabled send
                button, or such text on the button itself.
                """
                try:
                    txt = await self.page.inner_text("body")
                except Exception:
                    txt = ""
                if _SMS_SENT_PATTERN.search(txt or ""):
                    return True
                try:
                    btn = await _get_send_button()
                    if btn:
                        if await btn.is_disabled():
                            return True
                        label = await btn.inner_text()
                        if _SMS_SENT_PATTERN.search(label or ""):
                            return True
                except Exception:
                    pass
                return False

            did_click_send = False
            btn = await _get_send_button()
            if btn:
                try:
                    if await btn.is_enabled():
                        await btn.click(timeout=5000)
                        did_click_send = True
                        print(f"[{self.platform_name}] 已点击发送短信验证码", flush=True)
                except Exception as e:
                    print(f"[{self.platform_name}] 点击发送验证码按钮失败: {e}", flush=True)
            if did_click_send:
                try:
                    # Give the page a moment to render the countdown
                    await self.page.wait_for_timeout(800)
                except Exception:
                    pass

            sent_confirmed = await _confirm_sent() if did_click_send else False
            # Tolerate a None result from the AI helper
            ai_state = await self.ai_analyze_sms_send_state() or {}
            if ai_state.get("sent_likely"):
                sent_confirmed = True

            # Retry the send click once when nothing is confirmed and the AI suggests it
            if (not did_click_send or not sent_confirmed) and ai_state.get("suggested_action") == "click_send":
                btn2 = await _get_send_button()
                if btn2:
                    try:
                        if await btn2.is_enabled():
                            await btn2.click(timeout=5000)
                            did_click_send = True
                            await self.page.wait_for_timeout(800)
                            sent_confirmed = await _confirm_sent()
                            ai_state = await self.ai_analyze_sms_send_state() or {}
                            if ai_state.get("sent_likely"):
                                sent_confirmed = True
                    except Exception:
                        pass

            # Build the hint shown to the user alongside the code prompt
            code_hint = "请输入短信验证码。"
            if ai_state.get("block_reason") == "slider":
                code_hint = "检测到滑块/人机验证阻塞,请先在浏览器窗口完成验证后再发送短信验证码。"
            elif ai_state.get("block_reason") in ["rate_limit", "risk"]:
                code_hint = f"页面提示可能被限制/风控({ai_state.get('notes','') or '请稍后重试'})。可稍等后重新发送验证码。"
            elif not did_click_send:
                code_hint = "未找到或无法点击“发送验证码”按钮,请在弹出的浏览器页面手动点击发送后再输入验证码。"
            elif sent_confirmed:
                code_hint = f"已检测到短信验证码已发送({ai_state.get('notes','') or '请查收短信'})。"
            else:
                code_hint = f"已尝试点击发送验证码,但未确认发送成功({ai_state.get('notes','') or '请查看是否出现倒计时/重新发送'})。"

            code = await self.request_sms_code_from_frontend(masked_phone, message=code_hint)
            if not code:
                # Without a code there is nothing to submit; fail clearly instead of
                # reporting a missing input box.
                raise Exception("未获取到短信验证码")

            input_selectors = [
                'input[placeholder*="验证码"]',
                'input[placeholder*="短信"]',
                'input[type="tel"]',
                'input[type="text"]',
            ]
            filled = False
            for selector in input_selectors:
                try:
                    el = self.page.locator(selector).first
                    if await el.count() > 0:
                        await el.fill(code)
                        filled = True
                        break
                except Exception:
                    continue
            if not filled:
                raise Exception("未找到验证码输入框")

            submit_selectors = [
                'button:has-text("确定")',
                'button:has-text("确认")',
                'button:has-text("提交")',
                'button:has-text("完成")',
            ]
            for selector in submit_selectors:
                try:
                    submit_btn = self.page.locator(selector).first
                    if await submit_btn.count() > 0:
                        await submit_btn.click()
                        break
                except Exception:
                    continue

            try:
                # Best effort: wait for the verification dialog to disappear
                await self.page.wait_for_timeout(1000)
                await self.page.wait_for_selector('text="请输入验证码"', state="hidden", timeout=15000)
            except Exception:
                pass

            print(f"[{self.platform_name}] 短信验证码已提交,继续执行发布流程", flush=True)
            return True
        except Exception as e:
            print(f"[{self.platform_name}] 处理短信验证码失败: {e}", flush=True)
            return False

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to Douyin (flow adapted from matrix/douyin_uploader/main.py).

        Steps:
        1. Open the upload page. In headed mode, wait for a manual login if
           redirected to a login page.
        2. Stop early on captchas that cannot be handled automatically.
        3. Choose the video file, then fill in the title and tags.
        4. Wait for the upload to finish.
        5. Apply the location, Toutiao/Xigua sync and schedule settings.
        6. Click "publish" until the content-management page is reached.

        Args:
            cookies: serialized cookies, parsed by ``self.parse_cookies``.
            params: video path, title, tags, optional location and optional
                publish date.

        Returns:
            A ``PublishResult`` whose status is ``'success'``, ``'failed'``,
            ``'need_captcha'`` or ``'need_action'`` (publish timed out).

        Raises:
            Exception: if the page is not initialized or the video file is missing.
        """
        upload_url = "https://creator.douyin.com/creator-micro/content/upload"
        manage_url = "https://creator.douyin.com/creator-micro/content/manage"

        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        self.report_progress(5, "正在初始化浏览器...")

        # Start the browser
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        # Parse and install the cookies
        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        # Validate the video file
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(10, "正在打开上传页面...")

        # Open the upload page (as in matrix)
        await self.page.goto(upload_url)

        print(f"[{self.platform_name}] 等待页面加载...")
        try:
            await self.page.wait_for_url(upload_url, timeout=30000)
        except Exception:
            pass

        await asyncio.sleep(3)

        # Inspect where we actually landed
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前 URL: {current_url}")

        async def wait_for_manual_login(timeout_seconds: int = 300) -> bool:
            """Poll every 2s until the page leaves the login flow and lands on creator.douyin.com."""
            if not self.page:
                return False
            self.report_progress(8, "检测到需要登录,请在浏览器窗口完成登录...")
            try:
                await self.page.bring_to_front()
            except Exception:
                pass
            waited = 0
            while waited < timeout_seconds:
                try:
                    url = self.page.url
                    if "login" not in url and "passport" not in url and "creator.douyin.com" in url:
                        return True
                except Exception:
                    pass
                await asyncio.sleep(2)
                waited += 2
            return False

        # Redirected to a login page: cookies are invalid or expired
        if "login" in current_url or "passport" in current_url:
            if self.headless:
                return await self._need_captcha_result("Cookie 已过期,需要重新登录", 'login', current_url)
            logged_in = await wait_for_manual_login()
            if not logged_in:
                return await self._need_captcha_result(
                    "需要登录:请在浏览器窗口完成登录后重试", 'login', current_url)
            # Persist the freshly issued cookies (best effort) before continuing
            try:
                if self.context:
                    cookies_after = await self.context.cookies()
                    await self.sync_cookies_to_node(cookies_after)
            except Exception:
                pass
            await self.page.goto(upload_url)
            await asyncio.sleep(3)
            current_url = self.page.url

        # Captcha detection via AI
        ai_captcha_result = await self.ai_check_captcha()
        if ai_captcha_result['has_captcha']:
            print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha_result['captcha_type']}", flush=True)
            return await self._need_captcha_result(
                f"检测到{ai_captcha_result['captcha_type']}验证码,需要使用有头浏览器完成验证",
                ai_captcha_result['captcha_type'],
                current_url,
            )

        # Captcha detection via selectors; only SMS codes are handled automatically
        captcha_result = await self.check_captcha()
        if captcha_result['need_captcha']:
            print(f"[{self.platform_name}] 传统方式检测到验证码: {captcha_result['captcha_type']}", flush=True)
            if captcha_result['captcha_type'] != 'phone':
                return await self._need_captcha_result(
                    f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证",
                    captcha_result['captcha_type'],
                    current_url,
                )
            handled = await self.handle_phone_captcha()
            if not handled:
                return await self._need_captcha_result("检测到手机验证码,但自动处理失败", 'phone', current_url)
            self.report_progress(12, "短信验证码已处理,继续发布...")

        self.report_progress(15, "正在选择视频文件...")

        # Click the drop zone and feed the file chooser (matrix uses div.container-drag-info-*)
        upload_selectors = [
            "div[class*='container-drag-info']",
            "div[class*='container-drag']",
            "div.upload-btn",
            "div[class*='upload']",
        ]

        upload_success = False
        for selector in upload_selectors:
            try:
                upload_div = self.page.locator(selector).first
                if await upload_div.count() > 0:
                    print(f"[{self.platform_name}] 找到上传区域: {selector}")
                    async with self.page.expect_file_chooser(timeout=10000) as fc_info:
                        await upload_div.click()
                    file_chooser = await fc_info.value
                    await file_chooser.set_files(params.video_path)
                    upload_success = True
                    print(f"[{self.platform_name}] 视频文件已选择")
                    break
            except Exception as e:
                print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")

        if not upload_success:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="未找到上传入口",
                screenshot_base64=screenshot_base64,
                page_url=await self.get_page_url(),
                status='failed'
            )

        # Wait for the redirect to the post-editing page (as in matrix)
        self.report_progress(20, "等待进入发布页面...")
        for i in range(60):
            try:
                # matrix waits for .../content/post/video?enter_from=publish_page
                await self.page.wait_for_url(
                    "https://creator.douyin.com/creator-micro/content/post/video*",
                    timeout=2000
                )
                print(f"[{self.platform_name}] 已进入发布页面")
                break
            except Exception:
                print(f"[{self.platform_name}] 等待进入发布页面... {i+1}/60")
                await asyncio.sleep(1)

        await asyncio.sleep(2)
        self.report_progress(30, "正在填充标题和话题...")

        # Title: the input that follows the "作品标题" label (max 30 chars), as in matrix
        title_input = self.page.get_by_text('作品标题').locator("..").locator(
            "xpath=following-sibling::div[1]").locator("input")
        if await title_input.count():
            await title_input.fill(params.title[:30])
            print(f"[{self.platform_name}] 标题已填写")
        else:
            # Fallback (matrix): clear the rich-text editor and type the title
            title_container = self.page.locator(".notranslate")
            await title_container.click()
            await self.page.keyboard.press("Backspace")
            await self.page.keyboard.press("Control+KeyA")
            await self.page.keyboard.press("Delete")
            await self.page.keyboard.type(params.title)
            await self.page.keyboard.press("Enter")
            print(f"[{self.platform_name}] 标题已填写(备用方式)")

        # Topic tags: "#tag" followed by a space, one per tag (as in matrix)
        if params.tags:
            css_selector = ".zone-container"
            for index, tag in enumerate(params.tags, start=1):
                print(f"[{self.platform_name}] 正在添加第{index}个话题: #{tag}")
                await self.page.type(css_selector, "#" + tag)
                await self.page.press(css_selector, "Space")

        self.report_progress(40, "等待视频上传完成...")

        # Upload is done once a "re-upload" control appears (matrix heuristic); ~6 minutes max
        for i in range(120):
            try:
                count = await self.page.locator("div").filter(has_text="重新上传").count()
                if count > 0:
                    print(f"[{self.platform_name}] 视频上传完毕")
                    break
                print(f"[{self.platform_name}] 正在上传视频中... {i+1}/120")
                # Re-upload when the progress area reports a failure
                if await self.page.locator('div.progress-div > div:has-text("上传失败")').count():
                    print(f"[{self.platform_name}] 发现上传出错了,重新上传...")
                    await self.handle_upload_error(params.video_path)
                await asyncio.sleep(3)
            except Exception:
                print(f"[{self.platform_name}] 正在上传视频中...")
                await asyncio.sleep(3)

        self.report_progress(60, "处理视频设置...")

        # Dismiss a "got it" popup if present (as in matrix)
        known_count = await self.page.get_by_role("button", name="我知道了").count()
        if known_count > 0:
            await self.page.get_by_role("button", name="我知道了").nth(0).click()
            print(f"[{self.platform_name}] 关闭弹窗")
        await asyncio.sleep(5)

        # Location (as in matrix). Skipped when no location is given: typing an empty
        # value and clicking the first suggestion would pick an arbitrary place.
        location = getattr(params, 'location', None)
        if location:
            try:
                await self.page.locator('div.semi-select span:has-text("输入地理位置")').click()
                await asyncio.sleep(1)
                await self.page.keyboard.press("Backspace")
                await self.page.keyboard.press("Control+KeyA")
                await self.page.keyboard.press("Delete")
                await self.page.keyboard.type(location)
                await asyncio.sleep(1)
                await self.page.locator('div[role="listbox"] [role="option"]').first.click()
                print(f"[{self.platform_name}] 位置设置成功: {location}")
            except Exception as e:
                print(f"[{self.platform_name}] 设置位置失败: {e}")

        # Turn on Toutiao/Xigua sync if the switch is present and off (best effort, as in matrix)
        try:
            third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch'
            if await self.page.locator(third_part_element).count():
                class_name = await self.page.eval_on_selector(
                    third_part_element, 'div => div.className')
                if 'semi-switch-checked' not in class_name:
                    await self.page.locator(third_part_element).locator(
                        'input.semi-switch-native-control').click()
                    print(f"[{self.platform_name}] 已开启头条/西瓜同步")
        except Exception:
            pass

        # Scheduled publishing
        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            await self.set_schedule_time(params.publish_date)

        self.report_progress(80, "正在发布...")
        print(f"[{self.platform_name}] 查找发布按钮...")

        # Click "publish" until redirected to the content-management page (as in matrix)
        for i in range(30):
            try:
                # AI captcha checks are slow, so only run one every 5 attempts
                if i % 5 == 0:
                    ai_captcha = await self.ai_check_captcha()
                    if ai_captcha['has_captcha']:
                        print(f"[{self.platform_name}] AI检测到发布过程中需要验证码: {ai_captcha['captcha_type']}", flush=True)
                        if ai_captcha['captcha_type'] == 'phone':
                            handled = await self.handle_phone_captcha()
                            if handled:
                                continue
                        page_url = await self.get_page_url()
                        return await self._need_captcha_result(
                            f"发布过程中需要{ai_captcha['captcha_type']}验证码,请使用有头浏览器完成验证",
                            ai_captcha['captcha_type'],
                            page_url,
                        )

                publish_btn = self.page.get_by_role('button', name="发布", exact=True)
                btn_count = await publish_btn.count()

                if btn_count > 0:
                    print(f"[{self.platform_name}] 点击发布按钮...")
                    await publish_btn.click()

                    # Success is signalled by the redirect to the management page
                    await self.page.wait_for_url(manage_url, timeout=5000)
                    self.report_progress(100, "发布成功")
                    print(f"[{self.platform_name}] 视频发布成功!")
                    screenshot_base64 = await self.capture_screenshot()
                    page_url = await self.get_page_url()
                    return PublishResult(
                        success=True,
                        platform=self.platform_name,
                        message="发布成功",
                        screenshot_base64=screenshot_base64,
                        page_url=page_url,
                        status='success'
                    )
            except Exception:
                current_url = self.page.url
                # The redirect may already have happened even if the wait timed out
                if manage_url in current_url:
                    self.report_progress(100, "发布成功")
                    print(f"[{self.platform_name}] 视频发布成功!")
                    screenshot_base64 = await self.capture_screenshot()
                    return PublishResult(
                        success=True,
                        platform=self.platform_name,
                        message="发布成功",
                        screenshot_base64=screenshot_base64,
                        page_url=current_url,
                        status='success'
                    )
                print(f"[{self.platform_name}] 视频正在发布中... {i+1}/30, URL: {current_url}")
            # Pace every attempt (also when the button is not rendered yet) so the
            # 30 attempts span roughly 30 seconds instead of spinning.
            await asyncio.sleep(1)

        # Publishing did not complete in time
        print(f"[{self.platform_name}] 发布超时,获取截图...")
        screenshot_base64 = await self.capture_screenshot()
        page_url = await self.get_page_url()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="发布超时,请检查发布状态",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_action'
        )

    # ------------------------------------------------------------------
    # Works and comments
    # ------------------------------------------------------------------

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Fetch one page of the account's works.

        Opens the creator home page to validate the session, then calls the
        ``work_list`` API from inside the page so the browser cookies apply.
        The browser is closed afterwards.

        Args:
            cookies: serialized cookies for the account.
            page: zero-based page index; the API cursor is ``page * page_size``.
            page_size: number of works to request.

        Returns:
            ``WorksResult``; ``success=False`` with ``error`` set on failure,
            including when the in-page API request itself fails.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Visit the creator home page so an invalid session redirects to login
            await self.page.goto("https://creator.douyin.com/creator-micro/home")
            await asyncio.sleep(3)

            current_url = self.page.url
            if "login" in current_url or "passport" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Works-list API (scene=star_atlas / aid=1128 intentionally omitted)
            cursor = page * page_size
            api_url = f"https://creator.douyin.com/janus/douyin/creator/pc/work_list?status=0&device_platform=android&count={page_size}&max_cursor={cursor}&cookie_enabled=true&browser_language=zh-CN&browser_platform=Win32&browser_name=Mozilla&browser_online=true&timezone_name=Asia%2FShanghai"

            # The URL is passed as an evaluate() argument rather than spliced into the JS source.
            response = await self.page.evaluate(
                """
                async (url) => {
                    try {
                        const resp = await fetch(url, {
                            credentials: 'include',
                            headers: { 'Accept': 'application/json' }
                        });
                        return await resp.json();
                    } catch (e) {
                        return { error: e.toString() };
                    }
                }
                """,
                api_url,
            )
            response = response or {}

            if response.get('error'):
                print(f"[{self.platform_name}] API 请求失败: {response.get('error')}", flush=True)
                # Report the failure instead of returning an empty "successful" list
                raise Exception(f"API 请求失败: {response.get('error')}")

            aweme_list = response.get('aweme_list') or []
            print(f"[{self.platform_name}] API 响应: has_more={response.get('has_more')}, aweme_list={len(aweme_list)}")

            has_more = bool(response.get('has_more', False))

            for aweme in aweme_list:
                aweme_id = str(aweme.get('aweme_id') or '')
                if not aweme_id:
                    continue

                statistics = aweme.get('statistics') or {}
                video = aweme.get('video') or {}

                # Cover: top-level "Cover" first, then the video cover
                cover_url = self._first_url(aweme.get('Cover')) or self._first_url(video.get('cover'))

                # Title: explicit item title, else the first line of the description
                desc = aweme.get('desc') or ''
                title = aweme.get('item_title') or desc.split('\n')[0][:50] or '无标题'

                # Duration is reported in milliseconds
                duration = int(video.get('duration') or 0) // 1000

                works.append(WorkItem(
                    work_id=aweme_id,
                    title=title,
                    cover_url=cover_url,
                    duration=duration,
                    status='published',
                    publish_time=self._format_timestamp(aweme.get('create_time')),
                    play_count=int(statistics.get('play_count') or 0),
                    like_count=int(statistics.get('digg_count') or 0),
                    comment_count=int(statistics.get('comment_count') or 0),
                    share_count=int(statistics.get('share_count') or 0),
                ))

            total = len(works)
            print(f"[{self.platform_name}] 获取到 {total} 个作品")

        except Exception as e:
            import traceback
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            # Release the browser like the other fetchers in this class do
            await self.close_browser()

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=total,
            has_more=has_more
        )

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Fetch comments of one work by intercepting the comment API.

        Opens ``https://www.douyin.com/video/{work_id}`` and captures the
        comment-list response that the page triggers. The browser is closed
        afterwards.

        Args:
            cookies: serialized cookies.
            work_id: the aweme id of the video.
            cursor: accepted for interface compatibility and logged.
                NOTE(review): it is not used; only the page loaded by the
                video view is captured.

        Returns:
            ``CommentsResult``. The next-page cursor reported by the API is
            attached as an extra ``cursor`` attribute.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}, cursor={cursor}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False
        next_cursor = ""
        captured_data = {}

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            async def handle_response(response):
                """Keep the latest successful top-level comment-list response for this work."""
                nonlocal captured_data
                url = response.url
                # Comment-list API, e.g. /aweme/v1/web/comment/list/. Reply sub-lists
                # (/comment/list/reply) are skipped so they cannot replace the top-level page.
                if '/comment/list' not in url or '/comment/list/reply' in url:
                    return
                if 'aweme_id' not in url and work_id not in url:
                    return
                try:
                    json_data = await response.json()
                    print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)
                    if json_data.get('status_code') == 0 or json_data.get('comments'):
                        captured_data = json_data
                        comment_count = len(json_data.get('comments') or [])
                        print(f"[{self.platform_name}] 评论 API 响应成功: comments={comment_count}, has_more={json_data.get('has_more')}", flush=True)
                except Exception as e:
                    print(f"[{self.platform_name}] 解析评论响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册评论 API 响应监听器", flush=True)

            # Opening the video page triggers the comment API request
            video_url = f"https://www.douyin.com/video/{work_id}"
            print(f"[{self.platform_name}] 访问视频详情页: {video_url}", flush=True)
            await self.page.goto(video_url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            current_url = self.page.url
            if "login" in current_url or "passport" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Give the comment API more time; scroll to nudge lazy loading
            if not captured_data:
                print(f"[{self.platform_name}] 等待评论 API 响应...", flush=True)
                await self.page.evaluate('window.scrollBy(0, 300)')
                await asyncio.sleep(3)
                if not captured_data:
                    await asyncio.sleep(3)

            self.page.remove_listener('response', handle_response)

            if captured_data:
                comment_list = captured_data.get('comments') or []
                # The API may report has_more as 0/1; normalize to bool
                has_more = bool(captured_data.get('has_more'))
                next_cursor = str(captured_data.get('cursor', ''))
                total = captured_data.get('total', 0) or len(comment_list)

                print(f"[{self.platform_name}] 解析评论: total={total}, has_more={has_more}, comments={len(comment_list)}", flush=True)

                for comment in comment_list:
                    cid = str(comment.get('cid') or '')
                    if not cid:
                        continue

                    user = comment.get('user') or {}

                    replies = []
                    for reply in comment.get('reply_comment') or []:
                        reply_user = reply.get('user') or {}
                        replies.append(CommentItem(
                            comment_id=str(reply.get('cid', '')),
                            work_id=work_id,
                            content=reply.get('text', ''),
                            author_id=str(reply_user.get('uid', '')),
                            author_name=reply_user.get('nickname', ''),
                            author_avatar=self._first_url(reply_user.get('avatar_thumb')),
                            like_count=int(reply.get('digg_count') or 0),
                            create_time=self._format_timestamp(reply.get('create_time')),
                            is_author=reply.get('is_author', False),
                        ))

                    comments.append(CommentItem(
                        comment_id=cid,
                        work_id=work_id,
                        content=comment.get('text', ''),
                        author_id=str(user.get('uid', '')),
                        author_name=user.get('nickname', ''),
                        author_avatar=self._first_url(user.get('avatar_thumb')),
                        like_count=int(comment.get('digg_count') or 0),
                        reply_count=int(comment.get('reply_comment_total') or 0),
                        create_time=self._format_timestamp(comment.get('create_time')),
                        is_author=comment.get('is_author', False),
                        replies=replies,
                    ))

                print(f"[{self.platform_name}] 解析到 {len(comments)} 条评论", flush=True)
            else:
                print(f"[{self.platform_name}] 未捕获到评论 API 响应", flush=True)

        except Exception as e:
            import traceback
            traceback.print_exc()
            return CommentsResult(
                success=False,
                platform=self.platform_name,
                work_id=work_id,
                error=str(e)
            )
        finally:
            await self.close_browser()

        result = CommentsResult(
            success=True,
            platform=self.platform_name,
            work_id=work_id,
            comments=comments,
            total=total,
            has_more=has_more
        )
        # Attach the cursor without requiring a declared field on CommentsResult
        result.__dict__['cursor'] = next_cursor
        return result

    async def get_all_comments(self, cookies: str) -> dict:
        """Collect comments for all works via the creator comment-management page.

        Intercepts comment-list and work-list API responses while the page
        loads, clicks up to 10 entries in the "选择作品" picker, and scrolls.
        Captured comments are then grouped by work id; duplicate comment ids
        within a work are dropped. The browser is closed afterwards.

        Returns:
            ``{'success': True, 'platform', 'work_comments': [...], 'total'}``
            where ``total`` is the number of works; on failure
            ``{'success': False, 'platform', 'error', 'work_comments': []}``.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取所有作品评论")
        print(f"{'='*60}")

        all_work_comments = []
        captured_comments = []
        captured_works = {}  # work_id -> {'title', 'cover', 'comment_count'}

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            async def handle_response(response):
                """Collect comments and work metadata from matching API responses."""
                url = response.url
                try:
                    # Comment APIs come in several shapes (/comment/list, /comment/read, comment_list)
                    if '/comment/list' in url or '/comment/read' in url or 'comment_list' in url:
                        json_data = await response.json()
                        print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)

                        # Shape 1 uses "comments", shape 2 "comment_info_list"
                        comments = json_data.get('comments', [])
                        if not comments:
                            comments = json_data.get('comment_info_list', [])

                        if comments:
                            # Tag each comment with the aweme_id from the request URL, if any
                            aweme_id_match = re.search(r'aweme_id=(\d+)', url)
                            aweme_id = aweme_id_match.group(1) if aweme_id_match else ''

                            for comment in comments:
                                if aweme_id and 'aweme_id' not in comment:
                                    comment['aweme_id'] = aweme_id
                                captured_comments.append(comment)

                            print(f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (aweme_id={aweme_id}),总计: {len(captured_comments)}", flush=True)

                    # Work-list APIs provide titles/covers used to label groups
                    if '/work_list' in url or '/item/list' in url or '/creator/item' in url:
                        json_data = await response.json()
                        aweme_list = json_data.get('aweme_list', []) or json_data.get('item_info_list', []) or json_data.get('item_list', [])
                        print(f"[{self.platform_name}] 捕获到作品列表 API: {len(aweme_list)} 个作品", flush=True)

                        for aweme in aweme_list:
                            aweme_id = str(aweme.get('aweme_id', '') or aweme.get('item_id', '') or aweme.get('item_id_plain', ''))
                            if not aweme_id:
                                continue
                            video = aweme.get('video') or {}
                            cover_url = (
                                self._first_url(aweme.get('Cover'))
                                or self._first_url(video.get('cover'))
                                or aweme.get('cover_image_url')
                                or ''
                            )
                            statistics = aweme.get('statistics') or {}
                            captured_works[aweme_id] = {
                                'title': aweme.get('item_title', '') or aweme.get('title', '') or aweme.get('desc', ''),
                                'cover': cover_url,
                                'comment_count': statistics.get('comment_count', 0) or aweme.get('comment_count', 0),
                            }
                except Exception as e:
                    print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True)

            print(f"[{self.platform_name}] 访问评论管理页面...", flush=True)
            await self.page.goto("https://creator.douyin.com/creator-micro/interactive/comment", wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            current_url = self.page.url
            if "login" in current_url or "passport" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            print(f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True)

            # Open the "选择作品" picker and click works to trigger their comment requests
            try:
                select_btn = await self.page.query_selector('text="选择作品"')
                if select_btn:
                    print(f"[{self.platform_name}] 点击选择作品按钮...", flush=True)
                    await select_btn.click()
                    await asyncio.sleep(3)

                    work_items = await self.page.query_selector_all('[class*="work-item"], [class*="video-item"], [class*="aweme-item"]')
                    print(f"[{self.platform_name}] 找到 {len(work_items)} 个作品元素", flush=True)

                    # At most 10 works
                    for i, item in enumerate(work_items[:10]):
                        try:
                            await item.click()
                            await asyncio.sleep(2)
                            print(f"[{self.platform_name}] 已点击作品 {i+1}/{min(len(work_items), 10)}", flush=True)
                        except Exception:
                            pass

                    # Close the picker dialog
                    close_btn = await self.page.query_selector('[class*="close"], [class*="cancel"]')
                    if close_btn:
                        await close_btn.click()
                        await asyncio.sleep(1)
            except Exception as e:
                print(f"[{self.platform_name}] 选择作品操作失败: {e}", flush=True)

            # Scroll to load more comments
            for _ in range(5):
                await self.page.evaluate('window.scrollBy(0, 500)')
                await asyncio.sleep(1)

            await asyncio.sleep(3)

            self.page.remove_listener('response', handle_response)

            print(f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True)

            # Group comments by work id
            work_comments_map = {}  # work_id -> grouped entry
            seen_comment_ids = set()  # (work_id, cid) pairs already emitted
            for comment in captured_comments:
                aweme = comment.get('aweme') or comment.get('item') or {}
                aweme_id = str(comment.get('aweme_id', '') or aweme.get('aweme_id', '') or aweme.get('item_id', ''))

                if not aweme_id:
                    continue

                if aweme_id not in work_comments_map:
                    work_info = captured_works.get(aweme_id, {})
                    work_comments_map[aweme_id] = {
                        'work_id': aweme_id,
                        'title': aweme.get('title', '') or aweme.get('desc', '') or work_info.get('title', ''),
                        'cover_url': self._first_url(aweme.get('cover')) or work_info.get('cover', ''),
                        'comments': []
                    }

                cid = str(comment.get('cid') or '')
                # The same comment may be captured by several responses; keep it once
                if not cid or (aweme_id, cid) in seen_comment_ids:
                    continue
                seen_comment_ids.add((aweme_id, cid))

                user = comment.get('user') or {}
                work_comments_map[aweme_id]['comments'].append({
                    'comment_id': cid,
                    'author_id': str(user.get('uid', '')),
                    'author_name': user.get('nickname', ''),
                    'author_avatar': self._first_url(user.get('avatar_thumb')),
                    'content': comment.get('text', ''),
                    'like_count': int(comment.get('digg_count') or 0),
                    'create_time': self._format_timestamp(comment.get('create_time')),
                    'is_author': comment.get('is_author', False),
                })

            all_work_comments = list(work_comments_map.values())
            total_comments = sum(len(w['comments']) for w in all_work_comments)
            print(f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论", flush=True)

        except Exception as e:
            import traceback
            traceback.print_exc()
            return {
                'success': False,
                'platform': self.platform_name,
                'error': str(e),
                'work_comments': []
            }
        finally:
            await self.close_browser()

        return {
            'success': True,
            'platform': self.platform_name,
            'work_comments': all_work_comments,
            'total': len(all_work_comments)
        }

    # ------------------------------------------------------------------
    # Private messages
    # ------------------------------------------------------------------

    async def auto_reply_private_messages(self, cookies: str) -> dict:
        """Auto-reply to Douyin private messages (current chat-page layout).

        For each of the "陌生人私信" and "朋友私信" tabs, opens every session
        and skips it if:
        - the preview looks like a shared video/image/link, or
        - the last message was sent by us.

        Otherwise it sends a reply from ``_generate_reply_with_ai``, falling
        back to ``_generate_reply``. The browser is closed afterwards.

        Returns:
            ``{'success': True, 'platform', 'replied_count', 'message'}`` or
            ``{'success': False, 'platform', 'error'}``.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始自动回复抖音私信")
        print(f"{'='*60}")

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the private-message page
            await self.page.goto("https://creator.douyin.com/creator-micro/data/following/chat", timeout=30000)
            await asyncio.sleep(3)

            current_url = self.page.url
            print(f"[{self.platform_name}] 当前 URL: {current_url}")
            if "login" in current_url or "passport" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            replied_count = 0

            # Two tabs: messages from strangers and from friends
            for tab_name in ["陌生人私信", "朋友私信"]:
                print(f"\n{'='*50}")
                print(f"[{self.platform_name}] 处理 {tab_name} ...")
                print(f"{'='*50}")

                tab_locator = self.page.locator(f'div.semi-tabs-tab:text-is("{tab_name}")')
                if await tab_locator.count() > 0:
                    await tab_locator.click()
                    await asyncio.sleep(2)
                else:
                    print(f"⚠️ 未找到 {tab_name} 标签,跳过")
                    continue

                session_items = self.page.locator('.semi-list-item')
                session_count = await session_items.count()
                print(f"[{self.platform_name}] {tab_name} 共找到 {session_count} 条会话")

                if session_count == 0:
                    print(f"[{self.platform_name}] {tab_name} 无新私信")
                    continue

                for idx in range(session_count):
                    try:
                        # Re-query the list each time in case the DOM changed
                        current_sessions = self.page.locator('.semi-list-item')
                        if idx >= await current_sessions.count():
                            break

                        session = current_sessions.nth(idx)
                        user_name = await session.locator('.item-header-name-vL_79m').inner_text()
                        last_msg = await session.locator('.text-whxV9A').inner_text()
                        print(f"\n ➤ [{idx+1}/{session_count}] 处理用户: {user_name} | 最后消息: {last_msg[:30]}...")

                        # Skip sessions whose preview is a shared video/image/link
                        if "分享" in last_msg and ("视频" in last_msg or "图片" in last_msg or "链接" in last_msg):
                            print(" ➤ 会话预览为非文字消息,跳过")
                            continue

                        # Open the chat
                        await session.click()
                        await asyncio.sleep(2)

                        # Message bubbles, excluding timestamp separators
                        chat_messages = self.page.locator('.box-item-dSA1TJ:not(.time-Za5gKL)')
                        msg_count = await chat_messages.count()

                        should_reply = True
                        if msg_count > 0:
                            last_msg_el = chat_messages.nth(msg_count - 1)
                            # A class containing "is-me-" marks a message we sent
                            classes = await last_msg_el.get_attribute('class') or ''
                            is_my_message = 'is-me-' in classes
                            should_reply = not is_my_message

                        if should_reply:
                            chat_history = await self._extract_chat_history()
                            if chat_history:
                                reply_text = await self._generate_reply_with_ai(chat_history)
                                if not reply_text:
                                    reply_text = self._generate_reply(chat_history)

                                if reply_text:
                                    print(f" 📝 回复内容: {reply_text}")

                                    input_box = self.page.locator('div.chat-input-dccKiL[contenteditable="true"]')
                                    send_btn = self.page.locator('button:has-text("发送")')

                                    if await input_box.is_visible() and await send_btn.is_visible():
                                        await input_box.fill(reply_text)
                                        await asyncio.sleep(0.5)
                                        await send_btn.click()
                                        print(" ✅ 已发送")
                                        replied_count += 1
                                        await asyncio.sleep(2)
                                    else:
                                        print(" ❌ 输入框或发送按钮不可见")
                                else:
                                    print(" ➤ 无需回复")
                            else:
                                print(" ➤ 聊天历史为空,跳过")
                        else:
                            print(" ➤ 最后一条是我发的,跳过")

                    except Exception as e:
                        print(f" ❌ 处理会话 {idx+1} 时出错: {e}")
                        continue

            print(f"[{self.platform_name}] 自动回复完成,共回复 {replied_count} 条消息")
            return {
                'success': True,
                'platform': self.platform_name,
                'replied_count': replied_count,
                'message': f'成功回复 {replied_count} 条私信'
            }

        except Exception as e:
            import traceback
            traceback.print_exc()
            return {
                'success': False,
                'platform': self.platform_name,
                'error': str(e)
            }
        finally:
            await self.close_browser()

    # Helper methods kept for compatibility (reusable)
    def _generate_reply(self, chat_history: list) -> str:
        """Rule-based reply chosen by keywords in the last message of *chat_history*."""
        if not chat_history:
            return "你好!感谢联系~"
        last_msg = chat_history[-1]["content"]
        if "谢谢" in last_msg or "感谢" in last_msg:
            return "不客气!欢迎常来交流~"
        elif "你好" in last_msg or "在吗" in last_msg:
            return "你好!请问有什么可以帮您的?"
        elif "视频" in last_msg or "怎么拍" in last_msg:
            return "视频是用手机拍摄的,注意光线和稳定哦!"
else: return "收到!我会认真阅读您的留言~" async def _extract_chat_history(self) -> list: """精准提取聊天记录,区分作者(自己)和用户""" if not self.page: return [] history = [] # 获取所有聊天消息(排除时间戳元素) message_wrappers = self.page.locator('.box-item-dSA1TJ:not(.time-Za5gKL)') count = await message_wrappers.count() for i in range(count): try: wrapper = message_wrappers.nth(i) # 检查是否为自己发送的消息 classes = await wrapper.get_attribute('class') or '' is_author = 'is-me-' in classes # 包含 is-me- 表示是自己发的 # 获取消息文本内容 text_element = wrapper.locator('.text-X2d7fS') if await text_element.count() > 0: content = await text_element.inner_text() content = content.strip() if content: # 只添加非空消息 # 获取用户名(如果是对方消息) author_name = '' if not is_author: # 尝试获取对方用户名 name_elements = wrapper.locator('.aweme-author-name-m8uoXU') if await name_elements.count() > 0: author_name = await name_elements.nth(0).inner_text() else: author_name = '用户' else: author_name = '我' history.append({ "author": author_name, "content": content, "is_author": is_author, }) except Exception as e: print(f" ⚠️ 解析第 {i+1} 条消息失败: {e}") continue return history async def _generate_reply_with_ai(self, chat_history: list) -> str: """使用 AI 生成回复(保留原逻辑)""" import os, requests, json try: ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_model = os.environ.get('AI_MODEL', 'qwen-plus') if not ai_api_key: return self._generate_reply(chat_history) messages = [{"role": "system", "content": "你是一个友好的抖音创作者助手,负责回复粉丝私信。请保持简洁、友好、专业的语气。回复长度不超过20字。"}] for msg in chat_history: role = "assistant" if msg.get("is_author", False) else "user" messages.append({"role": role, "content": msg["content"]}) headers = {'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json'} payload = {"model": ai_model, "messages": messages, "max_tokens": 150, "temperature": 0.8} response = requests.post(f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30) if 
response.status_code == 200: ai_reply = response.json().get('choices', [{}])[0].get('message', {}).get('content', '').strip() return ai_reply if ai_reply else self._generate_reply(chat_history) else: return self._generate_reply(chat_history) except: return self._generate_reply(chat_history) async def get_work_comments_mapping(self, cookies: str) -> dict: """获取所有作品及其评论的对应关系 Args: cookies: 抖音创作者平台的cookies Returns: dict: 包含作品和评论对应关系的JSON数据 """ print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品和评论对应关系") print(f"{'='*60}") work_comments_mapping = [] try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问创作者中心首页 await self.page.goto("https://creator.douyin.com/creator-micro/home", timeout=30000) await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url if "login" in current_url or "passport" in current_url: raise Exception("Cookie 已过期,请重新登录") # 访问内容管理页面获取作品列表 print(f"[{self.platform_name}] 访问内容管理页面...") await self.page.goto("https://creator.douyin.com/creator-micro/content/manage", timeout=30000) await asyncio.sleep(5) # 获取作品列表 works_result = await self.get_works(cookies, page=0, page_size=20) if not works_result.success: print(f"[{self.platform_name}] 获取作品列表失败: {works_result.error}") return { 'success': False, 'platform': self.platform_name, 'error': works_result.error, 'work_comments': [] } print(f"[{self.platform_name}] 获取到 {len(works_result.works)} 个作品") # 对每个作品获取评论 for i, work in enumerate(works_result.works): print(f"[{self.platform_name}] 正在获取作品 {i+1}/{len(works_result.works)} 的评论: {work.title[:20]}...") # 获取单个作品的评论 comments_result = await self.get_comments(cookies, work.work_id) if comments_result.success: work_comments_mapping.append({ 'work_info': work.to_dict(), 'comments': [comment.to_dict() for comment in comments_result.comments] }) print(f"[{self.platform_name}] 作品 '{work.title[:20]}...' 
获取到 {len(comments_result.comments)} 条评论") else: print(f"[{self.platform_name}] 获取作品 '{work.title[:20]}...' 评论失败: {comments_result.error}") work_comments_mapping.append({ 'work_info': work.to_dict(), 'comments': [], 'error': comments_result.error }) # 添加延时避免请求过于频繁 await asyncio.sleep(2) print(f"[{self.platform_name}] 所有作品评论获取完成") except Exception as e: import traceback traceback.print_exc() return { 'success': False, 'platform': self.platform_name, 'error': str(e), 'work_comments': [] } finally: await self.close_browser() return { 'success': True, 'platform': self.platform_name, 'work_comments': work_comments_mapping, 'summary': { 'total_works': len(work_comments_mapping), 'total_comments': sum(len(item['comments']) for item in work_comments_mapping), } }