# -*- coding: utf-8 -*-
"""
Baijiahao (百家号) video publisher.
"""
import asyncio
import json
import os
import re
import traceback
from datetime import datetime
from typing import List

from .base import (
    BasePublisher, PublishParams, PublishResult,
    WorkItem, WorksResult, CommentItem, CommentsResult
)


class BaijiahaoPublisher(BasePublisher):
    """Video publisher for Baijiahao (百家号).

    Automates the Baijiahao creator center with Playwright: account lookup,
    video publishing and listing of published works.  Browser lifecycle,
    cookie handling, screenshots and progress reporting come from
    ``BasePublisher``.
    """

    platform_name = "baijiahao"
    login_url = "https://baijiahao.baidu.com/"
    publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video"
    cookie_domain = ".baidu.com"

    # Login detection configuration
    login_check_url = "https://baijiahao.baidu.com/builder/rc/home"
    login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"]
    login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]']

    # errno values of the settingInfo API that are treated as "login required".
    _LOGIN_ERRNOS = (10000010, 10001401)
    # Extracts the numeric part of a progress label such as "42%".
    _PERCENT_RE = re.compile(r'(\d+)%')
    # Elements inspected when looking for an on-page failure message.
    _ERROR_SELECTOR = '[class*="error"], [class*="fail"]'

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    def _is_login_url(self, url: str) -> bool:
        """Return True when *url* contains any of ``login_indicators``."""
        return any(indicator in url for indicator in self.login_indicators)

    async def _first_matching_selector(self, selectors) -> str:
        """Return the first selector matching at least one element, or ''.

        A selector that fails to evaluate is skipped so that one bad selector
        does not hide the others.
        """
        for selector in selectors:
            try:
                if await self.page.locator(selector).count() > 0:
                    return selector
            except Exception:
                continue
        return ''

    async def _find_error_text(self) -> str:
        """Return the text of a visible failure message on the page, or ''.

        Only the first element matching ``_ERROR_SELECTOR`` is inspected, and
        only text containing "失败" or "错误" counts.  Hidden elements are
        ignored so that pre-rendered error templates are not reported.
        """
        try:
            error_el = self.page.locator(self._ERROR_SELECTOR).first
            if await error_el.count() > 0 and await error_el.is_visible():
                error_text = await error_el.text_content()
                if error_text and ('失败' in error_text or '错误' in error_text):
                    return error_text
        except Exception:
            # Detection is best-effort; a lookup failure means "no error seen".
            pass
        return ''

    @staticmethod
    def _to_int(value) -> int:
        """Convert an API counter to int, mapping null/garbage to 0."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    async def _failed_result(self, error: str, status: str = 'failed') -> PublishResult:
        """Build a failed PublishResult with a screenshot of the current page."""
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=error,
            screenshot_base64=screenshot_base64,
            page_url=await self.get_page_url(),
            status=status
        )

    async def _captcha_result(self, error: str, captcha_type: str, page_url: str) -> PublishResult:
        """Build a 'need_captcha' PublishResult with a screenshot."""
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=error,
            need_captcha=True,
            captcha_type=captcha_type,
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_captcha'
        )

    async def _success_result(self, page_url: str) -> PublishResult:
        """Build a successful PublishResult with a screenshot."""
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布成功",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='success'
        )

    # ------------------------------------------------------------------
    # Account info
    # ------------------------------------------------------------------

    async def get_account_info(self, cookies: str) -> dict:
        """Fetch Baijiahao account information via the settingInfo API.

        Opens the creator-center home page with the given cookies, then calls
        ``/user-ui/cms/settingInfo`` from inside the page so the request
        carries the browser session.

        Args:
            cookies: Cookie string, parsed with ``self.parse_cookies``.

        Returns:
            On success: ``{"success": True, "account_id", "account_name",
            "avatar_url", "fans_count", "works_count"}``.
            On failure: ``{"success": False, "error": ...}``, plus
            ``"need_login": True`` when the session is no longer valid.
            Exceptions are caught and reported through the ``error`` field;
            the browser is always closed.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取账号信息")
        print(f"{'='*60}")

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the creator-center home page.
            print(f"[{self.platform_name}] 访问后台首页...")
            await self.page.goto(self.login_check_url, wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(3)

            # A redirect to a login URL means the cookies are no longer valid.
            current_url = self.page.url
            print(f"[{self.platform_name}] 当前 URL: {current_url}")

            if self._is_login_url(current_url):
                print(f"[{self.platform_name}] 检测到登录页面,Cookie 已失效")
                return {
                    "success": False,
                    "error": "Cookie 已失效,需要重新登录",
                    "need_login": True
                }

            # Call the settingInfo API from within the page context.
            print(f"[{self.platform_name}] 调用 settingInfo API...")
            api_result = await self.page.evaluate('''
                async () => {
                    try {
                        const response = await fetch('https://baijiahao.baidu.com/user-ui/cms/settingInfo', {
                            method: 'GET',
                            credentials: 'include',
                            headers: {
                                'Accept': 'application/json, text/plain, */*'
                            }
                        });
                        return await response.json();
                    } catch (e) {
                        return { error: e.message };
                    }
                }
            ''')

            print(f"[{self.platform_name}] API 响应: errno={api_result.get('errno')}")

            if api_result.get('error'):
                return {
                    "success": False,
                    "error": api_result.get('error')
                }

            if api_result.get('errno') == 0 and api_result.get('data'):
                data = api_result['data']

                # A null/empty id must fall back to a generated one; calling
                # str() first would turn None into the truthy string 'None'.
                uc_id = data.get('new_uc_id')
                if uc_id is None or uc_id == '':
                    account_id = f"baijiahao_{int(datetime.now().timestamp() * 1000)}"
                else:
                    account_id = str(uc_id)

                account_info = {
                    "success": True,
                    "account_id": account_id,
                    "account_name": data.get('name', '') or '百家号账号',
                    "avatar_url": data.get('avatar', ''),
                    "fans_count": 0,  # not returned directly by this API
                    "works_count": 0,
                }
                print(f"[{self.platform_name}] 获取成功: {account_info['account_name']}")
                return account_info

            error_msg = api_result.get('errmsg', '未知错误')
            print(f"[{self.platform_name}] API 返回错误: {error_msg}")

            # Login-related error codes additionally flag that a re-login is needed.
            if api_result.get('errno') in self._LOGIN_ERRNOS:
                return {
                    "success": False,
                    "error": error_msg,
                    "need_login": True
                }

            return {
                "success": False,
                "error": error_msg
            }

        except Exception as e:
            traceback.print_exc()
            return {
                "success": False,
                "error": str(e)
            }
        finally:
            await self.close_browser()

    # ------------------------------------------------------------------
    # Captcha / login-dialog detection
    # ------------------------------------------------------------------

    async def check_captcha(self) -> dict:
        """Check whether the current page shows a captcha or a login dialog.

        Returns:
            ``{'need_captcha': bool, 'captcha_type': str}`` where
            ``captcha_type`` is ``'image'`` for a captcha, ``'login'`` for a
            login dialog and ``''`` when nothing was detected (including when
            no page is open or detection itself failed).
        """
        if not self.page:
            return {'need_captcha': False, 'captcha_type': ''}

        try:
            # Captcha prompts.
            captcha_selectors = [
                'text="请输入验证码"',
                'text="滑动验证"',
                '[class*="captcha"]',
                '[class*="verify"]',
            ]
            matched = await self._first_matching_selector(captcha_selectors)
            if matched:
                print(f"[{self.platform_name}] 检测到验证码: {matched}")
                return {'need_captcha': True, 'captcha_type': 'image'}

            # Login dialogs.
            login_dialog_selectors = [
                'text="请登录"',
                'text="登录后继续"',
                '[class*="login-dialog"]',
            ]
            matched = await self._first_matching_selector(login_dialog_selectors)
            if matched:
                print(f"[{self.platform_name}] 检测到需要登录: {matched}")
                return {'need_captcha': True, 'captcha_type': 'login'}

        except Exception as e:
            print(f"[{self.platform_name}] 验证码检测异常: {e}")

        return {'need_captcha': False, 'captcha_type': ''}

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to Baijiahao.

        Steps: open the video editor with the given cookies, select the video
        file, wait for the upload, fill in title and description, click the
        publish button and wait for a success signal.

        Args:
            cookies: Cookie string, parsed with ``self.parse_cookies``.
            params: Publish parameters; ``video_path``, ``title`` and
                ``description`` are read here.

        Returns:
            A ``PublishResult`` whose ``status`` is ``'success'``,
            ``'failed'``, ``'need_captcha'`` (captcha or expired login) or
            ``'need_action'`` (no success signal before the timeout).

        Raises:
            Exception: If the page was not initialized or the video file does
                not exist.

        NOTE(review): this method does not close the browser itself —
        presumably the caller is responsible for that; confirm.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        self.report_progress(5, "正在初始化浏览器...")

        # Start the browser.
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        # Parse and install the cookies.
        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        # Validate the video file before touching the site.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(10, "正在打开上传页面...")

        # Open the (new-style) video publishing page.
        video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1"
        await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000)
        await asyncio.sleep(3)

        # A redirect to a login URL means the cookies have expired.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前页面: {current_url}")

        if self._is_login_url(current_url):
            return await self._captcha_result(
                "Cookie 已过期,需要重新登录", 'login', current_url
            )

        # Captcha / login dialog on the editor page.
        captcha_result = await self.check_captcha()
        if captcha_result['need_captcha']:
            return await self._captcha_result(
                f"需要{captcha_result['captcha_type']}验证码",
                captcha_result['captcha_type'],
                current_url
            )

        self.report_progress(15, "正在选择视频文件...")

        # Give the page a moment to finish rendering.
        await asyncio.sleep(2)

        await self._dismiss_popups()

        if not await self._select_video_file(params.video_path):
            return await self._failed_result("未找到上传入口")

        self.report_progress(20, "等待视频上传...")

        upload_error = await self._wait_for_upload()
        if upload_error:
            return await self._failed_result(f"上传失败: {upload_error}")

        self.report_progress(60, "正在填写标题...")
        await asyncio.sleep(2)

        if not await self._fill_title(params.title):
            print(f"[{self.platform_name}] 警告: 未能填写标题")

        if params.description:
            self.report_progress(65, "正在填写描述...")
            await self._fill_description(params.description)

        self.report_progress(70, "正在发布...")
        await asyncio.sleep(2)

        if not await self._click_publish_button():
            return await self._failed_result("未找到发布按钮")

        self.report_progress(80, "等待发布完成...")

        return await self._wait_for_publish_result()

    async def _dismiss_popups(self) -> None:
        """Click away onboarding/notice popups; failures are ignored on purpose."""
        close_buttons = [
            'button:has-text("我知道了")',
            'button:has-text("知道了")',
            '[class*="close"]',
            '[class*="modal-close"]',
        ]
        for btn_selector in close_buttons:
            try:
                btn = self.page.locator(btn_selector).first
                if await btn.count() > 0 and await btn.is_visible():
                    await btn.click()
                    await asyncio.sleep(0.5)
            except Exception:
                # Popups are optional; not being able to close one is not an error.
                pass

    async def _select_video_file(self, video_path: str) -> bool:
        """Hand *video_path* to the page's uploader; return True on success.

        First tries every ``input[type="file"]`` directly, then falls back to
        clicking known upload areas and answering the file chooser.
        """
        # Method 1: set the file directly on a file input.
        try:
            file_inputs = await self.page.query_selector_all('input[type="file"]')
            print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入")

            for file_input in file_inputs:
                try:
                    await file_input.set_input_files(video_path)
                    print(f"[{self.platform_name}] 通过 file input 上传成功")
                    return True
                except Exception as e:
                    print(f"[{self.platform_name}] file input 上传失败: {e}")
        except Exception as e:
            print(f"[{self.platform_name}] 查找 file input 失败: {e}")

        # Method 2: click an upload area and answer the file chooser.
        upload_selectors = [
            'div[class*="upload-box"]',
            'div[class*="drag-upload"]',
            'div[class*="uploader"]',
            'div:has-text("点击上传")',
            'div:has-text("选择文件")',
            '[class*="upload-area"]',
        ]
        for selector in upload_selectors:
            try:
                upload_area = self.page.locator(selector).first
                if await upload_area.count() > 0:
                    print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
                    async with self.page.expect_file_chooser(timeout=10000) as fc_info:
                        await upload_area.click()
                    file_chooser = await fc_info.value
                    await file_chooser.set_files(video_path)
                    print(f"[{self.platform_name}] 通过点击上传区域成功")
                    return True
            except Exception as e:
                print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")

        return False

    async def _wait_for_upload(self, timeout: float = 300, poll_interval: float = 3) -> str:
        """Poll until the upload looks finished (at most *timeout* seconds).

        The upload counts as finished when a progress label reaches 100% or a
        title input becomes visible.

        Returns:
            '' when the upload finished or the wait timed out (the caller then
            continues on a best-effort basis), otherwise the text of the
            failure message shown on the page.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        while loop.time() < deadline:
            # Upload progress label.
            try:
                progress_el = self.page.locator('[class*="progress"], [class*="percent"]').first
                if await progress_el.count() > 0:
                    progress_text = await progress_el.text_content()
                    match = self._PERCENT_RE.search(progress_text or '')
                    if match:
                        pct = int(match.group(1))
                        # Map 0-100% of the upload onto 20-60% of the overall progress.
                        self.report_progress(20 + int(pct * 0.4), f"视频上传中 {pct}%...")
                        if pct >= 100:
                            print(f"[{self.platform_name}] 上传完成")
                            return ''
            except Exception:
                pass

            # A visible title input means the editor form is ready.
            try:
                title_input = self.page.locator(
                    'input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input'
                ).first
                if await title_input.count() > 0 and await title_input.is_visible():
                    print(f"[{self.platform_name}] 检测到标题输入框,上传完成")
                    return ''
            except Exception:
                pass

            # Failure message shown by the page.
            error_text = await self._find_error_text()
            if error_text:
                return error_text

            await asyncio.sleep(poll_interval)

        print(f"[{self.platform_name}] 警告: 等待上传完成超时,继续尝试后续步骤")
        return ''

    async def _fill_title(self, title: str) -> bool:
        """Type *title* (cut to 30 characters) into the first usable title input."""
        title_selectors = [
            'input[placeholder*="标题"]',
            'textarea[placeholder*="标题"]',
            '[class*="title-input"] input',
            '[class*="title"] input',
            'input[maxlength]',
        ]
        for selector in title_selectors:
            try:
                title_input = self.page.locator(selector).first
                if await title_input.count() > 0 and await title_input.is_visible():
                    await title_input.click()
                    # Select the existing content so typing replaces it.
                    await self.page.keyboard.press("Control+KeyA")
                    await self.page.keyboard.type(title[:30])  # Baijiahao titles are limited to 30 characters
                    print(f"[{self.platform_name}] 标题填写成功")
                    return True
            except Exception as e:
                print(f"[{self.platform_name}] 标题选择器 {selector} 失败: {e}")
        return False

    async def _fill_description(self, description: str) -> None:
        """Type *description* (cut to 200 characters) into the first usable field."""
        desc_selectors = [
            'textarea[placeholder*="描述"]',
            'textarea[placeholder*="简介"]',
            '[class*="desc"] textarea',
            '[class*="description"] textarea',
        ]
        for selector in desc_selectors:
            try:
                desc_input = self.page.locator(selector).first
                if await desc_input.count() > 0 and await desc_input.is_visible():
                    await desc_input.click()
                    await self.page.keyboard.type(description[:200])
                    print(f"[{self.platform_name}] 描述填写成功")
                    return
            except Exception as e:
                # The description is optional: log and try the next selector.
                print(f"[{self.platform_name}] 描述填写失败: {e}")

    async def _click_publish_button(self) -> bool:
        """Click the first visible, enabled publish button; return True if clicked."""
        publish_selectors = [
            'button:has-text("发布")',
            'button:has-text("发表")',
            'button:has-text("提交")',
            '[class*="publish"] button',
            '[class*="submit"] button',
        ]
        for selector in publish_selectors:
            try:
                btn = self.page.locator(selector).first
                if await btn.count() > 0 and await btn.is_visible():
                    # is_disabled() also catches a bare `disabled` attribute,
                    # whose value is the empty (falsy) string.
                    if await btn.is_disabled():
                        print(f"[{self.platform_name}] 按钮 {selector} 被禁用")
                        continue

                    await btn.click()
                    print(f"[{self.platform_name}] 点击发布按钮成功")
                    return True
            except Exception as e:
                print(f"[{self.platform_name}] 发布按钮 {selector} 失败: {e}")
        return False

    async def _wait_for_publish_result(self, timeout: float = 120, poll_interval: float = 3) -> PublishResult:
        """Poll the page after clicking publish and translate it into a result.

        Each round checks, in order: a redirect to a URL containing
        'success'/'content'/'manage', an on-page success hint, a visible
        failure message, and a captcha/login dialog.  If none appears within
        *timeout* seconds a 'need_action' result is returned.
        """
        success_indicators = [
            'text="发布成功"',
            'text="提交成功"',
            '[class*="success"]',
        ]
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout

        while loop.time() < deadline:
            await asyncio.sleep(poll_interval)
            current_url = self.page.url

            # Redirect to a success / content-management page.
            if 'success' in current_url or 'content' in current_url or 'manage' in current_url:
                self.report_progress(100, "发布成功!")
                print(f"[{self.platform_name}] 发布成功,跳转到: {current_url}")
                return await self._success_result(current_url)

            # Success hint on the current page.
            if await self._first_matching_selector(success_indicators):
                self.report_progress(100, "发布成功!")
                print(f"[{self.platform_name}] 检测到成功提示")
                return await self._success_result(current_url)

            # Failure message shown by the page.
            error_text = await self._find_error_text()
            if error_text:
                return await self._failed_result(f"发布失败: {error_text}")

            # Captcha or login dialog raised by the publish action.
            captcha_result = await self.check_captcha()
            if captcha_result['need_captcha']:
                return await self._captcha_result(
                    f"发布过程中需要{captcha_result['captcha_type']}验证码",
                    captcha_result['captcha_type'],
                    current_url
                )

        # Timed out: return a screenshot so the state can be inspected.
        return await self._failed_result("发布超时,请检查发布状态", status='need_action')

    # ------------------------------------------------------------------
    # Works
    # ------------------------------------------------------------------

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Fetch one page of the account's video works.

        Opens the content-management page with the given cookies and calls
        ``/pcui/article/lists`` from inside the page.

        Args:
            cookies: Cookie string, parsed with ``self.parse_cookies``.
            page: Zero-based page index; the API offset is ``page * page_size``.
            page_size: Number of works requested.

        Returns:
            ``WorksResult`` with ``works``, ``total`` (number of works in this
            page) and ``has_more`` on success; ``success=False`` with
            ``error`` when the session expired, the API reported an error or
            anything raised.  The browser is always closed.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the content-management page.
            await self.page.goto("https://baijiahao.baidu.com/builder/rc/content", wait_until="domcontentloaded", timeout=30000)
            await asyncio.sleep(3)

            # A redirect to a login URL means the cookies have expired.
            if self._is_login_url(self.page.url):
                raise Exception("Cookie 已过期,请重新登录")

            # Call the works-list API; paging values are passed as an argument
            # rather than being spliced into the script source.
            start = page * page_size
            api_result = await self.page.evaluate('''
                async ({ start, count }) => {
                    try {
                        const response = await fetch(`https://baijiahao.baidu.com/pcui/article/lists?start=${start}&count=${count}&article_type=video`, {
                            method: 'GET',
                            credentials: 'include',
                            headers: {
                                'Accept': 'application/json'
                            }
                        });
                        return await response.json();
                    } catch (e) {
                        return { error: e.message };
                    }
                }
            ''', {"start": start, "count": page_size})

            print(f"[{self.platform_name}] API 响应: {json.dumps(api_result, ensure_ascii=False)[:200]}")

            # Surface request/API failures instead of reporting an empty list
            # as a success.
            if api_result.get('error'):
                raise Exception(api_result.get('error'))
            if api_result.get('errno') != 0:
                raise Exception(api_result.get('errmsg') or '未知错误')

            data = api_result.get('data') or {}  # tolerate "data": null
            has_more = data.get('has_more', False)

            for article in data.get('article_list') or []:
                work_id = str(article.get('article_id', ''))
                if not work_id:
                    continue

                cover_images = article.get('cover_images')
                works.append(WorkItem(
                    work_id=work_id,
                    title=article.get('title', ''),
                    cover_url=cover_images[0] if cover_images else '',
                    duration=0,
                    status='published',
                    publish_time=article.get('publish_time', ''),
                    play_count=self._to_int(article.get('read_count', 0)),
                    like_count=self._to_int(article.get('like_count', 0)),
                    comment_count=self._to_int(article.get('comment_count', 0)),
                    share_count=self._to_int(article.get('share_count', 0)),
                ))

            total = len(works)
            print(f"[{self.platform_name}] 获取到 {total} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            await self.close_browser()

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=total,
            has_more=has_more
        )

    # ------------------------------------------------------------------
    # Comments
    # ------------------------------------------------------------------

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Fetch comments of a work (not implemented for Baijiahao).

        Always returns a failed ``CommentsResult`` stating that the feature is
        not implemented; *cookies* and *cursor* are unused.
        """
        # TODO: implement comment fetching
        return CommentsResult(
            success=False,
            platform=self.platform_name,
            work_id=work_id,
            error="百家号评论功能暂未实现"
        )
|