index.ts 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. import schedule from 'node-schedule';
  2. import { AppDataSource, PublishTask, PlatformAccount, AnalyticsData } from '../models/index.js';
  3. import { logger } from '../utils/logger.js';
  4. import { wsManager } from '../websocket/index.js';
  5. import { WS_EVENTS } from '@media-manager/shared';
  6. import { getAdapter, isPlatformSupported } from '../automation/platforms/index.js';
  7. import { LessThanOrEqual, In } from 'typeorm';
  8. import { taskQueueService } from '../services/TaskQueueService.js';
  9. import { XiaohongshuAccountOverviewImportService } from '../services/XiaohongshuAccountOverviewImportService.js';
  10. import { DouyinAccountOverviewImportService } from '../services/DouyinAccountOverviewImportService.js';
  11. import { BaijiahaoContentOverviewImportService } from '../services/BaijiahaoContentOverviewImportService.js';
  12. import { WeixinVideoDataCenterImportService } from '../services/WeixinVideoDataCenterImportService.js';
  13. import { XiaohongshuWorkNoteStatisticsImportService } from '../services/XiaohongshuWorkNoteStatisticsImportService.js';
  14. import { DouyinWorkStatisticsImportService } from '../services/DouyinWorkStatisticsImportService.js';
  15. import { WeixinVideoWorkStatisticsImportService } from '../services/WeixinVideoWorkStatisticsImportService.js';
  16. import { BaijiahaoWorkDailyStatisticsImportService } from '../services/BaijiahaoWorkDailyStatisticsImportService.js';
  17. import { getPythonServiceBaseUrl } from '../services/PythonServiceConfigService.js';
  18. /**
  19. * 定时任务调度器
  20. */
  21. export class TaskScheduler {
  22. private jobs: Map<string, schedule.Job> = new Map();
  23. private isRefreshingAccounts = false; // 账号刷新锁,防止任务重叠执行
  24. private isXhsImportRunning = false; // 小红书导入锁,防止任务重叠执行
  25. private isXhsWorkImportRunning = false; // 小红书作品日统计导入锁
  26. private isDyImportRunning = false; // 抖音导入锁,防止任务重叠执行
  27. private isDyWorkImportRunning = false; // 抖音作品日统计导入锁
  28. private isBjImportRunning = false; // 百家号导入锁,防止任务重叠执行
  29. private isBjWorkImportRunning = false; // 百家号作品日统计导入锁
  30. private isWxImportRunning = false; // 视频号导入锁,防止任务重叠执行
  31. private isWxWorkImportRunning = false; // 视频号作品日统计导入锁
  32. private isAutoReplying = false; // 私信回复锁,防止任务重叠执行
  33. /**
  34. * 启动调度器
  35. *
  36. * 注意:账号刷新任务由客户端定时触发,只刷新当前登录用户的账号
  37. * 服务端不再自动刷新所有用户的账号
  38. */
  39. start(): void {
  40. logger.info('[Scheduler] ========================================');
  41. logger.info('[Scheduler] Starting task scheduler...');
  42. // 每分钟检查定时发布任务(只处理到期的定时发布任务)
  43. this.scheduleJob('check-publish-tasks', '* * * * *', this.checkPublishTasks.bind(this));
  44. // 每天中午 12 点:批量导出小红书“账号概览-笔记数据-观看数据-近30日”,导入 user_day_statistics
  45. // 注意:node-schedule 使用服务器本地时区
  46. this.scheduleJob('xhs-account-overview-import', '0 12 * * *', this.importXhsAccountOverviewLast30Days.bind(this));
  47. // 每天 12:40:同步小红书作品维度的「笔记详情-按天」数据,写入 work_day_statistics
  48. this.scheduleJob(
  49. 'xhs-work-note-statistics-import',
  50. '40 12 * * *',
  51. this.importXhsWorkNoteStatistics.bind(this)
  52. );
  53. // 每天 12:10:批量导出抖音“数据中心-账号总览-短视频-数据表现-近30天”,导入 user_day_statistics
  54. this.scheduleJob('dy-account-overview-import', '10 12 * * *', this.importDyAccountOverviewLast30Days.bind(this));
  55. // 每天 12:50:同步抖音作品维度的「作品详情-按天」数据,写入 work_day_statistics
  56. this.scheduleJob('dy-work-statistics-import', '50 12 * * *', this.importDyWorkStatistics.bind(this));
  57. // 每天 12:20:批量导出百家号“数据中心-内容分析-基础数据-近30天”,导入 user_day_statistics
  58. this.scheduleJob('bj-content-overview-import', '20 12 * * *', this.importBaijiahaoContentOverviewLast30Days.bind(this));
  59. // 每天 12:25:百家号作品维度「每日数据」同步(列表分页 + 逐条趋势),写入 works.yesterday_* 与 work_day_statistics
  60. this.scheduleJob('bj-work-daily-import', '25 12 * * *', this.importBaijiahaoWorkDailyStatistics.bind(this));
  61. // 每天 12:30:批量导出视频号“数据中心-各子菜单-增长详情(数据详情)-近30天-下载表格”,导入 user_day_statistics
  62. this.scheduleJob('wx-video-data-center-import', '30 12 * * *', this.importWeixinVideoDataCenterLast30Days.bind(this));
  63. // 每天 12:35:同步视频号作品维度的「作品列表 + 按天聚合-全部」数据,写入 work_day_statistics
  64. this.scheduleJob(
  65. 'wx-video-work-statistics-import',
  66. '35 12 * * *',
  67. this.importWeixinVideoWorkStatistics.bind(this)
  68. );
  69. this.scheduleJob('auto-reply-messages', '* * * * *', this.autoReplyMessages.bind(this));
  70. // 注意:账号刷新由客户端定时触发,不在服务端自动执行
  71. // 这样可以确保只刷新当前登录用户的账号,避免处理其他用户的数据
  72. // 每天凌晨2点采集数据统计(可选,如果需要服务端采集可以启用)
  73. // this.scheduleJob('collect-analytics', '0 2 * * *', this.collectAnalytics.bind(this));
  74. logger.info('[Scheduler] Scheduled jobs:');
  75. logger.info('[Scheduler] - check-publish-tasks: every minute (* * * * *)');
  76. logger.info('[Scheduler] - xhs-account-overview-import: daily at 12:00 (0 12 * * *)');
  77. logger.info(
  78. '[Scheduler] - xhs-work-note-statistics-import: daily at 12:40 (40 12 * * *)'
  79. );
  80. logger.info('[Scheduler] - dy-account-overview-import: daily at 12:10 (10 12 * * *)');
  81. logger.info('[Scheduler] - dy-work-statistics-import: daily at 12:50 (50 12 * * *)');
  82. logger.info('[Scheduler] - bj-content-overview-import: daily at 12:20 (20 12 * * *)');
  83. logger.info('[Scheduler] - bj-work-daily-import: daily at 12:25 (25 12 * * *)');
  84. logger.info('[Scheduler] - wx-video-data-center-import: daily at 12:30 (30 12 * * *)');
  85. logger.info('[Scheduler] - wx-video-work-statistics-import: daily at 12:35 (35 12 * * *)');
  86. logger.info('[Scheduler] - auto-reply-messages: every minute (* * * * *)');
  87. logger.info('[Scheduler] Note: Account refresh is triggered by client, not server');
  88. logger.info('[Scheduler] ========================================');
  89. logger.info('[Scheduler] Task scheduler started successfully');
  90. }
  91. /**
  92. * 停止调度器
  93. */
  94. stop(): void {
  95. this.jobs.forEach((job, name) => {
  96. job.cancel();
  97. logger.info(`Cancelled job: ${name}`);
  98. });
  99. this.jobs.clear();
  100. logger.info('Task scheduler stopped');
  101. }
  102. /**
  103. * 添加定时任务
  104. */
  105. private scheduleJob(name: string, cron: string, handler: () => Promise<void>): void {
  106. const job = schedule.scheduleJob(cron, async () => {
  107. logger.info(`Running scheduled job: ${name}`);
  108. try {
  109. await handler();
  110. logger.info(`Completed job: ${name}`);
  111. } catch (error) {
  112. logger.error(`Job ${name} failed:`, error);
  113. }
  114. });
  115. if (job) {
  116. this.jobs.set(name, job);
  117. }
  118. }
  119. /**
  120. * 检查定时发布任务
  121. */
  122. private async checkPublishTasks(): Promise<void> {
  123. const taskRepository = AppDataSource.getRepository(PublishTask);
  124. // 获取需要执行的任务
  125. const tasks = await taskRepository.find({
  126. where: {
  127. status: 'pending',
  128. scheduledAt: LessThanOrEqual(new Date()),
  129. },
  130. relations: ['results'],
  131. });
  132. for (const task of tasks) {
  133. logger.info(`Executing scheduled task: ${task.id}`);
  134. // 更新状态为处理中
  135. await taskRepository.update(task.id, { status: 'processing' });
  136. wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
  137. taskId: task.id,
  138. status: 'processing',
  139. });
  140. // 执行发布
  141. await this.executePublishTask(task);
  142. }
  143. }
  144. /**
  145. * 执行发布任务
  146. */
  147. private async executePublishTask(task: PublishTask): Promise<void> {
  148. const taskRepository = AppDataSource.getRepository(PublishTask);
  149. const accountRepository = AppDataSource.getRepository(PlatformAccount);
  150. const targetAccounts = task.targetAccounts || [];
  151. const accounts = await accountRepository.find({
  152. where: { id: In(targetAccounts) },
  153. });
  154. let successCount = 0;
  155. let failCount = 0;
  156. for (const account of accounts) {
  157. if (!isPlatformSupported(account.platform)) {
  158. logger.warn(`Platform ${account.platform} not supported`);
  159. failCount++;
  160. continue;
  161. }
  162. try {
  163. const adapter = getAdapter(account.platform);
  164. const result = await adapter.publishVideo(account.cookieData || '', {
  165. videoPath: task.videoPath || '',
  166. title: task.title || '',
  167. description: task.description || undefined,
  168. coverPath: task.coverPath || undefined,
  169. tags: task.tags || undefined,
  170. });
  171. if (result.success) {
  172. successCount++;
  173. } else {
  174. failCount++;
  175. }
  176. // 更新发布结果
  177. // TODO: 更新 publish_results 表
  178. } catch (error) {
  179. logger.error(`Publish to ${account.platform} failed:`, error);
  180. failCount++;
  181. }
  182. }
  183. // 更新任务状态
  184. const finalStatus = failCount === 0 ? 'completed' : (successCount === 0 ? 'failed' : 'completed');
  185. await taskRepository.update(task.id, {
  186. status: finalStatus,
  187. publishedAt: new Date(),
  188. });
  189. wsManager.sendToUser(task.userId, WS_EVENTS.TASK_STATUS_CHANGED, {
  190. taskId: task.id,
  191. status: finalStatus,
  192. });
  193. }
  194. /**
  195. * 刷新账号状态和信息
  196. * 将每个账号的刷新任务加入到任务队列中执行
  197. * 通过任务队列控制并发,避免浏览器资源竞争
  198. */
  199. private async _refreshAccounts(): Promise<void> {
  200. // 检查是否正在执行刷新任务
  201. if (this.isRefreshingAccounts) {
  202. logger.info('[Scheduler] Account refresh is already running, skipping this cycle...');
  203. return;
  204. }
  205. // 获取锁
  206. this.isRefreshingAccounts = true;
  207. logger.debug('[Scheduler] Acquired refresh lock');
  208. try {
  209. const accountRepository = AppDataSource.getRepository(PlatformAccount);
  210. // 获取所有账号,不过滤 status,让刷新任务自动检测并更新状态
  211. const accounts = await accountRepository.find();
  212. if (accounts.length === 0) {
  213. logger.info('[Scheduler] No active accounts to refresh');
  214. return;
  215. }
  216. logger.info(`[Scheduler] Creating refresh tasks for ${accounts.length} active accounts...`);
  217. let tasksCreated = 0;
  218. let skipped = 0;
  219. // 为每个账号创建刷新任务,加入任务队列
  220. for (const account of accounts) {
  221. if (!isPlatformSupported(account.platform)) {
  222. logger.debug(`[Scheduler] Platform ${account.platform} not supported, skipping account ${account.id}`);
  223. skipped++;
  224. continue;
  225. }
  226. try {
  227. // 创建 sync_account 任务加入队列(静默执行,前台不弹框)
  228. taskQueueService.createTask(account.userId, {
  229. type: 'sync_account',
  230. title: `自动刷新: ${account.accountName || account.platform}`,
  231. description: `定时刷新账号 ${account.accountName} 的状态和信息`,
  232. priority: 'low', // 自动任务使用低优先级,不影响用户主动操作
  233. silent: true, // 静默执行,前台不弹框显示
  234. accountId: account.id,
  235. });
  236. tasksCreated++;
  237. logger.debug(`[Scheduler] Created refresh task for account ${account.id} (${account.accountName})`);
  238. } catch (error) {
  239. logger.error(`[Scheduler] Failed to create refresh task for account ${account.id}:`, error);
  240. }
  241. }
  242. logger.info(`[Scheduler] Account refresh tasks created: ${tasksCreated} tasks, ${skipped} skipped`);
  243. } finally {
  244. // 释放锁
  245. this.isRefreshingAccounts = false;
  246. logger.debug('[Scheduler] Released refresh lock');
  247. }
  248. }
  249. /**
  250. * 采集数据统计
  251. */
  252. private async _collectAnalytics(): Promise<void> {
  253. const accountRepository = AppDataSource.getRepository(PlatformAccount);
  254. const analyticsRepository = AppDataSource.getRepository(AnalyticsData);
  255. const accounts = await accountRepository.find({
  256. where: { status: 'active' },
  257. });
  258. const today = new Date().toISOString().split('T')[0];
  259. for (const account of accounts) {
  260. if (!isPlatformSupported(account.platform)) continue;
  261. try {
  262. const adapter = getAdapter(account.platform);
  263. const data = await adapter.getAnalytics(account.cookieData || '', {
  264. startDate: today,
  265. endDate: today,
  266. });
  267. // 保存或更新数据
  268. const existing = await analyticsRepository.findOne({
  269. where: { accountId: account.id, date: today },
  270. });
  271. if (existing) {
  272. await analyticsRepository.update(existing.id, {
  273. fansCount: data.fansCount,
  274. fansIncrease: data.fansIncrease,
  275. viewsCount: data.viewsCount,
  276. likesCount: data.likesCount,
  277. commentsCount: data.commentsCount,
  278. sharesCount: data.sharesCount,
  279. income: data.income || 0,
  280. });
  281. } else {
  282. await analyticsRepository.save({
  283. userId: account.userId,
  284. accountId: account.id,
  285. date: today,
  286. fansCount: data.fansCount,
  287. fansIncrease: data.fansIncrease,
  288. viewsCount: data.viewsCount,
  289. likesCount: data.likesCount,
  290. commentsCount: data.commentsCount,
  291. sharesCount: data.sharesCount,
  292. income: data.income || 0,
  293. });
  294. }
  295. wsManager.sendToUser(account.userId, WS_EVENTS.ANALYTICS_UPDATED, {
  296. accountId: account.id,
  297. });
  298. } catch (error) {
  299. logger.error(`Collect analytics for account ${account.id} failed:`, error);
  300. }
  301. }
  302. }
  303. /**
  304. * 小红书:账号概览导出(近30日)→ 导入 user_day_statistics
  305. */
  306. private async importXhsAccountOverviewLast30Days(): Promise<void> {
  307. if (this.isXhsImportRunning) {
  308. logger.info('[Scheduler] XHS import is already running, skipping this cycle...');
  309. return;
  310. }
  311. this.isXhsImportRunning = true;
  312. try {
  313. await XiaohongshuAccountOverviewImportService.runDailyImport();
  314. } finally {
  315. this.isXhsImportRunning = false;
  316. }
  317. }
  318. /**
  319. * 小红书:作品维度「笔记详情-按天」→ 导入 work_day_statistics
  320. */
  321. private async importXhsWorkNoteStatistics(): Promise<void> {
  322. if (this.isXhsWorkImportRunning) {
  323. logger.info('[Scheduler] XHS work note statistics import is already running, skipping...');
  324. return;
  325. }
  326. this.isXhsWorkImportRunning = true;
  327. try {
  328. await XiaohongshuWorkNoteStatisticsImportService.runDailyImport();
  329. } finally {
  330. this.isXhsWorkImportRunning = false;
  331. }
  332. }
  333. /**
  334. * 抖音:账号总览-短视频-数据表现导出(近30天)→ 导入 user_day_statistics
  335. */
  336. private async importDyAccountOverviewLast30Days(): Promise<void> {
  337. if (this.isDyImportRunning) {
  338. logger.info('[Scheduler] Douyin import is already running, skipping this cycle...');
  339. return;
  340. }
  341. this.isDyImportRunning = true;
  342. try {
  343. await DouyinAccountOverviewImportService.runDailyImport();
  344. } finally {
  345. this.isDyImportRunning = false;
  346. }
  347. }
  348. /**
  349. * 抖音:作品维度「作品详情-按天」→ 导入 work_day_statistics
  350. */
  351. private async importDyWorkStatistics(): Promise<void> {
  352. if (this.isDyWorkImportRunning) {
  353. logger.info('[Scheduler] Douyin work statistics import is already running, skipping...');
  354. return;
  355. }
  356. this.isDyWorkImportRunning = true;
  357. try {
  358. await DouyinWorkStatisticsImportService.runDailyImport();
  359. } finally {
  360. this.isDyWorkImportRunning = false;
  361. }
  362. }
  363. /**
  364. * 自动回复私信(每5分钟执行一次)
  365. * 只处理微信视频号平台的账号
  366. */
  367. private async autoReplyMessages(): Promise<void> {
  368. // 检查是否正在执行回复任务
  369. if (this.isAutoReplying) {
  370. logger.info('[Scheduler] Auto reply is already running, skipping this cycle...');
  371. return;
  372. }
  373. // 获取锁
  374. this.isAutoReplying = true;
  375. logger.debug('[Scheduler] Acquired auto reply lock');
  376. try {
  377. const accountRepository = AppDataSource.getRepository(PlatformAccount);
  378. // 只获取微信视频号的活跃账号
  379. const accounts = await accountRepository.find({
  380. where: {
  381. // platform: 'weixin_video',
  382. userId: 2,
  383. status: 'active',
  384. },
  385. });
  386. if (accounts.length === 0) {
  387. logger.info('[Scheduler] No active accounts for auto reply');
  388. return;
  389. }
  390. logger.info(`[Scheduler] Starting auto reply for ${accounts.length} accounts...`);
  391. let successCount = 0;
  392. let failCount = 0;
  393. // 为每个账号执行自动回复
  394. for (const account of accounts) {
  395. try {
  396. logger.info(`[Scheduler] Auto replying for account: ${account.accountName} (${account.id})`);
  397. // Python 服务端使用 weixin,不是 weixin_video
  398. const pythonPlatform = account.platform === 'weixin_video' ? 'weixin' : account.platform;
  399. // 调用 Python 服务执行自动回复
  400. const pythonUrl = (await getPythonServiceBaseUrl()).replace(/\/$/, '');
  401. const response = await fetch(`${pythonUrl}/auto-reply`, {
  402. method: 'POST',
  403. headers: {
  404. 'Content-Type': 'application/json',
  405. },
  406. body: JSON.stringify({
  407. platform: pythonPlatform,
  408. cookie: account.cookieData || '',
  409. }),
  410. signal: AbortSignal.timeout(120000), // 2分钟超时
  411. });
  412. if (!response.ok) {
  413. throw new Error(`HTTP ${response.status}`);
  414. }
  415. const result = await response.json();
  416. if (result.success) {
  417. successCount++;
  418. logger.info(`[Scheduler] Auto reply success for ${account.accountName}: ${result.replied_count} messages`);
  419. } else {
  420. failCount++;
  421. logger.error(`[Scheduler] Auto reply failed for ${account.accountName}: ${result.error}`);
  422. }
  423. } catch (error) {
  424. failCount++;
  425. logger.error(`[Scheduler] Auto reply error for account ${account.id}:`, error);
  426. }
  427. }
  428. logger.info(`[Scheduler] Auto reply completed: ${successCount} success, ${failCount} failed`);
  429. } finally {
  430. // 释放锁
  431. this.isAutoReplying = false;
  432. logger.debug('[Scheduler] Released auto reply lock');
  433. }
  434. }
  435. /**
  436. * 百家号:内容分析-基础数据导出(近30天)→ 导入 user_day_statistics
  437. */
  438. private async importBaijiahaoContentOverviewLast30Days(): Promise<void> {
  439. if (this.isBjImportRunning) {
  440. logger.info('[Scheduler] Baijiahao import is already running, skipping this cycle...');
  441. return;
  442. }
  443. this.isBjImportRunning = true;
  444. try {
  445. await BaijiahaoContentOverviewImportService.runDailyImport();
  446. } finally {
  447. this.isBjImportRunning = false;
  448. }
  449. }
  450. /**
  451. * 百家号:作品维度「每日作品数据」→ 写入 works.yesterday_* 与 work_day_statistics
  452. */
  453. private async importBaijiahaoWorkDailyStatistics(): Promise<void> {
  454. if (this.isBjWorkImportRunning) {
  455. logger.info('[Scheduler] Baijiahao work daily import is already running, skipping...');
  456. return;
  457. }
  458. this.isBjWorkImportRunning = true;
  459. try {
  460. await BaijiahaoWorkDailyStatisticsImportService.runDailyImport();
  461. } finally {
  462. this.isBjWorkImportRunning = false;
  463. }
  464. }
  465. /**
  466. * 视频号:数据中心-关注者/视频/图文 的增长详情(近30天)→ 导入 user_day_statistics
  467. */
  468. private async importWeixinVideoDataCenterLast30Days(): Promise<void> {
  469. if (this.isWxImportRunning) {
  470. logger.info('[Scheduler] Weixin video import is already running, skipping this cycle...');
  471. return;
  472. }
  473. this.isWxImportRunning = true;
  474. try {
  475. await WeixinVideoDataCenterImportService.runDailyImport();
  476. } finally {
  477. this.isWxImportRunning = false;
  478. }
  479. }
  480. /**
  481. * 视频号:作品维度「作品列表 + feed_aggreagate_data_by_tab_type 全部」→ 导入 work_day_statistics
  482. */
  483. private async importWeixinVideoWorkStatistics(): Promise<void> {
  484. if (this.isWxWorkImportRunning) {
  485. logger.info('[Scheduler] Weixin video work statistics import is already running, skipping...');
  486. return;
  487. }
  488. this.isWxWorkImportRunning = true;
  489. try {
  490. await WeixinVideoWorkStatisticsImportService.runDailyImport();
  491. } finally {
  492. this.isWxWorkImportRunning = false;
  493. }
  494. }
  495. }
  496. export const taskScheduler = new TaskScheduler();