baijiahao.py 26 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. 百家号视频发布器
  4. """
  5. import asyncio
  6. import json
  7. from typing import List
  8. from datetime import datetime
  9. from .base import (
  10. BasePublisher, PublishParams, PublishResult,
  11. WorkItem, WorksResult, CommentItem, CommentsResult
  12. )
  13. class BaijiahaoPublisher(BasePublisher):
  14. """
  15. 百家号视频发布器
  16. 使用 Playwright 自动化操作百家号创作者中心
  17. """
  18. platform_name = "baijiahao"
  19. login_url = "https://baijiahao.baidu.com/"
  20. publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video"
  21. cookie_domain = ".baidu.com"
  22. # 登录检测配置
  23. login_check_url = "https://baijiahao.baidu.com/builder/rc/home"
  24. login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"]
  25. login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]']
  26. async def get_account_info(self, cookies: str) -> dict:
  27. """
  28. 获取百家号账号信息
  29. 通过调用 settingInfo API 获取用户信息
  30. """
  31. print(f"\n{'='*60}")
  32. print(f"[{self.platform_name}] 获取账号信息")
  33. print(f"{'='*60}")
  34. try:
  35. await self.init_browser()
  36. cookie_list = self.parse_cookies(cookies)
  37. await self.set_cookies(cookie_list)
  38. if not self.page:
  39. raise Exception("Page not initialized")
  40. # 访问百家号后台首页
  41. print(f"[{self.platform_name}] 访问后台首页...")
  42. await self.page.goto(self.login_check_url, wait_until="domcontentloaded", timeout=30000)
  43. await asyncio.sleep(3)
  44. # 检查登录状态
  45. current_url = self.page.url
  46. print(f"[{self.platform_name}] 当前 URL: {current_url}")
  47. for indicator in self.login_indicators:
  48. if indicator in current_url:
  49. print(f"[{self.platform_name}] 检测到登录页面,Cookie 已失效")
  50. return {
  51. "success": False,
  52. "error": "Cookie 已失效,需要重新登录",
  53. "need_login": True
  54. }
  55. # 调用 settingInfo API 获取用户信息
  56. print(f"[{self.platform_name}] 调用 settingInfo API...")
  57. api_result = await self.page.evaluate('''
  58. async () => {
  59. try {
  60. const response = await fetch('https://baijiahao.baidu.com/user-ui/cms/settingInfo', {
  61. method: 'GET',
  62. credentials: 'include',
  63. headers: {
  64. 'Accept': 'application/json, text/plain, */*'
  65. }
  66. });
  67. return await response.json();
  68. } catch (e) {
  69. return { error: e.message };
  70. }
  71. }
  72. ''')
  73. print(f"[{self.platform_name}] API 响应: errno={api_result.get('errno')}")
  74. if api_result.get('error'):
  75. return {
  76. "success": False,
  77. "error": api_result.get('error')
  78. }
  79. if api_result.get('errno') == 0 and api_result.get('data'):
  80. data = api_result['data']
  81. account_info = {
  82. "success": True,
  83. "account_id": str(data.get('new_uc_id', '')) or f"baijiahao_{int(datetime.now().timestamp() * 1000)}",
  84. "account_name": data.get('name', '') or '百家号账号',
  85. "avatar_url": data.get('avatar', ''),
  86. "fans_count": 0, # 百家号 API 不直接返回粉丝数
  87. "works_count": 0,
  88. }
  89. print(f"[{self.platform_name}] 获取成功: {account_info['account_name']}")
  90. return account_info
  91. else:
  92. error_msg = api_result.get('errmsg', '未知错误')
  93. print(f"[{self.platform_name}] API 返回错误: {error_msg}")
  94. # 如果是登录相关错误,标记需要重新登录
  95. if api_result.get('errno') in [10000010, 10001401]:
  96. return {
  97. "success": False,
  98. "error": error_msg,
  99. "need_login": True
  100. }
  101. return {
  102. "success": False,
  103. "error": error_msg
  104. }
  105. except Exception as e:
  106. import traceback
  107. traceback.print_exc()
  108. return {
  109. "success": False,
  110. "error": str(e)
  111. }
  112. finally:
  113. await self.close_browser()
  114. async def check_captcha(self) -> dict:
  115. """检查页面是否需要验证码"""
  116. if not self.page:
  117. return {'need_captcha': False, 'captcha_type': ''}
  118. try:
  119. # 检查各种验证码
  120. captcha_selectors = [
  121. 'text="请输入验证码"',
  122. 'text="滑动验证"',
  123. '[class*="captcha"]',
  124. '[class*="verify"]',
  125. ]
  126. for selector in captcha_selectors:
  127. try:
  128. if await self.page.locator(selector).count() > 0:
  129. print(f"[{self.platform_name}] 检测到验证码: {selector}")
  130. return {'need_captcha': True, 'captcha_type': 'image'}
  131. except:
  132. pass
  133. # 检查登录弹窗
  134. login_selectors = [
  135. 'text="请登录"',
  136. 'text="登录后继续"',
  137. '[class*="login-dialog"]',
  138. ]
  139. for selector in login_selectors:
  140. try:
  141. if await self.page.locator(selector).count() > 0:
  142. print(f"[{self.platform_name}] 检测到需要登录: {selector}")
  143. return {'need_captcha': True, 'captcha_type': 'login'}
  144. except:
  145. pass
  146. except Exception as e:
  147. print(f"[{self.platform_name}] 验证码检测异常: {e}")
  148. return {'need_captcha': False, 'captcha_type': ''}
  149. async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
  150. """发布视频到百家号"""
  151. import os
  152. print(f"\n{'='*60}")
  153. print(f"[{self.platform_name}] 开始发布视频")
  154. print(f"[{self.platform_name}] 视频路径: {params.video_path}")
  155. print(f"[{self.platform_name}] 标题: {params.title}")
  156. print(f"[{self.platform_name}] Headless: {self.headless}")
  157. print(f"{'='*60}")
  158. self.report_progress(5, "正在初始化浏览器...")
  159. # 初始化浏览器
  160. await self.init_browser()
  161. print(f"[{self.platform_name}] 浏览器初始化完成")
  162. # 解析并设置 cookies
  163. cookie_list = self.parse_cookies(cookies)
  164. print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
  165. await self.set_cookies(cookie_list)
  166. if not self.page:
  167. raise Exception("Page not initialized")
  168. # 检查视频文件
  169. if not os.path.exists(params.video_path):
  170. raise Exception(f"视频文件不存在: {params.video_path}")
  171. print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")
  172. self.report_progress(10, "正在打开上传页面...")
  173. # 访问视频发布页面(使用新视频发布界面)
  174. video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1"
  175. await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000)
  176. await asyncio.sleep(3)
  177. # 检查是否跳转到登录页
  178. current_url = self.page.url
  179. print(f"[{self.platform_name}] 当前页面: {current_url}")
  180. for indicator in self.login_indicators:
  181. if indicator in current_url:
  182. screenshot_base64 = await self.capture_screenshot()
  183. return PublishResult(
  184. success=False,
  185. platform=self.platform_name,
  186. error="Cookie 已过期,需要重新登录",
  187. need_captcha=True,
  188. captcha_type='login',
  189. screenshot_base64=screenshot_base64,
  190. page_url=current_url,
  191. status='need_captcha'
  192. )
  193. # 检查验证码
  194. captcha_result = await self.check_captcha()
  195. if captcha_result['need_captcha']:
  196. screenshot_base64 = await self.capture_screenshot()
  197. return PublishResult(
  198. success=False,
  199. platform=self.platform_name,
  200. error=f"需要{captcha_result['captcha_type']}验证码",
  201. need_captcha=True,
  202. captcha_type=captcha_result['captcha_type'],
  203. screenshot_base64=screenshot_base64,
  204. page_url=current_url,
  205. status='need_captcha'
  206. )
  207. self.report_progress(15, "正在选择视频文件...")
  208. # 等待页面加载完成
  209. await asyncio.sleep(2)
  210. # 关闭可能的弹窗
  211. try:
  212. close_buttons = [
  213. 'button:has-text("我知道了")',
  214. 'button:has-text("知道了")',
  215. '[class*="close"]',
  216. '[class*="modal-close"]',
  217. ]
  218. for btn_selector in close_buttons:
  219. try:
  220. btn = self.page.locator(btn_selector).first
  221. if await btn.count() > 0 and await btn.is_visible():
  222. await btn.click()
  223. await asyncio.sleep(0.5)
  224. except:
  225. pass
  226. except:
  227. pass
  228. # 上传视频 - 尝试多种方式
  229. upload_success = False
  230. # 方法1: 直接通过 file input 上传
  231. try:
  232. file_inputs = await self.page.query_selector_all('input[type="file"]')
  233. print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入")
  234. for file_input in file_inputs:
  235. try:
  236. await file_input.set_input_files(params.video_path)
  237. upload_success = True
  238. print(f"[{self.platform_name}] 通过 file input 上传成功")
  239. break
  240. except Exception as e:
  241. print(f"[{self.platform_name}] file input 上传失败: {e}")
  242. except Exception as e:
  243. print(f"[{self.platform_name}] 查找 file input 失败: {e}")
  244. # 方法2: 点击上传区域
  245. if not upload_success:
  246. upload_selectors = [
  247. 'div[class*="upload-box"]',
  248. 'div[class*="drag-upload"]',
  249. 'div[class*="uploader"]',
  250. 'div:has-text("点击上传")',
  251. 'div:has-text("选择文件")',
  252. '[class*="upload-area"]',
  253. ]
  254. for selector in upload_selectors:
  255. if upload_success:
  256. break
  257. try:
  258. upload_area = self.page.locator(selector).first
  259. if await upload_area.count() > 0:
  260. print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
  261. async with self.page.expect_file_chooser(timeout=10000) as fc_info:
  262. await upload_area.click()
  263. file_chooser = await fc_info.value
  264. await file_chooser.set_files(params.video_path)
  265. upload_success = True
  266. print(f"[{self.platform_name}] 通过点击上传区域成功")
  267. break
  268. except Exception as e:
  269. print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")
  270. if not upload_success:
  271. screenshot_base64 = await self.capture_screenshot()
  272. return PublishResult(
  273. success=False,
  274. platform=self.platform_name,
  275. error="未找到上传入口",
  276. screenshot_base64=screenshot_base64,
  277. page_url=await self.get_page_url(),
  278. status='failed'
  279. )
  280. self.report_progress(20, "等待视频上传...")
  281. # 等待视频上传完成(最多5分钟)
  282. upload_timeout = 300
  283. start_time = asyncio.get_event_loop().time()
  284. while asyncio.get_event_loop().time() - start_time < upload_timeout:
  285. # 检查上传进度
  286. progress_text = ''
  287. try:
  288. progress_el = self.page.locator('[class*="progress"], [class*="percent"]').first
  289. if await progress_el.count() > 0:
  290. progress_text = await progress_el.text_content()
  291. if progress_text:
  292. import re
  293. match = re.search(r'(\d+)%', progress_text)
  294. if match:
  295. pct = int(match.group(1))
  296. self.report_progress(20 + int(pct * 0.4), f"视频上传中 {pct}%...")
  297. if pct >= 100:
  298. print(f"[{self.platform_name}] 上传完成")
  299. break
  300. except:
  301. pass
  302. # 检查是否出现标题输入框(说明上传完成)
  303. try:
  304. title_input = self.page.locator('input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input').first
  305. if await title_input.count() > 0 and await title_input.is_visible():
  306. print(f"[{self.platform_name}] 检测到标题输入框,上传完成")
  307. break
  308. except:
  309. pass
  310. # 检查是否有错误提示
  311. try:
  312. error_el = self.page.locator('[class*="error"], [class*="fail"]').first
  313. if await error_el.count() > 0:
  314. error_text = await error_el.text_content()
  315. if error_text and ('失败' in error_text or '错误' in error_text):
  316. raise Exception(f"上传失败: {error_text}")
  317. except:
  318. pass
  319. await asyncio.sleep(3)
  320. self.report_progress(60, "正在填写标题...")
  321. await asyncio.sleep(2)
  322. # 填写标题
  323. title_filled = False
  324. title_selectors = [
  325. 'input[placeholder*="标题"]',
  326. 'textarea[placeholder*="标题"]',
  327. '[class*="title-input"] input',
  328. '[class*="title"] input',
  329. 'input[maxlength]',
  330. ]
  331. for selector in title_selectors:
  332. if title_filled:
  333. break
  334. try:
  335. title_input = self.page.locator(selector).first
  336. if await title_input.count() > 0 and await title_input.is_visible():
  337. await title_input.click()
  338. await self.page.keyboard.press("Control+KeyA")
  339. await self.page.keyboard.type(params.title[:30]) # 百家号标题限制30字
  340. title_filled = True
  341. print(f"[{self.platform_name}] 标题填写成功")
  342. except Exception as e:
  343. print(f"[{self.platform_name}] 标题选择器 {selector} 失败: {e}")
  344. if not title_filled:
  345. print(f"[{self.platform_name}] 警告: 未能填写标题")
  346. # 填写描述
  347. if params.description:
  348. self.report_progress(65, "正在填写描述...")
  349. try:
  350. desc_selectors = [
  351. 'textarea[placeholder*="描述"]',
  352. 'textarea[placeholder*="简介"]',
  353. '[class*="desc"] textarea',
  354. '[class*="description"] textarea',
  355. ]
  356. for selector in desc_selectors:
  357. try:
  358. desc_input = self.page.locator(selector).first
  359. if await desc_input.count() > 0 and await desc_input.is_visible():
  360. await desc_input.click()
  361. await self.page.keyboard.type(params.description[:200])
  362. print(f"[{self.platform_name}] 描述填写成功")
  363. break
  364. except:
  365. pass
  366. except Exception as e:
  367. print(f"[{self.platform_name}] 描述填写失败: {e}")
  368. self.report_progress(70, "正在发布...")
  369. await asyncio.sleep(2)
  370. # 点击发布按钮
  371. publish_selectors = [
  372. 'button:has-text("发布")',
  373. 'button:has-text("发表")',
  374. 'button:has-text("提交")',
  375. '[class*="publish"] button',
  376. '[class*="submit"] button',
  377. ]
  378. publish_clicked = False
  379. for selector in publish_selectors:
  380. if publish_clicked:
  381. break
  382. try:
  383. btn = self.page.locator(selector).first
  384. if await btn.count() > 0 and await btn.is_visible():
  385. # 检查按钮是否可用
  386. is_disabled = await btn.get_attribute('disabled')
  387. if is_disabled:
  388. print(f"[{self.platform_name}] 按钮 {selector} 被禁用")
  389. continue
  390. await btn.click()
  391. publish_clicked = True
  392. print(f"[{self.platform_name}] 点击发布按钮成功")
  393. except Exception as e:
  394. print(f"[{self.platform_name}] 发布按钮 {selector} 失败: {e}")
  395. if not publish_clicked:
  396. screenshot_base64 = await self.capture_screenshot()
  397. return PublishResult(
  398. success=False,
  399. platform=self.platform_name,
  400. error="未找到发布按钮",
  401. screenshot_base64=screenshot_base64,
  402. page_url=await self.get_page_url(),
  403. status='failed'
  404. )
  405. self.report_progress(80, "等待发布完成...")
  406. # 等待发布完成(最多2分钟)
  407. publish_timeout = 120
  408. start_time = asyncio.get_event_loop().time()
  409. while asyncio.get_event_loop().time() - start_time < publish_timeout:
  410. await asyncio.sleep(3)
  411. current_url = self.page.url
  412. # 检查是否跳转到成功页面
  413. if 'success' in current_url or 'content' in current_url or 'manage' in current_url:
  414. self.report_progress(100, "发布成功!")
  415. print(f"[{self.platform_name}] 发布成功,跳转到: {current_url}")
  416. screenshot_base64 = await self.capture_screenshot()
  417. return PublishResult(
  418. success=True,
  419. platform=self.platform_name,
  420. message="发布成功",
  421. screenshot_base64=screenshot_base64,
  422. page_url=current_url,
  423. status='success'
  424. )
  425. # 检查是否有成功提示
  426. try:
  427. success_indicators = [
  428. 'text="发布成功"',
  429. 'text="提交成功"',
  430. '[class*="success"]',
  431. ]
  432. for indicator in success_indicators:
  433. if await self.page.locator(indicator).count() > 0:
  434. self.report_progress(100, "发布成功!")
  435. print(f"[{self.platform_name}] 检测到成功提示")
  436. screenshot_base64 = await self.capture_screenshot()
  437. return PublishResult(
  438. success=True,
  439. platform=self.platform_name,
  440. message="发布成功",
  441. screenshot_base64=screenshot_base64,
  442. page_url=current_url,
  443. status='success'
  444. )
  445. except:
  446. pass
  447. # 检查是否有错误提示
  448. try:
  449. error_el = self.page.locator('[class*="error"], [class*="fail"]').first
  450. if await error_el.count() > 0:
  451. error_text = await error_el.text_content()
  452. if error_text and ('失败' in error_text or '错误' in error_text):
  453. raise Exception(f"发布失败: {error_text}")
  454. except:
  455. pass
  456. # 检查验证码
  457. captcha_result = await self.check_captcha()
  458. if captcha_result['need_captcha']:
  459. screenshot_base64 = await self.capture_screenshot()
  460. return PublishResult(
  461. success=False,
  462. platform=self.platform_name,
  463. error=f"发布过程中需要{captcha_result['captcha_type']}验证码",
  464. need_captcha=True,
  465. captcha_type=captcha_result['captcha_type'],
  466. screenshot_base64=screenshot_base64,
  467. page_url=current_url,
  468. status='need_captcha'
  469. )
  470. # 超时,返回截图供分析
  471. screenshot_base64 = await self.capture_screenshot()
  472. return PublishResult(
  473. success=False,
  474. platform=self.platform_name,
  475. error="发布超时,请检查发布状态",
  476. screenshot_base64=screenshot_base64,
  477. page_url=await self.get_page_url(),
  478. status='need_action'
  479. )
  480. async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
  481. """获取百家号作品列表"""
  482. print(f"\n{'='*60}")
  483. print(f"[{self.platform_name}] 获取作品列表")
  484. print(f"[{self.platform_name}] page={page}, page_size={page_size}")
  485. print(f"{'='*60}")
  486. works: List[WorkItem] = []
  487. total = 0
  488. has_more = False
  489. try:
  490. await self.init_browser()
  491. cookie_list = self.parse_cookies(cookies)
  492. await self.set_cookies(cookie_list)
  493. if not self.page:
  494. raise Exception("Page not initialized")
  495. # 访问内容管理页面
  496. await self.page.goto("https://baijiahao.baidu.com/builder/rc/content", wait_until="domcontentloaded", timeout=30000)
  497. await asyncio.sleep(3)
  498. # 检查登录状态
  499. current_url = self.page.url
  500. for indicator in self.login_indicators:
  501. if indicator in current_url:
  502. raise Exception("Cookie 已过期,请重新登录")
  503. # 调用作品列表 API
  504. cursor = page * page_size
  505. api_result = await self.page.evaluate(f'''
  506. async () => {{
  507. try {{
  508. const response = await fetch('https://baijiahao.baidu.com/pcui/article/lists?start={cursor}&count={page_size}&article_type=video', {{
  509. method: 'GET',
  510. credentials: 'include',
  511. headers: {{
  512. 'Accept': 'application/json'
  513. }}
  514. }});
  515. return await response.json();
  516. }} catch (e) {{
  517. return {{ error: e.message }};
  518. }}
  519. }}
  520. ''')
  521. print(f"[{self.platform_name}] API 响应: {json.dumps(api_result, ensure_ascii=False)[:200]}")
  522. if api_result.get('errno') == 0:
  523. article_list = api_result.get('data', {}).get('article_list', [])
  524. has_more = api_result.get('data', {}).get('has_more', False)
  525. for article in article_list:
  526. work_id = str(article.get('article_id', ''))
  527. if not work_id:
  528. continue
  529. works.append(WorkItem(
  530. work_id=work_id,
  531. title=article.get('title', ''),
  532. cover_url=article.get('cover_images', [''])[0] if article.get('cover_images') else '',
  533. duration=0,
  534. status='published',
  535. publish_time=article.get('publish_time', ''),
  536. play_count=int(article.get('read_count', 0)),
  537. like_count=int(article.get('like_count', 0)),
  538. comment_count=int(article.get('comment_count', 0)),
  539. share_count=int(article.get('share_count', 0)),
  540. ))
  541. total = len(works)
  542. print(f"[{self.platform_name}] 获取到 {total} 个作品")
  543. except Exception as e:
  544. import traceback
  545. traceback.print_exc()
  546. return WorksResult(
  547. success=False,
  548. platform=self.platform_name,
  549. error=str(e)
  550. )
  551. finally:
  552. await self.close_browser()
  553. return WorksResult(
  554. success=True,
  555. platform=self.platform_name,
  556. works=works,
  557. total=total,
  558. has_more=has_more
  559. )
  560. async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
  561. """获取百家号作品评论"""
  562. # TODO: 实现评论获取逻辑
  563. return CommentsResult(
  564. success=False,
  565. platform=self.platform_name,
  566. work_id=work_id,
  567. error="百家号评论功能暂未实现"
  568. )