#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Read JSON (the data-overview account list) from stdin, build an xlsx
workbook and write it to stdout as binary.

Note: openpyxl is used so that more export styles can be added later.

Input JSON format:
{
  "accounts": [
    {
      "account": "xxx",
      "platform": "douyin",
      "totalViews": 0,
      "yesterdayViews": 0,
      "fansCount": 0,
      "yesterdayComments": 0,
      "yesterdayLikes": 0,
      "yesterdayFansIncrease": 0,
      "updateTime": "2026-01-27 14:30:00"
    }
  ]
}

Exit codes: 2 when the input is not usable JSON, 3 when openpyxl is missing.
"""

import json
import re
import sys
from io import BytesIO
from datetime import datetime

# Ensure stdin is read as UTF-8 (important on Windows when Node passes UTF-8 JSON)
if sys.platform == "win32":
    import io as _io

    if hasattr(sys.stdin, "buffer"):
        sys.stdin = _io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace")

try:
    from openpyxl import Workbook
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
    from openpyxl.utils import get_column_letter
except Exception as e:  # pragma: no cover - runtime guard only
    sys.stderr.write(
        "Missing dependency: openpyxl. Please install it in your python env.\n"
        "Example: pip install -r server/python/requirements.txt\n"
        f"Detail: {e}\n"
    )
    sys.exit(3)


HEADERS = [
    "账号",
    "平台",
    "总播放量",
    "昨日播放量",
    "粉丝数",
    "昨日评论",
    "昨日点赞",
    "昨日涨粉",
    "更新时间",
]

COL_WIDTHS = [22, 12, 12, 12, 10, 10, 10, 10, 20]

# Platform identifier -> display name written to the sheet.
_PLATFORM_NAMES = {
    "douyin": "抖音",
    "baijiahao": "百家号",
    "weixin_video": "视频号",
    "xiaohongshu": "小红书",
}

# 1-based indexes of the integer columns (C..H).
_INT_COLUMNS = frozenset(range(3, 9))

# Characters that cannot be stored in a cell: the ASCII control characters
# openpyxl rejects with IllegalCharacterError, plus lone UTF-16 surrogates
# (which cannot be encoded as UTF-8 when the workbook is serialized).
_ILLEGAL_XLSX_CHARS_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\ud800-\udfff]")


def _safe_int(v):
    """Convert *v* to ``int``; return ``None`` for ``None``, ``""`` or anything unparsable.

    Integers and integer-looking strings are converted directly so large
    values do not lose precision through a float round-trip; everything
    else (floats, strings such as ``"12.7"``) is truncated via ``float``.
    """
    if v is None or v == "":
        return None
    if isinstance(v, int):  # also covers bool, matching int(float(True)) == 1
        return int(v)
    try:
        if isinstance(v, str):
            try:
                return int(v)
            except ValueError:
                pass  # not a plain integer literal; try the float route below
        return int(float(v))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); ValueError: NaN or garbage text.
        return None


def _safe_float(v):
    """Convert *v* to ``float``; return ``None`` for ``None``, ``""`` or anything unparsable."""
    if v is None or v == "":
        return None
    try:
        return float(v)
    except (TypeError, ValueError, OverflowError):
        return None


def _safe_str(v) -> str:
    """Convert any value to a string that is safe to store in an xlsx cell.

    ``None`` becomes ``""``. Lone surrogates (U+D800..U+DFFF) and the ASCII
    control characters that openpyxl refuses are removed; ordinary text,
    including CJK characters, tabs and newlines, is left untouched.
    """
    if v is None:
        return ""
    try:
        s = str(v)
    except Exception:
        # A broken __str__ must not abort the export; fall back to repr().
        s = repr(v)
    return _ILLEGAL_XLSX_CHARS_RE.sub("", s)


def _format_datetime_pretty(value: str) -> str:
    """Format a timestamp string into a uniform, human-readable form.

    - Current year: ``MM-DD HH:mm``
    - Other years:  ``YYYY-MM-DD HH:mm``

    When parsing fails the original (stripped) string is returned.
    A trailing ``Z`` is dropped before parsing; no time-zone conversion
    is performed.
    """
    if not value:
        return ""

    s = str(value).strip()

    # First attempt: ISO parsing ("2026-01-28T12:22:00Z" / "2026-01-28 12:22:00" ...).
    iso_text = s[:-1] if s.endswith("Z") else s
    iso_text = iso_text.replace(" ", "T")
    try:
        dt = datetime.fromisoformat(iso_text)
        if dt.year == datetime.now().year:
            return dt.strftime("%m-%d %H:%M")
        return dt.strftime("%Y-%m-%d %H:%M")
    except (ValueError, TypeError, OverflowError):
        pass  # not ISO-parsable; fall through to the manual split

    # Fallback: pick the fields out of a "YYYY-MM-DD HH:mm:ss"-like string by hand.
    if len(s) < 16:
        return s
    parts = s.split(" ")
    if len(parts) < 2:
        return s
    date_parts = parts[0].split("-")
    time_parts = parts[1].split(":")
    if len(date_parts) < 3 or len(time_parts) < 2:
        return s
    try:
        year = int(date_parts[0])
    except ValueError:
        return s

    month = date_parts[1].zfill(2)
    day = date_parts[2].zfill(2)
    hour = time_parts[0].zfill(2)
    minute = time_parts[1].zfill(2)
    if year == datetime.now().year:
        return f"{month}-{day} {hour}:{minute}"
    return f"{year}-{month}-{day} {hour}:{minute}"


def _account_row(account):
    """Build the list of cell values for one account dict, in HEADERS order."""
    # Stringify before strip() so a non-string platform value cannot crash the export.
    platform_raw = _safe_str(account.get("platform") or "").strip()
    platform_cn = _PLATFORM_NAMES.get(platform_raw, platform_raw)
    return [
        _safe_str(account.get("account")),
        _safe_str(platform_cn),
        # The two view counters stay empty when missing; the rest default to 0.
        _safe_int(account.get("totalViews")),
        _safe_int(account.get("yesterdayViews")),
        _safe_int(account.get("fansCount")) or 0,
        _safe_int(account.get("yesterdayComments")) or 0,
        _safe_int(account.get("yesterdayLikes")) or 0,
        _safe_int(account.get("yesterdayFansIncrease")) or 0,
        _safe_str(_format_datetime_pretty(account.get("updateTime"))),
    ]


def build_xlsx(accounts):
    """Render the account list into an xlsx workbook and return its bytes.

    Args:
        accounts: iterable of account dicts (see the module docstring for the
            keys that are read). ``None`` is treated as empty; entries that
            are not dicts are skipped.

    Returns:
        The serialized workbook as ``bytes``: one sheet with a styled,
        frozen header row followed by one row per account.
    """
    wb = Workbook()
    ws = wb.active
    ws.title = "数据总览"

    header_font = Font(bold=True)
    header_fill = PatternFill("solid", fgColor="F2F2F2")
    center = Alignment(horizontal="center", vertical="center", wrap_text=False)
    left = Alignment(horizontal="left", vertical="center", wrap_text=False)
    thin = Side(style="thin", color="D9D9D9")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    column_count = len(HEADERS)

    # Header row
    ws.append(HEADERS)
    for col_idx in range(1, column_count + 1):
        cell = ws.cell(row=1, column=col_idx)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = center
        cell.border = border

    # Column widths
    for col_idx, width in enumerate(COL_WIDTHS, start=1):
        ws.column_dimensions[get_column_letter(col_idx)].width = width

    # Data rows
    for account in accounts or []:
        if not isinstance(account, dict):
            continue  # malformed entry: skip it rather than abort the export
        ws.append(_account_row(account))

    # Styling pass over the data rows
    for row_idx in range(2, ws.max_row + 1):
        for col_idx in range(1, column_count + 1):
            cell = ws.cell(row=row_idx, column=col_idx)
            cell.border = border
            cell.alignment = left if col_idx == 1 else center
            if col_idx in _INT_COLUMNS:
                if cell.value is not None:
                    cell.number_format = "0"
            elif cell.data_type == "f":
                # openpyxl treats text starting with "=" as a formula. These
                # values come from external data, so store them as plain
                # strings to prevent formula injection.
                cell.data_type = "s"

    ws.freeze_panes = "A2"

    bio = BytesIO()
    wb.save(bio)
    return bio.getvalue()


def main():
    """Read the JSON payload from stdin and write the xlsx bytes to stdout.

    Exits with status 2 (message on stderr) when stdin does not contain a
    JSON object or when its "accounts" value is not a list. Empty input is
    treated as an empty account list.
    """
    try:
        raw = sys.stdin.read()
        payload = json.loads(raw) if raw.strip() else {}
    except Exception as e:  # top-level boundary: report and exit, never traceback
        sys.stderr.write(f"Invalid JSON input: {e}\n")
        sys.exit(2)

    if not isinstance(payload, dict):
        sys.stderr.write("Invalid JSON input: top-level value must be an object\n")
        sys.exit(2)

    accounts = payload.get("accounts") or []
    if not isinstance(accounts, list):
        sys.stderr.write('Invalid JSON input: "accounts" must be a list\n')
        sys.exit(2)

    xlsx_bytes = build_xlsx(accounts)
    sys.stdout.buffer.write(xlsx_bytes)
    sys.stdout.buffer.flush()


if __name__ == "__main__":
    main()