import { Queue, Worker, Job, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';
import { v4 as uuidv4 } from 'uuid';
import {
  Task,
  TaskType,
  TaskResult,
  TaskProgressUpdate,
  CreateTaskRequest,
  TASK_WS_EVENTS,
} from '@media-manager/shared';
import { wsManager } from '../websocket/index.js';
import { logger } from '../utils/logger.js';
import { config } from '../config/index.js';

// Shared Redis connection used by the queue; the queue-events listener and
// the worker each get their own duplicate of it.
const redisConnection = new IORedis({
  host: config.redis.host,
  port: config.redis.port,
  password: config.redis.password || undefined,
  db: config.redis.db,
  maxRetriesPerRequest: null, // Required by BullMQ.
});

/**
 * Function that performs the work of one task type.
 *
 * `updateProgress` is typed as returning `void`, so executors are not expected
 * to await it; the service guarantees the callback never rejects.
 */
type TaskExecutor = (
  task: Task,
  updateProgress: (update: Partial<TaskProgressUpdate>) => void
) => Promise<TaskResult>;

// Name of the BullMQ queue shared by producer, worker and event listener.
const QUEUE_NAME = 'media-manager-tasks';

/**
 * Task queue service backed by Redis (BullMQ).
 *
 * Jobs are stored in Redis and processed by a BullMQ worker with a fixed
 * concurrency. Per-user task state is additionally mirrored in an in-memory
 * map for fast lookups; that mirror lives only in this process and is what the
 * query methods (`getUserTasks`, `getActiveTasks`, ...) read from.
 */
class RedisTaskQueueService {
  private readonly queue: Queue;
  private readonly queueEvents: QueueEvents;
  private worker: Worker | null = null;

  // Registered executors, keyed by task type.
  private readonly executors: Map<TaskType, TaskExecutor> = new Map();

  // In-memory mirror of each user's tasks (for fast queries).
  private readonly userTasks: Map<number, Task[]> = new Map();

  // Maximum number of jobs the worker processes in parallel.
  private concurrency = 5;

  constructor() {
    // Create the queue.
    this.queue = new Queue(QUEUE_NAME, {
      // @ts-ignore
      connection: redisConnection,
      defaultJobOptions: {
        removeOnComplete: { count: 100 }, // Keep the 100 most recent completed jobs.
        removeOnFail: { count: 50 }, // Keep the 50 most recent failed jobs.
        attempts: 3, // Total attempts before a job is considered failed.
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    });

    // Create the queue-events listener on its own connection.
    this.queueEvents = new QueueEvents(QUEUE_NAME, {
      // @ts-ignore
      connection: redisConnection.duplicate(),
    });

    this.setupEventListeners();
    logger.info('Redis Task Queue Service initialized');
  }

  /**
   * Attaches logging listeners to the queue and the queue-events stream.
   */
  private setupEventListeners(): void {
    this.queueEvents.on('completed', ({ jobId }) => {
      logger.info(`Job ${jobId} completed`);
    });

    this.queueEvents.on('failed', ({ jobId, failedReason }) => {
      logger.error(`Job ${jobId} failed: ${failedReason}`);
    });

    this.queueEvents.on('progress', ({ jobId, data }) => {
      logger.debug(`Job ${jobId} progress:`, data);
    });

    // Route connection-level errors to the application logger instead of
    // leaving them without a listener.
    this.queueEvents.on('error', (error) => {
      logger.error('Queue events error:', error);
    });

    this.queue.on('error', (error) => {
      logger.error('Queue error:', error);
    });
  }

  /**
   * Starts the worker that processes queued jobs. Does nothing (besides
   * logging a warning) when a worker is already running.
   */
  startWorker(): void {
    if (this.worker) {
      logger.warn('Worker already running');
      return;
    }

    this.worker = new Worker(
      QUEUE_NAME,
      async (job: Job) => this.processJob(job),
      {
        // @ts-ignore
        connection: redisConnection.duplicate(),
        concurrency: this.concurrency, // Number of jobs processed in parallel.
      }
    );

    this.worker.on('completed', (job, result) => {
      logger.info(`Worker completed job ${job.id}: ${result?.message || 'success'}`);
    });

    this.worker.on('failed', (job, error) => {
      logger.error(`Worker failed job ${job?.id}:`, error);
    });

    this.worker.on('error', (error) => {
      logger.error('Worker error:', error);
    });

    logger.info(`Redis Task Queue Worker started with concurrency: ${this.concurrency}`);
  }

  /**
   * Stops the worker, if one is running.
   */
  async stopWorker(): Promise<void> {
    if (this.worker) {
      await this.worker.close();
      this.worker = null;
      logger.info('Redis Task Queue Worker stopped');
    }
  }

  /**
   * Runs one job: looks up the executor for the task type, tracks the task's
   * state in the in-memory cache and notifies the user over WebSocket at each
   * state change.
   *
   * @param job - BullMQ job whose `data` is the task plus its `userId`.
   * @returns The executor's result.
   * @throws When no executor is registered for the task type, or when the
   *   executor itself throws (the error is rethrown so BullMQ records the
   *   failure).
   */
  private async processJob(job: Job): Promise<TaskResult> {
    const taskData = job.data as Task & { userId: number };
    const { userId } = taskData;

    // Jobs live in Redis but the cache is in-memory, so the task may be
    // missing here (e.g. the job was queued before this process started).
    // Without an entry every cache update below would be a silent no-op and
    // notifications would carry `task: undefined`.
    this.ensureTaskInCache(userId, taskData);

    const executor = this.executors.get(taskData.type);
    if (!executor) {
      const error = new Error(`No executor registered for task type: ${taskData.type}`);
      // Record the failure so the cached task does not stay 'pending' forever.
      this.markTaskFailed(userId, taskData.id, error.message);
      throw error;
    }

    // Mark the task as running in the cache.
    this.updateTaskInCache(userId, taskData.id, {
      status: 'running',
      startedAt: new Date().toISOString(),
    });

    // Tell the client the task has started.
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, {
      task: this.getTaskFromCache(userId, taskData.id),
    });

    // Progress callback handed to the executor. Executors see it as returning
    // `void` and may not await it, so it must never reject.
    const updateProgress = (update: Partial<TaskProgressUpdate>): Promise<void> =>
      this.reportProgress(job, userId, taskData.id, update).catch((error: unknown) => {
        logger.warn(`Failed to report progress for job ${job.id}:`, error);
      });

    try {
      const result = await executor(taskData, updateProgress);

      // Record completion in the cache.
      this.updateTaskInCache(userId, taskData.id, {
        status: 'completed',
        progress: 100,
        result,
        completedAt: new Date().toISOString(),
      });

      // Tell the client the task has completed.
      this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, {
        task: this.getTaskFromCache(userId, taskData.id),
      });

      return result;
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : '任务执行失败';

      // NOTE(review): jobs are queued with `attempts: 3`, so this path runs on
      // every failed attempt and the client may see TASK_FAILED followed by
      // TASK_STARTED again on retry — confirm whether that is intended.
      this.markTaskFailed(userId, taskData.id, errorMessage);
      throw error;
    }
  }

  /**
   * Persists a progress update on the job, mirrors it into the cache and
   * pushes it to the user. A failure to persist is logged and does not
   * prevent the cache update or the notification.
   */
  private async reportProgress(
    job: Job,
    userId: number,
    taskId: string,
    update: Partial<TaskProgressUpdate>
  ): Promise<void> {
    try {
      // Store the progress on the BullMQ job.
      await job.updateProgress(update);
    } catch (error) {
      logger.warn(`Failed to persist progress for job ${job.id}:`, error);
    }

    // Mirror the progress into the in-memory cache.
    this.updateTaskInCache(userId, taskId, {
      progress: update.progress,
      currentStep: update.currentStep,
      currentStepIndex: update.currentStepIndex,
    });

    // Push the progress to the client.
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
      taskId,
      progress: update.progress,
      currentStep: update.currentStep,
      currentStepIndex: update.currentStepIndex,
      message: update.message,
    });
  }

  /**
   * Marks a cached task as failed and sends TASK_FAILED to its user.
   */
  private markTaskFailed(userId: number, taskId: string, errorMessage: string): void {
    this.updateTaskInCache(userId, taskId, {
      status: 'failed',
      error: errorMessage,
      completedAt: new Date().toISOString(),
    });

    this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, {
      task: this.getTaskFromCache(userId, taskId),
    });
  }

  /**
   * Registers (or replaces) the executor for a task type.
   */
  registerExecutor(type: TaskType, executor: TaskExecutor): void {
    this.executors.set(type, executor);
    logger.info(`Task executor registered: ${type}`);
  }

  /**
   * Creates a task, caches it for the user, enqueues it and notifies the user.
   *
   * @param userId - Owner of the task.
   * @param request - Task type, optional metadata and an optional `data`
   *   payload whose properties are merged into the task object.
   * @returns The created task.
   * @throws When the job cannot be added to the queue; the task is removed
   *   from the cache again before the error is rethrown.
   */
  async createTask(userId: number, request: CreateTaskRequest): Promise<Task> {
    const task: Task & { userId: number; [key: string]: unknown } = {
      title: request.title || this.getDefaultTitle(request.type),
      description: request.description,
      priority: request.priority || 'normal',
      createdAt: new Date().toISOString(),
      accountId: request.accountId,
      platform: request.platform,
      ...(request.data || {}),
      // Written after the payload spread so `request.data` can never overwrite
      // the fields the queue relies on: the job id, executor routing, the
      // initial lifecycle state and the owner.
      id: uuidv4(),
      type: request.type,
      status: 'pending',
      progress: 0,
      userId,
    };

    // Add to the in-memory cache.
    let tasks = this.userTasks.get(userId);
    if (!tasks) {
      tasks = [];
      this.userTasks.set(userId, tasks);
    }
    tasks.push(task);

    // Add to the Redis queue. This maps 'high' to 1, 'low' to 3 and
    // everything else to 2.
    const priority = request.priority === 'high' ? 1 : request.priority === 'low' ? 3 : 2;
    try {
      await this.queue.add(task.type, task, {
        jobId: task.id,
        priority,
      });
    } catch (error) {
      // The job never reached the queue; do not leave a task behind that
      // would be reported as 'pending' forever.
      this.removeTaskFromCache(userId, task.id);
      throw error;
    }

    // Notify the client.
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_CREATED, { task });

    logger.info(`Task created and queued: ${task.id} (${task.type}) for user ${userId}`);
    return task;
  }

  /**
   * Returns all cached tasks of a user (empty array when there are none).
   */
  getUserTasks(userId: number): Task[] {
    return this.userTasks.get(userId) || [];
  }

  /**
   * Returns the user's cached tasks that are pending or running.
   */
  getActiveTasks(userId: number): Task[] {
    const tasks = this.userTasks.get(userId) || [];
    return tasks.filter((t) => t.status === 'pending' || t.status === 'running');
  }

  /**
   * Cancels a task that has not started yet.
   *
   * @returns `true` when the task was pending and has been cancelled; `false`
   *   when the task is unknown, is not pending, or its job could not be
   *   removed from the queue.
   */
  async cancelTask(userId: number, taskId: string): Promise<boolean> {
    const task = this.getTaskFromCache(userId, taskId);
    if (!task || task.status !== 'pending') return false;

    // Remove the job from the queue. The cache can lag behind Redis, so the
    // removal may be refused (e.g. the job has just been picked up); report
    // that as "not cancelled" instead of rejecting.
    try {
      const job = await this.queue.getJob(taskId);
      if (job) {
        await job.remove();
      }
    } catch (error) {
      logger.warn(`Failed to remove job ${taskId} from queue:`, error);
      return false;
    }

    // Update the cache.
    task.status = 'cancelled';
    task.completedAt = new Date().toISOString();

    // Notify the client.
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });

    logger.info(`Task cancelled: ${taskId}`);
    return true;
  }

  /**
   * Drops a user's oldest finished tasks (completed, failed or cancelled)
   * from the cache, keeping the `keepCount` most recently finished ones.
   * Pending and running tasks are never removed.
   */
  cleanupCompletedTasks(userId: number, keepCount = 10): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return;

    const finishedTasks = tasks.filter(
      (t) => t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
    );
    if (finishedTasks.length <= keepCount) return;

    // Newest first; tasks without a completion time sort as the epoch.
    finishedTasks.sort(
      (a, b) =>
        new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
    );

    const idsToRemove = new Set(finishedTasks.slice(keepCount).map((t) => t.id));
    this.userTasks.set(
      userId,
      tasks.filter((t) => !idsToRemove.has(t.id))
    );
  }

  /**
   * Sends the user's full cached task list over WebSocket.
   */
  sendTaskList(userId: number): void {
    const tasks = this.getUserTasks(userId);
    wsManager.sendToUser(userId, TASK_WS_EVENTS.TASK_LIST, {
      event: 'list',
      tasks,
    });
  }

  /**
   * Returns the number of jobs in each queue state.
   */
  async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
  }> {
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.getDelayedCount(),
    ]);

    return { waiting, active, completed, failed, delayed };
  }

  /**
   * Makes sure the cache holds an entry for the given task, inserting a
   * shallow copy when it is missing.
   */
  private ensureTaskInCache(userId: number, task: Task): void {
    let tasks = this.userTasks.get(userId);
    if (!tasks) {
      tasks = [];
      this.userTasks.set(userId, tasks);
    }
    if (!tasks.some((t) => t.id === task.id)) {
      tasks.push({ ...task });
    }
  }

  /**
   * Removes a task from the cache; does nothing when it is not cached.
   */
  private removeTaskFromCache(userId: number, taskId: string): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return;

    const index = tasks.findIndex((t) => t.id === taskId);
    if (index !== -1) {
      tasks.splice(index, 1);
    }
  }

  /**
   * Applies partial updates to a cached task; does nothing when the task is
   * not cached.
   */
  private updateTaskInCache(userId: number, taskId: string, updates: Partial<Task>): void {
    const task = this.getTaskFromCache(userId, taskId);
    if (task) {
      Object.assign(task, updates);
    }
  }

  /**
   * Looks up a task in the cache.
   */
  private getTaskFromCache(userId: number, taskId: string): Task | undefined {
    const tasks = this.userTasks.get(userId);
    return tasks?.find((t) => t.id === taskId);
  }

  /**
   * Sends an event to a user over WebSocket. The payload's `event` field is
   * the part of the event name after the first ':'.
   */
  private notifyUser(userId: number, event: string, data: Record<string, unknown>): void {
    wsManager.sendToUser(userId, event, {
      event: event.split(':')[1],
      ...data,
    });
  }

  /**
   * Returns the default title for a task type, or a generic fallback for
   * types without an entry.
   */
  private getDefaultTitle(type: TaskType): string {
    const titles: Record<string, string> = {
      sync_comments: '同步评论',
      sync_works: '同步作品',
      sync_account: '同步账号信息',
      publish_video: '发布视频',
      batch_reply: '批量回复评论',
      delete_work: '删除作品',
      xhs_work_stats_backfill: '小红书作品补数',
      dy_work_stats_backfill: '抖音作品补数',
      bj_work_stats_backfill: '百家号作品补数',
    };
    return titles[type] ?? '未知任务';
  }

  /**
   * Stops the worker and closes the queue, the queue-events listener and the
   * shared Redis connection, in that order.
   */
  async close(): Promise<void> {
    await this.stopWorker();
    await this.queue.close();
    await this.queueEvents.close();
    await redisConnection.quit();
    logger.info('Redis Task Queue Service closed');
  }
}

// Export the singleton instance.
export const redisTaskQueueService = new RedisTaskQueueService();