||
- import { Queue, Worker, Job, QueueEvents } from 'bullmq';
- import IORedis from 'ioredis';
- import { v4 as uuidv4 } from 'uuid';
- import {
- Task,
- TaskType,
- TaskResult,
- TaskProgressUpdate,
- CreateTaskRequest,
- TASK_WS_EVENTS,
- } from '@media-manager/shared';
- import { wsManager } from '../websocket/index.js';
- import { logger } from '../utils/logger.js';
- import { config } from '../config/index.js';
// Shared ioredis connection, configured from `config.redis`.
const redisConnection = new IORedis({
  host: config.redis.host,
  port: config.redis.port,
  // A falsy password (e.g. an empty string) is passed on as `undefined`.
  password: config.redis.password || undefined,
  db: config.redis.db,
  // BullMQ requires this setting.
  maxRetriesPerRequest: null,
});
/**
 * Signature of a task executor.
 *
 * An executor receives the task to run plus a callback for reporting
 * progress, and resolves with the task's result.
 */
type TaskExecutor = (
  task: Task,
  updateProgress: (update: Partial<TaskProgressUpdate>) => void,
) => Promise<TaskResult>;
/** Name of the BullMQ queue used by the queue, its events listener and its worker. */
const QUEUE_NAME = 'media-manager-tasks';
/**
 * Task queue service backed by Redis through BullMQ.
 *
 * Jobs are stored in the BullMQ queue `QUEUE_NAME`; a per-process in-memory
 * cache of each user's tasks is kept alongside for fast lookups and is used
 * as the payload of the WebSocket notifications sent through `wsManager`.
 */
class RedisTaskQueueService {
  private readonly queue: Queue;
  private readonly queueEvents: QueueEvents;
  private worker: Worker | null = null;

  // Duplicated connections handed to QueueEvents / Worker. References are kept
  // so they can be closed explicitly on shutdown.
  private readonly eventsConnection: typeof redisConnection;
  private workerConnection: typeof redisConnection | null = null;

  // Executors keyed by task type.
  private readonly executors = new Map<TaskType, TaskExecutor>();

  // In-memory cache of tasks per user id. It lives in this process only.
  private readonly userTasks = new Map<number, Task[]>();

  // Maximum number of jobs the worker processes in parallel.
  private readonly concurrency = 5;

  /**
   * Creates the queue and the queue-events listener and wires the logging
   * handlers. The worker is not started here; see `startWorker`.
   */
  constructor() {
    this.queue = new Queue(QUEUE_NAME, {
      // NOTE(review): presumably needed because the ioredis typings differ from
      // the ones BullMQ expects — confirm before removing.
      // @ts-ignore
      connection: redisConnection,
      defaultJobOptions: {
        removeOnComplete: { count: 100 }, // keep the 100 most recent completed jobs
        removeOnFail: { count: 50 }, // keep the 50 most recent failed jobs
        attempts: 3, // attempts before a job is finally failed
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    });

    this.eventsConnection = redisConnection.duplicate();
    this.queueEvents = new QueueEvents(QUEUE_NAME, {
      // @ts-ignore
      connection: this.eventsConnection,
    });

    this.setupEventListeners();
    logger.info('Redis Task Queue Service initialized');
  }

  /**
   * Registers logging handlers for queue-level events.
   */
  private setupEventListeners(): void {
    this.queueEvents.on('completed', ({ jobId }) => {
      logger.info(`Job ${jobId} completed`);
    });
    this.queueEvents.on('failed', ({ jobId, failedReason }) => {
      logger.error(`Job ${jobId} failed: ${failedReason}`);
    });
    this.queueEvents.on('progress', ({ jobId, data }) => {
      logger.debug(`Job ${jobId} progress:`, data);
    });
  }

  /**
   * Starts the worker that processes queued jobs. Calling it while a worker
   * is already running only logs a warning.
   */
  startWorker(): void {
    if (this.worker) {
      logger.warn('Worker already running');
      return;
    }

    const connection = redisConnection.duplicate();
    const worker = new Worker(
      QUEUE_NAME,
      async (job: Job) => {
        return this.processJob(job);
      },
      {
        // @ts-ignore
        connection,
        concurrency: this.concurrency,
      }
    );

    worker.on('completed', (job, result) => {
      logger.info(`Worker completed job ${job.id}: ${result?.message || 'success'}`);
    });
    worker.on('failed', (job, error) => {
      logger.error(`Worker failed job ${job?.id}:`, error);
    });
    worker.on('error', (error) => {
      logger.error('Worker error:', error);
    });

    this.workerConnection = connection;
    this.worker = worker;
    logger.info(`Redis Task Queue Worker started with concurrency: ${this.concurrency}`);
  }

  /**
   * Stops the worker (if one is running) and closes the connection that was
   * duplicated for it.
   */
  async stopWorker(): Promise<void> {
    if (!this.worker) {
      return;
    }

    await this.worker.close();
    this.worker = null;

    if (this.workerConnection) {
      await this.quitConnection(this.workerConnection, 'worker');
      this.workerConnection = null;
    }
    logger.info('Redis Task Queue Worker stopped');
  }

  /**
   * Runs one job: looks up the executor for the task type, tracks the task in
   * the in-memory cache and notifies the user of start, progress and outcome.
   *
   * @param job The BullMQ job whose data is the task created by `createTask`.
   * @returns The executor's result.
   * @throws If no executor is registered for the task type, or whatever the
   *   executor throws (rethrown so BullMQ records the failure).
   */
  private async processJob(job: Job): Promise<TaskResult> {
    const taskData = job.data as Task & { userId: number };
    const { userId } = taskData;

    const executor = this.executors.get(taskData.type);
    if (!executor) {
      throw new Error(`No executor registered for task type: ${taskData.type}`);
    }

    // The cache is per-process: a job enqueued by another process, or before a
    // restart, has no entry yet. Seed it from the job payload so the updates
    // and notifications below have a task to work with.
    this.ensureTaskInCache(userId, taskData);

    // Mark as running and clear leftovers from a previous failed attempt.
    this.updateTaskInCache(userId, taskData.id, {
      status: 'running',
      startedAt: new Date().toISOString(),
      error: undefined,
      completedAt: undefined,
    });
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, {
      task: this.getTaskFromCache(userId, taskData.id),
    });

    // Set once the executor has settled. Executors are handed a `void`
    // callback and usually do not await it, so a progress write may finish
    // after the task itself; such late updates must not touch the final state.
    let settled = false;

    const updateProgress = async (update: Partial<TaskProgressUpdate>): Promise<void> => {
      // Nobody awaits this promise, so it must never reject.
      try {
        await job.updateProgress(update);
      } catch (error) {
        logger.warn(`Failed to persist progress for job ${job.id}:`, error);
      }
      if (settled) {
        return;
      }

      try {
        // Apply only the fields that were supplied so a partial update does
        // not wipe previously reported values.
        const patch: Partial<Task> = {};
        if (update.progress !== undefined) patch.progress = update.progress;
        if (update.currentStep !== undefined) patch.currentStep = update.currentStep;
        if (update.currentStepIndex !== undefined) patch.currentStepIndex = update.currentStepIndex;
        this.updateTaskInCache(userId, taskData.id, patch);

        this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
          taskId: taskData.id,
          progress: update.progress,
          currentStep: update.currentStep,
          currentStepIndex: update.currentStepIndex,
          message: update.message,
        });
      } catch (error) {
        logger.warn(`Failed to publish progress for job ${job.id}:`, error);
      }
    };

    try {
      const result = await executor(taskData, updateProgress);
      settled = true;

      this.updateTaskInCache(userId, taskData.id, {
        status: 'completed',
        progress: 100,
        result,
        completedAt: new Date().toISOString(),
      });
      this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, {
        task: this.getTaskFromCache(userId, taskData.id),
      });
      return result;
    } catch (error) {
      settled = true;
      const errorMessage = error instanceof Error ? error.message : '任务执行失败';

      // NOTE(review): this runs on every failed attempt, including attempts
      // that BullMQ will retry (`attempts: 3`), so the user may see a failure
      // followed by another start — confirm that this is intended.
      this.updateTaskInCache(userId, taskData.id, {
        status: 'failed',
        error: errorMessage,
        completedAt: new Date().toISOString(),
      });
      this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, {
        task: this.getTaskFromCache(userId, taskData.id),
      });
      throw error;
    }
  }

  /**
   * Registers the executor for a task type, replacing any previous one.
   */
  registerExecutor(type: TaskType, executor: TaskExecutor): void {
    this.executors.set(type, executor);
    logger.info(`Task executor registered: ${type}`);
  }

  /**
   * Creates a task, caches it, enqueues it and notifies the user.
   *
   * @param userId Owner of the task.
   * @param request Task description; `request.data` is merged into the task.
   * @returns The created task.
   * @throws Whatever `queue.add` throws; the cached task is removed first.
   */
  async createTask(userId: number, request: CreateTaskRequest): Promise<Task & { userId: number }> {
    const task: Task & { userId: number; [key: string]: unknown } = {
      title: request.title || this.getDefaultTitle(request.type),
      description: request.description,
      priority: request.priority || 'normal',
      accountId: request.accountId,
      platform: request.platform,
      ...(request.data || {}),
      // Service-managed fields come last so the payload cannot overwrite them.
      id: uuidv4(),
      type: request.type,
      status: 'pending',
      progress: 0,
      createdAt: new Date().toISOString(),
      userId,
    };

    // Cache before enqueueing so a worker that picks the job up immediately
    // already finds the task.
    this.ensureTaskInCache(userId, task);

    const priority = request.priority === 'high' ? 1 : (request.priority === 'low' ? 3 : 2);
    try {
      await this.queue.add(task.type, task, {
        jobId: task.id,
        priority,
      });
    } catch (error) {
      // Do not leave a task behind that will never run.
      this.removeTaskFromCache(userId, task.id);
      throw error;
    }

    this.notifyUser(userId, TASK_WS_EVENTS.TASK_CREATED, { task });
    logger.info(`Task created and queued: ${task.id} (${task.type}) for user ${userId}`);
    return task;
  }

  /**
   * Returns all cached tasks of a user (an empty array if there are none).
   */
  getUserTasks(userId: number): Task[] {
    return this.userTasks.get(userId) || [];
  }

  /**
   * Returns the user's cached tasks that are pending or running.
   */
  getActiveTasks(userId: number): Task[] {
    const tasks = this.userTasks.get(userId) || [];
    return tasks.filter(t => t.status === 'pending' || t.status === 'running');
  }

  /**
   * Cancels a task that is still pending.
   *
   * @returns `true` if the task was cancelled; `false` if it is unknown or no
   *   longer pending.
   */
  async cancelTask(userId: number, taskId: string): Promise<boolean> {
    const candidate = this.getTaskFromCache(userId, taskId);
    if (!candidate || candidate.status !== 'pending') {
      return false;
    }

    const job = await this.queue.getJob(taskId);
    if (job) {
      await job.remove();
    }

    // The task may have started or finished while the calls above were in
    // flight; never overwrite such a state with `cancelled`.
    const task = this.getTaskFromCache(userId, taskId);
    if (!task || task.status !== 'pending') {
      return false;
    }

    task.status = 'cancelled';
    task.completedAt = new Date().toISOString();

    this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
    logger.info(`Task cancelled: ${taskId}`);
    return true;
  }

  /**
   * Drops the oldest finished (completed, failed or cancelled) tasks of a
   * user from the cache, keeping the `keepCount` most recently finished.
   */
  cleanupCompletedTasks(userId: number, keepCount = 10): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return;

    const finished = tasks.filter(t =>
      t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
    );
    if (finished.length <= keepCount) return;

    // Newest first; tasks without `completedAt` sort as the epoch.
    finished.sort((a, b) =>
      new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
    );
    const staleIds = new Set(finished.slice(keepCount).map(t => t.id));
    this.userTasks.set(userId, tasks.filter(t => !staleIds.has(t.id)));
  }

  /**
   * Sends the user's cached task list over the WebSocket.
   */
  sendTaskList(userId: number): void {
    const tasks = this.getUserTasks(userId);
    wsManager.sendToUser(userId, TASK_WS_EVENTS.TASK_LIST, {
      event: 'list',
      tasks,
    });
  }

  /**
   * Returns the number of jobs in each queue state.
   */
  async getQueueStats(): Promise<{
    waiting: number;
    active: number;
    completed: number;
    failed: number;
    delayed: number;
  }> {
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.queue.getWaitingCount(),
      this.queue.getActiveCount(),
      this.queue.getCompletedCount(),
      this.queue.getFailedCount(),
      this.queue.getDelayedCount(),
    ]);
    return { waiting, active, completed, failed, delayed };
  }

  /**
   * Adds a task to the user's cache unless a task with the same id is there.
   */
  private ensureTaskInCache(userId: number, task: Task): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) {
      this.userTasks.set(userId, [task]);
      return;
    }
    if (!tasks.some(t => t.id === task.id)) {
      tasks.push(task);
    }
  }

  /**
   * Removes a task from the user's cache; a no-op if it is not cached.
   */
  private removeTaskFromCache(userId: number, taskId: string): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return;
    const index = tasks.findIndex(t => t.id === taskId);
    if (index !== -1) {
      tasks.splice(index, 1);
    }
  }

  /**
   * Merges `updates` into the cached task; a no-op if it is not cached.
   */
  private updateTaskInCache(userId: number, taskId: string, updates: Partial<Task>): void {
    const task = this.getTaskFromCache(userId, taskId);
    if (task) {
      Object.assign(task, updates);
    }
  }

  /**
   * Looks up a cached task by user and task id.
   */
  private getTaskFromCache(userId: number, taskId: string): Task | undefined {
    const tasks = this.userTasks.get(userId);
    return tasks?.find(t => t.id === taskId);
  }

  /**
   * Sends an event to the user. The payload's `event` field is the part of
   * the event name after the first `:`.
   */
  private notifyUser(userId: number, event: string, data: Record<string, unknown>): void {
    wsManager.sendToUser(userId, event, {
      event: event.split(':')[1],
      ...data,
    });
  }

  /**
   * Returns the default (Chinese) title for a task type.
   */
  private getDefaultTitle(type: TaskType): string {
    const titles: Record<TaskType, string> = {
      sync_comments: '同步评论',
      sync_works: '同步作品',
      sync_account: '同步账号信息',
      publish_video: '发布视频',
      batch_reply: '批量回复评论',
      delete_work: '删除作品',
      xhs_work_stats_backfill: '小红书作品补数',
      dy_work_stats_backfill: '抖音作品补数',
      bj_work_stats_backfill: '百家号作品补数',
    };
    return titles[type] || '未知任务';
  }

  /**
   * Quits a Redis connection unless it has already ended. Failures are logged
   * rather than thrown so that shutdown can continue.
   */
  private async quitConnection(connection: typeof redisConnection, label: string): Promise<void> {
    if (connection.status === 'end') {
      return;
    }
    try {
      await connection.quit();
    } catch (error) {
      logger.warn(`Failed to quit Redis connection (${label}):`, error);
    }
  }

  /**
   * Shuts the service down: stops the worker, closes the queue and its events
   * listener, then closes the Redis connections created by this module.
   */
  async close(): Promise<void> {
    await this.stopWorker();
    await this.queue.close();
    await this.queueEvents.close();
    // NOTE(review): BullMQ is assumed not to close connections that were
    // handed to it as ready-made ioredis instances — verify for the installed
    // version. The status check in quitConnection makes this safe either way.
    await this.quitConnection(this.eventsConnection, 'queue events');
    await this.quitConnection(redisConnection, 'queue');
    logger.info('Redis Task Queue Service closed');
  }
}
/** Singleton instance of the task queue service, created when this module is loaded. */
export const redisTaskQueueService = new RedisTaskQueueService();
|