WorkService.ts 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934
  1. import { AppDataSource, Work, PlatformAccount, Comment, WorkDayStatistics } from '../models/index.js';
  2. import { AppError } from '../middleware/error.js';
  3. import { ERROR_CODES, HTTP_STATUS } from '@media-manager/shared';
  4. import type { PlatformType, Work as WorkType, WorkStats, WorksQueryParams } from '@media-manager/shared';
  5. import { logger } from '../utils/logger.js';
  6. import { headlessBrowserService } from './HeadlessBrowserService.js';
  7. import { CookieManager } from '../automation/cookie.js';
  8. import { WorkDayStatisticsService } from './WorkDayStatisticsService.js';
  9. import { taskQueueService } from './TaskQueueService.js';
  10. import { wsManager } from '../websocket/index.js';
  11. export class WorkService {
  // TypeORM repositories backing this service.
  private workRepository = AppDataSource.getRepository(Work);
  private accountRepository = AppDataSource.getRepository(PlatformAccount);
  private commentRepository = AppDataSource.getRepository(Comment);

  // Used here to remove a work's day-statistics rows when the work is deleted or merged.
  private workDayStatisticsService = new WorkDayStatisticsService();
  /**
   * List a user's works, newest publish time first, with optional filters and pagination.
   *
   * @param userId - owner of the works
   * @param params - optional filters (`accountId`, `platform`, `status`, title `keyword`) and
   *   1-based `page` / `pageSize` (defaults 1 / 12; non-positive or invalid values fall back
   *   to those defaults or are clamped to 1)
   * @returns the requested page of formatted works plus the total number of matches
   */
  async getWorks(userId: number, params: WorksQueryParams): Promise<{ items: WorkType[]; total: number }> {
    const queryBuilder = this.workRepository
      .createQueryBuilder('work')
      .where('work.userId = :userId', { userId });
    if (params.accountId) {
      queryBuilder.andWhere('work.accountId = :accountId', { accountId: params.accountId });
    }
    if (params.platform) {
      queryBuilder.andWhere('work.platform = :platform', { platform: params.platform });
    }
    if (params.status) {
      queryBuilder.andWhere('work.status = :status', { status: params.status });
    }
    if (params.keyword) {
      queryBuilder.andWhere('work.title LIKE :keyword', { keyword: `%${params.keyword}%` });
    }

    // Clamp to positive integers so bad input can never produce a negative OFFSET/LIMIT.
    const page = Math.max(1, Math.floor(Number(params.page) || 1));
    const pageSize = Math.max(1, Math.floor(Number(params.pageSize) || 12));

    const [items, total] = await queryBuilder
      .orderBy('work.publishTime', 'DESC')
      .skip((page - 1) * pageSize)
      .take(pageSize)
      .getManyAndCount();

    return {
      // Call through an arrow so `formatWork` keeps its `this` and receives only the entity
      // (Array#map would otherwise also pass index and array).
      items: items.map((work) => this.formatWork(work)),
      total,
    };
  }
  /**
   * Aggregate statistics over a user's works, honouring the same filters as `getWorks`
   * (pagination fields are ignored).
   *
   * @param userId - owner of the works
   * @param params - optional `accountId`, `platform`, `status` and title `keyword` filters
   * @returns total/published counts and summed play, like and comment counts (0 when no rows)
   */
  async getStats(userId: number, params?: WorksQueryParams): Promise<WorkStats> {
    const queryBuilder = this.workRepository
      .createQueryBuilder('work')
      .select([
        'COUNT(*) as totalCount',
        // Bound parameter instead of a "double-quoted" literal: double quotes denote an
        // identifier under MySQL's ANSI_QUOTES mode and in standard SQL.
        'SUM(CASE WHEN work.status = :publishedStatus THEN 1 ELSE 0 END) as publishedCount',
        'CAST(SUM(work.playCount) AS SIGNED) as totalPlayCount',
        'CAST(SUM(work.likeCount) AS SIGNED) as totalLikeCount',
        'CAST(SUM(work.commentCount) AS SIGNED) as totalCommentCount',
      ])
      .where('work.userId = :userId', { userId })
      .setParameter('publishedStatus', 'published');
    if (params?.accountId) {
      queryBuilder.andWhere('work.accountId = :accountId', { accountId: params.accountId });
    }
    if (params?.platform) {
      queryBuilder.andWhere('work.platform = :platform', { platform: params.platform });
    }
    if (params?.status) {
      queryBuilder.andWhere('work.status = :status', { status: params.status });
    }
    if (params?.keyword) {
      queryBuilder.andWhere('work.title LIKE :keyword', { keyword: `%${params.keyword}%` });
    }

    const result = (await queryBuilder.getRawOne()) ?? {};
    // Aggregates may come back as strings (or NULL when SUM has no rows); coerce to integers.
    const toCount = (value: unknown): number => parseInt(String(value ?? 0), 10) || 0;
    return {
      totalCount: toCount(result.totalCount),
      publishedCount: toCount(result.publishedCount),
      totalPlayCount: toCount(result.totalPlayCount),
      totalLikeCount: toCount(result.totalLikeCount),
      totalCommentCount: toCount(result.totalCommentCount),
    };
  }
  /**
   * Fetch a single work, scoped to its owner.
   *
   * @param userId - expected owner of the work
   * @param workId - primary key of the work
   * @returns the formatted work
   * @throws AppError (404 / NOT_FOUND) when the work does not exist for this user
   */
  async getWorkById(userId: number, workId: number): Promise<WorkType> {
    const found = await this.workRepository.findOne({ where: { userId, id: workId } });
    if (found) {
      return this.formatWork(found);
    }
    throw new AppError('作品不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
  }
  /**
   * Sync works for a user's accounts, optionally restricted to one account (`accountId`
   * takes precedence) or to one platform.
   *
   * Accounts whose status is `active` or `expired` are included, since an expired account's
   * cookie may still work in practice. An expired account that syncs at least one work is
   * switched back to `active`. A failure on one account is logged and the loop continues.
   *
   * @param userId - owner of the accounts
   * @param accountId - optional single account to sync
   * @param platform - optional platform filter (ignored when `accountId` is given)
   * @param onProgress - optional callback receiving overall progress (5–95 while running,
   *   100 at the end) and a human-readable step label
   * @returns total works synced, number of accounts synced without error, and per-account summaries
   */
  async syncWorks(
    userId: number,
    accountId?: number,
    platform?: string,
    onProgress?: (progress: number, currentStep: string) => void
  ): Promise<{
    synced: number;
    accounts: number;
    accountSummaries: Array<{
      accountId: number;
      platform: PlatformType;
      worksListLength: number;
      worksCount: number;
      source?: string;
      pythonAvailable?: boolean;
      syncedCount: number;
    }>;
  }> {
    type AccountSummary = {
      accountId: number;
      platform: PlatformType;
      worksListLength: number;
      worksCount: number;
      source?: string;
      pythonAvailable?: boolean;
      syncedCount: number;
    };

    logger.info(`[SyncWorks] Starting sync for userId: ${userId}, accountId: ${accountId || 'all'}, platform: ${platform || 'all'}`);

    // Debug aid: dump every account of the user before any filtering is applied.
    const allAccounts = await this.accountRepository.find({ where: { userId } });
    const accountDigest = allAccounts
      .map((a) => `id=${a.id},status=${a.status},platform=${a.platform}`)
      .join('; ');
    logger.info(`[SyncWorks] All accounts for user ${userId}: ${accountDigest}`);

    const accountQuery = this.accountRepository
      .createQueryBuilder('account')
      .where('account.userId = :userId', { userId })
      .andWhere('account.status IN (:...statuses)', { statuses: ['active', 'expired'] });
    if (accountId) {
      accountQuery.andWhere('account.id = :accountId', { accountId });
    } else if (platform) {
      accountQuery.andWhere('account.platform = :platform', { platform });
    }
    const accounts = await accountQuery.getMany();
    logger.info(`[SyncWorks] Found ${accounts.length} accounts (active + expired)`);

    // Map "accounts finished so far" (possibly fractional) onto the 5..95 overall range.
    const denominator = Math.max(1, accounts.length);
    const toOverallProgress = (accountsDone: number): number =>
      Math.min(95, 5 + Math.round((accountsDone / denominator) * 90));

    const accountSummaries: AccountSummary[] = [];
    let totalSynced = 0;

    for (let index = 0; index < accounts.length; index += 1) {
      const account = accounts[index];
      try {
        logger.info(`[SyncWorks] Syncing account ${account.id} (${account.platform}, status: ${account.status})`);
        onProgress?.(
          toOverallProgress(index),
          `同步账号 ${index + 1}/${accounts.length}: ${account.accountName || account.id} (${account.platform})`
        );

        const result = await this.syncAccountWorks(userId, account, (p, step) => {
          const clamped = Math.max(0, Math.min(1, p));
          onProgress?.(toOverallProgress(index + clamped), step);
        });

        totalSynced += result.syncedCount;
        accountSummaries.push({
          accountId: account.id,
          platform: account.platform as PlatformType,
          worksListLength: result.worksListLength,
          worksCount: result.worksCount,
          source: result.source,
          pythonAvailable: result.pythonAvailable,
          syncedCount: result.syncedCount,
        });
        logger.info(`[SyncWorks] Account ${account.id} synced ${result.syncedCount} works`);

        // A sync that returned works proves the cookie still works: revive expired accounts.
        if (account.status === 'expired' && result.syncedCount > 0) {
          await this.accountRepository.update(account.id, { status: 'active' });
          logger.info(`[SyncWorks] Account ${account.id} status restored to active`);
        }
      } catch (error) {
        logger.error(`Failed to sync works for account ${account.id}:`, error);
      }
    }

    // One summary is recorded per account whose sync call returned successfully.
    const accountCount = accountSummaries.length;
    onProgress?.(100, `同步完成:共同步 ${totalSynced} 条作品(${accountCount} 个账号)`);
    logger.info(`[SyncWorks] Complete: ${totalSynced} works synced from ${accountCount} accounts`);
    return { synced: totalSynced, accounts: accountCount, accountSummaries };
  }
  /**
   * Sync the works of one platform account into the local `works` table.
   *
   * Steps:
   *  1. Decrypt and parse the account cookies (JSON array or `name=value; ...` string).
   *  2. Fetch the account profile and works list via `headlessBrowserService.fetchAccountInfo`.
   *  3. Update the account's nickname/avatar when the platform reports new, non-default values
   *     and notify the user over WebSocket (`account:info_updated`).
   *  4. Upsert each remote work, merging rows stored under the legacy
   *     `${platform}_${title}_${publishTime}` ID into the platform's real video ID.
   *  5. Delete local works missing remotely, only when the remote list looks complete and
   *     trustworthy (never for weixin_video / xiaohongshu).
   *  6. For xiaohongshu, douyin, baijiahao and weixin_video, enqueue a backfill task for
   *     works that have no `work_day_statistics` rows yet.
   *
   * @param userId - owner of the account; stored on newly created works
   * @param account - the platform account to sync; returns early when it has no cookie data
   * @param onProgress - optional callback receiving per-account progress (0.1 → ~0.95) and a step label
   * @returns number of works written plus the remote list length / declared total and fetch metadata
   */
  async syncAccountWorks(
    userId: number,
    account: PlatformAccount,
    onProgress?: (progress: number, currentStep: string) => void
  ): Promise<{
    syncedCount: number;
    worksListLength: number;
    worksCount: number;
    source?: string;
    pythonAvailable?: boolean;
  }> {
    logger.info(`[SyncAccountWorks] Starting for account ${account.id} (${account.platform})`);
    if (!account.cookieData) {
      logger.warn(`Account ${account.id} has no cookie data`);
      return { syncedCount: 0, worksListLength: 0, worksCount: 0 };
    }

    // Decrypt the cookies; if decryption fails the stored value is used as-is.
    let decryptedCookies: string;
    try {
      decryptedCookies = CookieManager.decrypt(account.cookieData);
      logger.info(`[SyncAccountWorks] Cookie decrypted successfully`);
    } catch {
      decryptedCookies = account.cookieData;
      logger.info(`[SyncAccountWorks] Using raw cookie data`);
    }

    // Parse the cookies. Two formats are supported: a JSON array of cookie objects, or a
    // "name=value; name2=value2" string. Valid JSON that is not an array (a bare number,
    // an object, ...) is handed to the string parser instead of being passed on as-is.
    const platform = account.platform as PlatformType;
    let cookieList: { name: string; value: string; domain: string; path: string }[] | null = null;
    try {
      const parsed: unknown = JSON.parse(decryptedCookies);
      if (Array.isArray(parsed)) {
        cookieList = parsed;
        logger.info(`[SyncAccountWorks] Parsed ${cookieList.length} cookies from JSON format`);
      }
    } catch {
      // Not JSON: handled by the string parser below.
    }
    if (!cookieList) {
      cookieList = this.parseCookieString(decryptedCookies, platform);
      logger.info(`[SyncAccountWorks] Parsed ${cookieList.length} cookies from string format`);
      if (cookieList.length === 0) {
        logger.error(`Invalid cookie format for account ${account.id}`);
        return { syncedCount: 0, worksListLength: 0, worksCount: 0 };
      }
    }

    // Fetch the account profile and works list from the platform.
    logger.info(`[SyncAccountWorks] Fetching account info from ${platform}...`);
    onProgress?.(0.1, `获取作品列表中:${account.accountName || account.id} (${platform})`);
    const accountInfo = await headlessBrowserService.fetchAccountInfo(platform, cookieList, {
      onWorksFetchProgress: (info) => {
        const declaredTotal = typeof info.declaredTotal === 'number' ? info.declaredTotal : 0;
        const ratio = declaredTotal > 0 ? Math.min(1, info.totalSoFar / declaredTotal) : 0;
        onProgress?.(
          0.1 + ratio * 0.2,
          `拉取作品中:${account.accountName || account.id} (${platform}) ${info.totalSoFar}/${declaredTotal || '?'}`
        );
      },
    });
    logger.info(`[SyncAccountWorks] Got ${accountInfo.worksList?.length || 0} works from API`);
    onProgress?.(
      0.3,
      `拉取完成:${account.accountName || account.id} (${platform}) python=${accountInfo.pythonAvailable ? 'ok' : 'off'} source=${accountInfo.source || 'unknown'} list=${accountInfo.worksList?.length || 0} total=${accountInfo.worksCount || 0}`
    );

    // #6075: keep the account's avatar and nickname in sync while syncing works.
    if (accountInfo.accountName || accountInfo.avatarUrl) {
      const accountUpdate: Partial<PlatformAccount> = {};
      // Generic default names (presumably scraper fallbacks); never overwrite a real name with them.
      const defaultNames = [`${platform}账号`, '未知账号', '抖音账号', '小红书账号', '快手账号', '视频号账号', 'B站账号', '头条账号', '百家号账号'];
      if (accountInfo.accountName && !defaultNames.includes(accountInfo.accountName) && accountInfo.accountName !== account.accountName) {
        accountUpdate.accountName = accountInfo.accountName;
      }
      if (accountInfo.avatarUrl && accountInfo.avatarUrl !== account.avatarUrl) {
        accountUpdate.avatarUrl = accountInfo.avatarUrl;
      }
      if (Object.keys(accountUpdate).length > 0) {
        // NOTE(review): cast kept from the original; Partial<Entity> vs TypeORM's update type.
        await this.accountRepository.update(account.id, accountUpdate as any);
        logger.info(`[SyncAccountWorks] Updated account info: ${Object.keys(accountUpdate).join(', ')}`);
        wsManager.sendToUser(userId, 'account:info_updated', {
          accountId: account.id,
          ...accountUpdate,
        });
      }
    }

    let syncedCount = 0;
    // Every platformVideoId (canonical and legacy) seen remotely; drives deletion below.
    const remotePlatformVideoIds = new Set<string>();
    if (accountInfo.worksList && accountInfo.worksList.length > 0) {
      const legacyToCanonicalInRun = new Map<string, string>();
      const processedPlatformVideoIds = new Set<string>();
      const total = accountInfo.worksList.length;
      // Xiaohongshu only syncs works published within the last three months.
      const xhsCutoff = new Date();
      xhsCutoff.setMonth(xhsCutoff.getMonth() - 3);

      for (const workItem of accountInfo.worksList) {
        const titleForId = (workItem.title || '').trim();
        const publishTimeForId = (workItem.publishTime || '').trim();
        // Legacy ID used when a platform reported no video ID.
        const legacyFallbackId = `${platform}_${titleForId}_${publishTimeForId}`.substring(0, 100);
        let canonicalVideoId = (workItem.videoId || '').trim() || legacyFallbackId;

        if (platform === 'xiaohongshu') {
          const publishedAt = this.parsePublishTime(workItem.publishTime);
          if (publishedAt && publishedAt < xhsCutoff) continue;
        }

        // Weixin Video: remember legacy→real ID pairs so items lacking a video ID later in this
        // run are mapped onto the real ID.
        if (platform === 'weixin_video') {
          const rawVideoId = (workItem.videoId || '').trim();
          if (rawVideoId) {
            legacyToCanonicalInRun.set(legacyFallbackId, rawVideoId);
            canonicalVideoId = rawVideoId;
          } else {
            const mapped = legacyToCanonicalInRun.get(legacyFallbackId);
            if (mapped) {
              canonicalVideoId = mapped;
            }
          }
        }

        if (processedPlatformVideoIds.has(canonicalVideoId)) {
          continue;
        }
        processedPlatformVideoIds.add(canonicalVideoId);
        remotePlatformVideoIds.add(canonicalVideoId);
        if (legacyFallbackId !== canonicalVideoId) {
          remotePlatformVideoIds.add(legacyFallbackId);
        }

        let work = await this.workRepository.findOne({
          where: { accountId: account.id, platformVideoId: canonicalVideoId },
        });

        // Weixin Video: a canonical row exists; fold any leftover legacy row into it.
        if (platform === 'weixin_video' && work && legacyFallbackId !== canonicalVideoId) {
          const legacyWork = await this.workRepository.findOne({
            where: { accountId: account.id, platformVideoId: legacyFallbackId },
          });
          if (legacyWork && legacyWork.id !== work.id) {
            await this.commentRepository.update({ workId: legacyWork.id }, { workId: work.id });
            await this.workDayStatisticsService.deleteByWorkId(legacyWork.id);
            await this.workRepository.delete(legacyWork.id);
          }
        }

        // No canonical row yet: promote a legacy row (if any) to the canonical ID.
        if (!work && legacyFallbackId !== canonicalVideoId) {
          const legacyWork = await this.workRepository.findOne({
            where: { accountId: account.id, platformVideoId: legacyFallbackId },
          });
          if (legacyWork) {
            // Re-check for a canonical row (the lookup above found none; presumably this guards
            // against one appearing concurrently).
            const canonicalWork = await this.workRepository.findOne({
              where: { accountId: account.id, platformVideoId: canonicalVideoId },
            });
            if (canonicalWork) {
              await this.commentRepository.update({ workId: legacyWork.id }, { workId: canonicalWork.id });
              await this.workDayStatisticsService.deleteByWorkId(legacyWork.id);
              await this.workRepository.delete(legacyWork.id);
              work = canonicalWork;
            } else {
              await this.workRepository.update(legacyWork.id, {
                platformVideoId: canonicalVideoId,
              });
              work = { ...legacyWork, platformVideoId: canonicalVideoId };
            }
          }
        }

        if (work) {
          await this.workRepository.update(work.id, {
            title: workItem.title || work.title,
            coverUrl: workItem.coverUrl || work.coverUrl,
            videoUrl: workItem.videoUrl !== undefined ? workItem.videoUrl || null : work.videoUrl,
            duration: workItem.duration || work.duration,
            // Normalise exactly like the create path below, so raw labels such as '已发布'
            // never reach the table and `status = 'published'` stats stay correct.
            status: workItem.status ? this.normalizeStatus(workItem.status) : work.status,
            playCount: workItem.playCount ?? work.playCount,
            likeCount: workItem.likeCount ?? work.likeCount,
            commentCount: workItem.commentCount ?? work.commentCount,
            shareCount: workItem.shareCount ?? work.shareCount,
            collectCount: workItem.collectCount ?? work.collectCount,
          });
        } else {
          const newWork = this.workRepository.create({
            userId,
            accountId: account.id,
            platform,
            platformVideoId: canonicalVideoId,
            title: workItem.title || '',
            coverUrl: workItem.coverUrl || '',
            videoUrl: workItem.videoUrl || null,
            duration: workItem.duration || '00:00',
            status: this.normalizeStatus(workItem.status),
            publishTime: this.parsePublishTime(workItem.publishTime),
            playCount: workItem.playCount || 0,
            likeCount: workItem.likeCount || 0,
            commentCount: workItem.commentCount || 0,
            shareCount: workItem.shareCount || 0,
            collectCount: workItem.collectCount || 0,
          });
          await this.workRepository.save(newWork);
        }

        syncedCount++;
        if (syncedCount === 1 || syncedCount === total || syncedCount % 10 === 0) {
          onProgress?.(0.3 + (syncedCount / total) * 0.65, `写入作品:${account.accountName || account.id} ${syncedCount}/${total}`);
        }
      }
      logger.info(`Synced ${syncedCount} works for account ${account.id}`);
    }

    if (platform === 'weixin_video') {
      await this.dedupeWeixinVideoWorks(account.id);
    }

    // Delete local works that vanished remotely, but only when the remote list can be trusted;
    // a partial or mismatched list would otherwise wipe valid local data.
    const remoteListLength = accountInfo.worksList?.length || 0;
    const expectedRemoteCount = accountInfo.worksCount || 0;
    const remoteComplete =
      typeof accountInfo.worksListComplete === 'boolean'
        ? accountInfo.worksListComplete
        : expectedRemoteCount > 0
          ? remoteListLength >= expectedRemoteCount
          : remoteListLength > 0;
    if (!remoteComplete) {
      logger.warn(
        `[SyncAccountWorks] Skipping local deletions for account ${account.id} because remote works list seems incomplete (remote=${remoteListLength}, expected=${expectedRemoteCount})`
      );
    } else if (remotePlatformVideoIds.size === 0) {
      logger.warn(`[SyncAccountWorks] Skipping local deletions for account ${account.id} because no remote IDs were collected`);
    } else if (platform === 'weixin_video') {
      // Weixin Video rows may still carry legacy IDs; skip deletions to avoid false positives.
      logger.info(`[SyncAccountWorks] Skipping local deletions for ${platform} account ${account.id} to avoid false deletions`);
    } else if (platform === 'xiaohongshu') {
      // Only the last three months are synced, so older local works are absent remotely on purpose.
      logger.info(`[SyncAccountWorks] Skipping local deletions for ${platform} account ${account.id} (only syncing works within 3 months)`);
    } else {
      const localWorks = await this.workRepository.find({
        where: { accountId: account.id },
      });
      const matchedCount = localWorks.filter((w) => remotePlatformVideoIds.has(w.platformVideoId)).length;
      const matchRatio = localWorks.length > 0 ? matchedCount / localWorks.length : 1;
      // A very low overlap suggests the remote IDs changed format, not that works were deleted.
      if (localWorks.length >= 10 && matchRatio < 0.2) {
        logger.warn(
          `[SyncAccountWorks] Skipping local deletions for account ${account.id} because remote/local ID match ratio is too low (matched=${matchedCount}/${localWorks.length})`
        );
      } else {
        let deletedCount = 0;
        for (const localWork of localWorks) {
          if (!remotePlatformVideoIds.has(localWork.platformVideoId)) {
            await this.commentRepository.delete({ workId: localWork.id });
            await this.workDayStatisticsService.deleteByWorkId(localWork.id);
            await this.workRepository.delete(localWork.id);
            deletedCount++;
            logger.info(`Deleted work ${localWork.id} (${localWork.title}) - no longer exists on platform`);
          }
        }
        if (deletedCount > 0) {
          logger.info(`Deleted ${deletedCount} works that no longer exist on platform for account ${account.id}`);
        }
      }
    }

    // Day-statistics snapshots are no longer written here; the per-platform daily-data tasks own
    // them. Instead, works with no statistics at all get an asynchronous backfill task enqueued
    // so this sync is not blocked.
    if (platform === 'xiaohongshu') {
      try {
        const needInitIds = await this.findWorkIdsWithoutDayStatistics(account.id, platform);
        if (needInitIds.length > 0) {
          logger.info(
            `[SyncAccountWorks] XHS account ${account.id} has ${needInitIds.length} works without statistics, running initial note/base import.`
          );
          taskQueueService.createTask(account.userId, {
            type: 'xhs_work_stats_backfill',
            title: `小红书作品补数(${needInitIds.length})`,
            accountId: account.id,
            platform: 'xiaohongshu',
            data: {
              workIds: needInitIds,
            },
          });
        }
      } catch (err) {
        logger.error(
          `[SyncAccountWorks] Failed to backfill XHS work_day_statistics for account ${account.id}:`,
          err
        );
      }
    } else if (platform === 'douyin') {
      try {
        const needInitIds = await this.findWorkIdsWithoutDayStatistics(account.id, platform);
        if (needInitIds.length > 0) {
          logger.info(
            `[SyncAccountWorks] DY account ${account.id} has ${needInitIds.length} works without statistics, enqueue dy_work_stats_backfill task.`
          );
          taskQueueService.createTask(account.userId, {
            type: 'dy_work_stats_backfill',
            title: `抖音作品补数(${needInitIds.length})`,
            accountId: account.id,
            platform: 'douyin',
            data: {
              workIds: needInitIds,
            },
          });
        }
      } catch (err) {
        logger.error(
          `[SyncAccountWorks] Failed to enqueue DY work_day_statistics backfill for account ${account.id}:`,
          err
        );
      }
    } else if (platform === 'baijiahao') {
      try {
        const needInitIds = await this.findWorkIdsWithoutDayStatistics(account.id, platform);
        if (needInitIds.length > 0) {
          logger.info(
            `[SyncAccountWorks] BJ account ${account.id} has ${needInitIds.length} works without statistics, enqueue bj_work_stats_backfill task.`
          );
          taskQueueService.createTask(account.userId, {
            type: 'bj_work_stats_backfill',
            title: `百家号作品补数(${needInitIds.length})`,
            accountId: account.id,
            platform: 'baijiahao',
            data: {
              workIds: needInitIds,
            },
          });
        }
      } catch (err) {
        logger.error(
          `[SyncAccountWorks] Failed to enqueue BJ work_day_statistics backfill for account ${account.id}:`,
          err
        );
      }
    } else if (platform === 'weixin_video') {
      try {
        const needInitIds = await this.findWorkIdsWithoutDayStatistics(account.id, platform);
        if (needInitIds.length > 0) {
          logger.info(
            `[SyncAccountWorks] WX account ${account.id} has ${needInitIds.length} works without statistics, enqueue wx_work_stats_backfill task.`
          );
          taskQueueService.createTask(account.userId, {
            type: 'wx_work_stats_backfill',
            title: `视频号作品补数(${needInitIds.length})`,
            accountId: account.id,
            platform: 'weixin_video',
            data: {
              workIds: needInitIds,
            },
          });
        }
      } catch (err) {
        logger.error(
          `[SyncAccountWorks] Failed to enqueue WX work_day_statistics backfill for account ${account.id}:`,
          err
        );
      }
    }

    return {
      syncedCount,
      worksListLength: remoteListLength,
      worksCount: expectedRemoteCount,
      source: accountInfo.source,
      pythonAvailable: accountInfo.pythonAvailable,
    };
  }

  /**
   * Return the IDs of an account's works on `platform` that have no row in
   * `work_day_statistics` yet, i.e. works that still need an initial statistics backfill.
   */
  private async findWorkIdsWithoutDayStatistics(accountId: number, platform: PlatformType): Promise<number[]> {
    const works = await this.workRepository.find({
      where: { accountId, platform },
      select: ['id'],
    });
    const workIds = works.map((w) => w.id);
    if (workIds.length === 0) {
      return [];
    }
    const rows = await AppDataSource.getRepository(WorkDayStatistics)
      .createQueryBuilder('wds')
      .select('DISTINCT wds.work_id', 'workId')
      .where('wds.work_id IN (:...ids)', { ids: workIds })
      .getRawMany();
    const hasStats = new Set<number>(rows.map((r: { workId: unknown }) => Number(r.workId)));
    return workIds.filter((id) => !hasStats.has(id));
  }
  /**
   * @deprecated Works sync no longer writes "today" snapshots of the current cumulative
   * counters into `work_day_statistics`; the per-platform daily-data tasks own that table.
   * Kept as a logging-only no-op so any remaining call site does not disturb the sync flow.
   *
   * @param account - account whose snapshot is being skipped (only its id is logged)
   */
  private async saveWorkDayStatistics(account: PlatformAccount): Promise<void> {
    const skipMessage = `[SaveWorkDayStatistics] Skip snapshot-based work_day_statistics when syncing works for account ${account.id} (disabled by design).`;
    logger.info(skipMessage);
  }
  /**
   * Map a platform status label (Chinese UI text or English code) to the internal status code.
   *
   * Surrounding whitespace is ignored. Unknown, empty or missing values fall back to
   * `'published'`. A `Map` is used so inherited object keys (`"constructor"`, `"toString"`, ...)
   * can never be returned as a status, which a plain object lookup allowed.
   *
   * @param status - raw status reported by the platform (may be undefined at runtime)
   * @returns one of `'published' | 'reviewing' | 'rejected' | 'draft'`
   */
  private normalizeStatus(status: string): string {
    const statusMap = new Map<string, string>([
      ['已发布', 'published'],
      ['审核中', 'reviewing'],
      ['未通过', 'rejected'],
      ['草稿', 'draft'],
      ['published', 'published'],
      ['reviewing', 'reviewing'],
      ['rejected', 'rejected'],
      ['draft', 'draft'],
    ]);
    // Scraped labels may carry stray whitespace; tolerate non-string input at runtime.
    const key = typeof status === 'string' ? status.trim() : '';
    return statusMap.get(key) ?? 'published';
  }
  /**
   * Parse a platform-reported publish time into a `Date` in the server's local time zone.
   *
   * Supported inputs:
   *  - Chinese format "2025年12月19日 06:33". The time part is optional: "2025年12月19日" yields
   *    midnight of that day (previously it returned `null`).
   *  - anything the built-in `Date` parser accepts (ISO 8601; other formats are engine-dependent).
   *
   * @param timeStr - raw time string; empty/falsy input yields `null`
   * @returns the parsed date, or `null` when the input is empty or unparseable
   */
  private parsePublishTime(timeStr: string): Date | null {
    if (!timeStr) return null;

    const match = timeStr.match(/(\d{4})年(\d{1,2})月(\d{1,2})日(?:\s*(\d{1,2}):(\d{2}))?/);
    if (match) {
      // Unmatched optional groups are undefined, so the defaults give midnight.
      const [, year, month, day, hour = '0', minute = '0'] = match;
      return new Date(
        parseInt(year, 10),
        parseInt(month, 10) - 1,
        parseInt(day, 10),
        parseInt(hour, 10),
        parseInt(minute, 10)
      );
    }

    const date = new Date(timeStr);
    return isNaN(date.getTime()) ? null : date;
  }
  /**
   * Delete a locally stored work owned by `userId`, together with its comments and
   * day-statistics rows. Only local data is touched; the work on the platform is not.
   *
   * @param userId - expected owner of the work
   * @param workId - primary key of the work to delete
   * @throws AppError (404 / NOT_FOUND) when the work does not exist for this user
   */
  async deleteWork(userId: number, workId: number): Promise<void> {
    const existing = await this.workRepository.findOne({ where: { id: workId, userId } });
    if (!existing) {
      throw new AppError('作品不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    // Dependent rows first, then the work itself.
    await this.commentRepository.delete({ workId });
    await this.workDayStatisticsService.deleteByWorkId(workId);
    await this.workRepository.delete(workId);

    logger.info(`Deleted work ${workId} for user ${userId}`);
  }
  /**
   * Delete a work on the remote platform. Only Douyin and Xiaohongshu are supported.
   *
   * On success the local row is kept but its status is set to `'deleted'`.
   *
   * @param userId - expected owner of the work
   * @param workId - primary key of the local work
   * @param onCaptchaRequired - forwarded to the platform adapter; presumably invoked when the
   *   platform demands a captcha and resolves with the user's input (confirm in the adapters)
   * @returns the adapter result plus `accountId` (also on the unsupported-platform path) so the
   *   caller can refresh that account's works
   * @throws AppError 404 when the work does not exist for this user; 400 when its account is
   *   missing or has no cookie data
   */
  async deletePlatformWork(
    userId: number,
    workId: number,
    onCaptchaRequired?: (captchaInfo: { taskId: string }) => Promise<string>
  ): Promise<{ success: boolean; errorMessage?: string; accountId?: number }> {
    // The account is loaded explicitly below, so no relation join is needed here.
    const work = await this.workRepository.findOne({
      where: { id: workId, userId },
    });
    if (!work) {
      throw new AppError('作品不存在', HTTP_STATUS.NOT_FOUND, ERROR_CODES.NOT_FOUND);
    }

    const account = await this.accountRepository.findOne({
      where: { id: work.accountId },
    });
    if (!account || !account.cookieData) {
      throw new AppError('账号不存在或未登录', HTTP_STATUS.BAD_REQUEST, ERROR_CODES.ACCOUNT_NOT_FOUND);
    }

    // Decrypt the cookies; if decryption fails the stored value is used as-is.
    let decryptedCookies: string;
    try {
      decryptedCookies = CookieManager.decrypt(account.cookieData);
    } catch {
      decryptedCookies = account.cookieData;
    }

    // Pick the platform adapter (loaded lazily) and the matching success log line.
    let result: { success: boolean; errorMessage?: string };
    let successLog: string;
    switch (account.platform) {
      case 'douyin': {
        const { DouyinAdapter } = await import('../automation/platforms/douyin.js');
        result = await new DouyinAdapter().deleteWork(decryptedCookies, work.platformVideoId, onCaptchaRequired);
        successLog = `Platform work ${workId} deleted successfully`;
        break;
      }
      case 'xiaohongshu': {
        const { XiaohongshuAdapter } = await import('../automation/platforms/xiaohongshu.js');
        result = await new XiaohongshuAdapter().deleteWork(decryptedCookies, work.platformVideoId, onCaptchaRequired);
        successLog = `Platform work ${workId} (xiaohongshu) deleted successfully`;
        break;
      }
      default:
        return { success: false, errorMessage: '暂不支持该平台删除功能', accountId: account.id };
    }

    if (result.success) {
      // Keep the row for history; mark it deleted instead of removing it.
      await this.workRepository.update(workId, { status: 'deleted' });
      logger.info(successLog);
    }
    return { ...result, accountId: account.id };
  }
  /**
   * Turn a `"name=value; name2=value2"` cookie string into cookie objects scoped to the
   * platform's domain with path `/`.
   *
   * The domain comes from a fixed per-platform table. Platforms missing from it fall back
   * to `.<platform>.com`. Only the first `=` separates name from value, so values may
   * themselves contain `=`. Segments without `=`, or whose trimmed name or value is empty,
   * are dropped.
   *
   * @param cookieString - Semicolon-separated cookie pairs.
   * @param platform - Platform key used to pick the cookie domain.
   * @returns Cookies in the order they appear in `cookieString`.
   */
  private parseCookieString(cookieString: string, platform: PlatformType): {
    name: string;
    value: string;
    domain: string;
    path: string;
  }[] {
    const platformDomains: Record<string, string> = {
      douyin: '.douyin.com',
      kuaishou: '.kuaishou.com',
      xiaohongshu: '.xiaohongshu.com',
      weixin_video: '.qq.com',
      bilibili: '.bilibili.com',
      toutiao: '.toutiao.com',
      baijiahao: '.baidu.com',
      qie: '.qq.com',
      dayuhao: '.alibaba.com',
    };
    const cookieDomain = platformDomains[platform] || `.${platform}.com`;

    const parsed: { name: string; value: string; domain: string; path: string }[] = [];
    cookieString.split(';').forEach((segment) => {
      const entry = segment.trim();
      const separator = entry.indexOf('=');
      // Also covers empty segments, which contain no '='.
      if (separator < 0) return;
      const name = entry.slice(0, separator).trim();
      const value = entry.slice(separator + 1).trim();
      if (name && value) {
        parsed.push({ name, value, domain: cookieDomain, path: '/' });
      }
    });
    return parsed;
  }
  /**
   * Convert a `Work` entity into the API-facing `WorkType` shape.
   *
   * - `publishTime`, `createdAt` and `updatedAt` are serialized with `toISOString()`;
   *   a missing `publishTime` becomes `''`.
   * - A falsy `description` / `videoUrl` is returned as `undefined`.
   * - `platform` and `status` are cast to their union types (no runtime validation).
   * - All counters and `yesterday*` metrics are copied through unchanged.
   */
  private formatWork(work: Work): WorkType {
    const publishTime = work.publishTime?.toISOString() || '';
    const status = work.status as 'published' | 'reviewing' | 'rejected' | 'draft' | 'deleted';
    // Read through `any`: presumably this field is not declared on the Work entity type — confirm.
    const yesterdayRecommendCount = (work as any).yesterdayRecommendCount;

    const formatted: WorkType = {
      id: work.id,
      accountId: work.accountId,
      platform: work.platform as PlatformType,
      platformVideoId: work.platformVideoId,
      title: work.title,
      description: work.description || undefined,
      coverUrl: work.coverUrl,
      videoUrl: work.videoUrl || undefined,
      duration: work.duration,
      status,
      publishTime,
      playCount: work.playCount,
      likeCount: work.likeCount,
      commentCount: work.commentCount,
      shareCount: work.shareCount,
      collectCount: work.collectCount,
      yesterdayPlayCount: work.yesterdayPlayCount,
      yesterdayLikeCount: work.yesterdayLikeCount,
      yesterdayCommentCount: work.yesterdayCommentCount,
      yesterdayShareCount: work.yesterdayShareCount,
      yesterdayCollectCount: work.yesterdayCollectCount,
      yesterdayRecommendCount,
      yesterdayFansIncrease: work.yesterdayFansIncrease,
      yesterdayCoverClickRate: work.yesterdayCoverClickRate,
      yesterdayAvgWatchDuration: work.yesterdayAvgWatchDuration,
      yesterdayTotalWatchDuration: work.yesterdayTotalWatchDuration,
      yesterdayCompletionRate: work.yesterdayCompletionRate,
      yesterdayTwoSecondExitRate: work.yesterdayTwoSecondExitRate,
      yesterdayCompletionRate5s: work.yesterdayCompletionRate5s,
      yesterdayExposureCount: work.yesterdayExposureCount,
      createdAt: work.createdAt.toISOString(),
      updatedAt: work.updatedAt.toISOString(),
    };
    return formatted;
  }
  /**
   * Collapse duplicate work rows of one account that share the same title and publish time.
   *
   * Rows with a falsy title or no publishTime are never treated as duplicates. In each
   * group of two or more rows, the first row after sorting is kept:
   *   1. a platformVideoId that does NOT start with 'weixin_video_' is preferred
   *      (presumably that prefix marks a locally synthesized id — confirm with the sync code);
   *   2. ties go to the most recently updated row.
   * For every other row in the group:
   *   - its comments are re-pointed to the kept row;
   *   - its per-day statistics are deleted (as deleteWork does);
   *   - the row itself is deleted.
   *
   * The account's platform is not checked here. Judging by the name, callers should only
   * invoke this for weixin_video accounts — confirm.
   *
   * @param accountId - Account whose works are deduplicated.
   */
  private async dedupeWeixinVideoWorks(accountId: number): Promise<void> {
    const works = await this.workRepository.find({ where: { accountId } });

    // Group by "<title>__<ISO publishTime>".
    const groups = new Map<string, Work[]>();
    for (const w of works) {
      if (!w.title || !w.publishTime) continue;
      const key = `${w.title}__${w.publishTime.toISOString()}`;
      const list = groups.get(key);
      if (list) list.push(w);
      else groups.set(key, [w]);
    }

    // 1 for a non-prefixed (preferred) platformVideoId, 0 for a 'weixin_video_'-prefixed one.
    const idRank = (w: Work): number => (w.platformVideoId.startsWith('weixin_video_') ? 0 : 1);
    const updatedMs = (w: Work): number => w.updatedAt?.getTime?.() || 0;

    for (const list of groups.values()) {
      if (list.length <= 1) continue;
      // Higher idRank first, then newest updatedAt; the first entry is the one kept.
      list.sort((a, b) => idRank(b) - idRank(a) || updatedMs(b) - updatedMs(a));
      const keep = list[0];
      for (const dup of list.slice(1)) {
        if (dup.id === keep.id) continue;
        await this.commentRepository.update({ workId: dup.id }, { workId: keep.id });
        // Remove the duplicate's daily statistics before the row, mirroring deleteWork.
        // Otherwise they are orphaned, or block the delete under a non-cascading FK
        // (depending on the schema).
        await this.workDayStatisticsService.deleteByWorkId(dup.id);
        await this.workRepository.delete(dup.id);
        logger.info(`Merged duplicate work ${dup.id} into ${keep.id} for account ${accountId}`);
      }
    }
  }
  833. }
  /** Shared module-level instance of WorkService, constructed once when this module is evaluated. */
  export const workService = new WorkService();