taskExecutors.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. /**
  2. * 任务执行器注册
  3. * 在应用启动时注册所有任务类型的执行器
  4. */
  5. import { Task, TaskResult, TaskProgressUpdate } from '@media-manager/shared';
  6. import { taskQueueService } from './TaskQueueService.js';
  7. import { CommentService } from './CommentService.js';
  8. import { WorkService } from './WorkService.js';
  9. import { AccountService } from './AccountService.js';
  10. import { PublishService } from './PublishService.js';
  11. import { XiaohongshuWorkNoteStatisticsImportService } from './XiaohongshuWorkNoteStatisticsImportService.js';
  12. import { DouyinWorkStatisticsImportService } from './DouyinWorkStatisticsImportService.js';
  13. import { BaijiahaoWorkDailyStatisticsImportService } from './BaijiahaoWorkDailyStatisticsImportService.js';
  14. import { AppDataSource, PlatformAccount } from '../models/index.js';
  15. import { logger } from '../utils/logger.js';
  16. // 创建服务实例
  17. const commentService = new CommentService();
  18. const workService = new WorkService();
  19. const accountService = new AccountService();
  20. const publishService = new PublishService();
  21. const xhsWorkStatsService = new XiaohongshuWorkNoteStatisticsImportService();
  22. const dyWorkStatsService = new DouyinWorkStatisticsImportService();
  23. type ProgressUpdater = (update: Partial<TaskProgressUpdate>) => void;
  24. /**
  25. * 同步评论任务执行器
  26. */
  27. async function syncCommentsExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  28. updateProgress({ progress: 5, currentStep: '连接平台...' });
  29. const onProgress = (current: number, total: number, workTitle: string) => {
  30. const progress = total > 0 ? Math.min(90, Math.round((current / total) * 90) + 5) : 50;
  31. updateProgress({
  32. progress,
  33. currentStep: `正在同步: ${workTitle || `作品 ${current}/${total}`}`,
  34. currentStepIndex: current,
  35. });
  36. };
  37. // 从任务中获取 userId(需要在创建任务时传入)
  38. const userId = (task as Task & { userId?: number }).userId;
  39. if (!userId) {
  40. throw new Error('缺少用户ID');
  41. }
  42. const result = await commentService.syncComments(userId, task.accountId, onProgress);
  43. updateProgress({ progress: 100, currentStep: '同步完成' });
  44. return {
  45. success: true,
  46. message: `同步完成,共同步 ${result.synced} 条评论`,
  47. data: {
  48. syncedCount: result.synced,
  49. accountCount: result.accounts,
  50. },
  51. };
  52. }
  53. /**
  54. * 同步作品任务执行器
  55. */
  56. async function syncWorksExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  57. updateProgress({ progress: 5, currentStep: '获取作品列表...' });
  58. const userId = (task as Task & { userId?: number }).userId;
  59. if (!userId) {
  60. throw new Error('缺少用户ID');
  61. }
  62. const result = await workService.syncWorks(userId, task.accountId, task.platform, (progress, currentStep) => {
  63. updateProgress({ progress, currentStep });
  64. });
  65. updateProgress({ progress: 100, currentStep: '同步完成' });
  66. const summaryText = (() => {
  67. if (result.accountSummaries.length === 1) {
  68. const s = result.accountSummaries[0];
  69. return `(${s.platform} list=${s.worksListLength}/${s.worksCount} python=${s.pythonAvailable ? 'ok' : 'off'} source=${s.source || 'unknown'})`;
  70. }
  71. return '';
  72. })();
  73. return {
  74. success: true,
  75. message: `同步完成,共同步 ${result.synced} 个作品${summaryText}`,
  76. data: {
  77. syncedCount: result.synced,
  78. accountCount: result.accounts,
  79. accountSummaries: result.accountSummaries,
  80. },
  81. };
  82. }
  83. /**
  84. * 同步账号信息任务执行器
  85. */
  86. async function syncAccountExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  87. updateProgress({ progress: 10, currentStep: '获取账号信息...' });
  88. const userId = (task as Task & { userId?: number }).userId;
  89. if (!userId) {
  90. throw new Error('缺少用户ID');
  91. }
  92. if (!task.accountId) {
  93. throw new Error('缺少账号ID');
  94. }
  95. const refreshResult = await accountService.refreshAccount(userId, task.accountId);
  96. if (refreshResult.needReLogin) {
  97. updateProgress({ progress: 100, currentStep: '需要重新登录' });
  98. throw new Error('账号登录已过期或触发风控,需要重新登录');
  99. }
  100. updateProgress({ progress: 100, currentStep: '同步完成' });
  101. return {
  102. success: true,
  103. message: '账号信息已更新',
  104. };
  105. }
  106. /**
  107. * 发布视频任务执行器
  108. */
  109. async function publishVideoExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  110. updateProgress({ progress: 5, currentStep: '准备发布...' });
  111. const userId = (task as Task & { userId?: number }).userId;
  112. if (!userId) {
  113. throw new Error('缺少用户ID');
  114. }
  115. // 从任务数据中获取发布任务ID
  116. const taskData = task as Task & { publishTaskId?: number };
  117. if (!taskData.publishTaskId) {
  118. throw new Error('缺少发布任务ID');
  119. }
  120. // 执行发布任务
  121. await publishService.executePublishTaskWithProgress(
  122. taskData.publishTaskId,
  123. userId,
  124. (progress, message) => {
  125. updateProgress({ progress, currentStep: message });
  126. }
  127. );
  128. updateProgress({ progress: 100, currentStep: '发布完成' });
  129. return {
  130. success: true,
  131. message: '视频发布任务已完成',
  132. };
  133. }
  134. /**
  135. * 删除平台作品任务执行器
  136. */
  137. async function deleteWorkExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  138. updateProgress({ progress: 10, currentStep: '准备删除...' });
  139. const userId = (task as Task & { userId?: number }).userId;
  140. if (!userId) {
  141. throw new Error('缺少用户ID');
  142. }
  143. const taskData = task as Task & { workId?: number };
  144. if (!taskData.workId) {
  145. throw new Error('缺少作品ID');
  146. }
  147. updateProgress({ progress: 30, currentStep: '连接平台删除作品...' });
  148. // 执行平台删除
  149. const result = await workService.deletePlatformWork(userId, taskData.workId);
  150. if (result.success) {
  151. updateProgress({ progress: 70, currentStep: '删除本地记录...' });
  152. // 平台删除成功后,删除本地记录
  153. try {
  154. await workService.deleteWork(userId, taskData.workId);
  155. logger.info(`Local work ${taskData.workId} deleted after platform deletion`);
  156. } catch (error) {
  157. logger.warn(`Failed to delete local work ${taskData.workId}:`, error);
  158. // 本地删除失败不影响整体结果
  159. }
  160. // 删除成功后,自动创建同步作品任务刷新作品列表
  161. if (result.accountId) {
  162. updateProgress({ progress: 90, currentStep: '刷新作品列表...' });
  163. try {
  164. taskQueueService.createTask(userId, {
  165. type: 'sync_works',
  166. title: '刷新作品列表',
  167. accountId: result.accountId,
  168. });
  169. logger.info(`Created sync_works task for account ${result.accountId} after delete`);
  170. } catch (syncError) {
  171. logger.warn(`Failed to create sync_works task after delete:`, syncError);
  172. // 同步任务创建失败不影响删除结果
  173. }
  174. }
  175. }
  176. updateProgress({ progress: 100, currentStep: result.success ? '删除完成' : '删除失败' });
  177. return {
  178. success: result.success,
  179. message: result.success ? '作品已从平台删除,正在刷新作品列表' : (result.errorMessage || '删除失败'),
  180. };
  181. }
  182. /**
  183. * 小红书作品首批日统计/快照补数任务执行器(不阻塞同步作品)
  184. * 任务 data: { workIds: number[] }
  185. */
  186. async function xhsWorkStatsBackfillExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  187. updateProgress({ progress: 5, currentStep: '准备补数任务...' });
  188. const userId = (task as Task & { userId?: number }).userId;
  189. if (!userId) throw new Error('缺少用户ID');
  190. if (!task.accountId) throw new Error('缺少账号ID');
  191. const workIdsRaw = (task as Task & { workIds?: unknown }).workIds;
  192. const workIds = Array.isArray(workIdsRaw) ? workIdsRaw.map((x) => Number(x)).filter((n) => Number.isFinite(n) && n > 0) : [];
  193. if (!workIds.length) throw new Error('缺少 workIds');
  194. // 仅允许当前用户自己的账号
  195. const account = await AppDataSource.getRepository(PlatformAccount).findOne({
  196. where: { id: task.accountId, userId, platform: 'xiaohongshu' as any },
  197. });
  198. if (!account) throw new Error('未找到账号或无权限');
  199. const total = workIds.length;
  200. updateProgress({ progress: 15, currentStep: `开始补数(作品数:${total})...`, totalSteps: total });
  201. await xhsWorkStatsService.importAccountWorksStatistics(account, false, {
  202. workIdFilter: workIds,
  203. ignorePublishTimeLimit: true,
  204. ignorePublishAgeLimit: true,
  205. onProgress: ({ index, total, work }) => {
  206. const pct = Math.min(99, Math.max(15, Math.round(15 + (index / Math.max(1, total)) * 84)));
  207. updateProgress({
  208. progress: pct,
  209. currentStepIndex: index,
  210. totalSteps: total,
  211. currentStep: `第 ${index}/${total} 个作品:${(work.title || '').trim() || `workId=${work.id}`}`,
  212. });
  213. },
  214. });
  215. updateProgress({ progress: 100, currentStep: '补数完成' });
  216. return {
  217. success: true,
  218. message: `补数完成,作品数:${workIds.length}`,
  219. data: { workIdsCount: workIds.length },
  220. };
  221. }
  222. /**
  223. * 抖音作品日统计/快照补数(不阻塞同步作品)
  224. * 任务 data: { workIds: number[] }
  225. */
  226. async function dyWorkStatsBackfillExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  227. updateProgress({ progress: 5, currentStep: '准备抖音作品补数任务...' });
  228. const userId = (task as Task & { userId?: number }).userId;
  229. if (!userId) throw new Error('缺少用户ID');
  230. if (!task.accountId) throw new Error('缺少账号ID');
  231. const workIdsRaw = (task as Task & { workIds?: unknown }).workIds;
  232. const workIds = Array.isArray(workIdsRaw)
  233. ? workIdsRaw.map((x) => Number(x)).filter((n) => Number.isFinite(n) && n > 0)
  234. : [];
  235. if (!workIds.length) throw new Error('缺少 workIds');
  236. // 仅允许当前用户自己的抖音账号
  237. const account = await AppDataSource.getRepository(PlatformAccount).findOne({
  238. where: { id: task.accountId, userId, platform: 'douyin' as any },
  239. });
  240. if (!account) throw new Error('未找到抖音账号或无权限');
  241. const total = workIds.length;
  242. updateProgress({ progress: 15, currentStep: `开始抖音作品补数(作品数:${total})...`, totalSteps: total });
  243. await dyWorkStatsService.importAccountWorksStatistics(account, false, {
  244. workIdFilter: workIds,
  245. onProgress: ({ index, total, work }) => {
  246. const pct = Math.min(99, Math.max(15, Math.round(15 + (index / Math.max(1, total)) * 84)));
  247. updateProgress({
  248. progress: pct,
  249. currentStepIndex: index,
  250. totalSteps: total,
  251. currentStep: `第 ${index}/${total} 个作品:${(work.title || '').trim() || `workId=${work.id}`}`,
  252. });
  253. },
  254. });
  255. updateProgress({ progress: 100, currentStep: '抖音作品补数完成' });
  256. return {
  257. success: true,
  258. message: `抖音作品补数完成,作品数:${workIds.length}`,
  259. data: { workIdsCount: workIds.length },
  260. };
  261. }
  262. /**
  263. * 百家号作品日统计/快照补数(不阻塞同步作品)
  264. * 任务 data: { workIds: number[] },实际执行时对整个账号运行 importAccountWorkDaily(补全所有作品的历史日统计)
  265. */
  266. async function bjWorkStatsBackfillExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  267. updateProgress({ progress: 5, currentStep: '准备百家号作品补数任务...' });
  268. const userId = (task as Task & { userId?: number }).userId;
  269. if (!userId) throw new Error('缺少用户ID');
  270. if (!task.accountId) throw new Error('缺少账号ID');
  271. const workIdsRaw = (task as Task & { workIds?: unknown }).workIds;
  272. const workIds = Array.isArray(workIdsRaw)
  273. ? workIdsRaw.map((x) => Number(x)).filter((n) => Number.isFinite(n) && n > 0)
  274. : [];
  275. // 仅允许当前用户自己的百家号账号
  276. const account = await AppDataSource.getRepository(PlatformAccount).findOne({
  277. where: { id: task.accountId, userId, platform: 'baijiahao' as any },
  278. });
  279. if (!account) throw new Error('未找到百家号账号或无权限');
  280. updateProgress({ progress: 15, currentStep: `开始百家号作品补数(需补数作品:${workIds.length})...` });
  281. await BaijiahaoWorkDailyStatisticsImportService.runDailyImportForAccount(account.id);
  282. updateProgress({ progress: 100, currentStep: '百家号作品补数完成' });
  283. return {
  284. success: true,
  285. message: `百家号作品补数完成`,
  286. data: { workIdsCount: workIds.length },
  287. };
  288. }
  289. /**
  290. * 注册所有任务执行器
  291. */
  292. export function registerTaskExecutors(): void {
  293. taskQueueService.registerExecutor('sync_comments', syncCommentsExecutor);
  294. taskQueueService.registerExecutor('sync_works', syncWorksExecutor);
  295. taskQueueService.registerExecutor('sync_account', syncAccountExecutor);
  296. taskQueueService.registerExecutor('publish_video', publishVideoExecutor);
  297. taskQueueService.registerExecutor('delete_work', deleteWorkExecutor);
  298. taskQueueService.registerExecutor('xhs_work_stats_backfill', xhsWorkStatsBackfillExecutor);
  299. taskQueueService.registerExecutor('dy_work_stats_backfill', dyWorkStatsBackfillExecutor);
  300. taskQueueService.registerExecutor('bj_work_stats_backfill', bjWorkStatsBackfillExecutor);
  301. logger.info('All task executors registered');
  302. }