| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- # -*- coding: utf-8 -*-
- """
- 微信视频号发布器
- 参考: matrix/tencent_uploader/main.py
- """
- import asyncio
- import os
- from datetime import datetime
- from .base import BasePublisher, PublishParams, PublishResult
def format_short_title(origin_title: str) -> str:
    """Build a short title from *origin_title*.

    Letters and digits (``str.isalnum``) and a small whitelist of symbols are
    kept, an ASCII comma becomes a space, and every other character is
    dropped. The result is then cut to at most 16 characters, or right-padded
    with spaces to at least 6 characters.

    Args:
        origin_title: The full title to derive the short title from.

    Returns:
        A string whose length is between 6 and 16 characters.
    """
    # The curly quotes are written as escapes on purpose: typed as plain
    # ASCII quotes inside the literal they would terminate the string and the
    # two halves would be silently concatenated without any quote character.
    allowed_special_chars = "《》\u201c\u201d:+?%°"

    kept = []
    for char in origin_title:
        if char.isalnum() or char in allowed_special_chars:
            kept.append(char)
        elif char == ',':
            # A comma is treated as a word separator rather than removed.
            kept.append(' ')
    formatted_string = ''.join(kept)

    if len(formatted_string) > 16:
        return formatted_string[:16]
    # ljust is a no-op for strings that already have 6+ characters.
    return formatted_string.ljust(6)
class WeixinPublisher(BasePublisher):
    """Publisher for WeChat Channels.

    Drives the Channels creator-centre web UI with Playwright: uploads the
    video, types the title and hashtags, sets the cover and short title,
    optionally schedules the post, and finally clicks the publish button.

    The browser is launched through the installed Chrome channel; according
    to the original author's note other browsers may fail with H264 encoding
    errors.
    """

    platform_name = "weixin"
    login_url = "https://channels.weixin.qq.com/platform"
    publish_url = "https://channels.weixin.qq.com/platform/post/create"
    cookie_domain = ".weixin.qq.com"

    # Page the UI navigates to once a post has been submitted.
    _post_list_url = "https://channels.weixin.qq.com/platform/post/list"

    async def init_browser(self, storage_state: str = None):
        """Launch Chrome and open a fresh page.

        Args:
            storage_state: Optional path to a Playwright storage-state file.
                It is only used when the file exists on disk.

        Returns:
            The newly created page, which is also stored on ``self.page``.
        """
        from playwright.async_api import async_playwright

        playwright = await async_playwright().start()
        try:
            # Chrome (not bundled Chromium) is requested to avoid H264 issues.
            self.browser = await playwright.chromium.launch(
                headless=self.headless,
                channel="chrome",
            )
        except Exception:
            # Do not leave the Playwright driver running when the browser
            # cannot be started (for example Chrome is not installed).
            await playwright.stop()
            raise
        # NOTE(review): the driver handle is not kept after a successful
        # launch; confirm the base class tears the browser down elsewhere.

        if storage_state and os.path.exists(storage_state):
            self.context = await self.browser.new_context(storage_state=storage_state)
        else:
            self.context = await self.browser.new_context()

        self.page = await self.context.new_page()
        return self.page

    async def set_schedule_time(self, publish_date: datetime):
        """Select *publish_date* in the scheduled-publish picker.

        Does nothing when no page is open.

        Args:
            publish_date: Date and time at which the post should go live.
        """
        if not self.page:
            return
        page = self.page

        print(f"[{self.platform_name}] 设置定时发布...")

        # Switch the form to scheduled mode.
        await page.locator("label").filter(has_text="定时").nth(1).click()

        # Open the date picker.
        await page.click('input[placeholder="请选择发表时间"]')

        # Advance the calendar one month when it is not showing the target.
        target_month = f"{publish_date.month:02d}月"
        shown_month = await page.inner_text(
            'span.weui-desktop-picker__panel__label:has-text("月")')
        if shown_month != target_month:
            await page.click('button.weui-desktop-btn__icon__right')

        # Click the first enabled day cell whose label matches the target day.
        target_day = str(publish_date.day)
        for cell in await page.query_selector_all('table.weui-desktop-picker__table a'):
            class_name = await cell.evaluate('el => el.className')
            if 'weui-desktop-picker__disabled' in class_name:
                continue
            if (await cell.inner_text()).strip() == target_day:
                await cell.click()
                break

        # Replace the content of the time field.
        # NOTE(review): only the hour is typed; minutes are not entered.
        await page.click('input[placeholder="请选择时间"]')
        await page.keyboard.press("Control+KeyA")
        await page.keyboard.type(str(publish_date.hour))

        # Click elsewhere so the picker commits the value.
        await page.locator("div.input-editor").click()

    async def handle_upload_error(self, video_path: str):
        """Delete the failed upload and submit the video file again.

        Does nothing when no page is open.

        Args:
            video_path: Path of the video file to re-upload.
        """
        if not self.page:
            return

        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
        await self.page.get_by_role('button', name="删除", exact=True).click()
        file_input = self.page.locator('input[type="file"]')
        await file_input.set_input_files(video_path)

    async def add_title_tags(self, params: PublishParams):
        """Type the title and the hashtags into the description editor.

        Does nothing when no page is open.

        Args:
            params: Publish parameters; ``title`` and ``tags`` are read.
        """
        if not self.page:
            return

        await self.page.locator("div.input-editor").click()
        await self.page.keyboard.type(params.title)

        # Treat a missing tag list like an empty one so the summary line
        # below cannot fail on len(None).
        tags = params.tags or []
        if tags:
            await self.page.keyboard.press("Enter")
            for tag in tags:
                await self.page.keyboard.type("#" + tag)
                await self.page.keyboard.press("Space")

        print(f"[{self.platform_name}] 成功添加标题和 {len(tags)} 个话题")

    async def add_short_title(self, title: str = None):
        """Fill the short-title field when the form has one.

        This is best effort: a missing field or a UI error is reported on
        stdout and never raised.

        Args:
            title: Full title the short title is derived from. When omitted
                nothing is written.
        """
        if not self.page or title is None:
            return

        try:
            short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
                "xpath=following-sibling::div").locator('span input[type="text"]')
            if await short_title_element.count():
                await short_title_element.fill(format_short_title(title))
        except Exception as e:
            print(f"[{self.platform_name}] 短标题设置失败: {e}")

    async def upload_cover(self, cover_path: str):
        """Replace the video cover with the image at *cover_path*.

        Does nothing when no page is open, when *cover_path* is empty or when
        the file does not exist. Failures are reported on stdout and never
        raised.

        Args:
            cover_path: Path of the cover image.
        """
        if not self.page or not cover_path or not os.path.exists(cover_path):
            return

        try:
            await asyncio.sleep(2)
            change_cover_btn = self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")')
            btn_class = await change_cover_btn.get_attribute('class')

            if "disabled" not in btn_class:
                await change_cover_btn.click()
                cover_slot = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
                await cover_slot.hover()

                # Remove the current cover first when a delete icon is shown.
                delete_icon = self.page.locator(".del-wrap > .svg-icon")
                if await delete_icon.count():
                    await delete_icon.click()

                # Clicking the slot opens the native file chooser.
                async with self.page.expect_file_chooser() as fc_info:
                    await cover_slot.click()
                cover_chooser = await fc_info.value
                await cover_chooser.set_files(cover_path)

                await asyncio.sleep(2)
                await self.page.get_by_role("button", name="确定").click()
                await asyncio.sleep(1)
                await self.page.get_by_role("button", name="确认").click()

                print(f"[{self.platform_name}] 封面上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 封面上传失败: {e}")

    async def _wait_for_upload(self, params: PublishParams) -> bool:
        """Poll until the publish button is enabled, then upload the cover.

        Returns:
            True when the upload finished within 120 polls, False otherwise.
        """
        publish_button = self.page.get_by_role("button", name="发表")
        upload_error = self.page.locator('div.status-msg.error')
        delete_tag = self.page.locator('div.media-status-content div.tag-inner:has-text("删除")')

        for _ in range(120):
            try:
                button_class = await publish_button.get_attribute('class')
                if "weui-desktop-btn_disabled" not in button_class:
                    print(f"[{self.platform_name}] 视频上传完毕")
                    self.report_progress(50, "正在上传封面...")
                    await self.upload_cover(params.cover_path)
                    return True

                # Still uploading: retry the upload if the page reports an error.
                if await upload_error.count() and await delete_tag.count():
                    await self.handle_upload_error(params.video_path)
            except Exception:
                # Errors while the page is still rendering are treated as
                # transient. Only Exception is caught so that cancelling the
                # task still interrupts the wait.
                pass
            await asyncio.sleep(3)
        return False

    def _publish_succeeded(self) -> PublishResult:
        """Report completion and build the success result."""
        self.report_progress(100, "发布成功")
        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布成功"
        )

    async def _click_publish(self) -> PublishResult:
        """Click the publish button until the post list page is reached.

        Raises:
            Exception: When the post list page is not reached in 30 attempts.
        """
        publish_btn = self.page.locator('div.form-btns button:has-text("发表")')
        for _ in range(30):
            try:
                if await publish_btn.count():
                    await publish_btn.click()
                    await self.page.wait_for_url(self._post_list_url, timeout=10000)
                    return self._publish_succeeded()
            except Exception:
                # The click or the wait may fail even though the navigation
                # already happened, so check the URL before retrying.
                if "post/list" in self.page.url:
                    return self._publish_succeeded()
            # Pause on every attempt, including when the button is not in the
            # DOM yet, so the retries actually span about 30 seconds.
            await asyncio.sleep(1)

        raise Exception("发布超时")

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to WeChat Channels.

        Args:
            cookies: Cookie data, passed to ``parse_cookies``.
            params: Video path, title, tags, cover path and optional
                scheduled publish date.

        Returns:
            A successful ``PublishResult`` once the post list page is reached.

        Raises:
            Exception: When the video file does not exist, the page could not
                be initialised, or publishing times out.
        """
        # Validate the input before launching Chrome so that a bad path does
        # not leave a browser process behind.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        self.report_progress(5, "正在初始化浏览器...")
        await self.init_browser()

        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        self.report_progress(10, "正在打开上传页面...")
        await self.page.goto(self.publish_url)
        await self.page.wait_for_url(self.publish_url, timeout=30000)

        self.report_progress(15, "正在选择视频文件...")
        # Clicking the upload area opens the native file chooser.
        upload_div = self.page.locator("div.upload-content")
        async with self.page.expect_file_chooser() as fc_info:
            await upload_div.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(params.video_path)

        self.report_progress(20, "正在填充标题和话题...")
        await self.add_title_tags(params)

        self.report_progress(30, "等待视频上传完成...")
        if not await self._wait_for_upload(params):
            # Keep going as before; the publish step has its own timeout.
            print(f"[{self.platform_name}] 等待视频上传超时,继续尝试发布")

        self.report_progress(60, "处理视频设置...")
        await self.add_short_title(params.title)

        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            await self.set_schedule_time(params.publish_date)

        self.report_progress(80, "正在发布...")
        return await self._click_publish()
|