RedisTaskQueue.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. import { Queue, Worker, Job, QueueEvents } from 'bullmq';
  2. import IORedis from 'ioredis';
  3. import { v4 as uuidv4 } from 'uuid';
  4. import {
  5. Task,
  6. TaskType,
  7. TaskResult,
  8. TaskProgressUpdate,
  9. CreateTaskRequest,
  10. TASK_WS_EVENTS,
  11. } from '@media-manager/shared';
  12. import { wsManager } from '../websocket/index.js';
  13. import { logger } from '../utils/logger.js';
  14. import { config } from '../config/index.js';
  15. // Redis 连接配置
  16. const redisConnection = new IORedis({
  17. host: config.redis.host,
  18. port: config.redis.port,
  19. password: config.redis.password || undefined,
  20. db: config.redis.db,
  21. maxRetriesPerRequest: null, // BullMQ 需要这个设置
  22. });
  23. // 任务执行器类型
  24. type TaskExecutor = (
  25. task: Task,
  26. updateProgress: (update: Partial<TaskProgressUpdate>) => void
  27. ) => Promise<TaskResult>;
  28. // 队列名称
  29. const QUEUE_NAME = 'media-manager-tasks';
  30. /**
  31. * 基于 Redis (BullMQ) 的任务队列服务
  32. * 支持分布式、持久化、并行处理
  33. */
  34. class RedisTaskQueueService {
  35. private queue: Queue;
  36. private queueEvents: QueueEvents;
  37. private worker: Worker | null = null;
  38. // 任务执行器 Map<TaskType, TaskExecutor>
  39. private executors: Map<TaskType, TaskExecutor> = new Map();
  40. // 内存中缓存用户任务(用于快速查询)
  41. private userTasks: Map<number, Task[]> = new Map();
  42. // 最大并行任务数
  43. private concurrency = 5;
  44. constructor() {
  45. // 创建队列
  46. this.queue = new Queue(QUEUE_NAME, {
  47. // @ts-ignore
  48. connection: redisConnection,
  49. defaultJobOptions: {
  50. removeOnComplete: { count: 100 }, // 保留最近100个完成的任务
  51. removeOnFail: { count: 50 }, // 保留最近50个失败的任务
  52. attempts: 3, // 失败重试次数
  53. backoff: {
  54. type: 'exponential',
  55. delay: 1000,
  56. },
  57. },
  58. });
  59. // 创建队列事件监听
  60. this.queueEvents = new QueueEvents(QUEUE_NAME, {
  61. // @ts-ignore
  62. connection: redisConnection.duplicate(),
  63. });
  64. this.setupEventListeners();
  65. logger.info('Redis Task Queue Service initialized');
  66. }
  67. /**
  68. * 设置事件监听
  69. */
  70. private setupEventListeners(): void {
  71. this.queueEvents.on('completed', async ({ jobId }) => {
  72. logger.info(`Job ${jobId} completed`);
  73. });
  74. this.queueEvents.on('failed', async ({ jobId, failedReason }) => {
  75. logger.error(`Job ${jobId} failed: ${failedReason}`);
  76. });
  77. this.queueEvents.on('progress', async ({ jobId, data }) => {
  78. logger.debug(`Job ${jobId} progress:`, data);
  79. });
  80. }
  81. /**
  82. * 启动 Worker(处理任务)
  83. */
  84. startWorker(): void {
  85. if (this.worker) {
  86. logger.warn('Worker already running');
  87. return;
  88. }
  89. this.worker = new Worker(
  90. QUEUE_NAME,
  91. async (job: Job) => {
  92. return this.processJob(job);
  93. },
  94. {
  95. // @ts-ignore
  96. connection: redisConnection.duplicate(),
  97. concurrency: this.concurrency, // 并行处理任务数
  98. }
  99. );
  100. this.worker.on('completed', (job, result) => {
  101. logger.info(`Worker completed job ${job.id}: ${result?.message || 'success'}`);
  102. });
  103. this.worker.on('failed', (job, error) => {
  104. logger.error(`Worker failed job ${job?.id}:`, error);
  105. });
  106. this.worker.on('error', (error) => {
  107. logger.error('Worker error:', error);
  108. });
  109. logger.info(`Redis Task Queue Worker started with concurrency: ${this.concurrency}`);
  110. }
  111. /**
  112. * 停止 Worker
  113. */
  114. async stopWorker(): Promise<void> {
  115. if (this.worker) {
  116. await this.worker.close();
  117. this.worker = null;
  118. logger.info('Redis Task Queue Worker stopped');
  119. }
  120. }
  121. /**
  122. * 处理任务
  123. */
  124. private async processJob(job: Job): Promise<TaskResult> {
  125. const taskData = job.data as Task & { userId: number };
  126. const { userId } = taskData;
  127. const executor = this.executors.get(taskData.type);
  128. if (!executor) {
  129. throw new Error(`No executor registered for task type: ${taskData.type}`);
  130. }
  131. // 更新内存缓存中的任务状态
  132. this.updateTaskInCache(userId, taskData.id, {
  133. status: 'running',
  134. startedAt: new Date().toISOString(),
  135. });
  136. // 通知前端任务开始
  137. this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, {
  138. task: this.getTaskFromCache(userId, taskData.id)
  139. });
  140. // 进度更新回调
  141. const updateProgress = async (update: Partial<TaskProgressUpdate>) => {
  142. // 更新 Job 进度
  143. await job.updateProgress(update);
  144. // 更新内存缓存
  145. this.updateTaskInCache(userId, taskData.id, {
  146. progress: update.progress,
  147. currentStep: update.currentStep,
  148. currentStepIndex: update.currentStepIndex,
  149. });
  150. // 通知前端
  151. this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
  152. taskId: taskData.id,
  153. progress: update.progress,
  154. currentStep: update.currentStep,
  155. currentStepIndex: update.currentStepIndex,
  156. message: update.message,
  157. });
  158. };
  159. try {
  160. const result = await executor(taskData, updateProgress);
  161. // 更新缓存
  162. this.updateTaskInCache(userId, taskData.id, {
  163. status: 'completed',
  164. progress: 100,
  165. result,
  166. completedAt: new Date().toISOString(),
  167. });
  168. // 通知前端
  169. this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, {
  170. task: this.getTaskFromCache(userId, taskData.id)
  171. });
  172. return result;
  173. } catch (error) {
  174. const errorMessage = error instanceof Error ? error.message : '任务执行失败';
  175. // 更新缓存
  176. this.updateTaskInCache(userId, taskData.id, {
  177. status: 'failed',
  178. error: errorMessage,
  179. completedAt: new Date().toISOString(),
  180. });
  181. // 通知前端
  182. this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, {
  183. task: this.getTaskFromCache(userId, taskData.id)
  184. });
  185. throw error;
  186. }
  187. }
  188. /**
  189. * 注册任务执行器
  190. */
  191. registerExecutor(type: TaskType, executor: TaskExecutor): void {
  192. this.executors.set(type, executor);
  193. logger.info(`Task executor registered: ${type}`);
  194. }
  195. /**
  196. * 创建新任务
  197. */
  198. async createTask(userId: number, request: CreateTaskRequest): Promise<Task & { userId: number }> {
  199. const task: Task & { userId: number;[key: string]: unknown } = {
  200. id: uuidv4(),
  201. type: request.type,
  202. title: request.title || this.getDefaultTitle(request.type),
  203. description: request.description,
  204. status: 'pending',
  205. progress: 0,
  206. priority: request.priority || 'normal',
  207. createdAt: new Date().toISOString(),
  208. accountId: request.accountId,
  209. platform: request.platform,
  210. userId,
  211. ...(request.data || {}),
  212. };
  213. // 添加到内存缓存
  214. if (!this.userTasks.has(userId)) {
  215. this.userTasks.set(userId, []);
  216. }
  217. this.userTasks.get(userId)!.push(task);
  218. // 添加到 Redis 队列
  219. const priority = request.priority === 'high' ? 1 : (request.priority === 'low' ? 3 : 2);
  220. await this.queue.add(task.type, task, {
  221. jobId: task.id,
  222. priority,
  223. });
  224. // 通知前端
  225. this.notifyUser(userId, TASK_WS_EVENTS.TASK_CREATED, { task });
  226. logger.info(`Task created and queued: ${task.id} (${task.type}) for user ${userId}`);
  227. return task;
  228. }
  229. /**
  230. * 获取用户的所有任务
  231. */
  232. getUserTasks(userId: number): Task[] {
  233. return this.userTasks.get(userId) || [];
  234. }
  235. /**
  236. * 获取用户的活跃任务
  237. */
  238. getActiveTasks(userId: number): Task[] {
  239. const tasks = this.userTasks.get(userId) || [];
  240. return tasks.filter(t => t.status === 'pending' || t.status === 'running');
  241. }
  242. /**
  243. * 取消任务
  244. */
  245. async cancelTask(userId: number, taskId: string): Promise<boolean> {
  246. const tasks = this.userTasks.get(userId);
  247. if (!tasks) return false;
  248. const task = tasks.find(t => t.id === taskId);
  249. if (!task) return false;
  250. if (task.status === 'pending') {
  251. // 从队列中移除
  252. const job = await this.queue.getJob(taskId);
  253. if (job) {
  254. await job.remove();
  255. }
  256. // 更新缓存
  257. task.status = 'cancelled';
  258. task.completedAt = new Date().toISOString();
  259. // 通知前端
  260. this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
  261. logger.info(`Task cancelled: ${taskId}`);
  262. return true;
  263. }
  264. return false;
  265. }
  266. /**
  267. * 清理已完成的任务
  268. */
  269. cleanupCompletedTasks(userId: number, keepCount = 10): void {
  270. const tasks = this.userTasks.get(userId);
  271. if (!tasks) return;
  272. const completedTasks = tasks.filter(t =>
  273. t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
  274. );
  275. if (completedTasks.length > keepCount) {
  276. completedTasks.sort((a, b) =>
  277. new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
  278. );
  279. const toRemove = completedTasks.slice(keepCount);
  280. const toRemoveIds = new Set(toRemove.map(t => t.id));
  281. this.userTasks.set(userId, tasks.filter(t => !toRemoveIds.has(t.id)));
  282. }
  283. }
  284. /**
  285. * 发送任务列表给用户
  286. */
  287. sendTaskList(userId: number): void {
  288. const tasks = this.getUserTasks(userId);
  289. wsManager.sendToUser(userId, TASK_WS_EVENTS.TASK_LIST, {
  290. event: 'list',
  291. tasks,
  292. });
  293. }
  294. /**
  295. * 获取队列统计信息
  296. */
  297. async getQueueStats(): Promise<{
  298. waiting: number;
  299. active: number;
  300. completed: number;
  301. failed: number;
  302. delayed: number;
  303. }> {
  304. const [waiting, active, completed, failed, delayed] = await Promise.all([
  305. this.queue.getWaitingCount(),
  306. this.queue.getActiveCount(),
  307. this.queue.getCompletedCount(),
  308. this.queue.getFailedCount(),
  309. this.queue.getDelayedCount(),
  310. ]);
  311. return { waiting, active, completed, failed, delayed };
  312. }
  313. /**
  314. * 更新内存缓存中的任务
  315. */
  316. private updateTaskInCache(userId: number, taskId: string, updates: Partial<Task>): void {
  317. const tasks = this.userTasks.get(userId);
  318. if (!tasks) return;
  319. const task = tasks.find(t => t.id === taskId);
  320. if (task) {
  321. Object.assign(task, updates);
  322. }
  323. }
  324. /**
  325. * 从缓存获取任务
  326. */
  327. private getTaskFromCache(userId: number, taskId: string): Task | undefined {
  328. const tasks = this.userTasks.get(userId);
  329. return tasks?.find(t => t.id === taskId);
  330. }
  331. /**
  332. * 通知用户
  333. */
  334. private notifyUser(userId: number, event: string, data: Record<string, unknown>): void {
  335. wsManager.sendToUser(userId, event, {
  336. event: event.split(':')[1],
  337. ...data,
  338. });
  339. }
  340. /**
  341. * 获取默认任务标题
  342. */
  343. private getDefaultTitle(type: TaskType): string {
  344. const titles: Record<TaskType, string> = {
  345. sync_comments: '同步评论',
  346. sync_works: '同步作品',
  347. sync_account: '同步账号信息',
  348. publish_video: '发布视频',
  349. batch_reply: '批量回复评论',
  350. delete_work: '删除作品',
  351. xhs_work_stats_backfill: '小红书作品补数',
  352. dy_work_stats_backfill: '抖音作品补数',
  353. bj_work_stats_backfill: '百家号作品补数',
  354. };
  355. return titles[type] || '未知任务';
  356. }
  357. /**
  358. * 关闭连接
  359. */
  360. async close(): Promise<void> {
  361. await this.stopWorker();
  362. await this.queue.close();
  363. await this.queueEvents.close();
  364. await redisConnection.quit();
  365. logger.info('Redis Task Queue Service closed');
  366. }
  367. }
  368. // 导出单例
  369. export const redisTaskQueueService = new RedisTaskQueueService();