xiaohongshu.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. # -*- coding: utf-8 -*-
  2. """
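Xiaohongshu video publisher.
Reference implementation: matrix/xhs_uploader/main.py
Contains an xhs SDK (API) publishing path and a Playwright browser path.
publish() currently always uses the Playwright path; the SDK path is disabled there.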
  6. """
  7. import asyncio
  8. import os
  9. import sys
  10. from pathlib import Path
  11. from .base import BasePublisher, PublishParams, PublishResult
  12. # 添加 matrix 项目路径,用于导入签名脚本
  13. MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix"
  14. sys.path.insert(0, str(MATRIX_PATH))
  15. # 尝试导入 xhs SDK
  16. try:
  17. from xhs import XhsClient
  18. XHS_SDK_AVAILABLE = True
  19. except ImportError:
  20. print("[Warning] xhs 库未安装,请运行: pip install xhs")
  21. XhsClient = None
  22. XHS_SDK_AVAILABLE = False
  23. # 签名脚本路径
  24. STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js"
  25. class XiaohongshuPublisher(BasePublisher):
  26. """
  27. 小红书视频发布器
  28. 优先使用 xhs SDK API 方式发布
  29. """
  30. platform_name = "xiaohongshu"
  31. login_url = "https://creator.xiaohongshu.com/"
  32. publish_url = "https://creator.xiaohongshu.com/publish/publish"
  33. cookie_domain = ".xiaohongshu.com"
  34. async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""):
  35. """获取小红书 API 签名"""
  36. from playwright.async_api import async_playwright
  37. try:
  38. async with async_playwright() as playwright:
  39. browser = await playwright.chromium.launch(headless=True)
  40. browser_context = await browser.new_context()
  41. if STEALTH_JS_PATH.exists():
  42. await browser_context.add_init_script(path=str(STEALTH_JS_PATH))
  43. page = await browser_context.new_page()
  44. await page.goto("https://www.xiaohongshu.com")
  45. await asyncio.sleep(1)
  46. await page.reload()
  47. await asyncio.sleep(1)
  48. if a1:
  49. await browser_context.add_cookies([
  50. {'name': 'a1', 'value': a1, 'domain': ".xiaohongshu.com", 'path': "/"}
  51. ])
  52. await page.reload()
  53. await asyncio.sleep(0.5)
  54. encrypt_params = await page.evaluate(
  55. "([url, data]) => window._webmsxyw(url, data)",
  56. [uri, data]
  57. )
  58. await browser_context.close()
  59. await browser.close()
  60. return {
  61. "x-s": encrypt_params["X-s"],
  62. "x-t": str(encrypt_params["X-t"])
  63. }
  64. except Exception as e:
  65. import traceback
  66. traceback.print_exc()
  67. raise Exception(f"签名失败: {e}")
  68. def sign_sync(self, uri, data=None, a1="", web_session=""):
  69. """同步签名函数,供 XhsClient 使用"""
  70. return asyncio.run(self.get_sign(uri, data, a1, web_session))
  71. async def publish_via_api(self, cookies: str, params: PublishParams) -> PublishResult:
  72. """通过 API 发布视频"""
  73. if not XHS_SDK_AVAILABLE:
  74. raise Exception("xhs SDK 未安装,请运行: pip install xhs")
  75. self.report_progress(10, "正在通过 API 发布...")
  76. print(f"[{self.platform_name}] 使用 XHS SDK API 发布...")
  77. print(f"[{self.platform_name}] 视频路径: {params.video_path}")
  78. print(f"[{self.platform_name}] 标题: {params.title}")
  79. # 转换 cookie 格式
  80. cookie_list = self.parse_cookies(cookies)
  81. cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies
  82. print(f"[{self.platform_name}] Cookie 长度: {len(cookie_string)}")
  83. self.report_progress(20, "正在上传视频...")
  84. # 创建客户端
  85. xhs_client = XhsClient(cookie_string, sign=self.sign_sync)
  86. print(f"[{self.platform_name}] 开始调用 create_video_note...")
  87. # 发布视频
  88. try:
  89. result = xhs_client.create_video_note(
  90. title=params.title,
  91. desc=params.description or params.title,
  92. topics=params.tags or [],
  93. post_time=params.publish_date.strftime("%Y-%m-%d %H:%M:%S") if params.publish_date else None,
  94. video_path=params.video_path,
  95. cover_path=params.cover_path if params.cover_path and os.path.exists(params.cover_path) else None
  96. )
  97. print(f"[{self.platform_name}] SDK 返回结果: {result}")
  98. except Exception as e:
  99. import traceback
  100. traceback.print_exc()
  101. print(f"[{self.platform_name}] SDK 调用失败: {e}")
  102. raise Exception(f"XHS SDK 发布失败: {e}")
  103. # 验证返回结果
  104. if not result:
  105. raise Exception("XHS SDK 返回空结果")
  106. # 检查是否有错误
  107. if isinstance(result, dict):
  108. if result.get("code") and result.get("code") != 0:
  109. raise Exception(f"发布失败: {result.get('msg', '未知错误')}")
  110. if result.get("success") == False:
  111. raise Exception(f"发布失败: {result.get('msg', result.get('error', '未知错误'))}")
  112. note_id = result.get("note_id", "") if isinstance(result, dict) else ""
  113. video_url = result.get("url", "") if isinstance(result, dict) else ""
  114. if not note_id:
  115. print(f"[{self.platform_name}] 警告: 未获取到 note_id,返回结果: {result}")
  116. self.report_progress(100, "发布成功")
  117. print(f"[{self.platform_name}] 发布成功! note_id={note_id}, url={video_url}")
  118. return PublishResult(
  119. success=True,
  120. platform=self.platform_name,
  121. video_id=note_id,
  122. video_url=video_url,
  123. message="发布成功"
  124. )
  125. async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
  126. """发布视频到小红书"""
  127. print(f"\n{'='*60}")
  128. print(f"[{self.platform_name}] 开始发布视频")
  129. print(f"[{self.platform_name}] 视频路径: {params.video_path}")
  130. print(f"[{self.platform_name}] 标题: {params.title}")
  131. print(f"[{self.platform_name}] XHS SDK 可用: {XHS_SDK_AVAILABLE}")
  132. print(f"{'='*60}")
  133. # 检查视频文件
  134. if not os.path.exists(params.video_path):
  135. raise Exception(f"视频文件不存在: {params.video_path}")
  136. print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")
  137. self.report_progress(5, "正在准备发布...")
  138. # 临时禁用 API 方式,直接使用 Playwright(更稳定)
  139. # TODO: 后续优化 API 方式的返回值验证
  140. # if XHS_SDK_AVAILABLE:
  141. # try:
  142. # result = await self.publish_via_api(cookies, params)
  143. # print(f"[{self.platform_name}] API 发布完成: {result}")
  144. # return result
  145. # except Exception as e:
  146. # import traceback
  147. # traceback.print_exc()
  148. # print(f"[{self.platform_name}] API 发布失败: {e}")
  149. # print(f"[{self.platform_name}] 尝试使用 Playwright 方式...")
  150. # 使用 Playwright 方式发布(更可靠)
  151. print(f"[{self.platform_name}] 使用 Playwright 方式发布...")
  152. return await self.publish_via_playwright(cookies, params)
  153. async def publish_via_playwright(self, cookies: str, params: PublishParams) -> PublishResult:
  154. """通过 Playwright 发布视频"""
  155. self.report_progress(10, "正在初始化浏览器...")
  156. print(f"[{self.platform_name}] Playwright 方式开始...")
  157. await self.init_browser()
  158. cookie_list = self.parse_cookies(cookies)
  159. print(f"[{self.platform_name}] 设置 {len(cookie_list)} 个 cookies")
  160. await self.set_cookies(cookie_list)
  161. if not self.page:
  162. raise Exception("Page not initialized")
  163. self.report_progress(15, "正在打开发布页面...")
  164. # 直接访问视频发布页面
  165. publish_url = "https://creator.xiaohongshu.com/publish/publish?source=official"
  166. print(f"[{self.platform_name}] 打开页面: {publish_url}")
  167. await self.page.goto(publish_url)
  168. await asyncio.sleep(3)
  169. current_url = self.page.url
  170. print(f"[{self.platform_name}] 当前 URL: {current_url}")
  171. # 检查登录状态
  172. if "login" in current_url or "passport" in current_url:
  173. screenshot_path = f"debug_login_required_{self.platform_name}.png"
  174. await self.page.screenshot(path=screenshot_path)
  175. raise Exception(f"登录已过期,请重新登录(截图: {screenshot_path})")
  176. self.report_progress(20, "正在上传视频...")
  177. # 等待页面加载
  178. await asyncio.sleep(2)
  179. # 上传视频
  180. upload_triggered = False
  181. # 方法1: 直接设置隐藏的 file input
  182. print(f"[{self.platform_name}] 尝试方法1: 设置 file input")
  183. file_inputs = self.page.locator('input[type="file"]')
  184. input_count = await file_inputs.count()
  185. print(f"[{self.platform_name}] 找到 {input_count} 个 file input")
  186. if input_count > 0:
  187. # 找到接受视频的 input
  188. for i in range(input_count):
  189. input_el = file_inputs.nth(i)
  190. accept = await input_el.get_attribute('accept') or ''
  191. print(f"[{self.platform_name}] Input {i} accept: {accept}")
  192. if 'video' in accept or '*' in accept or not accept:
  193. await input_el.set_input_files(params.video_path)
  194. upload_triggered = True
  195. print(f"[{self.platform_name}] 视频文件已设置到 input {i}")
  196. break
  197. # 方法2: 点击上传区域触发文件选择器
  198. if not upload_triggered:
  199. print(f"[{self.platform_name}] 尝试方法2: 点击上传区域")
  200. try:
  201. upload_area = self.page.locator('[class*="upload-wrapper"], [class*="upload-area"], .upload-input').first
  202. if await upload_area.count() > 0:
  203. async with self.page.expect_file_chooser(timeout=5000) as fc_info:
  204. await upload_area.click()
  205. file_chooser = await fc_info.value
  206. await file_chooser.set_files(params.video_path)
  207. upload_triggered = True
  208. print(f"[{self.platform_name}] 通过点击上传区域上传成功")
  209. except Exception as e:
  210. print(f"[{self.platform_name}] 方法2失败: {e}")
  211. if not upload_triggered:
  212. screenshot_path = f"debug_upload_failed_{self.platform_name}.png"
  213. await self.page.screenshot(path=screenshot_path)
  214. raise Exception(f"无法上传视频文件(截图: {screenshot_path})")
  215. self.report_progress(40, "等待视频上传完成...")
  216. print(f"[{self.platform_name}] 等待视频上传和处理...")
  217. # 等待上传完成(检测页面变化)
  218. upload_complete = False
  219. for i in range(60): # 最多等待3分钟
  220. await asyncio.sleep(3)
  221. # 检查是否有标题输入框(上传完成后出现)
  222. title_input_count = await self.page.locator('input[placeholder*="标题"], input[placeholder*="填写标题"]').count()
  223. # 或者检查编辑器区域
  224. editor_count = await self.page.locator('[class*="ql-editor"], [contenteditable="true"]').count()
  225. # 检查发布按钮是否可见
  226. publish_btn_count = await self.page.locator('.publishBtn, button:has-text("发布")').count()
  227. print(f"[{self.platform_name}] 检测 {i+1}: 标题框={title_input_count}, 编辑器={editor_count}, 发布按钮={publish_btn_count}")
  228. if title_input_count > 0 or (editor_count > 0 and publish_btn_count > 0):
  229. upload_complete = True
  230. print(f"[{self.platform_name}] 视频上传完成!")
  231. break
  232. if not upload_complete:
  233. screenshot_path = f"debug_upload_timeout_{self.platform_name}.png"
  234. await self.page.screenshot(path=screenshot_path)
  235. raise Exception(f"视频上传超时(截图: {screenshot_path})")
  236. await asyncio.sleep(2)
  237. self.report_progress(60, "正在填写笔记信息...")
  238. print(f"[{self.platform_name}] 填写标题: {params.title[:20]}")
  239. # 填写标题
  240. title_filled = False
  241. title_selectors = [
  242. 'input[placeholder*="标题"]',
  243. 'input[placeholder*="填写标题"]',
  244. '[class*="title"] input',
  245. '.c-input_inner',
  246. ]
  247. for selector in title_selectors:
  248. title_input = self.page.locator(selector).first
  249. if await title_input.count() > 0:
  250. await title_input.click()
  251. await title_input.fill('') # 先清空
  252. await title_input.fill(params.title[:20])
  253. title_filled = True
  254. print(f"[{self.platform_name}] 标题已填写,使用选择器: {selector}")
  255. break
  256. if not title_filled:
  257. print(f"[{self.platform_name}] 警告: 未找到标题输入框")
  258. # 填写描述和标签
  259. if params.description or params.tags:
  260. desc_filled = False
  261. desc_selectors = [
  262. '[class*="ql-editor"]',
  263. '[class*="content-input"] [contenteditable="true"]',
  264. '[class*="editor"] [contenteditable="true"]',
  265. '.ql-editor',
  266. ]
  267. for selector in desc_selectors:
  268. desc_input = self.page.locator(selector).first
  269. if await desc_input.count() > 0:
  270. await desc_input.click()
  271. await asyncio.sleep(0.5)
  272. if params.description:
  273. await self.page.keyboard.type(params.description, delay=20)
  274. print(f"[{self.platform_name}] 描述已填写")
  275. if params.tags:
  276. # 添加标签
  277. await self.page.keyboard.press("Enter")
  278. for tag in params.tags[:5]: # 最多5个标签
  279. await self.page.keyboard.type(f"#{tag}", delay=20)
  280. await asyncio.sleep(0.3)
  281. await self.page.keyboard.press("Space")
  282. print(f"[{self.platform_name}] 标签已填写: {params.tags[:5]}")
  283. desc_filled = True
  284. break
  285. if not desc_filled:
  286. print(f"[{self.platform_name}] 警告: 未找到描述输入框")
  287. await asyncio.sleep(2)
  288. self.report_progress(80, "正在发布...")
  289. await asyncio.sleep(2)
  290. # 滚动到页面底部确保发布按钮可见
  291. await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
  292. await asyncio.sleep(1)
  293. print(f"[{self.platform_name}] 查找发布按钮...")
  294. # 点击发布
  295. publish_selectors = [
  296. 'button.publishBtn',
  297. '.publishBtn',
  298. 'button.d-button.red',
  299. 'button:has-text("发布"):not(:has-text("定时发布"))',
  300. '[class*="publish"][class*="btn"]',
  301. ]
  302. publish_clicked = False
  303. for selector in publish_selectors:
  304. try:
  305. btn = self.page.locator(selector).first
  306. if await btn.count() > 0:
  307. is_visible = await btn.is_visible()
  308. is_enabled = await btn.is_enabled()
  309. print(f"[{self.platform_name}] 按钮 {selector}: visible={is_visible}, enabled={is_enabled}")
  310. if is_visible and is_enabled:
  311. box = await btn.bounding_box()
  312. if box:
  313. print(f"[{self.platform_name}] 点击发布按钮: {selector}, 位置: ({box['x']}, {box['y']})")
  314. # 使用真实鼠标点击
  315. await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2)
  316. publish_clicked = True
  317. break
  318. except Exception as e:
  319. print(f"[{self.platform_name}] 选择器 {selector} 错误: {e}")
  320. if not publish_clicked:
  321. # 保存截图用于调试
  322. screenshot_path = f"debug_publish_failed_{self.platform_name}.png"
  323. await self.page.screenshot(path=screenshot_path, full_page=True)
  324. print(f"[{self.platform_name}] 未找到发布按钮,截图保存到: {screenshot_path}")
  325. # 打印页面 HTML 结构用于调试
  326. buttons = await self.page.query_selector_all('button')
  327. print(f"[{self.platform_name}] 页面上共有 {len(buttons)} 个按钮")
  328. for i, btn in enumerate(buttons[:10]):
  329. text = await btn.text_content() or ''
  330. cls = await btn.get_attribute('class') or ''
  331. print(f" 按钮 {i}: text='{text.strip()[:30]}', class='{cls[:50]}'")
  332. raise Exception("未找到发布按钮")
  333. print(f"[{self.platform_name}] 已点击发布按钮,等待发布完成...")
  334. self.report_progress(90, "等待发布结果...")
  335. # 等待发布完成(检测 URL 变化或成功提示)
  336. publish_success = False
  337. for i in range(20): # 最多等待 20 秒
  338. await asyncio.sleep(1)
  339. current_url = self.page.url
  340. # 检查是否跳转到发布成功页面或内容管理页面
  341. if "published=true" in current_url or "success" in current_url or "content" in current_url:
  342. publish_success = True
  343. print(f"[{self.platform_name}] 发布成功! 跳转到: {current_url}")
  344. break
  345. # 检查是否有成功提示
  346. try:
  347. success_msg = await self.page.locator('[class*="success"], .toast-success, [class*="Toast"]').first.is_visible()
  348. if success_msg:
  349. publish_success = True
  350. print(f"[{self.platform_name}] 检测到成功提示!")
  351. break
  352. except:
  353. pass
  354. # 检查是否有错误提示
  355. try:
  356. error_elements = self.page.locator('[class*="error"], .toast-error, [class*="fail"]')
  357. if await error_elements.count() > 0:
  358. error_text = await error_elements.first.text_content()
  359. if error_text and len(error_text.strip()) > 0:
  360. raise Exception(f"发布失败: {error_text.strip()}")
  361. except Exception as e:
  362. if "发布失败" in str(e):
  363. raise
  364. # 如果没有明确的成功标志,保存截图
  365. if not publish_success:
  366. final_url = self.page.url
  367. print(f"[{self.platform_name}] 发布结果不确定,当前 URL: {final_url}")
  368. screenshot_path = f"debug_publish_result_{self.platform_name}.png"
  369. await self.page.screenshot(path=screenshot_path, full_page=True)
  370. print(f"[{self.platform_name}] 截图保存到: {screenshot_path}")
  371. # 如果 URL 还是发布页面,可能发布失败
  372. if "publish/publish" in final_url:
  373. raise Exception(f"发布可能失败,仍停留在发布页面(截图: {screenshot_path})")
  374. self.report_progress(100, "发布完成")
  375. print(f"[{self.platform_name}] Playwright 方式发布完成!")
  376. return PublishResult(
  377. success=True,
  378. platform=self.platform_name,
  379. message="发布完成"
  380. )