# -*- coding: utf-8 -*-
"""
Douyin video publisher.

Reference: matrix/douyin_uploader/main.py
"""

import asyncio
import json
import os
import re
import traceback
from datetime import datetime
from typing import List

from .base import (
    BasePublisher, PublishParams, PublishResult,
    WorkItem, WorksResult, CommentItem, CommentsResult
)

# Extracts the numeric work id from a comment-API URL query string.
_AWEME_ID_RE = re.compile(r'aweme_id=(\d+)')
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _first_url(node) -> str:
    """Return the first entry of ``node['url_list']``.

    Returns '' when ``node`` is not a dict, has no ``url_list``, or the list
    is empty, so a missing image never raises ``IndexError``.
    """
    if not isinstance(node, dict):
        return ''
    url_list = node.get('url_list') or []
    return (url_list[0] or '') if url_list else ''


def _cover_url(aweme: dict) -> str:
    """Pick a cover image URL from whichever field the payload carries."""
    return (
        _first_url(aweme.get('Cover'))
        or _first_url((aweme.get('video') or {}).get('cover'))
        or aweme.get('cover_image_url')
        or ''
    )


def _format_timestamp(timestamp) -> str:
    """Format a Unix timestamp as local time; '' when missing or zero."""
    if not timestamp:
        return ''
    return datetime.fromtimestamp(timestamp).strftime(_TIME_FORMAT)


def _to_int(value) -> int:
    """Coerce an API counter to int, treating ``None`` / '' as 0."""
    return int(value or 0)


def _comment_fields(raw: dict) -> dict:
    """Map one raw comment payload onto the fields shared by all outputs."""
    user = raw.get('user') or {}
    return {
        'comment_id': str(raw.get('cid') or ''),
        'author_id': str(user.get('uid', '')),
        'author_name': user.get('nickname', ''),
        'author_avatar': _first_url(user.get('avatar_thumb')),
        'content': raw.get('text', ''),
        'like_count': _to_int(raw.get('digg_count')),
        'create_time': _format_timestamp(raw.get('create_time')),
        'is_author': raw.get('is_author', False),
    }


class DouyinPublisher(BasePublisher):
    """
    Douyin video publisher.

    Drives the Douyin creator center through a Playwright page
    (``self.page``, set up by ``init_browser``).
    """

    platform_name = "douyin"
    login_url = "https://creator.douyin.com/"
    publish_url = "https://creator.douyin.com/creator-micro/content/upload"
    cookie_domain = ".douyin.com"

    # (captcha_type, log label, selectors). Checked in order; first hit wins.
    _CAPTCHA_CHECKS = (
        ('phone', '检测到手机验证码', (
            'text="请输入验证码"',
            'text="输入手机验证码"',
            'text="获取验证码"',
            'text="手机号验证"',
            '[class*="captcha"][class*="phone"]',
            '[class*="verify"][class*="phone"]',
            '[class*="sms-code"]',
            'input[placeholder*="验证码"]',
        )),
        ('slider', '检测到滑块验证码', (
            '[class*="captcha"][class*="slider"]',
            '[class*="slide-verify"]',
            '[class*="drag-verify"]',
            'text="按住滑块"',
            'text="向右滑动"',
            'text="拖动滑块"',
        )),
        ('image', '检测到图片验证码', (
            '[class*="captcha"][class*="image"]',
            '[class*="verify-image"]',
            'text="点击图片"',
            'text="选择正确的"',
        )),
        # A login prompt is reported as captcha type 'login'.
        ('login', '检测到需要登录', (
            'text="请先登录"',
            'text="登录后继续"',
            '[class*="login-modal"]',
            '[class*="login-dialog"]',
        )),
    )

    async def set_schedule_time(self, publish_date: datetime):
        """Select the scheduled-publish option and type in ``publish_date``.

        Does nothing when the page has not been initialised.
        """
        if not self.page:
            return

        # Switch the radio group to scheduled publishing.
        label_element = self.page.locator("label.radio-d4zkru:has-text('定时发布')")
        await label_element.click()
        await asyncio.sleep(1)

        # Replace whatever is in the date/time input with the target time.
        publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M")
        await self.page.locator('.semi-input[placeholder="日期和时间"]').click()
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(str(publish_date_str))
        await self.page.keyboard.press("Enter")
        await asyncio.sleep(1)

    async def handle_upload_error(self, video_path: str):
        """Re-submit ``video_path`` to the upload input after a failed upload."""
        if not self.page:
            return

        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator(
            'div.progress-div [class^="upload-btn-input"]'
        ).set_input_files(video_path)

    async def check_captcha(self) -> dict:
        """Check whether the page is showing a verification or login prompt.

        Returns:
            ``{'need_captcha': bool, 'captcha_type': str}`` where the type is
            one of ``'phone'``, ``'slider'``, ``'image'``, ``'login'`` or ''.
        """
        if not self.page:
            return {'need_captcha': False, 'captcha_type': ''}

        for captcha_type, label, selectors in self._CAPTCHA_CHECKS:
            for selector in selectors:
                try:
                    found = await self.page.locator(selector).count() > 0
                except Exception:
                    # A selector that cannot be evaluated is skipped.
                    # Cancellation is deliberately NOT caught here.
                    continue
                if found:
                    print(f"[{self.platform_name}] {label}: {selector}", flush=True)
                    return {'need_captcha': True, 'captcha_type': captcha_type}

        return {'need_captcha': False, 'captcha_type': ''}

    def _captcha_failure(self, captcha_type: str, error: str) -> PublishResult:
        """Build the failed PublishResult used whenever a captcha blocks us."""
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=error,
            need_captcha=True,
            captcha_type=captcha_type
        )

    async def _read_error_toast(self) -> str:
        """Return the text of a visible error toast/tip, or '' if none."""
        try:
            error_toast = self.page.locator(
                '[class*="toast"][class*="error"], [class*="error-tip"]')
            if await error_toast.count() > 0:
                return (await error_toast.first.text_content()) or ''
        except Exception:
            # Failing to read the toast must not mask the retry loop.
            return ''
        return ''

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Upload and publish a video to Douyin.

        Args:
            cookies: Serialized cookies, parsed with ``self.parse_cookies``.
            params: Publish parameters. Reads ``video_path``, ``title``,
                ``tags``, ``location`` and ``publish_date``.

        Returns:
            A successful PublishResult once the page reaches the content
            management URL, or a failed one with ``need_captcha=True`` when a
            verification/login prompt is detected.

        Raises:
            Exception: page not initialised, video file missing, an error
                toast is shown after a publish attempt, or the publish step
                does not complete within 30 attempts.

        NOTE(review): this method never calls ``close_browser``; presumably
        the caller handles cleanup — confirm.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        self.report_progress(5, "正在初始化浏览器...")

        # Start the browser.
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        # Parse and install the cookies.
        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        # Make sure the video file exists before touching the page.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(10, "正在打开上传页面...")

        # Open the upload page.
        await self.page.goto(self.publish_url)
        await self.page.wait_for_url(self.publish_url, timeout=30000)

        # Give the page a moment to render, then look for a captcha.
        await asyncio.sleep(2)
        captcha_result = await self.check_captcha()
        if captcha_result['need_captcha']:
            captcha_type = captcha_result['captcha_type']
            print(f"[{self.platform_name}] 检测到需要验证码: {captcha_type}", flush=True)
            return self._captcha_failure(captcha_type, f"需要{captcha_type}验证码")

        self.report_progress(15, "正在选择视频文件...")

        # Click the drop zone and feed the file through the file chooser.
        upload_div = self.page.locator("div[class*='container-drag']").first
        async with self.page.expect_file_chooser() as fc_info:
            await upload_div.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(params.video_path)

        # Wait for the redirect to the post-editing page (best effort).
        self.report_progress(20, "等待进入发布页面...")
        for _ in range(60):
            try:
                await self.page.wait_for_url(
                    "https://creator.douyin.com/creator-micro/content/post/video*",
                    timeout=2000
                )
                break
            except Exception:
                await asyncio.sleep(1)

        await asyncio.sleep(2)
        self.report_progress(30, "正在填充标题和话题...")

        # Fill in the title.
        title_input = self.page.get_by_text('作品标题').locator("..").locator(
            "xpath=following-sibling::div[1]").locator("input")
        if await title_input.count():
            await title_input.fill(params.title[:30])
        else:
            # Fallback: type into the rich-text editor instead.
            title_container = self.page.locator(".notranslate")
            await title_container.click()
            await self.page.keyboard.press("Control+KeyA")
            await self.page.keyboard.press("Delete")
            await self.page.keyboard.type(params.title)
            await self.page.keyboard.press("Enter")

        # Append hashtag topics.
        if params.tags:
            css_selector = ".zone-container"
            for tag in params.tags:
                print(f"[{self.platform_name}] 添加话题: #{tag}")
                await self.page.type(css_selector, "#" + tag)
                await self.page.press(css_selector, "Space")

        self.report_progress(40, "等待视频上传完成...")

        # Poll until the "re-upload" control appears, retrying failed uploads.
        for _ in range(120):
            try:
                count = await self.page.locator("div").filter(has_text="重新上传").count()
                if count > 0:
                    print(f"[{self.platform_name}] 视频上传完毕")
                    break

                # Upload failed: push the file again.
                if await self.page.locator('div.progress-div > div:has-text("上传失败")').count():
                    await self.handle_upload_error(params.video_path)

                await asyncio.sleep(3)
            except Exception:
                await asyncio.sleep(3)

        self.report_progress(60, "处理视频设置...")

        # Dismiss the "I got it" dialog when present.
        known_btn = self.page.get_by_role("button", name="我知道了")
        if await known_btn.count() > 0:
            await known_btn.first.click()

        await asyncio.sleep(2)

        # Set the location (best effort).
        try:
            await self.page.locator('div.semi-select span:has-text("输入地理位置")').click()
            await asyncio.sleep(1)
            await self.page.keyboard.press("Control+KeyA")
            await self.page.keyboard.press("Delete")
            await self.page.keyboard.type(params.location)
            await asyncio.sleep(1)
            await self.page.locator('div[role="listbox"] [role="option"]').first.click()
        except Exception as e:
            print(f"[{self.platform_name}] 设置位置失败: {e}")

        # Turn on the third-party sync switch if it is off (best effort).
        try:
            third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch'
            if await self.page.locator(third_part_element).count():
                class_name = await self.page.eval_on_selector(
                    third_part_element, 'div => div.className')
                if 'semi-switch-checked' not in class_name:
                    await self.page.locator(third_part_element).locator(
                        'input.semi-switch-native-control').click()
        except Exception as e:
            print(f"[{self.platform_name}] 同步开关设置失败: {e}")

        # Scheduled publishing.
        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            await self.set_schedule_time(params.publish_date)

        self.report_progress(80, "正在发布...")
        print(f"[{self.platform_name}] 查找发布按钮...")

        # Click publish, retrying until the management page is reached.
        for i in range(30):
            try:
                # Re-check for a captcha on every attempt.
                captcha_result = await self.check_captcha()
                if captcha_result['need_captcha']:
                    captcha_type = captcha_result['captcha_type']
                    print(f"[{self.platform_name}] 发布过程中检测到需要验证码: {captcha_type}", flush=True)
                    # Keep a screenshot for debugging.
                    screenshot_path = f"debug_captcha_{self.platform_name}_{i}.png"
                    await self.page.screenshot(path=screenshot_path, full_page=True)
                    print(f"[{self.platform_name}] 验证码截图保存到: {screenshot_path}", flush=True)
                    return self._captcha_failure(
                        captcha_type, f"发布过程中需要{captcha_type}验证码")

                publish_btn = self.page.get_by_role('button', name="发布", exact=True)
                btn_count = await publish_btn.count()
                print(f"[{self.platform_name}] 发布按钮数量: {btn_count}")

                if btn_count > 0:
                    print(f"[{self.platform_name}] 点击发布按钮...")
                    await publish_btn.click()

                    # The click itself may trigger a captcha.
                    await asyncio.sleep(2)
                    captcha_result = await self.check_captcha()
                    if captcha_result['need_captcha']:
                        captcha_type = captcha_result['captcha_type']
                        print(f"[{self.platform_name}] 点击发布后需要验证码: {captcha_type}", flush=True)
                        screenshot_path = f"debug_captcha_after_publish_{self.platform_name}.png"
                        await self.page.screenshot(path=screenshot_path, full_page=True)
                        return self._captcha_failure(
                            captcha_type, f"发布需要{captcha_type}验证码")

                await self.page.wait_for_url(
                    "https://creator.douyin.com/creator-micro/content/manage",
                    timeout=5000
                )
                self.report_progress(100, "发布成功")
                print(f"[{self.platform_name}] 发布成功! 已跳转到内容管理页面")
                return PublishResult(
                    success=True,
                    platform=self.platform_name,
                    message="发布成功"
                )
            except Exception:
                current_url = self.page.url
                print(f"[{self.platform_name}] 尝试 {i+1}/30, 当前URL: {current_url}")

                if "content/manage" in current_url:
                    self.report_progress(100, "发布成功")
                    print(f"[{self.platform_name}] 发布成功! 已在内容管理页面")
                    return PublishResult(
                        success=True,
                        platform=self.platform_name,
                        message="发布成功"
                    )

                # Surface a page-level error instead of retrying blindly.
                # The raise sits outside any try so it reaches the caller.
                error_text = await self._read_error_toast()
                if error_text:
                    print(f"[{self.platform_name}] 检测到错误提示: {error_text}")
                    raise Exception(f"发布失败: {error_text}")

                await asyncio.sleep(1)

        # Out of attempts: save a screenshot and fail.
        screenshot_path = f"debug_publish_timeout_{self.platform_name}.png"
        await self.page.screenshot(path=screenshot_path, full_page=True)
        print(f"[{self.platform_name}] 发布超时,截图保存到: {screenshot_path}")
        raise Exception(f"发布超时(截图: {screenshot_path})")

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Fetch one page of the account's works via the creator-center API.

        Args:
            cookies: Serialized cookies, parsed with ``self.parse_cookies``.
            page: Zero-based page index; sent as ``max_cursor = page * page_size``.
            page_size: Number of works requested.

        Returns:
            WorksResult with ``success=True`` and the parsed works, or
            ``success=False`` with ``error`` set when anything fails.
            ``total`` is the number of works parsed from this response.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Load the creator home page first so the session is validated.
            await self.page.goto("https://creator.douyin.com/creator-micro/home")
            await asyncio.sleep(3)

            # A redirect to a login/passport URL means the cookies expired.
            current_url = self.page.url
            if "login" in current_url or "passport" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Call the work-list API from inside the page.
            cursor = page * page_size
            api_url = f"https://creator.douyin.com/janus/douyin/creator/pc/work_list?scene=star_atlas&device_platform=android&count={page_size}&max_cursor={cursor}&cookie_enabled=true&browser_language=zh-CN&browser_platform=Win32&browser_name=Mozilla&browser_online=true&timezone_name=Asia%2FShanghai&aid=1128"

            # The URL is passed as an argument, not spliced into the script.
            response = await self.page.evaluate(
                '''
                async (url) => {
                    const resp = await fetch(url, {
                        credentials: 'include',
                        headers: { 'Accept': 'application/json' }
                    });
                    return await resp.json();
                }
                ''',
                api_url,
            )

            aweme_list = response.get('aweme_list') or []
            has_more = response.get('has_more', False)
            print(f"[{self.platform_name}] API 响应: has_more={response.get('has_more')}, aweme_list={len(aweme_list)}")

            for aweme in aweme_list:
                aweme_id = str(aweme.get('aweme_id', ''))
                if not aweme_id:
                    continue

                statistics = aweme.get('statistics') or {}

                # Title: explicit title, else first line of the description.
                title = (
                    aweme.get('item_title', '')
                    or (aweme.get('desc') or '').split('\n')[0][:50]
                    or '无标题'
                )

                # Duration: the payload value is divided by 1000.
                duration = _to_int((aweme.get('video') or {}).get('duration')) // 1000

                works.append(WorkItem(
                    work_id=aweme_id,
                    title=title,
                    cover_url=_cover_url(aweme),
                    duration=duration,
                    status='published',
                    publish_time=_format_timestamp(aweme.get('create_time', 0)),
                    play_count=_to_int(statistics.get('play_count')),
                    like_count=_to_int(statistics.get('digg_count')),
                    comment_count=_to_int(statistics.get('comment_count')),
                    share_count=_to_int(statistics.get('share_count')),
                ))

            total = len(works)
            print(f"[{self.platform_name}] 获取到 {total} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            # Release the browser, as the comment-fetching methods do.
            await self.close_browser()

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=total,
            has_more=has_more
        )

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Fetch comments of one work by intercepting the comment-list API.

        Opens the public video page and captures the comment-list response
        the page itself requests.

        Args:
            cookies: Serialized cookies, parsed with ``self.parse_cookies``.
            work_id: Id of the work whose comments are wanted.
            cursor: Only logged; the request is issued by the page, so this
                value does not influence which comments are returned.

        Returns:
            CommentsResult. On success the next cursor is attached as an
            extra ``cursor`` attribute on the result object. When no API
            response is captured the result is still successful but empty.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}, cursor={cursor}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False
        next_cursor = ""
        captured_data = {}

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Response listener: keeps the latest matching comment payload.
            async def handle_response(response):
                nonlocal captured_data
                url = response.url
                # Matches any URL containing /comment/list for this work.
                if '/comment/list' in url and ('aweme_id' in url or work_id in url):
                    try:
                        json_data = await response.json()
                        print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)

                        # Accept only successful or non-empty payloads.
                        if json_data.get('status_code') == 0 or json_data.get('comments'):
                            captured_data = json_data
                            comment_count = len(json_data.get('comments') or [])
                            print(f"[{self.platform_name}] 评论 API 响应成功: comments={comment_count}, has_more={json_data.get('has_more')}", flush=True)
                    except Exception as e:
                        print(f"[{self.platform_name}] 解析评论响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册评论 API 响应监听器", flush=True)

            # Opening the video page triggers the comment request.
            video_url = f"https://www.douyin.com/video/{work_id}"
            print(f"[{self.platform_name}] 访问视频详情页: {video_url}", flush=True)
            await self.page.goto(video_url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            # A redirect to a login/passport URL means the cookies expired.
            current_url = self.page.url
            if "login" in current_url or "passport" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Nothing captured yet: scroll to nudge the page, then wait.
            if not captured_data:
                print(f"[{self.platform_name}] 等待评论 API 响应...", flush=True)
                await self.page.evaluate('window.scrollBy(0, 300)')
                await asyncio.sleep(3)

                if not captured_data:
                    # Wait a little longer.
                    await asyncio.sleep(3)

            # Stop listening before parsing.
            self.page.remove_listener('response', handle_response)

            if captured_data:
                comment_list = captured_data.get('comments') or []
                has_more = bool(captured_data.get('has_more'))
                next_cursor = str(captured_data.get('cursor', ''))
                total = captured_data.get('total', 0) or len(comment_list)

                print(f"[{self.platform_name}] 解析评论: total={total}, has_more={has_more}, comments={len(comment_list)}", flush=True)

                for comment in comment_list:
                    fields = _comment_fields(comment)
                    if not fields['comment_id']:
                        continue

                    # Replies embedded in the comment payload.
                    replies = [
                        CommentItem(work_id=work_id, **_comment_fields(reply))
                        for reply in (comment.get('reply_comment') or [])
                    ]

                    comments.append(CommentItem(
                        work_id=work_id,
                        reply_count=_to_int(comment.get('reply_comment_total')),
                        replies=replies,
                        **fields,
                    ))

                print(f"[{self.platform_name}] 解析到 {len(comments)} 条评论", flush=True)
            else:
                print(f"[{self.platform_name}] 未捕获到评论 API 响应", flush=True)

        except Exception as e:
            traceback.print_exc()
            return CommentsResult(
                success=False,
                platform=self.platform_name,
                work_id=work_id,
                error=str(e)
            )
        finally:
            await self.close_browser()

        result = CommentsResult(
            success=True,
            platform=self.platform_name,
            work_id=work_id,
            comments=comments,
            total=total,
            has_more=has_more
        )
        # Attached via __dict__ exactly as before; callers read result.cursor.
        result.__dict__['cursor'] = next_cursor
        return result

    async def get_all_comments(self, cookies: str) -> dict:
        """Collect comments for the account's works via the comment page.

        Opens the creator-center comment management page, records every
        comment-list and work-list API response the page makes while works
        are clicked and the page is scrolled, then groups comments by work.

        Returns:
            ``{'success': True, 'platform', 'work_comments', 'total'}`` where
            ``total`` is the number of works with captured comments, or
            ``{'success': False, 'platform', 'error', 'work_comments': []}``.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取所有作品评论")
        print(f"{'='*60}")

        all_work_comments = []
        captured_comments = []
        captured_works = {}  # work_id -> work_info

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Response listener: accumulates comments and work metadata.
            async def handle_response(response):
                url = response.url
                try:
                    # Comment-list endpoints come in several URL shapes.
                    if '/comment/list' in url or '/comment/read' in url or 'comment_list' in url:
                        json_data = await response.json()
                        print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)

                        # Payload format 1: 'comments'; format 2: 'comment_info_list'.
                        comments = (
                            json_data.get('comments')
                            or json_data.get('comment_info_list')
                            or []
                        )

                        if comments:
                            # The work id, when present, is in the query string.
                            aweme_id_match = _AWEME_ID_RE.search(url)
                            aweme_id = aweme_id_match.group(1) if aweme_id_match else ''

                            for comment in comments:
                                # Tag the comment with its work id for grouping.
                                if aweme_id and 'aweme_id' not in comment:
                                    comment['aweme_id'] = aweme_id
                                captured_comments.append(comment)

                            print(f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (aweme_id={aweme_id}),总计: {len(captured_comments)}", flush=True)

                    # Work-list endpoints.
                    if '/work_list' in url or '/item/list' in url or '/creator/item' in url:
                        json_data = await response.json()
                        aweme_list = (
                            json_data.get('aweme_list')
                            or json_data.get('item_info_list')
                            or json_data.get('item_list')
                            or []
                        )
                        print(f"[{self.platform_name}] 捕获到作品列表 API: {len(aweme_list)} 个作品", flush=True)

                        for aweme in aweme_list:
                            aweme_id = str(
                                aweme.get('aweme_id', '')
                                or aweme.get('item_id', '')
                                or aweme.get('item_id_plain', '')
                            )
                            if aweme_id:
                                captured_works[aweme_id] = {
                                    'title': aweme.get('item_title', '') or aweme.get('title', '') or aweme.get('desc', ''),
                                    'cover': _cover_url(aweme),
                                    'comment_count': (aweme.get('statistics') or {}).get('comment_count', 0) or aweme.get('comment_count', 0),
                                }
                except Exception as e:
                    print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True)

            # Open the comment management page.
            print(f"[{self.platform_name}] 访问评论管理页面...", flush=True)
            await self.page.goto(
                "https://creator.douyin.com/creator-micro/interactive/comment",
                wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            # A redirect to a login/passport URL means the cookies expired.
            current_url = self.page.url
            if "login" in current_url or "passport" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            print(f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True)

            # Open the work picker so the work list (and comments) load.
            try:
                select_btn = await self.page.query_selector('text="选择作品"')
                if select_btn:
                    print(f"[{self.platform_name}] 点击选择作品按钮...", flush=True)
                    await select_btn.click()
                    await asyncio.sleep(3)

                    work_items = await self.page.query_selector_all(
                        '[class*="work-item"], [class*="video-item"], [class*="aweme-item"]')
                    print(f"[{self.platform_name}] 找到 {len(work_items)} 个作品元素", flush=True)

                    # Click each work to load its comments (at most 10).
                    for i, item in enumerate(work_items[:10]):
                        try:
                            await item.click()
                            await asyncio.sleep(2)
                            print(f"[{self.platform_name}] 已点击作品 {i+1}/{min(len(work_items), 10)}", flush=True)
                        except Exception as e:
                            print(f"[{self.platform_name}] 点击作品失败: {e}", flush=True)

                    # Close the picker dialog.
                    close_btn = await self.page.query_selector('[class*="close"], [class*="cancel"]')
                    if close_btn:
                        await close_btn.click()
                        await asyncio.sleep(1)
            except Exception as e:
                print(f"[{self.platform_name}] 选择作品操作失败: {e}", flush=True)

            # Scroll to trigger loading of further comments.
            for _ in range(5):
                await self.page.evaluate('window.scrollBy(0, 500)')
                await asyncio.sleep(1)

            await asyncio.sleep(3)

            # Stop listening before grouping.
            self.page.remove_listener('response', handle_response)

            print(f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True)

            # Group captured comments by work.
            work_comments_map = {}  # work_id -> work_comments
            for comment in captured_comments:
                # Work info may be embedded in the comment itself.
                aweme = comment.get('aweme') or comment.get('item') or {}
                aweme_id = str(
                    comment.get('aweme_id', '')
                    or aweme.get('aweme_id', '')
                    or aweme.get('item_id', '')
                )
                if not aweme_id:
                    continue

                # The work entry is created before the comment is validated,
                # so a work can be listed with an empty comment list.
                if aweme_id not in work_comments_map:
                    work_info = captured_works.get(aweme_id, {})
                    embedded_cover = aweme.get('cover')
                    work_comments_map[aweme_id] = {
                        'work_id': aweme_id,
                        'title': aweme.get('title', '') or aweme.get('desc', '') or work_info.get('title', ''),
                        'cover_url': _first_url(embedded_cover) if embedded_cover else work_info.get('cover', ''),
                        'comments': []
                    }

                fields = _comment_fields(comment)
                if not fields['comment_id']:
                    continue

                work_comments_map[aweme_id]['comments'].append(fields)

            all_work_comments = list(work_comments_map.values())
            total_comments = sum(len(w['comments']) for w in all_work_comments)
            print(f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论", flush=True)

        except Exception as e:
            traceback.print_exc()
            return {
                'success': False,
                'platform': self.platform_name,
                'error': str(e),
                'work_comments': []
            }
        finally:
            await self.close_browser()

        return {
            'success': True,
            'platform': self.platform_name,
            'work_comments': all_work_comments,
            'total': len(all_work_comments)
        }