export_platform_statistics_xlsx.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 从 stdin 读取 JSON(平台统计列表),生成 xlsx 并输出到 stdout(二进制)。
  5. 输入 JSON 格式:
  6. {
  7. "platforms": [
  8. {
  9. "platform": "baijiahao",
  10. "viewsCount": 5,
  11. "commentsCount": 1,
  12. "likesCount": 0,
  13. "fansIncrease": 1,
  14. "updateTime": "2026-01-27 14:30:00"
  15. }
  16. ]
  17. }
  18. """
  19. import json
  20. import sys
  21. from io import BytesIO
  22. from datetime import datetime
  23. # Ensure stdin is read as UTF-8 (important on Windows when Node passes UTF-8 JSON)
  24. if sys.platform == "win32":
  25. import io as _io
  26. if hasattr(sys.stdin, "buffer"):
  27. sys.stdin = _io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace")
  28. try:
  29. from openpyxl import Workbook
  30. from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
  31. from openpyxl.utils import get_column_letter
  32. except Exception as e: # pragma: no cover
  33. sys.stderr.write(
  34. "Missing dependency: openpyxl. Please install it in your python env.\n"
  35. "Example: pip install -r server/python/requirements.txt\n"
  36. f"Detail: {e}\n"
  37. )
  38. sys.exit(3)
  39. HEADERS = [
  40. "平台",
  41. "阅读(播放量)",
  42. "评论量",
  43. "点赞量",
  44. "涨粉量",
  45. "更新时间",
  46. ]
  47. COL_WIDTHS = [12, 16, 12, 12, 10, 18]
  48. def _safe_int(v):
  49. try:
  50. if v is None or v == "":
  51. return None
  52. return int(float(v))
  53. except Exception:
  54. return None
  55. def _safe_str(v) -> str:
  56. """
  57. 将任意值转换为字符串,并过滤掉 Excel 不支持的非法 Unicode 代理字符。
  58. """
  59. if v is None:
  60. return ""
  61. try:
  62. s = str(v)
  63. except Exception:
  64. s = repr(v)
  65. return "".join(ch for ch in s if not (0xD800 <= ord(ch) <= 0xDFFF))
  66. def _format_update_time(value: str) -> str:
  67. """
  68. 格式化更新时间为 "MM-DD HH:mm" 格式。
  69. 如果已经是该格式,直接返回;否则尝试解析并转换。
  70. """
  71. if not value:
  72. return ""
  73. s = str(value).strip()
  74. # 如果已经是 "MM-DD HH:mm" 格式,直接返回
  75. if len(s) == 14 and s[2] == '-' and s[5] == ' ' and s[8] == ':':
  76. return s
  77. # 尝试解析 ISO 格式或其他常见格式
  78. try:
  79. # 处理 ISO 格式(如 "2026-01-28T12:22:00Z")
  80. if s.endswith("Z"):
  81. s_clean = s[:-1]
  82. else:
  83. s_clean = s
  84. s_clean = s_clean.replace(" ", "T")
  85. dt = datetime.fromisoformat(s_clean)
  86. return dt.strftime("%m-%d %H:%M")
  87. except Exception:
  88. pass
  89. # 如果无法解析,尝试提取日期和时间部分
  90. # 格式可能是 "YYYY-MM-DD HH:mm:ss" 或类似
  91. try:
  92. # 尝试匹配 "YYYY-MM-DD HH:mm:ss" 格式
  93. if len(s) >= 16:
  94. parts = s.split(' ')
  95. if len(parts) >= 2:
  96. date_part = parts[0] # "YYYY-MM-DD"
  97. time_part = parts[1] # "HH:mm:ss" 或 "HH:mm"
  98. date_parts = date_part.split('-')
  99. time_parts = time_part.split(':')
  100. if len(date_parts) >= 3 and len(time_parts) >= 2:
  101. month = date_parts[1]
  102. day = date_parts[2]
  103. hour = time_parts[0].zfill(2)
  104. minute = time_parts[1].zfill(2)
  105. return f"{month}-{day} {hour}:{minute}"
  106. except Exception:
  107. pass
  108. # 如果都失败了,返回原字符串
  109. return s
  110. def build_xlsx(platforms):
  111. platform_name_map = {
  112. "douyin": "抖音",
  113. "baijiahao": "百家号",
  114. "weixin_video": "视频号",
  115. "xiaohongshu": "小红书",
  116. "bilibili": "哔哩哔哩",
  117. "kuaishou": "快手",
  118. }
  119. wb = Workbook()
  120. ws = wb.active
  121. ws.title = "平台数据"
  122. ws.append(HEADERS)
  123. header_font = Font(bold=True)
  124. header_fill = PatternFill("solid", fgColor="F2F2F2")
  125. center = Alignment(horizontal="center", vertical="center", wrap_text=False)
  126. left = Alignment(horizontal="left", vertical="center", wrap_text=False)
  127. thin = Side(style="thin", color="D9D9D9")
  128. border = Border(left=thin, right=thin, top=thin, bottom=thin)
  129. for col_idx in range(1, len(HEADERS) + 1):
  130. cell = ws.cell(row=1, column=col_idx)
  131. cell.font = header_font
  132. cell.fill = header_fill
  133. cell.alignment = center
  134. cell.border = border
  135. for i, w in enumerate(COL_WIDTHS, start=1):
  136. col_letter = get_column_letter(i)
  137. ws.column_dimensions[col_letter].width = w
  138. for p in platforms:
  139. platform_raw = (p.get("platform") or "").strip()
  140. platform_cn = platform_name_map.get(platform_raw, platform_raw)
  141. ws.append([
  142. _safe_str(platform_cn),
  143. _safe_int(p.get("viewsCount")) or 0,
  144. _safe_int(p.get("commentsCount")) or 0,
  145. _safe_int(p.get("likesCount")) or 0,
  146. _safe_int(p.get("fansIncrease")) or 0,
  147. _safe_str(_format_update_time(p.get("updateTime"))),
  148. ])
  149. int_cols = {"B", "C", "D", "E"}
  150. for row in range(2, ws.max_row + 1):
  151. for col in range(1, len(HEADERS) + 1):
  152. c = ws.cell(row=row, column=col)
  153. c.border = border
  154. if col == 1:
  155. c.alignment = left
  156. else:
  157. c.alignment = center
  158. for c_letter in int_cols:
  159. c = ws[f"{c_letter}{row}"]
  160. if c.value is not None:
  161. c.number_format = "0"
  162. ws.freeze_panes = "A2"
  163. bio = BytesIO()
  164. wb.save(bio)
  165. return bio.getvalue()
  166. def main():
  167. try:
  168. raw = sys.stdin.read()
  169. payload = json.loads(raw) if raw.strip() else {}
  170. except Exception as e:
  171. sys.stderr.write(f"Invalid JSON input: {e}\n")
  172. sys.exit(2)
  173. platforms = payload.get("platforms") or []
  174. xlsx_bytes = build_xlsx(platforms)
  175. sys.stdout.buffer.write(xlsx_bytes)
  176. if __name__ == "__main__":
  177. main()