||
- # -*- coding: utf-8 -*-
- """
- 微信视频号发布器
- 参考: matrix/tencent_uploader/main.py
- """
- import asyncio
- import json
- import os
- from datetime import datetime
- from typing import List
- from .base import (
- BasePublisher, PublishParams, PublishResult,
- WorkItem, WorksResult, CommentItem, CommentsResult
- )
- import os
- import time
- # 允许通过环境变量手动指定“上传视频入口”的选择器,便于在页面结构频繁变更时快速调整
- WEIXIN_UPLOAD_SELECTOR = os.environ.get("WEIXIN_UPLOAD_SELECTOR", "").strip()
- def format_short_title(origin_title: str) -> str:
- """
- 格式化短标题
- - 移除特殊字符
- - 长度限制在 6-16 字符
- """
- allowed_special_chars = "《》"":+?%°"
-
- filtered_chars = [
- char if char.isalnum() or char in allowed_special_chars
- else ' ' if char == ',' else ''
- for char in origin_title
- ]
- formatted_string = ''.join(filtered_chars)
-
- if len(formatted_string) > 16:
- formatted_string = formatted_string[:16]
- elif len(formatted_string) < 6:
- formatted_string += ' ' * (6 - len(formatted_string))
-
- return formatted_string
- class WeixinPublisher(BasePublisher):
- """
- 微信视频号发布器
- 使用 Playwright 自动化操作视频号创作者中心
- 注意: 需要使用 Chrome 浏览器,否则可能出现 H264 编码错误
- """
-
- platform_name = "weixin"
- login_url = "https://channels.weixin.qq.com/platform"
- publish_url = "https://channels.weixin.qq.com/platform/post/create"
- cookie_domain = ".weixin.qq.com"
-
- def _parse_count(self, count_str: str) -> int:
- """解析数字(支持带'万'的格式)"""
- try:
- count_str = count_str.strip()
- if '万' in count_str:
- return int(float(count_str.replace('万', '')) * 10000)
- return int(count_str)
- except:
- return 0
- async def ai_find_upload_selector(self, frame_html: str, frame_name: str = "main") -> str:
- """
- 使用 AI 从 HTML 中识别“上传视频/选择文件”相关元素的 CSS 选择器。
-
- 设计思路:
- - 仅在常规 DOM 选择器都失败时调用,避免频繁占用 AI 配额;
- - 通过 DashScope 文本模型(与验证码识别同一套配置)分析 HTML;
- - 返回一个适合用于 frame.locator(selector) 的 CSS 选择器。
- """
- import json
- import re
- import requests
- import os
- # 避免 HTML 过长导致 token 超限,只截取前 N 字符
- if not frame_html:
- return ""
- max_len = 20000
- if len(frame_html) > max_len:
- frame_html = frame_html[:max_len]
- ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "")
- ai_base_url = os.environ.get("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")
- ai_text_model = os.environ.get("AI_TEXT_MODEL", "qwen-plus")
- if not ai_api_key:
- print(f"[{self.platform_name}] AI上传入口识别: 未配置 AI API Key,跳过")
- return ""
- prompt = f"""
- 你是熟悉微信视频号后台的前端工程师,现在需要在一段 HTML 中找到“上传视频文件”的入口。
- 页面说明:
- - 平台:微信视频号(channels.weixin.qq.com)
- - 目标:用于上传视频文件的按钮或 input(一般会触发文件选择框)
- - 你会收到某个 frame 的完整 HTML 片段(不包含截图)。
- 请你根据下面的 HTML,推断最适合用于上传视频文件的元素,并输出一个可以被 Playwright 使用的 CSS 选择器。
- 要求:
- 1. 只考虑“上传/选择视频文件”的入口,不要返回“发布/发表/下一步”等按钮;
- 2. 选择器需要尽量稳定,不要使用自动生成的随机类名(例如带很多随机字母/数字的类名可以用前缀匹配);
- 3. 选择器必须是 CSS 选择器(不要返回 XPath);
- 4. 如果确实找不到合理的上传入口,返回 selector 为空字符串。
- 请以 JSON 格式输出,严格遵守以下结构(不要添加任何解释文字):
- ```json
- {{
- "selector": "CSS 选择器字符串,比如:input[type='file'] 或 div.upload-content input[type='file']"
- }}
- ```
- 下面是 frame=\"{frame_name}\" 的 HTML:
- ```html
- {frame_html}
- ```"""
- payload = {
- "model": ai_text_model,
- "messages": [
- {
- "role": "user",
- "content": prompt,
- }
- ],
- "max_tokens": 600,
- }
- headers = {
- "Authorization": f"Bearer {ai_api_key}",
- "Content-Type": "application/json",
- }
- try:
- print(f"[{self.platform_name}] AI上传入口识别: 正在分析 frame={frame_name} HTML...")
- resp = requests.post(
- f"{ai_base_url}/chat/completions",
- headers=headers,
- json=payload,
- timeout=40,
- )
- if resp.status_code != 200:
- print(f"[{self.platform_name}] AI上传入口识别: API 返回错误 {resp.status_code}")
- return ""
- data = resp.json()
- content = data.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
- # 尝试从 ```json``` 代码块中解析
- json_match = re.search(r"```json\\s*([\\s\\S]*?)\\s*```", content)
- if json_match:
- json_str = json_match.group(1)
- else:
- json_match = re.search(r"\\{[\\s\\S]*\\}", content)
- json_str = json_match.group(0) if json_match else "{}"
- try:
- result = json.loads(json_str)
- except Exception:
- result = {}
- selector = (result.get("selector") or "").strip()
- print(f"[{self.platform_name}] AI上传入口识别结果: selector='{selector}'")
- return selector
- except Exception as e:
- print(f"[{self.platform_name}] AI上传入口识别异常: {e}")
- return ""
- async def ai_pick_selector_from_candidates(self, candidates: list, goal: str, frame_name: str = "main") -> str:
- """
- 将“候选元素列表(包含 css selector + 文本/属性)”发给 AI,让 AI 直接挑选最符合 goal 的元素。
- 适用于:HTML 里看不出上传入口、或页面大量动态渲染时。
- """
- import json
- import re
- import requests
- import os
- if not candidates:
- return ""
- ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "")
- ai_base_url = os.environ.get("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")
- ai_text_model = os.environ.get("AI_TEXT_MODEL", "qwen-plus")
- if not ai_api_key:
- print(f"[{self.platform_name}] AI候选选择器: 未配置 AI API Key,跳过")
- return ""
- # 控制长度,最多取前 120 个候选
- candidates = candidates[:120]
- prompt = f"""
- 你是自动化发布工程师。现在要在微信视频号(channels.weixin.qq.com)发布页面里找到“{goal}”相关的入口元素。
- 我会给你一组候选元素,每个候选都包含:
- - css: 可直接用于 Playwright 的 CSS 选择器
- - tag / type / role / ariaLabel / text / id / className(部分字段可能为空)
- 你的任务:
- - 从候选中选出最可能用于“{goal}”的元素,返回它的 css 选择器;
- - 如果没有任何候选符合,返回空字符串。
- 注意:
- - 如果 goal 是“上传视频入口”,优先选择 input[type=file] 或看起来会触发选择文件/上传的区域;
- - 不要选择“发布/发表/下一步”等按钮(除非 goal 明确是发布按钮)。
- 请严格按 JSON 输出(不要解释):
- ```json
- {{ "selector": "..." }}
- ```
- 候选列表(frame={frame_name}):
- ```json
- {json.dumps(candidates, ensure_ascii=False)}
- ```"""
- payload = {
- "model": ai_text_model,
- "messages": [{"role": "user", "content": prompt}],
- "max_tokens": 400,
- }
- headers = {
- "Authorization": f"Bearer {ai_api_key}",
- "Content-Type": "application/json",
- }
- try:
- print(f"[{self.platform_name}] AI候选选择器: 正在分析 frame={frame_name}, goal={goal} ...")
- resp = requests.post(
- f"{ai_base_url}/chat/completions",
- headers=headers,
- json=payload,
- timeout=40,
- )
- if resp.status_code != 200:
- print(f"[{self.platform_name}] AI候选选择器: API 返回错误 {resp.status_code}")
- return ""
- data = resp.json()
- content = data.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
- json_match = re.search(r"```json\\s*([\\s\\S]*?)\\s*```", content)
- if json_match:
- json_str = json_match.group(1)
- else:
- json_match = re.search(r"\\{[\\s\\S]*\\}", content)
- json_str = json_match.group(0) if json_match else "{}"
- try:
- result = json.loads(json_str)
- except Exception:
- result = {}
- selector = (result.get("selector") or "").strip()
- print(f"[{self.platform_name}] AI候选选择器结果: selector='{selector}'")
- return selector
- except Exception as e:
- print(f"[{self.platform_name}] AI候选选择器异常: {e}")
- return ""
- async def _extract_relevant_html_snippets(self, html: str) -> str:
- """
- 从 HTML 中抽取与上传相关的片段,减少 token,提升 AI 命中率。
- - 优先抓取包含 upload/上传/file/input 等关键词的窗口片段
- - 若未命中关键词,返回“开头 + 结尾”的拼接
- """
- import re
- if not html:
- return ""
- patterns = [
- r"upload",
- r"uploader",
- r"file",
- r"type\\s*=\\s*['\\\"]file['\\\"]",
- r"input",
- r"drag",
- r"drop",
- r"选择",
- r"上传",
- r"添加",
- r"视频",
- ]
- regex = re.compile("|".join(patterns), re.IGNORECASE)
- snippets = []
- for m in regex.finditer(html):
- start = max(0, m.start() - 350)
- end = min(len(html), m.end() + 350)
- snippets.append(html[start:end])
- if len(snippets) >= 18:
- break
- if snippets:
- # 去重(粗略)
- unique = []
- seen = set()
- for s in snippets:
- key = hash(s)
- if key not in seen:
- seen.add(key)
- unique.append(s)
- return "\n\n<!-- SNIPPET -->\n\n".join(unique)[:20000]
- # fallback: head + tail
- head = html[:9000]
- tail = html[-9000:] if len(html) > 9000 else ""
- return (head + "\n\n<!-- TAIL -->\n\n" + tail)[:20000]
-
- async def init_browser(self, storage_state: str = None):
- """初始化浏览器 - 参考 matrix 使用 channel=chrome 避免 H264 编码错误"""
- from playwright.async_api import async_playwright
-
- playwright = await async_playwright().start()
-
- # 参考 matrix: 使用系统内的 Chrome 浏览器,避免 H264 编码错误
- # 如果没有安装 Chrome,则使用默认 Chromium
- try:
- self.browser = await playwright.chromium.launch(
- # headless=self.headless,
- headless=False,
- channel="chrome" # 使用系统 Chrome
- )
- print(f"[{self.platform_name}] 使用系统 Chrome 浏览器")
- except Exception as e:
- print(f"[{self.platform_name}] Chrome 不可用,使用 Chromium: {e}")
- self.browser = await playwright.chromium.launch(headless=self.headless)
-
- # 设置 HTTP Headers 防止重定向
- headers = {
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
- "Referer": "https://channels.weixin.qq.com/platform/post/list",
- }
-
- self.context = await self.browser.new_context(
- extra_http_headers=headers,
- ignore_https_errors=True,
- viewport={"width": 1920, "height": 1080},
- user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
- )
-
- self.page = await self.context.new_page()
- return self.page
-
- async def set_schedule_time(self, publish_date: datetime):
- """设置定时发布"""
- if not self.page:
- return
-
- print(f"[{self.platform_name}] 设置定时发布...")
-
- # 点击定时选项
- label_element = self.page.locator("label").filter(has_text="定时").nth(1)
- await label_element.click()
-
- # 选择日期
- await self.page.click('input[placeholder="请选择发表时间"]')
-
- publish_month = f"{publish_date.month:02d}"
- current_month = f"{publish_month}月"
-
- # 检查月份
- page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
- if page_month != current_month:
- await self.page.click('button.weui-desktop-btn__icon__right')
-
- # 选择日期
- elements = await self.page.query_selector_all('table.weui-desktop-picker__table a')
- for element in elements:
- class_name = await element.evaluate('el => el.className')
- if 'weui-desktop-picker__disabled' in class_name:
- continue
- text = await element.inner_text()
- if text.strip() == str(publish_date.day):
- await element.click()
- break
-
- # 输入时间
- await self.page.click('input[placeholder="请选择时间"]')
- await self.page.keyboard.press("Control+KeyA")
- await self.page.keyboard.type(str(publish_date.hour))
-
- # 点击其他地方确认
- await self.page.locator("div.input-editor").click()
-
- async def handle_upload_error(self, video_path: str):
- """处理上传错误"""
- if not self.page:
- return
-
- print(f"[{self.platform_name}] 视频出错了,重新上传中...")
- await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
- await self.page.get_by_role('button', name="删除", exact=True).click()
- file_input = self.page.locator('input[type="file"]')
- await file_input.set_input_files(video_path)
-
- async def add_title_tags(self, params: PublishParams):
- """添加标题和话题"""
- if not self.page:
- return
-
- await self.page.locator("div.input-editor").click()
- await self.page.keyboard.type(params.title)
-
- if params.tags:
- await self.page.keyboard.press("Enter")
- for tag in params.tags:
- await self.page.keyboard.type("#" + tag)
- await self.page.keyboard.press("Space")
-
- print(f"[{self.platform_name}] 成功添加标题和 {len(params.tags)} 个话题")
-
- async def add_short_title(self):
- """添加短标题"""
- if not self.page:
- return
-
- try:
- short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
- "xpath=following-sibling::div").locator('span input[type="text"]')
- if await short_title_element.count():
- # 获取已有内容作为短标题
- pass
- except:
- pass
-
- async def upload_cover(self, cover_path: str):
- """上传封面图"""
- if not self.page or not cover_path or not os.path.exists(cover_path):
- return
-
- try:
- await asyncio.sleep(2)
- preview_btn_info = await self.page.locator(
- 'div.finder-tag-wrap.btn:has-text("更换封面")').get_attribute('class')
-
- if "disabled" not in preview_btn_info:
- await self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")').click()
- await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover()
-
- # 删除现有封面
- if await self.page.locator(".del-wrap > .svg-icon").count():
- await self.page.locator(".del-wrap > .svg-icon").click()
-
- # 上传新封面
- preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
- async with self.page.expect_file_chooser() as fc_info:
- await preview_div.click()
- preview_chooser = await fc_info.value
- await preview_chooser.set_files(cover_path)
-
- await asyncio.sleep(2)
- await self.page.get_by_role("button", name="确定").click()
- await asyncio.sleep(1)
- await self.page.get_by_role("button", name="确认").click()
-
- print(f"[{self.platform_name}] 封面上传成功")
- except Exception as e:
- print(f"[{self.platform_name}] 封面上传失败: {e}")
-
- async def check_captcha(self) -> dict:
- """检查页面是否需要验证码"""
- if not self.page:
- return {'need_captcha': False, 'captcha_type': ''}
-
- try:
- # 检查各种验证码
- captcha_selectors = [
- 'text="请输入验证码"',
- 'text="滑动验证"',
- '[class*="captcha"]',
- '[class*="verify"]',
- ]
- for selector in captcha_selectors:
- try:
- if await self.page.locator(selector).count() > 0:
- print(f"[{self.platform_name}] 检测到验证码: {selector}")
- return {'need_captcha': True, 'captcha_type': 'image'}
- except:
- pass
-
- # 检查登录弹窗
- login_selectors = [
- 'text="请登录"',
- 'text="扫码登录"',
- '[class*="login-dialog"]',
- ]
- for selector in login_selectors:
- try:
- if await self.page.locator(selector).count() > 0:
- print(f"[{self.platform_name}] 检测到需要登录: {selector}")
- return {'need_captcha': True, 'captcha_type': 'login'}
- except:
- pass
-
- except Exception as e:
- print(f"[{self.platform_name}] 验证码检测异常: {e}")
-
- return {'need_captcha': False, 'captcha_type': ''}
- async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
- """发布视频到视频号"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 开始发布视频")
- print(f"[{self.platform_name}] 视频路径: {params.video_path}")
- print(f"[{self.platform_name}] 标题: {params.title}")
- print(f"[{self.platform_name}] Headless: {self.headless}")
- print(f"{'='*60}")
-
- self.report_progress(5, "正在初始化浏览器...")
-
- # 初始化浏览器(使用 Chrome)
- await self.init_browser()
- print(f"[{self.platform_name}] 浏览器初始化完成")
-
- # 解析并设置 cookies
- cookie_list = self.parse_cookies(cookies)
- print(cookie_list)
- print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 检查视频文件
- if not os.path.exists(params.video_path):
- raise Exception(f"视频文件不存在: {params.video_path}")
-
- print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")
-
- self.report_progress(10, "正在打开上传页面...")
-
- # 访问上传页面
- await self.page.goto(self.publish_url, wait_until="networkidle", timeout=60000)
- await asyncio.sleep(3)
-
- # 检查是否跳转到登录页
- current_url = self.page.url
- print(f"[{self.platform_name}] 当前页面: {current_url}")
-
- if "login" in current_url:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="Cookie 已过期,需要重新登录",
- need_captcha=True,
- captcha_type='login',
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- # 使用 AI 检查验证码
- ai_captcha = await self.ai_check_captcha()
- if ai_captcha['has_captcha']:
- print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
- need_captcha=True,
- captcha_type=ai_captcha['captcha_type'],
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- # 传统方式检查验证码
- captcha_result = await self.check_captcha()
- if captcha_result['need_captcha']:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证",
- need_captcha=True,
- captcha_type=captcha_result['captcha_type'],
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- self.report_progress(15, "正在选择视频文件...")
-
- # 上传视频
- # 说明:视频号发布页在不同账号/地区/灰度下 DOM 结构差异较大,且上传组件可能在 iframe 中。
- # 因此这里按 matrix 的思路“点击触发 file chooser”,同时增加“遍历全部 frame + 精确挑选 video input”的兜底。
- upload_success = False
- if not self.page:
- raise Exception("Page not initialized")
- # 等待页面把上传区域渲染出来(避免过早判断)
- try:
- await self.page.wait_for_selector("div.upload-content, input[type='file'], iframe", timeout=20000)
- except Exception:
- pass
- async def _try_set_files_in_frame(frame, frame_name: str) -> bool:
- """在指定 frame 中尝试触发上传"""
- nonlocal upload_success
- if upload_success:
- return True
- # 方法0:如果用户通过环境变量显式配置了选择器,优先尝试这个
- if WEIXIN_UPLOAD_SELECTOR:
- try:
- el = frame.locator(WEIXIN_UPLOAD_SELECTOR).first
- if await el.count() > 0 and await el.is_visible():
- print(f"[{self.platform_name}] [{frame_name}] 使用环境变量 WEIXIN_UPLOAD_SELECTOR: {WEIXIN_UPLOAD_SELECTOR}")
- try:
- async with self.page.expect_file_chooser(timeout=5000) as fc_info:
- await el.click()
- chooser = await fc_info.value
- await chooser.set_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] 通过环境变量选择器上传成功")
- return True
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器点击失败,尝试直接 set_input_files: {e}")
- try:
- await el.set_input_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 成功")
- return True
- except Exception as e2:
- print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 仍失败: {e2}")
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] 使用环境变量选择器定位元素失败: {e}")
- # 先尝试点击上传区域触发 chooser(最贴近 matrix)
- click_selectors = [
- "div.upload-content",
- "div[class*='upload-content']",
- "div[class*='upload']",
- "div.add-wrap",
- "[class*='uploader']",
- "text=点击上传",
- "text=上传视频",
- "text=选择视频",
- ]
- for selector in click_selectors:
- try:
- el = frame.locator(selector).first
- if await el.count() > 0 and await el.is_visible():
- print(f"[{self.platform_name}] [{frame_name}] 找到可点击上传区域: {selector}")
- try:
- async with self.page.expect_file_chooser(timeout=5000) as fc_info:
- await el.click()
- chooser = await fc_info.value
- await chooser.set_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] 通过 file chooser 上传成功")
- return True
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] 点击触发 chooser 失败: {e}")
- except Exception:
- pass
- # 再尝试直接设置 input[type=file](iframe/隐藏 input 常见)
- try:
- inputs = frame.locator("input[type='file']")
- cnt = await inputs.count()
- if cnt > 0:
- best_idx = 0
- best_score = -1
- for i in range(cnt):
- try:
- inp = inputs.nth(i)
- accept = (await inp.get_attribute("accept")) or ""
- multiple = (await inp.get_attribute("multiple")) or ""
- score = 0
- if "video" in accept:
- score += 10
- if "mp4" in accept:
- score += 3
- if multiple:
- score += 1
- if score > best_score:
- best_score = score
- best_idx = i
- except Exception:
- continue
- target = inputs.nth(best_idx)
- print(f"[{self.platform_name}] [{frame_name}] 尝试对 input[{best_idx}] set_input_files (score={best_score})")
- await target.set_input_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] 通过 file input 上传成功")
- return True
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] file input 上传失败: {e}")
- # 不直接返回,让后面的 AI 兜底有机会执行
- # 方法4: 兜底使用 AI 分析 HTML,猜测上传入口
- try:
- frame_url = getattr(frame, "url", "")
- html_full = await frame.content()
- html_for_ai = await self._extract_relevant_html_snippets(html_full)
- print(f"[{self.platform_name}] [{frame_name}] frame_url={frame_url}, html_len={len(html_full)}, html_for_ai_len={len(html_for_ai)}")
- ai_selector = await self.ai_find_upload_selector(html_for_ai, frame_name=frame_name)
- if ai_selector:
- try:
- el = frame.locator(ai_selector).first
- if await el.count() > 0:
- print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器点击上传入口: {ai_selector}")
- try:
- async with self.page.expect_file_chooser(timeout=5000) as fc_info:
- await el.click()
- chooser = await fc_info.value
- await chooser.set_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] 通过 AI 选择器上传成功")
- return True
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] AI 选择器点击失败,改为直接 set_input_files: {e}")
- try:
- await el.set_input_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] AI 选择器直接 set_input_files 成功")
- return True
- except Exception as e2:
- print(f"[{self.platform_name}] [{frame_name}] AI 选择器 set_input_files 仍失败: {e2}")
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器定位元素失败: {e}")
- else:
- # 如果 AI 无法从 HTML 推断,退一步:构造候选元素列表交给 AI 选择
- try:
- candidates = await frame.evaluate("""
- () => {
- function cssEscape(s) {
- try { return CSS.escape(s); } catch (e) { return s.replace(/[^a-zA-Z0-9_-]/g, '\\\\$&'); }
- }
- function buildSelector(el) {
- if (!el || el.nodeType !== 1) return '';
- if (el.id) return `#${cssEscape(el.id)}`;
- let parts = [];
- let cur = el;
- for (let depth = 0; cur && cur.nodeType === 1 && depth < 5; depth++) {
- let part = cur.tagName.toLowerCase();
- const role = cur.getAttribute('role');
- const type = cur.getAttribute('type');
- if (type) part += `[type="${type}"]`;
- if (role) part += `[role="${role}"]`;
- const cls = (cur.className || '').toString().trim().split(/\\s+/).filter(Boolean);
- if (cls.length) part += '.' + cls.slice(0, 2).map(cssEscape).join('.');
- // nth-of-type
- let idx = 1;
- let sib = cur;
- while (sib && (sib = sib.previousElementSibling)) {
- if (sib.tagName === cur.tagName) idx++;
- }
- part += `:nth-of-type(${idx})`;
- parts.unshift(part);
- cur = cur.parentElement;
- }
- return parts.join(' > ');
- }
- const nodes = Array.from(document.querySelectorAll('input, button, a, div, span'))
- .filter(el => {
- const tag = el.tagName.toLowerCase();
- const type = (el.getAttribute('type') || '').toLowerCase();
- const role = (el.getAttribute('role') || '').toLowerCase();
- const aria = (el.getAttribute('aria-label') || '').toLowerCase();
- const txt = (el.innerText || '').trim().slice(0, 60);
- const cls = (el.className || '').toString().toLowerCase();
- const isFile = tag === 'input' && type === 'file';
- const looksClickable =
- tag === 'button' || tag === 'a' || role === 'button' || el.onclick ||
- cls.includes('upload') || cls.includes('uploader') || cls.includes('drag') ||
- aria.includes('上传') || aria.includes('选择') || aria.includes('添加') ||
- txt.includes('上传') || txt.includes('选择') || txt.includes('添加') || txt.includes('点击上传');
- if (!isFile && !looksClickable) return false;
- const r = el.getBoundingClientRect();
- const visible = r.width > 5 && r.height > 5;
- return visible;
- });
- const limited = nodes.slice(0, 120).map(el => ({
- css: buildSelector(el),
- tag: el.tagName.toLowerCase(),
- type: el.getAttribute('type') || '',
- role: el.getAttribute('role') || '',
- ariaLabel: el.getAttribute('aria-label') || '',
- text: (el.innerText || '').trim().slice(0, 80),
- id: el.id || '',
- className: (el.className || '').toString().slice(0, 120),
- accept: el.getAttribute('accept') || '',
- }));
- return limited;
- }
- """)
- ai_selector2 = await self.ai_pick_selector_from_candidates(
- candidates=candidates,
- goal="上传视频入口",
- frame_name=frame_name
- )
- if ai_selector2:
- el2 = frame.locator(ai_selector2).first
- if await el2.count() > 0:
- print(f"[{self.platform_name}] [{frame_name}] 使用 AI 候选选择器点击上传入口: {ai_selector2}")
- try:
- async with self.page.expect_file_chooser(timeout=5000) as fc_info:
- await el2.click()
- chooser2 = await fc_info.value
- await chooser2.set_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] 通过 AI 候选选择器上传成功")
- return True
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器点击失败,尝试 set_input_files: {e}")
- try:
- await el2.set_input_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 成功")
- return True
- except Exception as e2:
- print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 仍失败: {e2}")
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] 构造候选并交给 AI 失败: {e}")
- except Exception as e:
- print(f"[{self.platform_name}] [{frame_name}] AI 上传入口识别整体失败: {e}")
- return False
- # 先尝试主 frame
- try:
- await _try_set_files_in_frame(self.page.main_frame, "main")
- except Exception as e:
- print(f"[{self.platform_name}] main frame 上传尝试异常: {e}")
- # 再遍历所有子 frame
- if not upload_success:
- try:
- frames = self.page.frames
- print(f"[{self.platform_name}] 发现 frames: {len(frames)}")
- for idx, fr in enumerate(frames):
- if upload_success:
- break
- # main_frame 已尝试过
- if fr == self.page.main_frame:
- continue
- name = fr.name or f"frame-{idx}"
- await _try_set_files_in_frame(fr, name)
- except Exception as e:
- print(f"[{self.platform_name}] 遍历 frames 异常: {e}")
-
- if not upload_success:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="未找到上传入口(可能在 iframe 中或页面结构已变更)",
- screenshot_base64=screenshot_base64,
- page_url=await self.get_page_url(),
- status='failed'
- )
-
- self.report_progress(20, "正在填充标题和话题...")
-
- # 添加标题和话题
- await self.add_title_tags(params)
-
- self.report_progress(30, "等待视频上传完成...")
-
- # 等待上传完成
- for _ in range(120):
- try:
- button_info = await self.page.get_by_role("button", name="发表").get_attribute('class')
- if "weui-desktop-btn_disabled" not in button_info:
- print(f"[{self.platform_name}] 视频上传完毕")
-
- # 上传封面
- self.report_progress(50, "正在上传封面...")
- await self.upload_cover(params.cover_path)
- break
- else:
- # 检查上传错误
- if await self.page.locator('div.status-msg.error').count():
- if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count():
- await self.handle_upload_error(params.video_path)
-
- await asyncio.sleep(3)
- except:
- await asyncio.sleep(3)
-
- self.report_progress(60, "处理视频设置...")
-
- # 添加短标题
- try:
- short_title_el = self.page.get_by_text("短标题", exact=True).locator("..").locator(
- "xpath=following-sibling::div").locator('span input[type="text"]')
- if await short_title_el.count():
- short_title = format_short_title(params.title)
- await short_title_el.fill(short_title)
- except:
- pass
-
- # 定时发布
- if params.publish_date:
- self.report_progress(70, "设置定时发布...")
- await self.set_schedule_time(params.publish_date)
-
- self.report_progress(80, "正在发布...")
-
- # 点击发布 - 参考 matrix
- for i in range(30):
- try:
- # 参考 matrix: div.form-btns button:has-text("发表")
- publish_btn = self.page.locator('div.form-btns button:has-text("发表")')
- if await publish_btn.count():
- print(f"[{self.platform_name}] 点击发布按钮...")
- await publish_btn.click()
-
- # 等待跳转到作品列表页面 - 参考 matrix
- await self.page.wait_for_url(
- "https://channels.weixin.qq.com/platform/post/list",
- timeout=10000
- )
- self.report_progress(100, "发布成功")
- print(f"[{self.platform_name}] 视频发布成功!")
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功",
- screenshot_base64=screenshot_base64,
- page_url=self.page.url,
- status='success'
- )
- except Exception as e:
- current_url = self.page.url
- if "https://channels.weixin.qq.com/platform/post/list" in current_url:
- self.report_progress(100, "发布成功")
- print(f"[{self.platform_name}] 视频发布成功!")
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功",
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='success'
- )
- else:
- print(f"[{self.platform_name}] 视频正在发布中... {i+1}/30, URL: {current_url}")
- await asyncio.sleep(1)
-
- # 发布超时
- screenshot_base64 = await self.capture_screenshot()
- page_url = await self.get_page_url()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="发布超时,请检查发布状态",
- screenshot_base64=screenshot_base64,
- page_url=page_url,
- status='need_action'
- )
- async def _get_works_fallback_dom(self, page_size: int) -> tuple:
- """API 失败时从当前页面 DOM 抓取作品列表(兼容新账号/不同入口)"""
- works: List[WorkItem] = []
- total = 0
- has_more = False
- try:
- for selector in ["div.post-feed-item", "[class*='post-feed']", "[class*='feed-item']", "div[class*='post']"]:
- try:
- await self.page.wait_for_selector(selector, timeout=8000)
- break
- except Exception:
- continue
- post_items = self.page.locator("div.post-feed-item")
- item_count = await post_items.count()
- if item_count == 0:
- post_items = self.page.locator("[class*='post-feed']")
- item_count = await post_items.count()
- for i in range(min(item_count, page_size)):
- try:
- item = post_items.nth(i)
- cover_el = item.locator("div.media img.thumb").first
- cover_url = await cover_el.get_attribute("src") or "" if await cover_el.count() > 0 else ""
- if not cover_url:
- cover_el = item.locator("img").first
- cover_url = await cover_el.get_attribute("src") or "" if await cover_el.count() > 0 else ""
- title_el = item.locator("div.post-title").first
- title = (await title_el.text_content() or "").strip() if await title_el.count() > 0 else ""
- time_el = item.locator("div.post-time span").first
- publish_time = (await time_el.text_content() or "").strip() if await time_el.count() > 0 else ""
- play_count = like_count = comment_count = share_count = collect_count = 0
- data_items = item.locator("div.post-data div.data-item")
- for j in range(await data_items.count()):
- data_item = data_items.nth(j)
- count_text = (await data_item.locator("span.count").text_content() or "0").strip()
- if await data_item.locator("span.weui-icon-outlined-eyes-on").count() > 0:
- play_count = self._parse_count(count_text)
- elif await data_item.locator("span.weui-icon-outlined-like").count() > 0:
- like_count = self._parse_count(count_text)
- elif await data_item.locator("span.weui-icon-outlined-comment").count() > 0:
- comment_count = self._parse_count(count_text)
- elif await data_item.locator("use[xlink\\:href='#icon-share']").count() > 0:
- share_count = self._parse_count(count_text)
- elif await data_item.locator("use[xlink\\:href='#icon-thumb']").count() > 0:
- collect_count = self._parse_count(count_text)
- work_id = f"weixin_{i}_{hash(title)}_{hash(publish_time)}"
- works.append(WorkItem(
- work_id=work_id,
- title=title or "无标题",
- cover_url=cover_url,
- duration=0,
- status="published",
- publish_time=publish_time,
- play_count=play_count,
- like_count=like_count,
- comment_count=comment_count,
- share_count=share_count,
- collect_count=collect_count,
- ))
- except Exception as e:
- print(f"[{self.platform_name}] DOM 解析作品 {i} 失败: {e}", flush=True)
- continue
- total = len(works)
- has_more = item_count > page_size
- print(f"[{self.platform_name}] DOM 回退获取 {len(works)} 条", flush=True)
- except Exception as e:
- print(f"[{self.platform_name}] DOM 回退失败: {e}", flush=True)
- return (works, total, has_more, "")
-
- async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
- """获取视频号作品列表(调用 post_list 接口)
- page: 页码从 0 开始,或上一页返回的 rawKeyBuff/lastBuff 字符串
- """
- # 分页:首页 currentPage=1/rawKeyBuff=null,下一页用 currentPage 递增或 rawKeyBuff
- if page is None or page == "" or (isinstance(page, int) and page == 0):
- current_page = 1
- raw_key_buff = None
- elif isinstance(page, int):
- current_page = page + 1
- raw_key_buff = None
- else:
- current_page = 1
- raw_key_buff = str(page)
- ts_ms = str(int(time.time() * 1000))
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 获取作品列表 currentPage={current_page}, pageSize={page_size}, rawKeyBuff={raw_key_buff[:40] if raw_key_buff else 'null'}...")
- print(f"{'='*60}")
-
- works: List[WorkItem] = []
- total = 0
- has_more = False
- next_page = ""
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- await self.page.goto("https://channels.weixin.qq.com/platform/post/list", timeout=30000)
- await asyncio.sleep(3)
-
- current_url = self.page.url
- if "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- api_url = "https://channels.weixin.qq.com/micro/content/cgi-bin/mmfinderassistant-bin/post/post_list"
- req_body = {
- "pageSize": page_size,
- "currentPage": current_page,
- "userpageType": 11,
- "stickyOrder": True,
- "timestamp": ts_ms,
- "_log_finder_uin": "",
- "_log_finder_id": "",
- "rawKeyBuff": raw_key_buff,
- "pluginSessionId": None,
- "scene": 7,
- "reqScene": 7,
- }
- body_str = json.dumps(req_body)
-
- response = await self.page.evaluate("""
- async ([url, bodyStr]) => {
- try {
- const resp = await fetch(url, {
- method: 'POST',
- credentials: 'include',
- headers: {
- 'Content-Type': 'application/json',
- 'Accept': '*/*',
- 'Referer': 'https://channels.weixin.qq.com/platform/post/list'
- },
- body: bodyStr
- });
- return await resp.json();
- } catch (e) {
- return { error: e.toString() };
- }
- }
- """, [api_url, body_str])
-
- is_first_page = current_page == 1 and raw_key_buff is None
- if response.get("error"):
- print(f"[{self.platform_name}] API 请求失败: {response.get('error')}", flush=True)
- if is_first_page:
- works, total, has_more, next_page = await self._get_works_fallback_dom(page_size)
- if works:
- return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page)
- return WorksResult(success=False, platform=self.platform_name, error=response.get("error", "API 请求失败"))
-
- err_code = response.get("errCode", -1)
- if err_code != 0:
- err_msg = response.get("errMsg", "unknown")
- print(f"[{self.platform_name}] API errCode={err_code}, errMsg={err_msg}, 完整响应(前800字): {json.dumps(response, ensure_ascii=False)[:800]}", flush=True)
- if is_first_page:
- works, total, has_more, next_page = await self._get_works_fallback_dom(page_size)
- if works:
- return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page)
- return WorksResult(success=False, platform=self.platform_name, error=f"errCode={err_code}, errMsg={err_msg}")
-
- data = response.get("data") or {}
- raw_list = data.get("list") or []
- total = int(data.get("totalCount") or 0)
- has_more = bool(data.get("continueFlag", False))
- next_page = (data.get("lastBuff") or "").strip()
-
- print(f"[{self.platform_name}] API 响应: list_len={len(raw_list)}, totalCount={total}, continueFlag={has_more}, lastBuff={next_page[:50] if next_page else ''}...")
-
- if is_first_page and len(raw_list) == 0:
- works_fb, total_fb, has_more_fb, _ = await self._get_works_fallback_dom(page_size)
- if works_fb:
- return WorksResult(success=True, platform=self.platform_name, works=works_fb, total=total_fb, has_more=has_more_fb, next_page="")
-
- for item in raw_list:
- try:
- work_id = str(item.get("objectId") or item.get("id") or "").strip()
- if not work_id:
- work_id = f"weixin_{hash(item.get('createTime',0))}_{hash(item.get('desc', {}).get('description',''))}"
-
- desc = item.get("desc") or {}
- title = (desc.get("description") or "").strip() or "无标题"
- cover_url = ""
- duration = 0
- media_list = desc.get("media") or []
- if media_list and isinstance(media_list[0], dict):
- m = media_list[0]
- cover_url = (m.get("coverUrl") or m.get("thumbUrl") or "").strip()
- duration = int(m.get("videoPlayLen") or 0)
-
- create_ts = item.get("createTime") or 0
- if isinstance(create_ts, (int, float)) and create_ts:
- publish_time = datetime.fromtimestamp(create_ts).strftime("%Y-%m-%d %H:%M:%S")
- else:
- publish_time = str(create_ts) if create_ts else ""
-
- read_count = int(item.get("readCount") or 0)
- like_count = int(item.get("likeCount") or 0)
- comment_count = int(item.get("commentCount") or 0)
- forward_count = int(item.get("forwardCount") or 0)
- fav_count = int(item.get("favCount") or 0)
-
- works.append(WorkItem(
- work_id=work_id,
- title=title,
- cover_url=cover_url,
- duration=duration,
- status="published",
- publish_time=publish_time,
- play_count=read_count,
- like_count=like_count,
- comment_count=comment_count,
- share_count=forward_count,
- collect_count=fav_count,
- ))
- except Exception as e:
- print(f"[{self.platform_name}] 解析作品项失败: {e}", flush=True)
- continue
-
- if total == 0 and works:
- total = len(works)
- print(f"[{self.platform_name}] 本页获取 {len(works)} 条,totalCount={total}, next_page={bool(next_page)}")
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return WorksResult(success=False, platform=self.platform_name, error=str(e))
-
- return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page)
-
- async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
- """
- 获取视频号作品评论(完全参考 get_weixin_work_comments.py 的接口监听逻辑)
- 支持递归提取二级评论,正确处理 parent_comment_id
- """
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 获取作品评论")
- print(f"[{self.platform_name}] work_id={work_id}")
- print(f"{'='*60}")
-
- comments: List[CommentItem] = []
- total = 0
- has_more = False
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 访问评论管理页面
- print(f"[{self.platform_name}] 正在打开评论页面...")
- await self.page.goto("https://channels.weixin.qq.com/platform/interaction/comment", timeout=30000)
- await asyncio.sleep(2)
-
- # 检查登录状态
- current_url = self.page.url
- if "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- # === 步骤1: 监听 post_list 接口获取作品列表 ===
- posts = []
- try:
- async with self.page.expect_response(
- lambda res: "/post/post_list" in res.url,
- timeout=20000
- ) as post_resp_info:
- await self.page.wait_for_selector('.scroll-list .comment-feed-wrap', timeout=15000)
-
- post_resp = await post_resp_info.value
- post_data = await post_resp.json()
-
- if post_data.get("errCode") == 0:
- posts = post_data.get("data", {}).get("list", [])
- print(f"[{self.platform_name}] ✅ 获取 {len(posts)} 个作品")
- else:
- err_msg = post_data.get("errMsg", "未知错误")
- print(f"[{self.platform_name}] ❌ post_list 业务错误: {err_msg}")
- return CommentsResult(
- success=False,
- platform=self.platform_name,
- work_id=work_id,
- error=f"post_list 业务错误: {err_msg}"
- )
- except Exception as e:
- print(f"[{self.platform_name}] ❌ 获取 post_list 失败: {e}")
- return CommentsResult(
- success=False,
- platform=self.platform_name,
- work_id=work_id,
- error=f"获取 post_list 失败: {e}"
- )
-
- # === 步骤2: 在 DOM 中查找目标作品 ===
- feed_wraps = await self.page.query_selector_all('.scroll-list .comment-feed-wrap')
- target_feed = None
- target_post = None
- target_index = -1
-
- for i, feed in enumerate(feed_wraps):
- if i >= len(posts):
- break
-
- post = posts[i]
- object_nonce = post.get("objectNonce", "")
- post_work_id = post.get("objectId", "") or object_nonce
-
- # 匹配 work_id(支持 objectId 或 objectNonce 匹配)
- if work_id in [post_work_id, object_nonce] or post_work_id in work_id or object_nonce in work_id:
- target_feed = feed
- target_post = post
- target_index = i
- work_title = post.get("desc", {}).get("description", "无标题")
- print(f"[{self.platform_name}] ✅ 找到目标作品: {work_title}")
- break
-
- if not target_feed or not target_post:
- print(f"[{self.platform_name}] ❌ 未找到 work_id={work_id} 对应的作品")
- return CommentsResult(
- success=True,
- platform=self.platform_name,
- work_id=work_id,
- comments=[],
- total=0,
- has_more=False
- )
-
- # 准备作品信息(用于递归函数)
- object_nonce = target_post.get("objectNonce", f"nonce_{target_index}")
- work_title = target_post.get("desc", {}).get("description", f"作品{target_index+1}")
-
- work_info = {
- "work_id": object_nonce,
- "work_title": work_title
- }
-
- # === 步骤3: 点击作品触发 comment_list 接口 ===
- content_wrap = await target_feed.query_selector('.feed-content') or target_feed
-
- try:
- async with self.page.expect_response(
- lambda res: "/comment/comment_list" in res.url,
- timeout=15000
- ) as comment_resp_info:
- await content_wrap.click()
- await asyncio.sleep(0.8)
-
- comment_resp = await comment_resp_info.value
- comment_data = await comment_resp.json()
-
- if comment_data.get("errCode") != 0:
- err_msg = comment_data.get("errMsg", "未知错误")
- print(f"[{self.platform_name}] ❌ 评论接口错误: {err_msg}")
- return CommentsResult(
- success=False,
- platform=self.platform_name,
- work_id=work_id,
- error=f"评论接口错误: {err_msg}"
- )
-
- raw_comments = comment_data.get("data", {}).get("comment", [])
- total = comment_data.get("data", {}).get("totalCount", len(raw_comments))
-
- print(f"[{self.platform_name}] 📊 原始评论数: {len(raw_comments)}, 总数: {total}")
-
- # === 步骤4: 递归提取所有评论(含子评论)===
- extracted = self._extract_comments(raw_comments, parent_id="", work_info=work_info)
-
- # === 步骤5: 转换为 CommentItem 列表(保留 weixin.py 的数据结构)===
- for c in extracted:
- # 使用接口返回的 comment_id
- comment_id = c.get("comment_id", "")
- parent_comment_id = c.get("parent_comment_id", "")
-
- # 构建 CommentItem(保留原有数据结构用于数据库入库)
- comment_item = CommentItem(
- comment_id=comment_id,
- parent_comment_id=parent_comment_id,
- work_id=work_id,
- content=c.get("content", ""),
- author_id=c.get("username", ""), # 使用 username 作为 author_id
- author_name=c.get("nickname", ""),
- author_avatar=c.get("avatar", ""),
- like_count=c.get("like_count", 0),
- reply_count=0,
- create_time=c.get("create_time", ""),
- )
-
- # 添加扩展字段(用于数据库存储和后续处理)
- # comment_item.parent_comment_id = c.get("parent_comment_id", "")
- comment_item.is_author = c.get("is_author", False)
- comment_item.create_time_unix = c.get("create_time_unix", 0)
- comment_item.work_title = c.get("work_title", "")
- print(comment_item)
- comments.append(comment_item)
-
- # 打印日志
- author_tag = " 👤(作者)" if c.get("is_author") else ""
- parent_tag = f" [回复: {c.get('parent_comment_id', '')}]" if c.get("parent_comment_id") else ""
- print(f"[{self.platform_name}] - [{c.get('nickname', '')}] {c.get('content', '')[:30]}... "
- f"({c.get('create_time', '')}){author_tag}{parent_tag}")
-
- # 判断是否还有更多(优先使用接口返回的 continueFlag,否则根据数量判断)
- has_more = comment_data.get("data", {}).get("continueFlag", False) or len(extracted) < total
-
- print(f"[{self.platform_name}] ✅ 共提取 {len(comments)} 条评论(含子评论)")
-
- except Exception as e:
- print(f"[{self.platform_name}] ❌ 获取评论失败: {e}")
- import traceback
- traceback.print_exc()
- return CommentsResult(
- success=False,
- platform=self.platform_name,
- work_id=work_id,
- error=f"获取评论失败: {e}"
- )
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return CommentsResult(
- success=False,
- platform=self.platform_name,
- work_id=work_id,
- error=str(e)
- )
-
- return CommentsResult(
- success=True,
- platform=self.platform_name,
- work_id=work_id,
- comments=comments,
- total=total,
- has_more=has_more
- )
- def _extract_comments(self, comment_list: list, parent_id: str = "", work_info: dict = None) -> list:
- """
- 递归提取一级和二级评论(完全参考 get_weixin_work_comments.py 的 extract_comments 函数)
-
- Args:
- comment_list: 评论列表(原始接口数据)
- parent_id: 父评论ID(一级评论为空字符串"",二级评论为父级评论ID)
- work_info: 作品信息字典
-
- Returns:
- list: 扁平化的评论列表,包含一级和二级评论
- """
- result = []
-
- # 获取当前用户 username(用于判断是否为作者)
- # 优先从环境变量获取,也可通过其他方式配置
- my_username = getattr(self, 'my_username', '') or os.environ.get('WEIXIN_MY_USERNAME', '')
-
- for cmt in comment_list:
- # 处理时间戳
- create_ts = int(cmt.get("commentCreatetime", 0) or 0)
- readable_time = (
- datetime.fromtimestamp(create_ts).strftime('%Y-%m-%d %H:%M:%S')
- if create_ts > 0 else ""
- )
-
- # 判断是否作者(如果配置了 my_username)
- username = cmt.get("username", "") or ""
- is_author = (my_username != "") and (username == my_username)
-
- # 构建评论条目 - 完全参考 get_weixin_work_comments.py 的字段
- entry = {
- "work_id": work_info.get("work_id", "") if work_info else "",
- "work_title": work_info.get("work_title", "") if work_info else "",
- "comment_id": cmt.get("commentId"),
- "parent_comment_id": parent_id, # 关键:一级评论为空字符串"",二级评论为父评论ID
- "username": username,
- "nickname": cmt.get("commentNickname", ""),
- "avatar": cmt.get("commentHeadurl", ""),
- "content": cmt.get("commentContent", ""),
- "create_time_unix": create_ts,
- "create_time": readable_time,
- "is_author": is_author,
- "like_count": cmt.get("commentLikeCount", 0) or 0
- }
- result.append(entry)
-
- # 递归处理二级评论(levelTwoComment)
- # 关键:二级评论的 parent_id 应该是当前这条评论的 comment_id
- level_two = cmt.get("levelTwoComment", []) or []
- if level_two and isinstance(level_two, list) and len(level_two) > 0:
- # 当前评论的 ID 作为其子评论的 parent_id
- current_comment_id = cmt.get("commentId", "")
- result.extend(
- self._extract_comments(level_two, parent_id=current_comment_id, work_info=work_info)
- )
-
- return result
- async def auto_reply_private_messages(self, cookies: str) -> dict:
- """自动回复私信 - 集成自 pw3.py"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 开始自动回复私信")
- print(f"{'='*60}")
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 访问私信页面
- await self.page.goto("https://channels.weixin.qq.com/platform/private_msg", timeout=30000)
- await asyncio.sleep(3)
-
- # 检查登录状态
- current_url = self.page.url
- print(f"[{self.platform_name}] 当前 URL: {current_url}")
-
- if "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- # 等待私信页面加载(使用多个选择器容错)
- try:
- await self.page.wait_for_selector('.private-msg-list-header', timeout=15000)
- except:
- # 尝试其他选择器
- try:
- await self.page.wait_for_selector('.weui-desktop-tab__navs__inner', timeout=10000)
- print(f"[{self.platform_name}] 使用备用选择器加载成功")
- except:
- # 截图调试
- screenshot_path = f"weixin_private_msg_{int(asyncio.get_event_loop().time())}.png"
- await self.page.screenshot(path=screenshot_path)
- print(f"[{self.platform_name}] 页面加载失败,截图: {screenshot_path}")
- raise Exception(f"私信页面加载超时,当前 URL: {current_url}")
-
- print(f"[{self.platform_name}] 私信页面加载完成")
-
- # 处理两个 tab
- total_replied = 0
- for tab_name in ["打招呼消息", "私信"]:
- replied_count = await self._process_tab_sessions(tab_name)
- total_replied += replied_count
-
- print(f"[{self.platform_name}] 自动回复完成,共回复 {total_replied} 条消息")
-
- return {
- 'success': True,
- 'platform': self.platform_name,
- 'replied_count': total_replied,
- 'message': f'成功回复 {total_replied} 条私信'
- }
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return {
- 'success': False,
- 'platform': self.platform_name,
- 'error': str(e)
- }
-
- async def _process_tab_sessions(self, tab_name: str) -> int:
- """处理指定 tab 下的所有会话"""
- print(f"\n🔄 正在处理「{tab_name}」中的所有会话...")
-
- if not self.page:
- return 0
-
- replied_count = 0
-
- try:
- # 点击 tab
- if tab_name == "私信":
- tab_link = self.page.locator('.weui-desktop-tab__navs__inner li').first.locator('a')
- elif tab_name == "打招呼消息":
- tab_link = self.page.locator('.weui-desktop-tab__navs__inner li').nth(1).locator('a')
- else:
- return 0
-
- if await tab_link.is_visible():
- await tab_link.click()
- print(f" ➤ 已点击「{tab_name}」tab")
- else:
- print(f" ❌ 「{tab_name}」tab 不可见")
- return 0
-
- # 等待会话列表加载
- try:
- await self.page.wait_for_function("""
- () => {
- const hasSession = document.querySelectorAll('.session-wrap').length > 0;
- const hasEmpty = !!document.querySelector('.empty-text');
- return hasSession || hasEmpty;
- }
- """, timeout=8000)
- print(" ✅ 会话列表区域已加载")
- except:
- print(" ⚠️ 等待会话列表超时,继续尝试读取...")
-
- # 获取会话
- session_wraps = self.page.locator('.session-wrap')
- session_count = await session_wraps.count()
- print(f" 💬 共找到 {session_count} 个会话")
-
- if session_count == 0:
- return 0
-
- # 遍历每个会话
- for idx in range(session_count):
- try:
- current_sessions = self.page.locator('.session-wrap')
- if idx >= await current_sessions.count():
- break
-
- session = current_sessions.nth(idx)
- user_name = await session.locator('.name').inner_text()
- last_preview = await session.locator('.feed-info').inner_text()
- print(f"\n ➤ [{idx+1}/{session_count}] 正在处理: {user_name} | 最后消息: {last_preview}")
-
- await session.click()
- await asyncio.sleep(2)
-
- # 提取聊天历史
- history = await self._extract_chat_history()
- need_reply = (not history) or (not history[-1]["is_author"])
-
- if need_reply:
- reply_text = await self._generate_reply_with_ai(history)
- if reply_text=="":
- reply_text = self._generate_reply(history)
- # # 生成回复
- # if history and history[-1]["is_author"]:
- # reply_text = await self._generate_reply_with_ai(history)
- # else:
- # reply_text = self._generate_reply(history)
-
- if reply_text:
- print(f" 📝 回复内容: {reply_text}")
- try:
- textarea = self.page.locator('.edit_area').first
- send_btn = self.page.locator('button:has-text("发送")').first
- if await textarea.is_visible() and await send_btn.is_visible():
- await textarea.fill(reply_text)
- await asyncio.sleep(0.5)
- await send_btn.click()
- print(" ✅ 已发送")
- replied_count += 1
- await asyncio.sleep(1.5)
- else:
- print(" ❌ 输入框或发送按钮不可见")
- except Exception as e:
- print(f" ❌ 发送失败: {e}")
- else:
- print(" ➤ 无需回复")
- else:
- print(" ➤ 最后一条是我发的,跳过回复")
-
- except Exception as e:
- print(f" ❌ 处理会话 {idx+1} 时出错: {e}")
- continue
-
- except Exception as e:
- print(f"❌ 处理「{tab_name}」失败: {e}")
-
- return replied_count
-
- async def _extract_chat_history(self) -> list:
- """精准提取聊天记录,区分作者(自己)和用户"""
- if not self.page:
- return []
-
- history = []
- message_wrappers = self.page.locator('.session-content-wrapper > div:not(.footer) > .text-wrapper')
- count = await message_wrappers.count()
-
- for i in range(count):
- try:
- wrapper = message_wrappers.nth(i)
- # 判断方向
- is_right = await wrapper.locator('.content-right').count() > 0
- is_left = await wrapper.locator('.content-left').count() > 0
-
- if not (is_left or is_right):
- continue
-
- # 提取消息文本
- pre_el = wrapper.locator('pre.message-plain')
- content = ''
- if await pre_el.count() > 0:
- content = await pre_el.inner_text()
- content = content.strip()
-
- if not content:
- continue
-
- # 获取头像
- avatar_img = wrapper.locator('.avatar').first
- avatar_src = ''
- if await avatar_img.count() > 0:
- avatar_src = await avatar_img.get_attribute("src") or ''
-
- # 右侧 = 作者(自己)
- is_author = is_right
-
- # 获取用户名
- if is_left:
- name_el = wrapper.locator('.profile .name')
- author_name = '用户'
- if await name_el.count() > 0:
- author_name = await name_el.inner_text()
- else:
- author_name = "我"
-
- history.append({
- "author": author_name,
- "content": content,
- "is_author": is_author,
- "avatar": avatar_src
- })
- except Exception as e:
- print(f" ⚠️ 解析第 {i+1} 条消息失败: {e}")
- continue
-
- return history
-
- async def _generate_reply_with_ai(self, chat_history: list) -> str:
- """使用 AI 生成智能回复"""
- import requests
- import json
-
- try:
- # 获取 AI 配置
- ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
- ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
- ai_model = os.environ.get('AI_MODEL', 'qwen-plus')
-
-
- if not ai_api_key:
- print("⚠️ 未配置 AI API Key,使用规则回复")
- return self._generate_reply(chat_history)
-
- # 构建对话上下文
- messages = [{"role": "system", "content": "你是一个友好的微信视频号创作者助手,负责回复粉丝私信。请保持简洁、友好、专业的语气。回复长度不超过20字。"}]
-
- for msg in chat_history:
- role = "assistant" if msg["is_author"] else "user"
- messages.append({
- "role": role,
- "content": msg["content"]
- })
-
- # 调用 AI API
- headers = {
- 'Authorization': f'Bearer {ai_api_key}',
- 'Content-Type': 'application/json'
- }
-
- payload = {
- "model": ai_model,
- "messages": messages,
- "max_tokens": 150,
- "temperature": 0.8
- }
-
- print(" 🤖 正在调用 AI 生成回复...")
- response = requests.post(
- f"{ai_base_url}/chat/completions",
- headers=headers,
- json=payload,
- timeout=30
- )
-
- if response.status_code != 200:
- print(f" ⚠️ AI API 返回错误 {response.status_code},使用规则回复")
- return self._generate_reply(chat_history)
-
- result = response.json()
- ai_reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()
-
- if ai_reply:
- print(f" ✅ AI 生成回复: {ai_reply}")
- return ai_reply
- else:
- print(" ⚠️ AI 返回空内容,使用规则回复")
- return self._generate_reply(chat_history)
-
- except Exception as e:
- print(f" ⚠️ AI 回复生成失败: {e},使用规则回复")
- return self._generate_reply(chat_history)
-
- def _generate_reply(self, chat_history: list) -> str:
- """根据完整聊天历史生成回复(规则回复方式)"""
- if not chat_history:
- return "你好!感谢联系~"
-
- # 检查最后一条是否是作者发的
- if chat_history[-1]["is_author"]:
- return "" # 不回复
-
- # 找最后一条用户消息
- last_user_msg = chat_history[-1]["content"]
-
- # 简单规则回复
- if "谢谢" in last_user_msg or "感谢" in last_user_msg:
- return "不客气!欢迎常来交流~"
- elif "你好" in last_user_msg or "在吗" in last_user_msg:
- return "你好!请问有什么可以帮您的?"
- elif "视频" in last_user_msg or "怎么拍" in last_user_msg:
- return "视频是用手机拍摄的,注意光线和稳定哦!"
- else:
- return "收到!我会认真阅读您的留言~"
|