# -*- coding: utf-8 -*-
"""
WeChat Channels (微信视频号) publisher.

Reference implementation: matrix/tencent_uploader/main.py
"""

import asyncio
import json
import os
import re
import traceback
import zlib
from datetime import datetime
from typing import List, Optional

from .base import (
    BasePublisher, PublishParams, PublishResult,
    WorkItem, WorksResult, CommentItem, CommentsResult
)

# Allows the "upload video" entry selector to be overridden through an
# environment variable, so it can be adjusted quickly when the page changes.
WEIXIN_UPLOAD_SELECTOR = os.environ.get("WEIXIN_UPLOAD_SELECTOR", "").strip()

# Default OpenAI-compatible endpoint used when DASHSCOPE_BASE_URL is not set.
_DEFAULT_AI_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

# URL the page navigates to once a post has been published.
_POST_LIST_URL = "https://channels.weixin.qq.com/platform/post/list"

# Patterns used to pull the JSON answer out of a model reply.  These are raw
# strings, so a single backslash is required (a doubled one would match a
# literal backslash and never hit real output).
_JSON_FENCE_RE = re.compile(r"```json\s*([\s\S]*?)\s*```")
_JSON_OBJECT_RE = re.compile(r"\{[\s\S]*\}")

# Keywords that mark upload-related regions of an HTML document.
_UPLOAD_KEYWORD_RE = re.compile(
    "|".join([
        r"upload",
        r"uploader",
        r"file",
        r"type\s*=\s*['\"]file['\"]",
        r"input",
        r"drag",
        r"drop",
        r"选择",
        r"上传",
        r"添加",
        r"视频",
    ]),
    re.IGNORECASE,
)

# Statistics as rendered in the post list, e.g. "播放 100 点赞 50 评论 1.2万".
_PLAY_COUNT_RE = re.compile(r'播放[\s]*([\d.]+[万]?)')
_LIKE_COUNT_RE = re.compile(r'点赞[\s]*([\d.]+[万]?)')
_COMMENT_COUNT_RE = re.compile(r'评论[\s]*([\d.]+[万]?)')

# Evaluated inside a frame: collects up to 120 visible elements that look like
# file inputs or clickable upload targets, each with a generated CSS selector.
# This is a regular (non-raw) Python string, so backslashes meant for
# JavaScript are doubled.
_UPLOAD_CANDIDATES_JS = """
() => {
  function cssEscape(s) {
    try { return CSS.escape(s); } catch (e) { return s.replace(/[^a-zA-Z0-9_-]/g, '\\\\$&'); }
  }
  function buildSelector(el) {
    if (!el || el.nodeType !== 1) return '';
    if (el.id) return `#${cssEscape(el.id)}`;
    let parts = [];
    let cur = el;
    for (let depth = 0; cur && cur.nodeType === 1 && depth < 5; depth++) {
      let part = cur.tagName.toLowerCase();
      const role = cur.getAttribute('role');
      const type = cur.getAttribute('type');
      if (type) part += `[type="${type}"]`;
      if (role) part += `[role="${role}"]`;
      const cls = (cur.className || '').toString().trim().split(/\\s+/).filter(Boolean);
      if (cls.length) part += '.' + cls.slice(0, 2).map(cssEscape).join('.');
      // nth-of-type
      let idx = 1;
      let sib = cur;
      while (sib && (sib = sib.previousElementSibling)) {
        if (sib.tagName === cur.tagName) idx++;
      }
      part += `:nth-of-type(${idx})`;
      parts.unshift(part);
      cur = cur.parentElement;
    }
    return parts.join(' > ');
  }

  const nodes = Array.from(document.querySelectorAll('input, button, a, div, span'))
    .filter(el => {
      const tag = el.tagName.toLowerCase();
      const type = (el.getAttribute('type') || '').toLowerCase();
      const role = (el.getAttribute('role') || '').toLowerCase();
      const aria = (el.getAttribute('aria-label') || '').toLowerCase();
      const txt = (el.innerText || '').trim().slice(0, 60);
      const cls = (el.className || '').toString().toLowerCase();
      const isFile = tag === 'input' && type === 'file';
      const looksClickable =
        tag === 'button' || tag === 'a' || role === 'button' || el.onclick ||
        cls.includes('upload') || cls.includes('uploader') || cls.includes('drag') ||
        aria.includes('上传') || aria.includes('选择') || aria.includes('添加') ||
        txt.includes('上传') || txt.includes('选择') || txt.includes('添加') || txt.includes('点击上传');
      if (!isFile && !looksClickable) return false;
      const r = el.getBoundingClientRect();
      const visible = r.width > 5 && r.height > 5;
      return visible;
    });

  const limited = nodes.slice(0, 120).map(el => ({
    css: buildSelector(el),
    tag: el.tagName.toLowerCase(),
    type: el.getAttribute('type') || '',
    role: el.getAttribute('role') || '',
    ariaLabel: el.getAttribute('aria-label') || '',
    text: (el.innerText || '').trim().slice(0, 80),
    id: el.id || '',
    className: (el.className || '').toString().slice(0, 120),
    accept: el.getAttribute('accept') || '',
  }));
  return limited;
}
"""


def format_short_title(origin_title: str) -> str:
    """
    Build a short title from ``origin_title``.

    - Alphanumeric characters and a small set of punctuation are kept,
      ``,`` becomes a space, everything else is dropped.
    - The result is truncated to 16 characters and right-padded with spaces
      to at least 6 characters.
    """
    # NOTE(review): the source spelled this as two adjacent literals
    # ("《》" ":+?%°"); curly quotes may have been intended between them.
    allowed_special_chars = "《》:+?%°"

    filtered_chars = [
        char if char.isalnum() or char in allowed_special_chars
        else ' ' if char == ',' else ''
        for char in origin_title
    ]
    formatted_string = ''.join(filtered_chars)

    if len(formatted_string) > 16:
        formatted_string = formatted_string[:16]
    elif len(formatted_string) < 6:
        formatted_string += ' ' * (6 - len(formatted_string))

    return formatted_string


def _extract_selector_from_ai_content(content: str) -> str:
    """Return the ``selector`` value from a model reply, or ``""`` if absent."""
    fenced = _JSON_FENCE_RE.search(content)
    if fenced:
        json_str = fenced.group(1)
    else:
        # No ```json fence: fall back to the outermost {...} in the reply.
        bare = _JSON_OBJECT_RE.search(content)
        json_str = bare.group(0) if bare else "{}"

    try:
        result = json.loads(json_str)
    except ValueError:
        return ""

    if not isinstance(result, dict):
        return ""
    selector = result.get("selector")
    return selector.strip() if isinstance(selector, str) else ""


def _parse_count(match) -> int:
    """Convert a statistics regex match such as ``"1.2万"`` or ``"35"`` to an int."""
    if not match:
        return 0
    val = match.group(1)
    try:
        if '万' in val:
            # round() avoids float truncation (e.g. 2.3 * 10000 -> 22999).
            return int(round(float(val.replace('万', '')) * 10000))
        # The pattern admits a decimal point, which plain int() rejects.
        return int(float(val))
    except ValueError:
        return 0


def _stable_hash(text: str) -> int:
    """Return a checksum of ``text`` that is identical across processes.

    The built-in ``hash()`` of a ``str`` is salted per interpreter run, which
    makes IDs derived from it change on every restart.
    """
    return zlib.crc32(text.encode("utf-8"))


async def _text_or_empty(locator) -> str:
    """Return the locator's text content, or ``""`` when nothing matches."""
    if await locator.count() > 0:
        return await locator.text_content() or ''
    return ''


class WeixinPublisher(BasePublisher):
    """
    WeChat Channels publisher.

    Drives the Channels creator centre with Playwright.
    Note: the system Chrome browser is preferred; otherwise H264 encoding
    errors may occur.
    """

    platform_name = "weixin"
    login_url = "https://channels.weixin.qq.com/platform"
    publish_url = "https://channels.weixin.qq.com/platform/post/create"
    # The site lives on channels.weixin.qq.com; cookies are commonly scoped to
    # .qq.com / .weixin.qq.com.  The broader .qq.com is used by default so the
    # fallback domain still applies when cookies arrive as a plain string.
    cookie_domain = ".qq.com"

    # ------------------------------------------------------------------
    # AI-assisted selector discovery
    # ------------------------------------------------------------------

    async def _ask_ai_for_selector(self, prompt: str, max_tokens: int, label: str) -> str:
        """
        Send ``prompt`` to the configured chat-completions endpoint and return
        the CSS selector found in the reply.

        Configuration is read from DASHSCOPE_API_KEY, DASHSCOPE_BASE_URL and
        AI_TEXT_MODEL.  ``label`` prefixes the log lines.  Any failure is
        logged and reported as an empty string.
        """
        ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "")
        ai_base_url = os.environ.get("DASHSCOPE_BASE_URL", _DEFAULT_AI_BASE_URL)
        ai_text_model = os.environ.get("AI_TEXT_MODEL", "qwen-plus")

        payload = {
            "model": ai_text_model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": max_tokens,
        }
        headers = {
            "Authorization": f"Bearer {ai_api_key}",
            "Content-Type": "application/json",
        }

        try:
            # Imported lazily so the module loads without requests installed.
            import requests

            def _post():
                return requests.post(
                    f"{ai_base_url}/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=40,
                )

            # requests is blocking; run it in a worker thread so the event
            # loop (shared with Playwright) is not stalled for up to 40s.
            loop = asyncio.get_running_loop()
            resp = await loop.run_in_executor(None, _post)

            if resp.status_code != 200:
                print(f"[{self.platform_name}] {label}: API 返回错误 {resp.status_code}")
                return ""

            data = resp.json()
            choices = data.get("choices") or [{}]
            content = choices[0].get("message", {}).get("content", "") or ""

            selector = _extract_selector_from_ai_content(content)
            print(f"[{self.platform_name}] {label}结果: selector='{selector}'")
            return selector
        except Exception as e:
            print(f"[{self.platform_name}] {label}异常: {e}")
            return ""

    async def ai_find_upload_selector(self, frame_html: str, frame_name: str = "main") -> str:
        """
        Ask the AI model for a CSS selector of the "upload video / choose file"
        element in ``frame_html``.

        Intended as a last resort after the regular DOM selectors failed, to
        limit AI usage.  The HTML is truncated to 20000 characters.

        Returns:
            A selector usable with ``frame.locator(selector)``, or ``""`` when
            the HTML is empty, no API key is configured, or nothing was found.
        """
        if not frame_html:
            return ""

        # Truncate to keep the request within the model's token budget.
        max_len = 20000
        if len(frame_html) > max_len:
            frame_html = frame_html[:max_len]

        if not os.environ.get("DASHSCOPE_API_KEY", ""):
            print(f"[{self.platform_name}] AI上传入口识别: 未配置 AI API Key,跳过")
            return ""

        prompt = f"""
你是熟悉微信视频号后台的前端工程师,现在需要在一段 HTML 中找到“上传视频文件”的入口。

页面说明:
- 平台:微信视频号(channels.weixin.qq.com)
- 目标:用于上传视频文件的按钮或 input(一般会触发文件选择框)
- 你会收到某个 frame 的完整 HTML 片段(不包含截图)。

请你根据下面的 HTML,推断最适合用于上传视频文件的元素,并输出一个可以被 Playwright 使用的 CSS 选择器。

要求:
1. 只考虑“上传/选择视频文件”的入口,不要返回“发布/发表/下一步”等按钮;
2. 选择器需要尽量稳定,不要使用自动生成的随机类名(例如带很多随机字母/数字的类名可以用前缀匹配);
3. 选择器必须是 CSS 选择器(不要返回 XPath);
4. 如果确实找不到合理的上传入口,返回 selector 为空字符串。

请以 JSON 格式输出,严格遵守以下结构(不要添加任何解释文字):
```json
{{
  "selector": "CSS 选择器字符串,比如:input[type='file'] 或 div.upload-content input[type='file']"
}}
```

下面是 frame="{frame_name}" 的 HTML:
```html
{frame_html}
```"""

        print(f"[{self.platform_name}] AI上传入口识别: 正在分析 frame={frame_name} HTML...")
        return await self._ask_ai_for_selector(prompt, max_tokens=600, label="AI上传入口识别")

    async def ai_pick_selector_from_candidates(self, candidates: list, goal: str, frame_name: str = "main") -> str:
        """
        Send a list of candidate elements (CSS selector plus text/attributes)
        to the AI model and let it pick the one that best matches ``goal``.

        Useful when the raw HTML does not reveal the upload entry, or when the
        page is heavily rendered at runtime.  At most the first 120 candidates
        are sent.

        Returns:
            The chosen CSS selector, or ``""`` when there are no candidates,
            no API key is configured, or the model picked nothing.
        """
        if not candidates:
            return ""

        if not os.environ.get("DASHSCOPE_API_KEY", ""):
            print(f"[{self.platform_name}] AI候选选择器: 未配置 AI API Key,跳过")
            return ""

        # Bound the prompt size.
        candidates = candidates[:120]

        prompt = f"""
你是自动化发布工程师。现在要在微信视频号(channels.weixin.qq.com)发布页面里找到“{goal}”相关的入口元素。

我会给你一组候选元素,每个候选都包含:
- css: 可直接用于 Playwright 的 CSS 选择器
- tag / type / role / ariaLabel / text / id / className(部分字段可能为空)

你的任务:
- 从候选中选出最可能用于“{goal}”的元素,返回它的 css 选择器;
- 如果没有任何候选符合,返回空字符串。

注意:
- 如果 goal 是“上传视频入口”,优先选择 input[type=file] 或看起来会触发选择文件/上传的区域;
- 不要选择“发布/发表/下一步”等按钮(除非 goal 明确是发布按钮)。

请严格按 JSON 输出(不要解释):
```json
{{ "selector": "..." }}
```

候选列表(frame={frame_name}):
```json
{json.dumps(candidates, ensure_ascii=False)}
```"""

        print(f"[{self.platform_name}] AI候选选择器: 正在分析 frame={frame_name}, goal={goal} ...")
        return await self._ask_ai_for_selector(prompt, max_tokens=400, label="AI候选选择器")

    async def _extract_relevant_html_snippets(self, html: str) -> str:
        """
        Reduce ``html`` to the parts most likely to describe the upload entry.

        - Takes a 350-character window around each of the first 18 keyword
          hits (upload / file / input / 上传 / ...), de-duplicated.
        - When no keyword matches, returns the head and tail of the document.

        The result never exceeds 20000 characters.
        """
        if not html:
            return ""

        snippets = []
        for m in _UPLOAD_KEYWORD_RE.finditer(html):
            start = max(0, m.start() - 350)
            end = min(len(html), m.end() + 350)
            snippets.append(html[start:end])
            if len(snippets) >= 18:
                break

        if snippets:
            # Drop exact duplicates while keeping first-seen order.
            unique = []
            seen = set()
            for s in snippets:
                if s not in seen:
                    seen.add(s)
                    unique.append(s)
            return "\n\n\n\n".join(unique)[:20000]

        # Fallback: head + tail.
        head = html[:9000]
        tail = html[-9000:] if len(html) > 9000 else ""
        return (head + "\n\n\n\n" + tail)[:20000]

    # ------------------------------------------------------------------
    # Browser and page helpers
    # ------------------------------------------------------------------

    async def init_browser(self, storage_state: Optional[str] = None):
        """
        Launch the browser and open a page.

        The system Chrome (``channel="chrome"``) is tried first to avoid H264
        encoding errors; bundled Chromium is used when Chrome is unavailable.
        ``storage_state`` is applied only when the file exists.

        Returns:
            The newly created page (also stored on ``self.page``).
        """
        from playwright.async_api import async_playwright

        # NOTE(review): the Playwright driver handle is not kept, so this
        # class cannot stop it; confirm whether BasePublisher handles cleanup.
        playwright = await async_playwright().start()

        try:
            self.browser = await playwright.chromium.launch(
                headless=self.headless,
                channel="chrome"  # use the system Chrome
            )
            print(f"[{self.platform_name}] 使用系统 Chrome 浏览器")
        except Exception as e:
            print(f"[{self.platform_name}] Chrome 不可用,使用 Chromium: {e}")
            self.browser = await playwright.chromium.launch(headless=self.headless)

        if storage_state and os.path.exists(storage_state):
            self.context = await self.browser.new_context(storage_state=storage_state)
        else:
            self.context = await self.browser.new_context()

        self.page = await self.context.new_page()
        return self.page

    async def set_schedule_time(self, publish_date: datetime):
        """
        Fill in the scheduled-publish controls for ``publish_date``.

        Only the day and the hour are entered.  The calendar is advanced by at
        most one month when the displayed month differs from the target.
        """
        if not self.page:
            return

        print(f"[{self.platform_name}] 设置定时发布...")

        # Switch to the "scheduled" option.
        label_element = self.page.locator("label").filter(has_text="定时").nth(1)
        await label_element.click()

        # Open the date picker.
        await self.page.click('input[placeholder="请选择发表时间"]')

        publish_month = f"{publish_date.month:02d}"
        current_month = f"{publish_month}月"

        # Move to the next month if the picker shows a different one.
        page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
        if page_month != current_month:
            await self.page.click('button.weui-desktop-btn__icon__right')

        # Click the first enabled cell whose text equals the target day.
        elements = await self.page.query_selector_all('table.weui-desktop-picker__table a')
        for element in elements:
            class_name = await element.evaluate('el => el.className')
            if 'weui-desktop-picker__disabled' in class_name:
                continue
            text = await element.inner_text()
            if text.strip() == str(publish_date.day):
                await element.click()
                break

        # Replace the content of the time field with the hour.
        await self.page.click('input[placeholder="请选择时间"]')
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(str(publish_date.hour))

        # Click elsewhere to confirm the picker.
        await self.page.locator("div.input-editor").click()

    async def handle_upload_error(self, video_path: str):
        """Delete the failed upload and submit ``video_path`` again."""
        if not self.page:
            return

        print(f"[{self.platform_name}] 视频出错了,重新上传中...")
        await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
        await self.page.get_by_role('button', name="删除", exact=True).click()
        file_input = self.page.locator('input[type="file"]')
        await file_input.set_input_files(video_path)

    async def add_title_tags(self, params: PublishParams):
        """Type the title into the editor, followed by each tag as ``#tag``."""
        if not self.page:
            return

        await self.page.locator("div.input-editor").click()
        await self.page.keyboard.type(params.title)

        if params.tags:
            await self.page.keyboard.press("Enter")
            for tag in params.tags:
                await self.page.keyboard.type("#" + tag)
                await self.page.keyboard.press("Space")

        print(f"[{self.platform_name}] 成功添加标题和 {len(params.tags)} 个话题")

    async def add_short_title(self):
        """
        Locate the short-title input.

        Currently a placeholder: the element is looked up but nothing is
        written.  ``publish`` fills the short title itself.
        """
        if not self.page:
            return

        try:
            short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
                "xpath=following-sibling::div").locator('span input[type="text"]')
            if await short_title_element.count():
                # Existing content would be reused as the short title.
                pass
        except Exception:
            pass

    async def upload_cover(self, cover_path: str):
        """
        Replace the cover image with ``cover_path``.

        Does nothing when there is no page, no path, or the file is missing.
        Failures are logged and not raised.
        """
        if not self.page or not cover_path or not os.path.exists(cover_path):
            return

        try:
            await asyncio.sleep(2)
            change_cover_btn = self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")')
            preview_btn_info = (await change_cover_btn.get_attribute('class')) or ""

            if "disabled" not in preview_btn_info:
                await change_cover_btn.click()
                await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover()

                # Remove the existing cover, if any.
                if await self.page.locator(".del-wrap > .svg-icon").count():
                    await self.page.locator(".del-wrap > .svg-icon").click()

                # Upload the new cover through the file chooser.
                preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
                async with self.page.expect_file_chooser() as fc_info:
                    await preview_div.click()
                preview_chooser = await fc_info.value
                await preview_chooser.set_files(cover_path)

                await asyncio.sleep(2)
                await self.page.get_by_role("button", name="确定").click()
                await asyncio.sleep(1)
                await self.page.get_by_role("button", name="确认").click()

                print(f"[{self.platform_name}] 封面上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 封面上传失败: {e}")

    async def check_captcha(self) -> dict:
        """
        Check the page for captcha or login prompts using fixed selectors.

        Returns:
            ``{'need_captcha': bool, 'captcha_type': 'image' | 'login' | ''}``.
        """
        if not self.page:
            return {'need_captcha': False, 'captcha_type': ''}

        # (selectors, reported type, log message) checked in order.
        checks = [
            (
                ['text="请输入验证码"', 'text="滑动验证"', '[class*="captcha"]', '[class*="verify"]'],
                'image',
                "检测到验证码",
            ),
            (
                ['text="请登录"', 'text="扫码登录"', '[class*="login-dialog"]'],
                'login',
                "检测到需要登录",
            ),
        ]

        try:
            for selectors, captcha_type, message in checks:
                for selector in selectors:
                    try:
                        if await self.page.locator(selector).count() > 0:
                            print(f"[{self.platform_name}] {message}: {selector}")
                            return {'need_captcha': True, 'captcha_type': captcha_type}
                    except Exception:
                        # A selector that cannot be evaluated is skipped.
                        pass
        except Exception as e:
            print(f"[{self.platform_name}] 验证码检测异常: {e}")

        return {'need_captcha': False, 'captcha_type': ''}

    # ------------------------------------------------------------------
    # Video upload strategies
    # ------------------------------------------------------------------

    async def _choose_file_via_click(self, element, video_path: str, timeout: int = 5000):
        """Click ``element`` and feed ``video_path`` to the file chooser it opens."""
        async with self.page.expect_file_chooser(timeout=timeout) as fc_info:
            await element.click()
        chooser = await fc_info.value
        await chooser.set_files(video_path)

    async def _upload_via_env_selector(self, frame, frame_name: str, video_path: str) -> bool:
        """Try the selector configured through WEIXIN_UPLOAD_SELECTOR."""
        try:
            el = frame.locator(WEIXIN_UPLOAD_SELECTOR).first
            if await el.count() > 0 and await el.is_visible():
                print(f"[{self.platform_name}] [{frame_name}] 使用环境变量 WEIXIN_UPLOAD_SELECTOR: {WEIXIN_UPLOAD_SELECTOR}")
                try:
                    await self._choose_file_via_click(el, video_path)
                    print(f"[{self.platform_name}] [{frame_name}] 通过环境变量选择器上传成功")
                    return True
                except Exception as e:
                    print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器点击失败,尝试直接 set_input_files: {e}")
                    try:
                        await el.set_input_files(video_path)
                        print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 成功")
                        return True
                    except Exception as e2:
                        print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 仍失败: {e2}")
        except Exception as e:
            print(f"[{self.platform_name}] [{frame_name}] 使用环境变量选择器定位元素失败: {e}")
        return False

    async def _upload_via_click_targets(self, frame, frame_name: str, video_path: str) -> bool:
        """Click known upload areas until one opens a file chooser."""
        click_selectors = [
            "div.upload-content",
            "div[class*='upload-content']",
            "div[class*='upload']",
            "div.add-wrap",
            "[class*='uploader']",
            "text=点击上传",
            "text=上传视频",
            "text=选择视频",
        ]
        for selector in click_selectors:
            try:
                el = frame.locator(selector).first
                if await el.count() > 0 and await el.is_visible():
                    print(f"[{self.platform_name}] [{frame_name}] 找到可点击上传区域: {selector}")
                    try:
                        await self._choose_file_via_click(el, video_path)
                        print(f"[{self.platform_name}] [{frame_name}] 通过 file chooser 上传成功")
                        return True
                    except Exception as e:
                        print(f"[{self.platform_name}] [{frame_name}] 点击触发 chooser 失败: {e}")
            except Exception:
                pass
        return False

    async def _upload_via_file_input(self, frame, frame_name: str, video_path: str) -> bool:
        """Set the file directly on the best-scoring ``input[type=file]``."""
        try:
            inputs = frame.locator("input[type='file']")
            cnt = await inputs.count()
            if cnt > 0:
                best_idx = 0
                best_score = -1
                for i in range(cnt):
                    try:
                        inp = inputs.nth(i)
                        accept = (await inp.get_attribute("accept")) or ""
                        multiple = (await inp.get_attribute("multiple")) or ""
                        # Prefer inputs that explicitly accept video files.
                        score = 0
                        if "video" in accept:
                            score += 10
                        if "mp4" in accept:
                            score += 3
                        if multiple:
                            score += 1
                        if score > best_score:
                            best_score = score
                            best_idx = i
                    except Exception:
                        continue

                target = inputs.nth(best_idx)
                print(f"[{self.platform_name}] [{frame_name}] 尝试对 input[{best_idx}] set_input_files (score={best_score})")
                await target.set_input_files(video_path)
                print(f"[{self.platform_name}] [{frame_name}] 通过 file input 上传成功")
                return True
        except Exception as e:
            # Not fatal: the AI fallback still gets a chance afterwards.
            print(f"[{self.platform_name}] [{frame_name}] file input 上传失败: {e}")
        return False

    async def _upload_via_ai(self, frame, frame_name: str, video_path: str) -> bool:
        """Let the AI model locate the upload entry, from HTML or from candidates."""
        try:
            frame_url = getattr(frame, "url", "")
            html_full = await frame.content()
            html_for_ai = await self._extract_relevant_html_snippets(html_full)
            print(f"[{self.platform_name}] [{frame_name}] frame_url={frame_url}, html_len={len(html_full)}, html_for_ai_len={len(html_for_ai)}")

            ai_selector = await self.ai_find_upload_selector(html_for_ai, frame_name=frame_name)
            if ai_selector:
                try:
                    el = frame.locator(ai_selector).first
                    if await el.count() > 0:
                        print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器点击上传入口: {ai_selector}")
                        try:
                            await self._choose_file_via_click(el, video_path)
                            print(f"[{self.platform_name}] [{frame_name}] 通过 AI 选择器上传成功")
                            return True
                        except Exception as e:
                            print(f"[{self.platform_name}] [{frame_name}] AI 选择器点击失败,改为直接 set_input_files: {e}")
                            try:
                                await el.set_input_files(video_path)
                                print(f"[{self.platform_name}] [{frame_name}] AI 选择器直接 set_input_files 成功")
                                return True
                            except Exception as e2:
                                print(f"[{self.platform_name}] [{frame_name}] AI 选择器 set_input_files 仍失败: {e2}")
                except Exception as e:
                    print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器定位元素失败: {e}")
            else:
                # The HTML gave no answer: collect candidate elements in the
                # page and let the model choose among them.
                try:
                    candidates = await frame.evaluate(_UPLOAD_CANDIDATES_JS)
                    ai_selector2 = await self.ai_pick_selector_from_candidates(
                        candidates=candidates,
                        goal="上传视频入口",
                        frame_name=frame_name
                    )
                    if ai_selector2:
                        el2 = frame.locator(ai_selector2).first
                        if await el2.count() > 0:
                            print(f"[{self.platform_name}] [{frame_name}] 使用 AI 候选选择器点击上传入口: {ai_selector2}")
                            try:
                                await self._choose_file_via_click(el2, video_path)
                                print(f"[{self.platform_name}] [{frame_name}] 通过 AI 候选选择器上传成功")
                                return True
                            except Exception as e:
                                print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器点击失败,尝试 set_input_files: {e}")
                                try:
                                    await el2.set_input_files(video_path)
                                    print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 成功")
                                    return True
                                except Exception as e2:
                                    print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 仍失败: {e2}")
                except Exception as e:
                    print(f"[{self.platform_name}] [{frame_name}] 构造候选并交给 AI 失败: {e}")
        except Exception as e:
            print(f"[{self.platform_name}] [{frame_name}] AI 上传入口识别整体失败: {e}")
        return False

    async def _try_upload_in_frame(self, frame, frame_name: str, video_path: str) -> bool:
        """Run every upload strategy against one frame; True on first success."""
        # Strategy 0: an explicitly configured selector takes priority.
        if WEIXIN_UPLOAD_SELECTOR and await self._upload_via_env_selector(frame, frame_name, video_path):
            return True
        # Strategy 1: click an upload area to open the chooser (closest to matrix).
        if await self._upload_via_click_targets(frame, frame_name, video_path):
            return True
        # Strategy 2: set input[type=file] directly (common for hidden inputs).
        if await self._upload_via_file_input(frame, frame_name, video_path):
            return True
        # Strategy 3: AI-assisted discovery as the last resort.
        return await self._upload_via_ai(frame, frame_name, video_path)

    async def _upload_video_file(self, video_path: str) -> bool:
        """Try the main frame first, then every child frame."""
        try:
            if await self._try_upload_in_frame(self.page.main_frame, "main", video_path):
                return True
        except Exception as e:
            print(f"[{self.platform_name}] main frame 上传尝试异常: {e}")

        try:
            frames = self.page.frames
            print(f"[{self.platform_name}] 发现 frames: {len(frames)}")
            for idx, fr in enumerate(frames):
                # The main frame has already been tried.
                if fr == self.page.main_frame:
                    continue
                name = fr.name or f"frame-{idx}"
                if await self._try_upload_in_frame(fr, name, video_path):
                    return True
        except Exception as e:
            print(f"[{self.platform_name}] 遍历 frames 异常: {e}")

        return False

    async def _captcha_result(self, captcha_type: str, error: str, page_url: str) -> PublishResult:
        """Build the 'need_captcha' result, including a screenshot."""
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error=error,
            need_captcha=True,
            captcha_type=captcha_type,
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_captcha'
        )

    async def _publish_success_result(self, page_url: str) -> PublishResult:
        """Report completion and build the success result."""
        self.report_progress(100, "发布成功")
        print(f"[{self.platform_name}] 视频发布成功!")
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布成功",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='success'
        )

    # ------------------------------------------------------------------
    # Public operations
    # ------------------------------------------------------------------

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """
        Publish a video to WeChat Channels.

        Steps: start the browser, apply cookies, open the create page, bail
        out on login/captcha prompts, upload the file, fill title/tags, wait
        for the upload, set cover / short title / schedule, click publish.

        Returns:
            A ``PublishResult`` whose ``status`` is ``'success'``,
            ``'need_captcha'``, ``'failed'`` (upload entry not found) or
            ``'need_action'`` (publish click did not reach the list page).

        Raises:
            Exception: when the page was not created or the video file does
                not exist.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        self.report_progress(5, "正在初始化浏览器...")

        # Start the browser (Chrome preferred).
        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        # Parse and apply cookies.
        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        # Validate the video file.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(10, "正在打开上传页面...")

        # Open the create page.
        await self.page.goto(self.publish_url, wait_until="domcontentloaded", timeout=60000)
        await asyncio.sleep(3)

        # A redirect to a login URL means the cookies are no longer valid.
        current_url = self.page.url
        print(f"[{self.platform_name}] 当前页面: {current_url}")

        if "login" in current_url:
            return await self._captcha_result('login', "Cookie 已过期,需要重新登录", current_url)

        # AI-based captcha detection.
        ai_captcha = await self.ai_check_captcha()
        if ai_captcha['has_captcha']:
            print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
            return await self._captcha_result(
                ai_captcha['captcha_type'],
                f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
                current_url,
            )

        # Selector-based captcha detection.
        captcha_result = await self.check_captcha()
        if captcha_result['need_captcha']:
            return await self._captcha_result(
                captcha_result['captcha_type'],
                f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证",
                current_url,
            )

        self.report_progress(15, "正在选择视频文件...")

        # The create page's DOM varies by account / region / rollout and the
        # upload widget may live in an iframe, so several strategies are
        # tried across all frames.
        # Give the page time to render the upload area before probing it.
        try:
            await self.page.wait_for_selector("div.upload-content, input[type='file'], iframe", timeout=20000)
        except Exception:
            pass

        upload_success = await self._upload_video_file(params.video_path)

        if not upload_success:
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="未找到上传入口(可能在 iframe 中或页面结构已变更)",
                screenshot_base64=screenshot_base64,
                page_url=await self.get_page_url(),
                status='failed'
            )

        self.report_progress(20, "正在填充标题和话题...")

        # Title and tags.
        await self.add_title_tags(params)

        self.report_progress(30, "等待视频上传完成...")

        # Poll until the publish button is enabled.  If it never is, the
        # flow continues anyway after the last iteration.
        for _ in range(120):
            try:
                button_info = await self.page.get_by_role("button", name="发表").get_attribute('class')
                if "weui-desktop-btn_disabled" not in button_info:
                    print(f"[{self.platform_name}] 视频上传完毕")

                    # Cover image.
                    self.report_progress(50, "正在上传封面...")
                    await self.upload_cover(params.cover_path)
                    break
                else:
                    # Re-upload when the page reports an upload error.
                    if await self.page.locator('div.status-msg.error').count():
                        if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count():
                            await self.handle_upload_error(params.video_path)

                    await asyncio.sleep(3)
            except Exception:
                # Exception (not bare except) so task cancellation propagates.
                await asyncio.sleep(3)

        self.report_progress(60, "处理视频设置...")

        # Short title (best effort).
        try:
            short_title_el = self.page.get_by_text("短标题", exact=True).locator("..").locator(
                "xpath=following-sibling::div").locator('span input[type="text"]')
            if await short_title_el.count():
                short_title = format_short_title(params.title)
                await short_title_el.fill(short_title)
        except Exception:
            pass

        # Scheduled publishing.
        if params.publish_date:
            self.report_progress(70, "设置定时发布...")
            await self.set_schedule_time(params.publish_date)

        self.report_progress(80, "正在发布...")

        # Click publish and wait for the redirect to the post list (matrix).
        for i in range(30):
            try:
                publish_btn = self.page.locator('div.form-btns button:has-text("发表")')
                if await publish_btn.count():
                    print(f"[{self.platform_name}] 点击发布按钮...")
                    await publish_btn.click()

                await self.page.wait_for_url(_POST_LIST_URL, timeout=10000)
                return await self._publish_success_result(self.page.url)
            except Exception:
                # The wait may time out even though navigation happened.
                current_url = self.page.url
                if _POST_LIST_URL in current_url:
                    return await self._publish_success_result(current_url)
                else:
                    print(f"[{self.platform_name}] 视频正在发布中... {i+1}/30, URL: {current_url}")
                    await asyncio.sleep(1)

        # The list page was never reached.
        screenshot_base64 = await self.capture_screenshot()
        page_url = await self.get_page_url()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="发布超时,请检查发布状态",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_action'
        )

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """
        Scrape the post list of the creator centre.

        Only the items currently rendered are read, capped at ``page_size``;
        ``page`` is logged but not used for navigation.  ``work_id`` is a
        synthetic value derived from the item index and a checksum of the
        title.

        Returns:
            ``WorksResult`` with ``success=False`` and the error text when
            anything outside per-item parsing fails.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the post list.
            await self.page.goto(_POST_LIST_URL)
            await asyncio.sleep(5)

            # Check the login state.
            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Wait for the list container, then read the rendered items.
            await self.page.wait_for_selector('div.post-feed-wrap', timeout=10000)

            post_items = self.page.locator('div.post-feed-item')
            item_count = await post_items.count()

            print(f"[{self.platform_name}] 找到 {item_count} 个作品项")

            for i in range(min(item_count, page_size)):
                try:
                    item = post_items.nth(i)

                    # Cover image.
                    cover_el = item.locator('div.cover-wrap img').first
                    cover_url = ''
                    if await cover_el.count() > 0:
                        cover_url = await cover_el.get_attribute('src') or ''

                    # Title, capped at 50 characters.
                    title = (await _text_or_empty(item.locator('div.content').first)).strip()[:50]

                    # Statistics, e.g. "播放 100 点赞 50 评论 10".
                    stats_text = await _text_or_empty(item.locator('div.post-data'))
                    play_count = _parse_count(_PLAY_COUNT_RE.search(stats_text))
                    like_count = _parse_count(_LIKE_COUNT_RE.search(stats_text))
                    comment_count = _parse_count(_COMMENT_COUNT_RE.search(stats_text))

                    # Publish time as displayed.
                    publish_time = (await _text_or_empty(item.locator('div.time'))).strip()

                    # Synthetic ID; a real one may require the detail page.
                    work_id = f"weixin_{i}_{_stable_hash(title)}"

                    works.append(WorkItem(
                        work_id=work_id,
                        title=title or '无标题',
                        cover_url=cover_url,
                        duration=0,
                        status='published',
                        publish_time=publish_time,
                        play_count=play_count,
                        like_count=like_count,
                        comment_count=comment_count,
                    ))
                except Exception as e:
                    print(f"[{self.platform_name}] 解析作品 {i} 失败: {e}")
                    continue

            total = len(works)
            has_more = item_count > page_size
            print(f"[{self.platform_name}] 获取到 {total} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(success=False, platform=self.platform_name, error=str(e))

        return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more)

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """
        Scrape the comment management page.

        All rendered comment items are returned and tagged with ``work_id``;
        the page is not filtered by work, and ``cursor`` is not used.  A
        missing comment list is reported as an empty, successful result.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False

        try:
            await self.init_browser()
            cookie_list = self.parse_cookies(cookies)
            await self.set_cookies(cookie_list)

            if not self.page:
                raise Exception("Page not initialized")

            # Open the comment management page.
            await self.page.goto("https://channels.weixin.qq.com/platform/comment/index")
            await asyncio.sleep(5)

            # Check the login state.
            current_url = self.page.url
            if "login" in current_url:
                raise Exception("Cookie 已过期,请重新登录")

            # Wait for the comment list.
            try:
                await self.page.wait_for_selector('div.comment-list', timeout=10000)
            except Exception:
                print(f"[{self.platform_name}] 未找到评论列表")
                return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False)

            comment_items = self.page.locator('div.comment-item')
            item_count = await comment_items.count()

            print(f"[{self.platform_name}] 找到 {item_count} 个评论项")

            for i in range(item_count):
                try:
                    item = comment_items.nth(i)

                    # Author.
                    author_name = (await _text_or_empty(item.locator('div.nick-name'))).strip()

                    author_avatar = ''
                    avatar_el = item.locator('img.avatar')
                    if await avatar_el.count() > 0:
                        author_avatar = await avatar_el.get_attribute('src') or ''

                    # Content and time as displayed.
                    content = (await _text_or_empty(item.locator('div.comment-content'))).strip()
                    create_time = (await _text_or_empty(item.locator('div.time'))).strip()

                    # Synthetic ID from the index and a content checksum.
                    comment_id = f"weixin_comment_{i}_{_stable_hash(content)}"

                    comments.append(CommentItem(
                        comment_id=comment_id,
                        work_id=work_id,
                        content=content,
                        author_id='',
                        author_name=author_name,
                        author_avatar=author_avatar,
                        like_count=0,
                        reply_count=0,
                        create_time=create_time,
                    ))
                except Exception as e:
                    print(f"[{self.platform_name}] 解析评论 {i} 失败: {e}")
                    continue

            total = len(comments)
            print(f"[{self.platform_name}] 获取到 {total} 条评论")

        except Exception as e:
            traceback.print_exc()
            return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e))

        return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)