/**
 * Task executor registration.
 *
 * Defines one executor per task type and registers them all with the task
 * queue service. `registerTaskExecutors` is meant to be called once at
 * application start-up.
 */
import { Task, TaskResult, TaskProgressUpdate } from '@media-manager/shared';
import { taskQueueService } from './TaskQueueService.js';
import { CommentService } from './CommentService.js';
import { WorkService } from './WorkService.js';
import { AccountService } from './AccountService.js';
import { PublishService } from './PublishService.js';
import { XiaohongshuWorkNoteStatisticsImportService } from './XiaohongshuWorkNoteStatisticsImportService.js';
import { AppDataSource, PlatformAccount } from '../models/index.js';
import { logger } from '../utils/logger.js';

// Service instances shared by all executors in this module.
const commentService = new CommentService();
const workService = new WorkService();
const accountService = new AccountService();
const publishService = new PublishService();
const xhsWorkStatsService = new XiaohongshuWorkNoteStatisticsImportService();

/** Callback an executor uses to report partial progress for its task. */
type ProgressUpdater = (update: Partial<TaskProgressUpdate>) => void;

/**
 * Reads the `userId` attached to a task.
 *
 * `userId` is not part of the shared `Task` type as used here; it is expected
 * to be supplied when the task is created, hence the widening cast.
 *
 * @throws Error when the task carries no (truthy) `userId`.
 */
function requireUserId(task: Task): number {
  const userId = (task as Task & { userId?: number }).userId;
  if (!userId) {
    throw new Error('缺少用户ID');
  }
  return userId;
}

/**
 * Executor for the "sync comments" task.
 *
 * Delegates to `commentService.syncComments` and maps its per-work callback
 * onto the 5–90 % progress range.
 *
 * @throws Error when the task has no `userId`.
 */
async function syncCommentsExecutor(
  task: Task,
  updateProgress: ProgressUpdater
): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '连接平台...' });

  const onProgress = (current: number, total: number, workTitle: string) => {
    // With an unknown total, report a fixed mid-way value instead of dividing by zero.
    const progress = total > 0 ? Math.min(90, Math.round((current / total) * 90) + 5) : 50;
    updateProgress({
      progress,
      currentStep: `正在同步: ${workTitle || `作品 ${current}/${total}`}`,
      currentStepIndex: current,
    });
  };

  const userId = requireUserId(task);

  const result = await commentService.syncComments(userId, task.accountId, onProgress);

  updateProgress({ progress: 100, currentStep: '同步完成' });

  return {
    success: true,
    message: `同步完成,共同步 ${result.synced} 条评论`,
    data: {
      syncedCount: result.synced,
      accountCount: result.accounts,
    },
  };
}

/**
 * Executor for the "sync works" task.
 *
 * Delegates to `workService.syncWorks`, forwarding its progress callback
 * unchanged. When exactly one account was synced, a short diagnostic summary
 * of that account is appended to the result message.
 *
 * @throws Error when the task has no `userId`.
 */
async function syncWorksExecutor(
  task: Task,
  updateProgress: ProgressUpdater
): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '获取作品列表...' });

  const userId = requireUserId(task);

  const result = await workService.syncWorks(
    userId,
    task.accountId,
    task.platform,
    (progress, currentStep) => {
      updateProgress({ progress, currentStep });
    }
  );

  updateProgress({ progress: 100, currentStep: '同步完成' });

  // Only a single-account sync gets the inline summary; otherwise it is empty.
  const summaryText = (() => {
    if (result.accountSummaries.length === 1) {
      const s = result.accountSummaries[0];
      return `(${s.platform} list=${s.worksListLength}/${s.worksCount} python=${s.pythonAvailable ? 'ok' : 'off'} source=${s.source || 'unknown'})`;
    }
    return '';
  })();

  return {
    success: true,
    message: `同步完成,共同步 ${result.synced} 个作品${summaryText}`,
    data: {
      syncedCount: result.synced,
      accountCount: result.accounts,
      accountSummaries: result.accountSummaries,
    },
  };
}

/**
 * Executor for the "sync account info" task.
 *
 * Refreshes the account through `accountService.refreshAccount`.
 *
 * @throws Error when the task has no `userId` or `accountId`, or when the
 *   refresh result reports that the account needs to log in again.
 */
async function syncAccountExecutor(
  task: Task,
  updateProgress: ProgressUpdater
): Promise<TaskResult> {
  updateProgress({ progress: 10, currentStep: '获取账号信息...' });

  const userId = requireUserId(task);

  if (!task.accountId) {
    throw new Error('缺少账号ID');
  }

  const refreshResult = await accountService.refreshAccount(userId, task.accountId);

  if (refreshResult.needReLogin) {
    // Mark the task as finished in the UI before failing it.
    updateProgress({ progress: 100, currentStep: '需要重新登录' });
    throw new Error('账号登录已过期或触发风控,需要重新登录');
  }

  updateProgress({ progress: 100, currentStep: '同步完成' });

  return {
    success: true,
    message: '账号信息已更新',
  };
}

/**
 * Executor for the "publish video" task.
 *
 * Runs the publish task identified by `publishTaskId` (read from the task
 * object) via `publishService.executePublishTaskWithProgress`.
 *
 * @throws Error when the task has no `userId` or `publishTaskId`.
 */
async function publishVideoExecutor(
  task: Task,
  updateProgress: ProgressUpdater
): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '准备发布...' });

  const userId = requireUserId(task);

  // The publish task id travels on the task object itself.
  const taskData = task as Task & { publishTaskId?: number };
  if (!taskData.publishTaskId) {
    throw new Error('缺少发布任务ID');
  }

  await publishService.executePublishTaskWithProgress(
    taskData.publishTaskId,
    userId,
    (progress, message) => {
      updateProgress({ progress, currentStep: message });
    }
  );

  updateProgress({ progress: 100, currentStep: '发布完成' });

  return {
    success: true,
    message: '视频发布任务已完成',
  };
}

/**
 * Executor for the "delete platform work" task.
 *
 * Deletes the work on the platform first. Only if that succeeds does it
 * (best-effort) delete the local record and enqueue a `sync_works` task for
 * the affected account; failures in those two follow-up steps are logged and
 * do not change the reported result.
 *
 * @throws Error when the task has no `userId` or `workId`.
 */
async function deleteWorkExecutor(
  task: Task,
  updateProgress: ProgressUpdater
): Promise<TaskResult> {
  updateProgress({ progress: 10, currentStep: '准备删除...' });

  const userId = requireUserId(task);

  const taskData = task as Task & { workId?: number };
  if (!taskData.workId) {
    throw new Error('缺少作品ID');
  }

  updateProgress({ progress: 30, currentStep: '连接平台删除作品...' });

  const result = await workService.deletePlatformWork(userId, taskData.workId);

  if (result.success) {
    updateProgress({ progress: 70, currentStep: '删除本地记录...' });

    // Local deletion is best-effort: a failure here must not fail the task.
    try {
      await workService.deleteWork(userId, taskData.workId);
      logger.info(`Local work ${taskData.workId} deleted after platform deletion`);
    } catch (error) {
      logger.warn(`Failed to delete local work ${taskData.workId}:`, error);
    }

    // Refresh the works list for the account the work belonged to.
    if (result.accountId) {
      updateProgress({ progress: 90, currentStep: '刷新作品列表...' });
      try {
        // NOTE(review): createTask is not awaited; if it returns a promise,
        // a rejection would bypass this try/catch — confirm it is synchronous.
        taskQueueService.createTask(userId, {
          type: 'sync_works',
          title: '刷新作品列表',
          accountId: result.accountId,
        });
        logger.info(`Created sync_works task for account ${result.accountId} after delete`);
      } catch (syncError) {
        // Failing to enqueue the refresh does not affect the delete result.
        logger.warn(`Failed to create sync_works task after delete:`, syncError);
      }
    }
  }

  updateProgress({ progress: 100, currentStep: result.success ? '删除完成' : '删除失败' });

  return {
    success: result.success,
    message: result.success
      ? '作品已从平台删除,正在刷新作品列表'
      : (result.errorMessage || '删除失败'),
  };
}

/**
 * Executor for the Xiaohongshu work-statistics backfill task.
 *
 * Reads `workIds` from the task object, keeps only finite positive numbers,
 * verifies that the target account is a `xiaohongshu` account owned by the
 * task's user, and then imports statistics for exactly those works.
 *
 * @throws Error when `userId`, `accountId`, or usable `workIds` are missing,
 *   or when no matching account is found for this user.
 */
async function xhsWorkStatsBackfillExecutor(
  task: Task,
  updateProgress: ProgressUpdater
): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '准备补数任务...' });

  const userId = requireUserId(task);
  if (!task.accountId) throw new Error('缺少账号ID');

  // Normalise whatever was passed as workIds into a list of positive numbers.
  const workIdsRaw = (task as Task & { workIds?: unknown }).workIds;
  const workIds = Array.isArray(workIdsRaw)
    ? workIdsRaw.map((x) => Number(x)).filter((n) => Number.isFinite(n) && n > 0)
    : [];
  if (!workIds.length) throw new Error('缺少 workIds');

  // Restrict the lookup to the current user's own account.
  const account = await AppDataSource.getRepository(PlatformAccount).findOne({
    where: { id: task.accountId, userId, platform: 'xiaohongshu' as any },
  });
  if (!account) throw new Error('未找到账号或无权限');

  const total = workIds.length;
  updateProgress({
    progress: 15,
    currentStep: `开始补数(作品数:${total})...`,
    totalSteps: total,
  });

  await xhsWorkStatsService.importAccountWorksStatistics(account, false, {
    workIdFilter: workIds,
    ignorePublishTimeLimit: true,
    ignorePublishAgeLimit: true,
    onProgress: ({ index, total, work }) => {
      // Map per-work progress onto 15–99 %; 100 % is reported after the import returns.
      const pct = Math.min(99, Math.max(15, Math.round(15 + (index / Math.max(1, total)) * 84)));
      updateProgress({
        progress: pct,
        currentStepIndex: index,
        totalSteps: total,
        currentStep: `第 ${index}/${total} 个作品:${(work.title || '').trim() || `workId=${work.id}`}`,
      });
    },
  });

  updateProgress({ progress: 100, currentStep: '补数完成' });

  return {
    success: true,
    message: `补数完成,作品数:${workIds.length}`,
    data: { workIdsCount: workIds.length },
  };
}

/**
 * Registers every task executor defined in this module with the task queue
 * service, keyed by task type.
 */
export function registerTaskExecutors(): void {
  taskQueueService.registerExecutor('sync_comments', syncCommentsExecutor);
  taskQueueService.registerExecutor('sync_works', syncWorksExecutor);
  taskQueueService.registerExecutor('sync_account', syncAccountExecutor);
  taskQueueService.registerExecutor('publish_video', publishVideoExecutor);
  taskQueueService.registerExecutor('delete_work', deleteWorkExecutor);
  taskQueueService.registerExecutor('xhs_work_stats_backfill', xhsWorkStatsBackfillExecutor);

  logger.info('All task executors registered');
}