taskExecutors.ts 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. /**
  2. * 任务执行器注册
  3. * 在应用启动时注册所有任务类型的执行器
  4. */
  5. import { Task, TaskResult, TaskProgressUpdate } from '@media-manager/shared';
  6. import { taskQueueService } from './TaskQueueService.js';
  7. import { CommentService } from './CommentService.js';
  8. import { WorkService } from './WorkService.js';
  9. import { AccountService } from './AccountService.js';
  10. import { PublishService } from './PublishService.js';
  11. import { XiaohongshuWorkNoteStatisticsImportService } from './XiaohongshuWorkNoteStatisticsImportService.js';
  12. import { AppDataSource, PlatformAccount } from '../models/index.js';
  13. import { logger } from '../utils/logger.js';
  14. // 创建服务实例
  15. const commentService = new CommentService();
  16. const workService = new WorkService();
  17. const accountService = new AccountService();
  18. const publishService = new PublishService();
  19. const xhsWorkStatsService = new XiaohongshuWorkNoteStatisticsImportService();
  20. type ProgressUpdater = (update: Partial<TaskProgressUpdate>) => void;
  21. /**
  22. * 同步评论任务执行器
  23. */
  24. async function syncCommentsExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  25. updateProgress({ progress: 5, currentStep: '连接平台...' });
  26. const onProgress = (current: number, total: number, workTitle: string) => {
  27. const progress = total > 0 ? Math.min(90, Math.round((current / total) * 90) + 5) : 50;
  28. updateProgress({
  29. progress,
  30. currentStep: `正在同步: ${workTitle || `作品 ${current}/${total}`}`,
  31. currentStepIndex: current,
  32. });
  33. };
  34. // 从任务中获取 userId(需要在创建任务时传入)
  35. const userId = (task as Task & { userId?: number }).userId;
  36. if (!userId) {
  37. throw new Error('缺少用户ID');
  38. }
  39. const result = await commentService.syncComments(userId, task.accountId, onProgress);
  40. updateProgress({ progress: 100, currentStep: '同步完成' });
  41. return {
  42. success: true,
  43. message: `同步完成,共同步 ${result.synced} 条评论`,
  44. data: {
  45. syncedCount: result.synced,
  46. accountCount: result.accounts,
  47. },
  48. };
  49. }
  50. /**
  51. * 同步作品任务执行器
  52. */
  53. async function syncWorksExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  54. updateProgress({ progress: 5, currentStep: '获取作品列表...' });
  55. const userId = (task as Task & { userId?: number }).userId;
  56. if (!userId) {
  57. throw new Error('缺少用户ID');
  58. }
  59. const result = await workService.syncWorks(userId, task.accountId, task.platform, (progress, currentStep) => {
  60. updateProgress({ progress, currentStep });
  61. });
  62. updateProgress({ progress: 100, currentStep: '同步完成' });
  63. const summaryText = (() => {
  64. if (result.accountSummaries.length === 1) {
  65. const s = result.accountSummaries[0];
  66. return `(${s.platform} list=${s.worksListLength}/${s.worksCount} python=${s.pythonAvailable ? 'ok' : 'off'} source=${s.source || 'unknown'})`;
  67. }
  68. return '';
  69. })();
  70. return {
  71. success: true,
  72. message: `同步完成,共同步 ${result.synced} 个作品${summaryText}`,
  73. data: {
  74. syncedCount: result.synced,
  75. accountCount: result.accounts,
  76. accountSummaries: result.accountSummaries,
  77. },
  78. };
  79. }
  80. /**
  81. * 同步账号信息任务执行器
  82. */
  83. async function syncAccountExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  84. updateProgress({ progress: 10, currentStep: '获取账号信息...' });
  85. const userId = (task as Task & { userId?: number }).userId;
  86. if (!userId) {
  87. throw new Error('缺少用户ID');
  88. }
  89. if (!task.accountId) {
  90. throw new Error('缺少账号ID');
  91. }
  92. const refreshResult = await accountService.refreshAccount(userId, task.accountId);
  93. if (refreshResult.needReLogin) {
  94. updateProgress({ progress: 100, currentStep: '需要重新登录' });
  95. throw new Error('账号登录已过期或触发风控,需要重新登录');
  96. }
  97. updateProgress({ progress: 100, currentStep: '同步完成' });
  98. return {
  99. success: true,
  100. message: '账号信息已更新',
  101. };
  102. }
  103. /**
  104. * 发布视频任务执行器
  105. */
  106. async function publishVideoExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  107. updateProgress({ progress: 5, currentStep: '准备发布...' });
  108. const userId = (task as Task & { userId?: number }).userId;
  109. if (!userId) {
  110. throw new Error('缺少用户ID');
  111. }
  112. // 从任务数据中获取发布任务ID
  113. const taskData = task as Task & { publishTaskId?: number };
  114. if (!taskData.publishTaskId) {
  115. throw new Error('缺少发布任务ID');
  116. }
  117. // 执行发布任务
  118. await publishService.executePublishTaskWithProgress(
  119. taskData.publishTaskId,
  120. userId,
  121. (progress, message) => {
  122. updateProgress({ progress, currentStep: message });
  123. }
  124. );
  125. updateProgress({ progress: 100, currentStep: '发布完成' });
  126. return {
  127. success: true,
  128. message: '视频发布任务已完成',
  129. };
  130. }
  131. /**
  132. * 删除平台作品任务执行器
  133. */
  134. async function deleteWorkExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  135. updateProgress({ progress: 10, currentStep: '准备删除...' });
  136. const userId = (task as Task & { userId?: number }).userId;
  137. if (!userId) {
  138. throw new Error('缺少用户ID');
  139. }
  140. const taskData = task as Task & { workId?: number };
  141. if (!taskData.workId) {
  142. throw new Error('缺少作品ID');
  143. }
  144. updateProgress({ progress: 30, currentStep: '连接平台删除作品...' });
  145. // 执行平台删除
  146. const result = await workService.deletePlatformWork(userId, taskData.workId);
  147. if (result.success) {
  148. updateProgress({ progress: 70, currentStep: '删除本地记录...' });
  149. // 平台删除成功后,删除本地记录
  150. try {
  151. await workService.deleteWork(userId, taskData.workId);
  152. logger.info(`Local work ${taskData.workId} deleted after platform deletion`);
  153. } catch (error) {
  154. logger.warn(`Failed to delete local work ${taskData.workId}:`, error);
  155. // 本地删除失败不影响整体结果
  156. }
  157. // 删除成功后,自动创建同步作品任务刷新作品列表
  158. if (result.accountId) {
  159. updateProgress({ progress: 90, currentStep: '刷新作品列表...' });
  160. try {
  161. taskQueueService.createTask(userId, {
  162. type: 'sync_works',
  163. title: '刷新作品列表',
  164. accountId: result.accountId,
  165. });
  166. logger.info(`Created sync_works task for account ${result.accountId} after delete`);
  167. } catch (syncError) {
  168. logger.warn(`Failed to create sync_works task after delete:`, syncError);
  169. // 同步任务创建失败不影响删除结果
  170. }
  171. }
  172. }
  173. updateProgress({ progress: 100, currentStep: result.success ? '删除完成' : '删除失败' });
  174. return {
  175. success: result.success,
  176. message: result.success ? '作品已从平台删除,正在刷新作品列表' : (result.errorMessage || '删除失败'),
  177. };
  178. }
  179. /**
  180. * 小红书作品首批日统计/快照补数任务执行器(不阻塞同步作品)
  181. * 任务 data: { workIds: number[] }
  182. */
  183. async function xhsWorkStatsBackfillExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  184. updateProgress({ progress: 5, currentStep: '准备补数任务...' });
  185. const userId = (task as Task & { userId?: number }).userId;
  186. if (!userId) throw new Error('缺少用户ID');
  187. if (!task.accountId) throw new Error('缺少账号ID');
  188. const workIdsRaw = (task as Task & { workIds?: unknown }).workIds;
  189. const workIds = Array.isArray(workIdsRaw) ? workIdsRaw.map((x) => Number(x)).filter((n) => Number.isFinite(n) && n > 0) : [];
  190. if (!workIds.length) throw new Error('缺少 workIds');
  191. // 仅允许当前用户自己的账号
  192. const account = await AppDataSource.getRepository(PlatformAccount).findOne({
  193. where: { id: task.accountId, userId, platform: 'xiaohongshu' as any },
  194. });
  195. if (!account) throw new Error('未找到账号或无权限');
  196. const total = workIds.length;
  197. updateProgress({ progress: 15, currentStep: `开始补数(作品数:${total})...`, totalSteps: total });
  198. await xhsWorkStatsService.importAccountWorksStatistics(account, false, {
  199. workIdFilter: workIds,
  200. ignorePublishTimeLimit: true,
  201. ignorePublishAgeLimit: true,
  202. onProgress: ({ index, total, work }) => {
  203. const pct = Math.min(99, Math.max(15, Math.round(15 + (index / Math.max(1, total)) * 84)));
  204. updateProgress({
  205. progress: pct,
  206. currentStepIndex: index,
  207. totalSteps: total,
  208. currentStep: `第 ${index}/${total} 个作品:${(work.title || '').trim() || `workId=${work.id}`}`,
  209. });
  210. },
  211. });
  212. updateProgress({ progress: 100, currentStep: '补数完成' });
  213. return {
  214. success: true,
  215. message: `补数完成,作品数:${workIds.length}`,
  216. data: { workIdsCount: workIds.length },
  217. };
  218. }
  219. /**
  220. * 注册所有任务执行器
  221. */
  222. export function registerTaskExecutors(): void {
  223. taskQueueService.registerExecutor('sync_comments', syncCommentsExecutor);
  224. taskQueueService.registerExecutor('sync_works', syncWorksExecutor);
  225. taskQueueService.registerExecutor('sync_account', syncAccountExecutor);
  226. taskQueueService.registerExecutor('publish_video', publishVideoExecutor);
  227. taskQueueService.registerExecutor('delete_work', deleteWorkExecutor);
  228. taskQueueService.registerExecutor('xhs_work_stats_backfill', xhsWorkStatsBackfillExecutor);
  229. logger.info('All task executors registered');
  230. }