| 12345678910111213141516171819202122232425262728293031323334353637383940 |
- # socket_manager.py
- import socket, json
- import websockets
- from settings import APP_HOST,PORT
- from middleware import UnicornException
class SocketClient:
    """Small async WebSocket client used to forward JSON messages.

    The connection is opened lazily by :meth:`connect` and stored on
    ``self.websocket``; ``None`` always means "not connected".
    """

    def __init__(self, uri="ws://127.0.0.1:7074"):
        """Remember the target URI; no connection is opened here."""
        self.uri = uri
        self.websocket = None

    async def connect(self) -> None:
        """Open the WebSocket connection unless one is already held."""
        if self.websocket is None:
            self.websocket = await websockets.connect(self.uri)
            print(f"Local Socket Connected to {self.uri}")

    async def send_message(self, code=0, msg="", data: object = None) -> None:
        """Send a ``forward_message`` JSON envelope over the connection.

        Args:
            code: Value placed under the ``"code"`` key.
            msg: Value placed under the ``"msg"`` key.
            data: Value placed under the ``"data"`` key; it must be
                serializable by ``json.dumps``.

        When no connection is held, nothing is sent and a notice is printed.
        """
        if self.websocket is None:
            print("WebSocket client is not connected")
            return

        # The local client sends the message: during the application's
        # asynchronous processing a client is needed to forward messages
        # on to the application client.
        json_data = json.dumps(
            {"code": code, "type": "forward_message", "msg": msg, "data": data}
        )
        await self.websocket.send(json_data)
        print("Message sent:", json_data)

    async def close(self) -> None:
        """Close the connection, if any, and return to the unconnected state."""
        if self.websocket is None:
            return
        try:
            await self.websocket.close()
        finally:
            # Drop the handle even if closing fails; otherwise connect()
            # would never reconnect and send_message() would keep writing
            # to a dead connection.
            self.websocket = None
        print("WebSocket connection closed")

    def jsonMessage(self, code=0, msg="", data: object = None) -> str:
        """Return ``{"code", "msg", "data"}`` serialized as a JSON string."""
        return json.dumps({"code": code, "msg": msg, "data": data})
# Create the global SocketManager instance (a SocketClient aimed at the
# application's /ws endpoint).
socket_manager = SocketClient(uri=f"ws://{APP_HOST}:{PORT}/ws")
|