#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 从 stdin 读取 JSON(作品数据列表),生成 xlsx 并输出到 stdout(二进制)。 参考融媒宝作品数据格式,列:账号、平台、标题、发布时间、播放量、评论量、分享量、收藏量、点赞量。 输入 JSON 格式: { "works": [ { "accountName": "xxx", "platform": "douyin", "title": "作品标题", "publishTime": "2026-01-28T12:00:00Z", "viewsCount": 1000, "commentsCount": 10, "sharesCount": 5, "collectsCount": 20, "likesCount": 100 } ] } """ import json import sys from io import BytesIO from datetime import datetime # Ensure stdin is read as UTF-8 (important on Windows when Node passes UTF-8 JSON) if sys.platform == "win32": import io as _io if hasattr(sys.stdin, "buffer"): sys.stdin = _io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace") try: from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill, Border, Side from openpyxl.utils import get_column_letter except Exception as e: sys.stderr.write( "Missing dependency: openpyxl. Please install it in your python env.\n" "Example: pip install -r server/python/requirements.txt\n" f"Detail: {e}\n" ) sys.exit(3) HEADERS = [ "账号", "平台", "标题", "发布时间", "播放量", "评论量", "分享量", "收藏量", "点赞量", ] COL_WIDTHS = [18, 12, 50, 18, 12, 12, 12, 12, 12] PLATFORM_NAME_MAP = { "douyin": "抖音", "baijiahao": "百家号", "weixin_video": "视频号", "xiaohongshu": "小红书", "kuaishou": "快手", } def _safe_int(v): try: if v is None or v == "": return 0 return int(float(v)) except Exception: return 0 def _safe_str(v) -> str: """将任意值转换为字符串,过滤 Excel 不支持的代理字符。""" if v is None: return "" try: s = str(v) except Exception: s = repr(v) return "".join(ch for ch in s if not (0xD800 <= ord(ch) <= 0xDFFF)) def _format_datetime_pretty(value: str) -> str: """ 将时间字符串格式化为人类可读格式: - 今年:MM-DD HH:mm - 往年:YYYY-MM-DD HH:mm """ if not value: return "" s = str(value).strip() try: if s.endswith("Z"): s_clean = s[:-1] else: s_clean = s s_clean = s_clean.replace(" ", "T") dt = datetime.fromisoformat(s_clean) now_year = datetime.now().year if dt.year == now_year: return dt.strftime("%m-%d %H:%M") return dt.strftime("%Y-%m-%d %H:%M") except Exception: pass try: if len(s) >= 16: parts = s.split(" ") if len(parts) >= 2: date_part = parts[0] time_part = parts[1] date_parts = date_part.split("-") time_parts = time_part.split(":") if len(date_parts) >= 3 and len(time_parts) >= 2: year = int(date_parts[0]) month = date_parts[1].zfill(2) day = date_parts[2].zfill(2) hour = time_parts[0].zfill(2) minute = time_parts[1].zfill(2) now_year = datetime.now().year if year == now_year: return f"{month}-{day} {hour}:{minute}" return f"{year}-{month}-{day} {hour}:{minute}" except Exception: pass return s def build_xlsx(works): wb = Workbook() ws = wb.active ws.title = "作品数据" ws.append(HEADERS) header_font = Font(bold=True) header_fill = PatternFill("solid", fgColor="F2F2F2") center = Alignment(horizontal="center", vertical="center", wrap_text=False) left = Alignment(horizontal="left", vertical="center", wrap_text=True) thin = Side(style="thin", color="D9D9D9") border = Border(left=thin, right=thin, top=thin, bottom=thin) for col_idx in range(1, len(HEADERS) + 1): cell = ws.cell(row=1, column=col_idx) cell.font = header_font cell.fill = header_fill cell.alignment = center cell.border = border for i, w in enumerate(COL_WIDTHS, start=1): col_letter = get_column_letter(i) ws.column_dimensions[col_letter].width = w for w in works: platform_raw = (w.get("platform") or "").strip() platform_cn = PLATFORM_NAME_MAP.get(platform_raw, platform_raw) ws.append([ _safe_str(w.get("accountName")), _safe_str(platform_cn), _safe_str(w.get("title")), _safe_str(_format_datetime_pretty(w.get("publishTime"))), _safe_int(w.get("viewsCount")), _safe_int(w.get("commentsCount")), _safe_int(w.get("sharesCount")), _safe_int(w.get("collectsCount")), _safe_int(w.get("likesCount")), ]) int_cols = {"E", "F", "G", "H", "I"} for row in range(2, ws.max_row + 1): for col in range(1, len(HEADERS) + 1): c = ws.cell(row=row, column=col) c.border = border if col in (1, 3): c.alignment = left else: c.alignment = center for c_letter in int_cols: c = ws[f"{c_letter}{row}"] if c.value is not None: c.number_format = "0" ws.freeze_panes = "A2" bio = BytesIO() wb.save(bio) return bio.getvalue() def main(): try: raw = sys.stdin.read() payload = json.loads(raw) if raw.strip() else {} except Exception as e: sys.stderr.write(f"Invalid JSON input: {e}\n") sys.exit(2) works = payload.get("works") or [] xlsx_bytes = build_xlsx(works) sys.stdout.buffer.write(xlsx_bytes) if __name__ == "__main__": main()