SerialIns.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. #!usr/bin/python3
  2. import serial.tools.list_ports
  3. import time
  4. import logging
  5. logger = logging.getLogger(__name__)
  6. # https://blog.csdn.net/weixin_44289254/article/details/121583562
  7. class SerialIns(object):
  8. def __init__(self, port_name=None, baud=9600, timeout=0.01):
  9. self.port_name = port_name
  10. self.baud = baud
  11. self.serial_handle = None
  12. # self.serial_handle = self.check_connect()
  13. try:
  14. self.serial_handle = serial.Serial(
  15. port=self.port_name, baudrate=self.baud, timeout=timeout
  16. )
  17. print("{}打开成功".format(self.port_name))
  18. except Exception as e:
  19. print(e)
  20. self.serial_handle = None
  21. print("{}打开失败".format(self.port_name))
  22. self.receive_data = b""
  23. # self.receive_data = bytearray(b'\x00\UU\x02d\x00\x9b\x9b')
  24. def read_line(self):
  25. c = self.serial_handle.inWaiting()
  26. if c:
  27. time.sleep(0.02)
  28. # 确保缓存写入完成
  29. if c == self.serial_handle.inWaiting():
  30. return self.serial_handle.readline()
  31. return None
  32. def read_all(self):
  33. return self.serial_handle.read_all()
  34. def check_connect(self):
  35. if not self.port_name:
  36. self.scan_serial_port()
  37. if self.port_name:
  38. self.serial_handle = serial.Serial(self.port_name, self.baud)
  39. else:
  40. try:
  41. # if self.scan_serial_port(self.port_name):
  42. self.serial_handle = serial.Serial(self.port_name, self.baud)
  43. print("{}打开成功".format(self.port_name))
  44. except:
  45. print("{}打开失败".format(self.port_name))
  46. return self.serial_handle
  47. def clearn_flush(self): # 清空接收缓存
  48. if self.serial_handle:
  49. self.serial_handle.flushInput()
  50. self.receive_data = b""
  51. def write_cmd(self, data: list):
  52. if self.serial_handle:
  53. # data = [(0xff & par1), (0xff & (par1 >> 8))]
  54. # self.clearn_flush()
  55. buf = bytearray(b"")
  56. buf.extend([0x55, 0x55, (0xFF & len(data))])
  57. buf.extend(data)
  58. buf.extend([0xFF & ~sum(data)])
  59. # 55 55 02 5a 01 a4
  60. print("send buf {}".format(self.change_hex_to_int(buf)))
  61. # logger.info("正在发送命令======>>>>> send buf %s", self.change_hex_to_int(buf))
  62. try:
  63. self.serial_handle.write(buf)
  64. # time.sleep(0.001)
  65. return True
  66. except:
  67. self.serial_handle = None
  68. _recv_data = b""
  69. return False
  70. def read_cmd_out_time(self, scan_interval=0.1, out_time=1):
  71. if self.serial_handle:
  72. """
  73. 获取数据,设计超时机制
  74. :param scan_interval:
  75. :param out_time:
  76. :return:
  77. """
  78. n = 0
  79. while 1:
  80. n += 1
  81. time.sleep(scan_interval)
  82. if out_time <= n * scan_interval:
  83. return False
  84. receive_data = self.read_cmd()
  85. if receive_data:
  86. return receive_data
  87. return False
  88. def read_cmd1(self, out_time=1, check=None):
  89. s = time.time()
  90. while 1:
  91. try:
  92. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  93. except:
  94. self.serial_handle = None
  95. _recv_data = b""
  96. return False
  97. self.receive_data += _recv_data # 写入所有缓存数据
  98. # print(self.receive_data)
  99. if not self.receive_data or len(self.receive_data) < 4:
  100. return
  101. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  102. # print("read ori ", self.change_hex_to_int(self.receive_data))
  103. data_len = self.receive_data[2]
  104. if len(self.receive_data) < data_len + 4:
  105. # 此处需要超时机制
  106. print("1数据长度不够,等待下次读取")
  107. # 超时退出
  108. if time.time() - s > out_time:
  109. time.sleep(0.01)
  110. return
  111. _data = self.receive_data[3 : data_len + 4]
  112. # 更新缓存区
  113. self.receive_data = self.receive_data[data_len + 4 :]
  114. # 校验数据
  115. if check is None:
  116. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  117. # print("data:", self.change_hex_to_int(data[:-1]))
  118. return _data[:-1]
  119. else:
  120. print("数据异常,丢弃")
  121. return
  122. else:
  123. if _data[-1] == check:
  124. return _data[:-1]
  125. else:
  126. print("数据异常,丢弃")
  127. return
  128. else:
  129. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  130. # 起始位不是 55 55 进行移除
  131. index = 0
  132. for index, r_data in enumerate(self.receive_data):
  133. if r_data == 0x55:
  134. break
  135. index += 1
  136. if index >= len(self.receive_data):
  137. self.receive_data = b""
  138. else:
  139. self.receive_data = self.receive_data[index - 1 :]
  140. def read_cmd111(self, out_time=1, check=None):
  141. while 1:
  142. try:
  143. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  144. except BaseException as e:
  145. print("串口接收报错", e)
  146. self.serial_handle = None
  147. _recv_data = b""
  148. return False
  149. # print("read_cmd", _recv_data)
  150. if not _recv_data or len(_recv_data) < 4:
  151. # print("数据长度不够")
  152. return
  153. print("2 _recv_data", self.change_hex_to_int(_recv_data))
  154. if _recv_data[0] == 0x55 and _recv_data[1] == 0x55:
  155. # print("read ori ", self.change_hex_to_int(self.receive_data))
  156. data_len = _recv_data[2]
  157. if len(_recv_data) < data_len + 4:
  158. # 此处需要超时机制
  159. print("2数据长度不够,等待下次读取")
  160. # if time.time() - s > out_time:
  161. # time.sleep(0.01)
  162. # return
  163. return
  164. _data = _recv_data[3 : data_len + 4]
  165. # 校验数据
  166. if check is None:
  167. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  168. # print("data:", self.change_hex_to_int(data[:-1]))
  169. return _data[:-1]
  170. else:
  171. print("数据异常,丢弃")
  172. return
  173. else:
  174. if _data[-1] == check:
  175. return _data[:-1]
  176. else:
  177. print("数据异常,丢弃")
  178. return
  179. else:
  180. # 起始位不是 55 55 进行移除
  181. while self.receive_data:
  182. if len(self.receive_data) == 1:
  183. if self.receive_data[0] == 0x55:
  184. break
  185. else:
  186. self.receive_data = b""
  187. else:
  188. if (
  189. self.receive_data[0] == 0x55
  190. and self.receive_data[1] == 0x55
  191. ):
  192. break
  193. else:
  194. self.receive_data = self.receive_data[1:]
  195. def read_cmd(self, out_time=1, check=None, out_time_n=5):
  196. n = 0
  197. while 1:
  198. try:
  199. read_d = self.serial_handle.read_all() # 读取接收到的数据
  200. self.receive_data += read_d
  201. except BaseException as e:
  202. print("串口接收报错", e)
  203. self.serial_handle = None
  204. return False
  205. if len(self.receive_data) < 4:
  206. break
  207. print("read ori ", self.change_hex_to_int(self.receive_data))
  208. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  209. # print("read ori ", self.change_hex_to_int(self.receive_data))
  210. data_len = self.receive_data[2]
  211. if len(self.receive_data) < data_len + 4:
  212. # 此处需要超时机制
  213. # print("数据长度不够,等待下次读取")
  214. # 超时退出
  215. # if not self.serial_handle.txdone():
  216. # return None
  217. # n += 1
  218. # if n > out_time_n:
  219. # return None
  220. # time.sleep(0.01)
  221. continue
  222. _data = self.receive_data[3 : data_len + 4]
  223. # 更新缓存区
  224. self.receive_data = self.receive_data[data_len + 4 :]
  225. # 校验数据
  226. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  227. # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
  228. return _data[:-1]
  229. else:
  230. return None
  231. else:
  232. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  233. # 起始位不是 55 55 进行移除
  234. while self.receive_data:
  235. if len(self.receive_data) == 1:
  236. if self.receive_data[0] == 0x55:
  237. break
  238. else:
  239. self.receive_data = b""
  240. else:
  241. if (
  242. self.receive_data[0] == 0x55
  243. and self.receive_data[1] == 0x55
  244. ):
  245. break
  246. else:
  247. self.receive_data = self.receive_data[1:]
  248. def change_hex_to_int(self, _bytearray):
  249. return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
  250. def scan_serial_port(self, port_name=None):
  251. plist = list(serial.tools.list_ports.comports())
  252. if len(plist) <= 0:
  253. return None
  254. if port_name:
  255. for i in plist:
  256. # print("port", i)
  257. if i.name == port_name:
  258. return i
  259. return None
  260. else:
  261. for i in plist:
  262. print("串口列表:", i.description)
  263. print("串口列表:", i.name)
  264. if "CH340" in i.description:
  265. print("CH340:", i.name)
  266. self.port_name = i.name
  267. print("----------", i)
  268. return plist
  269. def open_serial_port(self):
  270. if self.serial_handle:
  271. if not s.check_connect():
  272. self.serial_handle.open()
  273. return True
  274. def close_serial_port(self):
  275. if self.serial_handle:
  276. self.serial_handle.close()
  277. print("{}串口已关闭".format(self.port_name))
  278. self.serial_handle = None
  279. def __del__(self):
  280. self.close_serial_port()
  281. if __name__ == "__main__":
  282. s = SerialIns(port_name="COM5", baud=115200, timeout=0.1)
  283. s.scan_serial_port()
  284. s.clearn_flush()
  285. print("-" * 30)
  286. for i in range(2):
  287. data = [2,99,1]
  288. s.write_cmd(data)
  289. time.sleep(0.1)
  290. data = s.read_cmd()
  291. # print("data-->", data)
  292. if data:
  293. print("data--> {}".format(s.change_hex_to_int(data)))
  294. time.sleep(1)
  295. print("-" * 30)
  296. s.close_serial_port()