# socket_client.py (1.2 KB)
  1. # socket_manager.py
  2. import socket, json
  3. import websockets
  4. from settings import APP_HOST,PORT
  5. from middleware import UnicornException
  6. class SocketClient:
  7. def __init__(self, uri="ws://127.0.0.1:7074"):
  8. self.uri = uri
  9. self.websocket = None
  10. async def connect(self):
  11. if self.websocket == None:
  12. self.websocket = await websockets.connect(self.uri)
  13. print(f"Local Socket Connected to {self.uri}")
  14. async def send_message(self, code=0, msg="", data: object = None):
  15. if self.websocket:
  16. json_data = json.dumps({"code": code, "msg": msg, "data": data})
  17. await self.websocket.send(json_data)
  18. print("Message sent:", json_data)
  19. else:
  20. print("WebSocket client is not connected")
  21. async def close(self):
  22. if self.websocket:
  23. await self.websocket.close()
  24. print("WebSocket connection closed")
  25. def jsonMessage(self, code=0, msg="", data: object = None):
  26. """json字符串数据"""
  27. jsonData = {"code": code, "msg": msg, "data": data}
  28. return json.dumps(jsonData)
  29. # 创建全局 SocketManager 实例
  30. socket_manager = SocketClient(f"ws://{APP_HOST}:{PORT}/ws")