socket_client.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. # socket_manager.py
  2. import socket, json
  3. import websockets
  4. from settings import APP_HOST,PORT
  5. from middleware import UnicornException
  6. class SocketClient:
  7. def __init__(self, uri="ws://127.0.0.1:7074"):
  8. self.uri = uri
  9. self.websocket = None
  10. async def connect(self):
  11. if self.websocket == None:
  12. self.websocket = await websockets.connect(self.uri)
  13. print(f"Local Socket Connected to {self.uri}")
  14. async def send_message(self, code=0, msg="", data: object = None):
  15. if self.websocket:
  16. # 本地客户端发送消息,在软件异步处理过程需要有客户端转发消息到应用客户端
  17. json_data = json.dumps(
  18. {"code": code, "type": "forward_message", "msg": msg, "data": data}
  19. )
  20. await self.websocket.send(json_data)
  21. print("Message sent:", json_data)
  22. else:
  23. print("WebSocket client is not connected")
  24. async def close(self):
  25. if self.websocket:
  26. await self.websocket.close()
  27. print("WebSocket connection closed")
  28. def jsonMessage(self, code=0, msg="", data: object = None):
  29. """json字符串数据"""
  30. jsonData = {"code": code, "msg": msg, "data": data}
  31. return json.dumps(jsonData)
  32. # 创建全局 SocketManager 实例
  33. socket_manager = SocketClient(f"ws://{APP_HOST}:{PORT}/ws")