taskExecutors.ts 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. /**
  2. * 任务执行器注册
  3. * 在应用启动时注册所有任务类型的执行器
  4. */
  5. import { Task, TaskResult, TaskProgressUpdate } from '@media-manager/shared';
  6. import { taskQueueService } from './TaskQueueService.js';
  7. import { CommentService } from './CommentService.js';
  8. import { WorkService } from './WorkService.js';
  9. import { AccountService } from './AccountService.js';
  10. import { PublishService } from './PublishService.js';
  11. import { logger } from '../utils/logger.js';
  /** Callback through which an executor publishes a partial progress update for the running task. */
  type ProgressUpdater = (update: Partial<TaskProgressUpdate>) => void;

  // Service instances, created once when this module is loaded and used by the executors below.
  const commentService = new CommentService();
  const workService = new WorkService();
  const accountService = new AccountService();
  const publishService = new PublishService();
  /**
   * Executor for `sync_comments` tasks.
   *
   * Reads `userId` from the task object, asks the comment service to sync comments for
   * `task.accountId`, and forwards per-work progress through `updateProgress`.
   *
   * @param task - Queue task; it must additionally carry a truthy `userId` (set when the task is created).
   * @param updateProgress - Callback used to publish progress and current-step changes.
   * @returns A successful result whose `data` holds the synced comment count and the account count.
   * @throws Error when the task has no truthy `userId`.
   */
  async function syncCommentsExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
    updateProgress({ progress: 5, currentStep: '连接平台...' });

    const { userId } = task as Task & { userId?: number };
    if (!userId) {
      throw new Error('缺少用户ID');
    }

    // Scales "work `done` of `count`" to a percentage offset by 5 and capped at 90;
    // when `count` is not positive there is nothing to scale by, so 50 is reported instead.
    const reportWorkProgress = (done: number, count: number, title: string): void => {
      let percent = 50;
      if (count > 0) {
        percent = Math.min(90, Math.round((done / count) * 90) + 5);
      }
      // Fall back to a "work i/n" label when the work has no title.
      const label = title || `作品 ${done}/${count}`;
      updateProgress({
        progress: percent,
        currentStep: `正在同步: ${label}`,
        currentStepIndex: done,
      });
    };

    const outcome = await commentService.syncComments(userId, task.accountId, reportWorkProgress);
    updateProgress({ progress: 100, currentStep: '同步完成' });

    const syncedCount = outcome.synced;
    return {
      success: true,
      message: `同步完成,共同步 ${syncedCount} 条评论`,
      data: {
        syncedCount,
        accountCount: outcome.accounts,
      },
    };
  }
  /**
   * Executor for `sync_works` tasks.
   *
   * Delegates to the work service with the task's `userId`, `accountId` and `platform`,
   * relaying the service's progress callbacks unchanged to `updateProgress`.
   *
   * @param task - Queue task; it must additionally carry a truthy `userId`.
   * @param updateProgress - Callback used to publish progress and current-step changes.
   * @returns A successful result with the synced count, account count and per-account summaries.
   *          When exactly one account summary is present, a short diagnostic suffix is appended
   *          to the message.
   * @throws Error when the task has no truthy `userId`.
   */
  async function syncWorksExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
    updateProgress({ progress: 5, currentStep: '获取作品列表...' });

    const { userId } = task as Task & { userId?: number };
    if (!userId) {
      throw new Error('缺少用户ID');
    }

    const outcome = await workService.syncWorks(userId, task.accountId, task.platform, (percent, step) => {
      updateProgress({ progress: percent, currentStep: step });
    });
    updateProgress({ progress: 100, currentStep: '同步完成' });

    // The diagnostic suffix is only built for a single-account sync; otherwise it stays empty.
    const summaries = outcome.accountSummaries;
    let summaryText = '';
    if (summaries.length === 1) {
      const only = summaries[0];
      const listPart = `list=${only.worksListLength}/${only.worksCount}`;
      const pythonPart = `python=${only.pythonAvailable ? 'ok' : 'off'}`;
      const sourcePart = `source=${only.source || 'unknown'}`;
      summaryText = `(${only.platform} ${listPart} ${pythonPart} ${sourcePart})`;
    }

    return {
      success: true,
      message: `同步完成,共同步 ${outcome.synced} 个作品${summaryText}`,
      data: {
        syncedCount: outcome.synced,
        accountCount: outcome.accounts,
        accountSummaries: summaries,
      },
    };
  }
  /**
   * Executor for `sync_account` tasks.
   *
   * Refreshes a single account through the account service. If the refresh reports that a
   * re-login is needed, progress is still moved to 100 and the task then fails with an error.
   *
   * @param task - Queue task; it must carry a truthy `userId` and a truthy `accountId`.
   * @param updateProgress - Callback used to publish progress and current-step changes.
   * @returns A successful result once the account has been refreshed.
   * @throws Error when `userId` or `accountId` is missing, or when the refresh result
   *         has `needReLogin` set.
   */
  async function syncAccountExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
    updateProgress({ progress: 10, currentStep: '获取账号信息...' });

    const { userId } = task as Task & { userId?: number };
    const { accountId } = task;
    if (!userId) {
      throw new Error('缺少用户ID');
    }
    if (!accountId) {
      throw new Error('缺少账号ID');
    }

    const { needReLogin } = await accountService.refreshAccount(userId, accountId);

    // Progress reaches 100 on both outcomes; only the step label differs.
    updateProgress({ progress: 100, currentStep: needReLogin ? '需要重新登录' : '同步完成' });
    if (needReLogin) {
      throw new Error('账号登录已过期或触发风控,需要重新登录');
    }

    return {
      success: true,
      message: '账号信息已更新',
    };
  }
  /**
   * Executor for `publish_video` tasks.
   *
   * Runs the publish task identified by `publishTaskId` through the publish service and
   * relays the service's progress and message callbacks to `updateProgress`.
   *
   * @param task - Queue task; it must additionally carry a truthy `userId` and `publishTaskId`.
   * @param updateProgress - Callback used to publish progress and current-step changes.
   * @returns A successful result once the publish service call has resolved.
   * @throws Error when `userId` or `publishTaskId` is missing.
   */
  async function publishVideoExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
    updateProgress({ progress: 5, currentStep: '准备发布...' });

    // Both extra fields are read from the task object itself.
    const { userId, publishTaskId } = task as Task & { userId?: number; publishTaskId?: number };
    if (!userId) {
      throw new Error('缺少用户ID');
    }
    if (!publishTaskId) {
      throw new Error('缺少发布任务ID');
    }

    await publishService.executePublishTaskWithProgress(publishTaskId, userId, (percent, text) => {
      updateProgress({ progress: percent, currentStep: text });
    });

    updateProgress({ progress: 100, currentStep: '发布完成' });
    return {
      success: true,
      message: '视频发布任务已完成',
    };
  }
  /**
   * Executor for `delete_work` tasks.
   *
   * Steps:
   *  1. Delete the work on the platform via the work service.
   *  2. If that succeeded, delete the local record (best effort: a failure is only logged).
   *  3. If the platform result carries an `accountId`, queue a `sync_works` task to refresh
   *     the works list (best effort: a failure is only logged).
   *
   * The success message mentions the list refresh only when the follow-up task was actually
   * created, so the reported message does not claim a refresh that was never queued.
   *
   * @param task - Queue task; it must additionally carry a truthy `userId` and `workId`.
   * @param updateProgress - Callback used to publish progress and current-step changes.
   * @returns A result whose `success` mirrors the platform deletion outcome; on failure the
   *          message is the platform error message, or a generic fallback when none is given.
   * @throws Error when `userId` or `workId` is missing.
   */
  async function deleteWorkExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
    updateProgress({ progress: 10, currentStep: '准备删除...' });

    const { userId, workId } = task as Task & { userId?: number; workId?: number };
    if (!userId) {
      throw new Error('缺少用户ID');
    }
    if (!workId) {
      throw new Error('缺少作品ID');
    }

    updateProgress({ progress: 30, currentStep: '连接平台删除作品...' });
    const result = await workService.deletePlatformWork(userId, workId);

    // Platform deletion failed: nothing else to do, report the failure as a result (not a throw).
    if (!result.success) {
      updateProgress({ progress: 100, currentStep: '删除失败' });
      return {
        success: result.success,
        message: result.errorMessage || '删除失败',
      };
    }

    // The platform copy is gone; remove the local record too. A local failure must not
    // turn an otherwise successful platform deletion into a failed task.
    updateProgress({ progress: 70, currentStep: '删除本地记录...' });
    try {
      await workService.deleteWork(userId, workId);
      logger.info(`Local work ${workId} deleted after platform deletion`);
    } catch (error) {
      logger.warn(`Failed to delete local work ${workId}:`, error);
    }

    // Queue a works-list refresh for the affected account, when the account is known.
    let refreshQueued = false;
    if (result.accountId) {
      updateProgress({ progress: 90, currentStep: '刷新作品列表...' });
      try {
        // NOTE(review): createTask is not awaited; if it returns a promise, a rejection
        // would bypass this catch — confirm that it is synchronous.
        taskQueueService.createTask(userId, {
          type: 'sync_works',
          title: '刷新作品列表',
          accountId: result.accountId,
        });
        refreshQueued = true;
        logger.info(`Created sync_works task for account ${result.accountId} after delete`);
      } catch (syncError) {
        // Failing to queue the refresh does not affect the deletion outcome.
        logger.warn(`Failed to create sync_works task after delete:`, syncError);
      }
    }

    updateProgress({ progress: 100, currentStep: '删除完成' });
    return {
      success: result.success,
      message: refreshQueued ? '作品已从平台删除,正在刷新作品列表' : '作品已从平台删除',
    };
  }
  /**
   * Registers every task executor defined in this file with the task queue service,
   * one executor per task type, then logs that registration is complete.
   */
  export function registerTaskExecutors(): void {
    // Task type -> executor, listed in registration order.
    const executors = [
      ['sync_comments', syncCommentsExecutor],
      ['sync_works', syncWorksExecutor],
      ['sync_account', syncAccountExecutor],
      ['publish_video', publishVideoExecutor],
      ['delete_work', deleteWorkExecutor],
    ] as const;

    for (const [taskType, executor] of executors) {
      taskQueueService.registerExecutor(taskType, executor);
    }

    logger.info('All task executors registered');
  }