# -*- coding: utf-8 -*-
"""
Xiaohongshu (RED) video publisher.

Reference implementation: matrix/xhs_uploader/main.py

Publishing goes through the ``xhs`` SDK API first and falls back to driving
the creator web page with Playwright when the SDK path is unavailable or fails.
"""

import asyncio
import functools
import os
import sys
from pathlib import Path

from .base import BasePublisher, PublishParams, PublishResult

# Put the sibling "matrix" project on sys.path so its signing assets can be found.
MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix"
sys.path.insert(0, str(MATRIX_PATH))

# The xhs SDK is optional; without it only the Playwright path is used.
try:
    from xhs import XhsClient
    XHS_SDK_AVAILABLE = True
except ImportError:
    print("[Warning] xhs 库未安装,请运行: pip install xhs")
    XhsClient = None
    XHS_SDK_AVAILABLE = False

# Init script injected into the signing browser context (only if the file exists).
STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js"

# Titles are truncated to this many characters before being typed into the page.
# NOTE(review): presumably the platform's title length limit - confirm.
_TITLE_MAX_LENGTH = 20
# Upload completion is polled this many times, sleeping the interval each time.
_UPLOAD_POLL_ATTEMPTS = 100
_UPLOAD_POLL_INTERVAL_SECONDS = 3
# Per-keystroke delay (ms) passed to keyboard.type for description and tags.
_TYPING_DELAY_MS = 30


class XiaohongshuPublisher(BasePublisher):
    """
    Xiaohongshu video publisher.

    Prefers the xhs SDK API; the Playwright page flow is the fallback.
    """

    platform_name = "xiaohongshu"
    login_url = "https://creator.xiaohongshu.com/"
    publish_url = "https://creator.xiaohongshu.com/publish/publish"
    cookie_domain = ".xiaohongshu.com"

    async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""):
        """Compute the request signature by calling the site's own JS in a browser.

        Opens a headless Chromium page on www.xiaohongshu.com, optionally sets
        the ``a1`` cookie, and evaluates ``window._webmsxyw(uri, data)``.

        Args:
            uri: Request URI to sign.
            data: Request payload forwarded to the signing function.
            a1: Value for the ``a1`` cookie; skipped when empty.
            web_session: Accepted for signature compatibility with the SDK
                callback; not used here.

        Returns:
            dict with keys ``"x-s"`` and ``"x-t"`` (the latter as ``str``).

        Raises:
            Exception: Any failure, wrapped with the "签名失败" prefix and
                chained to the original error.
        """
        from playwright.async_api import async_playwright

        try:
            async with async_playwright() as playwright:
                browser = await playwright.chromium.launch(headless=True)
                try:
                    browser_context = await browser.new_context()

                    if STEALTH_JS_PATH.exists():
                        await browser_context.add_init_script(path=str(STEALTH_JS_PATH))

                    page = await browser_context.new_page()
                    await page.goto("https://www.xiaohongshu.com")
                    await asyncio.sleep(1)
                    await page.reload()
                    await asyncio.sleep(1)

                    if a1:
                        await browser_context.add_cookies([
                            {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
                        ])
                        # Reload so the page picks up the freshly set cookie.
                        await page.reload()
                        await asyncio.sleep(0.5)

                    encrypt_params = await page.evaluate(
                        "([url, data]) => window._webmsxyw(url, data)",
                        [uri, data]
                    )

                    await browser_context.close()

                    return {
                        "x-s": encrypt_params["X-s"],
                        "x-t": str(encrypt_params["X-t"])
                    }
                finally:
                    # Always release the browser, including on failure paths.
                    await browser.close()
        except Exception as e:
            import traceback
            traceback.print_exc()
            raise Exception(f"签名失败: {e}") from e

    def sign_sync(self, uri, data=None, a1="", web_session=""):
        """Synchronous wrapper around :meth:`get_sign`, used as the XhsClient sign callback.

        Uses ``asyncio.run``, so it must be called from a thread that has no
        running event loop (``publish_via_api`` arranges this by running the
        SDK call in a worker thread).
        """
        return asyncio.run(self.get_sign(uri, data, a1, web_session))

    async def publish_via_api(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish the video through the xhs SDK.

        Args:
            cookies: Cookie data as accepted by ``self.parse_cookies``; the raw
                value is used as-is when parsing yields nothing.
            params: Title, description, tags, schedule and file paths.

        Returns:
            A successful ``PublishResult`` carrying ``note_id`` / ``url`` from
            the SDK response when present.

        Raises:
            Exception: When the SDK is not installed, or whatever the SDK call raises.
        """
        if not XHS_SDK_AVAILABLE:
            raise Exception("xhs SDK 未安装,请运行: pip install xhs")

        self.report_progress(10, "正在通过 API 发布...")

        # Normalise the cookie input into the header-style string the client takes.
        cookie_list = self.parse_cookies(cookies)
        cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies

        self.report_progress(20, "正在上传视频...")

        xhs_client = XhsClient(cookie_string, sign=self.sign_sync)

        post_time = (
            params.publish_date.strftime("%Y-%m-%d %H:%M:%S")
            if params.publish_date else None
        )
        # Only pass a cover that actually exists on disk.
        cover_path = (
            params.cover_path
            if params.cover_path and os.path.exists(params.cover_path) else None
        )

        create_note = functools.partial(
            xhs_client.create_video_note,
            title=params.title,
            desc=params.description or params.title,
            topics=params.tags or [],
            post_time=post_time,
            video_path=params.video_path,
            cover_path=cover_path,
        )

        # The SDK call is blocking and its sign callback uses asyncio.run(),
        # which raises RuntimeError inside a running loop. Run it in a worker
        # thread so the callback gets a loop-free thread and this loop stays
        # responsive.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, create_note)

        # An unexpected response shape must not raise here: the note may already
        # be posted, and an exception would trigger the Playwright fallback.
        if not isinstance(result, dict):
            result = {}

        self.report_progress(100, "发布成功")

        return PublishResult(
            success=True,
            platform=self.platform_name,
            video_id=result.get("note_id", ""),
            video_url=result.get("url", ""),
            message="发布成功"
        )

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to Xiaohongshu, trying the SDK first and Playwright second.

        Raises:
            Exception: When the video file does not exist, or when the
                Playwright fallback fails.
        """
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        self.report_progress(5, "正在准备发布...")

        # Prefer the API path; any failure there is logged and we fall through.
        if XHS_SDK_AVAILABLE:
            try:
                return await self.publish_via_api(cookies, params)
            except Exception as e:
                print(f"[{self.platform_name}] API 发布失败: {e}")
                print(f"[{self.platform_name}] 尝试使用 Playwright 方式...")

        return await self.publish_via_playwright(cookies, params)

    async def publish_via_playwright(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish the video by driving the creator web page (fallback path).

        Steps: open the publish page, verify the session, upload the file,
        wait for the title field, fill title / description / tags, click publish.

        Raises:
            Exception: When the page is not initialised, the session has
                expired, the file cannot be handed to the page, or no publish
                button could be clicked.
        """
        self.report_progress(10, "正在初始化浏览器...")
        await self.init_browser()

        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        self.report_progress(15, "正在打开发布页面...")

        await self.page.goto(self.publish_url)
        await asyncio.sleep(3)

        # Being redirected to a login/passport URL means the cookies are stale.
        if "login" in self.page.url or "passport" in self.page.url:
            raise Exception("登录已过期,请重新登录")

        self.report_progress(20, "正在上传视频...")

        # Best effort: switch to the video tab if the page shows one.
        try:
            video_tab = self.page.locator(
                'div.tab:has-text("上传视频"), span:has-text("上传视频")'
            ).first
            if await video_tab.count() > 0:
                await video_tab.click()
                await asyncio.sleep(1)
        except Exception as e:
            # Not fatal; the upload attempts below may still succeed.
            print(f"[{self.platform_name}] 切换视频标签失败: {e}")

        upload_triggered = False

        # Method 1: click the upload button and answer the file chooser.
        try:
            upload_btn = self.page.locator('button:has-text("上传视频")').first
            if await upload_btn.count() > 0:
                async with self.page.expect_file_chooser() as fc_info:
                    await upload_btn.click()
                file_chooser = await fc_info.value
                await file_chooser.set_files(params.video_path)
                upload_triggered = True
        except Exception as e:
            # Fall through to the direct file-input approach.
            print(f"[{self.platform_name}] 点击上传按钮失败: {e}")

        # Method 2: set the file directly on the <input type="file"> element.
        if not upload_triggered:
            file_input = await self.page.query_selector('input[type="file"]')
            if file_input:
                await file_input.set_input_files(params.video_path)
                upload_triggered = True

        if not upload_triggered:
            raise Exception("无法上传视频文件")

        self.report_progress(40, "等待视频上传完成...")

        # The title field appearing is used as the "upload finished" signal.
        # If it never appears the flow continues anyway (best effort).
        for _ in range(_UPLOAD_POLL_ATTEMPTS):
            await asyncio.sleep(_UPLOAD_POLL_INTERVAL_SECONDS)
            title_count = await self.page.locator('input[placeholder*="标题"]').count()
            if title_count > 0:
                break

        self.report_progress(60, "正在填写笔记信息...")

        # Fill the title using the first selector that matches.
        title_selectors = [
            'input[placeholder*="标题"]',
            '[class*="title"] input',
        ]
        for selector in title_selectors:
            title_input = self.page.locator(selector).first
            if await title_input.count() > 0:
                await title_input.fill(params.title[:_TITLE_MAX_LENGTH])
                break

        # Type the description, then the tags on a new line as "#tag " tokens.
        if params.description or params.tags:
            desc_selectors = [
                '[class*="content-input"] [contenteditable="true"]',
                '[class*="editor"] [contenteditable="true"]',
            ]
            for selector in desc_selectors:
                desc_input = self.page.locator(selector).first
                if await desc_input.count() > 0:
                    await desc_input.click()
                    if params.description:
                        await self.page.keyboard.type(
                            params.description, delay=_TYPING_DELAY_MS
                        )
                    if params.tags:
                        await self.page.keyboard.press("Enter")
                        for tag in params.tags:
                            await self.page.keyboard.type(
                                f"#{tag} ", delay=_TYPING_DELAY_MS
                            )
                    break

        self.report_progress(80, "正在发布...")
        await asyncio.sleep(2)

        # Click the first visible publish button at the centre of its box.
        publish_selectors = [
            'button.publishBtn',
            '.publishBtn',
            'button:has-text("发布")',
        ]
        publish_clicked = False
        for selector in publish_selectors:
            btn = self.page.locator(selector).first
            if await btn.count() > 0 and await btn.is_visible():
                box = await btn.bounding_box()
                if box:
                    await self.page.mouse.click(
                        box['x'] + box['width'] / 2,
                        box['y'] + box['height'] / 2
                    )
                    publish_clicked = True
                    break

        # Do not report success when nothing was actually clicked.
        if not publish_clicked:
            raise Exception("未找到发布按钮")

        await asyncio.sleep(5)

        self.report_progress(100, "发布完成")

        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布完成"
        )