#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Multi-platform video publishing service - unified entry point.

Supported platforms: Douyin, Xiaohongshu, WeChat Channels, Kuaishou
Reference project: matrix (https://github.com/kebenxiaoming/matrix)

Usage:
    python app.py                    # start the HTTP service (port 5005)
    python app.py --port 8080        # listen on a specific port
    python app.py --headless false   # show the browser window
"""

import asyncio
import os
import sys
import argparse
import traceback
from datetime import date, datetime, timedelta
from pathlib import Path

# Make sure this file's directory is importable (needed for `platforms`).
CURRENT_DIR = Path(__file__).parent.resolve()
if str(CURRENT_DIR) not in sys.path:
    sys.path.insert(0, str(CURRENT_DIR))


def load_env_file():
    """Load KEY=VALUE pairs from the `.env` file in the parent directory.

    Blank lines, `#` comment lines and lines without `=` are skipped.
    A matching pair of surrounding single or double quotes is stripped from
    the value. Variables that already exist in the process environment are
    never overwritten.
    """
    env_path = CURRENT_DIR.parent / '.env'
    if not env_path.exists():
        print(f"[Config] .env file not found: {env_path}")
        return

    print(f"[Config] Loading env from: {env_path}")
    with open(env_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            key = key.strip()
            value = value.strip()
            # Strip one pair of matching quotes; the length check keeps a
            # lone quote character from being turned into an empty string.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]
            # Only fill in variables that are not already set.
            if key not in os.environ:
                os.environ[key] = value
                if 'PASSWORD' in key or 'SECRET' in key:
                    print(f"[Config] Loaded: {key}=***")
                else:
                    print(f"[Config] Loaded: {key}={value}")


# Load the environment before the imports below so that it is already in
# place when those modules are imported.
load_env_file()

from flask import Flask, request, jsonify
from flask_cors import CORS

from platforms import get_publisher, PLATFORM_MAP
from platforms.base import PublishParams


def parse_datetime(date_str: str):
    """Parse a date-time string using a fixed list of accepted formats.

    Returns a naive `datetime`, or None when `date_str` is empty, is not a
    string, or matches none of the formats.
    """
    if not date_str:
        return None

    formats = [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d %H:%M",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%SZ",
    ]

    for fmt in formats:
        try:
            return datetime.strptime(date_str, fmt)
        except (ValueError, TypeError):
            # TypeError: the JSON client sent a non-string value.
            continue
    return None


def validate_video_file(video_path: str) -> bool:
    """Return True if `video_path` is an existing regular file with a known
    video extension and a size of at least 1024 bytes."""
    if not video_path:
        return False
    if not os.path.exists(video_path):
        return False
    if not os.path.isfile(video_path):
        return False

    valid_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.flv', '.wmv', '.webm']
    ext = os.path.splitext(video_path)[1].lower()
    if ext not in valid_extensions:
        return False

    if os.path.getsize(video_path) < 1024:
        return False
    return True


# Flask application
app = Flask(__name__)
CORS(app)

# Global configuration
HEADLESS_MODE = os.environ.get('HEADLESS', 'true').lower() == 'true'
print(f"[Config] HEADLESS env value: '{os.environ.get('HEADLESS', 'NOT SET')}'", flush=True)
print(f"[Config] HEADLESS_MODE: {HEADLESS_MODE}", flush=True)


def _json_body():
    """Return the request's JSON body as a dict, or {} if it is missing,
    malformed, or not a JSON object, so that handlers can answer with their
    own "missing parameter" 400 instead of crashing into a 500."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _requested_platform(data):
    """Return the lower-cased `platform` field of `data` ('' if absent/null)."""
    return str(data.get("platform") or "").lower()


def _xhs_sdk_available():
    """Best-effort probe of the XHS_SDK_AVAILABLE flag in platforms.xiaohongshu."""
    try:
        from platforms.xiaohongshu import XHS_SDK_AVAILABLE
        return bool(XHS_SDK_AVAILABLE)
    except Exception:
        # Any import-time failure is reported as "not available".
        return False


# ==================== Signing (Xiaohongshu only) ====================

@app.route("/sign", methods=["POST"])
def sign_endpoint():
    """Xiaohongshu signing endpoint.

    Passes `uri`, `data`, `a1` and `web_session` from the JSON body to
    `XiaohongshuPublisher.get_sign` and returns its result as JSON.
    """
    try:
        from platforms.xiaohongshu import XiaohongshuPublisher

        data = _json_body()
        publisher = XiaohongshuPublisher(headless=True)
        result = asyncio.run(publisher.get_sign(
            data.get("uri", ""),
            data.get("data"),
            data.get("a1", ""),
            data.get("web_session", "")
        ))
        return jsonify(result)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"error": str(e)}), 500


# ==================== Unified publish endpoint ====================

@app.route("/publish", methods=["POST"])
def publish_video():
    """
    Unified publish endpoint.

    Request body:
    {
        "platform": "douyin",          # douyin | xiaohongshu | weixin | kuaishou
        "cookie": "cookie string or JSON",
        "title": "video title",
        "description": "video description (optional)",
        "video_path": "absolute path of the video file",
        "cover_path": "absolute path of the cover image (optional)",
        "tags": ["topic1", "topic2"],
        "post_time": "scheduled time (optional, e.g. 2024-01-20 12:00:00)",
        "location": "location (optional, has a built-in default)"
    }

    Response:
    {
        "success": true,
        "platform": "douyin",
        "video_id": "xxx",
        "video_url": "xxx",
        "message": "..."
    }
    """
    try:
        data = _json_body()

        # Read parameters
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")
        title = data.get("title", "")
        description = data.get("description", "")
        video_path = data.get("video_path", "")
        cover_path = data.get("cover_path")
        tags = data.get("tags", [])
        post_time = data.get("post_time")
        location = data.get("location", "重庆市")

        # Debug log
        print(f"[Publish] 收到请求: platform={platform}, title={title}, video_path={video_path}")

        # Parameter validation
        if not platform:
            print("[Publish] 错误: 缺少 platform 参数")
            return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
        if platform not in PLATFORM_MAP:
            print(f"[Publish] 错误: 不支持的平台 {platform}")
            return jsonify({
                "success": False,
                "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
            }), 400
        if not cookie_str:
            print("[Publish] 错误: 缺少 cookie 参数")
            return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
        if not title:
            print("[Publish] 错误: 缺少 title 参数")
            return jsonify({"success": False, "error": "缺少 title 参数"}), 400
        if not video_path:
            print("[Publish] 错误: 缺少 video_path 参数")
            return jsonify({"success": False, "error": "缺少 video_path 参数"}), 400

        # Video file validation (with detailed messages)
        if not os.path.exists(video_path):
            print(f"[Publish] 错误: 视频文件不存在: {video_path}")
            return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400
        if not os.path.isfile(video_path):
            print(f"[Publish] 错误: 路径不是文件: {video_path}")
            return jsonify({"success": False, "error": f"路径不是文件: {video_path}"}), 400

        # Parse the scheduled publish time
        publish_date = parse_datetime(post_time) if post_time else None

        # Build the publish parameters
        params = PublishParams(
            title=title,
            video_path=video_path,
            description=description,
            cover_path=cover_path,
            tags=tags,
            publish_date=publish_date,
            location=location
        )

        print("=" * 60)
        print(f"[Publish] 平台: {platform}")
        print(f"[Publish] 标题: {title}")
        print(f"[Publish] 视频: {video_path}")
        print(f"[Publish] 封面: {cover_path}")
        print(f"[Publish] 话题: {tags}")
        print(f"[Publish] 定时: {publish_date}")
        print("=" * 60)

        # Get the publisher for this platform
        PublisherClass = get_publisher(platform)
        publisher = PublisherClass(headless=HEADLESS_MODE)

        # Run the publish flow
        result = asyncio.run(publisher.run(cookie_str, params))

        response_data = {
            "success": result.success,
            "platform": result.platform,
            "video_id": result.video_id,
            "video_url": result.video_url,
            "message": result.message,
            "error": result.error,
            "need_captcha": result.need_captcha,
            "captcha_type": result.captcha_type,
            "screenshot_base64": result.screenshot_base64,
            "page_url": result.page_url,
            "status": result.status
        }

        # Make a captcha requirement clearly visible in the log
        if result.need_captcha:
            print(f"[Publish] 需要验证码: type={result.captcha_type}")

        return jsonify(response_data)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ==================== AI-assisted publish endpoint ====================

# Active publish sessions (not referenced elsewhere in this file).
active_publish_sessions = {}


@app.route("/publish/ai-assisted", methods=["POST"])
def publish_ai_assisted():
    """
    AI-assisted publish endpoint.

    Takes the same fields as /publish plus `return_screenshot` (default
    true). The response always carries a `status` (falling back to
    "success"/"failed" when the publisher sets none) and `page_url`, and
    includes `screenshot_base64` only when requested and available.

    Response:
    {
        "success": true/false,
        "status": "success|failed|need_captcha|processing",
        "screenshot_base64": "...",
        "page_url": "...",
        ...
    }
    """
    try:
        data = _json_body()

        # Read parameters
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")
        title = data.get("title", "")
        description = data.get("description", "")
        video_path = data.get("video_path", "")
        cover_path = data.get("cover_path")
        tags = data.get("tags", [])
        post_time = data.get("post_time")
        location = data.get("location", "重庆市")
        return_screenshot = data.get("return_screenshot", True)

        # Parameter validation
        if not platform:
            return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
        if platform not in PLATFORM_MAP:
            return jsonify({"success": False, "error": f"不支持的平台: {platform}"}), 400
        if not cookie_str:
            return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
        if not title:
            return jsonify({"success": False, "error": "缺少 title 参数"}), 400
        if not video_path or not os.path.exists(video_path):
            return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400

        # Parse the scheduled publish time
        publish_date = parse_datetime(post_time) if post_time else None

        # Build the publish parameters
        params = PublishParams(
            title=title,
            video_path=video_path,
            description=description,
            cover_path=cover_path,
            tags=tags,
            publish_date=publish_date,
            location=location
        )

        print("=" * 60)
        print(f"[AI Publish] 平台: {platform}")
        print(f"[AI Publish] 标题: {title}")
        print(f"[AI Publish] 视频: {video_path}")
        print("=" * 60)

        # Get the publisher for this platform
        PublisherClass = get_publisher(platform)
        publisher = PublisherClass(headless=HEADLESS_MODE)

        # Run the publish flow
        result = asyncio.run(publisher.run(cookie_str, params))

        response_data = {
            "success": result.success,
            "platform": result.platform,
            "video_id": result.video_id,
            "video_url": result.video_url,
            "message": result.message,
            "error": result.error,
            "need_captcha": result.need_captcha,
            "captcha_type": result.captcha_type,
            "status": result.status or ("success" if result.success else "failed"),
            "page_url": result.page_url
        }

        # Attach the screenshot only when the caller asked for it
        if return_screenshot and result.screenshot_base64:
            response_data["screenshot_base64"] = result.screenshot_base64

        return jsonify(response_data)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e), "status": "error"}), 500


# ==================== Batch publish endpoint ====================

@app.route("/publish/batch", methods=["POST"])
def publish_batch():
    """
    Batch publish endpoint - publish one video to several platforms.

    Request body:
    {
        "platforms": ["douyin", "xiaohongshu"],
        "cookies": {
            "douyin": "cookie string",
            "xiaohongshu": "cookie string"
        },
        "title": "video title",
        "video_path": "absolute path of the video file",
        ...
    }

    Platforms are processed sequentially. A failure on one platform is
    recorded in `results` and does not stop the others. The overall
    `success` is true when at least one platform succeeded.
    """
    try:
        data = _json_body()
        platforms = data.get("platforms") or []
        cookies = data.get("cookies") or {}

        if not platforms:
            return jsonify({"success": False, "error": "缺少 platforms 参数"}), 400

        # The schedule is the same for every platform: parse it once.
        publish_date = parse_datetime(data.get("post_time"))

        results = []
        for platform in platforms:
            platform = str(platform).lower()
            cookie_str = cookies.get(platform, "")

            if not cookie_str:
                results.append({
                    "platform": platform,
                    "success": False,
                    "error": f"缺少 {platform} 的 cookie"
                })
                continue

            try:
                # A fresh params object per platform, as in the original.
                params = PublishParams(
                    title=data.get("title", ""),
                    video_path=data.get("video_path", ""),
                    description=data.get("description", ""),
                    cover_path=data.get("cover_path"),
                    tags=data.get("tags", []),
                    publish_date=publish_date,
                    location=data.get("location", "重庆市")
                )

                # Publish
                PublisherClass = get_publisher(platform)
                publisher = PublisherClass(headless=HEADLESS_MODE)
                result = asyncio.run(publisher.run(cookie_str, params))

                results.append({
                    "platform": result.platform,
                    "success": result.success,
                    "video_id": result.video_id,
                    "message": result.message,
                    "error": result.error
                })
            except Exception as e:
                results.append({
                    "platform": platform,
                    "success": False,
                    "error": str(e)
                })

        # Count successes and failures
        success_count = sum(1 for r in results if r.get("success"))

        return jsonify({
            "success": success_count > 0,
            "total": len(platforms),
            "success_count": success_count,
            "fail_count": len(platforms) - success_count,
            "results": results
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ==================== Cookie validation endpoint ====================

@app.route("/check_cookie", methods=["POST"])
def check_cookie():
    """Check whether a cookie is valid.

    Only Xiaohongshu is verified online (through the xhs SDK when it is
    available). Every other case returns `valid: true` with a message
    saying that no online verification was performed.
    """
    try:
        data = _json_body()
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")

        if not cookie_str:
            return jsonify({"valid": False, "error": "缺少 cookie 参数"}), 400

        # Only Xiaohongshu cookies can currently be verified online
        if platform == "xiaohongshu":
            try:
                from platforms.xiaohongshu import XiaohongshuPublisher, XHS_SDK_AVAILABLE
                if XHS_SDK_AVAILABLE:
                    from xhs import XhsClient
                    publisher = XiaohongshuPublisher()
                    xhs_client = XhsClient(cookie_str, sign=publisher.sign_sync)
                    info = xhs_client.get_self_info()
                    if info:
                        return jsonify({
                            "valid": True,
                            "user_info": {
                                "user_id": info.get("user_id"),
                                "nickname": info.get("nickname"),
                                "avatar": info.get("images")
                            }
                        })
                    # NOTE(review): an empty `info` falls through to the
                    # "not verified" answer below, which reports
                    # valid=True - confirm this is intended.
            except Exception as e:
                return jsonify({"valid": False, "error": str(e)})

        # Other platforms: well-formed, but not verified online
        return jsonify({
            "valid": True,
            "message": "Cookie 格式正确,但未进行在线验证"
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({"valid": False, "error": str(e)})


# ==================== Works list endpoint ====================

@app.route("/works", methods=["POST"])
def get_works():
    """
    Get the list of published works.

    Request body:
    {
        "platform": "douyin",       # douyin | xiaohongshu | kuaishou
        "cookie": "cookie string or JSON",
        "page": 0,                  # page index (optional, default 0)
        "page_size": 20             # page size (optional, default 20)
    }

    Response: `to_dict()` of the value returned by the publisher's
    `run_get_works`, e.g.
    {
        "success": true,
        "platform": "douyin",
        "works": [...],
        "total": 100,
        "has_more": true
    }
    """
    try:
        data = _json_body()
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")
        page = data.get("page", 0)
        page_size = data.get("page_size", 20)

        print(f"[Works] 收到请求: platform={platform}, page={page}, page_size={page_size}")

        if not platform:
            return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
        if platform not in PLATFORM_MAP:
            return jsonify({
                "success": False,
                "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
            }), 400
        if not cookie_str:
            return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400

        # Get the publisher for this platform
        PublisherClass = get_publisher(platform)
        publisher = PublisherClass(headless=HEADLESS_MODE)

        # Fetch the works
        result = asyncio.run(publisher.run_get_works(cookie_str, page, page_size))

        return jsonify(result.to_dict())
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ==================== Per-day work statistics endpoints ====================
#
# NOTE(review): `get_db_connection` is called by the four statistics
# endpoints below but is neither defined nor imported anywhere in this
# file as shown. Unless it is injected elsewhere, those endpoints fail with
# NameError (reported as HTTP 500). The code assumes the returned
# connection yields dict-style rows and accepts `%s` placeholders -
# TODO confirm and add the missing import.

@app.route("/work_day_statistics", methods=["POST"])
def save_work_day_statistics():
    """
    Save per-day statistics for works.

    For each entry, a row that already exists for (work_id, today) is
    updated; otherwise a new row is inserted. Entries without `work_id`
    are skipped.

    Request body:
    {
        "statistics": [
            {
                "work_id": 1,
                "fans_count": 1000,
                "play_count": 5000,
                "like_count": 200,
                "comment_count": 50,
                "share_count": 30,
                "collect_count": 100
            },
            ...
        ]
    }

    Response:
    {
        "success": true,
        "inserted": 5,
        "updated": 3,
        "message": "..."
    }
    """
    # Parse the body once, tolerantly, so the debug log below cannot raise
    # outside the try block on a malformed request.
    data = _json_body()

    print("=" * 60, flush=True)
    print("[DEBUG] ===== 进入 save_work_day_statistics 方法 =====", flush=True)
    print(f"[DEBUG] 请求方法: {request.method}", flush=True)
    print(f"[DEBUG] 请求数据: {data}", flush=True)
    print("=" * 60, flush=True)

    try:
        statistics_list = data.get("statistics", [])

        if not statistics_list:
            return jsonify({"success": False, "error": "缺少 statistics 参数"}), 400

        today = date.today()
        inserted_count = 0
        updated_count = 0

        print(f"[WorkDayStatistics] 收到请求: {len(statistics_list)} 条统计数据")

        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                for stat in statistics_list:
                    work_id = stat.get("work_id")
                    if not work_id:
                        continue

                    fans_count = stat.get("fans_count", 0)
                    play_count = stat.get("play_count", 0)
                    like_count = stat.get("like_count", 0)
                    comment_count = stat.get("comment_count", 0)
                    share_count = stat.get("share_count", 0)
                    collect_count = stat.get("collect_count", 0)

                    # Is there already a row for this work today?
                    cursor.execute(
                        "SELECT id FROM work_day_statistics WHERE work_id = %s AND record_date = %s",
                        (work_id, today)
                    )
                    existing = cursor.fetchone()

                    if existing:
                        # Update the existing row
                        cursor.execute(
                            """UPDATE work_day_statistics
                               SET fans_count = %s, play_count = %s, like_count = %s,
                                   comment_count = %s, share_count = %s, collect_count = %s,
                                   updated_at = NOW()
                               WHERE id = %s""",
                            (fans_count, play_count, like_count, comment_count,
                             share_count, collect_count, existing['id'])
                        )
                        updated_count += 1
                    else:
                        # Insert a new row
                        cursor.execute(
                            """INSERT INTO work_day_statistics
                               (work_id, record_date, fans_count, play_count, like_count,
                                comment_count, share_count, collect_count, created_at, updated_at)
                               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())""",
                            (work_id, today, fans_count, play_count, like_count,
                             comment_count, share_count, collect_count)
                        )
                        inserted_count += 1

            conn.commit()
        finally:
            conn.close()

        print(f"[WorkDayStatistics] 完成: 新增 {inserted_count} 条, 更新 {updated_count} 条")

        return jsonify({
            "success": True,
            "inserted": inserted_count,
            "updated": updated_count,
            "message": f"保存成功: 新增 {inserted_count} 条, 更新 {updated_count} 条"
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


@app.route("/work_day_statistics/trend", methods=["GET"])
def get_statistics_trend():
    """
    Get the data trend (for the dashboard).

    Query parameters:
        user_id: user ID (required)
        days: number of days (optional, default 7, capped at 30)
        account_id: account ID (optional; all accounts when omitted)

    Response:
    {
        "success": true,
        "data": {
            "dates": ["01-16", "01-17", "01-18", ...],
            "fans": [100, 120, 130, ...],
            "views": [1000, 1200, 1500, ...],
            "likes": [50, 60, 70, ...],
            "comments": [10, 12, 15, ...],
            "shares": [5, 6, 8, ...],
            "collects": [20, 25, 30, ...]
        }
    }

    When the query returns no rows, a zero-filled series covering the last
    `days` days is returned instead.
    """
    try:
        user_id = request.args.get("user_id")
        try:
            days = min(int(request.args.get("days", 7)), 30)  # at most 30 days
        except (TypeError, ValueError):
            return jsonify({"success": False, "error": "days 参数必须是整数"}), 400
        account_id = request.args.get("account_id")

        if not user_id:
            return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400

        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                # Join `works` to restrict to the user's works, then aggregate.
                # The inner query takes MAX(fans_count) per account per day
                # and sums the other metrics across that account's works;
                # the outer query sums across accounts.
                sql = """
                    SELECT
                        record_date,
                        SUM(account_fans) as total_fans,
                        SUM(account_views) as total_views,
                        SUM(account_likes) as total_likes,
                        SUM(account_comments) as total_comments,
                        SUM(account_shares) as total_shares,
                        SUM(account_collects) as total_collects
                    FROM (
                        SELECT
                            wds.record_date,
                            w.accountId,
                            MAX(wds.fans_count) as account_fans,
                            SUM(wds.play_count) as account_views,
                            SUM(wds.like_count) as account_likes,
                            SUM(wds.comment_count) as account_comments,
                            SUM(wds.share_count) as account_shares,
                            SUM(wds.collect_count) as account_collects
                        FROM work_day_statistics wds
                        INNER JOIN works w ON wds.work_id = w.id
                        WHERE w.userId = %s
                          AND wds.record_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                """
                params = [user_id, days]

                if account_id:
                    sql += " AND w.accountId = %s"
                    params.append(account_id)

                sql += """
                        GROUP BY wds.record_date, w.accountId
                    ) as account_stats
                    GROUP BY record_date
                    ORDER BY record_date ASC
                """

                cursor.execute(sql, params)
                results = cursor.fetchall()

            # Build the response series
            dates = []
            fans = []
            views = []
            likes = []
            comments = []
            shares = []
            collects = []

            for row in results:
                # Format the date as "MM-DD"
                record_date = row['record_date']
                if isinstance(record_date, str):
                    dates.append(record_date[5:10])  # "2026-01-16" -> "01-16"
                else:
                    dates.append(record_date.strftime("%m-%d"))

                # Make sure plain ints are returned
                fans.append(int(row['total_fans'] or 0))
                views.append(int(row['total_views'] or 0))
                likes.append(int(row['total_likes'] or 0))
                comments.append(int(row['total_comments'] or 0))
                shares.append(int(row['total_shares'] or 0))
                collects.append(int(row['total_collects'] or 0))

            # No data: generate an empty, zero-filled date range
            if not dates:
                today = date.today()
                for i in range(days, 0, -1):
                    d = today - timedelta(days=i - 1)
                    dates.append(d.strftime("%m-%d"))
                    fans.append(0)
                    views.append(0)
                    likes.append(0)
                    comments.append(0)
                    shares.append(0)
                    collects.append(0)

            return jsonify({
                "success": True,
                "data": {
                    "dates": dates,
                    "fans": fans,
                    "views": views,
                    "likes": likes,
                    "comments": comments,
                    "shares": shares,
                    "collects": collects
                }
            })
        finally:
            conn.close()
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


@app.route("/work_day_statistics/platforms", methods=["GET"])
def get_statistics_by_platform():
    """
    Get statistics grouped by platform account (for platform comparison).

    Data sources, as used by the query below:
    - current fan count: `platform_accounts.fans_count`
    - views / likes / comments / collects: summed from
      `work_day_statistics` per account over the date range
    - fan increase: current fans minus the fan count recorded on the
      earliest day inside the range

    Query parameters:
        user_id: user ID (required)
        days: number of days (optional, default 30, capped at 30)

    Response:
    {
        "success": true,
        "data": [
            {
                "platform": "douyin",
                "fansCount": 1000,
                "fansIncrease": 50,
                "viewsCount": 5000,
                "likesCount": 200,
                "commentsCount": 30,
                "collectsCount": 100
            },
            ...
        ]
    }
    """
    try:
        user_id = request.args.get("user_id")
        try:
            days = min(int(request.args.get("days", 30)), 30)
        except (TypeError, ValueError):
            return jsonify({"success": False, "error": "days 参数必须是整数"}), 400

        if not user_id:
            return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400

        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                # 1. Current fans come from platform_accounts (snake_case
                #    columns: fans_count, user_id).
                # 2. Cumulative views etc. come from work_day_statistics.
                sql = """
                    SELECT
                        pa.platform,
                        pa.fans_count as current_fans,
                        COALESCE(stats.total_views, 0) as viewsCount,
                        COALESCE(stats.total_likes, 0) as likesCount,
                        COALESCE(stats.total_comments, 0) as commentsCount,
                        COALESCE(stats.total_collects, 0) as collectsCount,
                        COALESCE(fans_change.earliest_fans, pa.fans_count) as earliest_fans
                    FROM platform_accounts pa
                    LEFT JOIN (
                        -- 获取区间内的累计数据(按账号汇总)
                        SELECT
                            w.accountId,
                            SUM(wds.play_count) as total_views,
                            SUM(wds.like_count) as total_likes,
                            SUM(wds.comment_count) as total_comments,
                            SUM(wds.collect_count) as total_collects
                        FROM work_day_statistics wds
                        INNER JOIN works w ON wds.work_id = w.id
                        WHERE w.userId = %s
                          AND wds.record_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                        GROUP BY w.accountId
                    ) stats ON pa.id = stats.accountId
                    LEFT JOIN (
                        -- 获取区间内最早一天的粉丝数
                        SELECT
                            w.accountId,
                            MAX(wds.fans_count) as earliest_fans
                        FROM work_day_statistics wds
                        INNER JOIN works w ON wds.work_id = w.id
                        WHERE w.userId = %s
                          AND wds.record_date = (
                              SELECT MIN(wds2.record_date)
                              FROM work_day_statistics wds2
                              INNER JOIN works w2 ON wds2.work_id = w2.id
                              WHERE w2.userId = %s
                                AND wds2.record_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                          )
                        GROUP BY w.accountId
                    ) fans_change ON pa.id = fans_change.accountId
                    WHERE pa.user_id = %s
                    ORDER BY current_fans DESC
                """

                # One value per %s above, in order of appearance.
                cursor.execute(sql, [user_id, days, user_id, user_id, days, user_id])
                results = cursor.fetchall()

            # Build the response
            platform_data = []
            for row in results:
                current_fans = int(row['current_fans'] or 0)
                earliest_fans = int(row['earliest_fans'] or current_fans)
                fans_increase = current_fans - earliest_fans

                platform_data.append({
                    "platform": row['platform'],
                    "fansCount": current_fans,
                    "fansIncrease": fans_increase,
                    "viewsCount": int(row['viewsCount'] or 0),
                    "likesCount": int(row['likesCount'] or 0),
                    "commentsCount": int(row['commentsCount'] or 0),
                    "collectsCount": int(row['collectsCount'] or 0),
                })

            print(f"[PlatformStats] 返回 {len(platform_data)} 个平台的数据")

            return jsonify({
                "success": True,
                "data": platform_data
            })
        finally:
            conn.close()
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


@app.route("/work_day_statistics/batch", methods=["POST"])
def get_work_statistics_history():
    """
    Get historical statistics for several works at once.

    Request body:
    {
        "work_ids": [1, 2, 3],
        "start_date": "2025-01-01",   # optional
        "end_date": "2025-01-21"      # optional
    }

    Response (rows grouped by work_id, keys are strings):
    {
        "success": true,
        "data": {
            "1": [
                {"record_date": "2025-01-20", "play_count": 100, ...},
                {"record_date": "2025-01-21", "play_count": 150, ...}
            ],
            ...
        }
    }
    """
    try:
        data = _json_body()
        work_ids = data.get("work_ids", [])
        start_date = data.get("start_date")
        end_date = data.get("end_date")

        # A non-list value (e.g. a string) would otherwise be split into
        # characters when expanded into the IN (...) list.
        if not isinstance(work_ids, list) or not work_ids:
            return jsonify({"success": False, "error": "缺少 work_ids 参数"}), 400

        conn = get_db_connection()
        try:
            with conn.cursor() as cursor:
                # Only the placeholder list is interpolated; all values are
                # passed as query parameters.
                placeholders = ', '.join(['%s'] * len(work_ids))
                sql = f"""SELECT work_id, record_date, fans_count, play_count, like_count,
                                 comment_count, share_count, collect_count
                          FROM work_day_statistics
                          WHERE work_id IN ({placeholders})"""
                params = list(work_ids)

                if start_date:
                    sql += " AND record_date >= %s"
                    params.append(start_date)
                if end_date:
                    sql += " AND record_date <= %s"
                    params.append(end_date)

                sql += " ORDER BY work_id, record_date"

                cursor.execute(sql, params)
                results = cursor.fetchall()
        finally:
            conn.close()

        # Group by work_id
        grouped_data = {}
        for row in results:
            raw_date = row['record_date']
            if not raw_date:
                record_date = None
            elif isinstance(raw_date, str):
                # Same tolerance for string dates as the trend endpoint.
                record_date = raw_date[:10]
            else:
                record_date = raw_date.strftime('%Y-%m-%d')

            grouped_data.setdefault(str(row['work_id']), []).append({
                'record_date': record_date,
                'fans_count': row['fans_count'],
                'play_count': row['play_count'],
                'like_count': row['like_count'],
                'comment_count': row['comment_count'],
                'share_count': row['share_count'],
                'collect_count': row['collect_count']
            })

        return jsonify({
            "success": True,
            "data": grouped_data
        })
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ==================== Comment list endpoint ====================

@app.route("/comments", methods=["POST"])
def get_comments():
    """
    Get the comments of one work.

    Request body:
    {
        "platform": "douyin",       # douyin | xiaohongshu | kuaishou
        "cookie": "cookie string or JSON",
        "work_id": "work ID",
        "cursor": ""                # pagination cursor (optional)
    }

    Response: `to_dict()` of the value returned by the publisher's
    `run_get_comments`, plus `cursor` when the result object carries one.
    """
    try:
        data = _json_body()
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")
        work_id = data.get("work_id", "")
        cursor = data.get("cursor", "")

        print(f"[Comments] 收到请求: platform={platform}, work_id={work_id}")

        if not platform:
            return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
        if platform not in PLATFORM_MAP:
            return jsonify({
                "success": False,
                "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
            }), 400
        if not cookie_str:
            return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
        if not work_id:
            return jsonify({"success": False, "error": "缺少 work_id 参数"}), 400

        # Get the publisher for this platform
        PublisherClass = get_publisher(platform)
        publisher = PublisherClass(headless=HEADLESS_MODE)

        # Fetch the comments
        result = asyncio.run(publisher.run_get_comments(cookie_str, work_id, cursor))

        result_dict = result.to_dict()
        # Add the cursor when the result has it as an instance attribute
        if hasattr(result, '__dict__') and 'cursor' in result.__dict__:
            result_dict['cursor'] = result.__dict__['cursor']

        return jsonify(result_dict)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ==================== All-works comments endpoint ====================

@app.route("/all_comments", methods=["POST"])
def get_all_comments():
    """
    Get the comments of all works in one call.

    Only `douyin` and `xiaohongshu` are accepted.

    Request body:
    {
        "platform": "douyin",       # douyin | xiaohongshu
        "cookie": "cookie string or JSON"
    }

    Response: whatever the publisher's `get_all_comments` returns, e.g.
    {
        "success": true,
        "platform": "douyin",
        "work_comments": [
            {
                "work_id": "xxx",
                "title": "work title",
                "cover_url": "cover URL",
                "comments": [...]
            }
        ],
        "total": 5
    }
    """
    try:
        data = _json_body()
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")

        print(f"[AllComments] 收到请求: platform={platform}")

        if not platform:
            return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
        if platform not in ['douyin', 'xiaohongshu']:
            return jsonify({
                "success": False,
                "error": "该接口只支持 douyin 和 xiaohongshu 平台"
            }), 400
        if not cookie_str:
            return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400

        # Get the publisher for this platform
        PublisherClass = get_publisher(platform)
        publisher = PublisherClass(headless=HEADLESS_MODE)

        # Fetch all comments
        result = asyncio.run(publisher.get_all_comments(cookie_str))

        return jsonify(result)
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ==================== Login status endpoint ====================

@app.route("/check_login", methods=["POST"])
def check_login():
    """
    Check the login state of a cookie via the publisher's
    `check_login_status`.

    Request body:
    {
        "platform": "douyin",       # douyin | xiaohongshu | kuaishou | weixin
        "cookie": "cookie string or JSON"
    }

    Response:
    {
        "success": true,
        "valid": true,              # whether the cookie is valid
        "need_login": false,        # whether a new login is required
        "message": "..."
    }
    """
    try:
        data = _json_body()
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")

        print(f"[CheckLogin] 收到请求: platform={platform}")

        if not platform:
            return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
        if platform not in PLATFORM_MAP:
            return jsonify({
                "success": False,
                "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
            }), 400
        if not cookie_str:
            return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400

        # Get the publisher for this platform
        PublisherClass = get_publisher(platform)
        publisher = PublisherClass(headless=HEADLESS_MODE)

        # Run the login check
        result = asyncio.run(publisher.check_login_status(cookie_str))

        return jsonify(result)
    except Exception as e:
        traceback.print_exc()
        return jsonify({
            "success": False,
            "valid": False,
            "need_login": True,
            "error": str(e)
        }), 500


# ==================== Account info endpoint ====================

@app.route("/account_info", methods=["POST"])
def get_account_info():
    """
    Get account information.

    Returns 400 when the platform's publisher has no `get_account_info`
    method.

    Request body:
    {
        "platform": "baijiahao",
        "cookie": "cookie string or JSON"
    }

    Response:
    {
        "success": true,
        "account_id": "xxx",
        "account_name": "user name",
        "avatar_url": "avatar URL",
        "fans_count": 0,
        "works_count": 0
    }
    """
    try:
        data = _json_body()
        platform = _requested_platform(data)
        cookie_str = data.get("cookie", "")

        print(f"[AccountInfo] 收到请求: platform={platform}")

        if not platform:
            return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
        if platform not in PLATFORM_MAP:
            return jsonify({
                "success": False,
                "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
            }), 400
        if not cookie_str:
            return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400

        # Get the publisher for this platform
        PublisherClass = get_publisher(platform)
        publisher = PublisherClass(headless=HEADLESS_MODE)

        # Not every publisher implements get_account_info
        if hasattr(publisher, 'get_account_info'):
            result = asyncio.run(publisher.get_account_info(cookie_str))
            return jsonify(result)
        else:
            return jsonify({
                "success": False,
                "error": f"平台 {platform} 不支持获取账号信息"
            }), 400
    except Exception as e:
        traceback.print_exc()
        return jsonify({"success": False, "error": str(e)}), 500


# ==================== Health check ====================

@app.route("/health", methods=["GET"])
def health_check():
    """Health check: reports xhs SDK availability, platforms and headless mode."""
    return jsonify({
        "status": "ok",
        "xhs_sdk": _xhs_sdk_available(),
        "supported_platforms": list(PLATFORM_MAP.keys()),
        "headless_mode": HEADLESS_MODE
    })


@app.route("/", methods=["GET"])
def index():
    """Index page: service name, version and an endpoint summary."""
    return jsonify({
        "name": "多平台视频发布服务",
        "version": "1.2.0",
        "endpoints": {
            "GET /": "服务信息",
            "GET /health": "健康检查",
            "POST /publish": "发布视频",
            "POST /publish/batch": "批量发布",
            "POST /works": "获取作品列表",
            "POST /comments": "获取作品评论",
            "POST /all_comments": "获取所有作品评论",
            "POST /work_day_statistics": "保存作品每日统计数据",
            "POST /work_day_statistics/batch": "获取作品历史统计数据",
            "POST /check_cookie": "检查 Cookie",
            "POST /sign": "小红书签名"
        },
        "supported_platforms": list(PLATFORM_MAP.keys())
    })


# ==================== Command-line entry point ====================

def main():
    """Parse command-line options, print a start-up banner and run Flask."""
    global HEADLESS_MODE

    parser = argparse.ArgumentParser(description='多平台视频发布服务')
    parser.add_argument('--port', type=int, default=5005, help='服务端口 (默认: 5005)')
    parser.add_argument('--host', type=str, default='0.0.0.0', help='监听地址 (默认: 0.0.0.0)')
    # default=None so that an omitted flag does not silently override the
    # HEADLESS environment variable (which itself defaults to true).
    parser.add_argument('--headless', type=str, default=None, help='是否无头模式 (默认: true)')
    parser.add_argument('--debug', action='store_true', help='调试模式')

    args = parser.parse_args()

    if args.headless is not None:
        HEADLESS_MODE = args.headless.lower() == 'true'

    # Check the xhs SDK
    xhs_status = "已安装" if _xhs_sdk_available() else "未安装"

    print("=" * 60)
    print("多平台视频发布服务")
    print("=" * 60)
    print(f"XHS SDK: {xhs_status}")
    print(f"Headless 模式: {HEADLESS_MODE}")
    print(f"支持平台: {', '.join(PLATFORM_MAP.keys())}")
    print("=" * 60)
    print(f"启动服务: http://{args.host}:{args.port}")
    print("=" * 60)

    app.run(host=args.host, port=args.port, debug=args.debug, threaded=True)


if __name__ == '__main__':
    main()