# RemoteControlV2.py
# module_remote_control_v2
import asyncio
import time

import settings
from .SerialIns import SerialIns
from .BaseClass import BaseClass
from sockets.connect_manager import ConnectionManager
# from .BlueToothMode import BlueToothMode
  8. class RemoteControlV2(BaseClass):
  9. # sign_data = Signal(dict)
  10. def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
  11. # 遥控设备处理--新版遥控器;硅胶按钮
  12. super().__init__(websocket_manager)
  13. self.websocket_manager = websocket_manager
  14. # self.windows = windows
  15. self.serial_ins = None
  16. self.bluetooth_ins = bluetooth_ins
  17. self.port_name = ""
  18. self.bluetooth_address = ""
  19. self.connect_state = False
  20. self.is_running = False
  21. def to_connect_com(self, port_name, is_test=False):
  22. if self.connect_state:
  23. return
  24. self.close_connect()
  25. time.sleep(0.5)
  26. try:
  27. # 原值为9600
  28. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  29. # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
  30. if self.serial_ins.serial_handle:
  31. message = {
  32. "_type": "show_info",
  33. "plugins_mode": "remote_control",
  34. "data": "遥控设备V2 打开串口成功",
  35. }
  36. self.sendSocketMessage(
  37. code=0, msg="遥控设备V2 打开串口成功", data=message
  38. )
  39. # print(message)
  40. self.connect_state = True
  41. message = {
  42. "_type": "remote_control_connect",
  43. "plugins_mode": "remote_control",
  44. "data": port_name,
  45. }
  46. self.sendSocketMessage(code=0, msg="", data=message)
  47. print(message)
  48. self.port_name = port_name
  49. self.is_running = True
  50. self.set_voltage_value(0)
  51. self.run()
  52. return True
  53. else:
  54. message = {
  55. "_type": "show_info",
  56. "plugins_mode": "remote_control",
  57. "data": "遥控设备V2 打开串口失败",
  58. }
  59. self.sendSocketMessage(
  60. code=1, msg="遥控设备V2 打开串口失败", data=message
  61. )
  62. print(message)
  63. self.serial_ins = None
  64. self.connect_state = False
  65. self.set_voltage_value(None)
  66. except:
  67. message = {
  68. "_type": "show_info",
  69. "plugins_mode": "remote_control",
  70. "data": "遥控设备V2 打开串口失败",
  71. }
  72. print(message)
  73. self.sendSocketMessage(code=1, msg="遥控设备V2 打开串口失败", data=message)
  74. self.serial_ins = None
  75. self.connect_state = False
  76. self.set_voltage_value(None)
  77. return False
  78. def to_connect_bluetooth(self, address, is_test=False):
  79. print("to_connect_bluetooth", self.connect_state)
  80. if self.connect_state:
  81. return
  82. # if self.bluetooth_ins == None:
  83. # print("bluetooth_ins 未初始化", bluetooth_mode)
  84. # self.bluetooth_ins = BlueToothMode()
  85. # else:
  86. # self.bluetooth_ins = bluetooth_mode
  87. self.close_connect()
  88. if self.bluetooth_ins == None:
  89. print("bluetooth_ins 未初始化", self.bluetooth_ins)
  90. # self.bluetooth_ins = self.
  91. self.bluetooth_address = address
  92. message = {
  93. "_type": "show_info",
  94. "plugins_mode": "remote_control",
  95. "data": "遥控设备V2 打开蓝牙成功",
  96. }
  97. print(message)
  98. self.sendSocketMessage(code=0, msg="遥控设备V2 打开蓝牙成功", data=message)
  99. self.connect_state = True
  100. self.is_running = True
  101. self.set_voltage_value(0)
  102. if is_test is False:
  103. loop = asyncio.get_event_loop()
  104. loop.create_task(self.run())
  105. def close_bluetooth_connect(self):
  106. if self.bluetooth_address:
  107. print("蓝牙断开")
  108. message = {
  109. "_type": "show_info",
  110. "plugins_mode": "remote_control",
  111. "data": "遥控设备V2 蓝牙断开",
  112. }
  113. print(message)
  114. self.sendSocketMessage(code=1, msg="遥控设备V2 蓝牙断开", data=message)
  115. self.close_connect()
  116. def set_voltage_value(self, voltage_value=None, voltage_text=None):
  117. if self.is_running:
  118. flag = "接收器已连接 {}".format(
  119. "蓝牙" if self.bluetooth_address else "串口"
  120. )
  121. else:
  122. flag = "接收器未连接"
  123. if voltage_value is None:
  124. # self.windows.show_label.setText("{}".format(flag))
  125. if voltage_text:
  126. print(voltage_text)
  127. # self.windows.show_label.setText("{}".format(voltage_text))
  128. else:
  129. if voltage_value == 0:
  130. print(flag)
  131. # self.windows.show_label.setText("{}".format(flag))
  132. else:
  133. print("电量:{}%".format(voltage_value))
  134. # self.windows.show_label.setText("电量:{}%".format(voltage_value))
  135. print("打印===>", flag)
  136. def close_connect(self):
  137. self.port_name = ""
  138. self.bluetooth_address = ""
  139. # self.bluetooth_ins = None
  140. if self.serial_ins:
  141. self.serial_ins.close_serial_port()
  142. self.connect_state = False
  143. def __del__(self):
  144. self.close_connect()
  145. def play_sound(self, tip="sound_tips_3"):
  146. self.windows.playsound.tips_type = tip
  147. self.windows.playsound.start()
  148. def analysis_received_data(self):
  149. if not self.connect_state:
  150. return
  151. if self.bluetooth_address:
  152. receive_data = self.bluetooth_ins.read_cmd_one(
  153. address=self.bluetooth_address
  154. )
  155. # print("received data", receive_data)
  156. else:
  157. receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
  158. # print("self.bluetooth_ins", receive_data)
  159. if receive_data is False:
  160. self.connect_state = False
  161. return False
  162. if not receive_data:
  163. return
  164. # 数据 结构 command,按命令解析
  165. if receive_data[0] == 1:
  166. # self.play_sound("get_qr_code")
  167. # 扫码数据
  168. bar_code = receive_data[1:].decode()
  169. bar_code = bar_code.replace("\r", "")
  170. bar_code = bar_code.replace("\n", "")
  171. message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
  172. print(message)
  173. self.sendSocketMessage(code=0, msg="", data=message)
  174. return
  175. if receive_data[0] == 9:
  176. # 播放声音
  177. button_value = receive_data[1]
  178. data = {"button_value": button_value}
  179. message = {"_type": 9, "plugins_mode": "remote_control", "data": data}
  180. print(message)
  181. self.sendSocketMessage(code=0, msg="", data=message)
  182. if settings.IS_DEBUG:
  183. print("收到按键", button_value)
  184. return
  185. if receive_data[0] == 10:
  186. voltage_value = receive_data[1]
  187. self.set_voltage_value(voltage_value)
  188. if settings.IS_TEST:
  189. print("遥控器V2电量:{}".format(voltage_value))
  190. return
  191. # 使用Max17048 查看电量
  192. if receive_data[0] == 12:
  193. chg_status = self.get_data_from_receive_data(
  194. receive_data=receive_data, start=1, len_data=1
  195. )
  196. soc_percentage = self.get_data_from_receive_data(
  197. receive_data=receive_data, start=2, len_data=4
  198. )
  199. soc_percentage = soc_percentage / 100
  200. voltage = self.get_data_from_receive_data(
  201. receive_data=receive_data, start=6, len_data=4
  202. )
  203. voltage = voltage / 100
  204. current = self.get_data_from_receive_data(
  205. receive_data=receive_data, start=10, len_data=4
  206. )
  207. current = current / 10000
  208. temperature = self.get_data_from_receive_data(
  209. receive_data=receive_data, start=14, len_data=4
  210. )
  211. temperature = temperature / 100
  212. adjusted_soc = self.get_data_from_receive_data(
  213. receive_data=receive_data, start=18, len_data=4
  214. )
  215. adjusted_soc = adjusted_soc / 100
  216. soft_vision = self.get_data_from_receive_data(
  217. receive_data=receive_data, start=22, len_data=1
  218. )
  219. full_status = self.get_data_from_receive_data(
  220. receive_data=receive_data, start=23, len_data=1
  221. )
  222. # print("is_charging:{}".format(chg_status))
  223. # print("Battery SOC: {:.2f}%".format(soc_percentage))
  224. # print("Battery Voltage: {:.3f}V".format(voltage))
  225. # print("Average Current: {:.3f}A".format(current))
  226. # print("Chip Temperature: {:.1f}°C".format(temperature))
  227. # print("adjusted Battery soc: {:.2f}%".format(adjusted_soc))
  228. # print("soft_vision:{}".format(soft_vision))
  229. # print("chg_status:{} full_status:{}".format(chg_status, full_status))
  230. if chg_status:
  231. t1 = "充电中"
  232. else:
  233. t1 = ""
  234. if full_status:
  235. t2 = "已充满"
  236. else:
  237. t2 = ""
  238. self.set_voltage_value(
  239. voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
  240. min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2
  241. )
  242. )
  243. return
  244. if receive_data[0] == 111:
  245. value = (
  246. receive_data[1] << 24
  247. | receive_data[2] << 16
  248. | receive_data[3] << 8
  249. | receive_data[4]
  250. )
  251. print("遥控器-测试 value:{}".format(value))
  252. # self.windows.show_label.setText("--------{}".format(value))
  253. if receive_data[0] == 112:
  254. bar_code = receive_data[1:].decode()
  255. bar_code = bar_code.replace("\r", "")
  256. bar_code = bar_code.replace("\n", "")
  257. print("read bar_code {}".format(bar_code))
  258. # 通用串口数据解析器
  259. def get_data_from_receive_data(
  260. self, receive_data, start, len_data, data_magnification=1
  261. ):
  262. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  263. try:
  264. if len_data == 1:
  265. data = receive_data[start]
  266. return data * data_magnification
  267. elif len_data == 2:
  268. data = receive_data[start] << 8 | receive_data[start + 1]
  269. return data * data_magnification
  270. elif len_data == 4:
  271. data = (
  272. receive_data[start] << 24
  273. | receive_data[start + 1] << 16
  274. | receive_data[start + 2] << 8
  275. | receive_data[start + 3]
  276. )
  277. return data * data_magnification
  278. return None
  279. except:
  280. return None
  281. async def run(self):
  282. # self.show_info.emit("未连接")
  283. # self.data_command_sign.emit(data)
  284. self.is_running = True
  285. print("Running")
  286. while 1:
  287. await asyncio.sleep(0.01)
  288. if not self.connect_state:
  289. message = {
  290. "_type": "show_info",
  291. "plugins_mode": "remote_control",
  292. "data": "遥控设备V2 未连接",
  293. }
  294. print(message)
  295. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message)
  296. break
  297. self.analysis_received_data()
  298. self.is_running = False
  299. if not self.connect_state:
  300. message = {
  301. "_type": "show_info",
  302. "plugins_mode": "remote_control",
  303. "data": "遥控设备V2 未连接",
  304. }
  305. print(message)
  306. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message)
  307. self.set_voltage_value(None)