// index.ts (19 KB)


  1. import schedule from 'node-schedule';
  2. import { AppDataSource, PublishTask, PlatformAccount, AnalyticsData } from '../models/index.js';
  3. import { logger } from '../utils/logger.js';
  4. import { wsManager } from '../websocket/index.js';
  5. import { WS_EVENTS } from '@media-manager/shared';
  6. import { getAdapter, isPlatformSupported } from '../automation/platforms/index.js';
  7. import { LessThanOrEqual, In } from 'typeorm';
  8. import { taskQueueService } from '../services/TaskQueueService.js';
  9. import { XiaohongshuAccountOverviewImportService } from '../services/XiaohongshuAccountOverviewImportService.js';
  10. import { DouyinAccountOverviewImportService } from '../services/DouyinAccountOverviewImportService.js';
  11. import { BaijiahaoContentOverviewImportService } from '../services/BaijiahaoContentOverviewImportService.js';
  12. import { WeixinVideoDataCenterImportService } from '../services/WeixinVideoDataCenterImportService.js';
  13. import { XiaohongshuWorkNoteStatisticsImportService } from '../services/XiaohongshuWorkNoteStatisticsImportService.js';
  14. import { DouyinWorkStatisticsImportService } from '../services/DouyinWorkStatisticsImportService.js';
  /**
   * Cron-driven scheduler for server-side background work.
   *
   * Jobs registered by `start()`:
   * - due scheduled publish tasks (every minute)
   * - daily statistics imports for Xiaohongshu, Douyin, Baijiahao and Weixin Video
   * - private-message auto reply (every minute)
   *
   * Each job body runs through `runExclusive`, an in-process guard that skips a
   * cycle while the previous run of the same job is still executing. The guard
   * does not coordinate across multiple server processes.
   */
  export class TaskScheduler {
    /** Registered node-schedule jobs, keyed by job name. */
    private readonly jobs: Map<string, schedule.Job> = new Map();

    /** Keys of the job bodies that are currently executing (overlap guard). */
    private readonly runningJobs: Set<string> = new Set();

    /**
     * Registers all cron jobs and logs the resulting schedule.
     *
     * Account refresh is intentionally NOT scheduled here: it is triggered by the
     * client so that only the logged-in user's accounts are refreshed.
     * Cron expressions are evaluated by node-schedule in the server's local time zone.
     * Calling this method again replaces the previously registered jobs.
     */
    start(): void {
      logger.info('[Scheduler] ========================================');
      logger.info('[Scheduler] Starting task scheduler...');

      // Single source of truth for both registration and the startup log.
      const definitions: ReadonlyArray<{
        name: string;
        cron: string;
        cadence: string;
        handler: () => Promise<void>;
      }> = [
        // Picks up scheduled publish tasks whose time has come.
        {
          name: 'check-publish-tasks',
          cron: '* * * * *',
          cadence: 'every minute',
          handler: () => this.checkPublishTasks(),
        },
        // Xiaohongshu account overview (last 30 days) -> user_day_statistics.
        {
          name: 'xhs-account-overview-import',
          cron: '0 12 * * *',
          cadence: 'daily at 12:00',
          handler: () => this.importXhsAccountOverviewLast30Days(),
        },
        // Xiaohongshu per-work daily note details -> work_day_statistics.
        {
          name: 'xhs-work-note-statistics-import',
          cron: '40 12 * * *',
          cadence: 'daily at 12:40',
          handler: () => this.importXhsWorkNoteStatistics(),
        },
        // Douyin account overview (last 30 days) -> user_day_statistics.
        {
          name: 'dy-account-overview-import',
          cron: '10 12 * * *',
          cadence: 'daily at 12:10',
          handler: () => this.importDyAccountOverviewLast30Days(),
        },
        // Douyin per-work daily details -> work_day_statistics.
        {
          name: 'dy-work-statistics-import',
          cron: '50 12 * * *',
          cadence: 'daily at 12:50',
          handler: () => this.importDyWorkStatistics(),
        },
        // Baijiahao content analysis (last 30 days) -> user_day_statistics.
        {
          name: 'bj-content-overview-import',
          cron: '20 12 * * *',
          cadence: 'daily at 12:20',
          handler: () => this.importBaijiahaoContentOverviewLast30Days(),
        },
        // Weixin Video data center growth details (last 30 days) -> user_day_statistics.
        {
          name: 'wx-video-data-center-import',
          cron: '30 12 * * *',
          cadence: 'daily at 12:30',
          handler: () => this.importWeixinVideoDataCenterLast30Days(),
        },
        // Private-message auto reply.
        {
          name: 'auto-reply-messages',
          cron: '* * * * *',
          cadence: 'every minute',
          handler: () => this.autoReplyMessages(),
        },
      ];

      for (const { name, cron, handler } of definitions) {
        this.scheduleJob(name, cron, handler);
      }

      // Optional: server-side analytics collection at 02:00. Enable if needed:
      // this.scheduleJob('collect-analytics', '0 2 * * *', () => this.collectAnalytics());

      logger.info('[Scheduler] Scheduled jobs:');
      for (const { name, cron, cadence } of definitions) {
        logger.info(`[Scheduler] - ${name}: ${cadence} (${cron})`);
      }
      logger.info('[Scheduler] Note: Account refresh is triggered by client, not server');
      logger.info('[Scheduler] ========================================');
      logger.info('[Scheduler] Task scheduler started successfully');
    }

    /**
     * Cancels every registered job and forgets it.
     * Job bodies that are already executing are not interrupted.
     */
    stop(): void {
      this.jobs.forEach((job, name) => {
        job.cancel();
        logger.info(`Cancelled job: ${name}`);
      });
      this.jobs.clear();
      logger.info('Task scheduler stopped');
    }

    /**
     * Registers a cron job under `name`.
     *
     * @param name    Unique job name; re-using a name replaces the earlier job.
     * @param cron    Cron expression understood by node-schedule.
     * @param handler Job body; a rejection is logged and never propagates to node-schedule.
     */
    private scheduleJob(name: string, cron: string, handler: () => Promise<void>): void {
      // Without this, registering the same name twice would leave the old job
      // firing with no handle left to cancel it.
      const previous = this.jobs.get(name);
      if (previous) {
        previous.cancel();
        this.jobs.delete(name);
      }

      const job = schedule.scheduleJob(cron, async () => {
        logger.info(`Running scheduled job: ${name}`);
        try {
          await handler();
          logger.info(`Completed job: ${name}`);
        } catch (error) {
          logger.error(`Job ${name} failed:`, error);
        }
      });

      if (!job) {
        // node-schedule yields no job when the spec cannot be scheduled.
        logger.error(`[Scheduler] Could not schedule job ${name} with cron "${cron}"`);
        return;
      }
      this.jobs.set(name, job);
    }

    /**
     * Runs `task` unless a run registered under the same `key` is still in progress.
     *
     * @param key         Identifies the job for overlap detection.
     * @param skipMessage Logged at info level when the cycle is skipped.
     * @param task        The job body; its rejection propagates to the caller.
     */
    private async runExclusive(
      key: string,
      skipMessage: string,
      task: () => Promise<void>
    ): Promise<void> {
      if (this.runningJobs.has(key)) {
        logger.info(skipMessage);
        return;
      }
      this.runningJobs.add(key);
      try {
        await task();
      } finally {
        // Always release, even when the task throws.
        this.runningJobs.delete(key);
      }
    }

    /**
     * Finds pending publish tasks whose scheduled time has passed and executes
     * them one after another.
     *
     * Each task is claimed with a conditional `pending -> processing` update so a
     * task already taken by another writer is skipped, and a failure in one task
     * neither aborts the remaining tasks nor leaves the task stuck in `processing`.
     */
    private async checkPublishTasks(): Promise<void> {
      await this.runExclusive(
        'check-publish-tasks',
        '[Scheduler] Publish task check is already running, skipping this cycle...',
        async () => {
          const taskRepository = AppDataSource.getRepository(PublishTask);
          const dueTasks = await taskRepository.find({
            where: {
              status: 'pending',
              scheduledAt: LessThanOrEqual(new Date()),
            },
            relations: ['results'],
          });

          for (const task of dueTasks) {
            try {
              const claim = await taskRepository.update(
                { id: task.id, status: 'pending' },
                { status: 'processing' }
              );
              // Some drivers do not report `affected`; only an explicit 0 means
              // the task was no longer pending.
              if (claim.affected === 0) {
                logger.info(`Scheduled task ${task.id} is no longer pending, skipping`);
                continue;
              }
            } catch (error) {
              // The task is still pending and will be retried on the next cycle.
              logger.error(`Failed to claim scheduled task ${task.id}:`, error);
              continue;
            }

            logger.info(`Executing scheduled task: ${task.id}`);
            wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
              taskId: task.id,
              status: 'processing',
            });

            try {
              await this.executePublishTask(task);
            } catch (error) {
              logger.error(`Scheduled task ${task.id} failed:`, error);
              await this.markTaskFailed(task);
            }
          }
        }
      );
    }

    /**
     * Best-effort transition of a task to `failed`, with a status notification to
     * its owner. Errors raised here are logged and swallowed so the caller's loop
     * can continue.
     */
    private async markTaskFailed(task: PublishTask): Promise<void> {
      try {
        await AppDataSource.getRepository(PublishTask).update(task.id, { status: 'failed' });
        wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
          taskId: task.id,
          status: 'failed',
        });
      } catch (error) {
        logger.error(`Failed to mark task ${task.id} as failed:`, error);
      }
    }

    /**
     * Publishes a task's video to every target account, then stores and
     * broadcasts the final task status.
     *
     * The task ends as `completed` when at least one account was published
     * successfully and as `failed` otherwise (including when it has no targets or
     * none of its target accounts exist any more).
     *
     * @param task The publish task; expected to be in `processing` state already.
     */
    private async executePublishTask(task: PublishTask): Promise<void> {
      const taskRepository = AppDataSource.getRepository(PublishTask);
      const accountRepository = AppDataSource.getRepository(PlatformAccount);

      const targetAccounts = task.targetAccounts ?? [];
      // Skip the query entirely for an empty target list instead of issuing IN ().
      const accounts =
        targetAccounts.length > 0
          ? await accountRepository.find({ where: { id: In(targetAccounts) } })
          : [];

      let successCount = 0;
      let failCount = 0;

      // Targets that no longer resolve to an account can never be published to.
      const missingCount = Math.max(0, new Set(targetAccounts).size - accounts.length);
      if (missingCount > 0) {
        logger.warn(`Task ${task.id}: ${missingCount} target account(s) not found`);
        failCount += missingCount;
      }

      for (const account of accounts) {
        if (!isPlatformSupported(account.platform)) {
          logger.warn(`Platform ${account.platform} not supported`);
          failCount++;
          continue;
        }

        try {
          const adapter = getAdapter(account.platform);
          const result = await adapter.publishVideo(account.cookieData || '', {
            videoPath: task.videoPath || '',
            title: task.title || '',
            description: task.description || undefined,
            coverPath: task.coverPath || undefined,
            tags: task.tags || undefined,
          });

          if (result.success) {
            successCount++;
          } else {
            failCount++;
          }
          // TODO: persist the per-account outcome in the publish_results table.
        } catch (error) {
          logger.error(`Publish to ${account.platform} failed:`, error);
          failCount++;
        }
      }

      if (failCount > 0) {
        logger.warn(`Task ${task.id}: ${successCount} succeeded, ${failCount} failed`);
      }

      // Partial success still counts as completed; zero successes never does.
      const finalStatus = successCount > 0 ? 'completed' : 'failed';

      await taskRepository.update(task.id, {
        status: finalStatus,
        publishedAt: new Date(),
      });

      wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
        taskId: task.id,
        status: finalStatus,
      });
    }

    /**
     * Queues a low-priority, silent `sync_account` task for every account on a
     * supported platform. Execution and concurrency are left to the task queue.
     *
     * Accounts are loaded without a status filter so the refresh itself can detect
     * and update each account's status. Not registered in `start()`; account
     * refresh is triggered by the client.
     */
    private async refreshAccounts(): Promise<void> {
      await this.runExclusive(
        'refresh-accounts',
        '[Scheduler] Account refresh is already running, skipping this cycle...',
        async () => {
          logger.debug('[Scheduler] Acquired refresh lock');
          try {
            const accountRepository = AppDataSource.getRepository(PlatformAccount);
            const accounts = await accountRepository.find();

            if (accounts.length === 0) {
              logger.info('[Scheduler] No active accounts to refresh');
              return;
            }

            logger.info(`[Scheduler] Creating refresh tasks for ${accounts.length} active accounts...`);

            let tasksCreated = 0;
            let skipped = 0;

            for (const account of accounts) {
              if (!isPlatformSupported(account.platform)) {
                logger.debug(`[Scheduler] Platform ${account.platform} not supported, skipping account ${account.id}`);
                skipped++;
                continue;
              }

              try {
                taskQueueService.createTask(account.userId, {
                  type: 'sync_account',
                  title: `自动刷新: ${account.accountName || account.platform}`,
                  description: `定时刷新账号 ${account.accountName} 的状态和信息`,
                  priority: 'low', // automatic work must not delay user-initiated tasks
                  silent: true, // no foreground dialog on the client
                  accountId: account.id,
                });
                tasksCreated++;
                logger.debug(`[Scheduler] Created refresh task for account ${account.id} (${account.accountName})`);
              } catch (error) {
                logger.error(`[Scheduler] Failed to create refresh task for account ${account.id}:`, error);
              }
            }

            logger.info(`[Scheduler] Account refresh tasks created: ${tasksCreated} tasks, ${skipped} skipped`);
          } finally {
            logger.debug('[Scheduler] Released refresh lock');
          }
        }
      );
    }

    /**
     * Fetches today's analytics for every active account on a supported platform
     * and upserts one AnalyticsData row per account and date, notifying the
     * account owner after each successful write. Failures are logged per account.
     *
     * Not registered in `start()` (the registration is commented out there).
     */
    private async collectAnalytics(): Promise<void> {
      const accountRepository = AppDataSource.getRepository(PlatformAccount);
      const analyticsRepository = AppDataSource.getRepository(AnalyticsData);

      const accounts = await accountRepository.find({
        where: { status: 'active' },
      });

      // NOTE(review): this is the UTC calendar date, while cron jobs fire in server
      // local time - confirm which date the platform adapters expect.
      const today = new Date().toISOString().split('T')[0];

      for (const account of accounts) {
        if (!isPlatformSupported(account.platform)) continue;

        try {
          const adapter = getAdapter(account.platform);
          const data = await adapter.getAnalytics(account.cookieData || '', {
            startDate: today,
            endDate: today,
          });

          const metrics = {
            fansCount: data.fansCount,
            fansIncrease: data.fansIncrease,
            viewsCount: data.viewsCount,
            likesCount: data.likesCount,
            commentsCount: data.commentsCount,
            sharesCount: data.sharesCount,
            income: data.income || 0,
          };

          const existing = await analyticsRepository.findOne({
            where: { accountId: account.id, date: today },
          });

          if (existing) {
            await analyticsRepository.update(existing.id, metrics);
          } else {
            await analyticsRepository.save({
              userId: account.userId,
              accountId: account.id,
              date: today,
              ...metrics,
            });
          }

          wsManager.sendToUser(account.userId, WS_EVENTS.ANALYTICS_UPDATED, {
            accountId: account.id,
          });
        } catch (error) {
          logger.error(`Collect analytics for account ${account.id} failed:`, error);
        }
      }
    }

    /**
     * Xiaohongshu: account overview export (last 30 days) -> user_day_statistics.
     */
    private async importXhsAccountOverviewLast30Days(): Promise<void> {
      await this.runExclusive(
        'xhs-account-overview-import',
        '[Scheduler] XHS import is already running, skipping this cycle...',
        async () => {
          await XiaohongshuAccountOverviewImportService.runDailyImport();
        }
      );
    }

    /**
     * Xiaohongshu: per-work daily note details -> work_day_statistics.
     */
    private async importXhsWorkNoteStatistics(): Promise<void> {
      await this.runExclusive(
        'xhs-work-note-statistics-import',
        '[Scheduler] XHS work note statistics import is already running, skipping...',
        async () => {
          await XiaohongshuWorkNoteStatisticsImportService.runDailyImport();
        }
      );
    }

    /**
     * Douyin: account overview export (last 30 days) -> user_day_statistics.
     */
    private async importDyAccountOverviewLast30Days(): Promise<void> {
      await this.runExclusive(
        'dy-account-overview-import',
        '[Scheduler] Douyin import is already running, skipping this cycle...',
        async () => {
          await DouyinAccountOverviewImportService.runDailyImport();
        }
      );
    }

    /**
     * Douyin: per-work daily details -> work_day_statistics.
     */
    private async importDyWorkStatistics(): Promise<void> {
      await this.runExclusive(
        'dy-work-statistics-import',
        '[Scheduler] Douyin work statistics import is already running, skipping...',
        async () => {
          await DouyinWorkStatisticsImportService.runDailyImport();
        }
      );
    }

    /**
     * Private-message auto reply; scheduled every minute, with overlapping cycles
     * skipped.
     *
     * Only active Weixin Video accounts are processed. For each one the local
     * Python automation service is asked to reply, sending the account's cookie.
     * Failures are counted and logged per account and do not stop the loop.
     */
    private async autoReplyMessages(): Promise<void> {
      await this.runExclusive(
        'auto-reply-messages',
        '[Scheduler] Auto reply is already running, skipping this cycle...',
        async () => {
          logger.debug('[Scheduler] Acquired auto reply lock');
          try {
            const accountRepository = AppDataSource.getRepository(PlatformAccount);

            // Previously the platform filter was disabled and a single user id was
            // hard-coded here; the job is meant to cover all Weixin Video accounts.
            const accounts = await accountRepository.find({
              where: {
                platform: 'weixin_video',
                status: 'active',
              },
            });

            if (accounts.length === 0) {
              logger.info('[Scheduler] No active accounts for auto reply');
              return;
            }

            logger.info(`[Scheduler] Starting auto reply for ${accounts.length} accounts...`);

            let successCount = 0;
            let failCount = 0;

            for (const account of accounts) {
              try {
                logger.info(`[Scheduler] Auto replying for account: ${account.accountName} (${account.id})`);

                // The Python service names this platform "weixin", not "weixin_video".
                const pythonPlatform = account.platform === 'weixin_video' ? 'weixin' : account.platform;

                const response = await fetch('http://localhost:5005/auto-reply', {
                  method: 'POST',
                  headers: {
                    'Content-Type': 'application/json',
                  },
                  body: JSON.stringify({
                    platform: pythonPlatform,
                    cookie: account.cookieData || '',
                  }),
                  signal: AbortSignal.timeout(120000), // 2 minute timeout
                });

                if (!response.ok) {
                  throw new Error(`HTTP ${response.status}`);
                }

                const result = (await response.json()) as {
                  success?: boolean;
                  replied_count?: number;
                  error?: string;
                };

                if (result.success) {
                  successCount++;
                  logger.info(`[Scheduler] Auto reply success for ${account.accountName}: ${result.replied_count} messages`);
                } else {
                  failCount++;
                  logger.error(`[Scheduler] Auto reply failed for ${account.accountName}: ${result.error}`);
                }
              } catch (error) {
                failCount++;
                logger.error(`[Scheduler] Auto reply error for account ${account.id}:`, error);
              }
            }

            logger.info(`[Scheduler] Auto reply completed: ${successCount} success, ${failCount} failed`);
          } finally {
            logger.debug('[Scheduler] Released auto reply lock');
          }
        }
      );
    }

    /**
     * Baijiahao: content analysis base data export (last 30 days) -> user_day_statistics.
     */
    private async importBaijiahaoContentOverviewLast30Days(): Promise<void> {
      await this.runExclusive(
        'bj-content-overview-import',
        '[Scheduler] Baijiahao import is already running, skipping this cycle...',
        async () => {
          await BaijiahaoContentOverviewImportService.runDailyImport();
        }
      );
    }

    /**
     * Weixin Video: data center growth details for followers/videos/posts
     * (last 30 days) -> user_day_statistics.
     */
    private async importWeixinVideoDataCenterLast30Days(): Promise<void> {
      await this.runExclusive(
        'wx-video-data-center-import',
        '[Scheduler] Weixin video import is already running, skipping this cycle...',
        async () => {
          await WeixinVideoDataCenterImportService.runDailyImport();
        }
      );
    }
  }

  /** Shared scheduler instance used by the application. */
  export const taskScheduler = new TaskScheduler();