# module_remote_control_v2
import time
import asyncio

import settings
from .SerialIns import SerialIns
from .BaseClass import BaseClass
from sockets.connect_manager import ConnectionManager


class RemoteControlV2(BaseClass):
    """Handler for the V2 remote control (new model with silicone buttons).

    The receiver is reached either through a serial port (``SerialIns``) or
    through a shared bluetooth helper passed in by the caller.  Incoming
    frames are polled by the ``run`` coroutine, decoded in
    ``analysis_received_data`` and forwarded to clients through
    ``sendSocketMessage`` (provided by ``BaseClass``).
    """

    def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
        """Store the transports and initialise the connection state.

        Args:
            bluetooth_ins: Bluetooth helper used when connecting by address;
                may be ``None`` if bluetooth is not available.
            websocket_manager: Connection manager handed to ``BaseClass`` and
                kept on the instance.
        """
        super().__init__(websocket_manager)
        self.websocket_manager = websocket_manager
        self.serial_ins = None
        self.bluetooth_ins = bluetooth_ins
        self.port_name = ""
        self.bluetooth_address = ""
        self.connect_state = False
        self.is_running = False

    def _send_info(self, code, text, echo=True):
        """Send a ``show_info`` message for this plugin, optionally printing it."""
        message = {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": text,
        }
        if echo:
            print(message)
        self.sendSocketMessage(code=code, msg=text, data=message)

    def to_connect_com(self, port_name, is_test=False):
        """Open the receiver on a serial port and start the reader loop.

        Args:
            port_name: Name of the serial port to open.
            is_test: When true, the connection is set up but the ``run``
                reader task is not scheduled (same convention as
                ``to_connect_bluetooth``).

        Returns:
            ``True`` when the port was opened, ``False`` when opening failed,
            ``None`` when a connection is already active.
        """
        if self.connect_state:
            return
        self.close_connect()
        # Blocking pause between closing any previous port and reopening.
        time.sleep(0.5)
        try:
            # Baud rate was previously 9600.
            self.serial_ins = SerialIns(port_name=port_name, baud=115200)
            if self.serial_ins.serial_handle:
                self._send_info(0, "遥控设备V2 打开串口成功", echo=False)
                self.connect_state = True
                message = {
                    "_type": "remote_control_connect",
                    "plugins_mode": "remote_control",
                    "data": port_name,
                }
                self.sendSocketMessage(code=0, msg="", data=message)
                print(message)
                self.port_name = port_name
                self.is_running = True
                self.set_voltage_value(0)
                if is_test is False:
                    # run() is a coroutine function: calling it bare only
                    # creates a coroutine object that never executes, so it
                    # has to be scheduled on the event loop.
                    loop = asyncio.get_event_loop()
                    loop.create_task(self.run())
                return True
            self._send_info(1, "遥控设备V2 打开串口失败")
            self.serial_ins = None
            self.connect_state = False
            self.set_voltage_value(None)
        except Exception:
            # Best-effort release of a port that may have been opened before
            # the failure; the failure itself is what gets reported.
            if self.serial_ins is not None:
                try:
                    self.serial_ins.close_serial_port()
                except Exception:
                    pass
            self.serial_ins = None
            self.connect_state = False
            self.is_running = False
            self._send_info(1, "遥控设备V2 打开串口失败")
            self.set_voltage_value(None)
        return False

    def to_connect_bluetooth(self, address, is_test=False):
        """Mark the receiver as connected over bluetooth and start the reader.

        Args:
            address: Bluetooth address later passed to
                ``bluetooth_ins.read_cmd_one``.
            is_test: When true, the ``run`` reader task is not scheduled.
        """
        print("to_connect_bluetooth", self.connect_state)
        if self.connect_state:
            return
        self.close_connect()
        if self.bluetooth_ins is None:
            print("bluetooth_ins 未初始化", self.bluetooth_ins)
        self.bluetooth_address = address
        self._send_info(0, "遥控设备V2 打开蓝牙成功")
        self.connect_state = True
        self.is_running = True
        self.set_voltage_value(0)
        if is_test is False:
            loop = asyncio.get_event_loop()
            loop.create_task(self.run())

    def close_bluetooth_connect(self):
        """Report and close the connection if it is a bluetooth connection."""
        if self.bluetooth_address:
            print("蓝牙断开")
            self._send_info(1, "遥控设备V2 蓝牙断开")
            self.close_connect()

    def set_voltage_value(self, voltage_value=None, voltage_text=None):
        """Print the receiver / battery status line.

        Args:
            voltage_value: Battery percentage; ``0`` prints the connection
                status instead, ``None`` means no numeric value.
            voltage_text: Pre-formatted status text, printed only when
                ``voltage_value`` is ``None``.
        """
        if self.is_running:
            flag = "接收器已连接 {}".format(
                "蓝牙" if self.bluetooth_address else "串口"
            )
        else:
            flag = "接收器未连接"
        if voltage_value is None:
            if voltage_text:
                print(voltage_text)
        elif voltage_value == 0:
            print(flag)
        else:
            print("电量:{}%".format(voltage_value))
        print("打印===>", flag)

    def close_connect(self):
        """Reset the connection state and release the serial port, if any."""
        self.port_name = ""
        self.bluetooth_address = ""
        # getattr keeps this safe when called from __del__ on an instance
        # whose __init__ did not finish.
        serial_ins = getattr(self, "serial_ins", None)
        try:
            if serial_ins:
                serial_ins.close_serial_port()
        finally:
            # Drop the handle so later calls do not close the port twice, and
            # always leave the state as disconnected even if closing raised.
            self.serial_ins = None
            self.connect_state = False

    def __del__(self):
        self.close_connect()

    def play_sound(self, tip="sound_tips_3"):
        """Play a notification sound through the UI window, when one exists.

        NOTE(review): this class never assigns ``self.windows``; unless the
        base class provides it, this method does nothing.
        """
        windows = getattr(self, "windows", None)
        if windows is None:
            return
        windows.playsound.tips_type = tip
        windows.playsound.start()

    @staticmethod
    def _has_length(receive_data, needed):
        """Return True if the frame has at least ``needed`` bytes, else log it."""
        if len(receive_data) >= needed:
            return True
        print(
            "remote_control_v2: dropping short frame, command={} length={} "
            "expected>={}".format(receive_data[0], len(receive_data), needed)
        )
        return False

    @staticmethod
    def _decode_bar_code(payload):
        """Decode a barcode payload and strip CR/LF; None if it cannot be decoded."""
        try:
            text = payload.decode()
        except UnicodeDecodeError:
            print("remote_control_v2: dropping undecodable bar code frame")
            return None
        return text.replace("\r", "").replace("\n", "")

    def analysis_received_data(self):
        """Read one frame from the active transport and dispatch it by command.

        The first element of the frame is the command id; the rest is the
        payload (assumes the transport returns a bytes-like object — TODO
        confirm against ``SerialIns.read_cmd`` / ``read_cmd_one``).

        Returns:
            ``False`` when the transport reports a read failure or is
            missing (``connect_state`` is cleared), otherwise ``None``.
        """
        if not self.connect_state:
            return
        if self.bluetooth_address:
            transport = self.bluetooth_ins
        else:
            transport = self.serial_ins
        if transport is None:
            # Without a transport nothing can be read; treat it as a lost
            # connection so the reader loop shuts down cleanly.
            self.connect_state = False
            return False
        if self.bluetooth_address:
            receive_data = transport.read_cmd_one(address=self.bluetooth_address)
        else:
            receive_data = transport.read_cmd(out_time=1, check=None)

        if receive_data is False:
            self.connect_state = False
            return False
        if not receive_data:
            return

        command = receive_data[0]

        # Command 1: scanned barcode.
        if command == 1:
            bar_code = self._decode_bar_code(receive_data[1:])
            if bar_code is None:
                return
            message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
            print(message)
            self.sendSocketMessage(code=0, msg="", data=message)
            return

        # Command 9: button press.
        if command == 9:
            if not self._has_length(receive_data, 2):
                return
            button_value = receive_data[1]
            message = {
                "_type": 9,
                "plugins_mode": "remote_control",
                "data": {"button_value": button_value},
            }
            print(message)
            self.sendSocketMessage(code=0, msg="", data=message)
            if settings.IS_DEBUG:
                print("收到按键", button_value)
            return

        # Command 10: battery level as a single value.
        if command == 10:
            if not self._has_length(receive_data, 2):
                return
            voltage_value = receive_data[1]
            self.set_voltage_value(voltage_value)
            if settings.IS_TEST:
                print("遥控器V2电量:{}".format(voltage_value))
            return

        # Command 12: battery report from the Max17048 gauge.  Frame layout
        # (big-endian), only the fields marked * are used here:
        #   [1]      * charging flag
        #   [2:6]      battery SOC, percent x100
        #   [6:10]   * battery voltage, volts x100
        #   [10:14]    average current, amps x10000
        #   [14:18]    chip temperature, degrees C x100
        #   [18:22]  * adjusted SOC, percent x100
        #   [22]       firmware version
        #   [23]     * fully-charged flag
        if command == 12:
            if not self._has_length(receive_data, 24):
                return
            chg_status = self.get_data_from_receive_data(
                receive_data=receive_data, start=1, len_data=1
            )
            voltage = (
                self.get_data_from_receive_data(
                    receive_data=receive_data, start=6, len_data=4
                )
                / 100
            )
            adjusted_soc = (
                self.get_data_from_receive_data(
                    receive_data=receive_data, start=18, len_data=4
                )
                / 100
            )
            full_status = self.get_data_from_receive_data(
                receive_data=receive_data, start=23, len_data=1
            )
            charging_text = "充电中" if chg_status else ""
            full_text = "已充满" if full_status else ""
            # The adjusted SOC is rescaled so that 92.5 reads as 100%, capped
            # at 100.
            self.set_voltage_value(
                voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
                    min(adjusted_soc / 92.5 * 100, 100),
                    voltage,
                    charging_text,
                    full_text,
                )
            )
            return

        # Command 111: test value, 32-bit big-endian.
        if command == 111:
            if not self._has_length(receive_data, 5):
                return
            value = self.get_data_from_receive_data(
                receive_data=receive_data, start=1, len_data=4
            )
            print("遥控器-测试 value:{}".format(value))

        # Command 112: barcode read-back, printed only.
        if command == 112:
            bar_code = self._decode_bar_code(receive_data[1:])
            if bar_code is not None:
                print("read bar_code {}".format(bar_code))

    def get_data_from_receive_data(
        self, receive_data, start, len_data, data_magnification=1
    ):
        """Extract a big-endian unsigned integer from a received frame.

        Args:
            receive_data: Indexable sequence of integer byte values.
            start: Index of the most significant byte.
            len_data: Field width in bytes; only 1, 2 and 4 are supported.
            data_magnification: Factor the decoded value is multiplied by
                (scale up or down), default 1.

        Returns:
            The scaled value, or ``None`` for an unsupported width or when
            the frame is too short / not indexable.
        """
        if len_data not in (1, 2, 4):
            return None
        try:
            value = 0
            for offset in range(len_data):
                value = (value << 8) | receive_data[start + offset]
            return value * data_magnification
        except (IndexError, TypeError):
            return None

    async def run(self):
        """Poll the transport until the connection is dropped.

        Sleeps 10 ms between polls, then reports the disconnect once and
        refreshes the status line.

        NOTE(review): ``analysis_received_data`` is synchronous; the serial
        read is called with ``out_time=1`` and may hold the event loop while
        it waits — confirm against ``SerialIns.read_cmd``.
        """
        self.is_running = True
        print("Running")
        try:
            while True:
                await asyncio.sleep(0.01)
                if not self.connect_state:
                    break
                self.analysis_received_data()
        finally:
            # Also reached on cancellation or an unexpected error, so the
            # status line never claims a reader that is no longer running.
            self.is_running = False
        self._send_info(1, "遥控设备V2 未连接")
        self.set_voltage_value(None)