import asyncio
import serial.tools.list_ports
import time
import json
from .SerialIns import SerialIns
from utils.SingletonType import SingletonType
from .BaseClass import BaseClass
from sockets import ConnectionManager

# MCU commands

# Baud rate used both for the temporary probe connection and the MCU link.
_BAUDRATE = 115200
# Command byte of the "connect" request; the reply echoes it in byte 0.
_CMD_CONNECT = 90
# Device number reported in reply byte 2 when the peer is the MCU.
_DEVICE_ID_MCU = 1
# Placeholder used when the reply is too short to carry the init flag.
_MCU_INIT_UNKNOWN = 99


class DeviceControl(BaseClass, metaclass=SingletonType):
    """Detect CH340 serial adapters, probe them and connect to the MCU.

    Progress and errors are reported to the front end through
    ``sendSocketMessage`` (inherited from ``BaseClass``).
    """

    def __init__(self, websocket_manager: ConnectionManager):
        super().__init__(websocket_manager)
        self.serial_ins = None
        self.connected_ports_dict = {}  # ports that have been connected
        self.p_list = []  # port names already seen by the monitor loop
        self.temp_ports_dict = {}  # result of the most recent scan
        self.is_running = False
        # Initialised here so they exist before the first connect/close call.
        self.port_name = ""
        self.connect_state = False

    def scan_serial_port(self) -> dict:
        """Return the available CH340 serial ports, keyed by port name.

        Returns:
            A dict mapping ``port.name`` to a dict of the port's descriptive
            fields; empty when no matching port is present.
        """
        return {
            port.name: {
                "name": port.name,
                "device": port.device,
                "description": port.description,
                "hwid": port.hwid,
                "manufacturer": port.manufacturer,
                "product": port.product,
                "serial_number": port.serial_number,
            }
            for port in serial.tools.list_ports.comports()
            # Guard against a missing description on exotic platforms.
            if "CH340" in (port.description or "")
        }

    def remove_port(self, port_name):
        """Report that a serial port was removed and forget its record."""
        print("remove", port_name)
        # Drop the stale "connected" record for a port that no longer exists.
        self.connected_ports_dict.pop(port_name, None)
        data = {
            "_type": "remove_port",
            "plugins_mode": "auto_select_com",
            "data": {"port_name": port_name},
        }
        self.sendSocketMessage(1, "串口被移除", data)

    def _send_probe_info(self, text, with_code=False):
        """Send a ``show_info`` message for the auto-select-COM plugin."""
        message_data = {
            "_type": "show_info",
            "plugins_mode": "auto_select_com",
            "data": {"text": text},
        }
        if with_code:
            self.sendSocketMessage(1, msg=text, data=message_data)
        else:
            self.sendSocketMessage(msg=text, data=message_data)

    def _send_mcu_info(self, text):
        """Send a ``show_info`` message for the MCU plugin."""
        message_data = {"_type": "show_info", "plugins_mode": "mcu", "data": text}
        self.sendSocketMessage(msg=text, data=message_data)

    def _probe_device_id(self, port_name):
        """Open ``port_name`` temporarily and ask the peer for its device id.

        Returns:
            ``None`` when the port could not be opened, flushed or written;
            ``0`` when nothing recognisable was answered; otherwise the
            device id taken from byte 2 of the reply.
        """
        try:
            # Try to open the port at 115200 baud.
            serial_handle = serial.Serial(
                port=port_name, baudrate=_BAUDRATE, timeout=0.5
            )
        except (serial.SerialException, OSError, ValueError):
            self._send_probe_info(
                "串口:{} 被占用,或无法识别".format(port_name), with_code=True
            )
            print("串口:{} 被占用".format(port_name))
            return None

        # From here on the handle is always closed, whatever happens.
        try:
            time.sleep(2)
            print("开始发送命令")
            try:
                # reset_input_buffer() replaces the deprecated flushInput().
                serial_handle.reset_input_buffer()
            except serial.SerialException:
                print("超时错误:无法在规定时间内重置输入缓冲区。")
                self.sendSocketMessage(
                    1,
                    msg="超时错误:无法在规定时间内重置输入缓冲区。",
                    data=None,
                )
                return None

            print("尝试写入数据")
            payload = [_CMD_CONNECT, 1]
            # Frame layout: 0x55 0x55 <len> <payload...> <inverted 8-bit sum>.
            frame = bytearray([0x55, 0x55, 0xFF & len(payload)])
            frame.extend(payload)
            frame.append(0xFF & ~sum(payload))
            try:
                self.receive_data = b""
                serial_handle.write(frame)
            except serial.SerialException:
                print("写入数据错误")
                return None

            time.sleep(0.3)
            print("尝试接收命令")
            receive_data = self.read_cmd(serial_handle)
            if not receive_data:
                return 0
            print("receive_data", receive_data)
            # A short reply cannot carry a device id; treat it as unknown.
            if len(receive_data) >= 3 and receive_data[0] == _CMD_CONNECT:
                return receive_data[2]
            return 0
        finally:
            print("关闭串口:{}".format(port_name))
            serial_handle.close()

    def add_port_by_linkage(self, port_name):
        """Probe a newly seen port and connect to it if it is the MCU.

        The port is opened temporarily, a connect command is sent, and the
        device id in the reply decides what happens next. This call blocks
        for several seconds (sleeps plus serial timeouts).

        Args:
            port_name: Name of the serial port to probe.
        """
        print("add", port_name)
        self._send_probe_info("开始识别接口:{}".format(port_name))
        time.sleep(1)

        device_id = self._probe_device_id(port_name)
        if device_id is None:
            # I/O failure: already reported by the probe helper.
            return
        if device_id <= 0:
            print("串口无法识别")
            return

        if device_id == _DEVICE_ID_MCU:
            self.to_connect_com(port_name)
            self._send_probe_info("MCU开始连接")
            # NOTE(review): recorded even if to_connect_com() failed; check
            # self.connect_state here if the record must mean "link is up".
            self.connected_ports_dict[port_name] = "MCU"

        # Sent for every recognised device; any id other than the MCU's is
        # reported as the remote control.
        message_data = {
            "_type": "select_port_name",
            "plugins_mode": "auto_select_com",
            "data": {
                "device_name": (
                    "mcu" if device_id == _DEVICE_ID_MCU else "remote_control"
                ),
                "port_name": port_name,
            },
        }
        self.sendSocketMessage(
            msg="MCU连接成功",
            data=message_data,
        )

    def to_connect_com(self, port_name):
        """Close any current link and connect to the MCU on ``port_name``.

        On success ``connect_state`` becomes True and ``port_name`` is stored;
        on a failed handshake the link is closed again.

        Args:
            port_name: Name of the serial port the MCU is attached to.

        Returns:
            ``False`` when the port could not be opened; ``None`` otherwise
            (inspect ``connect_state`` for the handshake result).
        """
        print("to_connect_com", port_name)
        self.close_connect()
        time.sleep(0.3)
        self.connect_state = False

        try:
            serial_ins = SerialIns(port_name=port_name, baud=_BAUDRATE, timeout=0.1)
            opened = bool(serial_ins.serial_handle)
        except Exception:
            # SerialIns is project code; any failure means "could not open".
            serial_ins, opened = None, False
        if not opened:
            self._send_mcu_info("MCU 打开串口失败")
            self.serial_ins = None
            self.connect_state = False
            return False
        self.serial_ins = serial_ins

        self._send_mcu_info("MCU 开始连接")

        # ---- send the connect request ----
        print("405 发送 连接请求 -----------------------------------------")
        print(self.serial_ins)
        self.serial_ins.write_cmd([_CMD_CONNECT, 1])
        # Give the device time to answer before reading.
        time.sleep(0.3)
        receive_data = self.serial_ins.read_cmd(out_time=1)

        if receive_data:
            print(
                "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data))
            )
            # Byte 0 echoes the command, byte 2 == 1 identifies the MCU.
            if (
                len(receive_data) >= 3
                and receive_data[0] == _CMD_CONNECT
                and receive_data[2] == _DEVICE_ID_MCU
            ):
                # Byte 6 tells whether the device was initialised (1 = yes);
                # a reply too short to carry it yields the unknown marker.
                if len(receive_data) > 6:
                    mcu_has_been_set = receive_data[6]
                else:
                    mcu_has_been_set = _MCU_INIT_UNKNOWN
                print("MCU初始化信息{}".format(mcu_has_been_set))
                self._send_mcu_info("MCU 已连接")
                self.connect_state = True
                if not self.is_running:
                    print("MCU start")
                    self.is_running = True
                print("MCU 已连接")
                self.port_name = port_name
                return

        print("MCU 连接失败")
        self._send_mcu_info("MCU 连接失败")
        self.close_connect()

    def close_connect(self):
        """Close the current serial link, if any, and reset connection state."""
        self.port_name = ""
        if self.serial_ins:
            self.serial_ins.close_serial_port()
        self.connect_state = False


async def checkMcuConnection(device_ctrl: DeviceControl):
    """Poll the serial ports forever and report plug/unplug events.

    Every 0.5 s the available ports are rescanned; ports that vanished are
    reported through ``remove_port`` and newly seen ports are probed with
    ``add_port_by_linkage``.

    Args:
        device_ctrl: The device controller whose port list is maintained.
    """
    # NOTE(review): add_port_by_linkage() blocks for seconds (time.sleep and
    # serial timeouts), which stalls the event loop while a port is probed.
    while True:
        await asyncio.sleep(0.5)
        ports_dict = device_ctrl.scan_serial_port()
        device_ctrl.temp_ports_dict = ports_dict

        if not ports_dict:
            # Nothing is plugged in any more: remove every known port.
            while device_ctrl.p_list:
                device_ctrl.remove_port(device_ctrl.p_list.pop())
            print("串口未连接,请检查")
            device_ctrl.sendSocketMessage(code=1, msg="串口未连接,请检查")
            continue

        # Collect first, then remove: mutating the list while iterating it
        # would skip the element following each removed one.
        vanished = [name for name in device_ctrl.p_list if name not in ports_dict]
        for name in vanished:
            device_ctrl.p_list.remove(name)
            device_ctrl.remove_port(name)

        for port_name in ports_dict:
            if port_name in device_ctrl.p_list:
                continue
            device_ctrl.p_list.append(port_name)
            try:
                device_ctrl.add_port_by_linkage(port_name)
            except Exception as exc:
                # One failing probe must not terminate the monitor loop.
                print("串口不存在{} {}".format(port_name, exc))