| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217 |
- # -*- coding: utf-8 -*-
- """
- 微信视频号发布器
- 参考: matrix/tencent_uploader/main.py
- """
- import asyncio
- import os
- from datetime import datetime
- from typing import List
- from .base import (
- BasePublisher, PublishParams, PublishResult,
- WorkItem, WorksResult, CommentItem, CommentsResult
- )
- import os
- import time
def format_short_title(origin_title: str) -> str:
    """Derive a short title that fits the 6-16 character limit.

    Letters, digits and a small set of punctuation marks are kept, an ASCII
    comma becomes a space and every other character is dropped. The result
    is then cut to 16 characters or right-padded with spaces up to 6.

    Args:
        origin_title: Full title to derive the short title from.

    Returns:
        The filtered title, between 6 and 16 characters long.
    """
    # The allow-list previously read "《》"":+?%°": two adjacent literals that
    # silently concatenate, so the curly quotes that evidently sat between
    # them were lost. They are written as escapes here so they can no longer
    # be confused with string delimiters.
    allowed_special_chars = "《》\u201c\u201d:+?%°"

    kept = []
    for char in origin_title:
        if char.isalnum() or char in allowed_special_chars:
            kept.append(char)
        elif char == ',':
            kept.append(' ')
        # anything else is dropped

    short_title = ''.join(kept)[:16]
    # ljust is a no-op once the string already has 6 or more characters.
    return short_title.ljust(6)
class WeixinPublisher(BasePublisher):
    """
    Publisher for WeChat Channels (微信视频号).

    Automates the Channels creator center with Playwright.
    Note: the Chrome browser is required, otherwise H264 encoding errors may occur.
    """

    # Platform identifier; also used as the "[weixin]" prefix of log lines.
    platform_name = "weixin"
    # Creator-center landing page.
    # NOTE(review): presumably consumed by the base class for login - confirm.
    login_url = "https://channels.weixin.qq.com/platform"
    # Page that publish() opens to create a new post.
    publish_url = "https://channels.weixin.qq.com/platform/post/create"
    # NOTE(review): presumably the domain the base class applies to cookies - confirm.
    cookie_domain = ".weixin.qq.com"
-
- def _parse_count(self, count_str: str) -> int:
- """解析数字(支持带'万'的格式)"""
- try:
- count_str = count_str.strip()
- if '万' in count_str:
- return int(float(count_str.replace('万', '')) * 10000)
- return int(count_str)
- except:
- return 0
-
- async def init_browser(self, storage_state: str = None):
- """初始化浏览器 - 参考 matrix 使用 channel=chrome 避免 H264 编码错误"""
- from playwright.async_api import async_playwright
-
- playwright = await async_playwright().start()
-
- # 参考 matrix: 使用系统内的 Chrome 浏览器,避免 H264 编码错误
- # 如果没有安装 Chrome,则使用默认 Chromium
- try:
- self.browser = await playwright.chromium.launch(
- headless=self.headless,
- channel="chrome" # 使用系统 Chrome
- )
- print(f"[{self.platform_name}] 使用系统 Chrome 浏览器")
- except Exception as e:
- print(f"[{self.platform_name}] Chrome 不可用,使用 Chromium: {e}")
- self.browser = await playwright.chromium.launch(headless=self.headless)
-
- # 设置 HTTP Headers 防止重定向
- headers = {
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
- "Referer": "https://channels.weixin.qq.com/platform/post/list",
- }
-
- self.context = await self.browser.new_context(
- extra_http_headers=headers,
- ignore_https_errors=True,
- viewport={"width": 1920, "height": 1080},
- user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
- )
-
- self.page = await self.context.new_page()
- return self.page
-
- async def set_schedule_time(self, publish_date: datetime):
- """设置定时发布"""
- if not self.page:
- return
-
- print(f"[{self.platform_name}] 设置定时发布...")
-
- # 点击定时选项
- label_element = self.page.locator("label").filter(has_text="定时").nth(1)
- await label_element.click()
-
- # 选择日期
- await self.page.click('input[placeholder="请选择发表时间"]')
-
- publish_month = f"{publish_date.month:02d}"
- current_month = f"{publish_month}月"
-
- # 检查月份
- page_month = await self.page.inner_text('span.weui-desktop-picker__panel__label:has-text("月")')
- if page_month != current_month:
- await self.page.click('button.weui-desktop-btn__icon__right')
-
- # 选择日期
- elements = await self.page.query_selector_all('table.weui-desktop-picker__table a')
- for element in elements:
- class_name = await element.evaluate('el => el.className')
- if 'weui-desktop-picker__disabled' in class_name:
- continue
- text = await element.inner_text()
- if text.strip() == str(publish_date.day):
- await element.click()
- break
-
- # 输入时间
- await self.page.click('input[placeholder="请选择时间"]')
- await self.page.keyboard.press("Control+KeyA")
- await self.page.keyboard.type(str(publish_date.hour))
-
- # 点击其他地方确认
- await self.page.locator("div.input-editor").click()
-
- async def handle_upload_error(self, video_path: str):
- """处理上传错误"""
- if not self.page:
- return
-
- print(f"[{self.platform_name}] 视频出错了,重新上传中...")
- await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').click()
- await self.page.get_by_role('button', name="删除", exact=True).click()
- file_input = self.page.locator('input[type="file"]')
- await file_input.set_input_files(video_path)
-
- async def add_title_tags(self, params: PublishParams):
- """添加标题和话题"""
- if not self.page:
- return
-
- await self.page.locator("div.input-editor").click()
- await self.page.keyboard.type(params.title)
-
- if params.tags:
- await self.page.keyboard.press("Enter")
- for tag in params.tags:
- await self.page.keyboard.type("#" + tag)
- await self.page.keyboard.press("Space")
-
- print(f"[{self.platform_name}] 成功添加标题和 {len(params.tags)} 个话题")
-
- async def add_short_title(self):
- """添加短标题"""
- if not self.page:
- return
-
- try:
- short_title_element = self.page.get_by_text("短标题", exact=True).locator("..").locator(
- "xpath=following-sibling::div").locator('span input[type="text"]')
- if await short_title_element.count():
- # 获取已有内容作为短标题
- pass
- except:
- pass
-
- async def upload_cover(self, cover_path: str):
- """上传封面图"""
- if not self.page or not cover_path or not os.path.exists(cover_path):
- return
-
- try:
- await asyncio.sleep(2)
- preview_btn_info = await self.page.locator(
- 'div.finder-tag-wrap.btn:has-text("更换封面")').get_attribute('class')
-
- if "disabled" not in preview_btn_info:
- await self.page.locator('div.finder-tag-wrap.btn:has-text("更换封面")').click()
- await self.page.locator('div.single-cover-uploader-wrap > div.wrap').hover()
-
- # 删除现有封面
- if await self.page.locator(".del-wrap > .svg-icon").count():
- await self.page.locator(".del-wrap > .svg-icon").click()
-
- # 上传新封面
- preview_div = self.page.locator("div.single-cover-uploader-wrap > div.wrap")
- async with self.page.expect_file_chooser() as fc_info:
- await preview_div.click()
- preview_chooser = await fc_info.value
- await preview_chooser.set_files(cover_path)
-
- await asyncio.sleep(2)
- await self.page.get_by_role("button", name="确定").click()
- await asyncio.sleep(1)
- await self.page.get_by_role("button", name="确认").click()
-
- print(f"[{self.platform_name}] 封面上传成功")
- except Exception as e:
- print(f"[{self.platform_name}] 封面上传失败: {e}")
-
- async def check_captcha(self) -> dict:
- """检查页面是否需要验证码"""
- if not self.page:
- return {'need_captcha': False, 'captcha_type': ''}
-
- try:
- # 检查各种验证码
- captcha_selectors = [
- 'text="请输入验证码"',
- 'text="滑动验证"',
- '[class*="captcha"]',
- '[class*="verify"]',
- ]
- for selector in captcha_selectors:
- try:
- if await self.page.locator(selector).count() > 0:
- print(f"[{self.platform_name}] 检测到验证码: {selector}")
- return {'need_captcha': True, 'captcha_type': 'image'}
- except:
- pass
-
- # 检查登录弹窗
- login_selectors = [
- 'text="请登录"',
- 'text="扫码登录"',
- '[class*="login-dialog"]',
- ]
- for selector in login_selectors:
- try:
- if await self.page.locator(selector).count() > 0:
- print(f"[{self.platform_name}] 检测到需要登录: {selector}")
- return {'need_captcha': True, 'captcha_type': 'login'}
- except:
- pass
-
- except Exception as e:
- print(f"[{self.platform_name}] 验证码检测异常: {e}")
-
- return {'need_captcha': False, 'captcha_type': ''}
- async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
- """发布视频到视频号"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 开始发布视频")
- print(f"[{self.platform_name}] 视频路径: {params.video_path}")
- print(f"[{self.platform_name}] 标题: {params.title}")
- print(f"[{self.platform_name}] Headless: {self.headless}")
- print(f"{'='*60}")
-
- self.report_progress(5, "正在初始化浏览器...")
-
- # 初始化浏览器(使用 Chrome)
- await self.init_browser()
- print(f"[{self.platform_name}] 浏览器初始化完成")
-
- # 解析并设置 cookies
- cookie_list = self.parse_cookies(cookies)
- print(cookie_list)
- print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 检查视频文件
- if not os.path.exists(params.video_path):
- raise Exception(f"视频文件不存在: {params.video_path}")
-
- print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")
-
- self.report_progress(10, "正在打开上传页面...")
- print(f"[{self.platform_name}] 当前 发布URL: {self.publish_url}")
- # 访问上传页面
- await self.page.goto(self.publish_url, wait_until="networkidle", timeout=60000)
- await asyncio.sleep(10)
- # 打印页面HTML调试
- print(f"[{self.platform_name}] 当前 URL: {self.page.url}")
- html_content = await self.page.content()
- print(f"[{self.platform_name}] 页面HTML长度: {len(html_content)}")
-
- # 截图调试
- screenshot_path = f"weixin_publish_{int(asyncio.get_event_loop().time())}.png"
- await self.page.screenshot(path=screenshot_path)
- print(f"[{self.platform_name}] 截图已保存: {screenshot_path}")
-
- # 检查 input[type='file'] 是否存在
- file_input = self.page.locator("input[type='file']")
- count = await file_input.count()
- print(f"[{self.platform_name}] 找到 {count} 个 file input")
-
- if count == 0:
- raise Exception("页面中未找到 input[type='file'] 元素")
-
- # 直接设置文件,不触发click
- print("上传文件...")
- file_path = params.video_path
- await file_input.first.set_input_files(file_path)
- print(f"[{self.platform_name}] 文件已设置: {file_path}")
-
- # 等待上传进度
- await asyncio.sleep(5)
-
- # 等待删除标签弹窗可见(可选,设置超时)
- try:
- await self.page.wait_for_selector(".weui-desktop-popover__wrp.finder-popover-dialog-wrap .finder-tag-wrap", state="visible", timeout=20000)
- print("删除标签弹窗已显示")
- except:
- print("删除标签弹窗未出现,继续执行")
-
- # 主动关闭系统文件选择窗口(如果还存在)
- try:
- # 获取所有窗口
- context_pages = self.page.context.pages
- for p in context_pages:
- if p != self.page and "打开" in await p.title():
- print(f"关闭系统文件选择窗口: {await p.title()}")
- await p.close()
- except Exception as e:
- print(f"关闭文件选择窗口异常: {e}")
-
-
- # 填写多个输入框
- print("填写输入框...")
- # 描述输入框
- await self.page.locator("div.input-editor[contenteditable][data-placeholder='添加描述']").fill("智能拍照机来啦")
-
- # 短标题输入框
- await self.page.fill("input.weui-desktop-form__input[placeholder*='概括视频主要内容']", "解放双手的智能拍照机")
- await self.page.wait_for_timeout(1000)
- # 点击最下方的发布按钮
- print("点击发布按钮...")
- await self.page.click("button.weui-desktop-btn.weui-desktop-btn_primary:has-text('发表')")
-
- # 监控是否出现"直接发表"按钮
- try:
- direct_publish_btn = self.page.locator("button.weui-desktop-btn.weui-desktop-btn_default:has-text('直接发表')")
- await direct_publish_btn.wait_for(state="visible", timeout=3000)
- print("检测到'直接发表'按钮,点击...")
- await direct_publish_btn.click()
- except:
- print("未检测到'直接发表'按钮,继续...")
-
-
- # 等待发布完成
- await self.page.wait_for_timeout(3000)
- print("发布完成!")
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功",
- screenshot_base64="",
- page_url=self.publish_url,
- status='success'
- )
-
- # 检查是否跳转到登录页
- current_url = self.page.url
- print(f"[{self.platform_name}] 当前页面: {current_url}")
-
- if "login" in current_url:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="Cookie 已过期,需要重新登录",
- need_captcha=True,
- captcha_type='login',
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- # 使用 AI 检查验证码
- ai_captcha = await self.ai_check_captcha()
- if ai_captcha['has_captcha']:
- print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
- need_captcha=True,
- captcha_type=ai_captcha['captcha_type'],
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- # 传统方式检查验证码
- captcha_result = await self.check_captcha()
- if captcha_result['need_captcha']:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error=f"需要{captcha_result['captcha_type']}验证码,请使用有头浏览器完成验证",
- need_captcha=True,
- captcha_type=captcha_result['captcha_type'],
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='need_captcha'
- )
-
- self.report_progress(15, "正在选择视频文件...")
-
- # 上传视频 - 参考 matrix/tencent_uploader/main.py
- # matrix 使用: div.upload-content 点击后触发文件选择器
- upload_success = False
-
- # 方法1: 参考 matrix - 点击 div.upload-content
- try:
- upload_div = self.page.locator("div.upload-content")
- if await upload_div.count() > 0:
- print(f"[{self.platform_name}] 找到 upload-content 上传区域")
- async with self.page.expect_file_chooser(timeout=10000) as fc_info:
- await upload_div.click()
- file_chooser = await fc_info.value
- await file_chooser.set_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] 通过 upload-content 上传成功")
- except Exception as e:
- print(f"[{self.platform_name}] upload-content 上传失败: {e}")
-
- # 方法2: 尝试其他选择器
- if not upload_success:
- upload_selectors = [
- 'div[class*="upload-area"]',
- 'div[class*="drag-upload"]',
- 'div.add-wrap',
- '[class*="uploader"]',
- ]
-
- for selector in upload_selectors:
- if upload_success:
- break
- try:
- upload_area = self.page.locator(selector).first
- if await upload_area.count() > 0:
- print(f"[{self.platform_name}] 尝试点击上传区域: {selector}")
- async with self.page.expect_file_chooser(timeout=10000) as fc_info:
- await upload_area.click()
- file_chooser = await fc_info.value
- await file_chooser.set_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] 通过点击上传区域成功")
- break
- except Exception as e:
- print(f"[{self.platform_name}] 选择器 {selector} 失败: {e}")
-
- # 方法3: 直接设置 file input
- if not upload_success:
- try:
- file_input = self.page.locator('input[type="file"]')
- if await file_input.count() > 0:
- await file_input.first.set_input_files(params.video_path)
- upload_success = True
- print(f"[{self.platform_name}] 通过 file input 上传成功")
- except Exception as e:
- print(f"[{self.platform_name}] file input 上传失败: {e}")
-
- if not upload_success:
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="未找到上传入口",
- screenshot_base64=screenshot_base64,
- page_url=await self.get_page_url(),
- status='failed'
- )
-
- self.report_progress(20, "正在填充标题和话题...")
-
- # 添加标题和话题
- await self.add_title_tags(params)
-
- self.report_progress(30, "等待视频上传完成...")
-
- # 等待上传完成
- for _ in range(120):
- try:
- button_info = await self.page.get_by_role("button", name="发表").get_attribute('class')
- if "weui-desktop-btn_disabled" not in button_info:
- print(f"[{self.platform_name}] 视频上传完毕")
-
- # 上传封面
- self.report_progress(50, "正在上传封面...")
- await self.upload_cover(params.cover_path)
- break
- else:
- # 检查上传错误
- if await self.page.locator('div.status-msg.error').count():
- if await self.page.locator('div.media-status-content div.tag-inner:has-text("删除")').count():
- await self.handle_upload_error(params.video_path)
-
- await asyncio.sleep(3)
- except:
- await asyncio.sleep(3)
-
- self.report_progress(60, "处理视频设置...")
-
- # 添加短标题
- try:
- short_title_el = self.page.get_by_text("短标题", exact=True).locator("..").locator(
- "xpath=following-sibling::div").locator('span input[type="text"]')
- if await short_title_el.count():
- short_title = format_short_title(params.title)
- await short_title_el.fill(short_title)
- except:
- pass
-
- # 定时发布
- if params.publish_date:
- self.report_progress(70, "设置定时发布...")
- await self.set_schedule_time(params.publish_date)
-
- self.report_progress(80, "正在发布...")
-
- # 点击发布 - 参考 matrix
- for i in range(30):
- try:
- # 参考 matrix: div.form-btns button:has-text("发表")
- publish_btn = self.page.locator('div.form-btns button:has-text("发表")')
- if await publish_btn.count():
- print(f"[{self.platform_name}] 点击发布按钮...")
- await publish_btn.click()
-
- # 等待跳转到作品列表页面 - 参考 matrix
- await self.page.wait_for_url(
- "https://channels.weixin.qq.com/platform/post/list",
- timeout=10000
- )
- self.report_progress(100, "发布成功")
- print(f"[{self.platform_name}] 视频发布成功!")
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功",
- screenshot_base64=screenshot_base64,
- page_url=self.page.url,
- status='success'
- )
- except Exception as e:
- current_url = self.page.url
- if "https://channels.weixin.qq.com/platform/post/list" in current_url:
- self.report_progress(100, "发布成功")
- print(f"[{self.platform_name}] 视频发布成功!")
- screenshot_base64 = await self.capture_screenshot()
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功",
- screenshot_base64=screenshot_base64,
- page_url=current_url,
- status='success'
- )
- else:
- print(f"[{self.platform_name}] 视频正在发布中... {i+1}/30, URL: {current_url}")
- await asyncio.sleep(1)
-
- # 发布超时
- screenshot_base64 = await self.capture_screenshot()
- page_url = await self.get_page_url()
- return PublishResult(
- success=False,
- platform=self.platform_name,
- error="发布超时,请检查发布状态",
- screenshot_base64=screenshot_base64,
- page_url=page_url,
- status='need_action'
- )
- async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
- print(f"1111111111111111111")
- """获取视频号作品列表"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 获取作品列表")
- print(f"[{self.platform_name}] page={page}, page_size={page_size}")
- print(f"{'='*60}")
-
- works: List[WorkItem] = []
- total = 0
- has_more = False
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 访问视频号创作者中心
- await self.page.goto("https://channels.weixin.qq.com/platform/post/list")
- await asyncio.sleep(5)
- print(f"1111111111111111")
- # 检查登录状态
- current_url = self.page.url
- if "login" in current_url:
- print(f"2111111111111111")
- raise Exception("Cookie 已过期,请重新登录")
-
- # 视频号使用页面爬取方式获取作品列表
- # 等待作品列表加载(增加等待时间,并添加截图调试)
- try:
- await self.page.wait_for_selector('div.post-feed-item', timeout=15000)
- except:
- # 超时后打印当前 URL 和截图
- current_url = self.page.url
- print(f"[{self.platform_name}] 等待超时,当前 URL: {current_url}")
- # 截图保存
- screenshot_path = f"weixin_timeout_{int(asyncio.get_event_loop().time())}.png"
- await self.page.screenshot(path=screenshot_path)
- print(f"[{self.platform_name}] 截图已保存: {screenshot_path}")
- raise Exception(f"页面加载超时,当前 URL: {current_url}")
-
- # 打印 DOM 结构
- page_html = await self.page.content()
- print(f"[{self.platform_name}] ========== 页面 DOM 开始 ==========")
- print(page_html[:5000]) # 打印前5000个字符
- print(f"[{self.platform_name}] ========== 页面 DOM 结束 ==========")
-
- # 获取所有作品项
- post_items = self.page.locator('div.post-feed-item')
- item_count = await post_items.count()
-
- print(f"[{self.platform_name}] 找到 {item_count} 个作品项")
-
- for i in range(min(item_count, page_size)):
- try:
- item = post_items.nth(i)
-
- # 获取封面
- cover_el = item.locator('div.media img.thumb').first
- cover_url = ''
- if await cover_el.count() > 0:
- cover_url = await cover_el.get_attribute('src') or ''
-
- # 获取标题
- title_el = item.locator('div.post-title').first
- title = ''
- if await title_el.count() > 0:
- title = await title_el.text_content() or ''
- title = title.strip()
-
- # 获取发布时间
- time_el = item.locator('div.post-time span').first
- publish_time = ''
- if await time_el.count() > 0:
- publish_time = await time_el.text_content() or ''
- publish_time = publish_time.strip()
-
- # 获取统计数据
- import re
- data_items = item.locator('div.post-data div.data-item')
- data_count = await data_items.count()
-
- play_count = 0
- like_count = 0
- comment_count = 0
- share_count = 0
- collect_count = 0
-
- for j in range(data_count):
- data_item = data_items.nth(j)
- count_text = await data_item.locator('span.count').text_content() or '0'
- count_text = count_text.strip()
-
- # 判断图标类型
- if await data_item.locator('span.weui-icon-outlined-eyes-on').count() > 0:
- # 播放量
- play_count = self._parse_count(count_text)
- elif await data_item.locator('span.weui-icon-outlined-like').count() > 0:
- # 点赞
- like_count = self._parse_count(count_text)
- elif await data_item.locator('span.weui-icon-outlined-comment').count() > 0:
- # 评论
- comment_count = self._parse_count(count_text)
- elif await data_item.locator('use[xlink\\:href="#icon-share"]').count() > 0:
- # 分享
- share_count = self._parse_count(count_text)
- elif await data_item.locator('use[xlink\\:href="#icon-thumb"]').count() > 0:
- # 收藏
- collect_count = self._parse_count(count_text)
-
- # 生成临时 work_id
- work_id = f"weixin_{i}_{hash(title)}_{hash(publish_time)}"
-
- works.append(WorkItem(
- work_id=work_id,
- title=title or '无标题',
- cover_url=cover_url,
- duration=0,
- status='published',
- publish_time=publish_time,
- play_count=play_count,
- like_count=like_count,
- comment_count=comment_count,
- share_count=share_count,
- collect_count=collect_count,
- ))
- except Exception as e:
- print(f"[{self.platform_name}] 解析作品 {i} 失败: {e}")
- import traceback
- traceback.print_exc()
- continue
-
- total = len(works)
- has_more = item_count > page_size
- print(f"[{self.platform_name}] 获取到 {total} 个作品")
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return WorksResult(success=False, platform=self.platform_name, error=str(e))
-
- return WorksResult(success=True, platform=self.platform_name, works=works, total=total, has_more=has_more)
-
- async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
- """获取视频号作品评论"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 获取作品评论")
- print(f"[{self.platform_name}] work_id={work_id}")
- print(f"{'='*60}")
-
- comments: List[CommentItem] = []
- total = 0
- has_more = False
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 访问评论管理页面
- await self.page.goto("https://channels.weixin.qq.com/platform/interaction/comment")
- await asyncio.sleep(3)
-
- # 检查登录状态
- current_url = self.page.url
- if "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- # 等待左侧作品列表加载
- try:
- await self.page.wait_for_selector('div.comment-feed-wrap', timeout=15000)
- except:
- print(f"[{self.platform_name}] 未找到作品列表")
- return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False)
-
- print(f"[{self.platform_name}] 查找 work_id={work_id} 对应的作品")
-
- # 点击左侧作品项(根据 work_id 匹配)
- feed_items = self.page.locator('div.comment-feed-wrap')
- item_count = await feed_items.count()
- print(f"[{self.platform_name}] 左侧共 {item_count} 个作品")
-
- clicked = False
- for i in range(item_count):
- feed = feed_items.nth(i)
- title_el = feed.locator('div.feed-title').first
- if await title_el.count() > 0:
- title_text = await title_el.text_content() or ''
- title_text = title_text.strip()
-
- # 检查是否包含 work_id(标题)
- if work_id in title_text or title_text in work_id:
- print(f"[{self.platform_name}] 找到匹配作品: {title_text}")
- await feed.click()
- await asyncio.sleep(2)
- clicked = True
- break
-
- if not clicked:
- # 如果没找到匹配的,点击第一个
- print(f"[{self.platform_name}] 未找到匹配作品,点击第一个")
- if item_count > 0:
- await feed_items.nth(0).click()
- await asyncio.sleep(2)
- else:
- return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False)
-
- # 等待右侧评论详情加载
- try:
- await self.page.wait_for_selector('div.comment-item', timeout=5000)
- except:
- print(f"[{self.platform_name}] 该作品暂无评论")
- return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=[], total=0, has_more=False)
-
- # 获取评论总数
- total_text_el = self.page.locator('div.comment-count__tips')
- if await total_text_el.count() > 0:
- total_text = await total_text_el.text_content() or ''
- # 提取数字(如 "共 1 条评论")
- import re
- match = re.search(r'(\d+)', total_text)
- if match:
- total = int(match.group(1))
-
- print(f"[{self.platform_name}] 评论总数: {total}")
-
- # 获取右侧评论列表
- comment_items = self.page.locator('div.comment-item')
- item_count = await comment_items.count()
-
- print(f"[{self.platform_name}] 当前加载 {item_count} 条评论")
-
- for i in range(item_count):
- try:
- item = comment_items.nth(i)
-
- # 获取作者昵称(加 .first 防 strict mode)
- author_name = ''
- name_el = item.locator('span.comment-user-name').first
- if await name_el.count() > 0:
- author_name = await name_el.text_content() or ''
- author_name = author_name.strip()
-
- # 获取头像
- author_avatar = ''
- avatar_el = item.locator('img.comment-avatar').first
- if await avatar_el.count() > 0:
- author_avatar = await avatar_el.get_attribute('src') or ''
-
- # 获取评论内容(加 .first 防 strict mode)
- content = ''
- content_el = item.locator('span.comment-content').first
- if await content_el.count() > 0:
- content = await content_el.text_content() or ''
- content = content.strip()
-
- # 获取评论时间(加 .first 防 strict mode)
- create_time = ''
- time_el = item.locator('span.comment-time').first
- if await time_el.count() > 0:
- create_time = await time_el.text_content() or ''
- create_time = create_time.strip()
-
- if not content:
- continue
-
- # 生成评论 ID
- comment_id = f"weixin_comment_{i}_{abs(hash(content))}"
-
- comments.append(CommentItem(
- comment_id=comment_id,
- work_id=work_id,
- content=content,
- author_id='',
- author_name=author_name,
- author_avatar=author_avatar,
- like_count=0,
- reply_count=0,
- create_time=create_time,
- ))
-
- print(f"[{self.platform_name}] 评论 {i+1}: {author_name} - {content[:20]}...")
-
- except Exception as e:
- print(f"[{self.platform_name}] 解析评论 {i} 失败: {e}")
- continue
-
- print(f"[{self.platform_name}] 成功获取 {len(comments)} 条评论")
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e))
-
- return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)
-
- async def auto_reply_private_messages(self, cookies: str) -> dict:
- """自动回复私信 - 集成自 pw3.py"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 开始自动回复私信")
- print(f"{'='*60}")
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 访问私信页面
- await self.page.goto("https://channels.weixin.qq.com/platform/private_msg", timeout=30000)
- await asyncio.sleep(3)
-
- # 检查登录状态
- current_url = self.page.url
- print(f"[{self.platform_name}] 当前 URL: {current_url}")
-
- if "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- # 等待私信页面加载(使用多个选择器容错)
- try:
- await self.page.wait_for_selector('.private-msg-list-header', timeout=15000)
- except:
- # 尝试其他选择器
- try:
- await self.page.wait_for_selector('.weui-desktop-tab__navs__inner', timeout=10000)
- print(f"[{self.platform_name}] 使用备用选择器加载成功")
- except:
- # 截图调试
- screenshot_path = f"weixin_private_msg_{int(asyncio.get_event_loop().time())}.png"
- await self.page.screenshot(path=screenshot_path)
- print(f"[{self.platform_name}] 页面加载失败,截图: {screenshot_path}")
- raise Exception(f"私信页面加载超时,当前 URL: {current_url}")
-
- print(f"[{self.platform_name}] 私信页面加载完成")
-
- # 处理两个 tab
- total_replied = 0
- for tab_name in ["打招呼消息", "私信"]:
- replied_count = await self._process_tab_sessions(tab_name)
- total_replied += replied_count
-
- print(f"[{self.platform_name}] 自动回复完成,共回复 {total_replied} 条消息")
-
- return {
- 'success': True,
- 'platform': self.platform_name,
- 'replied_count': total_replied,
- 'message': f'成功回复 {total_replied} 条私信'
- }
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return {
- 'success': False,
- 'platform': self.platform_name,
- 'error': str(e)
- }
-
async def _process_tab_sessions(self, tab_name: str) -> int:
    """Open one tab of the private-message page and reply to its sessions.

    Clicks the requested tab, waits for either a session entry or the
    empty-list placeholder to appear, then opens every listed session in
    turn. A session is answered only when its extracted history is empty or
    its last message was not sent by the author.

    Args:
        tab_name: Tab label. Only "私信" (first tab) and "打招呼消息"
            (second tab) are recognised.

    Returns:
        The number of sessions in which a reply was sent. Returns 0 when the
        page is missing, the tab is unknown or not visible, or no session is
        listed. Exceptions raised while processing are printed, not raised.
    """
    print(f"\n🔄 正在处理「{tab_name}」中的所有会话...")

    if not self.page:
        return 0

    # Zero-based position of each supported tab inside the tab bar.
    tab_positions = {"私信": 0, "打招呼消息": 1}
    if tab_name not in tab_positions:
        return 0

    replied_count = 0

    try:
        tab_link = (
            self.page.locator('.weui-desktop-tab__navs__inner li')
            .nth(tab_positions[tab_name])
            .locator('a')
        )
        if not await tab_link.is_visible():
            print(f" ❌ 「{tab_name}」tab 不可见")
            return 0

        await tab_link.click()
        print(f" ➤ 已点击「{tab_name}」tab")

        # Wait until the list shows at least one session or the empty marker.
        try:
            await self.page.wait_for_function("""
                () => {
                    const hasSession = document.querySelectorAll('.session-wrap').length > 0;
                    const hasEmpty = !!document.querySelector('.empty-text');
                    return hasSession || hasEmpty;
                }
            """, timeout=8000)
            print(" ✅ 会话列表区域已加载")
        except Exception:
            # Best effort: a timeout here is not fatal, the list is still read
            # below. `Exception` (not a bare except) keeps task cancellation
            # and KeyboardInterrupt propagating.
            print(" ⚠️ 等待会话列表超时,继续尝试读取...")

        session_count = await self.page.locator('.session-wrap').count()
        print(f" 💬 共找到 {session_count} 个会话")

        for idx in range(session_count):
            try:
                # Re-query on every iteration and stop if the list shrank
                # (presumably the list can re-render between sessions).
                current_sessions = self.page.locator('.session-wrap')
                if idx >= await current_sessions.count():
                    break

                session = current_sessions.nth(idx)
                user_name = await session.locator('.name').inner_text()
                last_preview = await session.locator('.feed-info').inner_text()
                print(f"\n ➤ [{idx+1}/{session_count}] 正在处理: {user_name} | 最后消息: {last_preview}")

                await session.click()
                # Fixed pause before reading the conversation panel.
                await asyncio.sleep(2)

                history = await self._extract_chat_history()
                if history and history[-1]["is_author"]:
                    print(" ➤ 最后一条是我发的,跳过回复")
                    continue

                # AI reply first; fall back to the rule-based reply when empty.
                reply_text = await self._generate_reply_with_ai(history)
                if not reply_text:
                    reply_text = self._generate_reply(history)
                if not reply_text:
                    print(" ➤ 无需回复")
                    continue

                print(f" 📝 回复内容: {reply_text}")
                try:
                    textarea = self.page.locator('.edit_area').first
                    send_btn = self.page.locator('button:has-text("发送")').first
                    if await textarea.is_visible() and await send_btn.is_visible():
                        await textarea.fill(reply_text)
                        await asyncio.sleep(0.5)
                        await send_btn.click()
                        print(" ✅ 已发送")
                        replied_count += 1
                        await asyncio.sleep(1.5)
                    else:
                        print(" ❌ 输入框或发送按钮不可见")
                except Exception as e:
                    print(f" ❌ 发送失败: {e}")

            except Exception as e:
                # One failing session must not abort the remaining ones.
                print(f" ❌ 处理会话 {idx+1} 时出错: {e}")

    except Exception as e:
        print(f"❌ 处理「{tab_name}」失败: {e}")

    return replied_count
-
async def _extract_chat_history(self) -> list:
    """Read the text messages of the currently open conversation.

    Each message wrapper is classified by the presence of a `.content-right`
    (sent by the author) or `.content-left` (sent by the other party)
    element. Wrappers with neither marker, or without non-blank text in
    `pre.message-plain`, are skipped. A wrapper that fails to parse is
    logged and skipped.

    Returns:
        A list of dicts with the keys "author", "content", "is_author" and
        "avatar", in page order. Empty when the page is not initialised.
    """
    page = self.page
    if not page:
        return []

    records = []
    bubbles = page.locator('.session-content-wrapper > div:not(.footer) > .text-wrapper')
    total = await bubbles.count()

    for pos in range(total):
        try:
            bubble = bubbles.nth(pos)

            # Message direction: right side is the author, left side the user.
            sent_by_me = await bubble.locator('.content-right').count() > 0
            sent_by_peer = await bubble.locator('.content-left').count() > 0
            if not sent_by_me and not sent_by_peer:
                continue

            # Plain-text body of the message.
            text_node = bubble.locator('pre.message-plain')
            text = ''
            if await text_node.count() > 0:
                raw_text = await text_node.inner_text()
                text = raw_text.strip()
            if not text:
                continue

            # Avatar URL, empty when no avatar element or no src attribute.
            avatar_node = bubble.locator('.avatar').first
            avatar_url = ''
            if await avatar_node.count() > 0:
                avatar_url = await avatar_node.get_attribute("src") or ''

            # Display name: taken from the profile for incoming messages.
            if sent_by_peer:
                sender = '用户'
                name_node = bubble.locator('.profile .name')
                if await name_node.count() > 0:
                    sender = await name_node.inner_text()
            else:
                sender = "我"

            records.append({
                "author": sender,
                "content": text,
                "is_author": sent_by_me,
                "avatar": avatar_url
            })
        except Exception as exc:
            print(f" ⚠️ 解析第 {pos+1} 条消息失败: {exc}")

    return records
-
async def _generate_reply_with_ai(self, chat_history: list) -> str:
    """Generate a reply by calling a chat-completions HTTP endpoint.

    Configuration comes from environment variables: `DASHSCOPE_API_KEY`
    (required), `DASHSCOPE_BASE_URL` and `AI_MODEL` (both with defaults).
    The rule-based `self._generate_reply` is used instead whenever the key
    is missing, `requests` cannot be imported, the API answers with a
    non-200 status or empty content, or any exception occurs.

    Args:
        chat_history: Messages in chronological order, each a dict with at
            least the keys "content" and "is_author".

    Returns:
        The reply text. May be an empty string when the rule-based fallback
        decides that no reply is needed.
    """
    try:
        # AI configuration.
        ai_api_key = os.environ.get('DASHSCOPE_API_KEY', '')
        ai_base_url = os.environ.get('DASHSCOPE_BASE_URL', 'https://dashscope.aliyuncs.com/compatible-mode/v1')
        ai_model = os.environ.get('AI_MODEL', 'qwen-plus')

        if not ai_api_key:
            print("⚠️ 未配置 AI API Key,使用规则回复")
            return self._generate_reply(chat_history)

        # Imported lazily inside the try block so that a missing package
        # degrades to the rule-based reply instead of raising.
        import requests

        # Conversation context: author messages become "assistant" turns.
        messages = [{"role": "system", "content": "你是一个友好的微信视频号创作者助手,负责回复粉丝私信。请保持简洁、友好、专业的语气。回复长度不超过20字。"}]
        messages.extend(
            {
                "role": "assistant" if msg["is_author"] else "user",
                "content": msg["content"],
            }
            for msg in chat_history
        )

        headers = {
            'Authorization': f'Bearer {ai_api_key}',
            'Content-Type': 'application/json'
        }

        payload = {
            "model": ai_model,
            "messages": messages,
            "max_tokens": 150,
            "temperature": 0.8
        }

        def post_request():
            return requests.post(
                f"{ai_base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=30
            )

        print(" 🤖 正在调用 AI 生成回复...")
        # requests is blocking; run it in a worker thread so the event loop
        # is not stalled for up to the 30 s timeout.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, post_request)

        if response.status_code != 200:
            print(f" ⚠️ AI API 返回错误 {response.status_code},使用规则回复")
            return self._generate_reply(chat_history)

        result = response.json()
        # Tolerate an empty "choices" list and null "message"/"content".
        choices = result.get('choices') or [{}]
        message = choices[0].get('message') or {}
        ai_reply = (message.get('content') or '').strip()

        if ai_reply:
            print(f" ✅ AI 生成回复: {ai_reply}")
            return ai_reply

        print(" ⚠️ AI 返回空内容,使用规则回复")
        return self._generate_reply(chat_history)

    except Exception as e:
        print(f" ⚠️ AI 回复生成失败: {e},使用规则回复")
        return self._generate_reply(chat_history)
-
def _generate_reply(self, chat_history: list) -> str:
    """Pick a canned reply from the chat history using keyword rules.

    Args:
        chat_history: Messages in chronological order, each a dict with the
            keys "content" and "is_author".

    Returns:
        A greeting when the history is empty, an empty string when the last
        message was sent by the author (no reply needed), otherwise the
        reply of the first keyword group found in the last message, or a
        generic acknowledgement when no keyword matches.
    """
    if not chat_history:
        return "你好!感谢联系~"

    latest = chat_history[-1]
    if latest["is_author"]:
        # The author spoke last: nothing to answer.
        return ""

    # The last message is the user's most recent one.
    text = latest["content"]

    # Keyword groups are checked in order; the first match wins.
    keyword_replies = (
        (("谢谢", "感谢"), "不客气!欢迎常来交流~"),
        (("你好", "在吗"), "你好!请问有什么可以帮您的?"),
        (("视频", "怎么拍"), "视频是用手机拍摄的,注意光线和稳定哦!"),
    )
    for keywords, reply in keyword_replies:
        if any(word in text for word in keywords):
            return reply

    return "收到!我会认真阅读您的留言~"
|