# -*- coding: utf-8 -*-
"""Baijiahao (Baidu) video publisher."""

import asyncio
import json
import os
import re
import traceback
from datetime import datetime
from typing import List

from .base import (
    BasePublisher,
    PublishParams,
    PublishResult,
    WorkItem,
    WorksResult,
    CommentItem,
    CommentsResult,
)

# Endpoints shared by the direct-HTTP (non-browser) code paths.
_HOME_URL = 'https://baijiahao.baidu.com/builder/rc/home'
_APPINFO_URL = 'https://baijiahao.baidu.com/builder/app/appinfo'

# errno values returned by the Baijiahao JSON APIs that this module reacts to.
_ERRNO_NOT_LOGGED_IN = 110
_ERRNO_SCATTERED_AUTH = 10001402  # "scattered authentication"; retried once

# Account states treated as usable: audit, pass, normal, newbie.
_VALID_ACCOUNT_STATUSES = ('audit', 'pass', 'normal', 'newbie')

_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
)

# Headers for the home-page navigation request. The Cookie header is
# deliberately absent: the aiohttp session manages cookies itself.
_PAGE_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'User-Agent': _USER_AGENT,
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Sec-Fetch-User': '?1',
    'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
}

# Headers for JSON API calls issued after the home page has been visited.
_API_HEADERS = {
    'Accept': 'application/json, text/plain, */*',
    'User-Agent': _USER_AGENT,
    'Referer': 'https://baijiahao.baidu.com/builder/rc/home',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-origin',
    'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
}


async def _visit_home(session):
    """GET the creator-center home page on *session*; return (status, headers)."""
    import aiohttp

    async with session.get(
        _HOME_URL,
        headers=_PAGE_HEADERS,
        timeout=aiohttp.ClientTimeout(total=30),
    ) as response:
        return response.status, response.headers


async def _get_json(session, url, total=30):
    """GET *url* with the shared API headers and return the decoded JSON body."""
    import aiohttp

    async with session.get(
        url,
        headers=_API_HEADERS,
        timeout=aiohttp.ClientTimeout(total=total),
    ) as response:
        return await response.json()


class BaijiahaoPublisher(BasePublisher):
    """
    Baijiahao video publisher.

    Drives the Baijiahao creator center with Playwright for publishing and
    work listing, and uses direct HTTP calls for account/login checks.
    """

    platform_name = "baijiahao"
    login_url = "https://baijiahao.baidu.com/"
    publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video"
    cookie_domain = ".baidu.com"

    # Login-detection configuration.
    login_check_url = "https://baijiahao.baidu.com/builder/rc/home"
    login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"]
    login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]']

    async def get_account_info(self, cookies: str) -> dict:
        """
        Fetch Baijiahao account information over plain HTTP (no browser).

        Args:
            cookies: Cookie payload accepted by ``self.parse_cookies``.

        Returns:
            On success a dict with ``success=True`` plus ``account_id``,
            ``account_name``, ``avatar_url``, ``fans_count`` and
            ``works_count``. On failure a dict with ``success=False`` and
            ``error``; ``need_login=True`` is added when the API rejected the
            session. Exceptions are caught and reported through ``error``.
        """
        import aiohttp

        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取账号信息 (使用 API)")
        print(f"{'='*60}")

        try:
            cookie_list = self.parse_cookies(cookies)
            cookie_dict = {c['name']: c['value'] for c in cookie_list}

            # The home page must be visited first to establish session context.
            print(f"[{self.platform_name}] 第一步:访问主页建立会话...")

            # Seed the session with the cookies so aiohttp tracks any updates.
            async with aiohttp.ClientSession(cookies=cookie_dict) as session:
                # Step 0: visit the home page to establish session context.
                print(f"[{self.platform_name}] [0/4] 访问主页建立会话上下文...")
                home_status, home_headers = await _visit_home(session)
                print(f"[{self.platform_name}] 主页访问状态: {home_status}")
                if 'Set-Cookie' in home_headers:
                    # The session cookie jar already absorbed them; log only.
                    print(f"[{self.platform_name}] 获取到新的会话Cookie")

                # Brief pause so the server-side session is in place.
                await asyncio.sleep(1)

                # Step 1: basic account information.
                print(f"[{self.platform_name}] [1/4] 调用 appinfo API...")
                appinfo_result = await _get_json(session, _APPINFO_URL)

                print(f"[{self.platform_name}] appinfo API 完整响应: {json.dumps(appinfo_result, ensure_ascii=False)[:500]}")
                print(f"[{self.platform_name}] appinfo API 响应: errno={appinfo_result.get('errno')}")

                errno = appinfo_result.get('errno')
                if errno != 0:
                    error_msg = appinfo_result.get('errmsg', '未知错误')
                    print(f"[{self.platform_name}] API 返回错误: errno={errno}, msg={error_msg}")

                    if errno == _ERRNO_NOT_LOGGED_IN:
                        return {
                            "success": False,
                            "error": "Cookie 已失效,需要重新登录",
                            "need_login": True
                        }

                    # Only the scattered-auth error is retried. Every other
                    # errno fails here, so a successful retry below is never
                    # followed by this generic failure return.
                    if errno != _ERRNO_SCATTERED_AUTH:
                        return {
                            "success": False,
                            "error": error_msg,
                            "need_login": True
                        }

                    print(f"[{self.platform_name}] 检测到分散认证问题,尝试重新访问主页...")
                    await asyncio.sleep(2)

                    retry_home_status, _ = await _visit_home(session)
                    print(f"[{self.platform_name}] 重新访问主页状态: {retry_home_status}")
                    await asyncio.sleep(1)

                    retry_result = await _get_json(session, _APPINFO_URL)
                    if retry_result.get('errno') != 0:
                        print(f"[{self.platform_name}] 重试仍然失败")
                        return {
                            "success": False,
                            "error": f"分散认证问题: {error_msg}",
                            "need_login": True
                        }

                    print(f"[{self.platform_name}] 分散认证问题已解决")
                    # Continue with the payload from the successful retry.
                    appinfo_result = retry_result

                user_data = appinfo_result.get('data', {}).get('user', {})
                if not user_data:
                    return {
                        "success": False,
                        "error": "无法获取用户信息",
                        "need_login": True
                    }

                # An unexpected account status is logged but not fatal here.
                status = user_data.get('status', '')
                if status not in _VALID_ACCOUNT_STATUSES:
                    print(f"[{self.platform_name}] 账号状态异常: {status}")

                account_name = user_data.get('name') or user_data.get('uname') or '百家号账号'
                app_id = user_data.get('app_id') or user_data.get('id', 0)
                # Fall back to a timestamp-based id when the API gives none.
                account_id = str(app_id) if app_id else f"baijiahao_{int(datetime.now().timestamp() * 1000)}"

                # Protocol-relative avatar URLs are made absolute.
                avatar_url = user_data.get('avatar') or user_data.get('avatar_unify', '')
                if avatar_url and avatar_url.startswith('//'):
                    avatar_url = 'https:' + avatar_url

                print(f"[{self.platform_name}] 账号名称: {account_name}, ID: {account_id}")

                # Step 2: follower count (non-critical; failure is tolerated).
                fans_count = 0
                try:
                    print(f"[{self.platform_name}] [2/3] 调用 growth/get_info API 获取粉丝数...")
                    growth_result = await _get_json(
                        session,
                        'https://baijiahao.baidu.com/cms-ui/rights/growth/get_info',
                        total=10,
                    )
                    if growth_result.get('errno') == 0:
                        growth_data = growth_result.get('data', {})
                        fans_count = int(growth_data.get('fans_num', 0))
                        print(f"[{self.platform_name}] 粉丝数: {fans_count}")
                    else:
                        print(f"[{self.platform_name}] 获取粉丝数失败: {growth_result.get('errmsg')}")
                except Exception as e:
                    print(f"[{self.platform_name}] 获取粉丝数异常(非关键): {e}")

                # Step 3: number of works (same API/parameters as the Node side).
                works_count = 0
                try:
                    print(f"[{self.platform_name}] [3/3] 调用 article/lists API 获取作品数...")
                    list_url = 'https://baijiahao.baidu.com/pcui/article/lists?currentPage=1&pageSize=20&search=&type=&collection=&startDate=&endDate=&clearBeforeFetch=false&dynamic=0'
                    async with session.get(
                        list_url,
                        headers={
                            'accept': '*/*',
                            'user-agent': 'PostmanRuntime/7.51.0',
                            # The cookie header is managed by the session.
                            'referer': 'https://baijiahao.baidu.com/builder/rc/content',
                            'connection': 'keep-alive',
                            'accept-encoding': 'gzip, deflate, br',
                        },
                        timeout=aiohttp.ClientTimeout(total=30)
                    ) as response:
                        response_text = await response.text()

                    print(f"[{self.platform_name}] ========== Works API Response ==========")
                    # Only the first 1000 characters are logged.
                    print(f"[{self.platform_name}] Full response: {response_text[:1000]}...")
                    print(f"[{self.platform_name}] =========================================")

                    works_result = json.loads(response_text)

                    # Scattered-auth error: retry once with fuller headers.
                    if works_result.get('errno') == _ERRNO_SCATTERED_AUTH:
                        print(f"[{self.platform_name}] 分散认证问题 (errno=10001402),3秒后重试...")
                        await asyncio.sleep(3)

                        retry_headers = dict(_API_HEADERS)
                        retry_headers.update({
                            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                            'Cache-Control': 'max-age=0',
                            'Upgrade-Insecure-Requests': '1',
                        })

                        async with session.get(
                            list_url,
                            headers=retry_headers,
                            timeout=aiohttp.ClientTimeout(total=30)
                        ) as retry_response:
                            retry_text = await retry_response.text()

                        print(f"[{self.platform_name}] ========== Works API Retry Response ==========")
                        print(f"[{self.platform_name}] Full retry response: {retry_text[:1000]}...")
                        print(f"[{self.platform_name}] ===============================================")

                        works_result = json.loads(retry_text)
                        if works_result.get('errno') == _ERRNO_SCATTERED_AUTH:
                            print(f"[{self.platform_name}] 重试仍然失败,返回已获取的账号信息")
                            works_result = None

                    if works_result and works_result.get('errno') == 0:
                        works_data = works_result.get('data', {})
                        # Prefer data.page.totalCount; fall back to data.total
                        # for the older response format.
                        page_info = works_data.get('page', {})
                        works_count = int(page_info.get('totalCount', works_data.get('total', 0)))
                        print(f"[{self.platform_name}] 作品数: {works_count} (from page.totalCount: {page_info.get('totalCount')}, from total: {works_data.get('total')})")
                    else:
                        errno = works_result.get('errno') if works_result else 'unknown'
                        errmsg = works_result.get('errmsg', 'unknown error') if works_result else 'no response'
                        print(f"[{self.platform_name}] 获取作品数失败: errno={errno}, errmsg={errmsg}")
                except Exception as e:
                    print(f"[{self.platform_name}] 获取作品数异常(非关键): {e}")
                    traceback.print_exc()

                account_info = {
                    "success": True,
                    "account_id": account_id,
                    "account_name": account_name,
                    "avatar_url": avatar_url,
                    "fans_count": fans_count,
                    "works_count": works_count,
                }

                print(f"[{self.platform_name}] ✓ 获取成功: {account_name} (粉丝: {fans_count}, 作品: {works_count})")
                return account_info

        except Exception as e:
            traceback.print_exc()
            return {
                "success": False,
                "error": str(e)
            }

    async def check_captcha(self) -> dict:
        """
        Check whether the current page shows a captcha or a login prompt.

        Returns:
            ``{'need_captcha': bool, 'captcha_type': str}`` where the type is
            ``'image'`` for a captcha match, ``'login'`` for a login-dialog
            match, and ``''`` when nothing matched or no page is open.
        """
        if not self.page:
            return {'need_captcha': False, 'captcha_type': ''}

        captcha_selectors = [
            'text="请输入验证码"',
            'text="滑动验证"',
            '[class*="captcha"]',
            '[class*="verify"]',
        ]
        login_selectors = [
            'text="请登录"',
            'text="登录后继续"',
            '[class*="login-dialog"]',
        ]

        try:
            for selector in captcha_selectors:
                try:
                    if await self.page.locator(selector).count() > 0:
                        print(f"[{self.platform_name}] 检测到验证码: {selector}")
                        return {'need_captcha': True, 'captcha_type': 'image'}
                except Exception:
                    # Best-effort probe: a failing selector is simply skipped.
                    pass

            for selector in login_selectors:
                try:
                    if await self.page.locator(selector).count() > 0:
                        print(f"[{self.platform_name}] 检测到需要登录: {selector}")
                        return {'need_captcha': True, 'captcha_type': 'login'}
                except Exception:
                    pass
        except Exception as e:
            print(f"[{self.platform_name}] 验证码检测异常: {e}")

        return {'need_captcha': False, 'captcha_type': ''}

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """
        Publish a video to Baijiahao through the browser UI.

        Args:
            cookies: Cookie payload accepted by ``self.parse_cookies``.
            params: Publish parameters; ``video_path``, ``title`` and
                ``description`` are read here.

        Returns:
            A ``PublishResult`` whose ``status`` is ``'success'``,
            ``'failed'``, ``'need_captcha'`` or ``'need_action'`` (timeout).

        Raises:
            Exception: If the page was not initialised or the video file does
                not exist.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        self.report_progress(5, "正在初始化浏览器...")

        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(10, "正在打开上传页面...")

        # Open the (new-style) video publishing page.
        video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1"
        await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000)
        await asyncio.sleep(3)

        # A redirect to a login URL means the cookies are no longer valid.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前页面: {current_url}")

        if any(indicator in current_url for indicator in self.login_indicators):
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="Cookie 已过期,需要重新登录",
                need_captcha=True,
                captcha_type='login',
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        # AI-based captcha detection first.
        ai_captcha = await self.ai_check_captcha()
        if ai_captcha['has_captcha']:
            print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
                need_captcha=True,
                captcha_type=ai_captcha['captcha_type'],
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        # Then the selector-based captcha detection.
        captcha_result = await self.check_captcha()
        if captcha_result['need_captcha']:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证",
                need_captcha=True,
                captcha_type=captcha_result['captcha_type'],
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        self.report_progress(15, "正在选择视频文件...")

        # Give the page time to finish loading.
        await asyncio.sleep(2)

        # Dismiss any pop-ups that may cover the upload area (best effort).
        close_buttons = [
            'button:has-text("我知道了")',
            'button:has-text("知道了")',
            '[class*="close"]',
            '[class*="modal-close"]',
        ]
        for btn_selector in close_buttons:
            try:
                btn = self.page.locator(btn_selector).first
                if await btn.count() > 0 and await btn.is_visible():
                    await btn.click()
                    await asyncio.sleep(0.5)
            except Exception:
                pass

        # Upload the video, trying several approaches.
        upload_success = False

        # Approach 1: set the file directly on an <input type="file">.
        try:
            file_inputs = await self.page.query_selector_all('input[type="file"]')
            print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入")

            for file_input in file_inputs:
                try:
                    await file_input.set_input_files(params.video_path)
                    upload_success = True
                    print(f"[{self.platform_name}] 通过 file input 上传成功")
                    break
                except Exception as e:
                    print(f"[{self.platform_name}] file input 上传失败: {e}")
        except Exception as e:
            print(f"[{self.platform_name}] 查找 file input 失败: {e}")

        # Approach 2: click an upload area and answer the file chooser.
        if not upload_success:
            upload_selectors = [
                'div[class*="upload-box"]',
                'div[class*="drag-upload"]',
                'div[class*="uploader"]',
                'div:has-text("点击上传")',
                'div:has-text("选择文件")',
                '[class*="upload-area"]',
            ]

            for selector in upload_selectors:
                if upload_success:
                    break
                try:
                    upload_area = self.page.locator(selector).first
                    if await upload_area.count() > 0:
                        print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
                        async with self.page.expect_file_chooser(timeout=10000) as fc_info:
                            await upload_area.click()
                        file_chooser = await fc_info.value
                        await file_chooser.set_files(params.video_path)
                        upload_success = True
                        print(f"[{self.platform_name}] 通过点击上传区域成功")
                        break
                except Exception as e:
                    print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")

        if not upload_success:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="未找到上传入口",
                screenshot_base64=screenshot_base64,
                page_url=await self.get_page_url(),
                status='failed'
            )

        self.report_progress(20, "等待视频上传...")

        # Wait for the upload to finish (at most 5 minutes). If the deadline
        # passes without a completion signal, the flow still continues below.
        loop = asyncio.get_running_loop()
        upload_timeout = 300
        start_time = loop.time()

        while loop.time() - start_time < upload_timeout:
            # Progress indicator: report it and stop once it reaches 100%.
            try:
                progress_el = self.page.locator('[class*="progress"], [class*="percent"]').first
                if await progress_el.count() > 0:
                    progress_text = await progress_el.text_content()
                    if progress_text:
                        match = re.search(r'(\d+)%', progress_text)
                        if match:
                            pct = int(match.group(1))
                            self.report_progress(20 + int(pct * 0.4), f"视频上传中 {pct}%...")
                            if pct >= 100:
                                print(f"[{self.platform_name}] 上传完成")
                                break
            except Exception:
                pass

            # A visible title input means the upload has completed.
            try:
                title_input = self.page.locator('input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input').first
                if await title_input.count() > 0 and await title_input.is_visible():
                    print(f"[{self.platform_name}] 检测到标题输入框,上传完成")
                    break
            except Exception:
                pass

            # Upload error banner. The probe itself is best-effort, but a
            # detected failure is acted on outside the try block so it is not
            # swallowed by the handler.
            upload_error = ''
            try:
                error_el = self.page.locator('[class*="error"], [class*="fail"]').first
                if await error_el.count() > 0 and await error_el.is_visible():
                    error_text = await error_el.text_content()
                    if error_text and ('失败' in error_text or '错误' in error_text):
                        upload_error = error_text
            except Exception:
                pass

            if upload_error:
                print(f"[{self.platform_name}] 检测到错误: {upload_error}")
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error=f"上传失败: {upload_error}",
                    screenshot_base64=screenshot_base64,
                    page_url=await self.get_page_url(),
                    status='failed'
                )

            await asyncio.sleep(3)

        self.report_progress(60, "正在填写标题...")
        await asyncio.sleep(2)

        # Fill in the title using the first selector that matches.
        title_filled = False
        title_selectors = [
            'input[placeholder*="标题"]',
            'textarea[placeholder*="标题"]',
            '[class*="title-input"] input',
            '[class*="title"] input',
            'input[maxlength]',
        ]

        for selector in title_selectors:
            if title_filled:
                break
            try:
                title_input = self.page.locator(selector).first
                if await title_input.count() > 0 and await title_input.is_visible():
                    await title_input.click()
                    await self.page.keyboard.press("Control+KeyA")
                    # Baijiahao limits titles to 30 characters.
                    await self.page.keyboard.type(params.title[:30])
                    title_filled = True
                    print(f"[{self.platform_name}] 标题填写成功")
            except Exception as e:
                print(f"[{self.platform_name}] 标题选择器 {selector} 失败: {e}")

        if not title_filled:
            print(f"[{self.platform_name}] 警告: 未能填写标题")

        # Fill in the description when one was provided.
        if params.description:
            self.report_progress(65, "正在填写描述...")
            desc_selectors = [
                'textarea[placeholder*="描述"]',
                'textarea[placeholder*="简介"]',
                '[class*="desc"] textarea',
                '[class*="description"] textarea',
            ]
            for selector in desc_selectors:
                try:
                    desc_input = self.page.locator(selector).first
                    if await desc_input.count() > 0 and await desc_input.is_visible():
                        await desc_input.click()
                        await self.page.keyboard.type(params.description[:200])
                        print(f"[{self.platform_name}] 描述填写成功")
                        break
                except Exception:
                    pass

        self.report_progress(70, "正在发布...")
        await asyncio.sleep(2)

        # Click the first enabled publish button.
        publish_selectors = [
            'button:has-text("发布")',
            'button:has-text("发表")',
            'button:has-text("提交")',
            '[class*="publish"] button',
            '[class*="submit"] button',
        ]

        publish_clicked = False
        for selector in publish_selectors:
            if publish_clicked:
                break
            try:
                btn = self.page.locator(selector).first
                if await btn.count() > 0 and await btn.is_visible():
                    is_disabled = await btn.get_attribute('disabled')
                    if is_disabled:
                        print(f"[{self.platform_name}] 按钮 {selector} 被禁用")
                        continue

                    await btn.click()
                    publish_clicked = True
                    print(f"[{self.platform_name}] 点击发布按钮成功")
            except Exception as e:
                print(f"[{self.platform_name}] 发布按钮 {selector} 失败: {e}")

        if not publish_clicked:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="未找到发布按钮",
                screenshot_base64=screenshot_base64,
                page_url=await self.get_page_url(),
                status='failed'
            )

        self.report_progress(80, "等待发布完成...")

        # Remember the URL at the moment publish was clicked.
        publish_page_url = self.page.url
        print(f"[{self.platform_name}] 发布前 URL: {publish_page_url}")

        # Wait for publishing to complete (at most 3 minutes).
        publish_timeout = 180
        start_time = loop.time()
        last_url = publish_page_url

        while loop.time() - start_time < publish_timeout:
            await asyncio.sleep(3)
            current_url = self.page.url

            if current_url != last_url:
                print(f"[{self.platform_name}] URL 变化: {last_url} -> {current_url}")
                last_url = current_url

            # Landing on /builder/rc/content (not an edit URL) is treated as
            # the definitive success signal.
            if '/builder/rc/content' in current_url and 'edit' not in current_url:
                self.report_progress(100, "发布成功!")
                print(f"[{self.platform_name}] 发布成功,已跳转到内容管理页: {current_url}")
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=True,
                    platform=self.platform_name,
                    message="发布成功",
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status='success'
                )

            # An explicit success dialog also counts as success.
            try:
                success_modal = self.page.locator('div:has-text("发布成功"), div:has-text("提交成功"), div:has-text("视频发布成功")').first
                if await success_modal.count() > 0 and await success_modal.is_visible():
                    self.report_progress(100, "发布成功!")
                    print(f"[{self.platform_name}] 检测到发布成功弹窗")
                    screenshot_base64 = await self.capture_screenshot()

                    # Wait briefly in case the page redirects afterwards.
                    await asyncio.sleep(3)

                    return PublishResult(
                        success=True,
                        platform=self.platform_name,
                        message="发布成功",
                        screenshot_base64=screenshot_base64,
                        page_url=self.page.url,
                        status='success'
                    )
            except Exception as e:
                print(f"[{self.platform_name}] 检测成功提示异常: {e}")

            # Visible error messages end the attempt as failed.
            try:
                error_selectors = [
                    'div.error-tip',
                    'div[class*="error-msg"]',
                    'span[class*="error"]',
                    'div:has-text("发布失败")',
                    'div:has-text("提交失败")',
                ]
                for error_selector in error_selectors:
                    error_el = self.page.locator(error_selector).first
                    if await error_el.count() > 0 and await error_el.is_visible():
                        error_text = await error_el.text_content()
                        if error_text and error_text.strip():
                            print(f"[{self.platform_name}] 检测到错误: {error_text}")
                            screenshot_base64 = await self.capture_screenshot()
                            return PublishResult(
                                success=False,
                                platform=self.platform_name,
                                error=f"发布失败: {error_text.strip()}",
                                screenshot_base64=screenshot_base64,
                                page_url=current_url,
                                status='failed'
                            )
            except Exception as e:
                print(f"[{self.platform_name}] 检测错误提示异常: {e}")

            # A captcha may appear while publishing.
            captcha_result = await self.check_captcha()
            if captcha_result['need_captcha']:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error=f"发布过程中需要{captcha_result['captcha_type']}验证码",
                    need_captcha=True,
                    captcha_type=captcha_result['captcha_type'],
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status='need_captcha'
                )

            # Still on the edit page: see whether something is in progress
            # and, if not, click publish again after 30 seconds.
            if 'edit' in current_url:
                try:
                    processing_indicators = [
                        '[class*="loading"]',
                        '[class*="uploading"]',
                        '[class*="processing"]',
                        'div:has-text("正在上传")',
                        'div:has-text("正在处理")',
                    ]
                    is_processing = False
                    for indicator in processing_indicators:
                        if await self.page.locator(indicator).count() > 0:
                            is_processing = True
                            print(f"[{self.platform_name}] 正在处理中...")
                            break

                    if not is_processing:
                        elapsed = loop.time() - start_time
                        if elapsed > 30:
                            # Idle on the edit page: the click may not have
                            # taken effect.
                            print(f"[{self.platform_name}] 发布似乎未生效,尝试重新点击发布按钮...")
                            for selector in publish_selectors:
                                try:
                                    btn = self.page.locator(selector).first
                                    if await btn.count() > 0 and await btn.is_visible():
                                        is_disabled = await btn.get_attribute('disabled')
                                        if not is_disabled:
                                            await btn.click()
                                            print(f"[{self.platform_name}] 重新点击发布按钮")
                                            break
                                except Exception:
                                    pass
                except Exception as e:
                    print(f"[{self.platform_name}] 检查处理状态异常: {e}")

        # Timed out: take a screenshot and inspect the final state.
        print(f"[{self.platform_name}] 发布超时,最终 URL: {self.page.url}")
        screenshot_base64 = await self.capture_screenshot()

        # Last check: are we on the content-management page after all?
        final_url = self.page.url
        if '/builder/rc/content' in final_url and 'edit' not in final_url:
            return PublishResult(
                success=True,
                platform=self.platform_name,
                message="发布成功(延迟确认)",
                screenshot_base64=screenshot_base64,
                page_url=final_url,
                status='success'
            )

        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="发布超时,请手动检查发布状态",
            screenshot_base64=screenshot_base64,
            page_url=final_url,
            status='need_action'
        )

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """
        Fetch the Baijiahao works list.

        Uses the content-management endpoint (pcui/article/lists). That
        endpoint usually needs a custom ``token`` request header (a JWT), and
        cookies alone may yield a "not logged in" response. So the content
        page is opened with Playwright, a token is extracted from
        localStorage / sessionStorage / page globals (with an HTML regex
        fallback), and the request is issued via ``fetch`` inside the page so
        cookies and token travel together.

        Args:
            cookies: Cookie payload accepted by ``self.parse_cookies``.
            page: Zero-based page index; converted to the API's one-based
                ``currentPage``.
            page_size: Number of items per page.

        Returns:
            ``WorksResult`` with the parsed works on success, or with
            ``success=False`` and ``error`` set when anything fails.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表 (使用 API)")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False
        next_page = ""

        try:
            cookie_list = self.parse_cookies(cookies)
            await self.init_browser()
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the content-management page first so Referer and session
            # are ready. The Node side passes page=0,1,...; the API's
            # currentPage is 1,2,...
            current_page = int(page) + 1
            page_size = int(page_size)
            content_url = (
                "https://baijiahao.baidu.com/builder/rc/content"
                f"?currentPage={current_page}&pageSize={page_size}"
                "&search=&type=&collection=&startDate=&endDate="
            )
            await self.page.goto(content_url, wait_until="domcontentloaded", timeout=60000)
            await asyncio.sleep(2)

            # 1) Extract a JWT-looking token from the page.
            token = await self.page.evaluate(
                """
                () => {
                  const isJwtLike = (v) => {
                    if (!v || typeof v !== 'string') return false;
                    const s = v.trim();
                    if (s.length < 60) return false;
                    const parts = s.split('.');
                    if (parts.length !== 3) return false;
                    return parts.every(p => /^[A-Za-z0-9_-]+$/.test(p) && p.length > 10);
                  };

                  const pickFromStorage = (storage) => {
                    try {
                      const keys = Object.keys(storage || {});
                      for (const k of keys) {
                        const v = storage.getItem(k);
                        if (isJwtLike(v)) return v;
                      }
                    } catch {}
                    return "";
                  };

                  // localStorage / sessionStorage
                  let t = pickFromStorage(window.localStorage);
                  if (t) return t;
                  t = pickFromStorage(window.sessionStorage);
                  if (t) return t;

                  // meta 标签
                  const meta = document.querySelector('meta[name="token"], meta[name="bjh-token"]');
                  const metaToken = meta && meta.getAttribute('content');
                  if (isJwtLike(metaToken)) return metaToken;

                  // 简单从全局变量里找
                  const candidates = [
                    (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.token) || "",
                    (window.__PRELOADED_STATE__ && window.__PRELOADED_STATE__.token) || "",
                    (window.__NUXT__ && window.__NUXT__.state && window.__NUXT__.state.token) || "",
                  ];
                  for (const c of candidates) {
                    if (isJwtLike(c)) return c;
                  }

                  return "";
                }
                """
            )

            # 2) Fallback: look for a JWT-shaped string in the page HTML.
            if not token:
                html = await self.page.content()
                m = re.search(r'([A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,})', html)
                if m:
                    token = m.group(1)

            if not token:
                raise Exception("未能从页面提取 token(可能未登录或触发风控),请重新登录百家号账号后再试")

            # 3) Call the API from the page context (cookies sent automatically).
            api_url = (
                "https://baijiahao.baidu.com/pcui/article/lists"
                f"?currentPage={current_page}"
                f"&pageSize={page_size}"
                "&search=&type=&collection=&startDate=&endDate="
                "&clearBeforeFetch=false"
                "&dynamic=1"
            )
            resp = await self.page.evaluate(
                """
                async ({ url, token }) => {
                  const r = await fetch(url, {
                    method: 'GET',
                    credentials: 'include',
                    headers: {
                      'accept': 'application/json, text/plain, */*',
                      ...(token ? { token } : {}),
                    },
                  });
                  const text = await r.text();
                  return { ok: r.ok, status: r.status, text };
                }
                """,
                {"url": api_url, "token": token},
            )

            if not resp or not resp.get("ok"):
                status = resp.get("status") if isinstance(resp, dict) else "unknown"
                raise Exception(f"百家号接口请求失败: HTTP {status}")

            api_result = json.loads(resp.get("text") or "{}")
            print(f"[{self.platform_name}] pcui/article/lists 响应: errno={api_result.get('errno')}, errmsg={api_result.get('errmsg')}")

            if api_result.get("errno") != 0:
                errno = api_result.get("errno")
                errmsg = api_result.get("errmsg", "unknown error")
                # 20040001 commonly means "not logged in".
                if errno in (110, 20040001):
                    raise Exception("百家号未登录或 Cookie/token 失效,请重新登录后再同步")
                raise Exception(f"百家号接口错误: errno={errno}, errmsg={errmsg}")

            data = api_result.get("data", {}) or {}
            items = data.get("list", []) or []
            page_info = data.get("page", {}) or {}
            total = int(page_info.get("totalCount", 0) or 0)
            total_page = int(page_info.get("totalPage", 0) or 0)
            cur_page = int(page_info.get("currentPage", current_page) or current_page)
            has_more = bool(total_page and cur_page < total_page)
            # NOTE(review): next_page is derived from the API's one-based
            # currentPage, while the `page` argument is zero-based. Confirm
            # how the caller feeds next_page back in.
            next_page = cur_page + 1 if has_more else ""

            print(f"[{self.platform_name}] 获取到 {len(items)} 个作品,总数: {total}, currentPage={cur_page}, totalPage={total_page}")

            def _pick_cover(item: dict) -> str:
                """Return the best available cover URL for *item*, or ''."""
                cover = item.get("crosswise_cover") or item.get("vertical_cover") or ""
                if cover:
                    return cover
                raw = item.get("cover_images") or ""
                try:
                    # cover_images may be a JSON string or already a list.
                    parsed = json.loads(raw) if isinstance(raw, str) else raw
                    if isinstance(parsed, list) and parsed:
                        first = parsed[0]
                        if isinstance(first, dict):
                            return first.get("src") or first.get("ori_src") or ""
                        if isinstance(first, str):
                            return first
                except Exception:
                    pass
                return ""

            def _pick_duration(item: dict) -> int:
                """Return the first positive duration found for *item*, else 0."""
                for k in ("rmb_duration", "duration", "long"):
                    try:
                        v = int(item.get(k) or 0)
                        if v > 0:
                            return v
                    except Exception:
                        pass
                # displaytype_exinfo may carry
                # ugcvideo.video_info.durationInSecond.
                ex = item.get("displaytype_exinfo") or ""
                try:
                    exj = json.loads(ex) if isinstance(ex, str) and ex else (ex if isinstance(ex, dict) else {})
                    ugc = (exj.get("ugcvideo") or {}) if isinstance(exj, dict) else {}
                    vi = ugc.get("video_info") or {}
                    v = int(vi.get("durationInSecond") or ugc.get("long") or 0)
                    return v if v > 0 else 0
                except Exception:
                    return 0

            def _pick_status(item: dict) -> str:
                """Map the item's status fields to rejected/draft/published."""
                qs = str(item.get("quality_status") or "").lower()
                st = str(item.get("status") or "").lower()
                if qs == "rejected" or "reject" in st:
                    return "rejected"
                if st in ("draft", "unpublish", "unpublished"):
                    return "draft"
                # Anything else (commonly "publish") counts as published.
                return "published"

            for item in items:
                # Prefer nid: the builder preview link uses it.
                work_id = str(item.get("nid") or item.get("feed_id") or item.get("article_id") or item.get("id") or "")
                if not work_id:
                    continue

                works.append(
                    WorkItem(
                        work_id=work_id,
                        title=str(item.get("title") or ""),
                        cover_url=_pick_cover(item),
                        video_url=str(item.get("url") or ""),
                        duration=_pick_duration(item),
                        status=_pick_status(item),
                        publish_time=str(item.get("publish_time") or item.get("publish_at") or item.get("created_at") or ""),
                        play_count=int(item.get("read_amount") or 0),
                        like_count=int(item.get("like_amount") or 0),
                        comment_count=int(item.get("comment_amount") or 0),
                        share_count=int(item.get("share_amount") or 0),
                        collect_count=int(item.get("collection_amount") or 0),
                    )
                )

            print(f"[{self.platform_name}] ✓ 成功解析 {len(works)} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e),
                debug_info="baijiahao_get_works_failed"
            )

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=total,
            has_more=has_more,
            next_page=next_page
        )

    async def check_login_status(self, cookies: str) -> dict:
        """
        Check whether the Baijiahao cookies are still logged in.

        Uses direct HTTP calls (no browser): visits the home page, then calls
        the appinfo API.

        Args:
            cookies: Cookie payload accepted by ``self.parse_cookies``.

        Returns:
            A dict with ``success``, ``valid`` and ``need_login``, plus
            ``message`` for API-level outcomes or ``error`` when an exception
            was caught.
        """
        import aiohttp

        print(f"[{self.platform_name}] 检查登录状态 (使用 API)")

        try:
            cookie_list = self.parse_cookies(cookies)
            cookie_dict = {c['name']: c['value'] for c in cookie_list}

            async with aiohttp.ClientSession(cookies=cookie_dict) as session:
                # Step 0: visit the home page to establish session context.
                print(f"[{self.platform_name}] [0/2] 访问主页建立会话上下文...")
                home_status, _ = await _visit_home(session)
                print(f"[{self.platform_name}] 主页访问状态: {home_status}")

                # Brief pause so the server-side session is in place.
                await asyncio.sleep(1)

                # Step 1: call the API that reveals the login state.
                print(f"[{self.platform_name}] [1/2] 调用 appinfo API 检查登录状态...")
                api_result = await _get_json(session, _APPINFO_URL)

                errno = api_result.get('errno')
                print(f"[{self.platform_name}] API 完整响应: {json.dumps(api_result, ensure_ascii=False)[:500]}")
                print(f"[{self.platform_name}] API 响应: errno={errno}")

                if errno != 0:
                    # Non-zero errno means the request failed (110 = not
                    # logged in).
                    error_msg = api_result.get('errmsg', '未知错误')
                    print(f"[{self.platform_name}] Cookie 无效: errno={errno}, msg={error_msg}")
                    return {
                        "success": True,
                        "valid": False,
                        "need_login": True,
                        "message": error_msg
                    }

                user_data = api_result.get('data', {}).get('user', {})
                if not user_data:
                    print(f"[{self.platform_name}] 无用户数据,Cookie 可能无效")
                    return {
                        "success": True,
                        "valid": False,
                        "need_login": True,
                        "message": "无用户数据"
                    }

                status = user_data.get('status', '')
                account_name = user_data.get('name') or user_data.get('uname', '')

                if status in _VALID_ACCOUNT_STATUSES and account_name:
                    print(f"[{self.platform_name}] ✓ 登录状态有效: {account_name} (status={status})")
                    return {
                        "success": True,
                        "valid": True,
                        "need_login": False,
                        "message": "登录状态有效"
                    }

                print(f"[{self.platform_name}] 账号状态异常: status={status}, name={account_name}")
                return {
                    "success": True,
                    "valid": False,
                    "need_login": True,
                    "message": f"账号状态异常: {status}"
                }

        except Exception as e:
            traceback.print_exc()
            return {
                "success": False,
                "valid": False,
                "need_login": True,
                "error": str(e)
            }

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Return a failed ``CommentsResult``; comment fetching is not implemented."""
        # TODO: implement comment fetching.
        return CommentsResult(
            success=False,
            platform=self.platform_name,
            work_id=work_id,
            error="百家号评论功能暂未实现"
        )