| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102 |
- # -*- coding: utf-8 -*-
- """
- 平台发布基类
- 提供通用的发布接口和工具方法
- """
- import asyncio
- import json
- import os
- import uuid
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import List, Optional, Callable, Dict, Any
- from playwright.async_api import async_playwright, Browser, BrowserContext, Page
@dataclass
class PublishParams:
    """Parameters describing one video to publish.

    ``description`` falls back to ``title`` when it is left empty.
    """

    title: str
    video_path: str
    description: str = ""
    cover_path: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    publish_date: Optional[datetime] = None
    location: str = "重庆市"

    def __post_init__(self):
        # An empty description is replaced by the title.
        self.description = self.description or self.title
@dataclass
class PublishResult:
    """Outcome of a publish attempt on one platform."""

    success: bool
    platform: str
    video_id: str = ""
    video_url: str = ""
    message: str = ""
    error: str = ""
    # Whether a captcha is required.
    need_captcha: bool = False
    # Captcha kind: phone, slider, image.
    captcha_type: str = ""
    # Screenshot of the page, Base64-encoded.
    screenshot_base64: str = ""
    # URL of the current page.
    page_url: str = ""
    # One of: uploading, processing, success, failed, need_captcha, need_action.
    status: str = ""
@dataclass
class WorkItem:
    """Data for a single work (video)."""

    work_id: str
    title: str
    cover_url: str = ""
    video_url: str = ""
    # Duration in seconds.
    duration: int = 0
    # One of: published, reviewing, rejected, draft.
    status: str = "published"
    publish_time: str = ""
    play_count: int = 0
    like_count: int = 0
    comment_count: int = 0
    share_count: int = 0
    collect_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return the item as a plain dict, one entry per field."""
        exported = (
            "work_id",
            "title",
            "cover_url",
            "video_url",
            "duration",
            "status",
            "publish_time",
            "play_count",
            "like_count",
            "comment_count",
            "share_count",
            "collect_count",
        )
        return {key: getattr(self, key) for key in exported}
@dataclass
class CommentItem:
    """Data for a single comment, including its nested replies."""

    comment_id: str
    work_id: str
    content: str
    author_id: str = ""
    author_name: str = ""
    author_avatar: str = ""
    like_count: int = 0
    reply_count: int = 0
    create_time: str = ""
    # Whether the comment was written by the work's author.
    is_author: bool = False
    replies: List['CommentItem'] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the comment as a plain dict; replies are converted recursively."""
        scalar_keys = (
            "comment_id",
            "work_id",
            "content",
            "author_id",
            "author_name",
            "author_avatar",
            "like_count",
            "reply_count",
            "create_time",
            "is_author",
        )
        payload: Dict[str, Any] = {key: getattr(self, key) for key in scalar_keys}
        payload["replies"] = [reply.to_dict() for reply in self.replies]
        return payload
@dataclass
class WorksResult:
    """Result of a works-list query."""

    success: bool
    platform: str
    works: List[WorkItem] = field(default_factory=list)
    total: int = 0
    has_more: bool = False
    next_page: Any = ""
    error: str = ""
    # Free-form debugging information.
    debug_info: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict; each work is converted with its own to_dict."""
        payload: Dict[str, Any] = {
            "success": self.success,
            "platform": self.platform,
        }
        payload["works"] = [work.to_dict() for work in self.works]
        for key in ("total", "has_more", "next_page", "error", "debug_info"):
            payload[key] = getattr(self, key)
        return payload
@dataclass
class CommentsResult:
    """Result of a comment-list query for one work."""

    success: bool
    platform: str
    work_id: str
    comments: List[CommentItem] = field(default_factory=list)
    total: int = 0
    has_more: bool = False
    error: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict; each comment is converted with its own to_dict."""
        payload: Dict[str, Any] = {
            "success": self.success,
            "platform": self.platform,
            "work_id": self.work_id,
        }
        payload["comments"] = [comment.to_dict() for comment in self.comments]
        for key in ("total", "has_more", "error"):
            payload[key] = getattr(self, key)
        return payload
class BasePublisher(ABC):
    """Base class for platform publishers.

    Every platform publisher is expected to inherit from this class.
    """

    # Label used as the prefix of log lines and in result objects.
    platform_name: str = "base"
    # Page opened by check_login_status to probe the session.
    login_url: str = ""
    publish_url: str = ""
    # Domain applied by set_cookies to cookies that carry none.
    cookie_domain: str = ""

    def __init__(self, headless: bool = True):
        """Record the launch options; no browser is started here.

        Args:
            headless: Passed to the Chromium launcher by init_browser.
        """
        self.headless = headless

        # Playwright handles, all unset until init_browser runs.
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None

        # Optional progress listener, called as (percent, message).
        self.on_progress: Optional[Callable[[int, str], None]] = None

        # Identifiers forwarded to the Node.js backend by the helper methods.
        self.user_id: Optional[int] = None
        self.publish_task_id: Optional[int] = None
        self.publish_account_id: Optional[int] = None

        # Default proxy settings, used when init_browser receives none.
        self.proxy_config: Optional[Dict[str, Any]] = None
-
def set_progress_callback(self, callback: Callable[[int, str], None]):
    """Register the callable that receives progress updates.

    Args:
        callback: Called as ``callback(progress, message)``.
    """
    self.on_progress = callback
-
def report_progress(self, progress: int, message: str):
    """Print a progress line and forward it to the registered callback, if any.

    Args:
        progress: Percentage shown in the log line.
        message: Human-readable status text.
    """
    print(f"[{self.platform_name}] [{progress}%] {message}")
    listener = self.on_progress
    if not listener:
        return
    listener(progress, message)
-
@staticmethod
def parse_cookies(cookies_str: str) -> list:
    """Parse a cookie payload into a list of cookie dicts.

    Two formats are accepted:

    * a JSON array, which is returned as decoded;
    * a ``name=value; name2=value2`` string, where every pair becomes
      ``{'name', 'value', 'domain': '', 'path': '/'}``.

    Args:
        cookies_str: Cookie data in either format. ``None`` or an empty
            string yields an empty list; a list is returned unchanged.

    Returns:
        A list of cookie dicts (possibly empty).
    """
    if not cookies_str:
        return []
    if isinstance(cookies_str, list):
        # Already parsed by the caller.
        return cookies_str

    try:
        decoded = json.loads(cookies_str)
    except json.JSONDecodeError:
        decoded = None
    if isinstance(decoded, list):
        return decoded

    # Fallback format: name=value; name2=value2
    parsed = []
    for chunk in cookies_str.split(';'):
        # Split on the first '=' only; values may contain '=' themselves.
        name, separator, value = chunk.strip().partition('=')
        name = name.strip()
        if not separator or not name:
            continue
        parsed.append({
            'name': name,
            'value': value.strip(),
            'domain': '',
            'path': '/',
        })
    return parsed
-
@staticmethod
def cookies_to_string(cookies: list) -> str:
    """Join cookie dicts into a ``name=value; name2=value2`` string.

    Args:
        cookies: Dicts that each provide ``'name'`` and ``'value'``.
    """
    pairs = (f"{cookie['name']}={cookie['value']}" for cookie in cookies)
    return '; '.join(pairs)
-
async def init_browser(self, storage_state: str = None, proxy_config: Dict[str, Any] = None):
    """Start Playwright, launch Chromium and open a new page.

    The Playwright handle is kept on ``self._playwright`` so the driver can
    be stopped once the browser is no longer needed; previously the handle
    was discarded and could never be stopped.

    Args:
        storage_state: Path to a saved storage-state file. It is only used
            when the file exists.
        proxy_config: Proxy settings passed to the Chromium launcher. Falls
            back to ``self.proxy_config``; ignored unless it is a dict with
            a non-empty ``'server'`` entry.

    Returns:
        The newly created page (also stored on ``self.page``).
    """
    print(f"[{self.platform_name}] init_browser: headless={self.headless}", flush=True)
    playwright = await async_playwright().start()
    self._playwright = playwright

    launch_options: Dict[str, Any] = {"headless": self.headless}
    proxy = proxy_config or self.proxy_config
    if proxy and isinstance(proxy, dict) and proxy.get('server'):
        launch_options["proxy"] = proxy

    try:
        self.browser = await playwright.chromium.launch(**launch_options)
    except Exception:
        # No browser exists yet, so nothing else would ever stop the driver.
        self._playwright = None
        await playwright.stop()
        raise

    context_options: Dict[str, Any] = {}
    if storage_state and os.path.exists(storage_state):
        context_options["storage_state"] = storage_state
    self.context = await self.browser.new_context(**context_options)

    self.page = await self.context.new_page()
    return self.page
-
async def set_cookies(self, cookies: list):
    """Add cookies to the browser context.

    Cookies with a missing or empty ``'domain'`` get ``self.cookie_domain``;
    the dicts in ``cookies`` are updated in place.

    Raises:
        Exception: If the browser context has not been created yet.
    """
    context = self.context
    if not context:
        raise Exception("Browser context not initialized")

    # Fill in the default domain where none was given.
    for entry in cookies:
        if not entry.get('domain'):
            entry['domain'] = self.cookie_domain

    await context.add_cookies(cookies)
-
async def close_browser(self):
    """Close the context and the browser, then stop the Playwright driver.

    Every step is attempted even if an earlier one raises, so a failing
    ``context.close()`` no longer leaves the browser running. The first
    error still propagates to the caller. The driver is only stopped when a
    handle has been stored on ``self._playwright``; the handle is cleared
    first so that a repeated call does not stop it twice.
    """
    try:
        if self.context:
            await self.context.close()
    finally:
        try:
            if self.browser:
                await self.browser.close()
        finally:
            driver = getattr(self, "_playwright", None)
            if driver is not None:
                self._playwright = None
                await driver.stop()
-
async def save_cookies(self, file_path: str):
    """Write the context's storage state to ``file_path``.

    Does nothing when no browser context exists.
    """
    context = self.context
    if not context:
        return
    await context.storage_state(path=file_path)
async def capture_screenshot(self) -> str:
    """Take a JPEG screenshot of the current page and return it Base64-encoded.

    Returns:
        The Base64 text, or an empty string when there is no page or the
        capture fails (the failure is printed).
    """
    import base64

    page = self.page
    if not page:
        return ""
    try:
        raw = await page.screenshot(type="jpeg", quality=80)
        encoded = base64.b64encode(raw).decode('utf-8')
    except Exception as e:
        print(f"[{self.platform_name}] 截图失败: {e}")
        return ""
    return encoded
async def request_sms_code_from_frontend(self, phone: str = "", timeout_seconds: int = 120, message: str = "") -> str:
    """Request an SMS verification code through the Node.js backend.

    POSTs to ``{NODEJS_API_URL}/api/internal/captcha/request`` and waits for
    the reply, which is expected to be a JSON object with ``success`` and
    ``code``. The HTTP call runs in a worker thread: it is blocking and may
    last up to ``timeout_seconds + 30`` seconds, which would otherwise stall
    the event loop.

    Args:
        phone: Phone number included in the request.
        timeout_seconds: How long the backend is asked to wait for the code.
        message: Prompt text; defaults to "请输入短信验证码".

    Returns:
        The verification code as a string.

    Raises:
        Exception: If ``self.user_id`` is unset, the request fails, the
            reply is not valid JSON, the backend reports an error, or no
            code is returned.
    """
    node_api_url = os.environ.get('NODEJS_API_URL', 'http://localhost:3000').rstrip('/')
    # NOTE(review): the fallback key is a well-known default; confirm that
    # deployments always set INTERNAL_API_KEY.
    internal_api_key = os.environ.get('INTERNAL_API_KEY', 'internal-api-key-default')
    if not self.user_id:
        raise Exception("缺少 user_id,无法请求前端输入验证码")
    captcha_task_id = f"py_{self.platform_name}_{uuid.uuid4().hex}"
    payload = {
        "user_id": self.user_id,
        "captcha_task_id": captcha_task_id,
        "type": "sms",
        "phone": phone or "",
        "message": message or "请输入短信验证码",
        "timeout_seconds": timeout_seconds,
        "publish_task_id": self.publish_task_id,
        "publish_account_id": self.publish_account_id,
    }
    import functools
    import requests

    send = functools.partial(
        requests.post,
        f"{node_api_url}/api/internal/captcha/request",
        headers={
            "Content-Type": "application/json",
            "X-Internal-API-Key": internal_api_key,
        },
        json=payload,
        timeout=timeout_seconds + 30,
    )
    try:
        resp = await asyncio.get_running_loop().run_in_executor(None, send)
    except Exception as e:
        raise Exception(f"请求前端验证码失败: {e}") from e
    try:
        data = resp.json()
    except Exception as e:
        raise Exception(f"请求前端验证码失败: HTTP {resp.status_code}") from e
    if not isinstance(data, dict):
        # A non-object reply carries neither "success" nor "code".
        data = {}
    if resp.status_code >= 400 or not data.get("success"):
        raise Exception(data.get("error") or data.get("message") or f"请求前端验证码失败: HTTP {resp.status_code}")
    code = data.get("code") or ""
    if not code:
        raise Exception("未收到验证码")
    return str(code)
async def ai_analyze_sms_send_state(self, screenshot_base64: str = None) -> dict:
    """Ask the vision model whether an SMS-code prompt is shown and whether the code was sent.

    A JPEG screenshot (captured from the current page when none is given) is
    posted to the ``/chat/completions`` endpoint configured through
    ``DASHSCOPE_API_KEY``, ``DASHSCOPE_BASE_URL`` and ``AI_VISION_MODEL``.
    The JSON object found in the reply is normalised into the result.

    This method does not raise for ordinary failures: they are reported
    through ``notes`` with ``suggested_action`` set to ``"manual_send"``.

    Args:
        screenshot_base64: Base64-encoded JPEG; captured automatically when
            empty.

    Returns:
        dict with the keys ``has_sms_modal`` (bool), ``send_button_state``,
        ``sent_likely`` (bool), ``block_reason``, ``suggested_action``,
        ``confidence`` (int) and ``notes``.
    """
    import functools
    import json
    import os
    import re

    def _fallback(notes: str, block_reason: str = "unknown", has_sms_modal: bool = True) -> dict:
        # Conservative answer used whenever no analysis is available.
        return {
            "has_sms_modal": has_sms_modal,
            "send_button_state": "unknown",
            "sent_likely": False,
            "block_reason": block_reason,
            "suggested_action": "manual_send",
            "confidence": 0,
            "notes": notes,
        }

    try:
        if not screenshot_base64:
            screenshot_base64 = await self.capture_screenshot()
        if not screenshot_base64:
            return _fallback("无法获取截图", has_sms_modal=False)
        ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
        ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
        ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus')
        if not ai_api_key:
            return _fallback("未配置 AI API Key", block_reason="no_ai_key")
        prompt = """请分析这张网页截图,判断是否处于“短信验证码”验证弹窗/页面,并判断“发送验证码/获取验证码”是否已经触发成功。
你需要重点识别:
1) 是否存在短信验证码弹窗(包含“请输入验证码/短信验证码/手机号验证/获取验证码/发送验证码”等)
2) 发送按钮状态:enabled / disabled / countdown(出现xx秒) / hidden / unknown
3) 是否已发送成功:例如出现倒计时、按钮禁用、出现“已发送/重新发送/xx秒后重试”等
4) 是否被阻塞:例如出现滑块/人机验证、频繁发送、风控提示、网络异常等
请以 JSON 返回:
```json
{
"has_sms_modal": true,
"send_button_state": "enabled|disabled|countdown|hidden|unknown",
"sent_likely": true,
"block_reason": "none|need_click_send|slider|risk|rate_limit|network|unknown",
"suggested_action": "wait|click_send|solve_slider|manual_send",
"confidence": 0-100,
"notes": "一句话说明你看到的证据"
}
```"""
        headers = {
            'Authorization': f'Bearer {ai_api_key}',
            'Content-Type': 'application/json'
        }
        payload = {
            "model": ai_vision_model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{screenshot_base64}"
                            }
                        },
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ],
            "max_tokens": 500
        }

        import requests

        # requests is blocking; run it in a worker thread so the event loop
        # stays responsive while waiting for the model.
        send = functools.partial(
            requests.post,
            f"{ai_base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30,
        )
        response = await asyncio.get_running_loop().run_in_executor(None, send)
        if response.status_code != 200:
            return _fallback(f"AI API 返回错误 {response.status_code}", block_reason="network")
        result = response.json()
        ai_response = result.get('choices', [{}])[0].get('message', {}).get('content', '')

        # Prefer a fenced json block, otherwise take the outermost {...}.
        # The patterns use single backslashes: in a raw string "\\s" would
        # match a literal backslash and never find the reply.
        fenced = re.search(r'```json\s*([\s\S]*?)\s*```', ai_response)
        if fenced:
            json_str = fenced.group(1)
        else:
            braced = re.search(r'\{[\s\S]*\}', ai_response)
            json_str = braced.group(0) if braced else '{}'
        try:
            data = json.loads(json_str)
        except Exception:
            data = {}
        if not isinstance(data, dict):
            data = {}
        return {
            "has_sms_modal": bool(data.get("has_sms_modal", True)),
            "send_button_state": data.get("send_button_state", "unknown"),
            "sent_likely": bool(data.get("sent_likely", False)),
            "block_reason": data.get("block_reason", "unknown"),
            "suggested_action": data.get("suggested_action", "manual_send"),
            "confidence": int(data.get("confidence", 0) or 0),
            "notes": data.get("notes", ""),
        }
    except Exception as e:
        return _fallback(f"AI 分析异常: {e}")
async def sync_cookies_to_node(self, cookies: list) -> bool:
    """Send the given cookies to the Node.js backend for the current account.

    POSTs ``user_id``, ``account_id`` and the JSON-encoded cookies to
    ``{NODEJS_API_URL}/api/internal/accounts/update-cookies``. The request
    code used to sit after ``ai_suggest_playwright_selector`` and was never
    reached from this method; it is restored here.

    Args:
        cookies: Cookie dicts to store.

    Returns:
        True when the backend accepted the update (an empty response body
        counts as success); False when ``user_id`` or
        ``publish_account_id`` is unset, on an HTTP status >= 400, or on
        any error.
    """
    if not self.user_id or not self.publish_account_id:
        return False

    import functools
    import json
    import os
    import requests

    node_api_url = os.environ.get('NODEJS_API_URL', 'http://localhost:3000').rstrip('/')
    internal_api_key = os.environ.get('INTERNAL_API_KEY', 'internal-api-key-default')
    try:
        payload = {
            "user_id": int(self.user_id),
            "account_id": int(self.publish_account_id),
            "cookies": json.dumps(cookies, ensure_ascii=False),
        }
        # requests is blocking; keep it off the event loop.
        send = functools.partial(
            requests.post,
            f"{node_api_url}/api/internal/accounts/update-cookies",
            headers={
                "Content-Type": "application/json",
                "X-Internal-API-Key": internal_api_key,
            },
            json=payload,
            timeout=30,
        )
        resp = await asyncio.get_running_loop().run_in_executor(None, send)
        if resp.status_code >= 400:
            return False
        data = resp.json() if resp.content else {}
        return bool(data.get("success", True))
    except Exception:
        return False


async def ai_suggest_playwright_selector(self, goal: str, screenshot_base64: str = None) -> dict:
    """Ask the vision model for a Playwright selector that achieves ``goal``.

    A JPEG screenshot (captured from the current page when none is given) is
    posted to the ``/chat/completions`` endpoint configured through
    ``DASHSCOPE_API_KEY``, ``DASHSCOPE_BASE_URL`` and ``AI_VISION_MODEL``.

    Args:
        goal: Description of the action the selector should enable.
        screenshot_base64: Base64-encoded JPEG; captured automatically when
            empty.

    Returns:
        dict with ``has_selector`` (bool), ``selector`` (str, empty when
        none), ``confidence`` (int) and ``notes`` (str). Failures are
        reported through ``notes`` rather than raised.
    """
    import functools
    import json
    import os
    import re

    try:
        if not screenshot_base64:
            screenshot_base64 = await self.capture_screenshot()
        if not screenshot_base64:
            return {"has_selector": False, "selector": "", "confidence": 0, "notes": "无法获取截图"}
        ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
        ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
        ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus')
        if not ai_api_key:
            return {"has_selector": False, "selector": "", "confidence": 0, "notes": "未配置 AI API Key"}
        prompt = f"""请分析这张网页截图,给出一个 Playwright Python 可用的 selector(用于 page.locator(selector))来完成目标操作。
目标:{goal}
要求:
1) selector 尽量稳定(优先 role/text/aria,其次 class,避免过度依赖随机 class)
2) selector 必须是 Playwright 支持的选择器语法(如:text="发布"、button:has-text("发布")、[role="button"]:has-text("发布") 等)
3) 只返回一个最优 selector
以 JSON 返回:
```json
{{
"has_selector": true,
"selector": "button:has-text(\\"发布\\")",
"confidence": 0-100,
"notes": "你依据的页面证据"
}}
```"""
        headers = {
            'Authorization': f'Bearer {ai_api_key}',
            'Content-Type': 'application/json'
        }
        payload = {
            "model": ai_vision_model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{screenshot_base64}"
                            }
                        },
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ],
            "max_tokens": 300
        }

        import requests

        # requests is blocking; keep it off the event loop.
        send = functools.partial(
            requests.post,
            f"{ai_base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30,
        )
        response = await asyncio.get_running_loop().run_in_executor(None, send)
        if response.status_code != 200:
            return {"has_selector": False, "selector": "", "confidence": 0, "notes": f"AI API 错误 {response.status_code}"}
        result = response.json()
        ai_response = result.get('choices', [{}])[0].get('message', {}).get('content', '')

        # Prefer a fenced json block, otherwise take the outermost {...}.
        # Single backslashes are required: in a raw string "\\s" matches a
        # literal backslash and would never find the reply.
        fenced = re.search(r'```json\s*([\s\S]*?)\s*```', ai_response)
        if fenced:
            json_str = fenced.group(1)
        else:
            braced = re.search(r'\{[\s\S]*\}', ai_response)
            json_str = braced.group(0) if braced else '{}'
        try:
            data = json.loads(json_str)
        except Exception:
            data = {}
        if not isinstance(data, dict):
            data = {}
        selector = str(data.get("selector", "") or "").strip()
        has_selector = bool(data.get("has_selector", False)) and bool(selector)
        confidence = int(data.get("confidence", 0) or 0)
        notes = str(data.get("notes", "") or "")
        if not has_selector:
            return {"has_selector": False, "selector": "", "confidence": confidence, "notes": notes or "未给出 selector"}
        return {"has_selector": True, "selector": selector, "confidence": confidence, "notes": notes}
    except Exception as e:
        return {"has_selector": False, "selector": "", "confidence": 0, "notes": f"AI selector 异常: {e}"}
async def ai_check_captcha(self, screenshot_base64: str = None) -> dict:
    """Detect a captcha on the page by sending a screenshot to the vision model.

    Falls back to ``_traditional_captcha_check`` when no API key is
    configured, when the API answers with a non-200 status, or when
    anything raises.

    Args:
        screenshot_base64: Base64-encoded screenshot; the current page is
            captured when empty.

    Returns:
        dict: {
            "has_captcha": bool,         # whether a captcha is present
            "captcha_type": str,         # slider, image, phone, rotate, puzzle, ...
            "captcha_description": str,  # description of the captcha
            "confidence": number,        # 0-100
            "need_headful": bool         # whether a headed browser is needed
        }
    """
    import functools
    import os
    import re

    try:
        # Obtain a screenshot.
        if not screenshot_base64:
            screenshot_base64 = await self.capture_screenshot()

        if not screenshot_base64:
            print(f"[{self.platform_name}] AI验证码检测: 无法获取截图")
            return {
                "has_captcha": False,
                "captcha_type": "",
                "captcha_description": "",
                "confidence": 0,
                "need_headful": False
            }

        # Read the AI configuration.
        ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
        ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
        ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus')

        if not ai_api_key:
            print(f"[{self.platform_name}] AI验证码检测: 未配置 AI API Key,使用传统方式检测")
            return await self._traditional_captcha_check()

        # Build the AI request.
        prompt = """请分析这张网页截图,判断页面上是否存在验证码。
请检查以下类型的验证码:
1. 滑块验证码(需要滑动滑块到指定位置)
2. 图片验证码(需要选择正确的图片、点击图片上的文字等)
3. 旋转验证码(需要旋转图片到正确角度)
4. 拼图验证码(需要拖动拼图块到正确位置)
5. 手机验证码(需要输入手机收到的验证码)
6. 计算验证码(需要输入计算结果)
请以 JSON 格式返回结果:
```json
{
"has_captcha": true/false,
"captcha_type": "slider/image/phone/rotate/puzzle/calculate/none",
"captcha_description": "验证码的具体描述",
"confidence": 0-100
}
```
注意:
- 如果页面有明显的验证码弹窗或验证区域,has_captcha 为 true
- 如果只是普通的登录页面或表单,没有特殊的验证步骤,has_captcha 为 false
- confidence 表示你对判断结果的信心,100 表示非常确定"""
        headers = {
            'Authorization': f'Bearer {ai_api_key}',
            'Content-Type': 'application/json'
        }

        payload = {
            "model": ai_vision_model,
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{screenshot_base64}"
                            }
                        },
                        {
                            "type": "text",
                            "text": prompt
                        }
                    ]
                }
            ],
            "max_tokens": 500
        }

        print(f"[{self.platform_name}] AI验证码检测: 正在分析截图...")

        import requests

        # requests is blocking; run it in a worker thread so the event loop
        # stays responsive while waiting for the model.
        send = functools.partial(
            requests.post,
            f"{ai_base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30,
        )
        response = await asyncio.get_running_loop().run_in_executor(None, send)

        if response.status_code != 200:
            print(f"[{self.platform_name}] AI验证码检测: API 返回错误 {response.status_code}")
            return await self._traditional_captcha_check()

        result = response.json()
        ai_response = result.get('choices', [{}])[0].get('message', {}).get('content', '')

        print(f"[{self.platform_name}] AI验证码检测响应: {ai_response[:200]}...")

        # Parse the reply: prefer a fenced json block, otherwise take the
        # outermost {...}.
        fenced = re.search(r'```json\s*([\s\S]*?)\s*```', ai_response)
        if fenced:
            json_str = fenced.group(1)
        else:
            braced = re.search(r'\{[\s\S]*\}', ai_response)
            json_str = braced.group(0) if braced else '{}'

        try:
            ai_result = json.loads(json_str)
        except (ValueError, TypeError):
            ai_result = {}
        if not isinstance(ai_result, dict):
            # A bare list or scalar carries none of the expected keys.
            ai_result = {}

        has_captcha = bool(ai_result.get('has_captcha', False))
        captcha_type = ai_result.get('captcha_type', '')
        captcha_description = ai_result.get('captcha_description', '')
        confidence = ai_result.get('confidence', 0)

        # A detected captcha requires switching to a headed browser.
        need_headful = has_captcha and captcha_type not in ['none', '']

        print(f"[{self.platform_name}] AI验证码检测结果: has_captcha={has_captcha}, type={captcha_type}, confidence={confidence}")

        return {
            "has_captcha": has_captcha,
            "captcha_type": captcha_type if captcha_type != 'none' else '',
            "captcha_description": captcha_description,
            "confidence": confidence,
            "need_headful": need_headful
        }

    except Exception as e:
        print(f"[{self.platform_name}] AI验证码检测异常: {e}")
        import traceback
        traceback.print_exc()
        return await self._traditional_captcha_check()
-
async def _traditional_captcha_check(self) -> dict:
    """Detect a captcha from DOM elements, without the AI model.

    A fixed list of selectors is probed in order; the first one that matches
    a visible element decides the captcha type. A selector that fails is
    skipped. Only ``Exception`` is suppressed, so task cancellation and
    ``KeyboardInterrupt`` are no longer swallowed.

    Returns:
        dict with ``has_captcha``, ``captcha_type``, ``captcha_description``,
        ``confidence`` and ``need_headful``. ``confidence`` is 80 after a
        completed scan and 0 when there is no page or the scan failed.
    """

    def _verdict(found: bool, confidence: int, kind: str = "", description: str = "") -> dict:
        return {
            "has_captcha": found,
            "captcha_type": kind,
            "captcha_description": description,
            "confidence": confidence,
            "need_headful": found,
        }

    if not self.page:
        return _verdict(False, 0)

    # Common captcha selectors: (selector, type, description).
    captcha_selectors = [
        # Slider captcha
        ('[class*="slider"]', 'slider', '滑块验证码'),
        ('[class*="slide-verify"]', 'slider', '滑块验证码'),
        ('text="滑动"', 'slider', '滑块验证码'),
        ('text="拖动"', 'slider', '滑块验证码'),

        # Image captcha
        ('[class*="captcha"]', 'image', '图片验证码'),
        ('[class*="verify-img"]', 'image', '图片验证码'),
        ('text="点击"', 'image', '图片验证码'),
        ('text="选择"', 'image', '图片验证码'),

        # Phone (SMS) captcha
        ('text="验证码"', 'phone', '手机验证码'),
        ('text="获取验证码"', 'phone', '手机验证码'),
        ('[class*="sms-code"]', 'phone', '手机验证码'),

        # Rotation captcha
        ('text="旋转"', 'rotate', '旋转验证码'),
        ('[class*="rotate"]', 'rotate', '旋转验证码'),
    ]

    try:
        for selector, captcha_type, description in captcha_selectors:
            try:
                if await self.page.locator(selector).count() == 0:
                    continue
                # Only a visible match counts.
                if await self.page.locator(selector).first.is_visible():
                    print(f"[{self.platform_name}] 传统检测: 发现验证码 - {selector}")
                    return _verdict(True, 80, captcha_type, description)
            except Exception:
                # Best effort: a selector that cannot be evaluated is skipped.
                continue

        return _verdict(False, 80)
    except Exception as e:
        print(f"[{self.platform_name}] 传统验证码检测异常: {e}")
        return _verdict(False, 0)
async def get_page_url(self) -> str:
    """Return the URL of the current page.

    Returns:
        The page URL, or an empty string when there is no page or reading
        the URL fails. Only ``Exception`` is suppressed, so
        ``KeyboardInterrupt`` and similar are no longer swallowed.
    """
    if not self.page:
        return ""
    try:
        return self.page.url
    except Exception:
        return ""
async def check_publish_status(self) -> dict:
    """Classify the publish state from the text of the current page.

    The page HTML is searched for success keywords first, then captcha
    keywords, then failure keywords; the first group with a match decides
    the status. Without any match the status is ``"processing"``.

    Returns:
        dict with ``status``, ``screenshot_base64``, ``page_url`` and
        ``message``. When there is no page, only ``status`` (``"error"``)
        and ``message`` are present.
    """
    if not self.page:
        return {"status": "error", "message": "页面未初始化"}

    try:
        screenshot = await self.capture_screenshot()
        page_url = await self.get_page_url()
        html = await self.page.content()

        def _report(status, message):
            return {
                "status": status,
                "screenshot_base64": screenshot,
                "page_url": page_url,
                "message": message
            }

        # Success markers.
        if any(word in html for word in ('发布成功', '上传成功', '发表成功', '提交成功')):
            return _report("success", "发布成功")

        # Captcha markers; the first matching keyword is reported.
        captcha_words = ('验证码', '身份验证', '请完成验证', '滑动验证', '图形验证')
        captcha_hit = next((word for word in captcha_words if word in html), None)
        if captcha_hit is not None:
            return _report("need_captcha", f"检测到{captcha_hit}")

        # Failure markers; the first matching keyword is the message.
        failure_words = ('发布失败', '上传失败', '提交失败', '操作失败')
        failure_hit = next((word for word in failure_words if word in html), None)
        if failure_hit is not None:
            return _report("failed", failure_hit)

        # Nothing matched: still in progress.
        return _report("processing", "处理中")
    except Exception as e:
        return {
            "status": "error",
            "screenshot_base64": "",
            "page_url": "",
            "message": str(e)
        }
async def wait_for_upload_complete(self, success_selector: str, timeout: int = 300, poll_interval: float = 3):
    """Poll the page until ``success_selector`` matches an element.

    The selector is always checked at least once, even when ``timeout`` is
    shorter than one polling interval, and no sleep follows the final check.
    Progress (30%) is reported after every unsuccessful check.

    Args:
        success_selector: Selector whose presence marks a finished upload.
        timeout: Approximate total wait in seconds.
        poll_interval: Seconds between two checks.

    Returns:
        True as soon as the selector matches, False once the attempts are
        used up.

    Raises:
        Exception: If the page has not been created.
        ValueError: If ``poll_interval`` is not positive.
    """
    if not self.page:
        raise Exception("Page not initialized")
    if poll_interval <= 0:
        raise ValueError("poll_interval must be positive")

    attempts = max(1, int(timeout // poll_interval))
    for attempt in range(attempts):
        try:
            if await self.page.locator(success_selector).count() > 0:
                return True
        except Exception:
            # Best effort: a failed probe is retried on the next round.
            # Cancellation is not an Exception and still propagates.
            pass
        if attempt + 1 < attempts:
            await asyncio.sleep(poll_interval)
        self.report_progress(30, "正在上传视频...")

    return False
-
@abstractmethod
async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
    """Publish a video; every subclass must implement this.

    Args:
        cookies: Cookie string or JSON.
        params: Publish parameters.

    Returns:
        PublishResult: Outcome of the publish attempt.
    """
-
async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
    """Fetch the list of works; subclasses may override.

    This base implementation only reports that the platform does not
    support the operation.

    Args:
        cookies: Cookie string or JSON.
        page: Page number, starting at 0.
        page_size: Number of items per page.

    Returns:
        WorksResult: Result of the works query.
    """
    unsupported = WorksResult(
        success=False,
        platform=self.platform_name,
        error="该平台暂不支持获取作品列表"
    )
    return unsupported
-
async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
    """Fetch the comments of a work; subclasses may override.

    This base implementation only reports that the platform does not
    support the operation.

    Args:
        cookies: Cookie string or JSON.
        work_id: Identifier of the work.
        cursor: Pagination cursor.

    Returns:
        CommentsResult: Result of the comment query.
    """
    unsupported = CommentsResult(
        success=False,
        platform=self.platform_name,
        work_id=work_id,
        error="该平台暂不支持获取评论"
    )
    return unsupported
-
async def run(self, cookies: str, params: PublishParams) -> PublishResult:
    """Run a publish task.

    Wraps ``publish``: an exception is printed and turned into a failed
    PublishResult, and the browser is closed in every case.
    """
    try:
        outcome = await self.publish(cookies, params)
    except Exception as e:
        import traceback
        traceback.print_exc()
        outcome = PublishResult(
            success=False,
            platform=self.platform_name,
            error=str(e)
        )
    finally:
        await self.close_browser()
    return outcome
-
async def run_get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
    """Run a works-list task.

    Wraps ``get_works``: an exception is printed and turned into a failed
    WorksResult, and the browser is closed in every case.
    """
    try:
        outcome = await self.get_works(cookies, page, page_size)
    except Exception as e:
        import traceback
        traceback.print_exc()
        outcome = WorksResult(
            success=False,
            platform=self.platform_name,
            error=str(e)
        )
    finally:
        await self.close_browser()
    return outcome
-
async def run_get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
    """Run a comment-list task.

    Wraps ``get_comments``: an exception is printed and turned into a failed
    CommentsResult, and the browser is closed in every case.
    """
    try:
        outcome = await self.get_comments(cookies, work_id, cursor)
    except Exception as e:
        import traceback
        traceback.print_exc()
        outcome = CommentsResult(
            success=False,
            platform=self.platform_name,
            work_id=work_id,
            error=str(e)
        )
    finally:
        await self.close_browser()
    return outcome
-
async def check_login_status(self, cookies: str) -> dict:
    """Check whether the cookies still represent a logged-in session.

    A browser is started, the cookies are applied and ``self.login_url`` is
    opened. The session counts as expired when the resulting URL contains a
    login indicator or when one of the known login prompts is present on
    the page. The browser is closed in every case.

    Args:
        cookies: Cookie string or JSON.

    Returns:
        dict: {
            "success": True,
            "valid": True/False,
            "need_login": True/False,
            "message": "status text"
        }
        On an unexpected error ``success`` is False and ``error`` replaces
        ``message``.
    """
    try:
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        # Open the platform's back-office page.
        home_url = self.login_url
        print(f"[{self.platform_name}] 访问后台页面: {home_url}")
        await self.page.goto(home_url, wait_until='domcontentloaded', timeout=30000)
        await asyncio.sleep(3)

        # A redirect to a login page shows up in the URL.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前 URL: {current_url}")

        # NOTE(review): 'auth' also matches URLs containing e.g. "author";
        # confirm this cannot produce false positives for the platforms used.
        login_indicators = ['login', 'passport', 'signin', 'auth']
        lowered_url = current_url.lower()
        need_login = any(indicator in lowered_url for indicator in login_indicators)

        if not need_login:
            # Otherwise look for a login prompt on the page itself.
            login_selectors = [
                'text="请先登录"',
                'text="登录后继续"',
                'text="请登录"',
                '[class*="login-modal"]',
                '[class*="login-dialog"]',
                '[class*="login-popup"]',
            ]
            for selector in login_selectors:
                try:
                    if await self.page.locator(selector).count() > 0:
                        need_login = True
                        print(f"[{self.platform_name}] 检测到登录弹窗: {selector}")
                        break
                except Exception:
                    # Best effort: a selector that cannot be evaluated is
                    # skipped. Cancellation is not an Exception and still
                    # propagates.
                    continue

        if need_login:
            return {
                "success": True,
                "valid": False,
                "need_login": True,
                "message": "Cookie 已过期,需要重新登录"
            }
        return {
            "success": True,
            "valid": True,
            "need_login": False,
            "message": "登录状态有效"
        }

    except Exception as e:
        import traceback
        traceback.print_exc()
        return {
            "success": False,
            "valid": False,
            "need_login": True,
            "error": str(e)
        }
    finally:
        await self.close_browser()
|