||
- #!usr/bin/python3
- import serial.tools.list_ports
- import time
- # https://blog.csdn.net/weixin_44289254/article/details/121583562
- class SerialIns(object):
- def __init__(self, port_name=None, baud=9600, timeout=0.01):
- self.port_name = port_name
- self.baud = baud
- self.serial_handle = None
- # self.serial_handle = self.check_connect()
- try:
- self.serial_handle = serial.Serial(
- port=self.port_name, baudrate=self.baud, timeout=timeout
- )
- print("{}打开成功".format(self.port_name))
- except Exception as e:
- print(e)
- self.serial_handle = None
- print("{}打开失败".format(self.port_name))
- self.receive_data = b""
- # self.receive_data = bytearray(b'\x00\UU\x02d\x00\x9b\x9b')
- def read_line(self):
- c = self.serial_handle.inWaiting()
- if c:
- time.sleep(0.02)
- # 确保缓存写入完成
- if c == self.serial_handle.inWaiting():
- return self.serial_handle.readline()
- return None
- def read_all(self):
- return self.serial_handle.read_all()
- def check_connect(self):
- if not self.port_name:
- self.scan_serial_port()
- if self.port_name:
- self.serial_handle = serial.Serial(self.port_name, self.baud)
- else:
- try:
- # if self.scan_serial_port(self.port_name):
- self.serial_handle = serial.Serial(self.port_name, self.baud)
- print("{}打开成功".format(self.port_name))
- except:
- print("{}打开失败".format(self.port_name))
- return self.serial_handle
- def clearn_flush(self): # 清空接收缓存
- if self.serial_handle:
- self.serial_handle.flushInput()
- self.receive_data = b""
- def write_cmd(self, data: list):
- if self.serial_handle:
- # data = [(0xff & par1), (0xff & (par1 >> 8))]
- # self.clearn_flush()
- buf = bytearray(b"")
- buf.extend([0x55, 0x55, (0xFF & len(data))])
- buf.extend(data)
- buf.extend([0xFF & ~sum(data)])
- # 55 55 02 5a 01 a4
- # print("send buf {}".format(self.change_hex_to_int(buf)))
- try:
- self.serial_handle.write(buf)
- return True
- except:
- self.serial_handle = None
- _recv_data = b""
- return False
- def read_cmd_out_time(self, scan_interval=0.1, out_time=1):
- if self.serial_handle:
- """
- 获取数据,设计超时机制
- :param scan_interval:
- :param out_time:
- :return:
- """
- n = 0
- while 1:
- n += 1
- time.sleep(scan_interval)
- if out_time <= n * scan_interval:
- return False
- receive_data = self.read_cmd()
- if receive_data:
- return receive_data
- return False
- def read_cmd1(self, out_time=1, check=None):
- s = time.time()
- while 1:
- try:
- _recv_data = self.serial_handle.read_all() # 读取接收到的数据
- except:
- self.serial_handle = None
- _recv_data = b""
- return False
- self.receive_data += _recv_data # 写入所有缓存数据
- # print(self.receive_data)
- if not self.receive_data or len(self.receive_data) < 4:
- return
- if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
- # print("read ori ", self.change_hex_to_int(self.receive_data))
- data_len = self.receive_data[2]
- if len(self.receive_data) < data_len + 4:
- # 此处需要超时机制
- print("1数据长度不够,等待下次读取")
- # 超时退出
- if time.time() - s > out_time:
- time.sleep(0.01)
- return
- _data = self.receive_data[3 : data_len + 4]
- # 更新缓存区
- self.receive_data = self.receive_data[data_len + 4 :]
- # 校验数据
- if check is None:
- if 0xFF & ~sum(_data[:-1]) == _data[-1]:
- # print("data:", self.change_hex_to_int(data[:-1]))
- return _data[:-1]
- else:
- print("数据异常,丢弃")
- return
- else:
- if _data[-1] == check:
- return _data[:-1]
- else:
- print("数据异常,丢弃")
- return
- else:
- # print("起始位不是 55 55 进行移除", self.receive_data[0])
- # 起始位不是 55 55 进行移除
- index = 0
- for index, r_data in enumerate(self.receive_data):
- if r_data == 0x55:
- break
- index += 1
- if index >= len(self.receive_data):
- self.receive_data = b""
- else:
- self.receive_data = self.receive_data[index - 1 :]
- def read_cmd111(self, out_time=1, check=None):
- while 1:
- try:
- _recv_data = self.serial_handle.read_all() # 读取接收到的数据
- except BaseException as e:
- print("串口接收报错", e)
- self.serial_handle = None
- _recv_data = b""
- return False
- # print("read_cmd", _recv_data)
- if not _recv_data or len(_recv_data) < 4:
- # print("数据长度不够")
- return
- print("2 _recv_data", self.change_hex_to_int(_recv_data))
- if _recv_data[0] == 0x55 and _recv_data[1] == 0x55:
- # print("read ori ", self.change_hex_to_int(self.receive_data))
- data_len = _recv_data[2]
- if len(_recv_data) < data_len + 4:
- # 此处需要超时机制
- print("2数据长度不够,等待下次读取")
- # if time.time() - s > out_time:
- # time.sleep(0.01)
- # return
- return
- _data = _recv_data[3 : data_len + 4]
- # 校验数据
- if check is None:
- if 0xFF & ~sum(_data[:-1]) == _data[-1]:
- # print("data:", self.change_hex_to_int(data[:-1]))
- return _data[:-1]
- else:
- print("数据异常,丢弃")
- return
- else:
- if _data[-1] == check:
- return _data[:-1]
- else:
- print("数据异常,丢弃")
- return
- else:
- # 起始位不是 55 55 进行移除
- while self.receive_data:
- if len(self.receive_data) == 1:
- if self.receive_data[0] == 0x55:
- break
- else:
- self.receive_data = b""
- else:
- if (
- self.receive_data[0] == 0x55
- and self.receive_data[1] == 0x55
- ):
- break
- else:
- self.receive_data = self.receive_data[1:]
- def read_cmd(self, out_time=1, check=None, out_time_n=5):
- n = 0
- while 1:
- try:
- read_d = self.serial_handle.read_all() # 读取接收到的数据
- self.receive_data += read_d
- except BaseException as e:
- print("串口接收报错", e)
- self.serial_handle = None
- return False
- if len(self.receive_data) < 4:
- break
- if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
- # print("read ori ", self.change_hex_to_int(self.receive_data))
- data_len = self.receive_data[2]
- if len(self.receive_data) < data_len + 4:
- # 此处需要超时机制
- # print("数据长度不够,等待下次读取")
- # 超时退出
- # if not self.serial_handle.txdone():
- # return None
- # n += 1
- # if n > out_time_n:
- # return None
- # time.sleep(0.01)
- continue
- _data = self.receive_data[3 : data_len + 4]
- # 更新缓存区
- self.receive_data = self.receive_data[data_len + 4 :]
- # 校验数据
- if 0xFF & ~sum(_data[:-1]) == _data[-1]:
- # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
- return _data[:-1]
- else:
- return None
- else:
- # print("起始位不是 55 55 进行移除", self.receive_data[0])
- # 起始位不是 55 55 进行移除
- while self.receive_data:
- if len(self.receive_data) == 1:
- if self.receive_data[0] == 0x55:
- break
- else:
- self.receive_data = b""
- else:
- if (
- self.receive_data[0] == 0x55
- and self.receive_data[1] == 0x55
- ):
- break
- else:
- self.receive_data = self.receive_data[1:]
- def change_hex_to_int(self, _bytearray):
- return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
- def scan_serial_port(self, port_name=None):
- plist = list(serial.tools.list_ports.comports())
- if len(plist) <= 0:
- return None
- if port_name:
- for i in plist:
- # print("port", i)
- if i.name == port_name:
- return i
- return None
- else:
- for i in plist:
- print("串口列表:", i.description)
- print("串口列表:", i.name)
- if "CH340" in i.description:
- print("CH340:", i.name)
- self.port_name = i.name
- print("----------", i)
- return plist
- def open_serial_port(self):
- if self.serial_handle:
- if not s.check_connect():
- self.serial_handle.open()
- return True
- def close_serial_port(self):
- if self.serial_handle:
- self.serial_handle.close()
- print("{}串口已关闭".format(self.port_name))
- self.serial_handle = None
- def __del__(self):
- self.close_serial_port()
- if __name__ == "__main__":
- s = SerialIns(port_name="COM4", baud=115200, timeout=0.1)
- s.scan_serial_port()
- s.clearn_flush()
- print("-" * 30)
- for i in range(2):
- data = [2,99,1]
- s.write_cmd(data)
- time.sleep(0.1)
- data = s.read_cmd()
- # print("data-->", data)
- if data:
- print("data--> {}".format(s.change_hex_to_int(data)))
- time.sleep(1)
- print("-" * 30)
- s.close_serial_port()
|