||
- import { AppDataSource, PublishTask, PublishResult, PlatformAccount, SystemConfig } from '../models/index.js';
- import { AppError } from '../middleware/error.js';
- import { ERROR_CODES, HTTP_STATUS, WS_EVENTS } from '@media-manager/shared';
- import type {
- PublishTask as PublishTaskType,
- PublishTaskDetail,
- CreatePublishTaskRequest,
- PaginatedData,
- PlatformType,
- PublishProxyConfig,
- } from '@media-manager/shared';
- import { wsManager } from '../websocket/index.js';
- import { DouyinAdapter } from '../automation/platforms/douyin.js';
- import { XiaohongshuAdapter } from '../automation/platforms/xiaohongshu.js';
- import { WeixinAdapter } from '../automation/platforms/weixin.js';
- import { KuaishouAdapter } from '../automation/platforms/kuaishou.js';
- import { BilibiliAdapter } from '../automation/platforms/bilibili.js';
- import { BaijiahaoAdapter } from '../automation/platforms/baijiahao.js';
- import { BasePlatformAdapter } from '../automation/platforms/base.js';
- import { logger } from '../utils/logger.js';
- import path from 'path';
- import { config } from '../config/index.js';
- import { CookieManager } from '../automation/cookie.js';
- import { taskQueueService } from './TaskQueueService.js';
- import { In } from 'typeorm';
/**
 * Paging and filter options accepted by `PublishService.getTasks`.
 */
interface GetTasksParams {
  /** Page number; the row offset is computed as `(page - 1) * pageSize`. */
  page: number;

  /** Number of tasks returned per page. */
  pageSize: number;

  /** When provided, only tasks whose `status` equals this value are returned. */
  status?: string;
}
- export class PublishService {
/** Repository for publish tasks. */
private readonly taskRepository = AppDataSource.getRepository(PublishTask);

/** Repository for per-account publish results. */
private readonly resultRepository = AppDataSource.getRepository(PublishResult);

/** Repository for platform accounts. */
private readonly accountRepository = AppDataSource.getRepository(PlatformAccount);

/** Repository for system configuration rows. */
private readonly systemConfigRepository = AppDataSource.getRepository(SystemConfig);

/** Platform adapter registry, keyed by platform type and filled in by the constructor. */
private readonly adapters = new Map<PlatformType, BasePlatformAdapter>();
/**
 * Creates one adapter instance per supported platform and registers it
 * in the adapter map.
 */
constructor() {
  const registrations: Array<[PlatformType, BasePlatformAdapter]> = [
    ['douyin', new DouyinAdapter()],
    ['xiaohongshu', new XiaohongshuAdapter()],
    ['weixin_video', new WeixinAdapter()],
    ['kuaishou', new KuaishouAdapter()],
    ['bilibili', new BilibiliAdapter()],
    ['baijiahao', new BaijiahaoAdapter()],
  ];

  for (const [platform, adapter] of registrations) {
    this.adapters.set(platform, adapter);
  }
}
/**
 * Looks up the adapter registered for a platform.
 *
 * @param platform Platform identifier to resolve.
 * @returns The adapter instance registered for `platform`.
 * @throws AppError with `HTTP_STATUS.BAD_REQUEST` / `ERROR_CODES.VALIDATION`
 *         when no adapter is registered for the platform.
 */
private getAdapter(platform: PlatformType) {
  const registered = this.adapters.get(platform);
  if (registered) {
    return registered;
  }

  throw new AppError(`不支持的平台: ${platform}`, HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
}
/**
 * Returns one page of a user's publish tasks, newest first.
 *
 * `page` and `pageSize` are normalised to integers >= 1 before use. Without
 * that, `pageSize = 0` produced a `NaN`/`Infinity` page count and `page < 1`
 * produced a negative row offset.
 *
 * @param userId Owner of the tasks.
 * @param params Paging options plus an optional exact-match `status` filter.
 * @returns The formatted tasks of the requested page together with the total
 *          row count and the (normalised) paging values.
 */
async getTasks(userId: number, params: GetTasksParams): Promise<PaginatedData<PublishTaskType>> {
  const toPositiveInt = (value: number): number =>
    Number.isFinite(value) ? Math.max(1, Math.trunc(value)) : 1;

  const page = toPositiveInt(params.page);
  const pageSize = toPositiveInt(params.pageSize);
  const { status } = params;

  const queryBuilder = this.taskRepository
    .createQueryBuilder('task')
    .where('task.userId = :userId', { userId });

  if (status) {
    queryBuilder.andWhere('task.status = :status', { status });
  }

  const [tasks, total] = await queryBuilder
    .orderBy('task.createdAt', 'DESC')
    .skip((page - 1) * pageSize)
    .take(pageSize)
    .getManyAndCount();

  return {
    // Call through `this` instead of passing the unbound method reference.
    items: tasks.map((task) => this.formatTask(task)),
    total,
    page,
    pageSize,
    totalPages: Math.ceil(total / pageSize),
  };
}
/**
 * Loads a single task of the given user together with its publish results.
 *
 * @param userId Owner the task must belong to.
 * @param taskId Identifier of the task.
 * @returns The task in its detail representation.
 * @throws AppError with `HTTP_STATUS.NOT_FOUND` when no matching task exists.
 */
async getTaskById(userId: number, taskId: number): Promise<PublishTaskDetail> {
  const found = await this.taskRepository.findOne({
    relations: ['results'],
    where: { id: taskId, userId },
  });

  if (found) {
    return this.formatTaskDetail(found);
  }

  throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
}
/**
 * Creates a publish task plus one empty result row per valid target account.
 *
 * Target accounts that do not exist or do not belong to `userId` are skipped
 * with a warning. Duplicate account IDs in the request are collapsed so the
 * same account is never published to twice. The task is stored as `pending`;
 * executing it is left to the task queue (the caller enqueues it).
 *
 * @param userId Owner of the new task.
 * @param data Request payload describing the video and the target accounts.
 * @returns The stored task in API format.
 * @throws AppError with `HTTP_STATUS.BAD_REQUEST` when none of the requested
 *         accounts is usable.
 */
async createTask(userId: number, data: CreatePublishTaskRequest): Promise<PublishTaskType> {
  // De-duplicate while preserving the order given in the request.
  const requestedAccountIds = [...new Set(data.targetAccounts ?? [])];

  // One query for all accounts instead of one query per account. The empty
  // case is short-circuited so `In([])` is never sent to the database.
  const ownedAccounts = requestedAccountIds.length > 0
    ? await this.accountRepository.find({
        where: { id: In(requestedAccountIds), userId },
      })
    : [];
  // Compare numerically so an ID the driver returns as a string still matches.
  const ownedIds = new Set(ownedAccounts.map((account) => Number(account.id)));

  const validAccountIds: number[] = [];
  const invalidAccountIds: number[] = [];
  for (const accountId of requestedAccountIds) {
    if (ownedIds.has(Number(accountId))) {
      validAccountIds.push(accountId);
    } else {
      invalidAccountIds.push(accountId);
      logger.warn(`[PublishService] Account ${accountId} not found or not owned by user ${userId}, skipping`);
    }
  }

  if (validAccountIds.length === 0) {
    throw new AppError('所选账号不存在或已被删除', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
  }

  if (invalidAccountIds.length > 0) {
    logger.warn(`[PublishService] ${invalidAccountIds.length} invalid accounts skipped: ${invalidAccountIds.join(', ')}`);
  }

  const task = this.taskRepository.create({
    userId,
    videoPath: data.videoPath,
    // Accept both separators so Windows-style paths yield a bare file name too.
    videoFilename: data.videoPath.split(/[\\/]/).pop() || null,
    title: data.title,
    description: data.description || null,
    coverPath: data.coverPath || null,
    tags: data.tags || null,
    targetAccounts: validAccountIds, // only valid account IDs are persisted
    platformConfigs: data.platformConfigs || null,
    publishProxy: data.publishProxy || null,
    status: 'pending', // switched to `processing` when the queue starts executing it
    scheduledAt: data.scheduledAt ? new Date(data.scheduledAt) : null,
  });

  await this.taskRepository.save(task);

  // One result row per valid account, written in a single save call.
  await this.resultRepository.save(
    validAccountIds.map((accountId) =>
      this.resultRepository.create({ taskId: task.id, accountId })
    )
  );

  const formatted = this.formatTask(task);

  // Notify the client that the task exists.
  wsManager.sendToUser(userId, WS_EVENTS.TASK_CREATED, { task: formatted });

  // The caller is responsible for calling taskQueueService.createTask to enqueue the run.
  return formatted;
}
/**
 * Executes a publish task account by account, reporting progress as it goes.
 *
 * Flow:
 * 1. Load the task with its result rows; return immediately if it is already
 *    `cancelled`, otherwise mark it `processing`.
 * 2. Resolve the optional publish proxy. If that throws, every unfinished
 *    result is marked failed and the method returns.
 * 3. Publish sequentially to every account whose result is not already
 *    `success` (so a retry never re-publishes to accounts that succeeded),
 *    updating the result row and pushing WebSocket progress events. Before
 *    each account the task is re-read; if it was cancelled or removed in the
 *    meantime the remaining accounts are skipped.
 * 4. Unless cancelled, store the final task status. Then queue a `sync_works`
 *    task for every account published during this run.
 *
 * @param taskId Publish task to execute.
 * @param userId User that receives the WebSocket notifications.
 * @param onProgress Optional callback receiving a progress value (5..100 in
 *        this method) and a human-readable message.
 * @throws Error when the task does not exist.
 */
async executePublishTaskWithProgress(
  taskId: number,
  userId: number,
  onProgress?: (progress: number, message: string) => void
): Promise<void> {
  const task = await this.taskRepository.findOne({
    where: { id: taskId },
    relations: ['results'],
  });

  if (!task) {
    throw new Error(`Task ${taskId} not found`);
  }

  // cancelTask() may have cancelled the task while it was still queued.
  if (task.status === 'cancelled') {
    logger.info(`Task ${taskId} was cancelled before execution, skipping`);
    onProgress?.(100, '任务已取消');
    return;
  }

  // Mark the task as running.
  await this.taskRepository.update(taskId, { status: 'processing' });
  wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
    taskId,
    status: 'processing',
  });

  const results = task.results || [];
  // retryTask() resets only failed results, so rows that are still `success`
  // were published by an earlier run and must not be published again.
  const pendingResults = results.filter((entry) => entry.status !== 'success');
  const totalAccounts = pendingResults.length;
  let successCount = results.length - pendingResults.length;
  let failCount = 0;

  let publishProxyExtra: Awaited<ReturnType<PublishService['buildPublishProxyExtra']>> = null;
  try {
    publishProxyExtra = await this.buildPublishProxyExtra(task.publishProxy);
  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : '发布代理配置错误';
    logger.error(`[PublishService] publish proxy config error: ${errorMessage}`);

    for (const pending of pendingResults) {
      await this.resultRepository.update(pending.id, {
        status: 'failed',
        errorMessage,
      });
    }

    // Same rule as the normal path: earlier successes keep the task `completed`.
    const abortedStatus = successCount > 0 ? 'completed' : 'failed';
    await this.taskRepository.update(taskId, {
      status: abortedStatus,
      publishedAt: new Date(),
    });
    wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
      taskId,
      status: abortedStatus,
      successCount,
      failCount: totalAccounts,
    });
    onProgress?.(100, `发布失败: ${errorMessage}`);
    return;
  }

  const videoPath = this.resolveUploadedFilePath(task.videoPath || '');
  logger.info(`Publishing video: ${videoPath}`);
  onProgress?.(5, `准备发布到 ${totalAccounts} 个账号...`);

  // Accounts published during THIS run. The in-memory `results` entities are
  // never refreshed by `repository.update`, so their `status` cannot be used
  // afterwards to find out which accounts succeeded.
  const publishedAccounts = new Map<PlatformAccount['id'], PlatformAccount>();
  let cancelled = false;

  for (const [index, result] of pendingResults.entries()) {
    // Honour a cancellation (or deletion) requested while earlier accounts ran.
    if (await this.isTaskCancelledOrRemoved(taskId)) {
      cancelled = true;
      break;
    }

    const accountProgress = Math.floor((index / totalAccounts) * 80) + 10;

    try {
      const account = await this.accountRepository.findOne({
        where: { id: result.accountId },
      });

      if (!account) {
        logger.warn(`Account ${result.accountId} not found`);
        await this.resultRepository.update(result.id, {
          status: 'failed',
          errorMessage: '账号不存在',
        });
        failCount++;
        continue;
      }

      if (!account.cookieData) {
        logger.warn(`Account ${result.accountId} has no cookies`);
        await this.resultRepository.update(result.id, {
          status: 'failed',
          errorMessage: '账号未登录',
        });
        failCount++;
        continue;
      }

      // Deliberate best effort: if decryption fails the stored value is
      // assumed to be an unencrypted cookie string and is used as-is.
      let decryptedCookies: string;
      try {
        decryptedCookies = CookieManager.decrypt(account.cookieData);
      } catch {
        decryptedCookies = account.cookieData;
      }

      // Record which platform this result belongs to.
      await this.resultRepository.update(result.id, {
        platform: account.platform,
      });

      const adapter = this.getAdapter(account.platform as PlatformType);

      logger.info(`Publishing to account ${account.accountName} (${account.platform})`);
      onProgress?.(accountProgress, `正在发布到 ${account.accountName}...`);

      wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
        taskId,
        accountId: account.id,
        platform: account.platform,
        status: 'uploading',
        progress: 0,
        message: '开始发布...',
      });

      const publishResult = await adapter.publishVideo(
        decryptedCookies,
        {
          videoPath,
          title: task.title || '',
          description: task.description || undefined,
          coverPath: task.coverPath || undefined,
          tags: task.tags || undefined,
          extra: {
            userId,
            publishTaskId: taskId,
            publishAccountId: account.id,
            publishProxy: publishProxyExtra,
          },
        },
        (progress, message) => {
          // Forward adapter progress to the client.
          wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
            taskId,
            accountId: account.id,
            platform: account.platform,
            status: 'processing',
            progress,
            message,
          });
        },
        (captchaInfo: {
          taskId: string;
          type: 'sms' | 'image';
          phone?: string;
          imageBase64?: string;
        }) => this.waitForCaptchaCode(userId, taskId, captchaInfo)
      );

      if (publishResult.success) {
        await this.resultRepository.update(result.id, {
          status: 'success',
          videoUrl: publishResult.videoUrl || null,
          platformVideoId: publishResult.platformVideoId || null,
          publishedAt: new Date(),
        });
        successCount++;
        publishedAccounts.set(account.id, account);

        wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
          taskId,
          accountId: account.id,
          platform: account.platform,
          status: 'success',
          progress: 100,
          message: '发布成功',
        });
      } else {
        await this.resultRepository.update(result.id, {
          status: 'failed',
          errorMessage: publishResult.errorMessage || '发布失败',
        });
        failCount++;

        wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
          taskId,
          accountId: account.id,
          platform: account.platform,
          status: 'failed',
          progress: 0,
          message: publishResult.errorMessage || '发布失败',
        });
      }

      // Throttle between accounts; pointless after the last one.
      if (index < pendingResults.length - 1) {
        await new Promise((resolve) => setTimeout(resolve, 5000));
      }
    } catch (error) {
      logger.error(`Failed to publish to account ${result.accountId}:`, error);
      await this.resultRepository.update(result.id, {
        status: 'failed',
        errorMessage: error instanceof Error ? error.message : '发布失败',
      });
      failCount++;
    }
  }

  if (cancelled) {
    // Leave the stored status untouched so the cancellation is not overwritten.
    logger.info(`Task ${taskId} cancelled during execution: ${successCount} success, ${failCount} failed`);
    onProgress?.(100, `任务已取消: ${successCount} 成功, ${failCount} 失败`);
  } else {
    // `failed` only when nothing succeeded and at least one account failed.
    const finalStatus = successCount === 0 && failCount > 0 ? 'failed' : 'completed';
    await this.taskRepository.update(taskId, {
      status: finalStatus,
      publishedAt: new Date(),
    });

    wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
      taskId,
      status: finalStatus,
      successCount,
      failCount,
    });

    onProgress?.(100, `发布完成: ${successCount} 成功, ${failCount} 失败`);
    logger.info(`Task ${taskId} completed: ${successCount} success, ${failCount} failed`);
  }

  // Queue a works sync for every account that was published in this run.
  for (const account of publishedAccounts.values()) {
    taskQueueService.createTask(userId, {
      type: 'sync_works',
      title: `同步作品 - ${account.accountName || '账号'}`,
      accountId: account.id,
    });
    logger.info(`Created sync_works task for account ${account.id} after publish`);
  }
}

/**
 * Maps a stored upload path onto the configured upload directory.
 * `/uploads/...` and relative paths (with any leading `uploads/` prefixes
 * removed) are joined onto `config.upload.path`; empty and absolute paths are
 * returned unchanged.
 */
private resolveUploadedFilePath(storedPath: string): string {
  if (!storedPath) {
    return storedPath;
  }

  if (storedPath.startsWith('/uploads/')) {
    return path.join(config.upload.path, storedPath.slice('/uploads/'.length));
  }

  if (path.isAbsolute(storedPath)) {
    return storedPath;
  }

  // Drop a possibly duplicated `uploads/` prefix before joining.
  const relativePath = storedPath
    .replace(/^uploads[\\\/]+uploads[\\\/]+/, '')
    .replace(/^uploads[\\\/]+/, '');
  return path.join(config.upload.path, relativePath);
}

/**
 * Reports whether the task has been cancelled or no longer exists.
 */
private async isTaskCancelledOrRemoved(taskId: number): Promise<boolean> {
  const current = await this.taskRepository.findOne({ where: { id: taskId } });
  return !current || current.status === 'cancelled';
}

/**
 * Asks the user for a captcha code over WebSocket and resolves with the
 * submitted code, or rejects after two minutes without an answer.
 */
private waitForCaptchaCode(
  userId: number,
  taskId: number,
  captchaInfo: {
    taskId: string;
    type: 'sms' | 'image';
    phone?: string;
    imageBase64?: string;
  }
): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    const captchaTaskId = captchaInfo.taskId;

    const message = captchaInfo.type === 'sms'
      ? '请输入短信验证码'
      : '请输入图片中的验证码';

    logger.info(`[Publish] Requesting ${captchaInfo.type} captcha, taskId: ${captchaTaskId}, phone: ${captchaInfo.phone}`);

    // Send the captcha request to the front end.
    wsManager.sendToUser(userId, WS_EVENTS.CAPTCHA_REQUIRED, {
      taskId,
      captchaTaskId,
      type: captchaInfo.type,
      phone: captchaInfo.phone || '',
      imageBase64: captchaInfo.imageBase64 || '',
      message,
    });

    // Give up after two minutes and drop the listener.
    const timeout = setTimeout(() => {
      wsManager.removeCaptchaListener(captchaTaskId);
      reject(new Error('验证码输入超时'));
    }, 120000);

    wsManager.onCaptchaSubmit(captchaTaskId, (code: string) => {
      clearTimeout(timeout);
      wsManager.removeCaptchaListener(captchaTaskId);
      logger.info(`[Publish] Received captcha code for ${captchaTaskId}`);
      resolve(code);
    });
  });
}
/**
 * Marks a pending or processing task of the given user as cancelled and
 * notifies the user over WebSocket.
 *
 * @param userId Owner the task must belong to.
 * @param taskId Identifier of the task to cancel.
 * @throws AppError with `HTTP_STATUS.NOT_FOUND` when the task does not exist,
 *         or `HTTP_STATUS.BAD_REQUEST` when its status is neither `pending`
 *         nor `processing`.
 */
async cancelTask(userId: number, taskId: number): Promise<void> {
  const existing = await this.taskRepository.findOne({
    where: { id: taskId, userId },
  });

  if (!existing) {
    throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }

  const cancellable = existing.status === 'pending' || existing.status === 'processing';
  if (!cancellable) {
    throw new AppError('该任务状态不可取消', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
  }

  await this.taskRepository.update(taskId, { status: 'cancelled' });

  wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
    taskId,
    status: 'cancelled',
  });
}
/**
 * Prepares a failed or stuck (`processing`) task for another run.
 *
 * The task is set back to `pending` and its failed result rows are reset;
 * results in any other state are left untouched. Re-executing the task is
 * left to the caller (via the task queue).
 *
 * @param userId Owner the task must belong to.
 * @param taskId Identifier of the task to retry.
 * @returns The updated task in API format.
 * @throws AppError with `HTTP_STATUS.NOT_FOUND` when the task does not exist,
 *         or `HTTP_STATUS.BAD_REQUEST` when its status is neither `failed`
 *         nor `processing`.
 */
async retryTask(userId: number, taskId: number): Promise<PublishTaskType> {
  const task = await this.taskRepository.findOne({
    where: { id: taskId, userId },
  });

  if (!task) {
    throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }

  // Failed tasks and tasks stuck in `processing` may be retried.
  if (!['failed', 'processing'].includes(task.status)) {
    throw new AppError('只能重试失败或卡住的任务', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
  }

  await this.taskRepository.update(taskId, { status: 'pending' });

  // Reset the failed publish results so they are attempted again.
  await this.resultRepository.update(
    { taskId, status: 'failed' },
    { status: null, errorMessage: null }
  );

  // Re-read within the same ownership scope; the task may have been removed
  // concurrently, so guard instead of asserting non-null.
  const updated = await this.taskRepository.findOne({ where: { id: taskId, userId } });
  if (!updated) {
    throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }

  // Announce the status that was actually stored (`pending`), so the event
  // agrees with the database and with the returned task.
  wsManager.sendToUser(userId, WS_EVENTS.TASK_STATUS_CHANGED, {
    taskId,
    status: updated.status,
  });

  return this.formatTask(updated);
}
/**
 * Retries publishing for a single account with a headful (visible) browser,
 * intended for captcha scenarios.
 *
 * The publish itself is delegated to an HTTP service at
 * `PYTHON_PUBLISH_SERVICE_URL` (default `http://localhost:5005`) via
 * `POST /publish/ai-assisted` with `headless: false`. The account's result
 * row and the WebSocket progress events are updated according to the outcome.
 *
 * @param userId Owner the task and the account must belong to.
 * @param taskId Publish task the retry belongs to.
 * @param accountId Account to publish with.
 * @returns `{ success: true }` on success, otherwise `{ success: false }` with
 *          the failure reason in `error`.
 * @throws AppError with `HTTP_STATUS.NOT_FOUND` when the task, the account or
 *         the account's result row cannot be found.
 */
async retryAccountWithHeadfulBrowser(
  userId: number,
  taskId: number,
  accountId: number
): Promise<{ success: boolean; message: string; error?: string }> {
  // Fields read from the publish service's JSON reply.
  type HeadfulPublishResponse = {
    success?: boolean;
    error?: string;
    video_url?: string;
    video_id?: string;
  };

  // 1. The task must exist and belong to the user.
  const task = await this.taskRepository.findOne({
    where: { id: taskId, userId },
    relations: ['results'],
  });
  if (!task) {
    throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }

  // 2. The account must belong to the same user (same ownership rule as createTask).
  const account = await this.accountRepository.findOne({
    where: { id: accountId, userId },
  });
  if (!account) {
    throw new AppError('账号不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }

  // 3. The task must have a result row for this account.
  const publishResult = task.results?.find((entry) => entry.accountId === accountId);
  if (!publishResult) {
    throw new AppError('发布结果不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }

  // 4. Decrypt the cookie; on failure fall back to the stored value, which is
  //    then assumed to be unencrypted.
  const storedCookies = account.cookieData || '';
  let decryptedCookies: string;
  try {
    decryptedCookies = CookieManager.decrypt(storedCookies);
  } catch {
    decryptedCookies = storedCookies;
  }

  // 5. Turn a stored media path into an absolute file-system path.
  //    `/uploads/...` and relative paths are mapped onto the upload directory;
  //    anything still relative afterwards is resolved against the working
  //    directory.
  const toAbsoluteMediaPath = (storedPath: string): string => {
    let mapped = storedPath;
    if (mapped) {
      if (mapped.startsWith('/uploads/')) {
        mapped = path.join(config.upload.path, mapped.slice('/uploads/'.length));
      } else if (!path.isAbsolute(mapped)) {
        mapped = mapped
          .replace(/^uploads[\\\/]+uploads[\\\/]+/, '')
          .replace(/^uploads[\\\/]+/, '');
        mapped = path.join(config.upload.path, mapped);
      }
    }
    return path.isAbsolute(mapped) ? mapped : path.resolve(process.cwd(), mapped);
  };

  const absoluteVideoPath = toAbsoluteMediaPath(task.videoPath || '');
  // NOTE(review): the cover is resolved with the same rule as the video,
  // assuming covers are stored in the same path format — confirm.
  const absoluteCoverPath = task.coverPath ? toAbsoluteMediaPath(task.coverPath) : undefined;

  const publishProxyExtra = await this.buildPublishProxyExtra(task.publishProxy);

  // 6. Call the publish service in headful mode.
  const PYTHON_SERVICE_URL = process.env.PYTHON_PUBLISH_SERVICE_URL || 'http://localhost:5005';

  logger.info(`[Headful Publish] Starting headful browser publish for account ${account.accountName} (${account.platform})`);

  // Clear the previous outcome while the retry is running.
  await this.resultRepository.update(publishResult.id, {
    status: null,
    errorMessage: null,
  });

  wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
    taskId,
    accountId: account.id,
    platform: account.platform,
    status: 'processing',
    progress: 10,
    message: '正在启动有头浏览器发布...',
  });

  // Stores the failure, notifies the client and builds the return value.
  const recordFailure = async (errorMsg: string) => {
    await this.resultRepository.update(publishResult.id, {
      status: 'failed',
      errorMessage: errorMsg,
    });

    wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
      taskId,
      accountId: account.id,
      platform: account.platform,
      status: 'failed',
      progress: 0,
      message: errorMsg,
    });

    return { success: false, message: '发布失败', error: errorMsg };
  };

  try {
    const response = await fetch(`${PYTHON_SERVICE_URL}/publish/ai-assisted`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        platform: account.platform,
        cookie: decryptedCookies,
        user_id: userId,
        publish_task_id: taskId,
        publish_account_id: accountId,
        proxy: publishProxyExtra,
        title: task.title,
        description: task.description || task.title,
        video_path: absoluteVideoPath,
        cover_path: absoluteCoverPath,
        tags: task.tags || [],
        headless: false, // key setting: run with a visible browser
      }),
      signal: AbortSignal.timeout(600000), // 10 minute timeout
    });

    // Parse manually so a non-JSON error page is reported with its HTTP
    // status instead of a bare JSON syntax error.
    const rawBody = await response.text();
    let payload: HeadfulPublishResponse | null;
    try {
      payload = JSON.parse(rawBody) as HeadfulPublishResponse | null;
    } catch {
      throw new Error(`发布服务返回了无效的响应 (HTTP ${response.status})`);
    }

    if (payload?.success) {
      await this.resultRepository.update(publishResult.id, {
        status: 'success',
        videoUrl: payload.video_url || null,
        platformVideoId: payload.video_id || null,
        publishedAt: new Date(),
        errorMessage: null,
      });

      wsManager.sendToUser(userId, WS_EVENTS.PUBLISH_PROGRESS, {
        taskId,
        accountId: account.id,
        platform: account.platform,
        status: 'success',
        progress: 100,
        message: '发布成功',
      });

      logger.info(`[Headful Publish] Success for account ${account.accountName}`);
      return { success: true, message: '发布成功' };
    }

    // Prefer the service's own error text; otherwise fall back to the HTTP status.
    const errorMsg = payload?.error
      || (response.ok ? '发布失败' : `发布服务请求失败 (HTTP ${response.status})`);
    const failure = await recordFailure(errorMsg);
    logger.warn(`[Headful Publish] Failed for account ${account.accountName}: ${errorMsg}`);
    return failure;
  } catch (error) {
    const errorMsg = error instanceof Error ? error.message : '发布失败';
    const failure = await recordFailure(errorMsg);
    logger.error(`[Headful Publish] Error for account ${account.accountName}:`, error);
    return failure;
  }
}
/**
 * Deletes a task of the given user together with its publish results.
 *
 * @param userId Owner the task must belong to.
 * @param taskId Identifier of the task to delete.
 * @throws AppError with `HTTP_STATUS.NOT_FOUND` when the task does not exist,
 *         or `HTTP_STATUS.BAD_REQUEST` while the task is `processing`.
 */
async deleteTask(userId: number, taskId: number): Promise<void> {
  const existing = await this.taskRepository.findOne({
    where: { id: taskId, userId },
  });

  if (!existing) {
    throw new AppError('任务不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }

  // A running task must not be removed.
  const isRunning = existing.status === 'processing';
  if (isRunning) {
    throw new AppError('不能删除正在执行的任务', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.VALIDATION);
  }

  // Result rows go first, then the task row itself.
  await this.resultRepository.delete({ taskId });
  await this.taskRepository.delete(taskId);
}
/**
 * Converts a task entity into its API representation: missing strings become
 * `''`, missing lists become `[]`, and dates are rendered as ISO strings
 * (`null` when the optional date is absent).
 *
 * The method does not read `this`, so it also works when passed around as a
 * plain callback.
 *
 * @param task Task entity to convert.
 * @returns The task in API format.
 */
private formatTask(task: PublishTask): PublishTaskType {
  const toIsoOrNull = (value: Date | null | undefined): string | null =>
    value ? value.toISOString() : null;

  const { id, userId, description, coverPath, publishProxy, status } = task;

  return {
    id,
    userId,
    videoPath: task.videoPath || '',
    videoFilename: task.videoFilename || '',
    title: task.title || '',
    description,
    coverPath,
    tags: task.tags || [],
    targetAccounts: task.targetAccounts || [],
    platformConfigs: task.platformConfigs || [],
    publishProxy,
    status,
    scheduledAt: toIsoOrNull(task.scheduledAt),
    publishedAt: toIsoOrNull(task.publishedAt),
    createdAt: task.createdAt.toISOString(),
    updatedAt: task.updatedAt.toISOString(),
  };
}
/**
 * Builds the proxy settings handed to the publishers from a task's proxy
 * configuration.
 *
 * Returns `null` (no proxy) when the proxy is missing or disabled, when the
 * provider is anything other than `shenlong`, or when the system config row
 * `publish_proxy_city_api_url` is missing or blank.
 *
 * @param publishProxy Proxy configuration stored on the task, if any.
 * @returns The proxy settings with trimmed string fields, or `null`.
 */
private async buildPublishProxyExtra(publishProxy: PublishProxyConfig | null | undefined): Promise<null | {
  enabled: boolean;
  provider: 'shenlong';
  city: string;
  apiUrl: string;
  regionCode?: string;
  regionName?: string;
}> {
  if (!publishProxy || !publishProxy.enabled) {
    return null;
  }

  // A missing provider defaults to `shenlong`, the only supported one.
  const provider = publishProxy.provider || 'shenlong';
  if (provider !== 'shenlong') {
    return null;
  }

  const configRows = await this.systemConfigRepository.find({
    where: { configKey: 'publish_proxy_city_api_url' },
  });
  const apiUrl = String(configRows?.[0]?.configValue || '').trim();
  if (!apiUrl) {
    return null;
  }

  const city = String(publishProxy.city || '').trim();

  let regionCode: string | undefined;
  if (publishProxy.regionCode) {
    regionCode = String(publishProxy.regionCode).trim();
  }

  let regionName: string | undefined;
  if (publishProxy.regionName) {
    regionName = String(publishProxy.regionName).trim();
  }

  return {
    enabled: true,
    provider: 'shenlong',
    city,
    apiUrl,
    regionCode,
    regionName,
  };
}
/**
 * Converts a task entity, including its loaded results, into the detail
 * representation. A task without loaded results yields an empty `results` list.
 *
 * @param task Task entity, normally loaded with its `results` relation.
 * @returns The formatted task plus one entry per publish result.
 */
private formatTaskDetail(task: PublishTask): PublishTaskDetail {
  const base = this.formatTask(task);

  const results: PublishTaskDetail['results'] = (task.results ?? []).map((entry) => ({
    id: entry.id,
    taskId: entry.taskId,
    accountId: entry.accountId,
    platform: entry.platform!,
    status: entry.status!,
    videoUrl: entry.videoUrl,
    platformVideoId: entry.platformVideoId,
    errorMessage: entry.errorMessage,
    publishedAt: entry.publishedAt?.toISOString() || null,
  }));

  return { ...base, results };
}
- }
|