浏览代码

fix: task cancel running state support

ethanfly 3 天之前
父节点
当前提交
83b8fe34de
共有 2 个文件被更改,包括 67 次插入22 次删除
  1. 11 5
      client/src/stores/taskQueue.ts
  2. 56 17
      server/src/services/TaskQueueService.ts

+ 11 - 5
client/src/stores/taskQueue.ts

@@ -459,12 +459,18 @@ export const useTaskQueueStore = defineStore('taskQueue', () => {
 
   async function cancelTask(taskId: string): Promise<boolean> {
     try {
-      await request.post(`/api/tasks/${taskId}/cancel`);
-      const task = tasks.value.find(t => t.id === taskId);
-      if (task) {
-        task.status = 'cancelled';
+      const resp = await request.post(`/api/tasks/${taskId}/cancel`);
+      // 后端返回 { success: true } 才算成功
+      if (resp?.success === true) {
+        const task = tasks.value.find(t => t.id === taskId);
+        if (task) {
+          task.status = 'cancelled';
+        }
+        return true;
+      } else {
+        console.warn('[TaskQueue] cancelTask returned success=false, task may be running');
+        return false;
       }
-      return true;
     } catch (e) {
       console.error('Failed to cancel task:', e);
       return false;

+ 56 - 17
server/src/services/TaskQueueService.ts

@@ -10,10 +10,11 @@ import {
 import { wsManager } from '../websocket/index.js';
 import { logger } from '../utils/logger.js';
 
-// 任务执行器类型
+// 任务执行器类型(支持 AbortSignal 以实现取消)
 export type TaskExecutor = (
-  task: Task, 
-  updateProgress: (update: Partial<TaskProgressUpdate>) => void
+  task: Task,
+  updateProgress: (update: Partial<TaskProgressUpdate>) => void,
+  signal?: AbortSignal
 ) => Promise<TaskResult>;
 
 // 检查是否启用 Redis
@@ -26,13 +27,16 @@ const USE_REDIS = process.env.USE_REDIS_QUEUE === 'true';
 class TaskQueueService {
   // 用户任务列表 Map<userId, Task[]>
   private userTasks: Map<number, Task[]> = new Map();
-  
+
   // 任务执行器 Map<TaskType, TaskExecutor>
   private executors: Map<TaskType, TaskExecutor> = new Map();
-  
+
   // 正在执行的任务数量限制(每用户)
   private maxConcurrentTasks = 3;
 
+  // 取消控制器 Map<taskId, AbortController>
+  private taskAbortControllers: Map<string, AbortController> = new Map();
+
   /**
    * 注册任务执行器
    */
@@ -95,7 +99,7 @@ class TaskQueueService {
   }
 
   /**
-   * 取消任务
+   * 取消任务(支持 pending 和 running 状态)
    */
   cancelTask(userId: number, taskId: string): boolean {
     const tasks = this.userTasks.get(userId);
@@ -108,11 +112,25 @@ class TaskQueueService {
       task.status = 'cancelled';
       task.completedAt = new Date().toISOString();
       this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
-      logger.info(`Task cancelled: ${taskId}`);
+      logger.info(`Task cancelled (pending): ${taskId}`);
       return true;
     }
 
-    // 正在运行的任务暂不支持取消
+    if (task.status === 'running') {
+      // 向正在运行的任务发送取消信号
+      const controller = this.taskAbortControllers.get(taskId);
+      if (controller) {
+        controller.abort();
+        task.status = 'cancelled';
+        task.completedAt = new Date().toISOString();
+        task.error = '用户取消';
+        this.notifyUser(userId, TASK_WS_EVENTS.TASK_CANCELLED, { task });
+        logger.info(`Task cancelled (running): ${taskId}`);
+        return true;
+      }
+      return false;
+    }
+
     return false;
   }
 
@@ -195,12 +213,19 @@ class TaskQueueService {
 
     logger.info(`Task started: ${task.id} (${task.type})`);
 
+    // 创建 AbortController 以支持取消
+    const controller = new AbortController();
+    this.taskAbortControllers.set(task.id, controller);
+
+    // 将 controller 附加到 task 对象上,供 executor 内部检查
+    (task as Task & { _abortController?: AbortController })._abortController = controller;
+
     // 进度更新回调
     const updateProgress = (update: Partial<TaskProgressUpdate>) => {
       if (update.progress !== undefined) task.progress = update.progress;
       if (update.currentStep !== undefined) task.currentStep = update.currentStep;
       if (update.currentStepIndex !== undefined) task.currentStepIndex = update.currentStepIndex;
-      
+
       this.notifyUser(userId, TASK_WS_EVENTS.TASK_PROGRESS, {
         taskId: task.id,
         progress: task.progress,
@@ -211,22 +236,36 @@ class TaskQueueService {
     };
 
     try {
-      const result = await executor(task, updateProgress);
-      
+      // 使用 Promise.race 实现取消:当 abort 时立即 reject
+      const result = await Promise.race([
+        executor(task, updateProgress, controller.signal),
+        new Promise<never>((_, reject) => {
+          controller.signal.addEventListener('abort', () => {
+            reject(new DOMException('Task cancelled', 'AbortError'));
+          });
+        }),
+      ]);
+
       task.status = 'completed';
       task.progress = 100;
       task.result = result;
       task.completedAt = new Date().toISOString();
-      
+
       this.notifyUser(userId, TASK_WS_EVENTS.TASK_COMPLETED, { task });
       logger.info(`Task completed: ${task.id}, result: ${result.message}`);
     } catch (error) {
-      task.status = 'failed';
-      task.error = error instanceof Error ? error.message : '任务执行失败';
+      // 判断是否为用户取消
+      const isCancelled = error instanceof DOMException && error.name === 'AbortError';
+      task.status = isCancelled ? 'cancelled' : 'failed';
+      task.error = isCancelled ? '用户取消' : (error instanceof Error ? error.message : '任务执行失败');
       task.completedAt = new Date().toISOString();
-      
-      this.notifyUser(userId, TASK_WS_EVENTS.TASK_FAILED, { task });
-      logger.error(`Task failed: ${task.id}`, error);
+
+      this.notifyUser(userId, isCancelled ? TASK_WS_EVENTS.TASK_CANCELLED : TASK_WS_EVENTS.TASK_FAILED, { task });
+      logger.info(`Task ${isCancelled ? 'cancelled' : 'failed'}: ${task.id}`, error);
+    } finally {
+      // 清理 AbortController
+      this.taskAbortControllers.delete(task.id);
+      delete (task as Task & { _abortController?: AbortController })._abortController;
     }
 
     // 清理旧任务并尝试执行下一个