weixin.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # -*- coding: utf-8 -*-
  2. """
  3. 微信视频号发布器
  4. 参考: matrix/tencent_uploader/main.py
  5. """
  6. import asyncio
  7. import os
  8. from datetime import datetime
  9. from .base import BasePublisher, PublishParams, PublishResult
  10. def format_short_title(origin_title: str) -> str:
  11. """
  12. 格式化短标题
  13. - 移除特殊字符
  14. - 长度限制在 6-16 字符
  15. """
  16. allowed_special_chars = "《》"":+?%°"
  17. filtered_chars = [
  18. char if char.isalnum() or char in allowed_special_chars
  19. else ' ' if char == ',' else ''
  20. for char in origin_title
  21. ]
  22. formatted_string = ''.join(filtered_chars)
  23. if len(formatted_string) > 16:
  24. formatted_string = formatted_string[:16]
  25. elif len(formatted_string) < 6:
  26. formatted_string += ' ' * (6 - len(formatted_string))
  27. return formatted_string
  28. class WeixinPublisher(BasePublisher):
  29. """
  30. 微信视频号发布器
  31. 使用 Playwright 自动化操作视频号创作者中心
  32. 注意: 需要使用 Chrome 浏览器,否则可能出现 H264 编码错误
  33. """
  34. platform_name = "weixin"
  35. login_url = "https://channels.weixin.qq.com/platform"
  36. publish_url = "https://channels.weixin.qq.com/platform/post/create"
  37. cookie_domain = ".weixin.qq.com"
  38. async def init_browser(self, storage_state: str = None):
  39. """初始化浏览器 - 使用 Chrome 浏览器"""
  40. from playwright.async_api import async_playwright
  41. playwright = await async_playwright().start()
  42. # 使用 Chrome 浏览器,避免 H264 编码问题
  43. self.browser = await playwright.chromium.launch(
  44. headless=self.headless,
  45. channel="chrome"
  46. )
  47. if storage_state and os.path.exists(storage_state):
  48. self.context = await self.browser.new_context(storage_state=storage_state)
  49. else:
  50. self.context = await self.browser.new_context()
  51. self.page = await self.context.new_page()
  52. return self.page
  53. async def set_schedule_time(self, publish_date: datetime):
  54. """设置定时发布"""
  55. if not self.page:
  56. return
  57. print(f"[{self.platform_name}] 设置定时发布...")
  58. # 点击定时选项
  59. label_element = self.page.locator("label").filter(has_text="定时").nth(1)
  60. await label_element.click()
  61. # 选择日期
  62. await self.page.click('input[placeholder="请选择发表时间"]')
  63. publish_month = f"{publish_date.month:02d}"
  64. current_month = f"{publish_month}月"
  65. # 检查月份
  66. page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
  67. if page_month != current_month:
  68. await self.page.click('button.weui-desktop-btn__icon__right')
  69. # 选择日期
  70. elements = await self.page.query_selector_all('table.weui-desktop-picker__table a')
  71. for element in elements:
  72. class_name = await element.evaluate('el => el.className')
  73. if 'weui-desktop-picker__disabled' in class_name:
  74. continue
  75. text = await element.inner_text()
  76. if text.strip() == str(publish_date.day):
  77. await element.click()
  78. break
  79. # 输入时间
  80. await self.page.click('input[placeholder="请选择时间"]')
  81. await self.page.keyboard.press("Control+KeyA")
  82. await self.page.keyboard.type(str(publish_date.hour))
  83. # 点击其他地方确认
  84. await self.page.locator("div.input-editor").click()
  85. async def handle_upload_error(self, video_path: str):
  86. """处理上传错误"""
  87. if not self.page:
  88. return
  89. print(f"[{self.platform_name}] 视频出错了,重新上传中...")
  90. await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
  91. await self.page.get_by_role('button', name="删除", exact=True).click()
  92. file_input = self.page.locator('input[type="file"]')
  93. await file_input.set_input_files(video_path)
  94. async def add_title_tags(self, params: PublishParams):
  95. """添加标题和话题"""
  96. if not self.page:
  97. return
  98. await self.page.locator("div.input-editor").click()
  99. await self.page.keyboard.type(params.title)
  100. if params.tags:
  101. await self.page.keyboard.press("Enter")
  102. for tag in params.tags:
  103. await self.page.keyboard.type("#" + tag)
  104. await self.page.keyboard.press("Space")
  105. print(f"[{self.platform_name}] 成功添加标题和 {len(params.tags)} 个话题")
  106. async def add_short_title(self):
  107. """添加短标题"""
  108. if not self.page:
  109. return
  110. try:
  111. short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
  112. "xpath=following-sibling::div").locator('span input[type="text"]')
  113. if await short_title_element.count():
  114. # 获取已有内容作为短标题
  115. pass
  116. except:
  117. pass
  118. async def upload_cover(self, cover_path: str):
  119. """上传封面图"""
  120. if not self.page or not cover_path or not os.path.exists(cover_path):
  121. return
  122. try:
  123. await asyncio.sleep(2)
  124. preview_btn_info = await self.page.locator(
  125. 'div.finder-tag-wrap.btn:has-text("更换封面")').get_attribute('class')
  126. if "disabled" not in preview_btn_info:
  127. await self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")').click()
  128. await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover()
  129. # 删除现有封面
  130. if await self.page.locator(".del-wrap > .svg-icon").count():
  131. await self.page.locator(".del-wrap > .svg-icon").click()
  132. # 上传新封面
  133. preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
  134. async with self.page.expect_file_chooser() as fc_info:
  135. await preview_div.click()
  136. preview_chooser = await fc_info.value
  137. await preview_chooser.set_files(cover_path)
  138. await asyncio.sleep(2)
  139. await self.page.get_by_role("button", name="确定").click()
  140. await asyncio.sleep(1)
  141. await self.page.get_by_role("button", name="确认").click()
  142. print(f"[{self.platform_name}] 封面上传成功")
  143. except Exception as e:
  144. print(f"[{self.platform_name}] 封面上传失败: {e}")
  145. async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
  146. """发布视频到视频号"""
  147. self.report_progress(5, "正在初始化浏览器...")
  148. # 初始化浏览器(使用 Chrome)
  149. await self.init_browser()
  150. # 解析并设置 cookies
  151. cookie_list = self.parse_cookies(cookies)
  152. await self.set_cookies(cookie_list)
  153. if not self.page:
  154. raise Exception("Page not initialized")
  155. # 检查视频文件
  156. if not os.path.exists(params.video_path):
  157. raise Exception(f"视频文件不存在: {params.video_path}")
  158. self.report_progress(10, "正在打开上传页面...")
  159. # 访问上传页面
  160. await self.page.goto(self.publish_url)
  161. await self.page.wait_for_url(self.publish_url, timeout=30000)
  162. self.report_progress(15, "正在选择视频文件...")
  163. # 点击上传区域
  164. upload_div = self.page.locator("div.upload-content")
  165. async with self.page.expect_file_chooser() as fc_info:
  166. await upload_div.click()
  167. file_chooser = await fc_info.value
  168. await file_chooser.set_files(params.video_path)
  169. self.report_progress(20, "正在填充标题和话题...")
  170. # 添加标题和话题
  171. await self.add_title_tags(params)
  172. self.report_progress(30, "等待视频上传完成...")
  173. # 等待上传完成
  174. for _ in range(120):
  175. try:
  176. button_info = await self.page.get_by_role("button", name="发表").get_attribute('class')
  177. if "weui-desktop-btn_disabled" not in button_info:
  178. print(f"[{self.platform_name}] 视频上传完毕")
  179. # 上传封面
  180. self.report_progress(50, "正在上传封面...")
  181. await self.upload_cover(params.cover_path)
  182. break
  183. else:
  184. # 检查上传错误
  185. if await self.page.locator('div.status-msg.error').count():
  186. if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count():
  187. await self.handle_upload_error(params.video_path)
  188. await asyncio.sleep(3)
  189. except:
  190. await asyncio.sleep(3)
  191. self.report_progress(60, "处理视频设置...")
  192. # 添加短标题
  193. try:
  194. short_title_el = self.page.get_by_text("短标题", exact=True).locator("..").locator(
  195. "xpath=following-sibling::div").locator('span input[type="text"]')
  196. if await short_title_el.count():
  197. short_title = format_short_title(params.title)
  198. await short_title_el.fill(short_title)
  199. except:
  200. pass
  201. # 定时发布
  202. if params.publish_date:
  203. self.report_progress(70, "设置定时发布...")
  204. await self.set_schedule_time(params.publish_date)
  205. self.report_progress(80, "正在发布...")
  206. # 点击发布
  207. for _ in range(30):
  208. try:
  209. publish_btn = self.page.locator('div.form-btns button:has-text("发表")')
  210. if await publish_btn.count():
  211. await publish_btn.click()
  212. await self.page.wait_for_url(
  213. "https://channels.weixin.qq.com/platform/post/list",
  214. timeout=10000
  215. )
  216. self.report_progress(100, "发布成功")
  217. return PublishResult(
  218. success=True,
  219. platform=self.platform_name,
  220. message="发布成功"
  221. )
  222. except:
  223. current_url = self.page.url
  224. if "post/list" in current_url:
  225. self.report_progress(100, "发布成功")
  226. return PublishResult(
  227. success=True,
  228. platform=self.platform_name,
  229. message="发布成功"
  230. )
  231. await asyncio.sleep(1)
  232. raise Exception("发布超时")