# -*- coding: utf-8 -*- """ 小红书视频发布器 参考: matrix/xhs_uploader/main.py 使用 xhs SDK API 方式发布,更稳定 """ import asyncio import os import sys import time import concurrent.futures from pathlib import Path from typing import List from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult, ) from playwright.async_api import async_playwright stored_cookies = None # 添加 matrix 项目路径,用于导入签名脚本 MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix" sys.path.insert(0, str(MATRIX_PATH)) # 尝试导入 xhs SDK try: from xhs import XhsClient XHS_SDK_AVAILABLE = True except ImportError: print("[Warning] xhs 库未安装,请运行: pip install xhs") XhsClient = None XHS_SDK_AVAILABLE = False # 签名脚本路径 STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js" _xhs_sign_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) class XiaohongshuPublisher(BasePublisher): """ 小红书视频发布器 优先使用 xhs SDK API 方式发布 """ platform_name = "xiaohongshu" login_url = "https://creator.xiaohongshu.com/" publish_url = "https://creator.xiaohongshu.com/publish/publish" cookie_domain = ".xiaohongshu.com" async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""): """获取小红书 API 签名""" from playwright.async_api import async_playwright try: async with async_playwright() as playwright: try: browser = await playwright.chromium.launch(headless=True, channel="chrome") except Exception: browser = await playwright.chromium.launch(headless=True) browser_context = await browser.new_context() if STEALTH_JS_PATH.exists(): await browser_context.add_init_script(path=str(STEALTH_JS_PATH)) page = await browser_context.new_page() await page.goto("https://www.xiaohongshu.com") await asyncio.sleep(1) await page.reload() await asyncio.sleep(1) if a1: await browser_context.add_cookies( [ { "name": "a1", "value": a1, "domain": ".xiaohongshu.com", "path": "/", } ] ) await page.reload() await asyncio.sleep(0.5) encrypt_params = await page.evaluate( "([url, data]) => window._webmsxyw(url, data)", [uri, data] ) await browser_context.close() await browser.close() return {"x-s": encrypt_params["X-s"], "x-t": str(encrypt_params["X-t"])} except Exception as e: import traceback traceback.print_exc() raise Exception(f"签名失败: {e}") def sign_sync(self, uri, data=None, a1="", web_session=""): """ 同步签名函数,供 XhsClient 使用。 注意:发布流程运行在 asyncio 事件循环中(通过 asyncio.run 启动)。 XhsClient 以同步方式调用 sign 回调,但我们需要使用 Playwright Async API 进行签名。 因此当处于事件循环中时,将签名逻辑放到独立线程里执行 asyncio.run。 """ def run_async_sign(): return asyncio.run( self.get_sign(uri, data=data, a1=a1, web_session=web_session) ) try: asyncio.get_running_loop() future = _xhs_sign_executor.submit(run_async_sign) return future.result(timeout=120) except RuntimeError: return run_async_sign() async def publish_via_api( self, cookies: str, params: PublishParams ) -> PublishResult: """通过 API 发布视频""" if not XHS_SDK_AVAILABLE: raise Exception("xhs SDK 未安装,请运行: pip install xhs") self.report_progress(10, "正在通过 API 发布...") print(f"[{self.platform_name}] 使用 XHS SDK API 发布...") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") # 转换 cookie 格式 cookie_list = self.parse_cookies(cookies) cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies print(f"[{self.platform_name}] Cookie 长度: {len(cookie_string)}") self.report_progress(20, "正在上传视频...") async def ensure_valid_cookie_for_sdk() -> str | None: await self.init_browser() cookie_list_for_browser = self.parse_cookies(cookie_string) await self.set_cookies(cookie_list_for_browser) if not self.page or not self.context: return None await self.page.goto( "https://creator.xiaohongshu.com/new/home", wait_until="domcontentloaded", timeout=60000, ) await asyncio.sleep(2) current_url = (self.page.url or "").lower() if "login" in current_url or "passport" in current_url: if self.headless: return None waited = 0 while waited < 180: current_url = (self.page.url or "").lower() if ( "login" not in current_url and "passport" not in current_url and "creator.xiaohongshu.com" in current_url ): break await asyncio.sleep(2) waited += 2 current_url = (self.page.url or "").lower() if "login" in current_url or "passport" in current_url: return None cookies_after = await self.context.cookies() try: await self.sync_cookies_to_node(cookies_after) except Exception: pass refreshed_cookie_str = self.cookies_to_string(cookies_after) return refreshed_cookie_str or None def call_create_video_note(sdk_cookie_str: str): xhs_client = XhsClient(sdk_cookie_str, sign=self.sign_sync) return xhs_client.create_video_note( title=params.title, desc=params.description or params.title, topics=params.tags or [], post_time=params.publish_date.strftime("%Y-%m-%d %H:%M:%S") if params.publish_date else None, video_path=params.video_path, cover_path=params.cover_path if params.cover_path and os.path.exists(params.cover_path) else None, ) print(f"[{self.platform_name}] 开始调用 create_video_note...") try: result = call_create_video_note(cookie_string) print(f"[{self.platform_name}] SDK 返回结果: {result}") except Exception as e: err_text = str(e) if ( "无登录信息" in err_text or '"code": -100' in err_text or "'code': -100" in err_text ): self.report_progress(15, "登录信息失效,尝试刷新登录信息...") refreshed = await ensure_valid_cookie_for_sdk() if not refreshed: screenshot_base64 = await self.capture_screenshot() page_url = ( await self.get_page_url() if hasattr(self, "get_page_url") else (self.page.url if self.page else "") ) return PublishResult( success=False, platform=self.platform_name, error="登录已过期,请使用有头浏览器重新登录", screenshot_base64=screenshot_base64, page_url=page_url, status="need_captcha", need_captcha=True, captcha_type="login", ) try: result = call_create_video_note(refreshed) print(f"[{self.platform_name}] SDK 重试返回结果: {result}") except Exception as e2: import traceback traceback.print_exc() raise Exception(f"XHS SDK 发布失败: {e2}") else: import traceback traceback.print_exc() print(f"[{self.platform_name}] SDK 调用失败: {e}") raise Exception(f"XHS SDK 发布失败: {e}") # 验证返回结果 if not result: raise Exception("XHS SDK 返回空结果") # 检查是否有错误 if isinstance(result, dict): if result.get("code") and result.get("code") != 0: raise Exception(f"发布失败: {result.get('msg', '未知错误')}") if result.get("success") == False: raise Exception( f"发布失败: {result.get('msg', result.get('error', '未知错误'))}" ) note_id = result.get("note_id", "") if isinstance(result, dict) else "" video_url = result.get("url", "") if isinstance(result, dict) else "" if not note_id: print(f"[{self.platform_name}] 警告: 未获取到 note_id,返回结果: {result}") self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 发布成功! note_id={note_id}, url={video_url}") return PublishResult( success=True, platform=self.platform_name, video_id=note_id, video_url=video_url, message="发布成功", ) async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到小红书 - 参考 matrix/xhs_uploader/main.py""" print(f"\n{'=' * 60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"[{self.platform_name}] XHS SDK 可用: {XHS_SDK_AVAILABLE}") print(f"{'=' * 60}") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print( f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes" ) self.report_progress(5, "正在准备发布...") if isinstance( getattr(self, "proxy_config", None), dict ) and self.proxy_config.get("server"): print( f"[{self.platform_name}] 检测到代理配置,跳过 SDK 方式,使用 Playwright 走代理发布", flush=True, ) return await self.publish_via_playwright(cookies, params) # 参考 matrix: 优先使用 XHS SDK API 方式发布(更稳定) if XHS_SDK_AVAILABLE: try: print(f"[{self.platform_name}] 尝试使用 XHS SDK API 发布...") result = await self.publish_via_api(cookies, params) print(f"[{self.platform_name}] API 发布完成: success={result.success}") # 如果 API 返回成功,直接返回 if result.success: return result # 如果 API 返回失败但有具体错误,也返回 if result.error and "请刷新" not in result.error: return result # 其他情况尝试 Playwright 方式 print(f"[{self.platform_name}] API 方式未成功,尝试 Playwright...") except Exception as e: err_text = str(e) if "登录已过期" in err_text or "无登录信息" in err_text: print( f"[{self.platform_name}] API 登录失效,切换到 Playwright 方式...", flush=True, ) else: import traceback traceback.print_exc() print(f"[{self.platform_name}] API 发布失败: {e}") print(f"[{self.platform_name}] 尝试使用 Playwright 方式...") # 使用 Playwright 方式发布 print(f"[{self.platform_name}] 使用 Playwright 方式发布...") return await self.publish_via_playwright(cookies, params) async def publish_via_playwright( self, cookies: str, params: PublishParams ) -> PublishResult: """通过 Playwright 发布视频""" self.report_progress(10, "正在初始化浏览器...") print(f"[{self.platform_name}] Playwright 方式开始...") await self.init_browser() cookie_list = self.parse_cookies(cookies) print(f"[{self.platform_name}] 设置 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") self.report_progress(15, "正在打开发布页面...") # 直接访问视频发布页面 publish_url = "https://creator.xiaohongshu.com/publish/publish?source=official" print(f"[{self.platform_name}] 打开页面: {publish_url}") await self.page.goto(publish_url) await asyncio.sleep(3) current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") async def wait_for_manual_login(timeout_seconds: int = 300) -> bool: if not self.page: return False self.report_progress(12, "检测到需要登录,请在浏览器窗口完成登录...") try: await self.page.bring_to_front() except: pass waited = 0 while waited < timeout_seconds: try: url = self.page.url if ( "login" not in url and "passport" not in url and "creator.xiaohongshu.com" in url ): return True await asyncio.sleep(2) waited += 2 except: await asyncio.sleep(2) waited += 2 return False async def wait_for_manual_captcha(timeout_seconds: int = 180) -> bool: waited = 0 while waited < timeout_seconds: try: ai_captcha = await self.ai_check_captcha() if not ai_captcha.get("has_captcha"): return True except: pass await asyncio.sleep(3) waited += 3 return False # 检查登录状态 if "login" in current_url or "passport" in current_url: if not self.headless: logged_in = await wait_for_manual_login() if logged_in: try: if self.context: cookies_after = await self.context.cookies() await self.sync_cookies_to_node(cookies_after) except: pass await self.page.goto(publish_url) await asyncio.sleep(3) current_url = self.page.url else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="需要登录:请在浏览器窗口完成登录后重试", screenshot_base64=screenshot_base64, page_url=current_url, status="need_captcha", need_captcha=True, captcha_type="login", ) else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="登录已过期,请重新登录", screenshot_base64=screenshot_base64, page_url=current_url, status="need_captcha", need_captcha=True, captcha_type="login", ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha["has_captcha"]: print( f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True, ) if not self.headless: solved = await wait_for_manual_captcha() if solved: try: if self.context: cookies_after = await self.context.cookies() await self.sync_cookies_to_node(cookies_after) except: pass else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"需要验证码:请在浏览器窗口完成验证后重试", screenshot_base64=screenshot_base64, page_url=current_url, status="need_captcha", need_captcha=True, captcha_type=ai_captcha["captcha_type"], ) else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", screenshot_base64=screenshot_base64, page_url=current_url, status="need_captcha", need_captcha=True, captcha_type=ai_captcha["captcha_type"], ) self.report_progress(20, "正在上传视频...") # 验证视频文件存在 if not os.path.isfile(params.video_path): return PublishResult( success=False, platform=self.platform_name, error=f"视频文件不存在: {params.video_path}", status="failed", ) file_size_mb = os.path.getsize(params.video_path) / (1024 * 1024) print(f"[{self.platform_name}] 视频文件: {params.video_path} ({file_size_mb:.1f} MB)") # 等待页面加载 await asyncio.sleep(2) # 打印页面结构辅助调试 try: file_inputs = self.page.locator('input[type="file"]') input_count = await file_inputs.count() print(f"[{self.platform_name}] 页面 file input 数量: {input_count}") for i in range(input_count): accept = await file_inputs.nth(i).get_attribute("accept") or "" print(f"[{self.platform_name}] input[{i}] accept='{accept}'") all_btns = self.page.locator("button") btn_count = await all_btns.count() print(f"[{self.platform_name}] 页面 button 数量: {btn_count}") for i in range(min(btn_count, 15)): text = (await all_btns.nth(i).text_content() or "").strip() vis = await all_btns.nth(i).is_visible() if text: print(f"[{self.platform_name}] button[{i}] text='{text[:30]}' visible={vis}") except Exception as e: print(f"[{self.platform_name}] 页面结构诊断失败: {e}") upload_triggered = False # 辅助函数:点击元素并通过 file_chooser 上传 async def _click_and_upload(locator, label: str) -> bool: try: if await locator.count() == 0 or not await locator.is_visible(): return False print(f"[{self.platform_name}] 尝试点击: {label}") async with self.page.expect_file_chooser(timeout=10000) as fc_info: await locator.click() file_chooser = await fc_info.value await file_chooser.set_files(params.video_path) print(f"[{self.platform_name}] 通过 {label} 上传成功") return True except Exception as e: print(f"[{self.platform_name}] {label} 失败: {e}") return False # 方法1: 点击页面上的"上传视频"红色按钮或上传区域(最可靠) print(f"[{self.platform_name}] 方法1: 点击上传按钮/区域") click_selectors = [ # 红色"上传视频"按钮(排除顶部 tab) ('button:has-text("上传视频")', "上传视频 button"), ('[class*="upload-btn"]:has-text("上传")', "upload-btn"), ('[class*="btn"]:has-text("上传视频")', "btn 上传视频"), # 上传拖拽区域 ('[class*="drag"]', "拖拽区域"), ('[class*="upload-area"]', "upload-area"), ('[class*="upload-wrapper"]', "upload-wrapper"), ('[class*="upload-container"]', "upload-container"), ] for sel, label in click_selectors: el = self.page.locator(sel).first if await _click_and_upload(el, f"方法1-{label}"): upload_triggered = True break # 方法2: 遍历所有按钮,精确匹配文本"上传视频" if not upload_triggered: print(f"[{self.platform_name}] 方法2: 遍历按钮匹配文本") try: all_btns = self.page.locator("button") count = await all_btns.count() for i in range(count): btn = all_btns.nth(i) try: text = (await btn.text_content() or "").strip() if "上传视频" in text and await btn.is_visible(): if await _click_and_upload(btn, f"方法2-button[{i}]({text})"): upload_triggered = True break except: continue except Exception as e: print(f"[{self.platform_name}] 方法2失败: {e}") # 方法3: 直接 set_input_files(某些页面有效) if not upload_triggered: print(f"[{self.platform_name}] 方法3: 直接 set_input_files") try: file_inputs = self.page.locator('input[type="file"]') input_count = await file_inputs.count() for i in range(input_count): input_el = file_inputs.nth(i) accept = await input_el.get_attribute("accept") or "" if "video" in accept or "*" in accept or not accept: await input_el.set_input_files(params.video_path) upload_triggered = True print(f"[{self.platform_name}] set_input_files 到 input[{i}] accept='{accept}'") break except Exception as e: print(f"[{self.platform_name}] 方法3失败: {e}") # 方法4: AI 识别上传按钮 if not upload_triggered: print(f"[{self.platform_name}] 方法4: AI 识别上传按钮") try: suggest = await self.ai_suggest_playwright_selector( "点击小红书视频上传页面中间区域的红色'上传视频'按钮(不是顶部的'上传视频'标签页),该按钮用于打开文件选择器选择视频文件" ) if suggest.get("has_selector") and suggest.get("selector"): sel = suggest["selector"] print(f"[{self.platform_name}] AI 建议选择器: {sel} (置信度: {suggest.get('confidence')})") el = self.page.locator(sel).first if await _click_and_upload(el, f"方法4-AI({sel})"): upload_triggered = True except Exception as e: print(f"[{self.platform_name}] 方法4失败: {e}") if not upload_triggered: screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="无法上传视频文件", screenshot_base64=screenshot_base64, page_url=page_url, status="need_action", ) self.report_progress(40, "等待视频上传完成...") print(f"[{self.platform_name}] 等待视频上传和处理...") # 辅助:检查页面是否处于上传选择页(即跳回了) async def _is_on_upload_page() -> bool: """检测页面是否回到了视频选择/上传页面""" try: drag_area = await self.page.locator('[class*="drag"], [class*="upload-area"], [class*="upload-wrapper"]').count() upload_btn_visible = False for sel in ['span.d-text:has-text("上传视频")', 'button:has-text("上传视频")']: loc = self.page.locator(sel).first if await loc.count() > 0: try: if await loc.is_visible(): upload_btn_visible = True break except: pass title_count = await self.page.locator('input[placeholder*="标题"], input[placeholder*="填写标题"]').count() if (drag_area > 0 or upload_btn_visible) and title_count == 0: return True except: pass return False # 辅助:抓取页面上的 toast / 错误提示 async def _get_page_error() -> str: """尝试获取页面上的错误提示文本""" error_selectors = [ '.toast', '.el-message', '[class*="toast"]', '[class*="Toast"]', '[class*="message"]', '[class*="Message"]', '[class*="notice"]', '[class*="error"]', '[class*="Error"]', '[class*="alert"]', '[class*="tip"]', '[class*="warn"]', '.ant-message', '[role="alert"]', '[role="status"]', ] errors = [] for sel in error_selectors: try: els = self.page.locator(sel) count = await els.count() for j in range(min(count, 5)): el = els.nth(j) if await el.is_visible(): text = (await el.text_content() or "").strip() if text and len(text) < 200: errors.append(text) except: continue return "; ".join(errors[:3]) if errors else "" # 辅助:执行一次上传操作(用于重试) async def _do_upload() -> bool: """触发文件上传,返回是否成功触发""" for sel, label in [ ('[class*="drag"]', "drag-area"), ('span.d-text:has-text("上传视频")', "span上传视频"), ('button:has-text("上传视频")', "btn上传视频"), ]: try: el = self.page.locator(sel).first if await el.count() > 0 and await el.is_visible(): async with self.page.expect_file_chooser(timeout=8000) as fc_info: await el.click() fc = await fc_info.value await fc.set_files(params.video_path) print(f"[{self.platform_name}] 重试上传成功: {label}") return True except Exception as e: print(f"[{self.platform_name}] 重试上传-{label}失败: {e}") # set_input_files 兜底 try: file_inputs = self.page.locator('input[type="file"]') for k in range(await file_inputs.count()): inp = file_inputs.nth(k) accept = await inp.get_attribute("accept") or "" if "video" in accept or "*" in accept or not accept: await inp.set_input_files(params.video_path) print(f"[{self.platform_name}] 重试 set_input_files 成功") return True except: pass return False upload_complete = False redirect_retry_count = 0 max_redirect_retries = 2 for i in range(90): # 最多等待 ~4.5 分钟 await asyncio.sleep(3) current_url = self.page.url # 检测是否跳转到登录页 if "login" in current_url or "passport" in current_url: print(f"[{self.platform_name}] ⚠ 检测到跳转到登录页: {current_url}") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="上传过程中登录态失效,请重新登录后再试", screenshot_base64=screenshot_base64, page_url=current_url, status="need_captcha", need_captcha=True, captcha_type="login", ) # 检测是否跳回了上传选择页面 if await _is_on_upload_page(): error_msg = await _get_page_error() redirect_retry_count += 1 print( f"[{self.platform_name}] ⚠ 页面跳回上传选择页 (第 {redirect_retry_count} 次), " f"URL={current_url}, 页面错误: {error_msg or '无'}" ) if redirect_retry_count > max_redirect_retries: screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error=f"视频上传失败(页面反复跳回上传页){': ' + error_msg if error_msg else ',可能是登录过期或视频格式不支持'}", screenshot_base64=screenshot_base64, page_url=page_url, status="need_action", ) # 等一下再重试上传 print(f"[{self.platform_name}] 等待 3 秒后重试上传...") await asyncio.sleep(3) retry_ok = await _do_upload() if not retry_ok: screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error=f"重试上传失败{': ' + error_msg if error_msg else ''}", screenshot_base64=screenshot_base64, page_url=page_url, status="need_action", ) continue # 检查是否有标题输入框(上传完成后出现) title_input_count = await self.page.locator( 'input[placeholder*="标题"], input[placeholder*="填写标题"]' ).count() editor_count = await self.page.locator( '[class*="ql-editor"], [contenteditable="true"]' ).count() publish_btn_count = await self.page.locator( '.publishBtn, button:has-text("发布")' ).count() if i % 5 == 0 or title_input_count > 0 or editor_count > 0: print( f"[{self.platform_name}] 检测 {i + 1}: 标题框={title_input_count}, " f"编辑器={editor_count}, 发布按钮={publish_btn_count}, URL={current_url}" ) if title_input_count > 0 or (editor_count > 0 and publish_btn_count > 0): upload_complete = True print(f"[{self.platform_name}] 视频上传完成!") break # 每 30 秒抓一次页面错误提示 if i > 0 and i % 10 == 0: err = await _get_page_error() if err: print(f"[{self.platform_name}] 页面提示: {err}") if not upload_complete: screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() error_msg = await _get_page_error() return PublishResult( success=False, platform=self.platform_name, error=f"视频上传超时{': ' + error_msg if error_msg else ''}", screenshot_base64=screenshot_base64, page_url=page_url, status="need_action", ) await asyncio.sleep(2) self.report_progress(60, "正在填写笔记信息...") print(f"[{self.platform_name}] 填写标题: {params.title[:20]}") # 填写标题 title_filled = False title_selectors = [ 'input[placeholder*="标题"]', 'input[placeholder*="填写标题"]', '[class*="title"] input', ".c-input_inner", ] for selector in title_selectors: title_input = self.page.locator(selector).first if await title_input.count() > 0: await title_input.click() await title_input.fill("") # 先清空 await title_input.fill(params.title[:20]) title_filled = True print(f"[{self.platform_name}] 标题已填写,使用选择器: {selector}") break if not title_filled: print(f"[{self.platform_name}] 警告: 未找到标题输入框") # 填写描述和标签 if params.description or params.tags: desc_filled = False desc_selectors = [ '[class*="ql-editor"]', '[class*="content-input"] [contenteditable="true"]', '[class*="editor"] [contenteditable="true"]', ".ql-editor", ] for selector in desc_selectors: desc_input = self.page.locator(selector).first if await desc_input.count() > 0: await desc_input.click() await asyncio.sleep(0.5) if params.description: await self.page.keyboard.type(params.description, delay=20) print(f"[{self.platform_name}] 描述已填写") if params.tags: # 添加标签 await self.page.keyboard.press("Enter") for tag in params.tags[:5]: # 最多5个标签 await self.page.keyboard.type(f"#{tag}", delay=20) await asyncio.sleep(0.3) await self.page.keyboard.press("Space") print(f"[{self.platform_name}] 标签已填写: {params.tags[:5]}") desc_filled = True break if not desc_filled: print(f"[{self.platform_name}] 警告: 未找到描述输入框") await asyncio.sleep(2) self.report_progress(80, "正在发布...") # ========== 等待视频处理完成 ========== # 小红书上传视频后需要转码处理,发布按钮在处理完前可能是禁用状态 print(f"[{self.platform_name}] 等待视频处理完成...") for wait_i in range(40): # 最多等待 ~2 分钟 await asyncio.sleep(3) # 检查是否有 "处理中"、"上传中"、进度条等 processing_indicators = [ '[class*="progress"]', '[class*="loading"]', '[class*="uploading"]', ':text("上传中")', ':text("处理中")', ':text("转码")', ] still_processing = False for ind_sel in processing_indicators: try: loc = self.page.locator(ind_sel).first if await loc.count() > 0 and await loc.is_visible(): still_processing = True if wait_i % 5 == 0: text = (await loc.text_content() or "").strip()[:40] print(f"[{self.platform_name}] 视频仍在处理: {ind_sel} -> '{text}'") break except: continue # 检查发布按钮是否已可用 try: pub_btn = self.page.locator( 'button:has-text("发布"):not(:has-text("定时发布")):not(:has-text("定时"))' ).first if await pub_btn.count() > 0 and await pub_btn.is_visible(): if await pub_btn.is_enabled(): print(f"[{self.platform_name}] 发布按钮已可用(等待了 {wait_i * 3} 秒)") break else: if wait_i % 5 == 0: print(f"[{self.platform_name}] 发布按钮存在但尚未启用,继续等待...") except: pass if not still_processing and wait_i > 3: print(f"[{self.platform_name}] 未检测到处理指示器,继续尝试发布") break await asyncio.sleep(2) # 滚动到页面底部确保发布按钮可见 await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)") await asyncio.sleep(1) # ========== 诊断页面上所有按钮 ========== print(f"[{self.platform_name}] === 页面按钮诊断 ===") try: all_page_btns = self.page.locator("button") btn_total = await all_page_btns.count() print(f"[{self.platform_name}] 页面共有 {btn_total} 个