# RemoteControlV2.py
  1. # module_remote_control_v2
  2. import json
  3. import time, asyncio
  4. import settings
  5. from .SerialIns import SerialIns
  6. from .BaseClass import BaseClass
  7. from sockets.connect_manager import ConnectionManager
  8. # from .BlueToothMode import BlueToothMode
  9. class RemoteControlV2(BaseClass):
  10. # sign_data = Signal(dict)
  def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
      """Prepare the handler for the V2 remote control (new model, silicone buttons).

      Args:
          bluetooth_ins: Bluetooth helper object; it is only stored here.
          websocket_manager: Connection manager passed to the base class and
              also kept on the instance.
      """
      super().__init__(websocket_manager)
      self.websocket_manager = websocket_manager
      self.bluetooth_ins = bluetooth_ins
      self.msg_type = "blue_tooth"
      # Transport bookkeeping: nothing is open until a connect method runs.
      self.serial_ins = None
      self.port_name = ""
      self.bluetooth_address = ""
      # Both link flags start cleared.
      self.connect_state = self.is_running = False
      # Most recently scanned goods/article number, None until one is stored.
      self.goods_art_no = None
  def to_connect_com(self, port_name, is_test=False):
      """Open the receiver on a serial port and start polling it.

      Does nothing when a link is already marked as connected. Otherwise any
      previous link is closed, the port is opened at 115200 baud and the
      outcome is reported through ``sendSocketMessage``.

      Args:
          port_name: Name of the serial port to open.
          is_test: When true the polling task is not scheduled (same
              convention as ``to_connect_bluetooth``).

      Returns:
          True when the port was opened, False when opening failed or an
          error occurred, None when a link was already connected.
      """
      if self.connect_state:
          return
      self.close_connect()
      time.sleep(0.5)
      try:
          # The baud rate used to be 9600.
          self.serial_ins = SerialIns(port_name=port_name, baud=115200)
          if self.serial_ins.serial_handle:
              message = {
                  "_type": "show_info",
                  "plugins_mode": "remote_control",
                  "data": "遥控设备V2 打开串口成功",
              }
              self.sendSocketMessage(
                  code=0,
                  msg="遥控设备V2 打开串口成功",
                  data=message,
                  device_status=1,
              )
              self.connect_state = True
              message = {
                  "_type": "remote_control_connect",
                  "plugins_mode": "remote_control",
                  "data": port_name,
              }
              self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
              print(message)
              self.port_name = port_name
              self.is_running = True
              self.set_voltage_value(0)
              if is_test is False:
                  # run() is a coroutine function: calling it bare only builds
                  # a coroutine object that never executes. Schedule it on the
                  # event loop and keep a reference, because the loop itself
                  # only holds weak references to tasks.
                  loop = asyncio.get_event_loop()
                  self._run_task = loop.create_task(self.run())
              return True
      except Exception as exc:
          # Exception rather than a bare except, so KeyboardInterrupt and
          # SystemExit still propagate.
          print("遥控设备V2 打开串口异常:", repr(exc))
      # Reached when the port did not open or anything above raised.
      message = {
          "_type": "show_info",
          "plugins_mode": "remote_control",
          "data": "遥控设备V2 打开串口失败",
      }
      print(message)
      self.sendSocketMessage(
          code=1, msg="遥控设备V2 打开串口失败", data=message, device_status=-1
      )
      self.serial_ins = None
      self.connect_state = False
      # No poller was started by this call, so the status report below must
      # describe the receiver as not connected.
      self.is_running = False
      self.set_voltage_value(None)
      return False
  def to_connect_bluetooth(self, address, is_test=False):
      """Mark the receiver as connected over Bluetooth and start polling it.

      Does nothing when a link is already marked as connected. Any previous
      link is closed first. When no Bluetooth helper was supplied to the
      constructor the attempt is reported as failed instead of starting a
      poller that could not read anything.

      Args:
          address: Bluetooth address later passed to
              ``bluetooth_ins.read_cmd_one``.
          is_test: When true the polling task is not scheduled.
      """
      print("to_connect_bluetooth", self.connect_state)
      if self.connect_state:
          return
      self.close_connect()
      if self.bluetooth_ins is None:
          print("bluetooth_ins 未初始化", self.bluetooth_ins)
          # Without a helper the first poll would fail on
          # ``None.read_cmd_one``; report the failure and stay disconnected.
          message = {
              "_type": "show_info",
              "plugins_mode": "remote_control",
              "data": "遥控设备V2 打开蓝牙失败",
          }
          print(message)
          self.sendSocketMessage(
              code=1, msg="遥控设备V2 打开蓝牙失败", data=message, device_status=-1
          )
          return
      self.bluetooth_address = address
      message = {
          "_type": "show_info",
          "plugins_mode": "remote_control",
          "data": "遥控设备V2 打开蓝牙成功",
      }
      print(message)
      self.sendSocketMessage(
          code=0, msg="遥控设备V2 打开蓝牙成功", data=message, device_status=2
      )
      self.connect_state = True
      self.is_running = True
      self.set_voltage_value(0)
      if is_test is False:
          loop = asyncio.get_event_loop()
          # Keep a reference: the event loop only holds weak references to
          # tasks, so an unreferenced task may be garbage-collected mid-run.
          self._run_task = loop.create_task(self.run())
  def close_bluetooth_connect(self):
      """Announce a Bluetooth disconnect and close the link.

      Only acts while a Bluetooth address is recorded; otherwise it returns
      without doing anything.
      """
      if not self.bluetooth_address:
          return
      print("蓝牙断开")
      notice = dict(
          _type="show_info",
          plugins_mode="remote_control",
          data="遥控设备V2 蓝牙断开",
      )
      print(notice)
      self.sendSocketMessage(
          code=1, msg="遥控设备V2 蓝牙断开", data=notice, device_status=-1
      )
      self.close_connect()
      print("关闭蓝牙")
  def set_voltage_value(self, voltage_value=None, voltage_text=None):
      """Publish the receiver/battery status line through ``sendSocketMessage``.

      Args:
          voltage_value: Battery percentage. ``0`` publishes the connection
              flag instead of a percentage; ``None`` falls back to
              ``voltage_text`` or the connection flag.
          voltage_text: Ready-made battery text, used only when
              ``voltage_value`` is None.
      """
      if self.is_running:
          via_bluetooth = bool(self.bluetooth_address)
          flag = "接收器已连接 {}".format("蓝牙" if via_bluetooth else "串口")
          device_status = 2
          # Name the transport that is actually in use; this message used to
          # say "Bluetooth opened" even for a serial link.
          opened_msg = (
              "遥控设备V2 打开蓝牙成功" if via_bluetooth else "遥控设备V2 打开串口成功"
          )
          self.sendSocketMessage(code=0, msg=opened_msg, data=None, device_status=2)
          # NOTE(review): a status report also forces connect_state to True;
          # kept as is because other code may rely on it - confirm.
          self.connect_state = True
      else:
          flag = "接收器未连接"
          device_status = -1

      if voltage_value is None:
          if voltage_text:
              # Remaining-battery text supplied by the caller.
              print(voltage_text)
              status_text = voltage_text
          else:
              status_text = flag
      elif voltage_value == 0:
          print(flag)
          status_text = flag
      else:
          status_text = "电量:{}%".format(voltage_value)
          print(status_text)
      self.sendSocketMessage(msg=status_text, device_status=device_status)
  def close_connect(self):
      """Forget the current port/address and close the serial port if one is open.

      The serial handle is released after closing so that later calls do not
      close the same port again, and ``connect_state`` is cleared even when
      closing the port raises.
      """
      self.port_name = ""
      self.bluetooth_address = ""
      # The Bluetooth helper is deliberately kept; only the serial handle
      # belongs to a single connection.
      serial_ins, self.serial_ins = self.serial_ins, None
      try:
          if serial_ins:
              serial_ins.close_serial_port()
      finally:
          # A failing close must not leave the object marked as connected,
          # otherwise both connect methods refuse to reconnect.
          self.connect_state = False
  164. def __del__(self):
  165. self.close_connect()
  def play_sound(self, tip="sound_tips_3"):
      """Play a notification sound through the attached window, if there is one.

      ``self.windows`` is not assigned anywhere in this class (the assignment
      in ``__init__`` is commented out), so the call is a no-op unless a
      window object has been attached from outside.

      Args:
          tip: Value stored in ``playsound.tips_type`` before ``start()``.
      """
      windows = getattr(self, "windows", None)
      if windows is None:
          return
      windows.playsound.tips_type = tip
      windows.playsound.start()
  def analysis_received_data(self):
      """Read one frame from the active transport and act on its command byte.

      The frame comes from ``bluetooth_ins.read_cmd_one`` when a Bluetooth
      address is set, otherwise from ``serial_ins.read_cmd``. Byte 0 selects
      the handling:

      * 1   - scanned barcode: stored in ``goods_art_no`` and forwarded.
      * 9   - button press: buttons 1/2 announce the left/right-foot program
              (a goods number must have been scanned first); every button is
              forwarded as a raw event.
      * 10  - battery percentage in byte 1.
      * 12  - battery report (see layout below).
      * 111 - test value (4 bytes, big-endian), printed only.
      * 112 - barcode echo, printed only.

      Frames too short for their command, or barcodes that are not valid
      text, are dropped so a single bad frame cannot stop the polling loop.

      Returns:
          False when the transport read returned False (the link is then
          marked disconnected); None in every other case.
      """
      if not self.connect_state:
          return
      if self.bluetooth_address:
          receive_data = self.bluetooth_ins.read_cmd_one(
              address=self.bluetooth_address
          )
      else:
          receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
      if receive_data is False:
          self.connect_state = False
          return False
      if not receive_data:
          return

      command = receive_data[0]
      frame_len = len(receive_data)

      if command == 1:
          # Scanned barcode.
          try:
              bar_code = receive_data[1:].decode()
          except UnicodeDecodeError as exc:
              print("遥控设备V2 条码解码失败:", repr(exc))
              return
          bar_code = bar_code.replace("\r", "").replace("\n", "")
          self.goods_art_no = bar_code
          message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
          print(message)
          self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
          return

      if command == 9:
          # Button press.
          if frame_len < 2:
              return
          button_value = receive_data[1]
          message = {
              "_type": 9,
              "plugins_mode": "remote_control",
              "data": {"button_value": button_value},
          }
          print(message)
          if button_value in (1, 2):
              print("收到货号信息", self.goods_art_no)
              if not self.goods_art_no:
                  self.sendSocketMessage(
                      code=1, msg="货号信息不能为空", data=None, device_status=-1
                  )
                  return
              control_program = (
                  "执行左脚程序" if button_value == 1 else "执行右脚程序"
              )
              input_data = {
                  "data": {
                      "action": control_program,
                      "goods_art_no": self.goods_art_no,
                  },
                  "type": "run_mcu",
              }
              self.sendSocketMessage(
                  code=0,
                  msg=f"准备执行[{control_program}]",
                  data=input_data,
                  device_status=2,
              )
          self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
          if settings.IS_DEBUG:
              print("收到按键", button_value)
          return

      if command == 10:
          # Battery percentage.
          if frame_len < 2:
              return
          voltage_value = receive_data[1]
          self.set_voltage_value(voltage_value)
          if settings.IS_TEST:
              print("遥控器V2电量:{}".format(voltage_value))
          return

      if command == 12:
          # Battery report read through a Max17048. Layout as decoded by the
          # previous revision of this method (big-endian fields):
          #   1      charging flag          2-5    SOC x100
          #   6-9    voltage x100           10-13  current x10000
          #   14-17  temperature x100       18-21  adjusted SOC x100
          #   22     software version       23     fully-charged flag
          # Only the fields that end up in the status text are decoded.
          if frame_len < 24:
              return
          chg_status = self.get_data_from_receive_data(
              receive_data=receive_data, start=1, len_data=1
          )
          voltage = self.get_data_from_receive_data(
              receive_data=receive_data, start=6, len_data=4
          )
          adjusted_soc = self.get_data_from_receive_data(
              receive_data=receive_data, start=18, len_data=4
          )
          full_status = self.get_data_from_receive_data(
              receive_data=receive_data, start=23, len_data=1
          )
          if voltage is None or adjusted_soc is None:
              return
          voltage = voltage / 100
          adjusted_soc = adjusted_soc / 100
          t1 = "充电中" if chg_status else ""
          t2 = "已充满" if full_status else ""
          self.set_voltage_value(
              voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
                  min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2
              )
          )
          return

      if command == 111:
          # Test value.
          if frame_len < 5:
              return
          value = (
              receive_data[1] << 24
              | receive_data[2] << 16
              | receive_data[3] << 8
              | receive_data[4]
          )
          print("遥控器-测试 value:{}".format(value))

      if command == 112:
          # Barcode echo.
          try:
              bar_code = receive_data[1:].decode()
          except UnicodeDecodeError as exc:
              print("遥控设备V2 条码解码失败:", repr(exc))
              return
          bar_code = bar_code.replace("\r", "").replace("\n", "")
          print("read bar_code {}".format(bar_code))
  # Generic parser for fields of a received serial frame
  def get_data_from_receive_data(
      self, receive_data, start, len_data, data_magnification=1
  ):
      """Decode a big-endian unsigned field from a received frame.

      Args:
          receive_data: Indexable frame whose items are integers (byte values).
          start: Index of the field's first byte.
          len_data: Field width in bytes; only 1, 2 and 4 are supported.
          data_magnification: Factor applied to the decoded value (scale up
              or down), 1 by default.

      Returns:
          The decoded value multiplied by ``data_magnification``, or None
          when the width is unsupported, the frame is too short or the frame
          cannot be indexed.
      """
      if len_data not in (1, 2, 4):
          return None
      try:
          value = 0
          for offset in range(len_data):
              value = value << 8 | receive_data[start + offset]
          return value * data_magnification
      except (LookupError, TypeError):
          # Short or malformed frame. Anything else (KeyboardInterrupt,
          # SystemExit, ...) is intentionally not swallowed.
          return None
  async def run(self):
      """Poll the receiver until the link is no longer marked as connected.

      Every 10 ms the connection flag is checked and, while it is set,
      ``analysis_received_data`` handles one frame. When the loop ends -
      because the link was closed or because frame handling raised - the
      running flag is cleared and the disconnect is reported exactly once.
      """
      self.is_running = True
      print("Running")
      try:
          while True:
              await asyncio.sleep(0.01)
              if not self.connect_state:
                  break
              self.analysis_received_data()
      except Exception as exc:
          # An error in frame handling used to kill the task while both
          # flags stayed set, which blocked every later reconnect. Treat it
          # as a lost link instead. Cancellation is not caught here.
          print("遥控设备V2 轮询异常:", repr(exc))
          self.connect_state = False
      finally:
          self.is_running = False
      message = {
          "_type": "show_info",
          "plugins_mode": "remote_control",
          "data": "遥控设备V2 未连接",
      }
      print(message)
      self.sendSocketMessage(
          code=1, msg="遥控设备V2 未连接", data=message, device_status=-1
      )
      self.set_voltage_value(None)