=============================================================================== 智媒通 - 计算机软件著作权申请 源代码材料(前60页+后60页) =============================================================================== 软件全称:智媒通 V1.0 软件简称:智媒通 版本号:V1.0 开发完成日期:2025年 首次发表日期:2025年 权利取得方式:原始取得 权利范围:全部权利 著作权人:(请填写单位或个人名称) =============================================================================== 代码说明 =============================================================================== 本软件为多平台自媒体账号管理系统,支持抖音、快手、视频号、小红书、百家号等 主流平台,具备视频自动发布、评论统一管理、数据分析等功能。采用 Electron + Vue3 桌面客户端 + Express 后端 + Python 发布服务的架构。以下为系统核心源代码。 =============================================================================== 第一部分:平台发布基类 =============================================================================== ================================================================================ 文件: server\python\platforms\base.py ================================================================================ # -*- coding: utf-8 -*- """ 平台发布基类 提供通用的发布接口和工具方法 """ import asyncio import json import os import uuid from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from typing import List, Optional, Callable, Dict, Any from playwright.async_api import async_playwright, Browser, BrowserContext, Page @dataclass class PublishParams: """发布参数""" title: str video_path: str description: str = "" cover_path: Optional[str] = None tags: List[str] = field(default_factory=list) publish_date: Optional[datetime] = None location: str = "重庆市" def __post_init__(self): if not self.description: self.description = self.title @dataclass class PublishResult: """发布结果""" success: bool platform: str video_id: str = "" video_url: str = "" message: str = "" error: str = "" need_captcha: bool = False # 是否需要验证码 captcha_type: str = "" # 验证码类型: phone, slider, image screenshot_base64: str = "" # 页面截图(Base64) page_url: str = "" # 当前页面 URL status: str = "" # 状态: uploading, processing, success, failed, 
need_captcha, need_action @dataclass class WorkItem: """作品数据""" work_id: str title: str cover_url: str = "" video_url: str = "" duration: int = 0 # 秒 status: str = "published" # published, reviewing, rejected, draft publish_time: str = "" play_count: int = 0 like_count: int = 0 comment_count: int = 0 share_count: int = 0 collect_count: int = 0 def to_dict(self) -> Dict[str, Any]: return { "work_id": self.work_id, "title": self.title, "cover_url": self.cover_url, "video_url": self.video_url, "duration": self.duration, "status": self.status, "publish_time": self.publish_time, "play_count": self.play_count, "like_count": self.like_count, "comment_count": self.comment_count, "share_count": self.share_count, "collect_count": self.collect_count, } @dataclass class CommentItem: """评论数据""" comment_id: str parent_comment_id: str work_id: str content: str author_id: str = "" author_name: str = "" author_avatar: str = "" like_count: int = 0 reply_count: int = 0 create_time: str = "" is_author: bool = False # 是否是作者的评论 replies: List['CommentItem'] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: return { "comment_id": self.comment_id, "parent_comment_id": self.parent_comment_id, "work_id": self.work_id, "content": self.content, "author_id": self.author_id, "author_name": self.author_name, "author_avatar": self.author_avatar, "like_count": self.like_count, "reply_count": self.reply_count, "create_time": self.create_time, "is_author": self.is_author, "replies": [r.to_dict() for r in self.replies], } @dataclass class WorksResult: """作品列表结果""" success: bool platform: str works: List[WorkItem] = field(default_factory=list) total: int = 0 has_more: bool = False next_page: Any = "" error: str = "" debug_info: str = "" # 调试信息 def to_dict(self) -> Dict[str, Any]: return { "success": self.success, "platform": self.platform, "works": [w.to_dict() for w in self.works], "total": self.total, "has_more": self.has_more, "next_page": self.next_page, "error": self.error, 
"debug_info": self.debug_info, } @dataclass class CommentsResult: """评论列表结果""" success: bool platform: str work_id: str comments: List[CommentItem] = field(default_factory=list) total: int = 0 has_more: bool = False error: str = "" def to_dict(self) -> Dict[str, Any]: return { "success": self.success, "platform": self.platform, "work_id": self.work_id, "comments": [c.to_dict() for c in self.comments], "total": self.total, "has_more": self.has_more, "error": self.error, } class BasePublisher(ABC): """ 平台发布基类 所有平台发布器都需要继承此类 """ platform_name: str = "base" login_url: str = "" publish_url: str = "" cookie_domain: str = "" def __init__(self, headless: bool = True): self.headless = headless self.browser: Optional[Browser] = None self.context: Optional[BrowserContext] = None self.page: Optional[Page] = None self.on_progress: Optional[Callable[[int, str], None]] = None self.user_id: Optional[int] = None self.publish_task_id: Optional[int] = None self.publish_account_id: Optional[int] = None self.proxy_config: Optional[Dict[str, Any]] = None def set_progress_callback(self, callback: Callable[[int, str], None]): """设置进度回调""" self.on_progress = callback def report_progress(self, progress: int, message: str): """报告进度""" print(f"[{self.platform_name}] [{progress}%] {message}") if self.on_progress: self.on_progress(progress, message) @staticmethod def parse_cookies(cookies_str: str) -> list: """解析 cookie 字符串为列表""" try: cookies = json.loads(cookies_str) if isinstance(cookies, list): return cookies except json.JSONDecodeError: pass # 字符串格式: name=value; name2=value2 cookies = [] for item in cookies_str.split(';'): item = item.strip() if '=' in item: name, value = item.split('=', 1) cookies.append({ 'name': name.strip(), 'value': value.strip(), 'domain': '', 'path': '/' }) return cookies @staticmethod def cookies_to_string(cookies: list) -> str: """将 cookie 列表转换为字符串""" return '; '.join([f"{c['name']}={c['value']}" for c in cookies]) async def init_browser(self, storage_state: str = 
None, proxy_config: Dict[str, Any] = None): """初始化浏览器""" print(f"[{self.platform_name}] init_browser: headless={self.headless}", flush=True) playwright = await async_playwright().start() proxy = proxy_config or self.proxy_config if proxy and isinstance(proxy, dict) and proxy.get('server'): print(f"[{self.platform_name}] 使用代理: {proxy.get('server')}", flush=True) self.browser = await playwright.chromium.launch(headless=self.headless, proxy=proxy) else: self.browser = await playwright.chromium.launch(headless=self.headless) if storage_state and os.path.exists(storage_state): self.context = await self.browser.new_context(storage_state=storage_state) else: self.context = await self.browser.new_context() self.page = await self.context.new_page() return self.page async def set_cookies(self, cookies: list): """设置 cookies""" if not self.context: raise Exception("Browser context not initialized") # 设置默认域名 for cookie in cookies: if 'domain' not in cookie or not cookie['domain']: cookie['domain'] = self.cookie_domain await self.context.add_cookies(cookies) async def close_browser(self): """关闭浏览器""" if self.context: await self.context.close() if self.browser: await self.browser.close() async def save_cookies(self, file_path: str): """保存 cookies 到文件""" if self.context: await self.context.storage_state(path=file_path) async def capture_screenshot(self) -> str: """截取当前页面截图,返回 Base64 编码""" import base64 if not self.page: return "" try: screenshot_bytes = await self.page.screenshot(type="jpeg", quality=80) return base64.b64encode(screenshot_bytes).decode('utf-8') except Exception as e: print(f"[{self.platform_name}] 截图失败: {e}") return "" async def request_sms_code_from_frontend(self, phone: str = "", timeout_seconds: int = 120, message: str = "") -> str: node_api_url = os.environ.get('NODEJS_API_URL', 'http://localhost:3000').rstrip('/') internal_api_key = os.environ.get('INTERNAL_API_KEY', 'internal-api-key-default') if not self.user_id: raise Exception("缺少 user_id,无法请求前端输入验证码") 
captcha_task_id = f"py_{self.platform_name}_{uuid.uuid4().hex}" payload = { "user_id": self.user_id, "captcha_task_id": captcha_task_id, "type": "sms", "phone": phone or "", "message": message or "请输入短信验证码", "timeout_seconds": timeout_seconds, "publish_task_id": self.publish_task_id, "publish_account_id": self.publish_account_id, } import requests try: resp = requests.post( f"{node_api_url}/api/internal/captcha/request", headers={ "Content-Type": "application/json", "X-Internal-API-Key": internal_api_key, }, json=payload, timeout=timeout_seconds + 30, ) except Exception as e: raise Exception(f"请求前端验证码失败: {e}") try: data = resp.json() except Exception: raise Exception(f"请求前端验证码失败: HTTP {resp.status_code}") if resp.status_code >= 400 or not data.get("success"): raise Exception(data.get("error") or data.get("message") or f"请求前端验证码失败: HTTP {resp.status_code}") code = data.get("code") or "" if not code: raise Exception("未收到验证码") return str(code) async def ai_analyze_sms_send_state(self, screenshot_base64: str = None) -> dict: import os import requests import json import re try: if not screenshot_base64: screenshot_base64 = await self.capture_screenshot() if not screenshot_base64: return { "has_sms_modal": False, "send_button_state": "unknown", "sent_likely": False, "block_reason": "unknown", "suggested_action": "manual_send", "confidence": 0, "notes": "无法获取截图", } ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus') if not ai_api_key: return { "has_sms_modal": True, "send_button_state": "unknown", "sent_likely": False, "block_reason": "no_ai_key", "suggested_action": "manual_send", "confidence": 0, "notes": "未配置 AI API Key", } prompt = """请分析这张网页截图,判断是否处于“短信验证码”验证弹窗/页面,并判断“发送验证码/获取验证码”是否已经触发成功。 你需要重点识别: 1) 是否存在短信验证码弹窗(包含“请输入验证码/短信验证码/手机号验证/获取验证码/发送验证码”等) 2) 发送按钮状态:enabled / disabled / countdown(出现xx秒) 
/ hidden / unknown 3) 是否已发送成功:例如出现倒计时、按钮禁用、出现“已发送/重新发送/xx秒后重试”等 4) 是否被阻塞:例如出现滑块/人机验证、频繁发送、风控提示、网络异常等 请以 JSON 返回: ```json { "has_sms_modal": true, "send_button_state": "enabled|disabled|countdown|hidden|unknown", "sent_likely": true, "block_reason": "none|need_click_send|slider|risk|rate_limit|network|unknown", "suggested_action": "wait|click_send|solve_slider|manual_send", "confidence": 0-100, "notes": "一句话说明你看到的证据" } ```""" headers = { 'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json' } payload = { "model": ai_vision_model, "messages": [ { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{screenshot_base64}" } }, { "type": "text", "text": prompt } ] } ], "max_tokens": 500 } response = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30 ) if response.status_code != 200: return { "has_sms_modal": True, "send_button_state": "unknown", "sent_likely": False, "block_reason": "network", "suggested_action": "manual_send", "confidence": 0, "notes": f"AI API 返回错误 {response.status_code}", } result = response.json() ai_response = result.get('choices', [{}])[0].get('message', {}).get('content', '') json_match = re.search(r'```json\\s*([\\s\\S]*?)\\s*```', ai_response) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'\\{[\\s\\S]*\\}', ai_response) json_str = json_match.group(0) if json_match else '{}' try: data = json.loads(json_str) except Exception: data = {} return { "has_sms_modal": bool(data.get("has_sms_modal", True)), "send_button_state": data.get("send_button_state", "unknown"), "sent_likely": bool(data.get("sent_likely", False)), "block_reason": data.get("block_reason", "unknown"), "suggested_action": data.get("suggested_action", "manual_send"), "confidence": int(data.get("confidence", 0) or 0), "notes": data.get("notes", ""), } except Exception as e: return { "has_sms_modal": True, "send_button_state": "unknown", 
"sent_likely": False, "block_reason": "unknown", "suggested_action": "manual_send", "confidence": 0, "notes": f"AI 分析异常: {e}", } async def sync_cookies_to_node(self, cookies: list) -> bool: import os import json import requests if not self.user_id or not self.publish_account_id: return False node_api_url = os.environ.get('NODEJS_API_URL', 'http://localhost:3000').rstrip('/') internal_api_key = os.environ.get('INTERNAL_API_KEY', 'internal-api-key-default') try: payload = { "user_id": int(self.user_id), "account_id": int(self.publish_account_id), "cookies": json.dumps(cookies, ensure_ascii=False), } resp = requests.post( f"{node_api_url}/api/internal/accounts/update-cookies", headers={ "Content-Type": "application/json", "X-Internal-API-Key": internal_api_key, }, json=payload, timeout=30, ) if resp.status_code >= 400: return False data = resp.json() if resp.content else {} return bool(data.get("success", True)) except Exception: return False async def ai_suggest_playwright_selector(self, goal: str, screenshot_base64: str = None) -> dict: import os import requests import json import re try: if not screenshot_base64: screenshot_base64 = await self.capture_screenshot() if not screenshot_base64: return {"has_selector": False, "selector": "", "confidence": 0, "notes": "无法获取截图"} ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus') if not ai_api_key: return {"has_selector": False, "selector": "", "confidence": 0, "notes": "未配置 AI API Key"} prompt = f"""请分析这张网页截图,给出一个 Playwright Python 可用的 selector(用于 page.locator(selector))来完成目标操作。 目标:{goal} 要求: 1) selector 尽量稳定(优先 role/text/aria,其次 class,避免过度依赖随机 class) 2) selector 必须是 Playwright 支持的选择器语法(如:text="发布"、button:has-text("发布")、[role="button"]:has-text("发布") 等) 3) 只返回一个最优 selector 以 JSON 返回: ```json {{ "has_selector": true, "selector": 
"button:has-text(\\"发布\\")", "confidence": 0-100, "notes": "你依据的页面证据" }} ```""" headers = { 'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json' } payload = { "model": ai_vision_model, "messages": [ { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{screenshot_base64}" } }, { "type": "text", "text": prompt } ] } ], "max_tokens": 300 } response = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30 ) if response.status_code != 200: return {"has_selector": False, "selector": "", "confidence": 0, "notes": f"AI API 错误 {response.status_code}"} result = response.json() ai_response = result.get('choices', [{}])[0].get('message', {}).get('content', '') json_match = re.search(r'```json\\s*([\\s\\S]*?)\\s*```', ai_response) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'\\{[\\s\\S]*\\}', ai_response) json_str = json_match.group(0) if json_match else '{}' try: data = json.loads(json_str) except Exception: data = {} selector = str(data.get("selector", "") or "").strip() has_selector = bool(data.get("has_selector", False)) and bool(selector) confidence = int(data.get("confidence", 0) or 0) notes = str(data.get("notes", "") or "") if not has_selector: return {"has_selector": False, "selector": "", "confidence": confidence, "notes": notes or "未给出 selector"} return {"has_selector": True, "selector": selector, "confidence": confidence, "notes": notes} except Exception as e: return {"has_selector": False, "selector": "", "confidence": 0, "notes": f"AI selector 异常: {e}"} async def ai_check_captcha(self, screenshot_base64: str = None) -> dict: """ 使用 AI 分析截图检测验证码 Args: screenshot_base64: 截图的 Base64 编码,如果为空则自动获取当前页面截图 Returns: dict: { "has_captcha": bool, # 是否有验证码 "captcha_type": str, # 验证码类型: slider, image, phone, rotate, puzzle "captcha_description": str, # 验证码描述 "confidence": float, # 置信度 0-100 "need_headful": bool # 是否需要切换到有头浏览器 } """ import 
os import requests try: # 获取截图 if not screenshot_base64: screenshot_base64 = await self.capture_screenshot() if not screenshot_base64: print(f"[{self.platform_name}] AI验证码检测: 无法获取截图") return { "has_captcha": False, "captcha_type": "", "captcha_description": "", "confidence": 0, "need_headful": False } # 获取 AI 配置 ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus') if not ai_api_key: print(f"[{self.platform_name}] AI验证码检测: 未配置 AI API Key,使用传统方式检测") return await self._traditional_captcha_check() # 构建 AI 请求 prompt = """请分析这张网页截图,判断页面上是否存在验证码。 请检查以下类型的验证码: 1. 滑块验证码(需要滑动滑块到指定位置) 2. 图片验证码(需要选择正确的图片、点击图片上的文字等) 3. 旋转验证码(需要旋转图片到正确角度) 4. 拼图验证码(需要拖动拼图块到正确位置) 5. 手机验证码(需要输入手机收到的验证码) 6. 计算验证码(需要输入计算结果) 请以 JSON 格式返回结果: ```json { "has_captcha": true/false, "captcha_type": "slider/image/phone/rotate/puzzle/calculate/none", "captcha_description": "验证码的具体描述", "confidence": 0-100 } ``` 注意: - 如果页面有明显的验证码弹窗或验证区域,has_captcha 为 true - 如果只是普通的登录页面或表单,没有特殊的验证步骤,has_captcha 为 false - confidence 表示你对判断结果的信心,100 表示非常确定""" headers = { 'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json' } payload = { "model": ai_vision_model, "messages": [ { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{screenshot_base64}" } }, { "type": "text", "text": prompt } ] } ], "max_tokens": 500 } print(f"[{self.platform_name}] AI验证码检测: 正在分析截图...") response = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30 ) if response.status_code != 200: print(f"[{self.platform_name}] AI验证码检测: API 返回错误 {response.status_code}") return await self._traditional_captcha_check() result = response.json() ai_response = result.get('choices', [{}])[0].get('message', {}).get('content', '') print(f"[{self.platform_name}] AI验证码检测响应: {ai_response[:200]}...") # 
解析 AI 响应 import re json_match = re.search(r'```json\s*([\s\S]*?)\s*```', ai_response) if json_match: json_str = json_match.group(1) else: # 尝试直接解析 json_match = re.search(r'\{[\s\S]*\}', ai_response) if json_match: json_str = json_match.group(0) else: json_str = '{}' try: ai_result = json.loads(json_str) except: ai_result = {} has_captcha = ai_result.get('has_captcha', False) captcha_type = ai_result.get('captcha_type', '') captcha_description = ai_result.get('captcha_description', '') confidence = ai_result.get('confidence', 0) # 如果检测到验证码,需要切换到有头浏览器 need_headful = has_captcha and captcha_type not in ['none', ''] print(f"[{self.platform_name}] AI验证码检测结果: has_captcha={has_captcha}, type={captcha_type}, confidence={confidence}") return { "has_captcha": has_captcha, "captcha_type": captcha_type if captcha_type != 'none' else '', "captcha_description": captcha_description, "confidence": confidence, "need_headful": need_headful } except Exception as e: print(f"[{self.platform_name}] AI验证码检测异常: {e}") import traceback traceback.print_exc() return await self._traditional_captcha_check() async def _traditional_captcha_check(self) -> dict: """传统方式检测验证码(基于 DOM 元素)""" if not self.page: return { "has_captcha": False, "captcha_type": "", "captcha_description": "", "confidence": 0, "need_headful": False } try: # 检查常见的验证码选择器 captcha_selectors = [ # 滑块验证码 ('[class*="slider"]', 'slider', '滑块验证码'), ('[class*="slide-verify"]', 'slider', '滑块验证码'), ('text="滑动"', 'slider', '滑块验证码'), ('text="拖动"', 'slider', '滑块验证码'), # 图片验证码 ('[class*="captcha"]', 'image', '图片验证码'), ('[class*="verify-img"]', 'image', '图片验证码'), ('text="点击"', 'image', '图片验证码'), ('text="选择"', 'image', '图片验证码'), # 手机验证码 ('text="验证码"', 'phone', '手机验证码'), ('text="获取验证码"', 'phone', '手机验证码'), ('[class*="sms-code"]', 'phone', '手机验证码'), # 旋转验证码 ('text="旋转"', 'rotate', '旋转验证码'), ('[class*="rotate"]', 'rotate', '旋转验证码'), ] for selector, captcha_type, description in captcha_selectors: try: count = await 
self.page.locator(selector).count() if count > 0: # 检查是否可见 element = self.page.locator(selector).first if await element.is_visible(): print(f"[{self.platform_name}] 传统检测: 发现验证码 - {selector}") return { "has_captcha": True, "captcha_type": captcha_type, "captcha_description": description, "confidence": 80, "need_headful": True } except: pass return { "has_captcha": False, "captcha_type": "", "captcha_description": "", "confidence": 80, "need_headful": False } except Exception as e: print(f"[{self.platform_name}] 传统验证码检测异常: {e}") return { "has_captcha": False, "captcha_type": "", "captcha_description": "", "confidence": 0, "need_headful": False } async def get_page_url(self) -> str: """获取当前页面 URL""" if not self.page: return "" try: return self.page.url except: return "" async def check_publish_status(self) -> dict: """ 检查发布状态 返回: {status, screenshot_base64, page_url, message} """ if not self.page: return {"status": "error", "message": "页面未初始化"} try: screenshot = await self.capture_screenshot() page_url = await self.get_page_url() # 检查常见的成功/失败标志 page_content = await self.page.content() # 检查成功标志 success_keywords = ['发布成功', '上传成功', '发表成功', '提交成功'] for keyword in success_keywords: if keyword in page_content: return { "status": "success", "screenshot_base64": screenshot, "page_url": page_url, "message": "发布成功" } # 检查验证码标志 captcha_keywords = ['验证码', '身份验证', '请完成验证', '滑动验证', '图形验证'] for keyword in captcha_keywords: if keyword in page_content: return { "status": "need_captcha", "screenshot_base64": screenshot, "page_url": page_url, "message": f"检测到{keyword}" } # 检查失败标志 fail_keywords = ['发布失败', '上传失败', '提交失败', '操作失败'] for keyword in fail_keywords: if keyword in page_content: return { "status": "failed", "screenshot_base64": screenshot, "page_url": page_url, "message": keyword } # 默认返回处理中 return { "status": "processing", "screenshot_base64": screenshot, "page_url": page_url, "message": "处理中" } except Exception as e: return { "status": "error", "screenshot_base64": "", 
"page_url": "", "message": str(e) } async def wait_for_upload_complete(self, success_selector: str, timeout: int = 300): """等待上传完成""" if not self.page: raise Exception("Page not initialized") for _ in range(timeout // 3): try: count = await self.page.locator(success_selector).count() if count > 0: return True except: pass await asyncio.sleep(3) self.report_progress(30, "正在上传视频...") return False @abstractmethod async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """ 发布视频 - 子类必须实现 Args: cookies: cookie 字符串或 JSON params: 发布参数 Returns: PublishResult: 发布结果 """ pass async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """ 获取作品列表 - 子类可覆盖实现 Args: cookies: cookie 字符串或 JSON page: 页码(从0开始) page_size: 每页数量 Returns: WorksResult: 作品列表结果 """ return WorksResult( success=False, platform=self.platform_name, error="该平台暂不支持获取作品列表" ) async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """ 获取作品评论 - 子类可覆盖实现 Args: cookies: cookie 字符串或 JSON work_id: 作品ID cursor: 分页游标 Returns: CommentsResult: 评论列表结果 """ return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error="该平台暂不支持获取评论" ) async def run(self, cookies: str, params: PublishParams) -> PublishResult: """ 运行发布任务 包装了 publish 方法,添加了异常处理和资源清理 """ try: return await self.publish(cookies, params) except Exception as e: import traceback traceback.print_exc() return PublishResult( success=False, platform=self.platform_name, error=str(e) ) finally: await self.close_browser() async def run_get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """ 运行获取作品任务 """ try: return await self.get_works(cookies, page, page_size) except Exception as e: import traceback traceback.print_exc() return WorksResult( success=False, platform=self.platform_name, error=str(e) ) finally: await self.close_browser() async def run_get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: 
""" 运行获取评论任务 """ try: return await self.get_comments(cookies, work_id, cursor) except Exception as e: import traceback traceback.print_exc() return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=str(e) ) finally: await self.close_browser() async def check_login_status(self, cookies: str) -> dict: """ 检查 Cookie 登录状态(通过浏览器访问后台页面检测) Args: cookies: cookie 字符串或 JSON Returns: dict: { "success": True, "valid": True/False, "need_login": True/False, "message": "状态描述" } """ try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问平台后台首页 home_url = self.login_url print(f"[{self.platform_name}] 访问后台页面: {home_url}") await self.page.goto(home_url, wait_until='domcontentloaded', timeout=30000) await asyncio.sleep(3) # 检查当前 URL 是否被重定向到登录页 current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") # 登录页特征 login_indicators = ['login', 'passport', 'signin', 'auth'] is_login_page = any(indicator in current_url.lower() for indicator in login_indicators) # 检查页面是否有登录弹窗 need_login = is_login_page # 风控/验证码特征 risk_indicators = ['captcha', 'verify', 'challenge', 'risk', 'security', 'safe', 'protect', 'slider'] need_verification = any(indicator in current_url.lower() for indicator in risk_indicators) if not need_login: # 检查页面内容是否有登录提示 login_selectors = [ 'text="请先登录"', 'text="登录后继续"', 'text="请登录"', '[class*="login-modal"]', '[class*="login-dialog"]', '[class*="login-popup"]', ] for selector in login_selectors: try: if await self.page.locator(selector).count() > 0: need_login = True print(f"[{self.platform_name}] 检测到登录弹窗: {selector}") break except: pass if not need_login and not need_verification: verification_selectors = [ 'text="安全验证"', 'text="验证码"', 'text="人机验证"', 'text="滑块"', 'text="请完成验证"', 'text="系统检测到异常"', 'text="访问受限"', 'text="行为异常"', ] for selector in verification_selectors: try: if await 
self.page.locator(selector).count() > 0: need_verification = True print(f"[{self.platform_name}] 检测到风控/验证码提示: {selector}") break except: pass if need_login: return { "success": True, "valid": False, "need_login": True, "message": "Cookie 已过期,需要重新登录" } elif need_verification: return { "success": True, "valid": False, "need_login": True, "message": "触发风控/需要验证" } else: return { "success": True, "valid": True, "need_login": False, "message": "登录状态有效" } except Exception as e: import traceback traceback.print_exc() return { "success": False, "valid": False, "need_login": True, "error": str(e) } finally: await self.close_browser() ================================================================================ 文件: server\python\app.py ================================================================================ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 智媒通视频发布服务 - 统一入口 支持平台: 抖音、小红书、视频号、快手 参考项目: matrix (https://github.com/kebenxiaoming/matrix) 使用方式: python app.py # 启动 HTTP 服务 (端口 5005) python app.py --port 8080 # 指定端口 python app.py --headless false # 显示浏览器窗口 """ import asyncio import os import sys import argparse import random import re import time # 禁用输出缓冲,确保 print 立即输出 os.environ['PYTHONUNBUFFERED'] = '1' # 修复 Windows 终端中文输出乱码 if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace', line_buffering=True) sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace', line_buffering=True) # 设置环境变量 os.environ['PYTHONIOENCODING'] = 'utf-8' import traceback import requests from datetime import datetime, date from pathlib import Path # 确保当前目录在 Python 路径中 CURRENT_DIR = Path(__file__).parent.resolve() if str(CURRENT_DIR) not in sys.path: sys.path.insert(0, str(CURRENT_DIR)) # 从 server/.env 文件加载环境变量 def load_env_file(): """从 server/.env 文件加载环境变量""" env_path = CURRENT_DIR.parent / '.env' if env_path.exists(): print(f"[Config] Loading env from: {env_path}") with open(env_path, 'r', 
encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() # 移除引号 if value.startswith('"') and value.endswith('"'): value = value[1:-1] elif value.startswith("'") and value.endswith("'"): value = value[1:-1] # 只在环境变量未设置时加载 if key not in os.environ: os.environ[key] = value safe_key = key.upper() is_sensitive = any(p in safe_key for p in ['PASSWORD', 'SECRET', 'TOKEN', 'KEY', 'ENCRYPT']) print(f"[Config] Loaded: {key}=***" if is_sensitive else f"[Config] Loaded: {key}={value}") else: print(f"[Config] .env file not found: {env_path}") # 加载环境变量 load_env_file() from flask import Flask, request, jsonify from flask_cors import CORS from platforms import get_publisher, PLATFORM_MAP from platforms.base import PublishParams from platforms.weixin import WeixinPublisher def parse_datetime(date_str: str): """解析日期时间字符串""" if not date_str: return None formats = [ "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", ] for fmt in formats: try: return datetime.strptime(date_str, fmt) except ValueError: continue return None def _extract_ip_ports(text: str): if not text: return [] matches = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}:\d{2,5}\b', text) seen = set() results = [] for m in matches: if m in seen: continue seen.add(m) results.append(m) return results def _mask_ip_port(ip_port: str) -> str: try: host, port = ip_port.split(':', 1) parts = host.split('.') if len(parts) == 4: return f"{parts[0]}.{parts[1]}.{parts[2]}.***:{port}" except Exception: pass return '***' def _build_requests_proxy_meta(host: str, port: int, username: str = '', password: str = '') -> str: host = str(host).strip() port = int(port) if username and password: return f"http://{username}:{password}@{host}:{port}" return f"http://{host}:{port}" def _test_proxy_connectivity(test_url: str, host: str, port: int, username: str = 
'', password: str = '', timeout: int = 10) -> bool: proxy_meta = _build_requests_proxy_meta(host, port, username, password) proxies = {"http": proxy_meta, "https": proxy_meta} start = int(round(time.time() * 1000)) try: session = requests.Session() session.trust_env = False resp = session.get(test_url, proxies=proxies, timeout=timeout) _ = resp.text cost = int(round(time.time() * 1000)) - start print(f"[Proxy] test ok: {_mask_ip_port(host + ':' + str(port))} cost={cost}ms", flush=True) return True except Exception as e: print(f"[Proxy] test failed: {_mask_ip_port(host + ':' + str(port))} err={type(e).__name__}", flush=True) return False _PROXY_CACHE_TTL_SECONDS = 20 * 60 _resolved_proxy_cache = {} def _resolve_shenlong_proxy(proxy_payload: dict) -> dict: test_url = 'http://myip.ipip.net' city = str(proxy_payload.get('city') or '').strip() region_code = str(proxy_payload.get('regionCode') or '').strip() api_url = str(proxy_payload.get('apiUrl') or '').strip() product_key = str(proxy_payload.get('productKey') or '').strip() signature = str(proxy_payload.get('signature') or '').strip() isp = str(proxy_payload.get('isp') or '').strip() publish_task_id = str(proxy_payload.get('publish_task_id') or '').strip() if not product_key: raise Exception('缺少神龙产品Key') if not signature: raise Exception('缺少神龙签名') if region_code and region_code.isdigit() and len(region_code) == 6: if region_code.endswith('0000'): region_code = '' elif not region_code.endswith('00'): region_code = region_code[:4] + '00' cache_key = '' if publish_task_id: cache_key = f"publish_task:{publish_task_id}:area:{region_code or '-'}:isp:{isp or '-'}" now = int(time.time()) cached = _resolved_proxy_cache.get(cache_key) if isinstance(cached, dict) and cached.get('expire_at', 0) > now and cached.get('server'): server = str(cached.get('server') or '').strip() if server: print(f"[Proxy] cache hit: task={publish_task_id} area={region_code or '-'} isp={isp or '-'}", flush=True) return {'server': server} request_url = 
api_url or 'http://api.shenlongip.com/ip' params = { 'key': product_key, 'sign': signature, 'count': 1, 'pattern': 'json', 'mr': 1, } if region_code: params['area'] = region_code if isp: params['isp'] = isp payload = None session = requests.Session() session.trust_env = False resp = session.get( request_url, params=params, headers={ 'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json', }, timeout=15, ) content_type = (resp.headers.get('content-type') or '').lower() raw_text = resp.text or '' try: if 'application/json' in content_type or raw_text.strip().startswith('{') or raw_text.strip().startswith('['): payload = resp.json() except Exception: payload = None if isinstance(payload, dict) and payload.get('code') is not None: try: api_code = int(payload.get('code')) except Exception: api_code = -1 if api_code != 200: raise Exception(f"代理提取失败: code={api_code} msg={str(payload.get('msg') or '').strip() or 'unknown'}") elif resp.status_code >= 400: raise Exception(f"代理提取失败: HTTP {resp.status_code}") def collect_ip_ports(data_list, city_filter: str): ips = [] for item in data_list: if isinstance(item, str): for ip_port in _extract_ip_ports(item): ips.append(ip_port) continue if not isinstance(item, dict): continue item_city = str(item.get('city') or item.get('area') or '').strip() if city_filter and item_city and item_city != city_filter: continue ip = str(item.get('ip') or item.get('host') or item.get('proxy_ip') or '').strip() port = str(item.get('port') or item.get('proxy_port') or '').strip() if ip and port: ips.append(f"{ip}:{port}") proxy = str(item.get('proxy') or item.get('ip_port') or '').strip() if proxy: for ip_port in _extract_ip_ports(proxy): ips.append(ip_port) return ips ip_ports = [] if payload is not None: if isinstance(payload, dict): if isinstance(payload.get('data'), list): ip_ports = collect_ip_ports(payload.get('data'), '') elif isinstance(payload.get('list'), list): ip_ports = collect_ip_ports(payload.get('list'), '') elif payload.get('ip') 
and payload.get('port'): ip_ports = collect_ip_ports([payload], '') elif isinstance(payload, list): ip_ports = collect_ip_ports(payload, '') else: ip_ports = _extract_ip_ports(raw_text) if not ip_ports: raise Exception('代理提取结果为空') random.shuffle(ip_ports) candidates = ip_ports[: min(10, len(ip_ports))] print(f"[Proxy] shenlong resolved: city={city or '-'} area={region_code or '-'} candidates={len(candidates)}/{len(ip_ports)}", flush=True) for ip_port in candidates: try: host, port_str = ip_port.split(':', 1) port = int(port_str) except Exception: continue if _test_proxy_connectivity(test_url, host, port, timeout=10): server = f"http://{host}:{port}" if cache_key: _resolved_proxy_cache[cache_key] = { 'server': server, 'expire_at': int(time.time()) + _PROXY_CACHE_TTL_SECONDS, } print(f"[Proxy] cache set: task={publish_task_id} ttl={_PROXY_CACHE_TTL_SECONDS}s", flush=True) return {'server': server} raise Exception('未找到可用代理IP') def validate_video_file(video_path: str) -> bool: """验证视频文件是否有效""" if not video_path: return False if not os.path.exists(video_path): return False if not os.path.isfile(video_path): return False valid_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.flv', '.wmv', '.webm'] ext = os.path.splitext(video_path)[1].lower() if ext not in valid_extensions: return False if os.path.getsize(video_path) < 1024: return False return True # 创建 Flask 应用 app = Flask(__name__) CORS(app) # 配置日志以显示所有 HTTP 请求 import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 让 werkzeug 日志显示 werkzeug_logger = logging.getLogger('werkzeug') werkzeug_logger.setLevel(logging.INFO) # 添加 StreamHandler 确保输出到控制台 handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.INFO) werkzeug_logger.addHandler(handler) logging.getLogger('urllib3').setLevel(logging.WARNING) # 添加请求钩子,打印所有收到的请求 @app.before_request def log_request_info(): """在处理每个请求前打印详细信息""" print(f"\n{'='*60}", flush=True) print(f"[HTTP Request] {request.method} 
{request.path}", flush=True) print(f"[HTTP Request] From: {request.remote_addr}", flush=True) if request.content_type and 'json' in request.content_type: try: data = request.get_json(silent=True) if data: # 打印部分参数,避免太长 keys = list(data.keys()) if data else [] print(f"[HTTP Request] JSON keys: {keys}", flush=True) except: pass print(f"{'='*60}\n", flush=True) # 全局配置 HEADLESS_MODE = os.environ.get('HEADLESS', 'true').lower() == 'true' print(f"[Config] HEADLESS env value: '{os.environ.get('HEADLESS', 'NOT SET')}'", flush=True) print(f"[Config] HEADLESS_MODE: {HEADLESS_MODE}", flush=True) # Node.js API 配置 NODEJS_API_BASE_URL = os.environ.get('NODEJS_API_URL', 'http://localhost:3000') INTERNAL_API_KEY = os.environ.get('INTERNAL_API_KEY', 'internal-api-key-default') print(f"[API Config] Node.js API: {NODEJS_API_BASE_URL}", flush=True) class NodeApiError(Exception): """用于把 Node 内部接口的错误状态码/内容透传给调用方。""" def __init__(self, status_code: int, payload: dict): super().__init__(payload.get("error") or payload.get("message") or "Node API error") self.status_code = status_code self.payload = payload def call_nodejs_api(method: str, endpoint: str, data: dict = None, params: dict = None) -> dict: """调用 Node.js 内部 API""" url = f"{NODEJS_API_BASE_URL}/api/internal{endpoint}" headers = { 'Content-Type': 'application/json', 'X-Internal-API-Key': INTERNAL_API_KEY, } try: if method.upper() == 'GET': response = requests.get(url, headers=headers, params=params, timeout=30) elif method.upper() == 'POST': response = requests.post(url, headers=headers, json=data, timeout=30) else: raise ValueError(f"Unsupported HTTP method: {method}") # 兼容 Node 可能返回非 JSON 的情况 try: payload = response.json() except Exception: payload = { "success": False, "error": "Node.js API 返回非 JSON 响应", "status": response.status_code, "text": (response.text or "")[:2000], "url": url, "endpoint": endpoint, } if response.status_code >= 400: # 把真实状态码/返回体抛出去,由路由决定如何返回给前端 if isinstance(payload, dict): payload.setdefault("success", 
False) payload.setdefault("status", response.status_code) payload.setdefault("url", url) payload.setdefault("endpoint", endpoint) raise NodeApiError(response.status_code, payload if isinstance(payload, dict) else { "success": False, "error": "Node.js API 调用失败", "status": response.status_code, "data": payload, "url": url, "endpoint": endpoint, }) return payload except requests.exceptions.RequestException as e: # 连接失败/超时等(此时通常拿不到 response) print(f"[API Error] 调用 Node.js API 失败: {e}", flush=True) raise NodeApiError(502, { "success": False, "error": f"无法连接 Node.js API: {str(e)}", "status": 502, "url": url, "endpoint": endpoint, }) # ==================== 签名相关(小红书专用) ==================== @app.route("/sign", methods=["POST"]) def sign_endpoint(): """小红书签名接口""" try: from platforms.xiaohongshu import XiaohongshuPublisher data = request.json publisher = XiaohongshuPublisher(headless=True) result = asyncio.run(publisher.get_sign( data.get("uri", ""), data.get("data"), data.get("a1", ""), data.get("web_session", "") )) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"error": str(e)}), 500 # ==================== 统一发布接口 ==================== @app.route("/publish", methods=["POST"]) def publish_video(): """ 统一发布接口 请求体: { "platform": "douyin", # douyin | xiaohongshu | weixin | kuaishou "cookie": "cookie字符串或JSON", "title": "视频标题", "description": "视频描述(可选)", "video_path": "视频文件绝对路径", "cover_path": "封面图片绝对路径(可选)", "tags": ["话题1", "话题2"], "post_time": "定时发布时间(可选,格式:2024-01-20 12:00:00)", "location": "位置(可选,默认:重庆市)" } 响应: { "success": true, "platform": "douyin", "video_id": "xxx", "video_url": "xxx", "message": "发布成功" } """ try: data = request.json # 获取参数 platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") title = data.get("title", "") description = data.get("description", "") video_path = data.get("video_path", "") cover_path = data.get("cover_path") tags = data.get("tags", []) post_time = data.get("post_time") location = 
data.get("location", "重庆市") # 调试日志 print(f"[Publish] 收到请求: platform={platform}, title={title}, video_path={video_path}") # 参数验证 if not platform: print("[Publish] 错误: 缺少 platform 参数") return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: print(f"[Publish] 错误: 不支持的平台 {platform}") return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: print("[Publish] 错误: 缺少 cookie 参数") return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not title: print("[Publish] 错误: 缺少 title 参数") return jsonify({"success": False, "error": "缺少 title 参数"}), 400 if not video_path: print("[Publish] 错误: 缺少 video_path 参数") return jsonify({"success": False, "error": "缺少 video_path 参数"}), 400 # 视频文件验证(增加详细信息) if not os.path.exists(video_path): print(f"[Publish] 错误: 视频文件不存在: {video_path}") return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400 if not os.path.isfile(video_path): print(f"[Publish] 错误: 路径不是文件: {video_path}") return jsonify({"success": False, "error": f"路径不是文件: {video_path}"}), 400 # 解析发布时间 publish_date = parse_datetime(post_time) if post_time else None # 创建发布参数 params = PublishParams( title=title, video_path=video_path, description=description, cover_path=cover_path, tags=tags, publish_date=publish_date, location=location ) print("=" * 60) print(f"[Publish] 平台: {platform}") print(f"[Publish] 标题: {title}") print(f"[Publish] 视频: {video_path}") print(f"[Publish] 封面: {cover_path}") print(f"[Publish] 话题: {tags}") print(f"[Publish] 定时: {publish_date}") print("=" * 60) # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) proxy_payload = data.get('proxy') if isinstance(proxy_payload, dict) and proxy_payload.get('enabled'): provider = str(proxy_payload.get('provider') or 'shenlong').strip().lower() if provider == 'shenlong': proxy_payload_with_task = dict(proxy_payload) if data.get('publish_task_id') 
is not None: proxy_payload_with_task['publish_task_id'] = data.get('publish_task_id') publisher.proxy_config = _resolve_shenlong_proxy(proxy_payload_with_task) # 执行发布 result = asyncio.run(publisher.run(cookie_str, params)) response_data = { "success": result.success, "platform": result.platform, "video_id": result.video_id, "video_url": result.video_url, "message": result.message, "error": result.error, "need_captcha": result.need_captcha, "captcha_type": result.captcha_type, "screenshot_base64": result.screenshot_base64, "page_url": result.page_url, "status": result.status } # 如果需要验证码,打印明确的日志 if result.need_captcha: print(f"[Publish] 需要验证码: type={result.captcha_type}") return jsonify(response_data) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== AI 辅助发布接口 ==================== # 存储活跃的发布会话 active_publish_sessions = {} @app.route("/publish/ai-assisted", methods=["POST"]) def publish_ai_assisted(): """ AI 辅助发布接口 与普通发布接口的区别: 1. 发布过程中会返回截图供 AI 分析 2. 如果检测到需要验证码,返回截图和状态,等待外部处理 3. 支持继续发布(输入验证码后) 请求体: { "platform": "douyin", "cookie": "cookie字符串", "title": "视频标题", "video_path": "视频文件路径", ... "return_screenshot": true // 是否返回截图 } 响应: { "success": true/false, "status": "success|failed|need_captcha|processing", "screenshot_base64": "...", // 当前页面截图 "page_url": "...", ... } """ # 立即打印请求日志,确保能看到 print("\n" + "!" * 60, flush=True) print("!!! [AI-Assisted Publish] 收到请求 !!!", flush=True) print("!" 
* 60 + "\n", flush=True) try: data = request.json print(f"[AI-Assisted Publish] 请求数据: platform={data.get('platform')}, title={data.get('title')}", flush=True) # 获取参数 platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") title = data.get("title", "") description = data.get("description", "") video_path = data.get("video_path", "") cover_path = data.get("cover_path") tags = data.get("tags", []) post_time = data.get("post_time") location = data.get("location", "重庆市") return_screenshot = data.get("return_screenshot", True) # 支持请求级别的 headless 参数,用于验证码场景下的有头浏览器模式 headless = data.get("headless", HEADLESS_MODE) if isinstance(headless, str): headless = headless.lower() == 'true' # 参数验证 if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({"success": False, "error": f"不支持的平台: {platform}"}), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not title: return jsonify({"success": False, "error": "缺少 title 参数"}), 400 if not video_path or not os.path.exists(video_path): return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400 # 解析发布时间 publish_date = parse_datetime(post_time) if post_time else None # 创建发布参数 params = PublishParams( title=title, video_path=video_path, description=description, cover_path=cover_path, tags=tags, publish_date=publish_date, location=location ) print("=" * 60) print(f"[AI Publish] 平台: {platform}") print(f"[AI Publish] 标题: {title}") print(f"[AI Publish] 视频: {video_path}") print(f"[AI Publish] Headless: {headless}") print("=" * 60) # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=headless) # 使用请求参数中的 headless 值 proxy_payload = data.get('proxy') if isinstance(proxy_payload, dict) and proxy_payload.get('enabled'): provider = str(proxy_payload.get('provider') or 'shenlong').strip().lower() if provider == 'shenlong': proxy_payload_with_task = 
dict(proxy_payload) if data.get('publish_task_id') is not None: proxy_payload_with_task['publish_task_id'] = data.get('publish_task_id') publisher.proxy_config = _resolve_shenlong_proxy(proxy_payload_with_task) try: publisher.user_id = int(data.get("user_id")) if data.get("user_id") is not None else None except Exception: publisher.user_id = None try: publisher.publish_task_id = int(data.get("publish_task_id")) if data.get("publish_task_id") is not None else None except Exception: publisher.publish_task_id = None try: publisher.publish_account_id = int(data.get("publish_account_id")) if data.get("publish_account_id") is not None else None except Exception: publisher.publish_account_id = None # 执行发布 result = asyncio.run(publisher.run(cookie_str, params)) response_data = { "success": result.success, "platform": result.platform, "video_id": result.video_id, "video_url": result.video_url, "message": result.message, "error": result.error, "need_captcha": result.need_captcha, "captcha_type": result.captcha_type, "status": result.status or ("success" if result.success else "failed"), "page_url": result.page_url } # 如果请求返回截图 if return_screenshot and result.screenshot_base64: response_data["screenshot_base64"] = result.screenshot_base64 return jsonify(response_data) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e), "status": "error"}), 500 # ==================== 批量发布接口 ==================== @app.route("/publish/batch", methods=["POST"]) def publish_batch(): """ 批量发布接口 - 发布到多个平台 请求体: { "platforms": ["douyin", "xiaohongshu"], "cookies": { "douyin": "cookie字符串", "xiaohongshu": "cookie字符串" }, "title": "视频标题", "video_path": "视频文件绝对路径", ... 
} """ try: data = request.json platforms = data.get("platforms", []) cookies = data.get("cookies", {}) if not platforms: return jsonify({"success": False, "error": "缺少 platforms 参数"}), 400 results = [] for platform in platforms: platform = platform.lower() cookie_str = cookies.get(platform, "") if not cookie_str: results.append({ "platform": platform, "success": False, "error": f"缺少 {platform} 的 cookie" }) continue try: # 创建参数 params = PublishParams( title=data.get("title", ""), video_path=data.get("video_path", ""), description=data.get("description", ""), cover_path=data.get("cover_path"), tags=data.get("tags", []), publish_date=parse_datetime(data.get("post_time")), location=data.get("location", "重庆市") ) # 发布 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) result = asyncio.run(publisher.run(cookie_str, params)) results.append({ "platform": result.platform, "success": result.success, "video_id": result.video_id, "message": result.message, "error": result.error }) except Exception as e: results.append({ "platform": platform, "success": False, "error": str(e) }) # 统计成功/失败数量 success_count = sum(1 for r in results if r.get("success")) return jsonify({ "success": success_count > 0, "total": len(platforms), "success_count": success_count, "fail_count": len(platforms) - success_count, "results": results }) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== Cookie 验证接口 ==================== @app.route("/check_cookie", methods=["POST"]) def check_cookie(): """检查 cookie 是否有效""" try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") if not cookie_str: return jsonify({"valid": False, "error": "缺少 cookie 参数"}), 400 # 目前只支持小红书的 cookie 验证 if platform == "xiaohongshu": try: from platforms.xiaohongshu import XiaohongshuPublisher, XHS_SDK_AVAILABLE if XHS_SDK_AVAILABLE: from xhs import XhsClient publisher = 
XiaohongshuPublisher() xhs_client = XhsClient(cookie_str, sign=publisher.sign_sync) info = xhs_client.get_self_info() if info: return jsonify({ "valid": True, "user_info": { "user_id": info.get("user_id"), "nickname": info.get("nickname"), "avatar": info.get("images") } }) except Exception as e: return jsonify({"valid": False, "error": str(e)}) # 其他平台返回格式正确但未验证 return jsonify({ "valid": True, "message": "Cookie 格式正确,但未进行在线验证" }) except Exception as e: traceback.print_exc() return jsonify({"valid": False, "error": str(e)}) # ==================== 获取作品列表接口 ==================== @app.route("/works", methods=["POST"]) def get_works(): """ 获取作品列表 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou "cookie": "cookie字符串或JSON", "page": 0, # 页码(从0开始,可选,默认0) "page_size": 20 # 每页数量(可选,默认20) } 响应: { "success": true, "platform": "douyin", "works": [...], "total": 100, "has_more": true } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") page = data.get("page", 0) page_size = data.get("page_size", 20) auto_paging = bool(data.get("auto_paging", False)) print(f"[Works] 收到请求: platform={platform}, page={page}, page_size={page_size}, auto_paging={auto_paging}", flush=True) if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取作品 if platform == "xiaohongshu" and auto_paging and hasattr(publisher, "get_all_works"): result = asyncio.run(publisher.get_all_works(cookie_str)) else: result = asyncio.run(publisher.run_get_works(cookie_str, page, page_size)) return jsonify(result.to_dict()) except Exception as e: traceback.print_exc() return jsonify({"success": False, 
"error": str(e)}), 500 # ==================== 保存作品日统计数据接口 ==================== @app.route("/work_day_statistics", methods=["POST"]) def save_work_day_statistics(): """ 保存作品每日统计数据 当天的数据走更新流,日期变化走新增流 请求体: { "statistics": [ { "work_id": 1, "fans_count": 1000, "play_count": 5000, "like_count": 200, "comment_count": 50, "share_count": 30, "collect_count": 100 }, ... ] } 响应: { "success": true, "inserted": 5, "updated": 3, "message": "保存成功" } """ print("=" * 60, flush=True) print("[DEBUG] ===== 进入 save_work_day_statistics 方法 =====", flush=True) print(f"[DEBUG] 请求方法: {request.method}", flush=True) print(f"[DEBUG] 请求数据: {request.json}", flush=True) print("=" * 60, flush=True) try: data = request.json statistics_list = data.get("statistics", []) if not statistics_list: return jsonify({"success": False, "error": "缺少 statistics 参数"}), 400 print(f"[WorkDayStatistics] 收到请求: {len(statistics_list)} 条统计数据") # 调用 Node.js API 保存数据 result = call_nodejs_api('POST', '/work-day-statistics', { 'statistics': statistics_list }) print(f"[WorkDayStatistics] 完成: 新增 {result.get('inserted', 0)} 条, 更新 {result.get('updated', 0)} 条") return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/work_day_statistics/trend", methods=["GET"]) def get_statistics_trend(): """ 获取数据趋势(用于 Dashboard 数据看板 和 数据分析页面) 查询参数: user_id: 用户ID (必填) days: 天数 (可选,默认7天,最大30天) - 与 start_date/end_date 二选一 start_date: 开始日期 (可选,格式 YYYY-MM-DD) end_date: 结束日期 (可选,格式 YYYY-MM-DD) account_id: 账号ID (可选,不填则查询所有账号) 响应: { "success": true, "data": { "dates": ["01-16", "01-17", "01-18", ...], "fans": [100, 120, 130, ...], "views": [1000, 1200, 1500, ...], "likes": [50, 60, 70, ...], "comments": [10, 12, 15, ...], "shares": [5, 6, 8, ...], "collects": [20, 25, 30, ...] 
} } """ try: user_id = request.args.get("user_id") days = request.args.get("days") start_date = request.args.get("start_date") end_date = request.args.get("end_date") account_id = request.args.get("account_id") if not user_id: return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400 # 调用 Node.js API 获取数据 params = {"user_id": user_id} if days: params["days"] = days if start_date: params["start_date"] = start_date if end_date: params["end_date"] = end_date if account_id: params["account_id"] = account_id result = call_nodejs_api('GET', '/work-day-statistics/trend', params=params) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/work_day_statistics/platforms", methods=["GET"]) def get_statistics_by_platform(): """ 按平台分组获取统计数据(用于数据分析页面的平台对比) 数据来源: - 粉丝数:从 platform_accounts 表获取(账号级别数据) - 播放量/点赞/评论/收藏:从 work_day_statistics 表按平台汇总 - 粉丝增量:通过比较区间内最早和最新的粉丝数计算 查询参数: user_id: 用户ID (必填) days: 天数 (可选,默认30天,最大30天) - 与 start_date/end_date 二选一 start_date: 开始日期 (可选,格式 YYYY-MM-DD) end_date: 结束日期 (可选,格式 YYYY-MM-DD) 响应: { "success": true, "data": [ { "platform": "douyin", "fansCount": 1000, "fansIncrease": 50, "viewsCount": 5000, "likesCount": 200, "commentsCount": 30, "collectsCount": 100 }, ... 
] } """ try: user_id = request.args.get("user_id") days = request.args.get("days") start_date = request.args.get("start_date") end_date = request.args.get("end_date") if not user_id: return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400 # 调用 Node.js API 获取数据 params = {"user_id": user_id} if days: params["days"] = days if start_date: params["start_date"] = start_date if end_date: params["end_date"] = end_date result = call_nodejs_api('GET', '/work-day-statistics/platforms', params=params) print(f"[PlatformStats] 返回 {len(result.get('data', []))} 个平台的数据") return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/work_day_statistics/batch", methods=["POST"]) def get_work_statistics_history(): """ 批量获取作品的历史统计数据 请求体: { "work_ids": [1, 2, 3], "start_date": "2025-01-01", # 可选 "end_date": "2025-01-21" # 可选 } 响应: { "success": true, "data": { "1": [ {"record_date": "2025-01-20", "play_count": 100, ...}, {"record_date": "2025-01-21", "play_count": 150, ...} ], ... 
} } """ try: data = request.json work_ids = data.get("work_ids", []) start_date = data.get("start_date") end_date = data.get("end_date") if not work_ids: return jsonify({"success": False, "error": "缺少 work_ids 参数"}), 400 # 调用 Node.js API 获取数据 request_data = {"work_ids": work_ids} if start_date: request_data["start_date"] = start_date if end_date: request_data["end_date"] = end_date result = call_nodejs_api('POST', '/work-day-statistics/batch', data=request_data) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/work_day_statistics/overview", methods=["GET"]) def get_overview(): """ 获取数据总览(账号列表和汇总统计) 查询参数: user_id: 用户ID (必填) 响应: { "success": true, "data": { "accounts": [ { "id": 1, "nickname": "账号名称", "username": "账号ID", "avatarUrl": "头像URL", "platform": "douyin", "groupId": 1, "fansCount": 1000, "totalIncome": null, "yesterdayIncome": null, "totalViews": 5000, "yesterdayViews": 100, "yesterdayComments": 10, "yesterdayLikes": 50, "yesterdayFansIncrease": 5, "updateTime": "2025-01-26T10:00:00Z", "status": "active" }, ... 
], "summary": { "totalAccounts": 5, "totalIncome": 0, "yesterdayIncome": 0, "totalViews": 10000, "yesterdayViews": 200, "totalFans": 5000, "yesterdayComments": 20, "yesterdayLikes": 100, "yesterdayFansIncrease": 10 } } } """ try: user_id = request.args.get("user_id") if not user_id: return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400 # 调用 Node.js API 获取数据 params = {"user_id": user_id} result = call_nodejs_api('GET', '/work-day-statistics/overview', params=params) return jsonify(result) except NodeApiError as e: # 透传 Node 的真实状态码/错误内容,避免所有错误都变成 500 return jsonify(e.payload), e.status_code except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 获取评论列表接口 ==================== @app.route("/comments", methods=["POST"]) def get_comments(): """ 获取作品评论 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou "cookie": "cookie字符串或JSON", "work_id": "作品ID", "cursor": "" # 分页游标(可选) } 响应: { "success": true, "platform": "douyin", "work_id": "xxx", "comments": [...], "total": 50, "has_more": true, "cursor": "xxx" } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") work_id = data.get("work_id", "") cursor = data.get("cursor", "") print(f"[Comments] 收到请求: platform={platform}, work_id={work_id}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not work_id: return jsonify({"success": False, "error": "缺少 work_id 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取评论 result = asyncio.run(publisher.run_get_comments(cookie_str, work_id, cursor)) result_dict = result.to_dict() # 添加 cursor 到响应 if hasattr(result, 
'__dict__') and 'cursor' in result.__dict__: result_dict['cursor'] = result.__dict__['cursor'] return jsonify(result_dict) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 获取所有作品评论接口 ==================== @app.route("/all_comments", methods=["POST"]) def get_all_comments(): """ 获取所有作品的评论(一次性获取) 请求体: { "platform": "douyin", # douyin | xiaohongshu "cookie": "cookie字符串或JSON" } 响应: { "success": true, "platform": "douyin", "work_comments": [ { "work_id": "xxx", "title": "作品标题", "cover_url": "封面URL", "comments": [...] } ], "total": 5 } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") print(f"[AllComments] 收到请求: platform={platform}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in ['douyin', 'xiaohongshu']: return jsonify({ "success": False, "error": f"该接口只支持 douyin 和 xiaohongshu 平台" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取所有评论 result = asyncio.run(publisher.get_all_comments(cookie_str)) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 登录状态检查接口 ==================== @app.route("/check_login", methods=["POST"]) def check_login(): """ 检查 Cookie 登录状态(通过浏览器访问后台页面检测) 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou | weixin "cookie": "cookie字符串或JSON" } 响应: { "success": true, "valid": true, # Cookie 是否有效 "need_login": false, # 是否需要重新登录 "message": "登录状态有效" } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") print(f"[CheckLogin] 收到请求: platform={platform}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in 
PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行登录检查 result = asyncio.run(publisher.check_login_status(cookie_str)) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({ "success": False, "valid": False, "need_login": True, "error": str(e) }), 500 # ==================== 获取账号信息接口 ==================== @app.route("/account_info", methods=["POST"]) def get_account_info(): """ 获取账号信息 请求体: { "platform": "baijiahao", # 平台 "cookie": "cookie字符串或JSON" } 响应: { "success": true, "account_id": "xxx", "account_name": "用户名", "avatar_url": "头像URL", "fans_count": 0, "works_count": 0 } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") print(f"[AccountInfo] 收到请求: platform={platform}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 检查是否有 get_account_info 方法 if hasattr(publisher, 'get_account_info'): result = asyncio.run(publisher.get_account_info(cookie_str)) return jsonify(result) else: return jsonify({ "success": False, "error": f"平台 {platform} 不支持获取账号信息" }), 400 except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 健康检查 ==================== @app.route("/health", methods=["GET"]) def health_check(): """健康检查""" # 检查 xhs SDK 是否可用 xhs_available = False try: from platforms.xiaohongshu import XHS_SDK_AVAILABLE 
xhs_available = XHS_SDK_AVAILABLE except: pass return jsonify({ "status": "ok", "xhs_sdk": xhs_available, "supported_platforms": list(PLATFORM_MAP.keys()), "headless_mode": HEADLESS_MODE }) @app.route("/", methods=["GET"]) def index(): """首页""" return jsonify({ "name": "智媒通视频发布服务", "version": "1.2.0", "endpoints": { "GET /": "服务信息", "GET /health": "健康检查", "POST /publish": "发布视频", "POST /publish/batch": "批量发布", "POST /works": "获取作品列表", "POST /comments": "获取作品评论", "POST /all_comments": "获取所有作品评论", "POST /work_day_statistics": "保存作品每日统计数据", "POST /work_day_statistics/batch": "获取作品历史统计数据", "POST /check_cookie": "检查 Cookie", "POST /sign": "小红书签名" }, "supported_platforms": list(PLATFORM_MAP.keys()) }) # ==================== 命令行启动 ==================== def main(): parser = argparse.ArgumentParser(description='智媒通视频发布服务') parser.add_argument('--port', type=int, default=5005, help='服务端口 (默认: 5005)') # 从环境变量读取 HOST,默认仅本地访问 default_host = os.environ.get('PYTHON_HOST', os.environ.get('HOST', '127.0.0.1')) parser.add_argument('--host', type=str, default=default_host, help='监听地址 (默认: 127.0.0.1,可通过 HOST 环境变量配置)') parser.add_argument('--headless', type=str, default='true', help='是否无头模式 (默认: true)') parser.add_argument('--debug', action='store_true', help='调试模式') args = parser.parse_args() global HEADLESS_MODE HEADLESS_MODE = args.headless.lower() == 'true' # 检查 xhs SDK xhs_status = "未安装" try: from platforms.xiaohongshu import XHS_SDK_AVAILABLE xhs_status = "已安装" if XHS_SDK_AVAILABLE else "未安装" except: pass print("=" * 60) print("智媒通视频发布服务") print("=" * 60) print(f"XHS SDK: {xhs_status}") print(f"Headless 模式: {HEADLESS_MODE}") print(f"支持平台: {', '.join(PLATFORM_MAP.keys())}") print("=" * 60) print(f"启动服务: http://{args.host}:{args.port}") print("=" * 60) app.run(host=args.host, port=args.port, debug=bool(args.debug), threaded=True, use_reloader=False) @app.route('/auto-reply', methods=['POST']) def auto_reply(): """ 微信视频号自动回复私信 POST /auto-reply Body: { "platform": "weixin", "cookie": 
"..." } """ try: data = request.json platform = data.get('platform', '').lower() cookie = data.get('cookie', '') if platform != 'weixin': return jsonify({ 'success': False, 'error': '只支持微信视频号平台' }), 400 if not cookie: return jsonify({ 'success': False, 'error': '缺少 Cookie' }), 400 print(f"[API] 接收自动回复请求: platform={platform}") # 创建 Publisher 实例 publisher = WeixinPublisher(headless=HEADLESS_MODE) # 执行自动回复 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) result = loop.run_until_complete(publisher.auto_reply_private_messages(cookie)) loop.close() print(f"[API] 自动回复结果: {result}") return jsonify(result) except Exception as e: print(f"[API] 自动回复异常: {e}") traceback.print_exc() return jsonify({ 'success': False, 'error': str(e) }), 500 if __name__ == '__main__': main() ================================================================================ 文件: server\python\platforms\weixin.py ================================================================================ # -*- coding: utf-8 -*- """ 微信视频号发布器 参考: matrix/tencent_uploader/main.py """ import asyncio import json import os from datetime import datetime from typing import List from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) import os import time # 允许通过环境变量手动指定“上传视频入口”的选择器,便于在页面结构频繁变更时快速调整 WEIXIN_UPLOAD_SELECTOR = os.environ.get("WEIXIN_UPLOAD_SELECTOR", "").strip() def format_short_title(origin_title: str) -> str: """ 格式化短标题 - 移除特殊字符 - 长度限制在 6-16 字符 """ allowed_special_chars = "《》"":+?%°" filtered_chars = [ char if char.isalnum() or char in allowed_special_chars else ' ' if char == ',' else '' for char in origin_title ] formatted_string = ''.join(filtered_chars) if len(formatted_string) > 16: formatted_string = formatted_string[:16] elif len(formatted_string) < 6: formatted_string += ' ' * (6 - len(formatted_string)) return formatted_string class WeixinPublisher(BasePublisher): """ 微信视频号发布器 使用 Playwright 自动化操作视频号创作者中心 注意: 需要使用 Chrome 
浏览器,否则可能出现 H264 编码错误 """ platform_name = "weixin" login_url = "https://channels.weixin.qq.com/platform" publish_url = "https://channels.weixin.qq.com/platform/post/create" cookie_domain = ".weixin.qq.com" def _parse_count(self, count_str: str) -> int: """解析数字(支持带'万'的格式)""" try: count_str = count_str.strip() if '万' in count_str: return int(float(count_str.replace('万', '')) * 10000) return int(count_str) except: return 0 async def ai_find_upload_selector(self, frame_html: str, frame_name: str = "main") -> str: """ 使用 AI 从 HTML 中识别“上传视频/选择文件”相关元素的 CSS 选择器。 设计思路: - 仅在常规 DOM 选择器都失败时调用,避免频繁占用 AI 配额; - 通过 DashScope 文本模型(与验证码识别同一套配置)分析 HTML; - 返回一个适合用于 frame.locator(selector) 的 CSS 选择器。 """ import json import re import requests import os # 避免 HTML 过长导致 token 超限,只截取前 N 字符 if not frame_html: return "" max_len = 20000 if len(frame_html) > max_len: frame_html = frame_html[:max_len] ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "") ai_base_url = os.environ.get("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1") ai_text_model = os.environ.get("AI_TEXT_MODEL", "qwen-plus") if not ai_api_key: print(f"[{self.platform_name}] AI上传入口识别: 未配置 AI API Key,跳过") return "" prompt = f""" 你是熟悉微信视频号后台的前端工程师,现在需要在一段 HTML 中找到“上传视频文件”的入口。 页面说明: - 平台:微信视频号(channels.weixin.qq.com) - 目标:用于上传视频文件的按钮或 input(一般会触发文件选择框) - 你会收到某个 frame 的完整 HTML 片段(不包含截图)。 请你根据下面的 HTML,推断最适合用于上传视频文件的元素,并输出一个可以被 Playwright 使用的 CSS 选择器。 要求: 1. 只考虑“上传/选择视频文件”的入口,不要返回“发布/发表/下一步”等按钮; 2. 选择器需要尽量稳定,不要使用自动生成的随机类名(例如带很多随机字母/数字的类名可以用前缀匹配); 3. 选择器必须是 CSS 选择器(不要返回 XPath); 4. 
如果确实找不到合理的上传入口,返回 selector 为空字符串。 请以 JSON 格式输出,严格遵守以下结构(不要添加任何解释文字): ```json {{ "selector": "CSS 选择器字符串,比如:input[type='file'] 或 div.upload-content input[type='file']" }} ``` 下面是 frame=\"{frame_name}\" 的 HTML: ```html {frame_html} ```""" payload = { "model": ai_text_model, "messages": [ { "role": "user", "content": prompt, } ], "max_tokens": 600, } headers = { "Authorization": f"Bearer {ai_api_key}", "Content-Type": "application/json", } try: print(f"[{self.platform_name}] AI上传入口识别: 正在分析 frame={frame_name} HTML...") resp = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=40, ) if resp.status_code != 200: print(f"[{self.platform_name}] AI上传入口识别: API 返回错误 {resp.status_code}") return "" data = resp.json() content = data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" # 尝试从 ```json``` 代码块中解析 json_match = re.search(r"```json\\s*([\\s\\S]*?)\\s*```", content) if json_match: json_str = json_match.group(1) else: json_match = re.search(r"\\{[\\s\\S]*\\}", content) json_str = json_match.group(0) if json_match else "{}" try: result = json.loads(json_str) except Exception: result = {} selector = (result.get("selector") or "").strip() print(f"[{self.platform_name}] AI上传入口识别结果: selector='{selector}'") return selector except Exception as e: print(f"[{self.platform_name}] AI上传入口识别异常: {e}") return "" async def ai_pick_selector_from_candidates(self, candidates: list, goal: str, frame_name: str = "main") -> str: """ 将“候选元素列表(包含 css selector + 文本/属性)”发给 AI,让 AI 直接挑选最符合 goal 的元素。 适用于:HTML 里看不出上传入口、或页面大量动态渲染时。 """ import json import re import requests import os if not candidates: return "" ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "") ai_base_url = os.environ.get("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1") ai_text_model = os.environ.get("AI_TEXT_MODEL", "qwen-plus") if not ai_api_key: print(f"[{self.platform_name}] AI候选选择器: 未配置 AI API Key,跳过") return "" # 控制长度,最多取前 120 个候选 candidates = 
candidates[:120] prompt = f""" 你是自动化发布工程师。现在要在微信视频号(channels.weixin.qq.com)发布页面里找到“{goal}”相关的入口元素。 我会给你一组候选元素,每个候选都包含: - css: 可直接用于 Playwright 的 CSS 选择器 - tag / type / role / ariaLabel / text / id / className(部分字段可能为空) 你的任务: - 从候选中选出最可能用于“{goal}”的元素,返回它的 css 选择器; - 如果没有任何候选符合,返回空字符串。 注意: - 如果 goal 是“上传视频入口”,优先选择 input[type=file] 或看起来会触发选择文件/上传的区域; - 不要选择“发布/发表/下一步”等按钮(除非 goal 明确是发布按钮)。 请严格按 JSON 输出(不要解释): ```json {{ "selector": "..." }} ``` 候选列表(frame={frame_name}): ```json {json.dumps(candidates, ensure_ascii=False)} ```""" payload = { "model": ai_text_model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 400, } headers = { "Authorization": f"Bearer {ai_api_key}", "Content-Type": "application/json", } try: print(f"[{self.platform_name}] AI候选选择器: 正在分析 frame={frame_name}, goal={goal} ...") resp = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=40, ) if resp.status_code != 200: print(f"[{self.platform_name}] AI候选选择器: API 返回错误 {resp.status_code}") return "" data = resp.json() content = data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" json_match = re.search(r"```json\\s*([\\s\\S]*?)\\s*```", content) if json_match: json_str = json_match.group(1) else: json_match = re.search(r"\\{[\\s\\S]*\\}", content) json_str = json_match.group(0) if json_match else "{}" try: result = json.loads(json_str) except Exception: result = {} selector = (result.get("selector") or "").strip() print(f"[{self.platform_name}] AI候选选择器结果: selector='{selector}'") return selector except Exception as e: print(f"[{self.platform_name}] AI候选选择器异常: {e}") return "" async def _extract_relevant_html_snippets(self, html: str) -> str: """ 从 HTML 中抽取与上传相关的片段,减少 token,提升 AI 命中率。 - 优先抓取包含 upload/上传/file/input 等关键词的窗口片段 - 若未命中关键词,返回“开头 + 结尾”的拼接 """ import re if not html: return "" patterns = [ r"upload", r"uploader", r"file", r"type\\s*=\\s*['\\\"]file['\\\"]", r"input", r"drag", r"drop", r"选择", r"上传", r"添加", r"视频", ] regex = 
re.compile("|".join(patterns), re.IGNORECASE) snippets = [] for m in regex.finditer(html): start = max(0, m.start() - 350) end = min(len(html), m.end() + 350) snippets.append(html[start:end]) if len(snippets) >= 18: break if snippets: # 去重(粗略) unique = [] seen = set() for s in snippets: key = hash(s) if key not in seen: seen.add(key) unique.append(s) return "\n\n\n\n".join(unique)[:20000] # fallback: head + tail head = html[:9000] tail = html[-9000:] if len(html) > 9000 else "" return (head + "\n\n\n\n" + tail)[:20000] async def init_browser(self, storage_state: str = None): """初始化浏览器 - 参考 matrix 使用 channel=chrome 避免 H264 编码错误""" from playwright.async_api import async_playwright playwright = await async_playwright().start() proxy = self.proxy_config if isinstance(getattr(self, 'proxy_config', None), dict) else None if proxy and proxy.get('server'): print(f"[{self.platform_name}] 使用代理: {proxy.get('server')}", flush=True) # 参考 matrix: 使用系统内的 Chrome 浏览器,避免 H264 编码错误 # 如果没有安装 Chrome,则使用默认 Chromium try: self.browser = await playwright.chromium.launch( headless=self.headless, channel="chrome", # 使用系统 Chrome proxy=proxy if proxy and proxy.get('server') else None ) print(f"[{self.platform_name}] 使用系统 Chrome 浏览器") except Exception as e: print(f"[{self.platform_name}] Chrome 不可用,使用 Chromium: {e}") self.browser = await playwright.chromium.launch( headless=self.headless, proxy=proxy if proxy and proxy.get('server') else None ) # 设置 HTTP Headers 防止重定向 headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Referer": "https://channels.weixin.qq.com/platform/post/list", } self.context = await self.browser.new_context( extra_http_headers=headers, ignore_https_errors=True, viewport={"width": 1920, "height": 1080}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) self.page = await self.context.new_page() return 
self.page async def set_schedule_time(self, publish_date: datetime): """设置定时发布""" if not self.page: return print(f"[{self.platform_name}] 设置定时发布...") # 点击定时选项 label_element = self.page.locator("label").filter(has_text="定时").nth(1) await label_element.click() # 选择日期 await self.page.click('input[placeholder="请选择发表时间"]') publish_month = f"{publish_date.month:02d}" current_month = f"{publish_month}月" # 检查月份 page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")') if page_month != current_month: await self.page.click('button.weui-desktop-btn__icon__right') # 选择日期 elements = await self.page.query_selector_all('table.weui-desktop-picker__table a') for element in elements: class_name = await element.evaluate('el => el.className') if 'weui-desktop-picker__disabled' in class_name: continue text = await element.inner_text() if text.strip() == str(publish_date.day): await element.click() break # 输入时间 await self.page.click('input[placeholder="请选择时间"]') await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.type(str(publish_date.hour)) # 点击其他地方确认 await self.page.locator("div.input-editor").click() async def handle_upload_error(self, video_path: str): """处理上传错误""" if not self.page: return print(f"[{self.platform_name}] 视频出错了,重新上传中...") await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click() await self.page.get_by_role('button', name="删除", exact=True).click() file_input = self.page.locator('input[type="file"]') await file_input.set_input_files(video_path) async def add_title_tags(self, params: PublishParams): """添加标题和话题""" if not self.page: return await self.page.locator("div.input-editor").click() await self.page.keyboard.type(params.title) if params.tags: await self.page.keyboard.press("Enter") for tag in params.tags: await self.page.keyboard.type("#" + tag) await self.page.keyboard.press("Space") print(f"[{self.platform_name}] 成功添加标题和 {len(params.tags)} 个话题") async def 
add_short_title(self): """添加短标题""" if not self.page: return try: short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator( "xpath=following-sibling::div").locator('span input[type="text"]') if await short_title_element.count(): # 获取已有内容作为短标题 pass except: pass async def upload_cover(self, cover_path: str): """上传封面图""" if not self.page or not cover_path or not os.path.exists(cover_path): return try: await asyncio.sleep(2) preview_btn_info = await self.page.locator( 'div.finder-tag-wrap.btn:has-text("更换封面")').get_attribute('class') if "disabled" not in preview_btn_info: await self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")').click() await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover() # 删除现有封面 if await self.page.locator(".del-wrap > .svg-icon").count(): await self.page.locator(".del-wrap > .svg-icon").click() # 上传新封面 preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap") async with self.page.expect_file_chooser() as fc_info: await preview_div.click() preview_chooser = await fc_info.value await preview_chooser.set_files(cover_path) await asyncio.sleep(2) await self.page.get_by_role("button", name="确定").click() await asyncio.sleep(1) await self.page.get_by_role("button", name="确认").click() print(f"[{self.platform_name}] 封面上传成功") except Exception as e: print(f"[{self.platform_name}] 封面上传失败: {e}") async def check_captcha(self) -> dict: """检查页面是否需要验证码""" if not self.page: return {'need_captcha': False, 'captcha_type': ''} try: # 检查各种验证码 captcha_selectors = [ 'text="请输入验证码"', 'text="滑动验证"', '[class*="captcha"]', '[class*="verify"]', ] for selector in captcha_selectors: try: if await self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到验证码: {selector}") return {'need_captcha': True, 'captcha_type': 'image'} except: pass # 检查登录弹窗 login_selectors = [ 'text="请登录"', 'text="扫码登录"', '[class*="login-dialog"]', ] for selector in login_selectors: try: if await 
self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到需要登录: {selector}") return {'need_captcha': True, 'captcha_type': 'login'} except: pass except Exception as e: print(f"[{self.platform_name}] 验证码检测异常: {e}") return {'need_captcha': False, 'captcha_type': ''} async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到视频号""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器(使用 Chrome) await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) print(cookie_list) print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(10, "正在打开上传页面...") # 访问上传页面 await self.page.goto(self.publish_url, wait_until="networkidle", timeout=60000) await asyncio.sleep(3) # 检查是否跳转到登录页 current_url = self.page.url print(f"[{self.platform_name}] 当前页面: {current_url}") if "login" in current_url: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="Cookie 已过期,需要重新登录", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True) screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, 
platform=self.platform_name, error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 传统方式检查验证码 captcha_result = await self.check_captcha() if captcha_result['need_captcha']: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证", need_captcha=True, captcha_type=captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) self.report_progress(15, "正在选择视频文件...") # 上传视频 # 说明:视频号发布页在不同账号/地区/灰度下 DOM 结构差异较大,且上传组件可能在 iframe 中。 # 因此这里按 matrix 的思路“点击触发 file chooser”,同时增加“遍历全部 frame + 精确挑选 video input”的兜底。 upload_success = False if not self.page: raise Exception("Page not initialized") # 等待页面把上传区域渲染出来(避免过早判断) try: await self.page.wait_for_selector("div.upload-content, input[type='file'], iframe", timeout=20000) except Exception: pass async def _try_set_files_in_frame(frame, frame_name: str) -> bool: """在指定 frame 中尝试触发上传""" nonlocal upload_success if upload_success: return True # 方法0:如果用户通过环境变量显式配置了选择器,优先尝试这个 if WEIXIN_UPLOAD_SELECTOR: try: el = frame.locator(WEIXIN_UPLOAD_SELECTOR).first if await el.count() > 0 and await el.is_visible(): print(f"[{self.platform_name}] [{frame_name}] 使用环境变量 WEIXIN_UPLOAD_SELECTOR: {WEIXIN_UPLOAD_SELECTOR}") try: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await el.click() chooser = await fc_info.value await chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过环境变量选择器上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器点击失败,尝试直接 set_input_files: {e}") try: await el.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 成功") return True 
except Exception as e2: print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 仍失败: {e2}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 使用环境变量选择器定位元素失败: {e}") # 先尝试点击上传区域触发 chooser(最贴近 matrix) click_selectors = [ "div.upload-content", "div[class*='upload-content']", "div[class*='upload']", "div.add-wrap", "[class*='uploader']", "text=点击上传", "text=上传视频", "text=选择视频", ] for selector in click_selectors: try: el = frame.locator(selector).first if await el.count() > 0 and await el.is_visible(): print(f"[{self.platform_name}] [{frame_name}] 找到可点击上传区域: {selector}") try: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await el.click() chooser = await fc_info.value await chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 file chooser 上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 点击触发 chooser 失败: {e}") except Exception: pass # 再尝试直接设置 input[type=file](iframe/隐藏 input 常见) try: inputs = frame.locator("input[type='file']") cnt = await inputs.count() if cnt > 0: best_idx = 0 best_score = -1 for i in range(cnt): try: inp = inputs.nth(i) accept = (await inp.get_attribute("accept")) or "" multiple = (await inp.get_attribute("multiple")) or "" score = 0 if "video" in accept: score += 10 if "mp4" in accept: score += 3 if multiple: score += 1 if score > best_score: best_score = score best_idx = i except Exception: continue target = inputs.nth(best_idx) print(f"[{self.platform_name}] [{frame_name}] 尝试对 input[{best_idx}] set_input_files (score={best_score})") await target.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 file input 上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] file input 上传失败: {e}") # 不直接返回,让后面的 AI 兜底有机会执行 # 方法4: 兜底使用 AI 分析 HTML,猜测上传入口 try: frame_url = getattr(frame, "url", "") html_full = await frame.content() 
html_for_ai = await self._extract_relevant_html_snippets(html_full) print(f"[{self.platform_name}] [{frame_name}] frame_url={frame_url}, html_len={len(html_full)}, html_for_ai_len={len(html_for_ai)}") ai_selector = await self.ai_find_upload_selector(html_for_ai, frame_name=frame_name) if ai_selector: try: el = frame.locator(ai_selector).first if await el.count() > 0: print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器点击上传入口: {ai_selector}") try: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await el.click() chooser = await fc_info.value await chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 AI 选择器上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] AI 选择器点击失败,改为直接 set_input_files: {e}") try: await el.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] AI 选择器直接 set_input_files 成功") return True except Exception as e2: print(f"[{self.platform_name}] [{frame_name}] AI 选择器 set_input_files 仍失败: {e2}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器定位元素失败: {e}") else: # 如果 AI 无法从 HTML 推断,退一步:构造候选元素列表交给 AI 选择 try: candidates = await frame.evaluate(""" () => { function cssEscape(s) { try { return CSS.escape(s); } catch (e) { return s.replace(/[^a-zA-Z0-9_-]/g, '\\\\$&'); } } function buildSelector(el) { if (!el || el.nodeType !== 1) return ''; if (el.id) return `#${cssEscape(el.id)}`; let parts = []; let cur = el; for (let depth = 0; cur && cur.nodeType === 1 && depth < 5; depth++) { let part = cur.tagName.toLowerCase(); const role = cur.getAttribute('role'); const type = cur.getAttribute('type'); if (type) part += `[type="${type}"]`; if (role) part += `[role="${role}"]`; const cls = (cur.className || '').toString().trim().split(/\\s+/).filter(Boolean); if (cls.length) part += '.' 
+ cls.slice(0, 2).map(cssEscape).join('.'); // nth-of-type let idx = 1; let sib = cur; while (sib && (sib = sib.previousElementSibling)) { if (sib.tagName === cur.tagName) idx++; } part += `:nth-of-type(${idx})`; parts.unshift(part); cur = cur.parentElement; } return parts.join(' > '); } const nodes = Array.from(document.querySelectorAll('input, button, a, div, span')) .filter(el => { const tag = el.tagName.toLowerCase(); const type = (el.getAttribute('type') || '').toLowerCase(); const role = (el.getAttribute('role') || '').toLowerCase(); const aria = (el.getAttribute('aria-label') || '').toLowerCase(); const txt = (el.innerText || '').trim().slice(0, 60); const cls = (el.className || '').toString().toLowerCase(); const isFile = tag === 'input' && type === 'file'; const looksClickable = tag === 'button' || tag === 'a' || role === 'button' || el.onclick || cls.includes('upload') || cls.includes('uploader') || cls.includes('drag') || aria.includes('上传') || aria.includes('选择') || aria.includes('添加') || txt.includes('上传') || txt.includes('选择') || txt.includes('添加') || txt.includes('点击上传'); if (!isFile && !looksClickable) return false; const r = el.getBoundingClientRect(); const visible = r.width > 5 && r.height > 5; return visible; }); const limited = nodes.slice(0, 120).map(el => ({ css: buildSelector(el), tag: el.tagName.toLowerCase(), type: el.getAttribute('type') || '', role: el.getAttribute('role') || '', ariaLabel: el.getAttribute('aria-label') || '', text: (el.innerText || '').trim().slice(0, 80), id: el.id || '', className: (el.className || '').toString().slice(0, 120), accept: el.getAttribute('accept') || '', })); return limited; } """) ai_selector2 = await self.ai_pick_selector_from_candidates( candidates=candidates, goal="上传视频入口", frame_name=frame_name ) if ai_selector2: el2 = frame.locator(ai_selector2).first if await el2.count() > 0: print(f"[{self.platform_name}] [{frame_name}] 使用 AI 候选选择器点击上传入口: {ai_selector2}") try: async with 
self.page.expect_file_chooser(timeout=5000) as fc_info: await el2.click() chooser2 = await fc_info.value await chooser2.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 AI 候选选择器上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器点击失败,尝试 set_input_files: {e}") try: await el2.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 成功") return True except Exception as e2: print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 仍失败: {e2}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 构造候选并交给 AI 失败: {e}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] AI 上传入口识别整体失败: {e}") return False # 先尝试主 frame try: await _try_set_files_in_frame(self.page.main_frame, "main") except Exception as e: print(f"[{self.platform_name}] main frame 上传尝试异常: {e}") # 再遍历所有子 frame if not upload_success: try: frames = self.page.frames print(f"[{self.platform_name}] 发现 frames: {len(frames)}") for idx, fr in enumerate(frames): if upload_success: break # main_frame 已尝试过 if fr == self.page.main_frame: continue name = fr.name or f"frame-{idx}" await _try_set_files_in_frame(fr, name) except Exception as e: print(f"[{self.platform_name}] 遍历 frames 异常: {e}") if not upload_success: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="未找到上传入口(可能在 iframe 中或页面结构已变更)", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) self.report_progress(20, "正在填充标题和话题...") # 添加标题和话题 await self.add_title_tags(params) self.report_progress(30, "等待视频上传完成...") # 等待上传完成 for _ in range(120): try: button_info = await self.page.get_by_role("button", name="发表").get_attribute('class') if "weui-desktop-btn_disabled" not in button_info: print(f"[{self.platform_name}] 视频上传完毕") # 上传封面 
self.report_progress(50, "正在上传封面...") await self.upload_cover(params.cover_path) break else: # 检查上传错误 if await self.page.locator('div.status-msg.error').count(): if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count(): await self.handle_upload_error(params.video_path) await asyncio.sleep(3) except: await asyncio.sleep(3) self.report_progress(60, "处理视频设置...") # 添加短标题 try: short_title_el = self.page.get_by_text("短标题", exact=True).locator("..").locator( "xpath=following-sibling::div").locator('span input[type="text"]') if await short_title_el.count(): short_title = format_short_title(params.title) await short_title_el.fill(short_title) except: pass # 定时发布 if params.publish_date: self.report_progress(70, "设置定时发布...") await self.set_schedule_time(params.publish_date) self.report_progress(80, "正在发布...") # 点击发布 - 参考 matrix for i in range(30): try: # 参考 matrix: div.form-btns button:has-text("发表") publish_btn = self.page.locator('div.form-btns button:has-text("发表")') if await publish_btn.count(): print(f"[{self.platform_name}] 点击发布按钮...") await publish_btn.click() # 等待跳转到作品列表页面 - 参考 matrix await self.page.wait_for_url( "https://channels.weixin.qq.com/platform/post/list", timeout=10000 ) self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=self.page.url, status='success' ) except Exception as e: current_url = self.page.url if "https://channels.weixin.qq.com/platform/post/list" in current_url: self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=current_url, status='success' ) else: print(f"[{self.platform_name}] 视频正在发布中... 
{i+1}/30, URL: {current_url}") await asyncio.sleep(1) # 发布超时 screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="发布超时,请检查发布状态", screenshot_base64=screenshot_base64, page_url=page_url, status='need_action' ) async def _get_works_fallback_dom(self, page_size: int) -> tuple: """API 失败时从当前页面 DOM 抓取作品列表(兼容新账号/不同入口)""" works: List[WorkItem] = [] total = 0 has_more = False try: for selector in ["div.post-feed-item", "[class*='post-feed']", "[class*='feed-item']", "div[class*='post']"]: try: await self.page.wait_for_selector(selector, timeout=8000) break except Exception: continue post_items = self.page.locator("div.post-feed-item") item_count = await post_items.count() if item_count == 0: post_items = self.page.locator("[class*='post-feed']") item_count = await post_items.count() for i in range(min(item_count, page_size)): try: item = post_items.nth(i) cover_el = item.locator("div.media img.thumb").first cover_url = await cover_el.get_attribute("src") or "" if await cover_el.count() > 0 else "" if not cover_url: cover_el = item.locator("img").first cover_url = await cover_el.get_attribute("src") or "" if await cover_el.count() > 0 else "" title_el = item.locator("div.post-title").first title = (await title_el.text_content() or "").strip() if await title_el.count() > 0 else "" time_el = item.locator("div.post-time span").first publish_time = (await time_el.text_content() or "").strip() if await time_el.count() > 0 else "" play_count = like_count = comment_count = share_count = collect_count = 0 data_items = item.locator("div.post-data div.data-item") for j in range(await data_items.count()): data_item = data_items.nth(j) count_text = (await data_item.locator("span.count").text_content() or "0").strip() if await data_item.locator("span.weui-icon-outlined-eyes-on").count() > 0: play_count = self._parse_count(count_text) elif await 
data_item.locator("span.weui-icon-outlined-like").count() > 0: like_count = self._parse_count(count_text) elif await data_item.locator("span.weui-icon-outlined-comment").count() > 0: comment_count = self._parse_count(count_text) elif await data_item.locator("use[xlink\\:href='#icon-share']").count() > 0: share_count = self._parse_count(count_text) elif await data_item.locator("use[xlink\\:href='#icon-thumb']").count() > 0: collect_count = self._parse_count(count_text) work_id = f"weixin_{i}_{hash(title)}_{hash(publish_time)}" works.append(WorkItem( work_id=work_id, title=title or "无标题", cover_url=cover_url, duration=0, status="published", publish_time=publish_time, play_count=play_count, like_count=like_count, comment_count=comment_count, share_count=share_count, collect_count=collect_count, )) except Exception as e: print(f"[{self.platform_name}] DOM 解析作品 {i} 失败: {e}", flush=True) continue total = len(works) has_more = item_count > page_size print(f"[{self.platform_name}] DOM 回退获取 {len(works)} 条", flush=True) except Exception as e: print(f"[{self.platform_name}] DOM 回退失败: {e}", flush=True) return (works, total, has_more, "") async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """获取视频号作品列表(调用 post_list 接口) page: 页码从 0 开始,或上一页返回的 rawKeyBuff/lastBuff 字符串 """ # 分页:首页 currentPage=1/rawKeyBuff=null,下一页用 currentPage 递增或 rawKeyBuff if page is None or page == "" or (isinstance(page, int) and page == 0): current_page = 1 raw_key_buff = None elif isinstance(page, int): current_page = page + 1 raw_key_buff = None else: current_page = 1 raw_key_buff = str(page) ts_ms = str(int(time.time() * 1000)) print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品列表 currentPage={current_page}, pageSize={page_size}, rawKeyBuff={raw_key_buff[:40] if raw_key_buff else 'null'}...") print(f"{'='*60}") works: List[WorkItem] = [] total = 0 has_more = False next_page = "" try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await 
self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") await self.page.goto("https://channels.weixin.qq.com/platform/post/list", timeout=30000) await asyncio.sleep(3) current_url = self.page.url if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") api_url = "https://channels.weixin.qq.com/micro/content/cgi-bin/mmfinderassistant-bin/post/post_list" req_body = { "pageSize": page_size, "currentPage": current_page, "userpageType": 11, "stickyOrder": True, "timestamp": ts_ms, "_log_finder_uin": "", "_log_finder_id": "", "rawKeyBuff": raw_key_buff, "pluginSessionId": None, "scene": 7, "reqScene": 7, } body_str = json.dumps(req_body) response = await self.page.evaluate(""" async ([url, bodyStr]) => { try { const resp = await fetch(url, { method: 'POST', credentials: 'include', headers: { 'Content-Type': 'application/json', 'Accept': '*/*', 'Referer': 'https://channels.weixin.qq.com/platform/post/list' }, body: bodyStr }); return await resp.json(); } catch (e) { return { error: e.toString() }; } } """, [api_url, body_str]) is_first_page = current_page == 1 and raw_key_buff is None if response.get("error"): print(f"[{self.platform_name}] API 请求失败: {response.get('error')}", flush=True) if is_first_page: works, total, has_more, next_page = await self._get_works_fallback_dom(page_size) if works: return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page) return WorksResult(success=False, platform=self.platform_name, error=response.get("error", "API 请求失败")) err_code = response.get("errCode", -1) if err_code != 0: err_msg = response.get("errMsg", "unknown") print(f"[{self.platform_name}] API errCode={err_code}, errMsg={err_msg}, 完整响应(前800字): {json.dumps(response, ensure_ascii=False)[:800]}", flush=True) if is_first_page: works, total, has_more, next_page = await self._get_works_fallback_dom(page_size) if works: return WorksResult(success=True, 
platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page) return WorksResult(success=False, platform=self.platform_name, error=f"errCode={err_code}, errMsg={err_msg}") data = response.get("data") or {} raw_list = data.get("list") or [] total = int(data.get("totalCount") or 0) has_more = bool(data.get("continueFlag", False)) next_page = (data.get("lastBuff") or "").strip() print(f"[{self.platform_name}] API 响应: list_len={len(raw_list)}, totalCount={total}, continueFlag={has_more}, lastBuff={next_page[:50] if next_page else ''}...") if is_first_page and len(raw_list) == 0: works_fb, total_fb, has_more_fb, _ = await self._get_works_fallback_dom(page_size) if works_fb: return WorksResult(success=True, platform=self.platform_name, works=works_fb, total=total_fb, has_more=has_more_fb, next_page="") for item in raw_list: try: # 存 works.platform_video_id 统一用 post_list 接口回参中的 exportId(如 export/xxx) work_id = str(item.get("exportId") or item.get("objectId") or item.get("id") or "").strip() if not work_id: work_id = f"weixin_{hash(item.get('createTime',0))}_{hash(item.get('desc', {}).get('description',''))}" desc = item.get("desc") or {} title = (desc.get("description") or "").strip() or "无标题" cover_url = "" duration = 0 media_list = desc.get("media") or [] if media_list and isinstance(media_list[0], dict): m = media_list[0] cover_url = (m.get("coverUrl") or m.get("thumbUrl") or "").strip() duration = int(m.get("videoPlayLen") or 0) create_ts = item.get("createTime") or 0 if isinstance(create_ts, (int, float)) and create_ts: publish_time = datetime.fromtimestamp(create_ts).strftime("%Y-%m-%d %H:%M:%S") else: publish_time = str(create_ts) if create_ts else "" read_count = int(item.get("readCount") or 0) like_count = int(item.get("likeCount") or 0) comment_count = int(item.get("commentCount") or 0) forward_count = int(item.get("forwardCount") or 0) fav_count = int(item.get("favCount") or 0) works.append(WorkItem( work_id=work_id, 
title=title, cover_url=cover_url, duration=duration, status="published", publish_time=publish_time, play_count=read_count, like_count=like_count, comment_count=comment_count, share_count=forward_count, collect_count=fav_count, )) except Exception as e: print(f"[{self.platform_name}] 解析作品项失败: {e}", flush=True) continue if total == 0 and works: total = len(works) print(f"[{self.platform_name}] 本页获取 {len(works)} 条,totalCount={total}, next_page={bool(next_page)}") except Exception as e: import traceback traceback.print_exc() return WorksResult(success=False, platform=self.platform_name, error=str(e)) return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page) async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """ 获取视频号作品评论(完全参考 get_weixin_work_comments.py 的接口监听逻辑) 支持递归提取二级评论,正确处理 parent_comment_id """ print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品评论") print(f"[{self.platform_name}] work_id={work_id}") print(f"{'='*60}") comments: List[CommentItem] = [] total = 0 has_more = False try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问评论管理页面 print(f"[{self.platform_name}] 正在打开评论页面...") await self.page.goto("https://channels.weixin.qq.com/platform/interaction/comment", timeout=30000) await asyncio.sleep(2) # 检查登录状态 current_url = self.page.url if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # === 步骤1: 监听 post_list 接口获取作品列表 === posts = [] try: async with self.page.expect_response( lambda res: "/post/post_list" in res.url, timeout=20000 ) as post_resp_info: await self.page.wait_for_selector('.scroll-list .comment-feed-wrap', timeout=15000) post_resp = await post_resp_info.value post_data = await post_resp.json() if post_data.get("errCode") == 0: posts = post_data.get("data", {}).get("list", []) print(f"[{self.platform_name}] ✅ 
获取 {len(posts)} 个作品") else: err_msg = post_data.get("errMsg", "未知错误") print(f"[{self.platform_name}] ❌ post_list 业务错误: {err_msg}") return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"post_list 业务错误: {err_msg}" ) except Exception as e: print(f"[{self.platform_name}] ❌ 获取 post_list 失败: {e}") return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"获取 post_list 失败: {e}" ) # === 步骤2: 在 DOM 中查找目标作品 === feed_wraps = await self.page.query_selector_all('.scroll-list .comment-feed-wrap') target_feed = None target_post = None target_index = -1 for i, feed in enumerate(feed_wraps): if i >= len(posts): break post = posts[i] object_nonce = post.get("objectNonce", "") post_work_id = post.get("objectId", "") or object_nonce # 匹配 work_id(支持 objectId 或 objectNonce 匹配) if work_id in [post_work_id, object_nonce] or post_work_id in work_id or object_nonce in work_id: target_feed = feed target_post = post target_index = i work_title = post.get("desc", {}).get("description", "无标题") print(f"[{self.platform_name}] ✅ 找到目标作品: {work_title}") continue if not target_feed or not target_post: print(f"[{self.platform_name}] ❌ 未找到 work_id={work_id} 对应的作品") return CommentsResult( success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False ) # 准备作品信息(用于递归函数) object_nonce = target_post.get("objectNonce", f"nonce_{target_index}") work_title = target_post.get("desc", {}).get("description", f"作品{target_index+1}") work_info = { "work_id": object_nonce, "work_title": work_title } # === 步骤3: 点击作品触发 comment_list 接口 === content_wrap = await target_feed.query_selector('.feed-content') or target_feed try: async with self.page.expect_response( lambda res: "/comment/comment_list" in res.url, timeout=15000 ) as comment_resp_info: await content_wrap.click() await asyncio.sleep(0.8) comment_resp = await comment_resp_info.value comment_data = await comment_resp.json() if comment_data.get("errCode") != 0: 
err_msg = comment_data.get("errMsg", "未知错误") print(f"[{self.platform_name}] ❌ 评论接口错误: {err_msg}") return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"评论接口错误: {err_msg}" ) raw_comments = comment_data.get("data", {}).get("comment", []) total = comment_data.get("data", {}).get("totalCount", len(raw_comments)) print(f"[{self.platform_name}] 📊 原始评论数: {len(raw_comments)}, 总数: {total}") # === 步骤4: 递归提取所有评论(含子评论)=== extracted = self._extract_comments(raw_comments, parent_id="", work_info=work_info) # === 步骤5: 转换为 CommentItem 列表(保留 weixin.py 的数据结构)=== for c in extracted: # 使用接口返回的 comment_id comment_id = c.get("comment_id", "") parent_comment_id = c.get("parent_comment_id", "") # 构建 CommentItem(保留原有数据结构用于数据库入库) comment_item = CommentItem( comment_id=comment_id, parent_comment_id=parent_comment_id, work_id=work_id, content=c.get("content", ""), author_id=c.get("username", ""), # 使用 username 作为 author_id author_name=c.get("nickname", ""), author_avatar=c.get("avatar", ""), like_count=c.get("like_count", 0), reply_count=0, create_time=c.get("create_time", ""), ) # 添加扩展字段(用于数据库存储和后续处理) # comment_item.parent_comment_id = c.get("parent_comment_id", "") comment_item.is_author = c.get("is_author", False) comment_item.create_time_unix = c.get("create_time_unix", 0) comment_item.work_title = c.get("work_title", "") print(comment_item) comments.append(comment_item) # 打印日志 author_tag = " 👤(作者)" if c.get("is_author") else "" parent_tag = f" [回复: {c.get('parent_comment_id', '')}]" if c.get("parent_comment_id") else "" print(f"[{self.platform_name}] - [{c.get('nickname', '')}] {c.get('content', '')[:30]}... 
" f"({c.get('create_time', '')}){author_tag}{parent_tag}") # 判断是否还有更多(优先使用接口返回的 continueFlag,否则根据数量判断) has_more = comment_data.get("data", {}).get("continueFlag", False) or len(extracted) < total print(f"[{self.platform_name}] ✅ 共提取 {len(comments)} 条评论(含子评论)") except Exception as e: print(f"[{self.platform_name}] ❌ 获取评论失败: {e}") import traceback traceback.print_exc() return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"获取评论失败: {e}" ) except Exception as e: import traceback traceback.print_exc() return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=str(e) ) return CommentsResult( success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more ) def _extract_comments(self, comment_list: list, parent_id: str = "", work_info: dict = None) -> list: """ 递归提取一级和二级评论(完全参考 get_weixin_work_comments.py 的 extract_comments 函数) Args: comment_list: 评论列表(原始接口数据) parent_id: 父评论ID(一级评论为空字符串"",二级评论为父级评论ID) work_info: 作品信息字典 Returns: list: 扁平化的评论列表,包含一级和二级评论 """ result = [] # 获取当前用户 username(用于判断是否为作者) # 优先从环境变量获取,也可通过其他方式配置 my_username = getattr(self, 'my_username', '') or os.environ.get('WEIXIN_MY_USERNAME', '') for cmt in comment_list: # 处理时间戳 create_ts = int(cmt.get("commentCreatetime", 0) or 0) readable_time = ( datetime.fromtimestamp(create_ts).strftime('%Y-%m-%d %H:%M:%S') if create_ts > 0 else "" ) # 判断是否作者(如果配置了 my_username) username = cmt.get("username", "") or "" is_author = (my_username != "") and (username == my_username) # 构建评论条目 - 完全参考 get_weixin_work_comments.py 的字段 entry = { "work_id": work_info.get("work_id", "") if work_info else "", "work_title": work_info.get("work_title", "") if work_info else "", "comment_id": cmt.get("commentId"), "parent_comment_id": parent_id, # 关键:一级评论为空字符串"",二级评论为父评论ID "username": username, "nickname": cmt.get("commentNickname", ""), "avatar": cmt.get("commentHeadurl", ""), "content": cmt.get("commentContent", ""), 
"create_time_unix": create_ts, "create_time": readable_time, "is_author": is_author, "like_count": cmt.get("commentLikeCount", 0) or 0 } result.append(entry) # 递归处理二级评论(levelTwoComment) # 关键:二级评论的 parent_id 应该是当前这条评论的 comment_id level_two = cmt.get("levelTwoComment", []) or [] if level_two and isinstance(level_two, list) and len(level_two) > 0: # 当前评论的 ID 作为其子评论的 parent_id current_comment_id = cmt.get("commentId", "") result.extend( self._extract_comments(level_two, parent_id=current_comment_id, work_info=work_info) ) return result async def auto_reply_private_messages(self, cookies: str) -> dict: """自动回复私信 - 集成自 pw3.py""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始自动回复私信") print(f"{'='*60}") try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问私信页面 await self.page.goto("https://channels.weixin.qq.com/platform/private_msg", timeout=30000) await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 等待私信页面加载(使用多个选择器容错) try: await self.page.wait_for_selector('.private-msg-list-header', timeout=15000) except: # 尝试其他选择器 try: await self.page.wait_for_selector('.weui-desktop-tab__navs__inner', timeout=10000) print(f"[{self.platform_name}] 使用备用选择器加载成功") except: # 截图调试 screenshot_path = f"weixin_private_msg_{int(asyncio.get_event_loop().time())}.png" await self.page.screenshot(path=screenshot_path) print(f"[{self.platform_name}] 页面加载失败,截图: {screenshot_path}") raise Exception(f"私信页面加载超时,当前 URL: {current_url}") print(f"[{self.platform_name}] 私信页面加载完成") # 处理两个 tab total_replied = 0 for tab_name in ["打招呼消息", "私信"]: replied_count = await self._process_tab_sessions(tab_name) total_replied += replied_count print(f"[{self.platform_name}] 自动回复完成,共回复 {total_replied} 条消息") return { 'success': True, 'platform': self.platform_name, 
'replied_count': total_replied, 'message': f'成功回复 {total_replied} 条私信' } except Exception as e: import traceback traceback.print_exc() return { 'success': False, 'platform': self.platform_name, 'error': str(e) } async def _process_tab_sessions(self, tab_name: str) -> int: """处理指定 tab 下的所有会话""" print(f"\n🔄 正在处理「{tab_name}」中的所有会话...") if not self.page: return 0 replied_count = 0 try: # 点击 tab if tab_name == "私信": tab_link = self.page.locator('.weui-desktop-tab__navs__inner li').first.locator('a') elif tab_name == "打招呼消息": tab_link = self.page.locator('.weui-desktop-tab__navs__inner li').nth(1).locator('a') else: return 0 if await tab_link.is_visible(): await tab_link.click() print(f" ➤ 已点击「{tab_name}」tab") else: print(f" ❌ 「{tab_name}」tab 不可见") return 0 # 等待会话列表加载 try: await self.page.wait_for_function(""" () => { const hasSession = document.querySelectorAll('.session-wrap').length > 0; const hasEmpty = !!document.querySelector('.empty-text'); return hasSession || hasEmpty; } """, timeout=8000) print(" ✅ 会话列表区域已加载") except: print(" ⚠️ 等待会话列表超时,继续尝试读取...") # 获取会话 session_wraps = self.page.locator('.session-wrap') session_count = await session_wraps.count() print(f" 💬 共找到 {session_count} 个会话") if session_count == 0: return 0 # 遍历每个会话 for idx in range(session_count): try: current_sessions = self.page.locator('.session-wrap') if idx >= await current_sessions.count(): break session = current_sessions.nth(idx) user_name = await session.locator('.name').inner_text() last_preview = await session.locator('.feed-info').inner_text() print(f"\n ➤ [{idx+1}/{session_count}] 正在处理: {user_name} | 最后消息: {last_preview}") await session.click() await asyncio.sleep(2) # 提取聊天历史 history = await self._extract_chat_history() need_reply = (not history) or (not history[-1]["is_author"]) if need_reply: reply_text = await self._generate_reply_with_ai(history) if reply_text=="": reply_text = self._generate_reply(history) # # 生成回复 # if history and history[-1]["is_author"]: # reply_text = await 
self._generate_reply_with_ai(history) # else: # reply_text = self._generate_reply(history) if reply_text: print(f" 📝 回复内容: {reply_text}") try: textarea = self.page.locator('.edit_area').first send_btn = self.page.locator('button:has-text("发送")').first if await textarea.is_visible() and await send_btn.is_visible(): await textarea.fill(reply_text) await asyncio.sleep(0.5) await send_btn.click() print(" ✅ 已发送") replied_count += 1 await asyncio.sleep(1.5) else: print(" ❌ 输入框或发送按钮不可见") except Exception as e: print(f" ❌ 发送失败: {e}") else: print(" ➤ 无需回复") else: print(" ➤ 最后一条是我发的,跳过回复") except Exception as e: print(f" ❌ 处理会话 {idx+1} 时出错: {e}") continue except Exception as e: print(f"❌ 处理「{tab_name}」失败: {e}") return replied_count async def _extract_chat_history(self) -> list: """精准提取聊天记录,区分作者(自己)和用户""" if not self.page: return [] history = [] message_wrappers = self.page.locator('.session-content-wrapper > div:not(.footer) > .text-wrapper') count = await message_wrappers.count() for i in range(count): try: wrapper = message_wrappers.nth(i) # 判断方向 is_right = await wrapper.locator('.content-right').count() > 0 is_left = await wrapper.locator('.content-left').count() > 0 if not (is_left or is_right): continue # 提取消息文本 pre_el = wrapper.locator('pre.message-plain') content = '' if await pre_el.count() > 0: content = await pre_el.inner_text() content = content.strip() if not content: continue # 获取头像 avatar_img = wrapper.locator('.avatar').first avatar_src = '' if await avatar_img.count() > 0: avatar_src = await avatar_img.get_attribute("src") or '' # 右侧 = 作者(自己) is_author = is_right # 获取用户名 if is_left: name_el = wrapper.locator('.profile .name') author_name = '用户' if await name_el.count() > 0: author_name = await name_el.inner_text() else: author_name = "我" history.append({ "author": author_name, "content": content, "is_author": is_author, "avatar": avatar_src }) except Exception as e: print(f" ⚠️ 解析第 {i+1} 条消息失败: {e}") continue return history async def 
_generate_reply_with_ai(self, chat_history: list) -> str: """使用 AI 生成智能回复""" import requests import json try: # 获取 AI 配置 ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_model = os.environ.get('AI_MODEL', 'qwen-plus') if not ai_api_key: print("⚠️ 未配置 AI API Key,使用规则回复") return self._generate_reply(chat_history) # 构建对话上下文 messages = [{"role": "system", "content": "你是一个友好的微信视频号创作者助手,负责回复粉丝私信。请保持简洁、友好、专业的语气。回复长度不超过20字。"}] for msg in chat_history: role = "assistant" if msg["is_author"] else "user" messages.append({ "role": role, "content": msg["content"] }) # 调用 AI API headers = { 'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json' } payload = { "model": ai_model, "messages": messages, "max_tokens": 150, "temperature": 0.8 } print(" 🤖 正在调用 AI 生成回复...") response = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30 ) if response.status_code != 200: print(f" ⚠️ AI API 返回错误 {response.status_code},使用规则回复") return self._generate_reply(chat_history) result = response.json() ai_reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip() if ai_reply: print(f" ✅ AI 生成回复: {ai_reply}") return ai_reply else: print(" ⚠️ AI 返回空内容,使用规则回复") return self._generate_reply(chat_history) except Exception as e: print(f" ⚠️ AI 回复生成失败: {e},使用规则回复") return self._generate_reply(chat_history) def _generate_reply(self, chat_history: list) -> str: """根据完整聊天历史生成回复(规则回复方式)""" if not chat_history: return "你好!感谢联系~" # 检查最后一条是否是作者发的 if chat_history[-1]["is_author"]: return "" # 不回复 # 找最后一条用户消息 last_user_msg = chat_history[-1]["content"] # 简单规则回复 if "谢谢" in last_user_msg or "感谢" in last_user_msg: return "不客气!欢迎常来交流~" elif "你好" in last_user_msg or "在吗" in last_user_msg: return "你好!请问有什么可以帮您的?" elif "视频" in last_user_msg or "怎么拍" in last_user_msg: return "视频是用手机拍摄的,注意光线和稳定哦!" 
else: return "收到!我会认真阅读您的留言~" ================================================================================ 文件: server\python\platforms\xiaohongshu.py ================================================================================ # -*- coding: utf-8 -*- """ 小红书视频发布器 参考: matrix/xhs_uploader/main.py 使用 xhs SDK API 方式发布,更稳定 """ import asyncio import os import sys import time import concurrent.futures from pathlib import Path from typing import List from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) from playwright.async_api import async_playwright stored_cookies = None # 添加 matrix 项目路径,用于导入签名脚本 MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix" sys.path.insert(0, str(MATRIX_PATH)) # 尝试导入 xhs SDK try: from xhs import XhsClient XHS_SDK_AVAILABLE = True except ImportError: print("[Warning] xhs 库未安装,请运行: pip install xhs") XhsClient = None XHS_SDK_AVAILABLE = False # 签名脚本路径 STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js" _xhs_sign_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) class XiaohongshuPublisher(BasePublisher): """ 小红书视频发布器 优先使用 xhs SDK API 方式发布 """ platform_name = "xiaohongshu" login_url = "https://creator.xiaohongshu.com/" publish_url = "https://creator.xiaohongshu.com/publish/publish" cookie_domain = ".xiaohongshu.com" async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""): """获取小红书 API 签名""" from playwright.async_api import async_playwright try: async with async_playwright() as playwright: browser = await playwright.chromium.launch(headless=True) browser_context = await browser.new_context() if STEALTH_JS_PATH.exists(): await browser_context.add_init_script(path=str(STEALTH_JS_PATH)) page = await browser_context.new_page() await page.goto("https://www.xiaohongshu.com") await asyncio.sleep(1) await page.reload() await asyncio.sleep(1) if a1: await browser_context.add_cookies([ {'name': 'a1', 'value': a1, 
'domain': ".xiaohongshu.com", 'path': "/"} ]) await page.reload() await asyncio.sleep(0.5) encrypt_params = await page.evaluate( "([url, data]) => window._webmsxyw(url, data)", [uri, data] ) await browser_context.close() await browser.close() return { "x-s": encrypt_params["X-s"], "x-t": str(encrypt_params["X-t"]) } except Exception as e: import traceback traceback.print_exc() raise Exception(f"签名失败: {e}") def sign_sync(self, uri, data=None, a1="", web_session=""): """ 同步签名函数,供 XhsClient 使用。 注意:发布流程运行在 asyncio 事件循环中(通过 asyncio.run 启动)。 XhsClient 以同步方式调用 sign 回调,但我们需要使用 Playwright Async API 进行签名。 因此当处于事件循环中时,将签名逻辑放到独立线程里执行 asyncio.run。 """ def run_async_sign(): return asyncio.run(self.get_sign(uri, data=data, a1=a1, web_session=web_session)) try: asyncio.get_running_loop() future = _xhs_sign_executor.submit(run_async_sign) return future.result(timeout=120) except RuntimeError: return run_async_sign() async def publish_via_api(self, cookies: str, params: PublishParams) -> PublishResult: """通过 API 发布视频""" if not XHS_SDK_AVAILABLE: raise Exception("xhs SDK 未安装,请运行: pip install xhs") self.report_progress(10, "正在通过 API 发布...") print(f"[{self.platform_name}] 使用 XHS SDK API 发布...") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") # 转换 cookie 格式 cookie_list = self.parse_cookies(cookies) cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies print(f"[{self.platform_name}] Cookie 长度: {len(cookie_string)}") self.report_progress(20, "正在上传视频...") async def ensure_valid_cookie_for_sdk() -> str | None: await self.init_browser() cookie_list_for_browser = self.parse_cookies(cookie_string) await self.set_cookies(cookie_list_for_browser) if not self.page or not self.context: return None await self.page.goto("https://creator.xiaohongshu.com/new/home", wait_until="domcontentloaded", timeout=60000) await asyncio.sleep(2) current_url = (self.page.url or '').lower() if 'login' in current_url or 
'passport' in current_url: if self.headless: return None waited = 0 while waited < 180: current_url = (self.page.url or '').lower() if 'login' not in current_url and 'passport' not in current_url and 'creator.xiaohongshu.com' in current_url: break await asyncio.sleep(2) waited += 2 current_url = (self.page.url or '').lower() if 'login' in current_url or 'passport' in current_url: return None cookies_after = await self.context.cookies() try: await self.sync_cookies_to_node(cookies_after) except Exception: pass refreshed_cookie_str = self.cookies_to_string(cookies_after) return refreshed_cookie_str or None def call_create_video_note(sdk_cookie_str: str): xhs_client = XhsClient(sdk_cookie_str, sign=self.sign_sync) return xhs_client.create_video_note( title=params.title, desc=params.description or params.title, topics=params.tags or [], post_time=params.publish_date.strftime("%Y-%m-%d %H:%M:%S") if params.publish_date else None, video_path=params.video_path, cover_path=params.cover_path if params.cover_path and os.path.exists(params.cover_path) else None ) print(f"[{self.platform_name}] 开始调用 create_video_note...") try: result = call_create_video_note(cookie_string) print(f"[{self.platform_name}] SDK 返回结果: {result}") except Exception as e: err_text = str(e) if '无登录信息' in err_text or '"code": -100' in err_text or "'code': -100" in err_text: self.report_progress(15, "登录信息失效,尝试刷新登录信息...") refreshed = await ensure_valid_cookie_for_sdk() if not refreshed: screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() if hasattr(self, 'get_page_url') else (self.page.url if self.page else "") return PublishResult( success=False, platform=self.platform_name, error="登录已过期,请使用有头浏览器重新登录", screenshot_base64=screenshot_base64, page_url=page_url, status='need_captcha', need_captcha=True, captcha_type='login' ) try: result = call_create_video_note(refreshed) print(f"[{self.platform_name}] SDK 重试返回结果: {result}") except Exception as e2: import traceback 
traceback.print_exc() raise Exception(f"XHS SDK 发布失败: {e2}") else: import traceback traceback.print_exc() print(f"[{self.platform_name}] SDK 调用失败: {e}") raise Exception(f"XHS SDK 发布失败: {e}") # 验证返回结果 if not result: raise Exception("XHS SDK 返回空结果") # 检查是否有错误 if isinstance(result, dict): if result.get("code") and result.get("code") != 0: raise Exception(f"发布失败: {result.get('msg', '未知错误')}") if result.get("success") == False: raise Exception(f"发布失败: {result.get('msg', result.get('error', '未知错误'))}") note_id = result.get("note_id", "") if isinstance(result, dict) else "" video_url = result.get("url", "") if isinstance(result, dict) else "" if not note_id: print(f"[{self.platform_name}] 警告: 未获取到 note_id,返回结果: {result}") self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 发布成功! note_id={note_id}, url={video_url}") return PublishResult( success=True, platform=self.platform_name, video_id=note_id, video_url=video_url, message="发布成功" ) async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到小红书 - 参考 matrix/xhs_uploader/main.py""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"[{self.platform_name}] XHS SDK 可用: {XHS_SDK_AVAILABLE}") print(f"{'='*60}") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(5, "正在准备发布...") if isinstance(getattr(self, 'proxy_config', None), dict) and self.proxy_config.get('server'): print(f"[{self.platform_name}] 检测到代理配置,跳过 SDK 方式,使用 Playwright 走代理发布", flush=True) return await self.publish_via_playwright(cookies, params) # 参考 matrix: 优先使用 XHS SDK API 方式发布(更稳定) if XHS_SDK_AVAILABLE: try: print(f"[{self.platform_name}] 尝试使用 XHS SDK API 发布...") result = await 
self.publish_via_api(cookies, params) print(f"[{self.platform_name}] API 发布完成: success={result.success}") # 如果 API 返回成功,直接返回 if result.success: return result # 如果 API 返回失败但有具体错误,也返回 if result.error and "请刷新" not in result.error: return result # 其他情况尝试 Playwright 方式 print(f"[{self.platform_name}] API 方式未成功,尝试 Playwright...") except Exception as e: err_text = str(e) if '登录已过期' in err_text or '无登录信息' in err_text: print(f"[{self.platform_name}] API 登录失效,切换到 Playwright 方式...", flush=True) else: import traceback traceback.print_exc() print(f"[{self.platform_name}] API 发布失败: {e}") print(f"[{self.platform_name}] 尝试使用 Playwright 方式...") # 使用 Playwright 方式发布 print(f"[{self.platform_name}] 使用 Playwright 方式发布...") return await self.publish_via_playwright(cookies, params) async def publish_via_playwright(self, cookies: str, params: PublishParams) -> PublishResult: """通过 Playwright 发布视频""" self.report_progress(10, "正在初始化浏览器...") print(f"[{self.platform_name}] Playwright 方式开始...") await self.init_browser() cookie_list = self.parse_cookies(cookies) print(f"[{self.platform_name}] 设置 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") self.report_progress(15, "正在打开发布页面...") # 直接访问视频发布页面 publish_url = "https://creator.xiaohongshu.com/publish/publish?source=official" print(f"[{self.platform_name}] 打开页面: {publish_url}") await self.page.goto(publish_url) await asyncio.sleep(3) current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") async def wait_for_manual_login(timeout_seconds: int = 300) -> bool: if not self.page: return False self.report_progress(12, "检测到需要登录,请在浏览器窗口完成登录...") try: await self.page.bring_to_front() except: pass waited = 0 while waited < timeout_seconds: try: url = self.page.url if "login" not in url and "passport" not in url and "creator.xiaohongshu.com" in url: return True await asyncio.sleep(2) waited += 2 except: await asyncio.sleep(2) waited += 2 return False async 
def wait_for_manual_captcha(timeout_seconds: int = 180) -> bool: waited = 0 while waited < timeout_seconds: try: ai_captcha = await self.ai_check_captcha() if not ai_captcha.get("has_captcha"): return True except: pass await asyncio.sleep(3) waited += 3 return False # 检查登录状态 if "login" in current_url or "passport" in current_url: if not self.headless: logged_in = await wait_for_manual_login() if logged_in: try: if self.context: cookies_after = await self.context.cookies() await self.sync_cookies_to_node(cookies_after) except: pass await self.page.goto(publish_url) await asyncio.sleep(3) current_url = self.page.url else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="需要登录:请在浏览器窗口完成登录后重试", screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha', need_captcha=True, captcha_type='login' ) else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="登录已过期,请重新登录", screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha', need_captcha=True, captcha_type='login' ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True) if not self.headless: solved = await wait_for_manual_captcha() if solved: try: if self.context: cookies_after = await self.context.cookies() await self.sync_cookies_to_node(cookies_after) except: pass else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"需要验证码:请在浏览器窗口完成验证后重试", screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha', need_captcha=True, captcha_type=ai_captcha['captcha_type'] ) else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, 
error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha', need_captcha=True, captcha_type=ai_captcha['captcha_type'] ) self.report_progress(20, "正在上传视频...") # 等待页面加载 await asyncio.sleep(2) # 上传视频 upload_triggered = False # 方法1: 直接设置隐藏的 file input print(f"[{self.platform_name}] 尝试方法1: 设置 file input") file_inputs = self.page.locator('input[type="file"]') input_count = await file_inputs.count() print(f"[{self.platform_name}] 找到 {input_count} 个 file input") if input_count > 0: # 找到接受视频的 input for i in range(input_count): input_el = file_inputs.nth(i) accept = await input_el.get_attribute('accept') or '' print(f"[{self.platform_name}] Input {i} accept: {accept}") if 'video' in accept or '*' in accept or not accept: await input_el.set_input_files(params.video_path) upload_triggered = True print(f"[{self.platform_name}] 视频文件已设置到 input {i}") break # 方法2: 点击上传区域触发文件选择器 if not upload_triggered: print(f"[{self.platform_name}] 尝试方法2: 点击上传区域") try: upload_area = self.page.locator('[class*="upload-wrapper"], [class*="upload-area"], .upload-input').first if await upload_area.count() > 0: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await upload_area.click() file_chooser = await fc_info.value await file_chooser.set_files(params.video_path) upload_triggered = True print(f"[{self.platform_name}] 通过点击上传区域上传成功") except Exception as e: print(f"[{self.platform_name}] 方法2失败: {e}") if not upload_triggered: screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="无法上传视频文件", screenshot_base64=screenshot_base64, page_url=page_url, status='need_action' ) self.report_progress(40, "等待视频上传完成...") print(f"[{self.platform_name}] 等待视频上传和处理...") # 等待上传完成(检测页面变化) upload_complete = False for i in range(60): # 最多等待3分钟 await asyncio.sleep(3) # 检查是否有标题输入框(上传完成后出现) title_input_count = await 
self.page.locator('input[placeholder*="标题"], input[placeholder*="填写标题"]').count() # 或者检查编辑器区域 editor_count = await self.page.locator('[class*="ql-editor"], [contenteditable="true"]').count() # 检查发布按钮是否可见 publish_btn_count = await self.page.locator('.publishBtn, button:has-text("发布")').count() print(f"[{self.platform_name}] 检测 {i+1}: 标题框={title_input_count}, 编辑器={editor_count}, 发布按钮={publish_btn_count}") if title_input_count > 0 or (editor_count > 0 and publish_btn_count > 0): upload_complete = True print(f"[{self.platform_name}] 视频上传完成!") break if not upload_complete: screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="视频上传超时", screenshot_base64=screenshot_base64, page_url=page_url, status='need_action' ) await asyncio.sleep(2) self.report_progress(60, "正在填写笔记信息...") print(f"[{self.platform_name}] 填写标题: {params.title[:20]}") # 填写标题 title_filled = False title_selectors = [ 'input[placeholder*="标题"]', 'input[placeholder*="填写标题"]', '[class*="title"] input', '.c-input_inner', ] for selector in title_selectors: title_input = self.page.locator(selector).first if await title_input.count() > 0: await title_input.click() await title_input.fill('') # 先清空 await title_input.fill(params.title[:20]) title_filled = True print(f"[{self.platform_name}] 标题已填写,使用选择器: {selector}") break if not title_filled: print(f"[{self.platform_name}] 警告: 未找到标题输入框") # 填写描述和标签 if params.description or params.tags: desc_filled = False desc_selectors = [ '[class*="ql-editor"]', '[class*="content-input"] [contenteditable="true"]', '[class*="editor"] [contenteditable="true"]', '.ql-editor', ] for selector in desc_selectors: desc_input = self.page.locator(selector).first if await desc_input.count() > 0: await desc_input.click() await asyncio.sleep(0.5) if params.description: await self.page.keyboard.type(params.description, delay=20) print(f"[{self.platform_name}] 描述已填写") if params.tags: # 添加标签 
await self.page.keyboard.press("Enter") for tag in params.tags[:5]: # 最多5个标签 await self.page.keyboard.type(f"#{tag}", delay=20) await asyncio.sleep(0.3) await self.page.keyboard.press("Space") print(f"[{self.platform_name}] 标签已填写: {params.tags[:5]}") desc_filled = True break if not desc_filled: print(f"[{self.platform_name}] 警告: 未找到描述输入框") await asyncio.sleep(2) self.report_progress(80, "正在发布...") await asyncio.sleep(2) # 滚动到页面底部确保发布按钮可见 await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)") await asyncio.sleep(1) print(f"[{self.platform_name}] 查找发布按钮...") # 点击发布 publish_selectors = [ 'button.publishBtn', '.publishBtn', 'button.d-button.red', 'button:has-text("发布"):not(:has-text("定时发布"))', '[class*="publish"][class*="btn"]', ] publish_clicked = False for selector in publish_selectors: try: btn = self.page.locator(selector).first if await btn.count() > 0: is_visible = await btn.is_visible() is_enabled = await btn.is_enabled() print(f"[{self.platform_name}] 按钮 {selector}: visible={is_visible}, enabled={is_enabled}") if is_visible and is_enabled: box = await btn.bounding_box() if box: print(f"[{self.platform_name}] 点击发布按钮: {selector}, 位置: ({box['x']}, {box['y']})") # 使用真实鼠标点击 await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2) publish_clicked = True break except Exception as e: print(f"[{self.platform_name}] 选择器 {selector} 错误: {e}") if not publish_clicked: try: suggest = await self.ai_suggest_playwright_selector("点击小红书发布按钮") if suggest.get("has_selector") and suggest.get("selector"): sel = suggest.get("selector") btn = self.page.locator(sel).first if await btn.count() > 0 and await btn.is_visible() and await btn.is_enabled(): try: await btn.click() except: box = await btn.bounding_box() if box: await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2) publish_clicked = True except Exception as e: print(f"[{self.platform_name}] AI 点击发布按钮失败: {e}", flush=True) if not publish_clicked: # 保存截图用于调试 
screenshot_path = f"debug_publish_failed_{self.platform_name}.png" await self.page.screenshot(path=screenshot_path, full_page=True) print(f"[{self.platform_name}] 未找到发布按钮,截图保存到: {screenshot_path}") # 打印页面 HTML 结构用于调试 buttons = await self.page.query_selector_all('button') print(f"[{self.platform_name}] 页面上共有 {len(buttons)} 个按钮") for i, btn in enumerate(buttons[:10]): text = await btn.text_content() or '' cls = await btn.get_attribute('class') or '' print(f" 按钮 {i}: text='{text.strip()[:30]}', class='{cls[:50]}'") raise Exception("未找到发布按钮") print(f"[{self.platform_name}] 已点击发布按钮,等待发布完成...") self.report_progress(90, "等待发布结果...") # 等待发布完成(检测 URL 变化或成功提示) publish_success = False refresh_retry = 0 for i in range(20): # 最多等待 20 秒 await asyncio.sleep(1) current_url = self.page.url # 检查是否跳转到发布成功页面或内容管理页面 if "published=true" in current_url or "success" in current_url or "content" in current_url: publish_success = True print(f"[{self.platform_name}] 发布成功! 跳转到: {current_url}") break # 检查是否有成功提示 try: success_msg = await self.page.locator('[class*="success"], .toast-success, [class*="Toast"]').first.is_visible() if success_msg: publish_success = True print(f"[{self.platform_name}] 检测到成功提示!") break except: pass # 检查是否有错误提示 try: error_elements = self.page.locator('[class*="error"], .toast-error, [class*="fail"]') if await error_elements.count() > 0: first_error = error_elements.first if await first_error.is_visible(): error_text = (await first_error.text_content()) or '' error_text = error_text.strip() if error_text: if '请刷新' in error_text and refresh_retry < 3: refresh_retry += 1 print(f"[{self.platform_name}] 检测到临时错误: {error_text},尝试刷新并重试发布({refresh_retry}/3)", flush=True) try: await self.page.reload(wait_until="domcontentloaded") except Exception: pass await asyncio.sleep(2) await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)") await asyncio.sleep(1) republish_clicked = False for selector in publish_selectors: try: btn = self.page.locator(selector).first if 
await btn.count() > 0 and await btn.is_visible() and await btn.is_enabled(): try: await btn.click() except: box = await btn.bounding_box() if box: await self.page.mouse.click(box['x'] + box['width']/2, box['y'] + box['height']/2) republish_clicked = True break except: continue continue screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error=f"发布失败: {error_text}", screenshot_base64=screenshot_base64, page_url=page_url, status='failed' ) except Exception as e: if "发布失败" in str(e): raise # 如果没有明确的成功标志,返回截图供 AI 分析 if not publish_success: final_url = self.page.url print(f"[{self.platform_name}] 发布结果不确定,当前 URL: {final_url}") screenshot_base64 = await self.capture_screenshot() print(f"[{self.platform_name}] 已获取截图供 AI 分析") # 如果 URL 还是发布页面,可能需要继续操作 if "publish/publish" in final_url: return PublishResult( success=False, platform=self.platform_name, error="发布结果待确认,请查看截图", screenshot_base64=screenshot_base64, page_url=final_url, status='need_action' ) self.report_progress(100, "发布完成") print(f"[{self.platform_name}] Playwright 方式发布完成!") screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=True, platform=self.platform_name, message="发布完成", screenshot_base64=screenshot_base64, page_url=page_url, status='success' ) async def get_account_info(self, cookies: str) -> dict: """获取账号信息""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取账号信息") print(f"{'='*60}") captured_info = {} try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 监听个人信息 API async def handle_response(response): nonlocal captured_info if 'api/galaxy/creator/home/personal_info' in response.url: try: json_data = await response.json() print(f"[{self.platform_name}] 捕获个人信息 API", flush=True) if json_data.get('success') or 
json_data.get('code') == 0: data = json_data.get('data', {}) captured_info = { "account_id": f"xhs_{data.get('red_num', '')}", "account_name": data.get('name', ''), "avatar_url": data.get('avatar', ''), "fans_count": data.get('fans_count', 0), "works_count": 0 # 暂时无法直接获取准确的作品数,需要从作品列表获取 } except Exception as e: print(f"[{self.platform_name}] 解析个人信息失败: {e}", flush=True) self.page.on('response', handle_response) # 访问首页 print(f"[{self.platform_name}] 访问创作者首页...", flush=True) await self.page.goto("https://creator.xiaohongshu.com/new/home", wait_until="domcontentloaded") # 等待 API 响应 for _ in range(10): if captured_info: break await asyncio.sleep(1) if not captured_info: print(f"[{self.platform_name}] 未捕获到个人信息,尝试刷新...", flush=True) await self.page.reload() for _ in range(10): if captured_info: break await asyncio.sleep(1) if not captured_info: raise Exception("无法获取账号信息") # 尝试获取作品数(从首页或其他地方) # 或者简单地返回已获取的信息,作品数由 get_works 更新 return { "success": True, **captured_info } except Exception as e: import traceback traceback.print_exc() return { "success": False, "error": str(e) } finally: await self.close_browser() async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """获取小红书作品列表 - 通过直接调用创作者笔记列表 API 获取""" print(f"\n{'='*60}", flush=True) print(f"[{self.platform_name}] 获取作品列表", flush=True) print(f"[{self.platform_name}] page={page}, page_size={page_size}", flush=True) print(f"{'='*60}", flush=True) works: List[WorkItem] = [] total = 0 has_more = False next_page = "" api_page_size = 20 try: await self.init_browser() cookie_list = self.parse_cookies(cookies) # 打印 cookies 信息用于调试 print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies", flush=True) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问笔记管理页面 - 页面会自动发起 API 请求 print(f"[{self.platform_name}] 访问笔记管理页面...", flush=True) try: await self.page.goto("https://creator.xiaohongshu.com/new/note-manager", wait_until="domcontentloaded", 
timeout=30000) except Exception as nav_error: print(f"[{self.platform_name}] 导航超时,但继续尝试: {nav_error}", flush=True) # 检查登录状态 current_url = self.page.url print(f"[{self.platform_name}] 当前页面: {current_url}", flush=True) if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 等待页面完全加载,确保签名函数可用 print(f"[{self.platform_name}] 等待页面完全加载和签名函数初始化...", flush=True) await asyncio.sleep(3) # 检查签名函数是否可用 sign_check_attempts = 0 max_sign_check_attempts = 10 while sign_check_attempts < max_sign_check_attempts: sign_available = await self.page.evaluate("""() => { return typeof window !== 'undefined' && typeof window._webmsxyw === 'function'; }""") if sign_available: print(f"[{self.platform_name}] ✓ 签名函数 _webmsxyw 已可用", flush=True) break sign_check_attempts += 1 print(f"[{self.platform_name}] ⏳ 等待签名函数... ({sign_check_attempts}/{max_sign_check_attempts})", flush=True) await asyncio.sleep(1) if sign_check_attempts >= max_sign_check_attempts: print(f"[{self.platform_name}] ⚠️ 警告: 签名函数 _webmsxyw 在 {max_sign_check_attempts} 次检查后仍不可用", flush=True) print(f"[{self.platform_name}] 继续尝试,但 API 调用可能会失败", flush=True) async def fetch_notes_page(p): # 再次检查签名函数(每次调用前都检查) sign_available = await self.page.evaluate("""() => { return typeof window !== 'undefined' && typeof window._webmsxyw === 'function'; }""") if not sign_available: print(f"[{self.platform_name}] ⚠️ 签名函数 _webmsxyw 不可用,等待...", flush=True) await asyncio.sleep(2) return await self.page.evaluate( """async (pageNum) => { try { // 使用正确的 API 端点:/api/galaxy/v2/creator/note/user/posted const url = `/api/galaxy/v2/creator/note/user/posted?tab=0&page=${pageNum}`; const headers = { 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'Referer': 'https://creator.xiaohongshu.com/new/note-manager', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin' }; // 尝试获取签名 let signResult = { hasSign: false, x_s: '', x_t: '', x_s_common: '', error: '' }; if 
(typeof window !== 'undefined' && typeof window._webmsxyw === 'function') { try { const sign = window._webmsxyw(url, ''); headers['x-s'] = sign['X-s']; headers['x-t'] = String(sign['X-t']); // 检查是否有 x-s-common if (sign['X-s-common']) { headers['x-s-common'] = sign['X-s-common']; } signResult = { hasSign: true, x_s: sign['X-s'] ? sign['X-s'].substring(0, 50) + '...' : '', x_t: String(sign['X-t']), x_s_common: sign['X-s-common'] ? sign['X-s-common'].substring(0, 50) + '...' : '', error: '' }; console.log('签名生成成功:', signResult); } catch (e) { signResult.error = e.toString(); console.error('签名生成失败:', e); } } else { signResult.error = '_webmsxyw function not found'; console.error('签名函数不存在'); } const res = await fetch(url, { method: 'GET', credentials: 'include', headers }); const responseData = await res.json(); return { ...responseData, _debug: { signResult: signResult, status: res.status, statusText: res.statusText } }; } catch (e) { return { success: false, error: e.toString() }; } }""", p ) def parse_notes(notes_list): parsed = [] for note in notes_list: note_id = note.get('id', '') if not note_id: continue cover_url = '' images_list = note.get('images_list', []) if images_list: cover_url = images_list[0].get('url', '') if cover_url.startswith('http://'): cover_url = cover_url.replace('http://', 'https://') duration = note.get('video_info', {}).get('duration', 0) status = 'published' tab_status = note.get('tab_status', 1) if tab_status == 0: status = 'draft' elif tab_status == 2: status = 'reviewing' elif tab_status == 3: status = 'rejected' video_url = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else "" parsed.append(WorkItem( work_id=note_id, title=note.get('display_title', '') or '无标题', cover_url=cover_url, video_url=video_url, duration=duration, status=status, publish_time=note.get('time', ''), play_count=note.get('view_count', 0), like_count=note.get('likes', 0), comment_count=note.get('comments_count', 0), share_count=note.get('shared_count', 
0), collect_count=note.get('collected_count', 0), )) return parsed resp = None for attempt in range(1, 4): resp = await fetch_notes_page(page) # 打印调试信息 if resp and isinstance(resp, dict) and resp.get('_debug'): debug_info = resp.get('_debug', {}) sign_result = debug_info.get('signResult', {}) print(f"[{self.platform_name}] 🔍 调试信息: 签名可用: {sign_result.get('hasSign', False)}, X-S: {sign_result.get('x_s', '')}, X-T: {sign_result.get('x_t', '')}, X-S-Common: {sign_result.get('x_s_common', '')}, 签名错误: {sign_result.get('error', '')}, HTTP 状态: {debug_info.get('status', 'N/A')}", flush=True) resp.pop('_debug', None) if resp and (resp.get('success') or resp.get('code') == 0) and resp.get('data'): break print(f"[{self.platform_name}] 拉取作品列表失败,重试 {attempt}/3: {str(resp)[:200]}", flush=True) await asyncio.sleep(1.2 * attempt) if not resp or not (resp.get('success') or resp.get('code') == 0) or not resp.get('data'): error_msg = resp.get('msg') if isinstance(resp, dict) else str(resp) # 打印详细的错误信息 if isinstance(resp, dict): if resp.get('msg'): print(f"[{self.platform_name}] 错误消息: {resp.get('msg')}", flush=True) if resp.get('message'): print(f"[{self.platform_name}] 错误消息: {resp.get('message')}", flush=True) if resp.get('error'): print(f"[{self.platform_name}] 错误: {resp.get('error')}", flush=True) raise Exception(f"无法获取作品列表数据: {error_msg}") data = resp.get('data', {}) or {} notes = data.get('notes', []) or [] print(f"[{self.platform_name}] 第 {page} 页 notes 数量: {len(notes)}", flush=True) tags = data.get('tags', []) or [] if tags: preferred = 0 for tag in tags: if tag.get('id') == 'special.note_time_desc': preferred = tag.get('notes_count', 0) or tag.get('notesCount', 0) or tag.get('count', 0) or 0 break if preferred: total = preferred else: total = max([int(t.get('notes_count', 0) or t.get('notesCount', 0) or t.get('count', 0) or 0) for t in tags] + [0]) if not total: total = int(data.get('total', 0) or data.get('total_count', 0) or data.get('totalCount', 0) or 0) if not total and 
isinstance(data.get('page', {}), dict): total = int(data.get('page', {}).get('total', 0) or data.get('page', {}).get('totalCount', 0) or 0) next_page = data.get('page', "") if next_page == page: next_page = page + 1 works.extend(parse_notes(notes)) if total: has_more = (page * api_page_size + len(notes)) < total if has_more and (next_page == -1 or str(next_page) == "-1" or next_page == "" or next_page is None): next_page = page + 1 else: if len(notes) == 0: has_more = False else: next_resp = await fetch_notes_page(page + 1) next_data = (next_resp or {}).get('data', {}) if isinstance(next_resp, dict) else {} next_notes = next_data.get('notes', []) or [] has_more = len(next_notes) > 0 next_page = next_data.get('page', next_page) except Exception as e: import traceback print(f"[{self.platform_name}] 发生异常: {e}", flush=True) traceback.print_exc() return WorksResult( success=False, platform=self.platform_name, error=str(e) ) finally: # 确保关闭浏览器 await self.close_browser() return WorksResult( success=True, platform=self.platform_name, works=works, total=total or (page * api_page_size + len(works)), has_more=has_more, next_page=next_page ) async def get_all_works(self, cookies: str) -> WorksResult: """获取小红书全部作品(单次请求内自动翻页抓全量,避免 Node 侧分页不一致)""" print(f"\n{'='*60}", flush=True) print(f"[{self.platform_name}] 获取全部作品(auto paging)", flush=True) print(f"{'='*60}", flush=True) works: List[WorkItem] = [] total = 0 seen_ids = set() cursor: object = 0 max_iters = 800 api_page_size = 20 try: await self.init_browser() cookie_list = self.parse_cookies(cookies) print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies", flush=True) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") print(f"[{self.platform_name}] 访问笔记管理页面...", flush=True) try: await self.page.goto("https://creator.xiaohongshu.com/new/note-manager", wait_until="domcontentloaded", timeout=60000) print(f"[{self.platform_name}] 页面加载成功", flush=True) except Exception as nav_error: 
print(f"[{self.platform_name}] 导航超时,但继续尝试: {nav_error}", flush=True) # 即使超时也检查当前页面状态 try: await asyncio.sleep(2) current_url = self.page.url print(f"[{self.platform_name}] 超时后当前页面: {current_url}", flush=True) except Exception as e: print(f"[{self.platform_name}] 检查页面状态时出错: {e}", flush=True) current_url = self.page.url print(f"[{self.platform_name}] 当前页面: {current_url}", flush=True) if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 等待页面完全加载,确保签名函数可用 print(f"[{self.platform_name}] 等待页面完全加载和签名函数初始化...", flush=True) await asyncio.sleep(3) # 检查签名函数是否可用 sign_check_attempts = 0 max_sign_check_attempts = 10 while sign_check_attempts < max_sign_check_attempts: sign_available = await self.page.evaluate("""() => { return typeof window !== 'undefined' && typeof window._webmsxyw === 'function'; }""") if sign_available: print(f"[{self.platform_name}] ✓ 签名函数 _webmsxyw 已可用", flush=True) break sign_check_attempts += 1 print(f"[{self.platform_name}] ⏳ 等待签名函数... ({sign_check_attempts}/{max_sign_check_attempts})", flush=True) await asyncio.sleep(1) if sign_check_attempts >= max_sign_check_attempts: print(f"[{self.platform_name}] ⚠️ 警告: 签名函数 _webmsxyw 在 {max_sign_check_attempts} 次检查后仍不可用", flush=True) print(f"[{self.platform_name}] 继续尝试,但 API 调用可能会失败", flush=True) async def fetch_notes_page(p): # 再次检查签名函数(每次调用前都检查) sign_available = await self.page.evaluate("""() => { return typeof window !== 'undefined' && typeof window._webmsxyw === 'function'; }""") if not sign_available: print(f"[{self.platform_name}] ⚠️ 签名函数 _webmsxyw 不可用,等待...", flush=True) await asyncio.sleep(2) return await self.page.evaluate( """async (pageNum) => { try { // 使用正确的 API 端点:/api/galaxy/v2/creator/note/user/posted const url = `/api/galaxy/v2/creator/note/user/posted?tab=0&page=${pageNum}`; const headers = { 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'Referer': 'https://creator.xiaohongshu.com/new/note-manager', 
'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin' }; // 尝试获取签名 let signResult = { hasSign: false, x_s: '', x_t: '', x_s_common: '', error: '' }; if (typeof window !== 'undefined' && typeof window._webmsxyw === 'function') { try { const sign = window._webmsxyw(url, ''); headers['x-s'] = sign['X-s']; headers['x-t'] = String(sign['X-t']); // 检查是否有 x-s-common if (sign['X-s-common']) { headers['x-s-common'] = sign['X-s-common']; } signResult = { hasSign: true, x_s: sign['X-s'] ? sign['X-s'].substring(0, 50) + '...' : '', x_t: String(sign['X-t']), x_s_common: sign['X-s-common'] ? sign['X-s-common'].substring(0, 50) + '...' : '', error: '' }; console.log('签名生成成功:', signResult); } catch (e) { signResult.error = e.toString(); console.error('签名生成失败:', e); } } else { signResult.error = '_webmsxyw function not found'; console.error('签名函数不存在'); } const res = await fetch(url, { method: 'GET', credentials: 'include', headers }); const responseData = await res.json(); return { ...responseData, _debug: { signResult: signResult, status: res.status, statusText: res.statusText } }; } catch (e) { return { success: false, error: e.toString() }; } }""", p ) def parse_notes(notes_list): parsed = [] for note in notes_list: note_id = note.get('id', '') if not note_id: continue cover_url = '' images_list = note.get('images_list', []) if images_list: cover_url = images_list[0].get('url', '') if cover_url.startswith('http://'): cover_url = cover_url.replace('http://', 'https://') duration = note.get('video_info', {}).get('duration', 0) status = 'published' tab_status = note.get('tab_status', 1) if tab_status == 0: status = 'draft' elif tab_status == 2: status = 'reviewing' elif tab_status == 3: status = 'rejected' video_url = f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else "" parsed.append(WorkItem( work_id=note_id, title=note.get('display_title', '') or '无标题', cover_url=cover_url, video_url=video_url, duration=duration, status=status, 
publish_time=note.get('time', ''), play_count=note.get('view_count', 0), like_count=note.get('likes', 0), comment_count=note.get('comments_count', 0), share_count=note.get('shared_count', 0), collect_count=note.get('collected_count', 0), )) return parsed async def collect_by_scrolling() -> WorksResult: print(f"[{self.platform_name}] 直连接口被拒绝,切换为滚动页面 + 监听 API 响应模式", flush=True) captured: List[WorkItem] = [] captured_total = 0 captured_seen = set() lock = asyncio.Lock() async def handle_response(response): nonlocal captured_total url = response.url if ("creator.xiaohongshu.com" not in url and "edith.xiaohongshu.com" not in url) or "creator/note/user/posted" not in url: return try: json_data = await response.json() except Exception: return if not isinstance(json_data, dict): return if not (json_data.get("success") or json_data.get("code") == 0) or not json_data.get("data"): return data = json_data.get("data", {}) or {} notes = data.get("notes", []) or [] tags = data.get("tags", []) or [] declared = 0 if tags: preferred = 0 for tag in tags: if tag.get("id") == "special.note_time_desc": preferred = tag.get("notes_count", 0) or tag.get("notesCount", 0) or tag.get("count", 0) or 0 break if preferred: declared = int(preferred) else: declared = max([int(t.get("notes_count", 0) or t.get("notesCount", 0) or t.get("count", 0) or 0) for t in tags] + [0]) if not declared: declared = int(data.get("total", 0) or data.get("total_count", 0) or data.get("totalCount", 0) or 0) if not declared and isinstance(data.get("page", {}), dict): declared = int(data.get("page", {}).get("total", 0) or data.get("page", {}).get("totalCount", 0) or 0) async with lock: if declared: captured_total = max(captured_total, declared) parsed = parse_notes(notes) new_count = 0 for w in parsed: if w.work_id and w.work_id not in captured_seen: captured_seen.add(w.work_id) captured.append(w) new_count += 1 if new_count > 0: print( f"[{self.platform_name}] 捕获 notes 响应: notes={len(notes)}, new={new_count}, 
total_now={len(captured)}, declared_total={captured_total}", flush=True ) self.page.on("response", handle_response) try: try: # 使用更宽松的等待条件,避免超时 await self.page.goto("https://creator.xiaohongshu.com/new/note-manager", wait_until="domcontentloaded", timeout=90000) print(f"[{self.platform_name}] 页面加载成功", flush=True) except Exception as nav_error: print(f"[{self.platform_name}] 导航异常(继续):{nav_error}", flush=True) # 即使超时也继续尝试,可能页面已经部分加载 try: await asyncio.sleep(3) current_url = self.page.url print(f"[{self.platform_name}] 超时后当前页面: {current_url}", flush=True) if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") except Exception as e: if "Cookie" in str(e): raise print(f"[{self.platform_name}] 检查页面状态时出错: {e}", flush=True) await asyncio.sleep(2.0) idle_rounds = 0 last_count = 0 last_height = 0 for _ in range(1, 400): scroll_state = await self.page.evaluate( """() => { const isScrollable = (el) => { if (!el) return false; const style = window.getComputedStyle(el); const oy = style.overflowY; return (oy === 'auto' || oy === 'scroll') && (el.scrollHeight - el.clientHeight > 200); }; const pickBest = () => { const nodes = Array.from(document.querySelectorAll('*')); let best = document.scrollingElement || document.documentElement || document.body; let bestScroll = (best.scrollHeight || 0) - (best.clientHeight || 0); for (const el of nodes) { if (!isScrollable(el)) continue; const diff = el.scrollHeight - el.clientHeight; if (diff > bestScroll) { best = el; bestScroll = diff; } } return best; }; const el = pickBest(); const beforeTop = el.scrollTop || 0; const beforeHeight = el.scrollHeight || 0; el.scrollTo(0, beforeHeight); return { beforeTop, afterTop: el.scrollTop || 0, height: el.scrollHeight || 0, client: el.clientHeight || 0, }; }""" ) await asyncio.sleep(1.2) async with lock: count_now = len(captured) total_now = captured_total if total_now and count_now >= total_now: break height_now = int(scroll_state.get("height", 0) or 0) if isinstance(scroll_state, dict) 
else 0 if count_now == last_count and height_now == last_height: idle_rounds += 1 else: idle_rounds = 0 last_count = count_now last_height = height_now if idle_rounds >= 6: break async with lock: final_works = list(captured) final_total = captured_total or len(final_works) return WorksResult( success=True, platform=self.platform_name, works=final_works, total=final_total, has_more=False, next_page=-1 ) finally: try: self.page.remove_listener("response", handle_response) except Exception: pass # 添加请求监听,捕获请求头信息 captured_requests = [] async def handle_request(request): url = request.url if ("creator.xiaohongshu.com" in url or "edith.xiaohongshu.com" in url) and "creator/note/user/posted" in url: headers = request.headers captured_requests.append({ "url": url, "method": request.method, "headers": dict(headers), "timestamp": asyncio.get_event_loop().time() }) # 打印关键头部信息 x_s = headers.get('x-s', '') x_t = headers.get('x-t', '') x_s_common = headers.get('x-s-common', '') print(f"[{self.platform_name}] 📡 API 请求: {url}", flush=True) print(f"[{self.platform_name}] Method: {request.method}", flush=True) print(f"[{self.platform_name}] X-S: {x_s[:50] if x_s else '(none)'}...", flush=True) print(f"[{self.platform_name}] X-T: {x_t}", flush=True) print(f"[{self.platform_name}] X-S-Common: {x_s_common[:50] if x_s_common else '(none)'}...", flush=True) print(f"[{self.platform_name}] Cookie: {headers.get('cookie', '')[:100]}...", flush=True) self.page.on("request", handle_request) iters = 0 page_count = 0 # 统计实际获取到的页数 print(f"[{self.platform_name}] ========== 开始自动分页获取作品 ==========", flush=True) print(f"[{self.platform_name}] 最大迭代次数: {max_iters}, 每页大小: {api_page_size}", flush=True) while iters < max_iters: iters += 1 print(f"\n[{self.platform_name}] ---------- 第 {iters} 次请求 (cursor={cursor}) ----------", flush=True) resp = await fetch_notes_page(cursor) # 打印调试信息 if resp and isinstance(resp, dict) and resp.get('_debug'): debug_info = resp.get('_debug', {}) sign_result = 
debug_info.get('signResult', {}) print(f"[{self.platform_name}] 🔍 调试信息:", flush=True) print(f"[{self.platform_name}] 签名可用: {sign_result.get('hasSign', False)}", flush=True) if sign_result.get('x_s'): print(f"[{self.platform_name}] X-S: {sign_result.get('x_s', '')}", flush=True) if sign_result.get('x_t'): print(f"[{self.platform_name}] X-T: {sign_result.get('x_t', '')}", flush=True) if sign_result.get('error'): print(f"[{self.platform_name}] 签名错误: {sign_result.get('error', '')}", flush=True) print(f"[{self.platform_name}] HTTP 状态: {debug_info.get('status', 'N/A')} {debug_info.get('statusText', '')}", flush=True) # 移除调试信息,避免影响后续处理 resp.pop('_debug', None) if not resp or not isinstance(resp, dict): print(f"[{self.platform_name}] ❌ 第 {iters} 次拉取无响应,cursor={cursor}", flush=True) print(f"[{self.platform_name}] 响应类型: {type(resp)}, 响应内容: {str(resp)[:500]}", flush=True) break if not (resp.get('success') or resp.get('code') == 0) or not resp.get('data'): error_msg = str(resp)[:500] print(f"[{self.platform_name}] ❌ 拉取失败 cursor={cursor}", flush=True) print(f"[{self.platform_name}] 响应详情: {error_msg}", flush=True) print(f"[{self.platform_name}] success={resp.get('success')}, code={resp.get('code')}, has_data={bool(resp.get('data'))}", flush=True) # 打印详细的错误信息 if resp.get('msg'): print(f"[{self.platform_name}] 错误消息: {resp.get('msg')}", flush=True) if resp.get('message'): print(f"[{self.platform_name}] 错误消息: {resp.get('message')}", flush=True) if resp.get('error'): print(f"[{self.platform_name}] 错误: {resp.get('error')}", flush=True) # 打印调试信息 if resp.get('_debug'): debug_info = resp.get('_debug', {}) print(f"[{self.platform_name}] HTTP 状态: {debug_info.get('status', 'N/A')} {debug_info.get('statusText', '')}", flush=True) sign_result = debug_info.get('signResult', {}) if sign_result.get('error'): print(f"[{self.platform_name}] 签名错误: {sign_result.get('error')}", flush=True) if iters == 1: print(f"[{self.platform_name}] 第一次请求失败,切换到滚动模式", flush=True) return await collect_by_scrolling() 
break data = resp.get('data', {}) or {} notes = data.get('notes', []) or [] if not notes: print(f"[{self.platform_name}] ⚠️ cursor={cursor} 无作品,停止分页", flush=True) break # 统计页数 page_count += 1 print(f"[{self.platform_name}] ✅ 第 {page_count} 页获取成功,本页作品数: {len(notes)}", flush=True) tags = data.get('tags', []) or [] if tags: preferred = 0 for tag in tags: if tag.get('id') == 'special.note_time_desc': preferred = tag.get('notes_count', 0) or tag.get('notesCount', 0) or tag.get('count', 0) or 0 break if preferred: total = max(total, int(preferred)) print(f"[{self.platform_name}] 📊 从 tags 获取总数: {total} (preferred)", flush=True) else: tag_total = max([int(t.get('notes_count', 0) or t.get('notesCount', 0) or t.get('count', 0) or 0) for t in tags] + [0]) total = max(total, tag_total) if tag_total > 0: print(f"[{self.platform_name}] 📊 从 tags 获取总数: {total}", flush=True) if not total: t2 = int(data.get('total', 0) or data.get('total_count', 0) or data.get('totalCount', 0) or 0) if not t2 and isinstance(data.get('page', {}), dict): t2 = int(data.get('page', {}).get('total', 0) or data.get('page', {}).get('totalCount', 0) or 0) total = max(total, t2) if t2 > 0: print(f"[{self.platform_name}] 📊 从 data.total 获取总数: {total}", flush=True) parsed = parse_notes(notes) new_items = [] for w in parsed: if w.work_id and w.work_id not in seen_ids: seen_ids.add(w.work_id) new_items.append(w) works.extend(new_items) print(f"[{self.platform_name}] 📈 累计统计: 本页新作品={len(new_items)}, 累计作品数={len(works)}, 声明总数={total}", flush=True) if total and len(works) >= total: print(f"[{self.platform_name}] ✅ 已获取全部作品 (累计={len(works)} >= 总数={total}),停止分页", flush=True) break if len(new_items) == 0: print(f"[{self.platform_name}] ⚠️ 本页无新作品,停止分页", flush=True) break next_page = data.get('page', "") old_cursor = cursor if next_page == cursor: next_page = "" if next_page == -1 or str(next_page) == "-1": next_page = "" if next_page is None or next_page == "": if isinstance(cursor, int): cursor = cursor + 1 else: cursor = 
len(works) // api_page_size print(f"[{self.platform_name}] 🔄 下一页 cursor: {old_cursor} -> {cursor} (自动递增)", flush=True) else: cursor = next_page print(f"[{self.platform_name}] 🔄 下一页 cursor: {old_cursor} -> {cursor} (API返回)", flush=True) await asyncio.sleep(0.5) # 移除请求监听器 try: self.page.remove_listener("request", handle_request) except Exception: pass print(f"\n[{self.platform_name}] ========== 分页完成 ==========", flush=True) print(f"[{self.platform_name}] 📊 分页统计: 总请求次数={iters}, 成功获取页数={page_count}, 累计作品数={len(works)}, 声明总数={total}", flush=True) if captured_requests: print(f"[{self.platform_name}] 📡 捕获到 {len(captured_requests)} 个 API 请求", flush=True) for i, req in enumerate(captured_requests[:3], 1): # 只显示前3个 print(f"[{self.platform_name}] 请求 {i}: {req['method']} {req['url']}", flush=True) if 'x-s' in req['headers']: print(f"[{self.platform_name}] X-S: {req['headers']['x-s'][:50]}...", flush=True) if 'x-t' in req['headers']: print(f"[{self.platform_name}] X-T: {req['headers']['x-t']}", flush=True) print(f"[{self.platform_name}] ========================================\n", flush=True) except Exception as e: import traceback error_trace = traceback.format_exc() print(f"[{self.platform_name}] 发生异常: {e}", flush=True) traceback.print_exc() return WorksResult( success=False, platform=self.platform_name, error=str(e), debug_info=f"异常详情: {error_trace[:500]}" ) finally: await self.close_browser() debug_info = f"总请求次数={iters}, 成功获取页数={page_count}, 累计作品数={len(works)}, 声明总数={total}" if len(works) == 0: debug_info += " | 警告: 没有获取到任何作品,可能原因: Cookie失效、API调用失败、或账号无作品" return WorksResult( success=True, platform=self.platform_name, works=works, total=total or len(works), has_more=False, next_page=-1, debug_info=debug_info ) async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """ 获取账号下所有作品的评论 —— 完全复刻 get_xiaohongshu_work_comments.py 的7步流程。 """ all_comments: List[CommentItem] = [] total_comments = 0 has_more = False browser = None 
print(222222222222222222222222222222222222) print(work_id) global stored_cookies try: # --- Step 1: 初始化浏览器和 Cookie --- cookie_list = self.parse_cookies(cookies) playwright = await async_playwright().start() browser = await playwright.chromium.launch(headless=False) context = await browser.new_context( viewport={"width": 1400, "height": 900}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) if os.path.exists("cookies.json"): with open("cookies.json", "r") as f: stored_cookies = json.load(f) if stored_cookies: await context.add_cookies(stored_cookies) page = await context.new_page() # --- Step 2: 打开小红书主页 --- await page.goto("https://www.xiaohongshu.com", wait_until="domcontentloaded") await asyncio.sleep(1.5) # --- Step 3: 检查并处理登录弹窗 --- try: if await page.is_visible(".login-container", timeout=3000): await page.wait_for_selector(".login-container", state="hidden", timeout=120000) stored_cookies = await context.cookies() with open("xiaohongshu_cookies.json", "w") as f: json.dump(stored_cookies, f) except Exception as e: pass # 忽略超时,继续执行 # --- 提取 User ID --- user_id = None for cookie in cookie_list: if cookie.get('name') == 'x-user-id-creator.xiaohongshu.com': user_id = cookie.get('value') break if not user_id: raise ValueError("无法从 Cookie 中提取 user_id") # --- Step 4: 跳转到用户主页 --- profile_url = f"https://www.xiaohongshu.com/user/profile/{user_id}" await page.goto(profile_url, wait_until="domcontentloaded") await asyncio.sleep(2) # --- 等待笔记区域加载 --- try: await page.wait_for_selector("#userPostedFeeds .note-item", timeout=20000) except: raise Exception("笔记区域未加载,请检查账号是否公开或 Cookie 是否有效") # --- Step 5: 滚动到底部加载全部笔记 --- last_height = None while True: await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") await asyncio.sleep(2) new_height = await page.evaluate("document.body.scrollHeight") if new_height == last_height: break last_height = new_height # --- 获取所有封面图 --- note_imgs = await 
page.query_selector_all("#userPostedFeeds .note-item .cover img") print(f"共找到 {len(note_imgs)} 张封面图") # --- Step 6 & 7: 依次点击封面图,捕获评论并结构化 --- for i, img in enumerate(note_imgs): try: # >>> 新增:从 img 提取 note_id 并与 work_id 比较 <<< note_id = await img.evaluate('''el => { const item = el.closest('.note-item'); if (!item) return null; const link = item.querySelector('a[href^="/explore/"]'); return link ? link.href.split('/').pop() : null; }''') if note_id != work_id: print(f"note_id {note_id} 与目标 work_id {work_id} 不匹配,跳出循环") continue # <<< 新增结束 >>> await img.scroll_into_view_if_needed() await asyncio.sleep(0.5) comment_resp = None def handle_response(response): nonlocal comment_resp if "edith.xiaohongshu.com/api/sns/web/v2/comment/page" in response.url: comment_resp = response page.on("response", handle_response) await img.click() await asyncio.sleep(1.5) page.remove_listener("response", handle_response) if not comment_resp: await page.keyboard.press("Escape") continue json_data = await comment_resp.json() if not (json_data.get("success") or json_data.get("code") == 0): await page.keyboard.press("Escape") continue data = json_data.get("data", {}) raw_comments = data.get("comments", []) note_id = data.get("note_id", "") for main_cmt in raw_comments: # 主评论 user_info = main_cmt.get("user_info", {}) all_comments.append(CommentItem( comment_id=main_cmt["id"], parent_comment_id=None, work_id=work_id, content=main_cmt["content"], author_id=user_info.get("user_id", ""), author_name=user_info.get("nickname", ""), author_avatar=user_info.get("image", ""), like_count=int(main_cmt.get("like_count", 0)), reply_count=main_cmt.get("sub_comment_count", 0), create_time=self._timestamp_to_readable(main_cmt.get("create_time", 0)), )) # 子评论 for sub_cmt in main_cmt.get("sub_comments", []): sub_user = sub_cmt.get("user_info", {}) all_comments.append(CommentItem( comment_id=sub_cmt["id"], parent_comment_id=main_cmt["id"], work_id=work_id, content=sub_cmt["content"], 
author_id=sub_user.get("user_id", ""), author_name=sub_user.get("nickname", ""), author_avatar=sub_user.get("image", ""), like_count=int(sub_cmt.get("like_count", 0)), reply_count=0, create_time=self._timestamp_to_readable(sub_cmt.get("create_time", 0)), )) # 关闭弹窗 await page.keyboard.press("Escape") await asyncio.sleep(1) except Exception as e: # 出错也尝试关闭弹窗 try: await page.keyboard.press("Escape") await asyncio.sleep(0.5) except: pass continue # --- 返回结果 --- total_comments = len(all_comments) # return { # 'success': True, # 'platform': self.platform_name, # 'work_comments': all_comments, # 注意:此处为扁平列表,如需按作品分组可在外层处理 # 'total': total_comments # } return CommentsResult( success=True, platform=self.platform_name, work_id=work_id, comments=all_comments, total=total_comments, has_more=has_more ) except Exception as e: import traceback traceback.print_exc() return CommentsResult( success=True, platform=self.platform_name, work_id=work_id, total=0 ) finally: if browser: await browser.close() def _timestamp_to_readable(self, ts_ms: int) -> str: """将毫秒时间戳转换为可读格式""" from datetime import datetime if not ts_ms: return "" try: return datetime.fromtimestamp(ts_ms / 1000).strftime("%Y-%m-%d %H:%M:%S") except Exception: return "" async def get_all_comments(self, cookies: str) -> dict: """获取所有作品的评论 - 通过评论管理页面""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取所有作品评论") print(f"{'='*60}") all_work_comments = [] captured_comments = [] captured_notes = {} # note_id -> note_info try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 设置 API 响应监听器 async def handle_response(response): nonlocal captured_comments, captured_notes url = response.url try: # 监听评论列表 API - 多种格式 if '/comment/' in url and ('page' in url or 'list' in url): json_data = await response.json() print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True) if json_data.get('success') or 
json_data.get('code') == 0: data = json_data.get('data', {}) comments = data.get('comments', []) or data.get('list', []) # 从 URL 中提取 note_id import re note_id_match = re.search(r'note_id=([^&]+)', url) note_id = note_id_match.group(1) if note_id_match else '' if comments: for comment in comments: # 添加 note_id 到评论中 if note_id and 'note_id' not in comment: comment['note_id'] = note_id captured_comments.append(comment) print(f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (note_id={note_id}),总计: {len(captured_comments)}", flush=True) # 监听笔记列表 API if '/note/' in url and ('list' in url or 'posted' in url or 'manager' in url): json_data = await response.json() if json_data.get('success') or json_data.get('code') == 0: data = json_data.get('data', {}) notes = data.get('notes', []) or data.get('list', []) print(f"[{self.platform_name}] 捕获到笔记列表 API: {len(notes)} 个笔记", flush=True) for note in notes: note_id = note.get('note_id', '') or note.get('id', '') if note_id: cover_url = '' cover = note.get('cover', {}) if isinstance(cover, dict): cover_url = cover.get('url', '') or cover.get('url_default', '') elif isinstance(cover, str): cover_url = cover captured_notes[note_id] = { 'title': note.get('title', '') or note.get('display_title', ''), 'cover': cover_url, } except Exception as e: print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True) self.page.on('response', handle_response) print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True) # 访问评论管理页面 print(f"[{self.platform_name}] 访问评论管理页面...", flush=True) await self.page.goto("https://creator.xiaohongshu.com/creator/comment", wait_until="domcontentloaded", timeout=30000) await asyncio.sleep(5) # 检查登录状态 current_url = self.page.url if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") print(f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记", flush=True) # 滚动加载更多评论 for i in range(5): await self.page.evaluate('window.scrollBy(0, 500)') await asyncio.sleep(1) await 
asyncio.sleep(3) # 移除监听器 self.page.remove_listener('response', handle_response) print(f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记", flush=True) # 按作品分组评论 work_comments_map = {} # note_id -> work_comments for comment in captured_comments: # 获取笔记信息 note_info = comment.get('note_info', {}) or comment.get('note', {}) note_id = comment.get('note_id', '') or note_info.get('note_id', '') or note_info.get('id', '') if not note_id: continue if note_id not in work_comments_map: saved_note = captured_notes.get(note_id, {}) cover_url = '' cover = note_info.get('cover', {}) if isinstance(cover, dict): cover_url = cover.get('url', '') or cover.get('url_default', '') elif isinstance(cover, str): cover_url = cover if not cover_url: cover_url = saved_note.get('cover', '') work_comments_map[note_id] = { 'work_id': note_id, 'title': note_info.get('title', '') or note_info.get('display_title', '') or saved_note.get('title', ''), 'cover_url': cover_url, 'comments': [] } cid = comment.get('id', '') or comment.get('comment_id', '') if not cid: continue user_info = comment.get('user_info', {}) or comment.get('user', {}) work_comments_map[note_id]['comments'].append({ 'comment_id': cid, 'author_id': user_info.get('user_id', '') or user_info.get('id', ''), 'author_name': user_info.get('nickname', '') or user_info.get('name', ''), 'author_avatar': user_info.get('image', '') or user_info.get('avatar', ''), 'content': comment.get('content', ''), 'like_count': comment.get('like_count', 0), 'create_time': comment.get('create_time', ''), }) all_work_comments = list(work_comments_map.values()) total_comments = sum(len(w['comments']) for w in all_work_comments) print(f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论", flush=True) except Exception as e: import traceback traceback.print_exc() return { 'success': False, 'platform': self.platform_name, 'error': str(e), 'work_comments': [] } finally: await self.close_browser() 
return { 'success': True, 'platform': self.platform_name, 'work_comments': all_work_comments, 'total': len(all_work_comments) } ================================================================================ 文件: server\python\platforms\baijiahao.py ================================================================================ # -*- coding: utf-8 -*- """ 百家号视频发布器 """ import asyncio import json from typing import List from datetime import datetime from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) class BaijiahaoPublisher(BasePublisher): """ 百家号视频发布器 使用 Playwright 自动化操作百家号创作者中心 """ platform_name = "baijiahao" login_url = "https://baijiahao.baidu.com/" publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video" cookie_domain = ".baidu.com" # 登录检测配置 login_check_url = "https://baijiahao.baidu.com/builder/rc/home" login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"] login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]'] async def get_account_info(self, cookies: str) -> dict: """ 获取百家号账号信息 使用直接 HTTP API 调用,不使用浏览器 """ import aiohttp print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取账号信息 (使用 API)") print(f"{'='*60}") try: # 解析 cookies cookie_list = self.parse_cookies(cookies) cookie_dict = {c['name']: c['value'] for c in cookie_list} # 重要:百家号需要先访问主页建立会话上下文 print(f"[{self.platform_name}] 第一步:访问主页建立会话...") session_headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Cookie 由 session 管理,不手动设置 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 
'Sec-Fetch-User': '?1', 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"' } headers = { 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Cookie 由 session 管理,不手动设置 'Referer': 'https://baijiahao.baidu.com/builder/rc/home', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"' } # 使用 cookies 参数初始化 session,让 aiohttp 自动管理 cookie 更新 async with aiohttp.ClientSession(cookies=cookie_dict) as session: # 步骤 0: 先访问主页建立会话上下文(关键步骤!) print(f"[{self.platform_name}] [0/4] 访问主页建立会话上下文...") async with session.get( 'https://baijiahao.baidu.com/builder/rc/home', headers=session_headers, timeout=aiohttp.ClientTimeout(total=30) ) as home_response: home_status = home_response.status print(f"[{self.platform_name}] 主页访问状态: {home_status}") # 获取响应头中的新cookies(如果有) if 'Set-Cookie' in home_response.headers: new_cookies = home_response.headers['Set-Cookie'] print(f"[{self.platform_name}] 获取到新的会话Cookie") # 这里可以处理新的cookies,但暂时跳过复杂处理 # 短暂等待确保会话建立 await asyncio.sleep(1) # 步骤 1: 获取账号基本信息 print(f"[{self.platform_name}] [1/4] 调用 appinfo API...") async with session.get( 'https://baijiahao.baidu.com/builder/app/appinfo', headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as response: appinfo_result = await response.json() print(f"[{self.platform_name}] appinfo API 完整响应: {json.dumps(appinfo_result, ensure_ascii=False)[:500]}") print(f"[{self.platform_name}] appinfo API 响应: errno={appinfo_result.get('errno')}") # 检查登录状态 if appinfo_result.get('errno') != 0: error_msg = appinfo_result.get('errmsg', 
'未知错误') errno = appinfo_result.get('errno') print(f"[{self.platform_name}] API 返回错误: errno={errno}, msg={error_msg}") # errno 110 表示未登录 if errno == 110: return { "success": False, "error": "Cookie 已失效,需要重新登录", "need_login": True } # errno 10001402 表示分散认证问题,尝试重新访问主页后重试 if errno == 10001402: print(f"[{self.platform_name}] 检测到分散认证问题,尝试重新访问主页...") await asyncio.sleep(2) # 重新访问主页 async with session.get( 'https://baijiahao.baidu.com/builder/rc/home', headers=session_headers, timeout=aiohttp.ClientTimeout(total=30) ) as retry_home_response: print(f"[{self.platform_name}] 重新访问主页状态: {retry_home_response.status}") await asyncio.sleep(1) # 重试 API 调用 async with session.get( 'https://baijiahao.baidu.com/builder/app/appinfo', headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as retry_response: retry_result = await retry_response.json() if retry_result.get('errno') == 0: print(f"[{self.platform_name}] 分散认证问题已解决") # 使用重试成功的结果继续处理 appinfo_result = retry_result else: print(f"[{self.platform_name}] 重试仍然失败") return { "success": False, "error": f"分散认证问题: {error_msg}", "need_login": True } return { "success": False, "error": error_msg, "need_login": True } # 获取用户数据 user_data = appinfo_result.get('data', {}).get('user', {}) if not user_data: return { "success": False, "error": "无法获取用户信息", "need_login": True } # 检查账号状态 status = user_data.get('status', '') # 有效的账号状态:audit(审核中), pass(已通过), normal(正常), newbie(新手) valid_statuses = ['audit', 'pass', 'normal', 'newbie'] if status not in valid_statuses: print(f"[{self.platform_name}] 账号状态异常: {status}") # 提取基本信息 account_name = user_data.get('name') or user_data.get('uname') or '百家号账号' app_id = user_data.get('app_id') or user_data.get('id', 0) account_id = str(app_id) if app_id else f"baijiahao_{int(datetime.now().timestamp() * 1000)}" # 处理头像 URL avatar_url = user_data.get('avatar') or user_data.get('avatar_unify', '') if avatar_url and avatar_url.startswith('//'): avatar_url = 'https:' + avatar_url print(f"[{self.platform_name}] 账号名称: 
{account_name}, ID: {account_id}") # 步骤 2: 获取粉丝数(非关键,失败不影响整体) fans_count = 0 try: print(f"[{self.platform_name}] [2/3] 调用 growth/get_info API 获取粉丝数...") async with session.get( 'https://baijiahao.baidu.com/cms-ui/rights/growth/get_info', headers=headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: growth_result = await response.json() if growth_result.get('errno') == 0: growth_data = growth_result.get('data', {}) fans_count = int(growth_data.get('fans_num', 0)) print(f"[{self.platform_name}] 粉丝数: {fans_count}") else: print(f"[{self.platform_name}] 获取粉丝数失败: {growth_result.get('errmsg')}") except Exception as e: print(f"[{self.platform_name}] 获取粉丝数异常(非关键): {e}") # 步骤 3: 获取作品数量(使用与 Node 端一致的 API) works_count = 0 try: print(f"[{self.platform_name}] [3/3] 调用 article/lists API 获取作品数...") # 使用与 Node 端一致的 API 参数 list_url = 'https://baijiahao.baidu.com/pcui/article/lists?currentPage=1&pageSize=20&search=&type=&collection=&startDate=&endDate=&clearBeforeFetch=false&dynamic=0' async with session.get( list_url, headers={ 'accept': '*/*', 'user-agent': 'PostmanRuntime/7.51.0', # cookie 由 session 管理 'referer': 'https://baijiahao.baidu.com/builder/rc/content', 'connection': 'keep-alive', 'accept-encoding': 'gzip, deflate, br', }, timeout=aiohttp.ClientTimeout(total=30) ) as response: response_text = await response.text() print(f"[{self.platform_name}] ========== Works API Response ==========") print(f"[{self.platform_name}] Full response: {response_text[:1000]}...") # 只打印前1000字符 print(f"[{self.platform_name}] =========================================") works_result = json.loads(response_text) # 处理分散认证问题 (errno=10001402),重试一次 if works_result.get('errno') == 10001402: print(f"[{self.platform_name}] 分散认证问题 (errno=10001402),3秒后重试...") await asyncio.sleep(3) # 重试一次,使用更完整的请求头 retry_headers = headers.copy() retry_headers.update({ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 
'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', }) async with session.get( list_url, headers=retry_headers, timeout=aiohttp.ClientTimeout(total=30) ) as retry_response: retry_text = await retry_response.text() print(f"[{self.platform_name}] ========== Works API Retry Response ==========") print(f"[{self.platform_name}] Full retry response: {retry_text[:1000]}...") print(f"[{self.platform_name}] ===============================================") works_result = json.loads(retry_text) if works_result.get('errno') == 10001402: print(f"[{self.platform_name}] 重试仍然失败,返回已获取的账号信息") works_result = None if works_result and works_result.get('errno') == 0: works_data = works_result.get('data', {}) # 优先使用 data.page.totalCount,如果没有则使用 data.total(兼容旧格式) page_info = works_data.get('page', {}) works_count = int(page_info.get('totalCount', works_data.get('total', 0))) print(f"[{self.platform_name}] 作品数: {works_count} (from page.totalCount: {page_info.get('totalCount')}, from total: {works_data.get('total')})") else: errno = works_result.get('errno') if works_result else 'unknown' errmsg = works_result.get('errmsg', 'unknown error') if works_result else 'no response' print(f"[{self.platform_name}] 获取作品数失败: errno={errno}, errmsg={errmsg}") except Exception as e: import traceback print(f"[{self.platform_name}] 获取作品数异常(非关键): {e}") traceback.print_exc() # 返回账号信息 account_info = { "success": True, "account_id": account_id, "account_name": account_name, "avatar_url": avatar_url, "fans_count": fans_count, "works_count": works_count, } print(f"[{self.platform_name}] ✓ 获取成功: {account_name} (粉丝: {fans_count}, 作品: {works_count})") return account_info except Exception as e: import traceback traceback.print_exc() return { "success": False, "error": str(e) } async def check_captcha(self) -> dict: """检查页面是否需要验证码""" if not self.page: return {'need_captcha': False, 'captcha_type': ''} try: # 检查各种验证码 captcha_selectors = [ 'text="请输入验证码"', 'text="滑动验证"', '[class*="captcha"]', 
'[class*="verify"]', ] for selector in captcha_selectors: try: if await self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到验证码: {selector}") return {'need_captcha': True, 'captcha_type': 'image'} except: pass # 检查登录弹窗 login_selectors = [ 'text="请登录"', 'text="登录后继续"', '[class*="login-dialog"]', ] for selector in login_selectors: try: if await self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到需要登录: {selector}") return {'need_captcha': True, 'captcha_type': 'login'} except: pass except Exception as e: print(f"[{self.platform_name}] 验证码检测异常: {e}") return {'need_captcha': False, 'captcha_type': ''} async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到百家号""" import os print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器 await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(10, "正在打开上传页面...") # 访问视频发布页面(使用新视频发布界面) video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1" await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000) await asyncio.sleep(3) # 检查是否跳转到登录页 current_url = self.page.url print(f"[{self.platform_name}] 当前页面: {current_url}") for indicator in self.login_indicators: if indicator in current_url: screenshot_base64 = await 
self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="Cookie 已过期,需要重新登录", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True) screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 传统方式检查验证码 captcha_result = await self.check_captcha() if captcha_result['need_captcha']: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证", need_captcha=True, captcha_type=captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) self.report_progress(15, "正在选择视频文件...") # 等待页面加载完成 await asyncio.sleep(2) # 关闭可能的弹窗 try: close_buttons = [ 'button:has-text("我知道了")', 'button:has-text("知道了")', '[class*="close"]', '[class*="modal-close"]', ] for btn_selector in close_buttons: try: btn = self.page.locator(btn_selector).first if await btn.count() > 0 and await btn.is_visible(): await btn.click() await asyncio.sleep(0.5) except: pass except: pass # 上传视频 - 尝试多种方式 upload_success = False # 方法1: 直接通过 file input 上传 try: file_inputs = await self.page.query_selector_all('input[type="file"]') print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入") for file_input in file_inputs: try: await file_input.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] 通过 file input 上传成功") break except Exception as e: print(f"[{self.platform_name}] file input 
上传失败: {e}") except Exception as e: print(f"[{self.platform_name}] 查找 file input 失败: {e}") # 方法2: 点击上传区域 if not upload_success: upload_selectors = [ 'div[class*="upload-box"]', 'div[class*="drag-upload"]', 'div[class*="uploader"]', 'div:has-text("点击上传")', 'div:has-text("选择文件")', '[class*="upload-area"]', ] for selector in upload_selectors: if upload_success: break try: upload_area = self.page.locator(selector).first if await upload_area.count() > 0: print(f"[{self.platform_name}] 尝试点击上传区域: {selector}") async with self.page.expect_file_chooser(timeout=10000) as fc_info: await upload_area.click() file_chooser = await fc_info.value await file_chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] 通过点击上传区域成功") break except Exception as e: print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}") if not upload_success: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="未找到上传入口", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) self.report_progress(20, "等待视频上传...") # 等待视频上传完成(最多5分钟) upload_timeout = 300 start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < upload_timeout: # 检查上传进度 progress_text = '' try: progress_el = self.page.locator('[class*="progress"], [class*="percent"]').first if await progress_el.count() > 0: progress_text = await progress_el.text_content() if progress_text: import re match = re.search(r'(\d+)%', progress_text) if match: pct = int(match.group(1)) self.report_progress(20 + int(pct * 0.4), f"视频上传中 {pct}%...") if pct >= 100: print(f"[{self.platform_name}] 上传完成") break except: pass # 检查是否出现标题输入框(说明上传完成) try: title_input = self.page.locator('input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input').first if await title_input.count() > 0 and await title_input.is_visible(): print(f"[{self.platform_name}] 检测到标题输入框,上传完成") break except: 
pass # 检查是否有错误提示 try: error_el = self.page.locator('[class*="error"], [class*="fail"]').first if await error_el.count() > 0: error_text = await error_el.text_content() if error_text and ('失败' in error_text or '错误' in error_text): raise Exception(f"上传失败: {error_text}") except: pass await asyncio.sleep(3) self.report_progress(60, "正在填写标题...") await asyncio.sleep(2) # 填写标题 title_filled = False title_selectors = [ 'input[placeholder*="标题"]', 'textarea[placeholder*="标题"]', '[class*="title-input"] input', '[class*="title"] input', 'input[maxlength]', ] for selector in title_selectors: if title_filled: break try: title_input = self.page.locator(selector).first if await title_input.count() > 0 and await title_input.is_visible(): await title_input.click() await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.type(params.title[:30]) # 百家号标题限制30字 title_filled = True print(f"[{self.platform_name}] 标题填写成功") except Exception as e: print(f"[{self.platform_name}] 标题选择器 {selector} 失败: {e}") if not title_filled: print(f"[{self.platform_name}] 警告: 未能填写标题") # 填写描述 if params.description: self.report_progress(65, "正在填写描述...") try: desc_selectors = [ 'textarea[placeholder*="描述"]', 'textarea[placeholder*="简介"]', '[class*="desc"] textarea', '[class*="description"] textarea', ] for selector in desc_selectors: try: desc_input = self.page.locator(selector).first if await desc_input.count() > 0 and await desc_input.is_visible(): await desc_input.click() await self.page.keyboard.type(params.description[:200]) print(f"[{self.platform_name}] 描述填写成功") break except: pass except Exception as e: print(f"[{self.platform_name}] 描述填写失败: {e}") self.report_progress(70, "正在发布...") await asyncio.sleep(2) # 点击发布按钮 publish_selectors = [ 'button:has-text("发布")', 'button:has-text("发表")', 'button:has-text("提交")', '[class*="publish"] button', '[class*="submit"] button', ] publish_clicked = False for selector in publish_selectors: if publish_clicked: break try: btn = 
self.page.locator(selector).first if await btn.count() > 0 and await btn.is_visible(): # 检查按钮是否可用 is_disabled = await btn.get_attribute('disabled') if is_disabled: print(f"[{self.platform_name}] 按钮 {selector} 被禁用") continue await btn.click() publish_clicked = True print(f"[{self.platform_name}] 点击发布按钮成功") except Exception as e: print(f"[{self.platform_name}] 发布按钮 {selector} 失败: {e}") if not publish_clicked: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="未找到发布按钮", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) self.report_progress(80, "等待发布完成...") # 记录点击发布前的 URL publish_page_url = self.page.url print(f"[{self.platform_name}] 发布前 URL: {publish_page_url}") # 等待发布完成(最多3分钟) publish_timeout = 180 start_time = asyncio.get_event_loop().time() last_url = publish_page_url while asyncio.get_event_loop().time() - start_time < publish_timeout: await asyncio.sleep(3) current_url = self.page.url # 检测 URL 是否发生变化 if current_url != last_url: print(f"[{self.platform_name}] URL 变化: {last_url} -> {current_url}") last_url = current_url # 检查是否跳转到内容管理页面(真正的成功标志) # 百家号发布成功后会跳转到 /builder/rc/content 页面 if '/builder/rc/content' in current_url and 'edit' not in current_url: self.report_progress(100, "发布成功!") print(f"[{self.platform_name}] 发布成功,已跳转到内容管理页: {current_url}") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=current_url, status='success' ) # 检查是否有明确的成功提示弹窗 try: # 百家号发布成功会显示"发布成功"弹窗 success_modal = self.page.locator('div:has-text("发布成功"), div:has-text("提交成功"), div:has-text("视频发布成功")').first if await success_modal.count() > 0 and await success_modal.is_visible(): self.report_progress(100, "发布成功!") print(f"[{self.platform_name}] 检测到发布成功弹窗") screenshot_base64 = await self.capture_screenshot() # 等待一下看是否会跳转 await asyncio.sleep(3) return 
PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=self.page.url, status='success' ) except Exception as e: print(f"[{self.platform_name}] 检测成功提示异常: {e}") # 检查是否有错误提示 try: error_selectors = [ 'div.error-tip', 'div[class*="error-msg"]', 'span[class*="error"]', 'div:has-text("发布失败")', 'div:has-text("提交失败")', ] for error_selector in error_selectors: error_el = self.page.locator(error_selector).first if await error_el.count() > 0 and await error_el.is_visible(): error_text = await error_el.text_content() if error_text and error_text.strip(): print(f"[{self.platform_name}] 检测到错误: {error_text}") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"发布失败: {error_text.strip()}", screenshot_base64=screenshot_base64, page_url=current_url, status='failed' ) except Exception as e: print(f"[{self.platform_name}] 检测错误提示异常: {e}") # 检查验证码 captcha_result = await self.check_captcha() if captcha_result['need_captcha']: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"发布过程中需要{captcha_result['captcha_type']}验证码", need_captcha=True, captcha_type=captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 检查发布按钮状态(如果还在编辑页面) if 'edit' in current_url: try: # 检查是否正在上传/处理中 processing_indicators = [ '[class*="loading"]', '[class*="uploading"]', '[class*="processing"]', 'div:has-text("正在上传")', 'div:has-text("正在处理")', ] is_processing = False for indicator in processing_indicators: if await self.page.locator(indicator).count() > 0: is_processing = True print(f"[{self.platform_name}] 正在处理中...") break if not is_processing: # 如果不是在处理中,可能需要重新点击发布按钮 elapsed = asyncio.get_event_loop().time() - start_time if elapsed > 30: # 30秒后还在编辑页且不在处理中,可能发布没生效 print(f"[{self.platform_name}] 发布似乎未生效,尝试重新点击发布按钮...") for selector in 
publish_selectors: try: btn = self.page.locator(selector).first if await btn.count() > 0 and await btn.is_visible(): is_disabled = await btn.get_attribute('disabled') if not is_disabled: await btn.click() print(f"[{self.platform_name}] 重新点击发布按钮") break except: pass except Exception as e: print(f"[{self.platform_name}] 检查处理状态异常: {e}") # 超时,获取截图分析最终状态 print(f"[{self.platform_name}] 发布超时,最终 URL: {self.page.url}") screenshot_base64 = await self.capture_screenshot() # 最后一次检查是否在内容管理页 final_url = self.page.url if '/builder/rc/content' in final_url and 'edit' not in final_url: return PublishResult( success=True, platform=self.platform_name, message="发布成功(延迟确认)", screenshot_base64=screenshot_base64, page_url=final_url, status='success' ) return PublishResult( success=False, platform=self.platform_name, error="发布超时,请手动检查发布状态", screenshot_base64=screenshot_base64, page_url=final_url, status='need_action' ) async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """ 获取百家号作品列表 优先使用内容管理页的接口(pcui/article/lists)。 说明: - 该接口通常需要自定义请求头 token(JWT),仅靠 Cookie 可能会返回“未登录” - 这里使用 Playwright 打开内容页,从 localStorage/sessionStorage/页面脚本中自动提取 token, 再在页面上下文中发起 fetch(携带 cookie + token),以提高成功率 """ import re print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品列表 (使用 API)") print(f"[{self.platform_name}] page={page}, page_size={page_size}") print(f"{'='*60}") works: List[WorkItem] = [] total = 0 has_more = False next_page = "" try: # 解析并设置 cookies(Playwright) cookie_list = self.parse_cookies(cookies) await self.init_browser() await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 先打开内容管理页,确保本页 Referer/会话就绪 # Node 侧传 page=0,1,...;接口 currentPage 为 1,2,... 
current_page = int(page) + 1 page_size = int(page_size) content_url = ( "https://baijiahao.baidu.com/builder/rc/content" f"?currentPage={current_page}&pageSize={page_size}" "&search=&type=&collection=&startDate=&endDate=" ) await self.page.goto(content_url, wait_until="domcontentloaded", timeout=60000) await asyncio.sleep(2) # 1) 提取 token(JWT) token = await self.page.evaluate( """ () => { const isJwtLike = (v) => { if (!v || typeof v !== 'string') return false; const s = v.trim(); if (s.length < 60) return false; const parts = s.split('.'); if (parts.length !== 3) return false; return parts.every(p => /^[A-Za-z0-9_-]+$/.test(p) && p.length > 10); }; const pickFromStorage = (storage) => { try { const keys = Object.keys(storage || {}); for (const k of keys) { const v = storage.getItem(k); if (isJwtLike(v)) return v; } } catch {} return ""; }; // localStorage / sessionStorage let t = pickFromStorage(window.localStorage); if (t) return t; t = pickFromStorage(window.sessionStorage); if (t) return t; // meta 标签 const meta = document.querySelector('meta[name="token"], meta[name="bjh-token"]'); const metaToken = meta && meta.getAttribute('content'); if (isJwtLike(metaToken)) return metaToken; // 简单从全局变量里找 const candidates = [ (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.token) || "", (window.__PRELOADED_STATE__ && window.__PRELOADED_STATE__.token) || "", (window.__NUXT__ && window.__NUXT__.state && window.__NUXT__.state.token) || "", ]; for (const c of candidates) { if (isJwtLike(c)) return c; } return ""; } """ ) # 2) 若仍未取到 token,再从页面 HTML 兜底提取 if not token: html = await self.page.content() m = re.search(r'([A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,})', html) if m: token = m.group(1) if not token: raise Exception("未能从页面提取 token(可能未登录或触发风控),请重新登录百家号账号后再试") # 3) 调用接口(在页面上下文 fetch,自动携带 cookie) api_url = ( "https://baijiahao.baidu.com/pcui/article/lists" f"?currentPage={current_page}" f"&pageSize={page_size}" 
"&search=&type=&collection=&startDate=&endDate=" "&clearBeforeFetch=false" "&dynamic=1" ) resp = await self.page.evaluate( """ async ({ url, token }) => { const r = await fetch(url, { method: 'GET', credentials: 'include', headers: { 'accept': 'application/json, text/plain, */*', ...(token ? { token } : {}), }, }); const text = await r.text(); return { ok: r.ok, status: r.status, text }; } """, {"url": api_url, "token": token}, ) if not resp or not resp.get("ok"): status = resp.get("status") if isinstance(resp, dict) else "unknown" raise Exception(f"百家号接口请求失败: HTTP {status}") api_result = json.loads(resp.get("text") or "{}") print(f"[{self.platform_name}] pcui/article/lists 响应: errno={api_result.get('errno')}, errmsg={api_result.get('errmsg')}") if api_result.get("errno") != 0: errno = api_result.get("errno") errmsg = api_result.get("errmsg", "unknown error") # 20040001 常见为“未登录” if errno in (110, 20040001): raise Exception("百家号未登录或 Cookie/token 失效,请重新登录后再同步") raise Exception(f"百家号接口错误: errno={errno}, errmsg={errmsg}") data = api_result.get("data", {}) or {} items = data.get("list", []) or [] page_info = data.get("page", {}) or {} total = int(page_info.get("totalCount", 0) or 0) total_page = int(page_info.get("totalPage", 0) or 0) cur_page = int(page_info.get("currentPage", current_page) or current_page) has_more = bool(total_page and cur_page < total_page) next_page = cur_page + 1 if has_more else "" print(f"[{self.platform_name}] 获取到 {len(items)} 个作品,总数: {total}, currentPage={cur_page}, totalPage={total_page}") def _pick_cover(item: dict) -> str: cover = item.get("crosswise_cover") or item.get("vertical_cover") or "" if cover: return cover raw = item.get("cover_images") or "" try: # cover_images 可能是 JSON 字符串 parsed = json.loads(raw) if isinstance(raw, str) else raw if isinstance(parsed, list) and parsed: first = parsed[0] if isinstance(first, dict): return first.get("src") or first.get("ori_src") or "" if isinstance(first, str): return first except Exception: pass 
return "" def _pick_duration(item: dict) -> int: for k in ("rmb_duration", "duration", "long"): try: v = int(item.get(k) or 0) if v > 0: return v except Exception: pass # displaytype_exinfo 里可能有 ugcvideo.video_info.durationInSecond ex = item.get("displaytype_exinfo") or "" try: exj = json.loads(ex) if isinstance(ex, str) and ex else (ex if isinstance(ex, dict) else {}) ugc = (exj.get("ugcvideo") or {}) if isinstance(exj, dict) else {} vi = ugc.get("video_info") or {} v = int(vi.get("durationInSecond") or ugc.get("long") or 0) return v if v > 0 else 0 except Exception: return 0 def _pick_status(item: dict) -> str: qs = str(item.get("quality_status") or "").lower() st = str(item.get("status") or "").lower() if qs == "rejected" or "reject" in st: return "rejected" if st in ("draft", "unpublish", "unpublished"): return "draft" # 百家号常见 publish return "published" for item in items: # 优先使用 nid(builder 预览链接使用这个) work_id = str(item.get("nid") or item.get("feed_id") or item.get("article_id") or item.get("id") or "") if not work_id: continue works.append( WorkItem( work_id=work_id, title=str(item.get("title") or ""), cover_url=_pick_cover(item), video_url=str(item.get("url") or ""), duration=_pick_duration(item), status=_pick_status(item), publish_time=str(item.get("publish_time") or item.get("publish_at") or item.get("created_at") or ""), play_count=int(item.get("read_amount") or 0), like_count=int(item.get("like_amount") or 0), comment_count=int(item.get("comment_amount") or 0), share_count=int(item.get("share_amount") or 0), collect_count=int(item.get("collection_amount") or 0), ) ) print(f"[{self.platform_name}] ✓ 成功解析 {len(works)} 个作品") except Exception as e: import traceback traceback.print_exc() return WorksResult( success=False, platform=self.platform_name, error=str(e), debug_info="baijiahao_get_works_failed" ) return WorksResult( success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page ) async def 
check_login_status(self, cookies: str) -> dict: """ 检查百家号 Cookie 登录状态 使用直接 HTTP API 调用,不使用浏览器 """ import aiohttp print(f"[{self.platform_name}] 检查登录状态 (使用 API)") try: # 解析 cookies cookie_list = self.parse_cookies(cookies) cookie_dict = {c['name']: c['value'] for c in cookie_list} # 重要:百家号需要先访问主页建立会话上下文 session_headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Cookie 由 session 管理 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"' } headers = { 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Cookie 由 session 管理 'Referer': 'https://baijiahao.baidu.com/builder/rc/home', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"' } async with aiohttp.ClientSession(cookies=cookie_dict) as session: # 步骤 0: 先访问主页建立会话上下文(关键步骤!) 
print(f"[{self.platform_name}] [0/2] 访问主页建立会话上下文...") async with session.get( 'https://baijiahao.baidu.com/builder/rc/home', headers=session_headers, timeout=aiohttp.ClientTimeout(total=30) ) as home_response: home_status = home_response.status print(f"[{self.platform_name}] 主页访问状态: {home_status}") # 短暂等待确保会话建立 await asyncio.sleep(1) # 步骤 1: 调用 API 检查登录状态 print(f"[{self.platform_name}] [1/2] 调用 appinfo API 检查登录状态...") async with session.get( 'https://baijiahao.baidu.com/builder/app/appinfo', headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as response: api_result = await response.json() errno = api_result.get('errno') print(f"[{self.platform_name}] API 完整响应: {json.dumps(api_result, ensure_ascii=False)[:500]}") print(f"[{self.platform_name}] API 响应: errno={errno}") # errno 为 0 表示请求成功 if errno == 0: # 检查是否有用户数据 user_data = api_result.get('data', {}).get('user', {}) if user_data: # 检查账号状态 status = user_data.get('status', '') account_name = user_data.get('name') or user_data.get('uname', '') # 有效的账号状态:audit(审核中), pass(已通过), normal(正常), newbie(新手) valid_statuses = ['audit', 'pass', 'normal', 'newbie'] if status in valid_statuses and account_name: print(f"[{self.platform_name}] ✓ 登录状态有效: {account_name} (status={status})") return { "success": True, "valid": True, "need_login": False, "message": "登录状态有效" } else: print(f"[{self.platform_name}] 账号状态异常: status={status}, name={account_name}") return { "success": True, "valid": False, "need_login": True, "message": f"账号状态异常: {status}" } else: print(f"[{self.platform_name}] 无用户数据,Cookie 可能无效") return { "success": True, "valid": False, "need_login": True, "message": "无用户数据" } # errno 非 0 表示请求失败 # 常见错误码:110 = 未登录 error_msg = api_result.get('errmsg', '未知错误') print(f"[{self.platform_name}] Cookie 无效: errno={errno}, msg={error_msg}") return { "success": True, "valid": False, "need_login": True, "message": error_msg } except Exception as e: import traceback traceback.print_exc() return { "success": False, "valid": False, 
"need_login": True, "error": str(e) } async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """获取百家号作品评论""" # TODO: 实现评论获取逻辑 return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error="百家号评论功能暂未实现" ) ================================================================================ 文件: server\python\platforms\douyin.py ================================================================================ # -*- coding: utf-8 -*- """ 抖音视频发布器 参考: matrix/douyin_uploader/main.py """ import asyncio import os import json import re from datetime import datetime from typing import List from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) class DouyinPublisher(BasePublisher): """ 抖音视频发布器 使用 Playwright 自动化操作抖音创作者中心 """ platform_name = "douyin" login_url = "https://creator.douyin.com/" publish_url = "https://creator.douyin.com/creator-micro/content/upload" cookie_domain = ".douyin.com" async def set_schedule_time(self, publish_date: datetime): """设置定时发布""" if not self.page: return # 选择定时发布 label_element = self.page.locator("label.radio-d4zkru:has-text('定时发布')") await label_element.click() await asyncio.sleep(1) # 输入时间 publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M") await self.page.locator('.semi-input[placeholder="日期和时间"]').click() await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.type(str(publish_date_str)) await self.page.keyboard.press("Enter") await asyncio.sleep(1) async def handle_upload_error(self, video_path: str): """处理上传错误,重新上传""" if not self.page: return print(f"[{self.platform_name}] 视频出错了,重新上传中...") await self.page.locator('div.progress-div [class^="upload-btn-input"]').set_input_files(video_path) async def check_captcha(self) -> dict: """ 检查页面是否需要验证码 返回: {'need_captcha': bool, 'captcha_type': str} """ if not self.page: return {'need_captcha': False, 'captcha_type': ''} try: # 检查手机验证码弹窗 
phone_captcha_selectors = [ 'text="请输入验证码"', 'text="输入手机验证码"', 'text="获取验证码"', 'text="手机号验证"', '[class*="captcha"][class*="phone"]', '[class*="verify"][class*="phone"]', '[class*="sms-code"]', 'input[placeholder*="验证码"]', ] # Slider captcha slider_captcha_selectors = [ '[class*="captcha"][class*="slider"]', '[class*="slide-verify"]', '[class*="drag-verify"]', 'text="按住滑块"', 'text="向右滑动"', 'text="拖动滑块"', ] # Image captcha image_captcha_selectors = [ '[class*="captcha"][class*="image"]', '[class*="verify-image"]', 'text="点击图片"', 'text="选择正确的"', ] # Login popup (cookie expired) login_selectors = [ 'text="请先登录"', 'text="登录后继续"', '[class*="login-modal"]', '[class*="login-dialog"]', ] # Probed in priority order; the first selector present on the page decides the type. The label is the log prefix printed on a match. checks = [ ('phone', '检测到手机验证码', phone_captcha_selectors), ('slider', '检测到滑块验证码', slider_captcha_selectors), ('image', '检测到图片验证码', image_captcha_selectors), ('login', '检测到需要登录', login_selectors), ] for captcha_type, label, selectors in checks: for selector in selectors: # `except Exception`, not a bare `except`: a bare except also swallows asyncio.CancelledError and blocks task cancellation try: found = await self.page.locator(selector).count() > 0 except Exception: continue if found: print(f"[{self.platform_name}] {label}: {selector}", flush=True) return {'need_captcha': True, 'captcha_type': captcha_type} except Exception as e: print(f"[{self.platform_name}] 验证码检测异常: {e}", flush=True) return {'need_captcha': False, 'captcha_type': ''} async def handle_phone_captcha(self) -> bool: """Handle an SMS-code verification prompt on the current page. Tries to click the "send code" button (consulting ai_analyze_sms_send_state about whether the SMS went out), asks the frontend for the code via request_sms_code_from_frontend, fills it into the code input and submits it. Returns True once the code was submitted, False if there is no page or any step raised. """ if not self.page: return False try: body_text = "" try: body_text = await self.page.inner_text("body") except Exception: body_text = "" # Masked phone number as shown on the page, e.g. 138****1234 phone_match = re.search(r"(1\d{2}\*{4}\d{4})", body_text or "") masked_phone = phone_match.group(1) if phone_match
else "" async def _get_send_button(): candidates = [ self.page.get_by_role("button", name="获取验证码"), self.page.get_by_role("button", name="发送验证码"), self.page.locator('button:has-text("获取验证码")'), self.page.locator('button:has-text("发送验证码")'), self.page.locator('[role="button"]:has-text("获取验证码")'), self.page.locator('[role="button"]:has-text("发送验证码")'), ] for c in candidates: try: if await c.count() > 0 and await c.first.is_visible(): return c.first except: continue return None async def _confirm_sent() -> bool: try: txt = "" try: txt = await self.page.inner_text("body") except: txt = "" if re.search(r"(\d+\s*秒)|(\d+\s*s)|后可重试|重新发送|已发送", txt or ""): return True except: pass try: btn = await _get_send_button() if btn: disabled = await btn.is_disabled() if disabled: return True label = (await btn.inner_text()) if btn else "" if re.search(r"(\d+\s*秒)|(\d+\s*s)|后可重试|重新发送|已发送", label or ""): return True except: pass return False did_click_send = False btn = await _get_send_button() if btn: try: if await btn.is_enabled(): await btn.click(timeout=5000) did_click_send = True print(f"[{self.platform_name}] 已点击发送短信验证码", flush=True) except Exception as e: print(f"[{self.platform_name}] 点击发送验证码按钮失败: {e}", flush=True) if did_click_send: try: await self.page.wait_for_timeout(800) except: pass sent_confirmed = await _confirm_sent() if did_click_send else False ai_state = await self.ai_analyze_sms_send_state() try: if ai_state.get("sent_likely"): sent_confirmed = True except: pass if (not did_click_send or not sent_confirmed) and ai_state.get("suggested_action") == "click_send": btn2 = await _get_send_button() if btn2: try: if await btn2.is_enabled(): await btn2.click(timeout=5000) did_click_send = True await self.page.wait_for_timeout(800) sent_confirmed = await _confirm_sent() ai_state = await self.ai_analyze_sms_send_state() if ai_state.get("sent_likely"): sent_confirmed = True except: pass code_hint = "请输入短信验证码。" if ai_state.get("block_reason") == "slider": code_hint = 
"检测到滑块/人机验证阻塞,请先在浏览器窗口完成验证后再发送短信验证码。" elif ai_state.get("block_reason") in ["rate_limit", "risk"]: code_hint = f"页面提示可能被限制/风控({ai_state.get('notes','') or '请稍后重试'})。可稍等后重新发送验证码。" elif not did_click_send: code_hint = "未找到或无法点击“发送验证码”按钮,请在弹出的浏览器页面手动点击发送后再输入验证码。" elif sent_confirmed: code_hint = f"已检测到短信验证码已发送({ai_state.get('notes','') or '请查收短信'})。" else: code_hint = f"已尝试点击发送验证码,但未确认发送成功({ai_state.get('notes','') or '请查看是否出现倒计时/重新发送'})。" code = await self.request_sms_code_from_frontend(masked_phone, message=code_hint) input_selectors = [ 'input[placeholder*="验证码"]', 'input[placeholder*="短信"]', 'input[type="tel"]', 'input[type="text"]', ] filled = False for selector in input_selectors: try: el = self.page.locator(selector).first if await el.count() > 0: await el.fill(code) filled = True break except: continue if not filled: raise Exception("未找到验证码输入框") submit_selectors = [ 'button:has-text("确定")', 'button:has-text("确认")', 'button:has-text("提交")', 'button:has-text("完成")', ] for selector in submit_selectors: try: btn = self.page.locator(selector).first if await btn.count() > 0: await btn.click() break except: continue try: await self.page.wait_for_timeout(1000) await self.page.wait_for_selector('text="请输入验证码"', state="hidden", timeout=15000) except: pass print(f"[{self.platform_name}] 短信验证码已提交,继续执行发布流程", flush=True) return True except Exception as e: print(f"[{self.platform_name}] 处理短信验证码失败: {e}", flush=True) return False async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到抖音 - 参考 matrix/douyin_uploader/main.py""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器 await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) 
print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(10, "正在打开上传页面...") # 访问上传页面 - 参考 matrix await self.page.goto("https://creator.douyin.com/creator-micro/content/upload") print(f"[{self.platform_name}] 等待页面加载...") try: await self.page.wait_for_url("https://creator.douyin.com/creator-micro/content/upload", timeout=30000) except: pass await asyncio.sleep(3) # 检查当前 URL 和页面状态 current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") async def wait_for_manual_login(timeout_seconds: int = 300) -> bool: if not self.page: return False self.report_progress(8, "检测到需要登录,请在浏览器窗口完成登录...") try: await self.page.bring_to_front() except: pass waited = 0 while waited < timeout_seconds: try: url = self.page.url if "login" not in url and "passport" not in url: if "creator.douyin.com" in url: return True await asyncio.sleep(2) waited += 2 except: await asyncio.sleep(2) waited += 2 return False # 检查是否在登录页面或需要登录 if "login" in current_url or "passport" in current_url: if not self.headless: logged_in = await wait_for_manual_login() if logged_in: try: if self.context: cookies_after = await self.context.cookies() await self.sync_cookies_to_node(cookies_after) except: pass await self.page.goto("https://creator.douyin.com/creator-micro/content/upload") await asyncio.sleep(3) current_url = self.page.url else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="需要登录:请在浏览器窗口完成登录后重试", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) else: screenshot_base64 = await self.capture_screenshot() return PublishResult( 
success=False, platform=self.platform_name, error="Cookie 已过期,需要重新登录", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 使用 AI 检测验证码 ai_captcha_result = await self.ai_check_captcha() if ai_captcha_result['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha_result['captcha_type']}", flush=True) screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"检测到{ai_captcha_result['captcha_type']}验证码,需要使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 传统方式检测验证码 captcha_result = await self.check_captcha() if captcha_result['need_captcha']: print(f"[{self.platform_name}] 传统方式检测到验证码: {captcha_result['captcha_type']}", flush=True) if captcha_result['captcha_type'] == 'phone': handled = await self.handle_phone_captcha() if handled: self.report_progress(12, "短信验证码已处理,继续发布...") else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="检测到手机验证码,但自动处理失败", need_captcha=True, captcha_type='phone', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证", need_captcha=True, captcha_type=captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) self.report_progress(15, "正在选择视频文件...") # 点击上传区域 - 参考 matrix: div.container-drag-info-Tl0RGH 或带 container-drag 的 div upload_selectors = [ "div[class*='container-drag-info']", "div[class*='container-drag']", "div.upload-btn", "div[class*='upload']", ] upload_success = False for selector in upload_selectors: try: upload_div = 
self.page.locator(selector).first if await upload_div.count() > 0: print(f"[{self.platform_name}] 找到上传区域: {selector}") async with self.page.expect_file_chooser(timeout=10000) as fc_info: await upload_div.click() file_chooser = await fc_info.value await file_chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] 视频文件已选择") break except Exception as e: print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}") if not upload_success: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="未找到上传入口", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) # 等待跳转到发布页面 - 参考 matrix self.report_progress(20, "等待进入发布页面...") for i in range(60): try: # matrix 等待的 URL: https://creator.douyin.com/creator-micro/content/post/video?enter_from=publish_page await self.page.wait_for_url( "https://creator.douyin.com/creator-micro/content/post/video*", timeout=2000 ) print(f"[{self.platform_name}] 已进入发布页面") break except: print(f"[{self.platform_name}] 等待进入发布页面... 
{i+1}/60") await asyncio.sleep(1) await asyncio.sleep(2) self.report_progress(30, "正在填充标题和话题...") # 填写标题 - 参考 matrix title_input = self.page.get_by_text('作品标题').locator("..").locator( "xpath=following-sibling::div[1]").locator("input") if await title_input.count(): await title_input.fill(params.title[:30]) print(f"[{self.platform_name}] 标题已填写") else: # 备用方式 - 参考 matrix title_container = self.page.locator(".notranslate") await title_container.click() await self.page.keyboard.press("Backspace") await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Delete") await self.page.keyboard.type(params.title) await self.page.keyboard.press("Enter") print(f"[{self.platform_name}] 标题已填写(备用方式)") # 添加话题标签 - 参考 matrix if params.tags: css_selector = ".zone-container" for index, tag in enumerate(params.tags, start=1): print(f"[{self.platform_name}] 正在添加第{index}个话题: #{tag}") await self.page.type(css_selector, "#" + tag) await self.page.press(css_selector, "Space") self.report_progress(40, "等待视频上传完成...") # 等待视频上传完成 - 参考 matrix: 检测"重新上传"按钮 for i in range(120): try: count = await self.page.locator("div").filter(has_text="重新上传").count() if count > 0: print(f"[{self.platform_name}] 视频上传完毕") break else: print(f"[{self.platform_name}] 正在上传视频中... 
{i+1}/120") # 检查上传错误 if await self.page.locator('div.progress-div > div:has-text("上传失败")').count(): print(f"[{self.platform_name}] 发现上传出错了,重新上传...") await self.handle_upload_error(params.video_path) await asyncio.sleep(3) except: print(f"[{self.platform_name}] 正在上传视频中...") await asyncio.sleep(3) self.report_progress(60, "处理视频设置...") # 点击"我知道了"弹窗 - 参考 matrix known_count = await self.page.get_by_role("button", name="我知道了").count() if known_count > 0: await self.page.get_by_role("button", name="我知道了").nth(0).click() print(f"[{self.platform_name}] 关闭弹窗") await asyncio.sleep(5) # 设置位置 - 参考 matrix try: await self.page.locator('div.semi-select span:has-text("输入地理位置")').click() await asyncio.sleep(1) await self.page.keyboard.press("Backspace") await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Delete") await self.page.keyboard.type(params.location) await asyncio.sleep(1) await self.page.locator('div[role="listbox"] [role="option"]').first.click() print(f"[{self.platform_name}] 位置设置成功: {params.location}") except Exception as e: print(f"[{self.platform_name}] 设置位置失败: {e}") # 开启头条/西瓜同步 - 参考 matrix try: third_part_element = '[class^="info"] > [class^="first-part"] div div.semi-switch' if await self.page.locator(third_part_element).count(): class_name = await self.page.eval_on_selector( third_part_element, 'div => div.className') if 'semi-switch-checked' not in class_name: await self.page.locator(third_part_element).locator( 'input.semi-switch-native-control').click() print(f"[{self.platform_name}] 已开启头条/西瓜同步") except: pass # 定时发布 if params.publish_date: self.report_progress(70, "设置定时发布...") await self.set_schedule_time(params.publish_date) self.report_progress(80, "正在发布...") print(f"[{self.platform_name}] 查找发布按钮...") # 点击发布 - 参考 matrix for i in range(30): try: # 检查验证码(不要在每次循环都调 AI,太慢) if i % 5 == 0: ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到发布过程中需要验证码: {ai_captcha['captcha_type']}", 
flush=True) if ai_captcha['captcha_type'] == 'phone': handled = await self.handle_phone_captcha() if handled: continue screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error=f"发布过程中需要{ai_captcha['captcha_type']}验证码,请使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha['captcha_type'], screenshot_base64=screenshot_base64, page_url=page_url, status='need_captcha' ) publish_btn = self.page.get_by_role('button', name="发布", exact=True) btn_count = await publish_btn.count() if btn_count > 0: print(f"[{self.platform_name}] 点击发布按钮...") await publish_btn.click() # 等待跳转到内容管理页面 - 参考 matrix await self.page.wait_for_url( "https://creator.douyin.com/creator-micro/content/manage", timeout=5000 ) self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=page_url, status='success' ) except Exception as e: current_url = self.page.url # 检查是否已经在管理页面 if "https://creator.douyin.com/creator-micro/content/manage" in current_url: self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=current_url, status='success' ) else: print(f"[{self.platform_name}] 视频正在发布中... 
{i+1}/30, URL: {current_url}") await asyncio.sleep(1) # 发布超时 print(f"[{self.platform_name}] 发布超时,获取截图...") screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="发布超时,请检查发布状态", screenshot_base64=screenshot_base64, page_url=page_url, status='need_action' ) async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """获取抖音作品列表 Args: cookies: Cookie 字符串或 JSON page: 分页参数,首次请求传 0,后续传上一次返回的 next_page(即 API 的 max_cursor) page_size: 每页数量 Returns: WorksResult: 包含 works, total, has_more, next_page(用于下一页请求) """ print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品列表") print(f"[{self.platform_name}] cursor={page}, page_size={page_size}") print(f"{'='*60}") works: List[WorkItem] = [] total = 0 has_more = False next_cursor = 0 try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问创作者中心首页以触发登录验证 await self.page.goto("https://creator.douyin.com/creator-micro/home") await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url if "login" in current_url or "passport" in current_url: raise Exception("Cookie 已过期,请重新登录") # 调用作品列表 API:page 作为 max_cursor(首次 0,后续为上一页返回的 max_cursor) max_cursor = page api_url = f"https://creator.douyin.com/janus/douyin/creator/pc/work_list?status=0&device_platform=android&count={page_size}&max_cursor={max_cursor}&cookie_enabled=true&browser_language=zh-CN&browser_platform=Win32&browser_name=Mozilla&browser_online=true&timezone_name=Asia%2FShanghai" response = await self.page.evaluate(f''' async () => {{ try {{ const resp = await fetch("{api_url}", {{ credentials: 'include', headers: {{ 'Accept': 'application/json' }} }}); return await resp.json(); }} catch (e) {{ return {{ error: e.toString() }}; }} }} ''') if response.get('error'): print(f"[{self.platform_name}] API 请求失败: {response.get('error')}", 
flush=True) aweme_list = response.get('aweme_list', []) or [] has_more = response.get('has_more', False) # 下一页游标:优先 max_cursor,兼容 next_cursor(与创作者中心 work_list 一致) next_cursor = response.get('max_cursor') if 'max_cursor' in response else response.get('next_cursor') if next_cursor is None: next_cursor = 0 # 从第一个作品的 author.aweme_count 获取总作品数 if aweme_list and len(aweme_list) > 0: first_aweme = aweme_list[0] author_aweme_count = first_aweme.get('author', {}).get('aweme_count', 0) if author_aweme_count > 0: total = author_aweme_count print(f"[{self.platform_name}] 从 author.aweme_count 获取总作品数: {total}") print(f"[{self.platform_name}] API 响应: has_more={has_more}, aweme_list={len(aweme_list)}, next_cursor={next_cursor}") for aweme in aweme_list: aweme_id = str(aweme.get('aweme_id', '')) if not aweme_id: continue statistics = aweme.get('statistics', {}) # 打印调试信息,确认字段存在 # print(f"[{self.platform_name}] 作品 {aweme_id} 统计: {statistics}", flush=True) # 获取封面 cover_url = '' if aweme.get('Cover', {}).get('url_list'): cover_url = aweme['Cover']['url_list'][0] elif aweme.get('video', {}).get('cover', {}).get('url_list'): cover_url = aweme['video']['cover']['url_list'][0] # 获取标题 title = aweme.get('item_title', '') or aweme.get('desc', '').split('\n')[0][:50] or '无标题' # 获取时长(毫秒转秒) duration = aweme.get('video', {}).get('duration', 0) // 1000 # 获取发布时间 create_time = aweme.get('create_time', 0) publish_time = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S') if create_time else '' # 入库 video_url 使用 play_addr.url_list 的第一项,无则用分享页链接 url_list = (aweme.get('video') or {}).get('play_addr', {}).get('url_list') or [] video_url = url_list[0] if url_list else (f"https://www.douyin.com/video/{aweme_id}" if aweme_id else "") works.append(WorkItem( work_id=aweme_id, title=title, cover_url=cover_url, video_url=video_url, duration=duration, status='published', publish_time=publish_time, play_count=int(statistics.get('play_count', 0)), like_count=int(statistics.get('digg_count', 0)), 
comment_count=int(statistics.get('comment_count', 0)), share_count=int(statistics.get('share_count', 0)), collect_count=int(statistics.get('collect_count', 0)), )) if total == 0: total = len(works) print(f"[{self.platform_name}] 本页获取到 {len(works)} 个作品") except Exception as e: import traceback traceback.print_exc() return WorksResult( success=False, platform=self.platform_name, error=str(e) ) return WorksResult( success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_cursor ) async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """获取抖音作品评论 - 通过访问视频详情页拦截评论 API""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品评论") print(f"[{self.platform_name}] work_id={work_id}, cursor={cursor}") print(f"{'='*60}") comments: List[CommentItem] = [] total = 0 has_more = False next_cursor = "" captured_data = {} try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 设置 API 响应监听器 async def handle_response(response): nonlocal captured_data url = response.url # 监听评论列表 API - 抖音视频页面使用的 API # /aweme/v1/web/comment/list/ 或 /comment/list/ if '/comment/list' in url and ('aweme_id' in url or work_id in url): try: json_data = await response.json() print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True) # 检查响应是否成功 if json_data.get('status_code') == 0 or json_data.get('comments'): captured_data = json_data comment_count = len(json_data.get('comments', [])) print(f"[{self.platform_name}] 评论 API 响应成功: comments={comment_count}, has_more={json_data.get('has_more')}", flush=True) except Exception as e: print(f"[{self.platform_name}] 解析评论响应失败: {e}", flush=True) self.page.on('response', handle_response) print(f"[{self.platform_name}] 已注册评论 API 响应监听器", flush=True) # 访问视频详情页 - 这会自动触发评论 API 请求 video_url = f"https://www.douyin.com/video/{work_id}" print(f"[{self.platform_name}] 访问视频详情页: 
{video_url}", flush=True) await self.page.goto(video_url, wait_until="domcontentloaded", timeout=30000) await asyncio.sleep(5) # 检查登录状态 current_url = self.page.url if "login" in current_url or "passport" in current_url: raise Exception("Cookie 已过期,请重新登录") # 等待评论加载 if not captured_data: print(f"[{self.platform_name}] 等待评论 API 响应...", flush=True) # 尝试滚动页面触发评论加载 await self.page.evaluate('window.scrollBy(0, 300)') await asyncio.sleep(3) if not captured_data: # 再等待一会 await asyncio.sleep(3) # 移除监听器 self.page.remove_listener('response', handle_response) # 解析评论数据 if captured_data: comment_list = captured_data.get('comments') or [] has_more = captured_data.get('has_more', False) or captured_data.get('has_more', 0) == 1 next_cursor = str(captured_data.get('cursor', '')) total = captured_data.get('total', 0) or len(comment_list) print(f"[{self.platform_name}] 解析评论: total={total}, has_more={has_more}, comments={len(comment_list)}", flush=True) for comment in comment_list: cid = str(comment.get('cid', '')) if not cid: continue user = comment.get('user', {}) # 解析回复列表 replies = [] reply_list = comment.get('reply_comment', []) or [] for reply in reply_list: reply_user = reply.get('user', {}) replies.append(CommentItem( comment_id=str(reply.get('cid', '')), work_id=work_id, content=reply.get('text', ''), author_id=str(reply_user.get('uid', '')), author_name=reply_user.get('nickname', ''), author_avatar=reply_user.get('avatar_thumb', {}).get('url_list', [''])[0] if reply_user.get('avatar_thumb') else '', like_count=int(reply.get('digg_count', 0)), create_time=datetime.fromtimestamp(reply.get('create_time', 0)).strftime('%Y-%m-%d %H:%M:%S') if reply.get('create_time') else '', is_author=reply.get('is_author', False), )) comments.append(CommentItem( comment_id=cid, work_id=work_id, content=comment.get('text', ''), author_id=str(user.get('uid', '')), author_name=user.get('nickname', ''), author_avatar=user.get('avatar_thumb', {}).get('url_list', [''])[0] if user.get('avatar_thumb') 
else '', like_count=int(comment.get('digg_count', 0)), reply_count=int(comment.get('reply_comment_total', 0)), create_time=datetime.fromtimestamp(comment.get('create_time', 0)).strftime('%Y-%m-%d %H:%M:%S') if comment.get('create_time') else '', is_author=comment.get('is_author', False), replies=replies, )) print(f"[{self.platform_name}] 解析到 {len(comments)} 条评论", flush=True) else: print(f"[{self.platform_name}] 未捕获到评论 API 响应", flush=True) except Exception as e: import traceback traceback.print_exc() return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=str(e) ) finally: await self.close_browser() result = CommentsResult( success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more ) result.__dict__['cursor'] = next_cursor return result async def get_all_comments(self, cookies: str) -> dict: """获取所有作品的评论 - 通过评论管理页面""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取所有作品评论") print(f"{'='*60}") all_work_comments = [] captured_comments = [] captured_works = {} # work_id -> work_info try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 设置 API 响应监听器 async def handle_response(response): nonlocal captured_comments, captured_works url = response.url try: # 监听评论列表 API - 多种格式 # /comment/list/select/ 或 /comment/read 或 /creator/comment/list if '/comment/list' in url or '/comment/read' in url or 'comment_list' in url: json_data = await response.json() print(f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...", flush=True) # 格式1: comments 字段 comments = json_data.get('comments', []) # 格式2: comment_info_list 字段 if not comments: comments = json_data.get('comment_info_list', []) if comments: # 从 URL 中提取 aweme_id import re aweme_id_match = re.search(r'aweme_id=(\d+)', url) aweme_id = aweme_id_match.group(1) if aweme_id_match else '' for comment in comments: # 添加 aweme_id 到评论中 if 
aweme_id and 'aweme_id' not in comment: comment['aweme_id'] = aweme_id captured_comments.append(comment) print(f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (aweme_id={aweme_id}),总计: {len(captured_comments)}", flush=True) # 监听作品列表 API if '/work_list' in url or '/item/list' in url or '/creator/item' in url: json_data = await response.json() aweme_list = json_data.get('aweme_list', []) or json_data.get('item_info_list', []) or json_data.get('item_list', []) print(f"[{self.platform_name}] 捕获到作品列表 API: {len(aweme_list)} 个作品", flush=True) for aweme in aweme_list: aweme_id = str(aweme.get('aweme_id', '') or aweme.get('item_id', '') or aweme.get('item_id_plain', '')) if aweme_id: cover_url = '' if aweme.get('Cover', {}).get('url_list'): cover_url = aweme['Cover']['url_list'][0] elif aweme.get('video', {}).get('cover', {}).get('url_list'): cover_url = aweme['video']['cover']['url_list'][0] elif aweme.get('cover_image_url'): cover_url = aweme['cover_image_url'] captured_works[aweme_id] = { 'title': aweme.get('item_title', '') or aweme.get('title', '') or aweme.get('desc', ''), 'cover': cover_url, 'comment_count': aweme.get('statistics', {}).get('comment_count', 0) or aweme.get('comment_count', 0), } except Exception as e: print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True) self.page.on('response', handle_response) print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True) # 访问评论管理页面 print(f"[{self.platform_name}] 访问评论管理页面...", flush=True) await self.page.goto("https://creator.douyin.com/creator-micro/interactive/comment", wait_until="domcontentloaded", timeout=30000) await asyncio.sleep(5) # 检查登录状态 current_url = self.page.url if "login" in current_url or "passport" in current_url: raise Exception("Cookie 已过期,请重新登录") print(f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True) # 尝试点击"选择作品"来加载作品列表 try: select_btn = await self.page.query_selector('text="选择作品"') if select_btn: print(f"[{self.platform_name}] 
点击选择作品按钮...", flush=True) await select_btn.click() await asyncio.sleep(3) # 获取作品列表 work_items = await self.page.query_selector_all('[class*="work-item"], [class*="video-item"], [class*="aweme-item"]') print(f"[{self.platform_name}] 找到 {len(work_items)} 个作品元素", flush=True) # 点击每个作品加载其评论 for i, item in enumerate(work_items[:10]): # 最多处理10个作品 try: await item.click() await asyncio.sleep(2) print(f"[{self.platform_name}] 已点击作品 {i+1}/{min(len(work_items), 10)}", flush=True) except: pass # 关闭选择作品弹窗 close_btn = await self.page.query_selector('[class*="close"], [class*="cancel"]') if close_btn: await close_btn.click() await asyncio.sleep(1) except Exception as e: print(f"[{self.platform_name}] 选择作品操作失败: {e}", flush=True) # 滚动加载更多评论 for i in range(5): await self.page.evaluate('window.scrollBy(0, 500)') await asyncio.sleep(1) await asyncio.sleep(3) # 移除监听器 self.page.remove_listener('response', handle_response) print(f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_works)} 个作品", flush=True) # 按作品分组评论 work_comments_map = {} # work_id -> work_comments for comment in captured_comments: # 从评论中获取作品信息 aweme = comment.get('aweme', {}) or comment.get('item', {}) aweme_id = str(comment.get('aweme_id', '') or aweme.get('aweme_id', '') or aweme.get('item_id', '')) if not aweme_id: continue if aweme_id not in work_comments_map: work_info = captured_works.get(aweme_id, {}) work_comments_map[aweme_id] = { 'work_id': aweme_id, 'title': aweme.get('title', '') or aweme.get('desc', '') or work_info.get('title', ''), 'cover_url': aweme.get('cover', {}).get('url_list', [''])[0] if aweme.get('cover') else work_info.get('cover', ''), 'comments': [] } cid = str(comment.get('cid', '')) if not cid: continue user = comment.get('user', {}) work_comments_map[aweme_id]['comments'].append({ 'comment_id': cid, 'author_id': str(user.get('uid', '')), 'author_name': user.get('nickname', ''), 'author_avatar': user.get('avatar_thumb', {}).get('url_list', [''])[0] if 
user.get('avatar_thumb') else '', 'content': comment.get('text', ''), 'like_count': int(comment.get('digg_count', 0)), 'create_time': datetime.fromtimestamp(comment.get('create_time', 0)).strftime('%Y-%m-%d %H:%M:%S') if comment.get('create_time') else '', 'is_author': comment.get('is_author', False), }) all_work_comments = list(work_comments_map.values()) total_comments = sum(len(w['comments']) for w in all_work_comments) print(f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论", flush=True) except Exception as e: import traceback traceback.print_exc() return { 'success': False, 'platform': self.platform_name, 'error': str(e), 'work_comments': [] } finally: await self.close_browser() return { 'success': True, 'platform': self.platform_name, 'work_comments': all_work_comments, 'total': len(all_work_comments) } async def auto_reply_private_messages(self, cookies: str) -> dict: """自动回复抖音私信 - 适配新页面结构""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始自动回复抖音私信") print(f"{'='*60}") try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问抖音私信页面 await self.page.goto("https://creator.douyin.com/creator-micro/data/following/chat", timeout=30000) await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") if "login" in current_url or "passport" in current_url: raise Exception("Cookie 已过期,请重新登录") replied_count = 0 # 处理两个tab: 陌生人私信 和 朋友私信 for tab_name in ["陌生人私信", "朋友私信"]: print(f"\n{'='*50}") print(f"[{self.platform_name}] 处理 {tab_name} ...") print(f"{'='*50}") # 点击对应tab tab_locator = self.page.locator(f'div.semi-tabs-tab:text-is("{tab_name}")') if await tab_locator.count() > 0: await tab_locator.click() await asyncio.sleep(2) else: print(f"⚠️ 未找到 {tab_name} 标签,跳过") continue # 获取私信列表 session_items = self.page.locator('.semi-list-item') session_count = await 
session_items.count() print(f"[{self.platform_name}] {tab_name} 共找到 {session_count} 条会话") if session_count == 0: print(f"[{self.platform_name}] {tab_name} 无新私信") continue for idx in range(session_count): try: # 重新获取列表(防止 DOM 变化) current_sessions = self.page.locator('.semi-list-item') if idx >= await current_sessions.count(): break session = current_sessions.nth(idx) user_name = await session.locator('.item-header-name-vL_79m').inner_text() last_msg = await session.locator('.text-whxV9A').inner_text() print(f"\n ➤ [{idx+1}/{session_count}] 处理用户: {user_name} | 最后消息: {last_msg[:30]}...") # 检查会话预览消息是否包含非文字内容 if "分享" in last_msg and ("视频" in last_msg or "图片" in last_msg or "链接" in last_msg): print(" ➤ 会话预览为非文字消息,跳过") continue # 点击进入聊天 await session.click() await asyncio.sleep(2) # 提取聊天历史(判断最后一条是否是自己发的) chat_messages = self.page.locator('.box-item-dSA1TJ:not(.time-Za5gKL)') msg_count = await chat_messages.count() should_reply = True if msg_count > 0: # 最后一条消息 last_msg_el = chat_messages.nth(msg_count - 1) # 获取元素的 class 属性判断是否是自己发的 classes = await last_msg_el.get_attribute('class') or '' is_my_message = 'is-me-' in classes # 包含 is-me- 表示是自己发的 should_reply = not is_my_message # 如果是自己发的就不回复 if should_reply: # 提取完整聊天历史 chat_history = await self._extract_chat_history() if chat_history: # 生成回复 reply_text = await self._generate_reply_with_ai(chat_history) if not reply_text: reply_text = self._generate_reply(chat_history) if reply_text: print(f" 📝 回复内容: {reply_text}") # 填充输入框 input_box = self.page.locator('div.chat-input-dccKiL[contenteditable="true"]') send_btn = self.page.locator('button:has-text("发送")') if await input_box.is_visible() and await send_btn.is_visible(): await input_box.fill(reply_text) await asyncio.sleep(0.5) await send_btn.click() print(" ✅ 已发送") replied_count += 1 await asyncio.sleep(2) else: print(" ❌ 输入框或发送按钮不可见") else: print(" ➤ 无需回复") else: print(" ➤ 聊天历史为空,跳过") else: print(" ➤ 最后一条是我发的,跳过") except Exception as e: print(f" ❌ 处理会话 {idx+1} 时出错: {e}") 
continue print(f"[{self.platform_name}] 自动回复完成,共回复 {replied_count} 条消息") return { 'success': True, 'platform': self.platform_name, 'replied_count': replied_count, 'message': f'成功回复 {replied_count} 条私信' } except Exception as e: import traceback traceback.print_exc() return { 'success': False, 'platform': self.platform_name, 'error': str(e) } finally: await self.close_browser() # 辅助方法保持兼容(可复用) def _generate_reply(self, chat_history: list) -> str: """规则回复""" if not chat_history: return "你好!感谢联系~" last_msg = chat_history[-1]["content"] if "谢谢" in last_msg or "感谢" in last_msg: return "不客气!欢迎常来交流~" elif "你好" in last_msg or "在吗" in last_msg: return "你好!请问有什么可以帮您的?" elif "视频" in last_msg or "怎么拍" in last_msg: return "视频是用手机拍摄的,注意光线和稳定哦!" else: return "收到!我会认真阅读您的留言~" async def _extract_chat_history(self) -> list: """精准提取聊天记录,区分作者(自己)和用户""" if not self.page: return [] history = [] # 获取所有聊天消息(排除时间戳元素) message_wrappers = self.page.locator('.box-item-dSA1TJ:not(.time-Za5gKL)') count = await message_wrappers.count() for i in range(count): try: wrapper = message_wrappers.nth(i) # 检查是否为自己发送的消息 classes = await wrapper.get_attribute('class') or '' is_author = 'is-me-' in classes # 包含 is-me- 表示是自己发的 # 获取消息文本内容 text_element = wrapper.locator('.text-X2d7fS') if await text_element.count() > 0: content = await text_element.inner_text() content = content.strip() if content: # 只添加非空消息 # 获取用户名(如果是对方消息) author_name = '' if not is_author: # 尝试获取对方用户名 name_elements = wrapper.locator('.aweme-author-name-m8uoXU') if await name_elements.count() > 0: author_name = await name_elements.nth(0).inner_text() else: author_name = '用户' else: author_name = '我' history.append({ "author": author_name, "content": content, "is_author": is_author, }) except Exception as e: print(f" ⚠️ 解析第 {i+1} 条消息失败: {e}") continue return history async def _generate_reply_with_ai(self, chat_history: list) -> str: """使用 AI 生成回复(保留原逻辑)""" import os, requests, json try: ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') 
ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_model = os.environ.get('AI_MODEL', 'qwen-plus') if not ai_api_key: return self._generate_reply(chat_history) messages = [{"role": "system", "content": "你是一个友好的抖音创作者助手,负责回复粉丝私信。请保持简洁、友好、专业的语气。回复长度不超过20字。"}] for msg in chat_history: role = "assistant" if msg.get("is_author", False) else "user" messages.append({"role": role, "content": msg["content"]}) headers = {'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json'} payload = {"model": ai_model, "messages": messages, "max_tokens": 150, "temperature": 0.8} response = requests.post(f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30) if response.status_code == 200: ai_reply = response.json().get('choices', [{}])[0].get('message', {}).get('content', '').strip() return ai_reply if ai_reply else self._generate_reply(chat_history) else: return self._generate_reply(chat_history) except: return self._generate_reply(chat_history) async def get_work_comments_mapping(self, cookies: str) -> dict: """获取所有作品及其评论的对应关系 Args: cookies: 抖音创作者平台的cookies Returns: dict: 包含作品和评论对应关系的JSON数据 """ print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品和评论对应关系") print(f"{'='*60}") work_comments_mapping = [] try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问创作者中心首页 await self.page.goto("https://creator.douyin.com/creator-micro/home", timeout=30000) await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url if "login" in current_url or "passport" in current_url: raise Exception("Cookie 已过期,请重新登录") # 访问内容管理页面获取作品列表 print(f"[{self.platform_name}] 访问内容管理页面...") await self.page.goto("https://creator.douyin.com/creator-micro/content/manage", timeout=30000) await asyncio.sleep(5) # 获取作品列表 works_result = await self.get_works(cookies, page=0, page_size=20) if not 
works_result.success: print(f"[{self.platform_name}] 获取作品列表失败: {works_result.error}") return { 'success': False, 'platform': self.platform_name, 'error': works_result.error, 'work_comments': [] } print(f"[{self.platform_name}] 获取到 {len(works_result.works)} 个作品") # 对每个作品获取评论 for i, work in enumerate(works_result.works): print(f"[{self.platform_name}] 正在获取作品 {i+1}/{len(works_result.works)} 的评论: {work.title[:20]}...") # 获取单个作品的评论 comments_result = await self.get_comments(cookies, work.work_id) if comments_result.success: work_comments_mapping.append({ 'work_info': work.to_dict(), 'comments': [comment.to_dict() for comment in comments_result.comments] }) print(f"[{self.platform_name}] 作品 '{work.title[:20]}...' 获取到 {len(comments_result.comments)} 条评论") else: print(f"[{self.platform_name}] 获取作品 '{work.title[:20]}...' 评论失败: {comments_result.error}") work_comments_mapping.append({ 'work_info': work.to_dict(), 'comments': [], 'error': comments_result.error }) # 添加延时避免请求过于频繁 await asyncio.sleep(2) print(f"[{self.platform_name}] 所有作品评论获取完成") except Exception as e: import traceback traceback.print_exc() return { 'success': False, 'platform': self.platform_name, 'error': str(e), 'work_comments': [] } finally: await self.close_browser() return { 'success': True, 'platform': self.platform_name, 'work_comments': work_comments_mapping, 'summary': { 'total_works': len(work_comments_mapping), 'total_comments': sum(len(item['comments']) for item in work_comments_mapping), } } ================================================================================ 文件: server\python\platforms\kuaishou.py ================================================================================ # -*- coding: utf-8 -*- """ 快手视频发布器 参考: matrix/ks_uploader/main.py """ import asyncio import os from datetime import datetime from typing import List from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) class KuaishouPublisher(BasePublisher): """ 
快手视频发布器 使用 Playwright 自动化操作快手创作者中心 """ platform_name = "kuaishou" login_url = "https://cp.kuaishou.com/" publish_url = "https://cp.kuaishou.com/article/publish/video" cookie_domain = ".kuaishou.com" async def set_schedule_time(self, publish_date: datetime): """设置定时发布""" if not self.page: return # 选择定时发布 label_element = self.page.locator("label.radio--4Gpx6:has-text('定时发布')") await label_element.click() await asyncio.sleep(1) # 输入时间 publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M") await self.page.locator('.semi-input[placeholder="日期和时间"]').click() await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.type(str(publish_date_str)) await self.page.keyboard.press("Enter") await asyncio.sleep(1) async def upload_cover(self, cover_path: str): """上传封面图""" if not self.page or not cover_path or not os.path.exists(cover_path): return try: await self.page.get_by_role("button", name="编辑封面").click() await asyncio.sleep(1) await self.page.get_by_role("tab", name="上传封面").click() preview_div = self.page.get_by_role("tabpanel", name="上传封面").locator("div").nth(1) async with self.page.expect_file_chooser() as fc_info: await preview_div.click() preview_chooser = await fc_info.value await preview_chooser.set_files(cover_path) await self.page.get_by_role("button", name="确认").click() await asyncio.sleep(3) print(f"[{self.platform_name}] 封面上传成功") except Exception as e: print(f"[{self.platform_name}] 封面上传失败: {e}") async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到快手 - 参考 matrix/ks_uploader/main.py""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器 await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) 
print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(10, "正在打开上传页面...") # 访问上传页面 - 参考 matrix await self.page.goto("https://cp.kuaishou.com/article/publish/video") print(f"[{self.platform_name}] 等待页面加载...") try: await self.page.wait_for_url("https://cp.kuaishou.com/article/publish/video", timeout=30000) except: pass await asyncio.sleep(3) # 检查是否跳转到登录页 current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") if "passport" in current_url or "login" in current_url: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="Cookie 已过期,需要重新登录", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True) screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) self.report_progress(15, "正在选择视频文件...") # 点击上传按钮 - 参考 matrix: page.get_by_role("button", name="上传视频") upload_btn = self.page.get_by_role("button", name="上传视频") async with self.page.expect_file_chooser(timeout=10000) as fc_info: await upload_btn.click() file_chooser = await fc_info.value await file_chooser.set_files(params.video_path) print(f"[{self.platform_name}] 视频文件已选择") await asyncio.sleep(1) # 关闭可能的弹窗 - 参考 matrix known_btn 
= self.page.get_by_role("button", name="我知道了") if await known_btn.count(): await known_btn.click() print(f"[{self.platform_name}] 关闭弹窗") self.report_progress(20, "正在填充标题...") # 填写标题 - 参考 matrix await asyncio.sleep(1) title_input = self.page.get_by_placeholder('添加合适的话题和描述,作品能获得更多推荐~') if await title_input.count(): await title_input.click() await title_input.fill(params.title[:30]) print(f"[{self.platform_name}] 标题已填写") self.report_progress(30, "等待视频上传完成...") # 等待上传完成 - 参考 matrix: span:has-text("上传成功") for i in range(120): try: count = await self.page.locator('span:has-text("上传成功")').count() if count > 0: print(f"[{self.platform_name}] 视频上传完毕") break else: print(f"[{self.platform_name}] 正在上传视频中... {i+1}/120") await asyncio.sleep(3) except: print(f"[{self.platform_name}] 正在上传视频中...") await asyncio.sleep(3) self.report_progress(50, "正在上传封面...") # 上传封面 - 参考 matrix await self.upload_cover(params.cover_path) await asyncio.sleep(5) self.report_progress(80, "正在发布...") # 点击发布 - 参考 matrix for i in range(30): try: publish_btn = self.page.get_by_role('button', name="发布", exact=True) if await publish_btn.count(): print(f"[{self.platform_name}] 点击发布按钮...") await publish_btn.click() # 等待跳转到管理页面 - 参考 matrix: https://cp.kuaishou.com/article/manage/video?status=2&from=publish await self.page.wait_for_url( "https://cp.kuaishou.com/article/manage/video*", timeout=1500 ) self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=self.page.url, status='success' ) except Exception as e: current_url = self.page.url if "manage/video" in current_url: self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, 
page_url=current_url, status='success' ) else: print(f"[{self.platform_name}] 视频正在发布中... {i+1}/30") await asyncio.sleep(0.5) # 发布超时 screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="发布超时,请检查发布状态", screenshot_base64=screenshot_base64, page_url=page_url, status='need_action' ) async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """获取快手作品列表""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品列表") print(f"[{self.platform_name}] page={page}, page_size={page_size}") print(f"{'='*60}") works: List[WorkItem] = [] total = 0 has_more = False try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问创作者中心 await self.page.goto("https://cp.kuaishou.com/") await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url if "passport" in current_url or "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 调用作品列表 API pcursor = "" if page == 0 else str(page) api_url = f"https://cp.kuaishou.com/rest/cp/works/v2/video/pc/photo/list?count={page_size}&pcursor={pcursor}&status=public" js_code = f""" async () => {{ const resp = await fetch("{api_url}", {{ credentials: 'include', headers: {{ 'Accept': 'application/json' }} }}); return await resp.json(); }} """ response = await self.page.evaluate(js_code) if response.get('result') == 1: data = response.get('data', {}) photo_list = data.get('list', []) has_more = len(photo_list) >= page_size for photo in photo_list: photo_id = photo.get('photoId', '') if not photo_id: continue # 封面 cover_url = photo.get('coverUrl', '') if cover_url.startswith('http://'): cover_url = cover_url.replace('http://', 'https://') # 时长 duration = photo.get('duration', 0) // 1000 # 毫秒转秒 # 发布时间 create_time = photo.get('timestamp', 0) // 1000 publish_time = '' if create_time: from datetime import 
datetime publish_time = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S') works.append(WorkItem( work_id=str(photo_id), title=photo.get('caption', '') or '无标题', cover_url=cover_url, duration=duration, status='published', publish_time=publish_time, play_count=photo.get('viewCount', 0), like_count=photo.get('likeCount', 0), comment_count=photo.get('commentCount', 0), share_count=photo.get('shareCount', 0), )) print(f"[{self.platform_name}] 获取到 {len(works)} 个作品") except Exception as e: import traceback traceback.print_exc() return WorksResult(success=False, platform=self.platform_name, error=str(e)) return WorksResult(success=True, platform=self.platform_name, works=works, total=total or len(works), has_more=has_more) async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """获取快手作品评论""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品评论") print(f"[{self.platform_name}] work_id={work_id}") print(f"{'='*60}") comments: List[CommentItem] = [] total = 0 has_more = False try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") await self.page.goto("https://cp.kuaishou.com/") await asyncio.sleep(3) current_url = self.page.url if "passport" in current_url or "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 调用评论列表 API pcursor = cursor or "" api_url = f"https://cp.kuaishou.com/rest/cp/works/comment/list?photoId={work_id}&pcursor={pcursor}&count=20" js_code = f""" async () => {{ const resp = await fetch("{api_url}", {{ credentials: 'include', headers: {{ 'Accept': 'application/json' }} }}); return await resp.json(); }} """ response = await self.page.evaluate(js_code) if response.get('result') == 1: data = response.get('data', {}) comment_list = data.get('list', []) has_more = data.get('pcursor', '') != '' for comment in comment_list: cid = comment.get('commentId', '') if not cid: continue author 
= comment.get('author', {}) # 解析子评论 replies = [] sub_list = comment.get('subComments', []) or [] for sub in sub_list: sub_author = sub.get('author', {}) replies.append(CommentItem( comment_id=str(sub.get('commentId', '')), work_id=work_id, content=sub.get('content', ''), author_id=str(sub_author.get('id', '')), author_name=sub_author.get('name', ''), author_avatar=sub_author.get('headurl', ''), like_count=sub.get('likeCount', 0), create_time=str(sub.get('timestamp', '')), )) comments.append(CommentItem( comment_id=str(cid), work_id=work_id, content=comment.get('content', ''), author_id=str(author.get('id', '')), author_name=author.get('name', ''), author_avatar=author.get('headurl', ''), like_count=comment.get('likeCount', 0), reply_count=comment.get('subCommentCount', 0), create_time=str(comment.get('timestamp', '')), replies=replies, )) total = len(comments) print(f"[{self.platform_name}] 获取到 {total} 条评论") except Exception as e: import traceback traceback.print_exc() return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e)) return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)