export_work_analytics_xlsx.py 5.4 KB


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从 stdin 读取 JSON(作品数据列表),生成 xlsx 并输出到 stdout(二进制)。
参考融媒宝作品数据格式,列:账号、平台、标题、发布时间、播放量、评论量、分享量、收藏量、点赞量。

输入 JSON 格式:
{
  "works": [
    {
      "accountName": "xxx",
      "platform": "douyin",
      "title": "作品标题",
      "publishTime": "2026-01-28T12:00:00Z",
      "viewsCount": 1000,
      "commentsCount": 10,
      "sharesCount": 5,
      "collectsCount": 20,
      "likesCount": 100
    }
  ]
}
"""
  23. import json
  24. import sys
  25. from io import BytesIO
  26. from datetime import datetime
  27. # Ensure stdin is read as UTF-8 (important on Windows when Node passes UTF-8 JSON)
  28. if sys.platform == "win32":
  29. import io as _io
  30. if hasattr(sys.stdin, "buffer"):
  31. sys.stdin = _io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace")
  32. try:
  33. from openpyxl import Workbook
  34. from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
  35. from openpyxl.utils import get_column_letter
  36. except Exception as e:
  37. sys.stderr.write(
  38. "Missing dependency: openpyxl. Please install it in your python env.\n"
  39. "Example: pip install -r server/python/requirements.txt\n"
  40. f"Detail: {e}\n"
  41. )
  42. sys.exit(3)
  43. HEADERS = [
  44. "账号",
  45. "平台",
  46. "标题",
  47. "发布时间",
  48. "播放量",
  49. "评论量",
  50. "分享量",
  51. "收藏量",
  52. "点赞量",
  53. ]
  54. COL_WIDTHS = [18, 12, 50, 18, 12, 12, 12, 12, 12]
  55. PLATFORM_NAME_MAP = {
  56. "douyin": "抖音",
  57. "baijiahao": "百家号",
  58. "weixin_video": "视频号",
  59. "xiaohongshu": "小红书",
  60. "kuaishou": "快手",
  61. }
  62. def _safe_int(v):
  63. try:
  64. if v is None or v == "":
  65. return 0
  66. return int(float(v))
  67. except Exception:
  68. return 0
  69. def _safe_str(v) -> str:
  70. """将任意值转换为字符串,过滤 Excel 不支持的代理字符。"""
  71. if v is None:
  72. return ""
  73. try:
  74. s = str(v)
  75. except Exception:
  76. s = repr(v)
  77. return "".join(ch for ch in s if not (0xD800 <= ord(ch) <= 0xDFFF))
  78. def _format_datetime_pretty(value: str) -> str:
  79. """
  80. 将时间字符串格式化为人类可读格式:
  81. - 今年:MM-DD HH:mm
  82. - 往年:YYYY-MM-DD HH:mm
  83. """
  84. if not value:
  85. return ""
  86. s = str(value).strip()
  87. try:
  88. if s.endswith("Z"):
  89. s_clean = s[:-1]
  90. else:
  91. s_clean = s
  92. s_clean = s_clean.replace(" ", "T")
  93. dt = datetime.fromisoformat(s_clean)
  94. now_year = datetime.now().year
  95. if dt.year == now_year:
  96. return dt.strftime("%m-%d %H:%M")
  97. return dt.strftime("%Y-%m-%d %H:%M")
  98. except Exception:
  99. pass
  100. try:
  101. if len(s) >= 16:
  102. parts = s.split(" ")
  103. if len(parts) >= 2:
  104. date_part = parts[0]
  105. time_part = parts[1]
  106. date_parts = date_part.split("-")
  107. time_parts = time_part.split(":")
  108. if len(date_parts) >= 3 and len(time_parts) >= 2:
  109. year = int(date_parts[0])
  110. month = date_parts[1].zfill(2)
  111. day = date_parts[2].zfill(2)
  112. hour = time_parts[0].zfill(2)
  113. minute = time_parts[1].zfill(2)
  114. now_year = datetime.now().year
  115. if year == now_year:
  116. return f"{month}-{day} {hour}:{minute}"
  117. return f"{year}-{month}-{day} {hour}:{minute}"
  118. except Exception:
  119. pass
  120. return s
  121. def build_xlsx(works):
  122. wb = Workbook()
  123. ws = wb.active
  124. ws.title = "作品数据"
  125. ws.append(HEADERS)
  126. header_font = Font(bold=True)
  127. header_fill = PatternFill("solid", fgColor="F2F2F2")
  128. center = Alignment(horizontal="center", vertical="center", wrap_text=False)
  129. left = Alignment(horizontal="left", vertical="center", wrap_text=True)
  130. thin = Side(style="thin", color="D9D9D9")
  131. border = Border(left=thin, right=thin, top=thin, bottom=thin)
  132. for col_idx in range(1, len(HEADERS) + 1):
  133. cell = ws.cell(row=1, column=col_idx)
  134. cell.font = header_font
  135. cell.fill = header_fill
  136. cell.alignment = center
  137. cell.border = border
  138. for i, w in enumerate(COL_WIDTHS, start=1):
  139. col_letter = get_column_letter(i)
  140. ws.column_dimensions[col_letter].width = w
  141. for w in works:
  142. platform_raw = (w.get("platform") or "").strip()
  143. platform_cn = PLATFORM_NAME_MAP.get(platform_raw, platform_raw)
  144. ws.append([
  145. _safe_str(w.get("accountName")),
  146. _safe_str(platform_cn),
  147. _safe_str(w.get("title")),
  148. _safe_str(_format_datetime_pretty(w.get("publishTime"))),
  149. _safe_int(w.get("viewsCount")),
  150. _safe_int(w.get("commentsCount")),
  151. _safe_int(w.get("sharesCount")),
  152. _safe_int(w.get("collectsCount")),
  153. _safe_int(w.get("likesCount")),
  154. ])
  155. int_cols = {"E", "F", "G", "H", "I"}
  156. for row in range(2, ws.max_row + 1):
  157. for col in range(1, len(HEADERS) + 1):
  158. c = ws.cell(row=row, column=col)
  159. c.border = border
  160. if col in (1, 3):
  161. c.alignment = left
  162. else:
  163. c.alignment = center
  164. for c_letter in int_cols:
  165. c = ws[f"{c_letter}{row}"]
  166. if c.value is not None:
  167. c.number_format = "0"
  168. ws.freeze_panes = "A2"
  169. bio = BytesIO()
  170. wb.save(bio)
  171. return bio.getvalue()
  172. def main():
  173. try:
  174. raw = sys.stdin.read()
  175. payload = json.loads(raw) if raw.strip() else {}
  176. except Exception as e:
  177. sys.stderr.write(f"Invalid JSON input: {e}\n")
  178. sys.exit(2)
  179. works = payload.get("works") or []
  180. xlsx_bytes = build_xlsx(works)
  181. sys.stdout.buffer.write(xlsx_bytes)
  182. if __name__ == "__main__":
  183. main()