xiaohongshu.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. # -*- coding: utf-8 -*-
  2. """
  3. 小红书视频发布器
  4. 参考: matrix/xhs_uploader/main.py
  5. 使用 xhs SDK API 方式发布,更稳定
  6. """
  7. import asyncio
  8. import os
  9. import sys
  10. from pathlib import Path
  11. from .base import BasePublisher, PublishParams, PublishResult
  12. # 添加 matrix 项目路径,用于导入签名脚本
  13. MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix"
  14. sys.path.insert(0, str(MATRIX_PATH))
  15. # 尝试导入 xhs SDK
  16. try:
  17. from xhs import XhsClient
  18. XHS_SDK_AVAILABLE = True
  19. except ImportError:
  20. print("[Warning] xhs 库未安装,请运行: pip install xhs")
  21. XhsClient = None
  22. XHS_SDK_AVAILABLE = False
  23. # 签名脚本路径
  24. STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js"
  25. class XiaohongshuPublisher(BasePublisher):
  26. """
  27. 小红书视频发布器
  28. 优先使用 xhs SDK API 方式发布
  29. """
  30. platform_name = "xiaohongshu"
  31. login_url = "https://creator.xiaohongshu.com/"
  32. publish_url = "https://creator.xiaohongshu.com/publish/publish"
  33. cookie_domain = ".xiaohongshu.com"
  34. async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""):
  35. """获取小红书 API 签名"""
  36. from playwright.async_api import async_playwright
  37. try:
  38. async with async_playwright() as playwright:
  39. browser = await playwright.chromium.launch(headless=True)
  40. browser_context = await browser.new_context()
  41. if STEALTH_JS_PATH.exists():
  42. await browser_context.add_init_script(path=str(STEALTH_JS_PATH))
  43. page = await browser_context.new_page()
  44. await page.goto("https://www.xiaohongshu.com")
  45. await asyncio.sleep(1)
  46. await page.reload()
  47. await asyncio.sleep(1)
  48. if a1:
  49. await browser_context.add_cookies([
  50. {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
  51. ])
  52. await page.reload()
  53. await asyncio.sleep(0.5)
  54. encrypt_params = await page.evaluate(
  55. "([url, data]) => window._webmsxyw(url, data)",
  56. [uri, data]
  57. )
  58. await browser_context.close()
  59. await browser.close()
  60. return {
  61. "x-s": encrypt_params["X-s"],
  62. "x-t": str(encrypt_params["X-t"])
  63. }
  64. except Exception as e:
  65. import traceback
  66. traceback.print_exc()
  67. raise Exception(f"签名失败: {e}")
  68. def sign_sync(self, uri, data=None, a1="", web_session=""):
  69. """同步签名函数,供 XhsClient 使用"""
  70. return asyncio.run(self.get_sign(uri, data, a1, web_session))
  71. async def publish_via_api(self, cookies: str, params: PublishParams) -> PublishResult:
  72. """通过 API 发布视频"""
  73. if not XHS_SDK_AVAILABLE:
  74. raise Exception("xhs SDK 未安装,请运行: pip install xhs")
  75. self.report_progress(10, "正在通过 API 发布...")
  76. # 转换 cookie 格式
  77. cookie_list = self.parse_cookies(cookies)
  78. cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies
  79. self.report_progress(20, "正在上传视频...")
  80. # 创建客户端
  81. xhs_client = XhsClient(cookie_string, sign=self.sign_sync)
  82. # 发布视频
  83. result = xhs_client.create_video_note(
  84. title=params.title,
  85. desc=params.description or params.title,
  86. topics=params.tags or [],
  87. post_time=params.publish_date.strftime("%Y-%m-%d %H:%M:%S") if params.publish_date else None,
  88. video_path=params.video_path,
  89. cover_path=params.cover_path if params.cover_path and os.path.exists(params.cover_path) else None
  90. )
  91. self.report_progress(100, "发布成功")
  92. return PublishResult(
  93. success=True,
  94. platform=self.platform_name,
  95. video_id=result.get("note_id", ""),
  96. video_url=result.get("url", ""),
  97. message="发布成功"
  98. )
  99. async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
  100. """发布视频到小红书"""
  101. # 检查视频文件
  102. if not os.path.exists(params.video_path):
  103. raise Exception(f"视频文件不存在: {params.video_path}")
  104. self.report_progress(5, "正在准备发布...")
  105. # 优先使用 API 方式
  106. if XHS_SDK_AVAILABLE:
  107. try:
  108. return await self.publish_via_api(cookies, params)
  109. except Exception as e:
  110. print(f"[{self.platform_name}] API 发布失败: {e}")
  111. print(f"[{self.platform_name}] 尝试使用 Playwright 方式...")
  112. # 回退到 Playwright 方式
  113. return await self.publish_via_playwright(cookies, params)
  114. async def publish_via_playwright(self, cookies: str, params: PublishParams) -> PublishResult:
  115. """通过 Playwright 发布视频(备用方式)"""
  116. self.report_progress(10, "正在初始化浏览器...")
  117. await self.init_browser()
  118. cookie_list = self.parse_cookies(cookies)
  119. await self.set_cookies(cookie_list)
  120. if not self.page:
  121. raise Exception("Page not initialized")
  122. self.report_progress(15, "正在打开发布页面...")
  123. await self.page.goto(self.publish_url)
  124. await asyncio.sleep(3)
  125. # 检查登录状态
  126. if "login" in self.page.url or "passport" in self.page.url:
  127. raise Exception("登录已过期,请重新登录")
  128. self.report_progress(20, "正在上传视频...")
  129. # 尝试点击视频标签
  130. try:
  131. video_tab = self.page.locator('div.tab:has-text("上传视频"), span:has-text("上传视频")').first
  132. if await video_tab.count() > 0:
  133. await video_tab.click()
  134. await asyncio.sleep(1)
  135. except:
  136. pass
  137. # 上传视频
  138. upload_triggered = False
  139. # 方法1: 点击上传按钮
  140. try:
  141. upload_btn = self.page.locator('button:has-text("上传视频")').first
  142. if await upload_btn.count() > 0:
  143. async with self.page.expect_file_chooser() as fc_info:
  144. await upload_btn.click()
  145. file_chooser = await fc_info.value
  146. await file_chooser.set_files(params.video_path)
  147. upload_triggered = True
  148. except:
  149. pass
  150. # 方法2: 直接设置 file input
  151. if not upload_triggered:
  152. file_input = await self.page.$('input[type="file"]')
  153. if file_input:
  154. await file_input.set_input_files(params.video_path)
  155. upload_triggered = True
  156. if not upload_triggered:
  157. raise Exception("无法上传视频文件")
  158. self.report_progress(40, "等待视频上传完成...")
  159. # 等待上传完成
  160. for _ in range(100):
  161. await asyncio.sleep(3)
  162. # 检查标题输入框是否出现
  163. title_input = await self.page.locator('input[placeholder*="标题"]').count()
  164. if title_input > 0:
  165. break
  166. self.report_progress(60, "正在填写笔记信息...")
  167. # 填写标题
  168. title_selectors = [
  169. 'input[placeholder*="标题"]',
  170. '[class*="title"] input',
  171. ]
  172. for selector in title_selectors:
  173. title_input = self.page.locator(selector).first
  174. if await title_input.count() > 0:
  175. await title_input.fill(params.title[:20])
  176. break
  177. # 填写描述和标签
  178. if params.description or params.tags:
  179. desc_selectors = [
  180. '[class*="content-input"] [contenteditable="true"]',
  181. '[class*="editor"] [contenteditable="true"]',
  182. ]
  183. for selector in desc_selectors:
  184. desc_input = self.page.locator(selector).first
  185. if await desc_input.count() > 0:
  186. await desc_input.click()
  187. if params.description:
  188. await self.page.keyboard.type(params.description, delay=30)
  189. if params.tags:
  190. await self.page.keyboard.press("Enter")
  191. for tag in params.tags:
  192. await self.page.keyboard.type(f"#{tag} ", delay=30)
  193. break
  194. self.report_progress(80, "正在发布...")
  195. await asyncio.sleep(2)
  196. # 点击发布
  197. publish_selectors = [
  198. 'button.publishBtn',
  199. '.publishBtn',
  200. 'button:has-text("发布")',
  201. ]
  202. for selector in publish_selectors:
  203. btn = self.page.locator(selector).first
  204. if await btn.count() > 0 and await btn.is_visible():
  205. box = await btn.bounding_box()
  206. if box:
  207. await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2)
  208. break
  209. await asyncio.sleep(5)
  210. self.report_progress(100, "发布完成")
  211. return PublishResult(
  212. success=True,
  213. platform=self.platform_name,
  214. message="发布完成"
  215. )