DeviceControl.py 9.8 KB


  1. import asyncio
  2. import serial.tools.list_ports
  3. import time
  4. from .SerialIns import SerialIns
  5. from sockets import socket_manager
  6. # mcu命令
  7. class DeviceControl:
  8. def __init__(self):
  9. self.serial_ins = None
  10. self.connected_ports_dict = {} # 已连接的ports
  11. def scan_serial_port(self) -> dict:
  12. # 获取所有可用串口列表
  13. ports_dict = {}
  14. ports = serial.tools.list_ports.comports()
  15. # 遍历所有端口并打印信息
  16. for port in ports:
  17. if "CH340" in port.description:
  18. ports_dict[port.name] = {
  19. "name": port.name,
  20. "device": port.device,
  21. "description": port.description,
  22. "hwid": port.hwid,
  23. "manufacturer": port.manufacturer,
  24. "product": port.product,
  25. "serial_number": port.serial_number,
  26. }
  27. if len(ports_dict) <= 0:
  28. return {}
  29. return ports_dict
  30. def remove_port(self, port_name):
  31. '''移除串口'''
  32. print("remove", port_name)
  33. self.sign_data.emit(
  34. {
  35. "_type": "remove_port",
  36. "plugins_mode": "auto_select_com",
  37. "data": {"port_name": port_name},
  38. }
  39. )
  40. def sendMsg(self,code=0,msg="",data=None):
  41. asyncio.run(socket_manager.send_message(code=code, msg=msg, data=data))
  42. # print(code,msg,data)
  43. def add_port_by_linkage(self, port_name):
  44. # port_value :串口基础信息
  45. # todo 根据prot_value 信息自动进行连接
  46. print("add", port_name)
  47. # 对没有连接的设备进行尝试连接
  48. message_data = {
  49. "_type": "show_info",
  50. "plugins_mode": "auto_select_com",
  51. "data": {"text": "开始识别接口:{}".format(port_name)},
  52. }
  53. self.sendMsg(msg="开始识别接口:{}".format(port_name), data=message_data)
  54. time.sleep(1)
  55. """
  56. 步骤:
  57. 1、进行临时连接,并发送命令,成功后,自动连接对应设备
  58. """
  59. try:
  60. # 尝试使用115200波特率链接
  61. serial_handle = serial.Serial(
  62. port=port_name, baudrate=115200, timeout=0.5
  63. )
  64. except:
  65. message_data = {
  66. "_type": "show_info",
  67. "plugins_mode": "auto_select_com",
  68. "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)},
  69. }
  70. self.sendMsg(
  71. msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name),
  72. data=message_data,
  73. )
  74. print("串口:{} 被占用".format(port_name))
  75. return
  76. time.sleep(2)
  77. print("开始发送命令")
  78. data = [90, 1]
  79. try:
  80. serial_handle.flushInput() # 尝试重置输入缓冲区
  81. except serial.SerialTimeoutException:
  82. print("超时错误:无法在规定时间内重置输入缓冲区。")
  83. serial_handle.close()
  84. return
  85. print("尝试写入数据")
  86. buf = bytearray(b"")
  87. buf.extend([0x55, 0x55, (0xFF & len(data))])
  88. buf.extend(data)
  89. buf.extend([0xFF & ~sum(data)])
  90. try:
  91. self.receive_data = b""
  92. serial_handle.write(buf)
  93. except serial.SerialTimeoutException:
  94. print("写入数据错误")
  95. serial_handle.close()
  96. return
  97. time.sleep(0.3)
  98. print("尝试接收命令")
  99. receive_data = self.read_cmd(serial_handle)
  100. device_id = 0
  101. if receive_data:
  102. print("receive_data", receive_data)
  103. if receive_data[0] == 90:
  104. connect_flag = receive_data[1]
  105. device_id = receive_data[2]
  106. print("关闭串口:{}".format(port_name))
  107. serial_handle.close()
  108. if device_id > 0:
  109. if device_id == 1:
  110. self.to_connect_com(port_name)
  111. message_data = {
  112. "_type": "show_info",
  113. "plugins_mode": "auto_select_com",
  114. "data": {"text": "MCU开始连接"},
  115. }
  116. self.sendMsg(
  117. msg="MCU开始连接",
  118. data=message_data,
  119. )
  120. self.connected_ports_dict[port_name] = "MCU"
  121. message_data = {
  122. "_type": "select_port_name",
  123. "plugins_mode": "auto_select_com",
  124. "data": {
  125. "device_name": "mcu" if device_id == 1 else "remote_control",
  126. "port_name": port_name,
  127. },
  128. }
  129. self.sendMsg(
  130. msg="MCU连接成功",
  131. data=message_data,
  132. )
  133. else:
  134. print("串口无法识别")
  135. # 走其他途径处理
  136. # 检查当前MCU链接是否正常
  137. # 正常跳过;记录为其他列表
  138. # 不正常进行尝试连接
  139. # 连接不上,记录为其他列表
  140. def read_cmd(self, serial_handle, check=None):
  141. n = 0
  142. while 1:
  143. try:
  144. read_d = serial_handle.read_all() # 读取接收到的数据
  145. self.receive_data += read_d
  146. except BaseException as e:
  147. print("171串口接收报错", e)
  148. self.serial_handle = None
  149. return False
  150. if len(self.receive_data) < 4:
  151. break
  152. if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
  153. # print("read ori ", self.change_hex_to_int(self.receive_data))
  154. data_len = self.receive_data[2]
  155. if len(self.receive_data) < data_len + 4:
  156. # 此处需要超时机制
  157. # print("数据长度不够,等待下次读取")
  158. # 超时退出
  159. # if not self.serial_handle.txdone():
  160. # return None
  161. # n += 1
  162. # if n > out_time_n:
  163. # return None
  164. # time.sleep(0.01)
  165. continue
  166. _data = self.receive_data[3 : data_len + 4]
  167. # 更新缓存区
  168. self.receive_data = self.receive_data[data_len + 4 :]
  169. # 校验数据
  170. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  171. # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
  172. return _data[:-1]
  173. else:
  174. return None
  175. else:
  176. # print("起始位不是 55 55 进行移除", self.receive_data[0])
  177. # 起始位不是 55 55 进行移除
  178. while self.receive_data:
  179. if len(self.receive_data) == 1:
  180. if self.receive_data[0] == 0x55:
  181. break
  182. else:
  183. self.receive_data = b""
  184. else:
  185. if (
  186. self.receive_data[0] == 0x55
  187. and self.receive_data[1] == 0x55
  188. ):
  189. break
  190. else:
  191. self.receive_data = self.receive_data[1:]
  192. def to_connect_com(self, port_name):
  193. self.close_connect()
  194. time.sleep(0.5)
  195. try:
  196. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  197. # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
  198. if self.serial_ins.serial_handle:
  199. self.sendMsg(msg="条码识别 打开串口成功")
  200. self.connect_state = True
  201. self.start()
  202. return True
  203. else:
  204. self.sendMsg(code=1, msg="条码识别 打开串口失败")
  205. self.serial_ins = None
  206. self.connect_state = False
  207. except:
  208. self.sendMsg(code=1, msg="条码识别 打开串口失败")
  209. # print("条码识别 打开串口失败")
  210. self.serial_ins = None
  211. self.connect_state = False
  212. return False
  213. def close_connect(self):
  214. self.port_name = ""
  215. if self.serial_ins:
  216. self.serial_ins.close_serial_port()
  217. self.connect_state = False
  218. if __name__ == "__main__":
  219. device_control = DeviceControl()
  220. p_list = []
  221. temp_ports_dict = {}
  222. while 1:
  223. time.sleep(1)
  224. ports_dict = device_control.scan_serial_port()
  225. temp_ports_dict = ports_dict
  226. if not ports_dict:
  227. # 全部清空 移除所有串口
  228. if p_list:
  229. _p = p_list.pop()
  230. device_control.remove_port(_p)
  231. continue
  232. if ports_dict:
  233. # print(plist)
  234. for index, _i in enumerate(p_list):
  235. if _i not in ports_dict:
  236. _p = p_list.pop(index)
  237. device_control.remove_port(_p)
  238. for _port_name, _port_value in ports_dict.items():
  239. if _port_name not in p_list:
  240. # try:
  241. p_list.append(_port_name)
  242. device_control.add_port_by_linkage(_port_name)
  243. # except BaseException as e:
  244. # print(e.__traceback__.tb_frame.f_globals["__file__"]) # 发生异常所在的文件
  245. # print(e.__traceback__.tb_lineno) # 发生异常所在的行数
  246. # print("串口不存在{} {}".format(_port_name, e))
  247. # threading.Thread(target=self.add_port, args=(_port_name, _port_value)).start()
  248. # self.add_port(_p)