DeviceControl.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. import asyncio
  2. import serial.tools.list_ports
  3. import time, json
  4. from .SerialIns import SerialIns
  5. from utils.SingletonType import SingletonType
  6. from .BaseClass import BaseClass
  7. from sockets import ConnectionManager
# MCU commands
  9. class DeviceControl(BaseClass,metaclass=SingletonType):
  10. def __init__(self, websocket_manager: ConnectionManager):
  11. super().__init__(websocket_manager)
  12. self.serial_ins = None
  13. self.connected_ports_dict = {} # 已连接的ports
  14. self.p_list = []
  15. self.temp_ports_dict = {}
  16. self.is_running = False
  17. self.connect_state = False
  18. def scan_serial_port(self) -> dict:
  19. # 获取所有可用串口列表
  20. ports_dict = {}
  21. ports = serial.tools.list_ports.comports()
  22. # 遍历所有端口并打印信息
  23. for port in ports:
  24. if "CH340" in port.description:
  25. ports_dict[port.name] = {
  26. "name": port.name,
  27. "device": port.device,
  28. "description": port.description,
  29. "hwid": port.hwid,
  30. "manufacturer": port.manufacturer,
  31. "product": port.product,
  32. "serial_number": port.serial_number,
  33. }
  34. if len(ports_dict) <= 0:
  35. return {}
  36. return ports_dict
  37. def remove_port(self, port_name):
  38. """移除串口"""
  39. print("remove", port_name)
  40. data = {
  41. "_type": "remove_port",
  42. "plugins_mode": "auto_select_com",
  43. "data": {"port_name": port_name},
  44. }
  45. self.sendSocketMessage(1, "串口被移除", data)
  46. def add_port_by_linkage(self, port_name):
  47. # port_value :串口基础信息
  48. # todo 根据prot_value 信息自动进行连接
  49. print("add", port_name)
  50. # 对没有连接的设备进行尝试连接
  51. message_data = {
  52. "_type": "show_info",
  53. "plugins_mode": "auto_select_com",
  54. "data": {"text": "开始识别接口:{}".format(port_name)},
  55. }
  56. self.sendSocketMessage(msg="开始识别接口:{}".format(port_name), data=message_data)
  57. time.sleep(1)
  58. """
  59. 步骤:
  60. 1、进行临时连接,并发送命令,成功后,自动连接对应设备
  61. """
  62. try:
  63. # 尝试使用115200波特率链接
  64. serial_handle = serial.Serial(port=port_name, baudrate=115200, timeout=0.5)
  65. except:
  66. message_data = {
  67. "_type": "show_info",
  68. "plugins_mode": "auto_select_com",
  69. "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)},
  70. }
  71. self.sendSocketMessage(
  72. 1,
  73. msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name),
  74. data=message_data,
  75. )
  76. print("串口:{} 被占用".format(port_name))
  77. return
  78. time.sleep(2)
  79. print("开始发送命令")
  80. data = [90, 1]
  81. try:
  82. serial_handle.flushInput() # 尝试重置输入缓冲区
  83. except serial.SerialTimeoutException:
  84. print("超时错误:无法在规定时间内重置输入缓冲区。")
  85. self.sendSocketMessage(
  86. 1,
  87. msg="超时错误:无法在规定时间内重置输入缓冲区。",
  88. data=None,
  89. )
  90. serial_handle.close()
  91. return
  92. print("尝试写入数据")
  93. buf = bytearray(b"")
  94. buf.extend([0x55, 0x55, (0xFF & len(data))])
  95. buf.extend(data)
  96. buf.extend([0xFF & ~sum(data)])
  97. try:
  98. self.receive_data = b""
  99. serial_handle.write(buf)
  100. except serial.SerialTimeoutException:
  101. print("写入数据错误")
  102. serial_handle.close()
  103. return
  104. time.sleep(0.3)
  105. print("尝试接收命令")
  106. receive_data = self.read_cmd(serial_handle)
  107. device_id = 0
  108. if receive_data:
  109. print("receive_data", receive_data)
  110. if receive_data[0] == 90:
  111. connect_flag = receive_data[1]
  112. device_id = receive_data[2]
  113. print("关闭串口:{}".format(port_name))
  114. serial_handle.close()
  115. if device_id > 0:
  116. if device_id == 1:
  117. self.to_connect_com(port_name)
  118. message_data = {
  119. "_type": "show_info",
  120. "plugins_mode": "auto_select_com",
  121. "data": {"text": "MCU开始连接"},
  122. }
  123. self.sendSocketMessage(
  124. msg="MCU开始连接",
  125. data=message_data,
  126. )
  127. self.connected_ports_dict[port_name] = "MCU"
  128. message_data = {
  129. "_type": "select_port_name",
  130. "plugins_mode": "auto_select_com",
  131. "data": {
  132. "device_name": "mcu" if device_id == 1 else "remote_control",
  133. "port_name": port_name,
  134. },
  135. }
  136. self.sendSocketMessage(
  137. msg="MCU连接成功",
  138. data=message_data,
  139. )
  140. else:
  141. print("串口无法识别")
  142. # 走其他途径处理
  143. # 检查当前MCU链接是否正常
  144. # 正常跳过;记录为其他列表
  145. # 不正常进行尝试连接
  146. # 连接不上,记录为其他列表
  147. def to_connect_com(self, port_name):
  148. # 关闭串口
  149. print("to_connect_com", port_name)
  150. self.close_connect()
  151. time.sleep(0.3)
  152. self.connect_state = False
  153. try:
  154. self.serial_ins = SerialIns(port_name=port_name, baud=115200, timeout=0.1)
  155. if not self.serial_ins.serial_handle:
  156. message_data= {
  157. "_type": "show_info",
  158. "plugins_mode": "mcu",
  159. "data": "MCU 打开串口失败",
  160. }
  161. self.sendSocketMessage(
  162. msg="MCU 打开串口失败",
  163. data=message_data,
  164. )
  165. self.serial_ins = None
  166. self.connect_state = False
  167. return False
  168. except:
  169. message_data={
  170. "_type": "show_info",
  171. "plugins_mode": "mcu",
  172. "data": "MCU 打开串口失败",
  173. }
  174. self.sendSocketMessage(
  175. msg="MCU 打开串口失败",
  176. data=message_data,
  177. )
  178. self.serial_ins = None
  179. self.connect_state = False
  180. return False
  181. message_data={"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 开始连接"}
  182. self.sendSocketMessage(
  183. msg="MCU 开始连接",
  184. data=message_data,
  185. )
  186. # =======================发送连接请求=================================
  187. cmd = 90
  188. data = [cmd, 1]
  189. print("405 发送 连接请求 -----------------------------------------")
  190. print(self.serial_ins)
  191. # self.serial_ins.clearn_flush()
  192. self.serial_ins.write_cmd(data)
  193. # 延迟接收数据
  194. time.sleep(0.3)
  195. receive_data = self.serial_ins.read_cmd(out_time=1)
  196. if receive_data:
  197. print(
  198. "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data))
  199. )
  200. if receive_data:
  201. # receive_data[2]=1 表示为MCU设备编号
  202. if receive_data[0] == 90 and receive_data[2] == 1:
  203. connect_flag = receive_data[1]
  204. # 是否有初始化
  205. try:
  206. mcu_has_been_set = receive_data[
  207. 6
  208. ] # 设备是否有初始化 ,1 表示已初始化
  209. except:
  210. mcu_has_been_set = 99 # 未知状态
  211. print("MCU初始化信息{}".format(mcu_has_been_set))
  212. message_data = {
  213. "_type": "show_info",
  214. "plugins_mode": "mcu",
  215. "data": "MCU 已连接",
  216. }
  217. self.sendSocketMessage(
  218. msg="MCU 已连接",
  219. data=message_data,
  220. )
  221. self.connect_state = True
  222. self.is_running = True
  223. print("MCU 已连接")
  224. self.port_name = port_name
  225. return
  226. print("MCU 连接失败")
  227. message_data = {
  228. "_type": "show_info",
  229. "plugins_mode": "mcu",
  230. "data": "MCU 连接失败",
  231. }
  232. self.sendSocketMessage(
  233. msg="MCU 连接失败",
  234. data=message_data,
  235. )
  236. self.close_connect()
  237. def close_connect(self):
  238. self.port_name = ""
  239. if self.serial_ins:
  240. self.serial_ins.close_serial_port()
  241. self.is_running = False
  242. self.connect_state = False
  243. self.connected_ports_dict = {} # 已连接的ports
  244. self.p_list = []
  245. self.temp_ports_dict = {}
  246. print("关闭MCU")
  247. async def checkMcuConnection(device_ctrl: DeviceControl):
  248. if device_ctrl.is_running == True:
  249. message = {
  250. "_type": "select_port_name",
  251. "plugins_mode": "auto_select_com",
  252. "data": device_ctrl.temp_ports_dict,
  253. }
  254. device_ctrl.sendSocketMessage(code=0, msg="MCU连接成功", data=message)
  255. return
  256. """实时检测串口是否连接"""
  257. while True:
  258. await asyncio.sleep(0.5)
  259. ports_dict = device_ctrl.scan_serial_port()
  260. device_ctrl.temp_ports_dict = ports_dict
  261. if not ports_dict:
  262. # 全部清空 移除所有串口
  263. if device_ctrl.p_list:
  264. _p = device_ctrl.p_list.pop()
  265. device_ctrl.remove_port(_p)
  266. print("串口未连接,请检查")
  267. device_ctrl.sendSocketMessage(code=1, msg="串口未连接,请检查")
  268. continue
  269. if ports_dict:
  270. for index, _i in enumerate(device_ctrl.p_list):
  271. if _i not in ports_dict:
  272. _p = device_ctrl.p_list.pop(index)
  273. device_ctrl.remove_port(_p)
  274. for _port_name, _port_value in ports_dict.items():
  275. if _port_name not in device_ctrl.p_list:
  276. try:
  277. device_ctrl.p_list.append(_port_name)
  278. device_ctrl.add_port_by_linkage(_port_name)
  279. except BaseException as e:
  280. print("串口不存在{} {}".format(_port_name, e))