SerialIns.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. #!usr/bin/python3
  2. import serial.tools.list_ports
  3. import time
  4. # https://blog.csdn.net/weixin_44289254/article/details/121583562
  5. class SerialIns(object):
  6. def __init__(self, port_name=None, baud=9600, timeout=0.01):
  7. self.port_name = port_name
  8. self.baud = baud
  9. self.serial_handle = None
  10. # self.serial_handle = self.check_connect()
  11. try:
  12. self.serial_handle = serial.Serial(
  13. port=self.port_name, baudrate=self.baud, timeout=timeout
  14. )
  15. print("{}打开成功".format(self.port_name))
  16. except Exception as e:
  17. print(e)
  18. self.serial_handle = None
  19. print("{}打开失败".format(self.port_name))
  20. self.receive_data = b""
  21. # self.receive_data = bytearray(b'\x00\UU\x02d\x00\x9b\x9b')
  22. def read_line(self):
  23. c = self.serial_handle.inWaiting()
  24. if c:
  25. time.sleep(0.02)
  26. # 确保缓存写入完成
  27. if c == self.serial_handle.inWaiting():
  28. return self.serial_handle.readline()
  29. return None
  30. def read_all(self):
  31. return self.serial_handle.read_all()
  32. def check_connect(self):
  33. if not self.port_name:
  34. self.scan_serial_port()
  35. if self.port_name:
  36. self.serial_handle = serial.Serial(self.port_name, self.baud)
  37. else:
  38. try:
  39. # if self.scan_serial_port(self.port_name):
  40. self.serial_handle = serial.Serial(self.port_name, self.baud)
  41. print("{}打开成功".format(self.port_name))
  42. except:
  43. print("{}打开失败".format(self.port_name))
  44. return self.serial_handle
  45. def clearn_flush(self): # 清空接收缓存
  46. if self.serial_handle:
  47. self.serial_handle.flushInput()
  48. self.receive_data = b""
  49. def write_cmd(self, data: list):
  50. if self.serial_handle:
  51. # data = [(0xff & par1), (0xff & (par1 >> 8))]
  52. # self.clearn_flush()
  53. buf = bytearray(b"")
  54. buf.extend([0x55, 0x55, (0xFF & len(data))])
  55. buf.extend(data)
  56. buf.extend([0xFF & ~sum(data)])
  57. # 55 55 02 5a 01 a4
  58. print("send buf {}".format(self.change_hex_to_int(buf)))
  59. try:
  60. self.serial_handle.write(buf)
  61. return True
  62. except:
  63. self.serial_handle = None
  64. _recv_data = b""
  65. return False
  66. def read_cmd_out_time(self, scan_interval=0.1, out_time=1):
  67. if self.serial_handle:
  68. """
  69. 获取数据,设计超时机制
  70. :param scan_interval:
  71. :param out_time:
  72. :return:
  73. """
  74. n = 0
  75. while 1:
  76. n += 1
  77. time.sleep(scan_interval)
  78. if out_time <= n * scan_interval:
  79. return False
  80. receive_data = self.read_cmd()
  81. if receive_data:
  82. return receive_data
  83. return False
  84. def read_cmd1(self, out_time=1, check=None):
  85. s = time.time()
  86. while 1:
  87. try:
  88. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  89. except:
  90. self.serial_handle = None
  91. _recv_data = b""
  92. return False
  93. self.receive_data += _recv_data # 写入所有缓存数据
  94. # print(self.receive_data)
  95. if not self.receive_data or len(self.receive_data) < 4:
  96. return
  97. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  98. # print("read ori ", self.change_hex_to_int(self.receive_data))
  99. data_len = self.receive_data[2]
  100. if len(self.receive_data) < data_len + 4:
  101. # 此处需要超时机制
  102. print("1数据长度不够,等待下次读取")
  103. # 超时退出
  104. if time.time() - s > out_time:
  105. time.sleep(0.01)
  106. return
  107. _data = self.receive_data[3 : data_len + 4]
  108. # 更新缓存区
  109. self.receive_data = self.receive_data[data_len + 4 :]
  110. # 校验数据
  111. if check is None:
  112. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  113. # print("data:", self.change_hex_to_int(data[:-1]))
  114. return _data[:-1]
  115. else:
  116. print("数据异常,丢弃")
  117. return
  118. else:
  119. if _data[-1] == check:
  120. return _data[:-1]
  121. else:
  122. print("数据异常,丢弃")
  123. return
  124. else:
  125. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  126. # 起始位不是 55 55 进行移除
  127. index = 0
  128. for index, r_data in enumerate(self.receive_data):
  129. if r_data == 0x55:
  130. break
  131. index += 1
  132. if index >= len(self.receive_data):
  133. self.receive_data = b""
  134. else:
  135. self.receive_data = self.receive_data[index - 1 :]
  136. def read_cmd111(self, out_time=1, check=None):
  137. while 1:
  138. try:
  139. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  140. except BaseException as e:
  141. print("串口接收报错", e)
  142. self.serial_handle = None
  143. _recv_data = b""
  144. return False
  145. # print("read_cmd", _recv_data)
  146. if not _recv_data or len(_recv_data) < 4:
  147. # print("数据长度不够")
  148. return
  149. print("2 _recv_data", self.change_hex_to_int(_recv_data))
  150. if _recv_data[0] == 0x55 and _recv_data[1] == 0x55:
  151. # print("read ori ", self.change_hex_to_int(self.receive_data))
  152. data_len = _recv_data[2]
  153. if len(_recv_data) < data_len + 4:
  154. # 此处需要超时机制
  155. print("2数据长度不够,等待下次读取")
  156. # if time.time() - s > out_time:
  157. # time.sleep(0.01)
  158. # return
  159. return
  160. _data = _recv_data[3 : data_len + 4]
  161. # 校验数据
  162. if check is None:
  163. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  164. # print("data:", self.change_hex_to_int(data[:-1]))
  165. return _data[:-1]
  166. else:
  167. print("数据异常,丢弃")
  168. return
  169. else:
  170. if _data[-1] == check:
  171. return _data[:-1]
  172. else:
  173. print("数据异常,丢弃")
  174. return
  175. else:
  176. # 起始位不是 55 55 进行移除
  177. while self.receive_data:
  178. if len(self.receive_data) == 1:
  179. if self.receive_data[0] == 0x55:
  180. break
  181. else:
  182. self.receive_data = b""
  183. else:
  184. if (
  185. self.receive_data[0] == 0x55
  186. and self.receive_data[1] == 0x55
  187. ):
  188. break
  189. else:
  190. self.receive_data = self.receive_data[1:]
  191. def read_cmd(self, out_time=1, check=None, out_time_n=5):
  192. n = 0
  193. while 1:
  194. try:
  195. read_d = self.serial_handle.read_all() # 读取接收到的数据
  196. self.receive_data += read_d
  197. except BaseException as e:
  198. print("串口接收报错", e)
  199. self.serial_handle = None
  200. return False
  201. if len(self.receive_data) < 4:
  202. break
  203. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  204. # print("read ori ", self.change_hex_to_int(self.receive_data))
  205. data_len = self.receive_data[2]
  206. if len(self.receive_data) < data_len + 4:
  207. # 此处需要超时机制
  208. # print("数据长度不够,等待下次读取")
  209. # 超时退出
  210. # if not self.serial_handle.txdone():
  211. # return None
  212. # n += 1
  213. # if n > out_time_n:
  214. # return None
  215. # time.sleep(0.01)
  216. continue
  217. _data = self.receive_data[3 : data_len + 4]
  218. # 更新缓存区
  219. self.receive_data = self.receive_data[data_len + 4 :]
  220. # 校验数据
  221. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  222. # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
  223. return _data[:-1]
  224. else:
  225. return None
  226. else:
  227. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  228. # 起始位不是 55 55 进行移除
  229. while self.receive_data:
  230. if len(self.receive_data) == 1:
  231. if self.receive_data[0] == 0x55:
  232. break
  233. else:
  234. self.receive_data = b""
  235. else:
  236. if (
  237. self.receive_data[0] == 0x55
  238. and self.receive_data[1] == 0x55
  239. ):
  240. break
  241. else:
  242. self.receive_data = self.receive_data[1:]
  243. def change_hex_to_int(self, _bytearray):
  244. return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
  245. def scan_serial_port(self, port_name=None):
  246. plist = list(serial.tools.list_ports.comports())
  247. if len(plist) <= 0:
  248. return None
  249. if port_name:
  250. for i in plist:
  251. # print("port", i)
  252. if i.name == port_name:
  253. return i
  254. return None
  255. else:
  256. for i in plist:
  257. print("串口列表:", i.description)
  258. print("串口列表:", i.name)
  259. if "CH340" in i.description:
  260. print("CH340:", i.name)
  261. self.port_name = i.name
  262. print("----------", i)
  263. return plist
  264. def open_serial_port(self):
  265. if self.serial_handle:
  266. if not s.check_connect():
  267. self.serial_handle.open()
  268. return True
  269. def close_serial_port(self):
  270. if self.serial_handle:
  271. self.serial_handle.close()
  272. print("{}串口已关闭".format(self.port_name))
  273. self.serial_handle = None
  274. def __del__(self):
  275. self.close_serial_port()
  276. if __name__ == "__main__":
  277. s = SerialIns(port_name="COM5", baud=115200, timeout=0.1)
  278. s.scan_serial_port()
  279. s.clearn_flush()
  280. print("-" * 30)
  281. for i in range(2):
  282. data = [2,99,1]
  283. s.write_cmd(data)
  284. time.sleep(0.1)
  285. data = s.read_cmd()
  286. # print("data-->", data)
  287. if data:
  288. print("data--> {}".format(s.change_hex_to_int(data)))
  289. time.sleep(1)
  290. print("-" * 30)
  291. s.close_serial_port()