"""Serial link soak test: one thread sends counter frames, another checks replies.

The sender (`Main.run`) writes a frame at random 1-10 s intervals; the
receiver (`Main.print_all`) polls the port and verifies that a 16-bit counter
in each received frame increases by exactly one.
"""
import asyncio
import random
import threading
import time

import serial
import serial.tools.list_ports

# from mcu.base_mode.base import *
from mcu.SerialIns import SerialIns


class Main:
    """Drive a CH340 serial adapter with a sender loop and a receiver loop."""

    def __init__(self):
        # Always define every attribute first so that a missing adapter leaves
        # the object in a well-defined state instead of half-initialised.
        self.last_value = 0
        self.serial_ins = None
        # Set by stop(); lets both worker loops terminate cooperatively.
        self._stop_event = threading.Event()

        port_name = self.list_serial_ports()
        if not port_name:
            return
        self.serial_ins = SerialIns(port_name=port_name, baud=115200)

    def stop(self):
        """Ask `run` and `print_all` to return at their next wake-up."""
        self._stop_event.set()

    def _require_serial(self):
        """Raise RuntimeError when no serial port was opened in `__init__`."""
        if self.serial_ins is None:
            raise RuntimeError("未找到 CH340 串口设备")

    def encapsulation_data(self, data, len_data, data_magnification=1):
        """Encode a number as a big-endian list of byte values.

        Args:
            data: Value to encode.
            len_data: Number of bytes to produce (any positive integer; the
                original implementation supported only 1, 2 and 4).
            data_magnification: Scale factor applied to ``data`` before it is
                truncated to an int. Defaults to 1.

        Returns:
            A list of ``len_data`` ints in ``0..255``, most significant byte
            first, or ``None`` when ``len_data`` is smaller than 1. Values
            that do not fit are truncated to the low ``len_data`` bytes;
            negative values come out in two's-complement form.
        """
        data = int(data * data_magnification)
        if len_data < 1:
            return None
        width = int(len_data)
        # Masking reproduces the per-byte ``0xFF & (data >> shift)`` behaviour
        # for negative and oversized values, which to_bytes would reject.
        mask = (1 << (8 * width)) - 1
        return list((data & mask).to_bytes(width, "big"))

    def list_serial_ports(self):
        """Print every available serial port and return the first CH340 one.

        Returns:
            ``port.name`` of the first port whose description contains
            ``"CH340"``, or ``False`` when there is none.
        """
        # NOTE(review): this returns ``port.name`` (short name), not
        # ``port.device`` (full path). They may differ on POSIX systems;
        # confirm which one SerialIns expects.
        for port in serial.tools.list_ports.comports():
            print(f"设备: {port.device}")
            print(f"名称: {port.name}")
            print(f"描述: {port.description}")
            print(f"硬件ID: {port.hwid}")
            print(f"制造商: {port.manufacturer}")
            print(f"产品ID: {port.product}")
            print(f"序列号: {port.serial_number}")
            print("-" * 40)
            if "CH340" in port.description:
                return port.name
        return False

    def run(self):
        """Send a frame every 1-10 seconds until `stop` is called.

        Each frame is a fixed 14-byte header whose fifth byte is a counter
        cycling through 1..200, followed by the total number of frames sent
        so far as four big-endian bytes.

        Raises:
            RuntimeError: If no serial port was opened.
        """
        self._require_serial()
        n = 0
        value = 0
        while True:
            value += 1
            # Event.wait doubles as an interruptible sleep: it returns True
            # as soon as stop() is called instead of blocking the full delay.
            if self._stop_event.wait(random.randint(1, 10)):
                return
            n += 1
            send_data = [
                0x01, 0x07, 0x01, 0x00, value, 0x05, 0x78,
                0x01, 0x90, 0x00, 0x64, 0x00, 0x01, 0x00,
            ]
            send_data.extend(self.encapsulation_data(data=n, len_data=4))
            if value == 200:
                # Wrap so the counter always fits in the single header byte.
                value = 0
            self.serial_ins.write_cmd(send_data)

    def get_data_from_receive_data(
        self, receive_data, start, len_data, data_magnification=1
    ):
        """Decode a big-endian unsigned integer from a received frame.

        Args:
            receive_data: Indexable sequence of byte values.
            start: Index of the most significant byte.
            len_data: Number of bytes to read (any positive integer; the
                original implementation supported only 1, 2 and 4).
            data_magnification: Scale factor applied to the decoded value.
                Defaults to 1.

        Returns:
            The decoded value multiplied by ``data_magnification``, or
            ``None`` when ``len_data`` is smaller than 1, the frame is too
            short, or the frame does not hold integers.
        """
        try:
            if len_data < 1:
                return None
            data = 0
            for offset in range(len_data):
                data = (data << 8) | receive_data[start + offset]
            return data * data_magnification
        except (IndexError, TypeError):
            # Short or malformed frame: report "no value" to the caller.
            return None

    def print_all(self):
        """Poll the port, print each frame and verify the frame counter.

        The first element of every received frame is dropped, then the two
        bytes at offset 3 of the remainder are read as a 16-bit counter. A
        counter is accepted when it equals 1 or ``last_value + 1``.

        Raises:
            RuntimeError: If no serial port was opened, or if a frame's
                counter is missing or out of sequence.
        """
        self._require_serial()
        print("开始打印数据")
        # wait(0.01) is the 10 ms polling delay and the stop check in one.
        while not self._stop_event.wait(0.01):
            r_data = self.serial_ins.read_cmd()
            print("接收数据:{}".format(r_data))
            if not r_data:
                continue
            r_data = r_data[1:]
            _r_data = self.serial_ins.change_hex_to_int(r_data)
            _r_data = _r_data.strip()
            print("g buf:{}".format(_r_data))
            value = self.get_data_from_receive_data(
                receive_data=r_data, start=3, len_data=2, data_magnification=1
            )
            print(value)
            if value == 1 or value == self.last_value + 1:
                self.last_value = value
            else:
                print("数据接收有中断")
                # Raising a bare string is itself a TypeError; raise a real
                # exception carrying the same message.
                raise RuntimeError("数据接收有中断")


if __name__ == '__main__':
    main = Main()
    if main.serial_ins is None:
        print("未找到 CH340 串口设备")
        raise SystemExit(1)

    async def run_tasks():
        loop = asyncio.get_running_loop()
        # Both loops block, so each one runs in an executor thread.
        task1 = loop.run_in_executor(None, main.run)
        task2 = loop.run_in_executor(None, main.print_all)
        try:
            # Runs until one loop raises or the program is interrupted.
            await asyncio.gather(task1, task2)
        finally:
            # Executor threads cannot be cancelled; without this the process
            # would hang at shutdown waiting for the surviving loop.
            main.stop()

    try:
        asyncio.run(run_tasks())
    except KeyboardInterrupt:
        print("程序被用户中断")