| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 多平台视频发布服务 - 统一入口
- 支持平台: 抖音、小红书、视频号、快手
- 参考项目: matrix (https://github.com/kebenxiaoming/matrix)
- 使用方式:
- python app.py # 启动 HTTP 服务 (端口 5005)
- python app.py --port 8080 # 指定端口
- python app.py --headless false # 显示浏览器窗口
- """
- import asyncio
- import os
- import sys
- import argparse
- import traceback
- from datetime import datetime
- from pathlib import Path
- # 确保当前目录在 Python 路径中
- CURRENT_DIR = Path(__file__).parent.resolve()
- if str(CURRENT_DIR) not in sys.path:
- sys.path.insert(0, str(CURRENT_DIR))
- # 从 server/.env 文件加载环境变量
- def load_env_file():
- """从 server/.env 文件加载环境变量"""
- env_path = CURRENT_DIR.parent / '.env'
- if env_path.exists():
- print(f"[Config] Loading env from: {env_path}")
- with open(env_path, 'r', encoding='utf-8') as f:
- for line in f:
- line = line.strip()
- if line and not line.startswith('#') and '=' in line:
- key, value = line.split('=', 1)
- key = key.strip()
- value = value.strip()
- # 移除引号
- if value.startswith('"') and value.endswith('"'):
- value = value[1:-1]
- elif value.startswith("'") and value.endswith("'"):
- value = value[1:-1]
- # 只在环境变量未设置时加载
- if key not in os.environ:
- os.environ[key] = value
- print(f"[Config] Loaded: {key}=***" if 'PASSWORD' in key or 'SECRET' in key else f"[Config] Loaded: {key}={value}")
- else:
- print(f"[Config] .env file not found: {env_path}")
- # 加载环境变量
- load_env_file()
- from flask import Flask, request, jsonify
- from flask_cors import CORS
- from platforms import get_publisher, PLATFORM_MAP
- from platforms.base import PublishParams
- def parse_datetime(date_str: str):
- """解析日期时间字符串"""
- if not date_str:
- return None
-
- formats = [
- "%Y-%m-%d %H:%M:%S",
- "%Y-%m-%d %H:%M",
- "%Y/%m/%d %H:%M:%S",
- "%Y/%m/%d %H:%M",
- "%Y-%m-%dT%H:%M:%S",
- "%Y-%m-%dT%H:%M:%SZ",
- ]
-
- for fmt in formats:
- try:
- return datetime.strptime(date_str, fmt)
- except ValueError:
- continue
-
- return None
- def validate_video_file(video_path: str) -> bool:
- """验证视频文件是否有效"""
- if not video_path:
- return False
-
- if not os.path.exists(video_path):
- return False
-
- if not os.path.isfile(video_path):
- return False
-
- valid_extensions = ['.mp4', '.mov', '.avi', '.mkv', '.flv', '.wmv', '.webm']
- ext = os.path.splitext(video_path)[1].lower()
- if ext not in valid_extensions:
- return False
-
- if os.path.getsize(video_path) < 1024:
- return False
-
- return True
- # 创建 Flask 应用
- app = Flask(__name__)
- CORS(app)
- # 全局配置
- HEADLESS_MODE = os.environ.get('HEADLESS', 'true').lower() == 'true'
- print(f"[Config] HEADLESS env value: '{os.environ.get('HEADLESS', 'NOT SET')}'", flush=True)
- print(f"[Config] HEADLESS_MODE: {HEADLESS_MODE}", flush=True)
- # ==================== 签名相关(小红书专用) ====================
- @app.route("/sign", methods=["POST"])
- def sign_endpoint():
- """小红书签名接口"""
- try:
- from platforms.xiaohongshu import XiaohongshuPublisher
-
- data = request.json
- publisher = XiaohongshuPublisher(headless=True)
- result = asyncio.run(publisher.get_sign(
- data.get("uri", ""),
- data.get("data"),
- data.get("a1", ""),
- data.get("web_session", "")
- ))
- return jsonify(result)
- except Exception as e:
- traceback.print_exc()
- return jsonify({"error": str(e)}), 500
- # ==================== 统一发布接口 ====================
- @app.route("/publish", methods=["POST"])
- def publish_video():
- """
- 统一发布接口
-
- 请求体:
- {
- "platform": "douyin", # douyin | xiaohongshu | weixin | kuaishou
- "cookie": "cookie字符串或JSON",
- "title": "视频标题",
- "description": "视频描述(可选)",
- "video_path": "视频文件绝对路径",
- "cover_path": "封面图片绝对路径(可选)",
- "tags": ["话题1", "话题2"],
- "post_time": "定时发布时间(可选,格式:2024-01-20 12:00:00)",
- "location": "位置(可选,默认:重庆市)"
- }
-
- 响应:
- {
- "success": true,
- "platform": "douyin",
- "video_id": "xxx",
- "video_url": "xxx",
- "message": "发布成功"
- }
- """
- try:
- data = request.json
-
- # 获取参数
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
- title = data.get("title", "")
- description = data.get("description", "")
- video_path = data.get("video_path", "")
- cover_path = data.get("cover_path")
- tags = data.get("tags", [])
- post_time = data.get("post_time")
- location = data.get("location", "重庆市")
-
- # 调试日志
- print(f"[Publish] 收到请求: platform={platform}, title={title}, video_path={video_path}")
-
- # 参数验证
- if not platform:
- print("[Publish] 错误: 缺少 platform 参数")
- return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
- if platform not in PLATFORM_MAP:
- print(f"[Publish] 错误: 不支持的平台 {platform}")
- return jsonify({
- "success": False,
- "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
- }), 400
- if not cookie_str:
- print("[Publish] 错误: 缺少 cookie 参数")
- return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
- if not title:
- print("[Publish] 错误: 缺少 title 参数")
- return jsonify({"success": False, "error": "缺少 title 参数"}), 400
- if not video_path:
- print("[Publish] 错误: 缺少 video_path 参数")
- return jsonify({"success": False, "error": "缺少 video_path 参数"}), 400
-
- # 视频文件验证(增加详细信息)
- if not os.path.exists(video_path):
- print(f"[Publish] 错误: 视频文件不存在: {video_path}")
- return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400
- if not os.path.isfile(video_path):
- print(f"[Publish] 错误: 路径不是文件: {video_path}")
- return jsonify({"success": False, "error": f"路径不是文件: {video_path}"}), 400
-
- # 解析发布时间
- publish_date = parse_datetime(post_time) if post_time else None
-
- # 创建发布参数
- params = PublishParams(
- title=title,
- video_path=video_path,
- description=description,
- cover_path=cover_path,
- tags=tags,
- publish_date=publish_date,
- location=location
- )
-
- print("=" * 60)
- print(f"[Publish] 平台: {platform}")
- print(f"[Publish] 标题: {title}")
- print(f"[Publish] 视频: {video_path}")
- print(f"[Publish] 封面: {cover_path}")
- print(f"[Publish] 话题: {tags}")
- print(f"[Publish] 定时: {publish_date}")
- print("=" * 60)
-
- # 获取对应平台的发布器
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
-
- # 执行发布
- result = asyncio.run(publisher.run(cookie_str, params))
-
- response_data = {
- "success": result.success,
- "platform": result.platform,
- "video_id": result.video_id,
- "video_url": result.video_url,
- "message": result.message,
- "error": result.error,
- "need_captcha": result.need_captcha,
- "captcha_type": result.captcha_type,
- "screenshot_base64": result.screenshot_base64,
- "page_url": result.page_url,
- "status": result.status
- }
-
- # 如果需要验证码,打印明确的日志
- if result.need_captcha:
- print(f"[Publish] 需要验证码: type={result.captcha_type}")
-
- return jsonify(response_data)
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- # ==================== AI 辅助发布接口 ====================
- # 存储活跃的发布会话
- active_publish_sessions = {}
- @app.route("/publish/ai-assisted", methods=["POST"])
- def publish_ai_assisted():
- """
- AI 辅助发布接口
-
- 与普通发布接口的区别:
- 1. 发布过程中会返回截图供 AI 分析
- 2. 如果检测到需要验证码,返回截图和状态,等待外部处理
- 3. 支持继续发布(输入验证码后)
-
- 请求体:
- {
- "platform": "douyin",
- "cookie": "cookie字符串",
- "title": "视频标题",
- "video_path": "视频文件路径",
- ...
- "return_screenshot": true // 是否返回截图
- }
-
- 响应:
- {
- "success": true/false,
- "status": "success|failed|need_captcha|processing",
- "screenshot_base64": "...", // 当前页面截图
- "page_url": "...",
- ...
- }
- """
- try:
- data = request.json
-
- # 获取参数
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
- title = data.get("title", "")
- description = data.get("description", "")
- video_path = data.get("video_path", "")
- cover_path = data.get("cover_path")
- tags = data.get("tags", [])
- post_time = data.get("post_time")
- location = data.get("location", "重庆市")
- return_screenshot = data.get("return_screenshot", True)
-
- # 参数验证
- if not platform:
- return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
- if platform not in PLATFORM_MAP:
- return jsonify({"success": False, "error": f"不支持的平台: {platform}"}), 400
- if not cookie_str:
- return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
- if not title:
- return jsonify({"success": False, "error": "缺少 title 参数"}), 400
- if not video_path or not os.path.exists(video_path):
- return jsonify({"success": False, "error": f"视频文件不存在: {video_path}"}), 400
-
- # 解析发布时间
- publish_date = parse_datetime(post_time) if post_time else None
-
- # 创建发布参数
- params = PublishParams(
- title=title,
- video_path=video_path,
- description=description,
- cover_path=cover_path,
- tags=tags,
- publish_date=publish_date,
- location=location
- )
-
- print("=" * 60)
- print(f"[AI Publish] 平台: {platform}")
- print(f"[AI Publish] 标题: {title}")
- print(f"[AI Publish] 视频: {video_path}")
- print("=" * 60)
-
- # 获取对应平台的发布器
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
-
- # 执行发布
- result = asyncio.run(publisher.run(cookie_str, params))
-
- response_data = {
- "success": result.success,
- "platform": result.platform,
- "video_id": result.video_id,
- "video_url": result.video_url,
- "message": result.message,
- "error": result.error,
- "need_captcha": result.need_captcha,
- "captcha_type": result.captcha_type,
- "status": result.status or ("success" if result.success else "failed"),
- "page_url": result.page_url
- }
-
- # 如果请求返回截图
- if return_screenshot and result.screenshot_base64:
- response_data["screenshot_base64"] = result.screenshot_base64
-
- return jsonify(response_data)
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e), "status": "error"}), 500
- # ==================== 批量发布接口 ====================
- @app.route("/publish/batch", methods=["POST"])
- def publish_batch():
- """
- 批量发布接口 - 发布到多个平台
-
- 请求体:
- {
- "platforms": ["douyin", "xiaohongshu"],
- "cookies": {
- "douyin": "cookie字符串",
- "xiaohongshu": "cookie字符串"
- },
- "title": "视频标题",
- "video_path": "视频文件绝对路径",
- ...
- }
- """
- try:
- data = request.json
-
- platforms = data.get("platforms", [])
- cookies = data.get("cookies", {})
-
- if not platforms:
- return jsonify({"success": False, "error": "缺少 platforms 参数"}), 400
-
- results = []
-
- for platform in platforms:
- platform = platform.lower()
- cookie_str = cookies.get(platform, "")
-
- if not cookie_str:
- results.append({
- "platform": platform,
- "success": False,
- "error": f"缺少 {platform} 的 cookie"
- })
- continue
-
- try:
- # 创建参数
- params = PublishParams(
- title=data.get("title", ""),
- video_path=data.get("video_path", ""),
- description=data.get("description", ""),
- cover_path=data.get("cover_path"),
- tags=data.get("tags", []),
- publish_date=parse_datetime(data.get("post_time")),
- location=data.get("location", "重庆市")
- )
-
- # 发布
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
- result = asyncio.run(publisher.run(cookie_str, params))
-
- results.append({
- "platform": result.platform,
- "success": result.success,
- "video_id": result.video_id,
- "message": result.message,
- "error": result.error
- })
- except Exception as e:
- results.append({
- "platform": platform,
- "success": False,
- "error": str(e)
- })
-
- # 统计成功/失败数量
- success_count = sum(1 for r in results if r.get("success"))
-
- return jsonify({
- "success": success_count > 0,
- "total": len(platforms),
- "success_count": success_count,
- "fail_count": len(platforms) - success_count,
- "results": results
- })
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- # ==================== Cookie 验证接口 ====================
- @app.route("/check_cookie", methods=["POST"])
- def check_cookie():
- """检查 cookie 是否有效"""
- try:
- data = request.json
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
-
- if not cookie_str:
- return jsonify({"valid": False, "error": "缺少 cookie 参数"}), 400
-
- # 目前只支持小红书的 cookie 验证
- if platform == "xiaohongshu":
- try:
- from platforms.xiaohongshu import XiaohongshuPublisher, XHS_SDK_AVAILABLE
-
- if XHS_SDK_AVAILABLE:
- from xhs import XhsClient
- publisher = XiaohongshuPublisher()
- xhs_client = XhsClient(cookie_str, sign=publisher.sign_sync)
- info = xhs_client.get_self_info()
-
- if info:
- return jsonify({
- "valid": True,
- "user_info": {
- "user_id": info.get("user_id"),
- "nickname": info.get("nickname"),
- "avatar": info.get("images")
- }
- })
- except Exception as e:
- return jsonify({"valid": False, "error": str(e)})
-
- # 其他平台返回格式正确但未验证
- return jsonify({
- "valid": True,
- "message": "Cookie 格式正确,但未进行在线验证"
- })
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"valid": False, "error": str(e)})
- # ==================== 获取作品列表接口 ====================
- @app.route("/works", methods=["POST"])
- def get_works():
- """
- 获取作品列表
-
- 请求体:
- {
- "platform": "douyin", # douyin | xiaohongshu | kuaishou
- "cookie": "cookie字符串或JSON",
- "page": 0, # 页码(从0开始,可选,默认0)
- "page_size": 20 # 每页数量(可选,默认20)
- }
-
- 响应:
- {
- "success": true,
- "platform": "douyin",
- "works": [...],
- "total": 100,
- "has_more": true
- }
- """
- try:
- data = request.json
-
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
- page = data.get("page", 0)
- page_size = data.get("page_size", 20)
-
- print(f"[Works] 收到请求: platform={platform}, page={page}, page_size={page_size}")
-
- if not platform:
- return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
- if platform not in PLATFORM_MAP:
- return jsonify({
- "success": False,
- "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
- }), 400
- if not cookie_str:
- return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
-
- # 获取对应平台的发布器
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
-
- # 执行获取作品
- result = asyncio.run(publisher.run_get_works(cookie_str, page, page_size))
-
- return jsonify(result.to_dict())
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- # ==================== 保存作品日统计数据接口 ====================
- @app.route("/work_day_statistics", methods=["POST"])
- def save_work_day_statistics():
- """
- 保存作品每日统计数据
- 当天的数据走更新流,日期变化走新增流
-
- 请求体:
- {
- "statistics": [
- {
- "work_id": 1,
- "fans_count": 1000,
- "play_count": 5000,
- "like_count": 200,
- "comment_count": 50,
- "share_count": 30,
- "collect_count": 100
- },
- ...
- ]
- }
-
- 响应:
- {
- "success": true,
- "inserted": 5,
- "updated": 3,
- "message": "保存成功"
- }
- """
- print("=" * 60, flush=True)
- print("[DEBUG] ===== 进入 save_work_day_statistics 方法 =====", flush=True)
- print(f"[DEBUG] 请求方法: {request.method}", flush=True)
- print(f"[DEBUG] 请求数据: {request.json}", flush=True)
- print("=" * 60, flush=True)
-
- try:
- data = request.json
- statistics_list = data.get("statistics", [])
-
- if not statistics_list:
- return jsonify({"success": False, "error": "缺少 statistics 参数"}), 400
-
- today = date.today()
- inserted_count = 0
- updated_count = 0
-
- print(f"[WorkDayStatistics] 收到请求: {len(statistics_list)} 条统计数据")
-
- conn = get_db_connection()
- try:
- with conn.cursor() as cursor:
- for stat in statistics_list:
- work_id = stat.get("work_id")
- if not work_id:
- continue
-
- fans_count = stat.get("fans_count", 0)
- play_count = stat.get("play_count", 0)
- like_count = stat.get("like_count", 0)
- comment_count = stat.get("comment_count", 0)
- share_count = stat.get("share_count", 0)
- collect_count = stat.get("collect_count", 0)
-
- # 检查当天是否已有记录
- cursor.execute(
- "SELECT id FROM work_day_statistics WHERE work_id = %s AND record_date = %s",
- (work_id, today)
- )
- existing = cursor.fetchone()
-
- if existing:
- # 更新已有记录
- cursor.execute(
- """UPDATE work_day_statistics
- SET fans_count = %s, play_count = %s, like_count = %s,
- comment_count = %s, share_count = %s, collect_count = %s,
- updated_at = NOW()
- WHERE id = %s""",
- (fans_count, play_count, like_count, comment_count,
- share_count, collect_count, existing['id'])
- )
- updated_count += 1
- else:
- # 插入新记录
- cursor.execute(
- """INSERT INTO work_day_statistics
- (work_id, record_date, fans_count, play_count, like_count,
- comment_count, share_count, collect_count, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())""",
- (work_id, today, fans_count, play_count, like_count,
- comment_count, share_count, collect_count)
- )
- inserted_count += 1
-
- conn.commit()
- finally:
- conn.close()
-
- print(f"[WorkDayStatistics] 完成: 新增 {inserted_count} 条, 更新 {updated_count} 条")
-
- return jsonify({
- "success": True,
- "inserted": inserted_count,
- "updated": updated_count,
- "message": f"保存成功: 新增 {inserted_count} 条, 更新 {updated_count} 条"
- })
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route("/work_day_statistics/trend", methods=["GET"])
- def get_statistics_trend():
- """
- 获取数据趋势(用于 Dashboard 数据看板)
-
- 查询参数:
- user_id: 用户ID (必填)
- days: 天数 (可选,默认7天,最大30天)
- account_id: 账号ID (可选,不填则查询所有账号)
-
- 响应:
- {
- "success": true,
- "data": {
- "dates": ["01-16", "01-17", "01-18", ...],
- "fans": [100, 120, 130, ...],
- "views": [1000, 1200, 1500, ...],
- "likes": [50, 60, 70, ...],
- "comments": [10, 12, 15, ...],
- "shares": [5, 6, 8, ...],
- "collects": [20, 25, 30, ...]
- }
- }
- """
- try:
- user_id = request.args.get("user_id")
- days = min(int(request.args.get("days", 7)), 30) # 最大30天
- account_id = request.args.get("account_id")
-
- if not user_id:
- return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400
-
- conn = get_db_connection()
- try:
- with conn.cursor() as cursor:
- # 构建查询:关联 works 表获取用户的作品,然后汇总统计数据
- # 注意:粉丝数是账号级别的数据,每个账号每天只取一个值(使用 MAX)
- # 其他指标(播放、点赞等)是作品级别的数据,需要累加
- sql = """
- SELECT
- record_date,
- SUM(account_fans) as total_fans,
- SUM(account_views) as total_views,
- SUM(account_likes) as total_likes,
- SUM(account_comments) as total_comments,
- SUM(account_shares) as total_shares,
- SUM(account_collects) as total_collects
- FROM (
- SELECT
- wds.record_date,
- w.accountId,
- MAX(wds.fans_count) as account_fans,
- SUM(wds.play_count) as account_views,
- SUM(wds.like_count) as account_likes,
- SUM(wds.comment_count) as account_comments,
- SUM(wds.share_count) as account_shares,
- SUM(wds.collect_count) as account_collects
- FROM work_day_statistics wds
- INNER JOIN works w ON wds.work_id = w.id
- WHERE w.userId = %s
- AND wds.record_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
- """
- params = [user_id, days]
-
- if account_id:
- sql += " AND w.accountId = %s"
- params.append(account_id)
-
- sql += """
- GROUP BY wds.record_date, w.accountId
- ) as account_stats
- GROUP BY record_date
- ORDER BY record_date ASC
- """
-
- cursor.execute(sql, params)
- results = cursor.fetchall()
-
- # 构建响应数据
- dates = []
- fans = []
- views = []
- likes = []
- comments = []
- shares = []
- collects = []
-
- for row in results:
- # 格式化日期为 "MM-DD" 格式
- record_date = row['record_date']
- if isinstance(record_date, str):
- dates.append(record_date[5:10]) # "2026-01-16" -> "01-16"
- else:
- dates.append(record_date.strftime("%m-%d"))
-
- # 确保返回整数类型
- fans.append(int(row['total_fans'] or 0))
- views.append(int(row['total_views'] or 0))
- likes.append(int(row['total_likes'] or 0))
- comments.append(int(row['total_comments'] or 0))
- shares.append(int(row['total_shares'] or 0))
- collects.append(int(row['total_collects'] or 0))
-
- # 如果没有数据,生成空的日期范围
- if not dates:
- from datetime import timedelta
- today = date.today()
- for i in range(days, 0, -1):
- d = today - timedelta(days=i-1)
- dates.append(d.strftime("%m-%d"))
- fans.append(0)
- views.append(0)
- likes.append(0)
- comments.append(0)
- shares.append(0)
- collects.append(0)
-
- return jsonify({
- "success": True,
- "data": {
- "dates": dates,
- "fans": fans,
- "views": views,
- "likes": likes,
- "comments": comments,
- "shares": shares,
- "collects": collects
- }
- })
- finally:
- conn.close()
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route("/work_day_statistics/platforms", methods=["GET"])
- def get_statistics_by_platform():
- """
- 按平台分组获取统计数据(用于数据分析页面的平台对比)
-
- 数据来源:
- - 粉丝数:从 platform_accounts 表获取(账号级别数据)
- - 播放量/点赞/评论/收藏:从 work_day_statistics 表按平台汇总
- - 粉丝增量:通过比较区间内最早和最新的粉丝数计算
-
- 查询参数:
- user_id: 用户ID (必填)
- days: 天数 (可选,默认30天,最大30天)
-
- 响应:
- {
- "success": true,
- "data": [
- {
- "platform": "douyin",
- "fansCount": 1000,
- "fansIncrease": 50,
- "viewsCount": 5000,
- "likesCount": 200,
- "commentsCount": 30,
- "collectsCount": 100
- },
- ...
- ]
- }
- """
- try:
- user_id = request.args.get("user_id")
- days = min(int(request.args.get("days", 30)), 30)
-
- if not user_id:
- return jsonify({"success": False, "error": "缺少 user_id 参数"}), 400
-
- conn = get_db_connection()
- try:
- with conn.cursor() as cursor:
- # 简化查询:按平台分组获取统计数据
- # 1. 从 platform_accounts 获取当前粉丝数(注意:字段名是下划线命名 fans_count, user_id)
- # 2. 从 work_day_statistics 获取播放量等累计数据
- sql = """
- SELECT
- pa.platform,
- pa.fans_count as current_fans,
- COALESCE(stats.total_views, 0) as viewsCount,
- COALESCE(stats.total_likes, 0) as likesCount,
- COALESCE(stats.total_comments, 0) as commentsCount,
- COALESCE(stats.total_collects, 0) as collectsCount,
- COALESCE(fans_change.earliest_fans, pa.fans_count) as earliest_fans
- FROM platform_accounts pa
- LEFT JOIN (
- -- 获取区间内的累计数据(按账号汇总)
- SELECT
- w.accountId,
- SUM(wds.play_count) as total_views,
- SUM(wds.like_count) as total_likes,
- SUM(wds.comment_count) as total_comments,
- SUM(wds.collect_count) as total_collects
- FROM work_day_statistics wds
- INNER JOIN works w ON wds.work_id = w.id
- WHERE w.userId = %s
- AND wds.record_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
- GROUP BY w.accountId
- ) stats ON pa.id = stats.accountId
- LEFT JOIN (
- -- 获取区间内最早一天的粉丝数
- SELECT
- w.accountId,
- MAX(wds.fans_count) as earliest_fans
- FROM work_day_statistics wds
- INNER JOIN works w ON wds.work_id = w.id
- WHERE w.userId = %s
- AND wds.record_date = (
- SELECT MIN(wds2.record_date)
- FROM work_day_statistics wds2
- INNER JOIN works w2 ON wds2.work_id = w2.id
- WHERE w2.userId = %s
- AND wds2.record_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
- )
- GROUP BY w.accountId
- ) fans_change ON pa.id = fans_change.accountId
- WHERE pa.user_id = %s
- ORDER BY current_fans DESC
- """
-
- cursor.execute(sql, [user_id, days, user_id, user_id, days, user_id])
- results = cursor.fetchall()
-
- # 构建响应数据
- platform_data = []
- for row in results:
- current_fans = int(row['current_fans'] or 0)
- earliest_fans = int(row['earliest_fans'] or current_fans)
- fans_increase = current_fans - earliest_fans
-
- platform_data.append({
- "platform": row['platform'],
- "fansCount": current_fans,
- "fansIncrease": fans_increase,
- "viewsCount": int(row['viewsCount'] or 0),
- "likesCount": int(row['likesCount'] or 0),
- "commentsCount": int(row['commentsCount'] or 0),
- "collectsCount": int(row['collectsCount'] or 0),
- })
-
- print(f"[PlatformStats] 返回 {len(platform_data)} 个平台的数据")
-
- return jsonify({
- "success": True,
- "data": platform_data
- })
- finally:
- conn.close()
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route("/work_day_statistics/batch", methods=["POST"])
- def get_work_statistics_history():
- """
- 批量获取作品的历史统计数据
-
- 请求体:
- {
- "work_ids": [1, 2, 3],
- "start_date": "2025-01-01", # 可选
- "end_date": "2025-01-21" # 可选
- }
-
- 响应:
- {
- "success": true,
- "data": {
- "1": [
- {"record_date": "2025-01-20", "play_count": 100, ...},
- {"record_date": "2025-01-21", "play_count": 150, ...}
- ],
- ...
- }
- }
- """
- try:
- data = request.json
- work_ids = data.get("work_ids", [])
- start_date = data.get("start_date")
- end_date = data.get("end_date")
-
- if not work_ids:
- return jsonify({"success": False, "error": "缺少 work_ids 参数"}), 400
-
- conn = get_db_connection()
- try:
- with conn.cursor() as cursor:
- # 构建查询
- placeholders = ', '.join(['%s'] * len(work_ids))
- sql = f"""SELECT work_id, record_date, fans_count, play_count, like_count,
- comment_count, share_count, collect_count
- FROM work_day_statistics
- WHERE work_id IN ({placeholders})"""
- params = list(work_ids)
-
- if start_date:
- sql += " AND record_date >= %s"
- params.append(start_date)
- if end_date:
- sql += " AND record_date <= %s"
- params.append(end_date)
-
- sql += " ORDER BY work_id, record_date"
-
- cursor.execute(sql, params)
- results = cursor.fetchall()
- finally:
- conn.close()
-
- # 按 work_id 分组
- grouped_data = {}
- for row in results:
- work_id = str(row['work_id'])
- if work_id not in grouped_data:
- grouped_data[work_id] = []
- grouped_data[work_id].append({
- 'record_date': row['record_date'].strftime('%Y-%m-%d') if row['record_date'] else None,
- 'fans_count': row['fans_count'],
- 'play_count': row['play_count'],
- 'like_count': row['like_count'],
- 'comment_count': row['comment_count'],
- 'share_count': row['share_count'],
- 'collect_count': row['collect_count']
- })
-
- return jsonify({
- "success": True,
- "data": grouped_data
- })
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- # ==================== 获取评论列表接口 ====================
- @app.route("/comments", methods=["POST"])
- def get_comments():
- """
- 获取作品评论
-
- 请求体:
- {
- "platform": "douyin", # douyin | xiaohongshu | kuaishou
- "cookie": "cookie字符串或JSON",
- "work_id": "作品ID",
- "cursor": "" # 分页游标(可选)
- }
-
- 响应:
- {
- "success": true,
- "platform": "douyin",
- "work_id": "xxx",
- "comments": [...],
- "total": 50,
- "has_more": true,
- "cursor": "xxx"
- }
- """
- try:
- data = request.json
-
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
- work_id = data.get("work_id", "")
- cursor = data.get("cursor", "")
-
- print(f"[Comments] 收到请求: platform={platform}, work_id={work_id}")
-
- if not platform:
- return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
- if platform not in PLATFORM_MAP:
- return jsonify({
- "success": False,
- "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
- }), 400
- if not cookie_str:
- return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
- if not work_id:
- return jsonify({"success": False, "error": "缺少 work_id 参数"}), 400
-
- # 获取对应平台的发布器
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
-
- # 执行获取评论
- result = asyncio.run(publisher.run_get_comments(cookie_str, work_id, cursor))
-
- result_dict = result.to_dict()
- # 添加 cursor 到响应
- if hasattr(result, '__dict__') and 'cursor' in result.__dict__:
- result_dict['cursor'] = result.__dict__['cursor']
-
- return jsonify(result_dict)
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- # ==================== 获取所有作品评论接口 ====================
- @app.route("/all_comments", methods=["POST"])
- def get_all_comments():
- """
- 获取所有作品的评论(一次性获取)
-
- 请求体:
- {
- "platform": "douyin", # douyin | xiaohongshu
- "cookie": "cookie字符串或JSON"
- }
-
- 响应:
- {
- "success": true,
- "platform": "douyin",
- "work_comments": [
- {
- "work_id": "xxx",
- "title": "作品标题",
- "cover_url": "封面URL",
- "comments": [...]
- }
- ],
- "total": 5
- }
- """
- try:
- data = request.json
-
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
-
- print(f"[AllComments] 收到请求: platform={platform}")
-
- if not platform:
- return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
- if platform not in ['douyin', 'xiaohongshu']:
- return jsonify({
- "success": False,
- "error": f"该接口只支持 douyin 和 xiaohongshu 平台"
- }), 400
- if not cookie_str:
- return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
-
- # 获取对应平台的发布器
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
-
- # 执行获取所有评论
- result = asyncio.run(publisher.get_all_comments(cookie_str))
-
- return jsonify(result)
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- # ==================== 登录状态检查接口 ====================
- @app.route("/check_login", methods=["POST"])
- def check_login():
- """
- 检查 Cookie 登录状态(通过浏览器访问后台页面检测)
-
- 请求体:
- {
- "platform": "douyin", # douyin | xiaohongshu | kuaishou | weixin
- "cookie": "cookie字符串或JSON"
- }
-
- 响应:
- {
- "success": true,
- "valid": true, # Cookie 是否有效
- "need_login": false, # 是否需要重新登录
- "message": "登录状态有效"
- }
- """
- try:
- data = request.json
-
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
-
- print(f"[CheckLogin] 收到请求: platform={platform}")
-
- if not platform:
- return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
- if platform not in PLATFORM_MAP:
- return jsonify({
- "success": False,
- "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
- }), 400
- if not cookie_str:
- return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
-
- # 获取对应平台的发布器
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
-
- # 执行登录检查
- result = asyncio.run(publisher.check_login_status(cookie_str))
-
- return jsonify(result)
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({
- "success": False,
- "valid": False,
- "need_login": True,
- "error": str(e)
- }), 500
- # ==================== 获取账号信息接口 ====================
- @app.route("/account_info", methods=["POST"])
- def get_account_info():
- """
- 获取账号信息
-
- 请求体:
- {
- "platform": "baijiahao", # 平台
- "cookie": "cookie字符串或JSON"
- }
-
- 响应:
- {
- "success": true,
- "account_id": "xxx",
- "account_name": "用户名",
- "avatar_url": "头像URL",
- "fans_count": 0,
- "works_count": 0
- }
- """
- try:
- data = request.json
-
- platform = data.get("platform", "").lower()
- cookie_str = data.get("cookie", "")
-
- print(f"[AccountInfo] 收到请求: platform={platform}")
-
- if not platform:
- return jsonify({"success": False, "error": "缺少 platform 参数"}), 400
- if platform not in PLATFORM_MAP:
- return jsonify({
- "success": False,
- "error": f"不支持的平台: {platform},支持: {list(PLATFORM_MAP.keys())}"
- }), 400
- if not cookie_str:
- return jsonify({"success": False, "error": "缺少 cookie 参数"}), 400
-
- # 获取对应平台的发布器
- PublisherClass = get_publisher(platform)
- publisher = PublisherClass(headless=HEADLESS_MODE)
-
- # 检查是否有 get_account_info 方法
- if hasattr(publisher, 'get_account_info'):
- result = asyncio.run(publisher.get_account_info(cookie_str))
- return jsonify(result)
- else:
- return jsonify({
- "success": False,
- "error": f"平台 {platform} 不支持获取账号信息"
- }), 400
-
- except Exception as e:
- traceback.print_exc()
- return jsonify({"success": False, "error": str(e)}), 500
- # ==================== 健康检查 ====================
- @app.route("/health", methods=["GET"])
- def health_check():
- """健康检查"""
- # 检查 xhs SDK 是否可用
- xhs_available = False
- try:
- from platforms.xiaohongshu import XHS_SDK_AVAILABLE
- xhs_available = XHS_SDK_AVAILABLE
- except:
- pass
-
- return jsonify({
- "status": "ok",
- "xhs_sdk": xhs_available,
- "supported_platforms": list(PLATFORM_MAP.keys()),
- "headless_mode": HEADLESS_MODE
- })
- @app.route("/", methods=["GET"])
- def index():
- """首页"""
- return jsonify({
- "name": "多平台视频发布服务",
- "version": "1.2.0",
- "endpoints": {
- "GET /": "服务信息",
- "GET /health": "健康检查",
- "POST /publish": "发布视频",
- "POST /publish/batch": "批量发布",
- "POST /works": "获取作品列表",
- "POST /comments": "获取作品评论",
- "POST /all_comments": "获取所有作品评论",
- "POST /work_day_statistics": "保存作品每日统计数据",
- "POST /work_day_statistics/batch": "获取作品历史统计数据",
- "POST /check_cookie": "检查 Cookie",
- "POST /sign": "小红书签名"
- },
- "supported_platforms": list(PLATFORM_MAP.keys())
- })
- # ==================== 命令行启动 ====================
- def main():
- parser = argparse.ArgumentParser(description='多平台视频发布服务')
- parser.add_argument('--port', type=int, default=5005, help='服务端口 (默认: 5005)')
- parser.add_argument('--host', type=str, default='0.0.0.0', help='监听地址 (默认: 0.0.0.0)')
- parser.add_argument('--headless', type=str, default='true', help='是否无头模式 (默认: true)')
- parser.add_argument('--debug', action='store_true', help='调试模式')
-
- args = parser.parse_args()
-
- global HEADLESS_MODE
- HEADLESS_MODE = args.headless.lower() == 'true'
-
- # 检查 xhs SDK
- xhs_status = "未安装"
- try:
- from platforms.xiaohongshu import XHS_SDK_AVAILABLE
- xhs_status = "已安装" if XHS_SDK_AVAILABLE else "未安装"
- except:
- pass
-
- print("=" * 60)
- print("多平台视频发布服务")
- print("=" * 60)
- print(f"XHS SDK: {xhs_status}")
- print(f"Headless 模式: {HEADLESS_MODE}")
- print(f"支持平台: {', '.join(PLATFORM_MAP.keys())}")
- print("=" * 60)
- print(f"启动服务: http://{args.host}:{args.port}")
- print("=" * 60)
-
- app.run(host=args.host, port=args.port, debug=args.debug, threaded=True)
- if __name__ == '__main__':
- main()
|