| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495 |
- # -*- coding: utf-8 -*-
- """
- 平台发布基类
- 提供通用的发布接口和工具方法
- """
- import asyncio
- import json
- import os
- import uuid
- import random
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import List, Optional, Callable, Dict, Any
- from playwright.async_api import async_playwright, Browser, BrowserContext, Page
- # 导入反检测工具
- try:
- from utils.stealth import (
- get_browser_context_args,
- get_all_stealth_scripts,
- get_user_agent,
- get_viewport,
- )
- STEALTH_AVAILABLE = True
- except ImportError:
- STEALTH_AVAILABLE = False
- print("[Warning] Stealth utils not available, anti-detection disabled")
@dataclass
class PublishParams:
    """Input parameters describing one video to publish."""

    # Required fields.
    title: str
    video_path: str
    # Optional fields. An empty description is replaced by the title in
    # __post_init__.
    description: str = ""
    cover_path: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    publish_date: Optional[datetime] = None
    location: str = "重庆市"

    def __post_init__(self):
        # Fall back to the title whenever the description is empty/falsy.
        self.description = self.description or self.title
@dataclass
class PublishResult:
    """Outcome of one publish attempt on a single platform."""

    # Required: overall outcome and the platform it applies to.
    success: bool
    platform: str

    # Identification of the published video; empty when not available.
    video_id: str = ""
    video_url: str = ""

    # Free-text result / error details.
    message: str = ""
    error: str = ""

    # Captcha information.
    need_captcha: bool = False
    captcha_type: str = ""

    # Diagnostic page state.
    screenshot_base64: str = ""
    page_url: str = ""
    status: str = ""
    screenshot_path: str = ""
@dataclass
class WorkItem:
    """One work (video) together with its engagement counters."""

    work_id: str
    title: str
    cover_url: str = ""
    video_url: str = ""
    duration: int = 0  # seconds
    status: str = "published"  # published, reviewing, rejected, draft
    publish_time: str = ""
    play_count: int = 0
    like_count: int = 0
    comment_count: int = 0
    share_count: int = 0
    collect_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return every field as a plain dict, in declaration order."""
        exported = (
            "work_id",
            "title",
            "cover_url",
            "video_url",
            "duration",
            "status",
            "publish_time",
            "play_count",
            "like_count",
            "comment_count",
            "share_count",
            "collect_count",
        )
        return {key: getattr(self, key) for key in exported}
@dataclass
class CommentItem:
    """One comment on a work, optionally carrying nested replies."""

    comment_id: str
    parent_comment_id: str
    work_id: str
    content: str
    author_id: str = ""
    author_name: str = ""
    author_avatar: str = ""
    like_count: int = 0
    reply_count: int = 0
    create_time: str = ""
    is_author: bool = False  # whether the comment was written by the work's author
    replies: List["CommentItem"] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the comment as a plain dict; replies are converted recursively."""
        scalar_keys = (
            "comment_id",
            "parent_comment_id",
            "work_id",
            "content",
            "author_id",
            "author_name",
            "author_avatar",
            "like_count",
            "reply_count",
            "create_time",
            "is_author",
        )
        payload: Dict[str, Any] = {key: getattr(self, key) for key in scalar_keys}
        payload["replies"] = [reply.to_dict() for reply in self.replies]
        return payload
@dataclass
class WorksResult:
    """Result of listing works on one platform."""

    success: bool
    platform: str
    works: List[WorkItem] = field(default_factory=list)
    total: int = 0
    has_more: bool = False
    next_page: Any = ""
    error: str = ""
    debug_info: str = ""  # debugging information

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict with every work converted via to_dict()."""
        serialized_works = [item.to_dict() for item in self.works]
        return dict(
            success=self.success,
            platform=self.platform,
            works=serialized_works,
            total=self.total,
            has_more=self.has_more,
            next_page=self.next_page,
            error=self.error,
            debug_info=self.debug_info,
        )
@dataclass
class CommentsResult:
    """Result of listing the comments of one work."""

    success: bool
    platform: str
    work_id: str
    comments: List[CommentItem] = field(default_factory=list)
    total: int = 0
    has_more: bool = False
    error: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict with every comment converted via to_dict()."""
        serialized_comments = [entry.to_dict() for entry in self.comments]
        return dict(
            success=self.success,
            platform=self.platform,
            work_id=self.work_id,
            comments=serialized_comments,
            total=self.total,
            has_more=self.has_more,
            error=self.error,
        )
- class BasePublisher(ABC):
- """
- 平台发布基类
- 所有平台发布器都需要继承此类
- """
- platform_name: str = "base"
- login_url: str = ""
- publish_url: str = ""
- cookie_domain: str = ""
def __init__(self, headless: bool = True):
    """Record the headless flag and reset all runtime state to ``None``."""
    self.headless = headless

    # Playwright handles; assigned by init_browser().
    self.browser: Optional[Browser] = None
    self.context: Optional[BrowserContext] = None
    self.page: Optional[Page] = None

    # Optional progress listener, called as callback(progress, message).
    self.on_progress: Optional[Callable[[int, str], None]] = None

    # Identifiers of the current job; they appear in screenshot file names
    # and in the payloads sent to the internal HTTP API.
    self.user_id: Optional[int] = None
    self.publish_task_id: Optional[int] = None
    self.publish_account_id: Optional[int] = None

    # Default proxy used by init_browser() when no proxy is passed in.
    self.proxy_config: Optional[Dict[str, Any]] = None
def set_progress_callback(self, callback: Callable[[int, str], None]):
    """Register the callable that report_progress() forwards updates to.

    Args:
        callback: Called as ``callback(progress, message)``.
    """
    self.on_progress = callback
def report_progress(self, progress: int, message: str):
    """Print a progress line and forward it to the registered callback.

    Args:
        progress: Progress value, printed followed by a ``%`` sign.
        message: Human-readable status text.
    """
    print(f"[{self.platform_name}] [{progress}%] {message}")
    listener = self.on_progress
    if not listener:
        return
    listener(progress, message)
- @staticmethod
- def parse_cookies(cookies_str: str) -> list:
- """解析 cookie 字符串为列表"""
- try:
- cookies = json.loads(cookies_str)
- if isinstance(cookies, list):
- return cookies
- except json.JSONDecodeError:
- pass
- # 字符串格式: name=value; name2=value2
- cookies = []
- for item in cookies_str.split(";"):
- item = item.strip()
- if "=" in item:
- name, value = item.split("=", 1)
- cookies.append(
- {
- "name": name.strip(),
- "value": value.strip(),
- "domain": "",
- "path": "/",
- }
- )
- return cookies
- @staticmethod
- def _normalize_same_site(value: Any) -> Optional[str]:
- """将不同来源的 sameSite 值转换为 Playwright 接受的值。"""
- if value is None:
- return None
- v = str(value).strip().lower()
- if not v:
- return None
- mapping = {
- "strict": "Strict",
- "lax": "Lax",
- "none": "None",
- "no_restriction": "None", # Electron
- "unspecified": None, # Electron: 不传该字段
- }
- return mapping.get(v, None)
- @classmethod
- def _sanitize_cookie_for_playwright(
- cls, cookie: Dict[str, Any], default_domain: str
- ) -> Optional[Dict[str, Any]]:
- """清洗 cookie 字段,避免 BrowserContext.add_cookies 参数校验失败。"""
- if not isinstance(cookie, dict):
- return None
- name = str(cookie.get("name") or "").strip()
- if not name:
- return None
- cleaned: Dict[str, Any] = {
- "name": name,
- "value": str(cookie.get("value") or ""),
- }
- url = str(cookie.get("url") or "").strip()
- if url:
- cleaned["url"] = url
- else:
- domain = str(cookie.get("domain") or "").strip() or default_domain
- if not domain:
- return None
- cleaned["domain"] = domain
- cleaned["path"] = str(cookie.get("path") or "/")
- if "httpOnly" in cookie:
- cleaned["httpOnly"] = bool(cookie.get("httpOnly"))
- if "secure" in cookie:
- cleaned["secure"] = bool(cookie.get("secure"))
- expires_raw = cookie.get("expires", cookie.get("expirationDate"))
- if expires_raw not in (None, "", 0):
- try:
- expires_val = float(expires_raw)
- if expires_val > 0:
- cleaned["expires"] = expires_val
- except Exception:
- pass
- same_site = cls._normalize_same_site(
- cookie.get("sameSite", cookie.get("same_site"))
- )
- if same_site:
- cleaned["sameSite"] = same_site
- return cleaned
- @staticmethod
- def cookies_to_string(cookies: list) -> str:
- """将 cookie 列表转换为字符串"""
- return "; ".join([f"{c['name']}={c['value']}" for c in cookies])
async def init_browser(
    self,
    storage_state: Optional[str] = None,
    proxy_config: Optional[Dict[str, Any]] = None,
):
    """Start Playwright, launch Chromium and open one page.

    On success the driver, browser, context and page are stored on
    ``self._playwright``, ``self.browser``, ``self.context`` and
    ``self.page``. If any step fails, whatever was already started is shut
    down again before the exception is re-raised, so no driver or browser
    process is left behind.

    Args:
        storage_state: Path to a storage-state file; only used when the
            file exists.
        proxy_config: Proxy settings passed to ``chromium.launch``. Falls
            back to ``self.proxy_config``; ignored unless it is a dict with
            a truthy ``server`` entry.

    Returns:
        The newly created page.
    """
    print(
        f"[{self.platform_name}] init_browser: headless={self.headless}", flush=True
    )

    playwright = await async_playwright().start()
    browser = None
    try:
        proxy = proxy_config or self.proxy_config
        has_proxy = bool(proxy and isinstance(proxy, dict) and proxy.get("server"))
        if has_proxy:
            print(f"[{self.platform_name}] 使用代理: {proxy.get('server')}", flush=True)

        # Browser launch arguments.
        # NOTE(review): --disable-features is passed twice below; Chromium
        # may honour only one occurrence - confirm and merge if needed.
        launch_args = {
            "headless": self.headless,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--disable-features=IsolateOrigins,site-per-process",
                "--disable-site-isolation-trials",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-web-security",
                "--disable-features=VizDisplayCompositor",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-gpu",
                "--window-size=1920,1080",
            ],
        }
        if has_proxy:
            launch_args["proxy"] = proxy

        browser = await playwright.chromium.launch(**launch_args)

        # Context arguments: from the stealth helper when available,
        # otherwise a fixed desktop-Chrome profile.
        if STEALTH_AVAILABLE:
            context_args = get_browser_context_args(
                proxy_config=proxy if has_proxy else None
            )
            # The proxy is already applied at launch, so it must not be
            # passed again at context level.
            context_args.pop("proxy", None)
        else:
            context_args = {
                "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "viewport": {"width": 1920, "height": 1080},
                "locale": "zh-CN",
                "timezone_id": "Asia/Shanghai",
            }

        if storage_state and os.path.exists(storage_state):
            context_args["storage_state"] = storage_state

        context = await browser.new_context(**context_args)

        # Inject the anti-detection script into every page of the context.
        if STEALTH_AVAILABLE:
            stealth_script = get_all_stealth_scripts()
            await context.add_init_script(stealth_script)
            print(f"[{self.platform_name}] 已注入反检测脚本", flush=True)

        page = await context.new_page()

        # Extra request headers sent with every request of the page.
        await page.set_extra_http_headers(
            {
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Sec-Fetch-User": "?1",
                "Cache-Control": "max-age=0",
            }
        )
    except Exception:
        # Startup failed part-way: release what was started above so the
        # driver/browser processes are not leaked, then re-raise.
        for release in (browser.close if browser else None, playwright.stop):
            if release is None:
                continue
            try:
                await release()
            except Exception as cleanup_error:
                # Best effort only; the original error is the one that matters.
                print(
                    f"[{self.platform_name}] init_browser cleanup failed: {cleanup_error}",
                    flush=True,
                )
        raise

    # Keep the driver handle so it can be stopped when the browser is closed.
    self._playwright = playwright
    self.browser = browser
    self.context = context
    self.page = page

    print(
        f"[{self.platform_name}] 浏览器初始化完成 (stealth={'enabled' if STEALTH_AVAILABLE else 'disabled'})",
        flush=True,
    )
    return self.page
async def set_cookies(self, cookies: list):
    """Sanitize the given cookies and add them to the browser context.

    Each cookie is passed through ``_sanitize_cookie_for_playwright`` with
    ``self.cookie_domain`` as the fallback domain; rejected cookies are
    dropped.

    Raises:
        Exception: If the context is not initialized, or if no cookie
            survives sanitizing.
    """
    if not self.context:
        raise Exception("Browser context not initialized")

    candidates = (
        self._sanitize_cookie_for_playwright(raw, self.cookie_domain)
        for raw in cookies
    )
    usable: List[Dict[str, Any]] = [entry for entry in candidates if entry]

    if not usable:
        raise Exception("没有可用的 Cookie(清洗后为空)")
    await self.context.add_cookies(usable)
async def close_browser(self):
    """Close the context and browser, and stop the Playwright driver if one is stored.

    The handles on ``self`` are cleared first, so a failing close never
    leaves stale objects behind and calling this method twice is harmless.
    The browser is closed even when closing the context raises, and a
    driver found on ``self._playwright`` is stopped last. The first error
    raised by a close call still propagates to the caller.
    """
    context = self.context
    browser = self.browser
    playwright = getattr(self, "_playwright", None)

    self.page = None
    self.context = None
    self.browser = None
    self._playwright = None

    try:
        if context:
            await context.close()
    finally:
        try:
            if browser:
                await browser.close()
        finally:
            if playwright:
                await playwright.stop()
async def human_like_delay(self, min_ms: int = 100, max_ms: int = 500):
    """Sleep for a random whole number of milliseconds in ``[min_ms, max_ms]``."""
    pause_ms = random.randint(min_ms, max_ms)
    await asyncio.sleep(pause_ms / 1000.0)
async def human_like_scroll(self, distance: int = None):
    """Scroll the page down in several small steps with pauses in between.

    Args:
        distance: Total distance in pixels; a random value in [200, 500]
            is chosen when omitted. It is split into 3-6 equal steps using
            integer division, so the remainder is not scrolled.
    """
    total = random.randint(200, 500) if distance is None else distance

    # Several small scrolls instead of one jump.
    chunk_count = random.randint(3, 6)
    script = f"window.scrollBy(0, {total // chunk_count})"

    remaining = chunk_count
    while remaining > 0:
        if self.page:
            await self.page.evaluate(script)
        await self.human_like_delay(100, 300)
        remaining -= 1
async def human_like_type(self, selector: str, text: str, clear_first: bool = True):
    """Type ``text`` into the element matched by ``selector``, one key at a time.

    Does nothing when there is no page or the selector matches no element.

    Args:
        selector: Selector passed to ``page.locator``.
        text: Text to type; each character is followed by a 30-100 ms pause.
        clear_first: When true, select everything and delete it before typing.
    """
    page = self.page
    if not page:
        return

    target = page.locator(selector)
    if await target.count() == 0:
        return

    await target.click()
    if clear_first:
        # Select all, then delete the selection.
        for combo in ("Control+KeyA", "Backspace"):
            await page.keyboard.press(combo)
    await self.human_like_delay(50, 150)

    for character in text:
        pause = random.randint(30, 100) / 1000.0
        await page.keyboard.type(character)
        await asyncio.sleep(pause)
async def random_mouse_move(self):
    """Move the mouse to a random point, then pause briefly.

    The point is drawn from x in [100, 1800] and y in [100, 900]. This is
    best effort: any error while moving or pausing is ignored, and nothing
    happens when there is no page.
    """
    if not self.page:
        return

    try:
        # Pick a random spot on the page.
        target = (random.randint(100, 1800), random.randint(100, 900))
        await self.page.mouse.move(*target)
        await self.human_like_delay(50, 200)
    except Exception:
        pass
async def save_cookies(self, file_path: str):
    """Write the context's storage state to ``file_path``.

    Does nothing when no browser context exists.
    """
    context = self.context
    if not context:
        return
    await context.storage_state(path=file_path)
async def capture_screenshot(self) -> str:
    """Take a JPEG screenshot (quality 80) of the current page.

    Returns:
        The image as a Base64 string, or ``""`` when there is no page or
        the screenshot fails (the failure is printed, not raised).
    """
    import base64

    if not self.page:
        return ""

    try:
        raw = await self.page.screenshot(type="jpeg", quality=80)
        encoded = base64.b64encode(raw)
        return str(encoded, "utf-8")
    except Exception as e:
        print(f"[{self.platform_name}] 截图失败: {e}")
        return ""
async def save_screenshot_to_file(
    self, directory: str = None, filename_prefix: str = "publish_failed"
) -> str:
    """Save a PNG screenshot of the current page and return its path.

    The file is named
    ``<prefix>_<platform>[_task<id>][_acc<id>]_<YYYYmmdd_HHMMSS>.png``; the
    task and account parts are included only when the respective id is set.

    Args:
        directory: Target directory, created when missing. Defaults to a
            ``screenshots`` folder next to the directory containing this
            file.
        filename_prefix: Leading part of the file name.

    Returns:
        The absolute path of the saved file, or ``""`` when there is no
        page or saving fails (the failure is printed, not raised).
    """
    if not self.page:
        return ""

    try:
        if directory is None:
            module_path = os.path.abspath(__file__)
            parent_dir = os.path.dirname(os.path.dirname(module_path))
            directory = os.path.join(parent_dir, "screenshots")

        target_dir = os.path.abspath(directory)
        os.makedirs(target_dir, exist_ok=True)

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        id_tags = []
        if self.publish_task_id:
            id_tags.append(f"_task{self.publish_task_id}")
        if self.publish_account_id:
            id_tags.append(f"_acc{self.publish_account_id}")
        id_suffix = "".join(id_tags)

        filename = f"{filename_prefix}_{self.platform_name}{id_suffix}_{stamp}.png"
        filepath = os.path.join(target_dir, filename)

        await self.page.screenshot(path=filepath, type="png")
        print(f"[{self.platform_name}] 截图已保存: {filepath}")
        return filepath
    except Exception as e:
        print(f"[{self.platform_name}] 保存截图失败: {e}")
        return ""
async def request_sms_code_from_frontend(
    self, phone: str = "", timeout_seconds: int = 120, message: str = ""
) -> str:
    """Ask the Node.js service for an SMS code and wait for the answer.

    Posts to ``<NODEJS_API_URL>/api/internal/captcha/request`` and returns
    the ``code`` field of the JSON reply. The HTTP call can wait up to
    ``timeout_seconds + 30`` seconds, so it runs in a worker thread and the
    event loop stays responsive while waiting.

    Args:
        phone: Phone number forwarded in the request.
        timeout_seconds: Forwarded in the request; the HTTP timeout is this
            value plus 30 seconds.
        message: Prompt forwarded in the request; a default prompt is used
            when empty.

    Returns:
        The verification code as a string.

    Raises:
        Exception: If ``self.user_id`` is not set, the request fails, the
            reply is not JSON, the reply reports an error, or it contains
            no code.
    """
    node_api_url = os.environ.get("NODEJS_API_URL", "http://localhost:3000").rstrip(
        "/"
    )
    # NOTE(review): falls back to a well-known default key when
    # INTERNAL_API_KEY is unset - confirm that is acceptable in production.
    internal_api_key = os.environ.get(
        "INTERNAL_API_KEY", "internal-api-key-default"
    )

    if not self.user_id:
        raise Exception("缺少 user_id,无法请求前端输入验证码")

    captcha_task_id = f"py_{self.platform_name}_{uuid.uuid4().hex}"
    payload = {
        "user_id": self.user_id,
        "captcha_task_id": captcha_task_id,
        "type": "sms",
        "phone": phone or "",
        "message": message or "请输入短信验证码",
        "timeout_seconds": timeout_seconds,
        "publish_task_id": self.publish_task_id,
        "publish_account_id": self.publish_account_id,
    }

    import functools

    import requests

    send_request = functools.partial(
        requests.post,
        f"{node_api_url}/api/internal/captcha/request",
        headers={
            "Content-Type": "application/json",
            "X-Internal-API-Key": internal_api_key,
        },
        json=payload,
        timeout=timeout_seconds + 30,
    )
    try:
        # requests is blocking: keep the long wait off the event loop.
        resp = await asyncio.get_running_loop().run_in_executor(None, send_request)
    except Exception as e:
        raise Exception(f"请求前端验证码失败: {e}") from e

    try:
        data = resp.json()
    except Exception as e:
        raise Exception(f"请求前端验证码失败: HTTP {resp.status_code}") from e
    if not isinstance(data, dict):
        data = {}  # unexpected JSON shape: reported as a failure below

    if resp.status_code >= 400 or not data.get("success"):
        raise Exception(
            data.get("error")
            or data.get("message")
            or f"请求前端验证码失败: HTTP {resp.status_code}"
        )

    code = data.get("code") or ""
    if not code:
        raise Exception("未收到验证码")
    return str(code)
async def ai_analyze_sms_send_state(self, screenshot_base64: str = None) -> dict:
    """Ask the vision model whether an SMS code was sent on the current page.

    The screenshot is posted to an OpenAI-compatible ``/chat/completions``
    endpoint configured by ``DASHSCOPE_API_KEY``, ``DASHSCOPE_BASE_URL`` and
    ``AI_VISION_MODEL``. The HTTP call runs in a worker thread so the event
    loop is not blocked. The JSON object is taken from a fenced json block
    in the reply, or failing that from the first ``{`` to the last ``}``.

    Args:
        screenshot_base64: Base64 JPEG screenshot; captured from the
            current page when omitted.

    Returns:
        A dict with the keys ``has_sms_modal``, ``send_button_state``,
        ``sent_likely``, ``block_reason``, ``suggested_action``,
        ``confidence`` and ``notes``. This method never raises: every
        failure yields a dict suggesting ``manual_send`` with confidence 0.
    """
    import functools
    import json
    import os
    import re

    def _fallback(notes: str, has_modal: bool = True, block_reason: str = "unknown") -> dict:
        # Conservative answer used whenever the analysis cannot be completed.
        return {
            "has_sms_modal": has_modal,
            "send_button_state": "unknown",
            "sent_likely": False,
            "block_reason": block_reason,
            "suggested_action": "manual_send",
            "confidence": 0,
            "notes": notes,
        }

    try:
        if not screenshot_base64:
            screenshot_base64 = await self.capture_screenshot()
        if not screenshot_base64:
            return _fallback("无法获取截图", has_modal=False)

        ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "")
        ai_base_url = os.environ.get(
            "DASHSCOPE_BASE_URL",
            "https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        ai_vision_model = os.environ.get("AI_VISION_MODEL", "qwen-vl-plus")
        if not ai_api_key:
            return _fallback("未配置 AI API Key", block_reason="no_ai_key")

        import requests

        prompt = """请分析这张网页截图,判断是否处于“短信验证码”验证弹窗/页面,并判断“发送验证码/获取验证码”是否已经触发成功。
你需要重点识别:
1) 是否存在短信验证码弹窗(包含“请输入验证码/短信验证码/手机号验证/获取验证码/发送验证码”等)
2) 发送按钮状态:enabled / disabled / countdown(出现xx秒) / hidden / unknown
3) 是否已发送成功:例如出现倒计时、按钮禁用、出现“已发送/重新发送/xx秒后重试”等
4) 是否被阻塞:例如出现滑块/人机验证、频繁发送、风控提示、网络异常等
请以 JSON 返回:
```json
{
"has_sms_modal": true,
"send_button_state": "enabled|disabled|countdown|hidden|unknown",
"sent_likely": true,
"block_reason": "none|need_click_send|slider|risk|rate_limit|network|unknown",
"suggested_action": "wait|click_send|solve_slider|manual_send",
"confidence": 0-100,
"notes": "一句话说明你看到的证据"
}
```"""

        headers = {
            "Authorization": f"Bearer {ai_api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": ai_vision_model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{screenshot_base64}"
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
            "max_tokens": 500,
        }

        # requests is blocking: run it in a worker thread.
        response = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                requests.post,
                f"{ai_base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=30,
            ),
        )
        if response.status_code != 200:
            return _fallback(
                f"AI API 返回错误 {response.status_code}", block_reason="network"
            )

        result = response.json()
        ai_response = (
            result.get("choices", [{}])[0].get("message", {}).get("content", "")
        )

        # Single backslashes are required inside raw strings: a doubled
        # backslash would match a literal "\" and never find the JSON.
        fenced = re.search(r"```json\s*([\s\S]*?)\s*```", ai_response)
        if fenced:
            json_str = fenced.group(1)
        else:
            braces = re.search(r"\{[\s\S]*\}", ai_response)
            json_str = braces.group(0) if braces else "{}"

        try:
            data = json.loads(json_str)
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        return {
            "has_sms_modal": bool(data.get("has_sms_modal", True)),
            "send_button_state": data.get("send_button_state", "unknown"),
            "sent_likely": bool(data.get("sent_likely", False)),
            "block_reason": data.get("block_reason", "unknown"),
            "suggested_action": data.get("suggested_action", "manual_send"),
            "confidence": int(data.get("confidence", 0) or 0),
            "notes": data.get("notes", ""),
        }
    except Exception as e:
        return _fallback(f"AI 分析异常: {e}")
async def sync_cookies_to_node(self, cookies: list) -> bool:
    """Send the given cookies to the Node.js service for the current account.

    Posts to ``<NODEJS_API_URL>/api/internal/accounts/update-cookies`` with
    the cookies serialized as a JSON string. The HTTP call runs in a worker
    thread so the event loop is not blocked.

    Args:
        cookies: Cookie list to store.

    Returns:
        ``True`` when the service answers with a status below 400 and does
        not report ``success`` as false; ``False`` otherwise, including when
        ``user_id`` / ``publish_account_id`` are missing or any error
        occurs. This method never raises.
    """
    if not self.user_id or not self.publish_account_id:
        return False

    import functools
    import json
    import os

    node_api_url = os.environ.get("NODEJS_API_URL", "http://localhost:3000").rstrip(
        "/"
    )
    # NOTE(review): falls back to a well-known default key when
    # INTERNAL_API_KEY is unset - confirm that is acceptable in production.
    internal_api_key = os.environ.get(
        "INTERNAL_API_KEY", "internal-api-key-default"
    )

    try:
        import requests

        payload = {
            "user_id": int(self.user_id),
            "account_id": int(self.publish_account_id),
            "cookies": json.dumps(cookies, ensure_ascii=False),
        }
        # requests is blocking: run it in a worker thread.
        resp = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                requests.post,
                f"{node_api_url}/api/internal/accounts/update-cookies",
                headers={
                    "Content-Type": "application/json",
                    "X-Internal-API-Key": internal_api_key,
                },
                json=payload,
                timeout=30,
            ),
        )
        if resp.status_code >= 400:
            return False
        data = resp.json() if resp.content else {}
        if not isinstance(data, dict):
            return False
        return bool(data.get("success", True))
    except Exception as e:
        # Syncing is best effort, but leave a trace of why it failed.
        print(f"[{self.platform_name}] sync_cookies_to_node failed: {e}")
        return False
async def ai_suggest_playwright_selector(
    self, goal: str, screenshot_base64: str = None
) -> dict:
    """Ask the vision model for a Playwright selector that achieves ``goal``.

    The screenshot is posted to an OpenAI-compatible ``/chat/completions``
    endpoint configured by ``DASHSCOPE_API_KEY``, ``DASHSCOPE_BASE_URL`` and
    ``AI_VISION_MODEL``. The HTTP call runs in a worker thread so the event
    loop is not blocked. The JSON object is taken from a fenced json block
    in the reply, or failing that from the first ``{`` to the last ``}``.

    Args:
        goal: Description of the action the selector should enable; it is
            inserted into the prompt.
        screenshot_base64: Base64 JPEG screenshot; captured from the
            current page when omitted.

    Returns:
        A dict with the keys ``has_selector``, ``selector``, ``confidence``
        and ``notes``. ``has_selector`` is true only when the model claims
        a selector and the selector is non-empty. This method never raises.
    """
    import functools
    import json
    import os
    import re

    def _no_selector(notes: str, confidence: int = 0) -> dict:
        return {
            "has_selector": False,
            "selector": "",
            "confidence": confidence,
            "notes": notes,
        }

    try:
        if not screenshot_base64:
            screenshot_base64 = await self.capture_screenshot()
        if not screenshot_base64:
            return _no_selector("无法获取截图")

        ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "")
        ai_base_url = os.environ.get(
            "DASHSCOPE_BASE_URL",
            "https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        ai_vision_model = os.environ.get("AI_VISION_MODEL", "qwen-vl-plus")
        if not ai_api_key:
            return _no_selector("未配置 AI API Key")

        import requests

        prompt = f"""请分析这张网页截图,给出一个 Playwright Python 可用的 selector(用于 page.locator(selector))来完成目标操作。
目标:{goal}
要求:
1) selector 尽量稳定(优先 role/text/aria,其次 class,避免过度依赖随机 class)
2) selector 必须是 Playwright 支持的选择器语法(如:text="发布"、button:has-text("发布")、[role="button"]:has-text("发布") 等)
3) 只返回一个最优 selector
以 JSON 返回:
```json
{{
"has_selector": true,
"selector": "button:has-text(\\"发布\\")",
"confidence": 0-100,
"notes": "你依据的页面证据"
}}
```"""

        headers = {
            "Authorization": f"Bearer {ai_api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": ai_vision_model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{screenshot_base64}"
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
            "max_tokens": 300,
        }

        # requests is blocking: run it in a worker thread.
        response = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                requests.post,
                f"{ai_base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=30,
            ),
        )
        if response.status_code != 200:
            return _no_selector(f"AI API 错误 {response.status_code}")

        result = response.json()
        ai_response = (
            result.get("choices", [{}])[0].get("message", {}).get("content", "")
        )

        # Single backslashes are required inside raw strings: a doubled
        # backslash would match a literal "\" and never find the JSON.
        fenced = re.search(r"```json\s*([\s\S]*?)\s*```", ai_response)
        if fenced:
            json_str = fenced.group(1)
        else:
            braces = re.search(r"\{[\s\S]*\}", ai_response)
            json_str = braces.group(0) if braces else "{}"

        try:
            data = json.loads(json_str)
        except ValueError:
            data = {}
        if not isinstance(data, dict):
            data = {}

        selector = str(data.get("selector", "") or "").strip()
        has_selector = bool(data.get("has_selector", False)) and bool(selector)
        confidence = int(data.get("confidence", 0) or 0)
        notes = str(data.get("notes", "") or "")

        if not has_selector:
            return _no_selector(notes or "未给出 selector", confidence)
        return {
            "has_selector": True,
            "selector": selector,
            "confidence": confidence,
            "notes": notes,
        }
    except Exception as e:
        return _no_selector(f"AI selector 异常: {e}")
async def ai_check_captcha(self, screenshot_base64: str = None) -> dict:
    """Detect a captcha on the current page by letting the vision model inspect a screenshot.

    The screenshot is posted to an OpenAI-compatible ``/chat/completions``
    endpoint configured by ``DASHSCOPE_API_KEY``, ``DASHSCOPE_BASE_URL`` and
    ``AI_VISION_MODEL``. The HTTP call runs in a worker thread so the event
    loop is not blocked. When no API key is configured, the API answers
    with a non-200 status, or any error occurs, the result of
    ``_traditional_captcha_check()`` is returned instead.

    Args:
        screenshot_base64: Base64 JPEG screenshot; captured from the
            current page when omitted.

    Returns:
        dict: {
            "has_captcha": bool,         # whether a captcha was detected
            "captcha_type": str,         # type reported by the model ("none" becomes "")
            "captcha_description": str,  # description given by the model
            "confidence": number,        # confidence as reported by the model
            "need_headful": bool         # true when a captcha of a concrete type was found
        }
    """
    import functools
    import os
    import re

    try:
        # Obtain the screenshot.
        if not screenshot_base64:
            screenshot_base64 = await self.capture_screenshot()
        if not screenshot_base64:
            print(f"[{self.platform_name}] AI验证码检测: 无法获取截图")
            return {
                "has_captcha": False,
                "captcha_type": "",
                "captcha_description": "",
                "confidence": 0,
                "need_headful": False,
            }

        # Read the AI configuration.
        ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "")
        ai_base_url = os.environ.get(
            "DASHSCOPE_BASE_URL",
            "https://dashscope.aliyuncs.com/compatible-mode/v1",
        )
        ai_vision_model = os.environ.get("AI_VISION_MODEL", "qwen-vl-plus")
        if not ai_api_key:
            print(
                f"[{self.platform_name}] AI验证码检测: 未配置 AI API Key,使用传统方式检测"
            )
            return await self._traditional_captcha_check()

        import requests

        # Build the AI request.
        prompt = """请分析这张网页截图,判断页面上是否存在验证码。
请检查以下类型的验证码:
1. 滑块验证码(需要滑动滑块到指定位置)
2. 图片验证码(需要选择正确的图片、点击图片上的文字等)
3. 旋转验证码(需要旋转图片到正确角度)
4. 拼图验证码(需要拖动拼图块到正确位置)
5. 手机验证码(需要输入手机收到的验证码)
6. 计算验证码(需要输入计算结果)
请以 JSON 格式返回结果:
```json
{
"has_captcha": true/false,
"captcha_type": "slider/image/phone/rotate/puzzle/calculate/none",
"captcha_description": "验证码的具体描述",
"confidence": 0-100
}
```
注意:
- 如果页面有明显的验证码弹窗或验证区域,has_captcha 为 true
- 如果只是普通的登录页面或表单,没有特殊的验证步骤,has_captcha 为 false
- confidence 表示你对判断结果的信心,100 表示非常确定"""

        headers = {
            "Authorization": f"Bearer {ai_api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": ai_vision_model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{screenshot_base64}"
                            },
                        },
                        {"type": "text", "text": prompt},
                    ],
                }
            ],
            "max_tokens": 500,
        }

        print(f"[{self.platform_name}] AI验证码检测: 正在分析截图...")
        # requests is blocking: run it in a worker thread.
        response = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                requests.post,
                f"{ai_base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=30,
            ),
        )
        if response.status_code != 200:
            print(
                f"[{self.platform_name}] AI验证码检测: API 返回错误 {response.status_code}"
            )
            return await self._traditional_captcha_check()

        result = response.json()
        ai_response = (
            result.get("choices", [{}])[0].get("message", {}).get("content", "")
        )
        print(f"[{self.platform_name}] AI验证码检测响应: {ai_response[:200]}...")

        # Parse the reply: prefer a fenced json block, otherwise take the
        # text from the first "{" to the last "}".
        fenced = re.search(r"```json\s*([\s\S]*?)\s*```", ai_response)
        if fenced:
            json_str = fenced.group(1)
        else:
            braces = re.search(r"\{[\s\S]*\}", ai_response)
            json_str = braces.group(0) if braces else "{}"

        try:
            ai_result = json.loads(json_str)
        except ValueError:
            ai_result = {}
        if not isinstance(ai_result, dict):
            ai_result = {}

        has_captcha = bool(ai_result.get("has_captcha", False))
        captcha_type = ai_result.get("captcha_type", "")
        captcha_description = ai_result.get("captcha_description", "")
        confidence = ai_result.get("confidence", 0)

        # A detected captcha of a concrete type requires a headful browser.
        need_headful = has_captcha and captcha_type not in ["none", ""]

        print(
            f"[{self.platform_name}] AI验证码检测结果: has_captcha={has_captcha}, type={captcha_type}, confidence={confidence}"
        )
        return {
            "has_captcha": has_captcha,
            "captcha_type": captcha_type if captcha_type != "none" else "",
            "captcha_description": captcha_description,
            "confidence": confidence,
            "need_headful": need_headful,
        }
    except Exception as e:
        print(f"[{self.platform_name}] AI验证码检测异常: {e}")
        import traceback

        traceback.print_exc()
        return await self._traditional_captcha_check()
async def _traditional_captcha_check(self) -> dict:
    """Detect a captcha by probing the DOM for well-known selectors.

    Selectors are tried in order; the first one that matches a visible
    element decides the captcha type. A selector that fails to evaluate is
    skipped, but cancellation and interpreter-exit signals are never
    swallowed.

    Returns:
        A dict with the keys ``has_captcha``, ``captcha_type``,
        ``captcha_description``, ``confidence`` and ``need_headful``.
        Confidence is 80 for a completed scan and 0 when there is no page
        or the scan itself failed.
    """

    def _negative(confidence: int) -> dict:
        return {
            "has_captcha": False,
            "captcha_type": "",
            "captcha_description": "",
            "confidence": confidence,
            "need_headful": False,
        }

    if not self.page:
        return _negative(0)

    try:
        # Common captcha selectors: (selector, type, description).
        captcha_selectors = [
            # Slider captcha
            ('[class*="slider"]', "slider", "滑块验证码"),
            ('[class*="slide-verify"]', "slider", "滑块验证码"),
            ('text="滑动"', "slider", "滑块验证码"),
            ('text="拖动"', "slider", "滑块验证码"),
            # Image captcha
            ('[class*="captcha"]', "image", "图片验证码"),
            ('[class*="verify-img"]', "image", "图片验证码"),
            ('text="点击"', "image", "图片验证码"),
            ('text="选择"', "image", "图片验证码"),
            # Phone (SMS) captcha
            ('text="验证码"', "phone", "手机验证码"),
            ('text="获取验证码"', "phone", "手机验证码"),
            ('[class*="sms-code"]', "phone", "手机验证码"),
            # Rotate captcha
            ('text="旋转"', "rotate", "旋转验证码"),
            ('[class*="rotate"]', "rotate", "旋转验证码"),
        ]

        for selector, captcha_type, description in captcha_selectors:
            try:
                matches = self.page.locator(selector)
                if await matches.count() == 0:
                    continue
                # Only a visible element counts as a captcha.
                if not await matches.first.is_visible():
                    continue
            except Exception:
                # A selector that cannot be evaluated is skipped. Catching
                # Exception (not a bare except) lets task cancellation
                # propagate instead of being silently swallowed.
                continue

            print(f"[{self.platform_name}] 传统检测: 发现验证码 - {selector}")
            return {
                "has_captcha": True,
                "captcha_type": captcha_type,
                "captcha_description": description,
                "confidence": 80,
                "need_headful": True,
            }

        return _negative(80)
    except Exception as e:
        print(f"[{self.platform_name}] 传统验证码检测异常: {e}")
        return _negative(0)
async def get_page_url(self) -> str:
    """Return the current page URL, or "" when it cannot be read.

    Returns:
        ``self.page.url``; an empty string when there is no page or reading
        the attribute raises.
    """
    if not self.page:
        return ""
    try:
        return self.page.url
    except Exception:
        # Only ordinary errors are mapped to ""; BaseException (cancellation,
        # KeyboardInterrupt) is allowed to propagate.
        return ""
async def check_publish_status(self) -> dict:
    """Classify the publish state from the current page's HTML.

    Takes a screenshot, reads the page URL and HTML, then looks for keyword
    groups in this priority order: success, captcha, failure. If none match,
    the state is reported as still processing.

    Returns:
        dict with the keys ``status``, ``screenshot_base64``, ``page_url`` and
        ``message``. ``status`` is one of ``"success"``, ``"need_captcha"``,
        ``"failed"``, ``"processing"`` or ``"error"``. Every path, including
        the uninitialized-page case, returns all four keys.
    """
    if not self.page:
        # Same shape as every other return so callers can index the keys
        # unconditionally.
        return {
            "status": "error",
            "screenshot_base64": "",
            "page_url": "",
            "message": "页面未初始化",
        }

    success_keywords = ("发布成功", "上传成功", "发表成功", "提交成功")
    captcha_keywords = ("验证码", "身份验证", "请完成验证", "滑动验证", "图形验证")
    fail_keywords = ("发布失败", "上传失败", "提交失败", "操作失败")

    try:
        screenshot = await self.capture_screenshot()
        page_url = await self.get_page_url()
        page_content = await self.page.content()

        def _report(status: str, message: str) -> dict:
            return {
                "status": status,
                "screenshot_base64": screenshot,
                "page_url": page_url,
                "message": message,
            }

        def _first_hit(keywords) -> str:
            # First keyword found in the HTML, or "" when none is present.
            return next((word for word in keywords if word in page_content), "")

        if _first_hit(success_keywords):
            return _report("success", "发布成功")

        captcha_hit = _first_hit(captcha_keywords)
        if captcha_hit:
            return _report("need_captcha", f"检测到{captcha_hit}")

        fail_hit = _first_hit(fail_keywords)
        if fail_hit:
            return _report("failed", fail_hit)

        # Nothing conclusive on the page yet.
        return _report("processing", "处理中")
    except Exception as e:
        return {
            "status": "error",
            "screenshot_base64": "",
            "page_url": "",
            "message": str(e),
        }
async def wait_for_upload_complete(self, success_selector: str, timeout: int = 300):
    """Poll the page until ``success_selector`` matches or the timeout elapses.

    The selector is checked immediately and then again after every 3-second
    sleep, so even a timeout shorter than one interval gets a single check
    and the last sleep is always followed by a final check. Progress (30%)
    is reported after each sleep.

    Args:
        success_selector: Selector passed to ``self.page.locator``.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True as soon as the selector matches at least one element, False when
        the timeout is exhausted.

    Raises:
        Exception: If ``self.page`` is not initialized.
    """
    if not self.page:
        raise Exception("Page not initialized")

    poll_interval = 3  # seconds between checks
    remaining = timeout
    while True:
        try:
            if await self.page.locator(success_selector).count() > 0:
                return True
        except Exception:
            # Best effort: a failing lookup is retried on the next poll.
            # BaseException (e.g. task cancellation) is not swallowed.
            pass
        if remaining < poll_interval:
            return False
        await asyncio.sleep(poll_interval)
        remaining -= poll_interval
        self.report_progress(30, "正在上传视频...")
@abstractmethod
async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
    """Publish a video; concrete subclasses must provide the implementation.

    Args:
        cookies: Cookie string or JSON.
        params: Publishing parameters.

    Returns:
        PublishResult: Outcome of the publish attempt.
    """
    ...
async def get_works(
    self, cookies: str, page: int = 0, page_size: int = 20
) -> WorksResult:
    """Fetch the list of published works; subclasses may override.

    This base implementation ignores its arguments and reports that the
    platform does not support listing works.

    Args:
        cookies: Cookie string or JSON.
        page: Page index, starting at 0.
        page_size: Number of items per page.

    Returns:
        WorksResult: An unsuccessful result carrying the "not supported" error.
    """
    unsupported = WorksResult(
        success=False,
        platform=self.platform_name,
        error="该平台暂不支持获取作品列表",
    )
    return unsupported
async def get_comments(
    self, cookies: str, work_id: str, cursor: str = ""
) -> CommentsResult:
    """Fetch the comments of one work; subclasses may override.

    This base implementation does no fetching and reports that the platform
    does not support comments, echoing back ``work_id``.

    Args:
        cookies: Cookie string or JSON.
        work_id: Identifier of the work.
        cursor: Pagination cursor.

    Returns:
        CommentsResult: An unsuccessful result carrying the "not supported"
        error.
    """
    unsupported = CommentsResult(
        success=False,
        platform=self.platform_name,
        work_id=work_id,
        error="该平台暂不支持获取评论",
    )
    return unsupported
async def run(self, cookies: str, params: PublishParams) -> PublishResult:
    """Run a publish task with error handling and browser cleanup.

    Wraps ``publish``: any exception is converted into an unsuccessful
    ``PublishResult``, and the browser is always closed afterwards. When the
    publish fails (unsuccessful result or exception) and a page exists, a
    screenshot is saved via ``save_screenshot_to_file`` and its path attached
    to the result. Taking that screenshot is best effort: if it fails, the
    original outcome is still returned.

    Args:
        cookies: Cookie string or JSON.
        params: Publishing parameters.

    Returns:
        PublishResult: The result from ``publish``, or a failure result built
        from the raised exception.
    """
    import traceback

    async def _failure_screenshot() -> str:
        # Saved screenshot path, or "" when there is no page or saving failed.
        if not self.page:
            return ""
        try:
            return await self.save_screenshot_to_file() or ""
        except Exception:
            # A broken screenshot must not hide the real publish outcome.
            traceback.print_exc()
            return ""

    try:
        result = await self.publish(cookies, params)
        if not result.success:
            screenshot_path = await _failure_screenshot()
            if screenshot_path:
                result.screenshot_path = screenshot_path
        return result
    except Exception as e:
        traceback.print_exc()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=str(e),
            screenshot_path=await _failure_screenshot(),
        )
    finally:
        await self.close_browser()
async def run_get_works(
    self, cookies: str, page: int = 0, page_size: int = 20
) -> WorksResult:
    """Run ``get_works`` with error handling and browser cleanup.

    Any exception raised by ``get_works`` is printed and converted into an
    unsuccessful ``WorksResult``; the browser is closed in every case.

    Args:
        cookies: Cookie string or JSON.
        page: Page index, forwarded to ``get_works``.
        page_size: Items per page, forwarded to ``get_works``.

    Returns:
        WorksResult: The result of ``get_works`` or a failure result.
    """
    try:
        outcome = await self.get_works(cookies, page, page_size)
    except Exception as exc:
        import traceback

        traceback.print_exc()
        outcome = WorksResult(
            success=False, platform=self.platform_name, error=str(exc)
        )
    finally:
        await self.close_browser()
    return outcome
async def run_get_comments(
    self, cookies: str, work_id: str, cursor: str = ""
) -> CommentsResult:
    """Run ``get_comments`` with error handling and browser cleanup.

    Any exception raised by ``get_comments`` is printed and converted into an
    unsuccessful ``CommentsResult`` for the same ``work_id``; the browser is
    closed in every case.

    Args:
        cookies: Cookie string or JSON.
        work_id: Identifier of the work, forwarded to ``get_comments``.
        cursor: Pagination cursor, forwarded to ``get_comments``.

    Returns:
        CommentsResult: The result of ``get_comments`` or a failure result.
    """
    try:
        outcome = await self.get_comments(cookies, work_id, cursor)
    except Exception as exc:
        import traceback

        traceback.print_exc()
        outcome = CommentsResult(
            success=False,
            platform=self.platform_name,
            work_id=work_id,
            error=str(exc),
        )
    finally:
        await self.close_browser()
    return outcome
async def check_login_status(self, cookies: str) -> dict:
    """Check whether the given cookies still give a logged-in session.

    Starts the browser, installs the cookies, opens ``self.login_url`` and,
    after a 3-second pause, inspects the resulting URL and page:

    * login needed: the URL contains a login-related substring, or a login
      prompt selector matches;
    * verification needed: the URL contains a risk-control substring, or
      (only when no login is needed) a verification prompt selector matches.

    The browser is always closed before returning.

    Args:
        cookies: Cookie string or JSON.

    Returns:
        dict with ``success``, ``valid``, ``need_login`` and ``message``.
        ``need_login`` is also True when verification is required. When the
        check itself fails, ``success`` is False, ``message`` holds the
        exception text, and the same text is also kept under ``error`` for
        existing callers.
    """
    # NOTE(review): these are plain substring tests on the whole URL, so e.g.
    # "auth" also matches "author" and "safe" matches any URL containing it.
    login_indicators = ("login", "passport", "signin", "auth")
    risk_indicators = (
        "captcha",
        "verify",
        "challenge",
        "risk",
        "security",
        "safe",
        "protect",
        "slider",
    )
    login_selectors = (
        'text="请先登录"',
        'text="登录后继续"',
        'text="请登录"',
        '[class*="login-modal"]',
        '[class*="login-dialog"]',
        '[class*="login-popup"]',
    )
    verification_selectors = (
        'text="安全验证"',
        'text="验证码"',
        'text="人机验证"',
        'text="滑块"',
        'text="请完成验证"',
        'text="系统检测到异常"',
        'text="访问受限"',
        'text="行为异常"',
    )

    async def _first_match(selectors) -> str:
        # First selector that matches at least one element, or "" if none do.
        for selector in selectors:
            try:
                if await self.page.locator(selector).count() > 0:
                    return selector
            except Exception:
                # Best effort: a selector that cannot be evaluated counts as
                # absent. BaseException (e.g. cancellation) still propagates.
                continue
        return ""

    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)
        if not self.page:
            raise Exception("Page not initialized")

        # Open the platform's back-office page.
        home_url = self.login_url
        print(f"[{self.platform_name}] 访问后台页面: {home_url}")
        await self.page.goto(home_url, wait_until="domcontentloaded", timeout=30000)
        await asyncio.sleep(3)

        # A redirect to a login or risk-control URL is the first signal.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前 URL: {current_url}")
        lowered_url = current_url.lower()
        need_login = any(marker in lowered_url for marker in login_indicators)
        need_verification = any(marker in lowered_url for marker in risk_indicators)

        if not need_login:
            # The URL looks fine; look for an in-page login prompt.
            hit = await _first_match(login_selectors)
            if hit:
                need_login = True
                print(f"[{self.platform_name}] 检测到登录弹窗: {hit}")

        if not need_login and not need_verification:
            hit = await _first_match(verification_selectors)
            if hit:
                need_verification = True
                print(f"[{self.platform_name}] 检测到风控/验证码提示: {hit}")

        if need_login:
            return {
                "success": True,
                "valid": False,
                "need_login": True,
                "message": "Cookie 已过期,需要重新登录",
            }
        if need_verification:
            return {
                "success": True,
                "valid": False,
                "need_login": True,
                "message": "触发风控/需要验证",
            }
        return {
            "success": True,
            "valid": True,
            "need_login": False,
            "message": "登录状态有效",
        }
    except Exception as e:
        import traceback

        traceback.print_exc()
        return {
            "success": False,
            "valid": False,
            "need_login": True,
            "message": str(e),
            "error": str(e),
        }
    finally:
        await self.close_browser()
|