RemoteControlV2.py 13 KB


  1. # module_remote_control_v2
  2. import time, asyncio
  3. import settings
  4. from .SerialIns import SerialIns
  5. from .BaseClass import BaseClass
  6. from sockets.connect_manager import ConnectionManager
  7. # from .BlueToothMode import BlueToothMode
  8. class RemoteControlV2(BaseClass):
  9. # sign_data = Signal(dict)
  def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
      """Set up the handler for the new-generation (silicone-button) remote.

      Args:
          bluetooth_ins: Bluetooth backend used when the receiver is reached
              over bluetooth; stored as-is.
          websocket_manager: Connection manager handed to the base class and
              also kept on the instance.
      """
      super().__init__(websocket_manager)
      self.websocket_manager = websocket_manager
      self.msg_type = "blue_tooth"

      # Transport handles: the bluetooth backend is supplied by the caller,
      # the serial handle only exists after a serial connect.
      self.bluetooth_ins = bluetooth_ins
      self.serial_ins = None

      # No port / address is selected yet.
      self.port_name = self.bluetooth_address = ""

      # The link is down and the polling loop is idle at start-up.
      self.connect_state = self.is_running = False
  22. def to_connect_com(self, port_name, is_test=False):
  23. if self.connect_state:
  24. return
  25. self.close_connect()
  26. time.sleep(0.5)
  27. try:
  28. # 原值为9600
  29. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  30. # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
  31. if self.serial_ins.serial_handle:
  32. message = {
  33. "_type": "show_info",
  34. "plugins_mode": "remote_control",
  35. "data": "遥控设备V2 打开串口成功",
  36. }
  37. self.sendSocketMessage(
  38. code=0,
  39. msg="遥控设备V2 打开串口成功",
  40. data=message,
  41. device_status=1,
  42. )
  43. # print(message)
  44. self.connect_state = True
  45. message = {
  46. "_type": "remote_control_connect",
  47. "plugins_mode": "remote_control",
  48. "data": port_name,
  49. }
  50. self.sendSocketMessage(code=0, msg="", data=message,device_status=2)
  51. print(message)
  52. self.port_name = port_name
  53. self.is_running = True
  54. self.set_voltage_value(0)
  55. self.run()
  56. return True
  57. else:
  58. message = {
  59. "_type": "show_info",
  60. "plugins_mode": "remote_control",
  61. "data": "遥控设备V2 打开串口失败",
  62. }
  63. self.sendSocketMessage(
  64. code=1, msg="遥控设备V2 打开串口失败", data=message,
  65. device_status=-1
  66. )
  67. print(message)
  68. self.serial_ins = None
  69. self.connect_state = False
  70. self.set_voltage_value(None)
  71. except:
  72. message = {
  73. "_type": "show_info",
  74. "plugins_mode": "remote_control",
  75. "data": "遥控设备V2 打开串口失败",
  76. }
  77. print(message)
  78. self.sendSocketMessage(
  79. code=1, msg="遥控设备V2 打开串口失败", data=message, device_status=-1
  80. )
  81. self.serial_ins = None
  82. self.connect_state = False
  83. self.set_voltage_value(None)
  84. return False
  85. def to_connect_bluetooth(self, address, is_test=False):
  86. print("to_connect_bluetooth", self.connect_state)
  87. if self.connect_state:
  88. return
  89. # if self.bluetooth_ins == None:
  90. # print("bluetooth_ins 未初始化", bluetooth_mode)
  91. # self.bluetooth_ins = BlueToothMode()
  92. # else:
  93. # self.bluetooth_ins = bluetooth_mode
  94. self.close_connect()
  95. if self.bluetooth_ins == None:
  96. print("bluetooth_ins 未初始化", self.bluetooth_ins)
  97. # self.bluetooth_ins = self.
  98. self.bluetooth_address = address
  99. message = {
  100. "_type": "show_info",
  101. "plugins_mode": "remote_control",
  102. "data": "遥控设备V2 打开蓝牙成功",
  103. }
  104. print(message)
  105. self.sendSocketMessage(code=0, msg="遥控设备V2 打开蓝牙成功", data=message,device_status=2)
  106. self.connect_state = True
  107. self.is_running = True
  108. self.set_voltage_value(0)
  109. if is_test is False:
  110. loop = asyncio.get_event_loop()
  111. loop.create_task(self.run())
  112. def close_bluetooth_connect(self):
  113. if self.bluetooth_address:
  114. print("蓝牙断开")
  115. message = {
  116. "_type": "show_info",
  117. "plugins_mode": "remote_control",
  118. "data": "遥控设备V2 蓝牙断开",
  119. }
  120. print(message)
  121. self.sendSocketMessage(
  122. code=1, msg="遥控设备V2 蓝牙断开", data=message, device_status=-1
  123. )
  124. self.close_connect()
  125. print("关闭蓝牙")
  126. def set_voltage_value(self, voltage_value=None, voltage_text=None):
  127. device_status = 2
  128. if self.is_running:
  129. flag = "接收器已连接 {}".format(
  130. "蓝牙" if self.bluetooth_address else "串口"
  131. )
  132. else:
  133. flag = "接收器未连接"
  134. device_status = -1
  135. if voltage_value is None:
  136. if voltage_text:
  137. print(voltage_text)
  138. # 发送电量剩余消息
  139. self.sendSocketMessage(msg=voltage_text, device_status=device_status)
  140. else:
  141. self.sendSocketMessage(msg=flag, device_status=device_status)
  142. else:
  143. if voltage_value == 0:
  144. print(flag)
  145. self.sendSocketMessage(msg=flag, device_status=device_status)
  146. else:
  147. print("电量:{}%".format(voltage_value))
  148. self.sendSocketMessage(
  149. msg="电量:{}%".format(voltage_value), device_status=device_status
  150. )
  151. # print("打印===>", flag)
  152. def close_connect(self):
  153. self.port_name = ""
  154. self.bluetooth_address = ""
  155. # self.bluetooth_ins = None
  156. if self.serial_ins:
  157. self.serial_ins.close_serial_port()
  158. self.connect_state = False
  159. def __del__(self):
  160. self.close_connect()
  161. def play_sound(self, tip="sound_tips_3"):
  162. self.windows.playsound.tips_type = tip
  163. self.windows.playsound.start()
  164. def analysis_received_data(self):
  165. if not self.connect_state:
  166. return
  167. if self.bluetooth_address:
  168. receive_data = self.bluetooth_ins.read_cmd_one(
  169. address=self.bluetooth_address
  170. )
  171. # print("received data", receive_data)
  172. else:
  173. receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
  174. # print("self.bluetooth_ins", receive_data)
  175. if receive_data is False:
  176. self.connect_state = False
  177. return False
  178. if not receive_data:
  179. return
  180. receive_data_parser = receive_data[0]
  181. # 数据 结构 command,按命令解析
  182. if receive_data_parser == 1:
  183. # self.play_sound("get_qr_code")
  184. # 扫码数据
  185. bar_code = receive_data[1:].decode()
  186. bar_code = bar_code.replace("\r", "")
  187. bar_code = bar_code.replace("\n", "")
  188. message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
  189. print(message)
  190. self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
  191. return
  192. if receive_data_parser == 9:
  193. # 播放声音
  194. button_value = receive_data[1]
  195. data = {"button_value": button_value}
  196. message = {"_type": 9, "plugins_mode": "remote_control", "data": data}
  197. print(message)
  198. self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
  199. if settings.IS_DEBUG:
  200. print("收到按键", button_value)
  201. return
  202. if receive_data_parser == 10:
  203. voltage_value = receive_data[1]
  204. self.set_voltage_value(voltage_value)
  205. if settings.IS_TEST:
  206. print("遥控器V2电量:{}".format(voltage_value))
  207. return
  208. # 使用Max17048 查看电量
  209. if receive_data_parser == 12:
  210. chg_status = self.get_data_from_receive_data(
  211. receive_data=receive_data, start=1, len_data=1
  212. )
  213. soc_percentage = self.get_data_from_receive_data(
  214. receive_data=receive_data, start=2, len_data=4
  215. )
  216. soc_percentage = soc_percentage / 100
  217. voltage = self.get_data_from_receive_data(
  218. receive_data=receive_data, start=6, len_data=4
  219. )
  220. voltage = voltage / 100
  221. current = self.get_data_from_receive_data(
  222. receive_data=receive_data, start=10, len_data=4
  223. )
  224. current = current / 10000
  225. temperature = self.get_data_from_receive_data(
  226. receive_data=receive_data, start=14, len_data=4
  227. )
  228. temperature = temperature / 100
  229. adjusted_soc = self.get_data_from_receive_data(
  230. receive_data=receive_data, start=18, len_data=4
  231. )
  232. adjusted_soc = adjusted_soc / 100
  233. soft_vision = self.get_data_from_receive_data(
  234. receive_data=receive_data, start=22, len_data=1
  235. )
  236. full_status = self.get_data_from_receive_data(
  237. receive_data=receive_data, start=23, len_data=1
  238. )
  239. # print("is_charging:{}".format(chg_status))
  240. # print("Battery SOC: {:.2f}%".format(soc_percentage))
  241. # print("Battery Voltage: {:.3f}V".format(voltage))
  242. # print("Average Current: {:.3f}A".format(current))
  243. # print("Chip Temperature: {:.1f}°C".format(temperature))
  244. # print("adjusted Battery soc: {:.2f}%".format(adjusted_soc))
  245. # print("soft_vision:{}".format(soft_vision))
  246. # print("chg_status:{} full_status:{}".format(chg_status, full_status))
  247. if chg_status:
  248. t1 = "充电中"
  249. else:
  250. t1 = ""
  251. if full_status:
  252. t2 = "已充满"
  253. else:
  254. t2 = ""
  255. self.set_voltage_value(
  256. voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
  257. min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2
  258. )
  259. )
  260. return
  261. if receive_data[0] == 111:
  262. value = (
  263. receive_data[1] << 24
  264. | receive_data[2] << 16
  265. | receive_data[3] << 8
  266. | receive_data[4]
  267. )
  268. print("遥控器-测试 value:{}".format(value))
  269. # self.windows.show_label.setText("--------{}".format(value))
  270. if receive_data[0] == 112:
  271. bar_code = receive_data[1:].decode()
  272. bar_code = bar_code.replace("\r", "")
  273. bar_code = bar_code.replace("\n", "")
  274. print("read bar_code {}".format(bar_code))
  # Generic parser for fields of a received serial frame.
  def get_data_from_receive_data(
      self, receive_data, start, len_data, data_magnification=1
  ):
      """Read an unsigned big-endian integer field out of a received frame.

      Args:
          receive_data: Indexable sequence of byte values (for example
              ``bytes``).
          start: Index of the field's first (most significant) byte.
          len_data: Field width in bytes; any width of at least 1 is
              accepted.
          data_magnification: Factor the decoded value is multiplied by
              (use a fraction to scale down). Defaults to 1.

      Returns:
          The decoded value times ``data_magnification``, or ``None`` when
          the width is not positive or the field cannot be read (frame too
          short, wrong types).
      """
      try:
          if len_data < 1:
              return None
          value = 0
          for offset in range(len_data):
              # Shift in one byte at a time, most significant byte first.
              value = (value << 8) | receive_data[start + offset]
          return value * data_magnification
      except (LookupError, TypeError, ValueError):
          return None
  298. async def run(self):
  299. # self.show_info.emit("未连接")
  300. # self.data_command_sign.emit(data)
  301. self.is_running = True
  302. print("Running")
  303. while 1:
  304. await asyncio.sleep(0.01)
  305. if not self.connect_state:
  306. message = {
  307. "_type": "show_info",
  308. "plugins_mode": "remote_control",
  309. "data": "遥控设备V2 未连接",
  310. }
  311. print(message)
  312. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message,device_status=-1)
  313. break
  314. self.analysis_received_data()
  315. self.is_running = False
  316. if not self.connect_state:
  317. message = {
  318. "_type": "show_info",
  319. "plugins_mode": "remote_control",
  320. "data": "遥控设备V2 未连接",
  321. }
  322. print(message)
  323. self.sendSocketMessage(
  324. code=1, msg="遥控设备V2 未连接", data=message, device_status=-1
  325. )
  326. self.set_voltage_value(None)