connect_manager.py 1.0 KB

1234567891011121314151617181920212223242526272829303132
  1. from models import WebSocket
  2. from logger import logger
  3. import json
  4. class ConnectionManager:
  5. def __init__(self):
  6. self.active_connections: list[WebSocket] = []
  7. def jsonMessage(self, code=0, msg="", data: object = None):
  8. """json字符串数据"""
  9. jsonData = {"code": code, "msg": msg, "data": data}
  10. return json.dumps(jsonData)
  11. async def connect(self, websocket: WebSocket):
  12. '''连接事件'''
  13. await websocket.accept()
  14. self.active_connections.append(websocket)
  15. logger.info("socket 已连接")
  16. def disconnect(self, websocket: WebSocket):
  17. '''断开连接事件'''
  18. self.active_connections.remove(websocket)
  19. logger.info("socket 连接断开")
  20. async def send_personal_message(self, message: str, websocket: WebSocket):
  21. '''向用户发送消息'''
  22. await websocket.send_json(message)
  23. async def broadcast(self, message: str):
  24. """广播消息"""
  25. for connection in self.active_connections:
  26. await connection.send_json(message)