index.ts 18 KB


  1. import schedule from 'node-schedule';
  2. import { AppDataSource, PublishTask, PlatformAccount, AnalyticsData } from '../models/index.js';
  3. import { logger } from '../utils/logger.js';
  4. import { wsManager } from '../websocket/index.js';
  5. import { WS_EVENTS } from '@media-manager/shared';
  6. import { getAdapter, isPlatformSupported } from '../automation/platforms/index.js';
  7. import { LessThanOrEqual, In } from 'typeorm';
  8. import { taskQueueService } from '../services/TaskQueueService.js';
  9. import { XiaohongshuAccountOverviewImportService } from '../services/XiaohongshuAccountOverviewImportService.js';
  10. import { DouyinAccountOverviewImportService } from '../services/DouyinAccountOverviewImportService.js';
  11. import { BaijiahaoContentOverviewImportService } from '../services/BaijiahaoContentOverviewImportService.js';
  12. import { WeixinVideoDataCenterImportService } from '../services/WeixinVideoDataCenterImportService.js';
  13. import { XiaohongshuWorkNoteStatisticsImportService } from '../services/XiaohongshuWorkNoteStatisticsImportService.js';
  /**
   * Cron-based task scheduler.
   *
   * Registers a fixed set of node-schedule jobs (scheduled publishing, daily
   * statistics imports and message auto-reply) and keeps track of them by name
   * so they can be cancelled again via {@link TaskScheduler.stop}.
   *
   * Every job handler is protected by an in-process boolean lock so that a run
   * which takes longer than its cron interval is never started a second time
   * concurrently. The locks only protect a single Node.js process.
   */
  export class TaskScheduler {
    /** Endpoint of the Python service that performs the actual auto reply. */
    private static readonly AUTO_REPLY_ENDPOINT = 'http://localhost:5005/auto-reply';
    /** Per-account timeout for one auto-reply request (2 minutes). */
    private static readonly AUTO_REPLY_TIMEOUT_MS = 120000;
    /**
     * Only accounts owned by this user id are auto-replied.
     * NOTE(review): this hard-coded id looks like a rollout/debug restriction
     * (the platform filter in autoReplyMessages is commented out) — confirm
     * the intended selection before widening it.
     */
    private static readonly AUTO_REPLY_USER_ID = 2;

    /** Registered jobs, keyed by job name. */
    private jobs: Map<string, schedule.Job> = new Map();
    private isCheckingPublishTasks = false; // publish-check lock, prevents overlapping runs
    private isRefreshingAccounts = false; // account refresh lock, prevents overlapping runs
    private isXhsImportRunning = false; // Xiaohongshu account overview import lock
    private isXhsWorkImportRunning = false; // Xiaohongshu per-work daily statistics import lock
    private isDyImportRunning = false; // Douyin import lock
    private isBjImportRunning = false; // Baijiahao import lock
    private isWxImportRunning = false; // Weixin video import lock
    private isAutoReplying = false; // message auto-reply lock

    /**
     * Starts the scheduler and registers all jobs.
     *
     * Note: account refresh is triggered periodically by the client and only
     * refreshes the accounts of the currently logged-in user. The server no
     * longer refreshes all users' accounts automatically.
     *
     * Calling this method again replaces the previously registered jobs instead
     * of stacking duplicates (see scheduleJob).
     */
    start(): void {
      logger.info('[Scheduler] ========================================');
      logger.info('[Scheduler] Starting task scheduler...');

      // Every minute: look for scheduled publish tasks that are due.
      this.scheduleJob('check-publish-tasks', '* * * * *', this.checkPublishTasks.bind(this));

      // Daily at 12:00: export the Xiaohongshu account overview (last 30 days)
      // and import it into user_day_statistics.
      // Note: node-schedule uses the server's local time zone.
      this.scheduleJob('xhs-account-overview-import', '0 12 * * *', this.importXhsAccountOverviewLast30Days.bind(this));

      // Daily at 12:40: sync Xiaohongshu per-work "note detail by day" data
      // into work_day_statistics.
      this.scheduleJob(
        'xhs-work-note-statistics-import',
        '40 12 * * *',
        this.importXhsWorkNoteStatistics.bind(this)
      );

      // Daily at 12:10: export the Douyin account overview (last 30 days)
      // and import it into user_day_statistics.
      this.scheduleJob('dy-account-overview-import', '10 12 * * *', this.importDyAccountOverviewLast30Days.bind(this));

      // Daily at 12:20: export the Baijiahao content analysis base data
      // (last 30 days) and import it into user_day_statistics.
      this.scheduleJob('bj-content-overview-import', '20 12 * * *', this.importBaijiahaoContentOverviewLast30Days.bind(this));

      // Daily at 12:30: export the Weixin video data-center growth details
      // (last 30 days) and import them into user_day_statistics.
      this.scheduleJob('wx-video-data-center-import', '30 12 * * *', this.importWeixinVideoDataCenterLast30Days.bind(this));

      // Every minute: auto-reply to private messages.
      this.scheduleJob('auto-reply-messages', '* * * * *', this.autoReplyMessages.bind(this));

      // Account refresh is triggered by the client, not scheduled here, so that
      // only the logged-in user's accounts are touched.
      // Optional server-side analytics collection (daily at 02:00), disabled:
      // this.scheduleJob('collect-analytics', '0 2 * * *', this.collectAnalytics.bind(this));

      logger.info('[Scheduler] Scheduled jobs:');
      logger.info('[Scheduler] - check-publish-tasks: every minute (* * * * *)');
      logger.info('[Scheduler] - xhs-account-overview-import: daily at 12:00 (0 12 * * *)');
      logger.info(
        '[Scheduler] - xhs-work-note-statistics-import: daily at 12:40 (40 12 * * *)'
      );
      logger.info('[Scheduler] - dy-account-overview-import: daily at 12:10 (10 12 * * *)');
      logger.info('[Scheduler] - bj-content-overview-import: daily at 12:20 (20 12 * * *)');
      logger.info('[Scheduler] - wx-video-data-center-import: daily at 12:30 (30 12 * * *)');
      logger.info('[Scheduler] - auto-reply-messages: every minute (* * * * *)');
      logger.info('[Scheduler] Note: Account refresh is triggered by client, not server');
      logger.info('[Scheduler] ========================================');
      logger.info('[Scheduler] Task scheduler started successfully');
    }

    /**
     * Stops the scheduler: cancels every registered job and forgets it.
     * Handlers that are already running are not interrupted.
     */
    stop(): void {
      this.jobs.forEach((job, name) => {
        job.cancel();
        logger.info(`Cancelled job: ${name}`);
      });
      this.jobs.clear();
      logger.info('Task scheduler stopped');
    }

    /**
     * Registers a cron job under the given name.
     *
     * @param name    Unique job name, used as key in the job registry and in logs.
     * @param cron    Cron expression understood by node-schedule.
     * @param handler Async job body; a rejection is logged and never propagates
     *                into node-schedule.
     */
    private scheduleJob(name: string, cron: string, handler: () => Promise<void>): void {
      // Re-registering a name must not leave the old job running in the
      // background (it would no longer be reachable through `jobs`).
      const previous = this.jobs.get(name);
      if (previous) {
        previous.cancel();
        this.jobs.delete(name);
        logger.warn(`Job ${name} was already scheduled, replacing it`);
      }

      const job = schedule.scheduleJob(cron, async () => {
        logger.info(`Running scheduled job: ${name}`);
        try {
          await handler();
          logger.info(`Completed job: ${name}`);
        } catch (error) {
          logger.error(`Job ${name} failed:`, error);
        }
      });

      // node-schedule yields no job when the rule cannot be scheduled.
      if (!job) {
        logger.error(`Failed to schedule job ${name} with cron expression "${cron}"`);
        return;
      }
      this.jobs.set(name, job);
    }

    /**
     * Finds pending publish tasks whose scheduled time has passed and executes
     * them one after another.
     *
     * Guarded by a lock: publishing may take longer than the one-minute cron
     * interval, and without the guard a second run would pick up tasks that
     * are still `pending` in the first run's list and publish them twice.
     */
    private async checkPublishTasks(): Promise<void> {
      if (this.isCheckingPublishTasks) {
        logger.info('[Scheduler] Publish task check is already running, skipping this cycle...');
        return;
      }
      this.isCheckingPublishTasks = true;
      try {
        const taskRepository = AppDataSource.getRepository(PublishTask);
        // Load the tasks that are due.
        const tasks = await taskRepository.find({
          where: {
            status: 'pending',
            scheduledAt: LessThanOrEqual(new Date()),
          },
          relations: ['results'],
        });

        for (const task of tasks) {
          logger.info(`Executing scheduled task: ${task.id}`);

          // Mark as processing. If this fails the task is still pending and
          // will simply be retried on the next cycle.
          try {
            await taskRepository.update(task.id, { status: 'processing' });
          } catch (error) {
            logger.error(`Failed to mark task ${task.id} as processing:`, error);
            continue;
          }

          try {
            wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
              taskId: task.id,
              status: 'processing',
            });
            await this.executePublishTask(task);
          } catch (error) {
            // One broken task must neither stay in `processing` forever nor
            // prevent the remaining due tasks from running.
            logger.error(`Scheduled task ${task.id} failed unexpectedly:`, error);
            await this.markTaskFailed(task);
          }
        }
      } finally {
        this.isCheckingPublishTasks = false;
      }
    }

    /**
     * Best-effort transition of a task to `failed` after an unexpected error,
     * including the status notification to the owning user. Errors raised here
     * are logged and swallowed so the caller's loop can continue.
     */
    private async markTaskFailed(task: PublishTask): Promise<void> {
      try {
        await AppDataSource.getRepository(PublishTask).update(task.id, { status: 'failed' });
        wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
          taskId: task.id,
          status: 'failed',
        });
      } catch (error) {
        logger.error(`Failed to mark task ${task.id} as failed:`, error);
      }
    }

    /**
     * Publishes one task to all of its target accounts and stores the final
     * task status.
     *
     * The task ends as `completed` when at least one account was published
     * successfully and as `failed` otherwise (including the case where no
     * target account could be resolved). Per-account publish errors are logged
     * and counted as failures; they do not abort the remaining accounts.
     *
     * @param task The task to publish; its `targetAccounts` holds account ids.
     */
    private async executePublishTask(task: PublishTask): Promise<void> {
      const taskRepository = AppDataSource.getRepository(PublishTask);
      const accountRepository = AppDataSource.getRepository(PlatformAccount);
      const targetAccounts = task.targetAccounts || [];

      // Skip the query entirely for an empty id list.
      const accounts =
        targetAccounts.length > 0
          ? await accountRepository.find({ where: { id: In(targetAccounts) } })
          : [];

      let successCount = 0;
      let failCount = 0;

      // Target ids that no longer resolve to an account count as failures
      // instead of disappearing silently.
      const missingCount = Math.max(0, new Set(targetAccounts).size - accounts.length);
      if (missingCount > 0) {
        logger.warn(`Task ${task.id}: ${missingCount} target account(s) not found`);
        failCount += missingCount;
      }

      for (const account of accounts) {
        if (!isPlatformSupported(account.platform)) {
          logger.warn(`Platform ${account.platform} not supported`);
          failCount++;
          continue;
        }
        try {
          const adapter = getAdapter(account.platform);
          const result = await adapter.publishVideo(account.cookieData || '', {
            videoPath: task.videoPath || '',
            title: task.title || '',
            description: task.description || undefined,
            coverPath: task.coverPath || undefined,
            tags: task.tags || undefined,
          });
          if (result.success) {
            successCount++;
          } else {
            failCount++;
          }
          // TODO: persist the per-account outcome in the publish_results table.
        } catch (error) {
          logger.error(`Publish to ${account.platform} failed:`, error);
          failCount++;
        }
      }

      if (successCount === 0 && failCount === 0) {
        logger.warn(`Task ${task.id} has no target accounts, nothing was published`);
      }

      // Partial success still counts as completed; nothing published is a failure.
      const finalStatus = successCount > 0 ? 'completed' : 'failed';
      await taskRepository.update(task.id, {
        status: finalStatus,
        publishedAt: new Date(),
      });
      wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
        taskId: task.id,
        status: finalStatus,
      });
    }

    /**
     * Queues a silent, low-priority `sync_account` task for every account on a
     * supported platform. The work itself is executed by the task queue, which
     * controls concurrency and avoids contention for browser resources.
     *
     * All accounts are loaded regardless of status so that the refresh task
     * itself can detect and update the status. Not registered in start().
     */
    private async refreshAccounts(): Promise<void> {
      // Skip when a previous refresh run is still in progress.
      if (this.isRefreshingAccounts) {
        logger.info('[Scheduler] Account refresh is already running, skipping this cycle...');
        return;
      }
      // Acquire the lock.
      this.isRefreshingAccounts = true;
      logger.debug('[Scheduler] Acquired refresh lock');
      try {
        const accountRepository = AppDataSource.getRepository(PlatformAccount);
        // No status filter on purpose, see method comment.
        const accounts = await accountRepository.find();
        if (accounts.length === 0) {
          logger.info('[Scheduler] No accounts to refresh');
          return;
        }
        logger.info(`[Scheduler] Creating refresh tasks for ${accounts.length} accounts...`);

        let tasksCreated = 0;
        let skipped = 0;
        for (const account of accounts) {
          if (!isPlatformSupported(account.platform)) {
            logger.debug(`[Scheduler] Platform ${account.platform} not supported, skipping account ${account.id}`);
            skipped++;
            continue;
          }
          try {
            taskQueueService.createTask(account.userId, {
              type: 'sync_account',
              title: `自动刷新: ${account.accountName || account.platform}`,
              description: `定时刷新账号 ${account.accountName} 的状态和信息`,
              priority: 'low', // automatic work must not get in the way of user-triggered tasks
              silent: true, // no popup in the front end
              accountId: account.id,
            });
            tasksCreated++;
            logger.debug(`[Scheduler] Created refresh task for account ${account.id} (${account.accountName})`);
          } catch (error) {
            logger.error(`[Scheduler] Failed to create refresh task for account ${account.id}:`, error);
          }
        }
        logger.info(`[Scheduler] Account refresh tasks created: ${tasksCreated} tasks, ${skipped} skipped`);
      } finally {
        // Release the lock.
        this.isRefreshingAccounts = false;
        logger.debug('[Scheduler] Released refresh lock');
      }
    }

    /**
     * Collects today's analytics for every active account on a supported
     * platform and upserts one row per account and date. A failure for one
     * account is logged and does not stop the others.
     * Not registered in start() (the job registration is commented out).
     */
    private async collectAnalytics(): Promise<void> {
      const accountRepository = AppDataSource.getRepository(PlatformAccount);
      const analyticsRepository = AppDataSource.getRepository(AnalyticsData);
      const accounts = await accountRepository.find({
        where: { status: 'active' },
      });

      // NOTE(review): this is the UTC calendar date, while the cron schedule
      // runs in server local time — confirm this is the intended "today".
      const today = new Date().toISOString().split('T')[0];

      for (const account of accounts) {
        if (!isPlatformSupported(account.platform)) continue;
        try {
          const adapter = getAdapter(account.platform);
          const data = await adapter.getAnalytics(account.cookieData || '', {
            startDate: today,
            endDate: today,
          });

          const existing = await analyticsRepository.findOne({
            where: { accountId: account.id, date: today },
          });

          // Single source for the metric columns shared by update and insert.
          const metrics = {
            fansCount: data.fansCount,
            fansIncrease: data.fansIncrease,
            viewsCount: data.viewsCount,
            likesCount: data.likesCount,
            commentsCount: data.commentsCount,
            sharesCount: data.sharesCount,
            income: data.income || 0,
          };

          if (existing) {
            await analyticsRepository.update(existing.id, metrics);
          } else {
            await analyticsRepository.save({
              userId: account.userId,
              accountId: account.id,
              date: today,
              ...metrics,
            });
          }

          wsManager.sendToUser(account.userId, WS_EVENTS.ANALYTICS_UPDATED, {
            accountId: account.id,
          });
        } catch (error) {
          logger.error(`Collect analytics for account ${account.id} failed:`, error);
        }
      }
    }

    /**
     * Xiaohongshu: account overview export (last 30 days) → import into
     * user_day_statistics. Skipped while a previous run is still active.
     */
    private async importXhsAccountOverviewLast30Days(): Promise<void> {
      if (this.isXhsImportRunning) {
        logger.info('[Scheduler] XHS import is already running, skipping this cycle...');
        return;
      }
      this.isXhsImportRunning = true;
      try {
        await XiaohongshuAccountOverviewImportService.runDailyImport();
      } finally {
        this.isXhsImportRunning = false;
      }
    }

    /**
     * Xiaohongshu: per-work "note detail by day" → import into
     * work_day_statistics. Skipped while a previous run is still active.
     */
    private async importXhsWorkNoteStatistics(): Promise<void> {
      if (this.isXhsWorkImportRunning) {
        logger.info('[Scheduler] XHS work note statistics import is already running, skipping...');
        return;
      }
      this.isXhsWorkImportRunning = true;
      try {
        await XiaohongshuWorkNoteStatisticsImportService.runDailyImport();
      } finally {
        this.isXhsWorkImportRunning = false;
      }
    }

    /**
     * Douyin: account overview / short video performance export (last 30 days)
     * → import into user_day_statistics. Skipped while a previous run is
     * still active.
     */
    private async importDyAccountOverviewLast30Days(): Promise<void> {
      if (this.isDyImportRunning) {
        logger.info('[Scheduler] Douyin import is already running, skipping this cycle...');
        return;
      }
      this.isDyImportRunning = true;
      try {
        await DouyinAccountOverviewImportService.runDailyImport();
      } finally {
        this.isDyImportRunning = false;
      }
    }

    /**
     * Auto-replies to private messages by calling the local Python service once
     * per selected account. Scheduled every minute by start(); a run is skipped
     * while the previous one is still in progress.
     *
     * Account selection: active accounts owned by AUTO_REPLY_USER_ID. The
     * platform filter is currently disabled, so accounts of any platform
     * belonging to that user are processed.
     * NOTE(review): earlier comments described this job as "Weixin video
     * accounts only, every 5 minutes" — confirm which behaviour is intended.
     */
    private async autoReplyMessages(): Promise<void> {
      // Skip when a previous auto-reply run is still in progress.
      if (this.isAutoReplying) {
        logger.info('[Scheduler] Auto reply is already running, skipping this cycle...');
        return;
      }
      // Acquire the lock.
      this.isAutoReplying = true;
      logger.debug('[Scheduler] Acquired auto reply lock');
      try {
        const accountRepository = AppDataSource.getRepository(PlatformAccount);
        const accounts = await accountRepository.find({
          where: {
            // platform: 'weixin_video',
            userId: TaskScheduler.AUTO_REPLY_USER_ID,
            status: 'active',
          },
        });
        if (accounts.length === 0) {
          logger.info('[Scheduler] No active accounts for auto reply');
          return;
        }
        logger.info(`[Scheduler] Starting auto reply for ${accounts.length} accounts...`);

        let successCount = 0;
        let failCount = 0;
        for (const account of accounts) {
          try {
            logger.info(`[Scheduler] Auto replying for account: ${account.accountName} (${account.id})`);
            // The Python service calls this platform `weixin`, not `weixin_video`.
            const pythonPlatform = account.platform === 'weixin_video' ? 'weixin' : account.platform;

            const response = await fetch(TaskScheduler.AUTO_REPLY_ENDPOINT, {
              method: 'POST',
              headers: {
                'Content-Type': 'application/json',
              },
              body: JSON.stringify({
                platform: pythonPlatform,
                cookie: account.cookieData || '',
              }),
              signal: AbortSignal.timeout(TaskScheduler.AUTO_REPLY_TIMEOUT_MS),
            });
            if (!response.ok) {
              throw new Error(`HTTP ${response.status}`);
            }

            // Only the fields read below are described; the payload is not validated further.
            const result = (await response.json()) as {
              success?: boolean;
              replied_count?: number;
              error?: string;
            };
            if (result.success) {
              successCount++;
              logger.info(`[Scheduler] Auto reply success for ${account.accountName}: ${result.replied_count} messages`);
            } else {
              failCount++;
              logger.error(`[Scheduler] Auto reply failed for ${account.accountName}: ${result.error}`);
            }
          } catch (error) {
            failCount++;
            logger.error(`[Scheduler] Auto reply error for account ${account.id}:`, error);
          }
        }
        logger.info(`[Scheduler] Auto reply completed: ${successCount} success, ${failCount} failed`);
      } finally {
        // Release the lock.
        this.isAutoReplying = false;
        logger.debug('[Scheduler] Released auto reply lock');
      }
    }

    /**
     * Baijiahao: content analysis base data export (last 30 days) → import into
     * user_day_statistics. Skipped while a previous run is still active.
     */
    private async importBaijiahaoContentOverviewLast30Days(): Promise<void> {
      if (this.isBjImportRunning) {
        logger.info('[Scheduler] Baijiahao import is already running, skipping this cycle...');
        return;
      }
      this.isBjImportRunning = true;
      try {
        await BaijiahaoContentOverviewImportService.runDailyImport();
      } finally {
        this.isBjImportRunning = false;
      }
    }

    /**
     * Weixin video: data-center growth details for followers / videos /
     * image posts (last 30 days) → import into user_day_statistics.
     * Skipped while a previous run is still active.
     */
    private async importWeixinVideoDataCenterLast30Days(): Promise<void> {
      if (this.isWxImportRunning) {
        logger.info('[Scheduler] Weixin video import is already running, skipping this cycle...');
        return;
      }
      this.isWxImportRunning = true;
      try {
        await WeixinVideoDataCenterImportService.runDailyImport();
      } finally {
        this.isWxImportRunning = false;
      }
    }
  }
  /** Module-level scheduler instance exported for use by the rest of the application. */
  export const taskScheduler: TaskScheduler = new TaskScheduler();