||
- /**
- * 视频号:作品维度「作品列表 + 按天聚合数据」→ 导入 work_day_statistics
- *
- * 流程:
- * 1. 获取 works 表中 platform=weixin_video 的作品(platform_video_id 存的是 exportId)
- * 2. 调用 post_list 接口获取作品列表,通过 exportId 匹配得到 objectId
- * 3. 对每个作品调用 feed_aggreagate_data_by_tab_type,取「全部」tab 的按天数据
- * 4. 将 browse→播放、like→点赞、comment→评论 写入 work_day_statistics(follow=关注、fav/forward 暂不入库)
- */
- import crypto from 'crypto';
- import { AppDataSource, PlatformAccount, Work } from '../models/index.js';
- import { logger } from '../utils/logger.js';
- import { WorkDayStatisticsService } from './WorkDayStatisticsService.js';
- import { CookieManager } from '../automation/cookie.js';
/** Endpoint that returns the account's work (post) list for a time window. */
const POST_LIST_BASE = 'https://channels.weixin.qq.com/micro/statistic/cgi-bin/mmfinderassistant-bin/statistic/post_list';

/**
 * Endpoint that returns per-work aggregated series grouped by tab type.
 * The "aggreagate" spelling is part of the URL this module requests; keep it as is.
 */
const FEED_AGGREGATE_BASE = 'https://channels.weixin.qq.com/micro/statistic/cgi-bin/mmfinderassistant-bin/statistic/feed_aggreagate_data_by_tab_type';

/** `_pageUrl` value sent with post_list requests (the statistics work-list page). */
const POST_LIST_PAGE_URL = 'https://channels.weixin.qq.com/micro/statistic/post';

/** `_pageUrl` value sent with feed_aggreagate requests (the postDetail page). */
const POST_DETAIL_PAGE_URL = 'https://channels.weixin.qq.com/micro/statistic/postDetail';
/**
 * Produce a random `_rid` query value: two groups of 8 lowercase hex digits
 * joined by a hyphen (for example `6982df69-ff6e46a5`).
 *
 * @returns A fresh 17-character identifier built from two independent 4-byte random draws.
 */
function generateRandomRid(): string {
  const hexGroup = (): string => crypto.randomBytes(4).toString('hex');
  return [hexGroup(), hexGroup()].join('-');
}
/**
 * Append the `_aid`, `_rid` and `_pageUrl` query parameters to an endpoint URL.
 *
 * `_aid` / `_rid` come from the explicit session arguments when those are not
 * null/undefined; otherwise from the trimmed environment variables
 * `WX_VIDEO_AID` / `WX_VIDEO_RID`. A parameter whose resolved value is empty is
 * omitted. Note that an explicitly passed empty string is NOT replaced by the
 * environment value (only null/undefined fall through).
 *
 * @param base       Endpoint URL without a query string.
 * @param pageUrl    Value for `_pageUrl`; always sent, and form-encoded by URLSearchParams.
 * @param sessionAid Optional `_aid` override for this batch.
 * @param sessionRid Optional `_rid` override for this request.
 * @returns `base` followed by `?` and the encoded query string.
 */
function buildUrlWithAidRid(
  base: string,
  pageUrl: string,
  sessionAid?: string,
  sessionRid?: string
): string {
  const optionalParams: Array<[string, string]> = [
    ['_aid', sessionAid ?? process.env.WX_VIDEO_AID?.trim() ?? ''],
    ['_rid', sessionRid ?? process.env.WX_VIDEO_RID?.trim() ?? ''],
  ];

  const query = new URLSearchParams();
  for (const [key, value] of optionalParams) {
    if (value) query.set(key, value);
  }
  query.set('_pageUrl', pageUrl);

  const serialized = query.toString();
  if (!serialized) return base;
  return `${base}?${serialized}`;
}
/**
 * Best-effort decryption of a stored cookie blob.
 *
 * @param cookieData Stored cookie value; may be null.
 * @returns null when the input is null or blank; otherwise the result of
 *          `CookieManager.decrypt` on the trimmed input, or the trimmed input
 *          itself when decryption throws (the value is then used as-is).
 */
function tryDecryptCookieData(cookieData: string | null): string | null {
  const trimmed = cookieData?.trim();
  if (!trimmed) return null;

  try {
    return CookieManager.decrypt(trimmed);
  } catch {
    // Decryption failed: fall back to the stored text unchanged.
    return trimmed;
  }
}
/**
 * Convert an account's stored `cookie_data` into an HTTP `Cookie` header value.
 *
 * The stored value is first passed through `tryDecryptCookieData`. If the
 * resulting text starts with `[` or `{` it is parsed as JSON: either an array
 * of `{ name, value }` objects or an object with a `cookies` array. Entries
 * with a blank name are dropped and the rest are joined as `name=value; ...`.
 * Any other text, or JSON-looking text that fails to parse, is returned as-is.
 *
 * @param cookieData Stored cookie value; may be null.
 * @returns The header string, or `''` when nothing usable is available.
 */
function getCookieHeaderString(cookieData: string | null): string {
  const text = tryDecryptCookieData(cookieData)?.trim() ?? '';
  if (!text) return '';

  const looksLikeJson = text.startsWith('[') || text.startsWith('{');
  if (!looksLikeJson) return text;

  try {
    const parsed = JSON.parse(text);
    const cookies = Array.isArray(parsed) ? parsed : parsed?.cookies ?? [];
    if (!Array.isArray(cookies)) return '';

    const pairs: string[] = [];
    for (const cookie of cookies as Array<{ name?: string; value?: string }>) {
      const name = String(cookie?.name ?? '').trim();
      const value = String(cookie?.value ?? '').trim();
      if (name) pairs.push(`${name}=${value}`);
    }
    return pairs.join('; ');
  } catch {
    // Not valid JSON after all: treat it as a ready-made header string.
    return text;
  }
}
/**
 * Extract the numeric `wxuin` cookie value for the optional `x-wechat-uin` header.
 *
 * @param cookieHeader A `Cookie` header string.
 * @returns The digits following the first case-insensitive `wxuin=` match, or undefined when absent.
 */
function getXWechatUinFromCookie(cookieHeader: string): string | undefined {
  const found = /\bwxuin=(\d+)/i.exec(cookieHeader);
  return found?.[1];
}
/**
 * Derive the `_log_finder_id` request field from an account's `account_id`.
 *
 * The value is trimmed, one leading `weixin_video_` prefix is removed, and
 * `@finder` is appended unless the id already ends with it.
 *
 * @param accountId Stored account id; may be null.
 * @returns The finder id, or `''` when the input is null/empty or nothing remains after stripping.
 */
function getLogFinderId(accountId: string | null): string {
  const PLATFORM_PREFIX = 'weixin_video_';
  const FINDER_SUFFIX = '@finder';

  if (!accountId) return '';

  let finderId = String(accountId).trim();
  if (finderId.startsWith(PLATFORM_PREFIX)) {
    finderId = finderId.slice(PLATFORM_PREFIX.length);
  }
  if (finderId.length === 0) return '';

  return finderId.endsWith(FINDER_SUFFIX) ? finderId : finderId + FINDER_SUFFIX;
}
/**
 * Build the post_list request URL: the post_list endpoint combined with the
 * work-list page as `_pageUrl`, plus optional `_aid` / `_rid`.
 *
 * @param sessionAid Optional `_aid` forwarded to `buildUrlWithAidRid`.
 * @param sessionRid Optional `_rid` forwarded to `buildUrlWithAidRid`.
 */
function buildPostListUrl(sessionAid?: string, sessionRid?: string): string {
  const url = buildUrlWithAidRid(POST_LIST_BASE, POST_LIST_PAGE_URL, sessionAid, sessionRid);
  return url;
}
/**
 * Build the feed_aggreagate request URL: the aggregate endpoint combined with
 * the postDetail page as `_pageUrl`, plus optional `_aid` / `_rid`.
 *
 * @param sessionAid Optional `_aid` forwarded to `buildUrlWithAidRid`.
 * @param sessionRid Optional `_rid` forwarded to `buildUrlWithAidRid`.
 */
function buildFeedAggregateUrl(sessionAid?: string, sessionRid?: string): string {
  const url = buildUrlWithAidRid(FEED_AGGREGATE_BASE, POST_DETAIL_PAGE_URL, sessionAid, sessionRid);
  return url;
}
/**
 * Compute the statistics window ending yesterday.
 *
 * - `startDate`: local midnight 30 days before yesterday, so the window covers
 *   31 calendar days including both ends.
 * - `endDate`: local midnight of yesterday.
 * - `startTime` / `endTime`: Unix seconds of `startDate` and of yesterday
 *   23:59:59.999 respectively.
 *
 * NOTE(review): all dates are built in the process's local timezone. The
 * previous comment described them as China time; that only holds when the
 * process runs with an Asia/Shanghai timezone — confirm the deployment setting.
 */
function getLast30DaysRange(): { startTime: number; endTime: number; startDate: Date; endDate: Date } {
  const toUnixSeconds = (d: Date): number => Math.floor(d.getTime() / 1000);

  const today = new Date();

  const endDate = new Date(today.getFullYear(), today.getMonth(), today.getDate() - 1);
  endDate.setHours(0, 0, 0, 0);

  const startDate = new Date(endDate.getFullYear(), endDate.getMonth(), endDate.getDate() - 30);
  startDate.setHours(0, 0, 0, 0);

  const endOfYesterday = new Date(endDate.getTime());
  endOfYesterday.setHours(23, 59, 59, 999);

  return {
    startTime: toUnixSeconds(startDate),
    endTime: toUnixSeconds(endOfYesterday),
    startDate,
    endDate,
  };
}
/**
 * Coerce a loosely typed value to an integer.
 *
 * - number: rounded with `Math.round`; NaN/±Infinity yield `defaultVal`.
 * - string: parsed with base-10 `parseInt` (so a leading integer prefix is
 *   accepted and any fractional part is dropped); unparsable text yields `defaultVal`.
 * - anything else: `defaultVal`.
 *
 * @param val        Value to convert.
 * @param defaultVal Fallback when no finite integer can be produced (default 0).
 */
function toInt(val: unknown, defaultVal = 0): number {
  switch (typeof val) {
    case 'number':
      return Number.isFinite(val) ? Math.round(val) : defaultVal;
    case 'string': {
      const parsed = Number.parseInt(val, 10);
      return Number.isFinite(parsed) ? parsed : defaultVal;
    }
    default:
      return defaultVal;
  }
}
/**
 * One entry of the post_list response, reduced to the fields this module reads.
 * Both fields are optional because the response is not validated before use.
 */
interface PostListItem {
  /** Id matched against the `platformVideoId` stored on local works. */
  exportId?: string;
  /** Id sent as `feedId` when requesting the work's aggregated data. */
  objectId?: string;
}
/**
 * Metric series carried by one tab of a feed_aggreagate response.
 * Each series is an array of strings; the importer reads `browse`, `like` and
 * `comment` by index as consecutive days starting at the request's start date.
 */
interface FeedAggregateSeries {
  browse?: string[];
  like?: string[];
  comment?: string[];
  forward?: string[];
  fav?: string[];
  follow?: string[];
}

/**
 * One tab of a feed_aggreagate response. The importer selects the tab whose
 * `tabTypeName` is `全部` or whose `tabType` is 999.
 */
interface FeedAggregateDataByTabType {
  tabType?: number;
  tabTypeName?: string;
  data?: FeedAggregateSeries;
}
/**
 * Imports per-day work statistics for `weixin_video` accounts.
 *
 * For each account it:
 * 1. loads the account's works (their `platformVideoId` holds the exportId);
 * 2. pages through post_list to map exportId -> objectId;
 * 3. requests feed_aggreagate_data_by_tab_type per matched work and takes the
 *    `全部` (all) tab;
 * 4. writes browse -> playCount, like -> likeCount, comment -> commentCount
 *    through `WorkDayStatisticsService.saveStatisticsForDateBatch`.
 */
export class WeixinVideoWorkStatisticsImportService {
  /** `pageSize` sent with every post_list request. */
  private static readonly POST_LIST_PAGE_SIZE = 100;
  /** Hard cap on post_list pages per account so a misbehaving API cannot loop forever. */
  private static readonly POST_LIST_MAX_PAGES = 50;
  /** Timeout applied to every HTTP request. */
  private static readonly REQUEST_TIMEOUT_MS = 30_000;
  /** Work ids that get verbose diagnostics when `WX_VIDEO_VERBOSE_WORK_IDS` is not set. */
  private static readonly DEFAULT_VERBOSE_WORK_IDS: readonly number[] = [866, 867, 902, 903];

  private accountRepository = AppDataSource.getRepository(PlatformAccount);
  private workRepository = AppDataSource.getRepository(Work);
  private workDayStatisticsService = new WorkDayStatisticsService();
  private readonly verboseWorkIds: ReadonlySet<number> =
    WeixinVideoWorkStatisticsImportService.resolveVerboseWorkIds();

  /** Create a service instance and run the import for every weixin_video account. */
  static async runDailyImport(): Promise<void> {
    const svc = new WeixinVideoWorkStatisticsImportService();
    await svc.runDailyImportForAllWeixinVideoAccounts();
  }

  /**
   * Resolve which work ids get verbose logging: a comma-separated list in
   * `WX_VIDEO_VERBOSE_WORK_IDS` when that variable is set (an empty value
   * disables verbose logging), otherwise the built-in defaults.
   */
  private static resolveVerboseWorkIds(): ReadonlySet<number> {
    const raw = process.env.WX_VIDEO_VERBOSE_WORK_IDS;
    if (raw === undefined) {
      return new Set(WeixinVideoWorkStatisticsImportService.DEFAULT_VERBOSE_WORK_IDS);
    }
    const ids = raw
      .split(',')
      .map((part) => Number.parseInt(part.trim(), 10))
      .filter((id) => Number.isFinite(id));
    return new Set(ids);
  }

  /**
   * Import statistics for all weixin_video accounts, one after another.
   * A failure in one account is logged and does not stop the remaining accounts.
   */
  async runDailyImportForAllWeixinVideoAccounts(): Promise<void> {
    const accounts = await this.accountRepository.find({
      where: { platform: 'weixin_video' as any },
    });
    logger.info(`[WX WorkStats] Start import for ${accounts.length} weixin_video accounts`);
    for (const account of accounts) {
      try {
        await this.importAccountWorksStatistics(account);
      } catch (e) {
        logger.error(
          `[WX WorkStats] Account failed. accountId=${account.id} name=${account.accountName || ''}`,
          e
        );
      }
    }
    logger.info('[WX WorkStats] All accounts done');
  }

  /**
   * Import statistics for every work of one account.
   * Returns early (with a log line) when the account has no usable cookie, no
   * works, or when the first post_list page cannot be read.
   *
   * @throws Rethrows a network failure of the first post_list request and any
   *         error raised while saving statistics.
   */
  private async importAccountWorksStatistics(account: PlatformAccount): Promise<void> {
    const cookieHeader = getCookieHeaderString(account.cookieData);
    if (!cookieHeader) {
      logger.warn(`[WX WorkStats] accountId=${account.id} cookieData 为空或无法解析,跳过`);
      return;
    }

    const works = await this.workRepository.find({
      where: { accountId: account.id, platform: 'weixin_video' as any },
    });
    if (!works.length) {
      logger.info(`[WX WorkStats] accountId=${account.id} 没有作品,跳过`);
      return;
    }

    const { startTime, endTime, startDate, endDate } = getLast30DaysRange();
    const logFinderId = getLogFinderId(account.accountId);
    const xWechatUin = getXWechatUinFromCookie(cookieHeader);

    // _aid is fixed for the whole batch of this account; _rid is regenerated per request.
    const sessionAid = process.env.WX_VIDEO_AID?.trim() || crypto.randomUUID();
    logger.info(`[WX WorkStats] accountId=${account.id} post_list 生成 aid=${sessionAid},数据接口复用此 aid,rid 每次随机`);

    const headers: Record<string, string> = {
      accept: '*/*',
      'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
      'content-type': 'application/json',
      cookie: cookieHeader,
      origin: 'https://channels.weixin.qq.com',
      referer: 'https://channels.weixin.qq.com/micro/statistic/post',
      'user-agent':
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36 Edg/144.0.0.0',
    };
    if (xWechatUin) headers['x-wechat-uin'] = xWechatUin;

    const postList = await this.fetchAllPostListItems(
      account,
      headers,
      sessionAid,
      startTime,
      endTime,
      logFinderId
    );
    if (!postList) return;
    const { list, totalCount } = postList;

    logger.info(
      `[WX WorkStats] accountId=${account.id} post_list 返回 totalCount=${totalCount} list.length=${list.length}`
    );

    // Keys are trimmed so they compare equal to the trimmed platformVideoId used for lookups below.
    const exportIdToObjectId = new Map<string, string>();
    const apiExportIds: string[] = [];
    list.forEach((item, i) => {
      const eid = (item?.exportId ?? '').trim();
      const oid = (item?.objectId ?? '').trim();
      if (eid && oid) exportIdToObjectId.set(eid, oid);
      apiExportIds.push(eid);
      logger.info(`[WX WorkStats] post_list[${i}] exportId=${eid} objectId=${oid}`);
    });

    this.logExportIdComparison(works, exportIdToObjectId, apiExportIds);

    const feedHeaders: Record<string, string> = {
      ...headers,
      referer: 'https://channels.weixin.qq.com/micro/statistic/postDetail?isImageMode=0',
      'finger-print-device-id':
        process.env.WX_VIDEO_FINGERPRINT_DEVICE_ID?.trim() ||
        '4605bc28ad3962eb9ee791897b199217',
    };
    const feedContext = { feedHeaders, sessionAid, logFinderId, startTime, endTime, startDate, endDate };

    let totalInserted = 0;
    let totalUpdated = 0;
    for (const work of works) {
      const exportId = (work.platformVideoId ?? '').trim();
      if (!exportId) continue;
      const objectId = exportIdToObjectId.get(exportId);
      if (!objectId) {
        logger.debug(`[WX WorkStats] workId=${work.id} exportId=${exportId} 未在 post_list 中匹配到 objectId,跳过`);
        continue;
      }
      const saved = await this.importWorkDayStatistics(work, objectId, feedContext);
      totalInserted += saved.inserted;
      totalUpdated += saved.updated;
    }

    logger.info(
      `[WX WorkStats] accountId=${account.id} completed. inserted=${totalInserted} updated=${totalUpdated}`
    );
  }

  /**
   * Page through post_list until a short page, the reported totalCount, a page
   * that adds no new exportIds, or the page cap is reached.
   *
   * First-page failures keep the original contract: a network error is logged
   * and rethrown; an HTTP or errCode failure is logged and yields null. Failures
   * on later pages are logged and the items collected so far are returned.
   */
  private async fetchAllPostListItems(
    account: PlatformAccount,
    headers: Record<string, string>,
    sessionAid: string,
    startTime: number,
    endTime: number,
    logFinderId: string
  ): Promise<{ list: PostListItem[]; totalCount: number } | null> {
    const pageSize = WeixinVideoWorkStatisticsImportService.POST_LIST_PAGE_SIZE;
    const maxPages = WeixinVideoWorkStatisticsImportService.POST_LIST_MAX_PAGES;
    const list: PostListItem[] = [];
    const seenExportIds = new Set<string>();
    let totalCount: number | undefined;

    for (let currentPage = 1; currentPage <= maxPages; currentPage++) {
      const isFirstPage = currentPage === 1;
      const postListBody = {
        pageSize,
        currentPage,
        sort: 0,
        order: 0,
        startTime,
        endTime,
        timestamp: String(Date.now()),
        _log_finder_uin: '',
        _log_finder_id: logFinderId,
        rawKeyBuff: null,
        pluginSessionId: null,
        scene: 7,
        reqScene: 7,
      };
      const postListUrl = buildPostListUrl(sessionAid, generateRandomRid());

      let res: Response;
      try {
        res = await fetch(postListUrl, {
          method: 'POST',
          headers,
          body: JSON.stringify(postListBody),
          signal: AbortSignal.timeout(WeixinVideoWorkStatisticsImportService.REQUEST_TIMEOUT_MS),
        });
      } catch (e) {
        if (isFirstPage) {
          logger.error(`[WX WorkStats] post_list request failed. accountId=${account.id}`, e);
          throw e;
        }
        const reason = e instanceof Error ? e.message : String(e);
        logger.warn(
          `[WX WorkStats] post_list page=${currentPage} request failed (${reason}); continuing with ${list.length} items. accountId=${account.id}`
        );
        break;
      }

      if (!res.ok) {
        logger.warn(`[WX WorkStats] post_list HTTP ${res.status}. accountId=${account.id}`);
        if (isFirstPage) return null;
        break;
      }

      const postListJson = (await res.json().catch(() => null)) as {
        errCode?: number;
        errMsg?: string;
        data?: { list?: PostListItem[]; totalCount?: number };
      } | null;
      if (!postListJson || postListJson.errCode !== 0) {
        logger.warn(
          `[WX WorkStats] post_list errCode=${postListJson?.errCode} errMsg=${postListJson?.errMsg}. accountId=${account.id}`
        );
        if (isFirstPage) return null;
        break;
      }

      const pageItems = postListJson.data?.list ?? [];
      if (totalCount === undefined) totalCount = postListJson.data?.totalCount;

      let newKeys = 0;
      for (const item of pageItems) {
        const key = (item?.exportId ?? '').trim();
        const isNew = key !== '' && !seenExportIds.has(key);
        if (isNew) {
          seenExportIds.add(key);
          newKeys++;
        }
        // The first page is kept verbatim; later pages only contribute unseen exportIds.
        if (isFirstPage || isNew) list.push(item);
      }

      const reachedTotal = totalCount !== undefined && list.length >= totalCount;
      // newKeys === 0 also guards against an API that ignores currentPage and repeats a page.
      if (pageItems.length < pageSize || newKeys === 0 || reachedTotal) break;
    }

    return { list, totalCount: totalCount ?? list.length };
  }

  /** Log, per local work, whether its exportId was found in post_list, with hints when it was not. */
  private logExportIdComparison(
    works: Work[],
    exportIdToObjectId: Map<string, string>,
    apiExportIds: string[]
  ): void {
    for (const work of works) {
      const dbExportId = (work.platformVideoId ?? '').trim();
      if (!dbExportId) continue;
      const matched = exportIdToObjectId.has(dbExportId);
      logger.info(
        `[WX WorkStats] DB workId=${work.id} platform_video_id(exportId)=${dbExportId} 匹配post_list=${matched}`
      );
      if (!matched && apiExportIds.length > 0) {
        const sameLength = apiExportIds.filter((e) => e.length === dbExportId.length).length;
        const containsDb = apiExportIds.some(
          (e) => e === dbExportId || e.includes(dbExportId) || dbExportId.includes(e)
        );
        logger.info(
          `[WX WorkStats] 对比: DB长度=${dbExportId.length} API条数=${apiExportIds.length} 同长API条数=${sameLength} 是否包含关系=${containsDb}`
        );
      }
    }
  }

  /**
   * Fetch the aggregated series of one work and save them as per-day rows.
   * Request, HTTP and errCode failures are logged and produce no rows.
   *
   * @returns Inserted/updated counts reported by `saveStatisticsForDateBatch` (zeros when nothing was saved).
   */
  private async importWorkDayStatistics(
    work: Work,
    objectId: string,
    ctx: {
      feedHeaders: Record<string, string>;
      sessionAid: string;
      logFinderId: string;
      startTime: number;
      endTime: number;
      startDate: Date;
      endDate: Date;
    }
  ): Promise<{ inserted: number; updated: number }> {
    const nothingSaved = { inserted: 0, updated: 0 };
    const verbose = this.verboseWorkIds.has(work.id);

    const feedBody = {
      startTs: String(ctx.startTime),
      endTs: String(ctx.endTime),
      interval: 3,
      feedId: objectId,
      timestamp: String(Date.now()),
      _log_finder_uin: '',
      _log_finder_id: ctx.logFinderId,
      rawKeyBuff: null,
      pluginSessionId: null,
      scene: 7,
      reqScene: 7,
    };
    const feedUrl = buildFeedAggregateUrl(ctx.sessionAid, generateRandomRid());

    let feedRes: Response;
    try {
      feedRes = await fetch(feedUrl, {
        method: 'POST',
        headers: ctx.feedHeaders,
        body: JSON.stringify(feedBody),
        signal: AbortSignal.timeout(WeixinVideoWorkStatisticsImportService.REQUEST_TIMEOUT_MS),
      });
    } catch (e) {
      logger.error(`[WX WorkStats] feed_aggreagate request failed. workId=${work.id} feedId=${objectId}`, e);
      return nothingSaved;
    }
    if (!feedRes.ok) {
      logger.warn(`[WX WorkStats] feed_aggreagate HTTP ${feedRes.status}. workId=${work.id}`);
      return nothingSaved;
    }

    const feedJson = (await feedRes.json().catch(() => null)) as {
      errCode?: number;
      errMsg?: string;
      data?: {
        dataByFanstype?: { dataByTabtype?: FeedAggregateDataByTabType[] }[];
        feedData?: { dataByTabtype?: FeedAggregateDataByTabType[] }[];
      };
    } | null;
    if (verbose) {
      logger.info(`[WX WorkStats] feed_aggreagate 原始响应 workId=${work.id} errCode=${feedJson?.errCode} errMsg=${feedJson?.errMsg} 完整body=${JSON.stringify(feedJson ?? null)}`);
    }
    if (!feedJson || feedJson.errCode !== 0) {
      // Logged for every work: previously only the verbose ids reported this, hiding API failures.
      logger.warn(`[WX WorkStats] workId=${work.id} feed_aggreagate 非成功 errCode=${feedJson?.errCode} 跳过`);
      return nothingSaved;
    }

    const dataByTabtype =
      feedJson.data?.dataByFanstype?.[0]?.dataByTabtype ??
      feedJson.data?.feedData?.[0]?.dataByTabtype ??
      [];
    const tabAll = dataByTabtype.find((t) => t.tabTypeName === '全部' || t.tabType === 999);
    if (verbose) {
      logger.info(
        `[WX WorkStats] workId=${work.id} dataByTabtype.length=${dataByTabtype.length} tabAll=${!!tabAll} tabAll.tabTypeName=${tabAll?.tabTypeName}`
      );
    }
    if (!tabAll?.data) return nothingSaved;

    const browse = tabAll.data.browse ?? [];
    const like = tabAll.data.like ?? [];
    const comment = tabAll.data.comment ?? [];
    if (verbose) {
      logger.info(
        `[WX WorkStats] workId=${work.id} 「全部」data: browse.length=${browse.length} like.length=${like.length} comment.length=${comment.length}`
      );
      logger.info(`[WX WorkStats] workId=${work.id} browse=${JSON.stringify(browse)}`);
      logger.info(`[WX WorkStats] workId=${work.id} like=${JSON.stringify(like)}`);
      logger.info(`[WX WorkStats] workId=${work.id} comment=${JSON.stringify(comment)}`);
    }

    const len = Math.max(browse.length, like.length, comment.length);
    if (len === 0) return nothingSaved;

    const patches: Array<{
      workId: number;
      recordDate: Date;
      playCount?: number;
      likeCount?: number;
      commentCount?: number;
    }> = [];
    for (let i = 0; i < len; i++) {
      // Index i is treated as the i-th day counted from startDate; stop once past endDate.
      const recordDate = new Date(ctx.startDate);
      recordDate.setDate(recordDate.getDate() + i);
      recordDate.setHours(0, 0, 0, 0);
      if (recordDate > ctx.endDate) break;
      patches.push({
        workId: work.id,
        recordDate,
        playCount: toInt(browse[i], 0),
        likeCount: toInt(like[i], 0),
        commentCount: toInt(comment[i], 0),
      });
    }
    if (verbose) {
      logger.info(`[WX WorkStats] workId=${work.id} 生成 patches.length=${patches.length} 前3条=${JSON.stringify(patches.slice(0, 3))}`);
    }
    if (!patches.length) return nothingSaved;

    const result = await this.workDayStatisticsService.saveStatisticsForDateBatch(patches);
    if (verbose) {
      logger.info(`[WX WorkStats] workId=${work.id} saveStatisticsForDateBatch inserted=${result.inserted} updated=${result.updated}`);
    }
    return { inserted: result.inserted, updated: result.updated };
  }
}
|