||
- # -*- coding: utf-8 -*-
- """
- 小红书视频发布器
- 参考: matrix/xhs_uploader/main.py
- 使用 xhs SDK API 方式发布,更稳定
- """
- import asyncio
- import os
- import sys
- import time
- from pathlib import Path
- from typing import List
- from .base import (
- BasePublisher, PublishParams, PublishResult,
- WorkItem, WorksResult, CommentItem, CommentsResult
- )
# Put the sibling "matrix" project directory (four levels above this file) at
# the front of sys.path so its signing scripts can be imported.
MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix"
sys.path.insert(0, str(MATRIX_PATH))
# Try to import the xhs SDK. XHS_SDK_AVAILABLE records whether the import
# succeeded; XhsClient is set to None when it did not.
try:
    from xhs import XhsClient
    XHS_SDK_AVAILABLE = True
except ImportError:
    print("[Warning] xhs 库未安装,请运行: pip install xhs")
    XhsClient = None
    XHS_SDK_AVAILABLE = False
# Path of the stealth script that the signing helpers add as a browser init
# script when the file exists.
STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js"
- class XiaohongshuPublisher(BasePublisher):
- """
- 小红书视频发布器
- 优先使用 xhs SDK API 方式发布
- """
-
- platform_name = "xiaohongshu"
- login_url = "https://creator.xiaohongshu.com/"
- publish_url = "https://creator.xiaohongshu.com/publish/publish"
- cookie_domain = ".xiaohongshu.com"
-
async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""):
    """Generate the ``x-s`` / ``x-t`` signature headers for a Xiaohongshu API call.

    Launches a headless Chromium, opens https://www.xiaohongshu.com and calls
    the page's ``window._webmsxyw(uri, data)`` function.

    Args:
        uri: Request URI to sign.
        data: Request payload handed to the signing function (may be None).
        a1: Value of the ``a1`` cookie. When non-empty it is installed on
            ``.xiaohongshu.com`` and the page is reloaded before signing.
        web_session: Accepted for signature compatibility; not used here.

    Returns:
        dict with the keys ``"x-s"`` and ``"x-t"`` (the latter as a string).

    Raises:
        Exception: ``签名失败: ...`` for any failure, chained to the cause.
    """
    from playwright.async_api import async_playwright

    try:
        async with async_playwright() as playwright:
            browser = await playwright.chromium.launch(headless=True)
            try:
                browser_context = await browser.new_context()

                if STEALTH_JS_PATH.exists():
                    await browser_context.add_init_script(path=str(STEALTH_JS_PATH))

                page = await browser_context.new_page()
                await page.goto("https://www.xiaohongshu.com")
                await asyncio.sleep(1)
                await page.reload()
                await asyncio.sleep(1)

                if a1:
                    await browser_context.add_cookies([
                        {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
                    ])
                    # Reload so the page scripts run with the a1 cookie present.
                    await page.reload()
                    await asyncio.sleep(0.5)

                encrypt_params = await page.evaluate(
                    "([url, data]) => window._webmsxyw(url, data)",
                    [uri, data]
                )

                await browser_context.close()
            finally:
                # Close the browser on the failure path too, not only on success.
                await browser.close()

        return {
            "x-s": encrypt_params["X-s"],
            "x-t": str(encrypt_params["X-t"])
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise Exception(f"签名失败: {e}") from e
-
def sign_sync(self, uri, data=None, a1="", web_session=""):
    """Synchronous signing callback intended for ``XhsClient(sign=...)``.

    Uses Playwright's sync API: a headless Chromium opens
    https://www.xiaohongshu.com and ``window._webmsxyw(uri, data)`` is called
    in the page. Up to five attempts are made, with a short linear back-off
    between attempts.

    NOTE(review): ``asyncio.run`` cannot be used here because the publish flow
    already runs inside an event loop. Playwright's sync API is also
    documented to refuse to start on a thread that is running an asyncio
    loop, so this should be invoked from a worker thread - confirm against
    the caller.

    Args:
        uri: Request URI to sign.
        data: Request payload handed to the signing function (may be None).
        a1: Value of the ``a1`` cookie; installed on ``.xiaohongshu.com``
            (followed by a reload) when non-empty.
        web_session: Accepted for signature compatibility; not used here.

    Returns:
        dict with the keys ``"x-s"`` and ``"x-t"`` (the latter as a string).

    Raises:
        Exception: If the Playwright sync API cannot be imported, or
            ``签名失败: ...`` once every attempt has failed.
    """
    try:
        from playwright.sync_api import sync_playwright
    except Exception as e:
        raise Exception(f"缺少 playwright 同步接口支持: {e}") from e

    max_attempts = 5
    last_exc: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        try:
            with sync_playwright() as playwright:
                browser = playwright.chromium.launch(headless=True)
                try:
                    context = browser.new_context()

                    if STEALTH_JS_PATH.exists():
                        context.add_init_script(path=str(STEALTH_JS_PATH))

                    page = context.new_page()
                    page.goto("https://www.xiaohongshu.com", wait_until="domcontentloaded", timeout=60000)

                    if a1:
                        context.add_cookies([
                            {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
                        ])
                        page.reload(wait_until="domcontentloaded")

                    # Give the page scripts time to install window._webmsxyw;
                    # evaluating too early can find the function missing.
                    time.sleep(1.5)

                    encrypt_params = page.evaluate(
                        "([url, data]) => window._webmsxyw(url, data)",
                        [uri, data]
                    )

                    context.close()
                finally:
                    # Close the browser on a failed attempt too.
                    browser.close()

            return {
                "x-s": encrypt_params["X-s"],
                "x-t": str(encrypt_params["X-t"])
            }
        except Exception as e:
            last_exc = e
            # Light back-off before retrying; pointless after the last attempt.
            if attempt < max_attempts:
                time.sleep(0.4 * attempt)

    raise Exception(f"签名失败: {last_exc}") from last_exc
-
async def publish_via_api(self, cookies: str, params: PublishParams) -> PublishResult:
    """Publish a video note through the xhs SDK.

    ``XhsClient.create_video_note`` is a blocking call whose ``sign``
    callback (``self.sign_sync``) drives Playwright's sync API. It is run in
    the default executor so that it neither blocks the event loop nor starts
    the sync API on the thread that is running the loop.

    Args:
        cookies: Raw cookie data. It is parsed with ``self.parse_cookies``
            and flattened with ``self.cookies_to_string``; when parsing
            yields nothing the raw value is passed to the SDK unchanged.
        params: Publish parameters (video path, title, description, tags,
            optional cover path and scheduled publish date).

    Returns:
        A successful ``PublishResult`` carrying the ``note_id`` and ``url``
        reported by the SDK (empty strings when absent).

    Raises:
        Exception: If the SDK is not installed, the SDK call fails, the
            result is empty, or the result reports an error.
    """
    if not XHS_SDK_AVAILABLE:
        raise Exception("xhs SDK 未安装,请运行: pip install xhs")

    self.report_progress(10, "正在通过 API 发布...")
    print(f"[{self.platform_name}] 使用 XHS SDK API 发布...")
    print(f"[{self.platform_name}] 视频路径: {params.video_path}")
    print(f"[{self.platform_name}] 标题: {params.title}")

    # Convert the cookies into the single "k=v; ..." string the SDK expects.
    cookie_list = self.parse_cookies(cookies)
    cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies
    print(f"[{self.platform_name}] Cookie 长度: {len(cookie_string)}")

    self.report_progress(20, "正在上传视频...")

    # Create the client; signing is delegated to sign_sync.
    xhs_client = XhsClient(cookie_string, sign=self.sign_sync)

    print(f"[{self.platform_name}] 开始调用 create_video_note...")

    def _create_note():
        # Runs in a worker thread. Argument preparation stays in here so any
        # failure is wrapped by the handler below, as in the original call.
        return xhs_client.create_video_note(
            title=params.title,
            desc=params.description or params.title,
            topics=params.tags or [],
            post_time=params.publish_date.strftime("%Y-%m-%d %H:%M:%S") if params.publish_date else None,
            video_path=params.video_path,
            cover_path=params.cover_path if params.cover_path and os.path.exists(params.cover_path) else None
        )

    # Publish the video.
    try:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, _create_note)
        print(f"[{self.platform_name}] SDK 返回结果: {result}")
    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"[{self.platform_name}] SDK 调用失败: {e}")
        raise Exception(f"XHS SDK 发布失败: {e}") from e

    # Validate the result.
    if not result:
        raise Exception("XHS SDK 返回空结果")

    # Check for an error reported inside the result.
    if isinstance(result, dict):
        if result.get("code") and result.get("code") != 0:
            raise Exception(f"发布失败: {result.get('msg', '未知错误')}")
        # Equality (not identity) on purpose: a falsy 0 also counts as failure.
        if result.get("success") == False:
            raise Exception(f"发布失败: {result.get('msg', result.get('error', '未知错误'))}")

    note_id = result.get("note_id", "") if isinstance(result, dict) else ""
    video_url = result.get("url", "") if isinstance(result, dict) else ""

    if not note_id:
        print(f"[{self.platform_name}] 警告: 未获取到 note_id,返回结果: {result}")

    self.report_progress(100, "发布成功")
    print(f"[{self.platform_name}] 发布成功! note_id={note_id}, url={video_url}")

    return PublishResult(
        success=True,
        platform=self.platform_name,
        video_id=note_id,
        video_url=video_url,
        message="发布成功"
    )
-
async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
    """Publish a video to Xiaohongshu, preferring the SDK API path.

    The video file must exist. When the xhs SDK is importable the API path is
    tried first; its result is returned when it succeeded, or when it carries
    an error that does not contain "请刷新". Any exception from the API path
    is logged and the Playwright path is used instead.

    Args:
        cookies: Raw cookie data forwarded to the chosen publish path.
        params: Publish parameters; ``video_path`` and ``title`` are read here.

    Returns:
        The ``PublishResult`` from whichever path produced the final answer.

    Raises:
        Exception: If the video file does not exist.
    """
    tag = f"[{self.platform_name}]"
    rule = '=' * 60
    video = params.video_path

    print(f"\n{rule}")
    print(f"{tag} 开始发布视频")
    print(f"{tag} 视频路径: {video}")
    print(f"{tag} 标题: {params.title}")
    print(f"{tag} Headless: {self.headless}")
    print(f"{tag} XHS SDK 可用: {XHS_SDK_AVAILABLE}")
    print(f"{rule}")

    # Guard: the video file has to be present.
    if not os.path.exists(video):
        raise Exception(f"视频文件不存在: {video}")

    size_in_bytes = os.path.getsize(video)
    print(f"{tag} 视频文件存在,大小: {size_in_bytes} bytes")

    self.report_progress(5, "正在准备发布...")

    # Preferred path: the SDK API.
    if XHS_SDK_AVAILABLE:
        try:
            print(f"{tag} 尝试使用 XHS SDK API 发布...")
            api_result = await self.publish_via_api(cookies, params)
            print(f"{tag} API 发布完成: success={api_result.success}")

            # Return on success, or on a concrete error that is not a
            # "please refresh" style failure.
            if api_result.success or (api_result.error and "请刷新" not in api_result.error):
                return api_result

            # Anything else falls through to Playwright.
            print(f"{tag} API 方式未成功,尝试 Playwright...")
        except Exception as api_error:
            import traceback
            traceback.print_exc()
            print(f"{tag} API 发布失败: {api_error}")
            print(f"{tag} 尝试使用 Playwright 方式...")

    # Fallback path: drive the creator site with Playwright.
    print(f"{tag} 使用 Playwright 方式发布...")
    return await self.publish_via_playwright(cookies, params)
-
async def publish_via_playwright(self, cookies: str, params: PublishParams) -> PublishResult:
    """Publish a video by driving the creator-site publish page with Playwright.

    Steps, in order: initialise the browser and cookies, open the publish
    page, handle login / captcha, upload the video file, wait for the editor
    to appear, fill in title (first 20 characters), description and up to
    five tags, click the publish button, then poll for a success signal.

    Args:
        cookies: Raw cookie data, parsed with ``self.parse_cookies``.
        params: Publish parameters; ``video_path``, ``title``,
            ``description`` and ``tags`` are read here.

    Returns:
        ``PublishResult`` with ``status='success'`` on completion, or a
        failed result with ``status='need_captcha'`` (login or captcha
        required) or ``status='need_action'`` (upload could not start, upload
        timed out, or the outcome is unconfirmed while still on the publish
        page). Failed results carry a screenshot and the page URL.

    Raises:
        Exception: If the page is not initialised, no publish button can be
            clicked, or an error element with text appears after clicking.
    """
    self.report_progress(10, "正在初始化浏览器...")
    print(f"[{self.platform_name}] Playwright 方式开始...")

    await self.init_browser()

    cookie_list = self.parse_cookies(cookies)
    print(f"[{self.platform_name}] 设置 {len(cookie_list)} 个 cookies")
    await self.set_cookies(cookie_list)

    if not self.page:
        raise Exception("Page not initialized")

    self.report_progress(15, "正在打开发布页面...")

    # Go straight to the video publish page.
    publish_url = "https://creator.xiaohongshu.com/publish/publish?source=official"
    print(f"[{self.platform_name}] 打开页面: {publish_url}")
    await self.page.goto(publish_url)
    await asyncio.sleep(3)

    current_url = self.page.url
    print(f"[{self.platform_name}] 当前 URL: {current_url}")
    async def wait_for_manual_login(timeout_seconds: int = 300) -> bool:
        # Poll every 2 s until the page URL is on creator.xiaohongshu.com and
        # no longer contains "login"/"passport"; False when the timeout elapses.
        if not self.page:
            return False
        self.report_progress(12, "检测到需要登录,请在浏览器窗口完成登录...")
        try:
            await self.page.bring_to_front()
        except:
            pass
        waited = 0
        while waited < timeout_seconds:
            try:
                url = self.page.url
                if "login" not in url and "passport" not in url and "creator.xiaohongshu.com" in url:
                    return True
                await asyncio.sleep(2)
                waited += 2
            except:
                await asyncio.sleep(2)
                waited += 2
        return False
    async def wait_for_manual_captcha(timeout_seconds: int = 180) -> bool:
        # Poll every 3 s until the AI captcha check no longer reports a
        # captcha; errors from the check are ignored and polling continues.
        waited = 0
        while waited < timeout_seconds:
            try:
                ai_captcha = await self.ai_check_captcha()
                if not ai_captcha.get("has_captcha"):
                    return True
            except:
                pass
            await asyncio.sleep(3)
            waited += 3
        return False

    # Check the login state.
    if "login" in current_url or "passport" in current_url:
        if not self.headless:
            # Headed browser: let the user log in manually.
            logged_in = await wait_for_manual_login()
            if logged_in:
                # Best effort: push the refreshed cookies back; failures are ignored.
                try:
                    if self.context:
                        cookies_after = await self.context.cookies()
                        await self.sync_cookies_to_node(cookies_after)
                except:
                    pass
                await self.page.goto(publish_url)
                await asyncio.sleep(3)
                current_url = self.page.url
            else:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error="需要登录:请在浏览器窗口完成登录后重试",
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status='need_captcha',
                    need_captcha=True,
                    captcha_type='login'
                )
        else:
            # Headless: nobody can log in, so report it.
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="登录已过期,请重新登录",
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha',
                need_captcha=True,
                captcha_type='login'
            )

    # Use the AI helper to check for a captcha.
    ai_captcha = await self.ai_check_captcha()
    if ai_captcha['has_captcha']:
        print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
        if not self.headless:
            solved = await wait_for_manual_captcha()
            if solved:
                # Best effort: push the refreshed cookies back; failures are ignored.
                try:
                    if self.context:
                        cookies_after = await self.context.cookies()
                        await self.sync_cookies_to_node(cookies_after)
                except:
                    pass
            else:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error=f"需要验证码:请在浏览器窗口完成验证后重试",
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status='need_captcha',
                    need_captcha=True,
                    captcha_type=ai_captcha['captcha_type']
                )
        else:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha',
                need_captcha=True,
                captcha_type=ai_captcha['captcha_type']
            )

    self.report_progress(20, "正在上传视频...")

    # Wait for the page to load.
    await asyncio.sleep(2)

    # Upload the video.
    upload_triggered = False

    # Method 1: set the file directly on a (possibly hidden) file input.
    print(f"[{self.platform_name}] 尝试方法1: 设置 file input")
    file_inputs = self.page.locator('input[type="file"]')
    input_count = await file_inputs.count()
    print(f"[{self.platform_name}] 找到 {input_count} 个 file input")

    if input_count > 0:
        # Pick the first input that accepts video (or has no/any accept filter).
        for i in range(input_count):
            input_el = file_inputs.nth(i)
            accept = await input_el.get_attribute('accept') or ''
            print(f"[{self.platform_name}] Input {i} accept: {accept}")
            if 'video' in accept or '*' in accept or not accept:
                await input_el.set_input_files(params.video_path)
                upload_triggered = True
                print(f"[{self.platform_name}] 视频文件已设置到 input {i}")
                break

    # Method 2: click the upload area to trigger the file chooser.
    if not upload_triggered:
        print(f"[{self.platform_name}] 尝试方法2: 点击上传区域")
        try:
            upload_area = self.page.locator('[class*="upload-wrapper"], [class*="upload-area"], .upload-input').first
            if await upload_area.count() > 0:
                async with self.page.expect_file_chooser(timeout=5000) as fc_info:
                    await upload_area.click()
                file_chooser = await fc_info.value
                await file_chooser.set_files(params.video_path)
                upload_triggered = True
                print(f"[{self.platform_name}] 通过点击上传区域上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 方法2失败: {e}")

    if not upload_triggered:
        screenshot_base64 = await self.capture_screenshot()
        page_url = await self.get_page_url()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="无法上传视频文件",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_action'
        )

    self.report_progress(40, "等待视频上传完成...")
    print(f"[{self.platform_name}] 等待视频上传和处理...")

    # Wait for the upload to finish by watching for page changes.
    upload_complete = False
    for i in range(60):  # at most ~3 minutes (60 checks, 3 s apart)
        await asyncio.sleep(3)

        # A title input appears once the upload has completed.
        title_input_count = await self.page.locator('input[placeholder*="标题"], input[placeholder*="填写标题"]').count()
        # Alternatively look for the editor area.
        editor_count = await self.page.locator('[class*="ql-editor"], [contenteditable="true"]').count()
        # Check whether a publish button is present.
        publish_btn_count = await self.page.locator('.publishBtn, button:has-text("发布")').count()

        print(f"[{self.platform_name}] 检测 {i+1}: 标题框={title_input_count}, 编辑器={editor_count}, 发布按钮={publish_btn_count}")

        if title_input_count > 0 or (editor_count > 0 and publish_btn_count > 0):
            upload_complete = True
            print(f"[{self.platform_name}] 视频上传完成!")
            break

    if not upload_complete:
        screenshot_base64 = await self.capture_screenshot()
        page_url = await self.get_page_url()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="视频上传超时",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_action'
        )

    await asyncio.sleep(2)

    self.report_progress(60, "正在填写笔记信息...")
    print(f"[{self.platform_name}] 填写标题: {params.title[:20]}")

    # Fill in the title (only the first 20 characters are entered).
    title_filled = False
    title_selectors = [
        'input[placeholder*="标题"]',
        'input[placeholder*="填写标题"]',
        '[class*="title"] input',
        '.c-input_inner',
    ]
    for selector in title_selectors:
        title_input = self.page.locator(selector).first
        if await title_input.count() > 0:
            await title_input.click()
            await title_input.fill('')  # clear first
            await title_input.fill(params.title[:20])
            title_filled = True
            print(f"[{self.platform_name}] 标题已填写,使用选择器: {selector}")
            break

    if not title_filled:
        print(f"[{self.platform_name}] 警告: 未找到标题输入框")

    # Fill in the description and tags.
    if params.description or params.tags:
        desc_filled = False
        desc_selectors = [
            '[class*="ql-editor"]',
            '[class*="content-input"] [contenteditable="true"]',
            '[class*="editor"] [contenteditable="true"]',
            '.ql-editor',
        ]
        for selector in desc_selectors:
            desc_input = self.page.locator(selector).first
            if await desc_input.count() > 0:
                await desc_input.click()
                await asyncio.sleep(0.5)

                if params.description:
                    await self.page.keyboard.type(params.description, delay=20)
                    print(f"[{self.platform_name}] 描述已填写")

                if params.tags:
                    # Add the tags on a new line, typed as "#tag" then Space.
                    await self.page.keyboard.press("Enter")
                    for tag in params.tags[:5]:  # at most 5 tags
                        await self.page.keyboard.type(f"#{tag}", delay=20)
                        await asyncio.sleep(0.3)
                        await self.page.keyboard.press("Space")
                    print(f"[{self.platform_name}] 标签已填写: {params.tags[:5]}")

                desc_filled = True
                break

        if not desc_filled:
            print(f"[{self.platform_name}] 警告: 未找到描述输入框")

    await asyncio.sleep(2)
    self.report_progress(80, "正在发布...")

    await asyncio.sleep(2)

    # Scroll to the bottom so the publish button is in view.
    await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    await asyncio.sleep(1)

    print(f"[{self.platform_name}] 查找发布按钮...")

    # Click publish: try each selector until one yields a clickable button.
    publish_selectors = [
        'button.publishBtn',
        '.publishBtn',
        'button.d-button.red',
        'button:has-text("发布"):not(:has-text("定时发布"))',
        '[class*="publish"][class*="btn"]',
    ]

    publish_clicked = False
    for selector in publish_selectors:
        try:
            btn = self.page.locator(selector).first
            if await btn.count() > 0:
                is_visible = await btn.is_visible()
                is_enabled = await btn.is_enabled()
                print(f"[{self.platform_name}] 按钮 {selector}: visible={is_visible}, enabled={is_enabled}")

                if is_visible and is_enabled:
                    box = await btn.bounding_box()
                    if box:
                        print(f"[{self.platform_name}] 点击发布按钮: {selector}, 位置: ({box['x']}, {box['y']})")
                        # Click with the real mouse at the centre of the button.
                        await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2)
                        publish_clicked = True
                        break
        except Exception as e:
            print(f"[{self.platform_name}] 选择器 {selector} 错误: {e}")

    if not publish_clicked:
        # Fallback: ask the AI helper to suggest a selector for the button.
        try:
            suggest = await self.ai_suggest_playwright_selector("点击小红书发布按钮")
            if suggest.get("has_selector") and suggest.get("selector"):
                sel = suggest.get("selector")
                btn = self.page.locator(sel).first
                if await btn.count() > 0 and await btn.is_visible() and await btn.is_enabled():
                    try:
                        await btn.click()
                    except:
                        # Locator click failed: click the button centre instead.
                        box = await btn.bounding_box()
                        if box:
                            await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2)
                    publish_clicked = True
        except Exception as e:
            print(f"[{self.platform_name}] AI 点击发布按钮失败: {e}", flush=True)
    if not publish_clicked:
        # Save a screenshot for debugging.
        screenshot_path = f"debug_publish_failed_{self.platform_name}.png"
        await self.page.screenshot(path=screenshot_path, full_page=True)
        print(f"[{self.platform_name}] 未找到发布按钮,截图保存到: {screenshot_path}")

        # Dump the first ten buttons on the page for debugging.
        buttons = await self.page.query_selector_all('button')
        print(f"[{self.platform_name}] 页面上共有 {len(buttons)} 个按钮")
        for i, btn in enumerate(buttons[:10]):
            text = await btn.text_content() or ''
            cls = await btn.get_attribute('class') or ''
            print(f"  按钮 {i}: text='{text.strip()[:30]}', class='{cls[:50]}'")

        raise Exception("未找到发布按钮")

    print(f"[{self.platform_name}] 已点击发布按钮,等待发布完成...")
    self.report_progress(90, "等待发布结果...")

    # Wait for the publish to finish (URL change or success toast).
    publish_success = False
    for i in range(20):  # at most ~20 seconds
        await asyncio.sleep(1)

        current_url = self.page.url

        # Did the page move to a success / content-management URL?
        if "published=true" in current_url or "success" in current_url or "content" in current_url:
            publish_success = True
            print(f"[{self.platform_name}] 发布成功! 跳转到: {current_url}")
            break

        # Is a success message visible?
        try:
            success_msg = await self.page.locator('[class*="success"], .toast-success, [class*="Toast"]').first.is_visible()
            if success_msg:
                publish_success = True
                print(f"[{self.platform_name}] 检测到成功提示!")
                break
        except:
            pass

        # Is an error message shown?
        try:
            error_elements = self.page.locator('[class*="error"], .toast-error, [class*="fail"]')
            if await error_elements.count() > 0:
                error_text = await error_elements.first.text_content()
                if error_text and len(error_text.strip()) > 0:
                    raise Exception(f"发布失败: {error_text.strip()}")
        except Exception as e:
            # Only the "publish failed" error raised above propagates;
            # other lookup errors are ignored and polling continues.
            if "发布失败" in str(e):
                raise

    # No clear success signal: capture a screenshot for AI analysis.
    if not publish_success:
        final_url = self.page.url
        print(f"[{self.platform_name}] 发布结果不确定,当前 URL: {final_url}")
        screenshot_base64 = await self.capture_screenshot()
        print(f"[{self.platform_name}] 已获取截图供 AI 分析")

        # Still on the publish page: further action may be required.
        if "publish/publish" in final_url:
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="发布结果待确认,请查看截图",
                screenshot_base64=screenshot_base64,
                page_url=final_url,
                status='need_action'
            )

    self.report_progress(100, "发布完成")
    print(f"[{self.platform_name}] Playwright 方式发布完成!")
    screenshot_base64 = await self.capture_screenshot()
    page_url = await self.get_page_url()

    return PublishResult(
        success=True,
        platform=self.platform_name,
        message="发布完成",
        screenshot_base64=screenshot_base64,
        page_url=page_url,
        status='success'
    )
-
async def get_account_info(self, cookies: str) -> dict:
    """Fetch account information by sniffing the creator-home profile API.

    Opens the creator home page with the given cookies and listens for the
    ``api/galaxy/creator/home/personal_info`` response. If nothing is captured
    after about 10 seconds the page is reloaded once and polled again.

    Args:
        cookies: Raw cookie data, parsed with ``self.parse_cookies``.

    Returns:
        On success ``{"success": True, "account_id", "account_name",
        "avatar_url", "fans_count", "works_count"}``; otherwise
        ``{"success": False, "error": <message>}``. The browser is closed in
        either case.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"[{self.platform_name}] 获取账号信息")
    print(f"{rule}")

    profile = {}

    async def on_response(response):
        # Response listener: keep the payload of the personal-info API.
        nonlocal profile
        if 'api/galaxy/creator/home/personal_info' not in response.url:
            return
        try:
            payload = await response.json()
            print(f"[{self.platform_name}] 捕获个人信息 API", flush=True)
            if payload.get('success') or payload.get('code') == 0:
                info = payload.get('data', {})
                profile = {
                    "account_id": f"xhs_{info.get('red_num', '')}",
                    "account_name": info.get('name', ''),
                    "avatar_url": info.get('avatar', ''),
                    "fans_count": info.get('fans_count', 0),
                    # The exact works count is not available from this API;
                    # it has to come from the works list.
                    "works_count": 0
                }
        except Exception as parse_error:
            print(f"[{self.platform_name}] 解析个人信息失败: {parse_error}", flush=True)

    async def wait_for_profile():
        # Check up to ten times, one second apart, stopping once captured.
        checks_left = 10
        while checks_left > 0 and not profile:
            await asyncio.sleep(1)
            checks_left -= 1

    try:
        await self.init_browser()
        await self.set_cookies(self.parse_cookies(cookies))

        if not self.page:
            raise Exception("Page not initialized")

        self.page.on('response', on_response)

        # Open the creator home page and wait for the API response.
        print(f"[{self.platform_name}] 访问创作者首页...", flush=True)
        await self.page.goto("https://creator.xiaohongshu.com/new/home", wait_until="domcontentloaded")
        await wait_for_profile()

        if not profile:
            # Nothing captured yet: reload once and wait again.
            print(f"[{self.platform_name}] 未捕获到个人信息,尝试刷新...", flush=True)
            await self.page.reload()
            await wait_for_profile()

        if not profile:
            raise Exception("无法获取账号信息")

        account = {"success": True}
        account.update(profile)
        return account

    except Exception as failure:
        import traceback
        traceback.print_exc()
        return {"success": False, "error": str(failure)}
    finally:
        await self.close_browser()
async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
    """Fetch one page of the account's notes via the creator note-list API.

    Opens the note-manager page with the given cookies, waits for the page's
    signing function ``window._webmsxyw``, then calls
    ``/api/galaxy/v2/creator/note/user/posted`` from inside the page (up to
    three attempts) and converts the returned notes to ``WorkItem`` objects.

    Args:
        cookies: Raw cookie data, parsed with ``self.parse_cookies``.
        page: Page number passed to the API.
        page_size: Only logged; paging arithmetic uses a fixed size of 20.

    Returns:
        ``WorksResult`` with ``works``, ``total``, ``has_more`` and
        ``next_page`` on success, or ``success=False`` with ``error`` when
        anything fails. The browser is closed in either case.
    """
    print(f"\n{'='*60}", flush=True)
    print(f"[{self.platform_name}] 获取作品列表", flush=True)
    print(f"[{self.platform_name}] page={page}, page_size={page_size}", flush=True)
    print(f"{'='*60}", flush=True)

    works: List[WorkItem] = []
    total = 0
    has_more = False
    next_page = ""
    api_page_size = 20
    max_fetch_attempts = 3

    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)

        # Log the cookie count for debugging.
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies", flush=True)

        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")
        # Open the note-manager page; the API is then called from inside it.
        print(f"[{self.platform_name}] 访问笔记管理页面...", flush=True)

        try:
            await self.page.goto("https://creator.xiaohongshu.com/new/note-manager", wait_until="domcontentloaded", timeout=30000)
        except Exception as nav_error:
            print(f"[{self.platform_name}] 导航超时,但继续尝试: {nav_error}", flush=True)

        # Check the login state.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前页面: {current_url}", flush=True)
        if "login" in current_url:
            raise Exception("Cookie 已过期,请重新登录")

        # Give the page time to finish loading so the signing function exists.
        print(f"[{self.platform_name}] 等待页面完全加载和签名函数初始化...", flush=True)
        await asyncio.sleep(3)

        # Poll for the signing function.
        sign_check_attempts = 0
        max_sign_check_attempts = 10
        while sign_check_attempts < max_sign_check_attempts:
            sign_available = await self.page.evaluate("""() => {
                return typeof window !== 'undefined' && typeof window._webmsxyw === 'function';
            }""")
            if sign_available:
                print(f"[{self.platform_name}] ✓ 签名函数 _webmsxyw 已可用", flush=True)
                break
            sign_check_attempts += 1
            print(f"[{self.platform_name}] ⏳ 等待签名函数... ({sign_check_attempts}/{max_sign_check_attempts})", flush=True)
            await asyncio.sleep(1)

        if sign_check_attempts >= max_sign_check_attempts:
            print(f"[{self.platform_name}] ⚠️ 警告: 签名函数 _webmsxyw 在 {max_sign_check_attempts} 次检查后仍不可用", flush=True)
            print(f"[{self.platform_name}] 继续尝试,但 API 调用可能会失败", flush=True)

        async def fetch_notes_page(p):
            # Call the note-list API for page `p` from inside the browser page.
            # Re-check the signing function before every call.
            sign_available = await self.page.evaluate("""() => {
                return typeof window !== 'undefined' && typeof window._webmsxyw === 'function';
            }""")

            if not sign_available:
                print(f"[{self.platform_name}] ⚠️ 签名函数 _webmsxyw 不可用,等待...", flush=True)
                await asyncio.sleep(2)

            return await self.page.evaluate(
                """async (pageNum) => {
                    try {
                        // 使用正确的 API 端点:/api/galaxy/v2/creator/note/user/posted
                        const url = `/api/galaxy/v2/creator/note/user/posted?tab=0&page=${pageNum}`;
                        const headers = {
                            'Accept': 'application/json, text/plain, */*',
                            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
                            'Referer': 'https://creator.xiaohongshu.com/new/note-manager',
                            'Sec-Fetch-Dest': 'empty',
                            'Sec-Fetch-Mode': 'cors',
                            'Sec-Fetch-Site': 'same-origin'
                        };

                        // 尝试获取签名
                        let signResult = { hasSign: false, x_s: '', x_t: '', x_s_common: '', error: '' };
                        if (typeof window !== 'undefined' && typeof window._webmsxyw === 'function') {
                            try {
                                const sign = window._webmsxyw(url, '');
                                headers['x-s'] = sign['X-s'];
                                headers['x-t'] = String(sign['X-t']);
                                // 检查是否有 x-s-common
                                if (sign['X-s-common']) {
                                    headers['x-s-common'] = sign['X-s-common'];
                                }
                                signResult = {
                                    hasSign: true,
                                    x_s: sign['X-s'] ? sign['X-s'].substring(0, 50) + '...' : '',
                                    x_t: String(sign['X-t']),
                                    x_s_common: sign['X-s-common'] ? sign['X-s-common'].substring(0, 50) + '...' : '',
                                    error: ''
                                };
                                console.log('签名生成成功:', signResult);
                            } catch (e) {
                                signResult.error = e.toString();
                                console.error('签名生成失败:', e);
                            }
                        } else {
                            signResult.error = '_webmsxyw function not found';
                            console.error('签名函数不存在');
                        }

                        const res = await fetch(url, {
                            method: 'GET',
                            credentials: 'include',
                            headers
                        });

                        const responseData = await res.json();
                        return {
                            ...responseData,
                            _debug: {
                                signResult: signResult,
                                status: res.status,
                                statusText: res.statusText
                            }
                        };
                    } catch (e) {
                        return { success: false, error: e.toString() };
                    }
                }""",
                p
            )

        def is_usable(candidate):
            # A usable response is a dict flagged successful that carries data.
            return bool(
                isinstance(candidate, dict)
                and (candidate.get('success') or candidate.get('code') == 0)
                and candidate.get('data')
            )

        def describe_failure(candidate):
            # Prefer the server message, then the JS-side error, then a dump.
            if not isinstance(candidate, dict):
                return str(candidate)
            return (
                candidate.get('msg')
                or candidate.get('message')
                or candidate.get('error')
                or str(candidate)[:200]
            )

        def parse_notes(notes_list):
            # Convert raw note dicts to WorkItem objects, skipping notes without an id.
            parsed = []
            for note in notes_list:
                note_id = note.get('id', '')
                if not note_id:
                    continue

                cover_url = ''
                images_list = note.get('images_list') or []
                if images_list:
                    # Tolerate a null first image or a null url.
                    cover_url = (images_list[0] or {}).get('url') or ''
                    if cover_url.startswith('http://'):
                        cover_url = cover_url.replace('http://', 'https://', 1)

                # video_info may be present but null.
                duration = (note.get('video_info') or {}).get('duration', 0)

                # tab_status: 0 draft, 2 reviewing, 3 rejected, anything else published.
                status = 'published'
                tab_status = note.get('tab_status', 1)
                if tab_status == 0:
                    status = 'draft'
                elif tab_status == 2:
                    status = 'reviewing'
                elif tab_status == 3:
                    status = 'rejected'

                parsed.append(WorkItem(
                    work_id=note_id,
                    title=note.get('display_title', '') or '无标题',
                    cover_url=cover_url,
                    duration=duration,
                    status=status,
                    publish_time=note.get('time', ''),
                    play_count=note.get('view_count', 0),
                    like_count=note.get('likes', 0),
                    comment_count=note.get('comments_count', 0),
                    share_count=note.get('shared_count', 0),
                    collect_count=note.get('collected_count', 0),
                ))
            return parsed

        resp = None
        for attempt in range(1, max_fetch_attempts + 1):
            resp = await fetch_notes_page(page)

            # Print, then strip, the debug payload attached by the page script.
            if resp and isinstance(resp, dict) and resp.get('_debug'):
                debug_info = resp.get('_debug', {})
                sign_result = debug_info.get('signResult', {})
                print(f"[{self.platform_name}] 🔍 调试信息: 签名可用: {sign_result.get('hasSign', False)}, X-S: {sign_result.get('x_s', '')}, X-T: {sign_result.get('x_t', '')}, X-S-Common: {sign_result.get('x_s_common', '')}, 签名错误: {sign_result.get('error', '')}, HTTP 状态: {debug_info.get('status', 'N/A')}", flush=True)
                resp.pop('_debug', None)

            if is_usable(resp):
                break
            print(f"[{self.platform_name}] 拉取作品列表失败,重试 {attempt}/3: {str(resp)[:200]}", flush=True)
            # Back off before the next attempt; no point waiting after the last one.
            if attempt < max_fetch_attempts:
                await asyncio.sleep(1.2 * attempt)
        if not is_usable(resp):
            error_msg = describe_failure(resp)
            # Print whatever error details the response carries.
            if isinstance(resp, dict):
                if resp.get('msg'):
                    print(f"[{self.platform_name}] 错误消息: {resp.get('msg')}", flush=True)
                if resp.get('message'):
                    print(f"[{self.platform_name}] 错误消息: {resp.get('message')}", flush=True)
                if resp.get('error'):
                    print(f"[{self.platform_name}] 错误: {resp.get('error')}", flush=True)
            raise Exception(f"无法获取作品列表数据: {error_msg}")
        data = resp.get('data', {}) or {}
        notes = data.get('notes', []) or []
        print(f"[{self.platform_name}] 第 {page} 页 notes 数量: {len(notes)}", flush=True)
        # Derive the total: prefer the 'special.note_time_desc' tag count,
        # else the largest tag count, else the total fields on the payload.
        tags = data.get('tags', []) or []
        if tags:
            preferred = 0
            for tag in tags:
                if tag.get('id') == 'special.note_time_desc':
                    preferred = tag.get('notes_count', 0) or tag.get('notesCount', 0) or tag.get('count', 0) or 0
                    break
            if preferred:
                # Coerce like the fallback below so later comparisons are numeric.
                total = int(preferred)
            else:
                total = max([int(t.get('notes_count', 0) or t.get('notesCount', 0) or t.get('count', 0) or 0) for t in tags] + [0])
        if not total:
            total = int(data.get('total', 0) or data.get('total_count', 0) or data.get('totalCount', 0) or 0)
        if not total and isinstance(data.get('page', {}), dict):
            total = int(data.get('page', {}).get('total', 0) or data.get('page', {}).get('totalCount', 0) or 0)
        next_page = data.get('page', "")
        if next_page == page:
            next_page = page + 1
        works.extend(parse_notes(notes))
        if total:
            has_more = (page * api_page_size + len(notes)) < total
            if has_more and (next_page == -1 or str(next_page) == "-1" or next_page == "" or next_page is None):
                next_page = page + 1
        else:
            if len(notes) == 0:
                has_more = False
            else:
                # Total unknown: probe the next page to decide whether more exist.
                next_resp = await fetch_notes_page(page + 1)
                next_data = (next_resp.get('data') or {}) if isinstance(next_resp, dict) else {}
                next_notes = next_data.get('notes', []) or []
                has_more = len(next_notes) > 0
                next_page = next_data.get('page', next_page)

    except Exception as e:
        import traceback
        print(f"[{self.platform_name}] 发生异常: {e}", flush=True)
        traceback.print_exc()
        return WorksResult(
            success=False,
            platform=self.platform_name,
            error=str(e)
        )
    finally:
        # Always close the browser.
        await self.close_browser()

    return WorksResult(
        success=True,
        platform=self.platform_name,
        works=works,
        total=total or (page * api_page_size + len(works)),
        has_more=has_more,
        next_page=next_page
    )
- async def get_all_works(self, cookies: str) -> WorksResult:
- """获取小红书全部作品(单次请求内自动翻页抓全量,避免 Node 侧分页不一致)"""
- print(f"\n{'='*60}", flush=True)
- print(f"[{self.platform_name}] 获取全部作品(auto paging)", flush=True)
- print(f"{'='*60}", flush=True)
- works: List[WorkItem] = []
- total = 0
- seen_ids = set()
- cursor: object = 0
- max_iters = 800
- api_page_size = 20
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies", flush=True)
- await self.set_cookies(cookie_list)
- if not self.page:
- raise Exception("Page not initialized")
- print(f"[{self.platform_name}] 访问笔记管理页面...", flush=True)
- try:
- await self.page.goto("https://creator.xiaohongshu.com/new/note-manager", wait_until="domcontentloaded", timeout=60000)
- print(f"[{self.platform_name}] 页面加载成功", flush=True)
- except Exception as nav_error:
- print(f"[{self.platform_name}] 导航超时,但继续尝试: {nav_error}", flush=True)
- # 即使超时也检查当前页面状态
- try:
- await asyncio.sleep(2)
- current_url = self.page.url
- print(f"[{self.platform_name}] 超时后当前页面: {current_url}", flush=True)
- except Exception as e:
- print(f"[{self.platform_name}] 检查页面状态时出错: {e}", flush=True)
- current_url = self.page.url
- print(f"[{self.platform_name}] 当前页面: {current_url}", flush=True)
- if "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- # 等待页面完全加载,确保签名函数可用
- print(f"[{self.platform_name}] 等待页面完全加载和签名函数初始化...", flush=True)
- await asyncio.sleep(3)
-
- # 检查签名函数是否可用
- sign_check_attempts = 0
- max_sign_check_attempts = 10
- while sign_check_attempts < max_sign_check_attempts:
- sign_available = await self.page.evaluate("""() => {
- return typeof window !== 'undefined' && typeof window._webmsxyw === 'function';
- }""")
- if sign_available:
- print(f"[{self.platform_name}] ✓ 签名函数 _webmsxyw 已可用", flush=True)
- break
- sign_check_attempts += 1
- print(f"[{self.platform_name}] ⏳ 等待签名函数... ({sign_check_attempts}/{max_sign_check_attempts})", flush=True)
- await asyncio.sleep(1)
-
- if sign_check_attempts >= max_sign_check_attempts:
- print(f"[{self.platform_name}] ⚠️ 警告: 签名函数 _webmsxyw 在 {max_sign_check_attempts} 次检查后仍不可用", flush=True)
- print(f"[{self.platform_name}] 继续尝试,但 API 调用可能会失败", flush=True)
- async def fetch_notes_page(p):
- # 再次检查签名函数(每次调用前都检查)
- sign_available = await self.page.evaluate("""() => {
- return typeof window !== 'undefined' && typeof window._webmsxyw === 'function';
- }""")
-
- if not sign_available:
- print(f"[{self.platform_name}] ⚠️ 签名函数 _webmsxyw 不可用,等待...", flush=True)
- await asyncio.sleep(2)
-
- return await self.page.evaluate(
- """async (pageNum) => {
- try {
- // 使用正确的 API 端点:/api/galaxy/v2/creator/note/user/posted
- const url = `/api/galaxy/v2/creator/note/user/posted?tab=0&page=${pageNum}`;
- const headers = {
- 'Accept': 'application/json, text/plain, */*',
- 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
- 'Referer': 'https://creator.xiaohongshu.com/new/note-manager',
- 'Sec-Fetch-Dest': 'empty',
- 'Sec-Fetch-Mode': 'cors',
- 'Sec-Fetch-Site': 'same-origin'
- };
-
- // 尝试获取签名
- let signResult = { hasSign: false, x_s: '', x_t: '', x_s_common: '', error: '' };
- if (typeof window !== 'undefined' && typeof window._webmsxyw === 'function') {
- try {
- const sign = window._webmsxyw(url, '');
- headers['x-s'] = sign['X-s'];
- headers['x-t'] = String(sign['X-t']);
- // 检查是否有 x-s-common
- if (sign['X-s-common']) {
- headers['x-s-common'] = sign['X-s-common'];
- }
- signResult = {
- hasSign: true,
- x_s: sign['X-s'] ? sign['X-s'].substring(0, 50) + '...' : '',
- x_t: String(sign['X-t']),
- x_s_common: sign['X-s-common'] ? sign['X-s-common'].substring(0, 50) + '...' : '',
- error: ''
- };
- console.log('签名生成成功:', signResult);
- } catch (e) {
- signResult.error = e.toString();
- console.error('签名生成失败:', e);
- }
- } else {
- signResult.error = '_webmsxyw function not found';
- console.error('签名函数不存在');
- }
-
- const res = await fetch(url, {
- method: 'GET',
- credentials: 'include',
- headers
- });
-
- const responseData = await res.json();
- return {
- ...responseData,
- _debug: {
- signResult: signResult,
- status: res.status,
- statusText: res.statusText
- }
- };
- } catch (e) {
- return { success: false, error: e.toString() };
- }
- }""",
- p
- )
def parse_notes(notes_list):
    """Convert raw note dicts from the creator API into ``WorkItem`` objects.

    Entries that are not dicts, or that carry no ``id``, are skipped. Fields
    the API may send as ``null`` (``images_list``, the cover ``url``,
    ``video_info``) are treated as missing instead of raising, so one odd
    note can no longer abort the whole listing.

    Args:
        notes_list: Iterable of note dicts; ``None`` is treated as empty.

    Returns:
        A list of ``WorkItem`` in input order.
    """
    # Any tab_status not listed here is reported as 'published'.
    status_by_tab = {0: 'draft', 2: 'reviewing', 3: 'rejected'}
    insecure_prefix = 'http://'

    parsed = []
    for note in notes_list or []:
        if not isinstance(note, dict):
            continue
        note_id = note.get('id', '')
        if not note_id:
            continue

        # The first image, when present, is used as the cover.
        cover_url = ''
        images_list = note.get('images_list') or []
        if isinstance(images_list, (list, tuple)) and images_list:
            first_image = images_list[0]
            if isinstance(first_image, dict):
                cover_url = first_image.get('url') or ''
        if isinstance(cover_url, str) and cover_url.startswith(insecure_prefix):
            # Upgrade only the scheme; leave the rest of the URL untouched.
            cover_url = 'https://' + cover_url[len(insecure_prefix):]

        video_info = note.get('video_info')
        duration = video_info.get('duration', 0) if isinstance(video_info, dict) else 0

        status = status_by_tab.get(note.get('tab_status', 1), 'published')

        parsed.append(WorkItem(
            work_id=note_id,
            title=note.get('display_title', '') or '无标题',
            cover_url=cover_url,
            duration=duration,
            status=status,
            publish_time=note.get('time', ''),
            play_count=note.get('view_count', 0),
            like_count=note.get('likes', 0),
            comment_count=note.get('comments_count', 0),
            share_count=note.get('shared_count', 0),
            collect_count=note.get('collected_count', 0),
        ))
    return parsed
- async def collect_by_scrolling() -> WorksResult:
- print(f"[{self.platform_name}] 直连接口被拒绝,切换为滚动页面 + 监听 API 响应模式", flush=True)
- captured: List[WorkItem] = []
- captured_total = 0
- captured_seen = set()
- lock = asyncio.Lock()
- async def handle_response(response):
- nonlocal captured_total
- url = response.url
- if ("creator.xiaohongshu.com" not in url and "edith.xiaohongshu.com" not in url) or "creator/note/user/posted" not in url:
- return
- try:
- json_data = await response.json()
- except Exception:
- return
- if not isinstance(json_data, dict):
- return
- if not (json_data.get("success") or json_data.get("code") == 0) or not json_data.get("data"):
- return
- data = json_data.get("data", {}) or {}
- notes = data.get("notes", []) or []
- tags = data.get("tags", []) or []
- declared = 0
- if tags:
- preferred = 0
- for tag in tags:
- if tag.get("id") == "special.note_time_desc":
- preferred = tag.get("notes_count", 0) or tag.get("notesCount", 0) or tag.get("count", 0) or 0
- break
- if preferred:
- declared = int(preferred)
- else:
- declared = max([int(t.get("notes_count", 0) or t.get("notesCount", 0) or t.get("count", 0) or 0) for t in tags] + [0])
- if not declared:
- declared = int(data.get("total", 0) or data.get("total_count", 0) or data.get("totalCount", 0) or 0)
- if not declared and isinstance(data.get("page", {}), dict):
- declared = int(data.get("page", {}).get("total", 0) or data.get("page", {}).get("totalCount", 0) or 0)
- async with lock:
- if declared:
- captured_total = max(captured_total, declared)
- parsed = parse_notes(notes)
- new_count = 0
- for w in parsed:
- if w.work_id and w.work_id not in captured_seen:
- captured_seen.add(w.work_id)
- captured.append(w)
- new_count += 1
- if new_count > 0:
- print(
- f"[{self.platform_name}] 捕获 notes 响应: notes={len(notes)}, new={new_count}, total_now={len(captured)}, declared_total={captured_total}",
- flush=True
- )
- self.page.on("response", handle_response)
- try:
- try:
- # 使用更宽松的等待条件,避免超时
- await self.page.goto("https://creator.xiaohongshu.com/new/note-manager", wait_until="domcontentloaded", timeout=90000)
- print(f"[{self.platform_name}] 页面加载成功", flush=True)
- except Exception as nav_error:
- print(f"[{self.platform_name}] 导航异常(继续):{nav_error}", flush=True)
- # 即使超时也继续尝试,可能页面已经部分加载
- try:
- await asyncio.sleep(3)
- current_url = self.page.url
- print(f"[{self.platform_name}] 超时后当前页面: {current_url}", flush=True)
- if "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
- except Exception as e:
- if "Cookie" in str(e):
- raise
- print(f"[{self.platform_name}] 检查页面状态时出错: {e}", flush=True)
- await asyncio.sleep(2.0)
- idle_rounds = 0
- last_count = 0
- last_height = 0
- for _ in range(1, 400):
- scroll_state = await self.page.evaluate(
- """() => {
- const isScrollable = (el) => {
- if (!el) return false;
- const style = window.getComputedStyle(el);
- const oy = style.overflowY;
- return (oy === 'auto' || oy === 'scroll') && (el.scrollHeight - el.clientHeight > 200);
- };
- const pickBest = () => {
- const nodes = Array.from(document.querySelectorAll('*'));
- let best = document.scrollingElement || document.documentElement || document.body;
- let bestScroll = (best.scrollHeight || 0) - (best.clientHeight || 0);
- for (const el of nodes) {
- if (!isScrollable(el)) continue;
- const diff = el.scrollHeight - el.clientHeight;
- if (diff > bestScroll) {
- best = el;
- bestScroll = diff;
- }
- }
- return best;
- };
- const el = pickBest();
- const beforeTop = el.scrollTop || 0;
- const beforeHeight = el.scrollHeight || 0;
- el.scrollTo(0, beforeHeight);
- return {
- beforeTop,
- afterTop: el.scrollTop || 0,
- height: el.scrollHeight || 0,
- client: el.clientHeight || 0,
- };
- }"""
- )
- await asyncio.sleep(1.2)
- async with lock:
- count_now = len(captured)
- total_now = captured_total
- if total_now and count_now >= total_now:
- break
- height_now = int(scroll_state.get("height", 0) or 0) if isinstance(scroll_state, dict) else 0
- if count_now == last_count and height_now == last_height:
- idle_rounds += 1
- else:
- idle_rounds = 0
- last_count = count_now
- last_height = height_now
- if idle_rounds >= 6:
- break
- async with lock:
- final_works = list(captured)
- final_total = captured_total or len(final_works)
- return WorksResult(
- success=True,
- platform=self.platform_name,
- works=final_works,
- total=final_total,
- has_more=False,
- next_page=-1
- )
- finally:
- try:
- self.page.remove_listener("response", handle_response)
- except Exception:
- pass
- # 添加请求监听,捕获请求头信息
- captured_requests = []
async def handle_request(request):
    """Record and log outgoing requests to the posted-notes API.

    Requests whose URL is not on one of the two Xiaohongshu hosts, or does
    not contain ``creator/note/user/posted``, are ignored. Matching requests
    are appended to ``captured_requests`` and their signing headers printed.

    NOTE(review): the first 100 characters of the ``cookie`` header are
    written to stdout; confirm that is acceptable for the log destination.
    """
    request_url = request.url
    on_known_host = "creator.xiaohongshu.com" in request_url or "edith.xiaohongshu.com" in request_url
    if not (on_known_host and "creator/note/user/posted" in request_url):
        return

    request_headers = request.headers
    record = {
        "url": request_url,
        "method": request.method,
        "headers": dict(request_headers),
        "timestamp": asyncio.get_event_loop().time()
    }
    captured_requests.append(record)

    # Print the key signing headers, truncated to keep the log readable.
    sign_s = request_headers.get('x-s', '')
    sign_t = request_headers.get('x-t', '')
    sign_common = request_headers.get('x-s-common', '')
    tag = f"[{self.platform_name}]"
    print(f"{tag} 📡 API 请求: {request_url}", flush=True)
    print(f"{tag} Method: {request.method}", flush=True)
    print(f"{tag} X-S: {sign_s[:50] if sign_s else '(none)'}...", flush=True)
    print(f"{tag} X-T: {sign_t}", flush=True)
    print(f"{tag} X-S-Common: {sign_common[:50] if sign_common else '(none)'}...", flush=True)
    print(f"{tag} Cookie: {request_headers.get('cookie', '')[:100]}...", flush=True)
-
- self.page.on("request", handle_request)
-
- iters = 0
- page_count = 0 # 统计实际获取到的页数
- print(f"[{self.platform_name}] ========== 开始自动分页获取作品 ==========", flush=True)
- print(f"[{self.platform_name}] 最大迭代次数: {max_iters}, 每页大小: {api_page_size}", flush=True)
-
- while iters < max_iters:
- iters += 1
- print(f"\n[{self.platform_name}] ---------- 第 {iters} 次请求 (cursor={cursor}) ----------", flush=True)
- resp = await fetch_notes_page(cursor)
-
- # 打印调试信息
- if resp and isinstance(resp, dict) and resp.get('_debug'):
- debug_info = resp.get('_debug', {})
- sign_result = debug_info.get('signResult', {})
- print(f"[{self.platform_name}] 🔍 调试信息:", flush=True)
- print(f"[{self.platform_name}] 签名可用: {sign_result.get('hasSign', False)}", flush=True)
- if sign_result.get('x_s'):
- print(f"[{self.platform_name}] X-S: {sign_result.get('x_s', '')}", flush=True)
- if sign_result.get('x_t'):
- print(f"[{self.platform_name}] X-T: {sign_result.get('x_t', '')}", flush=True)
- if sign_result.get('error'):
- print(f"[{self.platform_name}] 签名错误: {sign_result.get('error', '')}", flush=True)
- print(f"[{self.platform_name}] HTTP 状态: {debug_info.get('status', 'N/A')} {debug_info.get('statusText', '')}", flush=True)
- # 移除调试信息,避免影响后续处理
- resp.pop('_debug', None)
-
- if not resp or not isinstance(resp, dict):
- print(f"[{self.platform_name}] ❌ 第 {iters} 次拉取无响应,cursor={cursor}", flush=True)
- print(f"[{self.platform_name}] 响应类型: {type(resp)}, 响应内容: {str(resp)[:500]}", flush=True)
- break
- if not (resp.get('success') or resp.get('code') == 0) or not resp.get('data'):
- error_msg = str(resp)[:500]
- print(f"[{self.platform_name}] ❌ 拉取失败 cursor={cursor}", flush=True)
- print(f"[{self.platform_name}] 响应详情: {error_msg}", flush=True)
- print(f"[{self.platform_name}] success={resp.get('success')}, code={resp.get('code')}, has_data={bool(resp.get('data'))}", flush=True)
- # 打印详细的错误信息
- if resp.get('msg'):
- print(f"[{self.platform_name}] 错误消息: {resp.get('msg')}", flush=True)
- if resp.get('message'):
- print(f"[{self.platform_name}] 错误消息: {resp.get('message')}", flush=True)
- if resp.get('error'):
- print(f"[{self.platform_name}] 错误: {resp.get('error')}", flush=True)
- # 打印调试信息
- if resp.get('_debug'):
- debug_info = resp.get('_debug', {})
- print(f"[{self.platform_name}] HTTP 状态: {debug_info.get('status', 'N/A')} {debug_info.get('statusText', '')}", flush=True)
- sign_result = debug_info.get('signResult', {})
- if sign_result.get('error'):
- print(f"[{self.platform_name}] 签名错误: {sign_result.get('error')}", flush=True)
- if iters == 1:
- print(f"[{self.platform_name}] 第一次请求失败,切换到滚动模式", flush=True)
- return await collect_by_scrolling()
- break
- data = resp.get('data', {}) or {}
- notes = data.get('notes', []) or []
- if not notes:
- print(f"[{self.platform_name}] ⚠️ cursor={cursor} 无作品,停止分页", flush=True)
- break
- # 统计页数
- page_count += 1
- print(f"[{self.platform_name}] ✅ 第 {page_count} 页获取成功,本页作品数: {len(notes)}", flush=True)
- tags = data.get('tags', []) or []
- if tags:
- preferred = 0
- for tag in tags:
- if tag.get('id') == 'special.note_time_desc':
- preferred = tag.get('notes_count', 0) or tag.get('notesCount', 0) or tag.get('count', 0) or 0
- break
- if preferred:
- total = max(total, int(preferred))
- print(f"[{self.platform_name}] 📊 从 tags 获取总数: {total} (preferred)", flush=True)
- else:
- tag_total = max([int(t.get('notes_count', 0) or t.get('notesCount', 0) or t.get('count', 0) or 0) for t in tags] + [0])
- total = max(total, tag_total)
- if tag_total > 0:
- print(f"[{self.platform_name}] 📊 从 tags 获取总数: {total}", flush=True)
- if not total:
- t2 = int(data.get('total', 0) or data.get('total_count', 0) or data.get('totalCount', 0) or 0)
- if not t2 and isinstance(data.get('page', {}), dict):
- t2 = int(data.get('page', {}).get('total', 0) or data.get('page', {}).get('totalCount', 0) or 0)
- total = max(total, t2)
- if t2 > 0:
- print(f"[{self.platform_name}] 📊 从 data.total 获取总数: {total}", flush=True)
- parsed = parse_notes(notes)
- new_items = []
- for w in parsed:
- if w.work_id and w.work_id not in seen_ids:
- seen_ids.add(w.work_id)
- new_items.append(w)
- works.extend(new_items)
- print(f"[{self.platform_name}] 📈 累计统计: 本页新作品={len(new_items)}, 累计作品数={len(works)}, 声明总数={total}", flush=True)
- if total and len(works) >= total:
- print(f"[{self.platform_name}] ✅ 已获取全部作品 (累计={len(works)} >= 总数={total}),停止分页", flush=True)
- break
- if len(new_items) == 0:
- print(f"[{self.platform_name}] ⚠️ 本页无新作品,停止分页", flush=True)
- break
- next_page = data.get('page', "")
- old_cursor = cursor
- if next_page == cursor:
- next_page = ""
- if next_page == -1 or str(next_page) == "-1":
- next_page = ""
- if next_page is None or next_page == "":
- if isinstance(cursor, int):
- cursor = cursor + 1
- else:
- cursor = len(works) // api_page_size
- print(f"[{self.platform_name}] 🔄 下一页 cursor: {old_cursor} -> {cursor} (自动递增)", flush=True)
- else:
- cursor = next_page
- print(f"[{self.platform_name}] 🔄 下一页 cursor: {old_cursor} -> {cursor} (API返回)", flush=True)
- await asyncio.sleep(0.5)
-
- # 移除请求监听器
- try:
- self.page.remove_listener("request", handle_request)
- except Exception:
- pass
-
- print(f"\n[{self.platform_name}] ========== 分页完成 ==========", flush=True)
- print(f"[{self.platform_name}] 📊 分页统计: 总请求次数={iters}, 成功获取页数={page_count}, 累计作品数={len(works)}, 声明总数={total}", flush=True)
- if captured_requests:
- print(f"[{self.platform_name}] 📡 捕获到 {len(captured_requests)} 个 API 请求", flush=True)
- for i, req in enumerate(captured_requests[:3], 1): # 只显示前3个
- print(f"[{self.platform_name}] 请求 {i}: {req['method']} {req['url']}", flush=True)
- if 'x-s' in req['headers']:
- print(f"[{self.platform_name}] X-S: {req['headers']['x-s'][:50]}...", flush=True)
- if 'x-t' in req['headers']:
- print(f"[{self.platform_name}] X-T: {req['headers']['x-t']}", flush=True)
- print(f"[{self.platform_name}] ========================================\n", flush=True)
- except Exception as e:
- import traceback
- error_trace = traceback.format_exc()
- print(f"[{self.platform_name}] 发生异常: {e}", flush=True)
- traceback.print_exc()
- return WorksResult(
- success=False,
- platform=self.platform_name,
- error=str(e),
- debug_info=f"异常详情: {error_trace[:500]}"
- )
- finally:
- await self.close_browser()
- debug_info = f"总请求次数={iters}, 成功获取页数={page_count}, 累计作品数={len(works)}, 声明总数={total}"
- if len(works) == 0:
- debug_info += " | 警告: 没有获取到任何作品,可能原因: Cookie失效、API调用失败、或账号无作品"
-
- return WorksResult(
- success=True,
- platform=self.platform_name,
- works=works,
- total=total or len(works),
- has_more=False,
- next_page=-1,
- debug_info=debug_info
- )
-
async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
    """Fetch comments via the creator-centre comment management page.

    Opens ``https://creator.xiaohongshu.com/creator/comment``, listens for
    comment-list API responses issued by the page, and converts the most
    recently captured non-empty response into ``CommentItem`` objects.

    NOTE(review): the captured response is not filtered by ``work_id``; the
    id is only stamped onto the returned items and the result. ``cursor`` is
    accepted but not used by this implementation.

    Args:
        cookies: Raw cookie data, handed to ``self.parse_cookies``.
        work_id: Note id recorded on every returned comment.
        cursor: Unused.

    Returns:
        ``CommentsResult`` with ``success=True`` and the parsed comments
        (possibly empty). If any step raises, for example when the page
        lands on a login URL, the result has ``success=False`` and ``error``
        set. On success the API's ``cursor`` value is attached to the result
        as a ``cursor`` attribute.
    """
    print(f"\n{'='*60}")
    print(f"[{self.platform_name}] 获取作品评论")
    print(f"[{self.platform_name}] work_id={work_id}, cursor={cursor}")
    print(f"{'='*60}")

    comments: List[CommentItem] = []
    total = 0
    has_more = False
    next_cursor = ""
    captured_data = {}

    def common_fields(raw):
        """Build the CommentItem kwargs shared by comments and replies."""
        # `user_info` may be present but null; treat that as missing.
        author = raw.get('user_info') or {}
        return dict(
            comment_id=raw.get('id', ''),
            work_id=work_id,
            content=raw.get('content', ''),
            author_id=author.get('user_id', ''),
            author_name=author.get('nickname', ''),
            author_avatar=author.get('image', ''),
            like_count=raw.get('like_count', 0),
            create_time=raw.get('create_time', ''),
        )

    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        async def handle_response(response):
            """Keep the latest successful comment-list payload that has comments."""
            nonlocal captured_data
            url = response.url
            # Only comment-list style endpoints are of interest.
            if '/comment/' not in url or not ('page' in url or 'list' in url):
                return
            try:
                json_data = await response.json()
                print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)
                if not (json_data.get('success') or json_data.get('code') == 0):
                    return
                # `data` may be null on an otherwise successful response.
                data = json_data.get('data') or {}
                comment_list = data.get('comments') or data.get('list') or []
                if comment_list:
                    captured_data = json_data
                    print(f"[{self.platform_name}] 评论 API 响应成功,comments={len(comment_list)}", flush=True)
                else:
                    print(f"[{self.platform_name}] 评论 API 响应成功但无评论", flush=True)
            except Exception as e:
                print(f"[{self.platform_name}] 解析评论响应失败: {e}", flush=True)

        self.page.on('response', handle_response)
        print(f"[{self.platform_name}] 已注册评论 API 响应监听器", flush=True)

        try:
            comment_url = "https://creator.xiaohongshu.com/creator/comment"
            print(f"[{self.platform_name}] 访问评论管理页面: {comment_url}", flush=True)
            await self.page.goto(comment_url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            # A redirect to a login URL means the cookies are no longer valid.
            current_url = self.page.url
            print(f"[{self.platform_name}] 当前页面 URL: {current_url}", flush=True)
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            if not captured_data:
                print(f"[{self.platform_name}] 等待评论 API 响应...", flush=True)
                # Scrolling nudges the page into issuing the comment request.
                await self.page.evaluate('window.scrollBy(0, 500)')
                await asyncio.sleep(3)

            if not captured_data:
                # Give a slow comment API one more chance.
                print(f"[{self.platform_name}] 继续等待评论加载...", flush=True)
                await asyncio.sleep(5)
        finally:
            # Detach the listener on every path, not only on success.
            try:
                self.page.remove_listener('response', handle_response)
            except Exception as remove_error:
                print(f"[{self.platform_name}] 移除监听器失败: {remove_error}", flush=True)

        if captured_data:
            data = captured_data.get('data') or {}
            comment_list = data.get('comments') or data.get('list') or []
            has_more = data.get('has_more', False)
            next_cursor = data.get('cursor', '')

            print(f"[{self.platform_name}] 解析评论: has_more={has_more}, comments={len(comment_list)}", flush=True)

            for comment in comment_list:
                if not isinstance(comment, dict):
                    continue
                if not comment.get('id', ''):
                    continue

                # Replies are kept even without an id, as before.
                replies = [
                    CommentItem(**common_fields(sub))
                    for sub in (comment.get('sub_comments') or [])
                    if isinstance(sub, dict)
                ]

                comments.append(CommentItem(
                    reply_count=comment.get('sub_comment_count', 0),
                    replies=replies,
                    **common_fields(comment)
                ))

            total = len(comments)
            print(f"[{self.platform_name}] 解析到 {total} 条评论", flush=True)
        else:
            print(f"[{self.platform_name}] 未捕获到评论 API 响应", flush=True)

    except Exception as e:
        import traceback
        traceback.print_exc()
        return CommentsResult(
            success=False,
            platform=self.platform_name,
            work_id=work_id,
            error=str(e)
        )
    finally:
        await self.close_browser()

    result = CommentsResult(
        success=True,
        platform=self.platform_name,
        work_id=work_id,
        comments=comments,
        total=total,
        has_more=has_more
    )
    # The cursor is not a constructor argument here, so it is attached
    # straight onto the instance dict (presumably because CommentsResult
    # declares no such field; verify against its definition).
    result.__dict__['cursor'] = next_cursor
    return result
-
async def get_all_comments(self, cookies: str) -> dict:
    """Collect comments for all works from the comment management page.

    Opens ``https://creator.xiaohongshu.com/creator/comment``, scrolls a few
    times, and records every comment-list and note-list API response the
    page issues. Captured comments are then grouped by note id.

    Args:
        cookies: Raw cookie data, handed to ``self.parse_cookies``.

    Returns:
        On success a dict with ``success=True``, ``platform``,
        ``work_comments`` (one dict per note with ``work_id``, ``title``,
        ``cover_url`` and ``comments``) and ``total`` (the number of notes in
        ``work_comments``). If any step raises, a dict with ``success=False``,
        ``error`` and an empty ``work_comments``.
    """
    import re

    print(f"\n{'='*60}")
    print(f"[{self.platform_name}] 获取所有作品评论")
    print(f"{'='*60}")

    all_work_comments = []
    captured_comments = []
    captured_notes = {}  # note_id -> {'title': ..., 'cover': ...}
    note_id_pattern = re.compile(r'note_id=([^&]+)')

    def cover_of(cover):
        """Return the cover URL from either a dict or a plain string value."""
        if isinstance(cover, dict):
            return cover.get('url', '') or cover.get('url_default', '') or ''
        if isinstance(cover, str):
            return cover
        return ''

    def succeeded(payload):
        """Return a truthy value when the API payload reports success."""
        return payload.get('success') or payload.get('code') == 0

    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        async def handle_response(response):
            """Accumulate comments and note metadata from matching responses."""
            url = response.url
            is_comment_api = '/comment/' in url and ('page' in url or 'list' in url)
            is_note_api = '/note/' in url and ('list' in url or 'posted' in url or 'manager' in url)
            if not (is_comment_api or is_note_api):
                return
            try:
                # Parse the body once even when the URL matches both filters.
                json_data = await response.json()

                if is_comment_api:
                    print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True)
                    if succeeded(json_data):
                        data = json_data.get('data') or {}
                        comments = data.get('comments') or data.get('list') or []

                        # The note id, when available, is carried in the query string.
                        note_id_match = note_id_pattern.search(url)
                        note_id = note_id_match.group(1) if note_id_match else ''

                        if comments:
                            for comment in comments:
                                if not isinstance(comment, dict):
                                    continue
                                # Stamp the note id onto comments that lack one.
                                if note_id and 'note_id' not in comment:
                                    comment['note_id'] = note_id
                                captured_comments.append(comment)

                            print(f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (note_id={note_id}),总计: {len(captured_comments)}", flush=True)

                if is_note_api and succeeded(json_data):
                    data = json_data.get('data') or {}
                    notes = data.get('notes') or data.get('list') or []
                    print(f"[{self.platform_name}] 捕获到笔记列表 API: {len(notes)} 个笔记", flush=True)
                    for note in notes:
                        if not isinstance(note, dict):
                            continue
                        note_id = note.get('note_id', '') or note.get('id', '')
                        if note_id:
                            captured_notes[note_id] = {
                                'title': note.get('title', '') or note.get('display_title', ''),
                                'cover': cover_of(note.get('cover', {})),
                            }
            except Exception as e:
                print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True)

        self.page.on('response', handle_response)
        print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True)

        try:
            print(f"[{self.platform_name}] 访问评论管理页面...", flush=True)
            await self.page.goto("https://creator.xiaohongshu.com/creator/comment", wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(5)

            # A redirect to a login URL means the cookies are no longer valid.
            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            print(f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记", flush=True)

            # Scroll in small steps so the page requests further comments.
            for _ in range(5):
                await self.page.evaluate('window.scrollBy(0, 500)')
                await asyncio.sleep(1)

            await asyncio.sleep(3)
        finally:
            # Detach the listener on every path, not only on success.
            try:
                self.page.remove_listener('response', handle_response)
            except Exception as remove_error:
                print(f"[{self.platform_name}] 移除监听器失败: {remove_error}", flush=True)

        print(f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记", flush=True)

        # Group the captured comments by the note they belong to.
        work_comments_map = {}  # note_id -> work entry
        seen_comment_keys = set()  # (note_id, comment_id) pairs already emitted
        for comment in captured_comments:
            # Either key may be present but null; fall back to an empty dict.
            note_info = comment.get('note_info') or comment.get('note') or {}
            if not isinstance(note_info, dict):
                note_info = {}
            note_id = comment.get('note_id', '') or note_info.get('note_id', '') or note_info.get('id', '')

            if not note_id:
                continue

            if note_id not in work_comments_map:
                saved_note = captured_notes.get(note_id, {})
                # Prefer the cover embedded in the comment, then the note list.
                cover_url = cover_of(note_info.get('cover', {})) or saved_note.get('cover', '')

                work_comments_map[note_id] = {
                    'work_id': note_id,
                    'title': note_info.get('title', '') or note_info.get('display_title', '') or saved_note.get('title', ''),
                    'cover_url': cover_url,
                    'comments': []
                }

            cid = comment.get('id', '') or comment.get('comment_id', '')
            if not cid:
                continue

            # The same response can be captured more than once; emit each
            # comment only once per work.
            comment_key = (note_id, cid)
            if comment_key in seen_comment_keys:
                continue
            seen_comment_keys.add(comment_key)

            user_info = comment.get('user_info') or comment.get('user') or {}
            if not isinstance(user_info, dict):
                user_info = {}

            work_comments_map[note_id]['comments'].append({
                'comment_id': cid,
                'author_id': user_info.get('user_id', '') or user_info.get('id', ''),
                'author_name': user_info.get('nickname', '') or user_info.get('name', ''),
                'author_avatar': user_info.get('image', '') or user_info.get('avatar', ''),
                'content': comment.get('content', ''),
                'like_count': comment.get('like_count', 0),
                'create_time': comment.get('create_time', ''),
            })

        all_work_comments = list(work_comments_map.values())
        total_comments = sum(len(w['comments']) for w in all_work_comments)
        print(f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论", flush=True)

    except Exception as e:
        import traceback
        traceback.print_exc()
        return {
            'success': False,
            'platform': self.platform_name,
            'error': str(e),
            'work_comments': []
        }
    finally:
        await self.close_browser()

    return {
        'success': True,
        'platform': self.platform_name,
        'work_comments': all_work_comments,
        'total': len(all_work_comments)
    }
|