||
- import schedule from 'node-schedule';
- import { AppDataSource, PublishTask, PlatformAccount, AnalyticsData } from '../models/index.js';
- import { logger } from '../utils/logger.js';
- import { wsManager } from '../websocket/index.js';
- import { WS_EVENTS } from '@media-manager/shared';
- import { getAdapter, isPlatformSupported } from '../automation/platforms/index.js';
- import { LessThanOrEqual, In } from 'typeorm';
- import { taskQueueService } from '../services/TaskQueueService.js';
- import { XiaohongshuAccountOverviewImportService } from '../services/XiaohongshuAccountOverviewImportService.js';
- import { DouyinAccountOverviewImportService } from '../services/DouyinAccountOverviewImportService.js';
- import { BaijiahaoContentOverviewImportService } from '../services/BaijiahaoContentOverviewImportService.js';
- import { WeixinVideoDataCenterImportService } from '../services/WeixinVideoDataCenterImportService.js';
- import { XiaohongshuWorkNoteStatisticsImportService } from '../services/XiaohongshuWorkNoteStatisticsImportService.js';
- /**
- * 定时任务调度器
- */
- export class TaskScheduler {
- private jobs: Map<string, schedule.Job> = new Map();
- private isRefreshingAccounts = false; // 账号刷新锁,防止任务重叠执行
- private isXhsImportRunning = false; // 小红书导入锁,防止任务重叠执行
- private isXhsWorkImportRunning = false; // 小红书作品日统计导入锁
- private isDyImportRunning = false; // 抖音导入锁,防止任务重叠执行
- private isBjImportRunning = false; // 百家号导入锁,防止任务重叠执行
- private isWxImportRunning = false; // 视频号导入锁,防止任务重叠执行
- private isAutoReplying = false; // 私信回复锁,防止任务重叠执行
- /**
- * 启动调度器
- *
- * 注意:账号刷新任务由客户端定时触发,只刷新当前登录用户的账号
- * 服务端不再自动刷新所有用户的账号
- */
- start(): void {
- logger.info('[Scheduler] ========================================');
- logger.info('[Scheduler] Starting task scheduler...');
-
- // 每分钟检查定时发布任务(只处理到期的定时发布任务)
- this.scheduleJob('check-publish-tasks', '* * * * *', this.checkPublishTasks.bind(this));
- // 每天中午 12 点:批量导出小红书“账号概览-笔记数据-观看数据-近30日”,导入 user_day_statistics
- // 注意:node-schedule 使用服务器本地时区
- this.scheduleJob('xhs-account-overview-import', '0 12 * * *', this.importXhsAccountOverviewLast30Days.bind(this));
- // 每天 12:40:同步小红书作品维度的「笔记详情-按天」数据,写入 work_day_statistics
- this.scheduleJob(
- 'xhs-work-note-statistics-import',
- '40 12 * * *',
- this.importXhsWorkNoteStatistics.bind(this)
- );
- // 每天 12:10:批量导出抖音“数据中心-账号总览-短视频-数据表现-近30天”,导入 user_day_statistics
- this.scheduleJob('dy-account-overview-import', '10 12 * * *', this.importDyAccountOverviewLast30Days.bind(this));
- // 每天 12:20:批量导出百家号“数据中心-内容分析-基础数据-近30天”,导入 user_day_statistics
- this.scheduleJob('bj-content-overview-import', '20 12 * * *', this.importBaijiahaoContentOverviewLast30Days.bind(this));
- // 每天 12:30:批量导出视频号“数据中心-各子菜单-增长详情(数据详情)-近30天-下载表格”,导入 user_day_statistics
- this.scheduleJob('wx-video-data-center-import', '30 12 * * *', this.importWeixinVideoDataCenterLast30Days.bind(this));
-
- this.scheduleJob('auto-reply-messages', '* * * * *', this.autoReplyMessages.bind(this));
- // 注意:账号刷新由客户端定时触发,不在服务端自动执行
- // 这样可以确保只刷新当前登录用户的账号,避免处理其他用户的数据
-
- // 每天凌晨2点采集数据统计(可选,如果需要服务端采集可以启用)
- // this.scheduleJob('collect-analytics', '0 2 * * *', this.collectAnalytics.bind(this));
-
- logger.info('[Scheduler] Scheduled jobs:');
- logger.info('[Scheduler] - check-publish-tasks: every minute (* * * * *)');
- logger.info('[Scheduler] - xhs-account-overview-import: daily at 12:00 (0 12 * * *)');
- logger.info(
- '[Scheduler] - xhs-work-note-statistics-import: daily at 12:40 (40 12 * * *)'
- );
- logger.info('[Scheduler] - dy-account-overview-import: daily at 12:10 (10 12 * * *)');
- logger.info('[Scheduler] - bj-content-overview-import: daily at 12:20 (20 12 * * *)');
- logger.info('[Scheduler] - wx-video-data-center-import: daily at 12:30 (30 12 * * *)');
- logger.info('[Scheduler] - auto-reply-messages: every minute (* * * * *)');
- logger.info('[Scheduler] Note: Account refresh is triggered by client, not server');
- logger.info('[Scheduler] ========================================');
-
- logger.info('[Scheduler] Task scheduler started successfully');
- }
-
- /**
- * 停止调度器
- */
- stop(): void {
- this.jobs.forEach((job, name) => {
- job.cancel();
- logger.info(`Cancelled job: ${name}`);
- });
- this.jobs.clear();
- logger.info('Task scheduler stopped');
- }
-
- /**
- * 添加定时任务
- */
- private scheduleJob(name: string, cron: string, handler: () => Promise<void>): void {
- const job = schedule.scheduleJob(cron, async () => {
- logger.info(`Running scheduled job: ${name}`);
- try {
- await handler();
- logger.info(`Completed job: ${name}`);
- } catch (error) {
- logger.error(`Job ${name} failed:`, error);
- }
- });
-
- if (job) {
- this.jobs.set(name, job);
- }
- }
-
- /**
- * 检查定时发布任务
- */
- private async checkPublishTasks(): Promise<void> {
- const taskRepository = AppDataSource.getRepository(PublishTask);
-
- // 获取需要执行的任务
- const tasks = await taskRepository.find({
- where: {
- status: 'pending',
- scheduledAt: LessThanOrEqual(new Date()),
- },
- relations: ['results'],
- });
-
- for (const task of tasks) {
- logger.info(`Executing scheduled task: ${task.id}`);
-
- // 更新状态为处理中
- await taskRepository.update(task.id, { status: 'processing' });
- wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
- taskId: task.id,
- status: 'processing',
- });
-
- // 执行发布
- await this.executePublishTask(task);
- }
- }
-
- /**
- * 执行发布任务
- */
- private async executePublishTask(task: PublishTask): Promise<void> {
- const taskRepository = AppDataSource.getRepository(PublishTask);
- const accountRepository = AppDataSource.getRepository(PlatformAccount);
-
- const targetAccounts = task.targetAccounts || [];
- const accounts = await accountRepository.find({
- where: { id: In(targetAccounts) },
- });
-
- let successCount = 0;
- let failCount = 0;
-
- for (const account of accounts) {
- if (!isPlatformSupported(account.platform)) {
- logger.warn(`Platform ${account.platform} not supported`);
- failCount++;
- continue;
- }
-
- try {
- const adapter = getAdapter(account.platform);
- const result = await adapter.publishVideo(account.cookieData || '', {
- videoPath: task.videoPath || '',
- title: task.title || '',
- description: task.description || undefined,
- coverPath: task.coverPath || undefined,
- tags: task.tags || undefined,
- });
-
- if (result.success) {
- successCount++;
- } else {
- failCount++;
- }
-
- // 更新发布结果
- // TODO: 更新 publish_results 表
-
- } catch (error) {
- logger.error(`Publish to ${account.platform} failed:`, error);
- failCount++;
- }
- }
-
- // 更新任务状态
- const finalStatus = failCount === 0 ? 'completed' : (successCount === 0 ? 'failed' : 'completed');
- await taskRepository.update(task.id, {
- status: finalStatus,
- publishedAt: new Date(),
- });
-
- wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
- taskId: task.id,
- status: finalStatus,
- });
- }
-
- /**
- * 刷新账号状态和信息
- * 将每个账号的刷新任务加入到任务队列中执行
- * 通过任务队列控制并发,避免浏览器资源竞争
- */
- private async refreshAccounts(): Promise<void> {
- // 检查是否正在执行刷新任务
- if (this.isRefreshingAccounts) {
- logger.info('[Scheduler] Account refresh is already running, skipping this cycle...');
- return;
- }
-
- // 获取锁
- this.isRefreshingAccounts = true;
- logger.debug('[Scheduler] Acquired refresh lock');
-
- try {
- const accountRepository = AppDataSource.getRepository(PlatformAccount);
-
- // 获取所有账号,不过滤 status,让刷新任务自动检测并更新状态
- const accounts = await accountRepository.find();
-
- if (accounts.length === 0) {
- logger.info('[Scheduler] No active accounts to refresh');
- return;
- }
-
- logger.info(`[Scheduler] Creating refresh tasks for ${accounts.length} active accounts...`);
-
- let tasksCreated = 0;
- let skipped = 0;
-
- // 为每个账号创建刷新任务,加入任务队列
- for (const account of accounts) {
- if (!isPlatformSupported(account.platform)) {
- logger.debug(`[Scheduler] Platform ${account.platform} not supported, skipping account ${account.id}`);
- skipped++;
- continue;
- }
-
- try {
- // 创建 sync_account 任务加入队列(静默执行,前台不弹框)
- taskQueueService.createTask(account.userId, {
- type: 'sync_account',
- title: `自动刷新: ${account.accountName || account.platform}`,
- description: `定时刷新账号 ${account.accountName} 的状态和信息`,
- priority: 'low', // 自动任务使用低优先级,不影响用户主动操作
- silent: true, // 静默执行,前台不弹框显示
- accountId: account.id,
- });
-
- tasksCreated++;
- logger.debug(`[Scheduler] Created refresh task for account ${account.id} (${account.accountName})`);
-
- } catch (error) {
- logger.error(`[Scheduler] Failed to create refresh task for account ${account.id}:`, error);
- }
- }
-
- logger.info(`[Scheduler] Account refresh tasks created: ${tasksCreated} tasks, ${skipped} skipped`);
- } finally {
- // 释放锁
- this.isRefreshingAccounts = false;
- logger.debug('[Scheduler] Released refresh lock');
- }
- }
-
- /**
- * 采集数据统计
- */
- private async collectAnalytics(): Promise<void> {
- const accountRepository = AppDataSource.getRepository(PlatformAccount);
- const analyticsRepository = AppDataSource.getRepository(AnalyticsData);
-
- const accounts = await accountRepository.find({
- where: { status: 'active' },
- });
-
- const today = new Date().toISOString().split('T')[0];
-
- for (const account of accounts) {
- if (!isPlatformSupported(account.platform)) continue;
-
- try {
- const adapter = getAdapter(account.platform);
- const data = await adapter.getAnalytics(account.cookieData || '', {
- startDate: today,
- endDate: today,
- });
-
- // 保存或更新数据
- const existing = await analyticsRepository.findOne({
- where: { accountId: account.id, date: today },
- });
-
- if (existing) {
- await analyticsRepository.update(existing.id, {
- fansCount: data.fansCount,
- fansIncrease: data.fansIncrease,
- viewsCount: data.viewsCount,
- likesCount: data.likesCount,
- commentsCount: data.commentsCount,
- sharesCount: data.sharesCount,
- income: data.income || 0,
- });
- } else {
- await analyticsRepository.save({
- userId: account.userId,
- accountId: account.id,
- date: today,
- fansCount: data.fansCount,
- fansIncrease: data.fansIncrease,
- viewsCount: data.viewsCount,
- likesCount: data.likesCount,
- commentsCount: data.commentsCount,
- sharesCount: data.sharesCount,
- income: data.income || 0,
- });
- }
-
- wsManager.sendToUser(account.userId, WS_EVENTS.ANALYTICS_UPDATED, {
- accountId: account.id,
- });
-
- } catch (error) {
- logger.error(`Collect analytics for account ${account.id} failed:`, error);
- }
- }
- }
- /**
- * 小红书:账号概览导出(近30日)→ 导入 user_day_statistics
- */
- private async importXhsAccountOverviewLast30Days(): Promise<void> {
- if (this.isXhsImportRunning) {
- logger.info('[Scheduler] XHS import is already running, skipping this cycle...');
- return;
- }
- this.isXhsImportRunning = true;
- try {
- await XiaohongshuAccountOverviewImportService.runDailyImport();
- } finally {
- this.isXhsImportRunning = false;
- }
- }
- /**
- * 小红书:作品维度「笔记详情-按天」→ 导入 work_day_statistics
- */
- private async importXhsWorkNoteStatistics(): Promise<void> {
- if (this.isXhsWorkImportRunning) {
- logger.info('[Scheduler] XHS work note statistics import is already running, skipping...');
- return;
- }
- this.isXhsWorkImportRunning = true;
- try {
- await XiaohongshuWorkNoteStatisticsImportService.runDailyImport();
- } finally {
- this.isXhsWorkImportRunning = false;
- }
- }
- /**
- * 抖音:账号总览-短视频-数据表现导出(近30天)→ 导入 user_day_statistics
- */
- private async importDyAccountOverviewLast30Days(): Promise<void> {
- if (this.isDyImportRunning) {
- logger.info('[Scheduler] Douyin import is already running, skipping this cycle...');
- return;
- }
- this.isDyImportRunning = true;
- try {
- await DouyinAccountOverviewImportService.runDailyImport();
- } finally {
- this.isDyImportRunning = false;
- }
- }
-
- /**
- * 自动回复私信(每5分钟执行一次)
- * 只处理微信视频号平台的账号
- */
- private async autoReplyMessages(): Promise<void> {
- // 检查是否正在执行回复任务
- if (this.isAutoReplying) {
- logger.info('[Scheduler] Auto reply is already running, skipping this cycle...');
- return;
- }
-
- // 获取锁
- this.isAutoReplying = true;
- logger.debug('[Scheduler] Acquired auto reply lock');
-
- try {
- const accountRepository = AppDataSource.getRepository(PlatformAccount);
-
- // 只获取微信视频号的活跃账号
- const accounts = await accountRepository.find({
- where: {
- // platform: 'weixin_video',
- userId: 2,
- status: 'active',
- },
- });
-
- if (accounts.length === 0) {
- logger.info('[Scheduler] No active accounts for auto reply');
- return;
- }
-
- logger.info(`[Scheduler] Starting auto reply for ${accounts.length} accounts...`);
-
- let successCount = 0;
- let failCount = 0;
-
- // 为每个账号执行自动回复
- for (const account of accounts) {
- try {
- logger.info(`[Scheduler] Auto replying for account: ${account.accountName} (${account.id})`);
-
- // Python 服务端使用 weixin,不是 weixin_video
- const pythonPlatform = account.platform === 'weixin_video' ? 'weixin' : account.platform;
-
- // 调用 Python 服务执行自动回复
- const response = await fetch('http://localhost:5005/auto-reply', {
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- },
- body: JSON.stringify({
- platform: pythonPlatform,
- cookie: account.cookieData || '',
- }),
- signal: AbortSignal.timeout(120000), // 2分钟超时
- });
-
- if (!response.ok) {
- throw new Error(`HTTP ${response.status}`);
- }
-
- const result = await response.json();
-
- if (result.success) {
- successCount++;
- logger.info(`[Scheduler] Auto reply success for ${account.accountName}: ${result.replied_count} messages`);
- } else {
- failCount++;
- logger.error(`[Scheduler] Auto reply failed for ${account.accountName}: ${result.error}`);
- }
-
- } catch (error) {
- failCount++;
- logger.error(`[Scheduler] Auto reply error for account ${account.id}:`, error);
- }
- }
-
- logger.info(`[Scheduler] Auto reply completed: ${successCount} success, ${failCount} failed`);
- } finally {
- // 释放锁
- this.isAutoReplying = false;
- logger.debug('[Scheduler] Released auto reply lock');
- }
- }
- /**
- * 百家号:内容分析-基础数据导出(近30天)→ 导入 user_day_statistics
- */
- private async importBaijiahaoContentOverviewLast30Days(): Promise<void> {
- if (this.isBjImportRunning) {
- logger.info('[Scheduler] Baijiahao import is already running, skipping this cycle...');
- return;
- }
- this.isBjImportRunning = true;
- try {
- await BaijiahaoContentOverviewImportService.runDailyImport();
- } finally {
- this.isBjImportRunning = false;
- }
- }
- /**
- * 视频号:数据中心-关注者/视频/图文 的增长详情(近30天)→ 导入 user_day_statistics
- */
- private async importWeixinVideoDataCenterLast30Days(): Promise<void> {
- if (this.isWxImportRunning) {
- logger.info('[Scheduler] Weixin video import is already running, skipping this cycle...');
- return;
- }
- this.isWxImportRunning = true;
- try {
- await WeixinVideoDataCenterImportService.runDailyImport();
- } finally {
- this.isWxImportRunning = false;
- }
- }
- }
- export const taskScheduler = new TaskScheduler();
|