RemoteControlV2.py 12 KB


  1. # module_remote_control_v2
  2. import time, asyncio
  3. import settings
  4. from .SerialIns import SerialIns
  5. from .BaseClass import BaseClass
  6. from sockets.connect_manager import ConnectionManager
  7. # from .BlueToothMode import BlueToothMode
  8. class RemoteControlV2(BaseClass):
  9. # sign_data = Signal(dict)
  10. def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
  11. # 遥控设备处理--新版遥控器;硅胶按钮
  12. super().__init__(websocket_manager)
  13. self.websocket_manager = websocket_manager
  14. # self.windows = windows
  15. self.serial_ins = None
  16. self.bluetooth_ins = bluetooth_ins
  17. self.port_name = ""
  18. self.bluetooth_address = ""
  19. self.connect_state = False
  20. self.is_running = False
  21. def to_connect_com(self, port_name, is_test=False):
  22. if self.connect_state:
  23. return
  24. self.close_connect()
  25. time.sleep(0.5)
  26. try:
  27. # 原值为9600
  28. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  29. # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
  30. if self.serial_ins.serial_handle:
  31. message = {
  32. "_type": "show_info",
  33. "plugins_mode": "remote_control",
  34. "data": "遥控设备V2 打开串口成功",
  35. }
  36. self.sendSocketMessage(
  37. code=0, msg="遥控设备V2 打开串口成功", data=message
  38. )
  39. # print(message)
  40. self.connect_state = True
  41. message = {
  42. "_type": "remote_control_connect",
  43. "plugins_mode": "remote_control",
  44. "data": port_name,
  45. }
  46. self.sendSocketMessage(code=0, msg="", data=message)
  47. print(message)
  48. self.port_name = port_name
  49. self.is_running = True
  50. self.set_voltage_value(0)
  51. self.run()
  52. return True
  53. else:
  54. message = {
  55. "_type": "show_info",
  56. "plugins_mode": "remote_control",
  57. "data": "遥控设备V2 打开串口失败",
  58. }
  59. self.sendSocketMessage(
  60. code=1, msg="遥控设备V2 打开串口失败", data=message
  61. )
  62. print(message)
  63. self.serial_ins = None
  64. self.connect_state = False
  65. self.set_voltage_value(None)
  66. except:
  67. message = {
  68. "_type": "show_info",
  69. "plugins_mode": "remote_control",
  70. "data": "遥控设备V2 打开串口失败",
  71. }
  72. print(message)
  73. self.sendSocketMessage(code=1, msg="遥控设备V2 打开串口失败", data=message)
  74. self.serial_ins = None
  75. self.connect_state = False
  76. self.set_voltage_value(None)
  77. return False
  78. def to_connect_bluetooth(self, address, is_test=False):
  79. print("to_connect_bluetooth", self.connect_state)
  80. if self.connect_state:
  81. return
  82. # if self.bluetooth_ins == None:
  83. # print("bluetooth_ins 未初始化", bluetooth_mode)
  84. # self.bluetooth_ins = BlueToothMode()
  85. # else:
  86. # self.bluetooth_ins = bluetooth_mode
  87. self.close_connect()
  88. if self.bluetooth_ins == None:
  89. print("bluetooth_ins 未初始化", self.bluetooth_ins)
  90. # self.bluetooth_ins = self.
  91. self.bluetooth_address = address
  92. message = {
  93. "_type": "show_info",
  94. "plugins_mode": "remote_control",
  95. "data": "遥控设备V2 打开蓝牙成功",
  96. }
  97. print(message)
  98. self.sendSocketMessage(code=0, msg="遥控设备V2 打开蓝牙成功", data=message)
  99. self.connect_state = True
  100. self.is_running = True
  101. self.set_voltage_value(0)
  102. if is_test is False:
  103. loop = asyncio.get_event_loop()
  104. loop.create_task(self.run())
  105. def close_bluetooth_connect(self):
  106. if self.bluetooth_address:
  107. print("蓝牙断开")
  108. message = {
  109. "_type": "show_info",
  110. "plugins_mode": "remote_control",
  111. "data": "遥控设备V2 蓝牙断开",
  112. }
  113. print(message)
  114. self.sendSocketMessage(code=1, msg="遥控设备V2 蓝牙断开", data=message)
  115. self.close_connect()
  116. print("关闭蓝牙")
  117. def set_voltage_value(self, voltage_value=None, voltage_text=None):
  118. if self.is_running:
  119. flag = "接收器已连接 {}".format(
  120. "蓝牙" if self.bluetooth_address else "串口"
  121. )
  122. else:
  123. flag = "接收器未连接"
  124. if voltage_value is None:
  125. # self.windows.show_label.setText("{}".format(flag))
  126. if voltage_text:
  127. print(voltage_text)
  128. # self.windows.show_label.setText("{}".format(voltage_text))
  129. else:
  130. if voltage_value == 0:
  131. print(flag)
  132. # self.windows.show_label.setText("{}".format(flag))
  133. else:
  134. print("电量:{}%".format(voltage_value))
  135. # self.windows.show_label.setText("电量:{}%".format(voltage_value))
  136. print("打印===>", flag)
  137. def close_connect(self):
  138. self.port_name = ""
  139. self.bluetooth_address = ""
  140. # self.bluetooth_ins = None
  141. if self.serial_ins:
  142. self.serial_ins.close_serial_port()
  143. self.connect_state = False
  144. def __del__(self):
  145. self.close_connect()
  146. def play_sound(self, tip="sound_tips_3"):
  147. self.windows.playsound.tips_type = tip
  148. self.windows.playsound.start()
  149. def analysis_received_data(self):
  150. if not self.connect_state:
  151. return
  152. if self.bluetooth_address:
  153. receive_data = self.bluetooth_ins.read_cmd_one(
  154. address=self.bluetooth_address
  155. )
  156. # print("received data", receive_data)
  157. else:
  158. receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
  159. # print("self.bluetooth_ins", receive_data)
  160. if receive_data is False:
  161. self.connect_state = False
  162. return False
  163. if not receive_data:
  164. return
  165. # 数据 结构 command,按命令解析
  166. if receive_data[0] == 1:
  167. # self.play_sound("get_qr_code")
  168. # 扫码数据
  169. bar_code = receive_data[1:].decode()
  170. bar_code = bar_code.replace("\r", "")
  171. bar_code = bar_code.replace("\n", "")
  172. message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
  173. print(message)
  174. self.sendSocketMessage(code=0, msg="", data=message)
  175. return
  176. if receive_data[0] == 9:
  177. # 播放声音
  178. button_value = receive_data[1]
  179. data = {"button_value": button_value}
  180. message = {"_type": 9, "plugins_mode": "remote_control", "data": data}
  181. print(message)
  182. self.sendSocketMessage(code=0, msg="", data=message)
  183. if settings.IS_DEBUG:
  184. print("收到按键", button_value)
  185. return
  186. if receive_data[0] == 10:
  187. voltage_value = receive_data[1]
  188. self.set_voltage_value(voltage_value)
  189. if settings.IS_TEST:
  190. print("遥控器V2电量:{}".format(voltage_value))
  191. return
  192. # 使用Max17048 查看电量
  193. if receive_data[0] == 12:
  194. chg_status = self.get_data_from_receive_data(
  195. receive_data=receive_data, start=1, len_data=1
  196. )
  197. soc_percentage = self.get_data_from_receive_data(
  198. receive_data=receive_data, start=2, len_data=4
  199. )
  200. soc_percentage = soc_percentage / 100
  201. voltage = self.get_data_from_receive_data(
  202. receive_data=receive_data, start=6, len_data=4
  203. )
  204. voltage = voltage / 100
  205. current = self.get_data_from_receive_data(
  206. receive_data=receive_data, start=10, len_data=4
  207. )
  208. current = current / 10000
  209. temperature = self.get_data_from_receive_data(
  210. receive_data=receive_data, start=14, len_data=4
  211. )
  212. temperature = temperature / 100
  213. adjusted_soc = self.get_data_from_receive_data(
  214. receive_data=receive_data, start=18, len_data=4
  215. )
  216. adjusted_soc = adjusted_soc / 100
  217. soft_vision = self.get_data_from_receive_data(
  218. receive_data=receive_data, start=22, len_data=1
  219. )
  220. full_status = self.get_data_from_receive_data(
  221. receive_data=receive_data, start=23, len_data=1
  222. )
  223. # print("is_charging:{}".format(chg_status))
  224. # print("Battery SOC: {:.2f}%".format(soc_percentage))
  225. # print("Battery Voltage: {:.3f}V".format(voltage))
  226. # print("Average Current: {:.3f}A".format(current))
  227. # print("Chip Temperature: {:.1f}°C".format(temperature))
  228. # print("adjusted Battery soc: {:.2f}%".format(adjusted_soc))
  229. # print("soft_vision:{}".format(soft_vision))
  230. # print("chg_status:{} full_status:{}".format(chg_status, full_status))
  231. if chg_status:
  232. t1 = "充电中"
  233. else:
  234. t1 = ""
  235. if full_status:
  236. t2 = "已充满"
  237. else:
  238. t2 = ""
  239. self.set_voltage_value(
  240. voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
  241. min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2
  242. )
  243. )
  244. return
  245. if receive_data[0] == 111:
  246. value = (
  247. receive_data[1] << 24
  248. | receive_data[2] << 16
  249. | receive_data[3] << 8
  250. | receive_data[4]
  251. )
  252. print("遥控器-测试 value:{}".format(value))
  253. # self.windows.show_label.setText("--------{}".format(value))
  254. if receive_data[0] == 112:
  255. bar_code = receive_data[1:].decode()
  256. bar_code = bar_code.replace("\r", "")
  257. bar_code = bar_code.replace("\n", "")
  258. print("read bar_code {}".format(bar_code))
  259. # 通用串口数据解析器
  260. def get_data_from_receive_data(
  261. self, receive_data, start, len_data, data_magnification=1
  262. ):
  263. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  264. try:
  265. if len_data == 1:
  266. data = receive_data[start]
  267. return data * data_magnification
  268. elif len_data == 2:
  269. data = receive_data[start] << 8 | receive_data[start + 1]
  270. return data * data_magnification
  271. elif len_data == 4:
  272. data = (
  273. receive_data[start] << 24
  274. | receive_data[start + 1] << 16
  275. | receive_data[start + 2] << 8
  276. | receive_data[start + 3]
  277. )
  278. return data * data_magnification
  279. return None
  280. except:
  281. return None
  282. async def run(self):
  283. # self.show_info.emit("未连接")
  284. # self.data_command_sign.emit(data)
  285. self.is_running = True
  286. print("Running")
  287. while 1:
  288. await asyncio.sleep(0.01)
  289. if not self.connect_state:
  290. message = {
  291. "_type": "show_info",
  292. "plugins_mode": "remote_control",
  293. "data": "遥控设备V2 未连接",
  294. }
  295. print(message)
  296. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message)
  297. break
  298. self.analysis_received_data()
  299. self.is_running = False
  300. if not self.connect_state:
  301. message = {
  302. "_type": "show_info",
  303. "plugins_mode": "remote_control",
  304. "data": "遥控设备V2 未连接",
  305. }
  306. print(message)
  307. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message)
  308. self.set_voltage_value(None)