| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324 |
- # -*- coding: utf-8 -*-
- """
- 百家号视频发布器
- """
- import asyncio
- import json
- from typing import List
- from datetime import datetime
- from .base import (
- BasePublisher, PublishParams, PublishResult,
- WorkItem, WorksResult, CommentItem, CommentsResult
- )
- class BaijiahaoPublisher(BasePublisher):
- """
- 百家号视频发布器
- 使用 Playwright 自动化操作百家号创作者中心
- """
-
- platform_name = "baijiahao"
- login_url = "https://baijiahao.baidu.com/"
- publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=video"
- cookie_domain = ".baidu.com"
-
- # 登录检测配置
- login_check_url = "https://baijiahao.baidu.com/builder/rc/home"
- login_indicators = ["passport.baidu.com", "/login", "wappass.baidu.com"]
- login_selectors = ['text="登录"', 'text="请登录"', '[class*="login-btn"]']
-
async def get_account_info(self, cookies: str) -> dict:
    """
    Fetch Baijiahao account information through direct HTTP API calls.

    No browser is involved. The sequence is:

    0. GET the creator home page so the server sets up a session context.
    1. GET ``builder/app/appinfo`` for the basic profile. ``errno`` 110 is
       reported as an expired login; ``errno`` 10001402 triggers one
       home-page revisit followed by a single retry, and a successful retry
       continues with the retried payload.
    2. GET ``cms-ui/rights/growth/get_info`` for the fan count (best effort).
    3. GET ``pcui/article/lists`` for the works count (best effort, retried
       once on ``errno`` 10001402).

    Args:
        cookies: Raw cookie payload, handed to ``self.parse_cookies``; each
            parsed entry is read through its ``name`` and ``value`` keys.

    Returns:
        On success a dict with ``success=True``, ``account_id``,
        ``account_name``, ``avatar_url``, ``fans_count`` and ``works_count``.
        On failure a dict with ``success=False`` and ``error``; API-level
        rejections additionally carry ``need_login=True``.
    """
    import aiohttp

    print(f"\n{'='*60}")
    print(f"[{self.platform_name}] 获取账号信息 (使用 API)")
    print(f"{'='*60}")

    home_url = 'https://baijiahao.baidu.com/builder/rc/home'
    appinfo_url = 'https://baijiahao.baidu.com/builder/app/appinfo'

    try:
        # Turn the raw cookie payload into the name -> value mapping aiohttp expects.
        cookie_list = self.parse_cookies(cookies)
        cookie_dict = {c['name']: c['value'] for c in cookie_list}

        # The home page has to be visited first to establish the session context.
        print(f"[{self.platform_name}] 第一步:访问主页建立会话...")

        # Browser-like headers for page navigations. The Cookie header is
        # managed by the aiohttp session and is deliberately not set here.
        session_headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        }

        # XHR-like headers for the JSON endpoints (Cookie again left to the session).
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Referer': 'https://baijiahao.baidu.com/builder/rc/home',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        }

        long_timeout = aiohttp.ClientTimeout(total=30)
        short_timeout = aiohttp.ClientTimeout(total=10)

        # Seeding the session with the cookies lets aiohttp track cookie updates itself.
        async with aiohttp.ClientSession(cookies=cookie_dict) as session:
            # Step 0: visit the home page to establish the session context.
            print(f"[{self.platform_name}] [0/4] 访问主页建立会话上下文...")
            async with session.get(
                home_url,
                headers=session_headers,
                timeout=long_timeout
            ) as home_response:
                home_status = home_response.status
                print(f"[{self.platform_name}] 主页访问状态: {home_status}")

                # Only logged; the session cookie jar already absorbs Set-Cookie.
                if 'Set-Cookie' in home_response.headers:
                    print(f"[{self.platform_name}] 获取到新的会话Cookie")

            # Short pause so the server-side session is in place.
            await asyncio.sleep(1)

            # Step 1: basic account information.
            print(f"[{self.platform_name}] [1/4] 调用 appinfo API...")
            async with session.get(
                appinfo_url,
                headers=headers,
                timeout=long_timeout
            ) as response:
                appinfo_result = await response.json()

            print(f"[{self.platform_name}] appinfo API 完整响应: {json.dumps(appinfo_result, ensure_ascii=False)[:500]}")
            print(f"[{self.platform_name}] appinfo API 响应: errno={appinfo_result.get('errno')}")

            # Login / API status check.
            errno = appinfo_result.get('errno')
            if errno != 0:
                error_msg = appinfo_result.get('errmsg', '未知错误')
                print(f"[{self.platform_name}] API 返回错误: errno={errno}, msg={error_msg}")

                # errno 110: not logged in.
                if errno == 110:
                    return {
                        "success": False,
                        "error": "Cookie 已失效,需要重新登录",
                        "need_login": True
                    }

                # Every error other than the distributed-auth one is final.
                if errno != 10001402:
                    return {
                        "success": False,
                        "error": error_msg,
                        "need_login": True
                    }

                # errno 10001402: distributed-auth problem. Revisit the home
                # page and retry the API call exactly once.
                print(f"[{self.platform_name}] 检测到分散认证问题,尝试重新访问主页...")
                await asyncio.sleep(2)

                async with session.get(
                    home_url,
                    headers=session_headers,
                    timeout=long_timeout
                ) as retry_home_response:
                    print(f"[{self.platform_name}] 重新访问主页状态: {retry_home_response.status}")

                await asyncio.sleep(1)

                async with session.get(
                    appinfo_url,
                    headers=headers,
                    timeout=long_timeout
                ) as retry_response:
                    retry_result = await retry_response.json()

                if retry_result.get('errno') != 0:
                    print(f"[{self.platform_name}] 重试仍然失败")
                    return {
                        "success": False,
                        "error": f"分散认证问题: {error_msg}",
                        "need_login": True
                    }

                # The retry recovered: carry on with the retried payload
                # instead of falling through to a failure return.
                print(f"[{self.platform_name}] 分散认证问题已解决")
                appinfo_result = retry_result

            # User payload.
            user_data = appinfo_result.get('data', {}).get('user', {})
            if not user_data:
                return {
                    "success": False,
                    "error": "无法获取用户信息",
                    "need_login": True
                }

            # Account status is only logged when unexpected; it does not abort.
            # Statuses treated as valid: audit, pass, normal, newbie.
            status = user_data.get('status', '')
            valid_statuses = ['audit', 'pass', 'normal', 'newbie']
            if status not in valid_statuses:
                print(f"[{self.platform_name}] 账号状态异常: {status}")

            # Basic fields, with a timestamp-based fallback id when the API gives none.
            account_name = user_data.get('name') or user_data.get('uname') or '百家号账号'
            app_id = user_data.get('app_id') or user_data.get('id', 0)
            account_id = str(app_id) if app_id else f"baijiahao_{int(datetime.now().timestamp() * 1000)}"

            # Protocol-relative avatar URLs are upgraded to https.
            avatar_url = user_data.get('avatar') or user_data.get('avatar_unify', '')
            if avatar_url and avatar_url.startswith('//'):
                avatar_url = 'https:' + avatar_url

            print(f"[{self.platform_name}] 账号名称: {account_name}, ID: {account_id}")

            # Step 2: fan count (non-critical; failures do not affect the result).
            fans_count = 0
            try:
                print(f"[{self.platform_name}] [2/3] 调用 growth/get_info API 获取粉丝数...")
                async with session.get(
                    'https://baijiahao.baidu.com/cms-ui/rights/growth/get_info',
                    headers=headers,
                    timeout=short_timeout
                ) as response:
                    growth_result = await response.json()

                if growth_result.get('errno') == 0:
                    growth_data = growth_result.get('data', {})
                    fans_count = int(growth_data.get('fans_num', 0))
                    print(f"[{self.platform_name}] 粉丝数: {fans_count}")
                else:
                    print(f"[{self.platform_name}] 获取粉丝数失败: {growth_result.get('errmsg')}")
            except Exception as e:
                print(f"[{self.platform_name}] 获取粉丝数异常(非关键): {e}")

            # Step 3: works count (same API and parameters as the Node side,
            # per the original author's note).
            works_count = 0
            try:
                print(f"[{self.platform_name}] [3/3] 调用 article/lists API 获取作品数...")

                list_url = 'https://baijiahao.baidu.com/pcui/article/lists?currentPage=1&pageSize=20&search=&type=&collection=&startDate=&endDate=&clearBeforeFetch=false&dynamic=0'

                async with session.get(
                    list_url,
                    headers={
                        'accept': '*/*',
                        'user-agent': 'PostmanRuntime/7.51.0',
                        'referer': 'https://baijiahao.baidu.com/builder/rc/content',
                        'connection': 'keep-alive',
                        'accept-encoding': 'gzip, deflate, br',
                    },
                    timeout=long_timeout
                ) as response:
                    response_text = await response.text()

                print(f"[{self.platform_name}] ========== Works API Response ==========")
                print(f"[{self.platform_name}] Full response: {response_text[:1000]}...")  # first 1000 chars only
                print(f"[{self.platform_name}] =========================================")

                works_result = json.loads(response_text)

                # Distributed-auth problem (errno=10001402): retry once.
                if works_result.get('errno') == 10001402:
                    print(f"[{self.platform_name}] 分散认证问题 (errno=10001402),3秒后重试...")
                    await asyncio.sleep(3)

                    # The retry uses a fuller, navigation-like header set.
                    retry_headers = {
                        **headers,
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                        'Cache-Control': 'max-age=0',
                        'Upgrade-Insecure-Requests': '1',
                    }

                    async with session.get(
                        list_url,
                        headers=retry_headers,
                        timeout=long_timeout
                    ) as retry_response:
                        retry_text = await retry_response.text()

                    print(f"[{self.platform_name}] ========== Works API Retry Response ==========")
                    print(f"[{self.platform_name}] Full retry response: {retry_text[:1000]}...")
                    print(f"[{self.platform_name}] ===============================================")

                    works_result = json.loads(retry_text)

                    if works_result.get('errno') == 10001402:
                        print(f"[{self.platform_name}] 重试仍然失败,返回已获取的账号信息")
                        works_result = None

                if works_result and works_result.get('errno') == 0:
                    works_data = works_result.get('data', {})
                    # Prefer data.page.totalCount; fall back to data.total (older format).
                    page_info = works_data.get('page', {})
                    works_count = int(page_info.get('totalCount', works_data.get('total', 0)))
                    print(f"[{self.platform_name}] 作品数: {works_count} (from page.totalCount: {page_info.get('totalCount')}, from total: {works_data.get('total')})")
                else:
                    errno = works_result.get('errno') if works_result else 'unknown'
                    errmsg = works_result.get('errmsg', 'unknown error') if works_result else 'no response'
                    print(f"[{self.platform_name}] 获取作品数失败: errno={errno}, errmsg={errmsg}")
            except Exception as e:
                import traceback
                print(f"[{self.platform_name}] 获取作品数异常(非关键): {e}")
                traceback.print_exc()

            # Assemble the account information.
            account_info = {
                "success": True,
                "account_id": account_id,
                "account_name": account_name,
                "avatar_url": avatar_url,
                "fans_count": fans_count,
                "works_count": works_count,
            }

            print(f"[{self.platform_name}] ✓ 获取成功: {account_name} (粉丝: {fans_count}, 作品: {works_count})")
            return account_info

    except Exception as e:
        # Top-level boundary: report any unexpected failure to the caller.
        import traceback
        traceback.print_exc()
        return {
            "success": False,
            "error": str(e)
        }
-
async def check_captcha(self) -> dict:
    """
    Check whether the current page shows a captcha or a login prompt.

    Each selector is probed independently; a selector whose lookup fails is
    skipped so that one bad locator cannot hide the others. Only ``Exception``
    is suppressed, so task cancellation and interpreter-exit signals still
    propagate.

    Returns:
        ``{'need_captcha': bool, 'captcha_type': str}`` where
        ``captcha_type`` is ``'image'`` for a captcha selector hit,
        ``'login'`` for a login-dialog hit, and ``''`` when nothing was
        found or no page is open.
    """
    if not self.page:
        return {'need_captcha': False, 'captcha_type': ''}

    # (selectors, reported captcha_type, log label), scanned in this order.
    probes = (
        (
            (
                'text="请输入验证码"',
                'text="滑动验证"',
                '[class*="captcha"]',
                '[class*="verify"]',
            ),
            'image',
            "检测到验证码",
        ),
        (
            (
                'text="请登录"',
                'text="登录后继续"',
                '[class*="login-dialog"]',
            ),
            'login',
            "检测到需要登录",
        ),
    )

    try:
        for selectors, captcha_type, label in probes:
            for selector in selectors:
                try:
                    if await self.page.locator(selector).count() > 0:
                        print(f"[{self.platform_name}] {label}: {selector}")
                        return {'need_captcha': True, 'captcha_type': captcha_type}
                except Exception:
                    # Best effort: a failing selector is ignored, not fatal.
                    continue
    except Exception as e:
        print(f"[{self.platform_name}] 验证码检测异常: {e}")

    return {'need_captcha': False, 'captcha_type': ''}
- async def _ai_analyze_upload_state(self, screenshot_base64: str = None) -> dict:
- """
- 使用 AI 识别当前上传状态,返回:
- {
- status: completed|uploading|failed|unknown,
- progress: int|None,
- confidence: int,
- reason: str,
- should_enter_publish_form: bool
- }
- """
- import os
- import ast
- import re
- import requests
- result = {
- "status": "unknown",
- "progress": None,
- "confidence": 0,
- "reason": "",
- "should_enter_publish_form": False,
- }
- try:
- if not screenshot_base64:
- screenshot_base64 = await self.capture_screenshot()
- if not screenshot_base64:
- result["reason"] = "no-screenshot"
- return result
- ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
- ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
- ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus')
- if not ai_api_key:
- result["reason"] = "no-ai-key"
- return result
- prompt = """请分析这张“百家号视频发布页”截图,判断视频上传状态。
- 请只返回 JSON:
- {
- "status": "completed|uploading|failed|unknown",
- "progress": 0-100 或 null,
- "confidence": 0-100,
- "reason": "一句话证据",
- "should_enter_publish_form": true/false
- }
- 判定规则:
- 1) status=completed:
- - 出现“上传完成/处理完成/可发布/可填写标题描述/发布按钮可用”等信号
- - 或者明显已进入可填写发布信息的阶段
- 2) status=uploading:
- - 出现“上传中/处理中/转码中/xx%/请稍候”等
- 3) status=failed:
- - 出现“上传失败/处理失败/格式不支持/文件异常”等明确失败文案
- 4) should_enter_publish_form=true:
- - 画面显示“去发布/下一步/继续/完成编辑”等入口,且看起来应点击进入正式发布表单
- """
- headers = {
- 'Authorization': f'Bearer {ai_api_key}',
- 'Content-Type': 'application/json'
- }
- payload = {
- "model": ai_vision_model,
- "messages": [
- {
- "role": "user",
- "content": [
- {
- "type": "image_url",
- "image_url": {
- "url": f"data:image/jpeg;base64,{screenshot_base64}"
- }
- },
- {
- "type": "text",
- "text": prompt
- }
- ]
- }
- ],
- "max_tokens": 400
- }
- response = requests.post(
- f"{ai_base_url}/chat/completions",
- headers=headers,
- json=payload,
- timeout=30
- )
- if response.status_code != 200:
- result["reason"] = f"ai-http-{response.status_code}"
- return result
- response_json = response.json()
- ai_response = response_json.get('choices', [{}])[0].get('message', {}).get('content', '')
- json_match = re.search(r'```json\\s*([\\s\\S]*?)\\s*```', ai_response)
- if json_match:
- json_str = json_match.group(1)
- else:
- json_match = re.search(r'\\{[\\s\\S]*\\}', ai_response)
- json_str = json_match.group(0) if json_match else '{}'
- try:
- data = json.loads(json_str)
- except Exception:
- try:
- data = ast.literal_eval(json_str) if json_str and json_str != '{}' else {}
- if not isinstance(data, dict):
- data = {}
- except Exception:
- data = {}
- # 兼容中文 key / 非标准结构
- status_hint = str(
- data.get("status")
- or data.get("状态")
- or ""
- ).strip()
- status_raw = status_hint.lower()
- if (
- status_raw in ["complete", "completed", "success", "done", "finished", "ready"]
- or any(k in status_hint for k in ["完成", "成功", "可发布", "已上传"])
- ):
- status = "completed"
- elif (
- status_raw in ["uploading", "processing", "in_progress", "progress", "running"]
- or any(k in status_hint for k in ["上传中", "处理中", "转码", "进行中", "上传"])
- ):
- status = "uploading"
- elif (
- status_raw in ["failed", "error", "fail"]
- or any(k in status_hint for k in ["失败", "错误", "异常"])
- ):
- status = "failed"
- else:
- status = "unknown"
- progress = data.get("progress", data.get("进度", None))
- parsed_progress = None
- try:
- if progress is not None and str(progress).strip() != "":
- parsed_progress = max(0, min(100, int(float(progress))))
- except Exception:
- parsed_progress = None
- if parsed_progress is None:
- try:
- p_match = re.search(r'(\d{1,3})\s*%', ai_response or '')
- if p_match:
- parsed_progress = max(0, min(100, int(p_match.group(1))))
- except Exception:
- parsed_progress = None
- confidence = 0
- try:
- confidence = max(0, min(100, int(float(data.get("confidence", data.get("置信度", 0)) or 0))))
- except Exception:
- confidence = 0
- reason = str(data.get("reason", data.get("原因", "")) or "").strip()
- should_enter_raw = data.get(
- "should_enter_publish_form",
- data.get("是否进入发布表单", False)
- )
- if isinstance(should_enter_raw, bool):
- should_enter = should_enter_raw
- else:
- should_enter_text = str(should_enter_raw or "").strip().lower()
- should_enter = should_enter_text in ["true", "1", "yes", "y", "是"]
- # 当 AI 响应不是严格 JSON 时,按全文关键词推断
- response_text = str(ai_response or "")
- response_lower = response_text.lower()
- if status == "unknown":
- if any(k in response_text for k in ["上传完成", "处理完成", "上传成功", "可发布", "已完成"]):
- status = "completed"
- elif any(k in response_text for k in ["上传失败", "处理失败", "格式不支持", "文件异常", "失败"]):
- status = "failed"
- elif any(k in response_text for k in ["上传中", "处理中", "转码中", "请稍候"]) or re.search(r'(\d{1,3})\s*%', response_text):
- status = "uploading"
- if not should_enter and any(k in response_text for k in ["去发布", "下一步", "继续", "完成编辑"]):
- should_enter = True
- if not reason and response_text:
- reason = response_text.replace("\n", " ").strip()[:120]
- if confidence <= 0 and status != "unknown":
- confidence = 60
- # 二次语义修正
- if status == "uploading" and parsed_progress is not None and parsed_progress >= 100:
- status = "completed"
- should_enter = True
- # AI 有时会把 99/100 仍写成 uploading,这里做语义修正
- if status == "uploading" and parsed_progress is not None and parsed_progress >= 99 and confidence >= 60:
- status = "completed"
- should_enter = True
- return {
- "status": status,
- "progress": parsed_progress,
- "confidence": confidence,
- "reason": reason,
- "should_enter_publish_form": should_enter,
- }
- except Exception as e:
- result["reason"] = f"ai-exception:{e}"
- return result
- async def _extract_bjh_token(self) -> str:
- """从页面上下文提取百家号接口 token。"""
- if not self.page:
- return ""
- try:
- token = await self.page.evaluate(
- """
- () => {
- const isJwtLike = (v) => {
- if (!v || typeof v !== 'string') return false;
- const s = v.trim();
- if (s.length < 60) return false;
- const parts = s.split('.');
- if (parts.length !== 3) return false;
- return parts.every(p => /^[A-Za-z0-9_-]+$/.test(p) && p.length > 10);
- };
- const pickFromStorage = (storage) => {
- try {
- const keys = Object.keys(storage || {});
- for (const k of keys) {
- const v = storage.getItem(k);
- if (isJwtLike(v)) return v;
- }
- } catch {}
- return "";
- };
- let t = pickFromStorage(window.localStorage);
- if (t) return t;
- t = pickFromStorage(window.sessionStorage);
- if (t) return t;
- const meta = document.querySelector('meta[name="token"], meta[name="bjh-token"]');
- const metaToken = meta && meta.getAttribute('content');
- if (isJwtLike(metaToken)) return metaToken;
- const candidates = [
- (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.token) || "",
- (window.__PRELOADED_STATE__ && window.__PRELOADED_STATE__.token) || "",
- (window.__NUXT__ && window.__NUXT__.state && window.__NUXT__.state.token) || "",
- ];
- for (const c of candidates) {
- if (isJwtLike(c)) return c;
- }
- return "";
- }
- """
- )
- if token:
- return str(token)
- except Exception:
- pass
- try:
- import re
- html = await self.page.content()
- m = re.search(r'([A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,})', html)
- if m:
- return m.group(1)
- except Exception:
- pass
- return ""
async def _verify_publish_from_content_page(self, expected_title: str, page_size: int = 20) -> bool:
    """
    Confirm a publish by looking the title up in the content-management list.

    Navigates to the Baijiahao content page, then calls the article-list
    endpoint from inside the page (``credentials: "include"``) and scans the
    first page of results for ``expected_title``.

    Args:
        expected_title: Title the freshly published item should carry.
        page_size: Number of list entries requested from the endpoint.

    Returns:
        True when a list entry's title contains the expected title or its
        12/20-character prefix. False in every other case: no page, blank
        title, failed request, non-zero ``errno``, empty list, no match, or
        any exception (which is logged, never propagated).
    """
    if not self.page:
        return False
    try:
        # A blank title can never match, so bail out before paying for the
        # navigation, the settle delay and the token extraction.
        expected = (expected_title or "").strip()
        if not expected:
            return False
        size = int(page_size)
        content_url = (
            "https://baijiahao.baidu.com/builder/rc/content"
            f"?currentPage=1&pageSize={size}"
            "&search=&type=&collection=&startDate=&endDate="
        )
        await self.page.goto(content_url, wait_until="domcontentloaded", timeout=60000)
        await asyncio.sleep(2)
        token = await self._extract_bjh_token()
        fetch_result = await self.page.evaluate(
            """
            async ({ token, pageSize }) => {
                const url =
                    "https://baijiahao.baidu.com/pcui/article/lists" +
                    "?currentPage=1" +
                    `&pageSize=${pageSize}` +
                    "&search=&type=&collection=&startDate=&endDate=" +
                    "&clearBeforeFetch=false&dynamic=1";
                const r = await fetch(url, {
                    method: "GET",
                    credentials: "include",
                    headers: {
                        "accept": "application/json, text/plain, */*",
                        ...(token ? { token } : {}),
                    },
                });
                const text = await r.text();
                return { ok: r.ok, status: r.status, text };
            }
            """,
            {"token": token, "pageSize": size}
        )
        # Anything that is not a dict with a truthy "ok" counts as a failed request.
        if not isinstance(fetch_result, dict) or not fetch_result.get("ok"):
            status = fetch_result.get("status") if isinstance(fetch_result, dict) else "unknown"
            print(f"[{self.platform_name}] 内容页校验接口失败: HTTP {status}")
            return False
        data = json.loads(fetch_result.get("text") or "{}")
        if data.get("errno") != 0:
            print(f"[{self.platform_name}] 内容页校验接口错误: errno={data.get('errno')}, msg={data.get('errmsg')}")
            return False
        items = ((data.get("data") or {}).get("list") or [])
        if not isinstance(items, list) or not items:
            print(f"[{self.platform_name}] 内容页校验:当前列表为空")
            return False
        # Match on the full title plus its 12- and 20-character prefixes so a
        # title shortened by the platform is still recognised.
        expected_variants = {expected}
        if len(expected) > 12:
            expected_variants.add(expected[:12])
        if len(expected) > 20:
            expected_variants.add(expected[:20])
        for item in items:
            # Skip malformed entries instead of letting one of them abort the scan.
            if not isinstance(item, dict):
                continue
            title = str(item.get("title") or "").strip()
            if not title:
                continue
            # Substring containment already covers plain equality.
            if any(needle in title for needle in expected_variants):
                print(f"[{self.platform_name}] 内容页校验命中标题: {title}")
                return True
        print(f"[{self.platform_name}] 内容页校验未命中标题,expected={expected}")
        return False
    except Exception as e:
        print(f"[{self.platform_name}] 内容页校验异常: {e}")
        return False
- async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
- """发布视频到百家号"""
- import os
- import re
- import shutil
-
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 开始发布视频")
- print(f"[{self.platform_name}] 视频路径: {params.video_path}")
- print(f"[{self.platform_name}] 标题: {params.title}")
- print(f"[{self.platform_name}] 描述: {(params.description or '')[:120]}")
- print(f"[{self.platform_name}] Headless: {self.headless}")
- print(f"{'='*60}")
-
- self.report_progress(5, "正在初始化浏览器...")
-
- # 初始化浏览器
- await self.init_browser()
- print(f"[{self.platform_name}] 浏览器初始化完成")
-
- # 解析并设置 cookies
- cookie_list = self.parse_cookies(cookies)
- print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 检查视频文件
- if not os.path.exists(params.video_path):
- raise Exception(f"视频文件不存在: {params.video_path}")
- print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")
- # 关键兜底:百家号在标题框不可编辑时会将“文件名主干”作为默认标题。
- # 因此上传前为视频创建“标题别名文件”(优先硬链接,失败再复制),确保默认标题可控。
- upload_video_path = params.video_path
- try:
- raw_title = (params.title or "").strip()
- if raw_title:
- safe_title = re.sub(r'[<>:"/\\\\|?*\\x00-\\x1F]', '', raw_title)
- safe_title = re.sub(r'\\s+', ' ', safe_title).strip().rstrip('.')
- if not safe_title:
- safe_title = "video"
- safe_title = safe_title[:30]
- src_ext = os.path.splitext(params.video_path)[1] or ".mp4"
- alias_dir = os.path.join(os.path.dirname(params.video_path), "_bjh_upload_alias")
- os.makedirs(alias_dir, exist_ok=True)
- # 轻量清理:删除 24h 前的旧别名文件,避免长期累积
- try:
- now_ts = datetime.now().timestamp()
- for fn in os.listdir(alias_dir):
- full = os.path.join(alias_dir, fn)
- if not os.path.isfile(full):
- continue
- if now_ts - os.path.getmtime(full) > 24 * 3600:
- try:
- os.remove(full)
- except Exception:
- pass
- except Exception:
- pass
- alias_name = f"{safe_title}{src_ext}"
- alias_path = os.path.join(alias_dir, alias_name)
- if os.path.abspath(alias_path) != os.path.abspath(params.video_path):
- if os.path.exists(alias_path):
- try:
- os.remove(alias_path)
- except Exception:
- pass
- try:
- os.link(params.video_path, alias_path)
- upload_video_path = alias_path
- print(f"[{self.platform_name}] 上传别名已创建(硬链接): {upload_video_path}")
- except Exception:
- shutil.copy2(params.video_path, alias_path)
- upload_video_path = alias_path
- print(f"[{self.platform_name}] 上传别名已创建(复制): {upload_video_path}")
- except Exception as e:
- upload_video_path = params.video_path
- print(f"[{self.platform_name}] 创建上传别名失败,回退原文件: {e}")
-
- self.report_progress(10, "正在打开上传页面...")
-
- # 访问视频发布页面(使用新视频发布界面)
- video_publish_url = "https://baijiahao.baidu.com/builder/rc/edit?type=videoV2&is_from_cms=1"
- await self.page.goto(video_publish_url, wait_until="domcontentloaded", timeout=60000)
- await asyncio.sleep(3)
-
- # 检查是否跳转到登录页
- current_url = self.page.url
- print(f"[{self.platform_name}] 当前页面: {current_url}")
-
- for indicator in self.login_indicators:
- if indicator in current_url:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="Cookie 已过期,需要重新登录",
- need_captcha=True,
- captcha_type='login',
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- # 使用 AI 检查验证码
- ai_captcha = await self.ai_check_captcha()
- if ai_captcha['has_captcha']:
- print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
- need_captcha=True,
- captcha_type=ai_captcha['captcha_type'],
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- # 传统方式检查验证码
- captcha_result = await self.check_captcha()
- if captcha_result['need_captcha']:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证",
- need_captcha=True,
- captcha_type=captcha_result['captcha_type'],
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- self.report_progress(15, "正在选择视频文件...")
-
- # 等待页面加载完成
- await asyncio.sleep(2)
-
- # 关闭可能的弹窗
- try:
- close_buttons = [
- 'button:has-text("我知道了")',
- 'button:has-text("知道了")',
- '[class*="close"]',
- '[class*="modal-close"]',
- ]
- for btn_selector in close_buttons:
- try:
- btn = self.page.locator(btn_selector).first
- if await btn.count() > 0 and await btn.is_visible():
- await btn.click()
- await asyncio.sleep(0.5)
- except:
- pass
- except:
- pass
-
- # 上传视频 - 尝试多种方式
- upload_triggered = False
-
- # 方法1: 直接通过 file input 上传
- try:
- file_inputs = await self.page.query_selector_all('input[type="file"]')
- print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入")
-
- for file_input in file_inputs:
- try:
- await file_input.set_input_files(upload_video_path)
- upload_triggered = True
- print(f"[{self.platform_name}] 通过 file input 上传成功")
- break
- except Exception as e:
- print(f"[{self.platform_name}] file input 上传失败: {e}")
- except Exception as e:
- print(f"[{self.platform_name}] 查找 file input 失败: {e}")
-
- # 方法2: 点击上传区域
- if not upload_triggered:
- upload_selectors = [
- 'div[class*="upload-box"]',
- 'div[class*="drag-upload"]',
- 'div[class*="uploader"]',
- 'div:has-text("点击上传")',
- 'div:has-text("选择文件")',
- '[class*="upload-area"]',
- ]
-
- for selector in upload_selectors:
- if upload_triggered:
- break
- try:
- upload_area = self.page.locator(selector).first
- if await upload_area.count() > 0:
- print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
- async with self.page.expect_file_chooser(timeout=10000) as fc_info:
- await upload_area.click()
- file_chooser = await fc_info.value
- await file_chooser.set_files(upload_video_path)
- upload_triggered = True
- print(f"[{self.platform_name}] 通过点击上传区域成功")
- break
- except Exception as e:
- print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")
-
- if not upload_triggered:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="未找到上传入口",
- screenshot_base64=screenshot_base64,
- page_url=await self.get_page_url(),
- status='failed'
- )
-
- self.report_progress(20, "等待视频上传...")
-
- # 等待视频上传完成(百家号大文件+处理可能较慢)
- upload_timeout = 900
- start_time = asyncio.get_event_loop().time()
- last_heartbeat_time = start_time
- last_signal_time = start_time
- last_stall_log_time = start_time
- last_ai_upload_check_time = start_time - 60
- ai_upload_check_interval = 20
- ai_upload_poll_count = 0
- ai_upload_unknown_streak = 0
- last_pct = -1
- forced_continue_after = 180 # 无进度信号时,3 分钟后执行兜底继续
- processing_since = None
- processing_selector_hit = ""
- processing_stale_continue_after = 300 # 处理态持续 5 分钟仍无明确变化,执行兜底继续
- has_progress_signal = False
- progress_signal_lost_continue_after = 90 # 已看到进度后,若信号中断 90s,直接进入下一步
- hard_cutover_signal_gap_after = 120 # 已出现过进度后,信号中断超过该值则硬切下一阶段
- hard_cutover_elapsed_after = 210 # 上传总耗时超过该值时,硬切下一阶段
async def _attempt_enter_publish_form_from_upload(stage: str) -> bool:
    """
    Click a "go to publish / next step" control while still on the upload view.

    Playwright locators on the main page are tried first. If none of them
    yields a click, an in-page script walks the DOM (descending into every
    ``shadowRoot`` it can reach) and clicks the first visible, clickable
    element whose text contains a wanted label.

    Args:
        stage: Free-form tag describing why the caller is trying; log-only.

    Returns:
        True as soon as one click has been issued, False when nothing
        suitable was found. Whether the form really opened is not checked.
    """
    candidate_selectors = [
        'button:has-text("去发布")',
        '[role="button"]:has-text("去发布")',
        'button:has-text("发布视频")',
        '[role="button"]:has-text("发布视频")',
        'button:has-text("下一步")',
        '[role="button"]:has-text("下一步")',
        'button:has-text("继续")',
        '[role="button"]:has-text("继续")',
        'button:has-text("完成编辑")',
        '[role="button"]:has-text("完成编辑")',
        '[class*="next"] button',
        '[class*="step"] button',
    ]
    # Labels that must never be clicked here: final publish, cancel/back and
    # destructive actions.
    forbidden_labels = {"发布", "定时发布", "立即发布", "取消", "返回", "关闭"}
    forbidden_fragments = ["定时发布", "立即发布", "取消", "返回", "关闭", "删除", "重传", "重新上传", "清空"]

    for selector in candidate_selectors:
        try:
            matches = self.page.locator(selector)
            total = await matches.count()
            # Only the first six matches per selector are inspected.
            for idx in range(min(total, 6)):
                button = matches.nth(idx)
                if not await button.is_visible():
                    continue
                label = (await button.text_content() or "").strip()
                squashed = re.sub(r"\s+", "", label)
                if squashed in forbidden_labels:
                    continue
                if any(fragment in squashed for fragment in forbidden_fragments):
                    continue
                disabled_attr = await button.get_attribute('disabled')
                aria_disabled = (await button.get_attribute('aria-disabled') or '').lower()
                if not (disabled_attr is None and aria_disabled != 'true'):
                    continue
                try:
                    await button.scroll_into_view_if_needed(timeout=1200)
                except Exception:
                    pass
                # A normal click first; fall back to a forced click if it fails.
                try:
                    await button.click(timeout=2500)
                except Exception:
                    await button.click(force=True, timeout=2500)
                print(f"[{self.platform_name}] 上传阶段尝试切换到发布表单: stage={stage}, selector={selector}, text={squashed or label}, idx={idx}")
                await asyncio.sleep(1)
                return True
        except Exception:
            # Any failure abandons this selector and moves on to the next one.
            continue

    # Deep-DOM fallback (including shadow roots) for controls the selectors miss.
    deep_click_script = """
        () => {
            const wanted = ['去发布', '发布视频', '下一步', '继续', '完成编辑'];
            const blockedExact = new Set(['发布', '定时发布', '立即发布', '取消', '返回', '关闭']);
            const blockedContains = ['定时发布', '立即发布', '取消', '返回', '关闭', '删除', '重传', '重新上传', '清空'];
            const roots = [document];
            const visited = new Set();
            const allNodes = [];
            while (roots.length) {
                const root = roots.pop();
                if (!root || visited.has(root)) continue;
                visited.add(root);
                const nodes = root.querySelectorAll('*');
                for (const n of nodes) {
                    allNodes.push(n);
                    if (n && n.shadowRoot) roots.push(n.shadowRoot);
                }
            }
            const isVisible = (el) => {
                try {
                    const style = window.getComputedStyle(el);
                    if (style.display === 'none' || style.visibility === 'hidden' || style.pointerEvents === 'none') return false;
                    const rect = el.getBoundingClientRect();
                    return !!rect && rect.width > 8 && rect.height > 8;
                } catch {
                    return false;
                }
            };
            for (const el of allNodes) {
                const text = String(el.innerText || el.textContent || '').replace(/\\s+/g, '').trim();
                if (!text) continue;
                if (blockedExact.has(text)) continue;
                if (blockedContains.some(x => text.includes(x))) continue;
                if (!wanted.some(x => text.includes(x))) continue;
                if (!isVisible(el)) continue;
                const tag = String(el.tagName || '').toLowerCase();
                const role = String(el.getAttribute && el.getAttribute('role') || '').toLowerCase();
                const cls = String(el.className || '').toLowerCase();
                const clickable = tag === 'button' || tag === 'a' || role === 'button' || /btn|button|next|step/.test(cls);
                if (!clickable) continue;
                try {
                    el.click();
                    return { ok: true, text };
                } catch {}
            }
            return { ok: false, text: '' };
        }
        """
    try:
        outcome = await self.page.evaluate(deep_click_script)
        if outcome and outcome.get("ok"):
            clicked_text = str(outcome.get("text") or "").strip()
            print(f"[{self.platform_name}] 上传阶段深层DOM切换发布表单成功: stage={stage}, text={clicked_text}")
            await asyncio.sleep(1.2)
            return True
    except Exception:
        pass
    return False
-
- while asyncio.get_event_loop().time() - start_time < upload_timeout:
- now = asyncio.get_event_loop().time()
- elapsed = int(now - start_time)
- status_parts = []
- # 检查上传进度
- pct = None
- try:
- progress_nodes = self.page.locator('[class*="progress"], [class*="percent"], div:has-text("%"), span:has-text("%")')
- node_count = await progress_nodes.count()
- for idx in range(min(node_count, 6)):
- text = await progress_nodes.nth(idx).text_content()
- if not text:
- continue
- match = re.search(r'(\d{1,3})\s*%', text)
- if match:
- pct = max(0, min(100, int(match.group(1))))
- break
- except Exception:
- pass
- if pct is not None:
- status_parts.append(f"progress={pct}%")
- last_signal_time = now
- has_progress_signal = True
- if pct != last_pct:
- self.report_progress(20 + min(35, int(pct * 0.35)), f"视频上传中 {pct}%...")
- last_pct = pct
- if pct >= 100:
- print(f"[{self.platform_name}] 上传完成(进度达到 100%)")
- break
- # 明确的上传完成提示
- upload_done = False
- upload_done_selectors = [
- 'div:has-text("上传完成")',
- 'div:has-text("处理完成")',
- 'div:has-text("上传成功")',
- 'span:has-text("上传完成")',
- '[class*="upload-success"]',
- ]
- try:
- for selector in upload_done_selectors:
- loc = self.page.locator(selector).first
- if await loc.count() > 0 and await loc.is_visible():
- upload_done = True
- print(f"[{self.platform_name}] 检测到上传完成提示: {selector}")
- break
- except Exception:
- pass
- if upload_done:
- last_signal_time = now
- break
-
- # 检查处理态
- is_processing = False
- processing_selectors = [
- 'div:has-text("上传中")',
- 'span:has-text("上传中")',
- 'div:has-text("处理中")',
- 'span:has-text("处理中")',
- 'div:has-text("转码中")',
- 'span:has-text("转码中")',
- 'div:has-text("请稍候")',
- 'span:has-text("请稍候")',
- 'div:has-text("正在上传")',
- 'div:has-text("正在处理")',
- 'text="上传中"',
- 'text="处理中"',
- ]
- try:
- for selector in processing_selectors:
- loc = self.page.locator(selector).first
- if await loc.count() > 0 and await loc.is_visible():
- is_processing = True
- processing_selector_hit = selector
- break
- except Exception:
- pass
- if is_processing:
- if processing_since is None:
- processing_since = now
- processing_elapsed = int(now - processing_since)
- status_parts.append(f"processing={processing_elapsed}s")
- if processing_selector_hit:
- status_parts.append(f"by={processing_selector_hit}")
- # 处理态短时间内视为有效信号;超过阈值后不再持续刷新 signal_gap,避免卡死
- if processing_elapsed <= 180:
- last_signal_time = now
- else:
- processing_since = None
- processing_selector_hit = ""
- # 检查是否出现标题输入框(部分页面会在上传阶段就显示,需结合时间/处理态判断)
- title_input_visible = False
- try:
- title_input = self.page.locator('input[placeholder*="标题"], textarea[placeholder*="标题"], [class*="title-input"] input').first
- title_input_visible = await title_input.count() > 0 and await title_input.is_visible()
- except Exception:
- title_input_visible = False
- if title_input_visible and (
- (not is_processing and elapsed >= 45) or
- (processing_since is not None and (now - processing_since) >= 180) or
- elapsed >= 360
- ):
- print(f"[{self.platform_name}] 检测到可编辑标题,继续后续步骤")
- break
- # 检查是否有错误提示
- error_text = ''
- try:
- error_nodes = self.page.locator('[class*="error"], [class*="fail"], div:has-text("上传失败"), div:has-text("处理失败")')
- err_count = await error_nodes.count()
- for idx in range(min(err_count, 6)):
- txt = (await error_nodes.nth(idx).text_content() or '').strip()
- if txt and any(k in txt for k in ['失败', '错误', '异常', '中断']):
- error_text = txt
- break
- except Exception:
- error_text = ''
- if error_text:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"上传失败: {error_text}",
- screenshot_base64=screenshot_base64,
- page_url=await self.get_page_url(),
- status='failed'
- )
- # AI 上传状态判定(节流),用于弥补 DOM/文案信号缺失
- should_run_ai_upload_check = (now - last_ai_upload_check_time) >= ai_upload_check_interval
- if should_run_ai_upload_check:
- ai_upload_poll_count += 1
- ai_upload_state = await self._ai_analyze_upload_state()
- last_ai_upload_check_time = now
- ai_status = str(ai_upload_state.get("status") or "unknown").strip().lower()
- ai_progress = ai_upload_state.get("progress")
- ai_confidence = int(ai_upload_state.get("confidence") or 0)
- ai_reason = str(ai_upload_state.get("reason") or "").strip()
- ai_should_enter_form = bool(ai_upload_state.get("should_enter_publish_form"))
- print(
- f"[{self.platform_name}] AI上传轮询#{ai_upload_poll_count}: elapsed={elapsed}s, "
- f"status={ai_status}, progress={ai_progress}, confidence={ai_confidence}, "
- f"enter_form={ai_should_enter_form}, reason={ai_reason or '-'}"
- )
- if ai_status == "unknown":
- ai_upload_unknown_streak += 1
- else:
- ai_upload_unknown_streak = 0
- if ai_status == "failed":
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"上传失败(AI判定): {ai_reason or '检测到上传失败信号'}",
- screenshot_base64=screenshot_base64,
- page_url=await self.get_page_url(),
- status='failed'
- )
- if ai_status == "completed":
- if ai_should_enter_form:
- await _attempt_enter_publish_form_from_upload("ai-completed")
- print(f"[{self.platform_name}] AI判定上传已完成,进入下一阶段")
- last_signal_time = now
- break
- if ai_status == "uploading":
- has_progress_signal = True
- last_signal_time = now
- if isinstance(ai_progress, (int, float)):
- ai_pct = max(0, min(100, int(ai_progress)))
- status_parts.append(f"ai-progress={ai_pct}%")
- if ai_pct != last_pct and ai_pct > 0:
- self.report_progress(20 + min(35, int(ai_pct * 0.35)), f"视频上传中 {ai_pct}%...")
- last_pct = ai_pct
- if ai_pct >= 99 and ai_confidence >= 60:
- if ai_should_enter_form:
- await _attempt_enter_publish_form_from_upload("ai-upload-99")
- print(f"[{self.platform_name}] AI判定上传接近完成,进入下一阶段")
- break
- else:
- status_parts.append("ai=uploading")
- if ai_should_enter_form and elapsed >= 60:
- await _attempt_enter_publish_form_from_upload("ai-uploading-enter-form")
- elif ai_status == "unknown" and ai_should_enter_form and elapsed >= 60:
- await _attempt_enter_publish_form_from_upload("ai-unknown-enter-form")
- elif ai_status == "unknown" and ai_upload_unknown_streak >= 3 and elapsed >= 90:
- await _attempt_enter_publish_form_from_upload("ai-unknown-streak")
- # 心跳日志,便于定位“卡住”
- if now - last_heartbeat_time >= 15:
- signal_gap = int(now - last_signal_time)
- extra = ", ".join(status_parts) if status_parts else "no-visible-signal"
- print(f"[{self.platform_name}] 上传等待中: elapsed={elapsed}s, signal_gap={signal_gap}s, {extra}")
- last_heartbeat_time = now
- # 已经出现过进度后,如果进度信号中断较久,进入下一步兜底
- dynamic_signal_lost_after = progress_signal_lost_continue_after
- if last_pct >= 95:
- # 95%+ 阶段可能有短暂静默,适度放宽
- dynamic_signal_lost_after = max(progress_signal_lost_continue_after, 150)
- elif last_pct >= 80:
- # 中后段进度(80%+)可能进入转码/校验静默期,但不应无限等待
- dynamic_signal_lost_after = max(progress_signal_lost_continue_after, 150)
- elif last_pct >= 60:
- dynamic_signal_lost_after = max(progress_signal_lost_continue_after, 120)
- if has_progress_signal and (now - last_signal_time) >= dynamic_signal_lost_after:
- signal_gap = int(now - last_signal_time)
- if last_pct >= 95 or title_input_visible or elapsed >= max(780, upload_timeout - 60):
- print(f"[{self.platform_name}] 上传进度信号中断过久({signal_gap}s>={dynamic_signal_lost_after}s),继续后续步骤(兜底)")
- break
- if (last_pct >= 70 and signal_gap >= hard_cutover_signal_gap_after) or elapsed >= hard_cutover_elapsed_after:
- await _attempt_enter_publish_form_from_upload("hard-cutover-signal")
- print(f"[{self.platform_name}] 上传长时间无新信号,执行硬切换到标题阶段: elapsed={elapsed}s, signal_gap={signal_gap}s, last_pct={last_pct}")
- break
- if now - last_stall_log_time >= 30:
- print(f"[{self.platform_name}] 上传信号中断({signal_gap}s)但进度不足/标题未就绪,继续等待上传完成...")
- last_stall_log_time = now
- # 额外硬切策略:出现过中后段进度但长时间无新增信号时,不再继续卡住
- if has_progress_signal and last_pct >= 70 and (now - last_signal_time) >= hard_cutover_signal_gap_after:
- signal_gap = int(now - last_signal_time)
- await _attempt_enter_publish_form_from_upload("hard-cutover-progress")
- print(f"[{self.platform_name}] 中后段上传信号停滞,强制切换到标题阶段: elapsed={elapsed}s, signal_gap={signal_gap}s, last_pct={last_pct}")
- break
- # 从未出现可见进度信号时,不再长时间卡在 20%
- if (not has_progress_signal) and elapsed >= forced_continue_after and (now - last_signal_time) >= 120:
- if title_input_visible or elapsed >= max(600, upload_timeout - 90):
- print(f"[{self.platform_name}] 上传阶段长时间无可见进度信号,继续后续步骤(兜底)")
- break
- if elapsed >= 480:
- await _attempt_enter_publish_form_from_upload("hard-cutover-no-signal")
- print(f"[{self.platform_name}] 上传持续无可见信号,执行硬切换到标题阶段: elapsed={elapsed}s")
- break
- if now - last_stall_log_time >= 30:
- print(f"[{self.platform_name}] 上传暂无可见信号且标题未就绪,继续等待...")
- last_stall_log_time = now
- # 处理态持续过久时兜底继续,避免固定 DOM 文案导致无限等待
- if processing_since is not None and (now - processing_since) >= processing_stale_continue_after:
- if last_pct >= 95 or title_input_visible or elapsed >= max(780, upload_timeout - 60):
- print(f"[{self.platform_name}] 上传阶段处理态持续过久,继续后续步骤(兜底)")
- break
- if elapsed >= hard_cutover_elapsed_after:
- await _attempt_enter_publish_form_from_upload("hard-cutover-processing")
- print(f"[{self.platform_name}] 处理态持续过久且总耗时较长,执行硬切换到标题阶段: elapsed={elapsed}s")
- break
- if now - last_stall_log_time >= 30:
- print(f"[{self.platform_name}] 处理态持续较久但标题未就绪,继续等待上传收尾...")
- last_stall_log_time = now
-
- await asyncio.sleep(3)
-
- self.report_progress(60, "正在填写标题...")
- await asyncio.sleep(2)
- # 填写标题(严格校验写入结果,避免填错输入框)
- desired_title = (params.title or "").strip()[:30] # 百家号标题限制 30 字
- video_stem = os.path.splitext(os.path.basename(params.video_path or ""))[0].strip().lower()
def _normalize_title_for_match(value: str) -> str:
    """Build a comparison key from a title: whitespace removed, lowercased, punctuation stripped."""
    without_spaces = re.sub(r"\s+", "", str(value or "")).strip().lower()
    punctuation = r"[`~!@#$%^&*()_+=\[\]{}\\|;:'\",.<>/?,。!?;:、()【】《》\-\u3000]"
    return re.sub(punctuation, "", without_spaces)
def _looks_like_non_title_value(value: str) -> bool:
    """
    Heuristically decide whether a field value is junk rather than a real title.

    Treated as junk: empty text, a UUID, a long (24+) run of ASCII
    letters/digits/``_``/``-``, the video file's stem, anything containing a
    path separator, and anything ending in a common video extension.
    """
    candidate = str(value or "").strip()
    if not candidate:
        return True
    lowered = candidate.lower()
    # UUID-shaped text is taken to be an internal resource id or file name.
    if re.fullmatch(r"[0-9a-f]{8}-[0-9a-f]{4}-[1-5]?[0-9a-f]{3}-[89ab]?[0-9a-f]{3}-[0-9a-f]{12}", lowered):
        return True
    # Long, purely ASCII id-like text is taken to be a resource id as well.
    if len(lowered) >= 24 and re.fullmatch(r"[a-z0-9_-]+", lowered):
        return True
    # Same text as the uploaded file's stem: the platform's default, not a typed title.
    if video_stem and lowered == video_stem:
        return True
    # File paths and names carrying a video extension are not titles either.
    if "\\" in candidate or "/" in candidate:
        return True
    return re.search(r"\.(mp4|mov|avi|mkv|wmv|flv|m4v)$", lowered) is not None
def _title_matches_expected(current_value: str) -> bool:
    """
    Decide whether the text now in the title field is the title we wanted.

    Args:
        current_value: Text read back from the candidate title field.

    Returns:
        True when the value equals the desired title (literally, or after
        normalisation), or when it passes the junk-value heuristic and
        overlaps the desired title: either one contains the other (desired
        title of at least 4 normalised characters), or it contains the first
        8 normalised characters of the desired title. False otherwise,
        including when no title is desired or the field is empty.
    """
    if not desired_title:
        return False
    current = str(current_value or "").strip()
    if not current:
        return False
    expected_norm = _normalize_title_for_match(desired_title)
    current_norm = _normalize_title_for_match(current)
    # An exact match is accepted before the junk-value heuristic runs.
    # Otherwise a correctly written title that itself contains "/" or "\",
    # ends in ".mp4", is a long ASCII slug, or equals the file stem would
    # always be rejected. The literal comparison also covers titles made
    # only of punctuation, whose normalised form is empty.
    if current == desired_title.strip():
        return True
    if expected_norm and expected_norm == current_norm:
        return True
    if _looks_like_non_title_value(current):
        return False
    if not expected_norm or not current_norm:
        return False
    # Partial overlap is tolerated because the platform may trim or decorate the title.
    if len(expected_norm) >= 4 and (expected_norm in current_norm or current_norm in expected_norm):
        return True
    prefix_len = min(8, len(expected_norm))
    return prefix_len >= 4 and expected_norm[:prefix_len] in current_norm
- title_filled = False
- title_verified_value = ""
- title_failure_reason = ""
- title_selectors = [
- 'input[placeholder*="标题"]',
- 'textarea[placeholder*="标题"]',
- 'input[aria-label*="标题"]',
- 'textarea[aria-label*="标题"]',
- 'input[data-placeholder*="标题"]',
- 'textarea[data-placeholder*="标题"]',
- 'input[name*="title"]',
- 'textarea[name*="title"]',
- 'input[id*="title"]',
- 'textarea[id*="title"]',
- '[class*="title-input"] input',
- '[class*="title"] textarea',
- '[class*="title"] input',
- '[class*="headline"] input',
- '[class*="headline"] textarea',
- '[class*="name"] input',
- '[contenteditable="true"][placeholder*="标题"]',
- '[contenteditable="true"][aria-label*="标题"]',
- '[contenteditable="plaintext-only"][placeholder*="标题"]',
- '[data-placeholder*="标题"][contenteditable="true"]',
- '[class*="title"] [contenteditable="true"]',
- '[role="textbox"][aria-label*="标题"]',
- '[role="textbox"][placeholder*="标题"]',
- ]
async def _has_editable_title_input() -> bool:
    """
    Report whether an editable text field is currently available.

    Pass 1 checks every frame against ``title_selectors`` and accepts the
    first visible, non-disabled match that is not a file/hidden/checkbox/
    radio/button/submit input. Pass 2 runs an in-page scan per frame
    (descending into shadow roots) and accepts any visible, enabled
    input/textarea/contenteditable/textbox element. Note that pass 2 is not
    restricted to title-like fields.

    Returns:
        True when either pass finds a field, otherwise False.
    """
    non_text_types = ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit']
    for frame in self.page.frames:
        for selector in title_selectors:
            try:
                candidates = frame.locator(selector)
                total = await candidates.count()
                # Only the first ten matches per selector are inspected.
                for idx in range(min(total, 10)):
                    field = candidates.nth(idx)
                    if not await field.is_visible():
                        continue
                    field_type = (await field.get_attribute('type') or '').strip().lower()
                    if field_type in non_text_types:
                        continue
                    # A failing disabled-probe is treated as "not disabled".
                    try:
                        disabled = await field.is_disabled()
                    except Exception:
                        disabled = False
                    if disabled:
                        continue
                    return True
            except Exception:
                pass

    # Deep DOM check (including shadow roots).
    deep_scan_script = """
        () => {
            const roots = [document];
            const visited = new Set();
            while (roots.length) {
                const root = roots.pop();
                if (!root || visited.has(root)) continue;
                visited.add(root);
                const nodes = root.querySelectorAll('*');
                for (const n of nodes) {
                    if (n && n.shadowRoot) roots.push(n.shadowRoot);
                    const tag = String(n.tagName || '').toLowerCase();
                    if (!['input', 'textarea'].includes(tag) && String(n.getAttribute && n.getAttribute('contenteditable') || '').toLowerCase() !== 'true' && String(n.getAttribute && n.getAttribute('role') || '').toLowerCase() !== 'textbox') {
                        continue;
                    }
                    const type = String(n.getAttribute && n.getAttribute('type') || '').toLowerCase();
                    if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) continue;
                    if (n.disabled || n.readOnly) continue;
                    const style = window.getComputedStyle(n);
                    if (style.display === 'none' || style.visibility === 'hidden') continue;
                    const rect = n.getBoundingClientRect();
                    if (!rect || rect.width < 8 || rect.height < 8) continue;
                    return true;
                }
            }
            return false;
        }
        """
    for frame in self.page.frames:
        try:
            if await frame.evaluate(deep_scan_script):
                return True
        except Exception:
            pass
    return False
async def _try_enter_publish_form(stage: str) -> bool:
    """
    Click "go to publish / next step" controls until an editable form shows up.

    Every frame is searched with Playwright locators. After each click the
    helper waits briefly and asks ``_has_editable_title_input`` whether a
    field is now available. If no locator click succeeds, an in-page script
    on the main page walks the DOM (including shadow roots) and clicks the
    first visible, clickable element carrying a wanted label.

    Args:
        stage: Free-form tag describing the caller's situation; log-only.

    Returns:
        True only when a click was followed by an editable field being
        detected; False otherwise.
    """
    action_selectors = [
        'button:has-text("去发布")',
        '[role="button"]:has-text("去发布")',
        'button:has-text("发布视频")',
        '[role="button"]:has-text("发布视频")',
        'button:has-text("下一步")',
        '[role="button"]:has-text("下一步")',
        'button:has-text("继续")',
        '[role="button"]:has-text("继续")',
        'button:has-text("完成编辑")',
        '[role="button"]:has-text("完成编辑")',
        '[class*="next"] button',
        '[class*="step"] button',
    ]
    # Labels that must never be clicked here: final publish, cancel/back and
    # destructive actions.
    forbidden_labels = {"发布", "定时发布", "立即发布", "取消", "返回", "关闭"}
    forbidden_fragments = ["定时发布", "立即发布", "取消", "返回", "关闭", "删除", "重传", "重新上传", "清空"]

    for frame in self.page.frames:
        frame_url = frame.url or "about:blank"
        for selector in action_selectors:
            try:
                matches = frame.locator(selector)
                total = await matches.count()
                # Only the first six matches per selector are inspected.
                for idx in range(min(total, 6)):
                    button = matches.nth(idx)
                    if not await button.is_visible():
                        continue
                    label = (await button.text_content() or "").strip()
                    squashed = re.sub(r"\s+", "", label)
                    if squashed in forbidden_labels:
                        continue
                    if any(fragment in squashed for fragment in forbidden_fragments):
                        continue
                    disabled_attr = await button.get_attribute('disabled')
                    aria_disabled = (await button.get_attribute('aria-disabled') or '').lower()
                    if not (disabled_attr is None and aria_disabled != 'true'):
                        continue
                    try:
                        await button.scroll_into_view_if_needed(timeout=1500)
                    except Exception:
                        pass
                    # A normal click first; fall back to a forced click if it fails.
                    try:
                        await button.click(timeout=3000)
                    except Exception:
                        await button.click(force=True, timeout=3000)
                    print(f"[{self.platform_name}] 尝试进入发布表单: stage={stage}, frame={frame_url}, selector={selector}, text={squashed or label}, idx={idx}")
                    await asyncio.sleep(1.2)
                    # Keep trying further buttons unless the form really appeared.
                    if await _has_editable_title_input():
                        print(f"[{self.platform_name}] 已进入可编辑发布表单: stage={stage}")
                        return True
            except Exception:
                pass

    # Deep-DOM fallback (including shadow roots).
    deep_click_script = """
        () => {
            const wanted = ['去发布', '发布视频', '下一步', '继续', '完成编辑'];
            const blockedExact = new Set(['发布', '定时发布', '立即发布', '取消', '返回', '关闭']);
            const blockedContains = ['定时发布', '立即发布', '取消', '返回', '关闭', '删除', '重传', '重新上传', '清空'];
            const roots = [document];
            const visited = new Set();
            const allNodes = [];
            while (roots.length) {
                const root = roots.pop();
                if (!root || visited.has(root)) continue;
                visited.add(root);
                const nodes = root.querySelectorAll('*');
                for (const n of nodes) {
                    allNodes.push(n);
                    if (n && n.shadowRoot) roots.push(n.shadowRoot);
                }
            }
            const isVisible = (el) => {
                try {
                    const style = window.getComputedStyle(el);
                    if (style.display === 'none' || style.visibility === 'hidden' || style.pointerEvents === 'none') return false;
                    const rect = el.getBoundingClientRect();
                    return !!rect && rect.width > 8 && rect.height > 8;
                } catch {
                    return false;
                }
            };
            for (const el of allNodes) {
                const text = String(el.innerText || el.textContent || '').replace(/\\s+/g, '').trim();
                if (!text) continue;
                if (blockedExact.has(text)) continue;
                if (blockedContains.some(x => text.includes(x))) continue;
                if (!wanted.some(x => text.includes(x))) continue;
                if (!isVisible(el)) continue;
                const tag = String(el.tagName || '').toLowerCase();
                const role = String(el.getAttribute && el.getAttribute('role') || '').toLowerCase();
                const cls = String(el.className || '').toLowerCase();
                const clickable = tag === 'button' || tag === 'a' || role === 'button' || /btn|button|next|step/.test(cls);
                if (!clickable) continue;
                try {
                    el.click();
                    return { ok: true, text };
                } catch {}
            }
            return { ok: false, text: '' };
        }
        """
    try:
        outcome = await self.page.evaluate(deep_click_script)
        if outcome and outcome.get("ok"):
            clicked_label = str(outcome.get('text') or '').strip()
            print(f"[{self.platform_name}] 深层DOM进入发布表单成功: stage={stage}, text={clicked_label}")
            await asyncio.sleep(1.2)
            if await _has_editable_title_input():
                print(f"[{self.platform_name}] 已进入可编辑发布表单(深层DOM): stage={stage}")
                return True
    except Exception:
        pass
    return False
- # 先等待可编辑标题框出现,避免上传兜底后立即进入导致误命中 file input
- await _try_enter_publish_form("pre-title")
- title_ready = False
- title_wait_deadline = asyncio.get_event_loop().time() + 180
- last_title_wait_log = 0.0
- last_enter_publish_try = 0.0
- while asyncio.get_event_loop().time() < title_wait_deadline and not title_ready:
- try:
- if await _has_editable_title_input():
- title_ready = True
- break
- except Exception:
- pass
- for frame in self.page.frames:
- if title_ready:
- break
- for selector in title_selectors:
- if title_ready:
- break
- try:
- title_nodes = frame.locator(selector)
- node_count = await title_nodes.count()
- for idx in range(min(node_count, 8)):
- node = title_nodes.nth(idx)
- if not await node.is_visible():
- continue
- node_type = (await node.get_attribute('type') or '').strip().lower()
- if node_type in ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit']:
- continue
- try:
- if await node.is_disabled():
- continue
- except Exception:
- pass
- title_ready = True
- break
- except Exception:
- pass
- if title_ready:
- break
- now_wait = asyncio.get_event_loop().time()
- if now_wait - last_title_wait_log >= 10:
- print(f"[{self.platform_name}] 等待可编辑标题输入框... frames={len(self.page.frames)}")
- last_title_wait_log = now_wait
- if now_wait - last_enter_publish_try >= 15:
- await _try_enter_publish_form("title-wait")
- last_enter_publish_try = now_wait
- await asyncio.sleep(2)
- if not title_ready:
- title_failure_reason = "title-not-ready"
- print(f"[{self.platform_name}] 未检测到明确标题输入框,进入兜底识别模式")
- for frame in self.page.frames:
- if title_filled:
- break
- frame_url = frame.url or "about:blank"
- for selector in title_selectors:
- if title_filled:
- break
- try:
- title_nodes = frame.locator(selector)
- node_count = await title_nodes.count()
- for idx in range(min(node_count, 8)):
- node = title_nodes.nth(idx)
- if not await node.is_visible():
- continue
- node_type = (await node.get_attribute('type') or '').strip().lower()
- if node_type in ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit']:
- continue
- try:
- if await node.is_disabled():
- continue
- except Exception:
- pass
- node_tag = ""
- try:
- node_tag = ((await node.evaluate("el => (el.tagName || '').toLowerCase()")) or "").strip()
- except Exception:
- node_tag = ""
- contenteditable_attr = (await node.get_attribute('contenteditable') or '').strip().lower()
- role_attr = (await node.get_attribute('role') or '').strip().lower()
- is_text_input = node_tag in ['input', 'textarea']
- is_editable_block = contenteditable_attr == 'true' or role_attr == 'textbox'
- try:
- await node.click(timeout=2000)
- except Exception:
- pass
- if is_text_input:
- try:
- await node.fill(desired_title, timeout=5000)
- except Exception:
- try:
- await self.page.keyboard.press("Control+KeyA")
- await self.page.keyboard.press("Backspace")
- await self.page.keyboard.type(desired_title)
- except Exception:
- continue
- elif is_editable_block:
- try:
- await self.page.keyboard.press("Control+KeyA")
- await self.page.keyboard.press("Backspace")
- await self.page.keyboard.type(desired_title)
- except Exception:
- try:
- await node.evaluate(
- """
- (el, title) => {
- el.focus();
- el.textContent = title;
- el.dispatchEvent(new Event('input', { bubbles: true }));
- el.dispatchEvent(new Event('change', { bubbles: true }));
- }
- """,
- desired_title
- )
- except Exception:
- continue
- else:
- continue
- await asyncio.sleep(0.2)
- current_value = ""
- if is_text_input:
- try:
- current_value = (await node.input_value() or "").strip()
- except Exception:
- current_value = ""
- else:
- try:
- current_value = ((await node.evaluate("el => (el.innerText || el.textContent || '')")) or "").strip()
- except Exception:
- current_value = ""
- if _title_matches_expected(current_value):
- title_filled = True
- title_verified_value = current_value
- print(f"[{self.platform_name}] 标题填写成功: frame={frame_url}, selector={selector}, idx={idx}, value={current_value}")
- break
- elif current_value:
- title_failure_reason = "candidate-mismatch"
- # 对同一节点再做一次 JS 强制赋值,处理键盘输入未生效的情况
- forced_value = ""
- try:
- forced_value = (
- (await node.evaluate(
- """
- (el, title) => {
- const tag = String(el.tagName || '').toLowerCase();
- const type = String((el.getAttribute('type') || '')).toLowerCase();
- if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return '';
- const ce = String(el.getAttribute('contenteditable') || '').toLowerCase();
- const role = String(el.getAttribute('role') || '').toLowerCase();
- const isTextInput = tag === 'input' || tag === 'textarea';
- const isEditableBlock = ce === 'true' || role === 'textbox';
- const emit = () => {
- el.dispatchEvent(new Event('input', { bubbles: true }));
- el.dispatchEvent(new Event('change', { bubbles: true }));
- };
- try { el.focus(); } catch {}
- if (isTextInput) {
- try {
- const proto = tag === 'textarea' ? window.HTMLTextAreaElement.prototype : window.HTMLInputElement.prototype;
- const setter = Object.getOwnPropertyDescriptor(proto, 'value')?.set;
- if (setter) {
- setter.call(el, '');
- emit();
- setter.call(el, title);
- emit();
- } else {
- el.value = '';
- emit();
- el.value = title;
- emit();
- }
- } catch {
- el.value = title;
- emit();
- }
- return String(el.value || '').trim();
- }
- if (isEditableBlock) {
- el.textContent = '';
- emit();
- el.textContent = title;
- emit();
- return String(el.innerText || el.textContent || '').trim();
- }
- return '';
- }
- """,
- desired_title
- )) or ""
- ).strip()
- except Exception:
- forced_value = ""
- if _title_matches_expected(forced_value):
- title_filled = True
- title_verified_value = forced_value
- print(f"[{self.platform_name}] 标题强制写入成功: frame={frame_url}, selector={selector}, idx={idx}, value={forced_value}")
- break
- print(f"[{self.platform_name}] 标题候选值不匹配,已忽略: frame={frame_url}, selector={selector}, idx={idx}, value={current_value}")
- except Exception as e:
- print(f"[{self.platform_name}] 标题选择器失败: frame={frame_url}, selector={selector}, err={e}")
- # 深层 DOM 兜底(含 shadowRoot)
- if not title_filled and desired_title:
- for frame in self.page.frames:
- if title_filled:
- break
- frame_url = frame.url or "about:blank"
- try:
- deep_result = await frame.evaluate(
- """
- (title) => {
- const roots = [document];
- const visited = new Set();
- const candidates = [];
- while (roots.length) {
- const root = roots.pop();
- if (!root || visited.has(root)) continue;
- visited.add(root);
- const nodes = root.querySelectorAll('*');
- for (const n of nodes) {
- if (n && n.shadowRoot) roots.push(n.shadowRoot);
- const tag = String(n.tagName || '').toLowerCase();
- const type = String(n.getAttribute && n.getAttribute('type') || '').toLowerCase();
- const ce = String(n.getAttribute && n.getAttribute('contenteditable') || '').toLowerCase();
- const role = String(n.getAttribute && n.getAttribute('role') || '').toLowerCase();
- const isTextInput = tag === 'input' || tag === 'textarea';
- const isEditableBlock = ce === 'true' || role === 'textbox';
- if (!isTextInput && !isEditableBlock) continue;
- if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) continue;
- if (n.disabled || n.readOnly) continue;
- const style = window.getComputedStyle(n);
- if (style.display === 'none' || style.visibility === 'hidden') continue;
- const rect = n.getBoundingClientRect();
- if (!rect || rect.width < 8 || rect.height < 8) continue;
- const ph = String(n.getAttribute && n.getAttribute('placeholder') || '');
- const aria = String(n.getAttribute && n.getAttribute('aria-label') || '');
- const name = String(n.getAttribute && n.getAttribute('name') || '');
- const id = String(n.getAttribute && n.getAttribute('id') || '');
- const cls = String(n.className || '');
- const maxLen = parseInt(String(n.getAttribute && n.getAttribute('maxlength') || '0'), 10) || 0;
- const container = n.closest && n.closest('label, [class*="form"], [class*="item"], [class*="field"], [class*="title"]');
- const ctx = String((container && container.innerText) || '').slice(0, 80);
- let score = 0;
- if (/标题|title/i.test(ph)) score += 7;
- if (/标题|title/i.test(aria)) score += 6;
- if (/标题|title/i.test(name)) score += 5;
- if (/标题|title/i.test(id)) score += 5;
- if (/title|标题/i.test(cls)) score += 4;
- if (/标题|title/i.test(ctx)) score += 5;
- if (maxLen > 0 && maxLen <= 40) score += 3;
- if (isTextInput) score += 2;
- if (isEditableBlock) score += 1;
- candidates.push({ n, score, isTextInput, isEditableBlock });
- }
- }
- candidates.sort((a, b) => b.score - a.score);
- if (!candidates.length) return { ok: false, value: '', reason: 'no-candidate' };
- const emit = (el) => {
- el.dispatchEvent(new Event('input', { bubbles: true }));
- el.dispatchEvent(new Event('change', { bubbles: true }));
- };
- let lastError = '';
- for (const item of candidates.slice(0, 12)) {
- const el = item.n;
- try {
- el.focus();
- if (item.isTextInput) {
- const tag = String(el.tagName || '').toLowerCase();
- const proto = tag === 'textarea' ? window.HTMLTextAreaElement.prototype : window.HTMLInputElement.prototype;
- const setter = Object.getOwnPropertyDescriptor(proto, 'value')?.set;
- if (setter) {
- setter.call(el, '');
- emit(el);
- setter.call(el, title);
- emit(el);
- } else {
- el.value = '';
- emit(el);
- el.value = title;
- emit(el);
- }
- const v = String(el.value || '').trim();
- if (v) return { ok: true, value: v, score: item.score };
- } else if (item.isEditableBlock) {
- el.textContent = '';
- emit(el);
- el.textContent = title;
- emit(el);
- const v = String(el.innerText || el.textContent || '').trim();
- if (v) return { ok: true, value: v, score: item.score };
- }
- } catch (e) {
- lastError = String(e || '');
- }
- }
- return { ok: false, value: '', reason: lastError || 'set-value-failed' };
- }
- """,
- desired_title
- )
- if deep_result and deep_result.get('ok'):
- deep_written = str(deep_result.get('value') or '').strip()
- if _title_matches_expected(deep_written):
- title_filled = True
- title_verified_value = deep_written
- print(f"[{self.platform_name}] 标题深层DOM填写成功: frame={frame_url}, value={deep_written}")
- break
- elif deep_written:
- title_failure_reason = "deep-dom-mismatch"
- print(f"[{self.platform_name}] 标题深层DOM命中但值不匹配: frame={frame_url}, value={deep_written}")
- except Exception:
- pass
- # JS 兜底写入标题
- if not title_filled and desired_title:
- fallback_reason = ""
- for frame in self.page.frames:
- if title_filled:
- break
- frame_url = frame.url or "about:blank"
- try:
- fallback = await frame.evaluate(
- """
- (title) => {
- const nodes = Array.from(document.querySelectorAll(
- 'input:not([type="file"]):not([type="hidden"]), textarea, [contenteditable="true"], [role="textbox"]'
- ));
- const scored = nodes
- .map((el) => {
- const tag = String(el.tagName || '').toLowerCase();
- const type = String((el.getAttribute('type') || '')).toLowerCase();
- if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return null;
- if (el.disabled || el.readOnly) return null;
- const style = window.getComputedStyle(el);
- if (style.display === 'none' || style.visibility === 'hidden') return null;
- const rect = el.getBoundingClientRect();
- if (!rect || rect.width < 8 || rect.height < 8) return null;
- const ph = String(el.getAttribute('placeholder') || '');
- const aria = String(el.getAttribute('aria-label') || '');
- const name = String(el.getAttribute('name') || '');
- const id = String(el.getAttribute('id') || '');
- const cls = String(el.className || '');
- const ce = String(el.getAttribute('contenteditable') || '').toLowerCase();
- const role = String(el.getAttribute('role') || '').toLowerCase();
- const maxLen = parseInt(String(el.getAttribute('maxlength') || '0'), 10) || 0;
- const container = el.closest('label, [class*="form"], [class*="item"], [class*="field"], [class*="title"]');
- const ctx = String((container && container.innerText) || '').slice(0, 80);
- let score = 0;
- if (ph.includes('标题')) score += 6;
- if (aria.includes('标题')) score += 5;
- if (/title|标题/i.test(name)) score += 4;
- if (/title|标题/i.test(id)) score += 4;
- if (/title|标题/i.test(cls)) score += 3;
- if (/标题|title/i.test(ctx)) score += 4;
- if (maxLen > 0 && maxLen <= 40) score += 3;
- if (tag === 'input' || tag === 'textarea') score += 1;
- if (ce === 'true' || role === 'textbox') score += 2;
- return { el, score, maxLen };
- })
- .filter(x => x && x.score > 0)
- .sort((a, b) => b.score - a.score);
- // 没有明显标题线索时,回退到短输入框(常见标题长度限制)
- const candidates = scored.length
- ? scored
- : nodes
- .map((el) => {
- const tag = String(el.tagName || '').toLowerCase();
- const type = String((el.getAttribute('type') || '')).toLowerCase();
- if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return null;
- if (el.disabled || el.readOnly) return null;
- const style = window.getComputedStyle(el);
- if (style.display === 'none' || style.visibility === 'hidden') return null;
- const rect = el.getBoundingClientRect();
- if (!rect || rect.width < 8 || rect.height < 8) return null;
- const maxLen = parseInt(String(el.getAttribute('maxlength') || '0'), 10) || 0;
- const score = (maxLen > 0 && maxLen <= 40 ? 3 : 0) + (tag === 'input' || tag === 'textarea' ? 1 : 0);
- return score > 0 ? { el, score, maxLen } : null;
- })
- .filter(Boolean)
- .sort((a, b) => b.score - a.score);
- if (!candidates.length) return { ok: false, value: '', reason: 'no-scored-input' };
- let lastError = '';
- for (const item of candidates.slice(0, 10)) {
- const target = item.el;
- const tag = String(target.tagName || '').toLowerCase();
- const ce = String(target.getAttribute('contenteditable') || '').toLowerCase();
- const role = String(target.getAttribute('role') || '').toLowerCase();
- const isTextInput = tag === 'input' || tag === 'textarea';
- const isEditableBlock = ce === 'true' || role === 'textbox';
- try {
- target.focus();
- if (isTextInput) {
- target.value = '';
- target.dispatchEvent(new Event('input', { bubbles: true }));
- target.value = title;
- target.dispatchEvent(new Event('input', { bubbles: true }));
- target.dispatchEvent(new Event('change', { bubbles: true }));
- const v = String(target.value || '').trim();
- if (v) return { ok: true, value: v, score: item.score || 0 };
- } else if (isEditableBlock) {
- target.textContent = '';
- target.dispatchEvent(new Event('input', { bubbles: true }));
- target.textContent = title;
- target.dispatchEvent(new Event('input', { bubbles: true }));
- target.dispatchEvent(new Event('change', { bubbles: true }));
- const v = String(target.innerText || target.textContent || '').trim();
- if (v) return { ok: true, value: v, score: item.score || 0 };
- }
- } catch (e) {
- lastError = String(e || '');
- }
- }
- return { ok: false, value: '', reason: lastError || 'set-value-failed' };
- }
- """,
- desired_title
- )
- if fallback and fallback.get('ok'):
- written = str(fallback.get('value') or '').strip()
- if _title_matches_expected(written):
- title_filled = True
- title_verified_value = written
- print(f"[{self.platform_name}] 标题 JS 兜底填写成功: frame={frame_url}, value={written}")
- break
- elif written:
- fallback_reason = f"fallback-value-not-match:{written}"
- title_failure_reason = fallback_reason
- print(f"[{self.platform_name}] 标题 JS 兜底命中疑似错误字段,已忽略: frame={frame_url}, value={written}")
- elif fallback:
- fallback_reason = str(fallback.get('reason') or '')
- if fallback_reason:
- title_failure_reason = fallback_reason
- except Exception as e:
- fallback_reason = str(e)
- if fallback_reason:
- title_failure_reason = fallback_reason
- if not title_filled:
- print(f"[{self.platform_name}] 标题 JS 兜底未命中: reason={fallback_reason or 'unknown'}")
- # 强化重试:标题框可能在上传收尾阶段延迟可编辑,循环尝试写入一段时间
- if not title_filled and desired_title:
- print(f"[{self.platform_name}] 标题常规填写未命中,进入强化重试...")
- # 百家号在上传 80%+ 后可能经历较长静默处理期,给更长窗口等待标题输入框真正可编辑
- strong_retry_deadline = asyncio.get_event_loop().time() + 240
- strong_retry_round = 0
- last_retry_log = 0.0
- while asyncio.get_event_loop().time() < strong_retry_deadline and not title_filled:
- strong_retry_round += 1
- retry_reason = ""
- if strong_retry_round == 1 or strong_retry_round % 5 == 0:
- await _try_enter_publish_form(f"title-retry-{strong_retry_round}")
- for frame in self.page.frames:
- if title_filled:
- break
- frame_url = frame.url or "about:blank"
- try:
- retry_result = await frame.evaluate(
- """
- (title) => {
- const nodes = Array.from(document.querySelectorAll(
- 'input:not([type="file"]):not([type="hidden"]), textarea, [contenteditable="true"], [role="textbox"]'
- ));
- const candidates = nodes
- .map((el) => {
- const tag = String(el.tagName || '').toLowerCase();
- const type = String((el.getAttribute('type') || '')).toLowerCase();
- if (tag === 'input' && ['file', 'hidden', 'checkbox', 'radio', 'button', 'submit'].includes(type)) return null;
- if (el.disabled || el.readOnly) return null;
- const style = window.getComputedStyle(el);
- if (style.display === 'none' || style.visibility === 'hidden') return null;
- const rect = el.getBoundingClientRect();
- if (!rect || rect.width < 8 || rect.height < 8) return null;
- const ph = String(el.getAttribute('placeholder') || '');
- const aria = String(el.getAttribute('aria-label') || '');
- const name = String(el.getAttribute('name') || '');
- const id = String(el.getAttribute('id') || '');
- const cls = String(el.className || '');
- const ce = String(el.getAttribute('contenteditable') || '').toLowerCase();
- const role = String(el.getAttribute('role') || '').toLowerCase();
- const maxLen = parseInt(String(el.getAttribute('maxlength') || '0'), 10) || 0;
- const container = el.closest('label, [class*="form"], [class*="item"], [class*="field"], [class*="title"]');
- const ctx = String((container && container.innerText) || '').slice(0, 80);
- let score = 0;
- if (/标题|title/i.test(ph)) score += 7;
- if (/标题|title/i.test(aria)) score += 6;
- if (/标题|title/i.test(name)) score += 5;
- if (/标题|title/i.test(id)) score += 5;
- if (/title|标题/i.test(cls)) score += 4;
- if (/标题|title/i.test(ctx)) score += 5;
- if (maxLen > 0 && maxLen <= 40) score += 3;
- if (tag === 'input' || tag === 'textarea') score += 2;
- if (ce === 'true' || role === 'textbox') score += 1;
- return { el, score };
- })
- .filter(Boolean)
- .sort((a, b) => b.score - a.score);
- if (!candidates.length) {
- return { ok: false, value: '', score: -1, reason: 'no-candidate' };
- }
- let lastError = '';
- for (const item of candidates.slice(0, 12)) {
- const target = item.el;
- const tag = String(target.tagName || '').toLowerCase();
- const ce = String(target.getAttribute('contenteditable') || '').toLowerCase();
- const role = String(target.getAttribute('role') || '').toLowerCase();
- const isTextInput = tag === 'input' || tag === 'textarea';
- const isEditableBlock = ce === 'true' || role === 'textbox';
- const emit = () => {
- target.dispatchEvent(new Event('input', { bubbles: true }));
- target.dispatchEvent(new Event('change', { bubbles: true }));
- };
- try {
- target.focus();
- if (isTextInput) {
- try {
- const proto = tag === 'textarea' ? window.HTMLTextAreaElement.prototype : window.HTMLInputElement.prototype;
- const setter = Object.getOwnPropertyDescriptor(proto, 'value')?.set;
- if (setter) {
- setter.call(target, '');
- emit();
- setter.call(target, title);
- emit();
- } else {
- target.value = '';
- emit();
- target.value = title;
- emit();
- }
- } catch {
- target.value = title;
- emit();
- }
- const v = String(target.value || '').trim();
- if (v) return { ok: true, value: v, score: item.score || 0, reason: '' };
- } else if (isEditableBlock) {
- target.textContent = '';
- emit();
- target.textContent = title;
- emit();
- const v = String(target.innerText || target.textContent || '').trim();
- if (v) return { ok: true, value: v, score: item.score || 0, reason: '' };
- }
- } catch (e) {
- lastError = String(e || '');
- }
- }
- return { ok: false, value: '', score: -1, reason: lastError || 'set-value-failed' };
- }
- """,
- desired_title
- )
- if retry_result and retry_result.get('ok'):
- written = str(retry_result.get('value') or '').strip()
- score = int(retry_result.get('score') or 0)
- # 强化重试仍要求“像标题”且可匹配,避免误写到其他文本框
- if score >= 3 and _title_matches_expected(written):
- title_filled = True
- title_verified_value = written
- print(f"[{self.platform_name}] 标题强化重试成功: round={strong_retry_round}, frame={frame_url}, score={score}, value={written}")
- break
- elif written:
- retry_reason = f"value-not-match:{written},score={score}"
- elif retry_result:
- retry_reason = str(retry_result.get('reason') or '')
- except Exception as e:
- retry_reason = str(e)
- if title_filled:
- break
- now_retry = asyncio.get_event_loop().time()
- if retry_reason in ("no-candidate", "no-scored-input"):
- has_title_input = await _has_editable_title_input()
- if not has_title_input:
- retry_reason = "no-candidate-and-form-not-ready"
- if now_retry - last_retry_log >= 10:
- print(f"[{self.platform_name}] 标题强化重试中: round={strong_retry_round}, reason={retry_reason or 'pending'}")
- last_retry_log = now_retry
- if retry_reason:
- title_failure_reason = retry_reason
- await asyncio.sleep(3)
- # AI 兜底:页面结构变化时,通过视觉识别返回可用 selector
- if not title_filled and desired_title:
- print(f"[{self.platform_name}] 标题强化重试仍未命中,尝试 AI selector 兜底...")
- try:
- ai_goal = "找到页面中用于填写视频标题的输入框或可编辑区域,返回一个可直接输入标题的 Playwright selector"
- ai_selector = await self.ai_suggest_playwright_selector(ai_goal)
- if ai_selector.get("has_selector"):
- selector = str(ai_selector.get("selector") or "").strip()
- confidence = int(ai_selector.get("confidence") or 0)
- print(f"[{self.platform_name}] AI 标题 selector: {selector}, confidence={confidence}")
- for frame in self.page.frames:
- if title_filled:
- break
- frame_url = frame.url or "about:blank"
- try:
- loc = frame.locator(selector).first
- if await loc.count() <= 0 or not await loc.is_visible():
- continue
- try:
- await loc.click(timeout=2500)
- except Exception:
- pass
- node_tag = ""
- try:
- node_tag = ((await loc.evaluate("el => (el.tagName || '').toLowerCase()")) or "").strip()
- except Exception:
- node_tag = ""
- is_text_input = node_tag in ["input", "textarea"]
- if is_text_input:
- try:
- await loc.fill(desired_title, timeout=5000)
- except Exception:
- await self.page.keyboard.press("Control+KeyA")
- await self.page.keyboard.press("Backspace")
- await self.page.keyboard.type(desired_title)
- else:
- try:
- await self.page.keyboard.press("Control+KeyA")
- await self.page.keyboard.press("Backspace")
- await self.page.keyboard.type(desired_title)
- except Exception:
- await loc.evaluate(
- """
- (el, title) => {
- el.focus();
- const tag = String(el.tagName || '').toLowerCase();
- if (tag === 'input' || tag === 'textarea') {
- el.value = title;
- } else {
- el.textContent = title;
- }
- el.dispatchEvent(new Event('input', { bubbles: true }));
- el.dispatchEvent(new Event('change', { bubbles: true }));
- }
- """,
- desired_title
- )
- await asyncio.sleep(0.3)
- current_value = ""
- try:
- if is_text_input:
- current_value = (await loc.input_value() or "").strip()
- else:
- current_value = ((await loc.evaluate("el => (el.innerText || el.textContent || '')")) or "").strip()
- except Exception:
- current_value = ""
- if _title_matches_expected(current_value):
- title_filled = True
- title_verified_value = current_value
- print(f"[{self.platform_name}] AI selector 标题填写成功: frame={frame_url}, value={current_value}")
- break
- else:
- print(f"[{self.platform_name}] AI selector 命中但值不匹配: frame={frame_url}, value={current_value}")
- except Exception as e:
- print(f"[{self.platform_name}] AI selector 执行失败: frame={frame_url}, err={e}")
- else:
- print(f"[{self.platform_name}] AI 未返回可用标题 selector: {ai_selector.get('notes') or 'no-notes'}")
- title_failure_reason = "ai-no-selector"
- except Exception as e:
- print(f"[{self.platform_name}] AI 标题兜底异常: {e}")
- title_failure_reason = f"ai-exception:{e}"
- if not title_filled:
- # 某些版本页面在上传后长期不暴露可编辑标题框;不中断流程,尝试继续发布。
- if any(k in (title_failure_reason or "") for k in ["no-candidate", "form-not-ready", "title-not-ready", "ai-no-selector"]):
- print(f"[{self.platform_name}] 标题输入框未就绪({title_failure_reason}),继续后续发布流程(使用页面现有标题)")
- else:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"标题填写失败,已终止发布: {title_failure_reason or 'unknown'}",
- screenshot_base64=screenshot_base64,
- page_url=await self.get_page_url(),
- status='failed'
- )
-
- # 填写描述
- if params.description:
- self.report_progress(65, "正在填写描述...")
- try:
- desc_selectors = [
- 'textarea[placeholder*="描述"]',
- 'textarea[placeholder*="简介"]',
- '[class*="desc"] textarea',
- '[class*="description"] textarea',
- ]
- for selector in desc_selectors:
- try:
- desc_input = self.page.locator(selector).first
- if await desc_input.count() > 0 and await desc_input.is_visible():
- await desc_input.click()
- await self.page.keyboard.type(params.description[:200])
- print(f"[{self.platform_name}] 描述填写成功")
- break
- except:
- pass
- except Exception as e:
- print(f"[{self.platform_name}] 描述填写失败: {e}")
-
- self.report_progress(70, "正在发布...")
- await asyncio.sleep(1.5)
- # 点击发布按钮(等待按钮可点击,避免上传完成后直接误判失败)
- publish_selectors = [
- 'button:has-text("立即发布")',
- '[role="button"]:has-text("立即发布")',
- 'button:has-text("确认发布")',
- '[role="button"]:has-text("确认发布")',
- 'button:has-text("发布")',
- '[role="button"]:has-text("发布")',
- 'button:has-text("发表")',
- 'button:has-text("提交")',
- '[class*="publish"] button',
- '[class*="submit"] button',
- ]
- publish_blocked_keywords = [
- "定时发布",
- "预约发布",
- "存草稿",
- "草稿",
- "取消",
- "返回",
- "预览",
- ]
- publish_processing_indicators = [
- 'div:has-text("发布中")',
- 'div:has-text("提交中")',
- 'span:has-text("发布中")',
- 'span:has-text("提交中")',
- 'div:has-text("正在上传")',
- 'div:has-text("正在处理")',
- 'span:has-text("正在上传")',
- 'span:has-text("正在处理")',
- 'div:has-text("请稍候")',
- 'span:has-text("请稍候")',
- 'div:has-text("审核中")',
- 'span:has-text("审核中")',
- ]
- def _compact_btn_text(text: str) -> str:
- return re.sub(r"\s+", "", str(text or "")).strip()
- def _score_publish_button(btn_text_compact: str, prefer_confirm: bool = False) -> int:
- if not btn_text_compact:
- return -1
- if any(k in btn_text_compact for k in publish_blocked_keywords):
- return -1
- if "发布中" in btn_text_compact or "提交中" in btn_text_compact:
- return -1
- score = -1
- if "立即发布" in btn_text_compact:
- score = 130
- elif btn_text_compact == "确认发布":
- score = 125
- elif "确认发布" in btn_text_compact:
- score = 120
- elif btn_text_compact == "发布":
- score = 115
- elif "发布" in btn_text_compact:
- score = 100
- elif "发表" in btn_text_compact:
- score = 80
- elif "提交" in btn_text_compact:
- score = 70
- if score < 0:
- return -1
- if prefer_confirm and ("确认发布" in btn_text_compact or "立即发布" in btn_text_compact):
- score += 20
- return score
async def _collect_publish_candidates(prefer_confirm: bool = False):
    """Scan every frame of the page for clickable publish buttons and rank them.

    For each frame and each selector in ``publish_selectors`` at most the
    first 6 matches are inspected. Invisible buttons are ignored, disabled
    buttons are only counted, and the remaining ones are scored with
    ``_score_publish_button``; buttons with a negative score are dropped.

    Args:
        prefer_confirm: Forwarded to ``_score_publish_button``.

    Returns:
        A tuple ``(candidates, found_visible_button, found_disabled_button)``.
        ``candidates`` is a list of dicts with the keys ``btn``,
        ``frame_url``, ``selector``, ``idx``, ``text`` and ``score``, sorted
        by descending score. The two booleans report whether any visible
        button and any disabled button were seen during the scan.
    """
    candidates = []
    found_visible_button = False
    found_disabled_button = False
    for frame in self.page.frames:
        frame_url = frame.url or "about:blank"
        for selector in publish_selectors:
            try:
                btns = frame.locator(selector)
                btn_count = await btns.count()
                for idx in range(min(btn_count, 6)):
                    btn = btns.nth(idx)
                    if not await btn.is_visible():
                        continue
                    found_visible_button = True
                    btn_text = (await btn.text_content() or "").strip()
                    btn_text_compact = _compact_btn_text(btn_text)
                    disabled_attr = await btn.get_attribute('disabled')
                    aria_disabled = (await btn.get_attribute('aria-disabled') or '').lower()
                    cls = (await btn.get_attribute('class') or '').lower()
                    # `disabled` is an HTML boolean attribute: its mere
                    # presence disables the control, and a bare
                    # `<button disabled>` reports the value "". Test for
                    # presence, not truthiness, so such buttons are skipped.
                    is_disabled = (
                        disabled_attr is not None
                        or aria_disabled == 'true'
                        or 'disabled' in cls
                    )
                    if is_disabled:
                        found_disabled_button = True
                        continue
                    score = _score_publish_button(btn_text_compact, prefer_confirm=prefer_confirm)
                    if score < 0:
                        continue
                    candidates.append({
                        "btn": btn,
                        "frame_url": frame_url,
                        "selector": selector,
                        "idx": idx,
                        "text": btn_text,
                        "score": score,
                    })
            except Exception:
                # Best-effort scan: a failing selector or a frame that went
                # away mid-scan must not abort the search in other frames.
                pass
    candidates.sort(key=lambda x: x.get("score", 0), reverse=True)
    return candidates, found_visible_button, found_disabled_button
- async def _click_publish_candidate(candidate: dict):
- btn = candidate.get("btn")
- if not btn:
- return False, "candidate-empty"
- frame_url = str(candidate.get("frame_url") or "about:blank")
- selector = str(candidate.get("selector") or "")
- idx = int(candidate.get("idx") or 0)
- btn_text = str(candidate.get("text") or "").strip()
- before_url = self.page.url
- try:
- try:
- await btn.scroll_into_view_if_needed(timeout=1500)
- except Exception:
- pass
- try:
- await btn.click(timeout=4000)
- except Exception:
- await btn.click(force=True, timeout=4000)
- await asyncio.sleep(0.6)
- after_url = self.page.url
- state_flags = []
- if after_url != before_url:
- state_flags.append("url-changed")
- try:
- post_text = _compact_btn_text(await btn.text_content() or "")
- if any(k in post_text for k in ["发布中", "提交中", "处理中"]):
- state_flags.append("btn-processing")
- except Exception:
- pass
- try:
- for indicator in publish_processing_indicators:
- loc = self.page.locator(indicator).first
- if await loc.count() > 0 and await loc.is_visible():
- state_flags.append("processing-indicator")
- break
- except Exception:
- pass
- state_desc = ",".join(state_flags) if state_flags else "no-immediate-signal"
- print(f"[{self.platform_name}] 点击发布按钮成功: frame={frame_url}, selector={selector}, idx={idx}, text={btn_text}, state={state_desc}")
- return True, ""
- except Exception as e:
- return False, str(e)
- publish_clicked = False
- publish_click_error = ""
- publish_clicked_text = ""
- click_deadline = asyncio.get_event_loop().time() + 180
- last_publish_log = 0.0
- while asyncio.get_event_loop().time() < click_deadline and not publish_clicked:
- candidates, found_visible_button, found_disabled_button = await _collect_publish_candidates(prefer_confirm=False)
- if candidates:
- for candidate in candidates[:6]:
- ok, err = await _click_publish_candidate(candidate)
- if ok:
- publish_clicked = True
- publish_clicked_text = str(candidate.get("text") or "").strip()
- break
- if err:
- publish_click_error = err
- if publish_clicked:
- break
- now_click = asyncio.get_event_loop().time()
- if now_click - last_publish_log >= 10:
- if found_visible_button and found_disabled_button:
- print(f"[{self.platform_name}] 发布按钮可见但不可点击,等待可用...")
- elif found_visible_button:
- print(f"[{self.platform_name}] 发布按钮可见,但点击失败,继续重试...")
- else:
- print(f"[{self.platform_name}] 尚未找到可见发布按钮,继续等待...")
- last_publish_log = now_click
- await asyncio.sleep(2)
- # 某些页面会二次弹出“确认发布/立即发布”,补一次优先确认点击
- if publish_clicked:
- initial_text = _compact_btn_text(publish_clicked_text)
- if initial_text and initial_text != "立即发布":
- await asyncio.sleep(1)
- confirm_candidates, _, _ = await _collect_publish_candidates(prefer_confirm=True)
- for candidate in confirm_candidates[:4]:
- candidate_text = _compact_btn_text(str(candidate.get("text") or ""))
- if candidate_text == initial_text and ("确认发布" not in candidate_text and "立即发布" not in candidate_text):
- continue
- ok, err = await _click_publish_candidate(candidate)
- if ok:
- print(f"[{self.platform_name}] 检测到二次确认发布流程,已补点确认按钮: {candidate_text}")
- break
- if err:
- publish_click_error = err
- if not publish_clicked:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"发布按钮未找到或不可点击(可能仍在处理/必填项未通过)。title={title_verified_value or desired_title}; err={publish_click_error or 'none'}",
- screenshot_base64=screenshot_base64,
- page_url=await self.get_page_url(),
- status='failed'
- )
-
- self.report_progress(80, "等待发布完成...")
-
- # 记录点击发布前的 URL
- publish_page_url = self.page.url
- print(f"[{self.platform_name}] 发布前 URL: {publish_page_url}")
-
- # 等待发布完成(百家号审核/处理链路可能较慢,默认等待 15 分钟)
- publish_timeout = 900
- start_time = asyncio.get_event_loop().time()
- last_url = publish_page_url
- republish_click_count = 0
- republish_attempt_count = 0
- last_republish_attempt_time = 0.0
- republish_attempt_interval = 45 # 失败后至少间隔 45s 再尝试,避免刷屏和误操作
- max_republish_attempts = 2
-
- while asyncio.get_event_loop().time() - start_time < publish_timeout:
- await asyncio.sleep(3)
- current_url = self.page.url
-
- # 检测 URL 是否发生变化
- if current_url != last_url:
- print(f"[{self.platform_name}] URL 变化: {last_url} -> {current_url}")
- last_url = current_url
-
- # 检查是否跳转到内容管理页面(真正的成功标志)
- # 百家号发布成功后会跳转到 /builder/rc/content 页面
- if '/builder/rc/content' in current_url and 'edit' not in current_url:
- self.report_progress(100, "发布成功!")
- print(f"[{self.platform_name}] 发布成功,已跳转到内容管理页: {current_url}")
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功",
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='success'
- )
-
- # 检查是否有明确的成功提示弹窗
- try:
- # 百家号发布成功会显示"发布成功"弹窗
- success_modal = self.page.locator('div:has-text("发布成功"), div:has-text("提交成功"), div:has-text("视频发布成功")').first
- if await success_modal.count() > 0 and await success_modal.is_visible():
- self.report_progress(100, "发布成功!")
- print(f"[{self.platform_name}] 检测到发布成功弹窗")
- screenshot_base64 = await self.capture_screenshot()
-
- # 等待一下看是否会跳转
- await asyncio.sleep(3)
-
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功",
- screenshot_base64=screenshot_base64,
- page_url=self.page.url,
- status='success'
- )
- except Exception as e:
- print(f"[{self.platform_name}] 检测成功提示异常: {e}")
-
- # 检查是否有错误提示
- try:
- error_selectors = [
- 'div.error-tip',
- 'div[class*="error-msg"]',
- 'span[class*="error"]',
- 'div:has-text("发布失败")',
- 'div:has-text("提交失败")',
- ]
- for error_selector in error_selectors:
- error_el = self.page.locator(error_selector).first
- if await error_el.count() > 0 and await error_el.is_visible():
- error_text = await error_el.text_content()
- if error_text and error_text.strip():
- print(f"[{self.platform_name}] 检测到错误: {error_text}")
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"发布失败: {error_text.strip()}",
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='failed'
- )
- except Exception as e:
- print(f"[{self.platform_name}] 检测错误提示异常: {e}")
-
- # 检查验证码
- captcha_result = await self.check_captcha()
- if captcha_result['need_captcha']:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"发布过程中需要{captcha_result['captcha_type']}验证码",
- need_captcha=True,
- captcha_type=captcha_result['captcha_type'],
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- # 检查发布按钮状态(如果还在编辑页面)
- if 'edit' in current_url:
- try:
- is_processing = False
- for indicator in publish_processing_indicators:
- loc = self.page.locator(indicator).first
- if await loc.count() > 0 and await loc.is_visible():
- is_processing = True
- print(f"[{self.platform_name}] 正在处理中...")
- break
-
- if not is_processing:
- # 如果不是在处理中,按节流策略尝试重新点击发布按钮
- now_loop = asyncio.get_event_loop().time()
- elapsed = now_loop - start_time
- if (
- elapsed > 60
- and republish_attempt_count < max_republish_attempts
- and (now_loop - last_republish_attempt_time) >= republish_attempt_interval
- ):
- last_republish_attempt_time = now_loop
- republish_attempt_count += 1
- print(f"[{self.platform_name}] 发布状态未变化,执行第 {republish_attempt_count}/{max_republish_attempts} 次补点发布...")
- republish_done = False
- republish_candidates, _, _ = await _collect_publish_candidates(prefer_confirm=True)
- for candidate in republish_candidates[:6]:
- ok, err = await _click_publish_candidate(candidate)
- if ok:
- republish_done = True
- republish_click_count += 1
- candidate_text = _compact_btn_text(str(candidate.get("text") or ""))
- print(f"[{self.platform_name}] 重新点击发布按钮成功: text={candidate_text}, count={republish_click_count}")
- break
- if err:
- publish_click_error = err
- if not republish_done:
- print(f"[{self.platform_name}] 本轮未找到可用的立即发布按钮,继续等待状态变化")
- except Exception as e:
- print(f"[{self.platform_name}] 检查处理状态异常: {e}")
-
- # 超时,获取截图分析最终状态
- print(f"[{self.platform_name}] 发布超时,最终 URL: {self.page.url}")
- screenshot_base64 = await self.capture_screenshot()
-
- # 最后一次检查是否在内容管理页
- final_url = self.page.url
- if '/builder/rc/content' in final_url and 'edit' not in final_url:
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功(延迟确认)",
- screenshot_base64=screenshot_base64,
- page_url=final_url,
- status='success'
- )
- # 超时后兜底:跳转内容管理页按标题校验,避免“已发布但未跳转”误判失败
- print(f"[{self.platform_name}] 超时后执行内容页二次校验,title={params.title}")
- verify_deadline = asyncio.get_event_loop().time() + 120 # 最多再校验 2 分钟
- while asyncio.get_event_loop().time() < verify_deadline:
- if await self._verify_publish_from_content_page(params.title, page_size=20):
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功(内容页校验)",
- screenshot_base64=screenshot_base64,
- page_url=self.page.url,
- status='success'
- )
- await asyncio.sleep(8)
-
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="发布超时,请手动检查发布状态",
- screenshot_base64=screenshot_base64,
- page_url=final_url,
- status='need_action'
- )
-
async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
    """
    Fetch one page of this account's Baijiahao works.

    The content-management listing endpoint (pcui/article/lists) is called
    from inside a Playwright page. The endpoint usually expects a JWT in a
    custom ``token`` request header and may answer "not logged in" when only
    cookies are sent, so the token is looked up in localStorage,
    sessionStorage, a meta tag, a few well-known global state objects and,
    as a last resort, the raw page HTML. The request is then issued with
    ``fetch`` in the page context so cookies and token travel together.

    Args:
        cookies: Cookie string, parsed by ``self.parse_cookies``.
        page: Zero-based page index; the API's ``currentPage`` is ``page + 1``.
        page_size: Number of works requested per page.

    Returns:
        ``WorksResult`` with ``success=True`` and the parsed works, or
        ``success=False`` with ``error`` set to the failure message. Any
        ``Exception`` raised along the way is caught, its traceback printed,
        and reported through ``error``.
    """
    import re

    def _to_int(value, default: int = 0) -> int:
        """Convert a loosely typed API value to int; fall back to *default*."""
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            # One malformed counter must not abort the whole page of works.
            return default

    def _pick_cover(item: dict) -> str:
        """Return the first usable cover URL of a work, or an empty string."""
        cover = item.get("crosswise_cover") or item.get("vertical_cover") or ""
        if cover:
            return cover
        raw = item.get("cover_images") or ""
        try:
            # cover_images may be a JSON-encoded string or an already parsed list.
            parsed = json.loads(raw) if isinstance(raw, str) else raw
            if isinstance(parsed, list) and parsed:
                first = parsed[0]
                if isinstance(first, dict):
                    return first.get("src") or first.get("ori_src") or ""
                if isinstance(first, str):
                    return first
        except Exception:
            # Best effort: an unparsable cover field simply yields no cover.
            pass
        return ""

    def _pick_duration(item: dict) -> int:
        """Return the first positive duration found on a work, or 0."""
        for key in ("rmb_duration", "duration", "long"):
            seconds = _to_int(item.get(key))
            if seconds > 0:
                return seconds
        # displaytype_exinfo may carry ugcvideo.video_info.durationInSecond.
        ex = item.get("displaytype_exinfo") or ""
        try:
            exj = json.loads(ex) if isinstance(ex, str) and ex else (ex if isinstance(ex, dict) else {})
            ugc = (exj.get("ugcvideo") or {}) if isinstance(exj, dict) else {}
            vi = ugc.get("video_info") or {}
            seconds = int(vi.get("durationInSecond") or ugc.get("long") or 0)
            return seconds if seconds > 0 else 0
        except Exception:
            # Best effort: malformed extra info means "duration unknown".
            return 0

    def _pick_status(item: dict) -> str:
        """Map the API's status fields to rejected / draft / published."""
        qs = str(item.get("quality_status") or "").lower()
        st = str(item.get("status") or "").lower()
        if qs == "rejected" or "reject" in st:
            return "rejected"
        if st in ("draft", "unpublish", "unpublished"):
            return "draft"
        # Anything else is treated as published (the common "publish" value).
        return "published"

    print(f"\n{'='*60}")
    print(f"[{self.platform_name}] 获取作品列表 (使用 API)")
    print(f"[{self.platform_name}] page={page}, page_size={page_size}")
    print(f"{'='*60}")

    works: List[WorkItem] = []
    total = 0
    has_more = False
    next_page = ""

    try:
        # Parse the cookie string and install the cookies in Playwright.
        cookie_list = self.parse_cookies(cookies)
        await self.init_browser()
        await self.set_cookies(cookie_list)
        if not self.page:
            raise Exception("Page not initialized")

        # Open the content-management page first so Referer/session are ready.
        # The Node side passes page=0,1,...; the API's currentPage is 1,2,...
        current_page = int(page) + 1
        page_size = int(page_size)
        content_url = (
            "https://baijiahao.baidu.com/builder/rc/content"
            f"?currentPage={current_page}&pageSize={page_size}"
            "&search=&type=&collection=&startDate=&endDate="
        )
        await self.page.goto(content_url, wait_until="domcontentloaded", timeout=60000)
        await asyncio.sleep(2)

        # 1) Extract the token (JWT) from the page.
        token = await self.page.evaluate(
            """
            () => {
                const isJwtLike = (v) => {
                    if (!v || typeof v !== 'string') return false;
                    const s = v.trim();
                    if (s.length < 60) return false;
                    const parts = s.split('.');
                    if (parts.length !== 3) return false;
                    return parts.every(p => /^[A-Za-z0-9_-]+$/.test(p) && p.length > 10);
                };
                const pickFromStorage = (storage) => {
                    try {
                        const keys = Object.keys(storage || {});
                        for (const k of keys) {
                            const v = storage.getItem(k);
                            if (isJwtLike(v)) return v;
                        }
                    } catch {}
                    return "";
                };
                // localStorage / sessionStorage
                let t = pickFromStorage(window.localStorage);
                if (t) return t;
                t = pickFromStorage(window.sessionStorage);
                if (t) return t;
                // meta 标签
                const meta = document.querySelector('meta[name="token"], meta[name="bjh-token"]');
                const metaToken = meta && meta.getAttribute('content');
                if (isJwtLike(metaToken)) return metaToken;
                // 简单从全局变量里找
                const candidates = [
                    (window.__INITIAL_STATE__ && window.__INITIAL_STATE__.token) || "",
                    (window.__PRELOADED_STATE__ && window.__PRELOADED_STATE__.token) || "",
                    (window.__NUXT__ && window.__NUXT__.state && window.__NUXT__.state.token) || "",
                ];
                for (const c of candidates) {
                    if (isJwtLike(c)) return c;
                }
                return "";
            }
            """
        )

        # 2) Still no token: fall back to scanning the page HTML for a
        #    three-segment, JWT-shaped string.
        if not token:
            html = await self.page.content()
            m = re.search(r'([A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,})', html)
            if m:
                token = m.group(1)
        if not token:
            raise Exception("未能从页面提取 token(可能未登录或触发风控),请重新登录百家号账号后再试")

        # 3) Call the API with fetch in the page context (cookies are sent
        #    automatically, the token goes into a custom header).
        api_url = (
            "https://baijiahao.baidu.com/pcui/article/lists"
            f"?currentPage={current_page}"
            f"&pageSize={page_size}"
            "&search=&type=&collection=&startDate=&endDate="
            "&clearBeforeFetch=false"
            "&dynamic=1"
        )
        resp = await self.page.evaluate(
            """
            async ({ url, token }) => {
                const r = await fetch(url, {
                    method: 'GET',
                    credentials: 'include',
                    headers: {
                        'accept': 'application/json, text/plain, */*',
                        ...(token ? { token } : {}),
                    },
                });
                const text = await r.text();
                return { ok: r.ok, status: r.status, text };
            }
            """,
            {"url": api_url, "token": token},
        )
        if not isinstance(resp, dict) or not resp.get("ok"):
            status = resp.get("status") if isinstance(resp, dict) else "unknown"
            raise Exception(f"百家号接口请求失败: HTTP {status}")

        # An HTML login/risk-control page would otherwise surface as a
        # cryptic JSONDecodeError message in WorksResult.error.
        try:
            api_result = json.loads(resp.get("text") or "{}")
        except ValueError as exc:
            raise Exception("百家号接口返回了非 JSON 内容(可能未登录或触发风控)") from exc
        if not isinstance(api_result, dict):
            raise Exception("百家号接口返回格式异常(非 JSON 对象)")

        print(f"[{self.platform_name}] pcui/article/lists 响应: errno={api_result.get('errno')}, errmsg={api_result.get('errmsg')}")
        if api_result.get("errno") != 0:
            errno = api_result.get("errno")
            errmsg = api_result.get("errmsg", "unknown error")
            # 20040001 commonly means "not logged in".
            if errno in (110, 20040001):
                raise Exception("百家号未登录或 Cookie/token 失效,请重新登录后再同步")
            raise Exception(f"百家号接口错误: errno={errno}, errmsg={errmsg}")

        data = api_result.get("data") or {}
        items = data.get("list") or []
        page_info = data.get("page") or {}
        total = _to_int(page_info.get("totalCount"))
        total_page = _to_int(page_info.get("totalPage"))
        cur_page = _to_int(page_info.get("currentPage"), current_page) or current_page
        has_more = bool(total_page and cur_page < total_page)
        # NOTE(review): next_page is the API's 1-based page number while the
        # ``page`` argument is 0-based — confirm callers account for this.
        next_page = cur_page + 1 if has_more else ""
        print(f"[{self.platform_name}] 获取到 {len(items)} 个作品,总数: {total}, currentPage={cur_page}, totalPage={total_page}")

        for item in items:
            if not isinstance(item, dict):
                # Skip malformed entries instead of failing the whole page.
                continue
            # Prefer nid (the builder preview link uses it).
            work_id = str(item.get("nid") or item.get("feed_id") or item.get("article_id") or item.get("id") or "")
            if not work_id:
                continue
            works.append(
                WorkItem(
                    work_id=work_id,
                    title=str(item.get("title") or ""),
                    cover_url=_pick_cover(item),
                    video_url=str(item.get("url") or ""),
                    duration=_pick_duration(item),
                    status=_pick_status(item),
                    publish_time=str(item.get("publish_time") or item.get("publish_at") or item.get("created_at") or ""),
                    play_count=_to_int(item.get("read_amount")),
                    like_count=_to_int(item.get("like_amount")),
                    comment_count=_to_int(item.get("comment_amount")),
                    share_count=_to_int(item.get("share_amount")),
                    collect_count=_to_int(item.get("collection_amount")),
                )
            )
        print(f"[{self.platform_name}] ✓ 成功解析 {len(works)} 个作品")

    except Exception as e:
        import traceback
        traceback.print_exc()
        return WorksResult(
            success=False,
            platform=self.platform_name,
            error=str(e),
            debug_info="baijiahao_get_works_failed"
        )

    return WorksResult(
        success=True,
        platform=self.platform_name,
        works=works,
        total=total,
        has_more=has_more,
        next_page=next_page
    )
-
async def get_article_stats(
    self,
    cookies: str,
    start_day: str,
    end_day: str,
    stat_type: str,
    num: int,
    count: int,
) -> dict:
    """
    Call Baijiahao's /author/eco/statistics/articleListStatistic endpoint.

    The request is made with aiohttp and the account cookies only (no
    browser token); it provides per-day data at the works-list level.

    Args:
        cookies: Cookie string, parsed by ``self.parse_cookies``.
        start_day: Sent as the ``start_day`` query parameter.
        end_day: Sent as the ``end_day`` query parameter.
        stat_type: Sent as the ``type`` query parameter.
        num: Sent as the ``num`` query parameter.
        count: Sent as the ``count`` query parameter.

    Returns:
        A dict with keys ``success`` (HTTP 200 and ``errno == 0``),
        ``status``, ``errno``, ``errmsg`` and ``data``. When the JSON body
        is not an object, ``errno``/``errmsg``/``data`` are ``None``.

    Raises:
        Whatever ``resp.json()`` raises for a non-JSON body (re-raised after
        the body is logged), and any error from the API request itself.
    """
    import aiohttp

    print(f"[{self.platform_name}] get_article_stats: {start_day}-{end_day}, type={stat_type}, num={num}, count={count}")

    # Turn the parsed cookie list into the name -> value mapping aiohttp expects.
    cookie_dict = {c['name']: c['value'] for c in self.parse_cookies(cookies)}

    single_page_url = 'https://baijiahao.baidu.com/builder/rc/analysiscontent/single'
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    session_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'User-Agent': user_agent,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': user_agent,
        'Referer': single_page_url,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }

    async with aiohttp.ClientSession(cookies=cookie_dict) as session:
        # 0) Visit the "single" page first to establish the session context
        #    (the same page the Node-side UI opens). The response is opened
        #    as a context manager so it is released deterministically.
        try:
            async with session.get(
                single_page_url,
                headers=session_headers,
                timeout=aiohttp.ClientTimeout(total=20),
            ):
                pass
        except Exception as e:
            print(f"[{self.platform_name}] warmup single page failed (non-fatal): {e}")

        # 1) Call articleListStatistic.
        api_url = (
            "https://baijiahao.baidu.com/author/eco/statistics/articleListStatistic"
            f"?start_day={start_day}&end_day={end_day}&type={stat_type}&num={num}&count={count}"
        )
        async with session.get(
            api_url,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as resp:
            status = resp.status
            try:
                data = await resp.json()
            except Exception:
                text = await resp.text()
                print(f"[{self.platform_name}] articleListStatistic non-JSON response: {text[:1000]}")
                raise

    # Guard before any .get(): a JSON array/scalar body must not crash here.
    payload = data if isinstance(data, dict) else {}
    errno = payload.get('errno')
    errmsg = payload.get('errmsg')
    print(f"[{self.platform_name}] articleListStatistic: http={status}, errno={errno}, msg={errmsg}")

    return {
        "success": status == 200 and errno == 0,
        "status": status,
        "errno": errno,
        "errmsg": errmsg,
        "data": payload.get('data'),
    }
-
async def get_trend_data(
    self,
    cookies: str,
    nid: str,
) -> dict:
    """
    Call Baijiahao's /author/eco/statistic/gettrenddata endpoint.

    Fetches the per-day statistics (basic_list) of a single work using
    aiohttp and the account cookies.

    Args:
        cookies: Cookie string, parsed by ``self.parse_cookies``.
        nid: Work identifier, sent as the ``nid`` query parameter.

    Returns:
        A dict with keys ``success`` (HTTP 200 and ``errno == 0``),
        ``status``, ``errno``, ``errmsg`` and ``data``. When the JSON body
        is not an object, ``errno``/``errmsg``/``data`` are ``None``.

    Raises:
        Whatever ``resp.json()`` raises for a non-JSON body (re-raised after
        the body is logged), and any error from the API request itself.
    """
    import aiohttp

    print(f"[{self.platform_name}] get_trend_data: nid={nid}")

    # Turn the parsed cookie list into the name -> value mapping aiohttp expects.
    cookie_dict = {c['name']: c['value'] for c in self.parse_cookies(cookies)}

    single_page_url = 'https://baijiahao.baidu.com/builder/rc/analysiscontent/single'
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    session_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'User-Agent': user_agent,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': user_agent,
        'Referer': single_page_url,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }

    async with aiohttp.ClientSession(cookies=cookie_dict) as session:
        # 0) Warm-up request; opened as a context manager so the response
        #    is released deterministically. Failures are non-fatal.
        try:
            async with session.get(
                single_page_url,
                headers=session_headers,
                timeout=aiohttp.ClientTimeout(total=20),
            ):
                pass
        except Exception as e:
            print(f"[{self.platform_name}] warmup single page (trend) failed (non-fatal): {e}")

        api_url = (
            "https://baijiahao.baidu.com/author/eco/statistic/gettrenddata"
            f"?nid={nid}&trend_type=all&data_type=addition"
        )
        async with session.get(
            api_url,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as resp:
            status = resp.status
            try:
                data = await resp.json()
            except Exception:
                text = await resp.text()
                print(f"[{self.platform_name}] gettrenddata non-JSON response: {text[:1000]}")
                raise

    # Guard before any .get(): a JSON array/scalar body must not crash here.
    payload = data if isinstance(data, dict) else {}
    errno = payload.get('errno')
    errmsg = payload.get('errmsg')
    print(f"[{self.platform_name}] gettrenddata: http={status}, errno={errno}, msg={errmsg}")

    return {
        "success": status == 200 and errno == 0,
        "status": status,
        "errno": errno,
        "errmsg": errmsg,
        "data": payload.get('data'),
    }
async def get_app_statistic_v3(
    self,
    cookies: str,
    start_day: str,
    end_day: str,
) -> dict:
    """
    Call Baijiahao's appStatisticV3 endpoint (account-level basic data).

    Used for the per-user daily data sync. The request relies on the
    account's stored cookies via aiohttp and does not start a browser.

    Args:
        cookies: Cookie string, parsed by ``self.parse_cookies``.
        start_day: Sent as the ``start_day`` query parameter.
        end_day: Sent as the ``end_day`` query parameter.

    Returns:
        The decoded JSON object as returned by the API, or
        ``{"errno": -1, "errmsg": "invalid response", "data": None}`` when
        the JSON body is not an object.

    Raises:
        Whatever ``resp.json()`` raises for a non-JSON body (re-raised after
        the body is logged), and any error from the API request itself.
    """
    import aiohttp

    print(f"[{self.platform_name}] get_app_statistic_v3: {start_day}-{end_day}")

    # Turn the parsed cookie list into the name -> value mapping aiohttp expects.
    cookie_dict = {c['name']: c['value'] for c in self.parse_cookies(cookies)}

    analysis_page_url = 'https://baijiahao.baidu.com/builder/rc/analysiscontent'
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    session_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'User-Agent': user_agent,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': user_agent,
        'Referer': analysis_page_url,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }

    async with aiohttp.ClientSession(cookies=cookie_dict) as session:
        # Warm-up: visit the back-office page first to establish the session,
        # mirroring what opening the back office does. The response is opened
        # as a context manager so it is released deterministically.
        try:
            async with session.get(
                analysis_page_url,
                headers=session_headers,
                timeout=aiohttp.ClientTimeout(total=20),
            ):
                pass
        except Exception as e:
            print(f"[{self.platform_name}] warmup analysiscontent failed (non-fatal): {e}")

        api_url = (
            "https://baijiahao.baidu.com/author/eco/statistics/appStatisticV3"
            f"?type=all&start_day={start_day}&end_day={end_day}&stat=0&special_filter_days=30"
        )
        async with session.get(
            api_url,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as resp:
            status = resp.status
            try:
                data = await resp.json()
            except Exception:
                text = await resp.text()
                print(f"[{self.platform_name}] appStatisticV3 non-JSON: {text[:1000]}")
                raise

    is_object = isinstance(data, dict)
    errno = data.get('errno') if is_object else None
    errmsg = data.get('errmsg') if is_object else None
    print(f"[{self.platform_name}] appStatisticV3: http={status}, errno={errno}, msg={errmsg}")
    return data if is_object else {"errno": -1, "errmsg": "invalid response", "data": None}
async def get_fans_basic_info(
    self,
    cookies: str,
    start: str,
    end: str,
) -> dict:
    """
    Call Baijiahao's getFansBasicInfo endpoint (follower data).

    Used for the per-user daily data sync. The request relies on the
    account's stored cookies via aiohttp.

    Args:
        cookies: Cookie string, parsed by ``self.parse_cookies``.
        start: Sent as the ``start`` query parameter.
        end: Sent as the ``end`` query parameter.

    Returns:
        The decoded JSON object as returned by the API, or
        ``{"errno": -1, "errmsg": "invalid response", "data": None}`` when
        the JSON body is not an object.

    Raises:
        Whatever ``resp.json()`` raises for a non-JSON body (re-raised after
        the body is logged), and any error from the API request itself.
    """
    import aiohttp

    print(f"[{self.platform_name}] get_fans_basic_info: {start}-{end}")

    # Turn the parsed cookie list into the name -> value mapping aiohttp expects.
    cookie_dict = {c['name']: c['value'] for c in self.parse_cookies(cookies)}

    fans_page_url = 'https://baijiahao.baidu.com/builder/rc/analysisfans/basedata'
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    session_headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'User-Agent': user_agent,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'User-Agent': user_agent,
        'Referer': fans_page_url,
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
    }

    async with aiohttp.ClientSession(cookies=cookie_dict) as session:
        # Warm-up request; opened as a context manager so the response is
        # released deterministically. Failures are non-fatal.
        try:
            async with session.get(
                fans_page_url,
                headers=session_headers,
                timeout=aiohttp.ClientTimeout(total=20),
            ):
                pass
        except Exception as e:
            print(f"[{self.platform_name}] warmup analysisfans/basedata failed (non-fatal): {e}")

        api_url = (
            "https://baijiahao.baidu.com/author/eco/statistics/getFansBasicInfo"
            f"?start={start}&end={end}&fans_type=new%2Csum&sort=asc&is_page=0&show_type=chart"
        )
        async with session.get(
            api_url,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as resp:
            status = resp.status
            try:
                data = await resp.json()
            except Exception:
                text = await resp.text()
                print(f"[{self.platform_name}] getFansBasicInfo non-JSON: {text[:1000]}")
                raise

    is_object = isinstance(data, dict)
    errno = data.get('errno') if is_object else None
    errmsg = data.get('errmsg') if is_object else None
    print(f"[{self.platform_name}] getFansBasicInfo: http={status}, errno={errno}, msg={errmsg}")
    return data if is_object else {"errno": -1, "errmsg": "invalid response", "data": None}
async def check_login_status(self, cookies: str) -> dict:
    """
    Check whether the Baijiahao cookies still represent a logged-in session.

    This override only logs and then defers to the parent class's
    ``check_login_status``. Per the original notes, that shared check opens
    the back-office page with Playwright and judges validity by redirects
    to the login page or by login / risk-control prompts.
    """
    print(f"[{self.platform_name}] 检查登录状态 (使用通用浏览器逻辑)")
    # Delegate to the parent implementation so this platform behaves like
    # the others that rely on the same shared check.
    login_state = await super().check_login_status(cookies)
    return login_state
-
async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
    """
    Fetch the comments of a Baijiahao work (not implemented yet).

    Always returns a failed ``CommentsResult`` carrying the platform name,
    the requested ``work_id`` and a "not implemented" error message;
    ``cookies`` and ``cursor`` are currently unused.
    """
    # TODO: implement comment fetching.
    not_implemented = CommentsResult(
        platform=self.platform_name,
        work_id=work_id,
        success=False,
        error="百家号评论功能暂未实现",
    )
    return not_implemented
|