| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- import { Server as HttpServer } from 'http';
- import { WebSocketServer, WebSocket } from 'ws';
- import jwt from 'jsonwebtoken';
- import { config } from '../config/index.js';
- import { logger } from '../utils/logger.js';
- import type { JwtPayload } from '../middleware/auth.js';
- import { WS_EVENTS } from '@media-manager/shared';
- interface AuthenticatedWebSocket extends WebSocket {
- userId?: number;
- isAlive?: boolean;
- }
- class WebSocketManager {
- private wss: WebSocketServer | null = null;
- private clients: Map<number, Set<AuthenticatedWebSocket>> = new Map();
- // 修复: captchaListeners 存储 callback 和 timer,以便超时自动删除
- private captchaListeners: Map<string, { callback: (code: string) => void; timer: NodeJS.Timeout }> = new Map();
- // 心跳检测定时器
- private heartbeatTimer: NodeJS.Timeout | null = null;
- setup(server: HttpServer): void {
- this.wss = new WebSocketServer({ server, path: '/ws' });
- this.wss.on('connection', (ws: AuthenticatedWebSocket, req) => {
- logger.info(`[WS] New connection from ${req.socket.remoteAddress}, URL: ${req.url}`);
- ws.isAlive = true;
- ws.on('pong', () => {
- ws.isAlive = true;
- });
- ws.on('message', (data) => {
- logger.info(`[WS] Received message: ${data.toString().slice(0, 200)}`);
- this.handleMessage(ws, data.toString());
- });
- ws.on('close', (code, reason) => {
- logger.info(`[WS] Connection closed, code: ${code}, reason: ${reason?.toString()}`);
- this.removeClient(ws);
- });
- ws.on('error', (error) => {
- logger.error('[WS] Connection error:', error);
- this.removeClient(ws);
- });
- // 发送连接成功消息,等待认证
- logger.info('[WS] Sending connection message...');
- this.send(ws, { type: 'connection', payload: { message: 'Connected, please authenticate' } });
- });
- // 心跳检测
- this.heartbeatTimer = setInterval(() => {
- this.wss?.clients.forEach((ws: AuthenticatedWebSocket) => {
- if (!ws.isAlive) {
- this.removeClient(ws);
- return ws.terminate();
- }
- ws.isAlive = false;
- ws.ping();
- });
- }, 30000);
- logger.info('WebSocket server initialized');
- }
- private handleMessage(ws: AuthenticatedWebSocket, data: string): void {
- try {
- const message = JSON.parse(data);
- switch (message.type) {
- case WS_EVENTS.AUTH:
- this.handleAuth(ws, message.payload?.token);
- break;
- case WS_EVENTS.PING:
- this.send(ws, { type: WS_EVENTS.PONG, timestamp: Date.now() });
- break;
- case WS_EVENTS.CAPTCHA_SUBMIT:
- this.handleCaptchaSubmit(message.payload);
- break;
- default:
- logger.warn('Unknown WebSocket message type:', message.type);
- }
- } catch (error) {
- logger.error('Failed to parse WebSocket message:', error);
- }
- }
-
- /**
- * 处理验证码提交
- */
- private handleCaptchaSubmit(payload: { captchaTaskId: string; code: string }): void {
- const { captchaTaskId, code } = payload || {};
- if (!captchaTaskId || !code) {
- logger.warn('[WS] Invalid captcha submit payload');
- return;
- }
-
- const listener = this.captchaListeners.get(captchaTaskId);
- if (listener) {
- logger.info(`[WS] Captcha submitted for task ${captchaTaskId}`);
- // 修复: 调用 listener.callback
- listener.callback(code);
- // 调用后删除监听器
- clearTimeout(listener.timer);
- this.captchaListeners.delete(captchaTaskId);
- } else {
- logger.warn(`[WS] No listener for captcha task ${captchaTaskId}`);
- }
- }
- private handleAuth(ws: AuthenticatedWebSocket, token?: string): void {
- logger.info(`[WS] Authenticating, token: ${token ? token.slice(0, 20) + '...' : 'none'}`);
-
- if (!token) {
- logger.warn('[WS] Auth failed: no token');
- this.send(ws, { type: WS_EVENTS.AUTH_ERROR, payload: { message: 'Token required' } });
- return;
- }
- try {
- const decoded = jwt.verify(token, config.jwt.secret) as JwtPayload;
- ws.userId = decoded.userId;
- // 添加到用户的连接列表
- if (!this.clients.has(decoded.userId)) {
- this.clients.set(decoded.userId, new Set());
- }
- this.clients.get(decoded.userId)!.add(ws);
- this.send(ws, {
- type: WS_EVENTS.AUTH_SUCCESS,
- payload: { userId: decoded.userId, message: 'Authenticated' }
- });
- logger.info(`[WS] Auth success for user ${decoded.userId}, total connections: ${this.clients.get(decoded.userId)!.size}`);
- } catch (error) {
- logger.warn('[WS] Auth failed: invalid token', error);
- this.send(ws, { type: WS_EVENTS.AUTH_ERROR, payload: { message: 'Invalid token' } });
- }
- }
- private removeClient(ws: AuthenticatedWebSocket): void {
- if (ws.userId) {
- const userClients = this.clients.get(ws.userId);
- if (userClients) {
- userClients.delete(ws);
- if (userClients.size === 0) {
- this.clients.delete(ws.userId);
- }
- }
- }
- }
- private send(ws: WebSocket, data: object): void {
- if (ws.readyState === WebSocket.OPEN) {
- ws.send(JSON.stringify({ ...data, timestamp: Date.now() }));
- }
- }
- /**
- * 向指定用户的所有客户端发送消息
- */
- sendToUser(userId: number, type: string, payload?: unknown): void {
- const userClients = this.clients.get(userId);
- if (userClients) {
- const message = JSON.stringify({ type, payload, timestamp: Date.now() });
- logger.info(`[WS] Sending to user ${userId}: type=${type}, payload=${JSON.stringify(payload)}`);
- logger.info(`[WS] Full message: ${message}`);
- userClients.forEach((ws) => {
- if (ws.readyState === WebSocket.OPEN) {
- ws.send(message);
- }
- });
- } else {
- logger.warn(`[WS] No clients for user ${userId}, message not sent: type=${type}`);
- }
- }
- /**
- * 向所有已认证的客户端广播消息
- */
- broadcast(type: string, payload?: unknown): void {
- const message = JSON.stringify({ type, payload, timestamp: Date.now() });
- this.clients.forEach((userClients) => {
- userClients.forEach((ws) => {
- if (ws.readyState === WebSocket.OPEN) {
- ws.send(message);
- }
- });
- });
- }
- /**
- * 获取在线用户数
- */
- getOnlineUserCount(): number {
- return this.clients.size;
- }
- /**
- * 检查用户是否在线
- */
- isUserOnline(userId: number): boolean {
- const userClients = this.clients.get(userId);
- return userClients ? userClients.size > 0 : false;
- }
-
- /**
- * 注册验证码提交监听(5分钟超时后自动删除)
- */
- onCaptchaSubmit(captchaTaskId: string, callback: (code: string) => void): void {
- // 修复: 清除旧的定时器
- const existing = this.captchaListeners.get(captchaTaskId);
- if (existing) {
- clearTimeout(existing.timer);
- }
- // 5分钟超时后自动删除
- const timer = setTimeout(() => {
- this.captchaListeners.delete(captchaTaskId);
- logger.info(`[WS] Captcha listener timed out for ${captchaTaskId}`);
- }, 5 * 60 * 1000);
- this.captchaListeners.set(captchaTaskId, { callback, timer });
- logger.info(`[WS] Registered captcha listener for ${captchaTaskId}`);
- }
-
- /**
- * 移除验证码监听
- */
- removeCaptchaListener(captchaTaskId: string): void {
- const existing = this.captchaListeners.get(captchaTaskId);
- if (existing) {
- clearTimeout(existing.timer);
- this.captchaListeners.delete(captchaTaskId);
- logger.info(`[WS] Removed captcha listener for ${captchaTaskId}`);
- }
- }
-
- /**
- * 清理所有监听器
- */
- clearAllCaptchaListeners(): void {
- for (const { timer } of this.captchaListeners.values()) {
- clearTimeout(timer);
- }
- this.captchaListeners.clear();
- logger.info('[WS] Cleared all captcha listeners');
- }
-
- /**
- * 关闭 WebSocket 服务器并清理资源
- */
- close(): void {
- // 清理心跳定时器
- if (this.heartbeatTimer) {
- clearInterval(this.heartbeatTimer);
- this.heartbeatTimer = null;
- }
-
- // 清理所有验证码监听器
- this.clearAllCaptchaListeners();
-
- // 关闭所有客户端连接
- this.clients.forEach((userClients) => {
- userClients.forEach((ws) => {
- try {
- ws.close();
- } catch (e) {
- // 忽略关闭错误
- }
- });
- });
- this.clients.clear();
-
- // 关闭 WebSocket 服务器
- if (this.wss) {
- this.wss.close();
- this.wss = null;
- }
-
- logger.info('[WS] WebSocket server closed');
- }
- }
- export const wsManager = new WebSocketManager();
- export function setupWebSocket(server: HttpServer): void {
- wsManager.setup(server);
- }
|