SerialIns.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. #!usr/bin/python3
  2. import serial.tools.list_ports
  3. import time
  4. # https://blog.csdn.net/weixin_44289254/article/details/121583562
  5. class SerialIns(object):
  6. def __init__(self, port_name=None, baud=9600, timeout=0.01):
  7. self.port_name = port_name
  8. self.baud = baud
  9. self.serial_handle = None
  10. # self.serial_handle = self.check_connect()
  11. try:
  12. self.serial_handle = serial.Serial(
  13. port=self.port_name, baudrate=self.baud, timeout=timeout
  14. )
  15. print("{}打开成功".format(self.port_name))
  16. except:
  17. self.serial_handle = None
  18. print("{}打开失败".format(self.port_name))
  19. self.receive_data = b""
  20. # self.receive_data = bytearray(b'\x00\UU\x02d\x00\x9b\x9b')
  21. def read_line(self):
  22. c = self.serial_handle.inWaiting()
  23. if c:
  24. time.sleep(0.02)
  25. # 确保缓存写入完成
  26. if c == self.serial_handle.inWaiting():
  27. return self.serial_handle.readline()
  28. return None
  29. def read_all(self):
  30. return self.serial_handle.read_all()
  31. def check_connect(self):
  32. if not self.port_name:
  33. self.scan_serial_port()
  34. if self.port_name:
  35. self.serial_handle = serial.Serial(self.port_name, self.baud)
  36. else:
  37. try:
  38. # if self.scan_serial_port(self.port_name):
  39. self.serial_handle = serial.Serial(self.port_name, self.baud)
  40. print("{}打开成功".format(self.port_name))
  41. except:
  42. print("{}打开失败".format(self.port_name))
  43. return self.serial_handle
  44. def clearn_flush(self): # 清空接收缓存
  45. if self.serial_handle:
  46. self.serial_handle.flushInput()
  47. self.receive_data = b""
  48. def write_cmd(self, data: list):
  49. if self.serial_handle:
  50. # data = [(0xff & par1), (0xff & (par1 >> 8))]
  51. # self.clearn_flush()
  52. buf = bytearray(b"")
  53. buf.extend([0x55, 0x55, (0xFF & len(data))])
  54. buf.extend(data)
  55. buf.extend([0xFF & ~sum(data)])
  56. # 55 55 02 5a 01 a4
  57. # print("send buf {}".format(self.change_hex_to_int(buf)))
  58. try:
  59. self.serial_handle.write(buf)
  60. return True
  61. except:
  62. self.serial_handle = None
  63. _recv_data = b""
  64. return False
  65. def read_cmd_out_time(self, scan_interval=0.1, out_time=1):
  66. if self.serial_handle:
  67. """
  68. 获取数据,设计超时机制
  69. :param scan_interval:
  70. :param out_time:
  71. :return:
  72. """
  73. n = 0
  74. while 1:
  75. n += 1
  76. time.sleep(scan_interval)
  77. if out_time <= n * scan_interval:
  78. return False
  79. receive_data = self.read_cmd()
  80. if receive_data:
  81. return receive_data
  82. return False
  83. def read_cmd1(self, out_time=1, check=None):
  84. s = time.time()
  85. while 1:
  86. try:
  87. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  88. except:
  89. self.serial_handle = None
  90. _recv_data = b""
  91. return False
  92. self.receive_data += _recv_data # 写入所有缓存数据
  93. # print(self.receive_data)
  94. if not self.receive_data or len(self.receive_data) < 4:
  95. return
  96. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  97. # print("read ori ", self.change_hex_to_int(self.receive_data))
  98. data_len = self.receive_data[2]
  99. if len(self.receive_data) < data_len + 4:
  100. # 此处需要超时机制
  101. print("1数据长度不够,等待下次读取")
  102. # 超时退出
  103. if time.time() - s > out_time:
  104. time.sleep(0.01)
  105. return
  106. _data = self.receive_data[3 : data_len + 4]
  107. # 更新缓存区
  108. self.receive_data = self.receive_data[data_len + 4 :]
  109. # 校验数据
  110. if check is None:
  111. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  112. # print("data:", self.change_hex_to_int(data[:-1]))
  113. return _data[:-1]
  114. else:
  115. print("数据异常,丢弃")
  116. return
  117. else:
  118. if _data[-1] == check:
  119. return _data[:-1]
  120. else:
  121. print("数据异常,丢弃")
  122. return
  123. else:
  124. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  125. # 起始位不是 55 55 进行移除
  126. index = 0
  127. for index, r_data in enumerate(self.receive_data):
  128. if r_data == 0x55:
  129. break
  130. index += 1
  131. if index >= len(self.receive_data):
  132. self.receive_data = b""
  133. else:
  134. self.receive_data = self.receive_data[index - 1 :]
  135. def read_cmd111(self, out_time=1, check=None):
  136. while 1:
  137. try:
  138. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  139. except BaseException as e:
  140. print("串口接收报错", e)
  141. self.serial_handle = None
  142. _recv_data = b""
  143. return False
  144. # print("read_cmd", _recv_data)
  145. if not _recv_data or len(_recv_data) < 4:
  146. # print("数据长度不够")
  147. return
  148. print("2 _recv_data", self.change_hex_to_int(_recv_data))
  149. if _recv_data[0] == 0x55 and _recv_data[1] == 0x55:
  150. # print("read ori ", self.change_hex_to_int(self.receive_data))
  151. data_len = _recv_data[2]
  152. if len(_recv_data) < data_len + 4:
  153. # 此处需要超时机制
  154. print("2数据长度不够,等待下次读取")
  155. # if time.time() - s > out_time:
  156. # time.sleep(0.01)
  157. # return
  158. return
  159. _data = _recv_data[3 : data_len + 4]
  160. # 校验数据
  161. if check is None:
  162. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  163. # print("data:", self.change_hex_to_int(data[:-1]))
  164. return _data[:-1]
  165. else:
  166. print("数据异常,丢弃")
  167. return
  168. else:
  169. if _data[-1] == check:
  170. return _data[:-1]
  171. else:
  172. print("数据异常,丢弃")
  173. return
  174. else:
  175. # 起始位不是 55 55 进行移除
  176. while self.receive_data:
  177. if len(self.receive_data) == 1:
  178. if self.receive_data[0] == 0x55:
  179. break
  180. else:
  181. self.receive_data = b""
  182. else:
  183. if (
  184. self.receive_data[0] == 0x55
  185. and self.receive_data[1] == 0x55
  186. ):
  187. break
  188. else:
  189. self.receive_data = self.receive_data[1:]
  190. def read_cmd(self, out_time=1, check=None, out_time_n=5):
  191. n = 0
  192. while 1:
  193. try:
  194. read_d = self.serial_handle.read_all() # 读取接收到的数据
  195. self.receive_data += read_d
  196. except BaseException as e:
  197. print("串口接收报错", e)
  198. self.serial_handle = None
  199. return False
  200. if len(self.receive_data) < 4:
  201. break
  202. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  203. # print("read ori ", self.change_hex_to_int(self.receive_data))
  204. data_len = self.receive_data[2]
  205. if len(self.receive_data) < data_len + 4:
  206. # 此处需要超时机制
  207. # print("数据长度不够,等待下次读取")
  208. # 超时退出
  209. # if not self.serial_handle.txdone():
  210. # return None
  211. # n += 1
  212. # if n > out_time_n:
  213. # return None
  214. # time.sleep(0.01)
  215. continue
  216. _data = self.receive_data[3 : data_len + 4]
  217. # 更新缓存区
  218. self.receive_data = self.receive_data[data_len + 4 :]
  219. # 校验数据
  220. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  221. # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
  222. return _data[:-1]
  223. else:
  224. return None
  225. else:
  226. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  227. # 起始位不是 55 55 进行移除
  228. while self.receive_data:
  229. if len(self.receive_data) == 1:
  230. if self.receive_data[0] == 0x55:
  231. break
  232. else:
  233. self.receive_data = b""
  234. else:
  235. if (
  236. self.receive_data[0] == 0x55
  237. and self.receive_data[1] == 0x55
  238. ):
  239. break
  240. else:
  241. self.receive_data = self.receive_data[1:]
  242. def change_hex_to_int(self, _bytearray):
  243. return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
  244. def scan_serial_port(self, port_name=None):
  245. plist = list(serial.tools.list_ports.comports())
  246. if len(plist) <= 0:
  247. return None
  248. if port_name:
  249. for i in plist:
  250. # print("port", i)
  251. if i.name == port_name:
  252. return i
  253. return None
  254. else:
  255. for i in plist:
  256. if "CH340" in i.description:
  257. self.port_name = i.name
  258. print("----------", i)
  259. return plist
  260. def open_serial_port(self):
  261. if self.serial_handle:
  262. if not s.check_connect():
  263. self.serial_handle.open()
  264. return True
  265. def close_serial_port(self):
  266. if self.serial_handle:
  267. self.serial_handle.close()
  268. print("{}串口已关闭".format(self.port_name))
  269. self.serial_handle = None
  270. def __del__(self):
  271. self.close_serial_port()
  272. if __name__ == "__main__":
  273. s = SerialIns(port_name="COM11", baud=115200, timeout=0.1)
  274. s.scan_serial_port()
  275. s.clearn_flush()
  276. print("-" * 30)
  277. for i in range(2):
  278. data = [0x55, 0x55, 0x2, 0x5A, 0x1, 0xA4]
  279. print("command data {}".format(s.change_hex_to_int(data)))
  280. # s.write_cmd(data=data)
  281. buf = bytearray(b"")
  282. buf.extend(data)
  283. s.serial_handle.write(buf)
  284. time.sleep(0.1)
  285. data = s.read_cmd()
  286. # print("data-->", data)
  287. if data:
  288. print("data--> {}".format(s.change_hex_to_int(data)))
  289. time.sleep(1)
  290. print("-" * 30)
  291. s.close_serial_port()