# -*- coding: utf-8 -*- """ 快手视频发布器 参考: matrix/ks_uploader/main.py """ import asyncio import os from datetime import datetime from typing import List from .base import ( BasePublisher, PublishParams, PublishResult, WorkItem, WorksResult, CommentItem, CommentsResult ) class KuaishouPublisher(BasePublisher): """ 快手视频发布器 使用 Playwright 自动化操作快手创作者中心 """ platform_name = "kuaishou" login_url = "https://cp.kuaishou.com/" publish_url = "https://cp.kuaishou.com/article/publish/video" cookie_domain = ".kuaishou.com" async def set_schedule_time(self, publish_date: datetime): """设置定时发布""" if not self.page: return # 选择定时发布 label_element = self.page.locator("label.radio--4Gpx6:has-text('定时发布')") await label_element.click() await asyncio.sleep(1) # 输入时间 publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M") await self.page.locator('.semi-input[placeholder="日期和时间"]').click() await self.page.keyboard.press("Control+KeyA") await self.page.keyboard.type(str(publish_date_str)) await self.page.keyboard.press("Enter") await asyncio.sleep(1) async def upload_cover(self, cover_path: str): """上传封面图""" if not self.page or not cover_path or not os.path.exists(cover_path): return try: await self.page.get_by_role("button", name="编辑封面").click() await asyncio.sleep(1) await self.page.get_by_role("tab", name="上传封面").click() preview_div = self.page.get_by_role("tabpanel", name="上传封面").locator("div").nth(1) async with self.page.expect_file_chooser() as fc_info: await preview_div.click() preview_chooser = await fc_info.value await preview_chooser.set_files(cover_path) await self.page.get_by_role("button", name="确认").click() await asyncio.sleep(3) print(f"[{self.platform_name}] 封面上传成功") except Exception as e: print(f"[{self.platform_name}] 封面上传失败: {e}") async def publish(self, cookies: str, params: PublishParams) -> PublishResult: """发布视频到快手 - 参考 matrix/ks_uploader/main.py""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 开始发布视频") print(f"[{self.platform_name}] 视频路径: {params.video_path}") print(f"[{self.platform_name}] 标题: {params.title}") print(f"[{self.platform_name}] Headless: {self.headless}") print(f"{'='*60}") self.report_progress(5, "正在初始化浏览器...") # 初始化浏览器 await self.init_browser() print(f"[{self.platform_name}] 浏览器初始化完成") # 解析并设置 cookies cookie_list = self.parse_cookies(cookies) print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies") await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 检查视频文件 if not os.path.exists(params.video_path): raise Exception(f"视频文件不存在: {params.video_path}") print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes") self.report_progress(10, "正在打开上传页面...") # 访问上传页面 - 参考 matrix await self.page.goto("https://cp.kuaishou.com/article/publish/video") print(f"[{self.platform_name}] 等待页面加载...") try: await self.page.wait_for_url("https://cp.kuaishou.com/article/publish/video", timeout=30000) except: pass await asyncio.sleep(3) # 检查是否跳转到登录页 current_url = self.page.url print(f"[{self.platform_name}] 当前 URL: {current_url}") if "passport" in current_url or "login" in current_url: screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error="Cookie 已过期,需要重新登录", need_captcha=True, captcha_type='login', screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) # 使用 AI 检查验证码 ai_captcha = await self.ai_check_captcha() if ai_captcha['has_captcha']: print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True) screenshot_base64 = await self.capture_screenshot() return PublishResult( success=False, platform=self.platform_name, error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证", need_captcha=True, captcha_type=ai_captcha['captcha_type'], screenshot_base64=screenshot_base64, page_url=current_url, status='need_captcha' ) self.report_progress(15, "正在选择视频文件...") # 点击上传按钮 - 参考 matrix: page.get_by_role("button", name="上传视频") upload_btn = self.page.get_by_role("button", name="上传视频") async with self.page.expect_file_chooser(timeout=10000) as fc_info: await upload_btn.click() file_chooser = await fc_info.value await file_chooser.set_files(params.video_path) print(f"[{self.platform_name}] 视频文件已选择") await asyncio.sleep(1) # 关闭可能的弹窗 - 参考 matrix known_btn = self.page.get_by_role("button", name="我知道了") if await known_btn.count(): await known_btn.click() print(f"[{self.platform_name}] 关闭弹窗") self.report_progress(20, "正在填充标题...") # 填写标题 - 参考 matrix await asyncio.sleep(1) title_input = self.page.get_by_placeholder('添加合适的话题和描述,作品能获得更多推荐~') if await title_input.count(): await title_input.click() await title_input.fill(params.title[:30]) print(f"[{self.platform_name}] 标题已填写") self.report_progress(30, "等待视频上传完成...") # 等待上传完成 - 参考 matrix: span:has-text("上传成功") for i in range(120): try: count = await self.page.locator('span:has-text("上传成功")').count() if count > 0: print(f"[{self.platform_name}] 视频上传完毕") break else: print(f"[{self.platform_name}] 正在上传视频中... {i+1}/120") await asyncio.sleep(3) except: print(f"[{self.platform_name}] 正在上传视频中...") await asyncio.sleep(3) self.report_progress(50, "正在上传封面...") # 上传封面 - 参考 matrix await self.upload_cover(params.cover_path) await asyncio.sleep(5) self.report_progress(80, "正在发布...") # 点击发布 - 参考 matrix for i in range(30): try: publish_btn = self.page.get_by_role('button', name="发布", exact=True) if await publish_btn.count(): print(f"[{self.platform_name}] 点击发布按钮...") await publish_btn.click() # 等待跳转到管理页面 - 参考 matrix: https://cp.kuaishou.com/article/manage/video?status=2&from=publish await self.page.wait_for_url( "https://cp.kuaishou.com/article/manage/video*", timeout=1500 ) self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=self.page.url, status='success' ) except Exception as e: current_url = self.page.url if "manage/video" in current_url: self.report_progress(100, "发布成功") print(f"[{self.platform_name}] 视频发布成功!") screenshot_base64 = await self.capture_screenshot() return PublishResult( success=True, platform=self.platform_name, message="发布成功", screenshot_base64=screenshot_base64, page_url=current_url, status='success' ) else: print(f"[{self.platform_name}] 视频正在发布中... {i+1}/30") await asyncio.sleep(0.5) # 发布超时 screenshot_base64 = await self.capture_screenshot() page_url = await self.get_page_url() return PublishResult( success=False, platform=self.platform_name, error="发布超时,请检查发布状态", screenshot_base64=screenshot_base64, page_url=page_url, status='need_action' ) async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult: """获取快手作品列表""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品列表") print(f"[{self.platform_name}] page={page}, page_size={page_size}") print(f"{'='*60}") works: List[WorkItem] = [] total = 0 has_more = False try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") # 访问创作者中心 await self.page.goto("https://cp.kuaishou.com/") await asyncio.sleep(3) # 检查登录状态 current_url = self.page.url if "passport" in current_url or "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 调用作品列表 API pcursor = "" if page == 0 else str(page) api_url = f"https://cp.kuaishou.com/rest/cp/works/v2/video/pc/photo/list?count={page_size}&pcursor={pcursor}&status=public" js_code = f""" async () => {{ const resp = await fetch("{api_url}", {{ credentials: 'include', headers: {{ 'Accept': 'application/json' }} }}); return await resp.json(); }} """ response = await self.page.evaluate(js_code) if response.get('result') == 1: data = response.get('data', {}) photo_list = data.get('list', []) has_more = len(photo_list) >= page_size for photo in photo_list: photo_id = photo.get('photoId', '') if not photo_id: continue # 封面 cover_url = photo.get('coverUrl', '') if cover_url.startswith('http://'): cover_url = cover_url.replace('http://', 'https://') # 时长 duration = photo.get('duration', 0) // 1000 # 毫秒转秒 # 发布时间 create_time = photo.get('timestamp', 0) // 1000 publish_time = '' if create_time: from datetime import datetime publish_time = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S') works.append(WorkItem( work_id=str(photo_id), title=photo.get('caption', '') or '无标题', cover_url=cover_url, duration=duration, status='published', publish_time=publish_time, play_count=photo.get('viewCount', 0), like_count=photo.get('likeCount', 0), comment_count=photo.get('commentCount', 0), share_count=photo.get('shareCount', 0), )) print(f"[{self.platform_name}] 获取到 {len(works)} 个作品") except Exception as e: import traceback traceback.print_exc() return WorksResult(success=False, platform=self.platform_name, error=str(e)) return WorksResult(success=True, platform=self.platform_name, works=works, total=total or len(works), has_more=has_more) async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult: """获取快手作品评论""" print(f"\n{'='*60}") print(f"[{self.platform_name}] 获取作品评论") print(f"[{self.platform_name}] work_id={work_id}") print(f"{'='*60}") comments: List[CommentItem] = [] total = 0 has_more = False try: await self.init_browser() cookie_list = self.parse_cookies(cookies) await self.set_cookies(cookie_list) if not self.page: raise Exception("Page not initialized") await self.page.goto("https://cp.kuaishou.com/") await asyncio.sleep(3) current_url = self.page.url if "passport" in current_url or "login" in current_url: raise Exception("Cookie 已过期,请重新登录") # 调用评论列表 API pcursor = cursor or "" api_url = f"https://cp.kuaishou.com/rest/cp/works/comment/list?photoId={work_id}&pcursor={pcursor}&count=20" js_code = f""" async () => {{ const resp = await fetch("{api_url}", {{ credentials: 'include', headers: {{ 'Accept': 'application/json' }} }}); return await resp.json(); }} """ response = await self.page.evaluate(js_code) if response.get('result') == 1: data = response.get('data', {}) comment_list = data.get('list', []) has_more = data.get('pcursor', '') != '' for comment in comment_list: cid = comment.get('commentId', '') if not cid: continue author = comment.get('author', {}) # 解析子评论 replies = [] sub_list = comment.get('subComments', []) or [] for sub in sub_list: sub_author = sub.get('author', {}) replies.append(CommentItem( comment_id=str(sub.get('commentId', '')), work_id=work_id, content=sub.get('content', ''), author_id=str(sub_author.get('id', '')), author_name=sub_author.get('name', ''), author_avatar=sub_author.get('headurl', ''), like_count=sub.get('likeCount', 0), create_time=str(sub.get('timestamp', '')), )) comments.append(CommentItem( comment_id=str(cid), work_id=work_id, content=comment.get('content', ''), author_id=str(author.get('id', '')), author_name=author.get('name', ''), author_avatar=author.get('headurl', ''), like_count=comment.get('likeCount', 0), reply_count=comment.get('subCommentCount', 0), create_time=str(comment.get('timestamp', '')), replies=replies, )) total = len(comments) print(f"[{self.platform_name}] 获取到 {total} 条评论") except Exception as e: import traceback traceback.print_exc() return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e)) return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)