| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- # -*- coding: utf-8 -*-
- """
- 快手视频发布器
- 参考: matrix/ks_uploader/main.py
- """
- import asyncio
- import os
- from datetime import datetime
- from typing import List
- from .base import (
- BasePublisher, PublishParams, PublishResult,
- WorkItem, WorksResult, CommentItem, CommentsResult
- )
- class KuaishouPublisher(BasePublisher):
- """
- 快手视频发布器
- 使用 Playwright 自动化操作快手创作者中心
- """
-
- platform_name = "kuaishou"
- login_url = "https://cp.kuaishou.com/"
- publish_url = "https://cp.kuaishou.com/article/publish/video"
- cookie_domain = ".kuaishou.com"
-
- async def set_schedule_time(self, publish_date: datetime):
- """设置定时发布"""
- if not self.page:
- return
-
- # 选择定时发布
- label_element = self.page.locator("label.radio--4Gpx6:has-text('定时发布')")
- await label_element.click()
- await asyncio.sleep(1)
-
- # 输入时间
- publish_date_str = publish_date.strftime("%Y-%m-%d %H:%M")
- await self.page.locator('.semi-input[placeholder="日期和时间"]').click()
- await self.page.keyboard.press("Control+KeyA")
- await self.page.keyboard.type(str(publish_date_str))
- await self.page.keyboard.press("Enter")
- await asyncio.sleep(1)
-
- async def upload_cover(self, cover_path: str):
- """上传封面图"""
- if not self.page or not cover_path or not os.path.exists(cover_path):
- return
-
- try:
- await self.page.get_by_role("button", name="编辑封面").click()
- await asyncio.sleep(1)
- await self.page.get_by_role("tab", name="上传封面").click()
-
- preview_div = self.page.get_by_role("tabpanel", name="上传封面").locator("div").nth(1)
- async with self.page.expect_file_chooser() as fc_info:
- await preview_div.click()
- preview_chooser = await fc_info.value
- await preview_chooser.set_files(cover_path)
-
- await self.page.get_by_role("button", name="确认").click()
- await asyncio.sleep(3)
-
- print(f"[{self.platform_name}] 封面上传成功")
- except Exception as e:
- print(f"[{self.platform_name}] 封面上传失败: {e}")
-
- async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
- """发布视频到快手"""
- self.report_progress(5, "正在初始化浏览器...")
-
- # 初始化浏览器
- await self.init_browser()
-
- # 解析并设置 cookies
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 检查视频文件
- if not os.path.exists(params.video_path):
- raise Exception(f"视频文件不存在: {params.video_path}")
-
- self.report_progress(10, "正在打开上传页面...")
-
- # 访问上传页面
- await self.page.goto(self.publish_url)
- await self.page.wait_for_url(self.publish_url, timeout=30000)
-
- self.report_progress(15, "正在选择视频文件...")
-
- # 点击上传按钮
- upload_btn = self.page.get_by_role("button", name="上传视频")
- async with self.page.expect_file_chooser() as fc_info:
- await upload_btn.click()
- file_chooser = await fc_info.value
- await file_chooser.set_files(params.video_path)
-
- await asyncio.sleep(1)
-
- # 关闭可能的弹窗
- known_btn = self.page.get_by_role("button", name="我知道了")
- if await known_btn.count():
- await known_btn.click()
-
- self.report_progress(20, "正在填充标题...")
-
- # 填写标题
- await asyncio.sleep(1)
- title_input = self.page.get_by_placeholder('添加合适的话题和描述,作品能获得更多推荐~')
- if await title_input.count():
- await title_input.click()
- await title_input.fill(params.title[:30])
-
- self.report_progress(30, "等待视频上传完成...")
-
- # 等待上传完成
- for _ in range(120):
- try:
- count = await self.page.locator('span:has-text("上传成功")').count()
- if count > 0:
- print(f"[{self.platform_name}] 视频上传完毕")
- break
- await asyncio.sleep(3)
- except:
- await asyncio.sleep(3)
-
- self.report_progress(50, "正在上传封面...")
-
- # 上传封面
- await self.upload_cover(params.cover_path)
-
- # 定时发布(快手暂不支持或选择器有变化)
- # if params.publish_date:
- # await self.set_schedule_time(params.publish_date)
-
- self.report_progress(80, "正在发布...")
-
- # 点击发布
- for _ in range(30):
- try:
- publish_btn = self.page.get_by_role('button', name="发布", exact=True)
- if await publish_btn.count():
- await publish_btn.click()
- await self.page.wait_for_url(
- "https://cp.kuaishou.com/article/manage/video*",
- timeout=5000
- )
- self.report_progress(100, "发布成功")
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功"
- )
- except:
- current_url = self.page.url
- if "manage/video" in current_url:
- self.report_progress(100, "发布成功")
- return PublishResult(
- success=True,
- platform=self.platform_name,
- message="发布成功"
- )
- await asyncio.sleep(1)
-
- raise Exception("发布超时")
- async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
- """获取快手作品列表"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 获取作品列表")
- print(f"[{self.platform_name}] page={page}, page_size={page_size}")
- print(f"{'='*60}")
-
- works: List[WorkItem] = []
- total = 0
- has_more = False
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- # 访问创作者中心
- await self.page.goto("https://cp.kuaishou.com/")
- await asyncio.sleep(3)
-
- # 检查登录状态
- current_url = self.page.url
- if "passport" in current_url or "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- # 调用作品列表 API
- pcursor = "" if page == 0 else str(page)
- api_url = f"https://cp.kuaishou.com/rest/cp/works/v2/video/pc/photo/list?count={page_size}&pcursor={pcursor}&status=public"
-
- js_code = f"""
- async () => {{
- const resp = await fetch("{api_url}", {{
- credentials: 'include',
- headers: {{ 'Accept': 'application/json' }}
- }});
- return await resp.json();
- }}
- """
-
- response = await self.page.evaluate(js_code)
-
- if response.get('result') == 1:
- data = response.get('data', {})
- photo_list = data.get('list', [])
- has_more = len(photo_list) >= page_size
-
- for photo in photo_list:
- photo_id = photo.get('photoId', '')
- if not photo_id:
- continue
-
- # 封面
- cover_url = photo.get('coverUrl', '')
- if cover_url.startswith('http://'):
- cover_url = cover_url.replace('http://', 'https://')
-
- # 时长
- duration = photo.get('duration', 0) // 1000 # 毫秒转秒
-
- # 发布时间
- create_time = photo.get('timestamp', 0) // 1000
- publish_time = ''
- if create_time:
- from datetime import datetime
- publish_time = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')
-
- works.append(WorkItem(
- work_id=str(photo_id),
- title=photo.get('caption', '') or '无标题',
- cover_url=cover_url,
- duration=duration,
- status='published',
- publish_time=publish_time,
- play_count=photo.get('viewCount', 0),
- like_count=photo.get('likeCount', 0),
- comment_count=photo.get('commentCount', 0),
- share_count=photo.get('shareCount', 0),
- ))
-
- print(f"[{self.platform_name}] 获取到 {len(works)} 个作品")
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return WorksResult(success=False, platform=self.platform_name, error=str(e))
-
- return WorksResult(success=True, platform=self.platform_name, works=works, total=total or len(works), has_more=has_more)
-
- async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
- """获取快手作品评论"""
- print(f"\n{'='*60}")
- print(f"[{self.platform_name}] 获取作品评论")
- print(f"[{self.platform_name}] work_id={work_id}")
- print(f"{'='*60}")
-
- comments: List[CommentItem] = []
- total = 0
- has_more = False
-
- try:
- await self.init_browser()
- cookie_list = self.parse_cookies(cookies)
- await self.set_cookies(cookie_list)
-
- if not self.page:
- raise Exception("Page not initialized")
-
- await self.page.goto("https://cp.kuaishou.com/")
- await asyncio.sleep(3)
-
- current_url = self.page.url
- if "passport" in current_url or "login" in current_url:
- raise Exception("Cookie 已过期,请重新登录")
-
- # 调用评论列表 API
- pcursor = cursor or ""
- api_url = f"https://cp.kuaishou.com/rest/cp/works/comment/list?photoId={work_id}&pcursor={pcursor}&count=20"
-
- js_code = f"""
- async () => {{
- const resp = await fetch("{api_url}", {{
- credentials: 'include',
- headers: {{ 'Accept': 'application/json' }}
- }});
- return await resp.json();
- }}
- """
-
- response = await self.page.evaluate(js_code)
-
- if response.get('result') == 1:
- data = response.get('data', {})
- comment_list = data.get('list', [])
- has_more = data.get('pcursor', '') != ''
-
- for comment in comment_list:
- cid = comment.get('commentId', '')
- if not cid:
- continue
-
- author = comment.get('author', {})
-
- # 解析子评论
- replies = []
- sub_list = comment.get('subComments', []) or []
- for sub in sub_list:
- sub_author = sub.get('author', {})
- replies.append(CommentItem(
- comment_id=str(sub.get('commentId', '')),
- work_id=work_id,
- content=sub.get('content', ''),
- author_id=str(sub_author.get('id', '')),
- author_name=sub_author.get('name', ''),
- author_avatar=sub_author.get('headurl', ''),
- like_count=sub.get('likeCount', 0),
- create_time=str(sub.get('timestamp', '')),
- ))
-
- comments.append(CommentItem(
- comment_id=str(cid),
- work_id=work_id,
- content=comment.get('content', ''),
- author_id=str(author.get('id', '')),
- author_name=author.get('name', ''),
- author_avatar=author.get('headurl', ''),
- like_count=comment.get('likeCount', 0),
- reply_count=comment.get('subCommentCount', 0),
- create_time=str(comment.get('timestamp', '')),
- replies=replies,
- ))
-
- total = len(comments)
- print(f"[{self.platform_name}] 获取到 {total} 条评论")
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e))
-
- return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)
|