#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 从 stdin 读取 JSON(平台统计列表),生成 xlsx 并输出到 stdout(二进制)。 输入 JSON 格式: { "platforms": [ { "platform": "baijiahao", "viewsCount": 5, "commentsCount": 1, "likesCount": 0, "fansIncrease": 1, "updateTime": "2026-01-27 14:30:00" } ] } """ import json import sys from io import BytesIO from datetime import datetime # Ensure stdin is read as UTF-8 (important on Windows when Node passes UTF-8 JSON) if sys.platform == "win32": import io as _io if hasattr(sys.stdin, "buffer"): sys.stdin = _io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace") try: from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill, Border, Side from openpyxl.utils import get_column_letter except Exception as e: # pragma: no cover sys.stderr.write( "Missing dependency: openpyxl. Please install it in your python env.\n" "Example: pip install -r server/python/requirements.txt\n" f"Detail: {e}\n" ) sys.exit(3) HEADERS = [ "平台", "阅读(播放量)", "评论量", "点赞量", "涨粉量", "更新时间", ] COL_WIDTHS = [12, 16, 12, 12, 10, 18] def _safe_int(v): try: if v is None or v == "": return None return int(float(v)) except Exception: return None def _safe_str(v) -> str: """ 将任意值转换为字符串,并过滤掉 Excel 不支持的非法 Unicode 代理字符。 """ if v is None: return "" try: s = str(v) except Exception: s = repr(v) return "".join(ch for ch in s if not (0xD800 <= ord(ch) <= 0xDFFF)) def _format_update_time(value: str) -> str: """ 格式化更新时间为 "MM-DD HH:mm" 格式。 如果已经是该格式,直接返回;否则尝试解析并转换。 """ if not value: return "" s = str(value).strip() # 如果已经是 "MM-DD HH:mm" 格式,直接返回 if len(s) == 14 and s[2] == '-' and s[5] == ' ' and s[8] == ':': return s # 尝试解析 ISO 格式或其他常见格式 try: # 处理 ISO 格式(如 "2026-01-28T12:22:00Z") if s.endswith("Z"): s_clean = s[:-1] else: s_clean = s s_clean = s_clean.replace(" ", "T") dt = datetime.fromisoformat(s_clean) return dt.strftime("%m-%d %H:%M") except Exception: pass # 如果无法解析,尝试提取日期和时间部分 # 格式可能是 "YYYY-MM-DD HH:mm:ss" 或类似 try: # 尝试匹配 "YYYY-MM-DD HH:mm:ss" 格式 if len(s) >= 16: parts = s.split(' ') if len(parts) >= 2: date_part = parts[0] # "YYYY-MM-DD" time_part = parts[1] # "HH:mm:ss" 或 "HH:mm" date_parts = date_part.split('-') time_parts = time_part.split(':') if len(date_parts) >= 3 and len(time_parts) >= 2: month = date_parts[1] day = date_parts[2] hour = time_parts[0].zfill(2) minute = time_parts[1].zfill(2) return f"{month}-{day} {hour}:{minute}" except Exception: pass # 如果都失败了,返回原字符串 return s def build_xlsx(platforms): platform_name_map = { "douyin": "抖音", "baijiahao": "百家号", "weixin_video": "视频号", "xiaohongshu": "小红书", "bilibili": "哔哩哔哩", "kuaishou": "快手", } wb = Workbook() ws = wb.active ws.title = "平台数据" ws.append(HEADERS) header_font = Font(bold=True) header_fill = PatternFill("solid", fgColor="F2F2F2") center = Alignment(horizontal="center", vertical="center", wrap_text=False) left = Alignment(horizontal="left", vertical="center", wrap_text=False) thin = Side(style="thin", color="D9D9D9") border = Border(left=thin, right=thin, top=thin, bottom=thin) for col_idx in range(1, len(HEADERS) + 1): cell = ws.cell(row=1, column=col_idx) cell.font = header_font cell.fill = header_fill cell.alignment = center cell.border = border for i, w in enumerate(COL_WIDTHS, start=1): col_letter = get_column_letter(i) ws.column_dimensions[col_letter].width = w for p in platforms: platform_raw = (p.get("platform") or "").strip() platform_cn = platform_name_map.get(platform_raw, platform_raw) ws.append([ _safe_str(platform_cn), _safe_int(p.get("viewsCount")) or 0, _safe_int(p.get("commentsCount")) or 0, _safe_int(p.get("likesCount")) or 0, _safe_int(p.get("fansIncrease")) or 0, _safe_str(_format_update_time(p.get("updateTime"))), ]) int_cols = {"B", "C", "D", "E"} for row in range(2, ws.max_row + 1): for col in range(1, len(HEADERS) + 1): c = ws.cell(row=row, column=col) c.border = border if col == 1: c.alignment = left else: c.alignment = center for c_letter in int_cols: c = ws[f"{c_letter}{row}"] if c.value is not None: c.number_format = "0" ws.freeze_panes = "A2" bio = BytesIO() wb.save(bio) return bio.getvalue() def main(): try: raw = sys.stdin.read() payload = json.loads(raw) if raw.strip() else {} except Exception as e: sys.stderr.write(f"Invalid JSON input: {e}\n") sys.exit(2) platforms = payload.get("platforms") or [] xlsx_bytes = build_xlsx(platforms) sys.stdout.buffer.write(xlsx_bytes) if __name__ == "__main__": main()