# -*- coding: utf-8 -*-
"""
Kuaishou video publisher.

Reference: matrix/ks_uploader/main.py
"""

import asyncio
import os
import traceback
from datetime import datetime
from typing import List, Optional
from urllib.parse import urlencode

from .base import (
    BasePublisher,
    PublishParams,
    PublishResult,
    WorkItem,
    WorksResult,
    CommentItem,
    CommentsResult,
)


class KuaishouPublisher(BasePublisher):
    """
    Kuaishou video publisher.

    Drives the Kuaishou creator center (cp.kuaishou.com) through the page
    object exposed by the base class as ``self.page``.
    """

    platform_name = "kuaishou"
    login_url = "https://cp.kuaishou.com/"
    publish_url = "https://cp.kuaishou.com/article/publish/video"
    cookie_domain = ".kuaishou.com"

    # Internal endpoints used by get_works / get_comments.
    _CREATOR_HOME_URL = "https://cp.kuaishou.com/"
    _WORKS_API_URL = "https://cp.kuaishou.com/rest/cp/works/v2/video/pc/photo/list"
    _COMMENTS_API_URL = "https://cp.kuaishou.com/rest/cp/works/comment/list"

    # In-page fetch helper. The URL is passed as an argument instead of being
    # interpolated into the script, so caller-supplied values can never be
    # interpreted as JavaScript source.
    _FETCH_JSON_JS = """async (url) => {
        const resp = await fetch(url, {
            credentials: 'include',
            headers: { 'Accept': 'application/json' }
        });
        return await resp.json();
    }"""

    async def set_schedule_time(self, publish_date: datetime):
        """Select scheduled publishing and type the publish time into the form.

        Does nothing when ``self.page`` is not set.

        Args:
            publish_date: Moment to publish; typed as ``YYYY-MM-DD HH:MM``.
        """
        if not self.page:
            return

        # Switch the radio group to "scheduled publish".
        await self.page.locator("label.radio--4Gpx6:has-text('定时发布')").click()
        await asyncio.sleep(1)

        # Replace whatever the date-time input holds with the requested time.
        publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M")
        await self.page.locator('.semi-input[placeholder="日期和时间"]').click()
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(publish_date_str)
        await self.page.keyboard.press("Enter")
        await asyncio.sleep(1)

    async def upload_cover(self, cover_path: str):
        """Upload a custom cover image through the cover editor dialog.

        Best effort: returns silently when there is no page, no path, or the
        file does not exist, and any failure while driving the dialog is
        logged rather than raised so that publishing can continue.

        Args:
            cover_path: Local path of the cover image.
        """
        if not self.page or not cover_path or not os.path.exists(cover_path):
            return

        try:
            await self.page.get_by_role("button", name="编辑封面").click()
            await asyncio.sleep(1)
            await self.page.get_by_role("tab", name="上传封面").click()

            # Clicking the preview area inside the tab panel opens the file chooser.
            preview_div = (
                self.page.get_by_role("tabpanel", name="上传封面").locator("div").nth(1)
            )
            async with self.page.expect_file_chooser() as fc_info:
                await preview_div.click()
            preview_chooser = await fc_info.value
            await preview_chooser.set_files(cover_path)

            await self.page.get_by_role("button", name="确认").click()
            await asyncio.sleep(3)
            print(f"[{self.platform_name}] 封面上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 封面上传失败: {e}")

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to Kuaishou.

        Progress is reported through ``self.report_progress`` along the way.

        Args:
            cookies: Cookie payload, parsed by ``self.parse_cookies``.
            params: Publish parameters; ``video_path``, ``title`` and
                ``cover_path`` are read here.

        Returns:
            A successful ``PublishResult`` once the browser lands on the video
            management page.

        Raises:
            Exception: If the video file does not exist, the page was not
                initialized, or the publish did not complete in time.
        """
        # Fail fast on a bad path before a browser is launched.
        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        self.report_progress(5, "正在初始化浏览器...")
        await self.init_browser()

        # Parse and install the cookies.
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        self.report_progress(10, "正在打开上传页面...")
        await self.page.goto(self.publish_url)
        await self.page.wait_for_url(self.publish_url, timeout=30000)

        self.report_progress(15, "正在选择视频文件...")
        upload_btn = self.page.get_by_role("button", name="上传视频")
        async with self.page.expect_file_chooser() as fc_info:
            await upload_btn.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(params.video_path)

        await asyncio.sleep(1)

        # Dismiss the "got it" dialog when it is shown.
        known_btn = self.page.get_by_role("button", name="我知道了")
        if await known_btn.count():
            await known_btn.click()

        self.report_progress(20, "正在填充标题...")
        await asyncio.sleep(1)
        title_input = self.page.get_by_placeholder('添加合适的话题和描述,作品能获得更多推荐~')
        if await title_input.count():
            await title_input.click()
            # Only the first 30 characters of the title are entered.
            await title_input.fill(params.title[:30])

        self.report_progress(30, "等待视频上传完成...")
        # Poll for the "upload succeeded" marker: up to 120 checks, 3 s apart.
        for _ in range(120):
            try:
                uploaded = await self.page.locator('span:has-text("上传成功")').count() > 0
            except Exception:
                # Transient page errors count as "not yet"; cancellation is
                # deliberately NOT swallowed (it is not an Exception subclass).
                uploaded = False
            if uploaded:
                print(f"[{self.platform_name}] 视频上传完毕")
                break
            await asyncio.sleep(3)
        else:
            # Keep going as before, but make the timeout visible in the logs.
            print(f"[{self.platform_name}] 等待视频上传超时,继续尝试发布")

        self.report_progress(50, "正在上传封面...")
        await self.upload_cover(params.cover_path)

        # NOTE: scheduled publishing is intentionally disabled (not supported
        # by Kuaishou for now, or the selectors have changed). To re-enable,
        # call self.set_schedule_time(params.publish_date) here when
        # params.publish_date is set.

        self.report_progress(80, "正在发布...")
        # Click "publish" and wait for the redirect to the management page,
        # retrying up to 30 times.
        for _ in range(30):
            try:
                publish_btn = self.page.get_by_role('button', name="发布", exact=True)
                if await publish_btn.count():
                    await publish_btn.click()

                await self.page.wait_for_url(
                    "https://cp.kuaishou.com/article/manage/video*",
                    timeout=5000
                )
            except Exception:
                # The wait may fail even though the redirect happened;
                # fall back to inspecting the current URL before retrying.
                if "manage/video" not in self.page.url:
                    await asyncio.sleep(1)
                    continue

            self.report_progress(100, "发布成功")
            return PublishResult(
                success=True,
                platform=self.platform_name,
                message="发布成功"
            )

        raise Exception("发布超时")

    async def _open_creator_center(self, cookies: str) -> None:
        """Start the browser, install cookies and open the creator center.

        Raises:
            Exception: If the page was not initialized, or the resulting URL
                contains "passport" or "login" (treated as expired cookies).
        """
        await self.init_browser()
        cookie_list = self.parse_cookies(cookies)
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        await self.page.goto(self._CREATOR_HOME_URL)
        await asyncio.sleep(3)

        # Being bounced to a passport/login URL means the session is invalid.
        current_url = self.page.url
        if "passport" in current_url or "login" in current_url:
            raise Exception("Cookie 已过期,请重新登录")

    async def _fetch_api_json(self, api_url: str) -> dict:
        """GET ``api_url`` from inside the page (with credentials) and return the JSON."""
        response = await self.page.evaluate(self._FETCH_JSON_JS, api_url)
        return response or {}

    @staticmethod
    def _build_work_item(photo: dict) -> Optional[WorkItem]:
        """Convert one entry of the works API into a WorkItem; None if it has no photoId."""
        photo_id = photo.get('photoId', '')
        if not photo_id:
            return None

        # Upgrade only the scheme prefix; URLs embedded later in the string
        # are left untouched.
        cover_url = photo.get('coverUrl') or ''
        if cover_url.startswith('http://'):
            cover_url = 'https://' + cover_url[len('http://'):]

        # Both values are divided by 1000 (milliseconds to seconds);
        # a missing or null value is treated as 0.
        duration = (photo.get('duration') or 0) // 1000
        create_time = (photo.get('timestamp') or 0) // 1000
        publish_time = ''
        if create_time:
            publish_time = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')

        return WorkItem(
            work_id=str(photo_id),
            title=photo.get('caption', '') or '无标题',
            cover_url=cover_url,
            duration=duration,
            status='published',
            publish_time=publish_time,
            play_count=photo.get('viewCount', 0),
            like_count=photo.get('likeCount', 0),
            comment_count=photo.get('commentCount', 0),
            share_count=photo.get('shareCount', 0),
        )

    @staticmethod
    def _build_comment(raw: dict, work_id: str, **extra) -> CommentItem:
        """Convert one comment (or sub-comment) entry into a CommentItem.

        ``extra`` carries the fields only top-level comments set
        (``reply_count``, ``replies``).
        """
        author = raw.get('author') or {}
        return CommentItem(
            comment_id=str(raw.get('commentId', '')),
            work_id=work_id,
            content=raw.get('content', ''),
            author_id=str(author.get('id', '')),
            author_name=author.get('name', ''),
            author_avatar=author.get('headurl', ''),
            like_count=raw.get('likeCount', 0),
            create_time=str(raw.get('timestamp', '')),
            **extra,
        )

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Fetch one page of the account's public works.

        Args:
            cookies: Cookie payload, parsed by ``self.parse_cookies``.
            page: Page index; 0 sends an empty ``pcursor``, any other value is
                sent as the ``pcursor`` string.
            page_size: Number of items requested (``count``).

        Returns:
            A ``WorksResult``. On any exception the traceback is printed and a
            result with ``success=False`` and the error text is returned.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        has_more = False

        try:
            await self._open_creator_center(cookies)

            # URL-encode the query so the values cannot break the request.
            query = urlencode({
                "count": page_size,
                "pcursor": "" if page == 0 else str(page),
                "status": "public",
            })
            response = await self._fetch_api_json(f"{self._WORKS_API_URL}?{query}")

            if response.get('result') == 1:
                data = response.get('data') or {}
                photo_list = data.get('list') or []
                # A full page is taken as a hint that more items exist.
                has_more = len(photo_list) >= page_size

                for photo in photo_list:
                    item = self._build_work_item(photo)
                    if item is not None:
                        works.append(item)
            else:
                print(f"[{self.platform_name}] 作品列表接口返回异常: result={response.get('result')}")

            print(f"[{self.platform_name}] 获取到 {len(works)} 个作品")

        except Exception as e:
            traceback.print_exc()
            return WorksResult(success=False, platform=self.platform_name, error=str(e))

        return WorksResult(
            success=True,
            platform=self.platform_name,
            works=works,
            total=len(works),
            has_more=has_more,
        )

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Fetch one page of comments (with their sub-comments) for a work.

        Args:
            cookies: Cookie payload, parsed by ``self.parse_cookies``.
            work_id: Identifier of the work, sent as ``photoId``.
            cursor: Pagination cursor, sent as ``pcursor``; empty for the first page.

        Returns:
            A ``CommentsResult``. On any exception the traceback is printed and
            a result with ``success=False`` and the error text is returned.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False

        try:
            await self._open_creator_center(cookies)

            # URL-encode the query: work_id and cursor are caller-supplied.
            query = urlencode({
                "photoId": work_id,
                "pcursor": cursor or "",
                "count": 20,
            })
            response = await self._fetch_api_json(f"{self._COMMENTS_API_URL}?{query}")

            if response.get('result') == 1:
                data = response.get('data') or {}
                comment_list = data.get('list') or []
                # A non-empty pcursor in the response is treated as "more available".
                has_more = data.get('pcursor', '') != ''

                for comment in comment_list:
                    if not comment.get('commentId', ''):
                        continue

                    # Sub-comments become nested CommentItem replies.
                    replies = [
                        self._build_comment(sub, work_id)
                        for sub in (comment.get('subComments', []) or [])
                    ]
                    comments.append(self._build_comment(
                        comment,
                        work_id,
                        reply_count=comment.get('subCommentCount', 0),
                        replies=replies,
                    ))

                total = len(comments)
            else:
                print(f"[{self.platform_name}] 评论列表接口返回异常: result={response.get('result')}")

            print(f"[{self.platform_name}] 获取到 {total} 条评论")

        except Exception as e:
            traceback.print_exc()
            return CommentsResult(
                success=False,
                platform=self.platform_name,
                work_id=work_id,
                error=str(e),
            )

        return CommentsResult(
            success=True,
            platform=self.platform_name,
            work_id=work_id,
            comments=comments,
            total=total,
            has_more=has_more,
        )