# module_remote_control_v2 import json import time, asyncio import settings from .SerialIns import SerialIns from .BaseClass import BaseClass from sockets.connect_manager import ConnectionManager # from .BlueToothMode import BlueToothMode class RemoteControlV2(BaseClass): # sign_data = Signal(dict) def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager): # 遥控设备处理--新版遥控器;硅胶按钮 super().__init__(websocket_manager) self.msg_type = "blue_tooth" self.websocket_manager = websocket_manager # self.windows = windows self.serial_ins = None self.bluetooth_ins = bluetooth_ins self.port_name = "" self.bluetooth_address = "" self.connect_state = False self.is_running = False self.goods_art_no = None def to_connect_com(self, port_name, is_test=False): if self.connect_state: return self.close_connect() time.sleep(0.5) try: # 原值为9600 self.serial_ins = SerialIns(port_name=port_name, baud=115200) # self.serial_ins = SerialIns(port_name=port_name, baud=9600) if self.serial_ins.serial_handle: message = { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备V2 打开串口成功", } self.sendSocketMessage( code=0, msg="遥控设备V2 打开串口成功", data=message, device_status=1, ) # print(message) self.connect_state = True message = { "_type": "remote_control_connect", "plugins_mode": "remote_control", "data": port_name, } self.sendSocketMessage(code=0, msg="", data=message,device_status=2) print(message) self.port_name = port_name self.is_running = True self.set_voltage_value(0) self.run() return True else: message = { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备V2 打开串口失败", } self.sendSocketMessage( code=1, msg="遥控设备V2 打开串口失败", data=message, device_status=-1 ) print(message) self.serial_ins = None self.connect_state = False self.set_voltage_value(None) except: message = { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备V2 打开串口失败", } print(message) self.sendSocketMessage( code=1, msg="遥控设备V2 打开串口失败", data=message, device_status=-1 ) self.serial_ins = None self.connect_state = False self.set_voltage_value(None) return False def to_connect_bluetooth(self, address, is_test=False): print("to_connect_bluetooth", self.connect_state) if self.connect_state: return # if self.bluetooth_ins == None: # print("bluetooth_ins 未初始化", bluetooth_mode) # self.bluetooth_ins = BlueToothMode() # else: # self.bluetooth_ins = bluetooth_mode self.close_connect() if self.bluetooth_ins == None: print("bluetooth_ins 未初始化", self.bluetooth_ins) # self.bluetooth_ins = self. self.bluetooth_address = address message = { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备V2 打开蓝牙成功", } print(message) self.sendSocketMessage(code=0, msg="遥控设备V2 打开蓝牙成功", data=message,device_status=2) self.connect_state = True self.is_running = True self.set_voltage_value(0) if is_test is False: loop = asyncio.get_event_loop() loop.create_task(self.run()) def close_bluetooth_connect(self): if self.bluetooth_address: print("蓝牙断开") message = { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备V2 蓝牙断开", } print(message) self.sendSocketMessage( code=1, msg="遥控设备V2 蓝牙断开", data=message, device_status=-1 ) self.close_connect() print("关闭蓝牙") def set_voltage_value(self, voltage_value=None, voltage_text=None): device_status = 2 if self.is_running: flag = "接收器已连接 {}".format( "蓝牙" if self.bluetooth_address else "串口" ) self.sendSocketMessage(code=0, msg="遥控设备V2 打开蓝牙成功", data=None,device_status=2) self.connect_state = True self.is_running = True else: flag = "接收器未连接" device_status = -1 if voltage_value is None: if voltage_text: print(voltage_text) # 发送电量剩余消息 self.sendSocketMessage(msg=voltage_text, device_status=device_status) else: self.sendSocketMessage(msg=flag, device_status=device_status) else: if voltage_value == 0: print(flag) self.sendSocketMessage(msg=flag, device_status=device_status) else: print("电量:{}%".format(voltage_value)) self.sendSocketMessage( msg="电量:{}%".format(voltage_value), device_status=device_status ) # print("打印===>", flag) def close_connect(self): self.port_name = "" self.bluetooth_address = "" # self.bluetooth_ins = None if self.serial_ins: self.serial_ins.close_serial_port() self.connect_state = False def __del__(self): self.close_connect() def play_sound(self, tip="sound_tips_3"): self.windows.playsound.tips_type = tip self.windows.playsound.start() def analysis_received_data(self): if not self.connect_state: return if self.bluetooth_address: receive_data = self.bluetooth_ins.read_cmd_one( address=self.bluetooth_address ) # print("received data", receive_data) else: receive_data = self.serial_ins.read_cmd(out_time=1, check=None) # print("self.bluetooth_ins", receive_data) if receive_data is False: self.connect_state = False return False if not receive_data: return receive_data_parser = receive_data[0] # 数据 结构 command,按命令解析 if receive_data_parser == 1: # self.play_sound("get_qr_code") # 扫码数据 bar_code = receive_data[1:].decode() bar_code = bar_code.replace("\r", "") bar_code = bar_code.replace("\n", "") self.goods_art_no = bar_code message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code} print(message) self.sendSocketMessage(code=0, msg="", data=message, device_status=2) return if receive_data_parser == 9: # 播放声音 button_value = receive_data[1] data = {"button_value": button_value} message = {"_type": 9, "plugins_mode": "remote_control", "data": data} print(message) if button_value in [1,2]: # 扫描货号 print("收到货号信息", self.goods_art_no) if self.goods_art_no == None or self.goods_art_no =="": self.sendSocketMessage( code=1, msg="货号信息不能为空", data=None, device_status=-1 ) return control_program = ( "执行左脚程序" if button_value == 1 else "执行右脚程序" ) input_data = { "data": { "action": control_program, "goods_art_no": self.goods_art_no, }, "type": "run_mcu", } # loop = asyncio.get_event_loop() # session = SqlQuery() # crud = CRUD(DeviceConfig) # condtions = {"mode_type": control_program, "action_status": True} # all_devices = crud.read_all( # session, conditions=condtions, order_by="action_index", ascending=True # ) # if len(all_devices) == 0: # self.sendSocketMessage( # code=1, # msg=f"当前没有可用配置", # data=input_data, # device_status=2, # ) # return # action_list = [device.model_dump() for device in all_devices] # print("action_list", action_list) # loop.create_task( # device_ctrl.run_mcu_config(action_list, self.goods_art_no), # name="run_mcu_config", # ) self.sendSocketMessage( code=0, msg=f"准备执行[{control_program}]", data=input_data, device_status=2, ) self.sendSocketMessage(code=0, msg="", data=message, device_status=2) if settings.IS_DEBUG: print("收到按键", button_value) return if receive_data_parser == 10: voltage_value = receive_data[1] self.set_voltage_value(voltage_value) if settings.IS_TEST: print("遥控器V2电量:{}".format(voltage_value)) return # 使用Max17048 查看电量 if receive_data_parser == 12: chg_status = self.get_data_from_receive_data( receive_data=receive_data, start=1, len_data=1 ) soc_percentage = self.get_data_from_receive_data( receive_data=receive_data, start=2, len_data=4 ) soc_percentage = soc_percentage / 100 voltage = self.get_data_from_receive_data( receive_data=receive_data, start=6, len_data=4 ) voltage = voltage / 100 current = self.get_data_from_receive_data( receive_data=receive_data, start=10, len_data=4 ) current = current / 10000 temperature = self.get_data_from_receive_data( receive_data=receive_data, start=14, len_data=4 ) temperature = temperature / 100 adjusted_soc = self.get_data_from_receive_data( receive_data=receive_data, start=18, len_data=4 ) adjusted_soc = adjusted_soc / 100 soft_vision = self.get_data_from_receive_data( receive_data=receive_data, start=22, len_data=1 ) full_status = self.get_data_from_receive_data( receive_data=receive_data, start=23, len_data=1 ) # print("is_charging:{}".format(chg_status)) # print("Battery SOC: {:.2f}%".format(soc_percentage)) # print("Battery Voltage: {:.3f}V".format(voltage)) # print("Average Current: {:.3f}A".format(current)) # print("Chip Temperature: {:.1f}°C".format(temperature)) # print("adjusted Battery soc: {:.2f}%".format(adjusted_soc)) # print("soft_vision:{}".format(soft_vision)) # print("chg_status:{} full_status:{}".format(chg_status, full_status)) if chg_status: t1 = "充电中" else: t1 = "" if full_status: t2 = "已充满" else: t2 = "" self.set_voltage_value( voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format( min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2 ) ) return if receive_data[0] == 111: value = ( receive_data[1] << 24 | receive_data[2] << 16 | receive_data[3] << 8 | receive_data[4] ) print("遥控器-测试 value:{}".format(value)) # self.windows.show_label.setText("--------{}".format(value)) if receive_data[0] == 112: bar_code = receive_data[1:].decode() bar_code = bar_code.replace("\r", "") bar_code = bar_code.replace("\n", "") print("read bar_code {}".format(bar_code)) # 通用串口数据解析器 def get_data_from_receive_data( self, receive_data, start, len_data, data_magnification=1 ): # data_magnification 数据放大倍数,或缩小倍数,默认为1 try: if len_data == 1: data = receive_data[start] return data * data_magnification elif len_data == 2: data = receive_data[start] << 8 | receive_data[start + 1] return data * data_magnification elif len_data == 4: data = ( receive_data[start] << 24 | receive_data[start + 1] << 16 | receive_data[start + 2] << 8 | receive_data[start + 3] ) return data * data_magnification return None except: return None async def run(self): # self.show_info.emit("未连接") # self.data_command_sign.emit(data) self.is_running = True print("Running") while 1: await asyncio.sleep(0.01) if not self.connect_state: message = { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备V2 未连接", } print(message) self.sendSocketMessage(code=1, msg="遥控设备V2 未连接", data=message,device_status=-1) break self.analysis_received_data() self.is_running = False if not self.connect_state: message = { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备V2 未连接", } print(message) self.sendSocketMessage( code=1, msg="遥控设备V2 未连接", data=message, device_status=-1 ) self.set_voltage_value(None)