# -*- coding: utf-8 -*-
"""
Xiaohongshu (RED) video publisher.

Reference implementation: matrix/xhs_uploader/main.py

Two publishing paths are implemented:

* ``publish_via_api``        - uses the ``xhs`` SDK (``XhsClient``).
* ``publish_via_playwright`` - drives the creator web UI with Playwright.

``publish`` currently always takes the Playwright path; the SDK path is kept
but is not called from ``publish``.
"""

import asyncio
import concurrent.futures
import functools
import os
import re
import sys
import traceback
from pathlib import Path
from typing import List

from .base import (
    BasePublisher, PublishParams, PublishResult,
    WorkItem, WorksResult, CommentItem, CommentsResult
)

# Add the sibling "matrix" project to sys.path so its signing script can be used.
MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix"
sys.path.insert(0, str(MATRIX_PATH))

# The xhs SDK is optional; remember whether it could be imported.
try:
    from xhs import XhsClient
    XHS_SDK_AVAILABLE = True
except ImportError:
    print("[Warning] xhs 库未安装,请运行: pip install xhs")
    XhsClient = None
    XHS_SDK_AVAILABLE = False

# Init script injected into the signing browser context when the file exists.
STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js"

# Extracts the note id from a comment API URL query string.
_NOTE_ID_RE = re.compile(r'note_id=([^&]+)')

# Titles are truncated to this many characters before being typed.
_TITLE_MAX_LENGTH = 20
# At most this many tags are typed into the description editor.
_MAX_TAGS = 5


class XiaohongshuPublisher(BasePublisher):
    """
    Xiaohongshu video publisher.

    Provides publishing (SDK or Playwright), listing of published works and
    comment collection by listening to the creator backend's API responses.
    """

    platform_name = "xiaohongshu"
    login_url = "https://creator.xiaohongshu.com/"
    publish_url = "https://creator.xiaohongshu.com/publish/publish"
    cookie_domain = ".xiaohongshu.com"

    async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""):
        """Compute the Xiaohongshu API signature headers for a request.

        Opens a headless Chromium page on www.xiaohongshu.com and calls the
        page's ``window._webmsxyw(url, data)`` function.

        Args:
            uri: Request URI to sign.
            data: Request payload passed through to the page function.
            a1: Value for the ``a1`` cookie; set on the context when non-empty.
            web_session: Accepted for signature compatibility with the SDK
                callback; not used by this implementation.

        Returns:
            Dict with the ``x-s`` and ``x-t`` header values.

        Raises:
            Exception: If any step of the signing process fails.
        """
        from playwright.async_api import async_playwright

        try:
            async with async_playwright() as playwright:
                browser = await playwright.chromium.launch(headless=True)
                try:
                    browser_context = await browser.new_context()

                    if STEALTH_JS_PATH.exists():
                        await browser_context.add_init_script(path=str(STEALTH_JS_PATH))

                    page = await browser_context.new_page()
                    await page.goto("https://www.xiaohongshu.com")
                    await asyncio.sleep(1)
                    await page.reload()
                    await asyncio.sleep(1)

                    if a1:
                        await browser_context.add_cookies([
                            {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
                        ])
                        # Reload so the page picks up the freshly set cookie.
                        await page.reload()
                        await asyncio.sleep(0.5)

                    encrypt_params = await page.evaluate(
                        "([url, data]) => window._webmsxyw(url, data)",
                        [uri, data]
                    )

                    return {
                        "x-s": encrypt_params["X-s"],
                        "x-t": str(encrypt_params["X-t"])
                    }
                finally:
                    # Always release the browser, even when evaluation fails.
                    await browser.close()
        except Exception as e:
            traceback.print_exc()
            raise Exception(f"签名失败: {e}") from e

    def sign_sync(self, uri, data=None, a1="", web_session=""):
        """Synchronous wrapper around ``get_sign`` for use as the SDK sign callback.

        ``asyncio.run`` cannot be used in a thread that already runs an event
        loop, so in that situation the coroutine is executed on a temporary
        worker thread instead.

        Returns:
            The dict produced by ``get_sign``.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: asyncio.run() is safe.
            return asyncio.run(self.get_sign(uri, data, a1, web_session))

        # A loop is running in this thread; run the coroutine in its own
        # thread (with its own loop) and block until it finishes.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(asyncio.run, self.get_sign(uri, data, a1, web_session))
            return future.result()

    async def publish_via_api(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video through the xhs SDK.

        Args:
            cookies: Raw cookie payload; parsed with ``parse_cookies`` and, if
                that yields entries, converted with ``cookies_to_string``.
            params: Publish parameters (title, description, tags, paths, date).

        Returns:
            A successful ``PublishResult`` carrying the note id and URL
            reported by the SDK (either may be empty).

        Raises:
            Exception: If the SDK is missing, the SDK call fails, the result
                is empty, or the result reports an error.
        """
        if not XHS_SDK_AVAILABLE:
            raise Exception("xhs SDK 未安装,请运行: pip install xhs")

        self.report_progress(10, "正在通过 API 发布...")
        print(f"[{self.platform_name}] 使用 XHS SDK API 发布...")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")

        # Normalise the cookie payload into a header-style string.
        cookie_list = self.parse_cookies(cookies)
        cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies
        print(f"[{self.platform_name}] Cookie 长度: {len(cookie_string)}")

        self.report_progress(20, "正在上传视频...")

        xhs_client = XhsClient(cookie_string, sign=self.sign_sync)

        print(f"[{self.platform_name}] 开始调用 create_video_note...")

        try:
            create_note = functools.partial(
                xhs_client.create_video_note,
                title=params.title,
                desc=params.description or params.title,
                topics=params.tags or [],
                post_time=params.publish_date.strftime("%Y-%m-%d %H:%M:%S") if params.publish_date else None,
                video_path=params.video_path,
                cover_path=params.cover_path if params.cover_path and os.path.exists(params.cover_path) else None
            )
            # The SDK call is blocking and invokes sign_sync(), which starts
            # its own event loop; run it off the current loop's thread so it
            # neither blocks this loop nor collides with it.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, create_note)
            print(f"[{self.platform_name}] SDK 返回结果: {result}")
        except Exception as e:
            traceback.print_exc()
            print(f"[{self.platform_name}] SDK 调用失败: {e}")
            raise Exception(f"XHS SDK 发布失败: {e}") from e

        # Validate the SDK result.
        if not result:
            raise Exception("XHS SDK 返回空结果")

        note_id = ""
        video_url = ""
        if isinstance(result, dict):
            if result.get("code") and result.get("code") != 0:
                raise Exception(f"发布失败: {result.get('msg', '未知错误')}")
            if result.get("success") is False:
                raise Exception(f"发布失败: {result.get('msg', result.get('error', '未知错误'))}")
            note_id = result.get("note_id", "")
            video_url = result.get("url", "")

        if not note_id:
            print(f"[{self.platform_name}] 警告: 未获取到 note_id,返回结果: {result}")

        self.report_progress(100, "发布成功")
        print(f"[{self.platform_name}] 发布成功! note_id={note_id}, url={video_url}")

        return PublishResult(
            success=True,
            platform=self.platform_name,
            video_id=note_id,
            video_url=video_url,
            message="发布成功"
        )

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to Xiaohongshu.

        Verifies that the video file exists and then publishes through the
        Playwright path.

        Raises:
            Exception: If the video file does not exist, or publishing fails.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] XHS SDK 可用: {XHS_SDK_AVAILABLE}")
        print(f"{'='*60}")

        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(5, "正在准备发布...")

        # NOTE: the SDK path (publish_via_api) is temporarily disabled and the
        # Playwright path is used directly.
        # TODO: improve validation of the API return value, then re-enable the
        # SDK path with a fallback to Playwright on failure.
        print(f"[{self.platform_name}] 使用 Playwright 方式发布...")
        return await self.publish_via_playwright(cookies, params)

    async def publish_via_playwright(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video by driving the creator web UI with Playwright.

        Steps: open the publish page, upload the file, wait for the editor to
        appear, fill title / description / tags, click publish and wait for a
        success indication.

        Raises:
            Exception: On expired login, upload failure or timeout, missing
                publish button, an on-page error message, or when the page
                is still on the publish URL after waiting.
        """
        self.report_progress(10, "正在初始化浏览器...")
        print(f"[{self.platform_name}] Playwright 方式开始...")

        await self.init_browser()

        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 设置 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        self.report_progress(15, "正在打开发布页面...")

        # Go straight to the video publish page.
        publish_url = "https://creator.xiaohongshu.com/publish/publish?source=official"
        print(f"[{self.platform_name}] 打开页面: {publish_url}")
        await self.page.goto(publish_url)
        await asyncio.sleep(3)

        current_url = self.page.url
        print(f"[{self.platform_name}] 当前 URL: {current_url}")

        # A redirect to a login/passport URL means the cookies are no longer valid.
        if "login" in current_url or "passport" in current_url:
            screenshot_path = f"debug_login_required_{self.platform_name}.png"
            await self.page.screenshot(path=screenshot_path)
            raise Exception(f"登录已过期,请重新登录(截图: {screenshot_path})")

        self.report_progress(20, "正在上传视频...")

        # Give the page time to finish rendering.
        await asyncio.sleep(2)

        if not await self._upload_video_file(params.video_path):
            screenshot_path = f"debug_upload_failed_{self.platform_name}.png"
            await self.page.screenshot(path=screenshot_path)
            raise Exception(f"无法上传视频文件(截图: {screenshot_path})")

        self.report_progress(40, "等待视频上传完成...")
        print(f"[{self.platform_name}] 等待视频上传和处理...")

        if not await self._wait_for_upload():
            screenshot_path = f"debug_upload_timeout_{self.platform_name}.png"
            await self.page.screenshot(path=screenshot_path)
            raise Exception(f"视频上传超时(截图: {screenshot_path})")

        await asyncio.sleep(2)

        self.report_progress(60, "正在填写笔记信息...")
        print(f"[{self.platform_name}] 填写标题: {params.title[:_TITLE_MAX_LENGTH]}")

        await self._fill_title(params.title[:_TITLE_MAX_LENGTH])

        if params.description or params.tags:
            await self._fill_description(params.description, params.tags)

        await asyncio.sleep(2)
        self.report_progress(80, "正在发布...")

        await asyncio.sleep(2)

        # Scroll to the bottom so the publish button is in view.
        await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(1)

        print(f"[{self.platform_name}] 查找发布按钮...")

        if not await self._click_publish_button():
            # Save a screenshot and dump the first buttons to help debugging.
            screenshot_path = f"debug_publish_failed_{self.platform_name}.png"
            await self.page.screenshot(path=screenshot_path, full_page=True)
            print(f"[{self.platform_name}] 未找到发布按钮,截图保存到: {screenshot_path}")

            buttons = await self.page.query_selector_all('button')
            print(f"[{self.platform_name}] 页面上共有 {len(buttons)} 个按钮")
            for i, btn in enumerate(buttons[:10]):
                text = await btn.text_content() or ''
                cls = await btn.get_attribute('class') or ''
                print(f" 按钮 {i}: text='{text.strip()[:30]}', class='{cls[:50]}'")

            raise Exception("未找到发布按钮")

        print(f"[{self.platform_name}] 已点击发布按钮,等待发布完成...")
        self.report_progress(90, "等待发布结果...")

        if not await self._wait_for_publish_result():
            # No explicit success signal: keep a screenshot for inspection.
            final_url = self.page.url
            print(f"[{self.platform_name}] 发布结果不确定,当前 URL: {final_url}")
            screenshot_path = f"debug_publish_result_{self.platform_name}.png"
            await self.page.screenshot(path=screenshot_path, full_page=True)
            print(f"[{self.platform_name}] 截图保存到: {screenshot_path}")

            # Still on the publish page: treat it as a probable failure.
            if "publish/publish" in final_url:
                raise Exception(f"发布可能失败,仍停留在发布页面(截图: {screenshot_path})")

        self.report_progress(100, "发布完成")
        print(f"[{self.platform_name}] Playwright 方式发布完成!")

        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布完成"
        )

    async def _upload_video_file(self, video_path: str) -> bool:
        """Attach the video file to the publish page; return True if attached."""
        # Method 1: set the file directly on a (possibly hidden) file input.
        print(f"[{self.platform_name}] 尝试方法1: 设置 file input")
        file_inputs = self.page.locator('input[type="file"]')
        input_count = await file_inputs.count()
        print(f"[{self.platform_name}] 找到 {input_count} 个 file input")

        for i in range(input_count):
            input_el = file_inputs.nth(i)
            accept = await input_el.get_attribute('accept') or ''
            print(f"[{self.platform_name}] Input {i} accept: {accept}")
            # Use the first input that accepts video, anything, or has no filter.
            if 'video' in accept or '*' in accept or not accept:
                await input_el.set_input_files(video_path)
                print(f"[{self.platform_name}] 视频文件已设置到 input {i}")
                return True

        # Method 2: click the upload area and answer the file chooser.
        print(f"[{self.platform_name}] 尝试方法2: 点击上传区域")
        try:
            upload_area = self.page.locator('[class*="upload-wrapper"], [class*="upload-area"], .upload-input').first
            if await upload_area.count() > 0:
                async with self.page.expect_file_chooser(timeout=5000) as fc_info:
                    await upload_area.click()
                file_chooser = await fc_info.value
                await file_chooser.set_files(video_path)
                print(f"[{self.platform_name}] 通过点击上传区域上传成功")
                return True
        except Exception as e:
            # Best effort: the caller reports the overall upload failure.
            print(f"[{self.platform_name}] 方法2失败: {e}")

        return False

    async def _wait_for_upload(self) -> bool:
        """Poll (60 x 3 s) until the note editor appears; return True if it did."""
        for i in range(60):
            await asyncio.sleep(3)

            # The title input shows up once the upload has finished.
            title_input_count = await self.page.locator('input[placeholder*="标题"], input[placeholder*="填写标题"]').count()
            # Alternatively look for the editor area ...
            editor_count = await self.page.locator('[class*="ql-editor"], [contenteditable="true"]').count()
            # ... together with a publish button.
            publish_btn_count = await self.page.locator('.publishBtn, button:has-text("发布")').count()

            print(f"[{self.platform_name}] 检测 {i+1}: 标题框={title_input_count}, 编辑器={editor_count}, 发布按钮={publish_btn_count}")

            if title_input_count > 0 or (editor_count > 0 and publish_btn_count > 0):
                print(f"[{self.platform_name}] 视频上传完成!")
                return True

        return False

    async def _fill_title(self, title: str) -> None:
        """Type ``title`` into the first matching title input, if any."""
        title_selectors = [
            'input[placeholder*="标题"]',
            'input[placeholder*="填写标题"]',
            '[class*="title"] input',
            '.c-input_inner',
        ]
        for selector in title_selectors:
            title_input = self.page.locator(selector).first
            if await title_input.count() > 0:
                await title_input.click()
                await title_input.fill('')  # clear any existing text first
                await title_input.fill(title)
                print(f"[{self.platform_name}] 标题已填写,使用选择器: {selector}")
                return

        print(f"[{self.platform_name}] 警告: 未找到标题输入框")

    async def _fill_description(self, description, tags) -> None:
        """Type the description and up to ``_MAX_TAGS`` tags into the editor."""
        desc_selectors = [
            '[class*="ql-editor"]',
            '[class*="content-input"] [contenteditable="true"]',
            '[class*="editor"] [contenteditable="true"]',
            '.ql-editor',
        ]
        for selector in desc_selectors:
            desc_input = self.page.locator(selector).first
            if await desc_input.count() > 0:
                await desc_input.click()
                await asyncio.sleep(0.5)

                if description:
                    await self.page.keyboard.type(description, delay=20)
                    print(f"[{self.platform_name}] 描述已填写")

                if tags:
                    # Start the tags on a new line.
                    await self.page.keyboard.press("Enter")
                    for tag in tags[:_MAX_TAGS]:
                        await self.page.keyboard.type(f"#{tag}", delay=20)
                        await asyncio.sleep(0.3)
                        await self.page.keyboard.press("Space")
                    print(f"[{self.platform_name}] 标签已填写: {tags[:_MAX_TAGS]}")

                return

        print(f"[{self.platform_name}] 警告: 未找到描述输入框")

    async def _click_publish_button(self) -> bool:
        """Click the first visible, enabled publish button; return True if clicked."""
        publish_selectors = [
            'button.publishBtn',
            '.publishBtn',
            'button.d-button.red',
            'button:has-text("发布"):not(:has-text("定时发布"))',
            '[class*="publish"][class*="btn"]',
        ]
        for selector in publish_selectors:
            try:
                btn = self.page.locator(selector).first
                if await btn.count() > 0:
                    is_visible = await btn.is_visible()
                    is_enabled = await btn.is_enabled()
                    print(f"[{self.platform_name}] 按钮 {selector}: visible={is_visible}, enabled={is_enabled}")

                    if is_visible and is_enabled:
                        box = await btn.bounding_box()
                        if box:
                            print(f"[{self.platform_name}] 点击发布按钮: {selector}, 位置: ({box['x']}, {box['y']})")
                            # Click the button centre with the mouse API
                            # rather than Locator.click().
                            await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2)
                            return True
            except Exception as e:
                # Try the next selector.
                print(f"[{self.platform_name}] 选择器 {selector} 错误: {e}")

        return False

    async def _wait_for_publish_result(self) -> bool:
        """Poll (20 x 1 s) for a publish outcome.

        Returns:
            True when a success URL or success toast is seen, False when the
            wait ends without a clear signal.

        Raises:
            Exception: If an on-page error element with text is found.
        """
        for _ in range(20):
            await asyncio.sleep(1)
            current_url = self.page.url

            # Redirected to a success / content management page?
            if "published=true" in current_url or "success" in current_url or "content" in current_url:
                print(f"[{self.platform_name}] 发布成功! 跳转到: {current_url}")
                return True

            # Success toast visible?
            try:
                success_msg = await self.page.locator('[class*="success"], .toast-success, [class*="Toast"]').first.is_visible()
            except Exception:
                # Probe failures are ignored; cancellation still propagates.
                success_msg = False
            if success_msg:
                print(f"[{self.platform_name}] 检测到成功提示!")
                return True

            # Error message visible?
            error_text = ""
            try:
                error_elements = self.page.locator('[class*="error"], .toast-error, [class*="fail"]')
                if await error_elements.count() > 0:
                    error_text = (await error_elements.first.text_content() or "").strip()
            except Exception:
                # Probe failures are ignored; only a real message is fatal.
                error_text = ""
            if error_text:
                raise Exception(f"发布失败: {error_text}")

        return False

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """List published works by capturing the note-manager page's API response.

        Args:
            cookies: Raw cookie payload for the creator site.
            page: Logged only; the request is issued by the web page itself.
            page_size: Logged only; see ``page``.

        Returns:
            ``WorksResult`` with ``success=True`` and the parsed works (possibly
            empty when nothing was captured), or ``success=False`` with the
            error text when an exception occurred.
        """
        print(f"\n{'='*60}", flush=True)
        print(f"[{self.platform_name}] 获取作品列表", flush=True)
        print(f"[{self.platform_name}] page={page}, page_size={page_size}", flush=True)
        print(f"{'='*60}", flush=True)

        works: List[WorkItem] = []
        total = 0
        has_more = False
        captured_data = {}

        # tab_status -> status; any other value is treated as published.
        status_by_tab = {0: 'draft', 2: 'reviewing', 3: 'rejected'}

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)

            print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies", flush=True)

            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            async def handle_response(response):
                """Keep the JSON of the works-list API call made by the page."""
                nonlocal captured_data
                url = response.url
                if 'creator/note/user/posted' in url or 'creator/note_list' in url:
                    try:
                        json_data = await response.json()
                        print(f"[{self.platform_name}] 捕获到 API 响应: {url[:80]}...", flush=True)
                        if json_data.get('success') or json_data.get('code') == 0:
                            captured_data = json_data
                            print(f"[{self.platform_name}] API 响应成功,data keys: {list((json_data.get('data') or {}).keys())}", flush=True)
                    except Exception as e:
                        print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True)

            # Opening the note manager makes the page request the list itself.
            print(f"[{self.platform_name}] 访问笔记管理页面...", flush=True)

            try:
                await self.page.goto("https://creator.xiaohongshu.com/new/note-manager", wait_until="domcontentloaded", timeout=30000)
            except Exception as nav_error:
                # A navigation timeout is tolerated; the API call may still arrive.
                print(f"[{self.platform_name}] 导航超时,但继续尝试: {nav_error}", flush=True)

            # Give the API response time to be captured.
            await asyncio.sleep(5)

            current_url = self.page.url
            print(f"[{self.platform_name}] 当前页面: {current_url}", flush=True)

            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Nothing captured yet: wait a little longer.
            if not captured_data:
                print(f"[{self.platform_name}] 等待 API 响应...", flush=True)
                await asyncio.sleep(5)

            self.page.remove_listener('response', handle_response)

            if captured_data:
                print(f"[{self.platform_name}] 成功捕获到 API 数据", flush=True)
                # "or" guards also cover keys present with an explicit null.
                data = captured_data.get('data') or {}
                notes = data.get('notes') or []
                print(f"[{self.platform_name}] notes 数量: {len(notes)}", flush=True)

                # The total is read from the tag with this specific id.
                for tag in data.get('tags') or []:
                    if tag.get('id') == 'special.note_time_desc':
                        total = tag.get('notes_count', 0)
                        break

                has_more = data.get('page', -1) != -1

                for note in notes:
                    note_id = note.get('id', '')
                    if not note_id:
                        continue

                    # Cover: first image, upgraded to https.
                    cover_url = ''
                    images_list = note.get('images_list') or []
                    if images_list:
                        cover_url = images_list[0].get('url', '')
                        if cover_url.startswith('http://'):
                            cover_url = cover_url.replace('http://', 'https://')

                    duration = (note.get('video_info') or {}).get('duration', 0)

                    status = status_by_tab.get(note.get('tab_status', 1), 'published')

                    works.append(WorkItem(
                        work_id=note_id,
                        title=note.get('display_title', '') or '无标题',
                        cover_url=cover_url,
                        duration=duration,
                        status=status,
                        publish_time=note.get('time', ''),
                        play_count=note.get('view_count', 0),
                        like_count=note.get('likes', 0),
                        comment_count=note.get('comments_count', 0),
                        share_count=note.get('shared_count', 0),
                        collect_count=note.get('collected_count', 0),
                    ))

                print(f"[{self.platform_name}] 解析到 {len(works)} 个作品,总计: {total}", flush=True)
            else:
                print(f"[{self.platform_name}] 未能捕获到 API 数据", flush=True)

        except Exception as e:
            print(f"[{self.platform_name}] 发生异常: {e}", flush=True)
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            # Always close the browser.
            await self.close_browser()

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=total or len(works),
            has_more=has_more
        )

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Collect comments from the creator backend's comment management page.

        NOTE: ``work_id`` is only used to label the returned items and
        ``cursor`` is only logged; the page visited is the general comment
        page and the request itself is issued by the web page.

        Returns:
            ``CommentsResult`` with ``success=True`` and the parsed comments
            (the next cursor is attached as a ``cursor`` attribute), or
            ``success=False`` with the error text when an exception occurred.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}, cursor={cursor}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False
        next_cursor = ""
        captured_data = {}

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            async def handle_response(response):
                """Keep the JSON of a comment API response that has comments."""
                nonlocal captured_data
                url = response.url
                # Comment APIs of both the creator backend and the normal site.
                if '/comment/' in url and ('page' in url or 'list' in url):
                    try:
                        json_data = await response.json()
                        print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)
                        if json_data.get('success') or json_data.get('code') == 0:
                            data = json_data.get('data') or {}
                            comment_list = data.get('comments') or data.get('list') or []
                            if comment_list:
                                captured_data = json_data
                                print(f"[{self.platform_name}] 评论 API 响应成功,comments={len(comment_list)}", flush=True)
                            else:
                                print(f"[{self.platform_name}] 评论 API 响应成功但无评论", flush=True)
                    except Exception as e:
                        print(f"[{self.platform_name}] 解析评论响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册评论 API 响应监听器", flush=True)

            comment_url = "https://creator.xiaohongshu.com/creator/comment"
            print(f"[{self.platform_name}] 访问评论管理页面: {comment_url}", flush=True)
            await self.page.goto(comment_url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            # Redirected to the login page?
            current_url = self.page.url
            print(f"[{self.platform_name}] 当前页面 URL: {current_url}", flush=True)
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            if not captured_data:
                print(f"[{self.platform_name}] 等待评论 API 响应...", flush=True)
                # Scroll to try to trigger comment loading.
                await self.page.evaluate('window.scrollBy(0, 500)')
                await asyncio.sleep(3)

            if not captured_data:
                # Wait a bit more; the comment API may be slow.
                print(f"[{self.platform_name}] 继续等待评论加载...", flush=True)
                await asyncio.sleep(5)

            self.page.remove_listener('response', handle_response)

            if captured_data:
                # "or" guards also cover keys present with an explicit null.
                data = captured_data.get('data') or {}
                comment_list = data.get('comments') or data.get('list') or []
                has_more = data.get('has_more', False)
                next_cursor = data.get('cursor', '')

                print(f"[{self.platform_name}] 解析评论: has_more={has_more}, comments={len(comment_list)}", flush=True)

                for comment in comment_list:
                    cid = comment.get('id', '')
                    if not cid:
                        continue

                    user_info = comment.get('user_info') or {}

                    # Nested replies.
                    replies = []
                    for sub in comment.get('sub_comments') or []:
                        sub_user = sub.get('user_info') or {}
                        replies.append(CommentItem(
                            comment_id=sub.get('id', ''),
                            work_id=work_id,
                            content=sub.get('content', ''),
                            author_id=sub_user.get('user_id', ''),
                            author_name=sub_user.get('nickname', ''),
                            author_avatar=sub_user.get('image', ''),
                            like_count=sub.get('like_count', 0),
                            create_time=sub.get('create_time', ''),
                        ))

                    comments.append(CommentItem(
                        comment_id=cid,
                        work_id=work_id,
                        content=comment.get('content', ''),
                        author_id=user_info.get('user_id', ''),
                        author_name=user_info.get('nickname', ''),
                        author_avatar=user_info.get('image', ''),
                        like_count=comment.get('like_count', 0),
                        reply_count=comment.get('sub_comment_count', 0),
                        create_time=comment.get('create_time', ''),
                        replies=replies,
                    ))

                total = len(comments)
                print(f"[{self.platform_name}] 解析到 {total} 条评论", flush=True)
            else:
                print(f"[{self.platform_name}] 未捕获到评论 API 响应", flush=True)

        except Exception as e:
            traceback.print_exc()
            return CommentsResult(
                success=False,
                platform=self.platform_name,
                work_id=work_id,
                error=str(e)
            )
        finally:
            await self.close_browser()

        result = CommentsResult(
            success=True,
            platform=self.platform_name,
            work_id=work_id,
            comments=comments,
            total=total,
            has_more=has_more
        )
        # The cursor is not passed to the constructor above; it is attached
        # directly to the instance dict so callers can read ``result.cursor``.
        result.__dict__['cursor'] = next_cursor
        return result

    async def get_all_comments(self, cookies: str) -> dict:
        """Collect comments for all works via the comment management page.

        Listens to comment and note-list API responses while the page loads
        and is scrolled, then groups the captured comments by note id.

        Returns:
            Dict with ``success``, ``platform`` and ``work_comments`` (one
            entry per note: ``work_id``, ``title``, ``cover_url``,
            ``comments``). On success ``total`` is the number of entries in
            ``work_comments``; on failure ``error`` holds the error text.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取所有作品评论")
        print(f"{'='*60}")

        all_work_comments = []
        captured_comments = []
        captured_notes = {}  # note_id -> {'title': ..., 'cover': ...}

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            async def handle_response(response):
                """Accumulate comments and note metadata from API responses."""
                url = response.url
                try:
                    # Comment list API (several URL shapes).
                    if '/comment/' in url and ('page' in url or 'list' in url):
                        json_data = await response.json()
                        print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)

                        if json_data.get('success') or json_data.get('code') == 0:
                            data = json_data.get('data') or {}
                            comments = data.get('comments') or data.get('list') or []

                            # The note id, when present, comes from the URL.
                            note_id_match = _NOTE_ID_RE.search(url)
                            note_id = note_id_match.group(1) if note_id_match else ''

                            if comments:
                                for comment in comments:
                                    # Tag the comment with its note id.
                                    if note_id and 'note_id' not in comment:
                                        comment['note_id'] = note_id
                                    captured_comments.append(comment)

                                print(f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (note_id={note_id}),总计: {len(captured_comments)}", flush=True)

                    # Note list API.
                    if '/note/' in url and ('list' in url or 'posted' in url or 'manager' in url):
                        json_data = await response.json()
                        if json_data.get('success') or json_data.get('code') == 0:
                            data = json_data.get('data') or {}
                            notes = data.get('notes') or data.get('list') or []
                            print(f"[{self.platform_name}] 捕获到笔记列表 API: {len(notes)} 个笔记", flush=True)
                            for note in notes:
                                note_id = note.get('note_id', '') or note.get('id', '')
                                if note_id:
                                    cover_url = ''
                                    cover = note.get('cover', {})
                                    if isinstance(cover, dict):
                                        cover_url = cover.get('url', '') or cover.get('url_default', '')
                                    elif isinstance(cover, str):
                                        cover_url = cover

                                    captured_notes[note_id] = {
                                        'title': note.get('title', '') or note.get('display_title', ''),
                                        'cover': cover_url,
                                    }
                except Exception as e:
                    print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True)

            self.page.on('response', handle_response)
            print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True)

            print(f"[{self.platform_name}] 访问评论管理页面...", flush=True)
            await self.page.goto("https://creator.xiaohongshu.com/creator/comment", wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            print(f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记", flush=True)

            # Scroll to load more comments.
            for _ in range(5):
                await self.page.evaluate('window.scrollBy(0, 500)')
                await asyncio.sleep(1)

            await asyncio.sleep(3)

            self.page.remove_listener('response', handle_response)

            print(f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记", flush=True)

            # Group the comments by note.
            work_comments_map = {}  # note_id -> work entry
            for comment in captured_comments:
                # "or {}" keeps note_info a dict even when both keys are null.
                note_info = comment.get('note_info') or comment.get('note') or {}
                note_id = comment.get('note_id', '') or note_info.get('note_id', '') or note_info.get('id', '')

                if not note_id:
                    continue

                if note_id not in work_comments_map:
                    saved_note = captured_notes.get(note_id, {})
                    cover_url = ''
                    cover = note_info.get('cover', {})
                    if isinstance(cover, dict):
                        cover_url = cover.get('url', '') or cover.get('url_default', '')
                    elif isinstance(cover, str):
                        cover_url = cover
                    # Fall back to the cover captured from the note list API.
                    if not cover_url:
                        cover_url = saved_note.get('cover', '')

                    work_comments_map[note_id] = {
                        'work_id': note_id,
                        'title': note_info.get('title', '') or note_info.get('display_title', '') or saved_note.get('title', ''),
                        'cover_url': cover_url,
                        'comments': []
                    }

                cid = comment.get('id', '') or comment.get('comment_id', '')
                if not cid:
                    continue

                user_info = comment.get('user_info') or comment.get('user') or {}

                work_comments_map[note_id]['comments'].append({
                    'comment_id': cid,
                    'author_id': user_info.get('user_id', '') or user_info.get('id', ''),
                    'author_name': user_info.get('nickname', '') or user_info.get('name', ''),
                    'author_avatar': user_info.get('image', '') or user_info.get('avatar', ''),
                    'content': comment.get('content', ''),
                    'like_count': comment.get('like_count', 0),
                    'create_time': comment.get('create_time', ''),
                })

            all_work_comments = list(work_comments_map.values())
            total_comments = sum(len(w['comments']) for w in all_work_comments)
            print(f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论", flush=True)

        except Exception as e:
            traceback.print_exc()
            return {
                'success': False,
                'platform': self.platform_name,
                'error': str(e),
                'work_comments': []
            }
        finally:
            await self.close_browser()

        return {
            'success': True,
            'platform': self.platform_name,
            'work_comments': all_work_comments,
            'total': len(all_work_comments)
        }