| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433 |
- # -*- coding: utf-8 -*-
- """
- 快手视频发布器
- 参考: matrix/ks_uploader/main.py
- """
- import asyncio
- import os
- from datetime import datetime
- from typing import List
- from .base import (
- BasePublisher, PublishParams, PublishResult,
- WorkItem, WorksResult, CommentItem, CommentsResult
- )
class KuaishouPublisher(BasePublisher):
    """Publisher for Kuaishou videos.

    Automates the Kuaishou creator center (cp.kuaishou.com) with Playwright:
    publishing a video, listing published works and reading the comments of
    a work.

    NOTE(review): none of the methods here close the browser; presumably the
    caller or ``BasePublisher`` owns that cleanup - confirm.
    """

    platform_name = "kuaishou"
    login_url = "https://cp.kuaishou.com/"
    publish_url = "https://cp.kuaishou.com/article/publish/video"
    cookie_domain = ".kuaishou.com"

    # Origin shared by the creator-center home page and its REST endpoints.
    _KS_ORIGIN = "https://cp.kuaishou.com"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _ks_is_login_url(url: str) -> bool:
        """Return True when *url* contains "passport" or "login"."""
        return "passport" in url or "login" in url

    async def _ks_success_result(self) -> PublishResult:
        """Report 100% progress and build the successful ``PublishResult``."""
        self.report_progress(100, "发布成功")
        print(f"[{self.platform_name}] 视频发布成功!")
        screenshot_base64 = await self.capture_screenshot()
        return PublishResult(
            success=True,
            platform=self.platform_name,
            message="发布成功",
            screenshot_base64=screenshot_base64,
            page_url=self.page.url,
            status='success'
        )

    async def _ks_fetch_json(self, cookies: str, path: str, query: dict):
        """Open the creator center with *cookies* and GET a JSON endpoint.

        The request is issued with ``fetch`` from inside the page so that it
        carries the session cookies.

        Args:
            cookies: Raw cookie string, parsed by ``self.parse_cookies``.
            path: Absolute path of the endpoint on cp.kuaishou.com.
            query: Query parameters; values are URL-encoded here.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: If the page was not initialised or the session was
                redirected to a login page.
        """
        from urllib.parse import urlencode

        await self.init_browser()
        await self.set_cookies(self.parse_cookies(cookies))

        if not self.page:
            raise Exception("Page not initialized")

        await self.page.goto(f"{self._KS_ORIGIN}/")
        await asyncio.sleep(3)

        if self._ks_is_login_url(self.page.url):
            raise Exception("Cookie 已过期,请重新登录")

        api_url = f"{self._KS_ORIGIN}{path}?{urlencode(query)}"

        # The URL is handed over as an argument instead of being spliced into
        # the script text, so caller-supplied ids/cursors cannot break out of
        # the JavaScript string literal.
        js_code = """
        async (url) => {
            const resp = await fetch(url, {
                credentials: 'include',
                headers: { 'Accept': 'application/json' }
            });
            return await resp.json();
        }
        """
        return await self.page.evaluate(js_code, api_url)

    @staticmethod
    def _ks_parse_work(photo: dict) -> WorkItem:
        """Convert one entry of the works-list response into a ``WorkItem``."""
        cover_url = photo.get('coverUrl') or ''
        if cover_url.startswith('http://'):
            # Upgrade only the leading scheme; leave any embedded URL alone.
            cover_url = 'https://' + cover_url[len('http://'):]

        # Both values are divided by 1000 (milliseconds -> seconds).
        duration = (photo.get('duration') or 0) // 1000
        create_time = (photo.get('timestamp') or 0) // 1000

        publish_time = ''
        if create_time:
            publish_time = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')

        return WorkItem(
            work_id=str(photo.get('photoId', '')),
            title=photo.get('caption', '') or '无标题',
            cover_url=cover_url,
            duration=duration,
            status='published',
            publish_time=publish_time,
            play_count=photo.get('viewCount', 0),
            like_count=photo.get('likeCount', 0),
            comment_count=photo.get('commentCount', 0),
            share_count=photo.get('shareCount', 0),
        )

    @staticmethod
    def _ks_comment_fields(raw: dict, work_id: str) -> dict:
        """Return the ``CommentItem`` keyword arguments shared by comments and replies."""
        author = raw.get('author') or {}
        return {
            'comment_id': str(raw.get('commentId', '')),
            'work_id': work_id,
            'content': raw.get('content', ''),
            'author_id': str(author.get('id', '')),
            'author_name': author.get('name', ''),
            'author_avatar': author.get('headurl', ''),
            'like_count': raw.get('likeCount', 0),
            'create_time': str(raw.get('timestamp', '')),
        }

    # ------------------------------------------------------------------
    # Page interactions
    # ------------------------------------------------------------------

    async def set_schedule_time(self, publish_date: datetime):
        """Switch the form to scheduled publishing and type *publish_date*.

        The date is entered as ``YYYY-MM-DD HH:MM``. Does nothing when no
        page is open.
        """
        if not self.page:
            return

        # Select the "scheduled publish" radio option.
        await self.page.locator("label.radio--4Gpx6:has-text('定时发布')").click()
        await asyncio.sleep(1)

        # Replace whatever the date input currently holds with the new value.
        await self.page.locator('.semi-input[placeholder="日期和时间"]').click()
        await self.page.keyboard.press("Control+KeyA")
        await self.page.keyboard.type(publish_date.strftime("%Y-%m-%d %H:%M"))
        await self.page.keyboard.press("Enter")
        await asyncio.sleep(1)

    async def upload_cover(self, cover_path: str):
        """Upload *cover_path* as the video cover (best effort).

        Returns silently when there is no page, no path, or the file does
        not exist. Any failure during the upload is logged, not raised.
        """
        if not self.page or not cover_path or not os.path.exists(cover_path):
            return

        try:
            await self.page.get_by_role("button", name="编辑封面").click()
            await asyncio.sleep(1)
            await self.page.get_by_role("tab", name="上传封面").click()

            # Clicking the preview area opens the native file chooser.
            preview_div = self.page.get_by_role("tabpanel", name="上传封面").locator("div").nth(1)
            async with self.page.expect_file_chooser() as fc_info:
                await preview_div.click()
            preview_chooser = await fc_info.value
            await preview_chooser.set_files(cover_path)

            await self.page.get_by_role("button", name="确认").click()
            await asyncio.sleep(3)

            print(f"[{self.platform_name}] 封面上传成功")
        except Exception as e:
            print(f"[{self.platform_name}] 封面上传失败: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def publish(self, cookies: str, params: PublishParams) -> PublishResult:
        """Publish a video to Kuaishou.

        Args:
            cookies: Raw cookie string for the logged-in account.
            params: Publish parameters; ``video_path``, ``title`` and
                ``cover_path`` are read here.

        Returns:
            A ``PublishResult`` whose ``status`` is ``'success'``,
            ``'need_captcha'`` (login redirect or captcha detected) or
            ``'need_action'`` (the management page was not reached in time).

        Raises:
            Exception: If the page was not initialised or the video file
                does not exist.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 开始发布视频")
        print(f"[{self.platform_name}] 视频路径: {params.video_path}")
        print(f"[{self.platform_name}] 标题: {params.title}")
        print(f"[{self.platform_name}] Headless: {self.headless}")
        print(f"{'='*60}")

        self.report_progress(5, "正在初始化浏览器...")

        await self.init_browser()
        print(f"[{self.platform_name}] 浏览器初始化完成")

        cookie_list = self.parse_cookies(cookies)
        print(f"[{self.platform_name}] 解析到 {len(cookie_list)} 个 cookies")
        await self.set_cookies(cookie_list)

        if not self.page:
            raise Exception("Page not initialized")

        if not os.path.exists(params.video_path):
            raise Exception(f"视频文件不存在: {params.video_path}")

        print(f"[{self.platform_name}] 视频文件存在,大小: {os.path.getsize(params.video_path)} bytes")

        self.report_progress(10, "正在打开上传页面...")

        await self.page.goto(self.publish_url)
        print(f"[{self.platform_name}] 等待页面加载...")

        try:
            await self.page.wait_for_url(self.publish_url, timeout=30000)
        except Exception:
            # Not reaching the publish URL (e.g. a login redirect) is handled
            # by the URL inspection below. ``Exception`` rather than a bare
            # ``except`` so task cancellation still propagates.
            pass

        await asyncio.sleep(3)

        current_url = self.page.url
        print(f"[{self.platform_name}] 当前 URL: {current_url}")

        # Redirected to a login page: the cookies are no longer valid.
        if self._ks_is_login_url(current_url):
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error="Cookie 已过期,需要重新登录",
                need_captcha=True,
                captcha_type='login',
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        ai_captcha = await self.ai_check_captcha()
        if ai_captcha['has_captcha']:
            print(f"[{self.platform_name}] AI检测到验证码: {ai_captcha['captcha_type']}", flush=True)
            screenshot_base64 = await self.capture_screenshot()
            return PublishResult(
                success=False,
                platform=self.platform_name,
                error=f"检测到{ai_captcha['captcha_type']}验证码,需要使用有头浏览器完成验证",
                need_captcha=True,
                captcha_type=ai_captcha['captcha_type'],
                screenshot_base64=screenshot_base64,
                page_url=current_url,
                status='need_captcha'
            )

        self.report_progress(15, "正在选择视频文件...")

        # The upload button opens a native file chooser.
        upload_btn = self.page.get_by_role("button", name="上传视频")
        async with self.page.expect_file_chooser(timeout=10000) as fc_info:
            await upload_btn.click()
        file_chooser = await fc_info.value
        await file_chooser.set_files(params.video_path)
        print(f"[{self.platform_name}] 视频文件已选择")

        await asyncio.sleep(1)

        # Dismiss the optional "I got it" dialog.
        known_btn = self.page.get_by_role("button", name="我知道了")
        if await known_btn.count():
            await known_btn.click()
            print(f"[{self.platform_name}] 关闭弹窗")

        self.report_progress(20, "正在填充标题...")

        await asyncio.sleep(1)
        title_input = self.page.get_by_placeholder('添加合适的话题和描述,作品能获得更多推荐~')
        if await title_input.count():
            await title_input.click()
            # Only the first 30 characters of the title are entered.
            await title_input.fill(params.title[:30])
            print(f"[{self.platform_name}] 标题已填写")

        self.report_progress(30, "等待视频上传完成...")

        # Poll for the "upload succeeded" marker: 120 attempts, 3 s apart.
        upload_marker = self.page.locator('span:has-text("上传成功")')
        for attempt in range(1, 121):
            try:
                if await upload_marker.count() > 0:
                    print(f"[{self.platform_name}] 视频上传完毕")
                    break
                print(f"[{self.platform_name}] 正在上传视频中... {attempt}/120")
            except Exception:
                print(f"[{self.platform_name}] 正在上传视频中...")
            # Sleeping outside the try keeps cancellation from being swallowed.
            await asyncio.sleep(3)
        else:
            # Best effort: publishing is still attempted, but leave a trace.
            print(f"[{self.platform_name}] 未检测到上传成功标识,继续尝试发布")

        self.report_progress(50, "正在上传封面...")

        await self.upload_cover(params.cover_path)

        await asyncio.sleep(5)

        self.report_progress(80, "正在发布...")

        # Click "publish" while the button exists and wait for the redirect to
        # the management page. The wait runs on every attempt, including when
        # the button has already disappeared, so the loop cannot burn through
        # its attempts without actually waiting.
        for attempt in range(1, 31):
            try:
                publish_btn = self.page.get_by_role('button', name="发布", exact=True)
                if await publish_btn.count():
                    print(f"[{self.platform_name}] 点击发布按钮...")
                    await publish_btn.click()

                await self.page.wait_for_url(
                    "https://cp.kuaishou.com/article/manage/video*",
                    timeout=1500
                )
            except Exception:
                # The wait may fail even though the redirect happened, so
                # double-check the URL before retrying.
                if "manage/video" not in self.page.url:
                    print(f"[{self.platform_name}] 视频正在发布中... {attempt}/30")
                    await asyncio.sleep(0.5)
                    continue

            return await self._ks_success_result()

        # The management page was never reached.
        screenshot_base64 = await self.capture_screenshot()
        page_url = await self.get_page_url()
        return PublishResult(
            success=False,
            platform=self.platform_name,
            error="发布超时,请检查发布状态",
            screenshot_base64=screenshot_base64,
            page_url=page_url,
            status='need_action'
        )

    async def get_works(self, cookies: str, page: int = 0, page_size: int = 20) -> WorksResult:
        """Fetch one page of the account's public works.

        Args:
            cookies: Raw cookie string for the logged-in account.
            page: Page index; ``0`` sends an empty ``pcursor``, any other
                value is sent as the ``pcursor`` string.
            page_size: Number of works requested.

        Returns:
            A ``WorksResult``. ``success`` is False with ``error`` set when
            anything raises (including an expired login). ``has_more`` is
            True when the response holds at least ``page_size`` entries.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品列表")
        print(f"[{self.platform_name}] page={page}, page_size={page_size}")
        print(f"{'='*60}")

        works: List[WorkItem] = []
        has_more = False

        try:
            response = await self._ks_fetch_json(
                cookies,
                "/rest/cp/works/v2/video/pc/photo/list",
                {
                    "count": page_size,
                    "pcursor": "" if page == 0 else str(page),
                    "status": "public",
                },
            )

            if response.get('result') == 1:
                data = response.get('data') or {}
                photo_list = data.get('list') or []
                has_more = len(photo_list) >= page_size
                # Entries without a photoId are skipped.
                works = [
                    self._ks_parse_work(photo)
                    for photo in photo_list
                    if photo.get('photoId', '')
                ]
            else:
                print(f"[{self.platform_name}] 作品列表接口返回异常: result={response.get('result')}")

            print(f"[{self.platform_name}] 获取到 {len(works)} 个作品")

        except Exception as e:
            import traceback
            traceback.print_exc()
            return WorksResult(success=False, platform=self.platform_name, error=str(e))

        return WorksResult(success=True, platform=self.platform_name, works=works, total=len(works), has_more=has_more)

    async def get_comments(self, cookies: str, work_id: str, cursor: str = "") -> CommentsResult:
        """Fetch one page (20 entries requested) of comments for a work.

        Args:
            cookies: Raw cookie string for the logged-in account.
            work_id: Sent as the ``photoId`` query parameter.
            cursor: Pagination cursor sent as ``pcursor``; empty for the
                first page.

        Returns:
            A ``CommentsResult``. ``success`` is False with ``error`` set
            when anything raises (including an expired login). ``total`` is
            the number of top-level comments parsed from this page.
        """
        print(f"\n{'='*60}")
        print(f"[{self.platform_name}] 获取作品评论")
        print(f"[{self.platform_name}] work_id={work_id}")
        print(f"{'='*60}")

        comments: List[CommentItem] = []
        total = 0
        has_more = False

        try:
            response = await self._ks_fetch_json(
                cookies,
                "/rest/cp/works/comment/list",
                {"photoId": work_id, "pcursor": cursor or "", "count": 20},
            )

            if response.get('result') == 1:
                data = response.get('data') or {}
                comment_list = data.get('list') or []

                # A missing, null or empty cursor means there is no next page.
                # NOTE(review): 'no_more' is assumed to be the end-of-list
                # sentinel for this endpoint - confirm against a real response.
                next_cursor = data.get('pcursor')
                has_more = bool(next_cursor) and next_cursor != 'no_more'

                for comment in comment_list:
                    # Comments without an id are skipped.
                    if not comment.get('commentId', ''):
                        continue

                    replies = [
                        CommentItem(**self._ks_comment_fields(sub, work_id))
                        for sub in (comment.get('subComments') or [])
                    ]

                    comments.append(CommentItem(
                        **self._ks_comment_fields(comment, work_id),
                        reply_count=comment.get('subCommentCount', 0),
                        replies=replies,
                    ))

                total = len(comments)
            else:
                print(f"[{self.platform_name}] 评论列表接口返回异常: result={response.get('result')}")

            print(f"[{self.platform_name}] 获取到 {total} 条评论")

        except Exception as e:
            import traceback
            traceback.print_exc()
            return CommentsResult(success=False, platform=self.platform_name, work_id=work_id, error=str(e))

        return CommentsResult(success=True, platform=self.platform_name, work_id=work_id, comments=comments, total=total, has_more=has_more)
|