| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- # -*- coding: utf-8 -*-
- """
- 平台发布基类
- 提供通用的发布接口和工具方法
- """
- import asyncio
- import json
- import os
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import List, Optional, Callable
- from playwright.async_api import async_playwright, Browser, BrowserContext, Page
- @dataclass
- class PublishParams:
- """发布参数"""
- title: str
- video_path: str
- description: str = ""
- cover_path: Optional[str] = None
- tags: List[str] = field(default_factory=list)
- publish_date: Optional[datetime] = None
- location: str = "重庆市"
-
- def __post_init__(self):
- if not self.description:
- self.description = self.title
- @dataclass
- class PublishResult:
- """发布结果"""
- success: bool
- platform: str
- video_id: str = ""
- video_url: str = ""
- message: str = ""
- error: str = ""
- class BasePublisher(ABC):
- """
- 平台发布基类
- 所有平台发布器都需要继承此类
- """
-
- platform_name: str = "base"
- login_url: str = ""
- publish_url: str = ""
- cookie_domain: str = ""
-
- def __init__(self, headless: bool = True):
- self.headless = headless
- self.browser: Optional[Browser] = None
- self.context: Optional[BrowserContext] = None
- self.page: Optional[Page] = None
- self.on_progress: Optional[Callable[[int, str], None]] = None
-
- def set_progress_callback(self, callback: Callable[[int, str], None]):
- """设置进度回调"""
- self.on_progress = callback
-
- def report_progress(self, progress: int, message: str):
- """报告进度"""
- print(f"[{self.platform_name}] [{progress}%] {message}")
- if self.on_progress:
- self.on_progress(progress, message)
-
- @staticmethod
- def parse_cookies(cookies_str: str) -> list:
- """解析 cookie 字符串为列表"""
- try:
- cookies = json.loads(cookies_str)
- if isinstance(cookies, list):
- return cookies
- except json.JSONDecodeError:
- pass
-
- # 字符串格式: name=value; name2=value2
- cookies = []
- for item in cookies_str.split(';'):
- item = item.strip()
- if '=' in item:
- name, value = item.split('=', 1)
- cookies.append({
- 'name': name.strip(),
- 'value': value.strip(),
- 'domain': '',
- 'path': '/'
- })
- return cookies
-
- @staticmethod
- def cookies_to_string(cookies: list) -> str:
- """将 cookie 列表转换为字符串"""
- return '; '.join([f"{c['name']}={c['value']}" for c in cookies])
-
- async def init_browser(self, storage_state: str = None):
- """初始化浏览器"""
- playwright = await async_playwright().start()
- self.browser = await playwright.chromium.launch(headless=self.headless)
-
- if storage_state and os.path.exists(storage_state):
- self.context = await self.browser.new_context(storage_state=storage_state)
- else:
- self.context = await self.browser.new_context()
-
- self.page = await self.context.new_page()
- return self.page
-
- async def set_cookies(self, cookies: list):
- """设置 cookies"""
- if not self.context:
- raise Exception("Browser context not initialized")
-
- # 设置默认域名
- for cookie in cookies:
- if 'domain' not in cookie or not cookie['domain']:
- cookie['domain'] = self.cookie_domain
-
- await self.context.add_cookies(cookies)
-
- async def close_browser(self):
- """关闭浏览器"""
- if self.context:
- await self.context.close()
- if self.browser:
- await self.browser.close()
-
- async def save_cookies(self, file_path: str):
- """保存 cookies 到文件"""
- if self.context:
- await self.context.storage_state(path=file_path)
-
- async def wait_for_upload_complete(self, success_selector: str, timeout: int = 300):
- """等待上传完成"""
- if not self.page:
- raise Exception("Page not initialized")
-
- for _ in range(timeout // 3):
- try:
- count = await self.page.locator(success_selector).count()
- if count > 0:
- return True
- except:
- pass
- await asyncio.sleep(3)
- self.report_progress(30, "正在上传视频...")
-
- return False
-
- @abstractmethod
- async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
- """
- 发布视频 - 子类必须实现
-
- Args:
- cookies: cookie 字符串或 JSON
- params: 发布参数
-
- Returns:
- PublishResult: 发布结果
- """
- pass
-
- async def run(self, cookies: str, params: PublishParams) -> PublishResult:
- """
- 运行发布任务
- 包装了 publish 方法,添加了异常处理和资源清理
- """
- try:
- return await self.publish(cookies, params)
- except Exception as e:
- import traceback
- traceback.print_exc()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=str(e)
- )
- finally:
- await self.close_browser()
|