from .connect_manager import ConnectionManager from models import WebSocket import json, asyncio from mcu.DeviceControl import DeviceControl, checkMcuConnection from mcu.BlueToothMode import BlueToothMode # socket消息发送逻辑处理方法 async def handlerSend( manager: ConnectionManager, receiveData: str, websocket: WebSocket ): loop = asyncio.get_event_loop() receiveData = json.loads(receiveData) # 处理消息发送逻辑 receiveData = json.loads(receiveData.get("text")) jsonType = receiveData.get("type") code = receiveData.get("code") msg = receiveData.get("msg") data = receiveData.get("data") print("receiveData", receiveData) print("jsonType", jsonType) match jsonType: case "ping": """发送心跳""" data = manager.jsonMessage("pong") await manager.send_personal_message(data, websocket) case "pong": """发送心跳""" pass case "forward_message": data = receiveData.get("data") dictMsg = {"code":code,"msg":msg,"data":data} await manager.broadcast(dictMsg) case "connect_mcu": device_ctrl = DeviceControl(websocket_manager=manager) # if device_ctrl.serial_ins.check_connect(): # print("未连接") loop.create_task(checkMcuConnection(device_ctrl), name="mcu") case "connect_bluetooth": blue_tooth = BlueToothMode(websocket_manager=manager) # await blue_tooth.main_func() loop.create_task(blue_tooth.main_func(), name="blue_tooth") # loop.close() case "init_mcu": device_ctrl = DeviceControl(websocket_manager=manager) loop.create_task(device_ctrl.initDevice(), name="init_mcu") case "control_mcu": device_name = data.get("device_name") value = data.get("value") if (device_name == "" or device_name == None) or ( value == "" or value == None ): data = manager.jsonMessage(code=1,msg="参数错误") await manager.send_personal_message(data, websocket) return device_ctrl = DeviceControl(websocket_manager=manager) device_ctrl.controlDevice(device_name, value) case "run_mcu_config": action = json.load(open('action.json',encoding='utf-8')) left_actions = action.get("actions") # data = manager.jsonMessage(code=0, msg="json消息", data=left_actions) # await manager.send_personal_message(data, websocket) device_ctrl = DeviceControl(websocket_manager=manager) loop.create_task( device_ctrl.run_mcu_config(left_actions, "123456789"), name="run_mcu_config", ) case _: data = manager.jsonMessage(code=1, msg="未知消息") await manager.send_personal_message(data, websocket) return