RemoteControlV2.py 14 KB


  1. # module_remote_control_v2
  2. import json
  3. import time, asyncio
  4. import settings
  5. from .SerialIns import SerialIns
  6. from .BaseClass import BaseClass
  7. from sockets.connect_manager import ConnectionManager
  8. from sockets.socket_client import socket_manager
  9. # from .BlueToothMode import BlueToothMode
  10. class RemoteControlV2(BaseClass):
  11. # sign_data = Signal(dict)
  12. def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
  13. # 遥控设备处理--新版遥控器;硅胶按钮
  14. super().__init__(websocket_manager)
  15. self.msg_type = "blue_tooth"
  16. self.websocket_manager = websocket_manager
  17. # self.windows = windows
  18. self.serial_ins = None
  19. self.bluetooth_ins = bluetooth_ins
  20. self.port_name = ""
  21. self.bluetooth_address = ""
  22. self.connect_state = False
  23. self.is_running = False
  24. self.goods_art_no = None
  25. def to_connect_com(self, port_name, is_test=False):
  26. if self.connect_state:
  27. return
  28. self.close_connect()
  29. time.sleep(0.5)
  30. try:
  31. # 原值为9600
  32. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  33. # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
  34. if self.serial_ins.serial_handle:
  35. message = {
  36. "_type": "show_info",
  37. "plugins_mode": "remote_control",
  38. "data": "遥控设备V2 打开串口成功",
  39. }
  40. self.sendSocketMessage(
  41. code=0,
  42. msg="遥控设备V2 打开串口成功",
  43. data=message,
  44. device_status=1,
  45. )
  46. # print(message)
  47. self.connect_state = True
  48. message = {
  49. "_type": "remote_control_connect",
  50. "plugins_mode": "remote_control",
  51. "data": port_name,
  52. }
  53. self.sendSocketMessage(code=0, msg="", data=message,device_status=2)
  54. print(message)
  55. self.port_name = port_name
  56. self.is_running = True
  57. self.set_voltage_value(0)
  58. self.run()
  59. return True
  60. else:
  61. message = {
  62. "_type": "show_info",
  63. "plugins_mode": "remote_control",
  64. "data": "遥控设备V2 打开串口失败",
  65. }
  66. self.sendSocketMessage(
  67. code=1, msg="遥控设备V2 打开串口失败", data=message,
  68. device_status=-1
  69. )
  70. print(message)
  71. self.serial_ins = None
  72. self.connect_state = False
  73. self.set_voltage_value(None)
  74. except:
  75. message = {
  76. "_type": "show_info",
  77. "plugins_mode": "remote_control",
  78. "data": "遥控设备V2 打开串口失败",
  79. }
  80. print(message)
  81. self.sendSocketMessage(
  82. code=1, msg="遥控设备V2 打开串口失败", data=message, device_status=-1
  83. )
  84. self.serial_ins = None
  85. self.connect_state = False
  86. self.set_voltage_value(None)
  87. return False
  88. def to_connect_bluetooth(self, address, is_test=False):
  89. print("to_connect_bluetooth", self.connect_state)
  90. if self.connect_state:
  91. return
  92. # if self.bluetooth_ins == None:
  93. # print("bluetooth_ins 未初始化", bluetooth_mode)
  94. # self.bluetooth_ins = BlueToothMode()
  95. # else:
  96. # self.bluetooth_ins = bluetooth_mode
  97. self.close_connect()
  98. if self.bluetooth_ins == None:
  99. print("bluetooth_ins 未初始化", self.bluetooth_ins)
  100. # self.bluetooth_ins = self.
  101. self.bluetooth_address = address
  102. message = {
  103. "_type": "show_info",
  104. "plugins_mode": "remote_control",
  105. "data": "遥控设备V2 打开蓝牙成功",
  106. }
  107. print(message)
  108. self.sendSocketMessage(code=0, msg="遥控设备V2 打开蓝牙成功", data=message,device_status=2)
  109. self.connect_state = True
  110. self.is_running = True
  111. self.set_voltage_value(0)
  112. if is_test is False:
  113. loop = asyncio.get_event_loop()
  114. loop.create_task(self.run())
  115. def close_bluetooth_connect(self):
  116. if self.bluetooth_address:
  117. print("蓝牙断开")
  118. message = {
  119. "_type": "show_info",
  120. "plugins_mode": "remote_control",
  121. "data": "遥控设备V2 蓝牙断开",
  122. }
  123. print(message)
  124. self.sendSocketMessage(
  125. code=1, msg="遥控设备V2 蓝牙断开", data=message, device_status=-1
  126. )
  127. self.close_connect()
  128. print("关闭蓝牙")
  129. def set_voltage_value(self, voltage_value=None, voltage_text=None):
  130. device_status = 2
  131. if self.is_running:
  132. flag = "接收器已连接 {}".format(
  133. "蓝牙" if self.bluetooth_address else "串口"
  134. )
  135. else:
  136. flag = "接收器未连接"
  137. device_status = -1
  138. if voltage_value is None:
  139. if voltage_text:
  140. print(voltage_text)
  141. # 发送电量剩余消息
  142. self.sendSocketMessage(msg=voltage_text, device_status=device_status)
  143. else:
  144. self.sendSocketMessage(msg=flag, device_status=device_status)
  145. else:
  146. if voltage_value == 0:
  147. print(flag)
  148. self.sendSocketMessage(msg=flag, device_status=device_status)
  149. else:
  150. print("电量:{}%".format(voltage_value))
  151. self.sendSocketMessage(
  152. msg="电量:{}%".format(voltage_value), device_status=device_status
  153. )
  154. # print("打印===>", flag)
  155. def close_connect(self):
  156. self.port_name = ""
  157. self.bluetooth_address = ""
  158. # self.bluetooth_ins = None
  159. if self.serial_ins:
  160. self.serial_ins.close_serial_port()
  161. self.connect_state = False
  162. def __del__(self):
  163. self.close_connect()
  164. def play_sound(self, tip="sound_tips_3"):
  165. self.windows.playsound.tips_type = tip
  166. self.windows.playsound.start()
  167. def analysis_received_data(self):
  168. if not self.connect_state:
  169. return
  170. if self.bluetooth_address:
  171. receive_data = self.bluetooth_ins.read_cmd_one(
  172. address=self.bluetooth_address
  173. )
  174. # print("received data", receive_data)
  175. else:
  176. receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
  177. # print("self.bluetooth_ins", receive_data)
  178. if receive_data is False:
  179. self.connect_state = False
  180. return False
  181. if not receive_data:
  182. return
  183. receive_data_parser = receive_data[0]
  184. # 数据 结构 command,按命令解析
  185. if receive_data_parser == 1:
  186. # self.play_sound("get_qr_code")
  187. # 扫码数据
  188. bar_code = receive_data[1:].decode()
  189. bar_code = bar_code.replace("\r", "")
  190. bar_code = bar_code.replace("\n", "")
  191. self.goods_art_no = bar_code
  192. message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
  193. print(message)
  194. self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
  195. return
  196. if receive_data_parser == 9:
  197. # 播放声音
  198. button_value = receive_data[1]
  199. data = {"button_value": button_value}
  200. message = {"_type": 9, "plugins_mode": "remote_control", "data": data}
  201. print(message)
  202. if button_value in [1,2]:
  203. # 扫描货号
  204. print("收到货号信息", self.goods_art_no)
  205. if self.goods_art_no == None or self.goods_art_no =="":
  206. self.sendSocketMessage(
  207. code=1, msg="货号信息不能为空", data=None, device_status=-1
  208. )
  209. return
  210. control_program = (
  211. "执行左脚程序" if button_value == 1 else "执行右脚程序"
  212. )
  213. input_data = {
  214. "data": {
  215. "action": control_program,
  216. "goods_art_no": self.goods_art_no,
  217. },
  218. "type": "run_mcu",
  219. }
  220. loop = asyncio.get_event_loop()
  221. async def sendControlData():
  222. await socket_manager.websocket.send(json.dumps(input_data))
  223. loop.create_task(sendControlData(), name="sendControlData")
  224. self.sendSocketMessage(
  225. code=0,
  226. msg=f"准备执行[{control_program}]",
  227. data=input_data,
  228. device_status=2,
  229. )
  230. self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
  231. if settings.IS_DEBUG:
  232. print("收到按键", button_value)
  233. return
  234. if receive_data_parser == 10:
  235. voltage_value = receive_data[1]
  236. self.set_voltage_value(voltage_value)
  237. if settings.IS_TEST:
  238. print("遥控器V2电量:{}".format(voltage_value))
  239. return
  240. # 使用Max17048 查看电量
  241. if receive_data_parser == 12:
  242. chg_status = self.get_data_from_receive_data(
  243. receive_data=receive_data, start=1, len_data=1
  244. )
  245. soc_percentage = self.get_data_from_receive_data(
  246. receive_data=receive_data, start=2, len_data=4
  247. )
  248. soc_percentage = soc_percentage / 100
  249. voltage = self.get_data_from_receive_data(
  250. receive_data=receive_data, start=6, len_data=4
  251. )
  252. voltage = voltage / 100
  253. current = self.get_data_from_receive_data(
  254. receive_data=receive_data, start=10, len_data=4
  255. )
  256. current = current / 10000
  257. temperature = self.get_data_from_receive_data(
  258. receive_data=receive_data, start=14, len_data=4
  259. )
  260. temperature = temperature / 100
  261. adjusted_soc = self.get_data_from_receive_data(
  262. receive_data=receive_data, start=18, len_data=4
  263. )
  264. adjusted_soc = adjusted_soc / 100
  265. soft_vision = self.get_data_from_receive_data(
  266. receive_data=receive_data, start=22, len_data=1
  267. )
  268. full_status = self.get_data_from_receive_data(
  269. receive_data=receive_data, start=23, len_data=1
  270. )
  271. # print("is_charging:{}".format(chg_status))
  272. # print("Battery SOC: {:.2f}%".format(soc_percentage))
  273. # print("Battery Voltage: {:.3f}V".format(voltage))
  274. # print("Average Current: {:.3f}A".format(current))
  275. # print("Chip Temperature: {:.1f}°C".format(temperature))
  276. # print("adjusted Battery soc: {:.2f}%".format(adjusted_soc))
  277. # print("soft_vision:{}".format(soft_vision))
  278. # print("chg_status:{} full_status:{}".format(chg_status, full_status))
  279. if chg_status:
  280. t1 = "充电中"
  281. else:
  282. t1 = ""
  283. if full_status:
  284. t2 = "已充满"
  285. else:
  286. t2 = ""
  287. self.set_voltage_value(
  288. voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
  289. min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2
  290. )
  291. )
  292. return
  293. if receive_data[0] == 111:
  294. value = (
  295. receive_data[1] << 24
  296. | receive_data[2] << 16
  297. | receive_data[3] << 8
  298. | receive_data[4]
  299. )
  300. print("遥控器-测试 value:{}".format(value))
  301. # self.windows.show_label.setText("--------{}".format(value))
  302. if receive_data[0] == 112:
  303. bar_code = receive_data[1:].decode()
  304. bar_code = bar_code.replace("\r", "")
  305. bar_code = bar_code.replace("\n", "")
  306. print("read bar_code {}".format(bar_code))
  307. # 通用串口数据解析器
  308. def get_data_from_receive_data(
  309. self, receive_data, start, len_data, data_magnification=1
  310. ):
  311. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  312. try:
  313. if len_data == 1:
  314. data = receive_data[start]
  315. return data * data_magnification
  316. elif len_data == 2:
  317. data = receive_data[start] << 8 | receive_data[start + 1]
  318. return data * data_magnification
  319. elif len_data == 4:
  320. data = (
  321. receive_data[start] << 24
  322. | receive_data[start + 1] << 16
  323. | receive_data[start + 2] << 8
  324. | receive_data[start + 3]
  325. )
  326. return data * data_magnification
  327. return None
  328. except:
  329. return None
  330. async def run(self):
  331. # self.show_info.emit("未连接")
  332. # self.data_command_sign.emit(data)
  333. self.is_running = True
  334. print("Running")
  335. while 1:
  336. await asyncio.sleep(0.01)
  337. if not self.connect_state:
  338. message = {
  339. "_type": "show_info",
  340. "plugins_mode": "remote_control",
  341. "data": "遥控设备V2 未连接",
  342. }
  343. print(message)
  344. self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message,device_status=-1)
  345. break
  346. self.analysis_received_data()
  347. self.is_running = False
  348. if not self.connect_state:
  349. message = {
  350. "_type": "show_info",
  351. "plugins_mode": "remote_control",
  352. "data": "遥控设备V2 未连接",
  353. }
  354. print(message)
  355. self.sendSocketMessage(
  356. code=1, msg="遥控设备V2 未连接", data=message, device_status=-1
  357. )
  358. self.set_voltage_value(None)