# -*- coding: utf-8 -*-
"""
Baijiahao video publisher.
"""

import asyncio
import json
import os
import re
import traceback
from datetime import datetime
from typing import List, Optional

from .base import (
    BasePublisher,
    PublishParams,
    PublishResult,
    WorkItem,
    WorksResult,
    CommentItem,
    CommentsResult
)


def _to_int(value) -> int:
    """Coerce an API counter to ``int``; ``None`` or unparsable values become 0."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


class BaijiahaoPublisher(BasePublisher):
    """
    Baijiahao video publisher.

    Drives the Baijiahao creator center through Playwright. Browser
    lifecycle, cookie handling, screenshots and progress reporting are
    provided by ``BasePublisher`` (not visible in this file).
    """

    platform_name = "baijiahao"
    login_url = "https://baijiahao.baidu.com/"
    publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video"
    cookie_domain = ".baidu.com"

    # Login-detection configuration
    login_check_url = "https://baijiahao.baidu.com/builder/rc/home"
    login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"]
    login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]']

    def _is_login_url(self, url: str) -> bool:
        """Return True when *url* contains any entry of ``login_indicators``."""
        return any(indicator in url for indicator in self.login_indicators)

    async def _first_matching_selector(self, selectors) -> Optional[str]:
        """Return the first selector in *selectors* that matches an element, else None."""
        for selector in selectors:
            try:
                if await self.page.locator(selector).count() > 0:
                    return selector
            except Exception:
                # A selector that cannot be evaluated is treated as "no match".
                # ``Exception`` (not a bare except) so task cancellation propagates.
                continue
        return None

    async def _visible_error_text(self) -> str:
        """
        Return the text of the first visible error element, or ''.

        Only text containing "失败" or "错误" counts as an error. Lookup
        problems are logged and reported as "no error".
        """
        if not self.page:
            return ''
        try:
            error_el = self.page.locator('[class*="error"], [class*="fail"]').first
            if await error_el.count() > 0 and await error_el.is_visible():
                error_text = await error_el.text_content()
                if error_text and ('失败' in error_text or '错误' in error_text):
                    return error_text.strip()
        except Exception as e:
            print(f"[{self.platform_name}] 错误提示检测异常: {e}")
        return ''

    async def get_account_info(self, cookies: str) -> dict:
        """
        Fetch Baijiahao account information via the ``settingInfo`` API.

        Args:
            cookies: Cookie string, passed to ``self.parse_cookies``.

        Returns:
            On success: ``{"success": True, "account_id", "account_name",
            "avatar_url", "fans_count", "works_count"}``.
            On failure: ``{"success": False, "error": ...}``, plus
            ``"need_login": True`` when the session is detected as expired.
            Exceptions are caught and reported in the ``"error"`` field.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取账号信息")
        print(f"{'='*60}")

        try:
            await self.init_browser()

            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the creator-center home page
            print(f"[{self.platform_name}] 访问后台首页...")
            await self.page.goto(self.login_check_url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(3)

            # A redirect to a login URL means the cookies are no longer valid
            current_url = self.page.url
            print(f"[{self.platform_name}] 当前 URL: {current_url}")

            if self._is_login_url(current_url):
                print(f"[{self.platform_name}] 检测到登录页面,Cookie 已失效")
                return {
                    "success": False,
                    "error": "Cookie 已失效,需要重新登录",
                    "need_login": True
                }

            # Call the settingInfo API from inside the page so cookies are sent
            print(f"[{self.platform_name}] 调用 settingInfo API...")
            api_result = await self.page.evaluate('''
                async () => {
                    try {
                        const response = await fetch('https://baijiahao.baidu.com/user-ui/cms/settingInfo', {
                            method: 'GET',
                            credentials: 'include',
                            headers: {
                                'Accept': 'application/json, text/plain, */*'
                            }
                        });
                        return await response.json();
                    } catch (e) {
                        return { error: e.message };
                    }
                }
            ''')

            print(f"[{self.platform_name}] API 响应: errno={api_result.get('errno')}")

            if api_result.get('error'):
                return {
                    "success": False,
                    "error": api_result.get('error')
                }

            if api_result.get('errno') == 0 and api_result.get('data'):
                data = api_result['data']
                # A null id must not become the literal string "None";
                # fall back to a timestamp-based id instead.
                raw_id = data.get('new_uc_id')
                if raw_id is None or raw_id == '':
                    account_id = f"baijiahao_{int(datetime.now().timestamp() * 1000)}"
                else:
                    account_id = str(raw_id)
                account_info = {
                    "success": True,
                    "account_id": account_id,
                    "account_name": data.get('name', '') or '百家号账号',
                    "avatar_url": data.get('avatar', ''),
                    "fans_count": 0,  # the API does not return the follower count directly
                    "works_count": 0,
                }
                print(f"[{self.platform_name}] 获取成功: {account_info['account_name']}")
                return account_info

            error_msg = api_result.get('errmsg', '未知错误')
            print(f"[{self.platform_name}] API 返回错误: {error_msg}")

            # Login-related error codes: tell the caller to log in again
            if api_result.get('errno') in [10000010, 10001401]:
                return {
                    "success": False,
                    "error": error_msg,
                    "need_login": True
                }

            return {
                "success": False,
                "error": error_msg
            }

        except Exception as e:
            traceback.print_exc()
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            await self.close_browser()

    async def check_captcha(self) -> dict:
        """
        Check whether the current page shows a captcha or a login prompt.

        Returns:
            ``{'need_captcha': bool, 'captcha_type': str}`` where the type is
            ``'image'`` for captcha markers, ``'login'`` for login prompts and
            ``''`` when nothing was detected or no page is open.
        """
        if not self.page:
            return {'need_captcha': False, 'captcha_type': ''}

        try:
            # Captcha markers
            captcha_selectors = [
                'text="请输入验证码"',
                'text="滑动验证"',
                '[class*="captcha"]',
                '[class*="verify"]',
            ]
            matched = await self._first_matching_selector(captcha_selectors)
            if matched is not None:
                print(f"[{self.platform_name}] 检测到验证码: {matched}")
                return {'need_captcha': True, 'captcha_type': 'image'}

            # Login dialogs
            login_selectors = [
                'text="请登录"',
                'text="登录后继续"',
                '[class*="login-dialog"]',
            ]
            matched = await self._first_matching_selector(login_selectors)
            if matched is not None:
                print(f"[{self.platform_name}] 检测到需要登录: {matched}")
                return {'need_captcha': True, 'captcha_type': 'login'}

        except Exception as e:
            print(f"[{self.platform_name}] 验证码检测异常: {e}")

        return {'need_captcha': False, 'captcha_type': ''}

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """
        Publish a video to Baijiahao.

        Steps: open the upload page, attach the video file, wait for the
        upload, fill in title/description, click publish and wait for a
        success signal.

        Args:
            cookies: Cookie string, passed to ``self.parse_cookies``.
            params: Publish parameters; ``video_path``, ``title`` and
                ``description`` are read here.

        Returns:
            A ``PublishResult`` whose ``status`` is ``'success'``,
            ``'failed'``, ``'need_captcha'`` or ``'need_action'`` (timeout).

        Raises:
            Exception: if the page was not initialized or the video file
                does not exist.

        NOTE(review): unlike ``get_account_info``/``get_works`` this method
        never calls ``close_browser`` - presumably the caller owns cleanup so
        captcha flows can continue on the same page; confirm against the base
        class.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        self.report_progress(5, "正在初始化浏览器...")

        # Start the browser
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        # Parse and install cookies
        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        # Make sure the video file exists
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(10, "正在打开上传页面...")

        # Open the video publish page (new video publishing UI)
        video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1"
        await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000)
        await asyncio.sleep(3)

        # Redirected to a login page?
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前页面: {current_url}")

        if self._is_login_url(current_url):
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="Cookie 已过期,需要重新登录",
                need_captcha=True,
                captcha_type='login',
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        # Captcha check
        captcha_result = await self.check_captcha()
        if captcha_result['need_captcha']:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"需要{captcha_result['captcha_type']}验证码",
                need_captcha=True,
                captcha_type=captcha_result['captcha_type'],
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        self.report_progress(15, "正在选择视频文件...")

        # Give the page time to finish loading
        await asyncio.sleep(2)

        # Dismiss popups that may cover the uploader (best effort)
        close_buttons = [
            'button:has-text("我知道了")',
            'button:has-text("知道了")',
            '[class*="close"]',
            '[class*="modal-close"]',
        ]
        for btn_selector in close_buttons:
            try:
                btn = self.page.locator(btn_selector).first
                if await btn.count() > 0 and await btn.is_visible():
                    await btn.click()
                    await asyncio.sleep(0.5)
            except Exception:
                # A popup that cannot be closed must not abort the publish.
                continue

        # Upload the video - try several strategies
        upload_success = False

        # Strategy 1: set the file directly on a file input
        try:
            file_inputs = await self.page.query_selector_all('input[type="file"]')
            print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入")

            for file_input in file_inputs:
                try:
                    await file_input.set_input_files(params.video_path)
                    upload_success = True
                    print(f"[{self.platform_name}] 通过 file input 上传成功")
                    break
                except Exception as e:
                    print(f"[{self.platform_name}] file input 上传失败: {e}")
        except Exception as e:
            print(f"[{self.platform_name}] 查找 file input 失败: {e}")

        # Strategy 2: click an upload area and answer the file chooser
        if not upload_success:
            upload_selectors = [
                'div[class*="upload-box"]',
                'div[class*="drag-upload"]',
                'div[class*="uploader"]',
                'div:has-text("点击上传")',
                'div:has-text("选择文件")',
                '[class*="upload-area"]',
            ]

            for selector in upload_selectors:
                try:
                    upload_area = self.page.locator(selector).first
                    if await upload_area.count() > 0:
                        print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
                        async with self.page.expect_file_chooser(timeout=10000) as fc_info:
                            await upload_area.click()
                        file_chooser = await fc_info.value
                        await file_chooser.set_files(params.video_path)
                        upload_success = True
                        print(f"[{self.platform_name}] 通过点击上传区域成功")
                        break
                except Exception as e:
                    print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")

        if not upload_success:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="未找到上传入口",
                screenshot_base64=screenshot_base64,
                page_url=await self.get_page_url(),
                status='failed'
            )

        self.report_progress(20, "等待视频上传...")

        # Wait for the upload to finish (up to 5 minutes)
        upload_timeout = 300
        loop = asyncio.get_event_loop()
        start_time = loop.time()

        while loop.time() - start_time < upload_timeout:
            # Upload progress, mapped onto the 20-60% range of overall progress
            try:
                progress_el = self.page.locator('[class*="progress"], [class*="percent"]').first
                if await progress_el.count() > 0:
                    progress_text = await progress_el.text_content()
                    match = re.search(r'(\d+)%', progress_text or '')
                    if match:
                        pct = int(match.group(1))
                        self.report_progress(20 + int(pct * 0.4), f"视频上传中 {pct}%...")
                        if pct >= 100:
                            print(f"[{self.platform_name}] 上传完成")
                            break
            except Exception:
                pass  # progress display is optional

            # A visible title input means the upload has completed
            try:
                title_input = self.page.locator('input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input').first
                if await title_input.count() > 0 and await title_input.is_visible():
                    print(f"[{self.platform_name}] 检测到标题输入框,上传完成")
                    break
            except Exception:
                pass

            # Error banner: report it instead of silently waiting out the timeout
            error_text = await self._visible_error_text()
            if error_text:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error=f"上传失败: {error_text}",
                    screenshot_base64=screenshot_base64,
                    page_url=await self.get_page_url(),
                    status='failed'
                )

            await asyncio.sleep(3)
        else:
            # No completion signal was seen; keep the original best-effort
            # behavior of continuing, but make the situation visible in logs.
            print(f"[{self.platform_name}] 警告: 等待上传完成超时,继续尝试填写信息")

        self.report_progress(60, "正在填写标题...")
        await asyncio.sleep(2)

        # Fill in the title
        title_filled = False
        title_selectors = [
            'input[placeholder*="标题"]',
            'textarea[placeholder*="标题"]',
            '[class*="title-input"] input',
            '[class*="title"] input',
            'input[maxlength]',
        ]

        for selector in title_selectors:
            try:
                title_input = self.page.locator(selector).first
                if await title_input.count() > 0 and await title_input.is_visible():
                    await title_input.click()
                    # Select any existing text so typing replaces it
                    await self.page.keyboard.press("Control+KeyA")
                    await self.page.keyboard.type(params.title[:30])  # titles are limited to 30 characters
                    title_filled = True
                    print(f"[{self.platform_name}] 标题填写成功")
                    break
            except Exception as e:
                print(f"[{self.platform_name}] 标题选择器 {selector} 失败: {e}")

        if not title_filled:
            print(f"[{self.platform_name}] 警告: 未能填写标题")

        # Fill in the description (optional)
        if params.description:
            self.report_progress(65, "正在填写描述...")
            desc_selectors = [
                'textarea[placeholder*="描述"]',
                'textarea[placeholder*="简介"]',
                '[class*="desc"] textarea',
                '[class*="description"] textarea',
            ]
            for selector in desc_selectors:
                try:
                    desc_input = self.page.locator(selector).first
                    if await desc_input.count() > 0 and await desc_input.is_visible():
                        await desc_input.click()
                        await self.page.keyboard.type(params.description[:200])
                        print(f"[{self.platform_name}] 描述填写成功")
                        break
                except Exception as e:
                    print(f"[{self.platform_name}] 描述填写失败: {e}")

        self.report_progress(70, "正在发布...")
        await asyncio.sleep(2)

        # Click the publish button
        publish_selectors = [
            'button:has-text("发布")',
            'button:has-text("发表")',
            'button:has-text("提交")',
            '[class*="publish"] button',
            '[class*="submit"] button',
        ]

        publish_clicked = False
        for selector in publish_selectors:
            try:
                btn = self.page.locator(selector).first
                if await btn.count() > 0 and await btn.is_visible():
                    # A bare ``disabled`` attribute has the value "", which is
                    # falsy - test for presence, not truthiness.
                    if await btn.get_attribute('disabled') is not None:
                        print(f"[{self.platform_name}] 按钮 {selector} 被禁用")
                        continue

                    await btn.click()
                    publish_clicked = True
                    print(f"[{self.platform_name}] 点击发布按钮成功")
                    break
            except Exception as e:
                print(f"[{self.platform_name}] 发布按钮 {selector} 失败: {e}")

        if not publish_clicked:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="未找到发布按钮",
                screenshot_base64=screenshot_base64,
                page_url=await self.get_page_url(),
                status='failed'
            )

        self.report_progress(80, "等待发布完成...")

        # Wait for the publish to complete (up to 2 minutes)
        publish_timeout = 120
        start_time = loop.time()

        while loop.time() - start_time < publish_timeout:
            await asyncio.sleep(3)
            current_url = self.page.url

            # Navigated to a success / content-management page?
            if 'success' in current_url or 'content' in current_url or 'manage' in current_url:
                self.report_progress(100, "发布成功!")
                print(f"[{self.platform_name}] 发布成功,跳转到: {current_url}")
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=True,
                    platform=self.platform_name,
                    message="发布成功",
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status='success'
                )

            # Success toast / banner on the same page?
            try:
                success_indicators = [
                    'text="发布成功"',
                    'text="提交成功"',
                    '[class*="success"]',
                ]
                for indicator in success_indicators:
                    if await self.page.locator(indicator).count() > 0:
                        self.report_progress(100, "发布成功!")
                        print(f"[{self.platform_name}] 检测到成功提示")
                        screenshot_base64 = await self.capture_screenshot()
                        return PublishResult(
                            success=True,
                            platform=self.platform_name,
                            message="发布成功",
                            screenshot_base64=screenshot_base64,
                            page_url=current_url,
                            status='success'
                        )
            except Exception:
                pass

            # Error banner: report it instead of silently waiting out the timeout
            error_text = await self._visible_error_text()
            if error_text:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error=f"发布失败: {error_text}",
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status='failed'
                )

            # Captcha raised during publishing?
            captcha_result = await self.check_captcha()
            if captcha_result['need_captcha']:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error=f"发布过程中需要{captcha_result['captcha_type']}验证码",
                    need_captcha=True,
                    captcha_type=captcha_result['captcha_type'],
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status='need_captcha'
                )

        # Timed out: return a screenshot so the state can be inspected
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="发布超时,请检查发布状态",
            screenshot_base64=screenshot_base64,
            page_url=await self.get_page_url(),
            status='need_action'
        )

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """
        Fetch one page of the account's video works.

        Args:
            cookies: Cookie string, passed to ``self.parse_cookies``.
            page: Zero-based page index; the API offset is ``page * page_size``.
            page_size: Number of items requested.

        Returns:
            ``WorksResult`` with ``success=True`` and the parsed works, or
            ``success=False`` with ``error`` set when the session expired, the
            request failed or the API returned a non-zero ``errno``.
            ``total`` is the number of works in this page only.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the content-management page
            await self.page.goto("https://baijiahao.baidu.com/builder/rc/content", wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(3)

            # Redirected to a login page?
            current_url = self.page.url
            if self._is_login_url(current_url):
                raise Exception("Cookie 已过期,请重新登录")

            # Call the works-list API from inside the page so cookies are sent
            cursor = page * page_size
            api_result = await self.page.evaluate(f'''
                async () => {{
                    try {{
                        const response = await fetch('https://baijiahao.baidu.com/pcui/article/lists?start={cursor}&count={page_size}&article_type=video', {{
                            method: 'GET',
                            credentials: 'include',
                            headers: {{
                                'Accept': 'application/json'
                            }}
                        }});
                        return await response.json();
                    }} catch (e) {{
                        return {{ error: e.message }};
                    }}
                }}
            ''')

            print(f"[{self.platform_name}] API 响应: {json.dumps(api_result, ensure_ascii=False)[:200]}")

            # A failed fetch or API-level error must not be reported as an
            # empty-but-successful result.
            if api_result.get('error'):
                raise Exception(api_result['error'])
            if api_result.get('errno') != 0:
                raise Exception(api_result.get('errmsg') or '未知错误')

            data = api_result.get('data') or {}  # tolerate "data": null
            article_list = data.get('article_list') or []
            has_more = data.get('has_more', False)

            for article in article_list:
                work_id = str(article.get('article_id', ''))
                if not work_id:
                    continue

                cover_images = article.get('cover_images')
                works.append(WorkItem(
                    work_id=work_id,
                    title=article.get('title', ''),
                    cover_url=cover_images[0] if cover_images else '',
                    duration=0,
                    status='published',
                    publish_time=article.get('publish_time', ''),
                    play_count=_to_int(article.get('read_count', 0)),
                    like_count=_to_int(article.get('like_count', 0)),
                    comment_count=_to_int(article.get('comment_count', 0)),
                    share_count=_to_int(article.get('share_count', 0)),
                ))

            total = len(works)
            print(f"[{self.platform_name}] 获取到 {total} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            await self.close_browser()

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=total,
            has_more=has_more
        )

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """
        Fetch comments for a work (not implemented yet).

        Always returns a failed ``CommentsResult`` carrying *work_id*;
        *cookies* and *cursor* are currently unused.
        """
        # TODO: implement comment fetching
        return CommentsResult(
            success=False,
            platform=self.platform_name,
            work_id=work_id,
            error="百家号评论功能暂未实现"
        )