index.ts 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. import { Server as HttpServer } from 'http';
  2. import { WebSocketServer, WebSocket } from 'ws';
  3. import jwt from 'jsonwebtoken';
  4. import { config } from '../config/index.js';
  5. import { logger } from '../utils/logger.js';
  6. import type { JwtPayload } from '../middleware/auth.js';
  7. import { WS_EVENTS } from '@media-manager/shared';
  /**
   * A `ws` socket annotated with the per-connection state this module tracks.
   */
  interface AuthenticatedWebSocket extends WebSocket {
    /** Heartbeat flag: set to true on connect and on every pong, reset by the heartbeat sweep. */
    isAlive?: boolean;
    /** User id taken from the verified JWT; absent until the socket authenticates. */
    userId?: number;
  }
  /**
   * Owns the `/ws` WebSocket endpoint.
   *
   * Responsibilities:
   * - accept connections and authenticate them with a JWT sent in an AUTH message;
   * - keep a per-user registry of authenticated sockets for targeted delivery and broadcast;
   * - run a ping/pong heartbeat that terminates sockets which stop answering;
   * - route CAPTCHA_SUBMIT messages to one-shot listeners registered by task id.
   */
  class WebSocketManager {
    /** Interval between heartbeat sweeps, in milliseconds. */
    private static readonly HEARTBEAT_INTERVAL_MS = 30000;
    /** How long a captcha listener is kept before it is dropped, in milliseconds. */
    private static readonly CAPTCHA_LISTENER_TTL_MS = 5 * 60 * 1000;

    private wss: WebSocketServer | null = null;
    /** Authenticated sockets grouped by user id. */
    private clients: Map<number, Set<AuthenticatedWebSocket>> = new Map();
    /** Pending captcha listeners keyed by task id, each with its expiry timer. */
    private captchaListeners: Map<
      string,
      { callback: (code: string) => void; timer: ReturnType<typeof setTimeout> }
    > = new Map();
    /** Heartbeat sweep timer; null while the server is not running. */
    private heartbeatTimer: ReturnType<typeof setInterval> | null = null;

    /**
     * Attaches the WebSocket server to an HTTP server and starts the heartbeat.
     *
     * @param server HTTP server that will receive the upgrade requests on `/ws`.
     */
    setup(server: HttpServer): void {
      // A repeated setup() must not leave the previous heartbeat interval running.
      if (this.heartbeatTimer) {
        clearInterval(this.heartbeatTimer);
        this.heartbeatTimer = null;
      }

      this.wss = new WebSocketServer({ server, path: '/ws' });

      this.wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
        logger.info(`[WS] New connection from ${req.socket.remoteAddress}, URL: ${req.url}`);
        ws.isAlive = true;

        ws.on('pong', () => {
          ws.isAlive = true;
        });

        ws.on('message', (data) => {
          const text = data.toString();
          logger.info(`[WS] Received message: ${text.slice(0, 200)}`);
          this.handleMessage(ws, text);
        });

        ws.on('close', (code, reason) => {
          logger.info(`[WS] Connection closed, code: ${code}, reason: ${reason?.toString()}`);
          this.removeClient(ws);
        });

        ws.on('error', (error) => {
          logger.error('[WS] Connection error:', error);
          this.removeClient(ws);
        });

        // Greet the client; it is expected to authenticate next.
        logger.info('[WS] Sending connection message...');
        this.send(ws, { type: 'connection', payload: { message: 'Connected, please authenticate' } });
      });

      // Heartbeat: a socket that did not answer the previous ping is terminated.
      this.heartbeatTimer = setInterval(() => {
        this.wss?.clients.forEach((ws: AuthenticatedWebSocket) => {
          if (!ws.isAlive) {
            this.removeClient(ws);
            ws.terminate();
            return;
          }
          ws.isAlive = false;
          ws.ping();
        });
      }, WebSocketManager.HEARTBEAT_INTERVAL_MS);

      logger.info('WebSocket server initialized');
    }

    /**
     * Parses one incoming text frame and dispatches it by its `type` field.
     * Never throws: parse errors and handler errors are logged separately.
     */
    private handleMessage(ws: AuthenticatedWebSocket, data: string): void {
      let parsed: unknown;
      try {
        parsed = JSON.parse(data);
      } catch (error) {
        logger.error('Failed to parse WebSocket message:', error);
        return;
      }

      // Valid JSON that is not an object (null, numbers, strings) carries no type.
      if (typeof parsed !== 'object' || parsed === null) {
        logger.warn('[WS] Ignoring non-object WebSocket message');
        return;
      }

      const { type, payload } = parsed as { type?: unknown; payload?: unknown };

      try {
        switch (type) {
          case WS_EVENTS.AUTH:
            this.handleAuth(ws, (payload as { token?: string } | null | undefined)?.token);
            break;
          case WS_EVENTS.PING:
            this.send(ws, { type: WS_EVENTS.PONG, timestamp: Date.now() });
            break;
          case WS_EVENTS.CAPTCHA_SUBMIT:
            // Only authenticated sockets may resolve a pending captcha.
            if (ws.userId === undefined) {
              logger.warn('[WS] Rejected captcha submit from unauthenticated connection');
              this.send(ws, {
                type: WS_EVENTS.AUTH_ERROR,
                payload: { message: 'Authentication required' },
              });
              break;
            }
            this.handleCaptchaSubmit(payload as { captchaTaskId: string; code: string });
            break;
          default:
            logger.warn('Unknown WebSocket message type:', type);
        }
      } catch (error) {
        logger.error('Failed to handle WebSocket message:', error);
      }
    }

    /**
     * Delivers a submitted captcha code to the listener registered for its task.
     * Listeners are one-shot: the listener is removed before its callback runs.
     */
    private handleCaptchaSubmit(payload: { captchaTaskId: string; code: string }): void {
      const captchaTaskId: unknown = payload?.captchaTaskId;
      const rawCode: unknown = payload?.code;
      // Clients may send a numeric code as a JSON number; callbacks expect a string.
      const code = typeof rawCode === 'number' ? String(rawCode) : rawCode;

      if (typeof captchaTaskId !== 'string' || typeof code !== 'string' || !captchaTaskId || !code) {
        logger.warn('[WS] Invalid captcha submit payload');
        return;
      }

      const listener = this.captchaListeners.get(captchaTaskId);
      if (!listener) {
        logger.warn(`[WS] No listener for captcha task ${captchaTaskId}`);
        return;
      }

      logger.info(`[WS] Captcha submitted for task ${captchaTaskId}`);
      // Retire the listener first so a throwing callback cannot leave it (or its timer) behind.
      clearTimeout(listener.timer);
      this.captchaListeners.delete(captchaTaskId);

      try {
        listener.callback(code);
      } catch (error) {
        logger.error(`[WS] Captcha listener for task ${captchaTaskId} threw:`, error);
      }
    }

    /**
     * Verifies the JWT and registers the socket under the user id it carries.
     * Replies with AUTH_SUCCESS or AUTH_ERROR.
     */
    private handleAuth(ws: AuthenticatedWebSocket, token?: string): void {
      // The token comes from untrusted JSON, so it may be missing or not a string.
      if (typeof token !== 'string' || token === '') {
        logger.info('[WS] Authenticating, token: none');
        logger.warn('[WS] Auth failed: no token');
        this.send(ws, { type: WS_EVENTS.AUTH_ERROR, payload: { message: 'Token required' } });
        return;
      }
      logger.info(`[WS] Authenticating, token: ${token.slice(0, 20) + '...'}`);

      let decoded: JwtPayload;
      try {
        decoded = jwt.verify(token, config.jwt.secret) as JwtPayload;
      } catch (error) {
        logger.warn('[WS] Auth failed: invalid token', error);
        this.send(ws, { type: WS_EVENTS.AUTH_ERROR, payload: { message: 'Invalid token' } });
        return;
      }

      // On re-authentication, drop the socket from the identity it held before,
      // otherwise it would keep receiving the previous user's messages.
      this.removeClient(ws);
      ws.userId = decoded.userId;

      let userClients = this.clients.get(decoded.userId);
      if (!userClients) {
        userClients = new Set();
        this.clients.set(decoded.userId, userClients);
      }
      userClients.add(ws);

      this.send(ws, {
        type: WS_EVENTS.AUTH_SUCCESS,
        payload: { userId: decoded.userId, message: 'Authenticated' },
      });
      logger.info(`[WS] Auth success for user ${decoded.userId}, total connections: ${userClients.size}`);
    }

    /**
     * Removes a socket from the registry and clears its identity.
     * Safe to call repeatedly and for sockets that never authenticated.
     */
    private removeClient(ws: AuthenticatedWebSocket): void {
      const userId = ws.userId;
      // Compare against undefined so that a user id of 0 is still handled.
      if (userId === undefined) {
        return;
      }
      ws.userId = undefined;

      const userClients = this.clients.get(userId);
      if (!userClients) {
        return;
      }
      userClients.delete(ws);
      if (userClients.size === 0) {
        this.clients.delete(userId);
      }
    }

    /** Sends one object (with a `timestamp` added) to a socket if it is open. */
    private send(ws: WebSocket, data: object): void {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify({ ...data, timestamp: Date.now() }));
      }
    }

    /** Writes an already-serialized message to every open socket in the collection. */
    private deliver(sockets: Iterable<AuthenticatedWebSocket>, message: string): void {
      for (const ws of sockets) {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(message);
        }
      }
    }

    /**
     * Sends a message to every open socket of one user.
     *
     * @param userId Recipient user id.
     * @param type Event type placed in the message envelope.
     * @param payload Optional JSON-serializable payload.
     */
    sendToUser(userId: number, type: string, payload?: unknown): void {
      const userClients = this.clients.get(userId);
      if (!userClients) {
        logger.warn(`[WS] No clients for user ${userId}, message not sent: type=${type}`);
        return;
      }

      const message = JSON.stringify({ type, payload, timestamp: Date.now() });
      logger.info(`[WS] Sending to user ${userId}: type=${type}, payload=${JSON.stringify(payload)}`);
      logger.info(`[WS] Full message: ${message}`);
      this.deliver(userClients, message);
    }

    /**
     * Broadcasts a message to every authenticated, open socket.
     *
     * @param type Event type placed in the message envelope.
     * @param payload Optional JSON-serializable payload.
     */
    broadcast(type: string, payload?: unknown): void {
      const message = JSON.stringify({ type, payload, timestamp: Date.now() });
      this.clients.forEach((userClients) => {
        this.deliver(userClients, message);
      });
    }

    /** Returns the number of distinct users with at least one authenticated socket. */
    getOnlineUserCount(): number {
      return this.clients.size;
    }

    /** Returns true when the user has at least one authenticated socket registered. */
    isUserOnline(userId: number): boolean {
      const userClients = this.clients.get(userId);
      return userClients ? userClients.size > 0 : false;
    }

    /**
     * Registers a one-shot listener for a captcha submission.
     * A listener already registered for the same task id is replaced. After
     * five minutes the listener is dropped without its callback being invoked.
     *
     * @param captchaTaskId Task id the client will echo back with the code.
     * @param callback Invoked with the submitted code.
     */
    onCaptchaSubmit(captchaTaskId: string, callback: (code: string) => void): void {
      // Cancel the expiry timer of the listener being replaced.
      const existing = this.captchaListeners.get(captchaTaskId);
      if (existing) {
        clearTimeout(existing.timer);
      }

      const timer = setTimeout(() => {
        this.captchaListeners.delete(captchaTaskId);
        logger.info(`[WS] Captcha listener timed out for ${captchaTaskId}`);
      }, WebSocketManager.CAPTCHA_LISTENER_TTL_MS);

      this.captchaListeners.set(captchaTaskId, { callback, timer });
      logger.info(`[WS] Registered captcha listener for ${captchaTaskId}`);
    }

    /** Removes the captcha listener for a task, if one is registered. */
    removeCaptchaListener(captchaTaskId: string): void {
      const existing = this.captchaListeners.get(captchaTaskId);
      if (existing) {
        clearTimeout(existing.timer);
        this.captchaListeners.delete(captchaTaskId);
        logger.info(`[WS] Removed captcha listener for ${captchaTaskId}`);
      }
    }

    /** Removes every captcha listener and cancels their expiry timers. */
    clearAllCaptchaListeners(): void {
      for (const { timer } of this.captchaListeners.values()) {
        clearTimeout(timer);
      }
      this.captchaListeners.clear();
      logger.info('[WS] Cleared all captcha listeners');
    }

    /**
     * Shuts the WebSocket server down and releases every resource held here:
     * heartbeat timer, captcha listeners, client sockets and the server itself.
     */
    close(): void {
      if (this.heartbeatTimer) {
        clearInterval(this.heartbeatTimer);
        this.heartbeatTimer = null;
      }

      this.clearAllCaptchaListeners();

      // Collect the server's sockets as well as the registry, so connections
      // that never authenticated are closed too.
      const sockets = new Set<AuthenticatedWebSocket>(this.wss?.clients ?? []);
      this.clients.forEach((userClients) => {
        userClients.forEach((ws) => sockets.add(ws));
      });
      sockets.forEach((ws) => {
        try {
          ws.close();
        } catch {
          // Best effort: a socket that fails to close must not block shutdown.
        }
      });
      this.clients.clear();

      if (this.wss) {
        this.wss.close();
        this.wss = null;
      }
      logger.info('[WS] WebSocket server closed');
    }
  }
  /** Shared, module-wide WebSocket manager instance. */
  export const wsManager: WebSocketManager = new WebSocketManager();
  /**
   * Attaches the shared WebSocket manager to the given HTTP server.
   *
   * @param server HTTP server that should accept WebSocket connections.
   */
  export function setupWebSocket(server: HttpServer): void {
    const manager = wsManager;
    manager.setup(server);
  }