| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- import { v4 as uuidv4 } from 'uuid';
- import {
- Task,
- TaskType,
- TaskStatus,
- TaskPriority,
- TaskResult,
- TaskProgressUpdate,
- CreateTaskRequest,
- TASK_WS_EVENTS,
- } from '@media-manager/shared';
- import { wsManager } from '../websocket/index.js';
- import { logger } from '../utils/logger.js';
- import { config } from '../config/index.js';
/** Callback handed to an executor so it can report partial progress for its task. */
type TaskProgressReporter = (update: Partial<TaskProgressUpdate>) => void;

/**
 * Task executor: receives the task to run together with a progress callback
 * and resolves with the task's result.
 */
export type TaskExecutor = (task: Task, updateProgress: TaskProgressReporter) => Promise<TaskResult>;
// Whether the Redis-backed queue is enabled: true only when the
// USE_REDIS_QUEUE environment variable is exactly the string 'true'.
const USE_REDIS = process.env['USE_REDIS_QUEUE'] === 'true';
/**
 * Global asynchronous task queue service (in-memory implementation).
 *
 * Manages creation, execution and progress tracking of background tasks.
 * Tasks are kept per user; at most `maxConcurrentTasks` tasks per user are in
 * the `running` state at once, and pending tasks are started in priority
 * order (ties keep creation order).
 */
class TaskQueueService {
  /** Tasks per user, keyed by user id, in creation order. */
  private userTasks: Map<number, Task[]> = new Map();

  /** Registered executors, keyed by the task type they handle. */
  private executors: Map<TaskType, TaskExecutor> = new Map();

  /** Maximum number of simultaneously running tasks per user. */
  private maxConcurrentTasks = 3;

  /**
   * Registers (or replaces) the executor used for tasks of the given type.
   *
   * @param type - Task type the executor handles.
   * @param executor - Function that runs a task of that type.
   */
  registerExecutor(type: TaskType, executor: TaskExecutor): void {
    this.executors.set(type, executor);
    logger.info(`Task executor registered: ${type}`);
  }

  /**
   * Creates a task for the user, notifies the user and tries to start it.
   *
   * Extra fields from `request.data` are merged into the task object, but they
   * can never override the identity/lifecycle fields (`id`, `type`, `status`,
   * `progress`, `createdAt`, `userId`).
   *
   * @param userId - Owner of the task.
   * @param request - Task creation parameters.
   * @returns The created task object (the same object that is stored in the queue).
   */
  createTask(userId: number, request: CreateTaskRequest): Task & { userId: number } {
    // Fields owned by the queue itself. They are applied after `request.data`
    // so caller-supplied data cannot spoof the task id, owner or state.
    const protectedFields: Pick<Task, 'id' | 'type' | 'status' | 'progress' | 'createdAt'> & {
      userId: number;
    } = {
      id: uuidv4(),
      type: request.type,
      status: 'pending',
      progress: 0,
      createdAt: new Date().toISOString(),
      userId, // stored on the task so it is available during execution
    };

    const task: Task & { userId: number; [key: string]: unknown } = {
      title: request.title || this.getDefaultTitle(request.type),
      description: request.description,
      priority: request.priority || 'normal',
      accountId: request.accountId,
      // Merge extra data supplied by the caller.
      ...(request.data || {}),
      ...protectedFields,
    };

    // Append to the user's task list, creating the list on first use.
    let tasks = this.userTasks.get(userId);
    if (!tasks) {
      tasks = [];
      this.userTasks.set(userId, tasks);
    }
    tasks.push(task);

    // Tell the front end that the task exists.
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_CREATED, { task });
    logger.info(`Task created: ${task.id} (${task.type}) for user ${userId}`);

    // Start it right away if a slot is free.
    this.tryExecuteNext(userId);
    return task;
  }

  /**
   * Returns all tasks currently stored for the user (empty array if none).
   */
  getUserTasks(userId: number): Task[] {
    return this.userTasks.get(userId) || [];
  }

  /**
   * Returns the user's active tasks, i.e. those that are pending or running.
   */
  getActiveTasks(userId: number): Task[] {
    const tasks = this.userTasks.get(userId) || [];
    return tasks.filter(t => t.status === 'pending' || t.status === 'running');
  }

  /**
   * Cancels a pending task.
   *
   * @returns `true` if the task was pending and is now cancelled; `false` if
   * the task does not exist or is not pending (running tasks cannot be
   * cancelled yet).
   */
  cancelTask(userId: number, taskId: string): boolean {
    const task = this.userTasks.get(userId)?.find(t => t.id === taskId);
    if (!task || task.status !== 'pending') {
      return false;
    }

    task.status = 'cancelled';
    task.completedAt = new Date().toISOString();
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
    logger.info(`Task cancelled: ${taskId}`);
    return true;
  }

  /**
   * Drops old finished tasks (completed, failed or cancelled), keeping only the
   * `keepCount` most recently finished ones. Pending and running tasks are
   * never removed.
   *
   * @param userId - Owner of the tasks.
   * @param keepCount - Number of finished tasks to keep; negative values are
   * treated as 0.
   */
  cleanupCompletedTasks(userId: number, keepCount = 10): void {
    const tasks = this.userTasks.get(userId);
    if (!tasks) return;

    // A negative count would make `slice` count from the end of the array.
    const keep = Math.max(0, keepCount);

    const finishedTasks = tasks.filter(
      t => t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
    );
    if (finishedTasks.length <= keep) return;

    // Newest first; tasks without a completion time sort as oldest.
    finishedTasks.sort(
      (a, b) => new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
    );

    const idsToRemove = new Set(finishedTasks.slice(keep).map(t => t.id));
    this.userTasks.set(userId, tasks.filter(t => !idsToRemove.has(t.id)));
  }

  /**
   * Starts pending tasks for the user until the concurrency limit is reached
   * or nothing is pending. Tasks are started without being awaited so they run
   * in parallel.
   */
  private tryExecuteNext(userId: number): void {
    for (;;) {
      // Re-read on every pass: cleanup may have replaced the stored array.
      const tasks = this.userTasks.get(userId);
      if (!tasks) return;

      const runningCount = tasks.filter(t => t.status === 'running').length;
      if (runningCount >= this.maxConcurrentTasks) return;

      const next = this.pickNextPending(tasks);
      if (!next) return;

      // Fire and forget. executeTask moves the task out of `pending`
      // synchronously (to `running`, or to `failed` when no executor exists),
      // so the next pass sees up-to-date state. A task that fails immediately
      // does not occupy a slot, which is why we keep looping instead of
      // starting a fixed-size batch.
      this.executeTask(userId, next).catch((error: unknown) => {
        logger.error(`Unexpected error while running task ${next.id}`, error);
      });

      // Safety net against spinning forever if the task could not be started.
      if (next.status === 'pending') return;
    }
  }

  /**
   * Returns the pending task that should start next: the highest priority
   * one, and among equal priorities the one created first.
   */
  private pickNextPending(tasks: Task[]): Task | undefined {
    const priorityOrder = { high: 0, normal: 1, low: 2 };
    let best: Task | undefined;
    for (const candidate of tasks) {
      if (candidate.status !== 'pending') continue;
      // Strict comparison keeps the earliest task among equal priorities.
      if (!best || priorityOrder[candidate.priority] < priorityOrder[best.priority]) {
        best = candidate;
      }
    }
    return best;
  }

  /**
   * Runs a single task with its registered executor and records the outcome
   * on the task object, notifying the user about every state change.
   */
  private async executeTask(userId: number, task: Task): Promise<void> {
    const executor = this.executors.get(task.type);
    if (!executor) {
      // Leave `pending` first so the scheduler never picks this task again.
      task.status = 'failed';
      task.error = `不支持的任务类型: ${task.type}`;
      task.completedAt = new Date().toISOString();
      logger.error(`No executor registered for task type: ${task.type}`);
      this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
      return;
    }

    // Mark the task as running.
    task.status = 'running';
    task.startedAt = new Date().toISOString();
    task.progress = 0;
    this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, { task });
    logger.info(`Task started: ${task.id} (${task.type})`);

    // Progress callback handed to the executor.
    const updateProgress = (update: Partial<TaskProgressUpdate>): void => {
      // Ignore late updates (e.g. from a stray timer) once the task has
      // settled, so they cannot overwrite the final progress value.
      if (task.status !== 'running') return;

      if (update.progress !== undefined) task.progress = update.progress;
      if (update.currentStep !== undefined) task.currentStep = update.currentStep;
      if (update.currentStepIndex !== undefined) task.currentStepIndex = update.currentStepIndex;

      this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
        taskId: task.id,
        progress: task.progress,
        currentStep: task.currentStep,
        currentStepIndex: task.currentStepIndex,
        message: update.message,
      });
    };

    try {
      const result = await executor(task, updateProgress);

      task.status = 'completed';
      task.progress = 100;
      task.result = result;
      task.completedAt = new Date().toISOString();

      this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, { task });
      logger.info(`Task completed: ${task.id}, result: ${result.message}`);
    } catch (error) {
      task.status = 'failed';
      task.error = error instanceof Error ? error.message : '任务执行失败';
      task.completedAt = new Date().toISOString();

      this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
      logger.error(`Task failed: ${task.id}`, error);
    } finally {
      // Always trim old tasks and free the slot for the next pending task,
      // even if notifying or logging above threw.
      this.cleanupCompletedTasks(userId);
      this.tryExecuteNext(userId);
    }
  }

  /**
   * Sends an event to the user over the WebSocket manager. The payload's
   * `event` field is the part of the event name after the first colon.
   */
  private notifyUser(userId: number, event: string, data: Record<string, unknown>): void {
    wsManager.sendToUser(userId, event, {
      event: event.split(':')[1], // extract the short event name
      ...data,
    });
  }

  /**
   * Returns the default (user-facing) title for a task type.
   */
  private getDefaultTitle(type: TaskType): string {
    const titles: Record<TaskType, string> = {
      sync_comments: '同步评论',
      sync_works: '同步作品',
      sync_account: '同步账号信息',
      publish_video: '发布视频',
      batch_reply: '批量回复评论',
    };
    return titles[type] || '未知任务';
  }

  /**
   * Sends the user's full task list to the user.
   */
  sendTaskList(userId: number): void {
    const tasks = this.getUserTasks(userId);
    wsManager.sendToUser(userId, TASK_WS_EVENTS.TASK_LIST, {
      event: 'list',
      tasks,
    });
  }

  /**
   * Starts the worker. A no-op (besides logging) for the in-memory queue.
   */
  startWorker(): void {
    logger.info('Memory Task Queue started (no worker needed)');
  }

  /**
   * Stops the worker. Only logs for the in-memory queue.
   */
  async stopWorker(): Promise<void> {
    logger.info('Memory Task Queue stopped');
  }

  /**
   * Closes the service. Only logs for the in-memory queue.
   */
  async close(): Promise<void> {
    logger.info('Memory Task Queue Service closed');
  }
}
// Singleton in-memory queue; also the fallback when the Redis queue cannot be loaded.
const memoryTaskQueueService = new TaskQueueService();

// Active queue implementation, chosen by configuration. It always starts as
// the in-memory queue so the export is usable immediately; when Redis is
// enabled it is swapped once the Redis module has finished loading.
// NOTE(review): tasks created or executors registered on the in-memory queue
// before the swap are not carried over to the Redis queue by this code —
// confirm callers do not depend on that.
let taskQueueService: TaskQueueService = memoryTaskQueueService;

if (USE_REDIS) {
  // Load the Redis queue lazily via dynamic import.
  import('./RedisTaskQueue.js')
    .then(({ redisTaskQueueService }) => {
      (taskQueueService as unknown) = redisTaskQueueService;
      logger.info('Using Redis Task Queue');
    })
    .catch((err: unknown) => {
      // The rejection reason is not guaranteed to be an Error instance.
      const reason = err instanceof Error ? err.message : String(err);
      logger.warn('Failed to load Redis Task Queue, falling back to memory queue:', reason);
      taskQueueService = memoryTaskQueueService;
    });
} else {
  logger.info('Using Memory Task Queue');
}

export { taskQueueService };
|