| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
2772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776 
|
- # -*- coding: utf-8 -*-
- """
- 小红书视频发布器
- 参考: matrix/xhs_uploader/main.py
- 使用 xhs SDK API 方式发布,更稳定
- """
- import asyncio
- import os
- import sys
- import time
- import concurrent.futures
- from pathlib import Path
- from typing import List
- from .base import (
- BasePublisher,
- PublishParams,
- PublishResult,
- WorkItem,
- WorksResult,
- CommentItem,
- CommentsResult,
- )
- from playwright.async_api import async_playwright
# Module-level cookie slot, initialised empty.
# NOTE(review): not referenced anywhere in the visible part of this file —
# confirm whether it is still used before relying on it.
stored_cookies = None

# Put the sibling "matrix" project on sys.path so its signing script can be
# imported.
MATRIX_PATH = Path(__file__).parent.parent.parent.parent / "matrix"
sys.path.insert(0, str(MATRIX_PATH))

# Try to import the xhs SDK. When it is missing, XhsClient is set to None and
# XHS_SDK_AVAILABLE to False so callers can check the flag instead of failing
# at import time.
try:
    from xhs import XhsClient

    XHS_SDK_AVAILABLE = True
except ImportError:
    print("[Warning] xhs 库未安装,请运行: pip install xhs")
    XhsClient = None
    XHS_SDK_AVAILABLE = False

# Path of the stealth script that get_sign() injects into the browser context
# when the file exists.
STEALTH_JS_PATH = MATRIX_PATH / "xhs-api" / "js" / "stealth.min.js"

# Single-worker pool used by sign_sync() to run the async signing coroutine on
# a separate thread when the caller is already inside an event loop.
_xhs_sign_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
- class XiaohongshuPublisher(BasePublisher):
- """
- 小红书视频发布器
- 优先使用 xhs SDK API 方式发布
- """
- platform_name = "xiaohongshu"
- login_url = "https://creator.xiaohongshu.com/"
- publish_url = "https://creator.xiaohongshu.com/publish/publish"
- cookie_domain = ".xiaohongshu.com"
async def get_sign(self, uri: str, data=None, a1: str = "", web_session: str = ""):
    """Compute the Xiaohongshu API signature for a request.

    Launches a headless Chromium through Playwright, opens
    https://www.xiaohongshu.com, optionally sets the ``a1`` cookie, and then
    evaluates the page's ``window._webmsxyw(url, data)`` function.

    Args:
        uri: API path to sign; passed as the first argument of
            ``window._webmsxyw``.
        data: Request payload; passed through unchanged as the second argument.
        a1: Value of the ``a1`` cookie. When non-empty it is set for
            ``.xiaohongshu.com`` and the page is reloaded before signing.
        web_session: Accepted for compatibility with the sign-callback
            signature; not used by this method.

    Returns:
        A dict with keys ``"x-s"`` and ``"x-t"`` (the latter as a string).

    Raises:
        Exception: ``"签名失败: ..."`` wrapping any error raised while signing;
            the original error is attached as ``__cause__``.
    """
    from playwright.async_api import async_playwright

    try:
        async with async_playwright() as playwright:
            browser = await playwright.chromium.launch(headless=True)
            try:
                browser_context = await browser.new_context()
                if STEALTH_JS_PATH.exists():
                    await browser_context.add_init_script(path=str(STEALTH_JS_PATH))
                page = await browser_context.new_page()
                await page.goto("https://www.xiaohongshu.com")
                await asyncio.sleep(1)
                await page.reload()
                await asyncio.sleep(1)
                if a1:
                    await browser_context.add_cookies(
                        [
                            {
                                "name": "a1",
                                "value": a1,
                                "domain": ".xiaohongshu.com",
                                "path": "/",
                            }
                        ]
                    )
                    # Reload so the page picks up the freshly set cookie.
                    await page.reload()
                    await asyncio.sleep(0.5)
                encrypt_params = await page.evaluate(
                    "([url, data]) => window._webmsxyw(url, data)", [uri, data]
                )
                await browser_context.close()
            finally:
                # Always close the browser, including when navigation or the
                # page evaluation fails part-way through.
                await browser.close()
            return {"x-s": encrypt_params["X-s"], "x-t": str(encrypt_params["X-t"])}
    except Exception as e:
        import traceback

        traceback.print_exc()
        # Keep the generic Exception type callers see, but chain the cause so
        # the original traceback is not lost.
        raise Exception(f"签名失败: {e}") from e
def sign_sync(self, uri, data=None, a1="", web_session=""):
    """Synchronous signing callback, intended for use by ``XhsClient``.

    ``XhsClient`` invokes the sign callback synchronously, while the actual
    signing (``get_sign``) is a coroutine built on the Playwright async API.

    * When the calling thread has no running event loop, the coroutine is run
      directly with ``asyncio.run``.
    * When the calling thread is already inside an event loop, ``asyncio.run``
      cannot be used there, so the coroutine is run via ``asyncio.run`` on the
      module-level single-worker executor and this call blocks for up to
      120 seconds waiting for the result.

    Args:
        uri: API path to sign.
        data: Request payload forwarded to ``get_sign``.
        a1: ``a1`` cookie value forwarded to ``get_sign``.
        web_session: Forwarded to ``get_sign``.

    Returns:
        Whatever ``get_sign`` returns.

    Raises:
        Any exception raised by ``get_sign`` or by waiting on the worker
        (including a timeout after 120 seconds).
    """

    def run_async_sign():
        return asyncio.run(
            self.get_sign(uri, data=data, a1=a1, web_session=web_session)
        )

    # Only the loop probe lives inside the try: a RuntimeError raised by the
    # signing work itself must propagate instead of being mistaken for
    # "no running loop" (which would call asyncio.run inside a running loop).
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return run_async_sign()

    future = _xhs_sign_executor.submit(run_async_sign)
    return future.result(timeout=120)
async def publish_via_api(
    self, cookies: str, params: PublishParams
) -> PublishResult:
    """Publish a video note through the xhs SDK (``XhsClient.create_video_note``).

    Flow:
      1. Normalise the cookies to a cookie string and call the SDK.
      2. If the SDK error text indicates missing/invalid login
         (``无登录信息`` or code ``-100``), open the creator site in the
         browser to refresh cookies (waiting up to 180 s for a manual login
         when not headless) and retry the SDK call once.
      3. Validate the SDK result and build the ``PublishResult``.

    Args:
        cookies: Account cookies in any format ``self.parse_cookies`` accepts.
        params: Publish parameters (title, description, tags, video/cover
            path, optional scheduled publish date).

    Returns:
        A successful ``PublishResult`` carrying ``note_id`` / ``url`` from the
        SDK response, or a failed ``PublishResult`` with
        ``status="need_captcha"`` / ``captcha_type="login"`` when the login
        could not be refreshed.

    Raises:
        Exception: when the SDK is not installed, the SDK call fails for a
            non-login reason (or fails again after the refresh), the SDK
            returns an empty result, or the result reports an error.
    """
    if not XHS_SDK_AVAILABLE:
        raise Exception("xhs SDK 未安装,请运行: pip install xhs")

    self.report_progress(10, "正在通过 API 发布...")
    print(f"[{self.platform_name}] 使用 XHS SDK API 发布...")
    print(f"[{self.platform_name}] 视频路径: {params.video_path}")
    print(f"[{self.platform_name}] 标题: {params.title}")

    # Convert the cookies to the "k=v; ..." string form; when parsing yields
    # nothing, the raw input is used as-is.
    cookie_list = self.parse_cookies(cookies)
    cookie_string = self.cookies_to_string(cookie_list) if cookie_list else cookies
    print(f"[{self.platform_name}] Cookie 长度: {len(cookie_string)}")
    self.report_progress(20, "正在上传视频...")

    async def ensure_valid_cookie_for_sdk() -> str | None:
        """Refresh cookies via the browser; return the cookie string or None."""
        await self.init_browser()
        cookie_list_for_browser = self.parse_cookies(cookie_string)
        await self.set_cookies(cookie_list_for_browser)
        if not self.page or not self.context:
            return None
        await self.page.goto(
            "https://creator.xiaohongshu.com/new/home",
            wait_until="domcontentloaded",
            timeout=60000,
        )
        await asyncio.sleep(2)
        current_url = (self.page.url or "").lower()
        if "login" in current_url or "passport" in current_url:
            # Redirected to a login page: nobody can log in when headless.
            if self.headless:
                return None
            # Poll every 2 s, up to 180 s, for the user to finish logging in.
            waited = 0
            while waited < 180:
                current_url = (self.page.url or "").lower()
                if (
                    "login" not in current_url
                    and "passport" not in current_url
                    and "creator.xiaohongshu.com" in current_url
                ):
                    break
                await asyncio.sleep(2)
                waited += 2
            current_url = (self.page.url or "").lower()
            if "login" in current_url or "passport" in current_url:
                return None
        cookies_after = await self.context.cookies()
        try:
            await self.sync_cookies_to_node(cookies_after)
        except Exception as sync_err:
            # Best effort: a failed sync must not abort publishing, but it
            # should be visible in the log rather than silently dropped.
            print(f"[{self.platform_name}] 同步 Cookie 失败(已忽略): {sync_err}")
        refreshed_cookie_str = self.cookies_to_string(cookies_after)
        return refreshed_cookie_str or None

    def call_create_video_note(sdk_cookie_str: str):
        """Create the video note with the SDK using the given cookie string."""
        xhs_client = XhsClient(sdk_cookie_str, sign=self.sign_sync)
        return xhs_client.create_video_note(
            title=params.title,
            desc=params.description or params.title,
            topics=params.tags or [],
            post_time=params.publish_date.strftime("%Y-%m-%d %H:%M:%S")
            if params.publish_date
            else None,
            video_path=params.video_path,
            # Only pass a cover that actually exists on disk.
            cover_path=params.cover_path
            if params.cover_path and os.path.exists(params.cover_path)
            else None,
        )

    print(f"[{self.platform_name}] 开始调用 create_video_note...")
    try:
        result = call_create_video_note(cookie_string)
        print(f"[{self.platform_name}] SDK 返回结果: {result}")
    except Exception as e:
        import traceback

        err_text = str(e)
        login_invalid = (
            "无登录信息" in err_text
            or '"code": -100' in err_text
            or "'code': -100" in err_text
        )
        if not login_invalid:
            traceback.print_exc()
            print(f"[{self.platform_name}] SDK 调用失败: {e}")
            raise Exception(f"XHS SDK 发布失败: {e}") from e

        self.report_progress(15, "登录信息失效,尝试刷新登录信息...")
        refreshed = await ensure_valid_cookie_for_sdk()
        if not refreshed:
            screenshot_base64 = await self.capture_screenshot()
            page_url = (
                await self.get_page_url()
                if hasattr(self, "get_page_url")
                else (self.page.url if self.page else "")
            )
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="登录已过期,请使用有头浏览器重新登录",
                screenshot_base64=screenshot_base64,
                page_url=page_url,
                status="need_captcha",
                need_captcha=True,
                captcha_type="login",
            )
        # Single retry with the refreshed cookies.
        try:
            result = call_create_video_note(refreshed)
            print(f"[{self.platform_name}] SDK 重试返回结果: {result}")
        except Exception as e2:
            traceback.print_exc()
            raise Exception(f"XHS SDK 发布失败: {e2}") from e2

    # Validate the SDK result.
    if not result:
        raise Exception("XHS SDK 返回空结果")

    # Check whether the result reports an error.
    if isinstance(result, dict):
        # Any truthy code is non-zero, so a truthiness test is sufficient.
        if result.get("code"):
            raise Exception(f"发布失败: {result.get('msg', '未知错误')}")
        # Equality (not identity) is deliberate: it also treats 0 as failure.
        if result.get("success") == False:  # noqa: E712
            raise Exception(
                f"发布失败: {result.get('msg', result.get('error', '未知错误'))}"
            )

    note_id = result.get("note_id", "") if isinstance(result, dict) else ""
    video_url = result.get("url", "") if isinstance(result, dict) else ""
    if not note_id:
        print(f"[{self.platform_name}] 警告: 未获取到 note_id,返回结果: {result}")

    self.report_progress(100, "发布成功")
    print(f"[{self.platform_name}] 发布成功! note_id={note_id}, url={video_url}")
    return PublishResult(
        success=True,
        platform=self.platform_name,
        video_id=note_id,
        video_url=video_url,
        message="发布成功",
    )
async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
    """Publish a video to Xiaohongshu, choosing between SDK and Playwright.

    Strategy:
      * A configured proxy (``self.proxy_config["server"]``) forces the
        Playwright path so traffic goes through the proxy.
      * Otherwise the xhs SDK path is tried first when the SDK is available.
        Its result is returned when it succeeded, or when it failed with an
        error that does not contain ``请刷新``.
      * In every other case (SDK missing, SDK raised, refresh-type error)
        the Playwright path is used.

    Raises:
        Exception: when ``params.video_path`` does not exist.
    """
    tag = self.platform_name
    video_path = params.video_path

    print(f"\n{'=' * 60}")
    print(f"[{tag}] 开始发布视频")
    print(f"[{tag}] 视频路径: {video_path}")
    print(f"[{tag}] 标题: {params.title}")
    print(f"[{tag}] Headless: {self.headless}")
    print(f"[{tag}] XHS SDK 可用: {XHS_SDK_AVAILABLE}")
    print(f"{'=' * 60}")

    # Fail fast when the video file is missing.
    if not os.path.exists(video_path):
        raise Exception(f"视频文件不存在: {video_path}")
    print(
        f"[{tag}] 视频文件存在,大小: {os.path.getsize(video_path)} bytes"
    )
    self.report_progress(5, "正在准备发布...")

    # With a proxy configured, skip the SDK and publish through Playwright.
    proxy = getattr(self, "proxy_config", None)
    if isinstance(proxy, dict) and proxy.get("server"):
        print(
            f"[{tag}] 检测到代理配置,跳过 SDK 方式,使用 Playwright 走代理发布",
            flush=True,
        )
        return await self.publish_via_playwright(cookies, params)

    # Prefer the SDK path when it is available.
    if XHS_SDK_AVAILABLE:
        try:
            print(f"[{tag}] 尝试使用 XHS SDK API 发布...")
            api_result = await self.publish_via_api(cookies, params)
            print(f"[{tag}] API 发布完成: success={api_result.success}")
            # Return on success, or on a failure with a concrete error that
            # is not a "please refresh" one.
            has_final_error = api_result.error and "请刷新" not in api_result.error
            if api_result.success or has_final_error:
                return api_result
            print(f"[{tag}] API 方式未成功,尝试 Playwright...")
        except Exception as e:
            message = str(e)
            login_lost = any(
                marker in message for marker in ("登录已过期", "无登录信息")
            )
            if not login_lost:
                import traceback

                traceback.print_exc()
                print(f"[{tag}] API 发布失败: {e}")
                print(f"[{tag}] 尝试使用 Playwright 方式...")
            else:
                print(
                    f"[{tag}] API 登录失效,切换到 Playwright 方式...",
                    flush=True,
                )

    # Fallback: publish through the browser.
    print(f"[{tag}] 使用 Playwright 方式发布...")
    return await self.publish_via_playwright(cookies, params)
async def publish_via_playwright(
    self, cookies: str, params: PublishParams
) -> PublishResult:
    """Publish a video by driving the creator web page with Playwright.

    Steps: initialise the browser and cookies, open the publish page, handle
    login / captcha (waiting for manual completion when not headless), upload
    the video file, wait for the edit form, fill title / description / tags,
    click the publish button, then poll for the outcome.

    Args:
        cookies: Account cookies in any format ``self.parse_cookies`` accepts.
        params: Publish parameters (video path, title, description, tags).

    Returns:
        A ``PublishResult``. Failure results use ``status`` values
        ``"need_captcha"`` (login or captcha required), ``"need_action"``
        (upload could not start, upload timed out, or outcome unconfirmed
        while still on the publish page) and ``"failed"`` (the page showed
        an error message). Otherwise ``status="success"`` is returned — note
        this includes the case where no explicit success signal was seen but
        the URL no longer contains ``publish/publish``.

    Raises:
        Exception: ``"Page not initialized"`` when the browser page is
            missing, or ``"未找到发布按钮"`` when no publish button could be
            clicked.
    """
    self.report_progress(10, "正在初始化浏览器...")
    print(f"[{self.platform_name}] Playwright 方式开始...")
    await self.init_browser()
    cookie_list = self.parse_cookies(cookies)
    print(f"[{self.platform_name}] 设置 {len(cookie_list)} 个 cookies")
    await self.set_cookies(cookie_list)
    if not self.page:
        raise Exception("Page not initialized")

    self.report_progress(15, "正在打开发布页面...")
    # Go straight to the video publish page.
    publish_url = "https://creator.xiaohongshu.com/publish/publish?source=official"
    print(f"[{self.platform_name}] 打开页面: {publish_url}")
    await self.page.goto(publish_url)
    await asyncio.sleep(3)
    current_url = self.page.url
    print(f"[{self.platform_name}] 当前 URL: {current_url}")

    async def wait_for_manual_login(timeout_seconds: int = 300) -> bool:
        """Poll every 2 s until the page leaves the login flow or time runs out."""
        if not self.page:
            return False
        self.report_progress(12, "检测到需要登录,请在浏览器窗口完成登录...")
        try:
            await self.page.bring_to_front()
        except Exception:
            # Best effort only; failing to raise the window is not fatal.
            pass
        waited = 0
        while waited < timeout_seconds:
            try:
                url = self.page.url
                if (
                    "login" not in url
                    and "passport" not in url
                    and "creator.xiaohongshu.com" in url
                ):
                    return True
                await asyncio.sleep(2)
                waited += 2
            except Exception:
                # `except Exception` (not bare) so task cancellation and
                # KeyboardInterrupt still propagate out of the polling loop.
                await asyncio.sleep(2)
                waited += 2
        return False

    async def wait_for_manual_captcha(timeout_seconds: int = 180) -> bool:
        """Poll every 3 s until the AI check no longer reports a captcha."""
        waited = 0
        while waited < timeout_seconds:
            try:
                ai_captcha = await self.ai_check_captcha()
                if not ai_captcha.get("has_captcha"):
                    return True
            except Exception:
                # A failed check counts as "still unknown"; keep polling.
                pass
            await asyncio.sleep(3)
            waited += 3
        return False

    # Check login state.
    if "login" in current_url or "passport" in current_url:
        if not self.headless:
            logged_in = await wait_for_manual_login()
            if logged_in:
                try:
                    if self.context:
                        cookies_after = await self.context.cookies()
                        await self.sync_cookies_to_node(cookies_after)
                except Exception:
                    # Cookie sync is best effort; continue publishing.
                    pass
                await self.page.goto(publish_url)
                await asyncio.sleep(3)
                current_url = self.page.url
            else:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error="需要登录:请在浏览器窗口完成登录后重试",
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status="need_captcha",
                    need_captcha=True,
                    captcha_type="login",
                )
        else:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="登录已过期,请重新登录",
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status="need_captcha",
                need_captcha=True,
                captcha_type="login",
            )

    # Use the AI check to detect a captcha.
    ai_captcha = await self.ai_check_captcha()
    if ai_captcha["has_captcha"]:
        print(
            f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}",
            flush=True,
        )
        if not self.headless:
            solved = await wait_for_manual_captcha()
            if solved:
                try:
                    if self.context:
                        cookies_after = await self.context.cookies()
                        await self.sync_cookies_to_node(cookies_after)
                except Exception:
                    # Cookie sync is best effort; continue publishing.
                    pass
            else:
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error="需要验证码:请在浏览器窗口完成验证后重试",
                    screenshot_base64=screenshot_base64,
                    page_url=current_url,
                    status="need_captcha",
                    need_captcha=True,
                    captcha_type=ai_captcha["captcha_type"],
                )
        else:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status="need_captcha",
                need_captcha=True,
                captcha_type=ai_captcha["captcha_type"],
            )

    self.report_progress(20, "正在上传视频...")
    # Give the page time to finish loading.
    await asyncio.sleep(2)

    # Upload the video.
    upload_triggered = False

    # Method 1: set the file directly on a (possibly hidden) file input.
    print(f"[{self.platform_name}] 尝试方法1: 设置 file input")
    file_inputs = self.page.locator('input[type="file"]')
    input_count = await file_inputs.count()
    print(f"[{self.platform_name}] 找到 {input_count} 个 file input")
    if input_count > 0:
        # Pick the first input that accepts video (or has no restriction).
        for i in range(input_count):
            input_el = file_inputs.nth(i)
            accept = await input_el.get_attribute("accept") or ""
            print(f"[{self.platform_name}] Input {i} accept: {accept}")
            if "video" in accept or "*" in accept or not accept:
                await input_el.set_input_files(params.video_path)
                upload_triggered = True
                print(f"[{self.platform_name}] 视频文件已设置到 input {i}")
                break

    # Method 2: click the upload area to open the file chooser.
    if not upload_triggered:
        print(f"[{self.platform_name}] 尝试方法2: 点击上传区域")
        try:
            upload_area = self.page.locator(
                '[class*="upload-wrapper"], [class*="upload-area"], .upload-input'
            ).first
            if await upload_area.count() > 0:
                async with self.page.expect_file_chooser(timeout=5000) as fc_info:
                    await upload_area.click()
                file_chooser = await fc_info.value
                await file_chooser.set_files(params.video_path)
                upload_triggered = True
                print(f"[{self.platform_name}] 通过点击上传区域上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 方法2失败: {e}")

    if not upload_triggered:
        screenshot_base64 = await self.capture_screenshot()
        page_url = await self.get_page_url()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="无法上传视频文件",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status="need_action",
        )

    self.report_progress(40, "等待视频上传完成...")
    print(f"[{self.platform_name}] 等待视频上传和处理...")

    # Wait for the upload to finish by watching for the edit form:
    # 60 polls x 3 s sleep, i.e. roughly 3 minutes at most.
    upload_complete = False
    for i in range(60):
        await asyncio.sleep(3)
        # The title input appears once the upload is done.
        title_input_count = await self.page.locator(
            'input[placeholder*="标题"], input[placeholder*="填写标题"]'
        ).count()
        # Alternatively, look for the editor area ...
        editor_count = await self.page.locator(
            '[class*="ql-editor"], [contenteditable="true"]'
        ).count()
        # ... together with a publish button.
        publish_btn_count = await self.page.locator(
            '.publishBtn, button:has-text("发布")'
        ).count()
        print(
            f"[{self.platform_name}] 检测 {i + 1}: 标题框={title_input_count}, 编辑器={editor_count}, 发布按钮={publish_btn_count}"
        )
        if title_input_count > 0 or (editor_count > 0 and publish_btn_count > 0):
            upload_complete = True
            print(f"[{self.platform_name}] 视频上传完成!")
            break

    if not upload_complete:
        screenshot_base64 = await self.capture_screenshot()
        page_url = await self.get_page_url()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="视频上传超时",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status="need_action",
        )

    await asyncio.sleep(2)
    self.report_progress(60, "正在填写笔记信息...")
    print(f"[{self.platform_name}] 填写标题: {params.title[:20]}")

    # Fill in the title (truncated to its first 20 characters).
    title_filled = False
    title_selectors = [
        'input[placeholder*="标题"]',
        'input[placeholder*="填写标题"]',
        '[class*="title"] input',
        ".c-input_inner",
    ]
    for selector in title_selectors:
        title_input = self.page.locator(selector).first
        if await title_input.count() > 0:
            await title_input.click()
            await title_input.fill("")  # clear any existing text first
            await title_input.fill(params.title[:20])
            title_filled = True
            print(f"[{self.platform_name}] 标题已填写,使用选择器: {selector}")
            break
    if not title_filled:
        print(f"[{self.platform_name}] 警告: 未找到标题输入框")

    # Fill in the description and tags.
    if params.description or params.tags:
        desc_filled = False
        desc_selectors = [
            '[class*="ql-editor"]',
            '[class*="content-input"] [contenteditable="true"]',
            '[class*="editor"] [contenteditable="true"]',
            ".ql-editor",
        ]
        for selector in desc_selectors:
            desc_input = self.page.locator(selector).first
            if await desc_input.count() > 0:
                await desc_input.click()
                await asyncio.sleep(0.5)
                if params.description:
                    await self.page.keyboard.type(params.description, delay=20)
                    print(f"[{self.platform_name}] 描述已填写")
                if params.tags:
                    # Add tags on a new line; at most the first 5 are typed.
                    await self.page.keyboard.press("Enter")
                    for tag in params.tags[:5]:
                        await self.page.keyboard.type(f"#{tag}", delay=20)
                        await asyncio.sleep(0.3)
                        await self.page.keyboard.press("Space")
                    print(f"[{self.platform_name}] 标签已填写: {params.tags[:5]}")
                desc_filled = True
                break
        if not desc_filled:
            print(f"[{self.platform_name}] 警告: 未找到描述输入框")

    await asyncio.sleep(2)
    self.report_progress(80, "正在发布...")
    await asyncio.sleep(2)

    # Scroll to the bottom so the publish button is in view.
    await self.page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    await asyncio.sleep(1)
    print(f"[{self.platform_name}] 查找发布按钮...")

    # Click publish: try each selector in order until one is clicked.
    publish_selectors = [
        "button.publishBtn",
        ".publishBtn",
        "button.d-button.red",
        'button:has-text("发布"):not(:has-text("定时发布"))',
        '[class*="publish"][class*="btn"]',
    ]
    publish_clicked = False
    for selector in publish_selectors:
        try:
            btn = self.page.locator(selector).first
            if await btn.count() > 0:
                is_visible = await btn.is_visible()
                is_enabled = await btn.is_enabled()
                print(
                    f"[{self.platform_name}] 按钮 {selector}: visible={is_visible}, enabled={is_enabled}"
                )
                if is_visible and is_enabled:
                    box = await btn.bounding_box()
                    if box:
                        print(
                            f"[{self.platform_name}] 点击发布按钮: {selector}, 位置: ({box['x']}, {box['y']})"
                        )
                        # Click the button centre with the real mouse.
                        await self.page.mouse.click(
                            box["x"] + box["width"] / 2,
                            box["y"] + box["height"] / 2,
                        )
                        publish_clicked = True
                        break
        except Exception as e:
            print(f"[{self.platform_name}] 选择器 {selector} 错误: {e}")

    # Fallback: ask the AI helper to suggest a selector for the button.
    if not publish_clicked:
        try:
            suggest = await self.ai_suggest_playwright_selector(
                "点击小红书发布按钮"
            )
            if suggest.get("has_selector") and suggest.get("selector"):
                sel = suggest.get("selector")
                btn = self.page.locator(sel).first
                if (
                    await btn.count() > 0
                    and await btn.is_visible()
                    and await btn.is_enabled()
                ):
                    try:
                        await btn.click()
                    except Exception:
                        # Locator click failed; fall back to a mouse click
                        # on the button centre.
                        box = await btn.bounding_box()
                        if box:
                            await self.page.mouse.click(
                                box["x"] + box["width"] / 2,
                                box["y"] + box["height"] / 2,
                            )
                    publish_clicked = True
        except Exception as e:
            print(f"[{self.platform_name}] AI 点击发布按钮失败: {e}", flush=True)

    if not publish_clicked:
        # Save a screenshot for debugging.
        screenshot_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            "screenshots",
        )
        os.makedirs(screenshot_dir, exist_ok=True)
        screenshot_path = os.path.join(
            screenshot_dir, f"debug_publish_failed_{self.platform_name}.png"
        )
        await self.page.screenshot(path=screenshot_path, full_page=True)
        print(
            f"[{self.platform_name}] 未找到发布按钮,截图保存到: {screenshot_path}"
        )
        # Dump the first 10 buttons on the page for debugging.
        buttons = await self.page.query_selector_all("button")
        print(f"[{self.platform_name}] 页面上共有 {len(buttons)} 个按钮")
        for i, btn in enumerate(buttons[:10]):
            text = await btn.text_content() or ""
            cls = await btn.get_attribute("class") or ""
            print(f"  按钮 {i}: text='{text.strip()[:30]}', class='{cls[:50]}'")
        raise Exception("未找到发布按钮")

    print(f"[{self.platform_name}] 已点击发布按钮,等待发布完成...")
    self.report_progress(90, "等待发布结果...")

    # Wait for the outcome (URL change or success toast): 20 polls with a
    # 1 s sleep each.
    publish_success = False
    refresh_retry = 0
    for _ in range(20):
        await asyncio.sleep(1)
        current_url = self.page.url
        # Redirected to a success page or the content-management page?
        if (
            "published=true" in current_url
            or "success" in current_url
            or "content" in current_url
        ):
            publish_success = True
            print(f"[{self.platform_name}] 发布成功! 跳转到: {current_url}")
            break

        # Is a success message visible?
        try:
            success_msg = await self.page.locator(
                '[class*="success"], .toast-success, [class*="Toast"]'
            ).first.is_visible()
            if success_msg:
                publish_success = True
                print(f"[{self.platform_name}] 检测到成功提示!")
                break
        except Exception:
            pass

        # Is an error message visible?
        try:
            error_elements = self.page.locator(
                '[class*="error"], .toast-error, [class*="fail"]'
            )
            if await error_elements.count() > 0:
                first_error = error_elements.first
                if await first_error.is_visible():
                    error_text = (await first_error.text_content()) or ""
                    error_text = error_text.strip()
                    if error_text:
                        if "请刷新" in error_text and refresh_retry < 3:
                            # Transient "please refresh" error: reload and
                            # click publish again, at most 3 times.
                            refresh_retry += 1
                            print(
                                f"[{self.platform_name}] 检测到临时错误: {error_text},尝试刷新并重试发布({refresh_retry}/3)",
                                flush=True,
                            )
                            try:
                                await self.page.reload(
                                    wait_until="domcontentloaded"
                                )
                            except Exception:
                                pass
                            await asyncio.sleep(2)
                            await self.page.evaluate(
                                "window.scrollTo(0, document.body.scrollHeight)"
                            )
                            await asyncio.sleep(1)
                            for selector in publish_selectors:
                                try:
                                    btn = self.page.locator(selector).first
                                    if (
                                        await btn.count() > 0
                                        and await btn.is_visible()
                                        and await btn.is_enabled()
                                    ):
                                        try:
                                            await btn.click()
                                        except Exception:
                                            box = await btn.bounding_box()
                                            if box:
                                                await self.page.mouse.click(
                                                    box["x"] + box["width"] / 2,
                                                    box["y"] + box["height"] / 2,
                                                )
                                        break
                                except Exception:
                                    continue
                            # Resume polling after the retry.
                            continue
                        screenshot_base64 = await self.capture_screenshot()
                        page_url = await self.get_page_url()
                        return PublishResult(
                            success=False,
                            platform=self.platform_name,
                            error=f"发布失败: {error_text}",
                            screenshot_base64=screenshot_base64,
                            page_url=page_url,
                            status="failed",
                        )
        except Exception as e:
            # Only errors whose text contains "发布失败" are re-raised;
            # anything else from the error probe is ignored.
            if "发布失败" in str(e):
                raise

    # Without an explicit success signal, hand back a screenshot for analysis.
    if not publish_success:
        final_url = self.page.url
        print(f"[{self.platform_name}] 发布结果不确定,当前 URL: {final_url}")
        screenshot_base64 = await self.capture_screenshot()
        print(f"[{self.platform_name}] 已获取截图供 AI 分析")
        # Still on the publish page: further action may be needed.
        if "publish/publish" in final_url:
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="发布结果待确认,请查看截图",
                screenshot_base64=screenshot_base64,
                page_url=final_url,
                status="need_action",
            )

    self.report_progress(100, "发布完成")
    print(f"[{self.platform_name}] Playwright 方式发布完成!")
    screenshot_base64 = await self.capture_screenshot()
    page_url = await self.get_page_url()
    return PublishResult(
        success=True,
        platform=self.platform_name,
        message="发布完成",
        screenshot_base64=screenshot_base64,
        page_url=page_url,
        status="success",
    )
async def get_account_info(self, cookies: str) -> dict:
    """Fetch the creator profile by capturing the personal-info API response.

    Opens the creator home page with the supplied cookies and listens for the
    ``api/galaxy/creator/home/personal_info`` response. If nothing has been
    captured after ten one-second polls, the page is reloaded once and the
    polling is repeated.

    Args:
        cookies: Raw cookie string; it is handed to ``self.parse_cookies``.

    Returns:
        On success a dict with ``success=True`` plus ``account_id``,
        ``account_name``, ``avatar_url``, ``fans_count`` and ``works_count``.
        On any failure ``{"success": False, "error": <message>}``.
        The browser is closed in both cases.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"[{self.platform_name}] 获取账号信息")
    print(banner)

    captured_info = {}

    async def on_response(response):
        """Record the profile fields when the personal-info API answers."""
        nonlocal captured_info
        if "api/galaxy/creator/home/personal_info" not in response.url:
            return
        try:
            payload = await response.json()
            print(f"[{self.platform_name}] 捕获个人信息 API", flush=True)
            if not (payload.get("success") or payload.get("code") == 0):
                return
            profile = payload.get("data", {})
            captured_info = {
                "account_id": f"xhs_{profile.get('red_num', '')}",
                "account_name": profile.get("name", ""),
                "avatar_url": profile.get("avatar", ""),
                "fans_count": profile.get("fans_count", 0),
                # The works count is not read from this payload; it stays 0
                # here and is expected to be filled in from the works list.
                "works_count": 0,
            }
        except Exception as parse_error:
            print(
                f"[{self.platform_name}] 解析个人信息失败: {parse_error}",
                flush=True,
            )

    async def wait_for_profile(polls: int = 10) -> bool:
        """Poll once per second until the profile is captured or polls run out."""
        for _ in range(polls):
            if captured_info:
                break
            await asyncio.sleep(1)
        return bool(captured_info)

    try:
        await self.init_browser()
        await self.set_cookies(self.parse_cookies(cookies))
        if not self.page:
            raise Exception("Page not initialized")

        # Listen for the personal-info API before navigating so the response
        # triggered by the page load is not missed.
        self.page.on("response", on_response)

        print(f"[{self.platform_name}] 访问创作者首页...", flush=True)
        await self.page.goto(
            "https://creator.xiaohongshu.com/new/home",
            wait_until="domcontentloaded",
        )

        if not await wait_for_profile():
            print(
                f"[{self.platform_name}] 未捕获到个人信息,尝试刷新...", flush=True
            )
            await self.page.reload()
            if not await wait_for_profile():
                raise Exception("无法获取账号信息")

        return {"success": True, **captured_info}
    except Exception as exc:
        import traceback

        traceback.print_exc()
        return {"success": False, "error": str(exc)}
    finally:
        await self.close_browser()
async def get_works(
    self, cookies: str, page: int = 0, page_size: int = 20
) -> WorksResult:
    """Fetch one page of the account's notes via the creator note-list API.

    The note-manager page is opened with the supplied cookies so that the
    in-page signing function ``window._webmsxyw`` is available, then the
    ``/api/galaxy/v2/creator/note/user/posted`` endpoint is called from inside
    the page with ``fetch``.

    Args:
        cookies: Raw cookie string; it is handed to ``self.parse_cookies``.
        page: Page number sent to the API as the ``page`` query parameter.
        page_size: Only logged; the request carries no size parameter and the
            offset arithmetic uses a fixed batch size of 20.

    Returns:
        ``WorksResult`` with ``success=True`` and the parsed works, the declared
        total, ``has_more`` and ``next_page``; or ``success=False`` with
        ``error`` set when anything fails (expired cookie, API refusal, ...).
        The browser is closed in both cases.
    """
    print(f"\n{'=' * 60}", flush=True)
    print(f"[{self.platform_name}] 获取作品列表", flush=True)
    print(f"[{self.platform_name}] page={page}, page_size={page_size}", flush=True)
    print(f"{'=' * 60}", flush=True)

    works: List[WorkItem] = []
    total = 0
    has_more = False
    next_page = ""
    api_page_size = 20
    max_fetch_attempts = 3
    max_sign_check_attempts = 10

    # JS probe: is the page's request-signing function loaded yet?
    sign_probe_js = """() => {
        return typeof window !== 'undefined' && typeof window._webmsxyw === 'function';
    }"""

    def note_count(tag) -> int:
        """Read a tag's note count under any of the key spellings seen."""
        return int(
            tag.get("notes_count", 0)
            or tag.get("notesCount", 0)
            or tag.get("count", 0)
            or 0
        )

    def declared_total(data) -> int:
        """Return the total number of notes the response declares, or 0."""
        tags = data.get("tags", []) or []
        count = 0
        # The "special.note_time_desc" tag is tried first; only the first
        # matching tag is considered.
        for tag in tags:
            if tag.get("id") == "special.note_time_desc":
                count = note_count(tag)
                break
        if not count and tags:
            count = max(note_count(tag) for tag in tags)
        if not count:
            count = int(
                data.get("total", 0)
                or data.get("total_count", 0)
                or data.get("totalCount", 0)
                or 0
            )
        page_info = data.get("page", {})
        if not count and isinstance(page_info, dict):
            count = int(
                page_info.get("total", 0) or page_info.get("totalCount", 0) or 0
            )
        return count

    def parse_notes(notes_list):
        """Convert raw API note dicts into ``WorkItem`` objects."""
        status_by_tab = {0: "draft", 2: "reviewing", 3: "rejected"}
        parsed = []
        for note in notes_list:
            note_id = note.get("id", "")
            if not note_id:
                continue
            cover_url = ""
            images_list = note.get("images_list", [])
            if images_list:
                # `or ""` guards against an explicit null url.
                cover_url = images_list[0].get("url", "") or ""
                if cover_url.startswith("http://"):
                    cover_url = cover_url.replace("http://", "https://", 1)
            # `video_info` may be present but null; treat that like a
            # missing key instead of crashing the whole page.
            duration = (note.get("video_info") or {}).get("duration", 0)
            parsed.append(
                WorkItem(
                    work_id=note_id,
                    title=note.get("display_title", "") or "无标题",
                    cover_url=cover_url,
                    video_url=f"https://www.xiaohongshu.com/explore/{note_id}",
                    duration=duration,
                    status=status_by_tab.get(note.get("tab_status", 1), "published"),
                    publish_time=note.get("time", ""),
                    play_count=note.get("view_count", 0),
                    like_count=note.get("likes", 0),
                    comment_count=note.get("comments_count", 0),
                    share_count=note.get("shared_count", 0),
                    collect_count=note.get("collected_count", 0),
                )
            )
        return parsed

    def is_usable(payload) -> bool:
        """True when the payload is a successful response carrying data."""
        return (
            isinstance(payload, dict)
            and bool(payload.get("success") or payload.get("code") == 0)
            and bool(payload.get("data"))
        )

    async def fetch_notes_page(p):
        """Call the note-list API for page ``p`` from inside the browser page."""
        # Re-check the signing function before every call; give it a short
        # grace period if it is still missing.
        if not await self.page.evaluate(sign_probe_js):
            print(
                f"[{self.platform_name}] ⚠️ 签名函数 _webmsxyw 不可用,等待...",
                flush=True,
            )
            await asyncio.sleep(2)
        return await self.page.evaluate(
            """async (pageNum) => {
                try {
                    // 使用正确的 API 端点:/api/galaxy/v2/creator/note/user/posted
                    const url = `/api/galaxy/v2/creator/note/user/posted?tab=0&page=${pageNum}`;
                    const headers = {
                        'Accept': 'application/json, text/plain, */*',
                        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
                        'Referer': 'https://creator.xiaohongshu.com/new/note-manager',
                        'Sec-Fetch-Dest': 'empty',
                        'Sec-Fetch-Mode': 'cors',
                        'Sec-Fetch-Site': 'same-origin'
                    };

                    // 尝试获取签名
                    let signResult = { hasSign: false, x_s: '', x_t: '', x_s_common: '', error: '' };
                    if (typeof window !== 'undefined' && typeof window._webmsxyw === 'function') {
                        try {
                            const sign = window._webmsxyw(url, '');
                            headers['x-s'] = sign['X-s'];
                            headers['x-t'] = String(sign['X-t']);
                            // 检查是否有 x-s-common
                            if (sign['X-s-common']) {
                                headers['x-s-common'] = sign['X-s-common'];
                            }
                            signResult = {
                                hasSign: true,
                                x_s: sign['X-s'] ? sign['X-s'].substring(0, 50) + '...' : '',
                                x_t: String(sign['X-t']),
                                x_s_common: sign['X-s-common'] ? sign['X-s-common'].substring(0, 50) + '...' : '',
                                error: ''
                            };
                            console.log('签名生成成功:', signResult);
                        } catch (e) {
                            signResult.error = e.toString();
                            console.error('签名生成失败:', e);
                        }
                    } else {
                        signResult.error = '_webmsxyw function not found';
                        console.error('签名函数不存在');
                    }

                    const res = await fetch(url, {
                        method: 'GET',
                        credentials: 'include',
                        headers
                    });

                    const responseData = await res.json();
                    return {
                        ...responseData,
                        _debug: {
                            signResult: signResult,
                            status: res.status,
                            statusText: res.statusText
                        }
                    };
                } catch (e) {
                    return { success: false, error: e.toString() };
                }
            }""",
            p,
        )

    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        # Log the cookie count for debugging.
        print(
            f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies",
            flush=True,
        )
        await self.set_cookies(cookie_list)
        if not self.page:
            raise Exception("Page not initialized")

        # Open the note-manager page; a navigation timeout is tolerated
        # because the page may still be usable.
        print(f"[{self.platform_name}] 访问笔记管理页面...", flush=True)
        try:
            await self.page.goto(
                "https://creator.xiaohongshu.com/new/note-manager",
                wait_until="domcontentloaded",
                timeout=30000,
            )
        except Exception as nav_error:
            print(
                f"[{self.platform_name}] 导航超时,但继续尝试: {nav_error}",
                flush=True,
            )

        # A redirect to a login URL means the cookies are no longer valid.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前页面: {current_url}", flush=True)
        if "login" in current_url:
            raise Exception("Cookie 已过期,请重新登录")

        # Give the page time to load, then poll for the signing function.
        print(
            f"[{self.platform_name}] 等待页面完全加载和签名函数初始化...",
            flush=True,
        )
        await asyncio.sleep(3)
        for check in range(1, max_sign_check_attempts + 1):
            if await self.page.evaluate(sign_probe_js):
                print(
                    f"[{self.platform_name}] ✓ 签名函数 _webmsxyw 已可用",
                    flush=True,
                )
                break
            print(
                f"[{self.platform_name}] ⏳ 等待签名函数... ({check}/{max_sign_check_attempts})",
                flush=True,
            )
            await asyncio.sleep(1)
        else:
            # Never became available: warn but still try the API.
            print(
                f"[{self.platform_name}] ⚠️ 警告: 签名函数 _webmsxyw 在 {max_sign_check_attempts} 次检查后仍不可用",
                flush=True,
            )
            print(
                f"[{self.platform_name}] 继续尝试,但 API 调用可能会失败",
                flush=True,
            )

        # Fetch the requested page, retrying with a growing back-off.
        resp = None
        for attempt in range(1, max_fetch_attempts + 1):
            resp = await fetch_notes_page(page)
            if isinstance(resp, dict) and resp.get("_debug"):
                # Strip the debug envelope so it does not leak into results.
                debug_info = resp.pop("_debug")
                sign_result = debug_info.get("signResult", {})
                print(
                    f"[{self.platform_name}] 🔍 调试信息: "
                    f"签名可用: {sign_result.get('hasSign', False)}, "
                    f"X-S: {sign_result.get('x_s', '')}, "
                    f"X-T: {sign_result.get('x_t', '')}, "
                    f"X-S-Common: {sign_result.get('x_s_common', '')}, "
                    f"签名错误: {sign_result.get('error', '')}, "
                    f"HTTP 状态: {debug_info.get('status', 'N/A')}",
                    flush=True,
                )
            if is_usable(resp):
                break
            print(
                f"[{self.platform_name}] 拉取作品列表失败,重试 {attempt}/3: {str(resp)[:200]}",
                flush=True,
            )
            # No point in sleeping after the final attempt.
            if attempt < max_fetch_attempts:
                await asyncio.sleep(1.2 * attempt)

        if not is_usable(resp):
            if isinstance(resp, dict):
                for key, label in (
                    ("msg", "错误消息"),
                    ("message", "错误消息"),
                    ("error", "错误"),
                ):
                    if resp.get(key):
                        print(
                            f"[{self.platform_name}] {label}: {resp.get(key)}",
                            flush=True,
                        )
                # The in-page fetch reports failures under "error", so fall
                # back to the other keys rather than raising with "None".
                error_msg = (
                    resp.get("msg")
                    or resp.get("message")
                    or resp.get("error")
                    or str(resp)[:200]
                )
            else:
                error_msg = str(resp)
            raise Exception(f"无法获取作品列表数据: {error_msg}")

        data = resp.get("data", {}) or {}
        notes = data.get("notes", []) or []
        print(
            f"[{self.platform_name}] 第 {page} 页 notes 数量: {len(notes)}",
            flush=True,
        )

        total = declared_total(data)

        next_page = data.get("page", "")
        if next_page == page:
            next_page = page + 1

        works.extend(parse_notes(notes))

        if total:
            has_more = (page * api_page_size + len(notes)) < total
            if has_more and (
                next_page == -1
                or str(next_page) == "-1"
                or next_page == ""
                or next_page is None
            ):
                next_page = page + 1
        elif not notes:
            has_more = False
        else:
            # No declared total: probe the following page to learn whether
            # anything is left.
            next_resp = await fetch_notes_page(page + 1)
            next_data = (
                (next_resp.get("data") or {}) if isinstance(next_resp, dict) else {}
            )
            next_notes = next_data.get("notes", []) or []
            has_more = len(next_notes) > 0
            if has_more:
                # The probe showed page + 1 has notes, so that is the page
                # the caller must request next. The cursor inside the probe
                # response belongs to whatever follows page + 1.
                next_page = page + 1
            else:
                next_page = next_data.get("page", next_page)
    except Exception as e:
        import traceback

        print(f"[{self.platform_name}] 发生异常: {e}", flush=True)
        traceback.print_exc()
        return WorksResult(success=False, platform=self.platform_name, error=str(e))
    finally:
        # Always release the browser.
        await self.close_browser()

    return WorksResult(
        success=True,
        platform=self.platform_name,
        works=works,
        total=total or (page * api_page_size + len(works)),
        has_more=has_more,
        next_page=next_page,
    )
async def get_all_works(self, cookies: str) -> WorksResult:
    """Fetch all notes of the account in one call by paging the creator API.

    The note-manager page is opened with the supplied cookies, then the
    ``/api/galaxy/v2/creator/note/user/posted`` endpoint is called page after
    page from inside the browser until the declared total is reached, a page
    brings no new notes, a request fails, or ``max_iters`` requests were made.
    If the very first request is refused, the method falls back to scrolling
    the note-manager page and harvesting the API responses the page itself
    triggers.

    Args:
        cookies: Raw cookie string; it is handed to ``self.parse_cookies``.

    Returns:
        ``WorksResult`` with ``success=True``, the de-duplicated works,
        ``has_more=False`` and ``next_page=-1``; or ``success=False`` with
        ``error`` and ``debug_info`` when an exception occurs. A request
        failure after the first page ends paging and returns the works
        gathered so far as a success. The browser is closed in all cases.
    """
    print(f"\n{'=' * 60}", flush=True)
    print(f"[{self.platform_name}] 获取全部作品(auto paging)", flush=True)
    print(f"{'=' * 60}", flush=True)

    works: List[WorkItem] = []
    total = 0
    seen_ids = set()
    cursor: object = 0
    max_iters = 800
    api_page_size = 20
    max_sign_check_attempts = 10
    iters = 0
    page_count = 0  # number of pages that actually returned notes
    note_manager_url = "https://creator.xiaohongshu.com/new/note-manager"

    # JS probe: is the page's request-signing function loaded yet?
    sign_probe_js = """() => {
        return typeof window !== 'undefined' && typeof window._webmsxyw === 'function';
    }"""

    def is_notes_api(url) -> bool:
        """True for note-list API URLs on the creator or edith hosts."""
        return (
            "creator.xiaohongshu.com" in url or "edith.xiaohongshu.com" in url
        ) and "creator/note/user/posted" in url

    def note_count(tag) -> int:
        """Read a tag's note count under any of the key spellings seen."""
        return int(
            tag.get("notes_count", 0)
            or tag.get("notesCount", 0)
            or tag.get("count", 0)
            or 0
        )

    def total_from_tags(tags):
        """Return ``(count, from_preferred_tag)`` for a non-empty tag list."""
        # Only the first "special.note_time_desc" tag is considered.
        for tag in tags:
            if tag.get("id") == "special.note_time_desc":
                preferred = note_count(tag)
                if preferred:
                    return preferred, True
                break
        return max((note_count(tag) for tag in tags), default=0), False

    def total_from_data(data) -> int:
        """Return the total declared on ``data`` itself or its ``page`` dict."""
        declared = int(
            data.get("total", 0)
            or data.get("total_count", 0)
            or data.get("totalCount", 0)
            or 0
        )
        page_info = data.get("page", {})
        if not declared and isinstance(page_info, dict):
            declared = int(
                page_info.get("total", 0) or page_info.get("totalCount", 0) or 0
            )
        return declared

    def parse_notes(notes_list):
        """Convert raw API note dicts into ``WorkItem`` objects."""
        status_by_tab = {0: "draft", 2: "reviewing", 3: "rejected"}
        parsed = []
        for note in notes_list:
            note_id = note.get("id", "")
            if not note_id:
                continue
            cover_url = ""
            images_list = note.get("images_list", [])
            if images_list:
                # `or ""` guards against an explicit null url.
                cover_url = images_list[0].get("url", "") or ""
                if cover_url.startswith("http://"):
                    cover_url = cover_url.replace("http://", "https://", 1)
            # `video_info` may be present but null; treat that like a
            # missing key instead of aborting the whole run.
            duration = (note.get("video_info") or {}).get("duration", 0)
            parsed.append(
                WorkItem(
                    work_id=note_id,
                    title=note.get("display_title", "") or "无标题",
                    cover_url=cover_url,
                    video_url=f"https://www.xiaohongshu.com/explore/{note_id}",
                    duration=duration,
                    status=status_by_tab.get(note.get("tab_status", 1), "published"),
                    publish_time=note.get("time", ""),
                    play_count=note.get("view_count", 0),
                    like_count=note.get("likes", 0),
                    comment_count=note.get("comments_count", 0),
                    share_count=note.get("shared_count", 0),
                    collect_count=note.get("collected_count", 0),
                )
            )
        return parsed

    async def fetch_notes_page(p):
        """Call the note-list API for cursor ``p`` from inside the browser page."""
        # Re-check the signing function before every call; give it a short
        # grace period if it is still missing.
        if not await self.page.evaluate(sign_probe_js):
            print(
                f"[{self.platform_name}] ⚠️ 签名函数 _webmsxyw 不可用,等待...",
                flush=True,
            )
            await asyncio.sleep(2)
        return await self.page.evaluate(
            """async (pageNum) => {
                try {
                    // 使用正确的 API 端点:/api/galaxy/v2/creator/note/user/posted
                    const url = `/api/galaxy/v2/creator/note/user/posted?tab=0&page=${pageNum}`;
                    const headers = {
                        'Accept': 'application/json, text/plain, */*',
                        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
                        'Referer': 'https://creator.xiaohongshu.com/new/note-manager',
                        'Sec-Fetch-Dest': 'empty',
                        'Sec-Fetch-Mode': 'cors',
                        'Sec-Fetch-Site': 'same-origin'
                    };

                    // 尝试获取签名
                    let signResult = { hasSign: false, x_s: '', x_t: '', x_s_common: '', error: '' };
                    if (typeof window !== 'undefined' && typeof window._webmsxyw === 'function') {
                        try {
                            const sign = window._webmsxyw(url, '');
                            headers['x-s'] = sign['X-s'];
                            headers['x-t'] = String(sign['X-t']);
                            // 检查是否有 x-s-common
                            if (sign['X-s-common']) {
                                headers['x-s-common'] = sign['X-s-common'];
                            }
                            signResult = {
                                hasSign: true,
                                x_s: sign['X-s'] ? sign['X-s'].substring(0, 50) + '...' : '',
                                x_t: String(sign['X-t']),
                                x_s_common: sign['X-s-common'] ? sign['X-s-common'].substring(0, 50) + '...' : '',
                                error: ''
                            };
                            console.log('签名生成成功:', signResult);
                        } catch (e) {
                            signResult.error = e.toString();
                            console.error('签名生成失败:', e);
                        }
                    } else {
                        signResult.error = '_webmsxyw function not found';
                        console.error('签名函数不存在');
                    }

                    const res = await fetch(url, {
                        method: 'GET',
                        credentials: 'include',
                        headers
                    });

                    const responseData = await res.json();
                    return {
                        ...responseData,
                        _debug: {
                            signResult: signResult,
                            status: res.status,
                            statusText: res.statusText
                        }
                    };
                } catch (e) {
                    return { success: false, error: e.toString() };
                }
            }""",
            p,
        )

    async def collect_by_scrolling() -> WorksResult:
        """Fallback: scroll the note list and harvest the page's own API responses."""
        print(
            f"[{self.platform_name}] 直连接口被拒绝,切换为滚动页面 + 监听 API 响应模式",
            flush=True,
        )
        captured: List[WorkItem] = []
        captured_total = 0
        captured_seen = set()
        lock = asyncio.Lock()

        async def handle_response(response):
            """Parse a note-list response and merge unseen notes into `captured`."""
            nonlocal captured_total
            if not is_notes_api(response.url):
                return
            try:
                json_data = await response.json()
            except Exception:
                # Not JSON (or body unavailable): nothing to harvest.
                return
            if not isinstance(json_data, dict):
                return
            if not (
                json_data.get("success") or json_data.get("code") == 0
            ) or not json_data.get("data"):
                return
            data = json_data.get("data", {}) or {}
            notes = data.get("notes", []) or []
            tags = data.get("tags", []) or []
            declared = 0
            if tags:
                declared, _ = total_from_tags(tags)
            if not declared:
                declared = total_from_data(data)
            async with lock:
                if declared:
                    captured_total = max(captured_total, declared)
                new_count = 0
                for work in parse_notes(notes):
                    if work.work_id and work.work_id not in captured_seen:
                        captured_seen.add(work.work_id)
                        captured.append(work)
                        new_count += 1
                if new_count > 0:
                    print(
                        f"[{self.platform_name}] 捕获 notes 响应: notes={len(notes)}, new={new_count}, total_now={len(captured)}, declared_total={captured_total}",
                        flush=True,
                    )

        self.page.on("response", handle_response)
        try:
            try:
                # Reload the note manager with a generous timeout so the page
                # re-issues its own note-list requests.
                await self.page.goto(
                    note_manager_url,
                    wait_until="domcontentloaded",
                    timeout=90000,
                )
                print(f"[{self.platform_name}] 页面加载成功", flush=True)
            except Exception as nav_error:
                print(
                    f"[{self.platform_name}] 导航异常(继续):{nav_error}",
                    flush=True,
                )
                # Keep going: the page may have loaded partially. Only a
                # redirect to login is fatal.
                await asyncio.sleep(3)
                try:
                    current_url = self.page.url
                    print(
                        f"[{self.platform_name}] 超时后当前页面: {current_url}",
                        flush=True,
                    )
                except Exception as e:
                    print(
                        f"[{self.platform_name}] 检查页面状态时出错: {e}",
                        flush=True,
                    )
                else:
                    if "login" in current_url:
                        raise Exception("Cookie 已过期,请重新登录")

            await asyncio.sleep(2.0)
            idle_rounds = 0
            last_count = 0
            last_height = 0
            for _ in range(1, 400):
                # Scroll the element with the largest scrollable overflow to
                # its bottom and report its geometry.
                scroll_state = await self.page.evaluate(
                    """() => {
                        const isScrollable = (el) => {
                            if (!el) return false;
                            const style = window.getComputedStyle(el);
                            const oy = style.overflowY;
                            return (oy === 'auto' || oy === 'scroll') && (el.scrollHeight - el.clientHeight > 200);
                        };
                        const pickBest = () => {
                            const nodes = Array.from(document.querySelectorAll('*'));
                            let best = document.scrollingElement || document.documentElement || document.body;
                            let bestScroll = (best.scrollHeight || 0) - (best.clientHeight || 0);
                            for (const el of nodes) {
                                if (!isScrollable(el)) continue;
                                const diff = el.scrollHeight - el.clientHeight;
                                if (diff > bestScroll) {
                                    best = el;
                                    bestScroll = diff;
                                }
                            }
                            return best;
                        };
                        const el = pickBest();
                        const beforeTop = el.scrollTop || 0;
                        const beforeHeight = el.scrollHeight || 0;
                        el.scrollTo(0, beforeHeight);
                        return {
                            beforeTop,
                            afterTop: el.scrollTop || 0,
                            height: el.scrollHeight || 0,
                            client: el.clientHeight || 0,
                        };
                    }"""
                )
                await asyncio.sleep(1.2)
                async with lock:
                    count_now = len(captured)
                    total_now = captured_total
                if total_now and count_now >= total_now:
                    break
                height_now = (
                    int(scroll_state.get("height", 0) or 0)
                    if isinstance(scroll_state, dict)
                    else 0
                )
                # A round is idle when neither the number of captured notes
                # nor the scroll height changed; stop after six in a row.
                if count_now == last_count and height_now == last_height:
                    idle_rounds += 1
                else:
                    idle_rounds = 0
                last_count = count_now
                last_height = height_now
                if idle_rounds >= 6:
                    break

            async with lock:
                final_works = list(captured)
                final_total = captured_total or len(final_works)
            return WorksResult(
                success=True,
                platform=self.platform_name,
                works=final_works,
                total=final_total,
                has_more=False,
                next_page=-1,
            )
        finally:
            try:
                self.page.remove_listener("response", handle_response)
            except Exception:
                # Best effort: the page may already be gone.
                pass

    # Requests to the note-list API are recorded for the summary log.
    captured_requests = []

    async def handle_request(request):
        """Record and log the signing headers of note-list API requests."""
        url = request.url
        if not is_notes_api(url):
            return
        headers = request.headers
        captured_requests.append(
            {
                "url": url,
                "method": request.method,
                "headers": dict(headers),
                "timestamp": asyncio.get_event_loop().time(),
            }
        )
        x_s = headers.get("x-s", "")
        x_t = headers.get("x-t", "")
        x_s_common = headers.get("x-s-common", "")
        print(f"[{self.platform_name}] 📡 API 请求: {url}", flush=True)
        print(
            f"[{self.platform_name}] Method: {request.method}",
            flush=True,
        )
        print(
            f"[{self.platform_name}] X-S: {x_s[:50] if x_s else '(none)'}...",
            flush=True,
        )
        print(f"[{self.platform_name}] X-T: {x_t}", flush=True)
        print(
            f"[{self.platform_name}] X-S-Common: {x_s_common[:50] if x_s_common else '(none)'}...",
            flush=True,
        )
        # NOTE(review): this writes the first 100 characters of the Cookie
        # header to stdout; confirm that is acceptable for production logs.
        print(
            f"[{self.platform_name}] Cookie: {headers.get('cookie', '')[:100]}...",
            flush=True,
        )

    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        print(
            f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies",
            flush=True,
        )
        await self.set_cookies(cookie_list)
        if not self.page:
            raise Exception("Page not initialized")

        print(f"[{self.platform_name}] 访问笔记管理页面...", flush=True)
        try:
            await self.page.goto(
                note_manager_url,
                wait_until="domcontentloaded",
                timeout=60000,
            )
            print(f"[{self.platform_name}] 页面加载成功", flush=True)
        except Exception as nav_error:
            print(
                f"[{self.platform_name}] 导航超时,但继续尝试: {nav_error}",
                flush=True,
            )
            # Even after a timeout, report where the page ended up.
            try:
                await asyncio.sleep(2)
                current_url = self.page.url
                print(
                    f"[{self.platform_name}] 超时后当前页面: {current_url}",
                    flush=True,
                )
            except Exception as e:
                print(f"[{self.platform_name}] 检查页面状态时出错: {e}", flush=True)

        # A redirect to a login URL means the cookies are no longer valid.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前页面: {current_url}", flush=True)
        if "login" in current_url:
            raise Exception("Cookie 已过期,请重新登录")

        # Give the page time to load, then poll for the signing function.
        print(
            f"[{self.platform_name}] 等待页面完全加载和签名函数初始化...",
            flush=True,
        )
        await asyncio.sleep(3)
        for check in range(1, max_sign_check_attempts + 1):
            if await self.page.evaluate(sign_probe_js):
                print(
                    f"[{self.platform_name}] ✓ 签名函数 _webmsxyw 已可用",
                    flush=True,
                )
                break
            print(
                f"[{self.platform_name}] ⏳ 等待签名函数... ({check}/{max_sign_check_attempts})",
                flush=True,
            )
            await asyncio.sleep(1)
        else:
            # Never became available: warn but still try the API.
            print(
                f"[{self.platform_name}] ⚠️ 警告: 签名函数 _webmsxyw 在 {max_sign_check_attempts} 次检查后仍不可用",
                flush=True,
            )
            print(
                f"[{self.platform_name}] 继续尝试,但 API 调用可能会失败",
                flush=True,
            )

        self.page.on("request", handle_request)

        print(
            f"[{self.platform_name}] ========== 开始自动分页获取作品 ==========",
            flush=True,
        )
        print(
            f"[{self.platform_name}] 最大迭代次数: {max_iters}, 每页大小: {api_page_size}",
            flush=True,
        )

        while iters < max_iters:
            iters += 1
            print(
                f"\n[{self.platform_name}] ---------- 第 {iters} 次请求 (cursor={cursor}) ----------",
                flush=True,
            )
            resp = await fetch_notes_page(cursor)

            # Print the debug envelope, then strip it so it does not affect
            # the checks below.
            if resp and isinstance(resp, dict) and resp.get("_debug"):
                debug_info = resp.pop("_debug")
                sign_result = debug_info.get("signResult", {})
                print(f"[{self.platform_name}] 🔍 调试信息:", flush=True)
                print(
                    f"[{self.platform_name}] 签名可用: {sign_result.get('hasSign', False)}",
                    flush=True,
                )
                if sign_result.get("x_s"):
                    print(
                        f"[{self.platform_name}] X-S: {sign_result.get('x_s', '')}",
                        flush=True,
                    )
                if sign_result.get("x_t"):
                    print(
                        f"[{self.platform_name}] X-T: {sign_result.get('x_t', '')}",
                        flush=True,
                    )
                if sign_result.get("error"):
                    print(
                        f"[{self.platform_name}] 签名错误: {sign_result.get('error', '')}",
                        flush=True,
                    )
                print(
                    f"[{self.platform_name}] HTTP 状态: {debug_info.get('status', 'N/A')} {debug_info.get('statusText', '')}",
                    flush=True,
                )

            if not resp or not isinstance(resp, dict):
                print(
                    f"[{self.platform_name}] ❌ 第 {iters} 次拉取无响应,cursor={cursor}",
                    flush=True,
                )
                print(
                    f"[{self.platform_name}] 响应类型: {type(resp)}, 响应内容: {str(resp)[:500]}",
                    flush=True,
                )
                break

            if not (resp.get("success") or resp.get("code") == 0) or not resp.get(
                "data"
            ):
                error_msg = str(resp)[:500]
                print(
                    f"[{self.platform_name}] ❌ 拉取失败 cursor={cursor}",
                    flush=True,
                )
                print(f"[{self.platform_name}] 响应详情: {error_msg}", flush=True)
                print(
                    f"[{self.platform_name}] success={resp.get('success')}, code={resp.get('code')}, has_data={bool(resp.get('data'))}",
                    flush=True,
                )
                # HTTP status and signing errors were already printed with
                # the debug envelope above; only the API's own messages
                # remain to be shown here.
                for key, label in (
                    ("msg", "错误消息"),
                    ("message", "错误消息"),
                    ("error", "错误"),
                ):
                    if resp.get(key):
                        print(
                            f"[{self.platform_name}] {label}: {resp.get(key)}",
                            flush=True,
                        )
                if iters == 1:
                    # The direct API is refused outright: fall back to
                    # scrolling the page and listening to its requests.
                    print(
                        f"[{self.platform_name}] 第一次请求失败,切换到滚动模式",
                        flush=True,
                    )
                    return await collect_by_scrolling()
                break

            data = resp.get("data", {}) or {}
            notes = data.get("notes", []) or []
            if not notes:
                print(
                    f"[{self.platform_name}] ⚠️ cursor={cursor} 无作品,停止分页",
                    flush=True,
                )
                break

            page_count += 1
            print(
                f"[{self.platform_name}] ✅ 第 {page_count} 页获取成功,本页作品数: {len(notes)}",
                flush=True,
            )

            # Keep the largest total any page has declared so far.
            tags = data.get("tags", []) or []
            if tags:
                tag_total, from_preferred = total_from_tags(tags)
                total = max(total, tag_total)
                if from_preferred:
                    print(
                        f"[{self.platform_name}] 📊 从 tags 获取总数: {total} (preferred)",
                        flush=True,
                    )
                elif tag_total > 0:
                    print(
                        f"[{self.platform_name}] 📊 从 tags 获取总数: {total}",
                        flush=True,
                    )
            if not total:
                data_total = total_from_data(data)
                total = max(total, data_total)
                if data_total > 0:
                    print(
                        f"[{self.platform_name}] 📊 从 data.total 获取总数: {total}",
                        flush=True,
                    )

            # Merge this page, skipping notes already seen on earlier pages.
            new_items = []
            for work in parse_notes(notes):
                if work.work_id and work.work_id not in seen_ids:
                    seen_ids.add(work.work_id)
                    new_items.append(work)
            works.extend(new_items)
            print(
                f"[{self.platform_name}] 📈 累计统计: 本页新作品={len(new_items)}, 累计作品数={len(works)}, 声明总数={total}",
                flush=True,
            )

            if total and len(works) >= total:
                print(
                    f"[{self.platform_name}] ✅ 已获取全部作品 (累计={len(works)} >= 总数={total}),停止分页",
                    flush=True,
                )
                break
            if len(new_items) == 0:
                print(
                    f"[{self.platform_name}] ⚠️ 本页无新作品,停止分页", flush=True
                )
                break

            # Pick the next cursor: use the one the API returned unless it
            # is missing, repeats the current cursor, is the -1 end marker,
            # or is a dict (the shape handled for totals above), in which
            # case the cursor is advanced locally.
            next_page = data.get("page", "")
            old_cursor = cursor
            if (
                next_page is None
                or isinstance(next_page, dict)
                or next_page == cursor
                or next_page == -1
                or str(next_page) == "-1"
            ):
                next_page = ""
            if next_page == "":
                if isinstance(cursor, int):
                    cursor = cursor + 1
                else:
                    cursor = len(works) // api_page_size
                print(
                    f"[{self.platform_name}] 🔄 下一页 cursor: {old_cursor} -> {cursor} (自动递增)",
                    flush=True,
                )
            else:
                cursor = next_page
                print(
                    f"[{self.platform_name}] 🔄 下一页 cursor: {old_cursor} -> {cursor} (API返回)",
                    flush=True,
                )
            await asyncio.sleep(0.5)

        # Paging is over: stop recording requests.
        try:
            self.page.remove_listener("request", handle_request)
        except Exception:
            pass

        print(
            f"\n[{self.platform_name}] ========== 分页完成 ==========", flush=True
        )
        print(
            f"[{self.platform_name}] 📊 分页统计: 总请求次数={iters}, 成功获取页数={page_count}, 累计作品数={len(works)}, 声明总数={total}",
            flush=True,
        )
        if captured_requests:
            print(
                f"[{self.platform_name}] 📡 捕获到 {len(captured_requests)} 个 API 请求",
                flush=True,
            )
            # Only the first three requests are shown.
            for i, req in enumerate(captured_requests[:3], 1):
                print(
                    f"[{self.platform_name}] 请求 {i}: {req['method']} {req['url']}",
                    flush=True,
                )
                if "x-s" in req["headers"]:
                    print(
                        f"[{self.platform_name}] X-S: {req['headers']['x-s'][:50]}...",
                        flush=True,
                    )
                if "x-t" in req["headers"]:
                    print(
                        f"[{self.platform_name}] X-T: {req['headers']['x-t']}",
                        flush=True,
                    )
        print(
            f"[{self.platform_name}] ========================================\n",
            flush=True,
        )
    except Exception as e:
        import traceback

        error_trace = traceback.format_exc()
        print(f"[{self.platform_name}] 发生异常: {e}", flush=True)
        traceback.print_exc()
        return WorksResult(
            success=False,
            platform=self.platform_name,
            error=str(e),
            debug_info=f"异常详情: {error_trace[:500]}",
        )
    finally:
        await self.close_browser()

    debug_info = f"总请求次数={iters}, 成功获取页数={page_count}, 累计作品数={len(works)}, 声明总数={total}"
    if len(works) == 0:
        debug_info += " | 警告: 没有获取到任何作品,可能原因: Cookie失效、API调用失败、或账号无作品"
    return WorksResult(
        success=True,
        platform=self.platform_name,
        works=works,
        total=total or len(works),
        has_more=False,
        next_page=-1,
        debug_info=debug_info,
    )
- async def get_comments(
- self, cookies: str, work_id: str, cursor: str = ""
- ) -> CommentsResult:
- """
- 获取账号下所有作品的评论 —— 完全复刻 get_xiaohongshu_work_comments.py 的7步流程。
- """
- all_comments: List[CommentItem] = []
- total_comments = 0
- has_more = False
- browser = None
- print(222222222222222222222222222222222222)
- print(work_id)
- global stored_cookies
- try:
- # --- Step 1: 初始化浏览器和 Cookie ---
- cookie_list = self.parse_cookies(cookies)
- playwright = await async_playwright().start()
- browser = await playwright.chromium.launch(headless=False)
- context = await browser.new_context(
- viewport={"width": 1400, "height": 900},
- user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
- )
- if os.path.exists("cookies.json"):
- with open("cookies.json", "r") as f:
- stored_cookies = json.load(f)
- if stored_cookies:
- await context.add_cookies(stored_cookies)
- page = await context.new_page()
- # --- Step 2: 打开小红书主页 ---
- await page.goto(
- "https://www.xiaohongshu.com", wait_until="domcontentloaded"
- )
- await asyncio.sleep(1.5)
- # --- Step 3: 检查并处理登录弹窗 ---
- try:
- if await page.is_visible(".login-container", timeout=3000):
- await page.wait_for_selector(
- ".login-container", state="hidden", timeout=120000
- )
- stored_cookies = await context.cookies()
- with open("xiaohongshu_cookies.json", "w") as f:
- json.dump(stored_cookies, f)
- except Exception as e:
- pass # 忽略超时,继续执行
- # --- 提取 User ID ---
- user_id = None
- for cookie in cookie_list:
- if cookie.get("name") == "x-user-id-creator.xiaohongshu.com":
- user_id = cookie.get("value")
- break
- if not user_id:
- raise ValueError("无法从 Cookie 中提取 user_id")
- # --- Step 4: 跳转到用户主页 ---
- profile_url = f"https://www.xiaohongshu.com/user/profile/{user_id}"
- await page.goto(profile_url, wait_until="domcontentloaded")
- await asyncio.sleep(2)
- # --- 等待笔记区域加载 ---
- try:
- await page.wait_for_selector(
- "#userPostedFeeds .note-item", timeout=20000
- )
- except:
- raise Exception("笔记区域未加载,请检查账号是否公开或 Cookie 是否有效")
- # --- Step 5: 滚动到底部加载全部笔记 ---
- last_height = None
- while True:
- await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
- await asyncio.sleep(2)
- new_height = await page.evaluate("document.body.scrollHeight")
- if new_height == last_height:
- break
- last_height = new_height
- # --- 获取所有封面图 ---
- note_imgs = await page.query_selector_all(
- "#userPostedFeeds .note-item .cover img"
- )
- print(f"共找到 {len(note_imgs)} 张封面图")
- # --- Step 6 & 7: 依次点击封面图,捕获评论并结构化 ---
- for i, img in enumerate(note_imgs):
- try:
- # >>> 新增:从 img 提取 note_id 并与 work_id 比较 <<<
- note_id = await img.evaluate("""el => {
- const item = el.closest('.note-item');
- if (!item) return null;
- const link = item.querySelector('a[href^="/explore/"]');
- return link ? link.href.split('/').pop() : null;
- }""")
- if note_id != work_id:
- print(
- f"note_id {note_id} 与目标 work_id {work_id} 不匹配,跳出循环"
- )
- continue
- # <<< 新增结束 >>>
- await img.scroll_into_view_if_needed()
- await asyncio.sleep(0.5)
- comment_resp = None
- def handle_response(response):
- nonlocal comment_resp
- if (
- "edith.xiaohongshu.com/api/sns/web/v2/comment/page"
- in response.url
- ):
- comment_resp = response
- page.on("response", handle_response)
- await img.click()
- await asyncio.sleep(1.5)
- page.remove_listener("response", handle_response)
- if not comment_resp:
- await page.keyboard.press("Escape")
- continue
- json_data = await comment_resp.json()
- if not (json_data.get("success") or json_data.get("code") == 0):
- await page.keyboard.press("Escape")
- continue
- data = json_data.get("data", {})
- raw_comments = data.get("comments", [])
- note_id = data.get("note_id", "")
- for main_cmt in raw_comments:
- # 主评论
- user_info = main_cmt.get("user_info", {})
- all_comments.append(
- CommentItem(
- comment_id=main_cmt["id"],
- parent_comment_id=None,
- work_id=work_id,
- content=main_cmt["content"],
- author_id=user_info.get("user_id", ""),
- author_name=user_info.get("nickname", ""),
- author_avatar=user_info.get("image", ""),
- like_count=int(main_cmt.get("like_count", 0)),
- reply_count=main_cmt.get("sub_comment_count", 0),
- create_time=self._timestamp_to_readable(
- main_cmt.get("create_time", 0)
- ),
- )
- )
- # 子评论
- for sub_cmt in main_cmt.get("sub_comments", []):
- sub_user = sub_cmt.get("user_info", {})
- all_comments.append(
- CommentItem(
- comment_id=sub_cmt["id"],
- parent_comment_id=main_cmt["id"],
- work_id=work_id,
- content=sub_cmt["content"],
- author_id=sub_user.get("user_id", ""),
- author_name=sub_user.get("nickname", ""),
- author_avatar=sub_user.get("image", ""),
- like_count=int(sub_cmt.get("like_count", 0)),
- reply_count=0,
- create_time=self._timestamp_to_readable(
- sub_cmt.get("create_time", 0)
- ),
- )
- )
- # 关闭弹窗
- await page.keyboard.press("Escape")
- await asyncio.sleep(1)
- except Exception as e:
- # 出错也尝试关闭弹窗
- try:
- await page.keyboard.press("Escape")
- await asyncio.sleep(0.5)
- except:
- pass
- continue
- # --- 返回结果 ---
- total_comments = len(all_comments)
- # return {
- # 'success': True,
- # 'platform': self.platform_name,
- # 'work_comments': all_comments, # 注意:此处为扁平列表,如需按作品分组可在外层处理
- # 'total': total_comments
- # }
- return CommentsResult(
- success=True,
- platform=self.platform_name,
- work_id=work_id,
- comments=all_comments,
- total=total_comments,
- has_more=has_more,
- )
- except Exception as e:
- import traceback
- traceback.print_exc()
- return CommentsResult(
- success=True, platform=self.platform_name, work_id=work_id, total=0
- )
- finally:
- if browser:
- await browser.close()
- def _timestamp_to_readable(self, ts_ms: int) -> str:
- """将毫秒时间戳转换为可读格式"""
- from datetime import datetime
- if not ts_ms:
- return ""
- try:
- return datetime.fromtimestamp(ts_ms / 1000).strftime("%Y-%m-%d %H:%M:%S")
- except Exception:
- return ""
async def get_all_comments(self, cookies: str) -> dict:
    """Collect the comments of every work via the creator comment page.

    Opens ``https://creator.xiaohongshu.com/creator/comment`` in the browser,
    listens to the page's network responses, harvests comment and note
    payloads from them while scrolling, then groups the captured comments
    by note id.

    Args:
        cookies: Raw cookie data; handed to ``self.parse_cookies``.

    Returns:
        On success a dict with ``success=True``, ``platform``,
        ``work_comments`` (a list of ``{"work_id", "title", "cover_url",
        "comments"}`` dicts) and ``total``. Note that ``total`` is the
        number of works, not the number of comments.
        On any failure (including a redirect to a login URL) a dict with
        ``success=False``, ``platform``, ``error`` and an empty
        ``work_comments`` list. The browser is closed in both cases.
    """
    import re
    import traceback

    print(f"\n{'=' * 60}")
    print(f"[{self.platform_name}] 获取所有作品评论")
    print(f"{'=' * 60}")

    all_work_comments = []
    captured_comments = []
    captured_notes = {}  # note_id -> {"title": ..., "cover": ...}

    # Compiled once instead of on every intercepted response.
    note_id_pattern = re.compile(r"note_id=([^&]+)")

    def cover_url_of(cover):
        """Return the URL of a cover given as a dict or as a plain string."""
        if isinstance(cover, dict):
            return cover.get("url", "") or cover.get("url_default", "")
        if isinstance(cover, str):
            return cover
        return ""

    def as_dict(value):
        """Return ``value`` if it is a dict, else an empty dict (null-safe)."""
        return value if isinstance(value, dict) else {}

    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)
        if not self.page:
            raise Exception("Page not initialized")

        async def handle_response(response):
            """Harvest comment and note payloads from one network response."""
            url = response.url
            try:
                # Comment-list endpoints (several URL shapes are accepted).
                if "/comment/" in url and ("page" in url or "list" in url):
                    json_data = await response.json()
                    print(
                        f"[{self.platform_name}] 捕获到评论 API: {url[:100]}...",
                        flush=True,
                    )
                    if json_data.get("success") or json_data.get("code") == 0:
                        data = json_data.get("data") or {}
                        comments = data.get("comments", []) or data.get("list", [])
                        # The note id is only available in the request URL.
                        id_match = note_id_pattern.search(url)
                        note_id = id_match.group(1) if id_match else ""
                        if comments:
                            for comment in comments:
                                # A malformed entry must not abort the
                                # capture of the remaining comments.
                                if not isinstance(comment, dict):
                                    continue
                                # Tag the comment with its note id so it can
                                # be grouped later.
                                if note_id and "note_id" not in comment:
                                    comment["note_id"] = note_id
                                captured_comments.append(comment)
                            print(
                                f"[{self.platform_name}] 捕获到 {len(comments)} 条评论 (note_id={note_id}),总计: {len(captured_comments)}",
                                flush=True,
                            )
                # Note-list endpoints: remember title and cover per note.
                if "/note/" in url and (
                    "list" in url or "posted" in url or "manager" in url
                ):
                    json_data = await response.json()
                    if json_data.get("success") or json_data.get("code") == 0:
                        data = json_data.get("data") or {}
                        notes = data.get("notes", []) or data.get("list", []) or []
                        print(
                            f"[{self.platform_name}] 捕获到笔记列表 API: {len(notes)} 个笔记",
                            flush=True,
                        )
                        for note in notes:
                            note_id = note.get("note_id", "") or note.get("id", "")
                            if note_id:
                                captured_notes[note_id] = {
                                    "title": note.get("title", "")
                                    or note.get("display_title", ""),
                                    "cover": cover_url_of(note.get("cover", {})),
                                }
            except Exception as e:
                # Best effort: an unparsable response is reported and skipped.
                print(f"[{self.platform_name}] 解析响应失败: {e}", flush=True)

        self.page.on("response", handle_response)
        print(f"[{self.platform_name}] 已注册 API 响应监听器", flush=True)

        # Open the comment-management page.
        print(f"[{self.platform_name}] 访问评论管理页面...", flush=True)
        await self.page.goto(
            "https://creator.xiaohongshu.com/creator/comment",
            wait_until="domcontentloaded",
            timeout=30000,
        )
        await asyncio.sleep(5)

        # Landing on a login URL is treated as an expired cookie.
        current_url = self.page.url
        if "login" in current_url:
            raise Exception("Cookie 已过期,请重新登录")
        print(
            f"[{self.platform_name}] 页面加载完成,当前捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记",
            flush=True,
        )

        # Scroll a few times so the page requests more comments.
        for _ in range(5):
            await self.page.evaluate("window.scrollBy(0, 500)")
            await asyncio.sleep(1)
        await asyncio.sleep(3)

        self.page.remove_listener("response", handle_response)
        print(
            f"[{self.platform_name}] 最终捕获: {len(captured_comments)} 条评论, {len(captured_notes)} 个笔记",
            flush=True,
        )

        # Group the captured comments by the note they belong to.
        work_comments_map = {}  # note_id -> work entry
        for comment in captured_comments:
            # ``as_dict`` guards against explicit JSON nulls / non-dict
            # values, which previously raised AttributeError and discarded
            # every captured comment.
            note_info = as_dict(comment.get("note_info") or comment.get("note"))
            note_id = (
                comment.get("note_id", "")
                or note_info.get("note_id", "")
                or note_info.get("id", "")
            )
            if not note_id:
                continue

            if note_id not in work_comments_map:
                saved_note = captured_notes.get(note_id, {})
                # Prefer the cover embedded in the comment, then fall back
                # to the one seen in a note-list response.
                cover_url = cover_url_of(note_info.get("cover", {}))
                if not cover_url:
                    cover_url = saved_note.get("cover", "")
                work_comments_map[note_id] = {
                    "work_id": note_id,
                    "title": note_info.get("title", "")
                    or note_info.get("display_title", "")
                    or saved_note.get("title", ""),
                    "cover_url": cover_url,
                    "comments": [],
                }

            cid = comment.get("id", "") or comment.get("comment_id", "")
            if not cid:
                continue

            user_info = as_dict(comment.get("user_info") or comment.get("user"))
            work_comments_map[note_id]["comments"].append(
                {
                    "comment_id": cid,
                    "author_id": user_info.get("user_id", "")
                    or user_info.get("id", ""),
                    "author_name": user_info.get("nickname", "")
                    or user_info.get("name", ""),
                    "author_avatar": user_info.get("image", "")
                    or user_info.get("avatar", ""),
                    "content": comment.get("content", ""),
                    "like_count": comment.get("like_count", 0),
                    "create_time": comment.get("create_time", ""),
                }
            )

        all_work_comments = list(work_comments_map.values())
        total_comments = sum(len(w["comments"]) for w in all_work_comments)
        print(
            f"[{self.platform_name}] 获取到 {len(all_work_comments)} 个作品的 {total_comments} 条评论",
            flush=True,
        )
    except Exception as e:
        traceback.print_exc()
        return {
            "success": False,
            "platform": self.platform_name,
            "error": str(e),
            "work_comments": [],
        }
    finally:
        await self.close_browser()

    return {
        "success": True,
        "platform": self.platform_name,
        "work_comments": all_work_comments,
        "total": len(all_work_comments),
    }
async def get_note_base(self, cookies: str, note_id: str) -> dict:
    """Fetch the creator-centre ``note/base`` statistics for one note.

    Used by the daily work-data sync. No browser is started: the stored
    account cookies are sent straight to the API together with a matching
    ``Referer`` header.

    Args:
        cookies: Raw cookie data; handed to ``self.parse_cookies``.
        note_id: Id of the note; surrounding whitespace is ignored.

    Returns:
        The decoded JSON body when the API answers 200 with a JSON object.
        Otherwise a ``{"data": None, "code": ..., "msg": ...}`` dict, where
        ``code`` is ``-1`` for a missing ``note_id`` or a non-object JSON
        body, and the HTTP status for non-JSON or non-200 responses.

    Exceptions raised by aiohttp while sending the request (including the
    30 second total timeout) are not caught here.
    """
    note_id = (note_id or "").strip()
    if not note_id:
        return {"data": None, "code": -1, "msg": "missing note_id"}

    # Imported only once a request is actually going to be made.
    import aiohttp
    from urllib.parse import quote

    # Percent-encode the id so characters such as "&", "#" or spaces cannot
    # alter the query string or the Referer.
    encoded_note_id = quote(note_id, safe="")

    cookie_list = self.parse_cookies(cookies)
    cookie_dict = {
        c.get("name") or "": c.get("value") or ""
        for c in cookie_list
        if c.get("name")
    }
    api_headers = {
        "Accept": "application/json, text/plain, */*",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": f"https://creator.xiaohongshu.com/statistics/note-detail?noteId={encoded_note_id}",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
    }
    api_url = f"https://creator.xiaohongshu.com/api/galaxy/creator/datacenter/note/base?note_id={encoded_note_id}"

    async with aiohttp.ClientSession(cookies=cookie_dict) as session:
        async with session.get(
            api_url,
            headers=api_headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as resp:
            status = resp.status
            try:
                data = await resp.json()
            except Exception:
                # Deliberately broad: any decoding problem is reported as an
                # "invalid response" together with a preview of the body.
                text = await resp.text()
                print(f"[{self.platform_name}] note/base non-JSON: {text[:500]}")
                return {"data": None, "code": status, "msg": "invalid response"}

    if status != 200:
        return {
            "data": None,
            "code": status,
            "msg": data.get("msg") if isinstance(data, dict) else "request failed",
        }
    if isinstance(data, dict):
        return data
    return {"data": None, "code": -1, "msg": "invalid response"}
async def get_account_base(self, cookies: str) -> dict:
    """Fetch the creator-centre account overview (``account/base``).

    Used by the daily user-data sync. No browser is started: the stored
    account cookies are sent straight to the API with a matching Referer.

    Args:
        cookies: Raw cookie data; handed to ``self.parse_cookies``.

    Returns:
        The decoded JSON body when the API answers 200 with a JSON object,
        otherwise a ``{"data": None, "code": ..., "msg": ...}`` dict
        (``code`` is the HTTP status for non-JSON / non-200 responses and
        ``-1`` for a JSON body that is not an object).
    """
    import aiohttp

    # Name -> value mapping of every cookie that has a non-empty name.
    jar = {}
    for entry in self.parse_cookies(cookies):
        cookie_name = entry.get("name")
        if cookie_name:
            jar[cookie_name] = entry.get("value") or ""

    request_headers = {
        "Accept": "application/json, text/plain, */*",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": "https://creator.xiaohongshu.com/statistics/account/v2",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
    }
    endpoint = "https://creator.xiaohongshu.com/api/galaxy/v2/creator/datacenter/account/base"

    async with aiohttp.ClientSession(cookies=jar) as http:
        async with http.get(
            endpoint,
            headers=request_headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as reply:
            http_status = reply.status
            try:
                payload = await reply.json()
            except Exception:
                # Body could not be decoded as JSON: log a preview of it.
                body_text = await reply.text()
                print(f"[{self.platform_name}] account/base non-JSON: {body_text[:500]}")
                return {"data": None, "code": http_status, "msg": "invalid response"}

    payload_is_object = isinstance(payload, dict)
    if http_status != 200:
        failure_msg = payload.get("msg") if payload_is_object else "request failed"
        return {"data": None, "code": http_status, "msg": failure_msg}
    if not payload_is_object:
        return {"data": None, "code": -1, "msg": "invalid response"}
    return payload
async def get_fans_overall_new(self, cookies: str) -> dict:
    """Fetch the creator-centre fan statistics (``fans/overall_new``).

    Supplies the fan trend for the daily user-data sync. No browser is
    started: the stored account cookies are sent straight to the API with a
    matching Referer.

    Args:
        cookies: Raw cookie data; handed to ``self.parse_cookies``.

    Returns:
        The decoded JSON body when the API answers 200 with a JSON object,
        otherwise a ``{"data": None, "code": ..., "msg": ...}`` dict
        (``code`` is the HTTP status for non-JSON / non-200 responses and
        ``-1`` for a JSON body that is not an object).
    """
    import aiohttp

    parsed = self.parse_cookies(cookies)
    # Cookies without a name are dropped; a missing value becomes "".
    named = [item for item in parsed if item.get("name")]
    cookie_map = {item.get("name") or "": item.get("value") or "" for item in named}

    target = "https://creator.xiaohongshu.com/api/galaxy/creator/data/fans/overall_new"
    headers = {
        "Accept": "application/json, text/plain, */*",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Referer": "https://creator.xiaohongshu.com/statistics/fans-data",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Connection": "keep-alive",
    }

    async with aiohttp.ClientSession(cookies=cookie_map) as client:
        async with client.get(
            target,
            headers=headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as response:
            code = response.status
            try:
                body = await response.json()
            except Exception:
                # Not JSON: print the first 500 characters for diagnosis.
                raw = await response.text()
                print(
                    f"[{self.platform_name}] fans/overall_new non-JSON: {raw[:500]}"
                )
                return {"data": None, "code": code, "msg": "invalid response"}

            if code == 200:
                if isinstance(body, dict):
                    return body
                return {"data": None, "code": -1, "msg": "invalid response"}

            if isinstance(body, dict):
                reason = body.get("msg")
            else:
                reason = "request failed"
            return {"data": None, "code": code, "msg": reason}
async def get_account_overview(self, cookies: str) -> dict:
    """Fetch ``account/base`` and ``fans/overall_new`` concurrently.

    Used by the daily user-data sync. No page is visited first: both APIs
    are requested in parallel over one aiohttp session, using the stored
    cookies and a Referer matching each endpoint.

    Args:
        cookies: Raw cookie data; handed to ``self.parse_cookies``.

    Returns:
        ``{"account_base": {...}, "fans_overall_new": {...}}``. Each value
        is the decoded JSON body when that API answers 200 with a JSON
        object, otherwise a ``{"data": None, "code": ..., "msg": ...}``
        dict (``code`` is the HTTP status for non-JSON / non-200 responses
        and ``-1`` for a JSON body that is not an object).

    An exception raised by either request propagates out of
    ``asyncio.gather`` and therefore out of this method.
    """
    import aiohttp

    cookie_list = self.parse_cookies(cookies)
    cookie_dict = {
        c.get("name") or "": c.get("value") or ""
        for c in cookie_list
        if c.get("name")
    }

    api_account = "https://creator.xiaohongshu.com/api/galaxy/v2/creator/datacenter/account/base"
    api_fans = (
        "https://creator.xiaohongshu.com/api/galaxy/creator/data/fans/overall_new"
    )

    def build_headers(referer: str) -> dict:
        """Return the request headers; only the Referer differs per API."""
        return {
            "Accept": "application/json, text/plain, */*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Referer": referer,
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
        }

    async def fetch_json(session, url: str, referer: str, label: str) -> dict:
        """GET ``url`` and normalise the outcome into a result dict.

        ``label`` only names the endpoint in the non-JSON log line.
        """
        async with session.get(
            url,
            headers=build_headers(referer),
            timeout=aiohttp.ClientTimeout(total=30),
        ) as resp:
            status = resp.status
            try:
                data = await resp.json()
            except Exception:
                # Deliberately broad: any decoding problem is reported as an
                # "invalid response" together with a preview of the body.
                text = await resp.text()
                print(f"[{self.platform_name}] {label} non-JSON: {text[:500]}")
                return {"data": None, "code": status, "msg": "invalid response"}

        if status != 200:
            return {
                "data": None,
                "code": status,
                "msg": data.get("msg") if isinstance(data, dict) else "request failed",
            }
        if isinstance(data, dict):
            return data
        return {"data": None, "code": -1, "msg": "invalid response"}

    async with aiohttp.ClientSession(cookies=cookie_dict) as session:
        account_base_result, fans_overall_new_result = await asyncio.gather(
            fetch_json(
                session,
                api_account,
                "https://creator.xiaohongshu.com/statistics/account/v2",
                "account/base",
            ),
            fetch_json(
                session,
                api_fans,
                "https://creator.xiaohongshu.com/statistics/fans-data",
                "fans/overall_new",
            ),
        )

    return {
        "account_base": account_base_result,
        "fans_overall_new": fans_overall_new_result,
    }
|