TaskQueueService.ts 9.5 KB


  1. import { v4 as uuidv4 } from 'uuid';
  2. import {
  3. Task,
  4. TaskType,
  5. TaskStatus,
  6. TaskPriority,
  7. TaskResult,
  8. TaskProgressUpdate,
  9. CreateTaskRequest,
  10. TASK_WS_EVENTS,
  11. } from '@media-manager/shared';
  12. import { wsManager } from '../websocket/index.js';
  13. import { logger } from '../utils/logger.js';
  14. import { config } from '../config/index.js';
  15. // 任务执行器类型
  16. export type TaskExecutor = (
  17. task: Task,
  18. updateProgress: (update: Partial<TaskProgressUpdate>) => void
  19. ) => Promise<TaskResult>;
  20. // 检查是否启用 Redis
  21. const USE_REDIS = process.env.USE_REDIS_QUEUE === 'true';
  22. /**
  23. * 全局异步任务队列服务
  24. * 管理所有后台任务的创建、执行、进度追踪
  25. */
  26. class TaskQueueService {
  27. // 用户任务列表 Map<userId, Task[]>
  28. private userTasks: Map<number, Task[]> = new Map();
  29. // 任务执行器 Map<TaskType, TaskExecutor>
  30. private executors: Map<TaskType, TaskExecutor> = new Map();
  31. // 正在执行的任务数量限制(每用户)
  32. private maxConcurrentTasks = 3;
  33. /**
  34. * 注册任务执行器
  35. */
  36. registerExecutor(type: TaskType, executor: TaskExecutor): void {
  37. this.executors.set(type, executor);
  38. logger.info(`Task executor registered: ${type}`);
  39. }
  40. /**
  41. * 创建新任务
  42. */
  43. createTask(userId: number, request: CreateTaskRequest): Task & { userId: number } {
  44. const task: Task & { userId: number; [key: string]: unknown } = {
  45. id: uuidv4(),
  46. type: request.type,
  47. title: request.title || this.getDefaultTitle(request.type),
  48. description: request.description,
  49. status: 'pending',
  50. progress: 0,
  51. priority: request.priority || 'normal',
  52. silent: request.silent || false, // 静默执行标记
  53. createdAt: new Date().toISOString(),
  54. accountId: request.accountId,
  55. platform: request.platform,
  56. userId, // 存储 userId 用于任务执行
  57. // 合并额外数据
  58. ...(request.data || {}),
  59. };
  60. // 添加到用户任务列表
  61. if (!this.userTasks.has(userId)) {
  62. this.userTasks.set(userId, []);
  63. }
  64. this.userTasks.get(userId)!.push(task);
  65. // 通知前端任务已创建
  66. this.notifyUser(userId, TASK_WS_EVENTS.TASK_CREATED, { task });
  67. logger.info(`Task created: ${task.id} (${task.type}) for user ${userId}`);
  68. // 尝试执行任务
  69. this.tryExecuteNext(userId);
  70. return task;
  71. }
  72. /**
  73. * 获取用户的所有任务
  74. */
  75. getUserTasks(userId: number): Task[] {
  76. return this.userTasks.get(userId) || [];
  77. }
  78. /**
  79. * 获取用户的活跃任务(pending + running)
  80. */
  81. getActiveTasks(userId: number): Task[] {
  82. const tasks = this.userTasks.get(userId) || [];
  83. return tasks.filter(t => t.status === 'pending' || t.status === 'running');
  84. }
  85. /**
  86. * 取消任务
  87. */
  88. cancelTask(userId: number, taskId: string): boolean {
  89. const tasks = this.userTasks.get(userId);
  90. if (!tasks) return false;
  91. const task = tasks.find(t => t.id === taskId);
  92. if (!task) return false;
  93. if (task.status === 'pending') {
  94. task.status = 'cancelled';
  95. task.completedAt = new Date().toISOString();
  96. this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
  97. logger.info(`Task cancelled: ${taskId}`);
  98. return true;
  99. }
  100. // 正在运行的任务暂不支持取消
  101. return false;
  102. }
  103. /**
  104. * 清理已完成的任务(保留最近N个)
  105. */
  106. cleanupCompletedTasks(userId: number, keepCount = 10): void {
  107. const tasks = this.userTasks.get(userId);
  108. if (!tasks) return;
  109. const completedTasks = tasks.filter(t =>
  110. t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
  111. );
  112. if (completedTasks.length > keepCount) {
  113. // 按完成时间排序,保留最新的
  114. completedTasks.sort((a, b) =>
  115. new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
  116. );
  117. const toRemove = completedTasks.slice(keepCount);
  118. const toRemoveIds = new Set(toRemove.map(t => t.id));
  119. this.userTasks.set(userId, tasks.filter(t => !toRemoveIds.has(t.id)));
  120. }
  121. }
  122. /**
  123. * 尝试执行下一个任务(支持并行执行多个任务)
  124. */
  125. private tryExecuteNext(userId: number): void {
  126. const tasks = this.userTasks.get(userId);
  127. if (!tasks) return;
  128. // 检查当前运行中的任务数量
  129. const runningCount = tasks.filter(t => t.status === 'running').length;
  130. const availableSlots = this.maxConcurrentTasks - runningCount;
  131. if (availableSlots <= 0) {
  132. return;
  133. }
  134. // 找到待执行的任务(按优先级排序)
  135. const pendingTasks = tasks.filter(t => t.status === 'pending');
  136. if (pendingTasks.length === 0) return;
  137. // 按优先级排序
  138. pendingTasks.sort((a, b) => {
  139. const priorityOrder = { high: 0, normal: 1, low: 2 };
  140. return priorityOrder[a.priority] - priorityOrder[b.priority];
  141. });
  142. // 并行启动多个任务(不使用 await,让它们并行执行)
  143. const tasksToStart = pendingTasks.slice(0, availableSlots);
  144. for (const task of tasksToStart) {
  145. // 使用 void 来明确表示我们不等待这个 Promise
  146. void this.executeTask(userId, task);
  147. }
  148. }
  149. /**
  150. * 执行任务
  151. */
  152. private async executeTask(userId: number, task: Task): Promise<void> {
  153. const executor = this.executors.get(task.type);
  154. if (!executor) {
  155. logger.error(`No executor registered for task type: ${task.type}`);
  156. task.status = 'failed';
  157. task.error = `不支持的任务类型: ${task.type}`;
  158. task.completedAt = new Date().toISOString();
  159. this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
  160. return;
  161. }
  162. // 更新任务状态为运行中
  163. task.status = 'running';
  164. task.startedAt = new Date().toISOString();
  165. task.progress = 0;
  166. this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, { task });
  167. logger.info(`Task started: ${task.id} (${task.type})`);
  168. // 进度更新回调
  169. const updateProgress = (update: Partial<TaskProgressUpdate>) => {
  170. if (update.progress !== undefined) task.progress = update.progress;
  171. if (update.currentStep !== undefined) task.currentStep = update.currentStep;
  172. if (update.currentStepIndex !== undefined) task.currentStepIndex = update.currentStepIndex;
  173. this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
  174. taskId: task.id,
  175. progress: task.progress,
  176. currentStep: task.currentStep,
  177. currentStepIndex: task.currentStepIndex,
  178. message: update.message,
  179. });
  180. };
  181. try {
  182. const result = await executor(task, updateProgress);
  183. task.status = 'completed';
  184. task.progress = 100;
  185. task.result = result;
  186. task.completedAt = new Date().toISOString();
  187. this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, { task });
  188. logger.info(`Task completed: ${task.id}, result: ${result.message}`);
  189. } catch (error) {
  190. task.status = 'failed';
  191. task.error = error instanceof Error ? error.message : '任务执行失败';
  192. task.completedAt = new Date().toISOString();
  193. this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
  194. logger.error(`Task failed: ${task.id}`, error);
  195. }
  196. // 清理旧任务并尝试执行下一个
  197. this.cleanupCompletedTasks(userId);
  198. this.tryExecuteNext(userId);
  199. }
  200. /**
  201. * 通知用户
  202. */
  203. private notifyUser(userId: number, event: string, data: Record<string, unknown>): void {
  204. wsManager.sendToUser(userId, event, {
  205. event: event.split(':')[1], // 提取事件名
  206. ...data,
  207. });
  208. }
  209. /**
  210. * 获取默认任务标题
  211. */
  212. private getDefaultTitle(type: TaskType): string {
  213. const titles: Record<TaskType, string> = {
  214. sync_comments: '同步评论',
  215. sync_works: '同步作品',
  216. sync_account: '同步账号信息',
  217. publish_video: '发布视频',
  218. batch_reply: '批量回复评论',
  219. delete_work: '删除作品',
  220. xhs_work_stats_backfill: '小红书作品补数',
  221. dy_work_stats_backfill: '抖音作品补数',
  222. bj_work_stats_backfill: '百家号作品补数',
  223. };
  224. return titles[type] || '未知任务';
  225. }
  226. /**
  227. * 发送任务列表给用户
  228. */
  229. sendTaskList(userId: number): void {
  230. const tasks = this.getUserTasks(userId);
  231. wsManager.sendToUser(userId, TASK_WS_EVENTS.TASK_LIST, {
  232. event: 'list',
  233. tasks,
  234. });
  235. }
  236. /**
  237. * 启动 Worker(内存队列模式下为空操作)
  238. */
  239. startWorker(): void {
  240. logger.info('Memory Task Queue started (no worker needed)');
  241. }
  242. /**
  243. * 停止 Worker
  244. */
  245. async stopWorker(): Promise<void> {
  246. logger.info('Memory Task Queue stopped');
  247. }
  248. /**
  249. * 关闭服务
  250. */
  251. async close(): Promise<void> {
  252. logger.info('Memory Task Queue Service closed');
  253. }
  254. }
  255. // 内存队列单例
  256. const memoryTaskQueueService = new TaskQueueService();
  257. // 根据配置选择队列实现
  258. let taskQueueService: TaskQueueService;
  259. if (USE_REDIS) {
  260. // 动态导入 Redis 队列
  261. import('./RedisTaskQueue.js').then(({ redisTaskQueueService }) => {
  262. (taskQueueService as unknown) = redisTaskQueueService;
  263. logger.info('Using Redis Task Queue');
  264. }).catch((err) => {
  265. logger.warn('Failed to load Redis Task Queue, falling back to memory queue:', err.message);
  266. taskQueueService = memoryTaskQueueService;
  267. });
  268. // 初始设置为内存队列(在 Redis 加载完成前使用)
  269. taskQueueService = memoryTaskQueueService;
  270. } else {
  271. taskQueueService = memoryTaskQueueService;
  272. logger.info('Using Memory Task Queue');
  273. }
  274. export { taskQueueService };