#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Read a JSON account overview from stdin and write an xlsx file to stdout.

The workbook is produced with openpyxl (chosen so that more export styles can
be added later) and emitted as raw bytes on stdout.

Input JSON format:
{
  "accounts": [
    {
      "account": "xxx",
      "platform": "douyin",
      "totalIncome": 0.0,
      "yesterdayIncome": 0.0,
      "totalViews": 0,
      "yesterdayViews": 0,
      "fansCount": 0,
      "yesterdayComments": 0,
      "yesterdayLikes": 0,
      "yesterdayFansIncrease": 0,
      "updateTime": "2026-01-27 14:30:00"
    }
  ]
}

Exit codes used by ``main``:
    2 - stdin did not contain usable JSON of the shape above.
    3 - openpyxl is not installed.
"""

import json
import math
import re
import sys
from collections.abc import Mapping
from datetime import datetime
from io import BytesIO
from typing import Any, Iterable, List, Optional

# openpyxl is imported eagerly, but a failure is only *recorded* here and
# reported by _ensure_openpyxl(). Exiting at import time would make the pure
# helper functions below impossible to import (and test) without openpyxl.
_OPENPYXL_IMPORT_ERROR: Optional[Exception] = None
try:
    from openpyxl import Workbook
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
    from openpyxl.utils import get_column_letter
except Exception as e:  # pragma: no cover - runtime guard only
    _OPENPYXL_IMPORT_ERROR = e

# Column captions, in output order (runtime text, shown in the sheet).
HEADERS = [
    "账号",
    "平台",
    "总收益",
    "昨日收益",
    "总播放量",
    "昨日播放量",
    "粉丝数",
    "昨日评论",
    "昨日点赞",
    "昨日涨粉",
    "更新时间",
]

# Column widths, positionally matching HEADERS.
COL_WIDTHS = [22, 12, 12, 12, 12, 12, 10, 10, 10, 10, 20]

# Platform identifier -> display name written to the sheet.
_PLATFORM_NAMES = {
    "douyin": "抖音",
    "baijiahao": "百家号",
    "weixin_video": "视频号",
    "xiaohongshu": "小红书",
}

# 1-based column indexes grouped by how the cell is formatted.
_TEXT_COLS = frozenset({1, 2, 11})
_MONEY_COLS = frozenset({3, 4})
_INT_COLS = frozenset({5, 6, 7, 8, 9, 10})

# Characters that cannot be stored in an xlsx string cell: ASCII control
# characters other than tab/LF/CR (openpyxl rejects them with
# IllegalCharacterError) and lone UTF-16 surrogates (not encodable as UTF-8).
_ILLEGAL_XLSX_CHARS_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\ud800-\udfff]")


def _ensure_openpyxl() -> None:
    """Exit with status 3 and a hint on stderr if openpyxl failed to import."""
    if _OPENPYXL_IMPORT_ERROR is None:
        return
    sys.stderr.write(
        "Missing dependency: openpyxl. Please install it in your python env.\n"
        "Example: pip install -r server/python/requirements.txt\n"
        f"Detail: {_OPENPYXL_IMPORT_ERROR}\n"
    )
    sys.exit(3)


def _safe_int(v: Any) -> Optional[int]:
    """Convert *v* to ``int``, returning ``None`` when that is not possible.

    ``None`` and ``""`` map to ``None``. Integers and integer-looking strings
    are converted exactly; anything else goes through ``float`` and is
    truncated toward zero (``"12.7"`` -> ``12``). NaN/Infinity yield ``None``.
    """
    try:
        if v is None or v == "":
            return None
        if isinstance(v, int):
            return int(v)
        if isinstance(v, str):
            # Parse digit strings directly: a detour through float would
            # silently round values above 2**53.
            try:
                return int(v)
            except ValueError:
                pass
        return int(float(v))
    except (TypeError, ValueError, OverflowError):
        return None


def _safe_float(v: Any) -> Optional[float]:
    """Convert *v* to a finite ``float``, or ``None`` when not possible.

    ``None``, ``""``, unparsable values and NaN/Infinity all yield ``None``;
    a numeric spreadsheet cell has no representation for non-finite numbers.
    """
    try:
        if v is None or v == "":
            return None
        number = float(v)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None


def _safe_str(v: Any) -> str:
    """Convert any value to a string that is safe to store in an xlsx cell.

    ``None`` becomes ``""``. Lone surrogates (U+D800-U+DFFF) and ASCII control
    characters other than tab, LF and CR are removed; ordinary text, including
    CJK characters, is left untouched.
    """
    if v is None:
        return ""
    try:
        s = str(v)
    except Exception:
        # A broken __str__ must not abort the export; fall back to repr().
        s = repr(v)
    return _ILLEGAL_XLSX_CHARS_RE.sub("", s)


def _format_date_only(value: str) -> str:
    """Reduce a timestamp string to its ``YYYY-MM-DD`` date part.

    ISO-8601-like input (optionally ending in ``Z`` and/or using a space
    instead of ``T``) is parsed and re-formatted. Anything that cannot be
    parsed is returned truncated to its first 10 characters. Falsy input
    yields ``""``.
    """
    if not value:
        return ""
    s = str(value).strip()

    # Normalise towards what datetime.fromisoformat() accepts: drop a trailing
    # "Z" and use "T" as the date/time separator.
    iso = s[:-1] if s.endswith("Z") else s
    iso = iso.replace(" ", "T")
    try:
        return datetime.fromisoformat(iso).strftime("%Y-%m-%d")
    except ValueError:
        pass

    # Fallback: keep at most the first 10 characters.
    return s[:10]


def _account_row(account: Mapping) -> List[Any]:
    """Build the list of cell values for one account, ordered like HEADERS."""
    # Coerce through _safe_str so a non-string platform cannot break .strip().
    platform_raw = _safe_str(account.get("platform") or "").strip()
    platform_cn = _PLATFORM_NAMES.get(platform_raw, platform_raw)
    return [
        _safe_str(account.get("account")),
        _safe_str(platform_cn),
        _safe_float(account.get("totalIncome")),
        _safe_float(account.get("yesterdayIncome")),
        _safe_int(account.get("totalViews")),
        _safe_int(account.get("yesterdayViews")),
        # The remaining counters default to 0 instead of an empty cell.
        _safe_int(account.get("fansCount")) or 0,
        _safe_int(account.get("yesterdayComments")) or 0,
        _safe_int(account.get("yesterdayLikes")) or 0,
        _safe_int(account.get("yesterdayFansIncrease")) or 0,
        _safe_str(_format_date_only(account.get("updateTime"))),
    ]


def build_xlsx(accounts: Optional[Iterable[Any]]) -> bytes:
    """Render *accounts* into a single-sheet xlsx workbook.

    Args:
        accounts: Iterable of account mappings using the keys described in the
            module docstring. ``None`` is treated as empty; entries that are
            not mappings are skipped.

    Returns:
        The serialized workbook as bytes.

    Exits the process with status 3 if openpyxl is not installed.
    """
    _ensure_openpyxl()

    wb = Workbook()
    ws = wb.active
    ws.title = "数据总览"

    header_font = Font(bold=True)
    header_fill = PatternFill("solid", fgColor="F2F2F2")
    center = Alignment(horizontal="center", vertical="center", wrap_text=False)
    left = Alignment(horizontal="left", vertical="center", wrap_text=False)
    thin = Side(style="thin", color="D9D9D9")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)

    # Header row.
    ws.append(HEADERS)
    for col_idx in range(1, len(HEADERS) + 1):
        cell = ws.cell(row=1, column=col_idx)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = center
        cell.border = border

    # Column widths.
    for col_idx, width in enumerate(COL_WIDTHS, start=1):
        ws.column_dimensions[get_column_letter(col_idx)].width = width

    # Data rows: write and style each cell in a single pass.
    row_idx = 1
    for account in accounts or []:
        if not isinstance(account, Mapping):
            continue
        row_idx += 1
        for col_idx, value in enumerate(_account_row(account), start=1):
            cell = ws.cell(row=row_idx, column=col_idx, value=value)
            cell.border = border
            cell.alignment = left if col_idx == 1 else center
            if col_idx in _TEXT_COLS:
                # openpyxl turns any string starting with "=" into a formula.
                # These columns hold externally supplied text, so force plain
                # string cells to prevent formula injection.
                cell.data_type = "s"
            elif value is not None:
                if col_idx in _MONEY_COLS:
                    cell.number_format = "0.00"
                elif col_idx in _INT_COLS:
                    cell.number_format = "0"

    # Keep the header row visible while scrolling.
    ws.freeze_panes = "A2"

    bio = BytesIO()
    wb.save(bio)
    return bio.getvalue()


def _read_stdin_text() -> str:
    """Return all of stdin as text.

    On Windows the bytes are decoded as UTF-8 explicitly, because the default
    console encoding would garble the UTF-8 JSON that the Node caller sends.
    """
    if sys.platform == "win32" and hasattr(sys.stdin, "buffer"):
        return sys.stdin.buffer.read().decode("utf-8", errors="replace")
    return sys.stdin.read()


def main() -> None:
    """Script entry point: JSON on stdin -> xlsx bytes on stdout."""
    # Check the dependency first so a missing openpyxl is reported (status 3)
    # before any input is consumed.
    _ensure_openpyxl()

    try:
        raw = _read_stdin_text()
        payload = json.loads(raw) if raw.strip() else {}
    except Exception as e:
        sys.stderr.write(f"Invalid JSON input: {e}\n")
        sys.exit(2)

    if not isinstance(payload, dict):
        sys.stderr.write("Invalid JSON input: top-level value must be an object\n")
        sys.exit(2)

    accounts = payload.get("accounts") or []
    if not isinstance(accounts, list):
        sys.stderr.write('Invalid JSON input: "accounts" must be an array\n')
        sys.exit(2)

    xlsx_bytes = build_xlsx(accounts)
    sys.stdout.buffer.write(xlsx_bytes)
    sys.stdout.buffer.flush()


if __name__ == "__main__":
    main()