#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 多平台视频发布服务 - 统一入口 支持平台: 抖音、小红书、视频号、快手 参考项目: matrix (https://github.com/kebenxiaoming/matrix) 使用方式: python app.py # 启动 HTTP 服务 (端口 5005) python app.py --port 8080 # 指定端口 python app.py --headless false # 显示浏览器窗口 """ import asyncio import os import sys import argparse import traceback from datetime import datetime from pathlib import Path # 确保当前目录在 Python 路径中 CURRENT_DIR = Path(__file__).parent.resolve() if str(CURRENT_DIR) not in sys.path: sys.path.insert(0, str(CURRENT_DIR)) from flask import Flask, request, jsonify from flask_cors import CORS from platforms import get_publisher, PLATFORM_MAP from platforms.base import PublishParams def parse_datetime(date_str: str): """解析日期时间字符串""" if not date_str: return None formats = [ "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", ] for fmt in formats: try: return datetime.strptime(date_str, fmt) except ValueError: continue return None def validate_video_file(video_path: str) -> bool: """验证视频文件是否有效""" if not video_path: return False if not os.path.exists(video_path): return False if not os.path.isfile(video_path): return False valid_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.flv', '.wmv', '.webm'] ext = os.path.splitext(video_path)[1].lower() if ext not in valid_extensions: return False if os.path.getsize(video_path) < 1024: return False return True # 创建 Flask 应用 app = Flask(__name__) CORS(app) # 全局配置 HEADLESS_MODE = os.environ.get('HEADLESS', 'true').lower() == 'true' # ==================== 签名相关(小红书专用) ==================== @app.route("/sign", methods=["POST"]) def sign_endpoint(): """小红书签名接口""" try: from platforms.xiaohongshu import XiaohongshuPublisher data = request.json publisher = XiaohongshuPublisher(headless=True) result = asyncio.run(publisher.get_sign( data.get("uri", ""), data.get("data"), data.get("a1", ""), data.get("web_session", "") )) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"error": str(e)}), 500 # ==================== 统一发布接口 ==================== @app.route("/publish", methods=["POST"]) def publish_video(): """ 统一发布接口 请求体: { "platform": "douyin", # douyin | xiaohongshu | weixin | kuaishou "cookie": "cookie字符串或JSON", "title": "视频标题", "description": "视频描述(可选)", "video_path": "视频文件绝对路径", "cover_path": "封面图片绝对路径(可选)", "tags": ["话题1", "话题2"], "post_time": "定时发布时间(可选,格式:2024-01-20 12:00:00)", "location": "位置(可选,默认:重庆市)" } 响应: { "success": true, "platform": "douyin", "video_id": "xxx", "video_url": "xxx", "message": "发布成功" } """ try: data = request.json # 获取参数 platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") title = data.get("title", "") description = data.get("description", "") video_path = data.get("video_path", "") cover_path = data.get("cover_path") tags = data.get("tags", []) post_time = data.get("post_time") location = data.get("location", "重庆市") # 调试日志 print(f"[Publish] 收到请求: platform={platform}, title={title}, video_path={video_path}") # 参数验证 if not platform: print("[Publish] 错误: 缺少 platform 参数") return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: print(f"[Publish] 错误: 不支持的平台 {platform}") return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: print("[Publish] 错误: 缺少 cookie 参数") return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not title: print("[Publish] 错误: 缺少 title 参数") return jsonify({"success": False, "error": "缺少 title 参数"}), 400 if not video_path: print("[Publish] 错误: 缺少 video_path 参数") return jsonify({"success": False, "error": "缺少 video_path 参数"}), 400 # 视频文件验证(增加详细信息) if not os.path.exists(video_path): print(f"[Publish] 错误: 视频文件不存在: {video_path}") return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400 if not os.path.isfile(video_path): print(f"[Publish] 错误: 路径不是文件: {video_path}") return jsonify({"success": False, "error": f"路径不是文件: {video_path}"}), 400 # 解析发布时间 publish_date = parse_datetime(post_time) if post_time else None # 创建发布参数 params = PublishParams( title=title, video_path=video_path, description=description, cover_path=cover_path, tags=tags, publish_date=publish_date, location=location ) print("=" * 60) print(f"[Publish] 平台: {platform}") print(f"[Publish] 标题: {title}") print(f"[Publish] 视频: {video_path}") print(f"[Publish] 封面: {cover_path}") print(f"[Publish] 话题: {tags}") print(f"[Publish] 定时: {publish_date}") print("=" * 60) # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行发布 result = asyncio.run(publisher.run(cookie_str, params)) return jsonify({ "success": result.success, "platform": result.platform, "video_id": result.video_id, "video_url": result.video_url, "message": result.message, "error": result.error }) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 批量发布接口 ==================== @app.route("/publish/batch", methods=["POST"]) def publish_batch(): """ 批量发布接口 - 发布到多个平台 请求体: { "platforms": ["douyin", "xiaohongshu"], "cookies": { "douyin": "cookie字符串", "xiaohongshu": "cookie字符串" }, "title": "视频标题", "video_path": "视频文件绝对路径", ... } """ try: data = request.json platforms = data.get("platforms", []) cookies = data.get("cookies", {}) if not platforms: return jsonify({"success": False, "error": "缺少 platforms 参数"}), 400 results = [] for platform in platforms: platform = platform.lower() cookie_str = cookies.get(platform, "") if not cookie_str: results.append({ "platform": platform, "success": False, "error": f"缺少 {platform} 的 cookie" }) continue try: # 创建参数 params = PublishParams( title=data.get("title", ""), video_path=data.get("video_path", ""), description=data.get("description", ""), cover_path=data.get("cover_path"), tags=data.get("tags", []), publish_date=parse_datetime(data.get("post_time")), location=data.get("location", "重庆市") ) # 发布 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) result = asyncio.run(publisher.run(cookie_str, params)) results.append({ "platform": result.platform, "success": result.success, "video_id": result.video_id, "message": result.message, "error": result.error }) except Exception as e: results.append({ "platform": platform, "success": False, "error": str(e) }) # 统计成功/失败数量 success_count = sum(1 for r in results if r.get("success")) return jsonify({ "success": success_count > 0, "total": len(platforms), "success_count": success_count, "fail_count": len(platforms) - success_count, "results": results }) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== Cookie 验证接口 ==================== @app.route("/check_cookie", methods=["POST"]) def check_cookie(): """检查 cookie 是否有效""" try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") if not cookie_str: return jsonify({"valid": False, "error": "缺少 cookie 参数"}), 400 # 目前只支持小红书的 cookie 验证 if platform == "xiaohongshu": try: from platforms.xiaohongshu import XiaohongshuPublisher, XHS_SDK_AVAILABLE if XHS_SDK_AVAILABLE: from xhs import XhsClient publisher = XiaohongshuPublisher() xhs_client = XhsClient(cookie_str, sign=publisher.sign_sync) info = xhs_client.get_self_info() if info: return jsonify({ "valid": True, "user_info": { "user_id": info.get("user_id"), "nickname": info.get("nickname"), "avatar": info.get("images") } }) except Exception as e: return jsonify({"valid": False, "error": str(e)}) # 其他平台返回格式正确但未验证 return jsonify({ "valid": True, "message": "Cookie 格式正确,但未进行在线验证" }) except Exception as e: traceback.print_exc() return jsonify({"valid": False, "error": str(e)}) # ==================== 获取作品列表接口 ==================== @app.route("/works", methods=["POST"]) def get_works(): """ 获取作品列表 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou "cookie": "cookie字符串或JSON", "page": 0, # 页码(从0开始,可选,默认0) "page_size": 20 # 每页数量(可选,默认20) } 响应: { "success": true, "platform": "douyin", "works": [...], "total": 100, "has_more": true } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") page = data.get("page", 0) page_size = data.get("page_size", 20) print(f"[Works] 收到请求: platform={platform}, page={page}, page_size={page_size}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取作品 result = asyncio.run(publisher.run_get_works(cookie_str, page, page_size)) return jsonify(result.to_dict()) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 获取评论列表接口 ==================== @app.route("/comments", methods=["POST"]) def get_comments(): """ 获取作品评论 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou "cookie": "cookie字符串或JSON", "work_id": "作品ID", "cursor": "" # 分页游标(可选) } 响应: { "success": true, "platform": "douyin", "work_id": "xxx", "comments": [...], "total": 50, "has_more": true, "cursor": "xxx" } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") work_id = data.get("work_id", "") cursor = data.get("cursor", "") print(f"[Comments] 收到请求: platform={platform}, work_id={work_id}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not work_id: return jsonify({"success": False, "error": "缺少 work_id 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取评论 result = asyncio.run(publisher.run_get_comments(cookie_str, work_id, cursor)) result_dict = result.to_dict() # 添加 cursor 到响应 if hasattr(result, '__dict__') and 'cursor' in result.__dict__: result_dict['cursor'] = result.__dict__['cursor'] return jsonify(result_dict) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 获取所有作品评论接口 ==================== @app.route("/all_comments", methods=["POST"]) def get_all_comments(): """ 获取所有作品的评论(一次性获取) 请求体: { "platform": "douyin", # douyin | xiaohongshu "cookie": "cookie字符串或JSON" } 响应: { "success": true, "platform": "douyin", "work_comments": [ { "work_id": "xxx", "title": "作品标题", "cover_url": "封面URL", "comments": [...] } ], "total": 5 } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") print(f"[AllComments] 收到请求: platform={platform}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in ['douyin', 'xiaohongshu']: return jsonify({ "success": False, "error": f"该接口只支持 douyin 和 xiaohongshu 平台" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取所有评论 result = asyncio.run(publisher.get_all_comments(cookie_str)) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 健康检查 ==================== @app.route("/health", methods=["GET"]) def health_check(): """健康检查""" # 检查 xhs SDK 是否可用 xhs_available = False try: from platforms.xiaohongshu import XHS_SDK_AVAILABLE xhs_available = XHS_SDK_AVAILABLE except: pass return jsonify({ "status": "ok", "xhs_sdk": xhs_available, "supported_platforms": list(PLATFORM_MAP.keys()), "headless_mode": HEADLESS_MODE }) @app.route("/", methods=["GET"]) def index(): """首页""" return jsonify({ "name": "多平台视频发布服务", "version": "1.1.0", "endpoints": { "GET /": "服务信息", "GET /health": "健康检查", "POST /publish": "发布视频", "POST /publish/batch": "批量发布", "POST /works": "获取作品列表", "POST /comments": "获取作品评论", "POST /all_comments": "获取所有作品评论", "POST /check_cookie": "检查 Cookie", "POST /sign": "小红书签名" }, "supported_platforms": list(PLATFORM_MAP.keys()) }) # ==================== 命令行启动 ==================== def main(): parser = argparse.ArgumentParser(description='多平台视频发布服务') parser.add_argument('--port', type=int, default=5005, help='服务端口 (默认: 5005)') parser.add_argument('--host', type=str, default='0.0.0.0', help='监听地址 (默认: 0.0.0.0)') parser.add_argument('--headless', type=str, default='true', help='是否无头模式 (默认: true)') parser.add_argument('--debug', action='store_true', help='调试模式') args = parser.parse_args() global HEADLESS_MODE HEADLESS_MODE = args.headless.lower() == 'true' # 检查 xhs SDK xhs_status = "未安装" try: from platforms.xiaohongshu import XHS_SDK_AVAILABLE xhs_status = "已安装" if XHS_SDK_AVAILABLE else "未安装" except: pass print("=" * 60) print("多平台视频发布服务") print("=" * 60) print(f"XHS SDK: {xhs_status}") print(f"Headless 模式: {HEADLESS_MODE}") print(f"支持平台: {', '.join(PLATFORM_MAP.keys())}") print("=" * 60) print(f"启动服务: http://{args.host}:{args.port}") print("=" * 60) app.run(host=args.host, port=args.port, debug=args.debug, threaded=True) if __name__ == '__main__': main()