| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- /**
- * 任务执行器注册
- * 在应用启动时注册所有任务类型的执行器
- */
- import { Task, TaskResult, TaskProgressUpdate } from '@media-manager/shared';
- import { taskQueueService } from './TaskQueueService.js';
- import { CommentService } from './CommentService.js';
- import { WorkService } from './WorkService.js';
- import { AccountService } from './AccountService.js';
- import { PublishService } from './PublishService.js';
- import { XiaohongshuWorkNoteStatisticsImportService } from './XiaohongshuWorkNoteStatisticsImportService.js';
- import { AppDataSource, PlatformAccount } from '../models/index.js';
- import { logger } from '../utils/logger.js';
// Service instances shared by the executors in this module; constructed once at module load.
const commentService = new CommentService();
const workService = new WorkService();
const accountService = new AccountService();
const publishService = new PublishService();
const xhsWorkStatsService = new XiaohongshuWorkNoteStatisticsImportService();

/** Callback through which an executor reports a partial progress update for its task. */
type ProgressUpdater = (update: Partial<TaskProgressUpdate>) => void;
/**
 * Executor for the comment-sync task.
 *
 * Reads `userId` off the task, delegates to `commentService.syncComments`, and
 * forwards per-work progress to `updateProgress`.
 *
 * @param task - The queued task; must carry a `userId` (supplied when the task is created).
 * @param updateProgress - Receives partial progress updates.
 * @returns A successful result carrying the synced comment count and the account count.
 * @throws Error when the task has no `userId`; errors from the sync call propagate.
 */
async function syncCommentsExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '连接平台...' });

  // Scales current/total into a percentage capped at 90; falls back to 50 when total is not positive.
  const reportWorkProgress = (current: number, total: number, workTitle: string) => {
    let percent = 50;
    if (total > 0) {
      percent = Math.min(90, Math.round((current / total) * 90) + 5);
    }
    const label = workTitle || `作品 ${current}/${total}`;
    updateProgress({
      progress: percent,
      currentStep: `正在同步: ${label}`,
      currentStepIndex: current,
    });
  };

  // userId is not part of the Task type here, hence the widening cast.
  const { userId } = task as Task & { userId?: number };
  if (!userId) {
    throw new Error('缺少用户ID');
  }

  const syncResult = await commentService.syncComments(userId, task.accountId, reportWorkProgress);

  updateProgress({ progress: 100, currentStep: '同步完成' });

  const data = {
    syncedCount: syncResult.synced,
    accountCount: syncResult.accounts,
  };
  return {
    success: true,
    message: `同步完成,共同步 ${syncResult.synced} 条评论`,
    data,
  };
}
/**
 * Executor for the works-sync task.
 *
 * Delegates to `workService.syncWorks` with the task's `userId`, `accountId` and
 * `platform`, relaying the service's progress callbacks to `updateProgress`.
 * When exactly one account summary comes back, a short diagnostic suffix is
 * appended to the result message.
 *
 * @param task - The queued task; must carry a `userId`.
 * @param updateProgress - Receives partial progress updates.
 * @returns A successful result with the synced count, account count and account summaries.
 * @throws Error when the task has no `userId`; errors from the sync call propagate.
 */
async function syncWorksExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '获取作品列表...' });

  const { userId } = task as Task & { userId?: number };
  if (!userId) {
    throw new Error('缺少用户ID');
  }

  const relayProgress = (progress, currentStep) => {
    updateProgress({ progress, currentStep });
  };
  const syncResult = await workService.syncWorks(userId, task.accountId, task.platform, relayProgress);

  updateProgress({ progress: 100, currentStep: '同步完成' });

  // Diagnostic suffix is only produced for the single-account case.
  let summarySuffix = '';
  if (syncResult.accountSummaries.length === 1) {
    const summary = syncResult.accountSummaries[0];
    const pythonState = summary.pythonAvailable ? 'ok' : 'off';
    const sourceLabel = summary.source || 'unknown';
    summarySuffix = `(${summary.platform} list=${summary.worksListLength}/${summary.worksCount} python=${pythonState} source=${sourceLabel})`;
  }

  return {
    success: true,
    message: `同步完成,共同步 ${syncResult.synced} 个作品${summarySuffix}`,
    data: {
      syncedCount: syncResult.synced,
      accountCount: syncResult.accounts,
      accountSummaries: syncResult.accountSummaries,
    },
  };
}
/**
 * Executor for the account-info sync task.
 *
 * Calls `accountService.refreshAccount` for the task's user and account. If the
 * refresh reports `needReLogin`, progress is set to 100 and the task fails with
 * an error.
 *
 * @param task - The queued task; must carry a `userId` and an `accountId`.
 * @param updateProgress - Receives partial progress updates.
 * @returns A successful result once the account has been refreshed.
 * @throws Error when `userId` or `accountId` is missing, or when a re-login is required.
 */
async function syncAccountExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  updateProgress({ progress: 10, currentStep: '获取账号信息...' });

  const { userId } = task as Task & { userId?: number };
  if (!userId) {
    throw new Error('缺少用户ID');
  }

  const { accountId } = task;
  if (!accountId) {
    throw new Error('缺少账号ID');
  }

  const refreshOutcome = await accountService.refreshAccount(userId, accountId);

  if (refreshOutcome.needReLogin) {
    // Mark the task as finished before failing it.
    updateProgress({ progress: 100, currentStep: '需要重新登录' });
    throw new Error('账号登录已过期或触发风控,需要重新登录');
  }

  updateProgress({ progress: 100, currentStep: '同步完成' });

  return { success: true, message: '账号信息已更新' };
}
/**
 * Executor for the video-publish task.
 *
 * Runs `publishService.executePublishTaskWithProgress` for the publish task
 * referenced by the queued task, relaying its progress messages.
 *
 * @param task - The queued task; must carry a `userId` and a `publishTaskId`.
 * @param updateProgress - Receives partial progress updates.
 * @returns A successful result once the publish call resolves.
 * @throws Error when `userId` or `publishTaskId` is missing; publish errors propagate.
 */
async function publishVideoExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '准备发布...' });

  // Both fields are extra properties attached to the task, hence the widening cast.
  const { userId, publishTaskId } = task as Task & { userId?: number; publishTaskId?: number };

  if (!userId) {
    throw new Error('缺少用户ID');
  }
  if (!publishTaskId) {
    throw new Error('缺少发布任务ID');
  }

  const relayProgress = (progress, message) => {
    updateProgress({ progress, currentStep: message });
  };
  await publishService.executePublishTaskWithProgress(publishTaskId, userId, relayProgress);

  updateProgress({ progress: 100, currentStep: '发布完成' });

  return { success: true, message: '视频发布任务已完成' };
}
/**
 * Executor for the "delete work from platform" task.
 *
 * Steps:
 *  1. Delete the work on the platform via `workService.deletePlatformWork`.
 *  2. On success, delete the local record (best effort; a failure is only logged).
 *  3. On success, if the platform result carries an `accountId`, enqueue a
 *     `sync_works` task to refresh the works list (best effort; a failure is only logged).
 *
 * @param task - The queued task; must carry a `userId` and a `workId`.
 * @param updateProgress - Receives partial progress updates.
 * @returns `success` mirrors the platform deletion result. On failure the message is the
 *          platform's `errorMessage` (or a generic fallback). On success the message only
 *          mentions the refresh when a refresh task was actually created.
 * @throws Error when `userId` or `workId` is missing; errors from the platform deletion propagate.
 */
async function deleteWorkExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  updateProgress({ progress: 10, currentStep: '准备删除...' });

  const { userId, workId } = task as Task & { userId?: number; workId?: number };
  if (!userId) {
    throw new Error('缺少用户ID');
  }
  if (!workId) {
    throw new Error('缺少作品ID');
  }

  updateProgress({ progress: 30, currentStep: '连接平台删除作品...' });

  const result = await workService.deletePlatformWork(userId, workId);

  if (!result.success) {
    updateProgress({ progress: 100, currentStep: '删除失败' });
    return {
      success: result.success,
      message: result.errorMessage || '删除失败',
    };
  }

  updateProgress({ progress: 70, currentStep: '删除本地记录...' });

  // The platform deletion already succeeded, so a local failure must not fail the task.
  try {
    await workService.deleteWork(userId, workId);
    logger.info(`Local work ${workId} deleted after platform deletion`);
  } catch (error) {
    logger.warn(`Failed to delete local work ${workId}:`, error);
  }

  // Tracks whether the follow-up sync task was really enqueued, so the final
  // message does not promise a refresh that is not going to happen.
  let refreshScheduled = false;
  if (result.accountId) {
    updateProgress({ progress: 90, currentStep: '刷新作品列表...' });
    try {
      // Awaited so that, if createTask returns a promise, a rejection is caught
      // here instead of becoming an unhandled rejection. Awaiting a plain
      // (non-promise) return value is harmless.
      await taskQueueService.createTask(userId, {
        type: 'sync_works',
        title: '刷新作品列表',
        accountId: result.accountId,
      });
      refreshScheduled = true;
      logger.info(`Created sync_works task for account ${result.accountId} after delete`);
    } catch (syncError) {
      // Failing to enqueue the refresh does not change the deletion outcome.
      logger.warn(`Failed to create sync_works task after delete:`, syncError);
    }
  }

  updateProgress({ progress: 100, currentStep: '删除完成' });

  return {
    success: result.success,
    message: refreshScheduled ? '作品已从平台删除,正在刷新作品列表' : '作品已从平台删除',
  };
}
/**
 * Executor for the Xiaohongshu work-statistics backfill task (first-batch daily
 * statistics / snapshots), run as its own task so it does not block works sync.
 *
 * Expected extra task fields: `userId`, and `workIds` (read directly from the
 * task object). `workIds` entries are coerced with `Number`, restricted to
 * positive integers, and de-duplicated (first occurrence wins) so that step
 * totals and the reported count reflect distinct works.
 *
 * @param task - The queued task; must carry `userId`, `accountId` and a non-empty `workIds`.
 * @param updateProgress - Receives partial progress updates.
 * @returns A successful result whose `data.workIdsCount` is the number of distinct valid ids.
 * @throws Error when `userId`, `accountId` or usable `workIds` are missing, or when no
 *         `xiaohongshu` account with that id belongs to the user; import errors propagate.
 */
async function xhsWorkStatsBackfillExecutor(task: Task, updateProgress: ProgressUpdater): Promise<TaskResult> {
  updateProgress({ progress: 5, currentStep: '准备补数任务...' });

  const userId = (task as Task & { userId?: number }).userId;
  if (!userId) throw new Error('缺少用户ID');
  if (!task.accountId) throw new Error('缺少账号ID');

  const workIdsRaw = (task as Task & { workIds?: unknown }).workIds;
  const workIds: number[] = Array.isArray(workIdsRaw)
    ? Array.from(
        new Set(
          workIdsRaw
            .map((value) => Number(value))
            // Ids must be positive integers; NaN, Infinity, fractions and <= 0 are dropped.
            .filter((id) => Number.isInteger(id) && id > 0)
        )
      )
    : [];
  if (!workIds.length) throw new Error('缺少 workIds');

  // Only an account owned by the current user (and on the xiaohongshu platform) is accepted.
  const account = await AppDataSource.getRepository(PlatformAccount).findOne({
    where: { id: task.accountId, userId, platform: 'xiaohongshu' as any },
  });
  if (!account) throw new Error('未找到账号或无权限');

  const workCount = workIds.length;
  updateProgress({ progress: 15, currentStep: `开始补数(作品数:${workCount})...`, totalSteps: workCount });

  await xhsWorkStatsService.importAccountWorksStatistics(account, false, {
    workIdFilter: workIds,
    ignorePublishTimeLimit: true,
    ignorePublishAgeLimit: true,
    // `total` here is the value reported by the import service, not `workCount`.
    onProgress: ({ index, total, work }) => {
      // Keep the percentage within 15..99; 100 is only reported after the import resolves.
      const pct = Math.min(99, Math.max(15, Math.round(15 + (index / Math.max(1, total)) * 84)));
      updateProgress({
        progress: pct,
        currentStepIndex: index,
        totalSteps: total,
        currentStep: `第 ${index}/${total} 个作品:${(work.title || '').trim() || `workId=${work.id}`}`,
      });
    },
  });

  updateProgress({ progress: 100, currentStep: '补数完成' });

  return {
    success: true,
    message: `补数完成,作品数:${workCount}`,
    data: { workIdsCount: workCount },
  };
}
/**
 * Registers every task executor defined in this module with `taskQueueService`,
 * then logs that registration is complete.
 */
export function registerTaskExecutors(): void {
  // Task type -> executor, in registration order.
  const registrations: Array<Parameters<typeof taskQueueService.registerExecutor>> = [
    ['sync_comments', syncCommentsExecutor],
    ['sync_works', syncWorksExecutor],
    ['sync_account', syncAccountExecutor],
    ['publish_video', publishVideoExecutor],
    ['delete_work', deleteWorkExecutor],
    ['xhs_work_stats_backfill', xhsWorkStatsBackfillExecutor],
  ];

  for (const [taskType, executor] of registrations) {
    taskQueueService.registerExecutor(taskType, executor);
  }

  logger.info('All task executors registered');
}
|