weixin.py 26 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. 微信视频号发布器
  4. 参考: matrix/tencent_uploader/main.py
  5. """
  6. import asyncio
  7. import os
  8. from datetime import datetime
  9. from typing import List
  10. from .base import (
  11. BasePublisher, PublishParams, PublishResult,
  12. WorkItem, WorksResult, CommentItem, CommentsResult
  13. )
  14. def format_short_title(origin_title: str) -> str:
  15. """
  16. 格式化短标题
  17. - 移除特殊字符
  18. - 长度限制在 6-16 字符
  19. """
  20. allowed_special_chars = "《》"":+?%°"
  21. filtered_chars = [
  22. char if char.isalnum() or char in allowed_special_chars
  23. else ' ' if char == ',' else ''
  24. for char in origin_title
  25. ]
  26. formatted_string = ''.join(filtered_chars)
  27. if len(formatted_string) > 16:
  28. formatted_string = formatted_string[:16]
  29. elif len(formatted_string) < 6:
  30. formatted_string += ' ' * (6 - len(formatted_string))
  31. return formatted_string
class WeixinPublisher(BasePublisher):
    """
    Publisher for WeChat Channels (Weixin video accounts).

    Drives the Channels creator center through Playwright browser automation.
    Note: the Chrome browser must be used, otherwise H264 encoding errors
    may occur.
    """
    # Platform identifier; used as the log prefix and in result objects.
    platform_name = "weixin"
    # Creator-center entry URL (not referenced by the methods visible here).
    login_url = "https://channels.weixin.qq.com/platform"
    # Page opened by publish() to create a new video post.
    publish_url = "https://channels.weixin.qq.com/platform/post/create"
    # Cookie domain for this platform; presumably consumed by the base class
    # when cookies are installed — verify against BasePublisher.
    cookie_domain = ".weixin.qq.com"
  42. async def init_browser(self, storage_state: str = None):
  43. """初始化浏览器 - 使用 Chrome 浏览器"""
  44. from playwright.async_api import async_playwright
  45. playwright = await async_playwright().start()
  46. # 使用 Chrome 浏览器,避免 H264 编码问题
  47. self.browser = await playwright.chromium.launch(
  48. headless=self.headless,
  49. channel="chrome"
  50. )
  51. if storage_state and os.path.exists(storage_state):
  52. self.context = await self.browser.new_context(storage_state=storage_state)
  53. else:
  54. self.context = await self.browser.new_context()
  55. self.page = await self.context.new_page()
  56. return self.page
  57. async def set_schedule_time(self, publish_date: datetime):
  58. """设置定时发布"""
  59. if not self.page:
  60. return
  61. print(f"[{self.platform_name}] 设置定时发布...")
  62. # 点击定时选项
  63. label_element = self.page.locator("label").filter(has_text="定时").nth(1)
  64. await label_element.click()
  65. # 选择日期
  66. await self.page.click('input[placeholder="请选择发表时间"]')
  67. publish_month = f"{publish_date.month:02d}"
  68. current_month = f"{publish_month}月"
  69. # 检查月份
  70. page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
  71. if page_month != current_month:
  72. await self.page.click('button.weui-desktop-btn__icon__right')
  73. # 选择日期
  74. elements = await self.page.query_selector_all('table.weui-desktop-picker__table a')
  75. for element in elements:
  76. class_name = await element.evaluate('el => el.className')
  77. if 'weui-desktop-picker__disabled' in class_name:
  78. continue
  79. text = await element.inner_text()
  80. if text.strip() == str(publish_date.day):
  81. await element.click()
  82. break
  83. # 输入时间
  84. await self.page.click('input[placeholder="请选择时间"]')
  85. await self.page.keyboard.press("Control+KeyA")
  86. await self.page.keyboard.type(str(publish_date.hour))
  87. # 点击其他地方确认
  88. await self.page.locator("div.input-editor").click()
  89. async def handle_upload_error(self, video_path: str):
  90. """处理上传错误"""
  91. if not self.page:
  92. return
  93. print(f"[{self.platform_name}] 视频出错了,重新上传中...")
  94. await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
  95. await self.page.get_by_role('button', name="删除", exact=True).click()
  96. file_input = self.page.locator('input[type="file"]')
  97. await file_input.set_input_files(video_path)
  98. async def add_title_tags(self, params: PublishParams):
  99. """添加标题和话题"""
  100. if not self.page:
  101. return
  102. await self.page.locator("div.input-editor").click()
  103. await self.page.keyboard.type(params.title)
  104. if params.tags:
  105. await self.page.keyboard.press("Enter")
  106. for tag in params.tags:
  107. await self.page.keyboard.type("#" + tag)
  108. await self.page.keyboard.press("Space")
  109. print(f"[{self.platform_name}] 成功添加标题和 {len(params.tags)} 个话题")
  110. async def add_short_title(self):
  111. """添加短标题"""
  112. if not self.page:
  113. return
  114. try:
  115. short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
  116. "xpath=following-sibling::div").locator('span input[type="text"]')
  117. if await short_title_element.count():
  118. # 获取已有内容作为短标题
  119. pass
  120. except:
  121. pass
  122. async def upload_cover(self, cover_path: str):
  123. """上传封面图"""
  124. if not self.page or not cover_path or not os.path.exists(cover_path):
  125. return
  126. try:
  127. await asyncio.sleep(2)
  128. preview_btn_info = await self.page.locator(
  129. 'div.finder-tag-wrap.btn:has-text("更换封面")').get_attribute('class')
  130. if "disabled" not in preview_btn_info:
  131. await self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")').click()
  132. await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover()
  133. # 删除现有封面
  134. if await self.page.locator(".del-wrap > .svg-icon").count():
  135. await self.page.locator(".del-wrap > .svg-icon").click()
  136. # 上传新封面
  137. preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
  138. async with self.page.expect_file_chooser() as fc_info:
  139. await preview_div.click()
  140. preview_chooser = await fc_info.value
  141. await preview_chooser.set_files(cover_path)
  142. await asyncio.sleep(2)
  143. await self.page.get_by_role("button", name="确定").click()
  144. await asyncio.sleep(1)
  145. await self.page.get_by_role("button", name="确认").click()
  146. print(f"[{self.platform_name}] 封面上传成功")
  147. except Exception as e:
  148. print(f"[{self.platform_name}] 封面上传失败: {e}")
  149. async def check_captcha(self) -> dict:
  150. """检查页面是否需要验证码"""
  151. if not self.page:
  152. return {'need_captcha': False, 'captcha_type': ''}
  153. try:
  154. # 检查各种验证码
  155. captcha_selectors = [
  156. 'text="请输入验证码"',
  157. 'text="滑动验证"',
  158. '[class*="captcha"]',
  159. '[class*="verify"]',
  160. ]
  161. for selector in captcha_selectors:
  162. try:
  163. if await self.page.locator(selector).count() > 0:
  164. print(f"[{self.platform_name}] 检测到验证码: {selector}")
  165. return {'need_captcha': True, 'captcha_type': 'image'}
  166. except:
  167. pass
  168. # 检查登录弹窗
  169. login_selectors = [
  170. 'text="请登录"',
  171. 'text="扫码登录"',
  172. '[class*="login-dialog"]',
  173. ]
  174. for selector in login_selectors:
  175. try:
  176. if await self.page.locator(selector).count() > 0:
  177. print(f"[{self.platform_name}] 检测到需要登录: {selector}")
  178. return {'need_captcha': True, 'captcha_type': 'login'}
  179. except:
  180. pass
  181. except Exception as e:
  182. print(f"[{self.platform_name}] 验证码检测异常: {e}")
  183. return {'need_captcha': False, 'captcha_type': ''}
  184. async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
  185. """发布视频到视频号"""
  186. print(f"\n{'='*60}")
  187. print(f"[{self.platform_name}] 开始发布视频")
  188. print(f"[{self.platform_name}] 视频路径: {params.video_path}")
  189. print(f"[{self.platform_name}] 标题: {params.title}")
  190. print(f"[{self.platform_name}] Headless: {self.headless}")
  191. print(f"{'='*60}")
  192. self.report_progress(5, "正在初始化浏览器...")
  193. # 初始化浏览器(使用 Chrome)
  194. await self.init_browser()
  195. print(f"[{self.platform_name}] 浏览器初始化完成")
  196. # 解析并设置 cookies
  197. cookie_list = self.parse_cookies(cookies)
  198. print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
  199. await self.set_cookies(cookie_list)
  200. if not self.page:
  201. raise Exception("Page not initialized")
  202. # 检查视频文件
  203. if not os.path.exists(params.video_path):
  204. raise Exception(f"视频文件不存在: {params.video_path}")
  205. print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")
  206. self.report_progress(10, "正在打开上传页面...")
  207. # 访问上传页面
  208. await self.page.goto(self.publish_url, wait_until="domcontentloaded", timeout=60000)
  209. await asyncio.sleep(3)
  210. # 检查是否跳转到登录页
  211. current_url = self.page.url
  212. print(f"[{self.platform_name}] 当前页面: {current_url}")
  213. if "login" in current_url:
  214. screenshot_base64 = await self.capture_screenshot()
  215. return PublishResult(
  216. success=False,
  217. platform=self.platform_name,
  218. error="Cookie 已过期,需要重新登录",
  219. need_captcha=True,
  220. captcha_type='login',
  221. screenshot_base64=screenshot_base64,
  222. page_url=current_url,
  223. status='need_captcha'
  224. )
  225. # 检查验证码
  226. captcha_result = await self.check_captcha()
  227. if captcha_result['need_captcha']:
  228. screenshot_base64 = await self.capture_screenshot()
  229. return PublishResult(
  230. success=False,
  231. platform=self.platform_name,
  232. error=f"需要{captcha_result['captcha_type']}验证码",
  233. need_captcha=True,
  234. captcha_type=captcha_result['captcha_type'],
  235. screenshot_base64=screenshot_base64,
  236. page_url=current_url,
  237. status='need_captcha'
  238. )
  239. self.report_progress(15, "正在选择视频文件...")
  240. # 上传视频 - 尝试多种方式
  241. upload_success = False
  242. # 方法1: 直接通过 file input 上传
  243. try:
  244. file_inputs = await self.page.query_selector_all('input[type="file"]')
  245. print(f"[{self.platform_name}] 找到 {len(file_inputs)} 个文件输入")
  246. for file_input in file_inputs:
  247. try:
  248. await file_input.set_input_files(params.video_path)
  249. upload_success = True
  250. print(f"[{self.platform_name}] 通过 file input 上传成功")
  251. break
  252. except Exception as e:
  253. print(f"[{self.platform_name}] file input 上传失败: {e}")
  254. except Exception as e:
  255. print(f"[{self.platform_name}] 查找 file input 失败: {e}")
  256. # 方法2: 点击上传区域
  257. if not upload_success:
  258. upload_selectors = [
  259. 'div.upload-content',
  260. 'div[class*="upload"]',
  261. 'div[class*="drag-upload"]',
  262. 'div.add-wrap',
  263. 'div:has-text("上传视频")',
  264. '[class*="uploader"]',
  265. ]
  266. for selector in upload_selectors:
  267. if upload_success:
  268. break
  269. try:
  270. upload_area = self.page.locator(selector).first
  271. if await upload_area.count() > 0:
  272. print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
  273. async with self.page.expect_file_chooser(timeout=10000) as fc_info:
  274. await upload_area.click()
  275. file_chooser = await fc_info.value
  276. await file_chooser.set_files(params.video_path)
  277. upload_success = True
  278. print(f"[{self.platform_name}] 通过点击上传区域成功")
  279. break
  280. except Exception as e:
  281. print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")
  282. if not upload_success:
  283. screenshot_base64 = await self.capture_screenshot()
  284. return PublishResult(
  285. success=False,
  286. platform=self.platform_name,
  287. error="未找到上传入口",
  288. screenshot_base64=screenshot_base64,
  289. page_url=await self.get_page_url(),
  290. status='failed'
  291. )
  292. self.report_progress(20, "正在填充标题和话题...")
  293. # 添加标题和话题
  294. await self.add_title_tags(params)
  295. self.report_progress(30, "等待视频上传完成...")
  296. # 等待上传完成
  297. for _ in range(120):
  298. try:
  299. button_info = await self.page.get_by_role("button", name="发表").get_attribute('class')
  300. if "weui-desktop-btn_disabled" not in button_info:
  301. print(f"[{self.platform_name}] 视频上传完毕")
  302. # 上传封面
  303. self.report_progress(50, "正在上传封面...")
  304. await self.upload_cover(params.cover_path)
  305. break
  306. else:
  307. # 检查上传错误
  308. if await self.page.locator('div.status-msg.error').count():
  309. if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count():
  310. await self.handle_upload_error(params.video_path)
  311. await asyncio.sleep(3)
  312. except:
  313. await asyncio.sleep(3)
  314. self.report_progress(60, "处理视频设置...")
  315. # 添加短标题
  316. try:
  317. short_title_el = self.page.get_by_text("短标题", exact=True).locator("..").locator(
  318. "xpath=following-sibling::div").locator('span input[type="text"]')
  319. if await short_title_el.count():
  320. short_title = format_short_title(params.title)
  321. await short_title_el.fill(short_title)
  322. except:
  323. pass
  324. # 定时发布
  325. if params.publish_date:
  326. self.report_progress(70, "设置定时发布...")
  327. await self.set_schedule_time(params.publish_date)
  328. self.report_progress(80, "正在发布...")
  329. # 点击发布
  330. for _ in range(30):
  331. try:
  332. publish_btn = self.page.locator('div.form-btns button:has-text("发表")')
  333. if await publish_btn.count():
  334. await publish_btn.click()
  335. await self.page.wait_for_url(
  336. "https://channels.weixin.qq.com/platform/post/list",
  337. timeout=10000
  338. )
  339. self.report_progress(100, "发布成功")
  340. return PublishResult(
  341. success=True,
  342. platform=self.platform_name,
  343. message="发布成功"
  344. )
  345. except:
  346. current_url = self.page.url
  347. if "post/list" in current_url:
  348. self.report_progress(100, "发布成功")
  349. return PublishResult(
  350. success=True,
  351. platform=self.platform_name,
  352. message="发布成功"
  353. )
  354. await asyncio.sleep(1)
  355. raise Exception("发布超时")
  356. async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
  357. """获取视频号作品列表"""
  358. print(f"\n{'='*60}")
  359. print(f"[{self.platform_name}] 获取作品列表")
  360. print(f"[{self.platform_name}] page={page}, page_size={page_size}")
  361. print(f"{'='*60}")
  362. works: List[WorkItem] = []
  363. total = 0
  364. has_more = False
  365. try:
  366. await self.init_browser()
  367. cookie_list = self.parse_cookies(cookies)
  368. await self.set_cookies(cookie_list)
  369. if not self.page:
  370. raise Exception("Page not initialized")
  371. # 访问视频号创作者中心
  372. await self.page.goto("https://channels.weixin.qq.com/platform/post/list")
  373. await asyncio.sleep(5)
  374. # 检查登录状态
  375. current_url = self.page.url
  376. if "login" in current_url:
  377. raise Exception("Cookie 已过期,请重新登录")
  378. # 视频号使用页面爬取方式获取作品列表
  379. # 等待作品列表加载
  380. await self.page.wait_for_selector('div.post-feed-wrap', timeout=10000)
  381. # 获取所有作品项
  382. post_items = self.page.locator('div.post-feed-item')
  383. item_count = await post_items.count()
  384. print(f"[{self.platform_name}] 找到 {item_count} 个作品项")
  385. for i in range(min(item_count, page_size)):
  386. try:
  387. item = post_items.nth(i)
  388. # 获取封面
  389. cover_el = item.locator('div.cover-wrap img').first
  390. cover_url = ''
  391. if await cover_el.count() > 0:
  392. cover_url = await cover_el.get_attribute('src') or ''
  393. # 获取标题
  394. title_el = item.locator('div.content').first
  395. title = ''
  396. if await title_el.count() > 0:
  397. title = await title_el.text_content() or ''
  398. title = title.strip()[:50]
  399. # 获取统计数据
  400. stats_el = item.locator('div.post-data')
  401. play_count = 0
  402. like_count = 0
  403. comment_count = 0
  404. if await stats_el.count() > 0:
  405. stats_text = await stats_el.text_content() or ''
  406. # 解析统计数据(格式可能是: 播放 100 点赞 50 评论 10)
  407. import re
  408. play_match = re.search(r'播放[\s]*([\d.]+[万]?)', stats_text)
  409. like_match = re.search(r'点赞[\s]*([\d.]+[万]?)', stats_text)
  410. comment_match = re.search(r'评论[\s]*([\d.]+[万]?)', stats_text)
  411. def parse_count(match):
  412. if not match:
  413. return 0
  414. val = match.group(1)
  415. if '万' in val:
  416. return int(float(val.replace('万', '')) * 10000)
  417. return int(val)
  418. play_count = parse_count(play_match)
  419. like_count = parse_count(like_match)
  420. comment_count = parse_count(comment_match)
  421. # 获取发布时间
  422. time_el = item.locator('div.time')
  423. publish_time = ''
  424. if await time_el.count() > 0:
  425. publish_time = await time_el.text_content() or ''
  426. publish_time = publish_time.strip()
  427. # 生成临时 work_id(视频号可能需要从详情页获取)
  428. work_id = f"weixin_{i}_{hash(title)}"
  429. works.append(WorkItem(
  430. work_id=work_id,
  431. title=title or '无标题',
  432. cover_url=cover_url,
  433. duration=0,
  434. status='published',
  435. publish_time=publish_time,
  436. play_count=play_count,
  437. like_count=like_count,
  438. comment_count=comment_count,
  439. ))
  440. except Exception as e:
  441. print(f"[{self.platform_name}] 解析作品 {i} 失败: {e}")
  442. continue
  443. total = len(works)
  444. has_more = item_count > page_size
  445. print(f"[{self.platform_name}] 获取到 {total} 个作品")
  446. except Exception as e:
  447. import traceback
  448. traceback.print_exc()
  449. return WorksResult(success=False, platform=self.platform_name, error=str(e))
  450. return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more)
  451. async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
  452. """获取视频号作品评论"""
  453. print(f"\n{'='*60}")
  454. print(f"[{self.platform_name}] 获取作品评论")
  455. print(f"[{self.platform_name}] work_id={work_id}")
  456. print(f"{'='*60}")
  457. comments: List[CommentItem] = []
  458. total = 0
  459. has_more = False
  460. try:
  461. await self.init_browser()
  462. cookie_list = self.parse_cookies(cookies)
  463. await self.set_cookies(cookie_list)
  464. if not self.page:
  465. raise Exception("Page not initialized")
  466. # 访问评论管理页面
  467. await self.page.goto("https://channels.weixin.qq.com/platform/comment/index")
  468. await asyncio.sleep(5)
  469. # 检查登录状态
  470. current_url = self.page.url
  471. if "login" in current_url:
  472. raise Exception("Cookie 已过期,请重新登录")
  473. # 等待评论列表加载
  474. try:
  475. await self.page.wait_for_selector('div.comment-list', timeout=10000)
  476. except:
  477. print(f"[{self.platform_name}] 未找到评论列表")
  478. return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False)
  479. # 获取所有评论项
  480. comment_items = self.page.locator('div.comment-item')
  481. item_count = await comment_items.count()
  482. print(f"[{self.platform_name}] 找到 {item_count} 个评论项")
  483. for i in range(item_count):
  484. try:
  485. item = comment_items.nth(i)
  486. # 获取作者信息
  487. author_name = ''
  488. author_avatar = ''
  489. name_el = item.locator('div.nick-name')
  490. if await name_el.count() > 0:
  491. author_name = await name_el.text_content() or ''
  492. author_name = author_name.strip()
  493. avatar_el = item.locator('img.avatar')
  494. if await avatar_el.count() > 0:
  495. author_avatar = await avatar_el.get_attribute('src') or ''
  496. # 获取评论内容
  497. content = ''
  498. content_el = item.locator('div.comment-content')
  499. if await content_el.count() > 0:
  500. content = await content_el.text_content() or ''
  501. content = content.strip()
  502. # 获取时间
  503. create_time = ''
  504. time_el = item.locator('div.time')
  505. if await time_el.count() > 0:
  506. create_time = await time_el.text_content() or ''
  507. create_time = create_time.strip()
  508. # 生成评论 ID
  509. comment_id = f"weixin_comment_{i}_{hash(content)}"
  510. comments.append(CommentItem(
  511. comment_id=comment_id,
  512. work_id=work_id,
  513. content=content,
  514. author_id='',
  515. author_name=author_name,
  516. author_avatar=author_avatar,
  517. like_count=0,
  518. reply_count=0,
  519. create_time=create_time,
  520. ))
  521. except Exception as e:
  522. print(f"[{self.platform_name}] 解析评论 {i} 失败: {e}")
  523. continue
  524. total = len(comments)
  525. print(f"[{self.platform_name}] 获取到 {total} 条评论")
  526. except Exception as e:
  527. import traceback
  528. traceback.print_exc()
  529. return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e))
  530. return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)