import schedule from 'node-schedule';
import type { Job } from 'node-schedule';
import { AppDataSource, PublishTask, PlatformAccount, AnalyticsData } from '../models/index.js';
import { logger } from '../utils/logger.js';
import { wsManager } from '../websocket/index.js';
import { WS_EVENTS } from '@media-manager/shared';
import { getAdapter, isPlatformSupported } from '../automation/platforms/index.js';
import { LessThanOrEqual, In } from 'typeorm';
import { taskQueueService } from '../services/TaskQueueService.js';
import { XiaohongshuAccountOverviewImportService } from '../services/XiaohongshuAccountOverviewImportService.js';
import { DouyinAccountOverviewImportService } from '../services/DouyinAccountOverviewImportService.js';
import { BaijiahaoContentOverviewImportService } from '../services/BaijiahaoContentOverviewImportService.js';
import { WeixinVideoDataCenterImportService } from '../services/WeixinVideoDataCenterImportService.js';
import { XiaohongshuWorkNoteStatisticsImportService } from '../services/XiaohongshuWorkNoteStatisticsImportService.js';

/** URL the auto-reply job POSTs to (the in-code comment calls it the "Python service"). */
const AUTO_REPLY_ENDPOINT = 'http://localhost:5005/auto-reply';

/** Abort an auto-reply request after two minutes. */
const AUTO_REPLY_TIMEOUT_MS = 120000;

/** Fields of the auto-reply response body that this file reads. */
interface AutoReplyResponse {
  success?: boolean;
  replied_count?: number;
  error?: string;
}

/**
 * Scheduled task runner.
 *
 * Registers cron jobs with node-schedule and keeps the handles in `jobs` so they
 * can be cancelled by `stop()`. Handlers that must not overlap with a previous,
 * still-running invocation of themselves go through `runExclusive`.
 */
export class TaskScheduler {
  /** Registered jobs, keyed by job name. */
  private jobs: Map<string, Job> = new Map();

  /** Lock keys of handlers that are currently executing (see `runExclusive`). */
  private readonly runningJobs = new Set<string>();

  /**
   * Start the scheduler and register all cron jobs.
   *
   * Note: account refresh is triggered by the client on a timer and only covers
   * the currently logged-in user's accounts; the server no longer refreshes all
   * users' accounts automatically.
   */
  start(): void {
    logger.info('[Scheduler] ========================================');
    logger.info('[Scheduler] Starting task scheduler...');

    // Every minute: pick up scheduled publish tasks that have come due.
    this.scheduleJob('check-publish-tasks', '* * * * *', this.checkPublishTasks.bind(this));

    // Daily at 12:00: Xiaohongshu account overview (last 30 days) -> user_day_statistics.
    // Note: node-schedule evaluates cron expressions in the server's local time zone.
    this.scheduleJob(
      'xhs-account-overview-import',
      '0 12 * * *',
      this.importXhsAccountOverviewLast30Days.bind(this)
    );

    // Daily at 12:40: Xiaohongshu per-work "note detail by day" data -> work_day_statistics.
    this.scheduleJob(
      'xhs-work-note-statistics-import',
      '40 12 * * *',
      this.importXhsWorkNoteStatistics.bind(this)
    );

    // Daily at 12:10: Douyin account overview (last 30 days) -> user_day_statistics.
    this.scheduleJob(
      'dy-account-overview-import',
      '10 12 * * *',
      this.importDyAccountOverviewLast30Days.bind(this)
    );

    // Daily at 12:20: Baijiahao content analysis (last 30 days) -> user_day_statistics.
    this.scheduleJob(
      'bj-content-overview-import',
      '20 12 * * *',
      this.importBaijiahaoContentOverviewLast30Days.bind(this)
    );

    // Daily at 12:30: Weixin video data center growth details (last 30 days) -> user_day_statistics.
    this.scheduleJob(
      'wx-video-data-center-import',
      '30 12 * * *',
      this.importWeixinVideoDataCenterLast30Days.bind(this)
    );

    // Every minute: auto-reply to private messages.
    this.scheduleJob('auto-reply-messages', '* * * * *', this.autoReplyMessages.bind(this));

    // Account refresh is triggered by the client, not scheduled here, so that only
    // the logged-in user's accounts are refreshed.

    // Daily at 02:00: collect analytics (optional; enable if server-side collection is needed).
    // this.scheduleJob('collect-analytics', '0 2 * * *', this.collectAnalytics.bind(this));

    logger.info('[Scheduler] Scheduled jobs:');
    logger.info('[Scheduler] - check-publish-tasks: every minute (* * * * *)');
    logger.info('[Scheduler] - xhs-account-overview-import: daily at 12:00 (0 12 * * *)');
    logger.info(
      '[Scheduler] - xhs-work-note-statistics-import: daily at 12:40 (40 12 * * *)'
    );
    logger.info('[Scheduler] - dy-account-overview-import: daily at 12:10 (10 12 * * *)');
    logger.info('[Scheduler] - bj-content-overview-import: daily at 12:20 (20 12 * * *)');
    logger.info('[Scheduler] - wx-video-data-center-import: daily at 12:30 (30 12 * * *)');
    logger.info('[Scheduler] - auto-reply-messages: every minute (* * * * *)');
    logger.info('[Scheduler] Note: Account refresh is triggered by client, not server');
    logger.info('[Scheduler] ========================================');
    logger.info('[Scheduler] Task scheduler started successfully');
  }

  /**
   * Stop the scheduler: cancel every registered job and forget it.
   */
  stop(): void {
    this.jobs.forEach((job, name) => {
      job.cancel();
      logger.info(`Cancelled job: ${name}`);
    });
    this.jobs.clear();
    logger.info('Task scheduler stopped');
  }

  /**
   * Register a cron job under `name`.
   *
   * Errors thrown by `handler` are logged and never propagate to node-schedule.
   * If a job with the same name is already registered it is cancelled first, so
   * registering twice (e.g. `start()` called twice) does not leave an untracked
   * duplicate running.
   *
   * @param name    Unique job name, used for logging and as the key in `jobs`.
   * @param cron    Cron expression passed to node-schedule.
   * @param handler Async work to run on every tick.
   */
  private scheduleJob(name: string, cron: string, handler: () => Promise<void>): void {
    const previous = this.jobs.get(name);
    if (previous) {
      previous.cancel();
      this.jobs.delete(name);
      logger.warn(`Job ${name} was already scheduled; replaced the previous registration`);
    }

    const job = schedule.scheduleJob(cron, async () => {
      logger.info(`Running scheduled job: ${name}`);
      try {
        await handler();
        logger.info(`Completed job: ${name}`);
      } catch (error) {
        logger.error(`Job ${name} failed:`, error);
      }
    });

    if (!job) {
      // scheduleJob returned no handle, so nothing will ever run for this name.
      logger.error(`Failed to schedule job ${name}; check the cron expression "${cron}"`);
      return;
    }
    this.jobs.set(name, job);
  }

  /**
   * Run `work` unless a previous invocation holding the same `lockKey` is still
   * in flight, in which case `skipMessage` is logged and the call returns.
   *
   * The lock is always released, including when `work` throws (the error is
   * re-thrown to the caller).
   *
   * @param lockKey     Identifier of the mutual-exclusion group.
   * @param skipMessage Info-level message logged when the run is skipped.
   * @param work        The job body.
   * @param lockLabel   When given, acquire/release are logged at debug level as
   *                    `[Scheduler] Acquired <label> lock` / `Released <label> lock`.
   */
  private async runExclusive(
    lockKey: string,
    skipMessage: string,
    work: () => Promise<void>,
    lockLabel?: string
  ): Promise<void> {
    if (this.runningJobs.has(lockKey)) {
      logger.info(skipMessage);
      return;
    }

    this.runningJobs.add(lockKey);
    if (lockLabel) {
      logger.debug(`[Scheduler] Acquired ${lockLabel} lock`);
    }

    try {
      await work();
    } finally {
      this.runningJobs.delete(lockKey);
      if (lockLabel) {
        logger.debug(`[Scheduler] Released ${lockLabel} lock`);
      }
    }
  }

  /**
   * Find pending publish tasks whose `scheduledAt` is now or in the past and
   * execute them one after another.
   *
   * Runs are mutually exclusive: this job fires every minute, and without the
   * lock a long-running publish would let the next tick re-select tasks that
   * are still `pending` further down the list and publish them twice.
   */
  private async checkPublishTasks(): Promise<void> {
    await this.runExclusive(
      'check-publish-tasks',
      '[Scheduler] Publish task check is already running, skipping this cycle...',
      async () => {
        const taskRepository = AppDataSource.getRepository(PublishTask);

        // Tasks that are due.
        const tasks = await taskRepository.find({
          where: {
            status: 'pending',
            scheduledAt: LessThanOrEqual(new Date()),
          },
          relations: ['results'],
        });

        for (const task of tasks) {
          logger.info(`Executing scheduled task: ${task.id}`);

          try {
            // Mark as processing before doing any work.
            await taskRepository.update(task.id, { status: 'processing' });
          } catch (error) {
            // The task is still pending, so a later tick will pick it up again.
            logger.error(`Failed to mark task ${task.id} as processing:`, error);
            continue;
          }

          try {
            wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
              taskId: task.id,
              status: 'processing',
            });

            await this.executePublishTask(task);
          } catch (error) {
            // Do not leave the task stuck in `processing`, and keep going with the
            // remaining tasks of this batch.
            logger.error(`Scheduled task ${task.id} failed unexpectedly:`, error);
            try {
              await taskRepository.update(task.id, { status: 'failed' });
              wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
                taskId: task.id,
                status: 'failed',
              });
            } catch (updateError) {
              logger.error(`Failed to mark task ${task.id} as failed:`, updateError);
            }
          }
        }
      }
    );
  }

  /**
   * Publish one task to each of its target accounts, then persist and broadcast
   * the final status.
   *
   * The task ends up `completed` when at least one account was published to and
   * `failed` otherwise. Target accounts that cannot be found, or whose platform
   * is unsupported, count as failures.
   *
   * @param task The publish task to execute.
   */
  private async executePublishTask(task: PublishTask): Promise<void> {
    const taskRepository = AppDataSource.getRepository(PublishTask);
    const accountRepository = AppDataSource.getRepository(PlatformAccount);

    const targetAccounts = task.targetAccounts || [];
    // Skip the query entirely when there is nothing to look up.
    const accounts =
      targetAccounts.length > 0
        ? await accountRepository.find({ where: { id: In(targetAccounts) } })
        : [];

    // Compare ids as strings so the check does not depend on how the id list is
    // stored (numbers vs. strings).
    const foundIds = new Set(accounts.map((account) => String(account.id)));
    const missingIds = [...new Set(targetAccounts.map((id) => String(id)))].filter(
      (id) => !foundIds.has(id)
    );
    if (missingIds.length > 0) {
      logger.warn(`Task ${task.id}: target accounts not found: ${missingIds.join(', ')}`);
    }

    let successCount = 0;
    let failCount = missingIds.length;

    for (const account of accounts) {
      if (!isPlatformSupported(account.platform)) {
        logger.warn(`Platform ${account.platform} not supported`);
        failCount++;
        continue;
      }

      try {
        const adapter = getAdapter(account.platform);
        const result = await adapter.publishVideo(account.cookieData || '', {
          videoPath: task.videoPath || '',
          title: task.title || '',
          description: task.description || undefined,
          coverPath: task.coverPath || undefined,
          tags: task.tags || undefined,
        });

        if (result.success) {
          successCount++;
        } else {
          failCount++;
        }

        // TODO: persist the per-account outcome in the publish_results table.
      } catch (error) {
        logger.error(`Publish to ${account.platform} failed:`, error);
        failCount++;
      }
    }

    if (successCount === 0 && failCount === 0) {
      logger.warn(`Task ${task.id} has no target accounts; nothing was published`);
    }

    // Partial success still counts as completed; zero successes is a failure.
    const finalStatus = successCount > 0 ? 'completed' : 'failed';

    await taskRepository.update(task.id, {
      status: finalStatus,
      publishedAt: new Date(),
    });

    wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
      taskId: task.id,
      status: finalStatus,
    });
  }

  /**
   * Enqueue a low-priority, silent `sync_account` task for every account on a
   * supported platform.
   *
   * The actual refresh is performed by the task queue, which controls
   * concurrency. Not registered by `start()` (refresh is client-triggered).
   */
  private async refreshAccounts(): Promise<void> {
    await this.runExclusive(
      'refresh-accounts',
      '[Scheduler] Account refresh is already running, skipping this cycle...',
      async () => {
        const accountRepository = AppDataSource.getRepository(PlatformAccount);

        // Fetch every account regardless of status; the refresh task itself
        // detects and updates the status.
        const accounts = await accountRepository.find();

        if (accounts.length === 0) {
          logger.info('[Scheduler] No active accounts to refresh');
          return;
        }

        logger.info(`[Scheduler] Creating refresh tasks for ${accounts.length} active accounts...`);

        let tasksCreated = 0;
        let skipped = 0;

        for (const account of accounts) {
          if (!isPlatformSupported(account.platform)) {
            logger.debug(
              `[Scheduler] Platform ${account.platform} not supported, skipping account ${account.id}`
            );
            skipped++;
            continue;
          }

          try {
            taskQueueService.createTask(account.userId, {
              type: 'sync_account',
              title: `自动刷新: ${account.accountName || account.platform}`,
              description: `定时刷新账号 ${account.accountName} 的状态和信息`,
              priority: 'low', // automatic work must not get in the way of user-initiated tasks
              silent: true, // run silently, without a front-end popup
              accountId: account.id,
            });

            tasksCreated++;
            logger.debug(
              `[Scheduler] Created refresh task for account ${account.id} (${account.accountName})`
            );
          } catch (error) {
            logger.error(
              `[Scheduler] Failed to create refresh task for account ${account.id}:`,
              error
            );
          }
        }

        logger.info(
          `[Scheduler] Account refresh tasks created: ${tasksCreated} tasks, ${skipped} skipped`
        );
      },
      'refresh'
    );
  }

  /**
   * Fetch today's analytics for every active account on a supported platform
   * and upsert one row per (account, date).
   *
   * Not registered by `start()` (the registration is commented out there).
   */
  private async collectAnalytics(): Promise<void> {
    const accountRepository = AppDataSource.getRepository(PlatformAccount);
    const analyticsRepository = AppDataSource.getRepository(AnalyticsData);

    const accounts = await accountRepository.find({
      where: { status: 'active' },
    });

    // YYYY-MM-DD portion of the ISO timestamp.
    // NOTE(review): toISOString() is UTC, while cron jobs fire in server-local
    // time, so the date may differ from the local calendar day — confirm intent.
    const today = new Date().toISOString().slice(0, 10);

    for (const account of accounts) {
      if (!isPlatformSupported(account.platform)) continue;

      try {
        const adapter = getAdapter(account.platform);
        const data = await adapter.getAnalytics(account.cookieData || '', {
          startDate: today,
          endDate: today,
        });

        const metrics = {
          fansCount: data.fansCount,
          fansIncrease: data.fansIncrease,
          viewsCount: data.viewsCount,
          likesCount: data.likesCount,
          commentsCount: data.commentsCount,
          sharesCount: data.sharesCount,
          income: data.income || 0,
        };

        // Update the existing row for this account and date, or insert a new one.
        const existing = await analyticsRepository.findOne({
          where: { accountId: account.id, date: today },
        });

        if (existing) {
          await analyticsRepository.update(existing.id, metrics);
        } else {
          await analyticsRepository.save({
            userId: account.userId,
            accountId: account.id,
            date: today,
            ...metrics,
          });
        }

        wsManager.sendToUser(account.userId, WS_EVENTS.ANALYTICS_UPDATED, {
          accountId: account.id,
        });
      } catch (error) {
        logger.error(`Collect analytics for account ${account.id} failed:`, error);
      }
    }
  }

  /**
   * Xiaohongshu: account overview export (last 30 days) -> user_day_statistics.
   * Skipped when the previous run has not finished.
   */
  private async importXhsAccountOverviewLast30Days(): Promise<void> {
    await this.runExclusive(
      'xhs-account-overview-import',
      '[Scheduler] XHS import is already running, skipping this cycle...',
      async () => {
        await XiaohongshuAccountOverviewImportService.runDailyImport();
      }
    );
  }

  /**
   * Xiaohongshu: per-work "note detail by day" data -> work_day_statistics.
   * Skipped when the previous run has not finished.
   */
  private async importXhsWorkNoteStatistics(): Promise<void> {
    await this.runExclusive(
      'xhs-work-note-statistics-import',
      '[Scheduler] XHS work note statistics import is already running, skipping...',
      async () => {
        await XiaohongshuWorkNoteStatisticsImportService.runDailyImport();
      }
    );
  }

  /**
   * Douyin: account overview / short video performance export (last 30 days)
   * -> user_day_statistics. Skipped when the previous run has not finished.
   */
  private async importDyAccountOverviewLast30Days(): Promise<void> {
    await this.runExclusive(
      'dy-account-overview-import',
      '[Scheduler] Douyin import is already running, skipping this cycle...',
      async () => {
        await DouyinAccountOverviewImportService.runDailyImport();
      }
    );
  }

  /**
   * Auto-reply to private messages (scheduled every minute by `start()`).
   *
   * For each selected account, POSTs the platform name and cookie to the
   * auto-reply endpoint and tallies successes and failures. A failure for one
   * account does not stop the others. Skipped when the previous run has not
   * finished.
   *
   * NOTE(review): the query is hard-coded to `userId: 2` and the
   * `platform: 'weixin_video'` filter is commented out, so this currently
   * processes every active account of that single user rather than all Weixin
   * video accounts — confirm whether this is a leftover from testing.
   */
  private async autoReplyMessages(): Promise<void> {
    await this.runExclusive(
      'auto-reply-messages',
      '[Scheduler] Auto reply is already running, skipping this cycle...',
      async () => {
        const accountRepository = AppDataSource.getRepository(PlatformAccount);

        const accounts = await accountRepository.find({
          where: {
            // platform: 'weixin_video',
            userId: 2,
            status: 'active',
          },
        });

        if (accounts.length === 0) {
          logger.info('[Scheduler] No active accounts for auto reply');
          return;
        }

        logger.info(`[Scheduler] Starting auto reply for ${accounts.length} accounts...`);

        let successCount = 0;
        let failCount = 0;

        for (const account of accounts) {
          try {
            logger.info(
              `[Scheduler] Auto replying for account: ${account.accountName} (${account.id})`
            );

            // The Python service expects "weixin", not "weixin_video".
            const pythonPlatform = account.platform === 'weixin_video' ? 'weixin' : account.platform;

            const response = await fetch(AUTO_REPLY_ENDPOINT, {
              method: 'POST',
              headers: {
                'Content-Type': 'application/json',
              },
              body: JSON.stringify({
                platform: pythonPlatform,
                cookie: account.cookieData || '',
              }),
              signal: AbortSignal.timeout(AUTO_REPLY_TIMEOUT_MS),
            });

            if (!response.ok) {
              throw new Error(`HTTP ${response.status}`);
            }

            // The body is not validated; only the fields read below are relied on.
            const result = (await response.json()) as AutoReplyResponse;

            if (result.success) {
              successCount++;
              logger.info(
                `[Scheduler] Auto reply success for ${account.accountName}: ${result.replied_count} messages`
              );
            } else {
              failCount++;
              logger.error(
                `[Scheduler] Auto reply failed for ${account.accountName}: ${result.error}`
              );
            }
          } catch (error) {
            failCount++;
            logger.error(`[Scheduler] Auto reply error for account ${account.id}:`, error);
          }
        }

        logger.info(
          `[Scheduler] Auto reply completed: ${successCount} success, ${failCount} failed`
        );
      },
      'auto reply'
    );
  }

  /**
   * Baijiahao: content analysis / basic data export (last 30 days)
   * -> user_day_statistics. Skipped when the previous run has not finished.
   */
  private async importBaijiahaoContentOverviewLast30Days(): Promise<void> {
    await this.runExclusive(
      'bj-content-overview-import',
      '[Scheduler] Baijiahao import is already running, skipping this cycle...',
      async () => {
        await BaijiahaoContentOverviewImportService.runDailyImport();
      }
    );
  }

  /**
   * Weixin video: data center growth details for followers / videos / image
   * posts (last 30 days) -> user_day_statistics. Skipped when the previous run
   * has not finished.
   */
  private async importWeixinVideoDataCenterLast30Days(): Promise<void> {
    await this.runExclusive(
      'wx-video-data-center-import',
      '[Scheduler] Weixin video import is already running, skipping this cycle...',
      async () => {
        await WeixinVideoDataCenterImportService.runDailyImport();
      }
    );
  }
}

export const taskScheduler = new TaskScheduler();