| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213 |
- # -*- coding: utf-8 -*-
- """
- 百家号视频发布器
- """
- import asyncio
- import json
- from typing import List
- from datetime import datetime
- from .base import (
- BasePublisher, PublishParams, PublishResult,
- WorkItem, WorksResult, CommentItem, CommentsResult
- )
- class BaijiahaoPublisher(BasePublisher):
- """
- 百家号视频发布器
- 使用 Playwright 自动化操作百家号创作者中心
- """
-
- platform_name = "baijiahao"
- login_url = "https://baijiahao.baidu.com/"
- publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video"
- cookie_domain = ".baidu.com"
-
- # 登录检测配置
- login_check_url = "https://baijiahao.baidu.com/builder/rc/home"
- login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"]
- login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]']
-
async def get_account_info(self, cookies: str) -> dict:
    """
    Fetch Baijiahao account information through direct HTTP API calls.

    No browser is used. The sequence is: visit the creator home page with
    the supplied cookies, call ``builder/app/appinfo`` for the profile, then
    make two best-effort calls for the fan count and the work count.

    Args:
        cookies: Raw cookie string. It is parsed with ``self.parse_cookies``
            and each entry's ``name``/``value`` is attached to the session.

    Returns:
        On success, a dict with ``success=True`` plus ``account_id``,
        ``account_name``, ``avatar_url``, ``fans_count`` and ``works_count``.
        On an API-level failure, ``{"success": False, "error": ...,
        "need_login": True}``.
        On any unexpected exception, ``{"success": False, "error": str(e)}``.
    """
    import aiohttp

    print(f"\n{'='*60}")
    print(f"[{self.platform_name}] 获取账号信息 (使用 API)")
    print(f"{'='*60}")

    home_url = 'https://baijiahao.baidu.com/builder/rc/home'
    appinfo_url = 'https://baijiahao.baidu.com/builder/app/appinfo'

    try:
        # Parse the cookie string into a name -> value mapping.
        cookie_list = self.parse_cookies(cookies)
        cookie_dict = {c['name']: c['value'] for c in cookie_list}

        # The home page is requested first to establish a session context.
        print(f"[{self.platform_name}] 第一步:访问主页建立会话...")
        # Headers for page navigation requests. Cookies are managed by the
        # aiohttp session and are deliberately not set here.
        session_headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        }

        # Headers for JSON API requests (same-origin XHR style).
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Referer': 'https://baijiahao.baidu.com/builder/rc/home',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        }

        # Passing cookies to the session lets aiohttp track cookie updates.
        async with aiohttp.ClientSession(cookies=cookie_dict) as session:
            # Step 0: visit the home page to establish the session context.
            print(f"[{self.platform_name}] [0/4] 访问主页建立会话上下文...")
            async with session.get(
                home_url,
                headers=session_headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as home_response:
                home_status = home_response.status
                print(f"[{self.platform_name}] 主页访问状态: {home_status}")

                # Only report that new cookies arrived; the session's cookie
                # jar stores them, so no manual handling is done here.
                if 'Set-Cookie' in home_response.headers:
                    print(f"[{self.platform_name}] 获取到新的会话Cookie")

            # Short pause before the first API call.
            await asyncio.sleep(1)

            # Step 1: basic account information.
            print(f"[{self.platform_name}] [1/4] 调用 appinfo API...")
            async with session.get(
                appinfo_url,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                appinfo_result = await response.json()

            print(f"[{self.platform_name}] appinfo API 完整响应: {json.dumps(appinfo_result, ensure_ascii=False)[:500]}")
            print(f"[{self.platform_name}] appinfo API 响应: errno={appinfo_result.get('errno')}")

            # Check the login state.
            if appinfo_result.get('errno') != 0:
                error_msg = appinfo_result.get('errmsg', '未知错误')
                errno = appinfo_result.get('errno')
                print(f"[{self.platform_name}] API 返回错误: errno={errno}, msg={error_msg}")

                # errno 110 means "not logged in".
                if errno == 110:
                    return {
                        "success": False,
                        "error": "Cookie 已失效,需要重新登录",
                        "need_login": True
                    }

                # Any error other than the retryable 10001402 is final.
                if errno != 10001402:
                    return {
                        "success": False,
                        "error": error_msg,
                        "need_login": True
                    }

                # errno 10001402 (distributed-auth problem): revisit the home
                # page and retry the API call once.
                print(f"[{self.platform_name}] 检测到分散认证问题,尝试重新访问主页...")
                await asyncio.sleep(2)

                async with session.get(
                    home_url,
                    headers=session_headers,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as retry_home_response:
                    print(f"[{self.platform_name}] 重新访问主页状态: {retry_home_response.status}")

                await asyncio.sleep(1)

                async with session.get(
                    appinfo_url,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as retry_response:
                    retry_result = await retry_response.json()

                if retry_result.get('errno') != 0:
                    print(f"[{self.platform_name}] 重试仍然失败")
                    return {
                        "success": False,
                        "error": f"分散认证问题: {error_msg}",
                        "need_login": True
                    }

                # The retry succeeded: continue with its payload instead of
                # falling through to a failure return.
                print(f"[{self.platform_name}] 分散认证问题已解决")
                appinfo_result = retry_result

            # Extract the user record; tolerate explicit nulls in the payload.
            user_data = (appinfo_result.get('data') or {}).get('user') or {}
            if not user_data:
                return {
                    "success": False,
                    "error": "无法获取用户信息",
                    "need_login": True
                }

            # Account status is only logged; an unexpected value is not fatal.
            status = user_data.get('status', '')
            valid_statuses = ['audit', 'pass', 'normal', 'newbie']
            if status not in valid_statuses:
                print(f"[{self.platform_name}] 账号状态异常: {status}")

            # Basic profile fields, with fallbacks when a key is missing.
            account_name = user_data.get('name') or user_data.get('uname') or '百家号账号'
            app_id = user_data.get('app_id') or user_data.get('id', 0)
            # Without an id, synthesize one from the current time in ms.
            account_id = str(app_id) if app_id else f"baijiahao_{int(datetime.now().timestamp() * 1000)}"

            # Protocol-relative avatar URLs are upgraded to https.
            avatar_url = user_data.get('avatar') or user_data.get('avatar_unify', '')
            if avatar_url and avatar_url.startswith('//'):
                avatar_url = 'https:' + avatar_url

            print(f"[{self.platform_name}] 账号名称: {account_name}, ID: {account_id}")

            # Step 2: fan count (non-critical; failures leave it at 0).
            fans_count = 0
            try:
                print(f"[{self.platform_name}] [2/3] 调用 growth/get_info API 获取粉丝数...")
                async with session.get(
                    'https://baijiahao.baidu.com/cms-ui/rights/growth/get_info',
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    growth_result = await response.json()

                if growth_result.get('errno') == 0:
                    growth_data = growth_result.get('data') or {}
                    fans_count = int(growth_data.get('fans_num') or 0)
                    print(f"[{self.platform_name}] 粉丝数: {fans_count}")
                else:
                    print(f"[{self.platform_name}] 获取粉丝数失败: {growth_result.get('errmsg')}")
            except Exception as e:
                print(f"[{self.platform_name}] 获取粉丝数异常(非关键): {e}")

            # Step 3: work count (non-critical; failures leave it at 0).
            works_count = 0
            try:
                print(f"[{self.platform_name}] [3/3] 调用 article/lists API 获取作品数...")

                list_url = 'https://baijiahao.baidu.com/pcui/article/lists?currentPage=1&pageSize=20&search=&type=&collection=&startDate=&endDate=&clearBeforeFetch=false&dynamic=0'

                async with session.get(
                    list_url,
                    headers={
                        'accept': '*/*',
                        'user-agent': 'PostmanRuntime/7.51.0',
                        'referer': 'https://baijiahao.baidu.com/builder/rc/content',
                        'connection': 'keep-alive',
                        'accept-encoding': 'gzip, deflate, br',
                    },
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    response_text = await response.text()

                print(f"[{self.platform_name}] ========== Works API Response ==========")
                # Only the first 1000 characters are logged.
                print(f"[{self.platform_name}] Full response: {response_text[:1000]}...")
                print(f"[{self.platform_name}] =========================================")

                works_result = json.loads(response_text)

                # errno 10001402: wait, then retry once with fuller headers.
                if works_result.get('errno') == 10001402:
                    print(f"[{self.platform_name}] 分散认证问题 (errno=10001402),3秒后重试...")
                    await asyncio.sleep(3)

                    retry_headers = headers.copy()
                    retry_headers.update({
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                        'Cache-Control': 'max-age=0',
                        'Upgrade-Insecure-Requests': '1',
                    })

                    async with session.get(
                        list_url,
                        headers=retry_headers,
                        timeout=aiohttp.ClientTimeout(total=30)
                    ) as retry_response:
                        retry_text = await retry_response.text()

                    print(f"[{self.platform_name}] ========== Works API Retry Response ==========")
                    print(f"[{self.platform_name}] Full retry response: {retry_text[:1000]}...")
                    print(f"[{self.platform_name}] ===============================================")

                    works_result = json.loads(retry_text)

                    if works_result.get('errno') == 10001402:
                        print(f"[{self.platform_name}] 重试仍然失败,返回已获取的账号信息")
                        works_result = None

                if works_result and works_result.get('errno') == 0:
                    works_data = works_result.get('data') or {}
                    # Prefer data.page.totalCount; fall back to data.total.
                    page_info = works_data.get('page') or {}
                    total_count = page_info.get('totalCount')
                    if total_count is None:
                        total_count = works_data.get('total', 0)
                    works_count = int(total_count or 0)
                    print(f"[{self.platform_name}] 作品数: {works_count} (from page.totalCount: {page_info.get('totalCount')}, from total: {works_data.get('total')})")
                else:
                    errno = works_result.get('errno') if works_result else 'unknown'
                    errmsg = works_result.get('errmsg', 'unknown error') if works_result else 'no response'
                    print(f"[{self.platform_name}] 获取作品数失败: errno={errno}, errmsg={errmsg}")
            except Exception as e:
                import traceback
                print(f"[{self.platform_name}] 获取作品数异常(非关键): {e}")
                traceback.print_exc()

            account_info = {
                "success": True,
                "account_id": account_id,
                "account_name": account_name,
                "avatar_url": avatar_url,
                "fans_count": fans_count,
                "works_count": works_count,
            }

            print(f"[{self.platform_name}] ✓ 获取成功: {account_name} (粉丝: {fans_count}, 作品: {works_count})")
            return account_info

    except Exception as e:
        import traceback
        traceback.print_exc()
        return {
            "success": False,
            "error": str(e)
        }
-
async def check_captcha(self) -> dict:
    """
    Check whether the current page shows a captcha or a login prompt.

    Captcha selectors are probed first, then login-dialog selectors; the
    first selector that matches at least one element decides the result.

    Returns:
        ``{'need_captcha': bool, 'captcha_type': str}`` where
        ``captcha_type`` is ``'image'`` for a captcha match, ``'login'`` for
        a login-prompt match, and ``''`` when nothing matched or there is no
        page.
    """
    if not self.page:
        return {'need_captcha': False, 'captcha_type': ''}

    # (captcha_type, log label, selectors), probed in this order.
    checks = (
        ('image', '检测到验证码', (
            'text="请输入验证码"',
            'text="滑动验证"',
            '[class*="captcha"]',
            '[class*="verify"]',
        )),
        ('login', '检测到需要登录', (
            'text="请登录"',
            'text="登录后继续"',
            '[class*="login-dialog"]',
        )),
    )

    for captcha_type, label, selectors in checks:
        for selector in selectors:
            try:
                found = await self.page.locator(selector).count() > 0
            except Exception as e:
                # A failing selector is treated as "not found". Catching
                # Exception (not a bare except) lets task cancellation
                # propagate instead of being swallowed.
                print(f"[{self.platform_name}] 验证码检测异常: {e}")
                continue
            if found:
                print(f"[{self.platform_name}] {label}: {selector}")
                return {'need_captcha': True, 'captcha_type': captcha_type}

    return {'need_captcha': False, 'captcha_type': ''}
async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
    """
    Publish a video to Baijiahao by driving the creator-center web UI.

    Flow: validate the video file, start the browser and load cookies, open
    the video editor, bail out if a login redirect or captcha is detected,
    upload the file, fill in title and description, click the publish
    button, then poll the page until success, an error, a captcha, or a
    timeout.

    Args:
        cookies: Raw cookie string, parsed with ``self.parse_cookies``.
        params: Publish parameters; this method reads ``video_path``,
            ``title`` and ``description``.

    Returns:
        A ``PublishResult`` whose ``status`` is ``'success'``, ``'failed'``,
        ``'need_captcha'`` or ``'need_action'`` (publish wait timed out).

    Raises:
        Exception: If the video file does not exist or the browser page was
            not initialized.
    """
    import os
    import re

    print(f"\n{'='*60}")
    print(f"[{self.platform_name}] 开始发布视频")
    print(f"[{self.platform_name}] 视频路径: {params.video_path}")
    print(f"[{self.platform_name}] 标题: {params.title}")
    print(f"[{self.platform_name}] Headless: {self.headless}")
    print(f"{'='*60}")

    # Fail fast on a missing file, before a browser is launched.
    if not os.path.exists(params.video_path):
        raise Exception(f"视频文件不存在: {params.video_path}")

    print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

    self.report_progress(5, "正在初始化浏览器...")

    # Start the browser.
    await self.init_browser()
    print(f"[{self.platform_name}] 浏览器初始化完成")

    # Parse and install the cookies.
    cookie_list = self.parse_cookies(cookies)
    print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
    await self.set_cookies(cookie_list)

    if not self.page:
        raise Exception("Page not initialized")

    self.report_progress(10, "正在打开上传页面...")

    # Open the video publish page (the "videoV2" editor).
    video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1"
    await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000)
    await asyncio.sleep(3)

    # A redirect to a login URL means the cookies are no longer valid.
    current_url = self.page.url
    print(f"[{self.platform_name}] 当前页面: {current_url}")

    if any(indicator in current_url for indicator in self.login_indicators):
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="Cookie 已过期,需要重新登录",
            need_captcha=True,
            captcha_type='login',
            screenshot_base64=screenshot_base64,
            page_url=current_url,
            status='need_captcha'
        )

    # Captcha detection through the AI-based check.
    ai_captcha = await self.ai_check_captcha()
    if ai_captcha['has_captcha']:
        print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
            need_captcha=True,
            captcha_type=ai_captcha['captcha_type'],
            screenshot_base64=screenshot_base64,
            page_url=current_url,
            status='need_captcha'
        )

    # Captcha detection through the selector-based check.
    captcha_result = await self.check_captcha()
    if captcha_result['need_captcha']:
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证",
            need_captcha=True,
            captcha_type=captcha_result['captcha_type'],
            screenshot_base64=screenshot_base64,
            page_url=current_url,
            status='need_captcha'
        )

    self.report_progress(15, "正在选择视频文件...")

    # Give the page time to finish loading.
    await asyncio.sleep(2)

    # Dismiss popups that may be present (best effort).
    close_buttons = [
        'button:has-text("我知道了")',
        'button:has-text("知道了")',
        '[class*="close"]',
        '[class*="modal-close"]',
    ]
    for btn_selector in close_buttons:
        try:
            btn = self.page.locator(btn_selector).first
            if await btn.count() > 0 and await btn.is_visible():
                await btn.click()
                await asyncio.sleep(0.5)
        except Exception:
            # A popup that cannot be closed is not fatal.
            pass

    # Upload the video, trying several approaches.
    upload_success = False

    # Approach 1: set the file directly on a file input.
    try:
        file_inputs = await self.page.query_selector_all('input[type="file"]')
        print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入")

        for file_input in file_inputs:
            try:
                await file_input.set_input_files(params.video_path)
                upload_success = True
                print(f"[{self.platform_name}] 通过 file input 上传成功")
                break
            except Exception as e:
                print(f"[{self.platform_name}] file input 上传失败: {e}")
    except Exception as e:
        print(f"[{self.platform_name}] 查找 file input 失败: {e}")

    # Approach 2: click an upload area and answer the file chooser.
    if not upload_success:
        upload_selectors = [
            'div[class*="upload-box"]',
            'div[class*="drag-upload"]',
            'div[class*="uploader"]',
            'div:has-text("点击上传")',
            'div:has-text("选择文件")',
            '[class*="upload-area"]',
        ]

        for selector in upload_selectors:
            try:
                upload_area = self.page.locator(selector).first
                if await upload_area.count() > 0:
                    print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
                    async with self.page.expect_file_chooser(timeout=10000) as fc_info:
                        await upload_area.click()
                    file_chooser = await fc_info.value
                    await file_chooser.set_files(params.video_path)
                    upload_success = True
                    print(f"[{self.platform_name}] 通过点击上传区域成功")
                    break
            except Exception as e:
                print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")

    if not upload_success:
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="未找到上传入口",
            screenshot_base64=screenshot_base64,
            page_url=await self.get_page_url(),
            status='failed'
        )

    self.report_progress(20, "等待视频上传...")

    # Wait for the upload to finish (at most 5 minutes).
    loop = asyncio.get_running_loop()
    percent_re = re.compile(r'(\d+)%')
    upload_timeout = 300
    start_time = loop.time()

    while loop.time() - start_time < upload_timeout:
        # Read the upload progress, if a progress element is present.
        try:
            progress_el = self.page.locator('[class*="progress"], [class*="percent"]').first
            if await progress_el.count() > 0:
                progress_text = await progress_el.text_content()
                if progress_text:
                    match = percent_re.search(progress_text)
                    if match:
                        pct = int(match.group(1))
                        # Map 0-100% upload onto the 20-60 progress range.
                        self.report_progress(20 + int(pct * 0.4), f"视频上传中 {pct}%...")
                        if pct >= 100:
                            print(f"[{self.platform_name}] 上传完成")
                            break
        except Exception:
            pass

        # A visible title input means the upload has finished.
        try:
            title_input = self.page.locator('input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input').first
            if await title_input.count() > 0 and await title_input.is_visible():
                print(f"[{self.platform_name}] 检测到标题输入框,上传完成")
                break
        except Exception:
            pass

        # Look for an upload error. The failure is reported outside the
        # try block so the handler below cannot swallow it.
        upload_error = ''
        try:
            error_el = self.page.locator('[class*="error"], [class*="fail"]').first
            if await error_el.count() > 0 and await error_el.is_visible():
                error_text = await error_el.text_content()
                if error_text and ('失败' in error_text or '错误' in error_text):
                    upload_error = error_text.strip()
        except Exception:
            pass

        if upload_error:
            print(f"[{self.platform_name}] 上传失败: {upload_error}")
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"上传失败: {upload_error}",
                screenshot_base64=screenshot_base64,
                page_url=await self.get_page_url(),
                status='failed'
            )

        await asyncio.sleep(3)
    else:
        # The deadline passed without a completion signal; the flow still
        # continues, matching the previous behavior, but now says so.
        print(f"[{self.platform_name}] 警告: 等待视频上传超时")

    self.report_progress(60, "正在填写标题...")
    await asyncio.sleep(2)

    # Fill in the title using the first selector that works.
    title_filled = False
    title_selectors = [
        'input[placeholder*="标题"]',
        'textarea[placeholder*="标题"]',
        '[class*="title-input"] input',
        '[class*="title"] input',
        'input[maxlength]',
    ]

    for selector in title_selectors:
        try:
            title_input = self.page.locator(selector).first
            if await title_input.count() > 0 and await title_input.is_visible():
                await title_input.click()
                # Select existing text so typing replaces it.
                await self.page.keyboard.press("Control+KeyA")
                # The title is truncated to 30 characters.
                await self.page.keyboard.type(params.title[:30])
                title_filled = True
                print(f"[{self.platform_name}] 标题填写成功")
                break
        except Exception as e:
            print(f"[{self.platform_name}] 标题选择器 {selector} 失败: {e}")

    if not title_filled:
        print(f"[{self.platform_name}] 警告: 未能填写标题")

    # Fill in the description (optional, best effort).
    if params.description:
        self.report_progress(65, "正在填写描述...")
        desc_selectors = [
            'textarea[placeholder*="描述"]',
            'textarea[placeholder*="简介"]',
            '[class*="desc"] textarea',
            '[class*="description"] textarea',
        ]
        for selector in desc_selectors:
            try:
                desc_input = self.page.locator(selector).first
                if await desc_input.count() > 0 and await desc_input.is_visible():
                    await desc_input.click()
                    # The description is truncated to 200 characters.
                    await self.page.keyboard.type(params.description[:200])
                    print(f"[{self.platform_name}] 描述填写成功")
                    break
            except Exception as e:
                print(f"[{self.platform_name}] 描述填写失败: {e}")

    self.report_progress(70, "正在发布...")
    await asyncio.sleep(2)

    # Click the publish button using the first selector that works.
    publish_selectors = [
        'button:has-text("发布")',
        'button:has-text("发表")',
        'button:has-text("提交")',
        '[class*="publish"] button',
        '[class*="submit"] button',
    ]

    publish_clicked = False
    for selector in publish_selectors:
        try:
            btn = self.page.locator(selector).first
            if await btn.count() > 0 and await btn.is_visible():
                # is_disabled() is used because a boolean ``disabled``
                # attribute has the falsy value "" in get_attribute().
                if await btn.is_disabled():
                    print(f"[{self.platform_name}] 按钮 {selector} 被禁用")
                    continue

                await btn.click()
                publish_clicked = True
                print(f"[{self.platform_name}] 点击发布按钮成功")
                break
        except Exception as e:
            print(f"[{self.platform_name}] 发布按钮 {selector} 失败: {e}")

    if not publish_clicked:
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="未找到发布按钮",
            screenshot_base64=screenshot_base64,
            page_url=await self.get_page_url(),
            status='failed'
        )

    self.report_progress(80, "等待发布完成...")

    # Remember the URL at the time the publish button was clicked.
    publish_page_url = self.page.url
    print(f"[{self.platform_name}] 发布前 URL: {publish_page_url}")

    # Wait for publishing to complete (at most 3 minutes).
    publish_timeout = 180
    reclick_interval = 30
    start_time = loop.time()
    last_click_time = start_time
    last_url = publish_page_url

    while loop.time() - start_time < publish_timeout:
        await asyncio.sleep(3)
        current_url = self.page.url

        # Log URL changes.
        if current_url != last_url:
            print(f"[{self.platform_name}] URL 变化: {last_url} -> {current_url}")
            last_url = current_url

        # Landing on the content-management page (/builder/rc/content)
        # is treated as the definitive success signal.
        if '/builder/rc/content' in current_url and 'edit' not in current_url:
            self.report_progress(100, "发布成功!")
            print(f"[{self.platform_name}] 发布成功,已跳转到内容管理页: {current_url}")
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=True,
                platform=self.platform_name,
                message="发布成功",
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='success'
            )

        # Look for an explicit success popup.
        try:
            success_modal = self.page.locator('div:has-text("发布成功"), div:has-text("提交成功"), div:has-text("视频发布成功")').first
            if await success_modal.count() > 0 and await success_modal.is_visible():
                self.report_progress(100, "发布成功!")
                print(f"[{self.platform_name}] 检测到发布成功弹窗")
                screenshot_base64 = await self.capture_screenshot()

                # Wait briefly in case the page navigates afterwards.
                await asyncio.sleep(3)

                return PublishResult(
                    success=True,
                    platform=self.platform_name,
                    message="发布成功",
                    screenshot_base64=screenshot_base64,
                    page_url=self.page.url,
                    status='success'
                )
        except Exception as e:
            print(f"[{self.platform_name}] 检测成功提示异常: {e}")

        # Look for an error message.
        try:
            error_selectors = [
                'div.error-tip',
                'div[class*="error-msg"]',
                'span[class*="error"]',
                'div:has-text("发布失败")',
                'div:has-text("提交失败")',
            ]
            for error_selector in error_selectors:
                error_el = self.page.locator(error_selector).first
                if await error_el.count() > 0 and await error_el.is_visible():
                    error_text = await error_el.text_content()
                    if error_text and error_text.strip():
                        print(f"[{self.platform_name}] 检测到错误: {error_text}")
                        screenshot_base64 = await self.capture_screenshot()
                        return PublishResult(
                            success=False,
                            platform=self.platform_name,
                            error=f"发布失败: {error_text.strip()}",
                            screenshot_base64=screenshot_base64,
                            page_url=current_url,
                            status='failed'
                        )
        except Exception as e:
            print(f"[{self.platform_name}] 检测错误提示异常: {e}")

        # Check for a captcha raised during publishing.
        captcha_result = await self.check_captcha()
        if captcha_result['need_captcha']:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"发布过程中需要{captcha_result['captcha_type']}验证码",
                need_captcha=True,
                captcha_type=captcha_result['captcha_type'],
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        # Still on the editor page: decide whether to click publish again.
        if 'edit' in current_url:
            try:
                processing_indicators = [
                    '[class*="loading"]',
                    '[class*="uploading"]',
                    '[class*="processing"]',
                    'div:has-text("正在上传")',
                    'div:has-text("正在处理")',
                ]
                is_processing = False
                for indicator in processing_indicators:
                    if await self.page.locator(indicator).count() > 0:
                        is_processing = True
                        print(f"[{self.platform_name}] 正在处理中...")
                        break

                # Re-click only when nothing is processing and at least
                # ``reclick_interval`` seconds passed since the last click,
                # so the button is not pressed again on every 3 s poll.
                now = loop.time()
                if not is_processing and now - last_click_time > reclick_interval:
                    print(f"[{self.platform_name}] 发布似乎未生效,尝试重新点击发布按钮...")
                    # Throttle the next attempt whether or not a click lands.
                    last_click_time = now
                    for selector in publish_selectors:
                        try:
                            btn = self.page.locator(selector).first
                            if await btn.count() > 0 and await btn.is_visible():
                                if not await btn.is_disabled():
                                    await btn.click()
                                    print(f"[{self.platform_name}] 重新点击发布按钮")
                                    break
                        except Exception:
                            pass
            except Exception as e:
                print(f"[{self.platform_name}] 检查处理状态异常: {e}")

    # Timed out: capture a screenshot for analysis of the final state.
    print(f"[{self.platform_name}] 发布超时,最终 URL: {self.page.url}")
    screenshot_base64 = await self.capture_screenshot()

    # One last check for the content-management page.
    final_url = self.page.url
    if '/builder/rc/content' in final_url and 'edit' not in final_url:
        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布成功(延迟确认)",
            screenshot_base64=screenshot_base64,
            page_url=final_url,
            status='success'
        )

    return PublishResult(
        success=False,
        platform=self.platform_name,
        error="发布超时,请手动检查发布状态",
        screenshot_base64=screenshot_base64,
        page_url=final_url,
        status='need_action'
    )
-
- async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
- """
- 获取百家号作品列表
- 优先使用内容管理页的接口(pcui/article/lists)。
- 说明:
- - 该接口通常需要自定义请求头 token(JWT),仅靠 Cookie 可能会返回“未登录”
- - 这里使用 Playwright 打开内容页,从 localStorage/sessionStorage/页面脚本中自动提取 token,
- 再在页面上下文中发起 fetch(携带 cookie + token),以提高成功率
- """
- import re
-
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 获取作品列表 (使用 API)")
- print(f"[{self.platform_name}] page={page}, page_size={page_size}")
- print(f"{'='*60}")
-
- works: List[WorkItem] = []
- total = 0
- has_more = False
- next_page = ""
-
- try:
- # 解析并设置 cookies(Playwright)
- cookie_list = self.parse_cookies(cookies)
- await self.init_browser()
- await self.set_cookies(cookie_list)
- if not self.page:
- raise Exception("Page not initialized")
- # 先打开内容管理页,确保本页 Referer/会话就绪
- # Node 侧传 page=0,1,...;接口 currentPage 为 1,2,...
- current_page = int(page) + 1
- page_size = int(page_size)
- content_url = (
- "https://baijiahao.baidu.com/builder/rc/content"
- f"?currentPage={current_page}&pageSize={page_size}"
- "&search=&type=&collection=&startDate=&endDate="
- )
- await self.page.goto(content_url, wait_until="domcontentloaded", timeout=60000)
- await asyncio.sleep(2)
- # 1) 提取 token(JWT)
- token = await self.page.evaluate(
- """
- () => {
- const isJwtLike = (v) => {
- if (!v || typeof v !== 'string') return false;
- const s = v.trim();
- if (s.length < 60) return false;
- const parts = s.split('.');
- if (parts.length !== 3) return false;
- return parts.every(p => /^[A-Za-z0-9_-]+$/.test(p) && p.length > 10);
- };
- const pickFromStorage = (storage) => {
- try {
- const keys = Object.keys(storage || {});
- for (const k of keys) {
- const v = storage.getItem(k);
- if (isJwtLike(v)) return v;
- }
- } catch {}
- return "";
- };
- // localStorage / sessionStorage
- let t = pickFromStorage(window.localStorage);
- if (t) return t;
- t = pickFromStorage(window.sessionStorage);
- if (t) return t;
- // meta 标签
- const meta = document.querySelector('meta[name="token"], meta[name="bjh-token"]');
- const metaToken = meta && meta.getAttribute('content');
- if (isJwtLike(metaToken)) return metaToken;
- // 简单从全局变量里找
- const candidates = [
- (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.token) || "",
- (window.__PRELOADED_STATE__ && window.__PRELOADED_STATE__.token) || "",
- (window.__NUXT__ && window.__NUXT__.state && window.__NUXT__.state.token) || "",
- ];
- for (const c of candidates) {
- if (isJwtLike(c)) return c;
- }
- return "";
- }
- """
- )
- # 2) 若仍未取到 token,再从页面 HTML 兜底提取
- if not token:
- html = await self.page.content()
- m = re.search(r'([A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,})', html)
- if m:
- token = m.group(1)
- if not token:
- raise Exception("未能从页面提取 token(可能未登录或触发风控),请重新登录百家号账号后再试")
- # 3) 调用接口(在页面上下文 fetch,自动携带 cookie)
- api_url = (
- "https://baijiahao.baidu.com/pcui/article/lists"
- f"?currentPage={current_page}"
- f"&pageSize={page_size}"
- "&search=&type=&collection=&startDate=&endDate="
- "&clearBeforeFetch=false"
- "&dynamic=1"
- )
- resp = await self.page.evaluate(
- """
- async ({ url, token }) => {
- const r = await fetch(url, {
- method: 'GET',
- credentials: 'include',
- headers: {
- 'accept': 'application/json, text/plain, */*',
- ...(token ? { token } : {}),
- },
- });
- const text = await r.text();
- return { ok: r.ok, status: r.status, text };
- }
- """,
- {"url": api_url, "token": token},
- )
- if not resp or not resp.get("ok"):
- status = resp.get("status") if isinstance(resp, dict) else "unknown"
- raise Exception(f"百家号接口请求失败: HTTP {status}")
- api_result = json.loads(resp.get("text") or "{}")
- print(f"[{self.platform_name}] pcui/article/lists 响应: errno={api_result.get('errno')}, errmsg={api_result.get('errmsg')}")
- if api_result.get("errno") != 0:
- errno = api_result.get("errno")
- errmsg = api_result.get("errmsg", "unknown error")
- # 20040001 常见为“未登录”
- if errno in (110, 20040001):
- raise Exception("百家号未登录或 Cookie/token 失效,请重新登录后再同步")
- raise Exception(f"百家号接口错误: errno={errno}, errmsg={errmsg}")
- data = api_result.get("data", {}) or {}
- items = data.get("list", []) or []
- page_info = data.get("page", {}) or {}
- total = int(page_info.get("totalCount", 0) or 0)
- total_page = int(page_info.get("totalPage", 0) or 0)
- cur_page = int(page_info.get("currentPage", current_page) or current_page)
- has_more = bool(total_page and cur_page < total_page)
- next_page = cur_page + 1 if has_more else ""
- print(f"[{self.platform_name}] 获取到 {len(items)} 个作品,总数: {total}, currentPage={cur_page}, totalPage={total_page}")
- def _pick_cover(item: dict) -> str:
- cover = item.get("crosswise_cover") or item.get("vertical_cover") or ""
- if cover:
- return cover
- raw = item.get("cover_images") or ""
- try:
- # cover_images 可能是 JSON 字符串
- parsed = json.loads(raw) if isinstance(raw, str) else raw
- if isinstance(parsed, list) and parsed:
- first = parsed[0]
- if isinstance(first, dict):
- return first.get("src") or first.get("ori_src") or ""
- if isinstance(first, str):
- return first
- except Exception:
- pass
- return ""
- def _pick_duration(item: dict) -> int:
- for k in ("rmb_duration", "duration", "long"):
- try:
- v = int(item.get(k) or 0)
- if v > 0:
- return v
- except Exception:
- pass
- # displaytype_exinfo 里可能有 ugcvideo.video_info.durationInSecond
- ex = item.get("displaytype_exinfo") or ""
- try:
- exj = json.loads(ex) if isinstance(ex, str) and ex else (ex if isinstance(ex, dict) else {})
- ugc = (exj.get("ugcvideo") or {}) if isinstance(exj, dict) else {}
- vi = ugc.get("video_info") or {}
- v = int(vi.get("durationInSecond") or ugc.get("long") or 0)
- return v if v > 0 else 0
- except Exception:
- return 0
- def _pick_status(item: dict) -> str:
- qs = str(item.get("quality_status") or "").lower()
- st = str(item.get("status") or "").lower()
- if qs == "rejected" or "reject" in st:
- return "rejected"
- if st in ("draft", "unpublish", "unpublished"):
- return "draft"
- # 百家号常见 publish
- return "published"
- for item in items:
- # 优先使用 nid(builder 预览链接使用这个)
- work_id = str(item.get("nid") or item.get("feed_id") or item.get("article_id") or item.get("id") or "")
- if not work_id:
- continue
- works.append(
- WorkItem(
- work_id=work_id,
- title=str(item.get("title") or ""),
- cover_url=_pick_cover(item),
- video_url=str(item.get("url") or ""),
- duration=_pick_duration(item),
- status=_pick_status(item),
- publish_time=str(item.get("publish_time") or item.get("publish_at") or item.get("created_at") or ""),
- play_count=int(item.get("read_amount") or 0),
- like_count=int(item.get("like_amount") or 0),
- comment_count=int(item.get("comment_amount") or 0),
- share_count=int(item.get("share_amount") or 0),
- collect_count=int(item.get("collection_amount") or 0),
- )
- )
- print(f"[{self.platform_name}] ✓ 成功解析 {len(works)} 个作品")
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return WorksResult(
- success=False,
- platform=self.platform_name,
- error=str(e),
- debug_info="baijiahao_get_works_failed"
- )
-
- return WorksResult(
- success=True,
- platform=self.platform_name,
- works=works,
- total=total,
- has_more=has_more,
- next_page=next_page
- )
-
async def check_login_status(self, cookies: str) -> dict:
    """Check whether a Baijiahao cookie string still belongs to a logged-in account.

    Works through direct HTTP calls (aiohttp) rather than a browser: the
    builder home page is requested first, then the ``builder/app/appinfo``
    API is queried and its ``errno`` and ``data.user`` fields are inspected.

    Args:
        cookies: Raw cookie string; parsed with ``self.parse_cookies``.

    Returns:
        A dict with the boolean flags ``success``, ``valid`` and
        ``need_login``. Normal outcomes carry a ``message``; when an
        unexpected exception occurs ``success`` is False and the exception
        text is stored under ``error`` instead.
    """
    import aiohttp

    print(f"[{self.platform_name}] 检查登录状态 (使用 API)")

    try:
        # Turn the parsed cookie entries into a name -> value mapping for the session.
        cookie_list = self.parse_cookies(cookies)
        cookie_dict = {c['name']: c['value'] for c in cookie_list}

        # Headers for the initial page navigation. Per the original author,
        # Baijiahao needs the home page visited first to establish session context.
        session_headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            # The Cookie header is managed by the session
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        }

        # Headers for the same-origin API call.
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            # The Cookie header is managed by the session
            'Referer': 'https://baijiahao.baidu.com/builder/rc/home',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        }

        # Both requests share the same 30-second overall timeout.
        request_timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(cookies=cookie_dict) as session:
            # Step 0: visit the home page to establish session context.
            print(f"[{self.platform_name}] [0/2] 访问主页建立会话上下文...")
            async with session.get(
                'https://baijiahao.baidu.com/builder/rc/home',
                headers=session_headers,
                timeout=request_timeout
            ) as home_response:
                home_status = home_response.status
                print(f"[{self.platform_name}] 主页访问状态: {home_status}")

            # Short pause before the API call so the session can settle.
            await asyncio.sleep(1)

            # Step 1: call the appinfo API to check the login state.
            print(f"[{self.platform_name}] [1/2] 调用 appinfo API 检查登录状态...")
            async with session.get(
                'https://baijiahao.baidu.com/builder/app/appinfo',
                headers=headers,
                timeout=request_timeout
            ) as response:
                # content_type=None disables aiohttp's Content-Type check so a
                # JSON body served with another mimetype is still parsed.
                api_result = await response.json(content_type=None)

        errno = api_result.get('errno')
        print(f"[{self.platform_name}] API 完整响应: {json.dumps(api_result, ensure_ascii=False)[:500]}")
        print(f"[{self.platform_name}] API 响应: errno={errno}")

        # A non-zero errno means the request failed (common code: 110 = not logged in).
        if errno != 0:
            error_msg = api_result.get('errmsg', '未知错误')
            print(f"[{self.platform_name}] Cookie 无效: errno={errno}, msg={error_msg}")
            return {
                "success": True,
                "valid": False,
                "need_login": True,
                "message": error_msg
            }

        # errno == 0: the request succeeded, now look for user data.
        # "data" / "user" may be missing or null; treat both as "no user data"
        # instead of failing with AttributeError.
        data = api_result.get('data')
        user_data = data.get('user') if isinstance(data, dict) else None
        if not isinstance(user_data, dict) or not user_data:
            print(f"[{self.platform_name}] 无用户数据,Cookie 可能无效")
            return {
                "success": True,
                "valid": False,
                "need_login": True,
                "message": "无用户数据"
            }

        status = user_data.get('status', '')
        account_name = user_data.get('name') or user_data.get('uname', '')

        # Account states accepted as valid: audit (under review),
        # pass (approved), normal, newbie.
        valid_statuses = ['audit', 'pass', 'normal', 'newbie']

        if status in valid_statuses and account_name:
            print(f"[{self.platform_name}] ✓ 登录状态有效: {account_name} (status={status})")
            return {
                "success": True,
                "valid": True,
                "need_login": False,
                "message": "登录状态有效"
            }

        print(f"[{self.platform_name}] 账号状态异常: status={status}, name={account_name}")
        return {
            "success": True,
            "valid": False,
            "need_login": True,
            "message": f"账号状态异常: {status}"
        }

    except Exception as e:
        # Boundary handler: any failure is reported as a failed check that
        # requires a new login, with the traceback printed for diagnosis.
        import traceback
        traceback.print_exc()
        return {
            "success": False,
            "valid": False,
            "need_login": True,
            "error": str(e)
        }
-
async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
    """Fetch comments for a Baijiahao work (not implemented yet).

    Args:
        cookies: Raw cookie string (currently unused).
        work_id: Identifier of the work; echoed back in the result.
        cursor: Pagination cursor (currently unused).

    Returns:
        A ``CommentsResult`` with ``success=False`` and an error message
        stating that the feature is not implemented.
    """
    # TODO: implement comment fetching
    not_implemented = CommentsResult(
        success=False,
        platform=self.platform_name,
        work_id=work_id,
        error="百家号评论功能暂未实现",
    )
    return not_implemented
|