# -*- coding: utf-8 -*- """ 微信视频号发布器 参考: matrix/tencent_uploader/main.py """ import asyncio import json import os from datetime import datetime from typing import List from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) import os import time # 允许通过环境变量手动指定“上传视频入口”的选择器,便于在页面结构频繁变更时快速调整 WEIXIN_UPLOAD_SELECTOR = os.environ.get("WEIXIN_UPLOAD_SELECTOR", "").strip() def format_short_title(origin_title: str) -> str: """ 格式化短标题 - 移除特殊字符 - 长度限制在 6-16 字符 """ allowed_special_chars = "《》"":+?%°" filtered_chars = [ char if char.isalnum() or char in allowed_special_chars else ' ' if char == ',' else '' for char in origin_title ] formatted_string = ''.join(filtered_chars) if len(formatted_string) > 16: formatted_string = formatted_string[:16] elif len(formatted_string) < 6: formatted_string += ' ' * (6 - len(formatted_string)) return formatted_string class WeixinPublisher(BasePublisher): """ 微信视频号发布器 使用 Playwright 自动化操作视频号创作者中心 注意: 需要使用 Chrome 浏览器,否则可能出现 H264 编码错误 """ platform_name = "weixin" login_url = "https://channels.weixin.qq.com/platform" publish_url = "https://channels.weixin.qq.com/platform/post/create" cookie_domain = ".weixin.qq.com" def _parse_count(self, count_str: str) -> int: """解析数字(支持带'万'的格式)""" try: count_str = count_str.strip() if '万' in count_str: return int(float(count_str.replace('万', '')) * 10000) return int(count_str) except: return 0 async def ai_find_upload_selector(self, frame_html: str, frame_name: str = "main") -> str: """ 使用 AI 从 HTML 中识别“上传视频/选择文件”相关元素的 CSS 选择器。 设计思路: - 仅在常规 DOM 选择器都失败时调用,避免频繁占用 AI 配额; - 通过 DashScope 文本模型(与验证码识别同一套配置)分析 HTML; - 返回一个适合用于 frame.locator(selector) 的 CSS 选择器。 """ import json import re import requests import os # 避免 HTML 过长导致 token 超限,只截取前 N 字符 if not frame_html: return "" max_len = 20000 if len(frame_html) > max_len: frame_html = frame_html[:max_len] ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "") ai_base_url = os.environ.get("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1") ai_text_model = os.environ.get("AI_TEXT_MODEL", "qwen-plus") if not ai_api_key: print(f"[{self.platform_name}] AI上传入口识别: 未配置 AI API Key,跳过") return "" prompt = f""" 你是熟悉微信视频号后台的前端工程师,现在需要在一段 HTML 中找到“上传视频文件”的入口。 页面说明: - 平台:微信视频号(channels.weixin.qq.com) - 目标:用于上传视频文件的按钮或 input(一般会触发文件选择框) - 你会收到某个 frame 的完整 HTML 片段(不包含截图)。 请你根据下面的 HTML,推断最适合用于上传视频文件的元素,并输出一个可以被 Playwright 使用的 CSS 选择器。 要求: 1. 只考虑“上传/选择视频文件”的入口,不要返回“发布/发表/下一步”等按钮; 2. 选择器需要尽量稳定,不要使用自动生成的随机类名(例如带很多随机字母/数字的类名可以用前缀匹配); 3. 选择器必须是 CSS 选择器(不要返回 XPath); 4. 如果确实找不到合理的上传入口,返回 selector 为空字符串。 请以 JSON 格式输出,严格遵守以下结构(不要添加任何解释文字): ```json {{ "selector": "CSS 选择器字符串,比如:input[type='file'] 或 div.upload-content input[type='file']" }} ``` 下面是 frame=\"{frame_name}\" 的 HTML: ```html {frame_html} ```""" payload = { "model": ai_text_model, "messages": [ { "role": "user", "content": prompt, } ], "max_tokens": 600, } headers = { "Authorization": f"Bearer {ai_api_key}", "Content-Type": "application/json", } try: print(f"[{self.platform_name}] AI上传入口识别: 正在分析 frame={frame_name} HTML...") resp = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=40, ) if resp.status_code != 200: print(f"[{self.platform_name}] AI上传入口识别: API 返回错误 {resp.status_code}") return "" data = resp.json() content = data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" # 尝试从 ```json``` 代码块中解析 json_match = re.search(r"```json\\s*([\\s\\S]*?)\\s*```", content) if json_match: json_str = json_match.group(1) else: json_match = re.search(r"\\{[\\s\\S]*\\}", content) json_str = json_match.group(0) if json_match else "{}" try: result = json.loads(json_str) except Exception: result = {} selector = (result.get("selector") or "").strip() print(f"[{self.platform_name}] AI上传入口识别结果: selector='{selector}'") return selector except Exception as e: print(f"[{self.platform_name}] AI上传入口识别异常: {e}") return "" async def ai_pick_selector_from_candidates(self, candidates: list, goal: str, frame_name: str = "main") -> str: """ 将“候选元素列表(包含 css selector + 文本/属性)”发给 AI,让 AI 直接挑选最符合 goal 的元素。 适用于:HTML 里看不出上传入口、或页面大量动态渲染时。 """ import json import re import requests import os if not candidates: return "" ai_api_key = os.environ.get("DASHSCOPE_API_KEY", "") ai_base_url = os.environ.get("DASHSCOPE_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1") ai_text_model = os.environ.get("AI_TEXT_MODEL", "qwen-plus") if not ai_api_key: print(f"[{self.platform_name}] AI候选选择器: 未配置 AI API Key,跳过") return "" # 控制长度,最多取前 120 个候选 candidates = candidates[:120] prompt = f""" 你是自动化发布工程师。现在要在微信视频号(channels.weixin.qq.com)发布页面里找到“{goal}”相关的入口元素。 我会给你一组候选元素,每个候选都包含: - css: 可直接用于 Playwright 的 CSS 选择器 - tag / type / role / ariaLabel / text / id / className(部分字段可能为空) 你的任务: - 从候选中选出最可能用于“{goal}”的元素,返回它的 css 选择器; - 如果没有任何候选符合,返回空字符串。 注意: - 如果 goal 是“上传视频入口”,优先选择 input[type=file] 或看起来会触发选择文件/上传的区域; - 不要选择“发布/发表/下一步”等按钮(除非 goal 明确是发布按钮)。 请严格按 JSON 输出(不要解释): ```json {{ "selector": "..." }} ``` 候选列表(frame={frame_name}): ```json {json.dumps(candidates, ensure_ascii=False)} ```""" payload = { "model": ai_text_model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 400, } headers = { "Authorization": f"Bearer {ai_api_key}", "Content-Type": "application/json", } try: print(f"[{self.platform_name}] AI候选选择器: 正在分析 frame={frame_name}, goal={goal} ...") resp = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=40, ) if resp.status_code != 200: print(f"[{self.platform_name}] AI候选选择器: API 返回错误 {resp.status_code}") return "" data = resp.json() content = data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" json_match = re.search(r"```json\\s*([\\s\\S]*?)\\s*```", content) if json_match: json_str = json_match.group(1) else: json_match = re.search(r"\\{[\\s\\S]*\\}", content) json_str = json_match.group(0) if json_match else "{}" try: result = json.loads(json_str) except Exception: result = {} selector = (result.get("selector") or "").strip() print(f"[{self.platform_name}] AI候选选择器结果: selector='{selector}'") return selector except Exception as e: print(f"[{self.platform_name}] AI候选选择器异常: {e}") return "" async def _extract_relevant_html_snippets(self, html: str) -> str: """ 从 HTML 中抽取与上传相关的片段,减少 token,提升 AI 命中率。 - 优先抓取包含 upload/上传/file/input 等关键词的窗口片段 - 若未命中关键词,返回“开头 + 结尾”的拼接 """ import re if not html: return "" patterns = [ r"upload", r"uploader", r"file", r"type\\s*=\\s*['\\\"]file['\\\"]", r"input", r"drag", r"drop", r"选择", r"上传", r"添加", r"视频", ] regex = re.compile("|".join(patterns), re.IGNORECASE) snippets = [] for m in regex.finditer(html): start = max(0, m.start() - 350) end = min(len(html), m.end() + 350) snippets.append(html[start:end]) if len(snippets) >= 18: break if snippets: # 去重(粗略) unique = [] seen = set() for s in snippets: key = hash(s) if key not in seen: seen.add(key) unique.append(s) return "\n\n\n\n".join(unique)[:20000] # fallback: head + tail head = html[:9000] tail = html[-9000:] if len(html) > 9000 else "" return (head + "\n\n\n\n" + tail)[:20000] async def init_browser(self, storage_state: str = None): """初始化浏览器 - 参考 matrix 使用 channel=chrome 避免 H264 编码错误""" from playwright.async_api import async_playwright playwright = await async_playwright().start() proxy = self.proxy_config if isinstance(getattr(self, 'proxy_config', None), dict) else None if proxy and proxy.get('server'): print(f"[{self.platform_name}] 使用代理: {proxy.get('server')}", flush=True) # 参考 matrix: 使用系统内的 Chrome 浏览器,避免 H264 编码错误 # 非 headless 时添加 slow_mo 便于观察点击操作 launch_opts = {"headless": self.headless} if not self.headless: launch_opts["slow_mo"] = 400 # 每个操作间隔 400ms,便于观看 print(f"[{self.platform_name}] 有头模式 + slow_mo=400ms,浏览器将可见", flush=True) try: launch_opts["channel"] = "chrome" if proxy and proxy.get("server"): launch_opts["proxy"] = proxy self.browser = await playwright.chromium.launch(**launch_opts) print(f"[{self.platform_name}] 使用系统 Chrome 浏览器", flush=True) except Exception as e: print(f"[{self.platform_name}] Chrome 不可用,使用 Chromium: {e}", flush=True) if "channel" in launch_opts: del launch_opts["channel"] self.browser = await playwright.chromium.launch(**launch_opts) # 设置 HTTP Headers 防止重定向 headers = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Referer": "https://channels.weixin.qq.com/platform/post/list", } self.context = await self.browser.new_context( extra_http_headers=headers, ignore_https_errors=True, viewport={"width": 1920, "height": 1080}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) self.page = await self.context.new_page() return self.page async def set_schedule_time(self, publish_date: datetime): """设置定时发布""" if not self.page: return print(f"[{self.platform_name}] 设置定时发布...") # 点击定时选项 label_element = self.page.locator("label").filter(has_text="定时").nth(1) await label_element.click() # 选择日期 await self.page.click('input[placeholder="请选择发表时间"]') publish_month = f"{publish_date.month:02d}" current_month = f"{publish_month}月" # 检查月份 page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")') if page_month != current_month: await self.page.click('button.weui-desktop-btn__icon__right') # 选择日期 elements = await self.page.query_selector_all('table.weui-desktop-picker__table a') for element in elements: class_name = await element.evaluate('el => el.className') if 'weui-desktop-picker__disabled' in class_name: continue text = await element.inner_text() if text.strip() == str(publish_date.day): await element.click() break # 输入时间 await self.page.click('input[placeholder="请选择时间"]') await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.type(str(publish_date.hour)) # 点击其他地方确认 await self.page.locator("div.input-editor").click() async def handle_upload_error(self, video_path: str): """处理上传错误""" if not self.page: return print(f"[{self.platform_name}] 视频出错了,重新上传中...") await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click() await self.page.get_by_role('button', name="删除", exact=True).click() file_input = self.page.locator('input[type="file"]') await file_input.set_input_files(video_path) async def add_title_tags(self, params: PublishParams): """添加标题和话题""" if not self.page: return await self.page.locator("div.input-editor").click() await self.page.keyboard.type(params.title) if params.tags: await self.page.keyboard.press("Enter") for tag in params.tags: await self.page.keyboard.type("#" + tag) await self.page.keyboard.press("Space") print(f"[{self.platform_name}] 成功添加标题和 {len(params.tags)} 个话题") async def add_short_title(self): """添加短标题""" if not self.page: return try: short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator( "xpath=following-sibling::div").locator('span input[type="text"]') if await short_title_element.count(): # 获取已有内容作为短标题 pass except: pass async def upload_cover(self, cover_path: str): """上传封面图""" if not self.page or not cover_path or not os.path.exists(cover_path): return try: await asyncio.sleep(2) preview_btn_info = await self.page.locator( 'div.finder-tag-wrap.btn:has-text("更换封面")').get_attribute('class') if "disabled" not in preview_btn_info: await self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")').click() await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover() # 删除现有封面 if await self.page.locator(".del-wrap > .svg-icon").count(): await self.page.locator(".del-wrap > .svg-icon").click() # 上传新封面 preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap") async with self.page.expect_file_chooser() as fc_info: await preview_div.click() preview_chooser = await fc_info.value await preview_chooser.set_files(cover_path) await asyncio.sleep(2) await self.page.get_by_role("button", name="确定").click() await asyncio.sleep(1) await self.page.get_by_role("button", name="确认").click() print(f"[{self.platform_name}] 封面上传成功") except Exception as e: print(f"[{self.platform_name}] 封面上传失败: {e}") async def check_captcha(self) -> dict: """检查页面是否需要验证码""" if not self.page: return {'need_captcha': False, 'captcha_type': ''} try: # 检查各种验证码 captcha_selectors = [ 'text="请输入验证码"', 'text="滑动验证"', '[class*="captcha"]', '[class*="verify"]', ] for selector in captcha_selectors: try: if await self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到验证码: {selector}") return {'need_captcha': True, 'captcha_type': 'image'} except: pass # 检查登录弹窗 login_selectors = [ 'text="请登录"', 'text="扫码登录"', '[class*="login-dialog"]', ] for selector in login_selectors: try: if await self.page.locator(selector).count() > 0: print(f"[{self.platform_name}] 检测到需要登录: {selector}") return {'need_captcha': True, 'captcha_type': 'login'} except: pass except Exception as e: print(f"[{self.platform_name}] 验证码检测异常: {e}") return {'need_captcha': False, 'captcha_type': ''} async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到视频号""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器(使用 Chrome) await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) print(cookie_list) print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(10, "正在打开上传页面...") # 访问上传页面 await self.page.goto(self.publish_url, wait_until="networkidle", timeout=60000) await asyncio.sleep(3) # 检查是否跳转到登录页 current_url = self.page.url print(f"[{self.platform_name}] 当前页面: {current_url}") if "login" in current_url: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="Cookie 已过期,需要重新登录", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True) screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 传统方式检查验证码 captcha_result = await self.check_captcha() if captcha_result['need_captcha']: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证", need_captcha=True, captcha_type=captcha_result['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) self.report_progress(15, "正在选择视频文件...") # 上传视频 # 说明:视频号发布页在不同账号/地区/灰度下 DOM 结构差异较大,且上传组件可能在 iframe 中。 # 因此这里按 matrix 的思路“点击触发 file chooser”,同时增加“遍历全部 frame + 精确挑选 video input”的兜底。 upload_success = False if not self.page: raise Exception("Page not initialized") # 等待页面把上传区域渲染出来(避免过早判断) try: await self.page.wait_for_selector("div.upload-content, input[type='file'], iframe", timeout=20000) except Exception: pass async def _try_set_files_in_frame(frame, frame_name: str) -> bool: """在指定 frame 中尝试触发上传""" nonlocal upload_success if upload_success: return True # 方法0:如果用户通过环境变量显式配置了选择器,优先尝试这个 if WEIXIN_UPLOAD_SELECTOR: try: el = frame.locator(WEIXIN_UPLOAD_SELECTOR).first if await el.count() > 0 and await el.is_visible(): print(f"[{self.platform_name}] [{frame_name}] 使用环境变量 WEIXIN_UPLOAD_SELECTOR: {WEIXIN_UPLOAD_SELECTOR}") try: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await el.click() chooser = await fc_info.value await chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过环境变量选择器上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器点击失败,尝试直接 set_input_files: {e}") try: await el.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 成功") return True except Exception as e2: print(f"[{self.platform_name}] [{frame_name}] 环境变量选择器 set_input_files 仍失败: {e2}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 使用环境变量选择器定位元素失败: {e}") # 先尝试点击上传区域触发 chooser(最贴近 matrix) click_selectors = [ "div.upload-content", "div[class*='upload-content']", "div[class*='upload']", "div.add-wrap", "[class*='uploader']", "text=点击上传", "text=上传视频", "text=选择视频", ] for selector in click_selectors: try: el = frame.locator(selector).first if await el.count() > 0 and await el.is_visible(): print(f"[{self.platform_name}] [{frame_name}] 找到可点击上传区域: {selector}") try: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await el.click() chooser = await fc_info.value await chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 file chooser 上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 点击触发 chooser 失败: {e}") except Exception: pass # 再尝试直接设置 input[type=file](iframe/隐藏 input 常见) try: inputs = frame.locator("input[type='file']") cnt = await inputs.count() if cnt > 0: best_idx = 0 best_score = -1 for i in range(cnt): try: inp = inputs.nth(i) accept = (await inp.get_attribute("accept")) or "" multiple = (await inp.get_attribute("multiple")) or "" score = 0 if "video" in accept: score += 10 if "mp4" in accept: score += 3 if multiple: score += 1 if score > best_score: best_score = score best_idx = i except Exception: continue target = inputs.nth(best_idx) print(f"[{self.platform_name}] [{frame_name}] 尝试对 input[{best_idx}] set_input_files (score={best_score})") await target.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 file input 上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] file input 上传失败: {e}") # 不直接返回,让后面的 AI 兜底有机会执行 # 方法4: 兜底使用 AI 分析 HTML,猜测上传入口 try: frame_url = getattr(frame, "url", "") html_full = await frame.content() html_for_ai = await self._extract_relevant_html_snippets(html_full) print(f"[{self.platform_name}] [{frame_name}] frame_url={frame_url}, html_len={len(html_full)}, html_for_ai_len={len(html_for_ai)}") ai_selector = await self.ai_find_upload_selector(html_for_ai, frame_name=frame_name) if ai_selector: try: el = frame.locator(ai_selector).first if await el.count() > 0: print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器点击上传入口: {ai_selector}") try: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await el.click() chooser = await fc_info.value await chooser.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 AI 选择器上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] AI 选择器点击失败,改为直接 set_input_files: {e}") try: await el.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] AI 选择器直接 set_input_files 成功") return True except Exception as e2: print(f"[{self.platform_name}] [{frame_name}] AI 选择器 set_input_files 仍失败: {e2}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 使用 AI 选择器定位元素失败: {e}") else: # 如果 AI 无法从 HTML 推断,退一步:构造候选元素列表交给 AI 选择 try: candidates = await frame.evaluate(""" () => { function cssEscape(s) { try { return CSS.escape(s); } catch (e) { return s.replace(/[^a-zA-Z0-9_-]/g, '\\\\$&'); } } function buildSelector(el) { if (!el || el.nodeType !== 1) return ''; if (el.id) return `#${cssEscape(el.id)}`; let parts = []; let cur = el; for (let depth = 0; cur && cur.nodeType === 1 && depth < 5; depth++) { let part = cur.tagName.toLowerCase(); const role = cur.getAttribute('role'); const type = cur.getAttribute('type'); if (type) part += `[type="${type}"]`; if (role) part += `[role="${role}"]`; const cls = (cur.className || '').toString().trim().split(/\\s+/).filter(Boolean); if (cls.length) part += '.' + cls.slice(0, 2).map(cssEscape).join('.'); // nth-of-type let idx = 1; let sib = cur; while (sib && (sib = sib.previousElementSibling)) { if (sib.tagName === cur.tagName) idx++; } part += `:nth-of-type(${idx})`; parts.unshift(part); cur = cur.parentElement; } return parts.join(' > '); } const nodes = Array.from(document.querySelectorAll('input, button, a, div, span')) .filter(el => { const tag = el.tagName.toLowerCase(); const type = (el.getAttribute('type') || '').toLowerCase(); const role = (el.getAttribute('role') || '').toLowerCase(); const aria = (el.getAttribute('aria-label') || '').toLowerCase(); const txt = (el.innerText || '').trim().slice(0, 60); const cls = (el.className || '').toString().toLowerCase(); const isFile = tag === 'input' && type === 'file'; const looksClickable = tag === 'button' || tag === 'a' || role === 'button' || el.onclick || cls.includes('upload') || cls.includes('uploader') || cls.includes('drag') || aria.includes('上传') || aria.includes('选择') || aria.includes('添加') || txt.includes('上传') || txt.includes('选择') || txt.includes('添加') || txt.includes('点击上传'); if (!isFile && !looksClickable) return false; const r = el.getBoundingClientRect(); const visible = r.width > 5 && r.height > 5; return visible; }); const limited = nodes.slice(0, 120).map(el => ({ css: buildSelector(el), tag: el.tagName.toLowerCase(), type: el.getAttribute('type') || '', role: el.getAttribute('role') || '', ariaLabel: el.getAttribute('aria-label') || '', text: (el.innerText || '').trim().slice(0, 80), id: el.id || '', className: (el.className || '').toString().slice(0, 120), accept: el.getAttribute('accept') || '', })); return limited; } """) ai_selector2 = await self.ai_pick_selector_from_candidates( candidates=candidates, goal="上传视频入口", frame_name=frame_name ) if ai_selector2: el2 = frame.locator(ai_selector2).first if await el2.count() > 0: print(f"[{self.platform_name}] [{frame_name}] 使用 AI 候选选择器点击上传入口: {ai_selector2}") try: async with self.page.expect_file_chooser(timeout=5000) as fc_info: await el2.click() chooser2 = await fc_info.value await chooser2.set_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] 通过 AI 候选选择器上传成功") return True except Exception as e: print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器点击失败,尝试 set_input_files: {e}") try: await el2.set_input_files(params.video_path) upload_success = True print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 成功") return True except Exception as e2: print(f"[{self.platform_name}] [{frame_name}] AI 候选选择器 set_input_files 仍失败: {e2}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] 构造候选并交给 AI 失败: {e}") except Exception as e: print(f"[{self.platform_name}] [{frame_name}] AI 上传入口识别整体失败: {e}") return False # 先尝试主 frame try: await _try_set_files_in_frame(self.page.main_frame, "main") except Exception as e: print(f"[{self.platform_name}] main frame 上传尝试异常: {e}") # 再遍历所有子 frame if not upload_success: try: frames = self.page.frames print(f"[{self.platform_name}] 发现 frames: {len(frames)}") for idx, fr in enumerate(frames): if upload_success: break # main_frame 已尝试过 if fr == self.page.main_frame: continue name = fr.name or f"frame-{idx}" await _try_set_files_in_frame(fr, name) except Exception as e: print(f"[{self.platform_name}] 遍历 frames 异常: {e}") if not upload_success: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="未找到上传入口(可能在 iframe 中或页面结构已变更)", screenshot_base64=screenshot_base64, page_url=await self.get_page_url(), status='failed' ) self.report_progress(20, "正在填充标题和话题...") # 添加标题和话题 await self.add_title_tags(params) self.report_progress(30, "等待视频上传完成...") # 等待上传完成 for _ in range(120): try: button_info = await self.page.get_by_role("button", name="发表").get_attribute('class') if "weui-desktop-btn_disabled" not in button_info: print(f"[{self.platform_name}] 视频上传完毕") # 上传封面 self.report_progress(50, "正在上传封面...") await self.upload_cover(params.cover_path) break else: # 检查上传错误 if await self.page.locator('div.status-msg.error').count(): if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count(): await self.handle_upload_error(params.video_path) await asyncio.sleep(3) except: await asyncio.sleep(3) self.report_progress(60, "处理视频设置...") # 添加短标题 try: short_title_el = self.page.get_by_text("短标题", exact=True).locator("..").locator( "xpath=following-sibling::div").locator('span input[type="text"]') if await short_title_el.count(): short_title = format_short_title(params.title) await short_title_el.fill(short_title) except: pass # 定时发布 if params.publish_date: self.report_progress(70, "设置定时发布...") await self.set_schedule_time(params.publish_date) self.report_progress(80, "正在发布...") # 点击发布 - 参考 matrix for i in range(30): try: # 参考 matrix: div.form-btns button:has-text("发表") publish_btn = self.page.locator('div.form-btns button:has-text("发表")') if await publish_btn.count(): print(f"[{self.platform_name}] 点击发布按钮...") await publish_btn.click() # 等待跳转到作品列表页面 - 参考 matrix await self.page.wait_for_url( "https://channels.weixin.qq.com/platform/post/list", timeout=10000 ) self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=self.page.url, status='success' ) except Exception as e: current_url = self.page.url if "https://channels.weixin.qq.com/platform/post/list" in current_url: self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=current_url, status='success' ) else: print(f"[{self.platform_name}] 视频正在发布中... {i+1}/30, URL: {current_url}") await asyncio.sleep(1) # 发布超时 screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="发布超时,请检查发布状态", screenshot_base64=screenshot_base64, page_url=page_url, status='need_action' ) async def _get_works_fallback_dom(self, page_size: int) -> tuple: """API 失败时从当前页面 DOM 抓取作品列表(兼容新账号/不同入口)""" works: List[WorkItem] = [] total = 0 has_more = False try: for selector in ["div.post-feed-item", "[class*='post-feed']", "[class*='feed-item']", "div[class*='post']"]: try: await self.page.wait_for_selector(selector, timeout=8000) break except Exception: continue post_items = self.page.locator("div.post-feed-item") item_count = await post_items.count() if item_count == 0: post_items = self.page.locator("[class*='post-feed']") item_count = await post_items.count() for i in range(min(item_count, page_size)): try: item = post_items.nth(i) cover_el = item.locator("div.media img.thumb").first cover_url = await cover_el.get_attribute("src") or "" if await cover_el.count() > 0 else "" if not cover_url: cover_el = item.locator("img").first cover_url = await cover_el.get_attribute("src") or "" if await cover_el.count() > 0 else "" title_el = item.locator("div.post-title").first title = (await title_el.text_content() or "").strip() if await title_el.count() > 0 else "" time_el = item.locator("div.post-time span").first publish_time = (await time_el.text_content() or "").strip() if await time_el.count() > 0 else "" play_count = like_count = comment_count = share_count = collect_count = 0 data_items = item.locator("div.post-data div.data-item") for j in range(await data_items.count()): data_item = data_items.nth(j) count_text = (await data_item.locator("span.count").text_content() or "0").strip() if await data_item.locator("span.weui-icon-outlined-eyes-on").count() > 0: play_count = self._parse_count(count_text) elif await data_item.locator("span.weui-icon-outlined-like").count() > 0: like_count = self._parse_count(count_text) elif await data_item.locator("span.weui-icon-outlined-comment").count() > 0: comment_count = self._parse_count(count_text) elif await data_item.locator("use[xlink\\:href='#icon-share']").count() > 0: share_count = self._parse_count(count_text) elif await data_item.locator("use[xlink\\:href='#icon-thumb']").count() > 0: collect_count = self._parse_count(count_text) work_id = f"weixin_{i}_{hash(title)}_{hash(publish_time)}" works.append(WorkItem( work_id=work_id, title=title or "无标题", cover_url=cover_url, duration=0, status="published", publish_time=publish_time, play_count=play_count, like_count=like_count, comment_count=comment_count, share_count=share_count, collect_count=collect_count, )) except Exception as e: print(f"[{self.platform_name}] DOM 解析作品 {i} 失败: {e}", flush=True) continue total = len(works) has_more = item_count > page_size print(f"[{self.platform_name}] DOM 回退获取 {len(works)} 条", flush=True) except Exception as e: print(f"[{self.platform_name}] DOM 回退失败: {e}", flush=True) return (works, total, has_more, "") async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """获取视频号作品列表(调用 post_list 接口) page: 页码从 0 开始,或上一页返回的 rawKeyBuff/lastBuff 字符串 """ # 分页:首页 currentPage=1/rawKeyBuff=null,下一页用 currentPage 递增或 rawKeyBuff if page is None or page == "" or (isinstance(page, int) and page == 0): current_page = 1 raw_key_buff = None elif isinstance(page, int): current_page = page + 1 raw_key_buff = None else: current_page = 1 raw_key_buff = str(page) ts_ms = str(int(time.time() * 1000)) print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品列表 currentPage={current_page}, pageSize={page_size}, rawKeyBuff={raw_key_buff[:40] if raw_key_buff else 'null'}...") print(f"{'='*60}") works: List[WorkItem] = [] total = 0 has_more = False next_page = "" try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") await self.page.goto("https://channels.weixin.qq.com/platform/post/list", timeout=30000) await asyncio.sleep(3) current_url = self.page.url if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") api_url = "https://channels.weixin.qq.com/micro/content/cgi-bin/mmfinderassistant-bin/post/post_list" req_body = { "pageSize": page_size, "currentPage": current_page, "userpageType": 11, "stickyOrder": True, "timestamp": ts_ms, "_log_finder_uin": "", "_log_finder_id": "", "rawKeyBuff": raw_key_buff, "pluginSessionId": None, "scene": 7, "reqScene": 7, } body_str = json.dumps(req_body) response = await self.page.evaluate(""" async ([url, bodyStr]) => { try { const resp = await fetch(url, { method: 'POST', credentials: 'include', headers: { 'Content-Type': 'application/json', 'Accept': '*/*', 'Referer': 'https://channels.weixin.qq.com/platform/post/list' }, body: bodyStr }); return await resp.json(); } catch (e) { return { error: e.toString() }; } } """, [api_url, body_str]) is_first_page = current_page == 1 and raw_key_buff is None if response.get("error"): print(f"[{self.platform_name}] API 请求失败: {response.get('error')}", flush=True) if is_first_page: works, total, has_more, next_page = await self._get_works_fallback_dom(page_size) if works: return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page) return WorksResult(success=False, platform=self.platform_name, error=response.get("error", "API 请求失败")) err_code = response.get("errCode", -1) if err_code != 0: err_msg = response.get("errMsg", "unknown") print(f"[{self.platform_name}] API errCode={err_code}, errMsg={err_msg}, 完整响应(前800字): {json.dumps(response, ensure_ascii=False)[:800]}", flush=True) if is_first_page: works, total, has_more, next_page = await self._get_works_fallback_dom(page_size) if works: return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page) return WorksResult(success=False, platform=self.platform_name, error=f"errCode={err_code}, errMsg={err_msg}") data = response.get("data") or {} raw_list = data.get("list") or [] total = int(data.get("totalCount") or 0) has_more = bool(data.get("continueFlag", False)) next_page = (data.get("lastBuff") or "").strip() print(f"[{self.platform_name}] API 响应: list_len={len(raw_list)}, totalCount={total}, continueFlag={has_more}, lastBuff={next_page[:50] if next_page else ''}...") if is_first_page and len(raw_list) == 0: works_fb, total_fb, has_more_fb, _ = await self._get_works_fallback_dom(page_size) if works_fb: return WorksResult(success=True, platform=self.platform_name, works=works_fb, total=total_fb, has_more=has_more_fb, next_page="") for item in raw_list: try: # 存 works.platform_video_id 统一用 post_list 接口回参中的 exportId(如 export/xxx) work_id = str(item.get("exportId") or item.get("objectId") or item.get("id") or "").strip() if not work_id: work_id = f"weixin_{hash(item.get('createTime',0))}_{hash(item.get('desc', {}).get('description',''))}" desc = item.get("desc") or {} title = (desc.get("description") or "").strip() or "无标题" cover_url = "" duration = 0 media_list = desc.get("media") or [] if media_list and isinstance(media_list[0], dict): m = media_list[0] cover_url = (m.get("coverUrl") or m.get("thumbUrl") or "").strip() duration = int(m.get("videoPlayLen") or 0) create_ts = item.get("createTime") or 0 if isinstance(create_ts, (int, float)) and create_ts: publish_time = datetime.fromtimestamp(create_ts).strftime("%Y-%m-%d %H:%M:%S") else: publish_time = str(create_ts) if create_ts else "" read_count = int(item.get("readCount") or 0) like_count = int(item.get("likeCount") or 0) comment_count = int(item.get("commentCount") or 0) forward_count = int(item.get("forwardCount") or 0) fav_count = int(item.get("favCount") or 0) works.append(WorkItem( work_id=work_id, title=title, cover_url=cover_url, duration=duration, status="published", publish_time=publish_time, play_count=read_count, like_count=like_count, comment_count=comment_count, share_count=forward_count, collect_count=fav_count, )) except Exception as e: print(f"[{self.platform_name}] 解析作品项失败: {e}", flush=True) continue if total == 0 and works: total = len(works) print(f"[{self.platform_name}] 本页获取 {len(works)} 条,totalCount={total}, next_page={bool(next_page)}") except Exception as e: import traceback traceback.print_exc() return WorksResult(success=False, platform=self.platform_name, error=str(e)) return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more, next_page=next_page) async def sync_work_daily_stats_via_browser( self, cookies: str, work_id: int, platform_video_id: str ) -> dict: """ 通过浏览器自动化同步单个作品的每日数据到 work_day_statistics。 流程: 1. 打开 statistic/post 页,点击单篇视频 tab,点击近30天 2. 监听 post_list 接口,根据 exportId 匹配 platform_video_id 得到 objectId 3. 找到 data-row-key=objectId 的行,点击「查看」 4. 进入详情页,点击数据详情的近30天,点击下载表格 5. 解析 CSV 并返回 statistics 列表(供 Node 保存) """ import csv import tempfile from pathlib import Path result = {"success": False, "error": "", "statistics": [], "inserted": 0, "updated": 0} post_list_data = {"list": []} async def handle_response(response): try: if "statistic/post_list" in response.url and response.request.method == "POST": try: body = await response.json() if body.get("errCode") == 0 and body.get("data"): post_list_data["list"] = body.get("data", {}).get("list", []) except Exception: pass except Exception: pass try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") self.page.on("response", handle_response) # 1. 打开数据分析-作品数据页 print(f"[{self.platform_name}] 打开数据分析页...", flush=True) await self.page.goto("https://channels.weixin.qq.com/platform/statistic/post", timeout=30000) if not self.headless: print(f"[{self.platform_name}] 浏览器已打开,请将窗口置于前台观看操作(等待 5 秒)...", flush=True) await asyncio.sleep(5) else: await asyncio.sleep(3) if "login" in self.page.url: raise Exception("Cookie 已过期,请重新登录") # 2. 点击「单篇视频」tab tab_sel = "div.weui-desktop-tab__navs ul li:nth-child(2) a" try: await self.page.wait_for_selector(tab_sel, timeout=8000) await self.page.click(tab_sel) except Exception: tab_sel = "a:has-text('单篇视频')" await self.page.click(tab_sel) await asyncio.sleep(2) # 3. 点击「近30天」(单篇视频页的日期范围筛选) # 选择器优先级:精确匹配单篇视频区域内的日期范围 radio 组 radio_selectors = [ "div.post-single-wrap div.weui-desktop-radio-group.radio-group label:has-text('近30天')", "div.post-single-wrap div.filter-wrap div.weui-desktop-radio-group label:nth-child(2)", "div.post-single-wrap div.card-body div.filter-wrap div:nth-child(2) label:nth-child(2)", "div.post-single-wrap label:has-text('近30天')", "div.weui-desktop-radio-group label:has-text('近30天')", "label:has-text('近30天')", ] clicked = False for sel in radio_selectors: try: el = self.page.locator(sel).first if await el.count() > 0: await el.click() clicked = True print(f"[{self.platform_name}] 已点击近30天按钮 (selector: {sel[:50]}...)", flush=True) break except Exception as e: continue if not clicked: print(f"[{self.platform_name}] 警告: 未找到近30天按钮,继续尝试...", flush=True) await asyncio.sleep(3) # 4. 从 post_list 响应中找 exportId -> objectId export_id_to_object = {} for item in post_list_data["list"]: eid = (item.get("exportId") or "").strip() oid = (item.get("objectId") or "").strip() if eid and oid: export_id_to_object[eid] = oid object_id = export_id_to_object.get(platform_video_id) or export_id_to_object.get( platform_video_id.strip() ) if not object_id: # 尝试宽松匹配(platform_video_id 可能带前缀) for eid, oid in export_id_to_object.items(): if platform_video_id in eid or eid in platform_video_id: object_id = oid break if not object_id: result["error"] = f"未在 post_list 中匹配到 exportId={platform_video_id}" print(f"[{self.platform_name}] {result['error']}", flush=True) return result # 5. 找到 data-row-key=objectId 的行,点击「查看」 view_btn = self.page.locator(f'tr[data-row-key="{object_id}"] a.detail-wrap, tr[data-row-key="{object_id}"] a:has-text("查看")') try: await view_btn.first.wait_for(timeout=5000) await view_btn.first.click() except Exception as e: view_btn = self.page.locator(f'tr[data-row-key="{object_id}"] a') if await view_btn.count() > 0: await view_btn.first.click() else: raise Exception(f"未找到 objectId={object_id} 的查看按钮: {e}") await asyncio.sleep(3) # 6. 详情页:点击数据详情的「近30天」,再点击「下载表格」 detail_radio = "div.post-statistic-common div.filter-wrap label:nth-child(2)" for sel in [detail_radio, "div.main-body label:has-text('近30天')"]: try: el = self.page.locator(sel).first if await el.count() > 0: await el.click() break except Exception: continue await asyncio.sleep(2) # 保存到 server/tmp 目录 download_dir = Path(__file__).resolve().parent.parent.parent / "tmp" download_dir.mkdir(parents=True, exist_ok=True) async with self.page.expect_download(timeout=15000) as download_info: download_btn = self.page.locator("div.post-statistic-common div.filter-extra a, a:has-text('下载表格')") if await download_btn.count() == 0: raise Exception("未找到「下载表格」按钮") await download_btn.first.click() download = await download_info.value save_path = download_dir / f"work_{work_id}_{int(time.time())}.csv" await download.save_as(save_path) # 7. 解析 CSV -> statistics stats_list = [] with open(save_path, "r", encoding="utf-8-sig", errors="replace") as f: reader = csv.DictReader(f) rows = list(reader) for row in rows: date_val = ( row.get("日期") or row.get("date") or row.get("时间") or row.get("时间周期", "") ).strip() if not date_val: continue dt = None norm = date_val[:10].replace("年", "-").replace("月", "-").replace("日", "-").replace("/", "-") if len(norm) >= 8 and norm.count("-") >= 2: parts = norm.split("-") if len(parts) == 3: try: y, m, d = int(parts[0]), int(parts[1]), int(parts[2]) if 2000 <= y <= 2100 and 1 <= m <= 12 and 1 <= d <= 31: dt = datetime(y, m, d) except (ValueError, IndexError): pass if not dt: for fmt in ["%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y"]: try: dt = datetime.strptime((date_val.split()[0] if date_val else "")[:10], fmt) break except (ValueError, IndexError): dt = None if not dt: continue rec_date = dt.strftime("%Y-%m-%d") play = self._parse_count(row.get("播放", "") or row.get("播放量", "") or row.get("play_count", "0")) like = self._parse_count(row.get("点赞", "") or row.get("like_count", "0")) comment = self._parse_count(row.get("评论", "") or row.get("comment_count", "0")) share = self._parse_count(row.get("分享", "") or row.get("share_count", "0")) collect = self._parse_count(row.get("收藏", "") or row.get("collect_count", "0")) comp_rate = (row.get("完播率", "") or row.get("completion_rate", "0")).strip().rstrip("%") or "0" avg_dur = (row.get("平均播放时长", "") or row.get("avg_watch_duration", "0")).strip() stats_list.append({ "work_id": work_id, "record_date": rec_date, "play_count": play, "like_count": like, "comment_count": comment, "share_count": share, "collect_count": collect, "completion_rate": comp_rate, "avg_watch_duration": avg_dur, }) result["statistics"] = stats_list result["success"] = True try: os.remove(save_path) except Exception: pass except Exception as e: import traceback traceback.print_exc() result["error"] = str(e) finally: try: await self.close_browser() except Exception: pass return result async def sync_account_works_daily_stats_via_browser( self, cookies: str, works: List[dict], save_fn=None, update_works_fn=None, headless: bool = True, ) -> dict: """ 纯浏览器批量同步账号下所有作品(在库的)的每日数据到 work_day_statistics。 流程: 1. 打开 statistic/post → 点击单篇视频 → 点击近30天 2. 【首次】监听 post_list 接口 → 解析响应更新 works 表 yesterday_* 字段 3. 监听 post_list 获取 exportId->objectId 映射 4. 遍历 post_list 的每一条: - 若 exportId 在 works 的 platform_video_id 中无匹配 → 跳过 - 若匹配 → 找到 data-row-key=objectId 的行,点击「查看」 - 详情页:默认近7天,直接监听 feed_aggreagate_data_by_tab_type 接口 - 从「全部」tab 解析 browse/like/comment/forward/fav/follow,日期从昨天往前推 - 通过 save_fn 存入 work_day_statistics - 返回列表页,继续下一条 works: [{"work_id": int, "platform_video_id": str}, ...] save_fn: (stats_list: List[dict]) -> {inserted, updated},由调用方传入,用于调用 Node batch-dates update_works_fn: (updates: List[dict]) -> {updated},由调用方传入,用于将 post_list 解析数据更新到 works 表(仅首次调用) """ from pathlib import Path from datetime import timedelta result = { "success": True, "error": "", "total_processed": 0, "total_skipped": 0, "inserted": 0, "updated": 0, "works_updated": 0, } # platform_video_id(exportId) -> work_id export_id_to_work = {} for w in works: pvid = (w.get("platform_video_id") or w.get("platformVideoId") or "").strip() wid = w.get("work_id") or w.get("workId") if pvid and wid is not None: export_id_to_work[pvid] = int(wid) # 兼容可能带/不带前缀(如 export/xxx vs xxx) if "/" in pvid: export_id_to_work[pvid.split("/")[-1]] = int(wid) post_list_data = {"list": []} feed_aggreagate_data = {"body": None} async def handle_response(response): try: url = response.url if "statistic/post_list" in url: try: body = await response.json() if body.get("errCode") == 0 and body.get("data"): post_list_data["list"] = body.get("data", {}).get("list", []) except Exception: pass elif "feed_aggreagate_data_by_tab_type" in url: try: body = await response.json() if body.get("errCode") == 0 and body.get("data"): feed_aggreagate_data["body"] = body except Exception: pass except Exception: pass try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") self.page.on("response", handle_response) # 1. 打开数据分析-作品数据页 print(f"[{self.platform_name}] 打开数据分析页...", flush=True) await self.page.goto("https://channels.weixin.qq.com/platform/statistic/post", timeout=30000) if not headless: print(f"[{self.platform_name}] 浏览器已打开,请将窗口置于前台观看操作(等待 5 秒)...", flush=True) await asyncio.sleep(5) else: await asyncio.sleep(3) if "login" in self.page.url: raise Exception("Cookie 已过期,请重新登录") # 2. 点击「单篇视频」tab tab_sel = "div.weui-desktop-tab__navs ul li:nth-child(2) a" try: await self.page.wait_for_selector(tab_sel, timeout=8000) await self.page.click(tab_sel) except Exception: tab_sel = "a:has-text('单篇视频')" await self.page.click(tab_sel) await asyncio.sleep(2) # 3. 点击「近30天」前清空 list,点击后等待 handler 捕获带 fullPlayRate 的 post_list post_list_data["list"] = [] radio_selectors = [ "div.post-single-wrap div.weui-desktop-radio-group.radio-group label:has-text('近30天')", "div.post-single-wrap div.filter-wrap div.weui-desktop-radio-group label:nth-child(2)", "div.post-single-wrap label:has-text('近30天')", "div.weui-desktop-radio-group label:has-text('近30天')", "label:has-text('近30天')", ] clicked = False for sel in radio_selectors: try: el = self.page.locator(sel).first if await el.count() > 0: await el.click() clicked = True print(f"[{self.platform_name}] 已点击近30天 (selector: {sel[:40]}...)", flush=True) break except Exception: continue if not clicked: print(f"[{self.platform_name}] 警告: 未找到近30天按钮", flush=True) await asyncio.sleep(5) # 4. 从 post_list 获取列表 items = post_list_data["list"] if not items: result["error"] = "未监听到 post_list 或列表为空" print(f"[{self.platform_name}] {result['error']}", flush=True) return result # 4.5 【仅首次】从 post_list 接口响应解析数据 → 更新 works 表(不再下载 CSV) # post_list 返回字段映射: readCount->播放量, likeCount->点赞, commentCount->评论, forwardCount->分享, # fullPlayRate->完播率(0-1小数), avgPlayTimeSec->平均播放时长(秒), exportId->匹配 work_id if update_works_fn and items: try: updates = [] for it in items: eid = (it.get("exportId") or "").strip() if not eid: continue work_id = export_id_to_work.get(eid) if work_id is None: for k, v in export_id_to_work.items(): if eid in k or k in eid: work_id = v break if work_id is None: continue read_count = int(it.get("readCount") or 0) like_count = int(it.get("likeCount") or 0) comment_count = int(it.get("commentCount") or 0) forward_count = int(it.get("forwardCount") or 0) follow_count = int(it.get("followCount") or 0) full_play_rate = it.get("fullPlayRate") if full_play_rate is not None: comp_rate = f"{float(full_play_rate) * 100:.2f}%" else: comp_rate = "0" avg_sec = it.get("avgPlayTimeSec") if avg_sec is not None: avg_dur = f"{float(avg_sec):.2f}秒" else: avg_dur = "0" updates.append({ "work_id": work_id, "yesterday_play_count": read_count, "yesterday_like_count": like_count, "yesterday_comment_count": comment_count, "yesterday_share_count": forward_count, "yesterday_follow_count": follow_count, "yesterday_completion_rate": comp_rate, "yesterday_avg_watch_duration": avg_dur, }) if updates: try: save_result = update_works_fn(updates) result["works_updated"] = save_result.get("updated", 0) except Exception as api_err: import traceback traceback.print_exc() except Exception as e: import traceback traceback.print_exc() print(f"[{self.platform_name}] 解析 post_list 更新 works 失败: {e}", flush=True) # 辅助:点击单篇视频 + 近30天,恢复列表视图(go_back 后会回到全部视频页) async def ensure_single_video_near30(): tab_sel = "div.weui-desktop-tab__navs ul li:nth-child(2) a" try: await self.page.wait_for_selector(tab_sel, timeout=8000) await self.page.click(tab_sel) except Exception: await self.page.click("a:has-text('单篇视频')") await asyncio.sleep(2) for sel in [ "div.post-single-wrap div.weui-desktop-radio-group.radio-group label:has-text('近30天')", "div.post-single-wrap label:has-text('近30天')", "div.weui-desktop-radio-group label:has-text('近30天')", "label:has-text('近30天')", ]: try: el = self.page.locator(sel).first if await el.count() > 0: await el.click() break except Exception: continue await asyncio.sleep(3) # 5. 遍历每一条,按 exportId 匹配作品 processed_export_ids = set() for idx, item in enumerate(items): eid = (item.get("exportId") or "").strip() oid = (item.get("objectId") or "").strip() if not oid: continue # 已处理过的跳过(理论上循环顺序即处理顺序,此处做双重保险) if eid in processed_export_ids: print(f"[{self.platform_name}] 跳过 [{idx+1}] exportId={eid} (已处理)", flush=True) continue # go_back 后回到全部视频页,需重新点击单篇视频+近30天 if idx > 0: await ensure_single_video_near30() # 匹配 work_id work_id = export_id_to_work.get(eid) if work_id is None: for k, v in export_id_to_work.items(): if eid in k or k in eid: work_id = v break if work_id is None: result["total_skipped"] += 1 print(f"[{self.platform_name}] 跳过 [{idx+1}] exportId={eid} (库中无对应作品)", flush=True) continue # 点击「查看」:Ant Design 表格 tr[data-row-key] > td > div.slot-wrap > a.detail-wrap # 操作列可能在 ant-table-fixed-right 内,优先尝试 view_selectors = [ f'div.ant-table-fixed-right tr[data-row-key="{oid}"] a.detail-wrap', f'tr[data-row-key="{oid}"] a.detail-wrap', f'tr[data-row-key="{oid}"] td a.detail-wrap', f'tr[data-row-key="{oid}"] a:has-text("查看")', f'tr[data-row-key="{oid}"] a', ] clicked = False for sel in view_selectors: view_btn = self.page.locator(sel) if await view_btn.count() > 0: try: await view_btn.first.wait_for(timeout=3000) await view_btn.first.click() clicked = True print(f"[{self.platform_name}] 已点击查看 (selector: {sel[:40]}...)", flush=True) break except Exception as e: continue if not clicked: print(f"[{self.platform_name}] 未找到 objectId={oid} 的查看按钮", flush=True) result["total_skipped"] += 1 continue await asyncio.sleep(3) # 详情页:默认展示近7天,页面加载时自动请求 feed_aggreagate,不清空 body 避免覆盖已监听到的响应 await asyncio.sleep(4) # 从 feed_aggreagate 响应解析「全部」数据 # 数据结构: data.dataByFanstype[].dataByTabtype[] 中 tabTypeName="全部" 或 tabType=999 # 日期:从昨天往前推 N 天(含昨天),数组从最早到最晚排列 body = feed_aggreagate_data.get("body") if not body or not body.get("data"): print(f"[{self.platform_name}] work_id={work_id} 未监听到 feed_aggreagate 有效响应", flush=True) await self.page.go_back() await asyncio.sleep(2) continue tab_all = None for fan_item in body.get("data", {}).get("dataByFanstype", []): for tab_item in fan_item.get("dataByTabtype", []): if tab_item.get("tabTypeName") == "全部" or tab_item.get("tabType") == 999: tab_all = tab_item.get("data") break if tab_all is not None: break if not tab_all: tab_all = body.get("data", {}).get("feedData", [{}])[0].get("totalData") if not tab_all: print(f"[{self.platform_name}] work_id={work_id} 未找到「全部」数据", flush=True) await self.page.go_back() await asyncio.sleep(2) continue browse = tab_all.get("browse", []) n = len(browse) if n == 0: print(f"[{self.platform_name}] work_id={work_id} browse 为空", flush=True) await self.page.go_back() await asyncio.sleep(2) continue # 日期:昨天往前推 n 天,index 0 = 最早日 today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) yesterday = today - timedelta(days=1) start_date = yesterday - timedelta(days=n - 1) like_arr = tab_all.get("like", []) comment_arr = tab_all.get("comment", []) forward_arr = tab_all.get("forward", []) fav_arr = tab_all.get("fav", []) follow_arr = tab_all.get("follow", []) stats_list = [] for i in range(n): rec_dt = start_date + timedelta(days=i) rec_date = rec_dt.strftime("%Y-%m-%d") play = self._parse_count(browse[i] if i < len(browse) else "0") like = self._parse_count(like_arr[i] if i < len(like_arr) else "0") comment = self._parse_count(comment_arr[i] if i < len(comment_arr) else "0") share = self._parse_count(forward_arr[i] if i < len(forward_arr) else "0") follow = self._parse_count(follow_arr[i] if i < len(follow_arr) else "0") # fav[i] 不入库,follow[i] 入 follow_count stats_list.append({ "work_id": work_id, "record_date": rec_date, "play_count": play, "like_count": like, "comment_count": comment, "share_count": share, "collect_count": 0, "follow_count": follow, "completion_rate": "0", "avg_watch_duration": "0", }) print(f"[{self.platform_name}] work_id={work_id} 从 feed_aggreagate 解析得到 {len(stats_list)} 条日统计", flush=True) # 存入 work_day_statistics(通过 save_fn 调用 Node) if save_fn and stats_list: try: save_result = save_fn(stats_list) result["inserted"] += save_result.get("inserted", 0) result["updated"] += save_result.get("updated", 0) except Exception as e: print(f"[{self.platform_name}] work_id={work_id} 保存失败: {e}", flush=True) result["total_processed"] += 1 processed_export_ids.add(eid) # 返回列表页,继续下一条(会回到全部视频页,下次循环会重新点击单篇视频+近30天) await self.page.go_back() await asyncio.sleep(2) print(f"[{self.platform_name}] 批量同步完成: 处理 {result['total_processed']} 个作品, 跳过 {result['total_skipped']} 个", flush=True) except Exception as e: import traceback traceback.print_exc() result["success"] = False result["error"] = str(e) finally: try: await self.close_browser() except Exception: pass return result async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """ 获取视频号作品评论(完全参考 get_weixin_work_comments.py 的接口监听逻辑) 支持递归提取二级评论,正确处理 parent_comment_id """ print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品评论") print(f"[{self.platform_name}] work_id={work_id}") print(f"{'='*60}") comments: List[CommentItem] = [] total = 0 has_more = False try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问评论管理页面 print(f"[{self.platform_name}] 正在打开评论页面...") await self.page.goto("https://channels.weixin.qq.com/platform/interaction/comment", timeout=30000) await asyncio.sleep(2) # 检查登录状态 current_url = self.page.url if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # === 步骤1: 监听 post_list 接口获取作品列表 === posts = [] try: async with self.page.expect_response( lambda res: "/post/post_list" in res.url, timeout=20000 ) as post_resp_info: await self.page.wait_for_selector('.scroll-list .comment-feed-wrap', timeout=15000) post_resp = await post_resp_info.value post_data = await post_resp.json() if post_data.get("errCode") == 0: posts = post_data.get("data", {}).get("list", []) print(f"[{self.platform_name}] ✅ 获取 {len(posts)} 个作品") else: err_msg = post_data.get("errMsg", "未知错误") print(f"[{self.platform_name}] ❌ post_list 业务错误: {err_msg}") return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"post_list 业务错误: {err_msg}" ) except Exception as e: print(f"[{self.platform_name}] ❌ 获取 post_list 失败: {e}") return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"获取 post_list 失败: {e}" ) # === 步骤2: 在 DOM 中查找目标作品 === feed_wraps = await self.page.query_selector_all('.scroll-list .comment-feed-wrap') target_feed = None target_post = None target_index = -1 for i, feed in enumerate(feed_wraps): if i >= len(posts): break post = posts[i] object_nonce = post.get("objectNonce", "") post_work_id = post.get("objectId", "") or object_nonce # 匹配 work_id(支持 objectId 或 objectNonce 匹配) if work_id in [post_work_id, object_nonce] or post_work_id in work_id or object_nonce in work_id: target_feed = feed target_post = post target_index = i work_title = post.get("desc", {}).get("description", "无标题") print(f"[{self.platform_name}] ✅ 找到目标作品: {work_title}") continue if not target_feed or not target_post: print(f"[{self.platform_name}] ❌ 未找到 work_id={work_id} 对应的作品") return CommentsResult( success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False ) # 准备作品信息(用于递归函数) object_nonce = target_post.get("objectNonce", f"nonce_{target_index}") work_title = target_post.get("desc", {}).get("description", f"作品{target_index+1}") work_info = { "work_id": object_nonce, "work_title": work_title } # === 步骤3: 点击作品触发 comment_list 接口 === content_wrap = await target_feed.query_selector('.feed-content') or target_feed try: async with self.page.expect_response( lambda res: "/comment/comment_list" in res.url, timeout=15000 ) as comment_resp_info: await content_wrap.click() await asyncio.sleep(0.8) comment_resp = await comment_resp_info.value comment_data = await comment_resp.json() if comment_data.get("errCode") != 0: err_msg = comment_data.get("errMsg", "未知错误") print(f"[{self.platform_name}] ❌ 评论接口错误: {err_msg}") return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"评论接口错误: {err_msg}" ) raw_comments = comment_data.get("data", {}).get("comment", []) total = comment_data.get("data", {}).get("totalCount", len(raw_comments)) print(f"[{self.platform_name}] 📊 原始评论数: {len(raw_comments)}, 总数: {total}") # === 步骤4: 递归提取所有评论(含子评论)=== extracted = self._extract_comments(raw_comments, parent_id="", work_info=work_info) # === 步骤5: 转换为 CommentItem 列表(保留 weixin.py 的数据结构)=== for c in extracted: # 使用接口返回的 comment_id comment_id = c.get("comment_id", "") parent_comment_id = c.get("parent_comment_id", "") # 构建 CommentItem(保留原有数据结构用于数据库入库) comment_item = CommentItem( comment_id=comment_id, parent_comment_id=parent_comment_id, work_id=work_id, content=c.get("content", ""), author_id=c.get("username", ""), # 使用 username 作为 author_id author_name=c.get("nickname", ""), author_avatar=c.get("avatar", ""), like_count=c.get("like_count", 0), reply_count=0, create_time=c.get("create_time", ""), ) # 添加扩展字段(用于数据库存储和后续处理) # comment_item.parent_comment_id = c.get("parent_comment_id", "") comment_item.is_author = c.get("is_author", False) comment_item.create_time_unix = c.get("create_time_unix", 0) comment_item.work_title = c.get("work_title", "") print(comment_item) comments.append(comment_item) # 打印日志 author_tag = " 👤(作者)" if c.get("is_author") else "" parent_tag = f" [回复: {c.get('parent_comment_id', '')}]" if c.get("parent_comment_id") else "" print(f"[{self.platform_name}] - [{c.get('nickname', '')}] {c.get('content', '')[:30]}... " f"({c.get('create_time', '')}){author_tag}{parent_tag}") # 判断是否还有更多(优先使用接口返回的 continueFlag,否则根据数量判断) has_more = comment_data.get("data", {}).get("continueFlag", False) or len(extracted) < total print(f"[{self.platform_name}] ✅ 共提取 {len(comments)} 条评论(含子评论)") except Exception as e: print(f"[{self.platform_name}] ❌ 获取评论失败: {e}") import traceback traceback.print_exc() return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=f"获取评论失败: {e}" ) except Exception as e: import traceback traceback.print_exc() return CommentsResult( success=False, platform=self.platform_name, work_id=work_id, error=str(e) ) return CommentsResult( success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more ) def _extract_comments(self, comment_list: list, parent_id: str = "", work_info: dict = None) -> list: """ 递归提取一级和二级评论(完全参考 get_weixin_work_comments.py 的 extract_comments 函数) Args: comment_list: 评论列表(原始接口数据) parent_id: 父评论ID(一级评论为空字符串"",二级评论为父级评论ID) work_info: 作品信息字典 Returns: list: 扁平化的评论列表,包含一级和二级评论 """ result = [] # 获取当前用户 username(用于判断是否为作者) # 优先从环境变量获取,也可通过其他方式配置 my_username = getattr(self, 'my_username', '') or os.environ.get('WEIXIN_MY_USERNAME', '') for cmt in comment_list: # 处理时间戳 create_ts = int(cmt.get("commentCreatetime", 0) or 0) readable_time = ( datetime.fromtimestamp(create_ts).strftime('%Y-%m-%d %H:%M:%S') if create_ts > 0 else "" ) # 判断是否作者(如果配置了 my_username) username = cmt.get("username", "") or "" is_author = (my_username != "") and (username == my_username) # 构建评论条目 - 完全参考 get_weixin_work_comments.py 的字段 entry = { "work_id": work_info.get("work_id", "") if work_info else "", "work_title": work_info.get("work_title", "") if work_info else "", "comment_id": cmt.get("commentId"), "parent_comment_id": parent_id, # 关键:一级评论为空字符串"",二级评论为父评论ID "username": username, "nickname": cmt.get("commentNickname", ""), "avatar": cmt.get("commentHeadurl", ""), "content": cmt.get("commentContent", ""), "create_time_unix": create_ts, "create_time": readable_time, "is_author": is_author, "like_count": cmt.get("commentLikeCount", 0) or 0 } result.append(entry) # 递归处理二级评论(levelTwoComment) # 关键:二级评论的 parent_id 应该是当前这条评论的 comment_id level_two = cmt.get("levelTwoComment", []) or [] if level_two and isinstance(level_two, list) and len(level_two) > 0: # 当前评论的 ID 作为其子评论的 parent_id current_comment_id = cmt.get("commentId", "") result.extend( self._extract_comments(level_two, parent_id=current_comment_id, work_info=work_info) ) return result async def auto_reply_private_messages(self, cookies: str) -> dict: """自动回复私信 - 集成自 pw3.py""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始自动回复私信") print(f"{'='*60}") try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问私信页面 await self.page.goto("https://channels.weixin.qq.com/platform/private_msg", timeout=30000) await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") if "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 等待私信页面加载(使用多个选择器容错) try: await self.page.wait_for_selector('.private-msg-list-header', timeout=15000) except: # 尝试其他选择器 try: await self.page.wait_for_selector('.weui-desktop-tab__navs__inner', timeout=10000) print(f"[{self.platform_name}] 使用备用选择器加载成功") except: # 截图调试 screenshot_path = f"weixin_private_msg_{int(asyncio.get_event_loop().time())}.png" await self.page.screenshot(path=screenshot_path) print(f"[{self.platform_name}] 页面加载失败,截图: {screenshot_path}") raise Exception(f"私信页面加载超时,当前 URL: {current_url}") print(f"[{self.platform_name}] 私信页面加载完成") # 处理两个 tab total_replied = 0 for tab_name in ["打招呼消息", "私信"]: replied_count = await self._process_tab_sessions(tab_name) total_replied += replied_count print(f"[{self.platform_name}] 自动回复完成,共回复 {total_replied} 条消息") return { 'success': True, 'platform': self.platform_name, 'replied_count': total_replied, 'message': f'成功回复 {total_replied} 条私信' } except Exception as e: import traceback traceback.print_exc() return { 'success': False, 'platform': self.platform_name, 'error': str(e) } async def _process_tab_sessions(self, tab_name: str) -> int: """处理指定 tab 下的所有会话""" print(f"\n🔄 正在处理「{tab_name}」中的所有会话...") if not self.page: return 0 replied_count = 0 try: # 点击 tab if tab_name == "私信": tab_link = self.page.locator('.weui-desktop-tab__navs__inner li').first.locator('a') elif tab_name == "打招呼消息": tab_link = self.page.locator('.weui-desktop-tab__navs__inner li').nth(1).locator('a') else: return 0 if await tab_link.is_visible(): await tab_link.click() print(f" ➤ 已点击「{tab_name}」tab") else: print(f" ❌ 「{tab_name}」tab 不可见") return 0 # 等待会话列表加载 try: await self.page.wait_for_function(""" () => { const hasSession = document.querySelectorAll('.session-wrap').length > 0; const hasEmpty = !!document.querySelector('.empty-text'); return hasSession || hasEmpty; } """, timeout=8000) print(" ✅ 会话列表区域已加载") except: print(" ⚠️ 等待会话列表超时,继续尝试读取...") # 获取会话 session_wraps = self.page.locator('.session-wrap') session_count = await session_wraps.count() print(f" 💬 共找到 {session_count} 个会话") if session_count == 0: return 0 # 遍历每个会话 for idx in range(session_count): try: current_sessions = self.page.locator('.session-wrap') if idx >= await current_sessions.count(): break session = current_sessions.nth(idx) user_name = await session.locator('.name').inner_text() last_preview = await session.locator('.feed-info').inner_text() print(f"\n ➤ [{idx+1}/{session_count}] 正在处理: {user_name} | 最后消息: {last_preview}") await session.click() await asyncio.sleep(2) # 提取聊天历史 history = await self._extract_chat_history() need_reply = (not history) or (not history[-1]["is_author"]) if need_reply: reply_text = await self._generate_reply_with_ai(history) if reply_text=="": reply_text = self._generate_reply(history) # # 生成回复 # if history and history[-1]["is_author"]: # reply_text = await self._generate_reply_with_ai(history) # else: # reply_text = self._generate_reply(history) if reply_text: print(f" 📝 回复内容: {reply_text}") try: textarea = self.page.locator('.edit_area').first send_btn = self.page.locator('button:has-text("发送")').first if await textarea.is_visible() and await send_btn.is_visible(): await textarea.fill(reply_text) await asyncio.sleep(0.5) await send_btn.click() print(" ✅ 已发送") replied_count += 1 await asyncio.sleep(1.5) else: print(" ❌ 输入框或发送按钮不可见") except Exception as e: print(f" ❌ 发送失败: {e}") else: print(" ➤ 无需回复") else: print(" ➤ 最后一条是我发的,跳过回复") except Exception as e: print(f" ❌ 处理会话 {idx+1} 时出错: {e}") continue except Exception as e: print(f"❌ 处理「{tab_name}」失败: {e}") return replied_count async def _extract_chat_history(self) -> list: """精准提取聊天记录,区分作者(自己)和用户""" if not self.page: return [] history = [] message_wrappers = self.page.locator('.session-content-wrapper > div:not(.footer) > .text-wrapper') count = await message_wrappers.count() for i in range(count): try: wrapper = message_wrappers.nth(i) # 判断方向 is_right = await wrapper.locator('.content-right').count() > 0 is_left = await wrapper.locator('.content-left').count() > 0 if not (is_left or is_right): continue # 提取消息文本 pre_el = wrapper.locator('pre.message-plain') content = '' if await pre_el.count() > 0: content = await pre_el.inner_text() content = content.strip() if not content: continue # 获取头像 avatar_img = wrapper.locator('.avatar').first avatar_src = '' if await avatar_img.count() > 0: avatar_src = await avatar_img.get_attribute("src") or '' # 右侧 = 作者(自己) is_author = is_right # 获取用户名 if is_left: name_el = wrapper.locator('.profile .name') author_name = '用户' if await name_el.count() > 0: author_name = await name_el.inner_text() else: author_name = "我" history.append({ "author": author_name, "content": content, "is_author": is_author, "avatar": avatar_src }) except Exception as e: print(f" ⚠️ 解析第 {i+1} 条消息失败: {e}") continue return history async def _generate_reply_with_ai(self, chat_history: list) -> str: """使用 AI 生成智能回复""" import requests import json try: # 获取 AI 配置 ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '') ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1') ai_model = os.environ.get('AI_MODEL', 'qwen-plus') if not ai_api_key: print("⚠️ 未配置 AI API Key,使用规则回复") return self._generate_reply(chat_history) # 构建对话上下文 messages = [{"role": "system", "content": "你是一个友好的微信视频号创作者助手,负责回复粉丝私信。请保持简洁、友好、专业的语气。回复长度不超过20字。"}] for msg in chat_history: role = "assistant" if msg["is_author"] else "user" messages.append({ "role": role, "content": msg["content"] }) # 调用 AI API headers = { 'Authorization': f'Bearer {ai_api_key}', 'Content-Type': 'application/json' } payload = { "model": ai_model, "messages": messages, "max_tokens": 150, "temperature": 0.8 } print(" 🤖 正在调用 AI 生成回复...") response = requests.post( f"{ai_base_url}/chat/completions", headers=headers, json=payload, timeout=30 ) if response.status_code != 200: print(f" ⚠️ AI API 返回错误 {response.status_code},使用规则回复") return self._generate_reply(chat_history) result = response.json() ai_reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip() if ai_reply: print(f" ✅ AI 生成回复: {ai_reply}") return ai_reply else: print(" ⚠️ AI 返回空内容,使用规则回复") return self._generate_reply(chat_history) except Exception as e: print(f" ⚠️ AI 回复生成失败: {e},使用规则回复") return self._generate_reply(chat_history) def _generate_reply(self, chat_history: list) -> str: """根据完整聊天历史生成回复(规则回复方式)""" if not chat_history: return "你好!感谢联系~" # 检查最后一条是否是作者发的 if chat_history[-1]["is_author"]: return "" # 不回复 # 找最后一条用户消息 last_user_msg = chat_history[-1]["content"] # 简单规则回复 if "谢谢" in last_user_msg or "感谢" in last_user_msg: return "不客气!欢迎常来交流~" elif "你好" in last_user_msg or "在吗" in last_user_msg: return "你好!请问有什么可以帮您的?" elif "视频" in last_user_msg or "怎么拍" in last_user_msg: return "视频是用手机拍摄的,注意光线和稳定哦!" else: return "收到!我会认真阅读您的留言~"