import {
  AppDataSource,
  PublishTask,
  PublishResult,
  PlatformAccount,
  SystemConfig,
} from '../models/index.js';
import { AppError } from '../middleware/error.js';
import { ERROR_CODES, HTTP_STATUS, WS_EVENTS } from '@media-manager/shared';
import type {
  PublishTask as PublishTaskType,
  PublishTaskDetail,
  CreatePublishTaskRequest,
  PaginatedData,
  PlatformType,
  PublishProxyConfig,
} from '@media-manager/shared';
import { wsManager } from '../websocket/index.js';
import { DouyinAdapter } from '../automation/platforms/douyin.js';
import { XiaohongshuAdapter } from '../automation/platforms/xiaohongshu.js';
import { WeixinAdapter } from '../automation/platforms/weixin.js';
import { KuaishouAdapter } from '../automation/platforms/kuaishou.js';
import { BilibiliAdapter } from '../automation/platforms/bilibili.js';
import { BaijiahaoAdapter } from '../automation/platforms/baijiahao.js';
import { BasePlatformAdapter } from '../automation/platforms/base.js';
import { logger } from '../utils/logger.js';
import path from 'path';
import { config } from '../config/index.js';
import { CookieManager } from '../automation/cookie.js';
import { taskQueueService } from './TaskQueueService.js';
import { In } from 'typeorm';

/** Paging and filter options accepted by {@link PublishService.getTasks}. */
interface GetTasksParams {
  page: number;
  pageSize: number;
  status?: string;
}

/** Proxy settings handed to the adapters and to the Python publish service. */
interface PublishProxyExtra {
  enabled: boolean;
  provider: 'shenlong';
  city: string;
  apiUrl: string;
  regionCode?: string;
  regionName?: string;
}

/** Payload an adapter passes when it needs the user to solve a captcha. */
interface CaptchaRequest {
  taskId: string;
  type: 'sms' | 'image';
  phone?: string;
  imageBase64?: string;
}

/** Fields read from the JSON body returned by the Python publish service. */
interface HeadfulPublishResponse {
  success?: boolean;
  video_url?: string;
  video_id?: string;
  error?: string;
}

/** How long a captcha prompt waits for the user before rejecting (2 minutes). */
const CAPTCHA_TIMEOUT_MS = 120000;

/** Pause after each account's publish attempt, to avoid publishing too frequently. */
const ACCOUNT_PUBLISH_DELAY_MS = 5000;

/** Abort timeout for the headful-browser HTTP call (10 minutes). */
const HEADFUL_PUBLISH_TIMEOUT_MS = 600000;

/**
 * Creates, runs, retries, cancels and deletes video publish tasks.
 *
 * A task owns one `PublishResult` row per target account. Running a task walks
 * those rows, publishes through the platform adapter that matches each
 * account, and pushes progress to the owning user over WebSocket.
 */
export class PublishService {
  private taskRepository = AppDataSource.getRepository(PublishTask);
  private resultRepository = AppDataSource.getRepository(PublishResult);
  private accountRepository = AppDataSource.getRepository(PlatformAccount);
  private systemConfigRepository = AppDataSource.getRepository(SystemConfig);

  // Platform type -> adapter instance.
  private adapters: Map<PlatformType, BasePlatformAdapter> = new Map();

  constructor() {
    // Register one adapter per supported platform.
    this.adapters.set('douyin', new DouyinAdapter());
    this.adapters.set('xiaohongshu', new XiaohongshuAdapter());
    this.adapters.set('weixin_video', new WeixinAdapter());
    this.adapters.set('kuaishou', new KuaishouAdapter());
    this.adapters.set('bilibili', new BilibiliAdapter());
    this.adapters.set('baijiahao', new BaijiahaoAdapter());
  }

  /**
   * Looks up the adapter registered for a platform.
   *
   * @throws AppError (BAD_REQUEST / VALIDATION) when no adapter is registered.
   */
  private getAdapter(platform: PlatformType): BasePlatformAdapter {
    const adapter = this.adapters.get(platform);
    if (!adapter) {
      throw new AppError(`不支持的平台: ${platform}`, HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
    }
    return adapter;
  }

  /**
   * Returns one page of the user's tasks, newest first, optionally filtered by
   * status.
   */
  async getTasks(userId: number, params: GetTasksParams): Promise<PaginatedData<PublishTaskType>> {
    const { page, pageSize, status } = params;
    const skip = (page - 1) * pageSize;

    const queryBuilder = this.taskRepository
      .createQueryBuilder('task')
      .where('task.userId = :userId', { userId });

    if (status) {
      queryBuilder.andWhere('task.status = :status', { status });
    }

    const [tasks, total] = await queryBuilder
      .orderBy('task.createdAt', 'DESC')
      .skip(skip)
      .take(pageSize)
      .getManyAndCount();

    return {
      // Wrap in an arrow so the method is never invoked detached from `this`.
      items: tasks.map((task) => this.formatTask(task)),
      total,
      page,
      pageSize,
      totalPages: Math.ceil(total / pageSize),
    };
  }

  /**
   * Loads a single task owned by the user together with its per-account results.
   *
   * @throws AppError (NOT_FOUND) when the task does not exist for this user.
   */
  async getTaskById(userId: number, taskId: number): Promise<PublishTaskDetail> {
    const task = await this.taskRepository.findOne({
      where: { id: taskId, userId },
      relations: ['results'],
    });

    if (!task) {
      throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    return this.formatTaskDetail(task);
  }

  /**
   * Persists a new publish task in `pending` state plus one result row per
   * valid target account, then notifies the user.
   *
   * Target accounts that do not exist or are not owned by the user are dropped
   * with a warning. The task is NOT queued here: the caller must call
   * `taskQueueService.createTask` to have it executed.
   *
   * @throws AppError (BAD_REQUEST / VALIDATION) when no target account is valid.
   */
  async createTask(userId: number, data: CreatePublishTaskRequest): Promise<PublishTaskType> {
    const requestedIds = data.targetAccounts ?? [];

    // The ownership checks are independent of each other, so run them together.
    const lookups = await Promise.all(
      requestedIds.map((accountId) =>
        this.accountRepository.findOne({ where: { id: accountId, userId } })
      )
    );

    const validAccountIds: number[] = [];
    const invalidAccountIds: number[] = [];
    requestedIds.forEach((accountId, index) => {
      if (lookups[index]) {
        validAccountIds.push(accountId);
      } else {
        invalidAccountIds.push(accountId);
        logger.warn(`[PublishService] Account ${accountId} not found or not owned by user ${userId}, skipping`);
      }
    });

    if (validAccountIds.length === 0) {
      throw new AppError('所选账号不存在或已被删除', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
    }

    if (invalidAccountIds.length > 0) {
      logger.warn(`[PublishService] ${invalidAccountIds.length} invalid accounts skipped: ${invalidAccountIds.join(', ')}`);
    }

    const task = this.taskRepository.create({
      userId,
      videoPath: data.videoPath,
      // Accept both separators, matching the path handling elsewhere in this file.
      videoFilename: data.videoPath.split(/[\\/]/).pop() || null,
      title: data.title,
      description: data.description || null,
      coverPath: data.coverPath || null,
      tags: data.tags || null,
      targetAccounts: validAccountIds, // only valid account ids are stored
      platformConfigs: data.platformConfigs || null,
      publishProxy: data.publishProxy || null,
      status: 'pending', // switched to 'processing' when the queue runs the task
      scheduledAt: data.scheduledAt ? new Date(data.scheduledAt) : null,
    });

    await this.taskRepository.save(task);

    // One result row per valid account, saved in a single call.
    await this.resultRepository.save(
      validAccountIds.map((accountId) =>
        this.resultRepository.create({ taskId: task.id, accountId })
      )
    );

    const formatted = this.formatTask(task);
    wsManager.sendToUser(userId, WS_EVENTS.TASK_CREATED, { task: formatted });

    return formatted;
  }

  /**
   * Runs a publish task: publishes the video to each target account in turn
   * and reports progress.
   *
   * Results already marked `success` (for example from an earlier run of a
   * retried task) are not published again; they count towards the success
   * total. After the run, a `sync_works` queue task is created for every
   * account that was published successfully during this run.
   *
   * @param taskId     Task to execute.
   * @param userId     User who receives the WebSocket notifications.
   * @param onProgress Optional callback receiving (percent, message).
   * @throws Error when the task does not exist.
   */
  async executePublishTaskWithProgress(
    taskId: number,
    userId: number,
    onProgress?: (progress: number, message: string) => void
  ): Promise<void> {
    const task = await this.taskRepository.findOne({
      where: { id: taskId },
      relations: ['results'],
    });

    if (!task) {
      throw new Error(`Task ${taskId} not found`);
    }

    await this.taskRepository.update(taskId, { status: 'processing' });
    wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
      taskId,
      status: 'processing',
    });

    const results = task.results || [];
    // Only results that have not succeeded yet are (re)published.
    const pendingResults = results.filter((r) => r.status !== 'success');
    const totalAccounts = pendingResults.length;

    let successCount = results.length - pendingResults.length;
    let failCount = 0;

    // Accounts published during this run. The loaded `results` entities are
    // never refreshed after `update`, so their `status` cannot be used for this.
    const publishedAccounts = new Map<number, PlatformAccount>();

    let publishProxyExtra: PublishProxyExtra | null = null;
    try {
      publishProxyExtra = await this.buildPublishProxyExtra(task.publishProxy);
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : '发布代理配置错误';
      logger.error(`[PublishService] publish proxy config error: ${errorMessage}`);

      for (const pending of pendingResults) {
        await this.markResultFailed(pending.id, errorMessage);
      }

      await this.taskRepository.update(taskId, {
        status: 'failed',
        publishedAt: new Date(),
      });
      wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
        taskId,
        status: 'failed',
        successCount,
        failCount: totalAccounts,
      });
      onProgress?.(100, `发布失败: ${errorMessage}`);
      return;
    }

    const videoPath = this.resolveUploadPath(task.videoPath);
    logger.info(`Publishing video: ${videoPath}`);

    onProgress?.(5, `准备发布到 ${totalAccounts} 个账号...`);

    // The captcha callback does not depend on the account, so build it once.
    const onCaptchaRequired = this.createCaptchaHandler(userId, taskId);

    for (const [index, result] of pendingResults.entries()) {
      const accountProgress = Math.floor((index / totalAccounts) * 80) + 10;

      try {
        const account = await this.accountRepository.findOne({
          where: { id: result.accountId },
        });

        if (!account) {
          logger.warn(`Account ${result.accountId} not found`);
          await this.markResultFailed(result.id, '账号不存在');
          failCount++;
          continue;
        }

        if (!account.cookieData) {
          logger.warn(`Account ${result.accountId} has no cookies`);
          await this.markResultFailed(result.id, '账号未登录');
          failCount++;
          continue;
        }

        const decryptedCookies = this.decryptCookies(account.cookieData);

        // Record which platform this result belongs to.
        await this.resultRepository.update(result.id, {
          platform: account.platform,
        });

        const adapter = this.getAdapter(account.platform as PlatformType);

        logger.info(`Publishing to account ${account.accountName} (${account.platform})`);
        onProgress?.(accountProgress, `正在发布到 ${account.accountName}...`);

        wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
          taskId,
          accountId: account.id,
          platform: account.platform,
          status: 'uploading',
          progress: 0,
          message: '开始发布...',
        });

        const publishResult = await adapter.publishVideo(
          decryptedCookies,
          {
            videoPath,
            title: task.title || '',
            description: task.description || undefined,
            coverPath: task.coverPath || undefined,
            tags: task.tags || undefined,
            extra: {
              userId,
              publishTaskId: taskId,
              publishAccountId: account.id,
              publishProxy: publishProxyExtra,
            },
          },
          (progress, message) => {
            // Forward adapter progress to the user.
            wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
              taskId,
              accountId: account.id,
              platform: account.platform,
              status: 'processing',
              progress,
              message,
            });
          },
          onCaptchaRequired
        );

        if (publishResult.success) {
          await this.resultRepository.update(result.id, {
            status: 'success',
            videoUrl: publishResult.videoUrl || null,
            platformVideoId: publishResult.platformVideoId || null,
            publishedAt: new Date(),
          });
          successCount++;
          publishedAccounts.set(account.id, account);

          wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
            taskId,
            accountId: account.id,
            platform: account.platform,
            status: 'success',
            progress: 100,
            message: '发布成功',
          });
        } else {
          const failureMessage = publishResult.errorMessage || '发布失败';
          await this.markResultFailed(result.id, failureMessage);
          failCount++;

          wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
            taskId,
            accountId: account.id,
            platform: account.platform,
            status: 'failed',
            progress: 0,
            message: failureMessage,
          });
        }

        // Pause after each account to avoid publishing too frequently.
        await new Promise((resolve) => setTimeout(resolve, ACCOUNT_PUBLISH_DELAY_MS));
      } catch (error) {
        logger.error(`Failed to publish to account ${result.accountId}:`, error);
        await this.markResultFailed(
          result.id,
          error instanceof Error ? error.message : '发布失败'
        );
        failCount++;
      }
    }

    // The task is 'failed' only when something failed and nothing succeeded.
    const finalStatus = successCount === 0 && failCount > 0 ? 'failed' : 'completed';
    await this.taskRepository.update(taskId, {
      status: finalStatus,
      publishedAt: new Date(),
    });

    wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
      taskId,
      status: finalStatus,
      successCount,
      failCount,
    });

    onProgress?.(100, `发布完成: ${successCount} 成功, ${failCount} 失败`);
    logger.info(`Task ${taskId} completed: ${successCount} success, ${failCount} failed`);

    // Queue a works sync for every account published during this run.
    for (const account of publishedAccounts.values()) {
      taskQueueService.createTask(userId, {
        type: 'sync_works',
        title: `同步作品 - ${account.accountName || '账号'}`,
        accountId: account.id,
      });
      logger.info(`Created sync_works task for account ${account.id} after publish`);
    }
  }

  /**
   * Marks a pending or processing task as cancelled and notifies the user.
   *
   * Only the stored status is changed here; the publish loop in this class
   * does not re-read the task status while it is running.
   *
   * @throws AppError (NOT_FOUND) when the task does not exist for this user.
   * @throws AppError (BAD_REQUEST / VALIDATION) when the status is not cancellable.
   */
  async cancelTask(userId: number, taskId: number): Promise<void> {
    const task = await this.taskRepository.findOne({
      where: { id: taskId, userId },
    });

    if (!task) {
      throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    if (!['pending', 'processing'].includes(task.status)) {
      throw new AppError('该任务状态不可取消', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
    }

    await this.taskRepository.update(taskId, { status: 'cancelled' });

    wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
      taskId,
      status: 'cancelled',
    });
  }

  /**
   * Resets a failed or stuck (`processing`) task to `pending` and clears its
   * failed results so it can run again.
   *
   * The task is NOT re-queued here: the caller must re-run it through the task
   * queue.
   *
   * @throws AppError (NOT_FOUND) when the task does not exist for this user.
   * @throws AppError (BAD_REQUEST / VALIDATION) when the status is not retryable.
   */
  async retryTask(userId: number, taskId: number): Promise<PublishTaskType> {
    const task = await this.taskRepository.findOne({
      where: { id: taskId, userId },
    });

    if (!task) {
      throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    // Failed tasks and tasks stuck in 'processing' may be retried.
    if (!['failed', 'processing'].includes(task.status)) {
      throw new AppError('只能重试失败或卡住的任务', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
    }

    await this.taskRepository.update(taskId, { status: 'pending' });

    // Clear failed results; successful ones are left untouched.
    await this.resultRepository.update(
      { taskId, status: 'failed' },
      { status: null, errorMessage: null }
    );

    const updated = await this.taskRepository.findOne({ where: { id: taskId } });
    if (!updated) {
      // The row disappeared between the update and the re-read.
      throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    // NOTE(review): the stored status is 'pending' but 'processing' is
    // broadcast here. Kept as-is; confirm whether clients rely on it.
    wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
      taskId,
      status: 'processing',
    });

    return this.formatTask(updated);
  }

  /**
   * Retries publishing for a single account through the Python publish
   * service with `headless: false` (used for captcha scenarios).
   *
   * Publish failures are reported in the returned object, not thrown.
   *
   * @returns `{ success, message }`, plus `error` when publishing failed.
   * @throws AppError (NOT_FOUND) when the task, the account, or the task's
   *         result row for that account does not exist.
   */
  async retryAccountWithHeadfulBrowser(
    userId: number,
    taskId: number,
    accountId: number
  ): Promise<{ success: boolean; message: string; error?: string }> {
    // 1. The task must exist and belong to the user.
    const task = await this.taskRepository.findOne({
      where: { id: taskId, userId },
      relations: ['results'],
    });
    if (!task) {
      throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    // 2. Load the account.
    const account = await this.accountRepository.findOne({
      where: { id: accountId },
    });
    if (!account) {
      throw new AppError('账号不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    // 3. The task must have a result row for this account.
    const publishResult = task.results?.find((r) => r.accountId === accountId);
    if (!publishResult) {
      throw new AppError('发布结果不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    // 4. Decrypt the cookie.
    const decryptedCookies = this.decryptCookies(account.cookieData || '');

    // 5. Resolve the video file path.
    const videoPath = this.resolveUploadPath(task.videoPath);

    const publishProxyExtra = await this.buildPublishProxyExtra(task.publishProxy);

    // 6. Call the Python API in headful-browser mode.
    const PYTHON_SERVICE_URL = process.env.PYTHON_PUBLISH_SERVICE_URL || 'http://localhost:5005';

    logger.info(`[Headful Publish] Starting headful browser publish for account ${account.accountName} (${account.platform})`);

    // Clear the previous outcome while the new attempt is in flight.
    await this.resultRepository.update(publishResult.id, {
      status: null,
      errorMessage: null,
    });

    wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
      taskId,
      accountId: account.id,
      platform: account.platform,
      status: 'processing',
      progress: 10,
      message: '正在启动有头浏览器发布...',
    });

    // Shared failure path: persist the error and tell the user.
    const reportFailure = async (errorMsg: string): Promise<void> => {
      await this.markResultFailed(publishResult.id, errorMsg);
      wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
        taskId,
        accountId: account.id,
        platform: account.platform,
        status: 'failed',
        progress: 0,
        message: errorMsg,
      });
    };

    try {
      const absoluteVideoPath = path.isAbsolute(videoPath)
        ? videoPath
        : path.resolve(process.cwd(), videoPath);

      const response = await fetch(`${PYTHON_SERVICE_URL}/publish/ai-assisted`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          platform: account.platform,
          cookie: decryptedCookies,
          user_id: userId,
          publish_task_id: taskId,
          publish_account_id: accountId,
          proxy: publishProxyExtra,
          title: task.title,
          description: task.description || task.title,
          video_path: absoluteVideoPath,
          cover_path: task.coverPath ? path.resolve(process.cwd(), task.coverPath) : undefined,
          tags: task.tags || [],
          headless: false, // the point of this method: run a visible browser
        }),
        signal: AbortSignal.timeout(HEADFUL_PUBLISH_TIMEOUT_MS),
      });

      const result = (await response.json()) as HeadfulPublishResponse;

      if (result.success) {
        await this.resultRepository.update(publishResult.id, {
          status: 'success',
          videoUrl: result.video_url || null,
          platformVideoId: result.video_id || null,
          publishedAt: new Date(),
          errorMessage: null,
        });

        wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
          taskId,
          accountId: account.id,
          platform: account.platform,
          status: 'success',
          progress: 100,
          message: '发布成功',
        });

        logger.info(`[Headful Publish] Success for account ${account.accountName}`);
        return { success: true, message: '发布成功' };
      }

      const errorMsg = result.error || '发布失败';
      await reportFailure(errorMsg);
      logger.warn(`[Headful Publish] Failed for account ${account.accountName}: ${errorMsg}`);
      return { success: false, message: '发布失败', error: errorMsg };
    } catch (error) {
      const errorMsg = error instanceof Error ? error.message : '发布失败';
      await reportFailure(errorMsg);
      logger.error(`[Headful Publish] Error for account ${account.accountName}:`, error);
      return { success: false, message: '发布失败', error: errorMsg };
    }
  }

  /**
   * Deletes a task and its publish results.
   *
   * @throws AppError (NOT_FOUND) when the task does not exist for this user.
   * @throws AppError (BAD_REQUEST / VALIDATION) when the task is `processing`.
   */
  async deleteTask(userId: number, taskId: number): Promise<void> {
    const task = await this.taskRepository.findOne({
      where: { id: taskId, userId },
    });

    if (!task) {
      throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    // A task that is currently executing cannot be deleted.
    if (task.status === 'processing') {
      throw new AppError('不能删除正在执行的任务', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
    }

    // Delete the associated results first, then the task itself.
    await this.resultRepository.delete({ taskId });
    await this.taskRepository.delete(taskId);
  }

  /** Writes a `failed` status and error message onto one publish result row. */
  private async markResultFailed(resultId: number, errorMessage: string): Promise<void> {
    await this.resultRepository.update(resultId, {
      status: 'failed',
      errorMessage,
    });
  }

  /**
   * Decrypts stored cookie data. When decryption throws, the value is returned
   * unchanged on the assumption that it was stored unencrypted.
   */
  private decryptCookies(cookieData: string): string {
    try {
      return CookieManager.decrypt(cookieData);
    } catch {
      return cookieData;
    }
  }

  /**
   * Maps a stored media path onto the configured upload directory.
   *
   * - `/uploads/...` has the prefix replaced by `config.upload.path`.
   * - Relative paths lose any leading `uploads/` segment(s), then are joined
   *   onto `config.upload.path`.
   * - Other absolute paths, and empty values, are returned as-is (empty -> '').
   */
  private resolveUploadPath(storedPath: string | null | undefined): string {
    const rawPath = storedPath || '';
    if (!rawPath) {
      return rawPath;
    }

    if (rawPath.startsWith('/uploads/')) {
      return path.join(config.upload.path, rawPath.replace('/uploads/', ''));
    }

    if (!path.isAbsolute(rawPath)) {
      // Strip a possibly duplicated `uploads` prefix before joining.
      const relative = rawPath
        .replace(/^uploads[\\\/]+uploads[\\\/]+/, '')
        .replace(/^uploads[\\\/]+/, '');
      return path.join(config.upload.path, relative);
    }

    return rawPath;
  }

  /**
   * Builds the captcha callback passed to adapters (SMS and image captchas).
   *
   * The callback sends a CAPTCHA_REQUIRED event to the user and resolves with
   * the submitted code, or rejects if none arrives within the timeout.
   */
  private createCaptchaHandler(
    userId: number,
    taskId: number
  ): (captchaInfo: CaptchaRequest) => Promise<string> {
    return (captchaInfo) =>
      new Promise<string>((resolve, reject) => {
        const captchaTaskId = captchaInfo.taskId;
        const message = captchaInfo.type === 'sms' ? '请输入短信验证码' : '请输入图片中的验证码';

        logger.info(`[Publish] Requesting ${captchaInfo.type} captcha, taskId: ${captchaTaskId}, phone: ${captchaInfo.phone}`);

        // Ask the frontend for the captcha.
        wsManager.sendToUser(userId, WS_EVENTS.CAPTCHA_REQUIRED, {
          taskId,
          captchaTaskId,
          type: captchaInfo.type,
          phone: captchaInfo.phone || '',
          imageBase64: captchaInfo.imageBase64 || '',
          message,
        });

        // Give up when the user does not answer in time.
        const timeout = setTimeout(() => {
          wsManager.removeCaptchaListener(captchaTaskId);
          reject(new Error('验证码输入超时'));
        }, CAPTCHA_TIMEOUT_MS);

        // Resolve as soon as the code is submitted.
        wsManager.onCaptchaSubmit(captchaTaskId, (code: string) => {
          clearTimeout(timeout);
          wsManager.removeCaptchaListener(captchaTaskId);
          logger.info(`[Publish] Received captcha code for ${captchaTaskId}`);
          resolve(code);
        });
      });
  }

  /** Converts a task entity into the shared API shape (dates as ISO strings). */
  private formatTask(task: PublishTask): PublishTaskType {
    return {
      id: task.id,
      userId: task.userId,
      videoPath: task.videoPath || '',
      videoFilename: task.videoFilename || '',
      title: task.title || '',
      description: task.description,
      coverPath: task.coverPath,
      tags: task.tags || [],
      targetAccounts: task.targetAccounts || [],
      platformConfigs: task.platformConfigs || [],
      publishProxy: task.publishProxy,
      status: task.status,
      scheduledAt: task.scheduledAt?.toISOString() || null,
      publishedAt: task.publishedAt?.toISOString() || null,
      createdAt: task.createdAt.toISOString(),
      updatedAt: task.updatedAt.toISOString(),
    };
  }

  /**
   * Turns a task's proxy config into the payload sent along with a publish.
   *
   * Returns `null` when the proxy is disabled, when the provider is not
   * `shenlong`, or when the `publish_proxy_city_api_url` system config value
   * is missing or blank.
   */
  private async buildPublishProxyExtra(
    publishProxy: PublishProxyConfig | null | undefined
  ): Promise<PublishProxyExtra | null> {
    if (!publishProxy?.enabled) return null;

    const provider = publishProxy.provider || 'shenlong';
    if (provider !== 'shenlong') return null;

    const rows = await this.systemConfigRepository.find({
      where: { configKey: 'publish_proxy_city_api_url' },
    });
    const cityApiUrl = String(rows?.[0]?.configValue || '').trim();
    if (!cityApiUrl) {
      return null;
    }

    return {
      enabled: true,
      provider: 'shenlong',
      city: String(publishProxy.city || '').trim(),
      apiUrl: cityApiUrl,
      regionCode: publishProxy.regionCode ? String(publishProxy.regionCode).trim() : undefined,
      regionName: publishProxy.regionName ? String(publishProxy.regionName).trim() : undefined,
    };
  }

  /** Like {@link formatTask}, with the per-account results attached. */
  private formatTaskDetail(task: PublishTask): PublishTaskDetail {
    return {
      ...this.formatTask(task),
      results: task.results?.map((r) => ({
        id: r.id,
        taskId: r.taskId,
        accountId: r.accountId,
        platform: r.platform!,
        status: r.status!,
        videoUrl: r.videoUrl,
        platformVideoId: r.platformVideoId,
        errorMessage: r.errorMessage,
        publishedAt: r.publishedAt?.toISOString() || null,
      })) || [],
    };
  }
}