# -*- coding: utf-8 -*-
"""
WeChat Channels (视频号) publisher.

Reference: matrix/tencent_uploader/main.py
"""

import asyncio
import os
import re
import traceback
import zlib
from datetime import datetime
from typing import List, Optional

from .base import (
    BasePublisher, PublishParams, PublishResult,
    WorkItem, WorksResult, CommentItem, CommentsResult
)

# Patterns for the per-post statistics text; compiled once instead of on
# every scraped item.
_PLAY_COUNT_RE = re.compile(r'播放[\s]*([\d.]+[万]?)')
_LIKE_COUNT_RE = re.compile(r'点赞[\s]*([\d.]+[万]?)')
_COMMENT_COUNT_RE = re.compile(r'评论[\s]*([\d.]+[万]?)')


def _parse_count(match) -> int:
    """Convert a statistics regex match (e.g. ``"100"`` or ``"1.2万"``) to an int.

    Returns 0 when there is no match or the captured text is not a number
    (the pattern also accepts strings such as ``"..."``).
    """
    if not match:
        return 0
    raw = match.group(1)
    if raw.isdigit():
        return int(raw)
    multiplier = 10000 if raw.endswith('万') else 1
    try:
        # round() avoids float truncation such as 1.15 * 10000 -> 11499.
        return int(round(float(raw.rstrip('万')) * multiplier))
    except ValueError:
        return 0


def _stable_id(text: str) -> int:
    """Return a checksum of *text* that is identical across processes.

    The built-in ``hash()`` is salted per interpreter run for ``str``, so it
    cannot be used for identifiers that have to survive a restart.
    """
    return zlib.crc32(text.encode('utf-8'))


def format_short_title(origin_title: str) -> str:
    """
    Build a short title from *origin_title*.

    - Keeps alphanumeric characters and a small set of allowed symbols.
    - Replaces ``,`` with a space and drops every other character.
    - Truncates to 16 characters and right-pads with spaces to at least 6.
    """
    # NOTE(review): the original literal was two adjacent strings
    # ("《》" followed by ":+?%°"), which Python concatenates to the value
    # below. Curly quotes may have been intended between them -- confirm.
    allowed_special_chars = "《》:+?%°"

    kept = []
    for char in origin_title:
        if char.isalnum() or char in allowed_special_chars:
            kept.append(char)
        elif char == ',':
            kept.append(' ')
    formatted_string = ''.join(kept)

    if len(formatted_string) > 16:
        formatted_string = formatted_string[:16]
    # ljust is a no-op for strings that are already 6+ characters long.
    return formatted_string.ljust(6)


class WeixinPublisher(BasePublisher):
    """
    WeChat Channels publisher.

    Drives the Channels creator platform with Playwright.
    Note: the browser is launched through the Chrome channel; per the
    original author, other browsers may hit an H264 encoding error.
    """

    platform_name = "weixin"
    login_url = "https://channels.weixin.qq.com/platform"
    publish_url = "https://channels.weixin.qq.com/platform/post/create"
    cookie_domain = ".weixin.qq.com"

    async def init_browser(self, storage_state: Optional[str] = None):
        """Start Playwright, launch Chrome and open a new page.

        Args:
            storage_state: Optional path to a saved storage-state file; it is
                used only when the file exists.

        Returns:
            The newly created page (also stored on ``self.page``).
        """
        from playwright.async_api import async_playwright

        # NOTE(review): the Playwright driver handle is only kept in this
        # local variable; confirm the base-class teardown does not need it.
        playwright = await async_playwright().start()
        # Use the Chrome channel to avoid the H264 encoding problem.
        self.browser = await playwright.chromium.launch(
            headless=self.headless,
            channel="chrome"
        )

        if storage_state and os.path.exists(storage_state):
            self.context = await self.browser.new_context(storage_state=storage_state)
        else:
            self.context = await self.browser.new_context()

        self.page = await self.context.new_page()
        return self.page

    async def set_schedule_time(self, publish_date: datetime):
        """Select the scheduled-publish option and fill in date and hour.

        Does nothing when no page is open. Only the hour of *publish_date*
        is typed into the time field.
        """
        if not self.page:
            return

        print(f"[{self.platform_name}] 设置定时发布...")

        # Click the "scheduled" radio label.
        label_element = self.page.locator("label").filter(has_text="定时").nth(1)
        await label_element.click()

        # Open the date picker.
        await self.page.click('input[placeholder="请选择发表时间"]')

        publish_month = f"{publish_date.month:02d}"
        current_month = f"{publish_month}月"

        # Advance the picker one month when it does not show the target month.
        page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
        if page_month != current_month:
            await self.page.click('button.weui-desktop-btn__icon__right')

        # Click the first enabled cell whose text equals the target day.
        elements = await self.page.query_selector_all('table.weui-desktop-picker__table a')
        for element in elements:
            class_name = await element.evaluate('el => el.className')
            if 'weui-desktop-picker__disabled' in class_name:
                continue
            text = await element.inner_text()
            if text.strip() == str(publish_date.day):
                await element.click()
                break

        # Type the hour into the time field, replacing its current content.
        await self.page.click('input[placeholder="请选择时间"]')
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(str(publish_date.hour))

        # Click elsewhere to confirm the picker value.
        await self.page.locator("div.input-editor").click()

    async def handle_upload_error(self, video_path: str):
        """Delete the failed upload and submit *video_path* again."""
        if not self.page:
            return

        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
        await self.page.get_by_role('button', name="删除", exact=True).click()
        file_input = self.page.locator('input[type="file"]')
        await file_input.set_input_files(video_path)

    async def add_title_tags(self, params: PublishParams):
        """Type the title and the ``#tag`` topics into the description editor."""
        if not self.page:
            return

        await self.page.locator("div.input-editor").click()
        await self.page.keyboard.type(params.title)

        tags = params.tags or []
        if tags:
            await self.page.keyboard.press("Enter")
            for tag in tags:
                await self.page.keyboard.type("#" + tag)
                await self.page.keyboard.press("Space")

        print(f"[{self.platform_name}] 成功添加标题和 {len(tags)} 个话题")

    async def add_short_title(self, title: Optional[str] = None):
        """Fill the short-title input with a value derived from *title*.

        Best effort: when the input is not present, or filling fails, the
        problem is reported and publishing continues. Calling the method
        without a title is a no-op.
        """
        if not self.page or title is None:
            return

        try:
            short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
                "xpath=following-sibling::div").locator('span input[type="text"]')
            if await short_title_element.count():
                await short_title_element.fill(format_short_title(title))
        except Exception as e:
            # Was a silent bare except; keep it non-fatal but visible, and
            # let task cancellation propagate.
            print(f"[{self.platform_name}] 短标题填写失败: {e}")

    async def upload_cover(self, cover_path: str):
        """Replace the video cover with the image at *cover_path*.

        Does nothing when there is no page, no path, or the file is missing.
        Any failure is printed and swallowed so publishing can continue.
        """
        if not self.page or not cover_path or not os.path.exists(cover_path):
            return

        try:
            await asyncio.sleep(2)
            change_cover_btn = self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")')
            preview_btn_info = await change_cover_btn.get_attribute('class')

            if "disabled" not in preview_btn_info:
                await change_cover_btn.click()
                await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover()

                # Remove the existing cover, if any.
                if await self.page.locator(".del-wrap > .svg-icon").count():
                    await self.page.locator(".del-wrap > .svg-icon").click()

                # Upload the new cover through the file chooser.
                preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
                async with self.page.expect_file_chooser() as fc_info:
                    await preview_div.click()
                preview_chooser = await fc_info.value
                await preview_chooser.set_files(cover_path)

                await asyncio.sleep(2)
                await self.page.get_by_role("button", name="确定").click()
                await asyncio.sleep(1)
                await self.page.get_by_role("button", name="确认").click()

                print(f"[{self.platform_name}] 封面上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 封面上传失败: {e}")

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to WeChat Channels.

        Args:
            cookies: Cookie data handed to ``self.parse_cookies``.
            params: Publish parameters; this method reads ``video_path``,
                ``title``, ``tags``, ``cover_path`` and ``publish_date``.

        Returns:
            A successful ``PublishResult`` once the post-list page is reached.

        Raises:
            Exception: when the page is not initialized, the video file does
                not exist, or the post-list page is not reached within the
                retry budget.
        """
        self.report_progress(5, "正在初始化浏览器...")

        # Launch the browser (Chrome channel).
        await self.init_browser()

        # Parse and install the cookies.
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        # Make sure the video file exists.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        self.report_progress(10, "正在打开上传页面...")

        # Open the upload page.
        await self.page.goto(self.publish_url)
        await self.page.wait_for_url(self.publish_url, timeout=30000)

        self.report_progress(15, "正在选择视频文件...")

        # Click the upload area and hand the file to the chooser.
        upload_div = self.page.locator("div.upload-content")
        async with self.page.expect_file_chooser() as fc_info:
            await upload_div.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(params.video_path)

        self.report_progress(20, "正在填充标题和话题...")

        # Title and topics.
        await self.add_title_tags(params)

        self.report_progress(30, "等待视频上传完成...")

        # Poll until the publish button is enabled (up to 120 rounds of 3s).
        # If it never becomes enabled the loop simply ends and the flow
        # continues with the steps below.
        for _ in range(120):
            try:
                button_info = await self.page.get_by_role("button", name="发表").get_attribute('class')
                if "weui-desktop-btn_disabled" not in button_info:
                    print(f"[{self.platform_name}] 视频上传完毕")

                    # Upload the cover once the video is ready.
                    self.report_progress(50, "正在上传封面...")
                    await self.upload_cover(params.cover_path)
                    break

                # Still disabled: re-upload if the page reports an error.
                if await self.page.locator('div.status-msg.error').count():
                    if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count():
                        await self.handle_upload_error(params.video_path)

                await asyncio.sleep(3)
            except Exception:
                # Not a bare except: cancellation must be able to stop the poll.
                await asyncio.sleep(3)

        self.report_progress(60, "处理视频设置...")

        # Short title (best effort).
        await self.add_short_title(params.title)

        # Scheduled publishing.
        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            await self.set_schedule_time(params.publish_date)

        self.report_progress(80, "正在发布...")

        # Click publish and wait for the redirect to the post list.
        for _ in range(30):
            try:
                publish_btn = self.page.locator('div.form-btns button:has-text("发表")')
                if await publish_btn.count():
                    await publish_btn.click()

                await self.page.wait_for_url(
                    "https://channels.weixin.qq.com/platform/post/list",
                    timeout=10000
                )
                self.report_progress(100, "发布成功")
                return PublishResult(
                    success=True,
                    platform=self.platform_name,
                    message="发布成功"
                )
            except Exception:
                # The wait may fail even though the redirect already happened.
                current_url = self.page.url
                if "post/list" in current_url:
                    self.report_progress(100, "发布成功")
                    return PublishResult(
                        success=True,
                        platform=self.platform_name,
                        message="发布成功"
                    )
                await asyncio.sleep(1)

        raise Exception("发布超时")

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Scrape the post list of the creator platform.

        Args:
            cookies: Cookie data handed to ``self.parse_cookies``.
            page: Zero-based page index into the items rendered on the page.
            page_size: Maximum number of items to return.

        Returns:
            A ``WorksResult``; on any failure ``success`` is False and
            ``error`` carries the exception text.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the post list of the creator platform.
            await self.page.goto("https://channels.weixin.qq.com/platform/post/list")
            await asyncio.sleep(5)

            # A redirect to a login URL means the cookies are no longer valid.
            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # The list is read from the rendered page; wait for it to load.
            await self.page.wait_for_selector('div.post-feed-wrap', timeout=10000)

            post_items = self.page.locator('div.post-feed-item')
            item_count = await post_items.count()

            print(f"[{self.platform_name}] 找到 {item_count} 个作品项")

            # Honour the requested page; page 0 covers the first page_size items.
            start = max(page, 0) * page_size
            end = min(item_count, start + page_size)

            for i in range(start, end):
                try:
                    item = post_items.nth(i)

                    # Cover image.
                    cover_el = item.locator('div.cover-wrap img').first
                    cover_url = ''
                    if await cover_el.count() > 0:
                        cover_url = await cover_el.get_attribute('src') or ''

                    # Title, capped at 50 characters.
                    title_el = item.locator('div.content').first
                    title = ''
                    if await title_el.count() > 0:
                        title = await title_el.text_content() or ''
                        title = title.strip()[:50]

                    # Statistics; the text may look like "播放 100 点赞 50 评论 10".
                    stats_el = item.locator('div.post-data')
                    play_count = 0
                    like_count = 0
                    comment_count = 0

                    if await stats_el.count() > 0:
                        stats_text = await stats_el.text_content() or ''
                        play_count = _parse_count(_PLAY_COUNT_RE.search(stats_text))
                        like_count = _parse_count(_LIKE_COUNT_RE.search(stats_text))
                        comment_count = _parse_count(_COMMENT_COUNT_RE.search(stats_text))

                    # Publish time as displayed.
                    time_el = item.locator('div.time')
                    publish_time = ''
                    if await time_el.count() > 0:
                        publish_time = await time_el.text_content() or ''
                        publish_time = publish_time.strip()

                    # Temporary id built from list position and title (the
                    # real id may have to come from the detail page). A
                    # stable checksum keeps it identical across processes.
                    work_id = f"weixin_{i}_{_stable_id(title)}"

                    works.append(WorkItem(
                        work_id=work_id,
                        title=title or '无标题',
                        cover_url=cover_url,
                        duration=0,
                        status='published',
                        publish_time=publish_time,
                        play_count=play_count,
                        like_count=like_count,
                        comment_count=comment_count,
                    ))
                except Exception as e:
                    print(f"[{self.platform_name}] 解析作品 {i} 失败: {e}")
                    continue

            total = len(works)
            has_more = item_count > end
            print(f"[{self.platform_name}] 获取到 {total} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(success=False, platform=self.platform_name, error=str(e))

        return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more)

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Scrape the comments shown on the comment-management page.

        Args:
            cookies: Cookie data handed to ``self.parse_cookies``.
            work_id: Copied into each returned item and the result; it is
                not used to filter the page.
            cursor: Accepted for interface compatibility; not used here.

        Returns:
            A ``CommentsResult``; on any failure ``success`` is False and
            ``error`` carries the exception text.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the comment-management page.
            await self.page.goto("https://channels.weixin.qq.com/platform/comment/index")
            await asyncio.sleep(5)

            # A redirect to a login URL means the cookies are no longer valid.
            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Wait for the comment list; its absence is reported as "no comments".
            try:
                await self.page.wait_for_selector('div.comment-list', timeout=10000)
            except Exception:
                print(f"[{self.platform_name}] 未找到评论列表")
                return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False)

            comment_items = self.page.locator('div.comment-item')
            item_count = await comment_items.count()

            print(f"[{self.platform_name}] 找到 {item_count} 个评论项")

            for i in range(item_count):
                try:
                    item = comment_items.nth(i)

                    # Author name and avatar.
                    author_name = ''
                    author_avatar = ''
                    name_el = item.locator('div.nick-name')
                    if await name_el.count() > 0:
                        author_name = await name_el.text_content() or ''
                        author_name = author_name.strip()

                    avatar_el = item.locator('img.avatar')
                    if await avatar_el.count() > 0:
                        author_avatar = await avatar_el.get_attribute('src') or ''

                    # Comment text.
                    content = ''
                    content_el = item.locator('div.comment-content')
                    if await content_el.count() > 0:
                        content = await content_el.text_content() or ''
                        content = content.strip()

                    # Time as displayed.
                    create_time = ''
                    time_el = item.locator('div.time')
                    if await time_el.count() > 0:
                        create_time = await time_el.text_content() or ''
                        create_time = create_time.strip()

                    # Synthetic id from list position and content; a stable
                    # checksum keeps it identical across processes.
                    comment_id = f"weixin_comment_{i}_{_stable_id(content)}"

                    comments.append(CommentItem(
                        comment_id=comment_id,
                        work_id=work_id,
                        content=content,
                        author_id='',
                        author_name=author_name,
                        author_avatar=author_avatar,
                        like_count=0,
                        reply_count=0,
                        create_time=create_time,
                    ))
                except Exception as e:
                    print(f"[{self.platform_name}] 解析评论 {i} 失败: {e}")
                    continue

            total = len(comments)
            print(f"[{self.platform_name}] 获取到 {total} 条评论")

        except Exception as e:
            traceback.print_exc()
            return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e))

        return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)