from .BaseClass import BaseClass from utils.SingletonType import SingletonType import settings from .SerialIns import SerialIns import time from threading import Lock from collections import defaultdict # from threading import Thread class Mcu(BaseClass, metaclass=SingletonType): instance = None init_flag = None # sign_data = Signal(dict) # self_sign = Signal(dict) def __init__(self, window, port_name=None): super().__init__(BaseClass) if self.init_flag: return else: self.init_flag = True # 终端设备是否处于运动中 1运动中,2已停止 3未初始化 self.msg_type = "mcu" self.m_t = 1 self._mcu_move_state = 0 self.state_camera_motor = 3 self.state_camera_steering = 3 self.state_turntable_steering = 3 self.state_overturn_steering = 3 self.state_move_turntable_steering = 3 self.last_from_mcu_move_respond_data = None self.camera_motor_speed = 0 self.camera_motor_value = 0 self.init_state = False self.connect_state = False self.port_name = "" self.serial_ins = None self.connect_turn_state = False self.serial_turn_ins = None self.t_n = 0 if port_name: self.serial_ins = SerialIns(port_name=port_name, baud=115200) if self.serial_ins.serial_handle: self.connect_state = True self.port_name = port_name self.device_name_dict = { "camera_steering": 0, "camera_high_motor": 1, "turntable_steering": 2, "overturn_steering": 3, "laser_position": 4, "buzzer": 5, "split": 6, "turntable_position_motor": 7, "mp3_player": 8, "mcu": 99, } # 最近的mcu基础信息,用于获取数据状态检查 self.last_mcu_info_data = { "num": 0, "time": time.time(), "data": None, } # 最近的mcu的其他配置 self.last_mcu_other_info_data = { "num": 0, "time": time.time(), "data": {}, } self.command = { "to_device_move": 1, # 设备运动 "to_init_device": 2, # 初始化设备 "to_deal_other_device": 3, # 处理其他设备 "get_all_info": 29, # 获取所有信息 "set_deviation": 40, # 设置偏移量 "get_deviation": 41, # 读取偏移量 "signal_forwarding": 91, # 信号转发处理 "signal_forwarding_return": 92, # 信号转发返回 "get_other_info": 44, # 获取其他信息 "open_rgb_led": 43, ## RGB灯的处理与通讯 "set_other_info": 45, # 设置其他信息 "query_remote_control_battery": 47, # 查询遥控器电量 "set_turntable_mode": 48, # 设置转盘通讯方式 1、串口、2、无线、3 混合 "stop_mcu": 93, # 停止运行mcu } self.window = window self.last_push_time = defaultdict(float) self.is_running = False self.is_wait_connect = False # 等待链接 self.send_data_queue = [] # 发送队列 self.lock = Lock() # 是否是刚进行完初始化;首次初始化,需要运动到指定第一个指定位置 self.is_just_init_time = False self.init() # ===========注册命令函数============ self.deal_code_func_dict = { 29: self.get_from_mcu_base_info, # 获取基本情况 32: self.get_from_mcu_button, # 获取按键信息 42: self.get_from_mcu_deviation_info, # 获取偏移量信息 44: self.get_from_mcu_other_info, # 获取其他配置参数 90: self.get_from_mcu_connect_info, # 获取链接电脑信号 92: self.get_from_mcu_move_respond_data, # 获取MCU响应 100: self.print_mcu_error_data, # 打印下位机的错误内容 } # 打印下位机的错误内容 def print_mcu_error_data(self, receive_data): # 扫码数据 try: data = receive_data[1:].decode() if data == "设备初始化完成": self.init_state = False self.sendSocketMessage(msg=data,device_status=2) print("115 print_mcu_error_data:", data) except BaseException as e: print("117 error {}".format(e)) return # 停止运行mcu def stop_mcu(self): buf = [self.command["stop_mcu"]] buf.extend(self.encapsulation_data(data=1, len_data=1)) self.add_send_data_queue(buf) # 设置转盘通讯方式 1、串口、2、无线、3 混合 def to_set_turntable_mode(self, mode=1): buf = [self.command["set_turntable_mode"]] buf.extend(self.encapsulation_data(data=mode, len_data=1)) self.add_send_data_queue(buf) # 查询遥控器电量 def query_remote_control_battery(self): buf = [self.command["query_remote_control_battery"]] buf.extend(self.encapsulation_data(data=1, len_data=1)) buf.extend(self.encapsulation_data(data=1, len_data=1)) buf.extend(self.encapsulation_data(data=1, len_data=1)) self.add_send_data_queue(buf) # 获取MCU响应 def get_from_mcu_move_respond_data(self, receive_data): self.last_from_mcu_move_respond_data = receive_data def change_hex_to_int(self, _bytearray): return " ".join([hex(x) for x in _bytearray]) # 获取基本情况 def get_from_mcu_base_info(self, receive_data): # 数据缓存 self.last_mcu_info_data["time"] = time.time() self.last_mcu_info_data["num"] += 1 # print("last_mcu_info_data:{}".format(self.last_mcu_info_data["time"])) self.state_camera_motor = 3 self.state_camera_steering = 3 self.state_turntable_steering = 3 self.state_overturn_steering = 3 self.state_move_turntable_steering = 3 if len(receive_data) == 7: self.m_t = 1 laser_state = receive_data[1] self.state_camera_motor = receive_data[2] self.state_camera_steering = receive_data[3] self.state_turntable_steering = receive_data[4] self.state_overturn_steering = receive_data[5] flag = receive_data[6] message = { "_type": "show_mcu_info", "plugins_mode": "mcu", "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format( laser_state=laser_state, state_camera_motor=self.state_camera_motor, state_camera_steering=self.state_camera_steering, state_turntable_steering=self.state_turntable_steering, state_overturn_steering=self.state_overturn_steering, flag=flag, ), "data_state": { "state_camera_motor": self.state_camera_motor, "state_camera_steering": self.state_camera_steering, "state_turntable_steering": self.state_turntable_steering, "state_overturn_steering": self.state_overturn_steering, }, } self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message) # print("转盘:{},时间:{}".format(self.state_turntable_steering, time.time())) if len(receive_data) == 8: self.m_t = 2 laser_state = receive_data[1] self.state_camera_motor = receive_data[2] self.state_camera_steering = receive_data[3] self.state_turntable_steering = receive_data[4] self.state_overturn_steering = receive_data[5] self.state_move_turntable_steering = receive_data[6] flag = receive_data[7] message = { "_type": "show_mcu_info", "plugins_mode": "mcu", "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},转盘前后移动状态:{state_move_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format( laser_state=laser_state, state_camera_motor=self.state_camera_motor, state_camera_steering=self.state_camera_steering, state_turntable_steering=self.state_turntable_steering, state_overturn_steering=self.state_overturn_steering, state_move_turntable_steering=self.state_move_turntable_steering, flag=flag, ), "data_state": { "state_camera_motor": self.state_camera_motor, "state_camera_steering": self.state_camera_steering, "state_turntable_steering": self.state_turntable_steering, "state_overturn_steering": self.state_overturn_steering, "state_move_turntable_steering": self.state_move_turntable_steering, }, } self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message) # 检查是否成功初始化 if self.is_just_init_time is False: if self.mcu_move_state == 2: self.is_just_init_time = True print("is_just_init_time") message = { "_type": "is_just_init_time", "plugins_mode": "mcu", "data": "", } self.sendSocketMessage(msg="检查设备初始化", data=message) return # 获取其他信息 def get_from_mcu_other_info(self, receive_data): is_auto_send_base_info = self.get_data_from_receive_data( receive_data=receive_data, start=1, len_data=1 ) is_move_retry = self.get_data_from_receive_data( receive_data=receive_data, start=2, len_data=1 ) is_data_response = self.get_data_from_receive_data( receive_data=receive_data, start=3, len_data=1 ) low_speed = self.get_data_from_receive_data( receive_data=receive_data, start=4, len_data=2 ) is_test = self.get_data_from_receive_data( receive_data=receive_data, start=6, len_data=1 ) to_init_mode = self.get_data_from_receive_data( receive_data=receive_data, start=7, len_data=1 ) turntable_move_to_init_mode = self.get_data_from_receive_data( receive_data=receive_data, start=8, len_data=1 ) led_count = self.get_data_from_receive_data( receive_data=receive_data, start=9, len_data=2 ) turntable_steering_angle_ratio = self.get_data_from_receive_data( receive_data=receive_data, start=11, len_data=2 ) is_manual_check = self.get_data_from_receive_data( receive_data=receive_data, start=13, len_data=1 ) camera_steering_angle_ratio = self.get_data_from_receive_data( receive_data=receive_data, start=14, len_data=4 ) is_auto_motor_to_disable = self.get_data_from_receive_data( receive_data=receive_data, start=18, len_data=1 ) diff_dir = self.get_data_from_receive_data( receive_data=receive_data, start=19, len_data=1 ) is_auto_send_pos_info = self.get_data_from_receive_data( receive_data=receive_data, start=20, len_data=1 ) is_dog = self.get_data_from_receive_data( receive_data=receive_data, start=21, len_data=1 ) has_been_set_motor_config = self.get_data_from_receive_data( receive_data=receive_data, start=22, len_data=1 ) self.last_mcu_other_info_data["data"] = { "is_auto_send_base_info": is_auto_send_base_info, "is_move_retry": is_move_retry, "is_data_response": is_data_response, "low_speed": low_speed, "is_test": is_test, "to_init_mode": to_init_mode, "turntable_move_to_init_mode": turntable_move_to_init_mode, "led_count": led_count, "turntable_steering_angle_ratio": turntable_steering_angle_ratio, "is_manual_check": is_manual_check, "camera_steering_angle_ratio": camera_steering_angle_ratio, "is_auto_motor_to_disable": is_auto_motor_to_disable, "diff_dir": diff_dir, "is_auto_send_pos_info": is_auto_send_pos_info, "is_dog": is_dog, "has_been_set_motor_config": has_been_set_motor_config, } self.last_mcu_other_info_data["time"] = time.time() self.last_mcu_other_info_data["num"] += 1 for k, v in self.last_mcu_other_info_data["data"].items(): print("k:{},v:{}".format(k, v)) # 设置其他信息 def set_to_mcu_other_info(self, data): is_auto_send_base_info = data["is_auto_send_base_info"] is_move_retry = data["is_move_retry"] is_data_response = data["is_data_response"] low_speed = data["low_speed"] is_test = data["is_test"] to_init_mode = data["to_init_mode"] turntable_move_to_init_mode = data["turntable_move_to_init_mode"] led_count = data["led_count"] turntable_steering_angle_ratio = data["turntable_steering_angle_ratio"] is_manual_check = data["is_manual_check"] camera_steering_angle_ratio = data["camera_steering_angle_ratio"] is_auto_motor_to_disable = data["is_auto_motor_to_disable"] diff_dir = data["diff_dir"] is_auto_send_pos_info = data["is_auto_send_pos_info"] is_dog = data["is_dog"] has_been_set_motor_config = data["has_been_set_motor_config"] buf = [self.command["set_other_info"]] buf.extend(self.encapsulation_data(data=is_auto_send_base_info, len_data=1)) buf.extend(self.encapsulation_data(data=is_move_retry, len_data=1)) buf.extend(self.encapsulation_data(data=is_data_response, len_data=1)) buf.extend(self.encapsulation_data(data=low_speed, len_data=2)) buf.extend(self.encapsulation_data(data=is_test, len_data=1)) buf.extend(self.encapsulation_data(data=to_init_mode, len_data=1)) buf.extend( self.encapsulation_data(data=turntable_move_to_init_mode, len_data=1) ) buf.extend(self.encapsulation_data(data=led_count, len_data=2)) buf.extend( self.encapsulation_data(data=turntable_steering_angle_ratio, len_data=2) ) buf.extend(self.encapsulation_data(data=is_manual_check, len_data=1)) buf.extend( self.encapsulation_data(data=camera_steering_angle_ratio, len_data=4) ) buf.extend(self.encapsulation_data(data=is_auto_motor_to_disable, len_data=1)) buf.extend(self.encapsulation_data(data=diff_dir, len_data=1)) buf.extend(self.encapsulation_data(data=is_auto_send_pos_info, len_data=1)) buf.extend(self.encapsulation_data(data=is_dog, len_data=1)) buf.extend(self.encapsulation_data(data=has_been_set_motor_config, len_data=1)) self.add_send_data_queue(buf) # 获取按键信息 def get_from_mcu_button(self, receive_data): button_name = receive_data[1] self.deal_mcu_button(button_name) # 获取偏移量信息 def get_from_mcu_deviation_info(self, receive_data): if len(receive_data) == 18: camera_high_motor_deviation_dir = receive_data[1] camera_high_motor_deviation = receive_data[2] << 8 | receive_data[3] camera_high_motor_deviation = ( camera_high_motor_deviation * -1 if camera_high_motor_deviation_dir == 0 else camera_high_motor_deviation ) camera_steering_deviation_dir = receive_data[4] camera_steering_deviation = (receive_data[5] << 8 | receive_data[6]) * 0.1 camera_steering_deviation = ( camera_steering_deviation * -1 if camera_steering_deviation_dir == 0 else camera_steering_deviation ) turntable_steering_deviation_dir = receive_data[7] turntable_steering_deviation = ( receive_data[8] << 8 | receive_data[9] ) * 0.1 turntable_steering_deviation = ( turntable_steering_deviation * -1 if turntable_steering_deviation_dir == 0 else turntable_steering_deviation ) overturn_steering_middle_dir = receive_data[10] overturn_steering_middle = (receive_data[11] << 8 | receive_data[12]) * 0.1 overturn_steering_middle = ( overturn_steering_middle * -1 if overturn_steering_middle_dir == 0 else overturn_steering_middle ) overturn_steering_high_dir = receive_data[13] overturn_steering_high = (receive_data[14] << 8 | receive_data[15]) * 0.1 overturn_steering_high = ( overturn_steering_middle * -1 if overturn_steering_high_dir == 0 else overturn_steering_high ) overturn_steering_up_speed = receive_data[16] overturn_steering_down_speed = receive_data[17] # self.sign_data.emit( # { # "_type": "get_deviation_data", # "plugins_mode": "mcu", # "data": { # "camera_high_motor_deviation": camera_high_motor_deviation, # "camera_steering_deviation": camera_steering_deviation, # "turntable_steering_deviation": turntable_steering_deviation, # "overturn_steering_middle": overturn_steering_middle, # "overturn_steering_high": overturn_steering_high, # "overturn_steering_up_speed": overturn_steering_up_speed, # "overturn_steering_down_speed": overturn_steering_down_speed, # }, # } # ) message = { "_type": "get_deviation_data", "plugins_mode": "mcu", "data": { "camera_high_motor_deviation": camera_high_motor_deviation, "camera_steering_deviation": camera_steering_deviation, "turntable_steering_deviation": turntable_steering_deviation, "overturn_steering_middle": overturn_steering_middle, "overturn_steering_high": overturn_steering_high, "overturn_steering_up_speed": overturn_steering_up_speed, "overturn_steering_down_speed": overturn_steering_down_speed, }, } self.sendSocketMessage(msg="接收偏移量信息", data=message) print("接收偏移量信息") return # 设备链接信息 def get_from_mcu_connect_info(self, receive_data): connect_flag = receive_data[1] device_id = receive_data[2] try: mcu_has_been_set = receive_data[6] # 设备是否有初始化 ,1 表示已初始化 except: mcu_has_been_set = 99 # 未知状态 # self.self_sign.emit({"type": "connect_sign", "data": connect_flag}) message = {"type": "connect_sign", "data": connect_flag} self.sendSocketMessage(msg="接收链接信息", data=message) print("接收链接信息") return def init(self): self.self_sign.connect(self.deal_self_sign_data) # LED 灯光处理 def open_rgb_led( self, color_name, led_command=1, brightness=80, enable=True, mode="loop", times=2, interval=0.1, ): color_name_value = { "红色": (156, 6, 3), "黄色": (255, 206, 25), "绿色": (0, 128, 0), "蓝色": (0, 25, 255), "白色": (255, 250, 227), } if color_name in color_name_value: buf = [self.command["open_rgb_led"]] buf.append(1 if enable else 0) buf.append(led_command) buf.extend(color_name_value[color_name]) buf.extend( [ brightness, 1 if mode == "loop" else 2, times, int(interval * 10), ] ) self.add_send_data_queue(buf) def deal_self_sign_data(self, sign_data): if sign_data["type"] == "connect_sign": if sign_data["data"] == 0: self.is_running = False else: self.is_running = True def to_connect_com(self, port_name, is_test=False): # 关闭串口 print("to_connect_com", port_name) self.close_connect() time.sleep(0.3) self.connect_state = False try: self.serial_ins = SerialIns(port_name=port_name, baud=115200, timeout=0.1) if not self.serial_ins.serial_handle: # self.sign_data.emit( # { # "_type": "show_info", # "plugins_mode": "mcu", # "data": "MCU 打开串口失败", # } # ) message = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 打开串口失败", } self.sendSocketMessage(code=1,msg="接收链接信息", data=message) self.serial_ins = None self.connect_state = False return False except: # self.sign_data.emit( # { # "_type": "show_info", # "plugins_mode": "mcu", # "data": "MCU 打开串口失败", # } # ) message = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 打开串口失败", } self.sendSocketMessage(code=1,msg="MCU 打开串口失败", data=message) self.serial_ins = None self.connect_state = False return False # self.sign_data.emit( # {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 开始连接"} # ) message = {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 开始连接"} self.sendSocketMessage( code=1, msg="MCU 开始连接", data=message, device_status=1 ) # =======================发送连接请求================================= cmd = 90 data = [cmd, 1] print("405 发送 连接请求 -----------------------------------------") print(self.serial_ins) # self.serial_ins.clearn_flush() self.serial_ins.write_cmd(data) # 延迟接收数据 time.sleep(0.3) receive_data = self.serial_ins.read_cmd(out_time=1) if receive_data: print( "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data)) ) if receive_data: # receive_data[2]=1 表示为MCU设备编号 if receive_data[0] == 90 and receive_data[2] == 1: connect_flag = receive_data[1] # 是否有初始化 try: mcu_has_been_set = receive_data[ 6 ] # 设备是否有初始化 ,1 表示已初始化 except: mcu_has_been_set = 99 # 未知状态 print("MCU初始化信息{}".format(mcu_has_been_set)) # self.sign_data.emit( # {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 已连接"} # ) message = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 已连接", } self.sendSocketMessage(code=1, msg="MCU 开始连接", data=message) self.connect_state = True if not self.is_running: print("MCU start") self.start() self.self_sign.emit({"type": "connect_sign", "data": connect_flag}) print("MCU 已连接") self.port_name = port_name return print("MCU 连接失败") self.sign_data.emit( {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} ) self.close_connect() def close_connect(self): self.port_name = "" if self.serial_ins: self.serial_ins.close_serial_port() self.connect_state = False self.init_state = False @property def mcu_move_state(self): if self.m_t == 1: if ( self.state_camera_motor == 2 and self.state_camera_steering == 2 and self.state_turntable_steering == 2 and self.state_overturn_steering == 2 ): self._mcu_move_state = 2 else: self._mcu_move_state = 1 else: if ( self.state_camera_motor == 2 and self.state_camera_steering == 2 and self.state_turntable_steering == 2 and self.state_overturn_steering == 2 and self.state_move_turntable_steering == 2 ): self._mcu_move_state = 2 else: self._mcu_move_state = 1 # self._mcu_move_state = 2 return self._mcu_move_state def get_basic_info_mcu(self): receive_data = self.serial_ins.read_cmd(out_time=1) if receive_data is False: print("------------------------------------------------4657564654") self.connect_state = False return False if not receive_data: return # print("receive_data") # 数据 结构 command,按命令解析 # command 0(9) 相机高度1-2 相机角度3-4 转盘角度5-6 灯光状态7 激光指示器状态8,运行状态9 command = receive_data[0] if command in self.deal_code_func_dict: self.deal_code_func_dict[command](receive_data) def run(self): self.is_running = True self.serial_ins.clearn_flush() self.to_init_device_origin_point(device_name="mcu") print("MCU 开始循环~") while 1: time.sleep(0.01) if not self.serial_ins or not self.connect_state: break try: # print("mcu send_cmd") self.send_cmd() # time.sleep(0.01) self.get_basic_info_mcu() except BaseException as e: print("121231298908", e) break self.is_running = False self.connect_state = False print("MCU 循环退出~") # self.sign_data.emit( # {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} # ) message = {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} self.sendSocketMessage(code=1,msg="MCU 连接失败",data=message) self.close_connect() def __del__(self): self.close_connect() def send_cmd(self): self.lock.acquire() if self.send_data_queue: data = self.send_data_queue.pop(0) self.serial_ins.write_cmd(data) else: self.t_n += 1 # 加大发送获取基础数据的时间间隔 # 默认为0.01秒一个循环,每隔1.5秒发送数据 if self.t_n > 150: self.t_n = 0 data = [self.command["get_all_info"], 1] self.serial_ins.write_cmd(data) self.lock.release() def add_send_data_queue(self, data): self.lock.acquire() if self.serial_ins: # print("send_data_queue append :{}".format(data)) self.send_data_queue.append(data) self.lock.release() def get_deviation(self): # 发送获取偏移量 data = [self.command["get_deviation"], 1] self.add_send_data_queue(data) # if self.serial_ins: # self.serial_ins.write_cmd(data) print("发送获取偏移量") def get_other_info(self): # 发送获取偏移量 data = [self.command["get_other_info"], 1] self.add_send_data_queue(data) print("发送获取其他信息") def set_deviation(self, device_name, _type=0, deviation=0): # turntable----0 angle_ratio 1 turntable_steering_deviation # overturn ----0 middle 1 high if device_name == "camera_high_motor": deviation = deviation / 10 # deviation 原单位为mm if device_name == "turntable_position_motor": deviation = deviation / 10 # deviation 原单位为mm if device_name == "camera_steering": pass if device_name == "turntable_steering": pass if device_name == "overturn_steering": pass device_id = self.device_name_dict[device_name] _dir = 1 if deviation >= 0 else 0 deviation = int(abs(deviation * 10)) data = [ self.command["set_deviation"], device_id, _type, _dir, 0xFF & deviation >> 8, 0xFF & deviation, ] self.add_send_data_queue(data) def deal_mcu_button(self, button_name): # 防止重复点击 s = time.time() if s - self.last_push_time[button_name] > 0.1: self.last_push_time[button_name] = s # print("button_name", button_name) else: self.last_push_time[button_name] = s return if button_name == 1: # 自动执行全部 # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"} self.sendSocketMessage(code=0, msg="", data=message) if button_name == 2: # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"} self.sendSocketMessage(code=0, msg="", data=message) if button_name == 3: # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"} self.sendSocketMessage(code=0, msg="", data=message) def get_mcu_state(self): """ 获取mcu 设备状态,当状态为停止时,其他程序才可正常执行 :return: """ return True def to_get_mcu_base_info(self): if self.connect_state: self.lock.acquire() # print('==========================>1111') # print("-------------------to_get_mcu_base_info--------------------------") data = [self.command["get_all_info"], 1] f = True try: self.serial_ins.write_cmd(data) except: f = False pass self.lock.release() if not f: self.connect_state = False return False else: return True def to_deal_device(self, device_name, value=1, _type=0, times=1, delay=0): """ value 激光0关 1开 mp3_player value 表示0表示关,1表示开,_type 表示歌曲切换到指定歌曲 delay:延迟处理,单位为0.1秒,即delay=100时,表示延迟10秒 """ device_id = self.device_name_dict[device_name] if device_name == "buzzer": value = int(value) cmd = 3 data = [ cmd, device_id, value, _type, times, delay, ] self.add_send_data_queue(data) # if self.serial_ins: # self.serial_ins.write_cmd(data) return True # 让设备运动到原点 并设置MCU需要进行初始化 def to_init_device_origin_point(self, device_name, is_force=False): device_id = self.device_name_dict[device_name] cmd = 2 data = [cmd, device_id, 0 if is_force is False else 1] self.open_rgb_led(color_name="红色") self.add_send_data_queue(data) if device_name == "mcu": # 重置初始化标记为 从未初始化 QTimer.singleShot(5000, self.just_init) return True def just_init(self, *args): # 重置初始化标记为 从未初始化 self.is_just_init_time = False def to_device_move( self, device_name, value=0, max_speed=None, up_speed=None, down_speed=None, _is_debug=0, is_relative=0, is_deviation=1, ): """ 此处输入单位为 毫米,以及度 需要先缩小,再放大 """ speed = settings.moveSpeed() cmd = 1 device_id = self.device_name_dict[device_name] if device_name == "camera_high_motor": # value 单位毫米 # max_speed = 10000 if max_speed is None else max_speed # up_speed = 800 if up_speed is None else up_speed # down_speed = 700 if down_speed is None else down_speed max_speed = ( speed[device_name]["max_speed"] if max_speed is None else max_speed ) up_speed = speed[device_name]["up_speed"] if up_speed is None else up_speed down_speed = ( speed[device_name]["down_speed"] if down_speed is None else down_speed ) value = value / 10 # value 单位毫米 assert 0 <= value <= 40 assert 0 <= max_speed <= 10000 if device_name == "camera_steering": # 角度为度 未放大 精确到0.1度 max_speed = 6000 if max_speed is None else max_speed up_speed = 500 if up_speed is None else up_speed down_speed = 500 if down_speed is None else down_speed assert -360 <= value <= 360 if device_name == "turntable_steering": # 角度为度 未放大 精确到0.1度 # max_speed = 6000 if max_speed is None else max_speed # up_speed = 500 if up_speed is None else up_speed # down_speed = 400 if down_speed is None else down_speed max_speed = ( speed[device_name]["max_speed"] if max_speed is None else max_speed ) up_speed = speed[device_name]["up_speed"] if up_speed is None else up_speed down_speed = ( speed[device_name]["down_speed"] if down_speed is None else down_speed ) assert -720 <= value <= 720 if device_name == "overturn_steering": # 角度为度 未放大 精确到0.1度 max_speed = 2 if max_speed is None else max_speed up_speed = 1 if up_speed is None else up_speed down_speed = 1 if down_speed is None else down_speed assert 0 <= value <= 360 if device_name == "turntable_position_motor": # value 单位毫米 max_speed = 11000 if max_speed is None else max_speed up_speed = 900 if up_speed is None else up_speed down_speed = 900 if down_speed is None else down_speed value = value / 10 # value 单位毫米 assert 0 <= value <= 900 assert 0 <= max_speed <= 15000 _dir = True if value >= 0 else False value = int(abs(value * 10)) # 此处value赋值后,单位为mm以及0.1度 data = [ cmd, device_id, 1 if _dir else 0, 0xFF & value >> 8, 0xFF & value, 0xFF & max_speed >> 8, 0xFF & max_speed, 0xFF & up_speed >> 8, 0xFF & up_speed, 0xFF & down_speed >> 8, 0xFF & down_speed, _is_debug, is_deviation, is_relative, ] self.add_send_data_queue(data) # if self.serial_ins: # self.serial_ins.write_cmd(data) # 通用串口数据解析器 def get_data_from_receive_data( self, receive_data, start, len_data, data_magnification=1 ): # data_magnification 数据放大倍数,或缩小倍数,默认为1 try: if len_data == 1: data = receive_data[start] return data * data_magnification elif len_data == 2: data = receive_data[start] << 8 | receive_data[start + 1] return data * data_magnification elif len_data == 4: data = ( receive_data[start] << 24 | receive_data[start + 1] << 16 | receive_data[start + 2] << 8 | receive_data[start + 3] ) return data * data_magnification return None except: return None def encapsulation_data(self, data, len_data, data_magnification=1): # data_magnification 数据放大倍数,或缩小倍数,默认为1 data = int(data * data_magnification) if len_data == 1: return [0xFF & data] elif len_data == 2: return [0xFF & data >> 8, 0xFF & data] elif len_data == 4: return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data] def __new__(cls, *args, **kwargs): """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。""" if not cls.instance: cls.instance = super().__new__(cls) return cls.instance class McuDebug(object): def __init__(self, windows, is_debug=True, is_deviation=False): self.windows = windows self.is_debug = 1 if is_debug else 0 self.is_deviation = 1 if is_deviation else 0 def camera_high_motor(self, value): # 相机电机 self.windows.mcu.to_device_move( device_name="camera_high_motor", value=value, max_speed=1400, up_speed=400, down_speed=100, _is_debug=self.is_debug, is_deviation=self.is_deviation, ) def camera_steering(self, value): # 相机舵机 self.windows.mcu.to_device_move( device_name="camera_steering", value=value, _is_debug=self.is_debug, is_deviation=self.is_deviation, ) def turntable_steering(self, value): # 转盘舵机 self.windows.mcu.to_device_move( device_name="turntable_steering", value=value, _is_debug=self.is_debug, is_deviation=self.is_deviation, ) def turntable_position_motor(self, value): # 转盘舵机 self.windows.mcu.to_device_move( device_name="turntable_position_motor", value=value, max_speed=1400, up_speed=400, down_speed=100, _is_debug=self.is_debug, is_deviation=self.is_deviation, ) def overturn_steering(self, value): # 翻板舵机中位 self.windows.mcu.to_device_move( device_name="overturn_steering", value=value, _is_debug=self.is_debug, is_deviation=self.is_deviation, ) def to_deal_device(self, device_name, value=1, _type=0, times=1): self.windows.mcu.to_deal_device( device_name, value=value, _type=_type, times=times ) def photograph(self, goods_art_no=None): self.windows.photograph(goods_art_no=goods_art_no) def __move_equipment(self, data): func_class = { "相机电机": self.camera_high_motor, "相机舵机": self.camera_steering, "转盘舵机": self.turntable_steering, "转盘前后电机": self.turntable_position_motor, } for key, value in data.items(): if key in func_class: func_class[key](value=value) time.sleep(0.1) def move_equipment(self, data: dict): Thread(target=self.__move_equipment, args=(data,)).start()