TaskQueueService.ts 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. import { v4 as uuidv4 } from 'uuid';
  2. import {
  3. Task,
  4. TaskType,
  5. TaskStatus,
  6. TaskPriority,
  7. TaskResult,
  8. TaskProgressUpdate,
  9. CreateTaskRequest,
  10. TASK_WS_EVENTS,
  11. } from '@media-manager/shared';
  12. import { wsManager } from '../websocket/index.js';
  13. import { logger } from '../utils/logger.js';
  14. import { config } from '../config/index.js';
  15. // 任务执行器类型
  16. export type TaskExecutor = (
  17. task: Task,
  18. updateProgress: (update: Partial<TaskProgressUpdate>) => void
  19. ) => Promise<TaskResult>;
  20. // 检查是否启用 Redis
  21. const USE_REDIS = process.env.USE_REDIS_QUEUE === 'true';
  22. /**
  23. * 全局异步任务队列服务
  24. * 管理所有后台任务的创建、执行、进度追踪
  25. */
  26. class TaskQueueService {
  27. // 用户任务列表 Map<userId, Task[]>
  28. private userTasks: Map<number, Task[]> = new Map();
  29. // 任务执行器 Map<TaskType, TaskExecutor>
  30. private executors: Map<TaskType, TaskExecutor> = new Map();
  31. // 正在执行的任务数量限制(每用户)
  32. private maxConcurrentTasks = 3;
  33. /**
  34. * 注册任务执行器
  35. */
  36. registerExecutor(type: TaskType, executor: TaskExecutor): void {
  37. this.executors.set(type, executor);
  38. logger.info(`Task executor registered: ${type}`);
  39. }
  40. /**
  41. * 创建新任务
  42. */
  43. createTask(userId: number, request: CreateTaskRequest): Task & { userId: number } {
  44. const task: Task & { userId: number; [key: string]: unknown } = {
  45. id: uuidv4(),
  46. type: request.type,
  47. title: request.title || this.getDefaultTitle(request.type),
  48. description: request.description,
  49. status: 'pending',
  50. progress: 0,
  51. priority: request.priority || 'normal',
  52. createdAt: new Date().toISOString(),
  53. accountId: request.accountId,
  54. userId, // 存储 userId 用于任务执行
  55. // 合并额外数据
  56. ...(request.data || {}),
  57. };
  58. // 添加到用户任务列表
  59. if (!this.userTasks.has(userId)) {
  60. this.userTasks.set(userId, []);
  61. }
  62. this.userTasks.get(userId)!.push(task);
  63. // 通知前端任务已创建
  64. this.notifyUser(userId, TASK_WS_EVENTS.TASK_CREATED, { task });
  65. logger.info(`Task created: ${task.id} (${task.type}) for user ${userId}`);
  66. // 尝试执行任务
  67. this.tryExecuteNext(userId);
  68. return task;
  69. }
  70. /**
  71. * 获取用户的所有任务
  72. */
  73. getUserTasks(userId: number): Task[] {
  74. return this.userTasks.get(userId) || [];
  75. }
  76. /**
  77. * 获取用户的活跃任务(pending + running)
  78. */
  79. getActiveTasks(userId: number): Task[] {
  80. const tasks = this.userTasks.get(userId) || [];
  81. return tasks.filter(t => t.status === 'pending' || t.status === 'running');
  82. }
  83. /**
  84. * 取消任务
  85. */
  86. cancelTask(userId: number, taskId: string): boolean {
  87. const tasks = this.userTasks.get(userId);
  88. if (!tasks) return false;
  89. const task = tasks.find(t => t.id === taskId);
  90. if (!task) return false;
  91. if (task.status === 'pending') {
  92. task.status = 'cancelled';
  93. task.completedAt = new Date().toISOString();
  94. this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
  95. logger.info(`Task cancelled: ${taskId}`);
  96. return true;
  97. }
  98. // 正在运行的任务暂不支持取消
  99. return false;
  100. }
  101. /**
  102. * 清理已完成的任务(保留最近N个)
  103. */
  104. cleanupCompletedTasks(userId: number, keepCount = 10): void {
  105. const tasks = this.userTasks.get(userId);
  106. if (!tasks) return;
  107. const completedTasks = tasks.filter(t =>
  108. t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
  109. );
  110. if (completedTasks.length > keepCount) {
  111. // 按完成时间排序,保留最新的
  112. completedTasks.sort((a, b) =>
  113. new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
  114. );
  115. const toRemove = completedTasks.slice(keepCount);
  116. const toRemoveIds = new Set(toRemove.map(t => t.id));
  117. this.userTasks.set(userId, tasks.filter(t => !toRemoveIds.has(t.id)));
  118. }
  119. }
  120. /**
  121. * 尝试执行下一个任务(支持并行执行多个任务)
  122. */
  123. private tryExecuteNext(userId: number): void {
  124. const tasks = this.userTasks.get(userId);
  125. if (!tasks) return;
  126. // 检查当前运行中的任务数量
  127. const runningCount = tasks.filter(t => t.status === 'running').length;
  128. const availableSlots = this.maxConcurrentTasks - runningCount;
  129. if (availableSlots <= 0) {
  130. return;
  131. }
  132. // 找到待执行的任务(按优先级排序)
  133. const pendingTasks = tasks.filter(t => t.status === 'pending');
  134. if (pendingTasks.length === 0) return;
  135. // 按优先级排序
  136. pendingTasks.sort((a, b) => {
  137. const priorityOrder = { high: 0, normal: 1, low: 2 };
  138. return priorityOrder[a.priority] - priorityOrder[b.priority];
  139. });
  140. // 并行启动多个任务(不使用 await,让它们并行执行)
  141. const tasksToStart = pendingTasks.slice(0, availableSlots);
  142. for (const task of tasksToStart) {
  143. // 使用 void 来明确表示我们不等待这个 Promise
  144. void this.executeTask(userId, task);
  145. }
  146. }
  147. /**
  148. * 执行任务
  149. */
  150. private async executeTask(userId: number, task: Task): Promise<void> {
  151. const executor = this.executors.get(task.type);
  152. if (!executor) {
  153. logger.error(`No executor registered for task type: ${task.type}`);
  154. task.status = 'failed';
  155. task.error = `不支持的任务类型: ${task.type}`;
  156. task.completedAt = new Date().toISOString();
  157. this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
  158. return;
  159. }
  160. // 更新任务状态为运行中
  161. task.status = 'running';
  162. task.startedAt = new Date().toISOString();
  163. task.progress = 0;
  164. this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, { task });
  165. logger.info(`Task started: ${task.id} (${task.type})`);
  166. // 进度更新回调
  167. const updateProgress = (update: Partial<TaskProgressUpdate>) => {
  168. if (update.progress !== undefined) task.progress = update.progress;
  169. if (update.currentStep !== undefined) task.currentStep = update.currentStep;
  170. if (update.currentStepIndex !== undefined) task.currentStepIndex = update.currentStepIndex;
  171. this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
  172. taskId: task.id,
  173. progress: task.progress,
  174. currentStep: task.currentStep,
  175. currentStepIndex: task.currentStepIndex,
  176. message: update.message,
  177. });
  178. };
  179. try {
  180. const result = await executor(task, updateProgress);
  181. task.status = 'completed';
  182. task.progress = 100;
  183. task.result = result;
  184. task.completedAt = new Date().toISOString();
  185. this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, { task });
  186. logger.info(`Task completed: ${task.id}, result: ${result.message}`);
  187. } catch (error) {
  188. task.status = 'failed';
  189. task.error = error instanceof Error ? error.message : '任务执行失败';
  190. task.completedAt = new Date().toISOString();
  191. this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
  192. logger.error(`Task failed: ${task.id}`, error);
  193. }
  194. // 清理旧任务并尝试执行下一个
  195. this.cleanupCompletedTasks(userId);
  196. this.tryExecuteNext(userId);
  197. }
  198. /**
  199. * 通知用户
  200. */
  201. private notifyUser(userId: number, event: string, data: Record<string, unknown>): void {
  202. wsManager.sendToUser(userId, event, {
  203. event: event.split(':')[1], // 提取事件名
  204. ...data,
  205. });
  206. }
  207. /**
  208. * 获取默认任务标题
  209. */
  210. private getDefaultTitle(type: TaskType): string {
  211. const titles: Record<TaskType, string> = {
  212. sync_comments: '同步评论',
  213. sync_works: '同步作品',
  214. sync_account: '同步账号信息',
  215. publish_video: '发布视频',
  216. batch_reply: '批量回复评论',
  217. };
  218. return titles[type] || '未知任务';
  219. }
  220. /**
  221. * 发送任务列表给用户
  222. */
  223. sendTaskList(userId: number): void {
  224. const tasks = this.getUserTasks(userId);
  225. wsManager.sendToUser(userId, TASK_WS_EVENTS.TASK_LIST, {
  226. event: 'list',
  227. tasks,
  228. });
  229. }
  230. /**
  231. * 启动 Worker(内存队列模式下为空操作)
  232. */
  233. startWorker(): void {
  234. logger.info('Memory Task Queue started (no worker needed)');
  235. }
  236. /**
  237. * 停止 Worker
  238. */
  239. async stopWorker(): Promise<void> {
  240. logger.info('Memory Task Queue stopped');
  241. }
  242. /**
  243. * 关闭服务
  244. */
  245. async close(): Promise<void> {
  246. logger.info('Memory Task Queue Service closed');
  247. }
  248. }
  249. // 内存队列单例
  250. const memoryTaskQueueService = new TaskQueueService();
  251. // 根据配置选择队列实现
  252. let taskQueueService: TaskQueueService;
  253. if (USE_REDIS) {
  254. // 动态导入 Redis 队列
  255. import('./RedisTaskQueue.js').then(({ redisTaskQueueService }) => {
  256. (taskQueueService as unknown) = redisTaskQueueService;
  257. logger.info('Using Redis Task Queue');
  258. }).catch((err) => {
  259. logger.warn('Failed to load Redis Task Queue, falling back to memory queue:', err.message);
  260. taskQueueService = memoryTaskQueueService;
  261. });
  262. // 初始设置为内存队列(在 Redis 加载完成前使用)
  263. taskQueueService = memoryTaskQueueService;
  264. } else {
  265. taskQueueService = memoryTaskQueueService;
  266. logger.info('Using Memory Task Queue');
  267. }
  268. export { taskQueueService };