SerialIns.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. #!/usr/bin/python3
  2. import serial.tools.list_ports
  3. import time
  4. import logging
  5. logger = logging.getLogger(__name__)
  6. # https://blog.csdn.net/weixin_44289254/article/details/121583562
  7. class SerialIns(object):
  8. def __init__(self, port_name=None, baud=9600, timeout=0.01):
  9. self.port_name = port_name
  10. self.baud = baud
  11. self.serial_handle = None
  12. # self.serial_handle = self.check_connect()
  13. try:
  14. self.serial_handle = serial.Serial(
  15. port=self.port_name, baudrate=self.baud, timeout=timeout
  16. )
  17. print("{}打开成功".format(self.port_name))
  18. except Exception as e:
  19. print(e)
  20. self.serial_handle = None
  21. print("{}打开失败".format(self.port_name))
  22. self.receive_data = b""
  23. # self.receive_data = bytearray(b'\x00\UU\x02d\x00\x9b\x9b')
  24. def read_line(self):
  25. c = self.serial_handle.inWaiting()
  26. if c:
  27. time.sleep(0.02)
  28. # 确保缓存写入完成
  29. if c == self.serial_handle.inWaiting():
  30. return self.serial_handle.readline()
  31. return None
  32. def read_all(self):
  33. return self.serial_handle.read_all()
  34. def check_connect(self):
  35. if not self.port_name:
  36. self.scan_serial_port()
  37. if self.port_name:
  38. self.serial_handle = serial.Serial(self.port_name, self.baud)
  39. else:
  40. try:
  41. # if self.scan_serial_port(self.port_name):
  42. self.serial_handle = serial.Serial(self.port_name, self.baud)
  43. print("{}打开成功".format(self.port_name))
  44. except:
  45. print("{}打开失败".format(self.port_name))
  46. return self.serial_handle
  47. def clearn_flush(self): # 清空接收缓存
  48. if self.serial_handle:
  49. self.serial_handle.flushInput()
  50. self.receive_data = b""
  51. def write_cmd(self, data: list):
  52. if self.serial_handle:
  53. # data = [(0xff & par1), (0xff & (par1 >> 8))]
  54. # self.clearn_flush()
  55. buf = bytearray(b"")
  56. buf.extend([0x55, 0x55, (0xFF & len(data))])
  57. buf.extend(data)
  58. buf.extend([0xFF & ~sum(data)])
  59. # 55 55 02 5a 01 a4
  60. print("send buf {}".format(self.change_hex_to_int(buf)))
  61. logger.info("正在发送命令======>>>>> send buf %s", self.change_hex_to_int(buf))
  62. try:
  63. self.serial_handle.write(buf)
  64. return True
  65. except:
  66. self.serial_handle = None
  67. _recv_data = b""
  68. return False
  69. def read_cmd_out_time(self, scan_interval=0.1, out_time=1):
  70. if self.serial_handle:
  71. """
  72. 获取数据,设计超时机制
  73. :param scan_interval:
  74. :param out_time:
  75. :return:
  76. """
  77. n = 0
  78. while 1:
  79. n += 1
  80. time.sleep(scan_interval)
  81. if out_time <= n * scan_interval:
  82. return False
  83. receive_data = self.read_cmd()
  84. if receive_data:
  85. return receive_data
  86. return False
  87. def read_cmd1(self, out_time=1, check=None):
  88. s = time.time()
  89. while 1:
  90. try:
  91. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  92. except:
  93. self.serial_handle = None
  94. _recv_data = b""
  95. return False
  96. self.receive_data += _recv_data # 写入所有缓存数据
  97. # print(self.receive_data)
  98. if not self.receive_data or len(self.receive_data) < 4:
  99. return
  100. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  101. # print("read ori ", self.change_hex_to_int(self.receive_data))
  102. data_len = self.receive_data[2]
  103. if len(self.receive_data) < data_len + 4:
  104. # 此处需要超时机制
  105. print("1数据长度不够,等待下次读取")
  106. # 超时退出
  107. if time.time() - s > out_time:
  108. time.sleep(0.01)
  109. return
  110. _data = self.receive_data[3 : data_len + 4]
  111. # 更新缓存区
  112. self.receive_data = self.receive_data[data_len + 4 :]
  113. # 校验数据
  114. if check is None:
  115. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  116. # print("data:", self.change_hex_to_int(data[:-1]))
  117. return _data[:-1]
  118. else:
  119. print("数据异常,丢弃")
  120. return
  121. else:
  122. if _data[-1] == check:
  123. return _data[:-1]
  124. else:
  125. print("数据异常,丢弃")
  126. return
  127. else:
  128. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  129. # 起始位不是 55 55 进行移除
  130. index = 0
  131. for index, r_data in enumerate(self.receive_data):
  132. if r_data == 0x55:
  133. break
  134. index += 1
  135. if index >= len(self.receive_data):
  136. self.receive_data = b""
  137. else:
  138. self.receive_data = self.receive_data[index - 1 :]
  139. def read_cmd111(self, out_time=1, check=None):
  140. while 1:
  141. try:
  142. _recv_data = self.serial_handle.read_all() # 读取接收到的数据
  143. except BaseException as e:
  144. print("串口接收报错", e)
  145. self.serial_handle = None
  146. _recv_data = b""
  147. return False
  148. # print("read_cmd", _recv_data)
  149. if not _recv_data or len(_recv_data) < 4:
  150. # print("数据长度不够")
  151. return
  152. print("2 _recv_data", self.change_hex_to_int(_recv_data))
  153. if _recv_data[0] == 0x55 and _recv_data[1] == 0x55:
  154. # print("read ori ", self.change_hex_to_int(self.receive_data))
  155. data_len = _recv_data[2]
  156. if len(_recv_data) < data_len + 4:
  157. # 此处需要超时机制
  158. print("2数据长度不够,等待下次读取")
  159. # if time.time() - s > out_time:
  160. # time.sleep(0.01)
  161. # return
  162. return
  163. _data = _recv_data[3 : data_len + 4]
  164. # 校验数据
  165. if check is None:
  166. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  167. # print("data:", self.change_hex_to_int(data[:-1]))
  168. return _data[:-1]
  169. else:
  170. print("数据异常,丢弃")
  171. return
  172. else:
  173. if _data[-1] == check:
  174. return _data[:-1]
  175. else:
  176. print("数据异常,丢弃")
  177. return
  178. else:
  179. # 起始位不是 55 55 进行移除
  180. while self.receive_data:
  181. if len(self.receive_data) == 1:
  182. if self.receive_data[0] == 0x55:
  183. break
  184. else:
  185. self.receive_data = b""
  186. else:
  187. if (
  188. self.receive_data[0] == 0x55
  189. and self.receive_data[1] == 0x55
  190. ):
  191. break
  192. else:
  193. self.receive_data = self.receive_data[1:]
  194. def read_cmd(self, out_time=1, check=None, out_time_n=5):
  195. n = 0
  196. while 1:
  197. try:
  198. read_d = self.serial_handle.read_all() # 读取接收到的数据
  199. self.receive_data += read_d
  200. except BaseException as e:
  201. print("串口接收报错", e)
  202. self.serial_handle = None
  203. return False
  204. if len(self.receive_data) < 4:
  205. break
  206. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  207. # print("read ori ", self.change_hex_to_int(self.receive_data))
  208. data_len = self.receive_data[2]
  209. if len(self.receive_data) < data_len + 4:
  210. # 此处需要超时机制
  211. # print("数据长度不够,等待下次读取")
  212. # 超时退出
  213. # if not self.serial_handle.txdone():
  214. # return None
  215. # n += 1
  216. # if n > out_time_n:
  217. # return None
  218. # time.sleep(0.01)
  219. continue
  220. _data = self.receive_data[3 : data_len + 4]
  221. # 更新缓存区
  222. self.receive_data = self.receive_data[data_len + 4 :]
  223. # 校验数据
  224. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  225. # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
  226. return _data[:-1]
  227. else:
  228. return None
  229. else:
  230. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  231. # 起始位不是 55 55 进行移除
  232. while self.receive_data:
  233. if len(self.receive_data) == 1:
  234. if self.receive_data[0] == 0x55:
  235. break
  236. else:
  237. self.receive_data = b""
  238. else:
  239. if (
  240. self.receive_data[0] == 0x55
  241. and self.receive_data[1] == 0x55
  242. ):
  243. break
  244. else:
  245. self.receive_data = self.receive_data[1:]
  246. def change_hex_to_int(self, _bytearray):
  247. return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
  248. def scan_serial_port(self, port_name=None):
  249. plist = list(serial.tools.list_ports.comports())
  250. if len(plist) <= 0:
  251. return None
  252. if port_name:
  253. for i in plist:
  254. # print("port", i)
  255. if i.name == port_name:
  256. return i
  257. return None
  258. else:
  259. for i in plist:
  260. print("串口列表:", i.description)
  261. print("串口列表:", i.name)
  262. if "CH340" in i.description:
  263. print("CH340:", i.name)
  264. self.port_name = i.name
  265. print("----------", i)
  266. return plist
  267. def open_serial_port(self):
  268. if self.serial_handle:
  269. if not s.check_connect():
  270. self.serial_handle.open()
  271. return True
  272. def close_serial_port(self):
  273. if self.serial_handle:
  274. self.serial_handle.close()
  275. print("{}串口已关闭".format(self.port_name))
  276. self.serial_handle = None
  277. def __del__(self):
  278. self.close_serial_port()
  279. if __name__ == "__main__":
  280. s = SerialIns(port_name="COM5", baud=115200, timeout=0.1)
  281. s.scan_serial_port()
  282. s.clearn_flush()
  283. print("-" * 30)
  284. for i in range(2):
  285. data = [2,99,1]
  286. s.write_cmd(data)
  287. time.sleep(0.1)
  288. data = s.read_cmd()
  289. # print("data-->", data)
  290. if data:
  291. print("data--> {}".format(s.change_hex_to_int(data)))
  292. time.sleep(1)
  293. print("-" * 30)
  294. s.close_serial_port()