| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- # -*- coding: utf-8 -*-
- """
- 平台发布基类
- 提供通用的发布接口和工具方法
- """
- import asyncio
- import json
- import os
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import List, Optional, Callable, Dict, Any
- from playwright.async_api import async_playwright, Browser, BrowserContext, Page
- @dataclass
- class PublishParams:
- """发布参数"""
- title: str
- video_path: str
- description: str = ""
- cover_path: Optional[str] = None
- tags: List[str] = field(default_factory=list)
- publish_date: Optional[datetime] = None
- location: str = "重庆市"
-
- def __post_init__(self):
- if not self.description:
- self.description = self.title
- @dataclass
- class PublishResult:
- """发布结果"""
- success: bool
- platform: str
- video_id: str = ""
- video_url: str = ""
- message: str = ""
- error: str = ""
- @dataclass
- class WorkItem:
- """作品数据"""
- work_id: str
- title: str
- cover_url: str = ""
- video_url: str = ""
- duration: int = 0 # 秒
- status: str = "published" # published, reviewing, rejected, draft
- publish_time: str = ""
- play_count: int = 0
- like_count: int = 0
- comment_count: int = 0
- share_count: int = 0
- collect_count: int = 0
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- "work_id": self.work_id,
- "title": self.title,
- "cover_url": self.cover_url,
- "video_url": self.video_url,
- "duration": self.duration,
- "status": self.status,
- "publish_time": self.publish_time,
- "play_count": self.play_count,
- "like_count": self.like_count,
- "comment_count": self.comment_count,
- "share_count": self.share_count,
- "collect_count": self.collect_count,
- }
- @dataclass
- class CommentItem:
- """评论数据"""
- comment_id: str
- work_id: str
- content: str
- author_id: str = ""
- author_name: str = ""
- author_avatar: str = ""
- like_count: int = 0
- reply_count: int = 0
- create_time: str = ""
- is_author: bool = False # 是否是作者的评论
- replies: List['CommentItem'] = field(default_factory=list)
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- "comment_id": self.comment_id,
- "work_id": self.work_id,
- "content": self.content,
- "author_id": self.author_id,
- "author_name": self.author_name,
- "author_avatar": self.author_avatar,
- "like_count": self.like_count,
- "reply_count": self.reply_count,
- "create_time": self.create_time,
- "is_author": self.is_author,
- "replies": [r.to_dict() for r in self.replies],
- }
- @dataclass
- class WorksResult:
- """作品列表结果"""
- success: bool
- platform: str
- works: List[WorkItem] = field(default_factory=list)
- total: int = 0
- has_more: bool = False
- error: str = ""
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- "success": self.success,
- "platform": self.platform,
- "works": [w.to_dict() for w in self.works],
- "total": self.total,
- "has_more": self.has_more,
- "error": self.error,
- }
- @dataclass
- class CommentsResult:
- """评论列表结果"""
- success: bool
- platform: str
- work_id: str
- comments: List[CommentItem] = field(default_factory=list)
- total: int = 0
- has_more: bool = False
- error: str = ""
-
- def to_dict(self) -> Dict[str, Any]:
- return {
- "success": self.success,
- "platform": self.platform,
- "work_id": self.work_id,
- "comments": [c.to_dict() for c in self.comments],
- "total": self.total,
- "has_more": self.has_more,
- "error": self.error,
- }
- class BasePublisher(ABC):
- """
- 平台发布基类
- 所有平台发布器都需要继承此类
- """
-
- platform_name: str = "base"
- login_url: str = ""
- publish_url: str = ""
- cookie_domain: str = ""
-
- def __init__(self, headless: bool = True):
- self.headless = headless
- self.browser: Optional[Browser] = None
- self.context: Optional[BrowserContext] = None
- self.page: Optional[Page] = None
- self.on_progress: Optional[Callable[[int, str], None]] = None
-
- def set_progress_callback(self, callback: Callable[[int, str], None]):
- """设置进度回调"""
- self.on_progress = callback
-
- def report_progress(self, progress: int, message: str):
- """报告进度"""
- print(f"[{self.platform_name}] [{progress}%] {message}")
- if self.on_progress:
- self.on_progress(progress, message)
-
- @staticmethod
- def parse_cookies(cookies_str: str) -> list:
- """解析 cookie 字符串为列表"""
- try:
- cookies = json.loads(cookies_str)
- if isinstance(cookies, list):
- return cookies
- except json.JSONDecodeError:
- pass
-
- # 字符串格式: name=value; name2=value2
- cookies = []
- for item in cookies_str.split(';'):
- item = item.strip()
- if '=' in item:
- name, value = item.split('=', 1)
- cookies.append({
- 'name': name.strip(),
- 'value': value.strip(),
- 'domain': '',
- 'path': '/'
- })
- return cookies
-
- @staticmethod
- def cookies_to_string(cookies: list) -> str:
- """将 cookie 列表转换为字符串"""
- return '; '.join([f"{c['name']}={c['value']}" for c in cookies])
-
- async def init_browser(self, storage_state: str = None):
- """初始化浏览器"""
- playwright = await async_playwright().start()
- self.browser = await playwright.chromium.launch(headless=self.headless)
-
- if storage_state and os.path.exists(storage_state):
- self.context = await self.browser.new_context(storage_state=storage_state)
- else:
- self.context = await self.browser.new_context()
-
- self.page = await self.context.new_page()
- return self.page
-
- async def set_cookies(self, cookies: list):
- """设置 cookies"""
- if not self.context:
- raise Exception("Browser context not initialized")
-
- # 设置默认域名
- for cookie in cookies:
- if 'domain' not in cookie or not cookie['domain']:
- cookie['domain'] = self.cookie_domain
-
- await self.context.add_cookies(cookies)
-
- async def close_browser(self):
- """关闭浏览器"""
- if self.context:
- await self.context.close()
- if self.browser:
- await self.browser.close()
-
- async def save_cookies(self, file_path: str):
- """保存 cookies 到文件"""
- if self.context:
- await self.context.storage_state(path=file_path)
-
- async def wait_for_upload_complete(self, success_selector: str, timeout: int = 300):
- """等待上传完成"""
- if not self.page:
- raise Exception("Page not initialized")
-
- for _ in range(timeout // 3):
- try:
- count = await self.page.locator(success_selector).count()
- if count > 0:
- return True
- except:
- pass
- await asyncio.sleep(3)
- self.report_progress(30, "正在上传视频...")
-
- return False
-
- @abstractmethod
- async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
- """
- 发布视频 - 子类必须实现
-
- Args:
- cookies: cookie 字符串或 JSON
- params: 发布参数
-
- Returns:
- PublishResult: 发布结果
- """
- pass
-
- async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
- """
- 获取作品列表 - 子类可覆盖实现
-
- Args:
- cookies: cookie 字符串或 JSON
- page: 页码(从0开始)
- page_size: 每页数量
-
- Returns:
- WorksResult: 作品列表结果
- """
- return WorksResult(
- success=False,
- platform=self.platform_name,
- error="该平台暂不支持获取作品列表"
- )
-
- async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
- """
- 获取作品评论 - 子类可覆盖实现
-
- Args:
- cookies: cookie 字符串或 JSON
- work_id: 作品ID
- cursor: 分页游标
-
- Returns:
- CommentsResult: 评论列表结果
- """
- return CommentsResult(
- success=False,
- platform=self.platform_name,
- work_id=work_id,
- error="该平台暂不支持获取评论"
- )
-
- async def run(self, cookies: str, params: PublishParams) -> PublishResult:
- """
- 运行发布任务
- 包装了 publish 方法,添加了异常处理和资源清理
- """
- try:
- return await self.publish(cookies, params)
- except Exception as e:
- import traceback
- traceback.print_exc()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=str(e)
- )
- finally:
- await self.close_browser()
-
- async def run_get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
- """
- 运行获取作品任务
- """
- try:
- return await self.get_works(cookies, page, page_size)
- except Exception as e:
- import traceback
- traceback.print_exc()
- return WorksResult(
- success=False,
- platform=self.platform_name,
- error=str(e)
- )
- finally:
- await self.close_browser()
-
- async def run_get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
- """
- 运行获取评论任务
- """
- try:
- return await self.get_comments(cookies, work_id, cursor)
- except Exception as e:
- import traceback
- traceback.print_exc()
- return CommentsResult(
- success=False,
- platform=self.platform_name,
- work_id=work_id,
- error=str(e)
- )
- finally:
- await self.close_browser()
|