import asyncio import serial.tools.list_ports import time, json from .SerialIns import SerialIns from utils.SingletonType import SingletonType from .BaseClass import BaseClass from sockets import ConnectionManager from collections import defaultdict import threading import settings # mcu命令 class DeviceControl(BaseClass, metaclass=SingletonType): lock = threading.Lock() def __init__(self, websocket_manager: ConnectionManager): super().__init__(websocket_manager) self.msg_type = "mcu" self.m_t = 1 self._mcu_move_state = 0 self.state_camera_motor = 3 self.state_camera_steering = 3 self.state_turntable_steering = 3 self.state_overturn_steering = 3 self.state_move_turntable_steering = 3 self.last_from_mcu_move_respond_data = None self.camera_motor_speed = 0 self.camera_motor_value = 0 self.init_state = False self.port_name = "" self.t_n = 0 self.serial_ins = None self.connected_ports_dict = {} # 已连接的ports self.p_list = [] self.temp_ports_dict = {} self.is_running = False self.connect_state = False self.device_name_dict = { "camera_steering": 0, "camera_high_motor": 1, "turntable_steering": 2, "overturn_steering": 3, "laser_position": 4, "buzzer": 5, "split": 6, "turntable_position_motor": 7, "mp3_player": 8, "mcu": 99, } # 最近的mcu基础信息,用于获取数据状态检查 self.last_mcu_info_data = { "num": 0, "time": time.time(), "data": None, } # 最近的mcu的其他配置 self.last_mcu_other_info_data = { "num": 0, "time": time.time(), "data": {}, } self.command = { "to_device_move": 1, # 设备运动 "to_init_device": 2, # 初始化设备 "to_deal_other_device": 3, # 处理其他设备 "get_all_info": 29, # 获取所有信息 "set_deviation": 40, # 设置偏移量 "get_deviation": 41, # 读取偏移量 "signal_forwarding": 91, # 信号转发处理 "signal_forwarding_return": 92, # 信号转发返回 "get_other_info": 44, # 获取其他信息 "open_rgb_led": 43, ## RGB灯的处理与通讯 "set_other_info": 45, # 设置其他信息 "query_remote_control_battery": 47, # 查询遥控器电量 "set_turntable_mode": 48, # 设置转盘通讯方式 1、串口、2、无线、3 混合 "stop_mcu": 93, # 停止运行mcu } # self.window = window self.last_push_time = defaultdict(float) self.is_running = False self.is_wait_connect = False # 等待链接 self.send_data_queue = [] # 发送队列 # self.lock = Lock() # 是否是刚进行完初始化;首次初始化,需要运动到指定第一个指定位置 self.is_just_init_time = False # self.init() # ===========注册命令函数============ self.deal_code_func_dict = { 29: self.get_from_mcu_base_info, # 获取基本情况 32: self.get_from_mcu_button, # 获取按键信息 42: self.get_from_mcu_deviation_info, # 获取偏移量信息 44: self.get_from_mcu_other_info, # 获取其他配置参数 90: self.get_from_mcu_connect_info, # 获取链接电脑信号 92: self.get_from_mcu_move_respond_data, # 获取MCU响应 100: self.print_mcu_error_data, # 打印下位机的错误内容 } async def initDevice(self): if not self.is_running: self.sendSocketMessage( code=1, msg="mcu设备未连接,请先连接设备", device_status=0 ) return False self.serial_ins.clearn_flush() self.to_init_device_origin_point(device_name="mcu") print("MCU 开始循环~") while 1: await asyncio.sleep(0.01) if not self.serial_ins or not self.connect_state: break try: # print("mcu send_cmd") self.send_cmd() # time.sleep(0.01) self.get_basic_info_mcu() except BaseException as e: print("121231298908", e) break self.is_running = False self.connect_state = False print("MCU 循环退出~") # self.sign_data.emit( # {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} # ) message = {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} self.sendSocketMessage(code=1, msg="MCU 连接失败", data=message,device_status=-1) self.close_connect() def stop_mcu(self): buf = [self.command["stop_mcu"]] buf.extend(self.encapsulation_data(data=1, len_data=1)) self.add_send_data_queue(buf) # 设置转盘通讯方式 1、串口、2、无线、3 混合 def to_set_turntable_mode(self, mode=1): buf = [self.command["set_turntable_mode"]] buf.extend(self.encapsulation_data(data=mode, len_data=1)) self.add_send_data_queue(buf) def query_remote_control_battery(self): """查询遥控器电量""" buf = [self.command["query_remote_control_battery"]] buf.extend(self.encapsulation_data(data=1, len_data=1)) buf.extend(self.encapsulation_data(data=1, len_data=1)) buf.extend(self.encapsulation_data(data=1, len_data=1)) self.add_send_data_queue(buf) def encapsulation_data(self, data, len_data, data_magnification=1): # data_magnification 数据放大倍数,或缩小倍数,默认为1 data = int(data * data_magnification) if len_data == 1: return [0xFF & data] elif len_data == 2: return [0xFF & data >> 8, 0xFF & data] elif len_data == 4: return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data] def open_rgb_led( self, color_name, led_command=1, brightness=80, enable=True, mode="loop", times=2, interval=0.1, ): color_name_value = { "红色": (156, 6, 3), "黄色": (255, 206, 25), "绿色": (0, 128, 0), "蓝色": (0, 25, 255), "白色": (255, 250, 227), } if color_name in color_name_value: buf = [self.command["open_rgb_led"]] buf.append(1 if enable else 0) buf.append(led_command) buf.extend(color_name_value[color_name]) buf.extend( [ brightness, 1 if mode == "loop" else 2, times, int(interval * 10), ] ) self.add_send_data_queue(buf) def get_deviation(self): # 发送获取偏移量 data = [self.command["get_deviation"], 1] self.add_send_data_queue(data) # if self.serial_ins: # self.serial_ins.write_cmd(data) print("发送获取偏移量") def get_other_info(self): # 发送获取偏移量 data = [self.command["get_other_info"], 1] self.add_send_data_queue(data) print("发送获取其他信息") def add_send_data_queue(self, data): self.lock.acquire() if self.serial_ins: # print("send_data_queue append :{}".format(data)) self.send_data_queue.append(data) self.lock.release() def send_cmd(self): self.lock.acquire() if self.send_data_queue: self.sendSocketMessage(msg="正在发送命令", device_status=1) data = self.send_data_queue.pop(0) self.serial_ins.write_cmd(data) self.sendSocketMessage(msg="命令发送完成", device_status=2) # else: # self.t_n += 1 # # 加大发送获取基础数据的时间间隔 # # 默认为0.01秒一个循环,每隔1.5秒发送数据 # if self.t_n > 150: # self.t_n = 0 # data = [self.command["get_all_info"], 1] # self.serial_ins.write_cmd(data) self.lock.release() def print_mcu_error_data(self, receive_data): # 扫码数据 try: data = receive_data[1:].decode() if "设备初始化完成" in data: self.init_state = True self.sendSocketMessage(msg=data, device_status=2) print("115 print_mcu_error_data:", data) except BaseException as e: print("117 error {}".format(e)) return def get_from_mcu_move_respond_data(self, receive_data): self.last_from_mcu_move_respond_data = receive_data def get_from_mcu_connect_info(self, receive_data): connect_flag = receive_data[1] device_id = receive_data[2] try: mcu_has_been_set = receive_data[6] # 设备是否有初始化 ,1 表示已初始化 except: mcu_has_been_set = 99 # 未知状态 # self.self_sign.emit({"type": "connect_sign", "data": connect_flag}) message = {"type": "connect_sign", "data": connect_flag} self.sendSocketMessage(msg="接收链接信息", data=message) print("接收链接信息") return def to_init_device_origin_point(self, device_name, is_force=False): device_id = self.device_name_dict[device_name] cmd = 2 data = [cmd, device_id, 0 if is_force is False else 1] self.open_rgb_led(color_name="红色") self.add_send_data_queue(data) if device_name == "mcu": # 重置初始化标记为 从未初始化 self.is_just_init_time = False return True def get_basic_info_mcu(self): receive_data = self.serial_ins.read_cmd(out_time=1) if receive_data is False: print("------------------------------------------------4657564654") self.connect_state = False return False if not receive_data: return # print("receive_data") # 数据 结构 command,按命令解析 # command 0(9) 相机高度1-2 相机角度3-4 转盘角度5-6 灯光状态7 激光指示器状态8,运行状态9 command = receive_data[0] if command in self.deal_code_func_dict: self.deal_code_func_dict[command](receive_data) def get_from_mcu_button(self, receive_data): button_name = receive_data[1] self.deal_mcu_button(button_name) def deal_mcu_button(self, button_name): # 防止重复点击 s = time.time() if s - self.last_push_time[button_name] > 0.1: self.last_push_time[button_name] = s # print("button_name", button_name) else: self.last_push_time[button_name] = s return if button_name == 1: # 自动执行全部 # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"} self.sendSocketMessage(code=0, msg="", data=message) if button_name == 2: # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"} self.sendSocketMessage(code=0, msg="", data=message) if button_name == 3: # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"} self.sendSocketMessage(code=0, msg="", data=message) # 获取偏移量信息 def get_from_mcu_deviation_info(self, receive_data): if len(receive_data) == 18: camera_high_motor_deviation_dir = receive_data[1] camera_high_motor_deviation = receive_data[2] << 8 | receive_data[3] camera_high_motor_deviation = ( camera_high_motor_deviation * -1 if camera_high_motor_deviation_dir == 0 else camera_high_motor_deviation ) camera_steering_deviation_dir = receive_data[4] camera_steering_deviation = (receive_data[5] << 8 | receive_data[6]) * 0.1 camera_steering_deviation = ( camera_steering_deviation * -1 if camera_steering_deviation_dir == 0 else camera_steering_deviation ) turntable_steering_deviation_dir = receive_data[7] turntable_steering_deviation = ( receive_data[8] << 8 | receive_data[9] ) * 0.1 turntable_steering_deviation = ( turntable_steering_deviation * -1 if turntable_steering_deviation_dir == 0 else turntable_steering_deviation ) overturn_steering_middle_dir = receive_data[10] overturn_steering_middle = (receive_data[11] << 8 | receive_data[12]) * 0.1 overturn_steering_middle = ( overturn_steering_middle * -1 if overturn_steering_middle_dir == 0 else overturn_steering_middle ) overturn_steering_high_dir = receive_data[13] overturn_steering_high = (receive_data[14] << 8 | receive_data[15]) * 0.1 overturn_steering_high = ( overturn_steering_middle * -1 if overturn_steering_high_dir == 0 else overturn_steering_high ) overturn_steering_up_speed = receive_data[16] overturn_steering_down_speed = receive_data[17] # self.sign_data.emit( # { # "_type": "get_deviation_data", # "plugins_mode": "mcu", # "data": { # "camera_high_motor_deviation": camera_high_motor_deviation, # "camera_steering_deviation": camera_steering_deviation, # "turntable_steering_deviation": turntable_steering_deviation, # "overturn_steering_middle": overturn_steering_middle, # "overturn_steering_high": overturn_steering_high, # "overturn_steering_up_speed": overturn_steering_up_speed, # "overturn_steering_down_speed": overturn_steering_down_speed, # }, # } # ) message = { "_type": "get_deviation_data", "plugins_mode": "mcu", "data": { "camera_high_motor_deviation": camera_high_motor_deviation, "camera_steering_deviation": camera_steering_deviation, "turntable_steering_deviation": turntable_steering_deviation, "overturn_steering_middle": overturn_steering_middle, "overturn_steering_high": overturn_steering_high, "overturn_steering_up_speed": overturn_steering_up_speed, "overturn_steering_down_speed": overturn_steering_down_speed, }, } self.sendSocketMessage(msg="接收偏移量信息", data=message) print("接收偏移量信息") return # 获取其他信息 def get_from_mcu_other_info(self, receive_data): is_auto_send_base_info = self.get_data_from_receive_data( receive_data=receive_data, start=1, len_data=1 ) is_move_retry = self.get_data_from_receive_data( receive_data=receive_data, start=2, len_data=1 ) is_data_response = self.get_data_from_receive_data( receive_data=receive_data, start=3, len_data=1 ) low_speed = self.get_data_from_receive_data( receive_data=receive_data, start=4, len_data=2 ) is_test = self.get_data_from_receive_data( receive_data=receive_data, start=6, len_data=1 ) to_init_mode = self.get_data_from_receive_data( receive_data=receive_data, start=7, len_data=1 ) turntable_move_to_init_mode = self.get_data_from_receive_data( receive_data=receive_data, start=8, len_data=1 ) led_count = self.get_data_from_receive_data( receive_data=receive_data, start=9, len_data=2 ) turntable_steering_angle_ratio = self.get_data_from_receive_data( receive_data=receive_data, start=11, len_data=2 ) is_manual_check = self.get_data_from_receive_data( receive_data=receive_data, start=13, len_data=1 ) camera_steering_angle_ratio = self.get_data_from_receive_data( receive_data=receive_data, start=14, len_data=4 ) is_auto_motor_to_disable = self.get_data_from_receive_data( receive_data=receive_data, start=18, len_data=1 ) diff_dir = self.get_data_from_receive_data( receive_data=receive_data, start=19, len_data=1 ) is_auto_send_pos_info = self.get_data_from_receive_data( receive_data=receive_data, start=20, len_data=1 ) is_dog = self.get_data_from_receive_data( receive_data=receive_data, start=21, len_data=1 ) has_been_set_motor_config = self.get_data_from_receive_data( receive_data=receive_data, start=22, len_data=1 ) self.last_mcu_other_info_data["data"] = { "is_auto_send_base_info": is_auto_send_base_info, "is_move_retry": is_move_retry, "is_data_response": is_data_response, "low_speed": low_speed, "is_test": is_test, "to_init_mode": to_init_mode, "turntable_move_to_init_mode": turntable_move_to_init_mode, "led_count": led_count, "turntable_steering_angle_ratio": turntable_steering_angle_ratio, "is_manual_check": is_manual_check, "camera_steering_angle_ratio": camera_steering_angle_ratio, "is_auto_motor_to_disable": is_auto_motor_to_disable, "diff_dir": diff_dir, "is_auto_send_pos_info": is_auto_send_pos_info, "is_dog": is_dog, "has_been_set_motor_config": has_been_set_motor_config, } self.last_mcu_other_info_data["time"] = time.time() self.last_mcu_other_info_data["num"] += 1 for k, v in self.last_mcu_other_info_data["data"].items(): print("k:{},v:{}".format(k, v)) def get_data_from_receive_data( self, receive_data, start, len_data, data_magnification=1 ): # data_magnification 数据放大倍数,或缩小倍数,默认为1 try: if len_data == 1: data = receive_data[start] return data * data_magnification elif len_data == 2: data = receive_data[start] << 8 | receive_data[start + 1] return data * data_magnification elif len_data == 4: data = ( receive_data[start] << 24 | receive_data[start + 1] << 16 | receive_data[start + 2] << 8 | receive_data[start + 3] ) return data * data_magnification return None except: return None def get_from_mcu_base_info(self, receive_data): # 数据缓存 self.last_mcu_info_data["time"] = time.time() self.last_mcu_info_data["num"] += 1 # print("last_mcu_info_data:{}".format(self.last_mcu_info_data["time"])) self.state_camera_motor = 3 self.state_camera_steering = 3 self.state_turntable_steering = 3 self.state_overturn_steering = 3 self.state_move_turntable_steering = 3 if len(receive_data) == 7: self.m_t = 1 laser_state = receive_data[1] self.state_camera_motor = receive_data[2] self.state_camera_steering = receive_data[3] self.state_turntable_steering = receive_data[4] self.state_overturn_steering = receive_data[5] flag = receive_data[6] message = { "_type": "show_mcu_info", "plugins_mode": "mcu", "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format( laser_state=laser_state, state_camera_motor=self.state_camera_motor, state_camera_steering=self.state_camera_steering, state_turntable_steering=self.state_turntable_steering, state_overturn_steering=self.state_overturn_steering, flag=flag, ), "data_state": { "state_camera_motor": self.state_camera_motor, "state_camera_steering": self.state_camera_steering, "state_turntable_steering": self.state_turntable_steering, "state_overturn_steering": self.state_overturn_steering, }, } self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message) # print("转盘:{},时间:{}".format(self.state_turntable_steering, time.time())) if len(receive_data) == 8: self.m_t = 2 laser_state = receive_data[1] self.state_camera_motor = receive_data[2] self.state_camera_steering = receive_data[3] self.state_turntable_steering = receive_data[4] self.state_overturn_steering = receive_data[5] self.state_move_turntable_steering = receive_data[6] flag = receive_data[7] message = { "_type": "show_mcu_info", "plugins_mode": "mcu", "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},转盘前后移动状态:{state_move_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format( laser_state=laser_state, state_camera_motor=self.state_camera_motor, state_camera_steering=self.state_camera_steering, state_turntable_steering=self.state_turntable_steering, state_overturn_steering=self.state_overturn_steering, state_move_turntable_steering=self.state_move_turntable_steering, flag=flag, ), "data_state": { "state_camera_motor": self.state_camera_motor, "state_camera_steering": self.state_camera_steering, "state_turntable_steering": self.state_turntable_steering, "state_overturn_steering": self.state_overturn_steering, "state_move_turntable_steering": self.state_move_turntable_steering, }, } # if self.state_camera_motor if all(value == 2 for value in [self.state_camera_motor, self.state_camera_steering, self.state_turntable_steering, self.state_overturn_steering]): self.init_state = True self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message) # 检查是否成功初始化 if self.is_just_init_time is False: if self.mcu_move_state == 2: self.is_just_init_time = True print("is_just_init_time") message = { "_type": "is_just_init_time", "plugins_mode": "mcu", "data": "", } self.sendSocketMessage(msg="检查设备初始化", data=message) return # def init(self, sign_data): # if sign_data["type"] == "connect_sign": # if sign_data["data"] == 0: # self.is_running = False # else: # self.is_running = True def scan_serial_port(self) -> dict: # 获取所有可用串口列表 ports_dict = {} ports = serial.tools.list_ports.comports() # 遍历所有端口并打印信息 for port in ports: if "CH340" in port.description: ports_dict[port.name] = { "name": port.name, "device": port.device, "description": port.description, "hwid": port.hwid, "manufacturer": port.manufacturer, "product": port.product, "serial_number": port.serial_number, } if len(ports_dict) <= 0: return {} return ports_dict def remove_port(self, port_name): """移除串口""" print("remove", port_name) data = { "_type": "remove_port", "plugins_mode": "auto_select_com", "data": {"port_name": port_name}, } self.sendSocketMessage(1, "串口被移除", data) def add_port_by_linkage(self, port_name): # port_value :串口基础信息 # todo 根据prot_value 信息自动进行连接 print("add", port_name) # 对没有连接的设备进行尝试连接 message_data = { "_type": "show_info", "plugins_mode": "auto_select_com", "data": {"text": "开始识别接口:{}".format(port_name)}, } self.sendSocketMessage( msg="开始识别接口:{}".format(port_name), data=message_data, device_status=1 ) time.sleep(1) """ 步骤: 1、进行临时连接,并发送命令,成功后,自动连接对应设备 """ try: # 尝试使用115200波特率链接 serial_handle = serial.Serial(port=port_name, baudrate=115200, timeout=0.5) except: message_data = { "_type": "show_info", "plugins_mode": "auto_select_com", "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)}, } self.sendSocketMessage( 1, msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name), data=message_data, device_status=3, ) print("串口:{} 被占用".format(port_name)) return time.sleep(2) print("开始发送命令") data = [90, 1] try: serial_handle.flushInput() # 尝试重置输入缓冲区 except serial.SerialTimeoutException: print("超时错误:无法在规定时间内重置输入缓冲区。") self.sendSocketMessage( 1, msg="超时错误:无法在规定时间内重置输入缓冲区。", data=None, ) serial_handle.close() return print("尝试写入数据") buf = bytearray(b"") buf.extend([0x55, 0x55, (0xFF & len(data))]) buf.extend(data) buf.extend([0xFF & ~sum(data)]) try: self.receive_data = b"" serial_handle.write(buf) except serial.SerialTimeoutException: print("写入数据错误") serial_handle.close() return time.sleep(0.3) print("尝试接收命令") receive_data = self.read_cmd(serial_handle) device_id = 0 if receive_data: print("receive_data", receive_data) if receive_data[0] == 90: connect_flag = receive_data[1] device_id = receive_data[2] print("关闭串口:{}".format(port_name)) serial_handle.close() if device_id > 0: if device_id == 1: self.to_connect_com(port_name) message_data = { "_type": "show_info", "plugins_mode": "auto_select_com", "data": {"text": "MCU开始连接"}, } self.sendSocketMessage( msg="MCU开始连接", data=message_data, device_status=1 ) self.connected_ports_dict[port_name] = "MCU" message_data = { "_type": "select_port_name", "plugins_mode": "auto_select_com", "data": { "device_name": "mcu" if device_id == 1 else "remote_control", "port_name": port_name, }, } self.sendSocketMessage( msg="MCU连接成功", data=message_data, device_status=2 ) else: print("串口无法识别") # 走其他途径处理 # 检查当前MCU链接是否正常 # 正常跳过;记录为其他列表 # 不正常进行尝试连接 # 连接不上,记录为其他列表 def to_connect_com(self, port_name): # 关闭串口 print("to_connect_com", port_name) self.close_connect() time.sleep(0.3) self.connect_state = False try: self.serial_ins = SerialIns(port_name=port_name, baud=115200, timeout=0.1) if not self.serial_ins.serial_handle: message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 打开串口失败", } self.sendSocketMessage( msg="MCU 打开串口失败", data=message_data, device_status=-1 ) self.serial_ins = None self.connect_state = False return False except: message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 打开串口失败", } self.sendSocketMessage( msg="MCU 打开串口失败", data=message_data, device_status=-1 ) self.serial_ins = None self.connect_state = False return False message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 开始连接", } self.sendSocketMessage(msg="MCU 开始连接", data=message_data, device_status=1) # =======================发送连接请求================================= cmd = 90 data = [cmd, 1] print("405 发送 连接请求 -----------------------------------------") print(self.serial_ins) # self.serial_ins.clearn_flush() self.serial_ins.write_cmd(data) # 延迟接收数据 time.sleep(0.3) receive_data = self.serial_ins.read_cmd(out_time=1) if receive_data: print( "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data)) ) if receive_data: # receive_data[2]=1 表示为MCU设备编号 if receive_data[0] == 90 and receive_data[2] == 1: connect_flag = receive_data[1] # 是否有初始化 try: mcu_has_been_set = receive_data[ 6 ] # 设备是否有初始化 ,1 表示已初始化 except: mcu_has_been_set = 99 # 未知状态 print("MCU初始化信息{}".format(mcu_has_been_set)) message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 已连接", } self.sendSocketMessage( msg="MCU 已连接", data=message_data, device_status=1 ) self.connect_state = True self.is_running = True print("MCU 已连接") self.port_name = port_name return print("MCU 连接失败") message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败", } self.sendSocketMessage(msg="MCU 连接失败", data=message_data, device_status=-1) self.close_connect() def close_connect(self): self.port_name = "" if self.serial_ins: self.serial_ins.close_serial_port() self.is_running = False self.connect_state = False self.connected_ports_dict = {} # 已连接的ports self.p_list = [] self.temp_ports_dict = {} self.init_state = False print("关闭MCU") @property def mcu_move_state(self): if self.m_t == 1: if ( self.state_camera_motor == 2 and self.state_camera_steering == 2 and self.state_turntable_steering == 2 and self.state_overturn_steering == 2 ): self._mcu_move_state = 2 else: self._mcu_move_state = 1 else: if ( self.state_camera_motor == 2 and self.state_camera_steering == 2 and self.state_turntable_steering == 2 and self.state_overturn_steering == 2 and self.state_move_turntable_steering == 2 ): self._mcu_move_state = 2 else: self._mcu_move_state = 1 # self._mcu_move_state = 2 return self._mcu_move_state def to_deal_device(self, device_name, value=1, _type=0, times=1, delay=0): """ value 激光0关 1开 mp3_player value 表示0表示关,1表示开,_type 表示歌曲切换到指定歌曲 delay:延迟处理,单位为0.1秒,即delay=100时,表示延迟10秒 """ device_id = self.device_name_dict[device_name] if device_name == "buzzer": value = int(value) cmd = 3 data = [ cmd, device_id, value, _type, times, delay, ] self.add_send_data_queue(data) # if self.serial_ins: # self.serial_ins.write_cmd(data) return True def to_device_move( self, device_name, value=0, max_speed=None, up_speed=None, down_speed=None, _is_debug=0, is_relative=0, is_deviation=1, ): """ 此处输入单位为 毫米,以及度 需要先缩小,再放大 """ speed = settings.moveSpeed() cmd = 1 device_id = self.device_name_dict[device_name] match device_name: case "camera_high_motor": # value 单位毫米 # max_speed = 10000 if max_speed is None else max_speed # up_speed = 800 if up_speed is None else up_speed # down_speed = 700 if down_speed is None else down_speed max_speed = ( speed[device_name]["max_speed"] if max_speed is None else max_speed ) up_speed = ( speed[device_name]["up_speed"] if up_speed is None else up_speed ) down_speed = ( speed[device_name]["down_speed"] if down_speed is None else down_speed ) value = value / 10 # value 单位毫米 assert 0 <= value <= 40 assert 0 <= max_speed <= 10000 case "camera_steering": # 角度为度 未放大 精确到0.1度 max_speed = 6000 if max_speed is None else max_speed up_speed = 500 if up_speed is None else up_speed down_speed = 500 if down_speed is None else down_speed assert -360 <= value <= 360 case "turntable_steering": # 角度为度 未放大 精确到0.1度 # max_speed = 6000 if max_speed is None else max_speed # up_speed = 500 if up_speed is None else up_speed # down_speed = 400 if down_speed is None else down_speed max_speed = ( speed[device_name]["max_speed"] if max_speed is None else max_speed ) up_speed = ( speed[device_name]["up_speed"] if up_speed is None else up_speed ) down_speed = ( speed[device_name]["down_speed"] if down_speed is None else down_speed ) assert -720 <= value <= 720 case "overturn_steering": # 角度为度 未放大 精确到0.1度 max_speed = 2 if max_speed is None else max_speed up_speed = 1 if up_speed is None else up_speed down_speed = 1 if down_speed is None else down_speed assert 0 <= value <= 360 case "turntable_position_motor": # value 单位毫米 max_speed = 11000 if max_speed is None else max_speed up_speed = 900 if up_speed is None else up_speed down_speed = 900 if down_speed is None else down_speed value = value / 10 # value 单位毫米 assert 0 <= value <= 900 assert 0 <= max_speed <= 15000 _dir = True if value >= 0 else False value = int(abs(value * 10)) # 此处value赋值后,单位为mm以及0.1度 print("准备执行",device_name, value) data = [ cmd, device_id, 1 if _dir else 0, 0xFF & value >> 8, 0xFF & value, 0xFF & max_speed >> 8, 0xFF & max_speed, 0xFF & up_speed >> 8, 0xFF & up_speed, 0xFF & down_speed >> 8, 0xFF & down_speed, _is_debug, is_deviation, is_relative, ] self.add_send_data_queue(data) def controlDevice(self, device_name, value): '''控制设备移动等''' if not self.is_running: self.sendSocketMessage( code=1, msg="mcu设备未连接,请先连接设备", device_status=0 ) return False if not self.init_state: self.sendSocketMessage( code=1, msg="mcu设备未初始化", device_status=4 ) return False _is_debug = 1 match device_name: case "camera_high_motor": # 相机电机 print(device_name, value) self.to_device_move( device_name=device_name, value=value, max_speed=1400, up_speed=400, down_speed=100, _is_debug=_is_debug, is_deviation=0, ) case "camera_steering": print(device_name, value) # 相机舵机 self.to_device_move( device_name=device_name, value=value, _is_debug=_is_debug, is_deviation=0, ) case "turntable_steering": # 转盘舵机 self.to_device_move( device_name=device_name, value=value, _is_debug=_is_debug, is_deviation=0, ) case "turntable_position_motor": # 转盘舵机 self.to_device_move( device_name=device_name, value=value, max_speed=1400, up_speed=400, down_speed=100, _is_debug=_is_debug, is_deviation=0, ) case "overturn_steering": # 翻板舵机中位 self.to_device_move( device_name="overturn_steering", value=value, _is_debug=_is_debug, is_deviation=0, ) case "to_deal_device": self.to_deal_device(device_name, value=value, _type=0, times=1) case _: pass # case "photograph": # self.photograph(goods_art_no=None) async def checkMcuConnection(device_ctrl: DeviceControl): if device_ctrl.is_running == True: message = { "_type": "select_port_name", "plugins_mode": "auto_select_com", "data": device_ctrl.temp_ports_dict, } device_ctrl.device_status = 2 device_ctrl.sendSocketMessage(code=0, msg="MCU连接成功", data=message) return """实时检测串口是否连接""" while True: await asyncio.sleep(0.5) ports_dict = device_ctrl.scan_serial_port() device_ctrl.temp_ports_dict = ports_dict if not ports_dict: # 全部清空 移除所有串口 if device_ctrl.p_list: _p = device_ctrl.p_list.pop() device_ctrl.remove_port(_p) print("串口未连接,请检查") device_ctrl.sendSocketMessage(code=1, msg="串口未连接,请检查",device_status=-1) continue if ports_dict: for index, _i in enumerate(device_ctrl.p_list): if _i not in ports_dict: _p = device_ctrl.p_list.pop(index) device_ctrl.remove_port(_p) for _port_name, _port_value in ports_dict.items(): if _port_name not in device_ctrl.p_list: try: device_ctrl.p_list.append(_port_name) device_ctrl.add_port_by_linkage(_port_name) except BaseException as e: print("串口不存在{} {}".format(_port_name, e))