socket_client.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. # socket_manager.py
  2. from enum import Flag
  3. import socket, json, asyncio
  4. import websockets
  5. from settings import APP_HOST,PORT
  6. from middleware import UnicornException
  7. class SocketClient:
  8. def __init__(self, uri="ws://127.0.0.1:7074"):
  9. self.uri = uri
  10. self.websocket = None
  11. async def connect(self):
  12. if self.websocket == None:
  13. self.websocket = await websockets.connect(self.uri)
  14. print(f"Local Socket Connected to {self.uri}")
  15. async def send_message(self, code=0, msg="", data: object = None):
  16. if self.websocket:
  17. # 本地客户端发送消息,在软件异步处理过程需要有客户端转发消息到应用客户端
  18. json_data = json.dumps(
  19. {"code": code, "type": "forward_message", "msg": msg, "data": data}
  20. )
  21. await self.websocket.send(json_data)
  22. print("Message sent:", json_data)
  23. else:
  24. print("WebSocket client is not connected")
  25. async def close(self):
  26. if self.websocket:
  27. await self.websocket.close()
  28. print("WebSocket connection closed")
  29. def jsonMessage(self, code=0, msg="", data: object = None):
  30. """json字符串数据"""
  31. jsonData = {"code": code, "msg": msg, "data": data}
  32. return json.dumps(jsonData)
# Create the module-level SocketClient instance used by sendMsg(); its URI is
# built from APP_HOST and PORT imported from settings.
socket_manager = SocketClient(f"ws://{APP_HOST}:{PORT}/ws")
  35. def sendMsg(code=0, msg="", data=None):
  36. '''通用客户端发送消息机制'''
  37. asyncio.run(socket_manager.send_message(code=code, msg=msg, data=data))