|
@@ -0,0 +1,322 @@
|
|
|
|
|
+#!usr/bin/python3
|
|
|
|
|
+import serial.tools.list_ports
|
|
|
|
|
+import time
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# https://blog.csdn.net/weixin_44289254/article/details/121583562
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class SerialIns(object):
|
|
|
|
|
+ def __init__(self, port_name=None, baud=9600, timeout=0.01):
|
|
|
|
|
+ self.port_name = port_name
|
|
|
|
|
+ self.baud = baud
|
|
|
|
|
+ self.serial_handle = None
|
|
|
|
|
+ # self.serial_handle = self.check_connect()
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.serial_handle = serial.Serial(
|
|
|
|
|
+ port=self.port_name, baudrate=self.baud, timeout=timeout
|
|
|
|
|
+ )
|
|
|
|
|
+ print("{}打开成功".format(self.port_name))
|
|
|
|
|
+ except:
|
|
|
|
|
+ self.serial_handle = None
|
|
|
|
|
+ print("{}打开失败".format(self.port_name))
|
|
|
|
|
+
|
|
|
|
|
+ self.receive_data = b""
|
|
|
|
|
+ # self.receive_data = bytearray(b'\x00\UU\x02d\x00\x9b\x9b')
|
|
|
|
|
+
|
|
|
|
|
+ def read_line(self):
|
|
|
|
|
+ c = self.serial_handle.inWaiting()
|
|
|
|
|
+ if c:
|
|
|
|
|
+ time.sleep(0.02)
|
|
|
|
|
+ # 确保缓存写入完成
|
|
|
|
|
+ if c == self.serial_handle.inWaiting():
|
|
|
|
|
+ return self.serial_handle.readline()
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ def read_all(self):
|
|
|
|
|
+ return self.serial_handle.read_all()
|
|
|
|
|
+
|
|
|
|
|
+ def check_connect(self):
|
|
|
|
|
+ if not self.port_name:
|
|
|
|
|
+ self.scan_serial_port()
|
|
|
|
|
+ if self.port_name:
|
|
|
|
|
+ self.serial_handle = serial.Serial(self.port_name, self.baud)
|
|
|
|
|
+ else:
|
|
|
|
|
+ try:
|
|
|
|
|
+ # if self.scan_serial_port(self.port_name):
|
|
|
|
|
+ self.serial_handle = serial.Serial(self.port_name, self.baud)
|
|
|
|
|
+ print("{}打开成功".format(self.port_name))
|
|
|
|
|
+ except:
|
|
|
|
|
+ print("{}打开失败".format(self.port_name))
|
|
|
|
|
+
|
|
|
|
|
+ return self.serial_handle
|
|
|
|
|
+
|
|
|
|
|
+ def clearn_flush(self): # 清空接收缓存
|
|
|
|
|
+ if self.serial_handle:
|
|
|
|
|
+ self.serial_handle.flushInput()
|
|
|
|
|
+ self.receive_data = b""
|
|
|
|
|
+
|
|
|
|
|
+ def write_cmd(self, data: list):
|
|
|
|
|
+ if self.serial_handle:
|
|
|
|
|
+ # data = [(0xff & par1), (0xff & (par1 >> 8))]
|
|
|
|
|
+ # self.clearn_flush()
|
|
|
|
|
+ buf = bytearray(b"")
|
|
|
|
|
+ buf.extend([0x55, 0x55, (0xFF & len(data))])
|
|
|
|
|
+ buf.extend(data)
|
|
|
|
|
+ buf.extend([0xFF & ~sum(data)])
|
|
|
|
|
+ # 55 55 02 5a 01 a4
|
|
|
|
|
+ # print("send buf {}".format(self.change_hex_to_int(buf)))
|
|
|
|
|
+ try:
|
|
|
|
|
+ self.serial_handle.write(buf)
|
|
|
|
|
+ return True
|
|
|
|
|
+ except:
|
|
|
|
|
+ self.serial_handle = None
|
|
|
|
|
+ _recv_data = b""
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ def read_cmd_out_time(self, scan_interval=0.1, out_time=1):
|
|
|
|
|
+ if self.serial_handle:
|
|
|
|
|
+ """
|
|
|
|
|
+ 获取数据,设计超时机制
|
|
|
|
|
+ :param scan_interval:
|
|
|
|
|
+ :param out_time:
|
|
|
|
|
+ :return:
|
|
|
|
|
+ """
|
|
|
|
|
+ n = 0
|
|
|
|
|
+ while 1:
|
|
|
|
|
+ n += 1
|
|
|
|
|
+ time.sleep(scan_interval)
|
|
|
|
|
+ if out_time <= n * scan_interval:
|
|
|
|
|
+ return False
|
|
|
|
|
+ receive_data = self.read_cmd()
|
|
|
|
|
+ if receive_data:
|
|
|
|
|
+ return receive_data
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ def read_cmd1(self, out_time=1, check=None):
|
|
|
|
|
+ s = time.time()
|
|
|
|
|
+ while 1:
|
|
|
|
|
+ try:
|
|
|
|
|
+ _recv_data = self.serial_handle.read_all() # 读取接收到的数据
|
|
|
|
|
+ except:
|
|
|
|
|
+ self.serial_handle = None
|
|
|
|
|
+ _recv_data = b""
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ self.receive_data += _recv_data # 写入所有缓存数据
|
|
|
|
|
+ # print(self.receive_data)
|
|
|
|
|
+ if not self.receive_data or len(self.receive_data) < 4:
|
|
|
|
|
+ return
|
|
|
|
|
+ if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
|
|
|
|
|
+ # print("read ori ", self.change_hex_to_int(self.receive_data))
|
|
|
|
|
+ data_len = self.receive_data[2]
|
|
|
|
|
+ if len(self.receive_data) < data_len + 4:
|
|
|
|
|
+ # 此处需要超时机制
|
|
|
|
|
+ print("1数据长度不够,等待下次读取")
|
|
|
|
|
+ # 超时退出
|
|
|
|
|
+ if time.time() - s > out_time:
|
|
|
|
|
+ time.sleep(0.01)
|
|
|
|
|
+ return
|
|
|
|
|
+ _data = self.receive_data[3 : data_len + 4]
|
|
|
|
|
+ # 更新缓存区
|
|
|
|
|
+ self.receive_data = self.receive_data[data_len + 4 :]
|
|
|
|
|
+ # 校验数据
|
|
|
|
|
+ if check is None:
|
|
|
|
|
+ if 0xFF & ~sum(_data[:-1]) == _data[-1]:
|
|
|
|
|
+ # print("data:", self.change_hex_to_int(data[:-1]))
|
|
|
|
|
+ return _data[:-1]
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("数据异常,丢弃")
|
|
|
|
|
+ return
|
|
|
|
|
+ else:
|
|
|
|
|
+ if _data[-1] == check:
|
|
|
|
|
+ return _data[:-1]
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("数据异常,丢弃")
|
|
|
|
|
+ return
|
|
|
|
|
+ else:
|
|
|
|
|
+ # print("起始位不是 55 55 进行移除", self.receive_data[0])
|
|
|
|
|
+ # 起始位不是 55 55 进行移除
|
|
|
|
|
+ index = 0
|
|
|
|
|
+ for index, r_data in enumerate(self.receive_data):
|
|
|
|
|
+ if r_data == 0x55:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ index += 1
|
|
|
|
|
+ if index >= len(self.receive_data):
|
|
|
|
|
+ self.receive_data = b""
|
|
|
|
|
+ else:
|
|
|
|
|
+ self.receive_data = self.receive_data[index - 1 :]
|
|
|
|
|
+
|
|
|
|
|
+ def read_cmd111(self, out_time=1, check=None):
|
|
|
|
|
+ while 1:
|
|
|
|
|
+ try:
|
|
|
|
|
+ _recv_data = self.serial_handle.read_all() # 读取接收到的数据
|
|
|
|
|
+ except BaseException as e:
|
|
|
|
|
+ print("串口接收报错", e)
|
|
|
|
|
+ self.serial_handle = None
|
|
|
|
|
+ _recv_data = b""
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ # print("read_cmd", _recv_data)
|
|
|
|
|
+ if not _recv_data or len(_recv_data) < 4:
|
|
|
|
|
+ # print("数据长度不够")
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ print("2 _recv_data", self.change_hex_to_int(_recv_data))
|
|
|
|
|
+ if _recv_data[0] == 0x55 and _recv_data[1] == 0x55:
|
|
|
|
|
+ # print("read ori ", self.change_hex_to_int(self.receive_data))
|
|
|
|
|
+ data_len = _recv_data[2]
|
|
|
|
|
+ if len(_recv_data) < data_len + 4:
|
|
|
|
|
+ # 此处需要超时机制
|
|
|
|
|
+ print("2数据长度不够,等待下次读取")
|
|
|
|
|
+ # if time.time() - s > out_time:
|
|
|
|
|
+ # time.sleep(0.01)
|
|
|
|
|
+ # return
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ _data = _recv_data[3 : data_len + 4]
|
|
|
|
|
+ # 校验数据
|
|
|
|
|
+ if check is None:
|
|
|
|
|
+ if 0xFF & ~sum(_data[:-1]) == _data[-1]:
|
|
|
|
|
+ # print("data:", self.change_hex_to_int(data[:-1]))
|
|
|
|
|
+ return _data[:-1]
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("数据异常,丢弃")
|
|
|
|
|
+ return
|
|
|
|
|
+ else:
|
|
|
|
|
+ if _data[-1] == check:
|
|
|
|
|
+ return _data[:-1]
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("数据异常,丢弃")
|
|
|
|
|
+ return
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 起始位不是 55 55 进行移除
|
|
|
|
|
+ while self.receive_data:
|
|
|
|
|
+ if len(self.receive_data) == 1:
|
|
|
|
|
+ if self.receive_data[0] == 0x55:
|
|
|
|
|
+ break
|
|
|
|
|
+ else:
|
|
|
|
|
+ self.receive_data = b""
|
|
|
|
|
+ else:
|
|
|
|
|
+ if (
|
|
|
|
|
+ self.receive_data[0] == 0x55
|
|
|
|
|
+ and self.receive_data[1] == 0x55
|
|
|
|
|
+ ):
|
|
|
|
|
+ break
|
|
|
|
|
+ else:
|
|
|
|
|
+ self.receive_data = self.receive_data[1:]
|
|
|
|
|
+
|
|
|
|
|
+ def read_cmd(self, out_time=1, check=None, out_time_n=5):
|
|
|
|
|
+ n = 0
|
|
|
|
|
+ while 1:
|
|
|
|
|
+ try:
|
|
|
|
|
+ read_d = self.serial_handle.read_all() # 读取接收到的数据
|
|
|
|
|
+ self.receive_data += read_d
|
|
|
|
|
+ except BaseException as e:
|
|
|
|
|
+ print("串口接收报错", e)
|
|
|
|
|
+ self.serial_handle = None
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ if len(self.receive_data) < 4:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
|
|
|
|
|
+ # print("read ori ", self.change_hex_to_int(self.receive_data))
|
|
|
|
|
+ data_len = self.receive_data[2]
|
|
|
|
|
+ if len(self.receive_data) < data_len + 4:
|
|
|
|
|
+ # 此处需要超时机制
|
|
|
|
|
+ # print("数据长度不够,等待下次读取")
|
|
|
|
|
+ # 超时退出
|
|
|
|
|
+ # if not self.serial_handle.txdone():
|
|
|
|
|
+ # return None
|
|
|
|
|
+ # n += 1
|
|
|
|
|
+ # if n > out_time_n:
|
|
|
|
|
+ # return None
|
|
|
|
|
+ # time.sleep(0.01)
|
|
|
|
|
+ continue
|
|
|
|
|
+ _data = self.receive_data[3 : data_len + 4]
|
|
|
|
|
+ # 更新缓存区
|
|
|
|
|
+ self.receive_data = self.receive_data[data_len + 4 :]
|
|
|
|
|
+ # 校验数据
|
|
|
|
|
+ if 0xFF & ~sum(_data[:-1]) == _data[-1]:
|
|
|
|
|
+ # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
|
|
|
|
|
+ return _data[:-1]
|
|
|
|
|
+ else:
|
|
|
|
|
+ return None
|
|
|
|
|
+ else:
|
|
|
|
|
+ # print("起始位不是 55 55 进行移除", self.receive_data[0])
|
|
|
|
|
+ # 起始位不是 55 55 进行移除
|
|
|
|
|
+ while self.receive_data:
|
|
|
|
|
+ if len(self.receive_data) == 1:
|
|
|
|
|
+ if self.receive_data[0] == 0x55:
|
|
|
|
|
+ break
|
|
|
|
|
+ else:
|
|
|
|
|
+ self.receive_data = b""
|
|
|
|
|
+ else:
|
|
|
|
|
+ if (
|
|
|
|
|
+ self.receive_data[0] == 0x55
|
|
|
|
|
+ and self.receive_data[1] == 0x55
|
|
|
|
|
+ ):
|
|
|
|
|
+ break
|
|
|
|
|
+ else:
|
|
|
|
|
+ self.receive_data = self.receive_data[1:]
|
|
|
|
|
+
|
|
|
|
|
+ def change_hex_to_int(self, _bytearray):
|
|
|
|
|
+ return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
|
|
|
|
|
+
|
|
|
|
|
+ def scan_serial_port(self, port_name=None):
|
|
|
|
|
+ plist = list(serial.tools.list_ports.comports())
|
|
|
|
|
+ if len(plist) <= 0:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ if port_name:
|
|
|
|
|
+ for i in plist:
|
|
|
|
|
+ # print("port", i)
|
|
|
|
|
+ if i.name == port_name:
|
|
|
|
|
+ return i
|
|
|
|
|
+ return None
|
|
|
|
|
+ else:
|
|
|
|
|
+ for i in plist:
|
|
|
|
|
+ if "CH340" in i.description:
|
|
|
|
|
+ self.port_name = i.name
|
|
|
|
|
+ print("----------", i)
|
|
|
|
|
+ return plist
|
|
|
|
|
+
|
|
|
|
|
+ def open_serial_port(self):
|
|
|
|
|
+ if self.serial_handle:
|
|
|
|
|
+ if not s.check_connect():
|
|
|
|
|
+ self.serial_handle.open()
|
|
|
|
|
+ return True
|
|
|
|
|
+
|
|
|
|
|
+ def close_serial_port(self):
|
|
|
|
|
+ if self.serial_handle:
|
|
|
|
|
+ self.serial_handle.close()
|
|
|
|
|
+ print("{}串口已关闭".format(self.port_name))
|
|
|
|
|
+ self.serial_handle = None
|
|
|
|
|
+
|
|
|
|
|
+ def __del__(self):
|
|
|
|
|
+ self.close_serial_port()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ s = SerialIns(port_name="COM11", baud=115200, timeout=0.1)
|
|
|
|
|
+ s.scan_serial_port()
|
|
|
|
|
+ s.clearn_flush()
|
|
|
|
|
+ print("-" * 30)
|
|
|
|
|
+ for i in range(2):
|
|
|
|
|
+ data = [0x55, 0x55, 0x2, 0x5A, 0x1, 0xA4]
|
|
|
|
|
+ print("command data {}".format(s.change_hex_to_int(data)))
|
|
|
|
|
+ # s.write_cmd(data=data)
|
|
|
|
|
+ buf = bytearray(b"")
|
|
|
|
|
+ buf.extend(data)
|
|
|
|
|
+ s.serial_handle.write(buf)
|
|
|
|
|
+
|
|
|
|
|
+ time.sleep(0.1)
|
|
|
|
|
+ data = s.read_cmd()
|
|
|
|
|
+ # print("data-->", data)
|
|
|
|
|
+ if data:
|
|
|
|
|
+ print("data--> {}".format(s.change_hex_to_int(data)))
|
|
|
|
|
+ time.sleep(1)
|
|
|
|
|
+ print("-" * 30)
|
|
|
|
|
+ s.close_serial_port()
|