# -*- coding: utf-8 -*-
"""
WeChat Channels (微信视频号) publisher.

Reference implementation: matrix/tencent_uploader/main.py
"""

import asyncio
import hashlib
import os
import re
import time
import traceback
from datetime import datetime
from typing import List, Optional

from .base import (
    BasePublisher,
    PublishParams,
    PublishResult,
    WorkItem,
    WorksResult,
    CommentItem,
    CommentsResult,
)


def format_short_title(origin_title: str) -> str:
    """Build a short title from ``origin_title``.

    Alphanumeric characters and a small set of punctuation marks are kept,
    a comma becomes a space, everything else is dropped.  The result is
    truncated to 16 characters, or right-padded with spaces to 6.
    """
    # NOTE(review): the original spelled this as two adjacent literals
    # ("《》" ":+?%°"), which concatenate to the value below. That layout
    # suggests quote characters may have been meant to be allowed - confirm.
    allowed_special_chars = "《》:+?%°"

    kept = []
    for char in origin_title:
        if char.isalnum() or char in allowed_special_chars:
            kept.append(char)
        elif char == ',':
            kept.append(' ')
    short_title = ''.join(kept)

    if len(short_title) > 16:
        return short_title[:16]
    if len(short_title) < 6:
        return short_title.ljust(6)
    return short_title


def _stable_digest(text: str, length: int = 16) -> str:
    """Return a short hex digest of ``text`` that is identical across runs.

    The built-in ``hash()`` is salted per process for strings, so it cannot
    be used for identifiers that must survive a restart.
    """
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:length]


class WeixinPublisher(BasePublisher):
    """
    Publisher for WeChat Channels.

    Drives the Channels creator centre with Playwright.
    Note: system Chrome is preferred; the bundled Chromium may fail with
    H264 encoding errors.
    """

    platform_name = "weixin"
    login_url = "https://channels.weixin.qq.com/platform"
    publish_url = "https://channels.weixin.qq.com/platform/post/create"
    cookie_domain = ".weixin.qq.com"

    # Primary "publish" button at the bottom of the create-post form.
    _PUBLISH_BUTTON_SELECTOR = "button.weui-desktop-btn.weui-desktop-btn_primary:has-text('发表')"

    # (WorkItem field, icon selector) pairs; the first matching icon decides
    # which counter a data item feeds.
    _STAT_ICON_SELECTORS = (
        ("play_count", 'span.weui-icon-outlined-eyes-on'),
        ("like_count", 'span.weui-icon-outlined-like'),
        ("comment_count", 'span.weui-icon-outlined-comment'),
        ("share_count", 'use[xlink\\:href="#icon-share"]'),
        ("collect_count", 'use[xlink\\:href="#icon-thumb"]'),
    )

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    def _parse_count(self, count_str: str) -> int:
        """Parse a counter such as ``"123"`` or ``"1.2万"``; return 0 if unparsable."""
        try:
            count_str = count_str.strip()
            if '万' in count_str:
                # round() rather than int(): float error must not turn
                # e.g. 4.35万 into 43499.
                return int(round(float(count_str.replace('万', '')) * 10000))
            return int(count_str)
        except (AttributeError, TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _debug_screenshot_path(prefix: str) -> str:
        """Return a timestamped PNG file name for a debug screenshot."""
        return f"{prefix}_{int(time.time())}.png"

    @staticmethod
    async def _first_text(scope, selector: str) -> str:
        """Return the stripped text of the first match of ``selector`` under ``scope``, or ''."""
        element = scope.locator(selector).first
        if await element.count() > 0:
            return (await element.text_content() or '').strip()
        return ''

    @staticmethod
    async def _first_attr(scope, selector: str, name: str) -> str:
        """Return attribute ``name`` of the first match of ``selector`` under ``scope``, or ''."""
        element = scope.locator(selector).first
        if await element.count() > 0:
            return await element.get_attribute(name) or ''
        return ''

    # ------------------------------------------------------------------
    # Browser setup
    # ------------------------------------------------------------------

    async def init_browser(self, storage_state: Optional[str] = None):
        """Launch the browser and open a fresh page.

        System Chrome (``channel="chrome"``) is tried first to avoid H264
        encoding errors; Playwright's bundled Chromium is the fallback.

        Args:
            storage_state: optional Playwright storage state forwarded to
                ``new_context``.

        Returns:
            The newly created page (also stored on ``self.page``).
        """
        from playwright.async_api import async_playwright

        # NOTE(review): the Playwright driver handle is not kept anywhere
        # in this file, so nothing here can stop it - confirm the base
        # class takes care of shutdown.
        playwright = await async_playwright().start()

        try:
            self.browser = await playwright.chromium.launch(
                headless=self.headless,
                channel="chrome",
            )
            print(f"[{self.platform_name}] 使用系统 Chrome 浏览器")
        except Exception as e:
            print(f"[{self.platform_name}] Chrome 不可用,使用 Chromium: {e}")
            self.browser = await playwright.chromium.launch(headless=self.headless)

        # Extra HTTP headers, intended to prevent redirects.
        # NOTE(review): the User-Agent header here (macOS) disagrees with
        # the context ``user_agent`` below (Windows) - confirm which one is
        # intended before unifying them.
        headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Referer": "https://channels.weixin.qq.com/platform/post/list",
        }
        context_options = {
            "extra_http_headers": headers,
            "ignore_https_errors": True,
            "viewport": {"width": 1920, "height": 1080},
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        }
        if storage_state:
            # Previously accepted but silently ignored.
            context_options["storage_state"] = storage_state

        self.context = await self.browser.new_context(**context_options)
        self.page = await self.context.new_page()
        return self.page

    # ------------------------------------------------------------------
    # Publish-form helpers
    # ------------------------------------------------------------------

    async def set_schedule_time(self, publish_date: datetime):
        """Select ``publish_date`` in the scheduled-publish picker.

        Only the day and the hour are entered; if the picker shows a
        different month the "next" arrow is clicked once.
        """
        if not self.page:
            return

        print(f"[{self.platform_name}] 设置定时发布...")

        # Enable the "scheduled" option.
        label_element = self.page.locator("label").filter(has_text="定时").nth(1)
        await label_element.click()

        # Open the date picker.
        await self.page.click('input[placeholder="请选择发表时间"]')

        publish_month = f"{publish_date.month:02d}"
        current_month = f"{publish_month}月"

        # Advance one month when the picker is not on the target month.
        page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
        if page_month != current_month:
            await self.page.click('button.weui-desktop-btn__icon__right')

        # Click the first enabled cell whose label equals the target day.
        elements = await self.page.query_selector_all('table.weui-desktop-picker__table a')
        for element in elements:
            class_name = await element.evaluate('el => el.className')
            if 'weui-desktop-picker__disabled' in class_name:
                continue
            text = await element.inner_text()
            if text.strip() == str(publish_date.day):
                await element.click()
                break

        # Replace the time field content with the hour.
        await self.page.click('input[placeholder="请选择时间"]')
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(str(publish_date.hour))

        # Click elsewhere to commit the picker.
        await self.page.locator("div.input-editor").click()

    async def handle_upload_error(self, video_path: str):
        """Delete the failed upload and submit ``video_path`` again."""
        if not self.page:
            return

        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
        await self.page.get_by_role('button', name="删除", exact=True).click()
        file_input = self.page.locator('input[type="file"]')
        await file_input.set_input_files(video_path)

    async def add_title_tags(self, params: PublishParams):
        """Type the title and then each tag as ``#tag`` into the description editor."""
        if not self.page:
            return

        tags = params.tags or []

        await self.page.locator("div.input-editor").click()
        await self.page.keyboard.type(params.title)

        if tags:
            await self.page.keyboard.press("Enter")
            for tag in tags:
                await self.page.keyboard.type("#" + tag)
                await self.page.keyboard.press("Space")

        # ``tags`` may be None on the params object; len() is taken on the
        # normalised list.
        print(f"[{self.platform_name}] 成功添加标题和 {len(tags)} 个话题")

    async def add_short_title(self):
        """Locate the short-title input.

        Placeholder: the field is looked up but nothing is written to it.
        """
        if not self.page:
            return

        try:
            short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
                "xpath=following-sibling::div").locator('span input[type="text"]')
            if await short_title_element.count():
                # Intentionally empty: the existing content is kept.
                pass
        except Exception:
            pass

    async def upload_cover(self, cover_path: str):
        """Replace the video cover with ``cover_path`` (best effort; failures are logged)."""
        if not self.page or not cover_path or not os.path.exists(cover_path):
            return

        try:
            await asyncio.sleep(2)
            change_cover_btn = self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")')
            # get_attribute may return None when the element has no class.
            button_class = await change_cover_btn.get_attribute('class') or ''
            if "disabled" in button_class:
                return

            await change_cover_btn.click()
            await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover()

            # Remove the current cover, if any.
            delete_icon = self.page.locator(".del-wrap > .svg-icon")
            if await delete_icon.count():
                await delete_icon.click()

            # Upload the new cover through the native file chooser.
            preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
            async with self.page.expect_file_chooser() as fc_info:
                await preview_div.click()
            preview_chooser = await fc_info.value
            await preview_chooser.set_files(cover_path)

            await asyncio.sleep(2)
            await self.page.get_by_role("button", name="确定").click()
            await asyncio.sleep(1)
            await self.page.get_by_role("button", name="确认").click()

            print(f"[{self.platform_name}] 封面上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 封面上传失败: {e}")

    async def check_captcha(self) -> dict:
        """Look for captcha or login prompts on the current page.

        Returns:
            ``{'need_captcha': bool, 'captcha_type': str}`` where the type
            is ``'image'``, ``'login'`` or ``''``.
        """
        if not self.page:
            return {'need_captcha': False, 'captcha_type': ''}

        # (result type, log prefix, selectors) - checked in this order.
        checks = (
            ('image', "检测到验证码", [
                'text="请输入验证码"',
                'text="滑动验证"',
                '[class*="captcha"]',
                '[class*="verify"]',
            ]),
            ('login', "检测到需要登录", [
                'text="请登录"',
                'text="扫码登录"',
                '[class*="login-dialog"]',
            ]),
        )

        try:
            for captcha_type, log_prefix, selectors in checks:
                for selector in selectors:
                    try:
                        if await self.page.locator(selector).count() > 0:
                            print(f"[{self.platform_name}] {log_prefix}: {selector}")
                            return {'need_captcha': True, 'captcha_type': captcha_type}
                    except Exception:
                        # A failing selector must not abort the remaining checks.
                        pass
        except Exception as e:
            print(f"[{self.platform_name}] 验证码检测异常: {e}")

        return {'need_captcha': False, 'captcha_type': ''}

    async def _wait_for_upload_complete(self, video_path: str, attempts: int = 120,
                                        interval: float = 3.0) -> bool:
        """Poll until the publish button is no longer marked disabled.

        Re-submits the file through ``handle_upload_error`` when the page
        shows an upload error.  Returns False if the button is still
        disabled after ``attempts`` polls.
        """
        publish_btn = self.page.locator(self._PUBLISH_BUTTON_SELECTOR).first
        for _ in range(attempts):
            try:
                if await publish_btn.count() == 0:
                    # Nothing to poll; the click in publish() will surface it.
                    return True
                button_class = await publish_btn.get_attribute('class') or ''
                if "weui-desktop-btn_disabled" not in button_class:
                    print(f"[{self.platform_name}] 视频上传完毕")
                    return True
                if await self.page.locator('div.status-msg.error').count():
                    if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count():
                        await self.handle_upload_error(video_path)
            except Exception as e:
                print(f"[{self.platform_name}] 检查上传状态异常: {e}")
            await asyncio.sleep(interval)
        return False

    # ------------------------------------------------------------------
    # Publish
    # ------------------------------------------------------------------

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Upload ``params.video_path`` and publish it to WeChat Channels.

        Steps: open the create-post page with the given cookies, attach
        the video to the file input, fill the description (title plus
        tags) and short title, wait for the upload, optionally set cover
        and schedule, then click publish.

        Returns:
            A ``PublishResult``.  ``status`` is ``'need_captcha'`` when the
            page redirected to login, ``'failed'`` when the upload or the
            schedule step failed, ``'success'`` otherwise.

        Raises:
            Exception: when the video file is missing, the page was not
                initialised, or no file input exists on the page.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        # Fail fast, before a browser process is started.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")
        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(5, "正在初始化浏览器...")
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        # Cookies are credentials: log the count only, never the values.
        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        self.report_progress(10, "正在打开上传页面...")
        print(f"[{self.platform_name}] 当前 发布URL: {self.publish_url}")
        await self.page.goto(self.publish_url, wait_until="networkidle", timeout=60000)
        await asyncio.sleep(10)

        current_url = self.page.url
        print(f"[{self.platform_name}] 当前 URL: {current_url}")

        # Expired cookies redirect to the login page.
        if "login" in current_url:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="Cookie 已过期,需要重新登录",
                need_captcha=True,
                captcha_type='login',
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        # Debug aids: page size and a screenshot on disk.
        html_content = await self.page.content()
        print(f"[{self.platform_name}] 页面HTML长度: {len(html_content)}")
        screenshot_path = self._debug_screenshot_path("weixin_publish")
        await self.page.screenshot(path=screenshot_path)
        print(f"[{self.platform_name}] 截图已保存: {screenshot_path}")

        self.report_progress(15, "正在选择视频文件...")
        file_input = self.page.locator("input[type='file']")
        count = await file_input.count()
        print(f"[{self.platform_name}] 找到 {count} 个 file input")
        if count == 0:
            raise Exception("页面中未找到 input[type='file'] 元素")

        # Set the file directly; no click, so no native dialog is opened.
        print("上传文件...")
        await file_input.first.set_input_files(params.video_path)
        print(f"[{self.platform_name}] 文件已设置: {params.video_path}")
        await asyncio.sleep(5)

        # Optional: wait for the delete-tag popover.
        try:
            await self.page.wait_for_selector(
                ".weui-desktop-popover__wrp.finder-popover-dialog-wrap .finder-tag-wrap",
                state="visible",
                timeout=20000,
            )
            print("删除标签弹窗已显示")
        except Exception:
            print("删除标签弹窗未出现,继续执行")

        # Close any stray page whose title looks like a file dialog.
        try:
            for p in self.page.context.pages:
                if p != self.page and "打开" in await p.title():
                    print(f"关闭系统文件选择窗口: {await p.title()}")
                    await p.close()
        except Exception as e:
            print(f"关闭文件选择窗口异常: {e}")

        self.report_progress(20, "正在填充标题和话题...")
        print("填写输入框...")
        title = params.title or ""

        # Description: the title, followed by the tags typed as "#tag".
        editor = self.page.locator("div.input-editor[contenteditable][data-placeholder='添加描述']")
        await editor.fill(title)
        if params.tags:
            await self.page.keyboard.press("Enter")
            for tag in params.tags:
                await self.page.keyboard.type("#" + tag)
                await self.page.keyboard.press("Space")

        # Short title derived from the real title.
        await self.page.fill(
            "input.weui-desktop-form__input[placeholder*='概括视频主要内容']",
            format_short_title(title),
        )
        await self.page.wait_for_timeout(1000)

        self.report_progress(30, "等待视频上传完成...")
        if not await self._wait_for_upload_complete(params.video_path):
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="视频上传超时",
                screenshot_base64=screenshot_base64,
                page_url=self.page.url,
                status='failed'
            )

        if params.cover_path:
            self.report_progress(50, "正在上传封面...")
            await self.upload_cover(params.cover_path)

        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            try:
                await self.set_schedule_time(params.publish_date)
            except Exception as e:
                # Publishing immediately would ignore the requested
                # schedule, so stop here instead.
                screenshot_base64 = await self.capture_screenshot()
                return PublishResult(
                    success=False,
                    platform=self.platform_name,
                    error=f"设置定时发布失败: {e}",
                    screenshot_base64=screenshot_base64,
                    page_url=self.page.url,
                    status='failed'
                )

        self.report_progress(80, "正在发布...")
        print("点击发布按钮...")
        await self.page.click(self._PUBLISH_BUTTON_SELECTOR)

        # A secondary "publish directly" confirmation may appear.
        try:
            direct_publish_btn = self.page.locator(
                "button.weui-desktop-btn.weui-desktop-btn_default:has-text('直接发表')")
            await direct_publish_btn.wait_for(state="visible", timeout=3000)
            print("检测到'直接发表'按钮,点击...")
            await direct_publish_btn.click()
        except Exception:
            print("未检测到'直接发表'按钮,继续...")

        # NOTE(review): success is reported after a fixed wait and is not
        # verified against the page - consider checking for the redirect
        # to the post list.
        await self.page.wait_for_timeout(3000)
        print("发布完成!")
        self.report_progress(100, "发布成功")

        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布成功",
            screenshot_base64="",
            page_url=self.page.url,
            status='success'
        )

    # ------------------------------------------------------------------
    # Works
    # ------------------------------------------------------------------

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Scrape the post list of the creator centre.

        At most ``page_size`` items of the rendered list are returned;
        ``page`` is only logged.  Each ``work_id`` is synthesised from the
        item index, title and publish time.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            await self.page.goto("https://channels.weixin.qq.com/platform/post/list")
            await asyncio.sleep(5)

            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Wait for the list; on timeout keep a screenshot for debugging.
            try:
                await self.page.wait_for_selector('div.post-feed-item', timeout=15000)
            except Exception:
                current_url = self.page.url
                print(f"[{self.platform_name}] 等待超时,当前 URL: {current_url}")
                screenshot_path = self._debug_screenshot_path("weixin_timeout")
                await self.page.screenshot(path=screenshot_path)
                print(f"[{self.platform_name}] 截图已保存: {screenshot_path}")
                raise Exception(f"页面加载超时,当前 URL: {current_url}")

            post_items = self.page.locator('div.post-feed-item')
            item_count = await post_items.count()
            print(f"[{self.platform_name}] 找到 {item_count} 个作品项")

            for i in range(min(item_count, page_size)):
                try:
                    item = post_items.nth(i)

                    cover_url = await self._first_attr(item, 'div.media img.thumb', 'src')
                    title = await self._first_text(item, 'div.post-title')
                    publish_time = await self._first_text(item, 'div.post-time span')

                    # Each data item holds one counter; its icon tells which.
                    stats = {field: 0 for field, _ in self._STAT_ICON_SELECTORS}
                    data_items = item.locator('div.post-data div.data-item')
                    data_count = await data_items.count()
                    for j in range(data_count):
                        data_item = data_items.nth(j)
                        count_text = (await data_item.locator('span.count').text_content() or '0').strip()
                        for field, icon_selector in self._STAT_ICON_SELECTORS:
                            if await data_item.locator(icon_selector).count() > 0:
                                stats[field] = self._parse_count(count_text)
                                break

                    # Temporary id; the digest is stable across processes
                    # (the built-in hash() of a str is not).
                    work_id = f"weixin_{i}_{_stable_digest(title)}_{_stable_digest(publish_time)}"

                    works.append(WorkItem(
                        work_id=work_id,
                        title=title or '无标题',
                        cover_url=cover_url,
                        duration=0,
                        status='published',
                        publish_time=publish_time,
                        **stats,
                    ))
                except Exception as e:
                    print(f"[{self.platform_name}] 解析作品 {i} 失败: {e}")
                    traceback.print_exc()
                    continue

            total = len(works)
            has_more = item_count > page_size
            print(f"[{self.platform_name}] 获取到 {total} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(success=False, platform=self.platform_name, error=str(e))

        return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more)

    # ------------------------------------------------------------------
    # Comments
    # ------------------------------------------------------------------

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Scrape the comments of one work from the comment-management page.

        The work is chosen by comparing ``work_id`` with the titles in the
        left-hand list (substring match in either direction); when nothing
        matches, the first work is used.  ``cursor`` is not used.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False

        def empty_result() -> CommentsResult:
            return CommentsResult(success=True, platform=self.platform_name, work_id=work_id,
                                  comments=[], total=0, has_more=False)

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            await self.page.goto("https://channels.weixin.qq.com/platform/interaction/comment")
            await asyncio.sleep(3)

            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Left-hand list of works.
            try:
                await self.page.wait_for_selector('div.comment-feed-wrap', timeout=15000)
            except Exception:
                print(f"[{self.platform_name}] 未找到作品列表")
                return empty_result()

            print(f"[{self.platform_name}] 查找 work_id={work_id} 对应的作品")

            feed_items = self.page.locator('div.comment-feed-wrap')
            item_count = await feed_items.count()
            print(f"[{self.platform_name}] 左侧共 {item_count} 个作品")

            clicked = False
            for i in range(item_count):
                feed = feed_items.nth(i)
                title_text = await self._first_text(feed, 'div.feed-title')
                # An empty title is a substring of everything and must not
                # count as a match.
                if title_text and (work_id in title_text or title_text in work_id):
                    print(f"[{self.platform_name}] 找到匹配作品: {title_text}")
                    await feed.click()
                    await asyncio.sleep(2)
                    clicked = True
                    break

            if not clicked:
                print(f"[{self.platform_name}] 未找到匹配作品,点击第一个")
                if item_count > 0:
                    await feed_items.nth(0).click()
                    await asyncio.sleep(2)
                else:
                    return empty_result()

            # Right-hand comment panel.
            try:
                await self.page.wait_for_selector('div.comment-item', timeout=5000)
            except Exception:
                print(f"[{self.platform_name}] 该作品暂无评论")
                return empty_result()

            # Total, taken from the first number in e.g. "共 1 条评论".
            total_text = await self._first_text(self.page, 'div.comment-count__tips')
            match = re.search(r'(\d+)', total_text)
            if match:
                total = int(match.group(1))
            print(f"[{self.platform_name}] 评论总数: {total}")

            comment_items = self.page.locator('div.comment-item')
            item_count = await comment_items.count()
            print(f"[{self.platform_name}] 当前加载 {item_count} 条评论")

            for i in range(item_count):
                try:
                    item = comment_items.nth(i)

                    author_name = await self._first_text(item, 'span.comment-user-name')
                    author_avatar = await self._first_attr(item, 'img.comment-avatar', 'src')
                    content = await self._first_text(item, 'span.comment-content')
                    create_time = await self._first_text(item, 'span.comment-time')

                    if not content:
                        continue

                    # Stable across processes, unlike hash(content).
                    comment_id = f"weixin_comment_{i}_{_stable_digest(content)}"

                    comments.append(CommentItem(
                        comment_id=comment_id,
                        work_id=work_id,
                        content=content,
                        author_id='',
                        author_name=author_name,
                        author_avatar=author_avatar,
                        like_count=0,
                        reply_count=0,
                        create_time=create_time,
                    ))
                    print(f"[{self.platform_name}] 评论 {i+1}: {author_name} - {content[:20]}...")
                except Exception as e:
                    print(f"[{self.platform_name}] 解析评论 {i} 失败: {e}")
                    continue

            print(f"[{self.platform_name}] 成功获取 {len(comments)} 条评论")

        except Exception as e:
            traceback.print_exc()
            return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e))

        return CommentsResult(success=True, platform=self.platform_name, work_id=work_id,
                              comments=comments, total=total, has_more=has_more)

    # ------------------------------------------------------------------
    # Private messages
    # ------------------------------------------------------------------

    async def auto_reply_private_messages(self, cookies: str) -> dict:
        """Reply to pending private messages (integrated from pw3.py).

        Returns:
            On success ``{'success': True, 'platform', 'replied_count',
            'message'}``; on failure ``{'success': False, 'platform',
            'error'}``.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始自动回复私信")
        print(f"{'='*60}")

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            await self.page.goto("https://channels.weixin.qq.com/platform/private_msg", timeout=30000)
            await asyncio.sleep(3)

            current_url = self.page.url
            print(f"[{self.platform_name}] 当前 URL: {current_url}")
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Wait for the page, with a fallback selector.
            try:
                await self.page.wait_for_selector('.private-msg-list-header', timeout=15000)
            except Exception:
                try:
                    await self.page.wait_for_selector('.weui-desktop-tab__navs__inner', timeout=10000)
                    print(f"[{self.platform_name}] 使用备用选择器加载成功")
                except Exception:
                    screenshot_path = self._debug_screenshot_path("weixin_private_msg")
                    await self.page.screenshot(path=screenshot_path)
                    print(f"[{self.platform_name}] 页面加载失败,截图: {screenshot_path}")
                    raise Exception(f"私信页面加载超时,当前 URL: {current_url}")

            print(f"[{self.platform_name}] 私信页面加载完成")

            # Both tabs are processed, greetings first.
            total_replied = 0
            for tab_name in ["打招呼消息", "私信"]:
                total_replied += await self._process_tab_sessions(tab_name)

            print(f"[{self.platform_name}] 自动回复完成,共回复 {total_replied} 条消息")
            return {
                'success': True,
                'platform': self.platform_name,
                'replied_count': total_replied,
                'message': f'成功回复 {total_replied} 条私信'
            }

        except Exception as e:
            traceback.print_exc()
            return {
                'success': False,
                'platform': self.platform_name,
                'error': str(e)
            }

    async def _process_tab_sessions(self, tab_name: str) -> int:
        """Reply to every session of one tab and return the number of replies sent.

        A session is answered when it has no readable history or when its
        last message was not written by the account owner.
        """
        print(f"\n🔄 正在处理「{tab_name}」中的所有会话...")

        if not self.page:
            return 0

        replied_count = 0

        try:
            # "私信" is the first tab, "打招呼消息" the second.
            tabs = self.page.locator('.weui-desktop-tab__navs__inner li')
            if tab_name == "私信":
                tab_link = tabs.first.locator('a')
            elif tab_name == "打招呼消息":
                tab_link = tabs.nth(1).locator('a')
            else:
                return 0

            if not await tab_link.is_visible():
                print(f" ❌ 「{tab_name}」tab 不可见")
                return 0
            await tab_link.click()
            print(f" ➤ 已点击「{tab_name}」tab")

            # Wait until sessions or the empty-state text are rendered.
            try:
                await self.page.wait_for_function("""
                    () => {
                        const hasSession = document.querySelectorAll('.session-wrap').length > 0;
                        const hasEmpty = !!document.querySelector('.empty-text');
                        return hasSession || hasEmpty;
                    }
                """, timeout=8000)
                print(" ✅ 会话列表区域已加载")
            except Exception:
                print(" ⚠️ 等待会话列表超时,继续尝试读取...")

            session_count = await self.page.locator('.session-wrap').count()
            print(f" 💬 共找到 {session_count} 个会话")
            if session_count == 0:
                return 0

            for idx in range(session_count):
                try:
                    # Re-query each round: the list may change while replying.
                    current_sessions = self.page.locator('.session-wrap')
                    if idx >= await current_sessions.count():
                        break

                    session = current_sessions.nth(idx)
                    user_name = await session.locator('.name').inner_text()
                    last_preview = await session.locator('.feed-info').inner_text()
                    print(f"\n ➤ [{idx+1}/{session_count}] 正在处理: {user_name} | 最后消息: {last_preview}")

                    await session.click()
                    await asyncio.sleep(2)

                    history = await self._extract_chat_history()
                    need_reply = (not history) or (not history[-1]["is_author"])
                    if not need_reply:
                        print(" ➤ 最后一条是我发的,跳过回复")
                        continue

                    reply_text = await self._generate_reply_with_ai(history)
                    if reply_text == "":
                        reply_text = self._generate_reply(history)

                    if not reply_text:
                        print(" ➤ 无需回复")
                        continue

                    print(f" 📝 回复内容: {reply_text}")
                    try:
                        textarea = self.page.locator('.edit_area').first
                        send_btn = self.page.locator('button:has-text("发送")').first
                        if await textarea.is_visible() and await send_btn.is_visible():
                            await textarea.fill(reply_text)
                            await asyncio.sleep(0.5)
                            await send_btn.click()
                            print(" ✅ 已发送")
                            replied_count += 1
                            await asyncio.sleep(1.5)
                        else:
                            print(" ❌ 输入框或发送按钮不可见")
                    except Exception as e:
                        print(f" ❌ 发送失败: {e}")

                except Exception as e:
                    print(f" ❌ 处理会话 {idx+1} 时出错: {e}")
                    continue

        except Exception as e:
            print(f"❌ 处理「{tab_name}」失败: {e}")

        return replied_count

    async def _extract_chat_history(self) -> list:
        """Read the open conversation.

        Returns:
            A list of dicts with keys ``author``, ``content``,
            ``is_author`` (True for right-aligned messages, i.e. the
            account owner) and ``avatar``, in page order.
        """
        if not self.page:
            return []

        history = []
        message_wrappers = self.page.locator('.session-content-wrapper > div:not(.footer) > .text-wrapper')
        count = await message_wrappers.count()

        for i in range(count):
            try:
                wrapper = message_wrappers.nth(i)

                # Alignment tells who wrote the message.
                is_right = await wrapper.locator('.content-right').count() > 0
                is_left = await wrapper.locator('.content-left').count() > 0
                if not (is_left or is_right):
                    continue

                pre_el = wrapper.locator('pre.message-plain')
                content = ''
                if await pre_el.count() > 0:
                    content = (await pre_el.inner_text()).strip()
                if not content:
                    continue

                avatar_src = await self._first_attr(wrapper, '.avatar', 'src')

                if is_left:
                    name_el = wrapper.locator('.profile .name')
                    author_name = '用户'
                    if await name_el.count() > 0:
                        author_name = await name_el.inner_text()
                else:
                    author_name = "我"

                history.append({
                    "author": author_name,
                    "content": content,
                    "is_author": is_right,
                    "avatar": avatar_src
                })
            except Exception as e:
                print(f" ⚠️ 解析第 {i+1} 条消息失败: {e}")
                continue

        return history

    async def _generate_reply_with_ai(self, chat_history: list) -> str:
        """Ask the configured chat-completions endpoint for a reply.

        Configuration comes from the environment: ``DASHSCOPE_API_KEY``,
        ``DASHSCOPE_BASE_URL`` and ``AI_MODEL``.  Every failure path
        (missing key, missing ``requests``, HTTP error, empty answer,
        exception) falls back to ``_generate_reply``.
        """
        try:
            import requests

            ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
            ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
            ai_model = os.environ.get('AI_MODEL', 'qwen-plus')

            if not ai_api_key:
                print("⚠️ 未配置 AI API Key,使用规则回复")
                return self._generate_reply(chat_history)

            # System prompt first, then the conversation in order.
            messages = [{"role": "system", "content": "你是一个友好的微信视频号创作者助手,负责回复粉丝私信。请保持简洁、友好、专业的语气。回复长度不超过20字。"}]
            for msg in chat_history:
                messages.append({
                    "role": "assistant" if msg["is_author"] else "user",
                    "content": msg["content"]
                })

            headers = {
                'Authorization': f'Bearer {ai_api_key}',
                'Content-Type': 'application/json'
            }
            payload = {
                "model": ai_model,
                "messages": messages,
                "max_tokens": 150,
                "temperature": 0.8
            }

            print(" 🤖 正在调用 AI 生成回复...")
            # requests is blocking; run it in the default executor so the
            # event loop (and the Playwright connection) keeps running.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    f"{ai_base_url}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=30
                ),
            )

            if response.status_code != 200:
                print(f" ⚠️ AI API 返回错误 {response.status_code},使用规则回复")
                return self._generate_reply(chat_history)

            result = response.json()
            # Tolerate an empty "choices" list and a null "content".
            choices = result.get('choices') or [{}]
            ai_reply = ((choices[0].get('message') or {}).get('content') or '').strip()

            if ai_reply:
                print(f" ✅ AI 生成回复: {ai_reply}")
                return ai_reply

            print(" ⚠️ AI 返回空内容,使用规则回复")
            return self._generate_reply(chat_history)

        except Exception as e:
            print(f" ⚠️ AI 回复生成失败: {e},使用规则回复")
            return self._generate_reply(chat_history)

    def _generate_reply(self, chat_history: list) -> str:
        """Return a rule-based reply for ``chat_history``.

        An empty history gets a greeting; an empty string is returned when
        the last message was written by the account owner; otherwise the
        reply is chosen by keywords in the last message.
        """
        if not chat_history:
            return "你好!感谢联系~"

        last_message = chat_history[-1]
        if last_message["is_author"]:
            return ""  # nothing to answer

        last_user_msg = last_message["content"]

        # First rule whose keywords occur in the message wins.
        keyword_rules = (
            (("谢谢", "感谢"), "不客气!欢迎常来交流~"),
            (("你好", "在吗"), "你好!请问有什么可以帮您的?"),
            (("视频", "怎么拍"), "视频是用手机拍摄的,注意光线和稳定哦!"),
        )
        for keywords, reply in keyword_rules:
            if any(keyword in last_user_msg for keyword in keywords):
                return reply
        return "收到!我会认真阅读您的留言~"