#!/usr/bin/python3
"""Serial-port helper for a small framed command protocol.

Frame layout, as produced by ``SerialIns.write_cmd``::

    0x55 0x55 <len> <payload: len bytes> <checksum>

where ``checksum == 0xFF & ~sum(payload)``.
Example: payload ``5a 01`` is sent as ``55 55 02 5a 01 a4``.
"""
import time

try:
    import serial.tools.list_ports
except ImportError:
    # Keep the module importable without pyserial so the frame helpers stay
    # usable; the error is raised when a port is actually opened or scanned.
    serial = None

# Reference: https://blog.csdn.net/weixin_44289254/article/details/121583562

_FRAME_HEADER = b"\x55\x55"
_FRAME_OVERHEAD = 4  # two header bytes + length byte + checksum byte
_POLL_INTERVAL = 0.01  # seconds between polls while a frame is incomplete


def _require_pyserial():
    """Return the ``serial`` package, raising ImportError if it is missing."""
    if serial is None:
        raise ImportError("pyserial is required to access serial ports")
    return serial


class SerialIns(object):
    """Thin wrapper around ``serial.Serial`` that sends and receives frames.

    Attributes:
        port_name: port passed to ``serial.Serial`` (may be set by a scan).
        baud: baud rate.
        timeout: read timeout in seconds handed to ``serial.Serial``.
        serial_handle: the ``serial.Serial`` object, or None when unavailable.
        receive_data: received bytes that have not been parsed yet.
    """

    def __init__(self, port_name=None, baud=9600, timeout=0.01):
        """Try to open ``port_name``; on failure ``serial_handle`` is None."""
        self.port_name = port_name
        self.baud = baud
        self.timeout = timeout
        self.serial_handle = None
        self.receive_data = b""
        try:
            self.serial_handle = self._open_port()
            print("{}打开成功".format(self.port_name))
        except Exception as e:
            print(e)
            self.serial_handle = None
            print("{}打开失败".format(self.port_name))

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _open_port(self):
        """Create a ``serial.Serial`` for the configured port settings."""
        return _require_pyserial().Serial(
            port=self.port_name, baudrate=self.baud, timeout=self.timeout
        )

    def _drop_handle(self):
        """Forget a failed port, closing it on a best-effort basis."""
        handle, self.serial_handle = self.serial_handle, None
        if handle is None:
            return
        try:
            handle.close()
        except Exception:
            # The port already failed; a close error adds no information.
            pass

    def _read_chunk(self):
        """Return the bytes currently pending, or None if the port is unusable."""
        if not self.serial_handle:
            return None
        try:
            return self.serial_handle.read_all() or b""
        except Exception as e:
            print("串口接收报错", e)
            self._drop_handle()
            return None

    @staticmethod
    def _build_frame(data):
        """Return the complete frame (header, length, payload, checksum)."""
        frame = bytearray(_FRAME_HEADER)
        frame.append(0xFF & len(data))
        frame.extend(data)
        frame.append(0xFF & ~sum(data))
        return frame

    @staticmethod
    def _align_to_header(buf):
        """Drop bytes in front of the first ``55 55`` header.

        When no header is present a single trailing ``0x55`` is kept, because
        it may be the first half of a header whose second byte has not arrived.
        """
        start = buf.find(_FRAME_HEADER)
        if start >= 0:
            return buf[start:]
        if buf[-1:] == b"\x55":
            return buf[-1:]
        return b""

    @staticmethod
    def _parse_frame(buf, check=None):
        """Parse one frame from ``buf``, which must start with the header.

        Returns ``(payload, consumed)``:
            * ``(None, 0)``     - the frame is not complete yet;
            * ``(None, n > 0)`` - complete frame with a bad checksum;
            * ``(payload, n)``  - valid frame occupying ``n`` bytes.

        ``check`` replaces the computed checksum with a fixed expected byte.
        """
        frame_len = buf[2] + _FRAME_OVERHEAD
        if len(buf) < frame_len:
            return None, 0
        payload = buf[3:frame_len - 1]
        expected = (0xFF & ~sum(payload)) if check is None else check
        if buf[frame_len - 1] != expected:
            return None, frame_len
        return payload, frame_len

    def _receive_frame(self, check, deadline, max_polls=None,
                       short_msg=None, bad_msg=None):
        """Shared receive loop behind ``read_cmd`` and ``read_cmd1``.

        Appends pending bytes to ``receive_data``, resynchronises on the
        header and extracts one frame.  While a frame is incomplete it polls
        every ``_POLL_INTERVAL`` seconds until ``deadline`` (a
        ``time.monotonic()`` value) passes or ``max_polls`` polls were made.

        Returns the payload, None (no valid frame) or False (port failure).
        """
        polls = 0
        while True:
            chunk = self._read_chunk()
            if chunk is None:
                return False
            self.receive_data = self._align_to_header(self.receive_data + chunk)
            if len(self.receive_data) < _FRAME_OVERHEAD:
                return None
            payload, consumed = self._parse_frame(self.receive_data, check)
            if consumed:
                # Remove the frame from the buffer whether or not it was valid.
                self.receive_data = self.receive_data[consumed:]
                if payload is None and bad_msg:
                    print(bad_msg)
                return payload
            if polls == 0 and short_msg:
                print(short_msg)
            polls += 1
            out_of_polls = max_polls is not None and polls > max_polls
            if out_of_polls or time.monotonic() >= deadline:
                # Keep the partial frame; a later call may complete it.
                return None
            time.sleep(_POLL_INTERVAL)

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def read_line(self):
        """Return one line once the input count is stable, else None."""
        if not self.serial_handle:
            return None
        pending = self.serial_handle.inWaiting()
        if pending:
            time.sleep(0.02)  # let the sender finish writing the line
            if pending == self.serial_handle.inWaiting():
                return self.serial_handle.readline()
        return None

    def read_all(self):
        """Return every pending byte (``b""`` when no port is open)."""
        if not self.serial_handle:
            return b""
        return self.serial_handle.read_all()

    def check_connect(self):
        """Make sure a port is open and return ``serial_handle``.

        An already-open handle is returned untouched.  Without a port name
        the ports are scanned first (the scan may set ``port_name``).
        """
        handle = self.serial_handle
        if handle is not None and handle.is_open:
            return handle
        if not self.port_name:
            self.scan_serial_port()
        if not self.port_name:
            print("{}打开失败".format(self.port_name))
            return self.serial_handle
        try:
            self.serial_handle = self._open_port()
            print("{}打开成功".format(self.port_name))
        except Exception as e:
            print(e)
            print("{}打开失败".format(self.port_name))
        return self.serial_handle

    def clearn_flush(self):
        """Discard the driver's input buffer and the unparsed bytes."""
        if self.serial_handle:
            self.serial_handle.flushInput()
            self.receive_data = b""

    def write_cmd(self, data: list):
        """Send ``data`` (byte values) as one frame.

        Returns True on success and False when no port is open or the write
        fails; after a failed write the handle is dropped.
        """
        if not self.serial_handle:
            return False
        buf = self._build_frame(data)
        print("send buf {}".format(self.change_hex_to_int(buf)))
        try:
            self.serial_handle.write(buf)
        except Exception:
            self._drop_handle()
            return False
        return True

    def read_cmd_out_time(self, scan_interval=0.1, out_time=1):
        """Poll ``read_cmd`` until a frame arrives or ``out_time`` elapses.

        :param scan_interval: seconds to sleep between polls
        :param out_time: overall timeout in seconds
        :return: the payload, or False on timeout or port failure
        """
        if not self.serial_handle:
            return False
        deadline = time.monotonic() + out_time
        while True:
            remaining = max(0.0, deadline - time.monotonic())
            result = self.read_cmd(out_time=remaining)
            if result is False:
                return False  # the port failed; polling again cannot help
            if result is not None:
                return result
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(scan_interval, remaining))

    def read_cmd1(self, out_time=1, check=None):
        """Verbose variant of ``read_cmd``.

        Waits up to ``out_time`` seconds for an incomplete frame and prints a
        message for short or corrupt frames.

        :return: payload, None (no valid frame) or False (port failure)
        """
        return self._receive_frame(
            check,
            time.monotonic() + out_time,
            short_msg="1数据长度不够,等待下次读取",
            bad_msg="数据异常,丢弃",
        )

    def read_cmd111(self, out_time=1, check=None):
        """Parse one frame from the next received chunk only.

        ``receive_data`` is not used: anything after the first frame of the
        chunk is discarded.  ``out_time`` is unused and kept only for
        signature compatibility.

        :return: payload, None (no valid frame) or False (port failure)
        """
        while True:
            chunk = self._read_chunk()
            if chunk is None:
                return False
            if len(chunk) < _FRAME_OVERHEAD:
                return None
            print("2 _recv_data", self.change_hex_to_int(chunk))
            chunk = self._align_to_header(chunk)
            if len(chunk) < _FRAME_OVERHEAD:
                continue  # no frame start in this chunk; try the next read
            payload, consumed = self._parse_frame(chunk, check)
            if not consumed:
                print("2数据长度不够,等待下次读取")
                return None
            if payload is None:
                print("数据异常,丢弃")
            return payload

    def read_cmd(self, out_time=1, check=None, out_time_n=5):
        """Read one frame from the port and return its payload.

        New bytes are appended to ``receive_data``, leading garbage is
        skipped, and one frame is removed from the buffer.  An incomplete
        frame is polled for at most ``out_time_n`` more times (10 ms apart)
        and never longer than ``out_time`` seconds, then left in the buffer.

        :param out_time: maximum seconds to wait for an incomplete frame
        :param check: expected checksum byte; None means compute it
        :param out_time_n: maximum extra polls for an incomplete frame
        :return: payload, None (no valid frame) or False (port failure)
        """
        return self._receive_frame(
            check, time.monotonic() + out_time, max_polls=out_time_n
        )

    def change_hex_to_int(self, _bytearray):
        """Format bytes as space-separated two-digit lowercase hex."""
        return " ".join("{:02x}".format(x) for x in _bytearray)

    def scan_serial_port(self, port_name=None):
        """List serial ports, or look one up by name.

        With ``port_name`` the matching port entry (or None) is returned.
        Without it every port is printed, ``self.port_name`` is set to the
        device of any port whose description contains "CH340", and the full
        list is returned.  Returns None when no port exists.
        """
        plist = list(_require_pyserial().tools.list_ports.comports())
        if not plist:
            return None
        if port_name:
            for port in plist:
                if port_name in (port.device, port.name):
                    return port
            return None
        for port in plist:
            print("串口列表:", port.description)
            print("串口列表:", port.name)
            if "CH340" in port.description:
                print("CH340:", port.name)
                # ``device`` is the openable path; ``name`` is only the short
                # name (they differ on POSIX, e.g. /dev/ttyUSB0 vs ttyUSB0).
                self.port_name = port.device
            print("----------", port)
        return plist

    def open_serial_port(self):
        """Open the port if needed; return True when it is open afterwards."""
        handle = self.check_connect()
        return bool(handle is not None and handle.is_open)

    def close_serial_port(self):
        """Close the port, if any, and forget the handle."""
        if self.serial_handle:
            try:
                self.serial_handle.close()
                print("{}串口已关闭".format(self.port_name))
            finally:
                self.serial_handle = None

    def __del__(self):
        try:
            self.close_serial_port()
        except Exception:
            # The instance may be partially constructed or the interpreter
            # may be shutting down; nothing useful can be done here.
            pass


def main():
    """Demo: send the same command twice and print each reply."""
    s = SerialIns(port_name="COM5", baud=115200, timeout=0.1)
    s.scan_serial_port()
    s.clearn_flush()
    print("-" * 30)
    for _ in range(2):
        s.write_cmd([2, 99, 1])
        time.sleep(0.1)
        data = s.read_cmd()
        if data:
            print("data--> {}".format(s.change_hex_to_int(data)))
        time.sleep(1)
        print("-" * 30)
    s.close_serial_port()


if __name__ == "__main__":
    main()