#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 多平台视频发布服务 - 统一入口 支持平台: 抖音、小红书、视频号、快手 参考项目: matrix (https://github.com/kebenxiaoming/matrix) 使用方式: python app.py # 启动 HTTP 服务 (端口 5005) python app.py --port 8080 # 指定端口 python app.py --headless false # 显示浏览器窗口 """ import asyncio import os import sys import argparse # 禁用输出缓冲,确保 print 立即输出 os.environ['PYTHONUNBUFFERED'] = '1' # 修复 Windows 终端中文输出乱码 if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace', line_buffering=True) sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace', line_buffering=True) # 设置环境变量 os.environ['PYTHONIOENCODING'] = 'utf-8' import traceback import requests from datetime import datetime, date from pathlib import Path # 确保当前目录在 Python 路径中 CURRENT_DIR = Path(__file__).parent.resolve() if str(CURRENT_DIR) not in sys.path: sys.path.insert(0, str(CURRENT_DIR)) # 从 server/.env 文件加载环境变量 def load_env_file(): """从 server/.env 文件加载环境变量""" env_path = CURRENT_DIR.parent / '.env' if env_path.exists(): print(f"[Config] Loading env from: {env_path}") with open(env_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() # 移除引号 if value.startswith('"') and value.endswith('"'): value = value[1:-1] elif value.startswith("'") and value.endswith("'"): value = value[1:-1] # 只在环境变量未设置时加载 if key not in os.environ: os.environ[key] = value print(f"[Config] Loaded: {key}=***" if 'PASSWORD' in key or 'SECRET' in key else f"[Config] Loaded: {key}={value}") else: print(f"[Config] .env file not found: {env_path}") # 加载环境变量 load_env_file() from flask import Flask, request, jsonify from flask_cors import CORS from platforms import get_publisher, PLATFORM_MAP from platforms.base import PublishParams def parse_datetime(date_str: str): """解析日期时间字符串""" if not date_str: return None formats = [ "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ", ] for fmt in formats: try: return datetime.strptime(date_str, fmt) except ValueError: continue return None def validate_video_file(video_path: str) -> bool: """验证视频文件是否有效""" if not video_path: return False if not os.path.exists(video_path): return False if not os.path.isfile(video_path): return False valid_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.flv', '.wmv', '.webm'] ext = os.path.splitext(video_path)[1].lower() if ext not in valid_extensions: return False if os.path.getsize(video_path) < 1024: return False return True # 创建 Flask 应用 app = Flask(__name__) CORS(app) # 配置日志以显示所有 HTTP 请求 import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') # 让 werkzeug 日志显示 werkzeug_logger = logging.getLogger('werkzeug') werkzeug_logger.setLevel(logging.INFO) # 添加 StreamHandler 确保输出到控制台 handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) werkzeug_logger.addHandler(handler) # 添加请求钩子,打印所有收到的请求 @app.before_request def log_request_info(): """在处理每个请求前打印详细信息""" print(f"\n{'='*60}", flush=True) print(f"[HTTP Request] {request.method} {request.path}", flush=True) print(f"[HTTP Request] From: {request.remote_addr}", flush=True) if request.content_type and 'json' in request.content_type: try: data = request.get_json(silent=True) if data: # 打印部分参数,避免太长 keys = list(data.keys()) if data else [] print(f"[HTTP Request] JSON keys: {keys}", flush=True) except: pass print(f"{'='*60}\n", flush=True) # 全局配置 HEADLESS_MODE = os.environ.get('HEADLESS', 'true').lower() == 'true' print(f"[Config] HEADLESS env value: '{os.environ.get('HEADLESS', 'NOT SET')}'", flush=True) print(f"[Config] HEADLESS_MODE: {HEADLESS_MODE}", flush=True) # Node.js API 配置 NODEJS_API_BASE_URL = os.environ.get('NODEJS_API_URL', 'http://localhost:3000') INTERNAL_API_KEY = os.environ.get('INTERNAL_API_KEY', 'internal-api-key-default') print(f"[API Config] Node.js API: {NODEJS_API_BASE_URL}", flush=True) def call_nodejs_api(method: str, endpoint: str, data: dict = None, params: dict = None) -> dict: """调用 Node.js 内部 API""" url = f"{NODEJS_API_BASE_URL}/api/internal{endpoint}" headers = { 'Content-Type': 'application/json', 'X-Internal-API-Key': INTERNAL_API_KEY, } try: if method.upper() == 'GET': response = requests.get(url, headers=headers, params=params, timeout=30) elif method.upper() == 'POST': response = requests.post(url, headers=headers, json=data, timeout=30) else: raise ValueError(f"Unsupported HTTP method: {method}") response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: print(f"[API Error] 调用 Node.js API 失败: {e}", flush=True) raise # ==================== 签名相关(小红书专用) ==================== @app.route("/sign", methods=["POST"]) def sign_endpoint(): """小红书签名接口""" try: from platforms.xiaohongshu import XiaohongshuPublisher data = request.json publisher = XiaohongshuPublisher(headless=True) result = asyncio.run(publisher.get_sign( data.get("uri", ""), data.get("data"), data.get("a1", ""), data.get("web_session", "") )) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"error": str(e)}), 500 # ==================== 统一发布接口 ==================== @app.route("/publish", methods=["POST"]) def publish_video(): """ 统一发布接口 请求体: { "platform": "douyin", # douyin | xiaohongshu | weixin | kuaishou "cookie": "cookie字符串或JSON", "title": "视频标题", "description": "视频描述(可选)", "video_path": "视频文件绝对路径", "cover_path": "封面图片绝对路径(可选)", "tags": ["话题1", "话题2"], "post_time": "定时发布时间(可选,格式:2024-01-20 12:00:00)", "location": "位置(可选,默认:重庆市)" } 响应: { "success": true, "platform": "douyin", "video_id": "xxx", "video_url": "xxx", "message": "发布成功" } """ try: data = request.json # 获取参数 platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") title = data.get("title", "") description = data.get("description", "") video_path = data.get("video_path", "") cover_path = data.get("cover_path") tags = data.get("tags", []) post_time = data.get("post_time") location = data.get("location", "重庆市") # 调试日志 print(f"[Publish] 收到请求: platform={platform}, title={title}, video_path={video_path}") # 参数验证 if not platform: print("[Publish] 错误: 缺少 platform 参数") return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: print(f"[Publish] 错误: 不支持的平台 {platform}") return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: print("[Publish] 错误: 缺少 cookie 参数") return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not title: print("[Publish] 错误: 缺少 title 参数") return jsonify({"success": False, "error": "缺少 title 参数"}), 400 if not video_path: print("[Publish] 错误: 缺少 video_path 参数") return jsonify({"success": False, "error": "缺少 video_path 参数"}), 400 # 视频文件验证(增加详细信息) if not os.path.exists(video_path): print(f"[Publish] 错误: 视频文件不存在: {video_path}") return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400 if not os.path.isfile(video_path): print(f"[Publish] 错误: 路径不是文件: {video_path}") return jsonify({"success": False, "error": f"路径不是文件: {video_path}"}), 400 # 解析发布时间 publish_date = parse_datetime(post_time) if post_time else None # 创建发布参数 params = PublishParams( title=title, video_path=video_path, description=description, cover_path=cover_path, tags=tags, publish_date=publish_date, location=location ) print("=" * 60) print(f"[Publish] 平台: {platform}") print(f"[Publish] 标题: {title}") print(f"[Publish] 视频: {video_path}") print(f"[Publish] 封面: {cover_path}") print(f"[Publish] 话题: {tags}") print(f"[Publish] 定时: {publish_date}") print("=" * 60) # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行发布 result = asyncio.run(publisher.run(cookie_str, params)) response_data = { "success": result.success, "platform": result.platform, "video_id": result.video_id, "video_url": result.video_url, "message": result.message, "error": result.error, "need_captcha": result.need_captcha, "captcha_type": result.captcha_type, "screenshot_base64": result.screenshot_base64, "page_url": result.page_url, "status": result.status } # 如果需要验证码,打印明确的日志 if result.need_captcha: print(f"[Publish] 需要验证码: type={result.captcha_type}") return jsonify(response_data) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== AI 辅助发布接口 ==================== # 存储活跃的发布会话 active_publish_sessions = {} @app.route("/publish/ai-assisted", methods=["POST"]) def publish_ai_assisted(): """ AI 辅助发布接口 与普通发布接口的区别: 1. 发布过程中会返回截图供 AI 分析 2. 如果检测到需要验证码,返回截图和状态,等待外部处理 3. 支持继续发布(输入验证码后) 请求体: { "platform": "douyin", "cookie": "cookie字符串", "title": "视频标题", "video_path": "视频文件路径", ... "return_screenshot": true // 是否返回截图 } 响应: { "success": true/false, "status": "success|failed|need_captcha|processing", "screenshot_base64": "...", // 当前页面截图 "page_url": "...", ... } """ # 立即打印请求日志,确保能看到 print("\n" + "!" * 60, flush=True) print("!!! [AI-Assisted Publish] 收到请求 !!!", flush=True) print("!" * 60 + "\n", flush=True) try: data = request.json print(f"[AI-Assisted Publish] 请求数据: platform={data.get('platform')}, title={data.get('title')}", flush=True) # 获取参数 platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") title = data.get("title", "") description = data.get("description", "") video_path = data.get("video_path", "") cover_path = data.get("cover_path") tags = data.get("tags", []) post_time = data.get("post_time") location = data.get("location", "重庆市") return_screenshot = data.get("return_screenshot", True) # 支持请求级别的 headless 参数,用于验证码场景下的有头浏览器模式 headless = data.get("headless", HEADLESS_MODE) if isinstance(headless, str): headless = headless.lower() == 'true' # 参数验证 if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({"success": False, "error": f"不支持的平台: {platform}"}), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not title: return jsonify({"success": False, "error": "缺少 title 参数"}), 400 if not video_path or not os.path.exists(video_path): return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400 # 解析发布时间 publish_date = parse_datetime(post_time) if post_time else None # 创建发布参数 params = PublishParams( title=title, video_path=video_path, description=description, cover_path=cover_path, tags=tags, publish_date=publish_date, location=location ) print("=" * 60) print(f"[AI Publish] 平台: {platform}") print(f"[AI Publish] 标题: {title}") print(f"[AI Publish] 视频: {video_path}") print(f"[AI Publish] Headless: {headless}") print("=" * 60) # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=headless) # 使用请求参数中的 headless 值 # 执行发布 result = asyncio.run(publisher.run(cookie_str, params)) response_data = { "success": result.success, "platform": result.platform, "video_id": result.video_id, "video_url": result.video_url, "message": result.message, "error": result.error, "need_captcha": result.need_captcha, "captcha_type": result.captcha_type, "status": result.status or ("success" if result.success else "failed"), "page_url": result.page_url } # 如果请求返回截图 if return_screenshot and result.screenshot_base64: response_data["screenshot_base64"] = result.screenshot_base64 return jsonify(response_data) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e), "status": "error"}), 500 # ==================== 批量发布接口 ==================== @app.route("/publish/batch", methods=["POST"]) def publish_batch(): """ 批量发布接口 - 发布到多个平台 请求体: { "platforms": ["douyin", "xiaohongshu"], "cookies": { "douyin": "cookie字符串", "xiaohongshu": "cookie字符串" }, "title": "视频标题", "video_path": "视频文件绝对路径", ... } """ try: data = request.json platforms = data.get("platforms", []) cookies = data.get("cookies", {}) if not platforms: return jsonify({"success": False, "error": "缺少 platforms 参数"}), 400 results = [] for platform in platforms: platform = platform.lower() cookie_str = cookies.get(platform, "") if not cookie_str: results.append({ "platform": platform, "success": False, "error": f"缺少 {platform} 的 cookie" }) continue try: # 创建参数 params = PublishParams( title=data.get("title", ""), video_path=data.get("video_path", ""), description=data.get("description", ""), cover_path=data.get("cover_path"), tags=data.get("tags", []), publish_date=parse_datetime(data.get("post_time")), location=data.get("location", "重庆市") ) # 发布 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) result = asyncio.run(publisher.run(cookie_str, params)) results.append({ "platform": result.platform, "success": result.success, "video_id": result.video_id, "message": result.message, "error": result.error }) except Exception as e: results.append({ "platform": platform, "success": False, "error": str(e) }) # 统计成功/失败数量 success_count = sum(1 for r in results if r.get("success")) return jsonify({ "success": success_count > 0, "total": len(platforms), "success_count": success_count, "fail_count": len(platforms) - success_count, "results": results }) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== Cookie 验证接口 ==================== @app.route("/check_cookie", methods=["POST"]) def check_cookie(): """检查 cookie 是否有效""" try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") if not cookie_str: return jsonify({"valid": False, "error": "缺少 cookie 参数"}), 400 # 目前只支持小红书的 cookie 验证 if platform == "xiaohongshu": try: from platforms.xiaohongshu import XiaohongshuPublisher, XHS_SDK_AVAILABLE if XHS_SDK_AVAILABLE: from xhs import XhsClient publisher = XiaohongshuPublisher() xhs_client = XhsClient(cookie_str, sign=publisher.sign_sync) info = xhs_client.get_self_info() if info: return jsonify({ "valid": True, "user_info": { "user_id": info.get("user_id"), "nickname": info.get("nickname"), "avatar": info.get("images") } }) except Exception as e: return jsonify({"valid": False, "error": str(e)}) # 其他平台返回格式正确但未验证 return jsonify({ "valid": True, "message": "Cookie 格式正确,但未进行在线验证" }) except Exception as e: traceback.print_exc() return jsonify({"valid": False, "error": str(e)}) # ==================== 获取作品列表接口 ==================== @app.route("/works", methods=["POST"]) def get_works(): """ 获取作品列表 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou "cookie": "cookie字符串或JSON", "page": 0, # 页码(从0开始,可选,默认0) "page_size": 20 # 每页数量(可选,默认20) } 响应: { "success": true, "platform": "douyin", "works": [...], "total": 100, "has_more": true } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") page = data.get("page", 0) page_size = data.get("page_size", 20) print(f"[Works] 收到请求: platform={platform}, page={page}, page_size={page_size}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取作品 result = asyncio.run(publisher.run_get_works(cookie_str, page, page_size)) return jsonify(result.to_dict()) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 保存作品日统计数据接口 ==================== @app.route("/work_day_statistics", methods=["POST"]) def save_work_day_statistics(): """ 保存作品每日统计数据 当天的数据走更新流,日期变化走新增流 请求体: { "statistics": [ { "work_id": 1, "fans_count": 1000, "play_count": 5000, "like_count": 200, "comment_count": 50, "share_count": 30, "collect_count": 100 }, ... ] } 响应: { "success": true, "inserted": 5, "updated": 3, "message": "保存成功" } """ print("=" * 60, flush=True) print("[DEBUG] ===== 进入 save_work_day_statistics 方法 =====", flush=True) print(f"[DEBUG] 请求方法: {request.method}", flush=True) print(f"[DEBUG] 请求数据: {request.json}", flush=True) print("=" * 60, flush=True) try: data = request.json statistics_list = data.get("statistics", []) if not statistics_list: return jsonify({"success": False, "error": "缺少 statistics 参数"}), 400 print(f"[WorkDayStatistics] 收到请求: {len(statistics_list)} 条统计数据") # 调用 Node.js API 保存数据 result = call_nodejs_api('POST', '/work-day-statistics', { 'statistics': statistics_list }) print(f"[WorkDayStatistics] 完成: 新增 {result.get('inserted', 0)} 条, 更新 {result.get('updated', 0)} 条") return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/work_day_statistics/trend", methods=["GET"]) def get_statistics_trend(): """ 获取数据趋势(用于 Dashboard 数据看板 和 数据分析页面) 查询参数: user_id: 用户ID (必填) days: 天数 (可选,默认7天,最大30天) - 与 start_date/end_date 二选一 start_date: 开始日期 (可选,格式 YYYY-MM-DD) end_date: 结束日期 (可选,格式 YYYY-MM-DD) account_id: 账号ID (可选,不填则查询所有账号) 响应: { "success": true, "data": { "dates": ["01-16", "01-17", "01-18", ...], "fans": [100, 120, 130, ...], "views": [1000, 1200, 1500, ...], "likes": [50, 60, 70, ...], "comments": [10, 12, 15, ...], "shares": [5, 6, 8, ...], "collects": [20, 25, 30, ...] } } """ try: user_id = request.args.get("user_id") days = request.args.get("days") start_date = request.args.get("start_date") end_date = request.args.get("end_date") account_id = request.args.get("account_id") if not user_id: return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400 # 调用 Node.js API 获取数据 params = {"user_id": user_id} if days: params["days"] = days if start_date: params["start_date"] = start_date if end_date: params["end_date"] = end_date if account_id: params["account_id"] = account_id result = call_nodejs_api('GET', '/work-day-statistics/trend', params=params) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/work_day_statistics/platforms", methods=["GET"]) def get_statistics_by_platform(): """ 按平台分组获取统计数据(用于数据分析页面的平台对比) 数据来源: - 粉丝数:从 platform_accounts 表获取(账号级别数据) - 播放量/点赞/评论/收藏:从 work_day_statistics 表按平台汇总 - 粉丝增量:通过比较区间内最早和最新的粉丝数计算 查询参数: user_id: 用户ID (必填) days: 天数 (可选,默认30天,最大30天) - 与 start_date/end_date 二选一 start_date: 开始日期 (可选,格式 YYYY-MM-DD) end_date: 结束日期 (可选,格式 YYYY-MM-DD) 响应: { "success": true, "data": [ { "platform": "douyin", "fansCount": 1000, "fansIncrease": 50, "viewsCount": 5000, "likesCount": 200, "commentsCount": 30, "collectsCount": 100 }, ... ] } """ try: user_id = request.args.get("user_id") days = request.args.get("days") start_date = request.args.get("start_date") end_date = request.args.get("end_date") if not user_id: return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400 # 调用 Node.js API 获取数据 params = {"user_id": user_id} if days: params["days"] = days if start_date: params["start_date"] = start_date if end_date: params["end_date"] = end_date result = call_nodejs_api('GET', '/work-day-statistics/platforms', params=params) print(f"[PlatformStats] 返回 {len(result.get('data', []))} 个平台的数据") return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 @app.route("/work_day_statistics/batch", methods=["POST"]) def get_work_statistics_history(): """ 批量获取作品的历史统计数据 请求体: { "work_ids": [1, 2, 3], "start_date": "2025-01-01", # 可选 "end_date": "2025-01-21" # 可选 } 响应: { "success": true, "data": { "1": [ {"record_date": "2025-01-20", "play_count": 100, ...}, {"record_date": "2025-01-21", "play_count": 150, ...} ], ... } } """ try: data = request.json work_ids = data.get("work_ids", []) start_date = data.get("start_date") end_date = data.get("end_date") if not work_ids: return jsonify({"success": False, "error": "缺少 work_ids 参数"}), 400 # 调用 Node.js API 获取数据 request_data = {"work_ids": work_ids} if start_date: request_data["start_date"] = start_date if end_date: request_data["end_date"] = end_date result = call_nodejs_api('POST', '/work-day-statistics/batch', data=request_data) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 获取评论列表接口 ==================== @app.route("/comments", methods=["POST"]) def get_comments(): """ 获取作品评论 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou "cookie": "cookie字符串或JSON", "work_id": "作品ID", "cursor": "" # 分页游标(可选) } 响应: { "success": true, "platform": "douyin", "work_id": "xxx", "comments": [...], "total": 50, "has_more": true, "cursor": "xxx" } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") work_id = data.get("work_id", "") cursor = data.get("cursor", "") print(f"[Comments] 收到请求: platform={platform}, work_id={work_id}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 if not work_id: return jsonify({"success": False, "error": "缺少 work_id 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取评论 result = asyncio.run(publisher.run_get_comments(cookie_str, work_id, cursor)) result_dict = result.to_dict() # 添加 cursor 到响应 if hasattr(result, '__dict__') and 'cursor' in result.__dict__: result_dict['cursor'] = result.__dict__['cursor'] return jsonify(result_dict) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 获取所有作品评论接口 ==================== @app.route("/all_comments", methods=["POST"]) def get_all_comments(): """ 获取所有作品的评论(一次性获取) 请求体: { "platform": "douyin", # douyin | xiaohongshu "cookie": "cookie字符串或JSON" } 响应: { "success": true, "platform": "douyin", "work_comments": [ { "work_id": "xxx", "title": "作品标题", "cover_url": "封面URL", "comments": [...] } ], "total": 5 } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") print(f"[AllComments] 收到请求: platform={platform}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in ['douyin', 'xiaohongshu']: return jsonify({ "success": False, "error": f"该接口只支持 douyin 和 xiaohongshu 平台" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行获取所有评论 result = asyncio.run(publisher.get_all_comments(cookie_str)) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 登录状态检查接口 ==================== @app.route("/check_login", methods=["POST"]) def check_login(): """ 检查 Cookie 登录状态(通过浏览器访问后台页面检测) 请求体: { "platform": "douyin", # douyin | xiaohongshu | kuaishou | weixin "cookie": "cookie字符串或JSON" } 响应: { "success": true, "valid": true, # Cookie 是否有效 "need_login": false, # 是否需要重新登录 "message": "登录状态有效" } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") print(f"[CheckLogin] 收到请求: platform={platform}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 执行登录检查 result = asyncio.run(publisher.check_login_status(cookie_str)) return jsonify(result) except Exception as e: traceback.print_exc() return jsonify({ "success": False, "valid": False, "need_login": True, "error": str(e) }), 500 # ==================== 获取账号信息接口 ==================== @app.route("/account_info", methods=["POST"]) def get_account_info(): """ 获取账号信息 请求体: { "platform": "baijiahao", # 平台 "cookie": "cookie字符串或JSON" } 响应: { "success": true, "account_id": "xxx", "account_name": "用户名", "avatar_url": "头像URL", "fans_count": 0, "works_count": 0 } """ try: data = request.json platform = data.get("platform", "").lower() cookie_str = data.get("cookie", "") print(f"[AccountInfo] 收到请求: platform={platform}") if not platform: return jsonify({"success": False, "error": "缺少 platform 参数"}), 400 if platform not in PLATFORM_MAP: return jsonify({ "success": False, "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}" }), 400 if not cookie_str: return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400 # 获取对应平台的发布器 PublisherClass = get_publisher(platform) publisher = PublisherClass(headless=HEADLESS_MODE) # 检查是否有 get_account_info 方法 if hasattr(publisher, 'get_account_info'): result = asyncio.run(publisher.get_account_info(cookie_str)) return jsonify(result) else: return jsonify({ "success": False, "error": f"平台 {platform} 不支持获取账号信息" }), 400 except Exception as e: traceback.print_exc() return jsonify({"success": False, "error": str(e)}), 500 # ==================== 健康检查 ==================== @app.route("/health", methods=["GET"]) def health_check(): """健康检查""" # 检查 xhs SDK 是否可用 xhs_available = False try: from platforms.xiaohongshu import XHS_SDK_AVAILABLE xhs_available = XHS_SDK_AVAILABLE except: pass return jsonify({ "status": "ok", "xhs_sdk": xhs_available, "supported_platforms": list(PLATFORM_MAP.keys()), "headless_mode": HEADLESS_MODE }) @app.route("/", methods=["GET"]) def index(): """首页""" return jsonify({ "name": "多平台视频发布服务", "version": "1.2.0", "endpoints": { "GET /": "服务信息", "GET /health": "健康检查", "POST /publish": "发布视频", "POST /publish/batch": "批量发布", "POST /works": "获取作品列表", "POST /comments": "获取作品评论", "POST /all_comments": "获取所有作品评论", "POST /work_day_statistics": "保存作品每日统计数据", "POST /work_day_statistics/batch": "获取作品历史统计数据", "POST /check_cookie": "检查 Cookie", "POST /sign": "小红书签名" }, "supported_platforms": list(PLATFORM_MAP.keys()) }) # ==================== 命令行启动 ==================== def main(): parser = argparse.ArgumentParser(description='多平台视频发布服务') parser.add_argument('--port', type=int, default=5005, help='服务端口 (默认: 5005)') parser.add_argument('--host', type=str, default='0.0.0.0', help='监听地址 (默认: 0.0.0.0)') parser.add_argument('--headless', type=str, default='true', help='是否无头模式 (默认: true)') parser.add_argument('--debug', action='store_true', help='调试模式') args = parser.parse_args() global HEADLESS_MODE HEADLESS_MODE = args.headless.lower() == 'true' # 检查 xhs SDK xhs_status = "未安装" try: from platforms.xiaohongshu import XHS_SDK_AVAILABLE xhs_status = "已安装" if XHS_SDK_AVAILABLE else "未安装" except: pass print("=" * 60) print("多平台视频发布服务") print("=" * 60) print(f"XHS SDK: {xhs_status}") print(f"Headless 模式: {HEADLESS_MODE}") print(f"支持平台: {', '.join(PLATFORM_MAP.keys())}") print("=" * 60) print(f"启动服务: http://{args.host}:{args.port}") print("=" * 60) # 启用 debug 模式以获取详细日志,使用 use_reloader=False 避免重复启动 app.run(host=args.host, port=args.port, debug=True, threaded=True, use_reloader=False) if __name__ == '__main__': main()