RemoteControlV2.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. # module_remote_control_v2
  2. import time, asyncio
  3. import settings
  4. from .SerialIns import SerialIns
  5. from .BaseClass import BaseClass
  6. from sockets.connect_manager import ConnectionManager
  7. # from .BlueToothMode import BlueToothMode
  8. class RemoteControlV2(BaseClass):
  9. # sign_data = Signal(dict)
  10. def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
  11. # 遥控设备处理--新版遥控器;硅胶按钮
  12. super().__init__(websocket_manager)
  13. self.msg_type = "blue_tooth"
  14. self.websocket_manager = websocket_manager
  15. # self.windows = windows
  16. self.serial_ins = None
  17. self.bluetooth_ins = bluetooth_ins
  18. self.port_name = ""
  19. self.bluetooth_address = ""
  20. self.connect_state = False
  21. self.is_running = False
  22. def to_connect_com(self, port_name, is_test=False):
  23. if self.connect_state:
  24. return
  25. self.close_connect()
  26. time.sleep(0.5)
  27. try:
  28. # 原值为9600
  29. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  30. # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
  31. if self.serial_ins.serial_handle:
  32. message = {
  33. "_type": "show_info",
  34. "plugins_mode": "remote_control",
  35. "data": "遥控设备V2 打开串口成功",
  36. }
  37. self.sendSocketMessage(
  38. code=0, msg="遥控设备V2 打开串口成功", data=message
  39. )
  40. # print(message)
  41. self.connect_state = True
  42. message = {
  43. "_type": "remote_control_connect",
  44. "plugins_mode": "remote_control",
  45. "data": port_name,
  46. }
  47. self.sendSocketMessage(code=0, msg="", data=message)
  48. print(message)
  49. self.port_name = port_name
  50. self.is_running = True
  51. self.set_voltage_value(0)
  52. self.run()
  53. return True
  54. else:
  55. message = {
  56. "_type": "show_info",
  57. "plugins_mode": "remote_control",
  58. "data": "遥控设备V2 打开串口失败",
  59. }
  60. self.sendSocketMessage(
  61. code=1, msg="遥控设备V2 打开串口失败", data=message
  62. )
  63. print(message)
  64. self.serial_ins = None
  65. self.connect_state = False
  66. self.set_voltage_value(None)
  67. except:
  68. message = {
  69. "_type": "show_info",
  70. "plugins_mode": "remote_control",
  71. "data": "遥控设备V2 打开串口失败",
  72. }
  73. print(message)
  74. self.sendSocketMessage(code=1, msg="遥控设备V2 打开串口失败", data=message)
  75. self.serial_ins = None
  76. self.connect_state = False
  77. self.set_voltage_value(None)
  78. return False
  79. def to_connect_bluetooth(self, address, is_test=False):
  80. print("to_connect_bluetooth", self.connect_state)
  81. if self.connect_state:
  82. return
  83. # if self.bluetooth_ins == None:
  84. # print("bluetooth_ins 未初始化", bluetooth_mode)
  85. # self.bluetooth_ins = BlueToothMode()
  86. # else:
  87. # self.bluetooth_ins = bluetooth_mode
  88. self.close_connect()
  89. if self.bluetooth_ins == None:
  90. print("bluetooth_ins 未初始化", self.bluetooth_ins)
  91. # self.bluetooth_ins = self.
  92. self.bluetooth_address = address
  93. message = {
  94. "_type": "show_info",
  95. "plugins_mode": "remote_control",
  96. "data": "遥控设备V2 打开蓝牙成功",
  97. }
  98. print(message)
  99. self.sendSocketMessage(code=0, msg="遥控设备V2 打开蓝牙成功", data=message)
  100. self.connect_state = True
  101. self.is_running = True
  102. self.set_voltage_value(0)
  103. if is_test is False:
  104. loop = asyncio.get_event_loop()
  105. loop.create_task(self.run())
  106. def close_bluetooth_connect(self):
  107. if self.bluetooth_address:
  108. print("蓝牙断开")
  109. message = {
  110. "_type": "show_info",
  111. "plugins_mode": "remote_control",
  112. "data": "遥控设备V2 蓝牙断开",
  113. }
  114. print(message)
  115. self.sendSocketMessage(code=1, msg="遥控设备V2 蓝牙断开", data=message)
  116. self.close_connect()
  117. print("关闭蓝牙")
  118. def set_voltage_value(self, voltage_value=None, voltage_text=None):
  119. if self.is_running:
  120. flag = "接收器已连接 {}".format(
  121. "蓝牙" if self.bluetooth_address else "串口"
  122. )
  123. else:
  124. flag = "接收器未连接"
  125. if voltage_value is None:
  126. # self.windows.show_label.setText("{}".format(flag))
  127. if voltage_text:
  128. print(voltage_text)
  129. # 发送电量剩余消息
  130. self.sendSocketMessage(msg=voltage_text)
  131. # self.windows.show_label.setText("{}".format(voltage_text))
  132. else:
  133. if voltage_value == 0:
  134. print(flag)
  135. # self.windows.show_label.setText("{}".format(flag))
  136. else:
  137. print("电量:{}%".format(voltage_value))
  138. # self.windows.show_label.setText("电量:{}%".format(voltage_value))
  139. # print("打印===>", flag)
  140. def close_connect(self):
  141. self.port_name = ""
  142. self.bluetooth_address = ""
  143. # self.bluetooth_ins = None
  144. if self.serial_ins:
  145. self.serial_ins.close_serial_port()
  146. self.connect_state = False
  147. def __del__(self):
  148. self.close_connect()
  149. def play_sound(self, tip="sound_tips_3"):
  150. self.windows.playsound.tips_type = tip
  151. self.windows.playsound.start()
  152. def analysis_received_data(self):
  153. if not self.connect_state:
  154. return
  155. if self.bluetooth_address:
  156. receive_data = self.bluetooth_ins.read_cmd_one(
  157. address=self.bluetooth_address
  158. )
  159. # print("received data", receive_data)
  160. else:
  161. receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
  162. # print("self.bluetooth_ins", receive_data)
  163. if receive_data is False:
  164. self.connect_state = False
  165. return False
  166. if not receive_data:
  167. return
  168. # 数据 结构 command,按命令解析
  169. if receive_data[0] == 1:
  170. # self.play_sound("get_qr_code")
  171. # 扫码数据
  172. bar_code = receive_data[1:].decode()
  173. bar_code = bar_code.replace("\r", "")
  174. bar_code = bar_code.replace("\n", "")
  175. message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
  176. print(message)
  177. self.sendSocketMessage(code=0, msg="", data=message)
  178. return
  179. if receive_data[0] == 9:
  180. # 播放声音
  181. button_value = receive_data[1]
  182. data = {"button_value": button_value}
  183. message = {"_type": 9, "plugins_mode": "remote_control", "data": data}
  184. print(message)
  185. self.sendSocketMessage(code=0, msg="", data=message)
  186. if settings.IS_DEBUG:
  187. print("收到按键", button_value)
  188. return
  189. if receive_data[0] == 10:
  190. voltage_value = receive_data[1]
  191. self.set_voltage_value(voltage_value)
  192. if settings.IS_TEST:
  193. print("遥控器V2电量:{}".format(voltage_value))
  194. return
  195. # 使用Max17048 查看电量
  196. if receive_data[0] == 12:
  197. chg_status = self.get_data_from_receive_data(
  198. receive_data=receive_data, start=1, len_data=1
  199. )
  200. soc_percentage = self.get_data_from_receive_data(
  201. receive_data=receive_data, start=2, len_data=4
  202. )
  203. soc_percentage = soc_percentage / 100
  204. voltage = self.get_data_from_receive_data(
  205. receive_data=receive_data, start=6, len_data=4
  206. )
  207. voltage = voltage / 100
  208. current = self.get_data_from_receive_data(
  209. receive_data=receive_data, start=10, len_data=4
  210. )
  211. current = current / 10000
  212. temperature = self.get_data_from_receive_data(
  213. receive_data=receive_data, start=14, len_data=4
  214. )
  215. temperature = temperature / 100
  216. adjusted_soc = self.get_data_from_receive_data(
  217. receive_data=receive_data, start=18, len_data=4
  218. )
  219. adjusted_soc = adjusted_soc / 100
  220. soft_vision = self.get_data_from_receive_data(
  221. receive_data=receive_data, start=22, len_data=1
  222. )
  223. full_status = self.get_data_from_receive_data(
  224. receive_data=receive_data, start=23, len_data=1
  225. )
  226. # print("is_charging:{}".format(chg_status))
  227. # print("Battery SOC: {:.2f}%".format(soc_percentage))
  228. # print("Battery Voltage: {:.3f}V".format(voltage))
  229. # print("Average Current: {:.3f}A".format(current))
  230. # print("Chip Temperature: {:.1f}°C".format(temperature))
  231. # print("adjusted Battery soc: {:.2f}%".format(adjusted_soc))
  232. # print("soft_vision:{}".format(soft_vision))
  233. # print("chg_status:{} full_status:{}".format(chg_status, full_status))
  234. if chg_status:
  235. t1 = "充电中"
  236. else:
  237. t1 = ""
  238. if full_status:
  239. t2 = "已充满"
  240. else:
  241. t2 = ""
  242. self.set_voltage_value(
  243. voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
  244. min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2
  245. )
  246. )
  247. return
  248. if receive_data[0] == 111:
  249. value = (
  250. receive_data[1] << 24
  251. | receive_data[2] << 16
  252. | receive_data[3] << 8
  253. | receive_data[4]
  254. )
  255. print("遥控器-测试 value:{}".format(value))
  256. # self.windows.show_label.setText("--------{}".format(value))
  257. if receive_data[0] == 112:
  258. bar_code = receive_data[1:].decode()
  259. bar_code = bar_code.replace("\r", "")
  260. bar_code = bar_code.replace("\n", "")
  261. print("read bar_code {}".format(bar_code))
  262. # 通用串口数据解析器
  263. def get_data_from_receive_data(
  264. self, receive_data, start, len_data, data_magnification=1
  265. ):
  266. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  267. try:
  268. if len_data == 1:
  269. data = receive_data[start]
  270. return data * data_magnification
  271. elif len_data == 2:
  272. data = receive_data[start] << 8 | receive_data[start + 1]
  273. return data * data_magnification
  274. elif len_data == 4:
  275. data = (
  276. receive_data[start] << 24
  277. | receive_data[start + 1] << 16
  278. | receive_data[start + 2] << 8
  279. | receive_data[start + 3]
  280. )
  281. return data * data_magnification
  282. return None
  283. except:
  284. return None
  285. async def run(self):
  286. # self.show_info.emit("未连接")
  287. # self.data_command_sign.emit(data)
  288. self.is_running = True
  289. print("Running")
  290. while 1:
  291. await asyncio.sleep(0.01)
  292. if not self.connect_state:
  293. message = {
  294. "_type": "show_info",
  295. "plugins_mode": "remote_control",
  296. "data": "遥控设备V2 未连接",
  297. }
  298. print(message)
  299. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message)
  300. break
  301. self.analysis_received_data()
  302. self.is_running = False
  303. if not self.connect_state:
  304. message = {
  305. "_type": "show_info",
  306. "plugins_mode": "remote_control",
  307. "data": "遥控设备V2 未连接",
  308. }
  309. print(message)
  310. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message)
  311. self.set_voltage_value(None)