DeviceControl.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. import asyncio
  2. import serial.tools.list_ports
  3. import time, json
  4. from .SerialIns import SerialIns
  5. from utils.SingletonType import SingletonType
  6. from .BaseClass import BaseClass
  7. from sockets import ConnectionManager
  8. # mcu命令
  9. class DeviceControl(BaseClass,metaclass=SingletonType):
  10. def __init__(self, websocket_manager: ConnectionManager):
  11. super().__init__(websocket_manager)
  12. self.msg_type = "mcu"
  13. self.serial_ins = None
  14. self.connected_ports_dict = {} # 已连接的ports
  15. self.p_list = []
  16. self.temp_ports_dict = {}
  17. self.is_running = False
  18. self.connect_state = False
  19. def scan_serial_port(self) -> dict:
  20. # 获取所有可用串口列表
  21. ports_dict = {}
  22. ports = serial.tools.list_ports.comports()
  23. # 遍历所有端口并打印信息
  24. for port in ports:
  25. if "CH340" in port.description:
  26. ports_dict[port.name] = {
  27. "name": port.name,
  28. "device": port.device,
  29. "description": port.description,
  30. "hwid": port.hwid,
  31. "manufacturer": port.manufacturer,
  32. "product": port.product,
  33. "serial_number": port.serial_number,
  34. }
  35. if len(ports_dict) <= 0:
  36. return {}
  37. return ports_dict
  38. def remove_port(self, port_name):
  39. """移除串口"""
  40. print("remove", port_name)
  41. data = {
  42. "_type": "remove_port",
  43. "plugins_mode": "auto_select_com",
  44. "data": {"port_name": port_name},
  45. }
  46. self.sendSocketMessage(1, "串口被移除", data)
  47. def add_port_by_linkage(self, port_name):
  48. # port_value :串口基础信息
  49. # todo 根据prot_value 信息自动进行连接
  50. print("add", port_name)
  51. # 对没有连接的设备进行尝试连接
  52. message_data = {
  53. "_type": "show_info",
  54. "plugins_mode": "auto_select_com",
  55. "data": {"text": "开始识别接口:{}".format(port_name)},
  56. }
  57. self.sendSocketMessage(msg="开始识别接口:{}".format(port_name), data=message_data)
  58. time.sleep(1)
  59. """
  60. 步骤:
  61. 1、进行临时连接,并发送命令,成功后,自动连接对应设备
  62. """
  63. try:
  64. # 尝试使用115200波特率链接
  65. serial_handle = serial.Serial(port=port_name, baudrate=115200, timeout=0.5)
  66. except:
  67. message_data = {
  68. "_type": "show_info",
  69. "plugins_mode": "auto_select_com",
  70. "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)},
  71. }
  72. self.sendSocketMessage(
  73. 1,
  74. msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name),
  75. data=message_data,
  76. )
  77. print("串口:{} 被占用".format(port_name))
  78. return
  79. time.sleep(2)
  80. print("开始发送命令")
  81. data = [90, 1]
  82. try:
  83. serial_handle.flushInput() # 尝试重置输入缓冲区
  84. except serial.SerialTimeoutException:
  85. print("超时错误:无法在规定时间内重置输入缓冲区。")
  86. self.sendSocketMessage(
  87. 1,
  88. msg="超时错误:无法在规定时间内重置输入缓冲区。",
  89. data=None,
  90. )
  91. serial_handle.close()
  92. return
  93. print("尝试写入数据")
  94. buf = bytearray(b"")
  95. buf.extend([0x55, 0x55, (0xFF & len(data))])
  96. buf.extend(data)
  97. buf.extend([0xFF & ~sum(data)])
  98. try:
  99. self.receive_data = b""
  100. serial_handle.write(buf)
  101. except serial.SerialTimeoutException:
  102. print("写入数据错误")
  103. serial_handle.close()
  104. return
  105. time.sleep(0.3)
  106. print("尝试接收命令")
  107. receive_data = self.read_cmd(serial_handle)
  108. device_id = 0
  109. if receive_data:
  110. print("receive_data", receive_data)
  111. if receive_data[0] == 90:
  112. connect_flag = receive_data[1]
  113. device_id = receive_data[2]
  114. print("关闭串口:{}".format(port_name))
  115. serial_handle.close()
  116. if device_id > 0:
  117. if device_id == 1:
  118. self.to_connect_com(port_name)
  119. message_data = {
  120. "_type": "show_info",
  121. "plugins_mode": "auto_select_com",
  122. "data": {"text": "MCU开始连接"},
  123. }
  124. self.sendSocketMessage(
  125. msg="MCU开始连接",
  126. data=message_data,
  127. )
  128. self.connected_ports_dict[port_name] = "MCU"
  129. message_data = {
  130. "_type": "select_port_name",
  131. "plugins_mode": "auto_select_com",
  132. "data": {
  133. "device_name": "mcu" if device_id == 1 else "remote_control",
  134. "port_name": port_name,
  135. },
  136. }
  137. self.sendSocketMessage(
  138. msg="MCU连接成功",
  139. data=message_data,
  140. )
  141. else:
  142. print("串口无法识别")
  143. # 走其他途径处理
  144. # 检查当前MCU链接是否正常
  145. # 正常跳过;记录为其他列表
  146. # 不正常进行尝试连接
  147. # 连接不上,记录为其他列表
  148. def to_connect_com(self, port_name):
  149. # 关闭串口
  150. print("to_connect_com", port_name)
  151. self.close_connect()
  152. time.sleep(0.3)
  153. self.connect_state = False
  154. try:
  155. self.serial_ins = SerialIns(port_name=port_name, baud=115200, timeout=0.1)
  156. if not self.serial_ins.serial_handle:
  157. message_data= {
  158. "_type": "show_info",
  159. "plugins_mode": "mcu",
  160. "data": "MCU 打开串口失败",
  161. }
  162. self.sendSocketMessage(
  163. msg="MCU 打开串口失败",
  164. data=message_data,
  165. )
  166. self.serial_ins = None
  167. self.connect_state = False
  168. return False
  169. except:
  170. message_data={
  171. "_type": "show_info",
  172. "plugins_mode": "mcu",
  173. "data": "MCU 打开串口失败",
  174. }
  175. self.sendSocketMessage(
  176. msg="MCU 打开串口失败",
  177. data=message_data,
  178. )
  179. self.serial_ins = None
  180. self.connect_state = False
  181. return False
  182. message_data={"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 开始连接"}
  183. self.sendSocketMessage(
  184. msg="MCU 开始连接",
  185. data=message_data,
  186. )
  187. # =======================发送连接请求=================================
  188. cmd = 90
  189. data = [cmd, 1]
  190. print("405 发送 连接请求 -----------------------------------------")
  191. print(self.serial_ins)
  192. # self.serial_ins.clearn_flush()
  193. self.serial_ins.write_cmd(data)
  194. # 延迟接收数据
  195. time.sleep(0.3)
  196. receive_data = self.serial_ins.read_cmd(out_time=1)
  197. if receive_data:
  198. print(
  199. "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data))
  200. )
  201. if receive_data:
  202. # receive_data[2]=1 表示为MCU设备编号
  203. if receive_data[0] == 90 and receive_data[2] == 1:
  204. connect_flag = receive_data[1]
  205. # 是否有初始化
  206. try:
  207. mcu_has_been_set = receive_data[
  208. 6
  209. ] # 设备是否有初始化 ,1 表示已初始化
  210. except:
  211. mcu_has_been_set = 99 # 未知状态
  212. print("MCU初始化信息{}".format(mcu_has_been_set))
  213. message_data = {
  214. "_type": "show_info",
  215. "plugins_mode": "mcu",
  216. "data": "MCU 已连接",
  217. }
  218. self.sendSocketMessage(
  219. msg="MCU 已连接",
  220. data=message_data,
  221. )
  222. self.connect_state = True
  223. self.is_running = True
  224. print("MCU 已连接")
  225. self.port_name = port_name
  226. return
  227. print("MCU 连接失败")
  228. message_data = {
  229. "_type": "show_info",
  230. "plugins_mode": "mcu",
  231. "data": "MCU 连接失败",
  232. }
  233. self.sendSocketMessage(
  234. msg="MCU 连接失败",
  235. data=message_data,
  236. )
  237. self.close_connect()
  238. def close_connect(self):
  239. self.port_name = ""
  240. if self.serial_ins:
  241. self.serial_ins.close_serial_port()
  242. self.is_running = False
  243. self.connect_state = False
  244. self.connected_ports_dict = {} # 已连接的ports
  245. self.p_list = []
  246. self.temp_ports_dict = {}
  247. print("关闭MCU")
  248. async def checkMcuConnection(device_ctrl: DeviceControl):
  249. if device_ctrl.is_running == True:
  250. message = {
  251. "_type": "select_port_name",
  252. "plugins_mode": "auto_select_com",
  253. "data": device_ctrl.temp_ports_dict,
  254. }
  255. device_ctrl.sendSocketMessage(code=0, msg="MCU连接成功", data=message)
  256. return
  257. """实时检测串口是否连接"""
  258. while True:
  259. await asyncio.sleep(0.5)
  260. ports_dict = device_ctrl.scan_serial_port()
  261. device_ctrl.temp_ports_dict = ports_dict
  262. if not ports_dict:
  263. # 全部清空 移除所有串口
  264. if device_ctrl.p_list:
  265. _p = device_ctrl.p_list.pop()
  266. device_ctrl.remove_port(_p)
  267. print("串口未连接,请检查")
  268. device_ctrl.sendSocketMessage(code=1, msg="串口未连接,请检查")
  269. continue
  270. if ports_dict:
  271. for index, _i in enumerate(device_ctrl.p_list):
  272. if _i not in ports_dict:
  273. _p = device_ctrl.p_list.pop(index)
  274. device_ctrl.remove_port(_p)
  275. for _port_name, _port_value in ports_dict.items():
  276. if _port_name not in device_ctrl.p_list:
  277. try:
  278. device_ctrl.p_list.append(_port_name)
  279. device_ctrl.add_port_by_linkage(_port_name)
  280. except BaseException as e:
  281. print("串口不存在{} {}".format(_port_name, e))