| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768 |
- # -*- coding: utf-8 -*-
- """
- 平台发布基类
- 提供通用的发布接口和工具方法
- """
- import asyncio
- import json
- import os
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import List, Optional, Callable, Dict, Any
- from playwright.async_api import async_playwright, Browser, BrowserContext, Page
@dataclass
class PublishParams:
    """Input parameters describing one video to publish.

    Attributes:
        title: Title of the video.
        video_path: Path of the video file to upload.
        description: Description text; falls back to ``title`` when empty.
        cover_path: Optional path of a cover image.
        tags: Tags attached to the video.
        publish_date: Optional publish time; ``None`` when not provided.
        location: Location label attached to the post.
    """

    title: str
    video_path: str
    description: str = ""
    cover_path: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    publish_date: Optional[datetime] = None
    location: str = "重庆市"

    def __post_init__(self):
        """Reuse the title as the description when none was supplied."""
        self.description = self.description or self.title
@dataclass
class PublishResult:
    """Outcome of a publish attempt on one platform."""

    success: bool
    platform: str
    video_id: str = ""
    video_url: str = ""
    message: str = ""
    error: str = ""
    # True when the platform asked for a captcha / verification step.
    need_captcha: bool = False
    # Kind of captcha requested: phone, slider, image.
    captcha_type: str = ""
    # Screenshot of the page, Base64 encoded.
    screenshot_base64: str = ""
    # URL of the page at the time the result was produced.
    page_url: str = ""
    # One of: uploading, processing, success, failed, need_captcha, need_action.
    status: str = ""
@dataclass
class WorkItem:
    """A single published (or pending) work together with its counters."""

    work_id: str
    title: str
    cover_url: str = ""
    video_url: str = ""
    # Duration in seconds.
    duration: int = 0
    # One of: published, reviewing, rejected, draft.
    status: str = "published"
    publish_time: str = ""
    play_count: int = 0
    like_count: int = 0
    comment_count: int = 0
    share_count: int = 0
    collect_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary keyed by field name."""
        exported_keys = (
            "work_id",
            "title",
            "cover_url",
            "video_url",
            "duration",
            "status",
            "publish_time",
            "play_count",
            "like_count",
            "comment_count",
            "share_count",
            "collect_count",
        )
        return {key: getattr(self, key) for key in exported_keys}
@dataclass
class CommentItem:
    """A comment on a work, optionally carrying nested replies."""

    comment_id: str
    work_id: str
    content: str
    author_id: str = ""
    author_name: str = ""
    author_avatar: str = ""
    like_count: int = 0
    reply_count: int = 0
    create_time: str = ""
    # True when the comment was written by the work's author.
    is_author: bool = False
    replies: List['CommentItem'] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the comment as a dictionary, serializing replies recursively."""
        scalar_keys = (
            "comment_id",
            "work_id",
            "content",
            "author_id",
            "author_name",
            "author_avatar",
            "like_count",
            "reply_count",
            "create_time",
            "is_author",
        )
        payload: Dict[str, Any] = {key: getattr(self, key) for key in scalar_keys}
        # Replies come last so the key order matches the field order.
        payload["replies"] = [reply.to_dict() for reply in self.replies]
        return payload
@dataclass
class WorksResult:
    """Result of a "list works" request for one platform."""

    success: bool
    platform: str
    works: List[WorkItem] = field(default_factory=list)
    total: int = 0
    has_more: bool = False
    error: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a dictionary with each work serialized via ``to_dict``."""
        serialized_works = [item.to_dict() for item in self.works]
        return dict(
            success=self.success,
            platform=self.platform,
            works=serialized_works,
            total=self.total,
            has_more=self.has_more,
            error=self.error,
        )
@dataclass
class CommentsResult:
    """Result of a "list comments" request for one work on one platform."""

    success: bool
    platform: str
    work_id: str
    comments: List[CommentItem] = field(default_factory=list)
    total: int = 0
    has_more: bool = False
    error: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a dictionary with each comment serialized via ``to_dict``."""
        serialized_comments = [item.to_dict() for item in self.comments]
        return dict(
            success=self.success,
            platform=self.platform,
            work_id=self.work_id,
            comments=serialized_comments,
            total=self.total,
            has_more=self.has_more,
            error=self.error,
        )
class BasePublisher(ABC):
    """
    Abstract base class for platform publishers.

    Every platform publisher subclasses this class and implements
    ``publish``. The base class owns the Playwright driver / browser /
    context / page handles and offers shared helpers: cookie parsing,
    screenshots, captcha detection, publish-status probing and a cookie
    login check.

    Class attributes (meant to be overridden by subclasses):
        platform_name: Label used in log prefixes and in result objects.
        login_url: Page opened by ``check_login_status``.
        publish_url: Not read by this base class.
        cookie_domain: Domain assigned to cookies that carry none.
    """

    platform_name: str = "base"
    login_url: str = ""
    publish_url: str = ""
    cookie_domain: str = ""

    # (selector, captcha_type, description) triples probed in order by the
    # DOM-based captcha check; the first visible match wins.
    _CAPTCHA_SELECTORS = (
        # Slider captcha
        ('[class*="slider"]', 'slider', '滑块验证码'),
        ('[class*="slide-verify"]', 'slider', '滑块验证码'),
        ('text="滑动"', 'slider', '滑块验证码'),
        ('text="拖动"', 'slider', '滑块验证码'),
        # Image captcha
        ('[class*="captcha"]', 'image', '图片验证码'),
        ('[class*="verify-img"]', 'image', '图片验证码'),
        ('text="点击"', 'image', '图片验证码'),
        ('text="选择"', 'image', '图片验证码'),
        # Phone (SMS) verification code
        ('text="验证码"', 'phone', '手机验证码'),
        ('text="获取验证码"', 'phone', '手机验证码'),
        ('[class*="sms-code"]', 'phone', '手机验证码'),
        # Rotate captcha
        ('text="旋转"', 'rotate', '旋转验证码'),
        ('[class*="rotate"]', 'rotate', '旋转验证码'),
    )

    # Page-content keywords used by check_publish_status, checked in the
    # order success -> captcha -> failure.
    _SUCCESS_KEYWORDS = ('发布成功', '上传成功', '发表成功', '提交成功')
    _CAPTCHA_KEYWORDS = ('验证码', '身份验证', '请完成验证', '滑动验证', '图形验证')
    _FAIL_KEYWORDS = ('发布失败', '上传失败', '提交失败', '操作失败')

    # URL fragments and selectors that indicate a login page / login dialog.
    _LOGIN_URL_MARKERS = ('login', 'passport', 'signin', 'auth')
    _LOGIN_SELECTORS = (
        'text="请先登录"',
        'text="登录后继续"',
        'text="请登录"',
        '[class*="login-modal"]',
        '[class*="login-dialog"]',
        '[class*="login-popup"]',
    )

    # Prompt sent to the vision model; assembled line by line so the text is
    # independent of source indentation.
    _CAPTCHA_PROMPT = "\n".join((
        '请分析这张网页截图,判断页面上是否存在验证码。',
        '请检查以下类型的验证码:',
        '1. 滑块验证码(需要滑动滑块到指定位置)',
        '2. 图片验证码(需要选择正确的图片、点击图片上的文字等)',
        '3. 旋转验证码(需要旋转图片到正确角度)',
        '4. 拼图验证码(需要拖动拼图块到正确位置)',
        '5. 手机验证码(需要输入手机收到的验证码)',
        '6. 计算验证码(需要输入计算结果)',
        '请以 JSON 格式返回结果:',
        '```json',
        '{',
        '"has_captcha": true/false,',
        '"captcha_type": "slider/image/phone/rotate/puzzle/calculate/none",',
        '"captcha_description": "验证码的具体描述",',
        '"confidence": 0-100',
        '}',
        '```',
        '注意:',
        '- 如果页面有明显的验证码弹窗或验证区域,has_captcha 为 true',
        '- 如果只是普通的登录页面或表单,没有特殊的验证步骤,has_captcha 为 false',
        '- confidence 表示你对判断结果的信心,100 表示非常确定',
    ))

    def __init__(self, headless: bool = True):
        """Store configuration; no browser is started until ``init_browser``.

        Args:
            headless: Whether the browser is launched without a window.
        """
        self.headless = headless
        # Playwright driver handle, kept so close_browser() can stop it.
        self._playwright = None
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        self.on_progress: Optional[Callable[[int, str], None]] = None

    def set_progress_callback(self, callback: Callable[[int, str], None]):
        """Register the callback invoked by ``report_progress``."""
        self.on_progress = callback

    def report_progress(self, progress: int, message: str):
        """Print a progress line and forward it to the callback, if any."""
        print(f"[{self.platform_name}] [{progress}%] {message}")
        if self.on_progress:
            self.on_progress(progress, message)

    @staticmethod
    def parse_cookies(cookies_str: str) -> list:
        """Parse a cookie payload into a list of cookie dictionaries.

        A JSON array is returned as-is. Anything else is treated as a
        ``name=value; name2=value2`` header string; each pair becomes a dict
        with an empty ``domain`` and ``path`` set to ``/``. Pairs with an
        empty name are skipped.

        Args:
            cookies_str: JSON text or a cookie header string.

        Returns:
            list: Cookie dictionaries; empty for empty or ``None`` input.
        """
        if not cookies_str:
            return []

        try:
            decoded = json.loads(cookies_str)
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, list):
            return decoded

        parsed = []
        for pair in cookies_str.split(';'):
            name, separator, value = pair.strip().partition('=')
            name = name.strip()
            if not separator or not name:
                continue
            parsed.append({
                'name': name,
                'value': value.strip(),
                'domain': '',
                'path': '/'
            })
        return parsed

    @staticmethod
    def cookies_to_string(cookies: list) -> str:
        """Join cookie dictionaries into a ``name=value; ...`` string."""
        return '; '.join(f"{c['name']}={c['value']}" for c in cookies)

    async def init_browser(self, storage_state: str = None):
        """Start Playwright, launch Chromium and open a fresh page.

        Args:
            storage_state: Optional path of a saved storage-state file; it is
                only used when the file exists.

        Returns:
            The newly created page (also stored on ``self.page``).

        Raises:
            Exception: Any launch error is re-raised after the partially
                created resources have been released.
        """
        print(f"[{self.platform_name}] init_browser: headless={self.headless}", flush=True)
        self._playwright = await async_playwright().start()
        try:
            self.browser = await self._playwright.chromium.launch(headless=self.headless)

            context_options = {}
            if storage_state and os.path.exists(storage_state):
                context_options['storage_state'] = storage_state
            self.context = await self.browser.new_context(**context_options)

            self.page = await self.context.new_page()
        except Exception:
            # Do not leave a half-started driver/browser behind.
            await self.close_browser()
            raise
        return self.page

    async def set_cookies(self, cookies: list):
        """Add cookies to the browser context.

        Cookies without a domain (and without a ``url``) receive
        ``cookie_domain`` and a default path of ``/``. The caller's
        dictionaries are copied, not modified.

        Raises:
            Exception: If the browser context has not been initialized.
        """
        if not self.context:
            raise Exception("Browser context not initialized")

        prepared = []
        for cookie in cookies:
            entry = dict(cookie)
            # A cookie addressed by url must not also carry domain/path.
            if not entry.get('url'):
                if not entry.get('domain'):
                    entry['domain'] = self.cookie_domain
                entry.setdefault('path', '/')
            prepared.append(entry)

        await self.context.add_cookies(prepared)

    async def close_browser(self):
        """Release context, browser and Playwright driver (best effort).

        Safe to call repeatedly and from ``finally`` blocks: every handle is
        cleared first, and a failure to release one resource is logged
        without preventing the release of the others.
        """
        pending = (
            ("context", self.context, "close"),
            ("browser", self.browser, "close"),
            # getattr: a subclass may skip BasePublisher.__init__.
            ("playwright", getattr(self, "_playwright", None), "stop"),
        )
        self.page = None
        self.context = None
        self.browser = None
        self._playwright = None

        for label, resource, method_name in pending:
            if resource is None:
                continue
            try:
                await getattr(resource, method_name)()
            except Exception as exc:
                print(f"[{self.platform_name}] close_browser: failed to release {label}: {exc}")

    async def save_cookies(self, file_path: str):
        """Write the context's storage state to ``file_path`` (no-op without a context)."""
        if self.context:
            await self.context.storage_state(path=file_path)

    async def capture_screenshot(self) -> str:
        """Return a Base64-encoded JPEG screenshot of the page, or ``""`` on failure."""
        import base64

        if not self.page:
            return ""
        try:
            image_bytes = await self.page.screenshot(type="jpeg", quality=80)
            return base64.b64encode(image_bytes).decode('utf-8')
        except Exception as e:
            print(f"[{self.platform_name}] 截图失败: {e}")
            return ""

    @staticmethod
    def _captcha_result(has_captcha=False, captcha_type="", description="",
                        confidence=0, need_headful=False) -> dict:
        """Build the dictionary shape shared by both captcha checks."""
        return {
            "has_captcha": has_captcha,
            "captcha_type": captcha_type,
            "captcha_description": description,
            "confidence": confidence,
            "need_headful": need_headful
        }

    @staticmethod
    def _parse_captcha_reply(reply_text: str) -> dict:
        """Extract the JSON object from a model reply; ``{}`` when none is usable."""
        import re

        fenced = re.search(r'```json\s*([\s\S]*?)\s*```', reply_text)
        if fenced:
            candidate = fenced.group(1)
        else:
            # No fenced block: take the outermost {...} span, if any.
            bare = re.search(r'\{[\s\S]*\}', reply_text)
            candidate = bare.group(0) if bare else '{}'

        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

    async def ai_check_captcha(self, screenshot_base64: str = None) -> dict:
        """
        Detect a captcha by sending a screenshot to a vision model.

        Configuration is read from the environment: ``DASHSCOPE_API_KEY``,
        ``DASHSCOPE_BASE_URL`` and ``AI_VISION_MODEL``. Without an API key, on
        a non-200 reply, or on any error, the DOM-based check is used instead.

        Args:
            screenshot_base64: Base64 screenshot; when empty, a screenshot of
                the current page is taken.

        Returns:
            dict: {
                "has_captcha": bool,         # whether a captcha is present
                "captcha_type": str,         # slider, image, phone, rotate, puzzle
                "captcha_description": str,  # description of the captcha
                "confidence": float,         # confidence 0-100
                "need_headful": bool         # whether a headed browser is needed
            }
        """
        try:
            # Imported inside the try so a missing package falls back to the
            # DOM check instead of raising.
            import requests

            if not screenshot_base64:
                screenshot_base64 = await self.capture_screenshot()

            if not screenshot_base64:
                print(f"[{self.platform_name}] AI验证码检测: 无法获取截图")
                return self._captcha_result()

            ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
            ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
            ai_vision_model = os.environ.get('AI_VISION_MODEL', 'qwen-vl-plus')

            if not ai_api_key:
                print(f"[{self.platform_name}] AI验证码检测: 未配置 AI API Key,使用传统方式检测")
                return await self._traditional_captcha_check()

            headers = {
                'Authorization': f'Bearer {ai_api_key}',
                'Content-Type': 'application/json'
            }
            payload = {
                "model": ai_vision_model,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{screenshot_base64}"
                                }
                            },
                            {
                                "type": "text",
                                "text": self._CAPTCHA_PROMPT
                            }
                        ]
                    }
                ],
                "max_tokens": 500
            }

            print(f"[{self.platform_name}] AI验证码检测: 正在分析截图...")

            # requests is blocking; run it in the default executor so the
            # event loop is not stalled for up to 30 seconds.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    f"{ai_base_url}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=30
                )
            )

            if response.status_code != 200:
                print(f"[{self.platform_name}] AI验证码检测: API 返回错误 {response.status_code}")
                return await self._traditional_captcha_check()

            result = response.json()
            choices = result.get('choices') or [{}]
            ai_response = choices[0].get('message', {}).get('content', '') or ''

            print(f"[{self.platform_name}] AI验证码检测响应: {ai_response[:200]}...")

            ai_result = self._parse_captcha_reply(ai_response)

            # The model may answer with "true"/"false" strings.
            raw_flag = ai_result.get('has_captcha', False)
            if isinstance(raw_flag, str):
                has_captcha = raw_flag.strip().lower() in ('true', '1', 'yes')
            else:
                has_captcha = bool(raw_flag)

            raw_type = ai_result.get('captcha_type', '')
            captcha_description = ai_result.get('captcha_description', '')
            confidence = ai_result.get('confidence', 0)

            print(f"[{self.platform_name}] AI验证码检测结果: has_captcha={has_captcha}, type={raw_type}, confidence={confidence}")

            # "none" and non-string values are reported as no type.
            captcha_type = raw_type.strip() if isinstance(raw_type, str) else ''
            if captcha_type.lower() == 'none':
                captcha_type = ''

            # A detected captcha with a concrete type needs a headed browser.
            need_headful = has_captcha and bool(captcha_type)

            return self._captcha_result(
                has_captcha=has_captcha,
                captcha_type=captcha_type,
                description=captcha_description,
                confidence=confidence,
                need_headful=need_headful
            )

        except Exception as e:
            print(f"[{self.platform_name}] AI验证码检测异常: {e}")
            import traceback
            traceback.print_exc()
            return await self._traditional_captcha_check()

    async def _traditional_captcha_check(self) -> dict:
        """Detect a captcha from DOM elements.

        Probes ``_CAPTCHA_SELECTORS`` in order and reports the first selector
        whose first match is visible (confidence 80). Without a page the
        result is "no captcha" with confidence 0.
        """
        if not self.page:
            return self._captcha_result()

        for selector, captcha_type, description in self._CAPTCHA_SELECTORS:
            try:
                matches = self.page.locator(selector)
                if await matches.count() > 0 and await matches.first.is_visible():
                    print(f"[{self.platform_name}] 传统检测: 发现验证码 - {selector}")
                    return self._captcha_result(
                        has_captcha=True,
                        captcha_type=captcha_type,
                        description=description,
                        confidence=80,
                        need_headful=True
                    )
            except Exception:
                # A selector that cannot be evaluated is skipped.
                continue

        return self._captcha_result(confidence=80)

    async def get_page_url(self) -> str:
        """Return the URL of the current page, or ``""`` when unavailable."""
        if not self.page:
            return ""
        try:
            return self.page.url
        except Exception:
            return ""

    async def check_publish_status(self) -> dict:
        """
        Inspect the current page and classify the publish state.

        The page content is searched for success, captcha and failure
        keywords, in that order; the first hit decides the status. Without
        any hit the status is ``processing``.

        Returns:
            dict: {status, screenshot_base64, page_url, message} where status
            is one of success, need_captcha, failed, processing, error.
        """
        if not self.page:
            return {
                "status": "error",
                "screenshot_base64": "",
                "page_url": "",
                "message": "页面未初始化"
            }

        try:
            screenshot = await self.capture_screenshot()
            page_url = await self.get_page_url()
            page_content = await self.page.content()

            def snapshot(status: str, message: str) -> dict:
                return {
                    "status": status,
                    "screenshot_base64": screenshot,
                    "page_url": page_url,
                    "message": message
                }

            def first_hit(keywords):
                return next((word for word in keywords if word in page_content), None)

            if first_hit(self._SUCCESS_KEYWORDS) is not None:
                return snapshot("success", "发布成功")

            captcha_word = first_hit(self._CAPTCHA_KEYWORDS)
            if captcha_word is not None:
                return snapshot("need_captcha", f"检测到{captcha_word}")

            failure_word = first_hit(self._FAIL_KEYWORDS)
            if failure_word is not None:
                return snapshot("failed", failure_word)

            return snapshot("processing", "处理中")
        except Exception as e:
            return {
                "status": "error",
                "screenshot_base64": "",
                "page_url": "",
                "message": str(e)
            }

    async def wait_for_upload_complete(self, success_selector: str, timeout: int = 300):
        """Poll every 3 seconds until ``success_selector`` matches.

        Args:
            success_selector: Selector whose presence signals completion.
            timeout: Approximate upper bound in seconds; at least one poll is
                made even when it is smaller than the poll interval.

        Returns:
            bool: True once the selector matches, False when polls run out.

        Raises:
            Exception: If the page has not been initialized.
        """
        if not self.page:
            raise Exception("Page not initialized")

        poll_interval = 3
        attempts = max(1, timeout // poll_interval)
        for _ in range(attempts):
            try:
                if await self.page.locator(success_selector).count() > 0:
                    return True
            except Exception:
                # A failed lookup counts as "not done yet"; keep polling.
                pass
            await asyncio.sleep(poll_interval)
            self.report_progress(30, "正在上传视频...")

        return False

    @abstractmethod
    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """
        Publish a video - must be implemented by subclasses.

        Args:
            cookies: Cookie string or JSON.
            params: Publish parameters.

        Returns:
            PublishResult: Outcome of the publish attempt.
        """
        pass

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """
        List works - subclasses may override; the default reports "unsupported".

        Args:
            cookies: Cookie string or JSON.
            page: Page index, starting at 0.
            page_size: Number of items per page.

        Returns:
            WorksResult: A failed result carrying an explanatory error.
        """
        return WorksResult(
            success=False,
            platform=self.platform_name,
            error="该平台暂不支持获取作品列表"
        )

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """
        List comments of a work - subclasses may override; the default
        reports "unsupported".

        Args:
            cookies: Cookie string or JSON.
            work_id: Identifier of the work.
            cursor: Pagination cursor.

        Returns:
            CommentsResult: A failed result carrying an explanatory error.
        """
        return CommentsResult(
            success=False,
            platform=self.platform_name,
            work_id=work_id,
            error="该平台暂不支持获取评论"
        )

    async def run(self, cookies: str, params: PublishParams) -> PublishResult:
        """
        Run ``publish`` with error handling and cleanup.

        Exceptions are printed and converted into a failed ``PublishResult``;
        the browser is always closed afterwards.
        """
        try:
            return await self.publish(cookies, params)
        except Exception as e:
            import traceback
            traceback.print_exc()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            await self.close_browser()

    async def run_get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """
        Run ``get_works`` with error handling and cleanup.

        Exceptions are printed and converted into a failed ``WorksResult``;
        the browser is always closed afterwards.
        """
        try:
            return await self.get_works(cookies, page, page_size)
        except Exception as e:
            import traceback
            traceback.print_exc()
            return WorksResult(
                success=False,
                platform=self.platform_name,
                error=str(e)
            )
        finally:
            await self.close_browser()

    async def run_get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """
        Run ``get_comments`` with error handling and cleanup.

        Exceptions are printed and converted into a failed ``CommentsResult``;
        the browser is always closed afterwards.
        """
        try:
            return await self.get_comments(cookies, work_id, cursor)
        except Exception as e:
            import traceback
            traceback.print_exc()
            return CommentsResult(
                success=False,
                platform=self.platform_name,
                work_id=work_id,
                error=str(e)
            )
        finally:
            await self.close_browser()

    async def check_login_status(self, cookies: str) -> dict:
        """
        Check whether the cookies still represent a logged-in session.

        Opens ``login_url`` in a fresh browser with the cookies applied. A
        login is considered required when the final URL contains a login
        marker or a login prompt/dialog selector matches. The browser is
        always closed afterwards.

        Args:
            cookies: Cookie string or JSON.

        Returns:
            dict: {
                "success": True/False,     # False when the check itself failed
                "valid": True/False,
                "need_login": True/False,
                "message": "status text",
                "error": "..."             # only present when success is False
            }
        """
        try:
            await self.init_browser()
            await self.set_cookies(self.parse_cookies(cookies))

            if not self.page:
                raise Exception("Page not initialized")

            home_url = self.login_url
            print(f"[{self.platform_name}] 访问后台页面: {home_url}")
            await self.page.goto(home_url, wait_until='domcontentloaded', timeout=30000)
            # Wait 3 seconds before reading the final URL.
            await asyncio.sleep(3)

            current_url = self.page.url
            print(f"[{self.platform_name}] 当前 URL: {current_url}")

            lowered_url = current_url.lower()
            need_login = any(marker in lowered_url for marker in self._LOGIN_URL_MARKERS)

            if not need_login:
                # Not on a login URL: look for a login prompt or dialog.
                for selector in self._LOGIN_SELECTORS:
                    try:
                        if await self.page.locator(selector).count() > 0:
                            need_login = True
                            print(f"[{self.platform_name}] 检测到登录弹窗: {selector}")
                            break
                    except Exception:
                        # A selector that cannot be evaluated is skipped.
                        continue

            if need_login:
                return {
                    "success": True,
                    "valid": False,
                    "need_login": True,
                    "message": "Cookie 已过期,需要重新登录"
                }
            return {
                "success": True,
                "valid": True,
                "need_login": False,
                "message": "登录状态有效"
            }

        except Exception as e:
            import traceback
            traceback.print_exc()
            return {
                "success": False,
                "valid": False,
                "need_login": True,
                "message": str(e),
                "error": str(e)
            }
        finally:
            await self.close_browser()
|