# -*- coding: utf-8 -*- """ 百家号视频发布器 """ import asyncio import json from typing import List from datetime import datetime from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) class BaijiahaoPublisher(BasePublisher): """ 百家号视频发布器 使用 Playwright 自动化操作百家号创作者中心 """ platform_name = "baijiahao" login_url = "https://baijiahao.baidu.com/" publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video" cookie_domain = ".baidu.com" # 登录检测配置 login_check_url = "https://baijiahao.baidu.com/builder/rc/home" login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"] login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]'] async def get_account_info(self, cookies: str) -> dict: """ 获取百家号账号信息 使用直接 HTTP API 调用,不使用浏览器 """ import aiohttp print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取账号信息 (使用 API)") print(f"{'='*60}") try: # 解析 cookies cookie_list = self.parse_cookies(cookies) cookie_dict = {c['name']: c['value'] for c in cookie_list} # 重要:百家号需要先访问主页建立会话上下文 print(f"[{self.platform_name}] 第一步:访问主页建立会话...") session_headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Cookie 由 session 管理,不手动设置 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"' } headers = { 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', # Cookie 由 session 管理,不手动设置 'Referer': 'https://baijiahao.baidu.com/builder/rc/home', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"' } # 使用 cookies 参数初始化 session,让 aiohttp 自动管理 cookie 更新 async with aiohttp.ClientSession(cookies=cookie_dict) as session: # 步骤 0: 先访问主页建立会话上下文(关键步骤!) print(f"[{self.platform_name}] [0/4] 访问主页建立会话上下文...") async with session.get( 'https://baijiahao.baidu.com/builder/rc/home', headers=session_headers, timeout=aiohttp.ClientTimeout(total=30) ) as home_response: home_status = home_response.status print(f"[{self.platform_name}] 主页访问状态: {home_status}") # 获取响应头中的新cookies(如果有) if 'Set-Cookie' in home_response.headers: new_cookies = home_response.headers['Set-Cookie'] print(f"[{self.platform_name}] 获取到新的会话Cookie") # 这里可以处理新的cookies,但暂时跳过复杂处理 # 短暂等待确保会话建立 await asyncio.sleep(1) # 步骤 1: 获取账号基本信息 print(f"[{self.platform_name}] [1/4] 调用 appinfo API...") async with session.get( 'https://baijiahao.baidu.com/builder/app/appinfo', headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as response: appinfo_result = await response.json() print(f"[{self.platform_name}] appinfo API 完整响应: {json.dumps(appinfo_result, ensure_ascii=False)[:500]}") print(f"[{self.platform_name}] appinfo API 响应: errno={appinfo_result.get('errno')}") # 检查登录状态 if appinfo_result.get('errno') != 0: error_msg = appinfo_result.get('errmsg', '未知错误') errno = appinfo_result.get('errno') print(f"[{self.platform_name}] API 返回错误: errno={errno}, msg={error_msg}") # errno 110 表示未登录 if errno == 110: return { "success": False, "error": "Cookie 已失效,需要重新登录", "need_login": True } # errno 10001402 表示分散认证问题,尝试重新访问主页后重试 if errno == 10001402: print(f"[{self.platform_name}] 检测到分散认证问题,尝试重新访问主页...") await asyncio.sleep(2) # 重新访问主页 async with session.get( 'https://baijiahao.baidu.com/builder/rc/home', headers=session_headers, timeout=aiohttp.ClientTimeout(total=30) ) as retry_home_response: print(f"[{self.platform_name}] 重新访问主页状态: {retry_home_response.status}") await asyncio.sleep(1) # 重试 API 调用 async with session.get( 'https://baijiahao.baidu.com/builder/app/appinfo', headers=headers, timeout=aiohttp.ClientTimeout(total=30) ) as retry_response: retry_result = await retry_response.json() if retry_result.get('errno') == 0: print(f"[{self.platform_name}] 分散认证问题已解决") # 使用重试成功的结果继续处理 appinfo_result = retry_result else: print(f"[{self.platform_name}] 重试仍然失败") return { "success": False, "error": f"分散认证问题: {error_msg}", "need_login": True } return { "success": False, "error": error_msg, "need_login": True } # 获取用户数据 user_data = appinfo_result.get('data', {}).get('user', {}) if not user_data: return { "success": False, "error": "无法获取用户信息", "need_login": True } # 检查账号状态 status = user_data.get('status', '') # 有效的账号状态:audit(审核中), pass(已通过), normal(正常), newbie(新手) valid_statuses = ['audit', 'pass', 'normal', 'newbie'] if status not in valid_statuses: print(f"[{self.platform_name}] 账号状态异常: {status}") # 提取基本信息 account_name = user_data.get('name') or user_data.get('uname') or '百家号账号' app_id = user_data.get('app_id') or user_data.get('id', 0) account_id = str(app_id) if app_id else f"baijiahao_{int(datetime.now().timestamp() * 1000)}" # 处理头像 URL avatar_url = user_data.get('avatar') or user_data.get('avatar_unify', '') if avatar_url and avatar_url.startswith('//'): avatar_url = 'https:' + avatar_url print(f"[{self.platform_name}] 账号名称: {account_name}, ID: {account_id}") # 步骤 2: 获取粉丝数(非关键,失败不影响整体) fans_count = 0 try: print(f"[{self.platform_name}] [2/3] 调用 growth/get_info API 获取粉丝数...") async with session.get( 'https://baijiahao.baidu.com/cms-ui/rights/growth/get_info', headers=headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: growth_result = await response.json() if growth_result.get('errno') == 0: growth_data = growth_result.get('data', {}) fans_count = int(growth_data.get('fans_num', 0)) print(f"[{self.platform_name}] 粉丝数: {fans_count}") else: print(f"[{self.platform_name}] 获取粉丝数失败: {growth_result.get('errmsg')}") except Exception as e: print(f"[{self.platform_name}] 获取粉丝数异常(非关键): {e}") # 步骤 3: 获取作品数量(使用与 Node 端一致的 API) works_count = 0 try: print(f"[{self.platform_name}] [3/3] 调用 article/lists API 获取作品数...") # 使用与 Node 端一致的 API 参数 list_url = 'https://baijiahao.baidu.com/pcui/article/lists?currentPage=1&pageSize=20&search=&type=&collection=&startDate=&endDate=&clearBeforeFetch=false&dynamic=0' async with session.get( list_url, headers={ 'accept': '*/*', 'user-agent': 'PostmanRuntime/7.51.0', # cookie 由 session 管理 'referer': 'https://baijiahao.baidu.com/builder/rc/content', 'connection': 'keep-alive', 'accept-encoding': 'gzip, deflate, br', }, timeout=aiohttp.ClientTimeout(total=30) ) as response: response_text = await response.text() print(f"[{self.platform_name}] ========== Works API Response ==========") print(f"[{self.platform_name}] Full response: {response_text[:1000]}...") # 只打印前1000字符 print(f"[{self.platform_name}] =========================================") works_result = json.loads(response_text) # 处理分散认证问题 (errno=10001402),重试一次 if works_result.get('errno') == 10001402: print(f"[{self.platform_name}] 分散认证问题 (errno=10001402),3秒后重试...") await asyncio.sleep(3) # 重试一次,使用更完整的请求头 retry_headers = headers.copy() retry_headers.update({ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', }) async with session.get( list_url, headers=retry_headers, timeout=aiohttp.ClientTimeout(total=30) ) as retry_response: retry_text = await retry_response.text() print(f"[{self.platform_name}] ========== Works API Retry Response ==========") print(f"[{self.platform_name}] Full retry response: {retry_text[:1000]}...") print(f"[{self.platform_name}] ===============================================") works_result = json.loads(retry_text) if works_result.get('errno') == 10001402: print(f"[{self.platform_name}] 重试仍然失败,返回已获取的账号信息") works_result = None if works_result and works_result.get('errno') == 0: works_data = works_result.get('data', {}) # 优先使用 data.page.totalCount,如果没有则使用 data.total(兼容旧格式) page_info = works_data.get('page', {}) works_count = int(page_info.get('totalCount', works_data.get('total', 0))) print(f"[{self.platform_name}] 作品数: {works_count} (from page.totalCount: {page_info.get('totalCount')}, from total: {works_data.get('total')})") else: errno = works_result.get('errno') if works_result else 'unknown' errmsg = works_result.get('errmsg', 'unknown error') if works_result else 'no response' print(f"[{self.platform_name}] 获取作品数失败: errno={errno}, errmsg={errmsg}") except Exception as e: import traceback print(f"[{self.platform_name}] 获取作品数异常(非关键): {e}") traceback.print_exc() # 返回账号信息 account_info = { "success": True, "account_id": account_id, "account_name": account_name, "avatar_url": avatar_url, "fans_count": fans_count, "works_count": works_count, } print(f"[{self.platform_name}] ✓ 获取成功: {account_name} (粉丝: {fans_count}, 作品: {works_count})") return account_info except Exception as e: import traceback traceback.print_exc() return { "success": False, "error": str(e) } async def check_captcha(self) -> dict: """检查页面是否需要验证码""" if not self.page: return {'need_captcha': False, 'captcha_type': ''} try: # 检查各种验证码 captcha_selectors = [ 'text="请输入验证码"', 'text="滑动验证"', '[class*="captcha"]', '[class*="verify"]', ] for selector in captcha_selectors: try: if await self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到验证码: {selector}") return {'need_captcha': True, 'captcha_type': 'image'} except: pass # 检查登录弹窗 login_selectors = [ 'text="请登录"', 'text="登录后继续"', '[class*="login-dialog"]', ] for selector in login_selectors: try: if await self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到需要登录: {selector}") return {'need_captcha': True, 'captcha_type': 'login'} except: pass except Exception as e: print(f"[{self.platform_name}] 验证码检测异常: {e}") return {'need_captcha': False, 'captcha_type': ''} async def _ai_analyze_upload_state(self, screenshot_base64: str = None) -> dict: """ 使用 AI 识别当前上传状态,返回: { status: completed|uploading|failed|unknown, progress: int|None, confidence: int, reason: str, should_enter_publish_form: bool } """ import os import ast import re import requests result = { "status": "unknown", "progress": None, "confidence": 0, "reason": "", "should_enter_publish_form": False, } try: if not screenshot_base64: screenshot_base64 = await self.capture_screenshot() if not screenshot_base64: result["reason"] = "no-screenshot" return result ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus') if not ai_api_key: result["reason"] = "no-ai-key" return result prompt = """请分析这张“百家号视频发布页”截图,判断视频上传状态。 请只返回 JSON: { "status": "completed|uploading|failed|unknown", "progress": 0-100 或 null, "confidence": 0-100, "reason": "一句话证据", "should_enter_publish_form": true/false } 判定规则: 1) status=completed: - 出现“上传完成/处理完成/可发布/可填写标题描述/发布按钮可用”等信号 - 或者明显已进入可填写发布信息的阶段 2) status=uploading: - 出现“上传中/处理中/转码中/xx%/请稍候”等 3) status=failed: - 出现“上传失败/处理失败/格式不支持/文件异常”等明确失败文案 4) should_enter_publish_form=true: - 画面显示“去发布/下一步/继续/完成编辑”等入口,且看起来应点击进入正式发布表单 """ headers = { 'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json' } payload = { "model": ai_vision_model, "messages": [ { "role": "user", "content": [ { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{screenshot_base64}" } }, { "type": "text", "text": prompt } ] } ], "max_tokens": 400 } response = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30 ) if response.status_code != 200: result["reason"] = f"ai-http-{response.status_code}" return result response_json = response.json() ai_response = response_json.get('choices', [{}])[0].get('message', {}).get('content', '') json_match = re.search(r'```json\\s*([\\s\\S]*?)\\s*```', ai_response) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'\\{[\\s\\S]*\\}', ai_response) json_str = json_match.group(0) if json_match else '{}' try: data = json.loads(json_str) except Exception: try: data = ast.literal_eval(json_str) if json_str and json_str != '{}' else {} if not isinstance(data, dict): data = {} except Exception: data = {} # 兼容中文 key / 非标准结构 status_hint = str( data.get("status") or data.get("状态") or "" ).strip() status_raw = status_hint.lower() if ( status_raw in ["complete", "completed", "success", "done", "finished", "ready"] or any(k in status_hint for k in ["完成", "成功", "可发布", "已上传"]) ): status = "completed" elif ( status_raw in ["uploading", "processing", "in_progress", "progress", "running"] or any(k in status_hint for k in ["上传中", "处理中", "转码", "进行中", "上传"]) ): status = "uploading" elif ( status_raw in ["failed", "error", "fail"] or any(k in status_hint for k in ["失败", "错误", "异常"]) ): status = "failed" else: status = "unknown" progress = data.get("progress", data.get("进度", None)) parsed_progress = None try: if progress is not None and str(progress).strip() != "": parsed_progress = max(0, min(100, int(float(progress)))) except Exception: parsed_progress = None if parsed_progress is None: try: p_match = re.search(r'(\d{1,3})\s*%', ai_response or '') if p_match: parsed_progress = max(0, min(100, int(p_match.group(1)))) except Exception: parsed_progress = None confidence = 0 try: confidence = max(0, min(100, int(float(data.get("confidence", data.get("置信度", 0)) or 0)))) except Exception: confidence = 0 reason = str(data.get("reason", data.get("原因", "")) or "").strip() should_enter_raw = data.get( "should_enter_publish_form", data.get("是否进入发布表单", False) ) if isinstance(should_enter_raw, bool): should_enter = should_enter_raw else: should_enter_text = str(should_enter_raw or "").strip().lower() should_enter = should_enter_text in ["true", "1", "yes", "y", "是"] # 当 AI 响应不是严格 JSON 时,按全文关键词推断 response_text = str(ai_response or "") response_lower = response_text.lower() if status == "unknown": if any(k in response_text for k in ["上传完成", "处理完成", "上传成功", "可发布", "已完成"]): status = "completed" elif any(k in response_text for k in ["上传失败", "处理失败", "格式不支持", "文件异常", "失败"]): status = "failed" elif any(k in response_text for k in ["上传中", "处理中", "转码中", "请稍候"]) or re.search(r'(\d{1,3})\s*%', response_text): status = "uploading" if not should_enter and any(k in response_text for k in ["去发布", "下一步", "继续", "完成编辑"]): should_enter = True if not reason and response_text: reason = response_text.replace("\n", " ").strip()[:120] if confidence <= 0 and status != "unknown": confidence = 60 # 二次语义修正 if status == "uploading" and parsed_progress is not None and parsed_progress >= 100: status = "completed" should_enter = True # AI 有时会把 99/100 仍写成 uploading,这里做语义修正 if status == "uploading" and parsed_progress is not None and parsed_progress >= 99 and confidence >= 60: status = "completed" should_enter = True return { "status": status, "progress": parsed_progress, "confidence": confidence, "reason": reason, "should_enter_publish_form": should_enter, } except Exception as e: result["reason"] = f"ai-exception:{e}" return result async def _extract_bjh_token(self) -> str: """从页面上下文提取百家号接口 token。""" if not self.page: return "" try: token = await self.page.evaluate( """ () => { const isJwtLike = (v) => { if (!v || typeof v !== 'string') return false; const s = v.trim(); if (s.length < 60) return false; const parts = s.split('.'); if (parts.length !== 3) return false; return parts.every(p => /^[A-Za-z0-9_-]+$/.test(p) && p.length > 10); }; const pickFromStorage = (storage) => { try { const keys = Object.keys(storage || {}); for (const k of keys) { const v = storage.getItem(k); if (isJwtLike(v)) return v; } } catch {} return ""; }; let t = pickFromStorage(window.localStorage); if (t) return t; t = pickFromStorage(window.sessionStorage); if (t) return t; const meta = document.querySelector('meta[name="token"], meta[name="bjh-token"]'); const metaToken = meta && meta.getAttribute('content'); if (isJwtLike(metaToken)) return metaToken; const candidates = [ (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.token) || "", (window.__PRELOADED_STATE__ && window.__PRELOADED_STATE__.token) || "", (window.__NUXT__ && window.__NUXT__.state && window.__NUXT__.state.token) || "", ]; for (const c of candidates) { if (isJwtLike(c)) return c; } return ""; } """ ) if token: return str(token) except Exception: pass try: import re html = await self.page.content() m = re.search(r'([A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,})', html) if m: return m.group(1) except Exception: pass return "" async def _verify_publish_from_content_page(self, expected_title: str, page_size: int = 20) -> bool: """ 到内容管理页调用列表接口,按标题二次确认是否已发布。 """ if not self.page: return False try: content_url = ( "https://baijiahao.baidu.com/builder/rc/content" f"?currentPage=1&pageSize={int(page_size)}" "&search=&type=&collection=&startDate=&endDate=" ) await self.page.goto(content_url, wait_until="domcontentloaded", timeout=60000) await asyncio.sleep(2) token = await self._extract_bjh_token() expected = (expected_title or "").strip() if not expected: return False fetch_result = await self.page.evaluate( """ async ({ token, pageSize }) => { const url = "https://baijiahao.baidu.com/pcui/article/lists" + "?currentPage=1" + `&pageSize=${pageSize}` + "&search=&type=&collection=&startDate=&endDate=" + "&clearBeforeFetch=false&dynamic=1"; const r = await fetch(url, { method: "GET", credentials: "include", headers: { "accept": "application/json, text/plain, */*", ...(token ? { token } : {}), }, }); const text = await r.text(); return { ok: r.ok, status: r.status, text }; } """, {"token": token, "pageSize": int(page_size)} ) if not fetch_result or not fetch_result.get("ok"): status = fetch_result.get("status") if isinstance(fetch_result, dict) else "unknown" print(f"[{self.platform_name}] 内容页校验接口失败: HTTP {status}") return False data = json.loads(fetch_result.get("text") or "{}") if data.get("errno") != 0: print(f"[{self.platform_name}] 内容页校验接口错误: errno={data.get('errno')}, msg={data.get('errmsg')}") return False items = ((data.get("data") or {}).get("list") or []) if not isinstance(items, list) or not items: print(f"[{self.platform_name}] 内容页校验:当前列表为空") return False # 标题匹配采用“全量相等 + 前缀包含”双策略,兼容平台侧自动截断。 expected_variants = {expected} if len(expected) > 12: expected_variants.add(expected[:12]) if len(expected) > 20: expected_variants.add(expected[:20]) for item in items: title = str(item.get("title") or "").strip() if not title: continue for needle in expected_variants: if needle and (title == needle or needle in title): print(f"[{self.platform_name}] 内容页校验命中标题: {title}") return True print(f"[{self.platform_name}] 内容页校验未命中标题,expected={expected}") return False except Exception as e: print(f"[{self.platform_name}] 内容页校验异常: {e}") return False async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到百家号""" import os import re import shutil print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] 描述: {(params.description or '')[:120]}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器 await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") # 关键兜底:百家号在标题框不可编辑时会将“文件名主干”作为默认标题。 # 因此上传前为视频创建“标题别名文件”(优先硬链接,失败再复制),确保默认标题可控。 upload_video_path = params.video_path try: raw_title = (params.title or "").strip() if raw_title: safe_title = re.sub(r'[<>:"/\\\\|?*\\x00-\\x1F]', '', raw_title) safe_title = re.sub(r'\\s+', ' ', safe_title).strip().rstrip('.') if not safe_title: safe_title = "video" safe_title = safe_title[:30] src_ext = os.path.splitext(params.video_path)[1] or ".mp4" alias_dir = os.path.join(os.path.dirname(params.video_path), "_bjh_upload_alias") os.makedirs(alias_dir, exist_ok=True) # 轻量清理:删除 24h 前的旧别名文件,避免长期累积 try: now_ts = datetime.now().timestamp() for fn in os.listdir(alias_dir): full = os.path.join(alias_dir, fn) if not os.path.isfile(full): continue if now_ts - os.path.getmtime(full) > 24 * 3600: try: os.remove(full) except Exception: pass except Exception: pass alias_name = f"{safe_title}{src_ext}" alias_path = os.path.join(alias_dir, alias_name) if os.path.abspath(alias_path) != os.path.abspath(params.video_path): if os.path.exists(alias_path): try: os.remove(alias_path) except Exception: pass try: os.link(params.video_path, alias_path) upload_video_path = alias_path print(f"[{self.platform_name}] 上传别名已创建(硬链接): {upload_video_path}") except Exception: shutil.copy2(params.video_path, alias_path) upload_video_path = alias_path print(f"[{self.platform_name}] 上传别名已创建(复制): {upload_video_path}") except Exception as e: upload_video_path = params.video_path print(f"[{self.platform_name}] 创建上传别名失败,回退原文件: {e}") self.report_progress(10, "正在打开上传页面...") # 访问视频发布页面(使用新视频发布界面) video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1" await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000) await asyncio.sleep(3) # 检查是否跳转到登录页 current_url = self.page.url print(f"[{self.platform_name}] 当前页面: {current_url}") for indicator in self.login_indicators: if indicator in current_url: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="Cookie 已过期,需要重新登录", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True) screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 传统方式检查验证码 captcha_result = await self.check_captcha() if captcha_result['need_captcha']: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证", need_captcha=True, captcha_type=captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) self.report_progress(15, "正在选择视频文件...") # 等待页面加载完成 await asyncio.sleep(2) # 关闭可能的弹窗 try: close_buttons = [ 'button:has-text("我知道了")', 'button:has-text("知道了")', '[class*="close"]', '[class*="modal-close"]', ] for btn_selector in close_buttons: try: btn = self.page.locator(btn_selector).first if await btn.count() > 0 and await btn.is_visible(): await btn.click() await asyncio.sleep(0.5) except: pass except: pass # 上传视频 - 尝试多种方式 upload_triggered = False # 方法1: 直接通过 file input 上传 try: file_inputs = await self.page.query_selector_all('input[type="file"]') print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入") for file_input in file_inputs: try: await file_input.set_input_files(upload_video_path) upload_triggered = True print(f"[{self.platform_name}] 通过 file input 上传成功") break except Exception as e: print(f"[{self.platform_name}] file input 上传失败: {e}") except Exception as e: print(f"[{self.platform_name}] 查找 file input 失败: {e}") # 方法2: 点击上传区域 if not upload_triggered: upload_selectors = [ 'div[class*="upload-box"]', 'div[class*="drag-upload"]', 'div[class*="uploader"]', 'div:has-text("点击上传")', 'div:has-text("选择文件")', '[class*="upload-area"]', ] for selector in upload_selectors: if upload_triggered: break try: upload_area = self.page.locator(selector).first if await upload_area.count() > 0: print(f"[{self.platform_name}] 尝试点击上传区域: {selector}") async with self.page.expect_file_chooser(timeout=10000) as fc_info: await upload_area.click() file_chooser = await fc_info.value await file_chooser.set_files(upload_video_path) upload_triggered = True print(f"[{self.platform_name}] 通过点击上传区域成功") break except Exception as e: print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}") if not upload_triggered: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="未找到上传入口", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) self.report_progress(20, "等待视频上传...") # 等待视频上传完成(百家号大文件+处理可能较慢) upload_timeout = 900 start_time = asyncio.get_event_loop().time() last_heartbeat_time = start_time last_signal_time = start_time last_stall_log_time = start_time last_ai_upload_check_time = start_time - 60 ai_upload_check_interval = 20 ai_upload_poll_count = 0 ai_upload_unknown_streak = 0 last_pct = -1 forced_continue_after = 180 # 无进度信号时,3 分钟后执行兜底继续 processing_since = None processing_selector_hit = "" processing_stale_continue_after = 300 # 处理态持续 5 分钟仍无明确变化,执行兜底继续 has_progress_signal = False progress_signal_lost_continue_after = 90 # 已看到进度后,若信号中断 90s,直接进入下一步 hard_cutover_signal_gap_after = 120 # 已出现过进度后,信号中断超过该值则硬切下一阶段 hard_cutover_elapsed_after = 210 # 上传总耗时超过该值时,硬切下一阶段 async def _attempt_enter_publish_form_from_upload(stage: str) -> bool: enter_selectors = [ 'button:has-text("去发布")', '[role="button"]:has-text("去发布")', 'button:has-text("发布视频")', '[role="button"]:has-text("发布视频")', 'button:has-text("下一步")', '[role="button"]:has-text("下一步")', 'button:has-text("继续")', '[role="button"]:has-text("继续")', 'button:has-text("完成编辑")', '[role="button"]:has-text("完成编辑")', '[class*="next"] button', '[class*="step"] button', ] blocked_exact = {"发布", "定时发布", "立即发布", "取消", "返回", "关闭"} blocked_contains = ["定时发布", "立即发布", "取消", "返回", "关闭", "删除", "重传", "重新上传", "清空"] for selector in enter_selectors: try: btns = self.page.locator(selector) count = await btns.count() for idx in range(min(count, 6)): btn = btns.nth(idx) if not await btn.is_visible(): continue text = (await btn.text_content() or "").strip() compact = re.sub(r"\s+", "", text) if compact in blocked_exact or any(w in compact for w in blocked_contains): continue disabled_attr = await btn.get_attribute('disabled') aria_disabled = (await btn.get_attribute('aria-disabled') or '').lower() if disabled_attr is not None or aria_disabled == 'true': continue try: await btn.scroll_into_view_if_needed(timeout=1200) except Exception: pass try: await btn.click(timeout=2500) except Exception: await btn.click(force=True, timeout=2500) print(f"[{self.platform_name}] 上传阶段尝试切换到发布表单: stage={stage}, selector={selector}, text={compact or text}, idx={idx}") await asyncio.sleep(1) return True except Exception: pass # 深层 DOM 兜底(含 shadowRoot),应对常规选择器无法命中 try: deep_clicked = await self.page.evaluate( """ () => { const wanted = ['去发布', '发布视频', '下一步', '继续', '完成编辑']; const blockedExact = new Set(['发布', '定时发布', '立即发布', '取消', '返回', '关闭']); const blockedContains = ['定时发布', '立即发布', '取消', '返回', '关闭', '删除', '重传', '重新上传', '清空']; const roots = [document]; const visited = new Set(); const allNodes = []; while (roots.length) { const root = roots.pop(); if (!root || visited.has(root)) continue; visited.add(root); const nodes = root.querySelectorAll('*'); for (const n of nodes) { allNodes.push(n); if (n && n.shadowRoot) roots.push(n.shadowRoot); } } const isVisible = (el) => { try { const style = window.getComputedStyle(el); if (style.display === 'none' || style.visibility === 'hidden' || style.pointerEvents === 'none') return false; const rect = el.getBoundingClientRect(); return !!rect && rect.width > 8 && rect.height > 8; } catch { return false; } }; for (const el of allNodes) { const text = String(el.innerText || el.textContent || '').replace(/\\s+/g, '').trim(); if (!text) continue; if (blockedExact.has(text)) continue; if (blockedContains.some(x => text.includes(x))) continue; if (!wanted.some(x => text.includes(x))) continue; if (!isVisible(el)) continue; const tag = String(el.tagName || '').toLowerCase(); const role = String(el.getAttribute && el.getAttribute('role') || '').toLowerCase(); const cls = String(el.className || '').toLowerCase(); const clickable = tag === 'button' || tag === 'a' || role === 'button' || /btn|button|next|step/.test(cls); if (!clickable) continue; try { el.click(); return { ok: true, text }; } catch {} } return { ok: false, text: '' }; } """ ) if deep_clicked and deep_clicked.get("ok"): clicked_text = str(deep_clicked.get("text") or "").strip() print(f"[{self.platform_name}] 上传阶段深层DOM切换发布表单成功: stage={stage}, text={clicked_text}") await asyncio.sleep(1.2) return True except Exception: pass return False while asyncio.get_event_loop().time() - start_time < upload_timeout: now = asyncio.get_event_loop().time() elapsed = int(now - start_time) status_parts = [] # 检查上传进度 pct = None try: progress_nodes = self.page.locator('[class*="progress"], [class*="percent"], div:has-text("%"), span:has-text("%")') node_count = await progress_nodes.count() for idx in range(min(node_count, 6)): text = await progress_nodes.nth(idx).text_content() if not text: continue match = re.search(r'(\d{1,3})\s*%', text) if match: pct = max(0, min(100, int(match.group(1)))) break except Exception: pass if pct is not None: status_parts.append(f"progress={pct}%") last_signal_time = now has_progress_signal = True if pct != last_pct: self.report_progress(20 + min(35, int(pct * 0.35)), f"视频上传中 {pct}%...") last_pct = pct if pct >= 100: print(f"[{self.platform_name}] 上传完成(进度达到 100%)") break # 明确的上传完成提示 upload_done = False upload_done_selectors = [ 'div:has-text("上传完成")', 'div:has-text("处理完成")', 'div:has-text("上传成功")', 'span:has-text("上传完成")', '[class*="upload-success"]', ] try: for selector in upload_done_selectors: loc = self.page.locator(selector).first if await loc.count() > 0 and await loc.is_visible(): upload_done = True print(f"[{self.platform_name}] 检测到上传完成提示: {selector}") break except Exception: pass if upload_done: last_signal_time = now break # 检查处理态 is_processing = False processing_selectors = [ 'div:has-text("上传中")', 'span:has-text("上传中")', 'div:has-text("处理中")', 'span:has-text("处理中")', 'div:has-text("转码中")', 'span:has-text("转码中")', 'div:has-text("请稍候")', 'span:has-text("请稍候")', 'div:has-text("正在上传")', 'div:has-text("正在处理")', 'text="上传中"', 'text="处理中"', ] try: for selector in processing_selectors: loc = self.page.locator(selector).first if await loc.count() > 0 and await loc.is_visible(): is_processing = True processing_selector_hit = selector break except Exception: pass if is_processing: if processing_since is None: processing_since = now processing_elapsed = int(now - processing_since) status_parts.append(f"processing={processing_elapsed}s") if processing_selector_hit: status_parts.append(f"by={processing_selector_hit}") # 处理态短时间内视为有效信号;超过阈值后不再持续刷新 signal_gap,避免卡死 if processing_elapsed <= 180: last_signal_time = now else: processing_since = None processing_selector_hit = "" # 检查是否出现标题输入框(部分页面会在上传阶段就显示,需结合时间/处理态判断) title_input_visible = False try: title_input = self.page.locator('input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input').first title_input_visible = await title_input.count() > 0 and await title_input.is_visible() except Exception: title_input_visible = False if title_input_visible and ( (not is_processing and elapsed >= 45) or (processing_since is not None and (now - processing_since) >= 180) or elapsed >= 360 ): print(f"[{self.platform_name}] 检测到可编辑标题,继续后续步骤") break # 检查是否有错误提示 error_text = '' try: error_nodes = self.page.locator('[class*="error"], [class*="fail"], div:has-text("上传失败"), div:has-text("处理失败")') err_count = await error_nodes.count() for idx in range(min(err_count, 6)): txt = (await error_nodes.nth(idx).text_content() or '').strip() if txt and any(k in txt for k in ['失败', '错误', '异常', '中断']): error_text = txt break except Exception: error_text = '' if error_text: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"上传失败: {error_text}", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) # AI 上传状态判定(节流),用于弥补 DOM/文案信号缺失 should_run_ai_upload_check = (now - last_ai_upload_check_time) >= ai_upload_check_interval if should_run_ai_upload_check: ai_upload_poll_count += 1 ai_upload_state = await self._ai_analyze_upload_state() last_ai_upload_check_time = now ai_status = str(ai_upload_state.get("status") or "unknown").strip().lower() ai_progress = ai_upload_state.get("progress") ai_confidence = int(ai_upload_state.get("confidence") or 0) ai_reason = str(ai_upload_state.get("reason") or "").strip() ai_should_enter_form = bool(ai_upload_state.get("should_enter_publish_form")) print( f"[{self.platform_name}] AI上传轮询#{ai_upload_poll_count}: elapsed={elapsed}s, " f"status={ai_status}, progress={ai_progress}, confidence={ai_confidence}, " f"enter_form={ai_should_enter_form}, reason={ai_reason or '-'}" ) if ai_status == "unknown": ai_upload_unknown_streak += 1 else: ai_upload_unknown_streak = 0 if ai_status == "failed": screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"上传失败(AI判定): {ai_reason or '检测到上传失败信号'}", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) if ai_status == "completed": if ai_should_enter_form: await _attempt_enter_publish_form_from_upload("ai-completed") print(f"[{self.platform_name}] AI判定上传已完成,进入下一阶段") last_signal_time = now break if ai_status == "uploading": has_progress_signal = True last_signal_time = now if isinstance(ai_progress, (int, float)): ai_pct = max(0, min(100, int(ai_progress))) status_parts.append(f"ai-progress={ai_pct}%") if ai_pct != last_pct and ai_pct > 0: self.report_progress(20 + min(35, int(ai_pct * 0.35)), f"视频上传中 {ai_pct}%...") last_pct = ai_pct if ai_pct >= 99 and ai_confidence >= 60: if ai_should_enter_form: await _attempt_enter_publish_form_from_upload("ai-upload-99") print(f"[{self.platform_name}] AI判定上传接近完成,进入下一阶段") break else: status_parts.append("ai=uploading") if ai_should_enter_form and elapsed >= 60: await _attempt_enter_publish_form_from_upload("ai-uploading-enter-form") elif ai_status == "unknown" and ai_should_enter_form and elapsed >= 60: await _attempt_enter_publish_form_from_upload("ai-unknown-enter-form") elif ai_status == "unknown" and ai_upload_unknown_streak >= 3 and elapsed >= 90: await _attempt_enter_publish_form_from_upload("ai-unknown-streak") # 心跳日志,便于定位“卡住” if now - last_heartbeat_time >= 15: signal_gap = int(now - last_signal_time) extra = ", ".join(status_parts) if status_parts else "no-visible-signal" print(f"[{self.platform_name}] 上传等待中: elapsed={elapsed}s, signal_gap={signal_gap}s, {extra}") last_heartbeat_time = now # 已经出现过进度后,如果进度信号中断较久,进入下一步兜底 dynamic_signal_lost_after = progress_signal_lost_continue_after if last_pct >= 95: # 95%+ 阶段可能有短暂静默,适度放宽 dynamic_signal_lost_after = max(progress_signal_lost_continue_after, 150) elif last_pct >= 80: # 中后段进度(80%+)可能进入转码/校验静默期,但不应无限等待 dynamic_signal_lost_after = max(progress_signal_lost_continue_after, 150) elif last_pct >= 60: dynamic_signal_lost_after = max(progress_signal_lost_continue_after, 120) if has_progress_signal and (now - last_signal_time) >= dynamic_signal_lost_after: signal_gap = int(now - last_signal_time) if last_pct >= 95 or title_input_visible or elapsed >= max(780, upload_timeout - 60): print(f"[{self.platform_name}] 上传进度信号中断过久({signal_gap}s>={dynamic_signal_lost_after}s),继续后续步骤(兜底)") break if (last_pct >= 70 and signal_gap >= hard_cutover_signal_gap_after) or elapsed >= hard_cutover_elapsed_after: await _attempt_enter_publish_form_from_upload("hard-cutover-signal") print(f"[{self.platform_name}] 上传长时间无新信号,执行硬切换到标题阶段: elapsed={elapsed}s, signal_gap={signal_gap}s, last_pct={last_pct}") break if now - last_stall_log_time >= 30: print(f"[{self.platform_name}] 上传信号中断({signal_gap}s)但进度不足/标题未就绪,继续等待上传完成...") last_stall_log_time = now # 额外硬切策略:出现过中后段进度但长时间无新增信号时,不再继续卡住 if has_progress_signal and last_pct >= 70 and (now - last_signal_time) >= hard_cutover_signal_gap_after: signal_gap = int(now - last_signal_time) await _attempt_enter_publish_form_from_upload("hard-cutover-progress") print(f"[{self.platform_name}] 中后段上传信号停滞,强制切换到标题阶段: elapsed={elapsed}s, signal_gap={signal_gap}s, last_pct={last_pct}") break # 从未出现可见进度信号时,不再长时间卡在 20% if (not has_progress_signal) and elapsed >= forced_continue_after and (now - last_signal_time) >= 120: if title_input_visible or elapsed >= max(600, upload_timeout - 90): print(f"[{self.platform_name}] 上传阶段长时间无可见进度信号,继续后续步骤(兜底)") break if elapsed >= 480: await _attempt_enter_publish_form_from_upload("hard-cutover-no-signal") print(f"[{self.platform_name}] 上传持续无可见信号,执行硬切换到标题阶段: elapsed={elapsed}s") break if now - last_stall_log_time >= 30: print(f"[{self.platform_name}] 上传暂无可见信号且标题未就绪,继续等待...") last_stall_log_time = now # 处理态持续过久时兜底继续,避免固定 DOM 文案导致无限等待 if processing_since is not None and (now - processing_since) >= processing_stale_continue_after: if last_pct >= 95 or title_input_visible or elapsed >= max(780, upload_timeout - 60): print(f"[{self.platform_name}] 上传阶段处理态持续过久,继续后续步骤(兜底)") break if elapsed >= hard_cutover_elapsed_after: await _attempt_enter_publish_form_from_upload("hard-cutover-processing") print(f"[{self.platform_name}] 处理态持续过久且总耗时较长,执行硬切换到标题阶段: elapsed={elapsed}s") break if now - last_stall_log_time >= 30: print(f"[{self.platform_name}] 处理态持续较久但标题未就绪,继续等待上传收尾...") last_stall_log_time = now await asyncio.sleep(3) self.report_progress(60, "正在填写标题...") await asyncio.sleep(2) # 填写标题(严格校验写入结果,避免填错输入框) desired_title = (params.title or "").strip()[:30] # 百家号标题限制 30 字 video_stem = os.path.splitext(os.path.basename(params.video_path or ""))[0].strip().lower() def _normalize_title_for_match(value: str) -> str: v = re.sub(r"\s+", "", str(value or "")).strip().lower() v = re.sub(r"[`~!@#$%^&*()_+=\[\]{}\\|;:'\",.<>/?,。!?;:、()【】《》\-\u3000]", "", v) return v def _looks_like_non_title_value(value: str) -> bool: raw = str(value or "").strip() if not raw: return True compact = raw.lower() # 典型 UUID(平台内部资源ID/文件名) if re.fullmatch(r"[0-9a-f]{8}-[0-9a-f]{4}-[1-5]?[0-9a-f]{3}-[89ab]?[0-9a-f]{3}-[0-9a-f]{12}", compact): return True # 纯英文数字/连接符且较长,通常是资源ID而不是标题 if len(compact) >= 24 and re.fullmatch(r"[a-z0-9_-]+", compact): return True # 与视频文件名主干一致时,视为误填 if video_stem and compact == video_stem: return True # 文件路径或带扩展名文本,视为误填 if "\\" in raw or "/" in raw: return True if re.search(r"\.(mp4|mov|avi|mkv|wmv|flv|m4v)$", compact): return True return False def _title_matches_expected(current_value: str) -> bool: if not desired_title: return False current = str(current_value or "").strip() if not current: return False if _looks_like_non_title_value(current): return False expected_norm = _normalize_title_for_match(desired_title) current_norm = _normalize_title_for_match(current) if not expected_norm or not current_norm: return False if expected_norm == current_norm: return True if len(expected_norm) >= 4 and (expected_norm in current_norm or current_norm in expected_norm): return True prefix_len = min(8, len(expected_norm)) if prefix_len >= 4 and expected_norm[:prefix_len] in current_norm: return True return False title_filled = False title_verified_value = "" title_failure_reason = "" title_selectors = [ 'input[placeholder*="标题"]', 'textarea[placeholder*="标题"]', 'input[aria-label*="标题"]', 'textarea[aria-label*="标题"]', 'input[data-placeholder*="标题"]', 'textarea[data-placeholder*="标题"]', 'input[name*="title"]', 'textarea[name*="title"]', 'input[id*="title"]', 'textarea[id*="title"]', '[class*="title-input"] input', '[class*="title"] textarea', '[class*="title"] input', '[class*="headline"] input', '[class*="headline"] textarea', '[class*="name"] input', '[contenteditable="true"][placeholder*="标题"]', '[contenteditable="true"][aria-label*="标题"]', '[contenteditable="plaintext-only"][placeholder*="标题"]', '[data-placeholder*="标题"][contenteditable="true"]', '[class*="title"] [contenteditable="true"]', '[role="textbox"][aria-label*="标题"]', '[role="textbox"][placeholder*="标题"]', ] async def _has_editable_title_input() -> bool: for frame in self.page.frames: for selector in title_selectors: try: nodes = frame.locator(selector) count = await nodes.count() for idx in range(min(count, 10)): node = nodes.nth(idx) if not await node.is_visible(): continue node_type = (await node.get_attribute('type') or '').strip().lower() if node_type in ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit']: continue try: if await node.is_disabled(): continue except Exception: pass return True except Exception: pass # 深层 DOM 检查(含 shadowRoot) for frame in self.page.frames: try: deep_found = await frame.evaluate( """ () => { const roots = [document]; const visited = new Set(); while (roots.length) { const root = roots.pop(); if (!root || visited.has(root)) continue; visited.add(root); const nodes = root.querySelectorAll('*'); for (const n of nodes) { if (n && n.shadowRoot) roots.push(n.shadowRoot); const tag = String(n.tagName || '').toLowerCase(); if (!['input', 'textarea'].includes(tag) && String(n.getAttribute && n.getAttribute('contenteditable') || '').toLowerCase() !== 'true' && String(n.getAttribute && n.getAttribute('role') || '').toLowerCase() !== 'textbox') { continue; } const type = String(n.getAttribute && n.getAttribute('type') || '').toLowerCase(); if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) continue; if (n.disabled || n.readOnly) continue; const style = window.getComputedStyle(n); if (style.display === 'none' || style.visibility === 'hidden') continue; const rect = n.getBoundingClientRect(); if (!rect || rect.width < 8 || rect.height < 8) continue; return true; } } return false; } """ ) if deep_found: return True except Exception: pass return False async def _try_enter_publish_form(stage: str) -> bool: action_selectors = [ 'button:has-text("去发布")', '[role="button"]:has-text("去发布")', 'button:has-text("发布视频")', '[role="button"]:has-text("发布视频")', 'button:has-text("下一步")', '[role="button"]:has-text("下一步")', 'button:has-text("继续")', '[role="button"]:has-text("继续")', 'button:has-text("完成编辑")', '[role="button"]:has-text("完成编辑")', '[class*="next"] button', '[class*="step"] button', ] blocked_exact = {"发布", "定时发布", "立即发布", "取消", "返回", "关闭"} blocked_contains = ["定时发布", "立即发布", "取消", "返回", "关闭", "删除", "重传", "重新上传", "清空"] for frame in self.page.frames: frame_url = frame.url or "about:blank" for selector in action_selectors: try: btns = frame.locator(selector) btn_count = await btns.count() for idx in range(min(btn_count, 6)): btn = btns.nth(idx) if not await btn.is_visible(): continue text = (await btn.text_content() or "").strip() compact = re.sub(r"\s+", "", text) if compact in blocked_exact or any(t in compact for t in blocked_contains): continue disabled_attr = await btn.get_attribute('disabled') aria_disabled = (await btn.get_attribute('aria-disabled') or '').lower() if disabled_attr is not None or aria_disabled == 'true': continue try: await btn.scroll_into_view_if_needed(timeout=1500) except Exception: pass try: await btn.click(timeout=3000) except Exception: await btn.click(force=True, timeout=3000) print(f"[{self.platform_name}] 尝试进入发布表单: stage={stage}, frame={frame_url}, selector={selector}, text={compact or text}, idx={idx}") await asyncio.sleep(1.2) if await _has_editable_title_input(): print(f"[{self.platform_name}] 已进入可编辑发布表单: stage={stage}") return True except Exception: pass # 深层 DOM 兜底(含 shadowRoot) try: deep_clicked = await self.page.evaluate( """ () => { const wanted = ['去发布', '发布视频', '下一步', '继续', '完成编辑']; const blockedExact = new Set(['发布', '定时发布', '立即发布', '取消', '返回', '关闭']); const blockedContains = ['定时发布', '立即发布', '取消', '返回', '关闭', '删除', '重传', '重新上传', '清空']; const roots = [document]; const visited = new Set(); const allNodes = []; while (roots.length) { const root = roots.pop(); if (!root || visited.has(root)) continue; visited.add(root); const nodes = root.querySelectorAll('*'); for (const n of nodes) { allNodes.push(n); if (n && n.shadowRoot) roots.push(n.shadowRoot); } } const isVisible = (el) => { try { const style = window.getComputedStyle(el); if (style.display === 'none' || style.visibility === 'hidden' || style.pointerEvents === 'none') return false; const rect = el.getBoundingClientRect(); return !!rect && rect.width > 8 && rect.height > 8; } catch { return false; } }; for (const el of allNodes) { const text = String(el.innerText || el.textContent || '').replace(/\\s+/g, '').trim(); if (!text) continue; if (blockedExact.has(text)) continue; if (blockedContains.some(x => text.includes(x))) continue; if (!wanted.some(x => text.includes(x))) continue; if (!isVisible(el)) continue; const tag = String(el.tagName || '').toLowerCase(); const role = String(el.getAttribute && el.getAttribute('role') || '').toLowerCase(); const cls = String(el.className || '').toLowerCase(); const clickable = tag === 'button' || tag === 'a' || role === 'button' || /btn|button|next|step/.test(cls); if (!clickable) continue; try { el.click(); return { ok: true, text }; } catch {} } return { ok: false, text: '' }; } """ ) if deep_clicked and deep_clicked.get("ok"): print(f"[{self.platform_name}] 深层DOM进入发布表单成功: stage={stage}, text={str(deep_clicked.get('text') or '').strip()}") await asyncio.sleep(1.2) if await _has_editable_title_input(): print(f"[{self.platform_name}] 已进入可编辑发布表单(深层DOM): stage={stage}") return True except Exception: pass return False # 先等待可编辑标题框出现,避免上传兜底后立即进入导致误命中 file input await _try_enter_publish_form("pre-title") title_ready = False title_wait_deadline = asyncio.get_event_loop().time() + 180 last_title_wait_log = 0.0 last_enter_publish_try = 0.0 while asyncio.get_event_loop().time() < title_wait_deadline and not title_ready: try: if await _has_editable_title_input(): title_ready = True break except Exception: pass for frame in self.page.frames: if title_ready: break for selector in title_selectors: if title_ready: break try: title_nodes = frame.locator(selector) node_count = await title_nodes.count() for idx in range(min(node_count, 8)): node = title_nodes.nth(idx) if not await node.is_visible(): continue node_type = (await node.get_attribute('type') or '').strip().lower() if node_type in ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit']: continue try: if await node.is_disabled(): continue except Exception: pass title_ready = True break except Exception: pass if title_ready: break now_wait = asyncio.get_event_loop().time() if now_wait - last_title_wait_log >= 10: print(f"[{self.platform_name}] 等待可编辑标题输入框... frames={len(self.page.frames)}") last_title_wait_log = now_wait if now_wait - last_enter_publish_try >= 15: await _try_enter_publish_form("title-wait") last_enter_publish_try = now_wait await asyncio.sleep(2) if not title_ready: title_failure_reason = "title-not-ready" print(f"[{self.platform_name}] 未检测到明确标题输入框,进入兜底识别模式") for frame in self.page.frames: if title_filled: break frame_url = frame.url or "about:blank" for selector in title_selectors: if title_filled: break try: title_nodes = frame.locator(selector) node_count = await title_nodes.count() for idx in range(min(node_count, 8)): node = title_nodes.nth(idx) if not await node.is_visible(): continue node_type = (await node.get_attribute('type') or '').strip().lower() if node_type in ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit']: continue try: if await node.is_disabled(): continue except Exception: pass node_tag = "" try: node_tag = ((await node.evaluate("el => (el.tagName || '').toLowerCase()")) or "").strip() except Exception: node_tag = "" contenteditable_attr = (await node.get_attribute('contenteditable') or '').strip().lower() role_attr = (await node.get_attribute('role') or '').strip().lower() is_text_input = node_tag in ['input', 'textarea'] is_editable_block = contenteditable_attr == 'true' or role_attr == 'textbox' try: await node.click(timeout=2000) except Exception: pass if is_text_input: try: await node.fill(desired_title, timeout=5000) except Exception: try: await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Backspace") await self.page.keyboard.type(desired_title) except Exception: continue elif is_editable_block: try: await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Backspace") await self.page.keyboard.type(desired_title) except Exception: try: await node.evaluate( """ (el, title) => { el.focus(); el.textContent = title; el.dispatchEvent(new Event('input', { bubbles: true })); el.dispatchEvent(new Event('change', { bubbles: true })); } """, desired_title ) except Exception: continue else: continue await asyncio.sleep(0.2) current_value = "" if is_text_input: try: current_value = (await node.input_value() or "").strip() except Exception: current_value = "" else: try: current_value = ((await node.evaluate("el => (el.innerText || el.textContent || '')")) or "").strip() except Exception: current_value = "" if _title_matches_expected(current_value): title_filled = True title_verified_value = current_value print(f"[{self.platform_name}] 标题填写成功: frame={frame_url}, selector={selector}, idx={idx}, value={current_value}") break elif current_value: title_failure_reason = "candidate-mismatch" # 对同一节点再做一次 JS 强制赋值,处理键盘输入未生效的情况 forced_value = "" try: forced_value = ( (await node.evaluate( """ (el, title) => { const tag = String(el.tagName || '').toLowerCase(); const type = String((el.getAttribute('type') || '')).toLowerCase(); if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return ''; const ce = String(el.getAttribute('contenteditable') || '').toLowerCase(); const role = String(el.getAttribute('role') || '').toLowerCase(); const isTextInput = tag === 'input' || tag === 'textarea'; const isEditableBlock = ce === 'true' || role === 'textbox'; const emit = () => { el.dispatchEvent(new Event('input', { bubbles: true })); el.dispatchEvent(new Event('change', { bubbles: true })); }; try { el.focus(); } catch {} if (isTextInput) { try { const proto = tag === 'textarea' ? window.HTMLTextAreaElement.prototype : window.HTMLInputElement.prototype; const setter = Object.getOwnPropertyDescriptor(proto, 'value')?.set; if (setter) { setter.call(el, ''); emit(); setter.call(el, title); emit(); } else { el.value = ''; emit(); el.value = title; emit(); } } catch { el.value = title; emit(); } return String(el.value || '').trim(); } if (isEditableBlock) { el.textContent = ''; emit(); el.textContent = title; emit(); return String(el.innerText || el.textContent || '').trim(); } return ''; } """, desired_title )) or "" ).strip() except Exception: forced_value = "" if _title_matches_expected(forced_value): title_filled = True title_verified_value = forced_value print(f"[{self.platform_name}] 标题强制写入成功: frame={frame_url}, selector={selector}, idx={idx}, value={forced_value}") break print(f"[{self.platform_name}] 标题候选值不匹配,已忽略: frame={frame_url}, selector={selector}, idx={idx}, value={current_value}") except Exception as e: print(f"[{self.platform_name}] 标题选择器失败: frame={frame_url}, selector={selector}, err={e}") # 深层 DOM 兜底(含 shadowRoot) if not title_filled and desired_title: for frame in self.page.frames: if title_filled: break frame_url = frame.url or "about:blank" try: deep_result = await frame.evaluate( """ (title) => { const roots = [document]; const visited = new Set(); const candidates = []; while (roots.length) { const root = roots.pop(); if (!root || visited.has(root)) continue; visited.add(root); const nodes = root.querySelectorAll('*'); for (const n of nodes) { if (n && n.shadowRoot) roots.push(n.shadowRoot); const tag = String(n.tagName || '').toLowerCase(); const type = String(n.getAttribute && n.getAttribute('type') || '').toLowerCase(); const ce = String(n.getAttribute && n.getAttribute('contenteditable') || '').toLowerCase(); const role = String(n.getAttribute && n.getAttribute('role') || '').toLowerCase(); const isTextInput = tag === 'input' || tag === 'textarea'; const isEditableBlock = ce === 'true' || role === 'textbox'; if (!isTextInput && !isEditableBlock) continue; if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) continue; if (n.disabled || n.readOnly) continue; const style = window.getComputedStyle(n); if (style.display === 'none' || style.visibility === 'hidden') continue; const rect = n.getBoundingClientRect(); if (!rect || rect.width < 8 || rect.height < 8) continue; const ph = String(n.getAttribute && n.getAttribute('placeholder') || ''); const aria = String(n.getAttribute && n.getAttribute('aria-label') || ''); const name = String(n.getAttribute && n.getAttribute('name') || ''); const id = String(n.getAttribute && n.getAttribute('id') || ''); const cls = String(n.className || ''); const maxLen = parseInt(String(n.getAttribute && n.getAttribute('maxlength') || '0'), 10) || 0; const container = n.closest && n.closest('label, [class*="form"], [class*="item"], [class*="field"], [class*="title"]'); const ctx = String((container && container.innerText) || '').slice(0, 80); let score = 0; if (/标题|title/i.test(ph)) score += 7; if (/标题|title/i.test(aria)) score += 6; if (/标题|title/i.test(name)) score += 5; if (/标题|title/i.test(id)) score += 5; if (/title|标题/i.test(cls)) score += 4; if (/标题|title/i.test(ctx)) score += 5; if (maxLen > 0 && maxLen <= 40) score += 3; if (isTextInput) score += 2; if (isEditableBlock) score += 1; candidates.push({ n, score, isTextInput, isEditableBlock }); } } candidates.sort((a, b) => b.score - a.score); if (!candidates.length) return { ok: false, value: '', reason: 'no-candidate' }; const emit = (el) => { el.dispatchEvent(new Event('input', { bubbles: true })); el.dispatchEvent(new Event('change', { bubbles: true })); }; let lastError = ''; for (const item of candidates.slice(0, 12)) { const el = item.n; try { el.focus(); if (item.isTextInput) { const tag = String(el.tagName || '').toLowerCase(); const proto = tag === 'textarea' ? window.HTMLTextAreaElement.prototype : window.HTMLInputElement.prototype; const setter = Object.getOwnPropertyDescriptor(proto, 'value')?.set; if (setter) { setter.call(el, ''); emit(el); setter.call(el, title); emit(el); } else { el.value = ''; emit(el); el.value = title; emit(el); } const v = String(el.value || '').trim(); if (v) return { ok: true, value: v, score: item.score }; } else if (item.isEditableBlock) { el.textContent = ''; emit(el); el.textContent = title; emit(el); const v = String(el.innerText || el.textContent || '').trim(); if (v) return { ok: true, value: v, score: item.score }; } } catch (e) { lastError = String(e || ''); } } return { ok: false, value: '', reason: lastError || 'set-value-failed' }; } """, desired_title ) if deep_result and deep_result.get('ok'): deep_written = str(deep_result.get('value') or '').strip() if _title_matches_expected(deep_written): title_filled = True title_verified_value = deep_written print(f"[{self.platform_name}] 标题深层DOM填写成功: frame={frame_url}, value={deep_written}") break elif deep_written: title_failure_reason = "deep-dom-mismatch" print(f"[{self.platform_name}] 标题深层DOM命中但值不匹配: frame={frame_url}, value={deep_written}") except Exception: pass # JS 兜底写入标题 if not title_filled and desired_title: fallback_reason = "" for frame in self.page.frames: if title_filled: break frame_url = frame.url or "about:blank" try: fallback = await frame.evaluate( """ (title) => { const nodes = Array.from(document.querySelectorAll( 'input:not([type="file"]):not([type="hidden"]), textarea, [contenteditable="true"], [role="textbox"]' )); const scored = nodes .map((el) => { const tag = String(el.tagName || '').toLowerCase(); const type = String((el.getAttribute('type') || '')).toLowerCase(); if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return null; if (el.disabled || el.readOnly) return null; const style = window.getComputedStyle(el); if (style.display === 'none' || style.visibility === 'hidden') return null; const rect = el.getBoundingClientRect(); if (!rect || rect.width < 8 || rect.height < 8) return null; const ph = String(el.getAttribute('placeholder') || ''); const aria = String(el.getAttribute('aria-label') || ''); const name = String(el.getAttribute('name') || ''); const id = String(el.getAttribute('id') || ''); const cls = String(el.className || ''); const ce = String(el.getAttribute('contenteditable') || '').toLowerCase(); const role = String(el.getAttribute('role') || '').toLowerCase(); const maxLen = parseInt(String(el.getAttribute('maxlength') || '0'), 10) || 0; const container = el.closest('label, [class*="form"], [class*="item"], [class*="field"], [class*="title"]'); const ctx = String((container && container.innerText) || '').slice(0, 80); let score = 0; if (ph.includes('标题')) score += 6; if (aria.includes('标题')) score += 5; if (/title|标题/i.test(name)) score += 4; if (/title|标题/i.test(id)) score += 4; if (/title|标题/i.test(cls)) score += 3; if (/标题|title/i.test(ctx)) score += 4; if (maxLen > 0 && maxLen <= 40) score += 3; if (tag === 'input' || tag === 'textarea') score += 1; if (ce === 'true' || role === 'textbox') score += 2; return { el, score, maxLen }; }) .filter(x => x && x.score > 0) .sort((a, b) => b.score - a.score); // 没有明显标题线索时,回退到短输入框(常见标题长度限制) const candidates = scored.length ? scored : nodes .map((el) => { const tag = String(el.tagName || '').toLowerCase(); const type = String((el.getAttribute('type') || '')).toLowerCase(); if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return null; if (el.disabled || el.readOnly) return null; const style = window.getComputedStyle(el); if (style.display === 'none' || style.visibility === 'hidden') return null; const rect = el.getBoundingClientRect(); if (!rect || rect.width < 8 || rect.height < 8) return null; const maxLen = parseInt(String(el.getAttribute('maxlength') || '0'), 10) || 0; const score = (maxLen > 0 && maxLen <= 40 ? 3 : 0) + (tag === 'input' || tag === 'textarea' ? 1 : 0); return score > 0 ? { el, score, maxLen } : null; }) .filter(Boolean) .sort((a, b) => b.score - a.score); if (!candidates.length) return { ok: false, value: '', reason: 'no-scored-input' }; let lastError = ''; for (const item of candidates.slice(0, 10)) { const target = item.el; const tag = String(target.tagName || '').toLowerCase(); const ce = String(target.getAttribute('contenteditable') || '').toLowerCase(); const role = String(target.getAttribute('role') || '').toLowerCase(); const isTextInput = tag === 'input' || tag === 'textarea'; const isEditableBlock = ce === 'true' || role === 'textbox'; try { target.focus(); if (isTextInput) { target.value = ''; target.dispatchEvent(new Event('input', { bubbles: true })); target.value = title; target.dispatchEvent(new Event('input', { bubbles: true })); target.dispatchEvent(new Event('change', { bubbles: true })); const v = String(target.value || '').trim(); if (v) return { ok: true, value: v, score: item.score || 0 }; } else if (isEditableBlock) { target.textContent = ''; target.dispatchEvent(new Event('input', { bubbles: true })); target.textContent = title; target.dispatchEvent(new Event('input', { bubbles: true })); target.dispatchEvent(new Event('change', { bubbles: true })); const v = String(target.innerText || target.textContent || '').trim(); if (v) return { ok: true, value: v, score: item.score || 0 }; } } catch (e) { lastError = String(e || ''); } } return { ok: false, value: '', reason: lastError || 'set-value-failed' }; } """, desired_title ) if fallback and fallback.get('ok'): written = str(fallback.get('value') or '').strip() if _title_matches_expected(written): title_filled = True title_verified_value = written print(f"[{self.platform_name}] 标题 JS 兜底填写成功: frame={frame_url}, value={written}") break elif written: fallback_reason = f"fallback-value-not-match:{written}" title_failure_reason = fallback_reason print(f"[{self.platform_name}] 标题 JS 兜底命中疑似错误字段,已忽略: frame={frame_url}, value={written}") elif fallback: fallback_reason = str(fallback.get('reason') or '') if fallback_reason: title_failure_reason = fallback_reason except Exception as e: fallback_reason = str(e) if fallback_reason: title_failure_reason = fallback_reason if not title_filled: print(f"[{self.platform_name}] 标题 JS 兜底未命中: reason={fallback_reason or 'unknown'}") # 强化重试:标题框可能在上传收尾阶段延迟可编辑,循环尝试写入一段时间 if not title_filled and desired_title: print(f"[{self.platform_name}] 标题常规填写未命中,进入强化重试...") # 百家号在上传 80%+ 后可能经历较长静默处理期,给更长窗口等待标题输入框真正可编辑 strong_retry_deadline = asyncio.get_event_loop().time() + 240 strong_retry_round = 0 last_retry_log = 0.0 while asyncio.get_event_loop().time() < strong_retry_deadline and not title_filled: strong_retry_round += 1 retry_reason = "" if strong_retry_round == 1 or strong_retry_round % 5 == 0: await _try_enter_publish_form(f"title-retry-{strong_retry_round}") for frame in self.page.frames: if title_filled: break frame_url = frame.url or "about:blank" try: retry_result = await frame.evaluate( """ (title) => { const nodes = Array.from(document.querySelectorAll( 'input:not([type="file"]):not([type="hidden"]), textarea, [contenteditable="true"], [role="textbox"]' )); const candidates = nodes .map((el) => { const tag = String(el.tagName || '').toLowerCase(); const type = String((el.getAttribute('type') || '')).toLowerCase(); if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return null; if (el.disabled || el.readOnly) return null; const style = window.getComputedStyle(el); if (style.display === 'none' || style.visibility === 'hidden') return null; const rect = el.getBoundingClientRect(); if (!rect || rect.width < 8 || rect.height < 8) return null; const ph = String(el.getAttribute('placeholder') || ''); const aria = String(el.getAttribute('aria-label') || ''); const name = String(el.getAttribute('name') || ''); const id = String(el.getAttribute('id') || ''); const cls = String(el.className || ''); const ce = String(el.getAttribute('contenteditable') || '').toLowerCase(); const role = String(el.getAttribute('role') || '').toLowerCase(); const maxLen = parseInt(String(el.getAttribute('maxlength') || '0'), 10) || 0; const container = el.closest('label, [class*="form"], [class*="item"], [class*="field"], [class*="title"]'); const ctx = String((container && container.innerText) || '').slice(0, 80); let score = 0; if (/标题|title/i.test(ph)) score += 7; if (/标题|title/i.test(aria)) score += 6; if (/标题|title/i.test(name)) score += 5; if (/标题|title/i.test(id)) score += 5; if (/title|标题/i.test(cls)) score += 4; if (/标题|title/i.test(ctx)) score += 5; if (maxLen > 0 && maxLen <= 40) score += 3; if (tag === 'input' || tag === 'textarea') score += 2; if (ce === 'true' || role === 'textbox') score += 1; return { el, score }; }) .filter(Boolean) .sort((a, b) => b.score - a.score); if (!candidates.length) { return { ok: false, value: '', score: -1, reason: 'no-candidate' }; } let lastError = ''; for (const item of candidates.slice(0, 12)) { const target = item.el; const tag = String(target.tagName || '').toLowerCase(); const ce = String(target.getAttribute('contenteditable') || '').toLowerCase(); const role = String(target.getAttribute('role') || '').toLowerCase(); const isTextInput = tag === 'input' || tag === 'textarea'; const isEditableBlock = ce === 'true' || role === 'textbox'; const emit = () => { target.dispatchEvent(new Event('input', { bubbles: true })); target.dispatchEvent(new Event('change', { bubbles: true })); }; try { target.focus(); if (isTextInput) { try { const proto = tag === 'textarea' ? window.HTMLTextAreaElement.prototype : window.HTMLInputElement.prototype; const setter = Object.getOwnPropertyDescriptor(proto, 'value')?.set; if (setter) { setter.call(target, ''); emit(); setter.call(target, title); emit(); } else { target.value = ''; emit(); target.value = title; emit(); } } catch { target.value = title; emit(); } const v = String(target.value || '').trim(); if (v) return { ok: true, value: v, score: item.score || 0, reason: '' }; } else if (isEditableBlock) { target.textContent = ''; emit(); target.textContent = title; emit(); const v = String(target.innerText || target.textContent || '').trim(); if (v) return { ok: true, value: v, score: item.score || 0, reason: '' }; } } catch (e) { lastError = String(e || ''); } } return { ok: false, value: '', score: -1, reason: lastError || 'set-value-failed' }; } """, desired_title ) if retry_result and retry_result.get('ok'): written = str(retry_result.get('value') or '').strip() score = int(retry_result.get('score') or 0) # 强化重试仍要求“像标题”且可匹配,避免误写到其他文本框 if score >= 3 and _title_matches_expected(written): title_filled = True title_verified_value = written print(f"[{self.platform_name}] 标题强化重试成功: round={strong_retry_round}, frame={frame_url}, score={score}, value={written}") break elif written: retry_reason = f"value-not-match:{written},score={score}" elif retry_result: retry_reason = str(retry_result.get('reason') or '') except Exception as e: retry_reason = str(e) if title_filled: break now_retry = asyncio.get_event_loop().time() if retry_reason in ("no-candidate", "no-scored-input"): has_title_input = await _has_editable_title_input() if not has_title_input: retry_reason = "no-candidate-and-form-not-ready" if now_retry - last_retry_log >= 10: print(f"[{self.platform_name}] 标题强化重试中: round={strong_retry_round}, reason={retry_reason or 'pending'}") last_retry_log = now_retry if retry_reason: title_failure_reason = retry_reason await asyncio.sleep(3) # AI 兜底:页面结构变化时,通过视觉识别返回可用 selector if not title_filled and desired_title: print(f"[{self.platform_name}] 标题强化重试仍未命中,尝试 AI selector 兜底...") try: ai_goal = "找到页面中用于填写视频标题的输入框或可编辑区域,返回一个可直接输入标题的 Playwright selector" ai_selector = await self.ai_suggest_playwright_selector(ai_goal) if ai_selector.get("has_selector"): selector = str(ai_selector.get("selector") or "").strip() confidence = int(ai_selector.get("confidence") or 0) print(f"[{self.platform_name}] AI 标题 selector: {selector}, confidence={confidence}") for frame in self.page.frames: if title_filled: break frame_url = frame.url or "about:blank" try: loc = frame.locator(selector).first if await loc.count() <= 0 or not await loc.is_visible(): continue try: await loc.click(timeout=2500) except Exception: pass node_tag = "" try: node_tag = ((await loc.evaluate("el => (el.tagName || '').toLowerCase()")) or "").strip() except Exception: node_tag = "" is_text_input = node_tag in ["input", "textarea"] if is_text_input: try: await loc.fill(desired_title, timeout=5000) except Exception: await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Backspace") await self.page.keyboard.type(desired_title) else: try: await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.press("Backspace") await self.page.keyboard.type(desired_title) except Exception: await loc.evaluate( """ (el, title) => { el.focus(); const tag = String(el.tagName || '').toLowerCase(); if (tag === 'input' || tag === 'textarea') { el.value = title; } else { el.textContent = title; } el.dispatchEvent(new Event('input', { bubbles: true })); el.dispatchEvent(new Event('change', { bubbles: true })); } """, desired_title ) await asyncio.sleep(0.3) current_value = "" try: if is_text_input: current_value = (await loc.input_value() or "").strip() else: current_value = ((await loc.evaluate("el => (el.innerText || el.textContent || '')")) or "").strip() except Exception: current_value = "" if _title_matches_expected(current_value): title_filled = True title_verified_value = current_value print(f"[{self.platform_name}] AI selector 标题填写成功: frame={frame_url}, value={current_value}") break else: print(f"[{self.platform_name}] AI selector 命中但值不匹配: frame={frame_url}, value={current_value}") except Exception as e: print(f"[{self.platform_name}] AI selector 执行失败: frame={frame_url}, err={e}") else: print(f"[{self.platform_name}] AI 未返回可用标题 selector: {ai_selector.get('notes') or 'no-notes'}") title_failure_reason = "ai-no-selector" except Exception as e: print(f"[{self.platform_name}] AI 标题兜底异常: {e}") title_failure_reason = f"ai-exception:{e}" if not title_filled: # 某些版本页面在上传后长期不暴露可编辑标题框;不中断流程,尝试继续发布。 if any(k in (title_failure_reason or "") for k in ["no-candidate", "form-not-ready", "title-not-ready", "ai-no-selector"]): print(f"[{self.platform_name}] 标题输入框未就绪({title_failure_reason}),继续后续发布流程(使用页面现有标题)") else: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"标题填写失败,已终止发布: {title_failure_reason or 'unknown'}", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) # 填写描述 if params.description: self.report_progress(65, "正在填写描述...") try: desc_selectors = [ 'textarea[placeholder*="描述"]', 'textarea[placeholder*="简介"]', '[class*="desc"] textarea', '[class*="description"] textarea', ] for selector in desc_selectors: try: desc_input = self.page.locator(selector).first if await desc_input.count() > 0 and await desc_input.is_visible(): await desc_input.click() await self.page.keyboard.type(params.description[:200]) print(f"[{self.platform_name}] 描述填写成功") break except: pass except Exception as e: print(f"[{self.platform_name}] 描述填写失败: {e}") self.report_progress(70, "正在发布...") await asyncio.sleep(1.5) # 点击发布按钮(等待按钮可点击,避免上传完成后直接误判失败) publish_selectors = [ 'button:has-text("立即发布")', '[role="button"]:has-text("立即发布")', 'button:has-text("确认发布")', '[role="button"]:has-text("确认发布")', 'button:has-text("发布")', '[role="button"]:has-text("发布")', 'button:has-text("发表")', 'button:has-text("提交")', '[class*="publish"] button', '[class*="submit"] button', ] publish_blocked_keywords = [ "定时发布", "预约发布", "存草稿", "草稿", "取消", "返回", "预览", ] publish_processing_indicators = [ 'div:has-text("发布中")', 'div:has-text("提交中")', 'span:has-text("发布中")', 'span:has-text("提交中")', 'div:has-text("正在上传")', 'div:has-text("正在处理")', 'span:has-text("正在上传")', 'span:has-text("正在处理")', 'div:has-text("请稍候")', 'span:has-text("请稍候")', 'div:has-text("审核中")', 'span:has-text("审核中")', ] def _compact_btn_text(text: str) -> str: return re.sub(r"\s+", "", str(text or "")).strip() def _score_publish_button(btn_text_compact: str, prefer_confirm: bool = False) -> int: if not btn_text_compact: return -1 if any(k in btn_text_compact for k in publish_blocked_keywords): return -1 if "发布中" in btn_text_compact or "提交中" in btn_text_compact: return -1 score = -1 if "立即发布" in btn_text_compact: score = 130 elif btn_text_compact == "确认发布": score = 125 elif "确认发布" in btn_text_compact: score = 120 elif btn_text_compact == "发布": score = 115 elif "发布" in btn_text_compact: score = 100 elif "发表" in btn_text_compact: score = 80 elif "提交" in btn_text_compact: score = 70 if score < 0: return -1 if prefer_confirm and ("确认发布" in btn_text_compact or "立即发布" in btn_text_compact): score += 20 return score async def _collect_publish_candidates(prefer_confirm: bool = False): candidates = [] found_visible_button = False found_disabled_button = False for frame in self.page.frames: frame_url = frame.url or "about:blank" for selector in publish_selectors: try: btns = frame.locator(selector) btn_count = await btns.count() for idx in range(min(btn_count, 6)): btn = btns.nth(idx) if not await btn.is_visible(): continue found_visible_button = True btn_text = (await btn.text_content() or "").strip() btn_text_compact = _compact_btn_text(btn_text) disabled_attr = await btn.get_attribute('disabled') aria_disabled = (await btn.get_attribute('aria-disabled') or '').lower() cls = (await btn.get_attribute('class') or '').lower() is_disabled = bool(disabled_attr) or aria_disabled == 'true' or 'disabled' in cls if is_disabled: found_disabled_button = True continue score = _score_publish_button(btn_text_compact, prefer_confirm=prefer_confirm) if score < 0: continue candidates.append({ "btn": btn, "frame_url": frame_url, "selector": selector, "idx": idx, "text": btn_text, "score": score, }) except Exception: pass candidates.sort(key=lambda x: x.get("score", 0), reverse=True) return candidates, found_visible_button, found_disabled_button async def _click_publish_candidate(candidate: dict): btn = candidate.get("btn") if not btn: return False, "candidate-empty" frame_url = str(candidate.get("frame_url") or "about:blank") selector = str(candidate.get("selector") or "") idx = int(candidate.get("idx") or 0) btn_text = str(candidate.get("text") or "").strip() before_url = self.page.url try: try: await btn.scroll_into_view_if_needed(timeout=1500) except Exception: pass try: await btn.click(timeout=4000) except Exception: await btn.click(force=True, timeout=4000) await asyncio.sleep(0.6) after_url = self.page.url state_flags = [] if after_url != before_url: state_flags.append("url-changed") try: post_text = _compact_btn_text(await btn.text_content() or "") if any(k in post_text for k in ["发布中", "提交中", "处理中"]): state_flags.append("btn-processing") except Exception: pass try: for indicator in publish_processing_indicators: loc = self.page.locator(indicator).first if await loc.count() > 0 and await loc.is_visible(): state_flags.append("processing-indicator") break except Exception: pass state_desc = ",".join(state_flags) if state_flags else "no-immediate-signal" print(f"[{self.platform_name}] 点击发布按钮成功: frame={frame_url}, selector={selector}, idx={idx}, text={btn_text}, state={state_desc}") return True, "" except Exception as e: return False, str(e) publish_clicked = False publish_click_error = "" publish_clicked_text = "" click_deadline = asyncio.get_event_loop().time() + 180 last_publish_log = 0.0 while asyncio.get_event_loop().time() < click_deadline and not publish_clicked: candidates, found_visible_button, found_disabled_button = await _collect_publish_candidates(prefer_confirm=False) if candidates: for candidate in candidates[:6]: ok, err = await _click_publish_candidate(candidate) if ok: publish_clicked = True publish_clicked_text = str(candidate.get("text") or "").strip() break if err: publish_click_error = err if publish_clicked: break now_click = asyncio.get_event_loop().time() if now_click - last_publish_log >= 10: if found_visible_button and found_disabled_button: print(f"[{self.platform_name}] 发布按钮可见但不可点击,等待可用...") elif found_visible_button: print(f"[{self.platform_name}] 发布按钮可见,但点击失败,继续重试...") else: print(f"[{self.platform_name}] 尚未找到可见发布按钮,继续等待...") last_publish_log = now_click await asyncio.sleep(2) # 某些页面会二次弹出“确认发布/立即发布”,补一次优先确认点击 if publish_clicked: initial_text = _compact_btn_text(publish_clicked_text) if initial_text and initial_text != "立即发布": await asyncio.sleep(1) confirm_candidates, _, _ = await _collect_publish_candidates(prefer_confirm=True) for candidate in confirm_candidates[:4]: candidate_text = _compact_btn_text(str(candidate.get("text") or "")) if candidate_text == initial_text and ("确认发布" not in candidate_text and "立即发布" not in candidate_text): continue ok, err = await _click_publish_candidate(candidate) if ok: print(f"[{self.platform_name}] 检测到二次确认发布流程,已补点确认按钮: {candidate_text}") break if err: publish_click_error = err if not publish_clicked: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"发布按钮未找到或不可点击(可能仍在处理/必填项未通过)。title={title_verified_value or desired_title}; err={publish_click_error or 'none'}", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) self.report_progress(80, "等待发布完成...") # 记录点击发布前的 URL publish_page_url = self.page.url print(f"[{self.platform_name}] 发布前 URL: {publish_page_url}") # 等待发布完成(百家号审核/处理链路可能较慢,默认等待 15 分钟) publish_timeout = 900 start_time = asyncio.get_event_loop().time() last_url = publish_page_url republish_click_count = 0 republish_attempt_count = 0 last_republish_attempt_time = 0.0 republish_attempt_interval = 45 # 失败后至少间隔 45s 再尝试,避免刷屏和误操作 max_republish_attempts = 2 while asyncio.get_event_loop().time() - start_time < publish_timeout: await asyncio.sleep(3) current_url = self.page.url # 检测 URL 是否发生变化 if current_url != last_url: print(f"[{self.platform_name}] URL 变化: {last_url} -> {current_url}") last_url = current_url # 检查是否跳转到内容管理页面(真正的成功标志) # 百家号发布成功后会跳转到 /builder/rc/content 页面 if '/builder/rc/content' in current_url and 'edit' not in current_url: self.report_progress(100, "发布成功!") print(f"[{self.platform_name}] 发布成功,已跳转到内容管理页: {current_url}") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=current_url, status='success' ) # 检查是否有明确的成功提示弹窗 try: # 百家号发布成功会显示"发布成功"弹窗 success_modal = self.page.locator('div:has-text("发布成功"), div:has-text("提交成功"), div:has-text("视频发布成功")').first if await success_modal.count() > 0 and await success_modal.is_visible(): self.report_progress(100, "发布成功!") print(f"[{self.platform_name}] 检测到发布成功弹窗") screenshot_base64 = await self.capture_screenshot() # 等待一下看是否会跳转 await asyncio.sleep(3) return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=self.page.url, status='success' ) except Exception as e: print(f"[{self.platform_name}] 检测成功提示异常: {e}") # 检查是否有错误提示 try: error_selectors = [ 'div.error-tip', 'div[class*="error-msg"]', 'span[class*="error"]', 'div:has-text("发布失败")', 'div:has-text("提交失败")', ] for error_selector in error_selectors: error_el = self.page.locator(error_selector).first if await error_el.count() > 0 and await error_el.is_visible(): error_text = await error_el.text_content() if error_text and error_text.strip(): print(f"[{self.platform_name}] 检测到错误: {error_text}") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"发布失败: {error_text.strip()}", screenshot_base64=screenshot_base64, page_url=current_url, status='failed' ) except Exception as e: print(f"[{self.platform_name}] 检测错误提示异常: {e}") # 检查验证码 captcha_result = await self.check_captcha() if captcha_result['need_captcha']: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"发布过程中需要{captcha_result['captcha_type']}验证码", need_captcha=True, captcha_type=captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 检查发布按钮状态(如果还在编辑页面) if 'edit' in current_url: try: is_processing = False for indicator in publish_processing_indicators: loc = self.page.locator(indicator).first if await loc.count() > 0 and await loc.is_visible(): is_processing = True print(f"[{self.platform_name}] 正在处理中...") break if not is_processing: # 如果不是在处理中,按节流策略尝试重新点击发布按钮 now_loop = asyncio.get_event_loop().time() elapsed = now_loop - start_time if ( elapsed > 60 and republish_attempt_count < max_republish_attempts and (now_loop - last_republish_attempt_time) >= republish_attempt_interval ): last_republish_attempt_time = now_loop republish_attempt_count += 1 print(f"[{self.platform_name}] 发布状态未变化,执行第 {republish_attempt_count}/{max_republish_attempts} 次补点发布...") republish_done = False republish_candidates, _, _ = await _collect_publish_candidates(prefer_confirm=True) for candidate in republish_candidates[:6]: ok, err = await _click_publish_candidate(candidate) if ok: republish_done = True republish_click_count += 1 candidate_text = _compact_btn_text(str(candidate.get("text") or "")) print(f"[{self.platform_name}] 重新点击发布按钮成功: text={candidate_text}, count={republish_click_count}") break if err: publish_click_error = err if not republish_done: print(f"[{self.platform_name}] 本轮未找到可用的立即发布按钮,继续等待状态变化") except Exception as e: print(f"[{self.platform_name}] 检查处理状态异常: {e}") # 超时,获取截图分析最终状态 print(f"[{self.platform_name}] 发布超时,最终 URL: {self.page.url}") screenshot_base64 = await self.capture_screenshot() # 最后一次检查是否在内容管理页 final_url = self.page.url if '/builder/rc/content' in final_url and 'edit' not in final_url: return PublishResult( success=True, platform=self.platform_name, message="发布成功(延迟确认)", screenshot_base64=screenshot_base64, page_url=final_url, status='success' ) # 超时后兜底:跳转内容管理页按标题校验,避免“已发布但未跳转”误判失败 print(f"[{self.platform_name}] 超时后执行内容页二次校验,title={params.title}") verify_deadline = asyncio.get_event_loop().time() + 120 # 最多再校验 2 分钟 while asyncio.get_event_loop().time() < verify_deadline: if await self._verify_publish_from_content_page(params.title, page_size=20): screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功(内容页校验)", screenshot_base64=screenshot_base64, page_url=self.page.url, status='success' ) await asyncio.sleep(8) return PublishResult( success=False, platform=self.platform_name, error="发布超时,请手动检查发布状态", screenshot_base64=screenshot_base64, page_url=final_url, status='need_action' ) async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """ 获取百家号作品列表 优先使用内容管理页的接口(pcui/article/lists)。 说明: - 该接口通常需要自定义请求头 token(JWT),仅靠 Cookie 可能会返回“未登录” - 这里使用 Playwright 打开内容页,从 localStorage/sessionStorage/页面脚本中自动提取 token, 再在页面上下文中发起 fetch(携带 cookie + token),以提高成功率 """ import re print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品列表 (使用 API)") print(f"[{self.platform_name}] page={page}, page_size={page_size}") print(f"{'='*60}") works: List[WorkItem] = [] total = 0 has_more = False next_page = "" try: # 解析并设置 cookies(Playwright) cookie_list = self.parse_cookies(cookies) await self.init_browser() await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 先打开内容管理页,确保本页 Referer/会话就绪 # Node 侧传 page=0,1,...;接口 currentPage 为 1,2,... current_page = int(page) + 1 page_size = int(page_size) content_url = ( "https://baijiahao.baidu.com/builder/rc/content" f"?currentPage={current_page}&pageSize={page_size}" "&search=&type=&collection=&startDate=&endDate=" ) await self.page.goto(content_url, wait_until="domcontentloaded", timeout=60000) await asyncio.sleep(2) # 1) 提取 token(JWT) token = await self.page.evaluate( """ () => { const isJwtLike = (v) => { if (!v || typeof v !== 'string') return false; const s = v.trim(); if (s.length < 60) return false; const parts = s.split('.'); if (parts.length !== 3) return false; return parts.every(p => /^[A-Za-z0-9_-]+$/.test(p) && p.length > 10); }; const pickFromStorage = (storage) => { try { const keys = Object.keys(storage || {}); for (const k of keys) { const v = storage.getItem(k); if (isJwtLike(v)) return v; } } catch {} return ""; }; // localStorage / sessionStorage let t = pickFromStorage(window.localStorage); if (t) return t; t = pickFromStorage(window.sessionStorage); if (t) return t; // meta 标签 const meta = document.querySelector('meta[name="token"], meta[name="bjh-token"]'); const metaToken = meta && meta.getAttribute('content'); if (isJwtLike(metaToken)) return metaToken; // 简单从全局变量里找 const candidates = [ (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.token) || "", (window.__PRELOADED_STATE__ && window.__PRELOADED_STATE__.token) || "", (window.__NUXT__ && window.__NUXT__.state && window.__NUXT__.state.token) || "", ]; for (const c of candidates) { if (isJwtLike(c)) return c; } return ""; } """ ) # 2) 若仍未取到 token,再从页面 HTML 兜底提取 if not token: html = await self.page.content() m = re.search(r'([A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,})', html) if m: token = m.group(1) if not token: raise Exception("未能从页面提取 token(可能未登录或触发风控),请重新登录百家号账号后再试") # 3) 调用接口(在页面上下文 fetch,自动携带 cookie) api_url = ( "https://baijiahao.baidu.com/pcui/article/lists" f"?currentPage={current_page}" f"&pageSize={page_size}" "&search=&type=&collection=&startDate=&endDate=" "&clearBeforeFetch=false" "&dynamic=1" ) resp = await self.page.evaluate( """ async ({ url, token }) => { const r = await fetch(url, { method: 'GET', credentials: 'include', headers: { 'accept': 'application/json, text/plain, */*', ...(token ? { token } : {}), }, }); const text = await r.text(); return { ok: r.ok, status: r.status, text }; } """, {"url": api_url, "token": token}, ) if not resp or not resp.get("ok"): status = resp.get("status") if isinstance(resp, dict) else "unknown" raise Exception(f"百家号接口请求失败: HTTP {status}") api_result = json.loads(resp.get("text") or "{}") print(f"[{self.platform_name}] pcui/article/lists 响应: errno={api_result.get('errno')}, errmsg={api_result.get('errmsg')}") if api_result.get("errno") != 0: errno = api_result.get("errno") errmsg = api_result.get("errmsg", "unknown error") # 20040001 常见为“未登录” if errno in (110, 20040001): raise Exception("百家号未登录或 Cookie/token 失效,请重新登录后再同步") raise Exception(f"百家号接口错误: errno={errno}, errmsg={errmsg}") data = api_result.get("data", {}) or {} items = data.get("list", []) or [] page_info = data.get("page", {}) or {} total = int(page_info.get("totalCount", 0) or 0) total_page = int(page_info.get("totalPage", 0) or 0) cur_page = int(page_info.get("currentPage", current_page) or current_page) has_more = bool(total_page and cur_page < total_page) next_page = cur_page + 1 if has_more else "" print(f"[{self.platform_name}] 获取到 {len(items)} 个作品,总数: {total}, currentPage={cur_page}, totalPage={total_page}") def _pick_cover(item: dict) -> str: cover = item.get("crosswise_cover") or item.get("vertical_cover") or "" if cover: return cover raw = item.get("cover_images") or "" try: # cover_images 可能是 JSON 字符串 parsed = json.loads(raw) if isinstance(raw, str) else raw if isinstance(parsed, list) and parsed: first = parsed[0] if isinstance(first, dict): return first.get("src") or first.get("ori_src") or "" if isinstance(first, str): return first except Exception: pass return "" def _pick_duration(item: dict) -> int: for k in ("rmb_duration", "duration", "long"): try: v = int(item.get(k) or 0) if v > 0: return v except Exception: pass # displaytype_exinfo 里可能有 ugcvideo.video_info.durationInSecond ex = item.get("displaytype_exinfo") or "" try: exj = json.loads(ex) if isinstance(ex, str) and ex else (ex if isinstance(ex, dict) else {}) ugc = (exj.get("ugcvideo") or {}) if isinstance(exj, dict) else {} vi = ugc.get("video_info") or {} v = int(vi.get("durationInSecond") or ugc.get("long") or 0) return v if v > 0 else 0 except Exception: return 0 def _pick_status(item: dict) -> str: qs = str(item.get("quality_status") or "").lower() st = str(item.get("status") or "").lower() if qs == "rejected" or "reject" in st: return "rejected" if st in ("draft", "unpublish", "unpublished"): return "draft" # 百家号常见 publish return "published" for item in items: # 优先使用 nid(builder 预览链接使用这个) work_id = str(item.get("nid") or item.get("feed_id") or item.get("article_id") or item.get("id") or "") if not work_id: continue works.append( WorkItem( work_id=work_id, title=str(item.get("title") or ""), cover_url=_pick_cover(item), video_url=str(item.get("url") or ""), duration=_pick_duration(item), status=_pick_status(item), publish_time=str(item.get("publish_time") or item.get("publish_at") or item.get("created_at") or ""), play_count=int(item.get("read_amount") or 0), like_count=int(item.get("like_amount") or 0), comment_count=int(item.get("comment_amount") or 0), share_count=int(item.get("share_amount") or 0), collect_count=int(item.get("collection_amount") or 0), ) ) print(f"[{self.platform_name}] ✓ 成功解析 {len(works)} 个作品") except Exception as e: import traceback traceback.print_exc() return WorksResult( success=False, platform=self.platform_name, error=str(e), debug_info="baijiahao_get_works_failed" ) return WorksResult( success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page ) async def get_article_stats( self, cookies: str, start_day: str, end_day: str, stat_type: str, num: int, count: int, ) -> dict: """ 调用百家号 /author/eco/statistics/articleListStatistic 接口(不依赖浏览器 token),用于作品列表维度的每日数据。 """ import aiohttp print(f"[{self.platform_name}] get_article_stats: {start_day}-{end_day}, type={stat_type}, num={num}, count={count}") # 解析 cookies cookie_list = self.parse_cookies(cookies) cookie_dict = {c['name']: c['value'] for c in cookie_list} session_headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', } headers = { 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Referer': 'https://baijiahao.baidu.com/builder/rc/analysiscontent/single', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', } async with aiohttp.ClientSession(cookies=cookie_dict) as session: # 0) 先访问 single 页面建立会话上下文(与 Node 端 UI 打开的页面一致) try: await session.get( 'https://baijiahao.baidu.com/builder/rc/analysiscontent/single', headers=session_headers, timeout=aiohttp.ClientTimeout(total=20), ) except Exception as e: print(f"[{self.platform_name}] warmup single page failed (non-fatal): {e}") # 1) 调用 articleListStatistic api_url = ( "https://baijiahao.baidu.com/author/eco/statistics/articleListStatistic" f"?start_day={start_day}&end_day={end_day}&type={stat_type}&num={num}&count={count}" ) async with session.get( api_url, headers=headers, timeout=aiohttp.ClientTimeout(total=30), ) as resp: status = resp.status try: data = await resp.json() except Exception: text = await resp.text() print(f"[{self.platform_name}] articleListStatistic non-JSON response: {text[:1000]}") raise errno = data.get('errno') errmsg = data.get('errmsg') print(f"[{self.platform_name}] articleListStatistic: http={status}, errno={errno}, msg={errmsg}") return { "success": status == 200 and errno == 0, "status": status, "errno": errno, "errmsg": errmsg, "data": data.get('data') if isinstance(data, dict) else None, } async def get_trend_data( self, cookies: str, nid: str, ) -> dict: """ 调用百家号 /author/eco/statistic/gettrenddata 接口,获取单作品的按日统计数据(basic_list)。 """ import aiohttp print(f"[{self.platform_name}] get_trend_data: nid={nid}") cookie_list = self.parse_cookies(cookies) cookie_dict = {c['name']: c['value'] for c in cookie_list} session_headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', } headers = { 'Accept': 'application/json, text/plain, */*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Referer': 'https://baijiahao.baidu.com/builder/rc/analysiscontent/single', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', } async with aiohttp.ClientSession(cookies=cookie_dict) as session: # 0) warmup try: await session.get( 'https://baijiahao.baidu.com/builder/rc/analysiscontent/single', headers=session_headers, timeout=aiohttp.ClientTimeout(total=20), ) except Exception as e: print(f"[{self.platform_name}] warmup single page (trend) failed (non-fatal): {e}") api_url = ( "https://baijiahao.baidu.com/author/eco/statistic/gettrenddata" f"?nid={nid}&trend_type=all&data_type=addition" ) async with session.get( api_url, headers=headers, timeout=aiohttp.ClientTimeout(total=30), ) as resp: status = resp.status try: data = await resp.json() except Exception: text = await resp.text() print(f"[{self.platform_name}] gettrenddata non-JSON response: {text[:1000]}") raise errno = data.get('errno') errmsg = data.get('errmsg') print(f"[{self.platform_name}] gettrenddata: http={status}, errno={errno}, msg={errmsg}") return { "success": status == 200 and errno == 0, "status": status, "errno": errno, "errmsg": errmsg, "data": data.get('data') if isinstance(data, dict) else None, } async def check_login_status(self, cookies: str) -> dict: """ 检查百家号 Cookie 登录状态 现在与其他平台保持一致,直接复用 BasePublisher 的浏览器检测逻辑: - 使用 Playwright 打开后台页面 - 根据是否跳转到登录页 / 是否出现登录弹窗或风控提示,判断登录是否有效 """ print(f"[{self.platform_name}] 检查登录状态 (使用通用浏览器逻辑)") # 直接调用父类的实现,保持与抖音/小红书/视频号一致 return await super().check_login_status(cookies) async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """获取百家号作品评论""" # TODO: 实现评论获取逻辑 return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error="百家号评论功能暂未实现" )