| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
Read a JSON document (the account list of the data overview) from stdin,
build an xlsx workbook and write it to stdout as binary data.

Note: openpyxl is used so that more export styles can be added later.

Input JSON format:
{
  "accounts": [
    {
      "account": "xxx",
      "platform": "douyin",
      "totalIncome": 0.0,
      "yesterdayIncome": 0.0,
      "totalViews": 0,
      "yesterdayViews": 0,
      "fansCount": 0,
      "yesterdayComments": 0,
      "yesterdayLikes": 0,
      "yesterdayFansIncrease": 0,
      "updateTime": "2026-01-27 14:30:00"
    }
  ]
}
- """
- import json
- import sys
- from io import BytesIO
- from datetime import datetime
# Force UTF-8 decoding of stdin on Windows, where the caller (Node) pipes
# UTF-8 JSON. Undecodable bytes are replaced instead of raising.
if sys.platform == "win32":
    import io as _io

    if hasattr(sys.stdin, "buffer"):
        sys.stdin = _io.TextIOWrapper(
            sys.stdin.buffer,
            encoding="utf-8",
            errors="replace",
        )
# Import openpyxl up front; if that fails, explain how to install it on
# stderr and terminate with exit status 3.
try:
    from openpyxl import Workbook
    from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
    from openpyxl.utils import get_column_letter
except Exception as exc:  # pragma: no cover - runtime safeguard only
    sys.stderr.write(
        "".join(
            (
                "Missing dependency: openpyxl. Please install it in your python env.\n",
                "Example: pip install -r server/python/requirements.txt\n",
                f"Detail: {exc}\n",
            )
        )
    )
    raise SystemExit(3)
# One entry per exported column: (header title, column width).
# HEADERS and COL_WIDTHS are derived from this table so they cannot drift apart.
_COLUMNS = (
    ("账号", 22),
    ("平台", 12),
    ("总收益", 12),
    ("昨日收益", 12),
    ("总播放量", 12),
    ("昨日播放量", 12),
    ("粉丝数", 10),
    ("昨日评论", 10),
    ("昨日点赞", 10),
    ("昨日涨粉", 10),
    ("更新时间", 20),
)

# Header titles, in column order.
HEADERS = [title for title, _ in _COLUMNS]

# Column widths, aligned index-for-index with HEADERS.
COL_WIDTHS = [width for _, width in _COLUMNS]
- def _safe_int(v):
- try:
- if v is None or v == "":
- return None
- return int(float(v))
- except Exception:
- return None
- def _safe_float(v):
- try:
- if v is None or v == "":
- return None
- return float(v)
- except Exception:
- return None
- def _safe_str(v) -> str:
- """
- 将任意值转换为字符串,并过滤掉 Excel 不支持的非法 Unicode 代理字符。
- 只移除 U+D800~U+DFFF 范围的代理项,不影响正常中文等字符。
- """
- if v is None:
- return ""
- try:
- s = str(v)
- except Exception:
- s = repr(v)
- # 去掉孤立代理(surrogates),避免 "surrogates not allowed" 错误
- return "".join(ch for ch in s if not (0xD800 <= ord(ch) <= 0xDFFF))
- def _format_date_only(value: str) -> str:
- """
- 将各种时间字符串格式化为 YYYY-MM-DD,仅保留日期部分。
- 如果无法解析,则尽量取前 10 位作为日期返回。
- """
- if not value:
- return ""
- s = str(value).strip()
- # 先尝试 ISO 格式解析
- try:
- # 兼容 "...Z" 结尾
- if s.endswith("Z"):
- s_clean = s[:-1]
- else:
- s_clean = s
- # 如果中间是 ' ',替换成 'T'
- s_clean = s_clean.replace(" ", "T")
- dt = datetime.fromisoformat(s_clean)
- return dt.strftime("%Y-%m-%d")
- except Exception:
- pass
- # 退化处理:如果长度>=10,优先取前 10 位
- if len(s) >= 10:
- return s[:10]
- return s
def build_xlsx(accounts):
    """Render the account overview as an xlsx workbook and return its bytes.

    The workbook has a single sheet titled "数据总览" with a bold, shaded
    header row that is frozen, fixed column widths, thin borders on every
    cell, a left-aligned first column and centered remaining columns.

    Args:
        accounts: Iterable of dicts with the keys ``account``, ``platform``,
            ``totalIncome``, ``yesterdayIncome``, ``totalViews``,
            ``yesterdayViews``, ``fansCount``, ``yesterdayComments``,
            ``yesterdayLikes``, ``yesterdayFansIncrease`` and ``updateTime``.
            Missing keys are allowed. ``None`` is treated as an empty list.

    Returns:
        bytes: The serialized xlsx file.
    """
    platform_name_map = {
        "douyin": "抖音",
        "baijiahao": "百家号",
        "weixin_video": "视频号",
        "xiaohongshu": "小红书",
    }

    header_font = Font(bold=True)
    header_fill = PatternFill("solid", fgColor="F2F2F2")
    center = Alignment(horizontal="center", vertical="center", wrap_text=False)
    left = Alignment(horizontal="left", vertical="center", wrap_text=False)
    thin = Side(style="thin", color="D9D9D9")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)

    column_count = len(HEADERS)
    money_cols = {3, 4}  # C, D: income columns, two decimals
    int_cols = {5, 6, 7, 8, 9, 10}  # E..J: counters, no decimals

    wb = Workbook()
    ws = wb.active
    ws.title = "数据总览"

    # Header row.
    ws.append(HEADERS)
    for col_idx in range(1, column_count + 1):
        cell = ws.cell(row=1, column=col_idx)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = center
        cell.border = border

    # Column widths.
    for col_idx, width in enumerate(COL_WIDTHS, start=1):
        ws.column_dimensions[get_column_letter(col_idx)].width = width

    # Data rows. The header occupies row 1 and each append() adds exactly one
    # row, so row_idx always addresses the row that was just appended.
    for row_idx, a in enumerate(accounts or [], start=2):
        # Go through _safe_str so a non-string platform value cannot crash
        # on .strip().
        platform_raw = _safe_str(a.get("platform") or "").strip()
        platform_cn = platform_name_map.get(platform_raw, platform_raw)
        ws.append([
            _safe_str(a.get("account")),
            _safe_str(platform_cn),
            _safe_float(a.get("totalIncome")),
            _safe_float(a.get("yesterdayIncome")),
            _safe_int(a.get("totalViews")),
            _safe_int(a.get("yesterdayViews")),
            _safe_int(a.get("fansCount")) or 0,
            _safe_int(a.get("yesterdayComments")) or 0,
            _safe_int(a.get("yesterdayLikes")) or 0,
            _safe_int(a.get("yesterdayFansIncrease")) or 0,
            _safe_str(_format_date_only(a.get("updateTime"))),
        ])

        for col_idx in range(1, column_count + 1):
            c = ws.cell(row=row_idx, column=col_idx)
            c.border = border
            c.alignment = left if col_idx == 1 else center

            value = c.value
            if isinstance(value, str):
                # openpyxl stores a string starting with "=" as a formula.
                # Account data is external input, so force plain text to
                # rule out formula injection.
                c.data_type = "s"
            elif value is not None:
                if col_idx in money_cols:
                    c.number_format = "0.00"
                elif col_idx in int_cols:
                    c.number_format = "0"

    ws.freeze_panes = "A2"

    bio = BytesIO()
    wb.save(bio)
    return bio.getvalue()
def main():
    """Read the JSON payload from stdin and write the xlsx bytes to stdout.

    Empty input is treated as an empty payload and produces a workbook that
    contains only the header row.

    Exit status:
        2: stdin does not contain valid JSON, or the payload does not have
           the expected shape (a top-level object whose optional
           ``accounts`` member is a list of objects).
    """
    try:
        raw = sys.stdin.read()
        payload = json.loads(raw) if raw.strip() else {}
    except Exception as e:
        sys.stderr.write(f"Invalid JSON input: {e}\n")
        sys.exit(2)

    # Valid JSON can still have the wrong shape (e.g. a top-level array).
    # Report that as an input error instead of failing later with a traceback.
    if not isinstance(payload, dict):
        sys.stderr.write("Invalid JSON input: top-level value must be an object\n")
        sys.exit(2)

    accounts = payload.get("accounts") or []
    if not isinstance(accounts, list):
        sys.stderr.write('Invalid JSON input: "accounts" must be an array\n')
        sys.exit(2)
    if not all(isinstance(item, dict) for item in accounts):
        sys.stderr.write('Invalid JSON input: every "accounts" entry must be an object\n')
        sys.exit(2)

    xlsx_bytes = build_xlsx(accounts)
    # The workbook is binary, so bypass the text layer of stdout.
    sys.stdout.buffer.write(xlsx_bytes)
    sys.stdout.buffer.flush()


if __name__ == "__main__":
    main()
|