import asyncio
import threading
import time

import serial
import serial.tools.list_ports

# from mcu.base_mode.base import *
from mcu.SerialIns import SerialIns


class Main:
    """Serial test harness: streams counter frames out and checks received frames.

    A background thread (``print_all``) reads frames from the serial port and
    verifies that a 16-bit counter inside each frame increases by one, while
    the main thread (``run``) keeps sending command frames.
    """

    def __init__(self):
        # Always define the attributes so later code can test them instead of
        # failing with AttributeError when no port was found.
        self.last_value = 0
        self.serial_ins = None

        port_name = self.list_serial_ports()
        if not port_name:
            return
        self.serial_ins = SerialIns(port_name=port_name, baud=115200)

    def encapsulation_data(self, data, len_data, data_magnification=1):
        """Encode a number as a big-endian list of byte values.

        Args:
            data: Numeric value to encode.
            len_data: Width in bytes; only 1, 2 and 4 are supported.
            data_magnification: Scale factor applied to ``data`` before it is
                truncated to an integer (defaults to 1).

        Returns:
            A list of ``len_data`` ints in the range 0..255, most significant
            byte first, or ``None`` when ``len_data`` is not 1, 2 or 4.
        """
        data = int(data * data_magnification)
        if len_data not in (1, 2, 4):
            return None
        # Walk from the most significant byte down to the least significant.
        return [
            (data >> shift) & 0xFF
            for shift in range(8 * (len_data - 1), -1, -8)
        ]

    def list_serial_ports(self):
        """Print every available serial port and pick the first CH340 one.

        Returns:
            The ``name`` of the first port whose description contains
            ``"CH340"``, or ``False`` when there is none.
        """
        # Enumerate all serial ports currently available.
        ports = serial.tools.list_ports.comports()
        # Dump the details of each port, stopping at the first CH340 match.
        for port in ports:
            print(f"设备: {port.device}")
            print(f"名称: {port.name}")
            print(f"描述: {port.description}")
            print(f"硬件ID: {port.hwid}")
            print(f"制造商: {port.manufacturer}")
            print(f"产品ID: {port.product}")
            print(f"序列号: {port.serial_number}")
            print("-" * 40)
            if "CH340" in port.description:
                return port.name
        return False

    def run(self):
        """Start the receive thread and send command frames forever.

        Raises:
            RuntimeError: If no serial port was opened by ``__init__``.
        """
        if self.serial_ins is None:
            raise RuntimeError("No CH340 serial port found; cannot start.")

        # Pass the bound method itself (not its result) so that the blocking
        # receive loop runs in the background. A daemon thread lets Ctrl-C in
        # the main thread terminate the process.
        listener = threading.Thread(target=self.print_all, daemon=True)
        listener.start()

        n = 0
        value = 0
        while True:
            value += 1
            time.sleep(0.05)
            n += 1
            send_data = [
                0x01, 0x07, 0x01, 0x00, value, 0x05, 0x78,
                0x01, 0x90, 0x00, 0x64, 0x00, 0x01, 0x00,
            ]
            # Append the running frame counter as a 4-byte big-endian field.
            send_data.extend(self.encapsulation_data(data=n, len_data=4))
            # send_data = [0x5a, 0x01]
            if value == 200:
                # Keep the per-frame counter within a single byte (1..200).
                value = 0
            self.serial_ins.write_cmd(send_data)
            # Formatted copy of the frame, kept for the debug print below.
            _send_data = self.serial_ins.change_hex_to_int(send_data)
            _send_data = _send_data.strip()
            # print("s buf: {}".format(_send_data))

    def get_data_from_receive_data(
        self, receive_data, start, len_data, data_magnification=1
    ):
        """Decode a big-endian unsigned field from a received frame.

        Args:
            receive_data: Indexable sequence of byte values.
            start: Index of the most significant byte of the field.
            len_data: Width in bytes; only 1, 2 and 4 are supported.
            data_magnification: Scale factor applied to the decoded value
                (defaults to 1).

        Returns:
            The decoded value multiplied by ``data_magnification``, or
            ``None`` when ``len_data`` is unsupported or the frame cannot be
            decoded (too short, ``None``, or non-integer elements).
        """
        if len_data not in (1, 2, 4):
            return None
        try:
            data = 0
            for offset in range(len_data):
                data = (data << 8) | receive_data[start + offset]
            return data * data_magnification
        except (IndexError, TypeError):
            # Malformed or truncated frame: report "no value" to the caller.
            return None

    def print_all(self):
        """Poll the serial port, print each frame and check counter continuity.

        Raises:
            RuntimeError: When the decoded counter is neither 1 nor exactly
                one more than the previously received counter.
        """
        while True:
            time.sleep(0.01)
            r_data = self.serial_ins.read_cmd()
            if not r_data:
                continue
            # Drop the first element of the frame before decoding.
            r_data = r_data[1:]
            _r_data = self.serial_ins.change_hex_to_int(r_data)
            _r_data = _r_data.strip()
            print("g buf:{}".format(_r_data))
            value = self.get_data_from_receive_data(
                receive_data=r_data, start=3, len_data=2, data_magnification=1
            )
            print(value)
            if value == 1 or value == self.last_value + 1:
                self.last_value = value
            else:
                # Must raise a real exception; raising a plain string is a
                # TypeError in Python 3.
                raise RuntimeError("数据接收有中断")


if __name__ == '__main__':
    main = Main()
    main.run()