Pārlūkot izejas kodu

数据同步任务统一底层

Ethanfly 20 stundas atpakaļ
vecāks
revīzija
a4aa9bcc78

+ 4 - 8
server/src/scheduler/index.ts

@@ -330,8 +330,7 @@ export class TaskScheduler {
 
     this.isXhsImportRunning = true;
     try {
-      const svc = new XiaohongshuAccountOverviewImportService();
-      await svc.runDailyImportForAllXhsAccounts();
+      await XiaohongshuAccountOverviewImportService.runDailyImport();
     } finally {
       this.isXhsImportRunning = false;
     }
@@ -348,8 +347,7 @@ export class TaskScheduler {
 
     this.isDyImportRunning = true;
     try {
-      const svc = new DouyinAccountOverviewImportService();
-      await svc.runDailyImportForAllDouyinAccounts();
+      await DouyinAccountOverviewImportService.runDailyImport();
     } finally {
       this.isDyImportRunning = false;
     }
@@ -448,8 +446,7 @@ export class TaskScheduler {
 
     this.isBjImportRunning = true;
     try {
-      const svc = new BaijiahaoContentOverviewImportService();
-      await svc.runDailyImportForAllBaijiahaoAccounts();
+      await BaijiahaoContentOverviewImportService.runDailyImport();
     } finally {
       this.isBjImportRunning = false;
     }
@@ -466,8 +463,7 @@ export class TaskScheduler {
 
     this.isWxImportRunning = true;
     try {
-      const svc = new WeixinVideoDataCenterImportService();
-      await svc.runDailyImportForAllWeixinVideoAccounts();
+      await WeixinVideoDataCenterImportService.runDailyImport();
     } finally {
       this.isWxImportRunning = false;
     }

+ 1 - 2
server/src/scripts/run-baijiahao-import.ts

@@ -6,8 +6,7 @@ async function main() {
   try {
     await initDatabase();
     logger.info('[BJ Import] Manual run start...');
-    const svc = new BaijiahaoContentOverviewImportService();
-    await svc.runDailyImportForAllBaijiahaoAccounts();
+    await BaijiahaoContentOverviewImportService.runDailyImport();
     logger.info('[BJ Import] Manual run done.');
     process.exit(0);
   } catch (e) {

+ 1 - 2
server/src/scripts/run-douyin-import.ts

@@ -6,8 +6,7 @@ async function main() {
   try {
     await initDatabase();
     logger.info('[DY Import] Manual run start...');
-    const svc = new DouyinAccountOverviewImportService();
-    await svc.runDailyImportForAllDouyinAccounts();
+    await DouyinAccountOverviewImportService.runDailyImport();
     logger.info('[DY Import] Manual run done.');
     process.exit(0);
   } catch (e) {

+ 1 - 2
server/src/scripts/run-weixin-video-import.ts

@@ -6,8 +6,7 @@ async function main() {
   try {
     await initDatabase();
     logger.info('[WX Import] Manual run start...');
-    const svc = new WeixinVideoDataCenterImportService();
-    await svc.runDailyImportForAllWeixinVideoAccounts();
+    await WeixinVideoDataCenterImportService.runDailyImport();
     logger.info('[WX Import] Manual run done.');
     process.exit(0);
   } catch (e) {

+ 1 - 2
server/src/scripts/run-xhs-import.ts

@@ -6,8 +6,7 @@ async function main() {
   try {
     await initDatabase();
     logger.info('[XHS Import] Manual run start...');
-    const svc = new XiaohongshuAccountOverviewImportService();
-    await svc.runDailyImportForAllXhsAccounts();
+    await XiaohongshuAccountOverviewImportService.runDailyImport();
     logger.info('[XHS Import] Manual run done.');
     process.exit(0);
   } catch (e) {

+ 18 - 25
server/src/services/AccountService.ts

@@ -328,18 +328,13 @@ export class AccountService {
 
     try {
       if (platform === 'xiaohongshu') {
-        // 与定时任务相同:账号概览(观看/互动/涨粉)导出 + 粉丝数据页近30天 overall_new → user_day_statistics.fans_count
-        const svc = new XiaohongshuAccountOverviewImportService();
-        await svc.importAccountLast30Days(account);
+        await XiaohongshuAccountOverviewImportService.runDailyImport();
       } else if (platform === 'douyin') {
-        const svc = new DouyinAccountOverviewImportService();
-        await svc.importAccountLast30Days(account);
+        await DouyinAccountOverviewImportService.runDailyImport();
       } else if (platform === 'baijiahao') {
-        const svc = new BaijiahaoContentOverviewImportService();
-        await svc.importAccountLast30Days(account);
+        await BaijiahaoContentOverviewImportService.runDailyImport();
       } else if (platform === 'weixin_video') {
-        const svc = new WeixinVideoDataCenterImportService();
-        await svc.importAccountLast30Days(account);
+        await WeixinVideoDataCenterImportService.runDailyImport();
       } else {
         logger.info(
           `[addAccount] Initial statistics import skipped for unsupported platform ${platform}`
@@ -568,22 +563,20 @@ export class AccountService {
 
     const updated = await this.accountRepository.findOne({ where: { id: accountId } });
 
-    // 保存账号每日统计数据(粉丝数、作品数)
-    // 无论是否更新了粉丝数/作品数,都要保存当前值到统计表,确保每天都有记录
-    if (updated) {
-      try {
-        const userDayStatisticsService = new UserDayStatisticsService();
-        await userDayStatisticsService.saveStatistics({
-          accountId,
-          fansCount: updated.fansCount || 0,
-          worksCount: updated.worksCount || 0,
-        });
-        logger.debug(`[AccountService] Saved account day statistics for account ${accountId} (fans: ${updated.fansCount || 0}, works: ${updated.worksCount || 0})`);
-      } catch (error) {
-        logger.error(`[AccountService] Failed to save account day statistics for account ${accountId}:`, error);
-        // 不抛出错误,不影响主流程
-      }
-    }
+    // 刷新账号时不再写入 user_day_statistics,仅保留:每天定时任务 + 添加账号时的 initStatisticsForNewAccountAsync
+    // if (updated) {
+    //   try {
+    //     const userDayStatisticsService = new UserDayStatisticsService();
+    //     await userDayStatisticsService.saveStatistics({
+    //       accountId,
+    //       fansCount: updated.fansCount || 0,
+    //       worksCount: updated.worksCount || 0,
+    //     });
+    //     logger.debug(`[AccountService] Saved account day statistics for account ${accountId} (fans: ${updated.fansCount || 0}, works: ${updated.worksCount || 0})`);
+    //   } catch (error) {
+    //     logger.error(`[AccountService] Failed to save account day statistics for account ${accountId}:`, error);
+    //   }
+    // }
 
     // 通知其他客户端
     wsManager.sendToUser(userId, WS_EVENTS.ACCOUNT_UPDATED, { account: this.formatAccount(updated!) });

+ 129 - 1
server/src/services/BaijiahaoContentOverviewImportService.ts

@@ -1,6 +1,6 @@
 import fs from 'node:fs/promises';
 import path from 'node:path';
-import { chromium, type Browser } from 'playwright';
+import { chromium, type Browser, type Page, type BrowserContext } from 'playwright';
 import * as XLSXNS from 'xlsx';
 import { AppDataSource, PlatformAccount } from '../models/index.js';
 import { BrowserManager } from '../automation/browser.js';
@@ -447,6 +447,14 @@ export class BaijiahaoContentOverviewImportService {
   }
 
   /**
+   * 统一入口:定时任务与添加账号均调用此方法,执行“内容分析-基础数据-近30天 + 粉丝 getFansBasicInfo”
+   */
+  static async runDailyImport(): Promise<void> {
+    const svc = new BaijiahaoContentOverviewImportService();
+    await svc.runDailyImportForAllBaijiahaoAccounts();
+  }
+
+  /**
    * 为所有百家号账号导出“数据中心-内容分析-基础数据-近30天”并导入 user_day_statistics
    */
   async runDailyImportForAllBaijiahaoAccounts(): Promise<void> {
@@ -644,6 +652,16 @@ export class BaijiahaoContentOverviewImportService {
         }
       }
 
+      // 粉丝数据:直接请求 getFansBasicInfo(近30天:昨天为结束,往前推30天),不打开页面、不等待点击
+      try {
+        await this.importFansDataByApi(context, account);
+      } catch (e) {
+        logger.warn(
+          `[BJ Import] Fans data import failed (non-fatal). accountId=${account.id}`,
+          e instanceof Error ? e.message : e
+        );
+      }
+
       await context.close();
     } finally {
       if (shouldClose) {
@@ -651,5 +669,115 @@ export class BaijiahaoContentOverviewImportService {
       }
     }
   }
+
+  /**
+   * 粉丝数据:直接请求 getFansBasicInfo(近30天 = 中国时区昨天为结束,往前推 30 天),不打开页面
+   * sum_fans_count → fans_count,new_fans_count → fans_increase
+   * 使用中国时区计算日期,避免服务器非东八区时只拿到部分天数
+   */
+  private async importFansDataByApi(context: BrowserContext, account: PlatformAccount): Promise<void> {
+    const chinaTz = 'Asia/Shanghai';
+    const toChinaYMD = (date: Date): { y: number; m: number; d: number } => {
+      const formatter = new Intl.DateTimeFormat('en-CA', { timeZone: chinaTz, year: 'numeric', month: '2-digit', day: '2-digit' });
+      const parts = formatter.formatToParts(date);
+      const get = (type: string) => parts.find((p) => p.type === type)?.value ?? '0';
+      return { y: parseInt(get('year'), 10), m: parseInt(get('month'), 10), d: parseInt(get('day'), 10) };
+    };
+    const now = new Date();
+    const today = toChinaYMD(now);
+    const yesterdayDate = new Date(Date.UTC(today.y, today.m - 1, today.d, 0, 0, 0, 0) - 24 * 60 * 60 * 1000);
+    const startDate = new Date(yesterdayDate.getTime() - 29 * 24 * 60 * 60 * 1000);
+    const endYMD = toChinaYMD(yesterdayDate);
+    const startYMD = toChinaYMD(startDate);
+    const pad = (n: number) => String(n).padStart(2, '0');
+    const startStr = `${startYMD.y}${pad(startYMD.m)}${pad(startYMD.d)}`;
+    const endStr = `${endYMD.y}${pad(endYMD.m)}${pad(endYMD.d)}`;
+    const apiUrl = `https://baijiahao.baidu.com/author/eco/statistics/getFansBasicInfo?start=${startStr}&end=${endStr}&fans_type=new%2Csum&sort=asc&is_page=0&show_type=chart`;
+
+    logger.info(`[BJ Import] getFansBasicInfo range (China). accountId=${account.id} start=${startStr} end=${endStr}`);
+
+    let body: Record<string, unknown> | null = null;
+    try {
+      const res = await (context as any).request.get(apiUrl, {
+        headers: { Referer: 'https://baijiahao.baidu.com/builder/rc/analysisfans/basedata' },
+      });
+      if (res.ok()) body = await res.json().catch(() => null);
+    } catch (e) {
+      logger.warn(`[BJ Import] getFansBasicInfo request failed. accountId=${account.id}`, e);
+      return;
+    }
+
+    if (!body || typeof body !== 'object') {
+      logger.warn(`[BJ Import] getFansBasicInfo response not valid JSON, skip. accountId=${account.id}`);
+      return;
+    }
+    const errno = (body as any).errno;
+    if (errno !== 0 && errno !== undefined) {
+      logger.warn(`[BJ Import] getFansBasicInfo errno=${errno}, skip. accountId=${account.id}`);
+      return;
+    }
+
+    const list = this.parseGetFansBasicInfoResponse(body);
+    if (!list.length) {
+      logger.info(`[BJ Import] No fans data from getFansBasicInfo. accountId=${account.id}`);
+      return;
+    }
+    const firstDay = list[0]?.recordDate;
+    const lastDay = list[list.length - 1]?.recordDate;
+    const fmtDay = (d: Date) => (d ? `${d.getFullYear()}-${String(d.getMonth() + 1).padStart(2, '0')}-${String(d.getDate()).padStart(2, '0')}` : '');
+    logger.info(`[BJ Import] getFansBasicInfo response. accountId=${account.id} count=${list.length} first=${fmtDay(firstDay)} last=${fmtDay(lastDay)}`);
+
+    let updated = 0;
+    for (const { recordDate, fansCount, fansIncrease } of list) {
+      const r = await this.userDayStatisticsService.saveStatisticsForDate(account.id, recordDate, {
+        fansCount,
+        fansIncrease,
+      });
+      updated += r.inserted + r.updated;
+    }
+    logger.info(`[BJ Import] Fans data imported. accountId=${account.id} days=${list.length} updated=${updated}`);
+  }
+
+  /**
+   * 解析 getFansBasicInfo 接口返回,提取 (recordDate, fansCount, fansIncrease) 列表
+   * sum_fans_count → fans_count,new_fans_count → fans_increase;"--" 或无效值跳过或按 0 处理
+   */
+  private parseGetFansBasicInfoResponse(
+    body: Record<string, unknown>
+  ): Array<{ recordDate: Date; fansCount: number; fansIncrease: number }> {
+    const list: Array<{ recordDate: Date; fansCount: number; fansIncrease: number }> = [];
+    const data = body.data as Record<string, unknown> | undefined;
+    if (!data || typeof data !== 'object') return list;
+
+    const arr = data.list as unknown[] | undefined;
+    if (!Array.isArray(arr)) return list;
+
+    for (const item of arr) {
+      if (!item || typeof item !== 'object') continue;
+      const o = item as Record<string, unknown>;
+      const dayRaw = o.day;
+      if (dayRaw == null) continue;
+      const dayStr = String(dayRaw).trim();
+      if (!/^\d{8}$/.test(dayStr)) continue;
+      const d = normalizeDateText(dayStr);
+      if (!d) continue;
+
+      const sumRaw = o.sum_fans_count;
+      const newRaw = o.new_fans_count;
+      const toNum = (v: unknown): number => {
+        if (v === null || v === undefined) return 0;
+        if (typeof v === 'number' && Number.isFinite(v)) return Math.max(0, Math.round(v));
+        const s = String(v).trim();
+        if (s === '' || s === '--') return 0;
+        const n = Number(s.replace(/,/g, ''));
+        return Number.isFinite(n) ? Math.max(0, Math.round(n)) : 0;
+      };
+      const fansCount = toNum(sumRaw);
+      const fansIncrease = toNum(newRaw);
+
+      list.push({ recordDate: d, fansCount, fansIncrease });
+    }
+    return list;
+  }
 }
 

+ 8 - 0
server/src/services/DouyinAccountOverviewImportService.ts

@@ -304,6 +304,14 @@ export class DouyinAccountOverviewImportService {
   }
 
   /**
+   * 统一入口:定时任务与添加账号均调用此方法,执行“账号总览-短视频-数据表现-近30天”
+   */
+  static async runDailyImport(): Promise<void> {
+    const svc = new DouyinAccountOverviewImportService();
+    await svc.runDailyImportForAllDouyinAccounts();
+  }
+
+  /**
    * 为所有抖音账号导出“账号总览-短视频-数据表现-近30天”并导入 user_day_statistics
    */
   async runDailyImportForAllDouyinAccounts(): Promise<void> {

+ 21 - 5
server/src/services/UserDayStatisticsService.ts

@@ -26,15 +26,31 @@ export class UserDayStatisticsService {
   private statisticsRepository = AppDataSource.getRepository(UserDayStatistics);
 
   /**
+   * 获取「中国时区(Asia/Shanghai)当前日历日」的 Date,用于存库保证 record_date 与业务日期一致
+   * 避免服务器非中国时区时出现“1月29日却生成 record_date=1月30”等问题
+   */
+  private getTodayInChina(): Date {
+    const formatter = new Intl.DateTimeFormat('en-CA', {
+      timeZone: 'Asia/Shanghai',
+      year: 'numeric',
+      month: '2-digit',
+      day: '2-digit',
+    });
+    const parts = formatter.formatToParts(new Date());
+    const get = (type: string) => parts.find((p) => p.type === type)?.value ?? '0';
+    const y = parseInt(get('year'), 10);
+    const m = parseInt(get('month'), 10) - 1;
+    const d = parseInt(get('day'), 10);
+    return new Date(Date.UTC(y, m, d, 0, 0, 0, 0));
+  }
+
+  /**
    * 保存用户每日统计数据
    * 同日更新,隔日新增
+   * 「今天」以中国时区(Asia/Shanghai)为准,避免服务器时区导致 record_date 错日
    */
   async saveStatistics(item: UserDayStatisticsItem): Promise<SaveResult> {
-    // 使用中国时区(UTC+8)计算今天的业务日期
-    const now = new Date();
-    const chinaNow = new Date(now.getTime() + 8 * 60 * 60 * 1000);
-    const today = new Date(chinaNow);
-    today.setHours(0, 0, 0, 0);
+    const today = this.getTodayInChina();
 
     // 检查今天是否已有记录
     const existing = await this.statisticsRepository.findOne({

+ 8 - 0
server/src/services/WeixinVideoDataCenterImportService.ts

@@ -398,6 +398,14 @@ export class WeixinVideoDataCenterImportService {
     }
   }
 
+  /**
+   * 统一入口:定时任务与添加账号均调用此方法,执行“数据中心-关注者/视频/图文-增长详情-近30天”
+   */
+  static async runDailyImport(): Promise<void> {
+    const svc = new WeixinVideoDataCenterImportService();
+    await svc.runDailyImportForAllWeixinVideoAccounts();
+  }
+
   async runDailyImportForAllWeixinVideoAccounts(): Promise<void> {
     await ensureDir(this.downloadDir);
     const accounts = await this.accountRepository.find({ where: { platform: 'weixin_video' as any } });

+ 8 - 0
server/src/services/XiaohongshuAccountOverviewImportService.ts

@@ -316,6 +316,14 @@ export class XiaohongshuAccountOverviewImportService {
   }
 
   /**
+   * 统一入口:定时任务与添加账号均调用此方法,执行“账号概览-观看/互动/涨粉-近30日 + 粉丝 overall_new”
+   */
+  static async runDailyImport(): Promise<void> {
+    const svc = new XiaohongshuAccountOverviewImportService();
+    await svc.runDailyImportForAllXhsAccounts();
+  }
+
+  /**
    * 为所有小红书账号导出“观看数据-近30日”并导入 user_day_statistics
    */
   async runDailyImportForAllXhsAccounts(): Promise<void> {