|
|
@@ -1,9 +1,9 @@
|
|
|
import { Queue, Worker, Job, QueueEvents } from 'bullmq';
|
|
|
import IORedis from 'ioredis';
|
|
|
import { v4 as uuidv4 } from 'uuid';
|
|
|
-import {
|
|
|
- Task,
|
|
|
- TaskType,
|
|
|
+import {
|
|
|
+ Task,
|
|
|
+ TaskType,
|
|
|
TaskResult,
|
|
|
TaskProgressUpdate,
|
|
|
CreateTaskRequest,
|
|
|
@@ -24,7 +24,7 @@ const redisConnection = new IORedis({
|
|
|
|
|
|
// 任务执行器类型
|
|
|
type TaskExecutor = (
|
|
|
- task: Task,
|
|
|
+ task: Task,
|
|
|
updateProgress: (update: Partial<TaskProgressUpdate>) => void
|
|
|
) => Promise<TaskResult>;
|
|
|
|
|
|
@@ -39,19 +39,20 @@ class RedisTaskQueueService {
|
|
|
private queue: Queue;
|
|
|
private queueEvents: QueueEvents;
|
|
|
private worker: Worker | null = null;
|
|
|
-
|
|
|
+
|
|
|
// 任务执行器 Map<TaskType, TaskExecutor>
|
|
|
private executors: Map<TaskType, TaskExecutor> = new Map();
|
|
|
-
|
|
|
+
|
|
|
// 内存中缓存用户任务(用于快速查询)
|
|
|
private userTasks: Map<number, Task[]> = new Map();
|
|
|
-
|
|
|
+
|
|
|
// 最大并行任务数
|
|
|
private concurrency = 5;
|
|
|
|
|
|
constructor() {
|
|
|
// 创建队列
|
|
|
this.queue = new Queue(QUEUE_NAME, {
|
|
|
+ // @ts-ignore
|
|
|
connection: redisConnection,
|
|
|
defaultJobOptions: {
|
|
|
removeOnComplete: { count: 100 }, // 保留最近100个完成的任务
|
|
|
@@ -66,11 +67,12 @@ class RedisTaskQueueService {
|
|
|
|
|
|
// 创建队列事件监听
|
|
|
this.queueEvents = new QueueEvents(QUEUE_NAME, {
|
|
|
+ // @ts-ignore
|
|
|
connection: redisConnection.duplicate(),
|
|
|
});
|
|
|
|
|
|
this.setupEventListeners();
|
|
|
-
|
|
|
+
|
|
|
logger.info('Redis Task Queue Service initialized');
|
|
|
}
|
|
|
|
|
|
@@ -106,6 +108,7 @@ class RedisTaskQueueService {
|
|
|
return this.processJob(job);
|
|
|
},
|
|
|
{
|
|
|
+ // @ts-ignore
|
|
|
connection: redisConnection.duplicate(),
|
|
|
concurrency: this.concurrency, // 并行处理任务数
|
|
|
}
|
|
|
@@ -143,7 +146,7 @@ class RedisTaskQueueService {
|
|
|
private async processJob(job: Job): Promise<TaskResult> {
|
|
|
const taskData = job.data as Task & { userId: number };
|
|
|
const { userId } = taskData;
|
|
|
-
|
|
|
+
|
|
|
const executor = this.executors.get(taskData.type);
|
|
|
if (!executor) {
|
|
|
throw new Error(`No executor registered for task type: ${taskData.type}`);
|
|
|
@@ -156,15 +159,15 @@ class RedisTaskQueueService {
|
|
|
});
|
|
|
|
|
|
// 通知前端任务开始
|
|
|
- this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, {
|
|
|
- task: this.getTaskFromCache(userId, taskData.id)
|
|
|
+ this.notifyUser(userId, TASK_WS_EVENTS.TASK_STARTED, {
|
|
|
+ task: this.getTaskFromCache(userId, taskData.id)
|
|
|
});
|
|
|
|
|
|
// 进度更新回调
|
|
|
const updateProgress = async (update: Partial<TaskProgressUpdate>) => {
|
|
|
// 更新 Job 进度
|
|
|
await job.updateProgress(update);
|
|
|
-
|
|
|
+
|
|
|
// 更新内存缓存
|
|
|
this.updateTaskInCache(userId, taskData.id, {
|
|
|
progress: update.progress,
|
|
|
@@ -184,7 +187,7 @@ class RedisTaskQueueService {
|
|
|
|
|
|
try {
|
|
|
const result = await executor(taskData, updateProgress);
|
|
|
-
|
|
|
+
|
|
|
// 更新缓存
|
|
|
this.updateTaskInCache(userId, taskData.id, {
|
|
|
status: 'completed',
|
|
|
@@ -194,14 +197,14 @@ class RedisTaskQueueService {
|
|
|
});
|
|
|
|
|
|
// 通知前端
|
|
|
- this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, {
|
|
|
- task: this.getTaskFromCache(userId, taskData.id)
|
|
|
+ this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, {
|
|
|
+ task: this.getTaskFromCache(userId, taskData.id)
|
|
|
});
|
|
|
|
|
|
return result;
|
|
|
} catch (error) {
|
|
|
const errorMessage = error instanceof Error ? error.message : '任务执行失败';
|
|
|
-
|
|
|
+
|
|
|
// 更新缓存
|
|
|
this.updateTaskInCache(userId, taskData.id, {
|
|
|
status: 'failed',
|
|
|
@@ -210,8 +213,8 @@ class RedisTaskQueueService {
|
|
|
});
|
|
|
|
|
|
// 通知前端
|
|
|
- this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, {
|
|
|
- task: this.getTaskFromCache(userId, taskData.id)
|
|
|
+ this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, {
|
|
|
+ task: this.getTaskFromCache(userId, taskData.id)
|
|
|
});
|
|
|
|
|
|
throw error;
|
|
|
@@ -230,7 +233,7 @@ class RedisTaskQueueService {
|
|
|
* 创建新任务
|
|
|
*/
|
|
|
async createTask(userId: number, request: CreateTaskRequest): Promise<Task & { userId: number }> {
|
|
|
- const task: Task & { userId: number; [key: string]: unknown } = {
|
|
|
+ const task: Task & { userId: number;[key: string]: unknown } = {
|
|
|
id: uuidv4(),
|
|
|
type: request.type,
|
|
|
title: request.title || this.getDefaultTitle(request.type),
|
|
|
@@ -303,7 +306,7 @@ class RedisTaskQueueService {
|
|
|
|
|
|
// 通知前端
|
|
|
this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
|
|
|
-
|
|
|
+
|
|
|
logger.info(`Task cancelled: ${taskId}`);
|
|
|
return true;
|
|
|
}
|
|
|
@@ -318,18 +321,18 @@ class RedisTaskQueueService {
|
|
|
const tasks = this.userTasks.get(userId);
|
|
|
if (!tasks) return;
|
|
|
|
|
|
- const completedTasks = tasks.filter(t =>
|
|
|
+ const completedTasks = tasks.filter(t =>
|
|
|
t.status === 'completed' || t.status === 'failed' || t.status === 'cancelled'
|
|
|
);
|
|
|
|
|
|
if (completedTasks.length > keepCount) {
|
|
|
- completedTasks.sort((a, b) =>
|
|
|
+ completedTasks.sort((a, b) =>
|
|
|
new Date(b.completedAt || 0).getTime() - new Date(a.completedAt || 0).getTime()
|
|
|
);
|
|
|
-
|
|
|
+
|
|
|
const toRemove = completedTasks.slice(keepCount);
|
|
|
const toRemoveIds = new Set(toRemove.map(t => t.id));
|
|
|
-
|
|
|
+
|
|
|
this.userTasks.set(userId, tasks.filter(t => !toRemoveIds.has(t.id)));
|
|
|
}
|
|
|
}
|
|
|
@@ -407,6 +410,7 @@ class RedisTaskQueueService {
|
|
|
sync_account: '同步账号信息',
|
|
|
publish_video: '发布视频',
|
|
|
batch_reply: '批量回复评论',
|
|
|
+ delete_work: '删除作品',
|
|
|
};
|
|
|
return titles[type] || '未知任务';
|
|
|
}
|