import { Server as HttpServer } from 'http'; import { WebSocketServer, WebSocket } from 'ws'; import jwt from 'jsonwebtoken'; import { config } from '../config/index.js'; import { logger } from '../utils/logger.js'; import type { JwtPayload } from '../middleware/auth.js'; import { WS_EVENTS } from '@media-manager/shared'; interface AuthenticatedWebSocket extends WebSocket { userId?: number; isAlive?: boolean; } class WebSocketManager { private wss: WebSocketServer | null = null; private clients: Map> = new Map(); // 修复: captchaListeners 存储 callback 和 timer,以便超时自动删除 private captchaListeners: Map void; timer: NodeJS.Timeout }> = new Map(); // 心跳检测定时器 private heartbeatTimer: NodeJS.Timeout | null = null; setup(server: HttpServer): void { this.wss = new WebSocketServer({ server, path: '/ws' }); this.wss.on('connection', (ws: AuthenticatedWebSocket, req) => { logger.info(`[WS] New connection from ${req.socket.remoteAddress}, URL: ${req.url}`); ws.isAlive = true; ws.on('pong', () => { ws.isAlive = true; }); ws.on('message', (data) => { logger.info(`[WS] Received message: ${data.toString().slice(0, 200)}`); this.handleMessage(ws, data.toString()); }); ws.on('close', (code, reason) => { logger.info(`[WS] Connection closed, code: ${code}, reason: ${reason?.toString()}`); this.removeClient(ws); }); ws.on('error', (error) => { logger.error('[WS] Connection error:', error); this.removeClient(ws); }); // 发送连接成功消息,等待认证 logger.info('[WS] Sending connection message...'); this.send(ws, { type: 'connection', payload: { message: 'Connected, please authenticate' } }); }); // 心跳检测 this.heartbeatTimer = setInterval(() => { this.wss?.clients.forEach((ws: AuthenticatedWebSocket) => { if (!ws.isAlive) { this.removeClient(ws); return ws.terminate(); } ws.isAlive = false; ws.ping(); }); }, 30000); logger.info('WebSocket server initialized'); } private handleMessage(ws: AuthenticatedWebSocket, data: string): void { try { const message = JSON.parse(data); switch (message.type) { case WS_EVENTS.AUTH: this.handleAuth(ws, message.payload?.token); break; case WS_EVENTS.PING: this.send(ws, { type: WS_EVENTS.PONG, timestamp: Date.now() }); break; case WS_EVENTS.CAPTCHA_SUBMIT: this.handleCaptchaSubmit(message.payload); break; default: logger.warn('Unknown WebSocket message type:', message.type); } } catch (error) { logger.error('Failed to parse WebSocket message:', error); } } /** * 处理验证码提交 */ private handleCaptchaSubmit(payload: { captchaTaskId: string; code: string }): void { const { captchaTaskId, code } = payload || {}; if (!captchaTaskId || !code) { logger.warn('[WS] Invalid captcha submit payload'); return; } const listener = this.captchaListeners.get(captchaTaskId); if (listener) { logger.info(`[WS] Captcha submitted for task ${captchaTaskId}`); // 修复: 调用 listener.callback listener.callback(code); // 调用后删除监听器 clearTimeout(listener.timer); this.captchaListeners.delete(captchaTaskId); } else { logger.warn(`[WS] No listener for captcha task ${captchaTaskId}`); } } private handleAuth(ws: AuthenticatedWebSocket, token?: string): void { logger.info(`[WS] Authenticating, token: ${token ? token.slice(0, 20) + '...' : 'none'}`); if (!token) { logger.warn('[WS] Auth failed: no token'); this.send(ws, { type: WS_EVENTS.AUTH_ERROR, payload: { message: 'Token required' } }); return; } try { const decoded = jwt.verify(token, config.jwt.secret) as JwtPayload; ws.userId = decoded.userId; // 添加到用户的连接列表 if (!this.clients.has(decoded.userId)) { this.clients.set(decoded.userId, new Set()); } this.clients.get(decoded.userId)!.add(ws); this.send(ws, { type: WS_EVENTS.AUTH_SUCCESS, payload: { userId: decoded.userId, message: 'Authenticated' } }); logger.info(`[WS] Auth success for user ${decoded.userId}, total connections: ${this.clients.get(decoded.userId)!.size}`); } catch (error) { logger.warn('[WS] Auth failed: invalid token', error); this.send(ws, { type: WS_EVENTS.AUTH_ERROR, payload: { message: 'Invalid token' } }); } } private removeClient(ws: AuthenticatedWebSocket): void { if (ws.userId) { const userClients = this.clients.get(ws.userId); if (userClients) { userClients.delete(ws); if (userClients.size === 0) { this.clients.delete(ws.userId); } } } } private send(ws: WebSocket, data: object): void { if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ ...data, timestamp: Date.now() })); } } /** * 向指定用户的所有客户端发送消息 */ sendToUser(userId: number, type: string, payload?: unknown): void { const userClients = this.clients.get(userId); if (userClients) { const message = JSON.stringify({ type, payload, timestamp: Date.now() }); logger.info(`[WS] Sending to user ${userId}: type=${type}, payload=${JSON.stringify(payload)}`); logger.info(`[WS] Full message: ${message}`); userClients.forEach((ws) => { if (ws.readyState === WebSocket.OPEN) { ws.send(message); } }); } else { logger.warn(`[WS] No clients for user ${userId}, message not sent: type=${type}`); } } /** * 向所有已认证的客户端广播消息 */ broadcast(type: string, payload?: unknown): void { const message = JSON.stringify({ type, payload, timestamp: Date.now() }); this.clients.forEach((userClients) => { userClients.forEach((ws) => { if (ws.readyState === WebSocket.OPEN) { ws.send(message); } }); }); } /** * 获取在线用户数 */ getOnlineUserCount(): number { return this.clients.size; } /** * 检查用户是否在线 */ isUserOnline(userId: number): boolean { const userClients = this.clients.get(userId); return userClients ? userClients.size > 0 : false; } /** * 注册验证码提交监听(5分钟超时后自动删除) */ onCaptchaSubmit(captchaTaskId: string, callback: (code: string) => void): void { // 修复: 清除旧的定时器 const existing = this.captchaListeners.get(captchaTaskId); if (existing) { clearTimeout(existing.timer); } // 5分钟超时后自动删除 const timer = setTimeout(() => { this.captchaListeners.delete(captchaTaskId); logger.info(`[WS] Captcha listener timed out for ${captchaTaskId}`); }, 5 * 60 * 1000); this.captchaListeners.set(captchaTaskId, { callback, timer }); logger.info(`[WS] Registered captcha listener for ${captchaTaskId}`); } /** * 移除验证码监听 */ removeCaptchaListener(captchaTaskId: string): void { const existing = this.captchaListeners.get(captchaTaskId); if (existing) { clearTimeout(existing.timer); this.captchaListeners.delete(captchaTaskId); logger.info(`[WS] Removed captcha listener for ${captchaTaskId}`); } } /** * 清理所有监听器 */ clearAllCaptchaListeners(): void { for (const { timer } of this.captchaListeners.values()) { clearTimeout(timer); } this.captchaListeners.clear(); logger.info('[WS] Cleared all captcha listeners'); } /** * 关闭 WebSocket 服务器并清理资源 */ close(): void { // 清理心跳定时器 if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer); this.heartbeatTimer = null; } // 清理所有验证码监听器 this.clearAllCaptchaListeners(); // 关闭所有客户端连接 this.clients.forEach((userClients) => { userClients.forEach((ws) => { try { ws.close(); } catch (e) { // 忽略关闭错误 } }); }); this.clients.clear(); // 关闭 WebSocket 服务器 if (this.wss) { this.wss.close(); this.wss = null; } logger.info('[WS] WebSocket server closed'); } } export const wsManager = new WebSocketManager(); export function setupWebSocket(server: HttpServer): void { wsManager.setup(server); }