import { v4 as uuidv4 } from 'uuid';
import {
  Task,
  TaskType,
  TaskStatus,
  TaskPriority,
  TaskResult,
  TaskProgressUpdate,
  CreateTaskRequest,
  TASK_WS_EVENTS,
} from '@media-manager/shared';
import { wsManager } from '../websocket/index.js';
import { logger } from '../utils/logger.js';
import { config } from '../config/index.js';

/**
 * Function that performs the actual work of a task.
 *
 * @param task - The task being executed.
 * @param updateProgress - Callback the executor may invoke to publish partial
 *   progress; only the fields present in the update are applied to the task.
 * @returns The result that is stored on the task once it completes.
 */
export type TaskExecutor = (
  task: Task,
  updateProgress: (update: Partial<TaskProgressUpdate>) => void
) => Promise<TaskResult>;

// Whether the Redis-backed queue was requested through the environment.
const USE_REDIS = process.env.USE_REDIS_QUEUE === 'true';

// Sort weight per priority; a lower number is scheduled first.
const PRIORITY_ORDER = { high: 0, normal: 1, low: 2 };

// Fields owned by the queue itself. `request.data` is merged last in
// `createTask`, so a key with one of these names silently replaces the value
// the queue generated.
const RESERVED_TASK_FIELDS = ['id', 'type', 'status', 'progress', 'createdAt', 'userId'];

/**
 * In-memory asynchronous task queue.
 *
 * Keeps a task list per user, runs tasks through the executors registered per
 * task type (at most `maxConcurrentTasks` running per user) and pushes every
 * state change to the user through `wsManager`.
 */
class TaskQueueService {
  // userId -> tasks of that user (pending, running and finished).
  private userTasks: Map<number, Task[]> = new Map();

  // task type -> executor that knows how to run it.
  private executors: Map<TaskType, TaskExecutor> = new Map();

  // Maximum number of simultaneously running tasks per user.
  private maxConcurrentTasks = 3;

  /**
   * Registers (or replaces) the executor for a task type.
   *
   * @param type - Task type the executor handles.
   * @param executor - Function invoked to run tasks of that type.
   */
  registerExecutor(type: TaskType, executor: TaskExecutor): void {
    this.executors.set(type, executor);
    logger.info(`Task executor registered: ${type}`);
  }

  /**
   * Creates a task for a user, announces it and tries to start it.
   *
   * @param userId - Owner of the task.
   * @param request - Task description; `request.data` is merged into the task
   *   object after the standard fields, so its keys take precedence.
   * @returns The stored task object (the same instance the queue mutates).
   */
  createTask(userId: number, request: CreateTaskRequest): Task & { userId: number } {
    const extraData = request.data || {};

    // The merge order is kept for backward compatibility, but overriding a
    // queue-owned field is almost certainly a caller mistake: make it visible.
    const shadowed = RESERVED_TASK_FIELDS.filter((field) =>
      Object.prototype.hasOwnProperty.call(extraData, field)
    );
    if (shadowed.length > 0) {
      logger.warn(`Task data overrides reserved task fields: ${shadowed.join(', ')}`);
    }

    const task: Task & { userId: number; [key: string]: unknown } = {
      id: uuidv4(),
      type: request.type,
      title: request.title || this.getDefaultTitle(request.type),
      description: request.description,
      status: 'pending',
      progress: 0,
      priority: request.priority || 'normal',
      createdAt: new Date().toISOString(),
      accountId: request.accountId,
      userId, // kept on the task so executors can read it
      // Extra caller-supplied data.
      ...extraData,
    };

    // Append to the user's list, creating the list on first use.
    let queue = this.userTasks.get(userId);
    if (!queue) {
      queue = [];
      this.userTasks.set(userId, queue);
    }
    queue.push(task);

    // Tell the client the task exists.
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_CREATED, { task });
    logger.info(`Task created: ${task.id} (${task.type}) for user ${userId}`);

    // Start it right away if a slot is free.
    this.tryExecuteNext(userId);

    return task;
  }

  /**
   * Returns every task currently stored for the user.
   *
   * The returned array is the queue's own list, not a copy; it is replaced by
   * a new array when old finished tasks are cleaned up.
   */
  getUserTasks(userId: number): Task[] {
    return this.userTasks.get(userId) || [];
  }

  /**
   * Returns the user's tasks that are still pending or running.
   */
  getActiveTasks(userId: number): Task[] {
    const tasks = this.userTasks.get(userId) || [];
    return tasks.filter((t) => t.status === 'pending' || t.status === 'running');
  }

  /**
   * Cancels a task that has not started yet.
   *
   * @returns `true` when the task was pending and is now cancelled; `false`
   *   when the user or task is unknown, or the task is in any other state
   *   (running tasks cannot be cancelled).
   */
  cancelTask(userId: number, taskId: string): boolean {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return false;

    const task = tasks.find((t) => t.id === taskId);
    if (!task) return false;

    if (task.status === 'pending') {
      task.status = 'cancelled';
      task.completedAt = new Date().toISOString();
      this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
      logger.info(`Task cancelled: ${taskId}`);
      return true;
    }

    // Cancelling a running task is not supported.
    return false;
  }

  /**
   * Drops old finished tasks (completed, failed or cancelled), keeping only
   * the most recently finished ones.
   *
   * @param userId - User whose list is trimmed.
   * @param keepCount - Number of finished tasks to keep (default 10).
   */
  cleanupCompletedTasks(userId: number, keepCount = 10): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return;

    const completedTasks = tasks.filter(
      (t) => t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
    );

    if (completedTasks.length > keepCount) {
      // Newest first; a task without `completedAt` sorts as the epoch.
      completedTasks.sort(
        (a, b) =>
          new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
      );

      const toRemoveIds = new Set(completedTasks.slice(keepCount).map((t) => t.id));
      this.userTasks.set(
        userId,
        tasks.filter((t) => !toRemoveIds.has(t.id))
      );
    }
  }

  /**
   * Starts as many pending tasks as there are free slots, highest priority
   * first. Tasks are started without being awaited so they run in parallel.
   */
  private tryExecuteNext(userId: number): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return;

    if (this.countRunning(userId) >= this.maxConcurrentTasks) return;

    const pendingTasks = tasks.filter((t) => t.status === 'pending');
    if (pendingTasks.length === 0) return;

    pendingTasks.sort((a, b) => PRIORITY_ORDER[a.priority] - PRIORITY_ORDER[b.priority]);

    let failedUnsupported = false;
    for (const task of pendingTasks) {
      // `executeTask` runs the executor synchronously up to its first await,
      // and an executor may re-enter the queue (create or cancel tasks). The
      // snapshot above can therefore be stale: re-check before every start.
      if (task.status !== 'pending') continue;
      if (this.countRunning(userId) >= this.maxConcurrentTasks) break;

      if (!this.executors.has(task.type)) {
        // An unsupported task must not use up a slot another task could take.
        this.failUnsupported(userId, task);
        failedUnsupported = true;
        continue;
      }

      // Fire and forget; the catch only guards against bookkeeping errors,
      // executor failures are handled inside `executeTask`.
      this.executeTask(userId, task).catch((err: unknown) => {
        logger.error(`Unexpected error while running task ${task.id}`, err);
      });
    }

    if (failedUnsupported) {
      this.cleanupCompletedTasks(userId);
    }
  }

  /**
   * Runs one task through its executor and records the outcome.
   *
   * Only an error raised by the executor marks the task as failed. Old
   * finished tasks are trimmed and the next pending tasks are started once
   * the executor has settled, whatever the outcome.
   */
  private async executeTask(userId: number, task: Task): Promise<void> {
    const executor = this.executors.get(task.type);
    if (!executor) {
      this.failUnsupported(userId, task);
      return;
    }

    // Mark as running.
    task.status = 'running';
    task.startedAt = new Date().toISOString();
    task.progress = 0;
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, { task });
    logger.info(`Task started: ${task.id} (${task.type})`);

    // Progress callback handed to the executor: applies the provided fields
    // and forwards the task's current progress state to the client.
    const updateProgress = (update: Partial<TaskProgressUpdate>): void => {
      if (update.progress !== undefined) task.progress = update.progress;
      if (update.currentStep !== undefined) task.currentStep = update.currentStep;
      if (update.currentStepIndex !== undefined) {
        task.currentStepIndex = update.currentStepIndex;
      }

      this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
        taskId: task.id,
        progress: task.progress,
        currentStep: task.currentStep,
        currentStepIndex: task.currentStepIndex,
        message: update.message,
      });
    };

    try {
      let result: TaskResult;
      try {
        result = await executor(task, updateProgress);
      } catch (error) {
        this.failTask(
          userId,
          task,
          error instanceof Error ? error.message : '任务执行失败'
        );
        logger.error(`Task failed: ${task.id}`, error);
        return;
      }

      // Kept outside the executor's try block so that an error while
      // recording success cannot turn a finished task into a failed one.
      task.status = 'completed';
      task.progress = 100;
      task.result = result;
      task.completedAt = new Date().toISOString();
      this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, { task });
      // `?.` because an executor registered from untyped code may resolve
      // without a result object.
      logger.info(`Task completed: ${task.id}, result: ${result?.message}`);
    } finally {
      // Trim old tasks and hand the freed slot to the next pending task.
      this.cleanupCompletedTasks(userId);
      this.tryExecuteNext(userId);
    }
  }

  /** Number of the user's tasks that are currently running. */
  private countRunning(userId: number): number {
    const tasks = this.userTasks.get(userId) || [];
    return tasks.filter((t) => t.status === 'running').length;
  }

  /** Marks a task as failed with the given message and notifies the user. */
  private failTask(userId: number, task: Task, message: string): void {
    task.status = 'failed';
    task.error = message;
    task.completedAt = new Date().toISOString();
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
  }

  /** Fails a task whose type has no registered executor. */
  private failUnsupported(userId: number, task: Task): void {
    logger.error(`No executor registered for task type: ${task.type}`);
    this.failTask(userId, task, `不支持的任务类型: ${task.type}`);
  }

  /**
   * Sends an event to the user through `wsManager`.
   *
   * The payload carries `event`, set to the second ':'-separated segment of
   * the event name, followed by the fields of `data`.
   */
  private notifyUser(userId: number, event: string, data: Record<string, unknown>): void {
    wsManager.sendToUser(userId, event, {
      event: event.split(':')[1],
      ...data,
    });
  }

  /**
   * Returns the built-in title for a task type, or a generic fallback title
   * when the type has no entry.
   */
  private getDefaultTitle(type: TaskType): string {
    const titles: Record<string, string> = {
      sync_comments: '同步评论',
      sync_works: '同步作品',
      sync_account: '同步账号信息',
      publish_video: '发布视频',
      batch_reply: '批量回复评论',
    };
    return titles[type] || '未知任务';
  }

  /**
   * Sends the user's complete task list to the user.
   */
  sendTaskList(userId: number): void {
    const tasks = this.getUserTasks(userId);
    wsManager.sendToUser(userId, TASK_WS_EVENTS.TASK_LIST, {
      event: 'list',
      tasks,
    });
  }

  /**
   * Starts the worker. The in-memory queue has no worker, so this only logs.
   */
  startWorker(): void {
    logger.info('Memory Task Queue started (no worker needed)');
  }

  /**
   * Stops the worker. The in-memory queue has no worker, so this only logs.
   */
  async stopWorker(): Promise<void> {
    logger.info('Memory Task Queue stopped');
  }

  /**
   * Closes the service. The in-memory queue holds no resources, so this only
   * logs.
   */
  async close(): Promise<void> {
    logger.info('Memory Task Queue Service closed');
  }
}

// In-memory queue singleton.
const memoryTaskQueueService = new TaskQueueService();

// Active queue implementation. It starts as the in-memory queue so the export
// is usable immediately; in Redis mode it is swapped once the module loads.
// NOTE(review): anything registered on the in-memory instance before the swap
// is presumably not carried over to the Redis instance - confirm with callers.
let taskQueueService: TaskQueueService = memoryTaskQueueService;

if (USE_REDIS) {
  // Load the Redis queue lazily.
  import('./RedisTaskQueue.js')
    .then(({ redisTaskQueueService }) => {
      // The Redis service is a different class, hence the assertion on the
      // assignment target.
      (taskQueueService as unknown) = redisTaskQueueService;
      logger.info('Using Redis Task Queue');
    })
    .catch((err: unknown) => {
      logger.warn(
        'Failed to load Redis Task Queue, falling back to memory queue:',
        err instanceof Error ? err.message : String(err)
      );
      taskQueueService = memoryTaskQueueService;
    });
} else {
  logger.info('Using Memory Task Queue');
}

export { taskQueueService };