# -*- coding: utf-8 -*-
"""
Base class for platform publishers.

Provides the common publishing interface plus shared browser and cookie
helpers used by the concrete platform publishers.
"""

import asyncio
import json
import os
import traceback
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Callable

from playwright.async_api import async_playwright, Browser, BrowserContext, Page


@dataclass
class PublishParams:
    """Parameters describing one video to publish.

    Attributes:
        title: Video title.
        video_path: Path of the video file.
        description: Video description; falls back to ``title`` when empty.
        cover_path: Optional path of a cover image.
        tags: Tags attached to the video.
        publish_date: Optional publish date/time.
        location: Location string attached to the video.
    """

    title: str
    video_path: str
    description: str = ""
    cover_path: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    publish_date: Optional[datetime] = None
    location: str = "重庆市"

    def __post_init__(self):
        # An empty description defaults to the title.
        if not self.description:
            self.description = self.title


@dataclass
class PublishResult:
    """Outcome of a publish attempt.

    Attributes:
        success: Whether the publish succeeded.
        platform: Name of the platform the result belongs to.
        video_id: Identifier of the published video, if any.
        video_url: URL of the published video, if any.
        message: Informational message.
        error: Error description when the publish failed.
    """

    success: bool
    platform: str
    video_id: str = ""
    video_url: str = ""
    message: str = ""
    error: str = ""


class BasePublisher(ABC):
    """Abstract base class that every platform publisher must inherit from.

    Subclasses override the class-level attributes below and implement
    :meth:`publish`. Callers normally go through :meth:`run`, which adds
    exception handling and browser cleanup around :meth:`publish`.
    """

    platform_name: str = "base"
    login_url: str = ""
    publish_url: str = ""
    cookie_domain: str = ""

    def __init__(self, headless: bool = True):
        """Create a publisher.

        Args:
            headless: Whether the browser is launched in headless mode.
        """
        self.headless = headless
        self.browser: Optional[Browser] = None
        self.context: Optional[BrowserContext] = None
        self.page: Optional[Page] = None
        self.on_progress: Optional[Callable[[int, str], None]] = None
        # Handle returned by ``async_playwright().start()``; kept so that
        # ``close_browser`` can stop the Playwright driver again.
        self._playwright = None

    def set_progress_callback(self, callback: Callable[[int, str], None]):
        """Register a callback invoked as ``callback(progress, message)``."""
        self.on_progress = callback

    def report_progress(self, progress: int, message: str):
        """Print a progress line and forward it to the callback, if one is set.

        Args:
            progress: Progress value, printed as a percentage.
            message: Human-readable status message.
        """
        print(f"[{self.platform_name}] [{progress}%] {message}")
        if self.on_progress:
            self.on_progress(progress, message)

    @staticmethod
    def parse_cookies(cookies_str: str) -> list:
        """Parse a cookie string into a list of cookie dicts.

        Accepted inputs:
            * a JSON array of cookie dicts (returned as-is);
            * a JSON object with a ``"cookies"`` array, i.e. the layout of
              the storage-state file written by :meth:`save_cookies`;
            * a header-style string ``name=value; name2=value2``.

        Args:
            cookies_str: Cookie data in one of the formats above.

        Returns:
            A list of cookie dicts. Cookies parsed from the header-style
            format carry an empty ``domain`` and ``path`` set to ``/``.
            Empty or ``None`` input yields an empty list.
        """
        if not cookies_str:
            return []

        try:
            parsed = json.loads(cookies_str)
        except json.JSONDecodeError:
            parsed = None

        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict) and isinstance(parsed.get('cookies'), list):
            return parsed['cookies']

        # Header-style format: name=value; name2=value2
        cookies = []
        for item in cookies_str.split(';'):
            name, sep, value = item.strip().partition('=')
            name = name.strip()
            # Skip fragments without '=' and fragments with an empty name.
            if not sep or not name:
                continue
            cookies.append({
                'name': name,
                'value': value.strip(),
                'domain': '',
                'path': '/',
            })
        return cookies

    @staticmethod
    def cookies_to_string(cookies: list) -> str:
        """Join cookie dicts into a ``name=value; name2=value2`` string."""
        return '; '.join(f"{c['name']}={c['value']}" for c in cookies)

    async def init_browser(self, storage_state: Optional[str] = None) -> Page:
        """Start Playwright, launch Chromium and open a new page.

        Args:
            storage_state: Optional path of a storage-state file. It is
                loaded into the new context only when the file exists.

        Returns:
            The newly created page (also stored on ``self.page``).
        """
        # Keep the handle: without it the Playwright driver can never be
        # stopped and would leak on every run.
        self._playwright = await async_playwright().start()
        self.browser = await self._playwright.chromium.launch(headless=self.headless)

        if storage_state and os.path.exists(storage_state):
            self.context = await self.browser.new_context(storage_state=storage_state)
        else:
            self.context = await self.browser.new_context()

        self.page = await self.context.new_page()
        return self.page

    async def set_cookies(self, cookies: list) -> None:
        """Add cookies to the browser context.

        Cookies with a missing or empty ``domain`` get ``cookie_domain``.
        The caller's dicts are not modified.

        Args:
            cookies: Cookie dicts, e.g. as returned by :meth:`parse_cookies`.

        Raises:
            Exception: If the browser context has not been initialized.
        """
        if not self.context:
            raise Exception("Browser context not initialized")

        # Fill in the default domain on copies so the input stays untouched.
        prepared = [
            {**cookie, 'domain': cookie.get('domain') or self.cookie_domain}
            for cookie in cookies
        ]
        await self.context.add_cookies(prepared)

    async def close_browser(self) -> None:
        """Close the context, the browser and the Playwright driver.

        Safe to call more than once: handles are cleared before closing.
        Every resource is attempted even if closing an earlier one raises;
        such an exception still propagates to the caller.
        """
        context, self.context = self.context, None
        browser, self.browser = self.browser, None
        # getattr: tolerate subclasses whose __init__ skipped super().__init__().
        playwright = getattr(self, '_playwright', None)
        self._playwright = None
        self.page = None

        try:
            if context:
                await context.close()
        finally:
            try:
                if browser:
                    await browser.close()
            finally:
                if playwright:
                    await playwright.stop()

    async def save_cookies(self, file_path: str) -> None:
        """Write the context's storage state to ``file_path``.

        Does nothing when no browser context exists.
        """
        if self.context:
            await self.context.storage_state(path=file_path)

    async def wait_for_upload_complete(self, success_selector: str, timeout: int = 300) -> bool:
        """Poll the page until ``success_selector`` matches an element.

        The selector is checked every 3 seconds, ``timeout // 3`` times in
        total, and a progress message is reported after each failed check.

        Args:
            success_selector: Selector whose presence signals completion.
            timeout: Approximate maximum wait in seconds.

        Returns:
            ``True`` once the selector matches, ``False`` if it never did.

        Raises:
            Exception: If the page has not been initialized.
        """
        if not self.page:
            raise Exception("Page not initialized")

        poll_seconds = 3
        for _ in range(timeout // poll_seconds):
            try:
                if await self.page.locator(success_selector).count() > 0:
                    return True
            except Exception:
                # Best effort: a failed lookup is simply retried on the next
                # poll. Only ``Exception`` is caught so that task
                # cancellation and KeyboardInterrupt still propagate.
                pass
            await asyncio.sleep(poll_seconds)
            self.report_progress(30, "正在上传视频...")
        return False

    @abstractmethod
    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video. Subclasses must implement this.

        Args:
            cookies: Cookie string or JSON.
            params: Publish parameters.

        Returns:
            PublishResult: Outcome of the publish.
        """

    async def run(self, cookies: str, params: PublishParams) -> PublishResult:
        """Run a publish task with exception handling and resource cleanup.

        Wraps :meth:`publish`: any exception it raises is printed and turned
        into a failed ``PublishResult``, and the browser is always closed
        afterwards.

        Args:
            cookies: Cookie string or JSON, forwarded to :meth:`publish`.
            params: Publish parameters, forwarded to :meth:`publish`.

        Returns:
            The result of :meth:`publish`, or a failed ``PublishResult``
            carrying the error text.
        """
        try:
            return await self.publish(cookies, params)
        except Exception as e:
            traceback.print_exc()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                # Some exceptions have an empty str(); never report a
                # failure with a blank error.
                error=str(e) or repr(e),
            )
        finally:
            # A cleanup failure must not replace the publish result.
            try:
                await self.close_browser()
            except Exception:
                traceback.print_exc()