# socket_manager.py
- from enum import Flag
- import socket, json, asyncio
- import websockets
- from settings import APP_HOST,PORT
- from middleware import UnicornException
class SocketClient:
    """Small client that holds one ``websockets`` connection and sends JSON messages.

    The connection is opened lazily by ``connect`` and released by ``close``;
    after ``close`` the client can be connected again.
    """

    def __init__(self, uri="ws://127.0.0.1:7074"):
        """Remember the target URI; no connection is opened here.

        Args:
            uri: WebSocket URI later passed to ``websockets.connect``.
        """
        self.uri = uri
        # Live connection object, or None while disconnected.
        self.websocket = None

    async def connect(self):
        """Open the connection to ``self.uri`` unless one is already held."""
        if self.websocket is None:
            self.websocket = await websockets.connect(self.uri)
            print(f"Local Socket Connected to {self.uri}")

    async def send_message(self, code=0, msg="", data: object = None):
        """Send a ``forward_message`` JSON envelope over the connection.

        When no connection is held, a notice is printed and nothing is sent.

        Args:
            code: Value stored under the ``"code"`` key.
            msg: Value stored under the ``"msg"`` key.
            data: Value stored under the ``"data"`` key; must be serializable
                by ``json.dumps``.
        """
        if not self.websocket:
            print("WebSocket client is not connected")
            return

        # The local client sends the message: during the application's
        # asynchronous processing, a client is needed to forward messages
        # on to the application client.
        json_data = json.dumps(
            {"code": code, "type": "forward_message", "msg": msg, "data": data}
        )
        await self.websocket.send(json_data)
        print("Message sent:", json_data)

    async def close(self):
        """Close the connection, if any, and mark the client as disconnected."""
        if not self.websocket:
            return

        # Drop the reference before awaiting so the client is marked
        # disconnected even if close() raises; this lets connect() open a
        # fresh connection instead of reusing a closed one.
        connection, self.websocket = self.websocket, None
        await connection.close()
        print("WebSocket connection closed")

    def jsonMessage(self, code=0, msg="", data: object = None):
        """Return a JSON string with the keys ``code``, ``msg`` and ``data``."""
        return json.dumps({"code": code, "msg": msg, "data": data})
# Module-wide shared SocketClient instance, targeting ws://APP_HOST:PORT/ws.
socket_manager = SocketClient(uri=f"ws://{APP_HOST}:{PORT}/ws")
def sendMsg(code=0, msg="", data=None):
    """Send one message through the shared ``socket_manager`` client.

    Builds the ``send_message`` coroutine with the given ``code``, ``msg``
    and ``data`` and runs it to completion with ``asyncio.run``.

    NOTE(review): ``asyncio.run`` raises ``RuntimeError`` when an event loop
    is already running in the calling thread -- confirm that callers invoke
    this from synchronous code only.
    """
    pending_send = socket_manager.send_message(code=code, msg=msg, data=data)
    asyncio.run(pending_send)
|