import asyncio import datetime from cv2 import log import serial.tools.list_ports import time, json from .SerialIns import SerialIns from utils.SingletonType import SingletonType from utils.utils_func import dynamic_parameter_issuance_get from .BaseClass import BaseClass from sockets import ConnectionManager from collections import defaultdict import threading import settings from .ProgramItem import ProgramItem from .capture.module_digicam import DigiCam from databases import insert_photo_records from .McuDeviationSet import McuDeviationSet from .OtherSet import OtherSet from .DebugUart import DebugUart from .LineControl import LineControl import copy import logging from mcu.capture.smart_shooter_class import SmartShooter import logging from conifg_info import ConfigManager logger = logging.getLogger(__name__) # mcu命令 class DeviceControl(BaseClass, metaclass=SingletonType): lock = threading.Lock() def __init__( self, websocket_manager: ConnectionManager, smart_shooter: SmartShooter = None ): super().__init__( websocket_manager=websocket_manager, smart_shooter=smart_shooter ) self.config_manager = None self.msg_type = "mcu" self.mcu_deviation_set = McuDeviationSet(self) self.mcu_other_set = OtherSet(self) self.debug_uart = DebugUart(self) self.line_control = LineControl(websocket_manager) self.m_t = 1 # 0未开始 1进行中 2已结束 99异常 self.action_state = 2 self._mcu_move_state = 0 self.state_camera_motor = 3 self.state_camera_steering = 3 self.state_turntable_steering = 3 self.state_overturn_steering = 3 self.seed = 0 self.last_camera_height = 0 # 是否实时获取mcu状态信息 self.is_get_mcu_state = True self.state_move_turntable_steering = 3 self.last_from_mcu_move_respond_data = None self.camera_motor_speed = 0 self.camera_motor_value = 0 self.init_state = False self.port_name = "" self.mcu_exit = False self.t_n = 0 self.serial_ins = None self.connected_ports_dict = {} # 已连接的ports self.p_list = [] self.temp_ports_dict = {} self.is_running = False self.is_runn_action = False self.is_stop_action = False self.connect_state = False self.is_init_while = False self.device_name_dict = { "camera_steering": 0, "camera_high_motor": 1, "turntable_steering": 2, "overturn_steering": 3, "laser_position": 4, "buzzer": 5, "split": 6, "turntable_position_motor": 7, "mp3_player": 8, "mcu": 99, } self.last_move_time = time.time() self.device_name_dict_mapping = { 0:"相机角度", 1:"相机高度", 2:"转盘角度", 3:"翻板角度", 4:"激光灯位置", 5:"蜂鸣器", 6:"split", 7:"转盘位置", 8:"播放音频", 99:"mcu命令", } # 最近的mcu基础信息,用于获取数据状态检查 self.last_mcu_info_data = { "num": 0, "time": time.time(), "data": None, } # 最近的mcu的其他配置 self.last_mcu_other_info_data = { "num": 0, "time": time.time(), "data": {}, } self.command = { "to_device_move": 1, # 设备运动 "to_init_device": 2, # 初始化设备 "to_deal_other_device": 3, # 处理其他设备 "get_all_info": 29, # 获取所有信息 "set_deviation": 40, # 设置偏移量 "get_deviation": 41, # 读取偏移量 "signal_forwarding": 91, # 信号转发处理 "signal_forwarding_return": 92, # 信号转发返回 "get_other_info": 44, # 获取其他信息 "open_rgb_led": 43, ## RGB灯的处理与通讯 "set_other_info": 45, # 设置其他信息 "query_remote_control_battery": 47, # 查询遥控器电量 "set_turntable_mode": 48, # 设置转盘通讯方式 1、串口、2、无线、3 混合 "stop_mcu": 93, # 停止运行mcu } # self.window = window self.last_push_time = defaultdict(float) self.is_running = False self.is_wait_connect = False # 等待链接 self.send_data_queue = [] # 发送队列 # self.lock = Lock() # 是否是刚进行完初始化;首次初始化,需要运动到指定第一个指定位置 self.is_just_init_time = False # self.init() # ===========注册命令函数============ self.deal_code_func_dict = { 29: self.get_from_mcu_base_info, # 获取基本情况 32: self.get_from_mcu_button, # 获取按键信息 42: self.get_from_mcu_deviation_info, # 获取偏移量信息 44: self.get_from_mcu_other_info, # 获取其他配置参数 90: self.get_from_mcu_connect_info, # 获取链接电脑信号 92: self.get_from_mcu_move_respond_data, # 获取MCU响应 100: self.print_mcu_error_data, # 打印下位机的错误内容 255: self.print_mcu_noraml_data, # 打印回执 124: self.read_register_data_by_usb, # 读取某个寄存器数据 125: self.write_register_data_by_usb, # 写入某个寄存器数据 126: self.get_all_registers_list_by_usb, # 获取所有寄存器 150: self.dynamic_parameter_issuance, # 动态参数下发 } def get_device_info(self): if not self.init_state: self.sendSocketMessage(code=1, msg="mcu设备未初始化", device_status=4) return False cmd = [124] cmd.extend([0xff & 106 >> 8, 0xff & 106]) print("get_device_info cmd",cmd) data = self.get_basic_info_mcu_without_async(data=cmd) print("get_device_info====>",data) if not data: return False return_data = self.analysis_data(data[1:]) if return_data: print("get_device_info====>",return_data) else: print("get_device_info====>","数据不存在") # 获取异步数据 def analysis_data(self, _data): _addr = _data[0] << 8 | _data[1] if _addr not in self.config_manager.CONFIG_METADATA_BY_ADDR: return False start = 2 _value = _data[start] << 40 | _data[start + 1] << 32 | _data[start + 2] << 24 | _data[start + 3] << 16 | _data[start + 4] << 8 | _data[start + 5] start = start + 5 _read_only = True if _data[start + 1] == 1 else False # 是否只读 _dir = 1 if _data[start + 2] == 1 else -1 _type = "int" if _data[start + 3] == 1 else "float" _precision = _data[start + 4] if _dir < 0: _value = _value * _dir if _type == "float": if _precision > 0: _round_x = _precision _precision = _precision * -1 _value = _value * 10 ** _precision _value = round(_value, _round_x) else: _value = int(_value) return_data = {"addr": _addr, "key_name": self.config_manager.CONFIG_METADATA_BY_ADDR[_addr], "readonly": _read_only, "value": _value} return return_data # 跳过异步,直接查询某个数据信息 def get_basic_info_mcu_without_async(self, data,fiddler_cmd=0): """ fiddler_cmd :只接收指定的命令内容 """ try: # 清空接收数据 self.serial_ins.clearn_flush() self.serial_ins.write_cmd(data) time.sleep(0.06) r_data = self.get_basic_info_mcu() print("264----------r_data:", r_data) except BaseException as e: print("302---e",e) r_data = [] # # self.async_lock.release() return r_data def dynamic_parameter_issuance(self, receive_data): print("dynamic_parameter_issuance receive_data", receive_data) func_code, status_code, out_par_data_list = dynamic_parameter_issuance_get( receive_data ) r_data = { "func_code": func_code, "status_code": status_code, "out_par_data_list": out_par_data_list, } print(r_data) return r_data # 读取某个寄存器数据 def read_register_data_by_usb(self, receive_data): return receive_data # 写入某个寄存器数据 def write_register_data_by_usb(self, receive_data): return receive_data # 获取所有寄存器 def get_all_registers_list_by_usb(self, receive_data): byte_list = receive_data[1:] _r_data = [] for i in range(0, len(byte_list), 2): if i + 1 < len(byte_list): first_byte = byte_list[i] second_byte = byte_list[i + 1] _addr = first_byte << 8 | second_byte _r_data.append(_addr) return _r_data async def sendCommand(self, command): await asyncio.sleep(0.01) loop = asyncio.get_event_loop() loop.create_task(self.debug_uart.set(command), name="sendCommand2") async def getMcuOtherInfo(self): await asyncio.sleep(0.01) self.get_other_info() async def setMcuOtherInfo(self, data): await asyncio.sleep(0.01) for k, v in data.items(): print("k:{},v:{}".format(k, v)) logger.info("k:{},v:{}".format(k, v)) data[k] = self.setOtherMaxMinValue(k, v) self.set_to_mcu_other_info(data) self.msg_type = "set_other_mcu_info" self.sendSocketMessage( code=0, msg="设置mcu其他配置信息完成", device_status=2, ) self.msg_type = "mcu" def setOtherMaxMinValue(self, item, value): value = int(value) parameter_limits = { "is_auto_send_base_info": (0, 10000), "is_move_retry": (0, 10000), "is_data_response": (0, 10000), "low_speed": (0, 10000), # 假设低速范围是0到1000 "is_test": (0, 10000), "to_init_mode": (0, 10000), "turntable_move_to_init_mode": (0, 10000), # 假设转盘通讯方式范围是0到3 "led_count": (0, 10000), # 假设LED数量范围是0到10 "turntable_steering_angle_ratio": (0, 10000), # 假设角度比例范围是0到100 "is_manual_check": (0, 10000), "camera_steering_angle_ratio": (0, 100000), # 假设角度比例范围是0到100 "is_auto_motor_to_disable": (0, 10000), "diff_dir": (0, 10000), "is_auto_send_pos_info": (0, 10000), "is_dog": (0, 10000), "has_been_set_motor_config": (0, 10000), } # 获取参数的最小值和最大值 min_value, max_value = parameter_limits.get(item, (0, 10000)) value = max(min_value, min(max_value, value)) return value async def initDevice(self, is_force=False): if not self.is_running: self.sendSocketMessage( code=1, msg="mcu设备未连接,请先连接设备", device_status=0 ) return False if self.init_state == True: print("已经初始化过,请勿重复初始化") logger.info("已经初始化过,请勿重复初始化") self.sendSocketMessage(msg="设备初始化完成", device_status=2) return False self.config_manager = ConfigManager() self.serial_ins.clearn_flush() self.to_init_device_origin_point(device_name="mcu", is_force=is_force) print("MCU 开始循环~") logger.info("MCU 开始循环~") while 1: await asyncio.sleep(0.05) if not self.serial_ins or not self.connect_state: break try: self.send_cmd() if not self.get_basic_info_mcu(): pass except BaseException as e: print("121231298908", e) logger.info(f"121231298908{e}") break self.is_running = False self.connect_state = False print("MCU 循环退出~") logger.info("MCU 循环退出~") print("串口未连接,请检查") logger.info("串口未连接,请检查") self.sendSocketMessage(code=1, msg="MCU串口未连接,请检查", device_status=-1) message = {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} self.sendSocketMessage( code=1, msg="MCU 连接失败", data=message, device_status=-1 ) self.close_connect() async def initControlLine(self, serial_handle): """实例化有线控制设备""" self.line_control.connect_state = True self.line_control.serial_ins = serial_handle await self.line_control.run() # if self.init_state == True: # print("已经初始化过,请勿重复初始化") # self.sendSocketMessage(msg="设备初始化完成", device_status=2) # return False # self.serial_ins.clearn_flush() # self.to_init_device_origin_point(device_name="mcu", is_force=is_force) # print("MCU 开始循环~") # while 1: # await asyncio.sleep(0.01) # if not self.serial_ins or not self.connect_state: # break # try: # # print("mcu send_cmd") # self.send_cmd() # # time.sleep(0.01) # self.get_basic_info_mcu() # # self.close_other_window() # except BaseException as e: # print("121231298908", e) # break # self.is_running = False # self.connect_state = False # print("MCU 循环退出~") # # self.sign_data.emit( # # {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} # # ) # message = {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"} # self.sendSocketMessage( # code=1, msg="MCU 连接失败", data=message, device_status=-1 # ) # self.close_connect() def stop_mcu(self): buf = [self.command["stop_mcu"]] buf.extend(self.encapsulation_data(data=1, len_data=1)) self.add_send_data_queue(buf) # 设置转盘通讯方式 1、串口、2、无线、3 混合 def to_set_turntable_mode(self, mode=1): buf = [self.command["set_turntable_mode"]] buf.extend(self.encapsulation_data(data=mode, len_data=1)) self.add_send_data_queue(buf) def query_remote_control_battery(self): """查询遥控器电量""" buf = [self.command["query_remote_control_battery"]] buf.extend(self.encapsulation_data(data=1, len_data=1)) buf.extend(self.encapsulation_data(data=1, len_data=1)) buf.extend(self.encapsulation_data(data=1, len_data=1)) self.add_send_data_queue(buf) def encapsulation_data(self, data, len_data, data_magnification=1): # data_magnification 数据放大倍数,或缩小倍数,默认为1 data = int(data * data_magnification) if len_data == 1: return [0xFF & data] elif len_data == 2: return [0xFF & data >> 8, 0xFF & data] elif len_data == 4: return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data] def open_rgb_led( self, color_name, led_command=1, brightness=80, enable=True, mode="loop", times=2, interval=0.1, ): color_name_value = { "红色": (156, 6, 3), "黄色": (255, 206, 25), "绿色": (0, 128, 0), "蓝色": (0, 25, 255), "白色": (255, 250, 227), } if color_name in color_name_value: buf = [self.command["open_rgb_led"]] buf.append(1 if enable else 0) buf.append(led_command) buf.extend(color_name_value[color_name]) buf.extend( [ brightness, 1 if mode == "loop" else 2, times, int(interval * 10), ] ) self.add_send_data_queue(buf) async def getDeviationInfo(self): await asyncio.sleep(0.01) try: # 发送获取偏移量 data = [self.command["get_deviation"], 1] self.add_send_data_queue(data) # if self.serial_ins: # self.serial_ins.write_cmd(data) print("发送获取偏移量") logger.info("发送获取偏移量") except Exception as e: print(e) print("getDeviationInfo", "暂未获取到self.command") logger.info("getDeviationInfo暂未获取到self.command") def set_deviation(self, device_name, _type=0, deviation=0): # turntable----0 angle_ratio 1 turntable_steering_deviation # overturn ----0 middle 1 high if device_name == "camera_high_motor": deviation = deviation / 10 # deviation 原单位为mm if device_name == "turntable_position_motor": deviation = deviation / 10 # deviation 原单位为mm if device_name == "camera_steering": pass if device_name == "turntable_steering": pass if device_name == "overturn_steering": pass device_id = self.device_name_dict[device_name] _dir = 1 if deviation >= 0 else 0 deviation = int(abs(deviation * 10)) data = [ self.command["set_deviation"], device_id, _type, _dir, 0xFF & deviation >> 8, 0xFF & deviation, ] self.add_send_data_queue(data) # self.controlDevice(device_name, deviation) async def set_deviation_cmd(self, value, action_name, type): await asyncio.sleep(0.01) name_sets = [ "相机电机", # min 0 max 400,步长1 "相机舵机", # min -40 max 40,步长0.1 "转盘舵机", # min -720 max 720,步长1 "转盘前后电机", # min 0 max 950,步长1 "翻板舵机中位", # min 0 max 180,步长0.5 "翻板舵机高位", # min 0 max 180,步长0.5 "翻板舵机上升速度", # min 1 max 10,步长1 "翻板舵机下降速度", # min 1 max 10,步长1 ] if action_name not in name_sets: self.msg_type = f"{type}_deviation" self.sendSocketMessage(msg="设置参数有误,请检查", device_status=0, code=1) self.msg_type = "mcu" return # 发送获取偏移量 if type == "move": self.mcu_deviation_set.change_value(value, action_name) elif type == "set": self.mcu_deviation_set.set_deviation(action_name) self.msg_type = f"{type}_deviation" self.sendSocketMessage(msg=f"{action_name} 设置成功", device_status=2, code=0) self.msg_type = "mcu" def get_other_info(self): # 发送获取偏移量 data = [self.command["get_other_info"], 1] self.add_send_data_queue(data) print("发送获取其他信息") logger.info("发送获取其他信息") def add_send_data_queue(self, data): self.lock.acquire() if self.serial_ins: print("send_data_queue append :{}".format(data)) logger.info("send_data_queue append %s", data) self.send_data_queue.append(data) self.lock.release() async def send_all_cmd(self): # await asyncio.sleep(0.001) while True: await asyncio.sleep(0.1) if self.send_data_queue: # self.sendSocketMessage(msg="正在发送命令", device_status=1) data = self.send_data_queue.pop(0) # print("\033[1;32;40m 正在发送命令 \033[0m",data) self.serial_ins.write_cmd(data) # self.sendSocketMessage(msg="命令发送完成", device_status=2) else: break def send_cmd(self): self.lock.acquire() if self.send_data_queue: # self.sendSocketMessage(msg="正在发送命令", device_status=1) data = self.send_data_queue.pop(0) self.serial_ins.write_cmd(data) # self.sendSocketMessage(msg="命令发送完成", device_status=2) else: # self.t_n += 1 # 加大发送获取基础数据的时间间隔 # 默认为0.01秒一个循环,每隔1.5秒发送数据 if self.t_n == 150: # self.t_n = 0 self.send_get_all_info_to_mcu() self.lock.release() def send_get_all_info_to_mcu(self): if self.is_get_mcu_state is False: return data = [self.command["get_all_info"], 1] self.serial_ins.write_cmd(data) def print_mcu_error_data(self, receive_data): # 扫码数据 try: data = receive_data[1:].decode() if "设备初始化完成" in data: self.init_state = True logger.info(f"设备初始化完成:%{data}") self.sendSocketMessage(msg=data, device_status=2) else: print("设备异常数据打印:", data) logger.info(f"115 设备异常数据打印:%{data}") except BaseException as e: print("117 error {}".format(e)) logger.info(f"117 error %{e}") return def print_mcu_noraml_data(self, receive_data): # 扫码数据 print("接收到255数据:", receive_data) try: command_mapping = { 1:"设备运动", 2:"初始化设备", # 初始化设备 3:"处理其他设备", # 处理其他设备 29:"获取所有信息", # 获取所有信息 40:"设置偏移量", # 设置偏移量 41:"读取偏移量", # 读取偏移量 91:"信号转发处理", # 信号转发处理 92:"信号转发返回", # 信号转发返回 44:"获取其他信息", # 获取其他信息 43:"RGB灯的处理与通讯", ## RGB灯的处理与通讯 45:"设置其他信息", # 设置其他信息 47:"查询遥控器电量", # 查询遥控器电量 48:"设置转盘通讯方式 1、串口、2、无线、3 混合", # 93:"停止运行mcu", # 停止运行mcu 90:"连接MCU",# 连接MCU } # command = int(receive_data[0]) command = int(receive_data[1]) command_text = command_mapping[command] # receive_data_temp = receive_data[2:] receive_data_temp_text = " ".join([hex(x) for x in receive_data]) # print("255 command_text:", command_text) if command_text in ["设备运动","处理其他设备"]: device_id = int(receive_data[2]) device_value = int(receive_data[3]) device_name_info = self.device_name_dict_mapping[device_id] message_info = {"设备名称":device_name_info,"运动值":device_value} print("【设备运动】消息回执:", message_info) logger.info(f"设备运动消息回执:{message_info}") print("接收设备消息回执:", command_text) logger.info(f"接收设备消息回执:{receive_data_temp_text}") except BaseException as e: print(f"255 error {e}") logger.info(f"255 error {e}") return def get_from_mcu_move_respond_data(self, receive_data): self.last_from_mcu_move_respond_data = receive_data def get_from_mcu_connect_info(self, receive_data): connect_flag = receive_data[1] device_id = receive_data[2] try: mcu_has_been_set = receive_data[6] # 设备是否有初始化 ,1 表示已初始化 except: mcu_has_been_set = 99 # 未知状态 # self.self_sign.emit({"type": "connect_sign", "data": connect_flag}) message = {"type": "connect_sign", "data": connect_flag} self.sendSocketMessage(msg="接收连接信息", data=message) print("接收链接信息") logger.info("接收链接信息") return def to_init_device_origin_point(self, device_name, is_force=False): device_id = self.device_name_dict[device_name] cmd = 2 data = [cmd, device_id, 0 if is_force is False else 1] self.open_rgb_led(color_name="红色") self.add_send_data_queue(data) if device_name == "mcu": # 重置初始化标记为 从未初始化 self.is_just_init_time = False return True # 设置其他信息 def set_to_mcu_other_info(self, data): is_auto_send_base_info = data["is_auto_send_base_info"] is_move_retry = data["is_move_retry"] is_data_response = data["is_data_response"] low_speed = data["low_speed"] is_test = data["is_test"] to_init_mode = data["to_init_mode"] turntable_move_to_init_mode = data["turntable_move_to_init_mode"] led_count = data["led_count"] turntable_steering_angle_ratio = data["turntable_steering_angle_ratio"] is_manual_check = data["is_manual_check"] camera_steering_angle_ratio = data["camera_steering_angle_ratio"] is_auto_motor_to_disable = data["is_auto_motor_to_disable"] diff_dir = data["diff_dir"] is_auto_send_pos_info = data["is_auto_send_pos_info"] is_dog = data["is_dog"] has_been_set_motor_config = data["has_been_set_motor_config"] buf = [self.command["set_other_info"]] buf.extend(self.encapsulation_data(data=is_auto_send_base_info, len_data=1)) buf.extend(self.encapsulation_data(data=is_move_retry, len_data=1)) buf.extend(self.encapsulation_data(data=is_data_response, len_data=1)) buf.extend(self.encapsulation_data(data=low_speed, len_data=2)) buf.extend(self.encapsulation_data(data=is_test, len_data=1)) buf.extend(self.encapsulation_data(data=to_init_mode, len_data=1)) buf.extend( self.encapsulation_data(data=turntable_move_to_init_mode, len_data=1) ) buf.extend(self.encapsulation_data(data=led_count, len_data=2)) buf.extend( self.encapsulation_data(data=turntable_steering_angle_ratio, len_data=2) ) buf.extend(self.encapsulation_data(data=is_manual_check, len_data=1)) buf.extend( self.encapsulation_data(data=camera_steering_angle_ratio, len_data=4) ) buf.extend(self.encapsulation_data(data=is_auto_motor_to_disable, len_data=1)) buf.extend(self.encapsulation_data(data=diff_dir, len_data=1)) buf.extend(self.encapsulation_data(data=is_auto_send_pos_info, len_data=1)) buf.extend(self.encapsulation_data(data=is_dog, len_data=1)) buf.extend(self.encapsulation_data(data=has_been_set_motor_config, len_data=1)) self.add_send_data_queue(buf) async def cleanAllReceiveData(self): while True: await asyncio.sleep(0.001) receive_data = self.serial_ins.read_cmd(out_time=1) if not receive_data: break def get_basic_info_mcu(self): receive_data = self.serial_ins.read_cmd(out_time=1) if receive_data is False: print("------------------------------------------------4657564654") print( "------------------------------------------------get_basic_info_mcu------------------" ) logger.info("------------------------------------------------4657564654") self.connect_state = False return False if not receive_data: return False command = receive_data[0] print("get_basic_info_mcu",command) if command in self.deal_code_func_dict: _data = ' '.join([hex(x) for x in receive_data]) return self.deal_code_func_dict[command](receive_data) return False def get_from_mcu_button(self, receive_data): button_name = receive_data[1] self.deal_mcu_button(button_name) def deal_mcu_button(self, button_name): # 防止重复点击 s = time.time() if s - self.last_push_time[button_name] > 0.1: self.last_push_time[button_name] = s # print("button_name", button_name) else: self.last_push_time[button_name] = s return if button_name == 1: # 自动执行全部 # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"} self.sendSocketMessage(code=0, msg="", data=message) if button_name == 2: # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"} self.sendSocketMessage(code=0, msg="", data=message) if button_name == 3: # self.sign_data.emit( # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"} # ) message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"} self.sendSocketMessage(code=0, msg="", data=message) # 获取偏移量信息 def get_from_mcu_deviation_info(self, receive_data): if len(receive_data) == 18: camera_high_motor_deviation_dir = receive_data[1] camera_high_motor_deviation = receive_data[2] << 8 | receive_data[3] camera_high_motor_deviation = ( camera_high_motor_deviation * -1 if camera_high_motor_deviation_dir == 0 else camera_high_motor_deviation ) camera_steering_deviation_dir = receive_data[4] camera_steering_deviation = (receive_data[5] << 8 | receive_data[6]) * 0.1 camera_steering_deviation = ( camera_steering_deviation * -1 if camera_steering_deviation_dir == 0 else camera_steering_deviation ) turntable_steering_deviation_dir = receive_data[7] turntable_steering_deviation = ( receive_data[8] << 8 | receive_data[9] ) * 0.1 turntable_steering_deviation = ( turntable_steering_deviation * -1 if turntable_steering_deviation_dir == 0 else turntable_steering_deviation ) overturn_steering_middle_dir = receive_data[10] overturn_steering_middle = (receive_data[11] << 8 | receive_data[12]) * 0.1 overturn_steering_middle = ( overturn_steering_middle * -1 if overturn_steering_middle_dir == 0 else overturn_steering_middle ) overturn_steering_high_dir = receive_data[13] overturn_steering_high = (receive_data[14] << 8 | receive_data[15]) * 0.1 overturn_steering_high = ( overturn_steering_middle * -1 if overturn_steering_high_dir == 0 else overturn_steering_high ) overturn_steering_up_speed = receive_data[16] overturn_steering_down_speed = receive_data[17] # self.sign_data.emit( # { # "_type": "get_deviation_data", # "plugins_mode": "mcu", # "data": { # "camera_high_motor_deviation": camera_high_motor_deviation, # "camera_steering_deviation": camera_steering_deviation, # "turntable_steering_deviation": turntable_steering_deviation, # "overturn_steering_middle": overturn_steering_middle, # "overturn_steering_high": overturn_steering_high, # "overturn_steering_up_speed": overturn_steering_up_speed, # "overturn_steering_down_speed": overturn_steering_down_speed, # }, # } # ) # message = { # "_type": "get_deviation_data", # "plugins_mode": "mcu", # "data": { # "camera_high_motor_deviation": camera_high_motor_deviation, # "camera_steering_deviation": camera_steering_deviation, # "turntable_steering_deviation": turntable_steering_deviation, # "overturn_steering_middle": overturn_steering_middle, # "overturn_steering_high": overturn_steering_high, # "overturn_steering_up_speed": overturn_steering_up_speed, # "overturn_steering_down_speed": overturn_steering_down_speed, # }, # } get_deviation_data = { "camera_high_motor_deviation": camera_high_motor_deviation, "camera_steering_deviation": camera_steering_deviation, "turntable_steering_deviation": turntable_steering_deviation, "turntable_front_end_deviation": 300, "overturn_steering_middle": overturn_steering_middle, "overturn_steering_high": overturn_steering_high, "overturn_steering_up_speed": overturn_steering_up_speed, "overturn_steering_down_speed": overturn_steering_down_speed, } self.initDeviationInfo(get_deviation_data) self.msg_type = "get_deviation_data" self.sendSocketMessage(msg="接收偏移量信息", data=get_deviation_data) self.msg_type = "mcu" print("接收偏移量信息") logger.info("接收偏移量信息") return def initDeviationInfo(self, get_deviation_data): # 初始化偏移量信息 camera_high_motor_deviation = get_deviation_data["camera_high_motor_deviation"] camera_steering_deviation = get_deviation_data["camera_steering_deviation"] turntable_steering_deviation = get_deviation_data[ "turntable_steering_deviation" ] overturn_steering_middle = get_deviation_data["overturn_steering_middle"] overturn_steering_high = get_deviation_data["overturn_steering_high"] overturn_steering_up_speed = get_deviation_data["overturn_steering_up_speed"] overturn_steering_down_speed = get_deviation_data[ "overturn_steering_down_speed" ] self.mcu_deviation_set.last_value["相机电机"] = camera_high_motor_deviation self.mcu_deviation_set.last_value["相机舵机"] = camera_steering_deviation self.mcu_deviation_set.last_value["转盘舵机"] = turntable_steering_deviation self.mcu_deviation_set.last_value["翻板舵机中位"] = overturn_steering_middle self.mcu_deviation_set.last_value["翻板舵机高位"] = overturn_steering_high self.mcu_deviation_set.last_value["翻板舵机上升速度"] = ( overturn_steering_up_speed ) self.mcu_deviation_set.last_value["翻板舵机下降速度"] = ( overturn_steering_down_speed ) # 获取其他信息 def get_from_mcu_other_info(self, receive_data): # TODO 已完成设置 is_auto_send_base_info = self.get_data_from_receive_data( receive_data=receive_data, start=1, len_data=1 ) is_move_retry = self.get_data_from_receive_data( receive_data=receive_data, start=2, len_data=1 ) is_data_response = self.get_data_from_receive_data( receive_data=receive_data, start=3, len_data=1 ) low_speed = self.get_data_from_receive_data( receive_data=receive_data, start=4, len_data=2 ) is_test = self.get_data_from_receive_data( receive_data=receive_data, start=6, len_data=1 ) to_init_mode = self.get_data_from_receive_data( receive_data=receive_data, start=7, len_data=1 ) turntable_move_to_init_mode = self.get_data_from_receive_data( receive_data=receive_data, start=8, len_data=1 ) led_count = self.get_data_from_receive_data( receive_data=receive_data, start=9, len_data=2 ) turntable_steering_angle_ratio = self.get_data_from_receive_data( receive_data=receive_data, start=11, len_data=2 ) is_manual_check = self.get_data_from_receive_data( receive_data=receive_data, start=13, len_data=1 ) camera_steering_angle_ratio = self.get_data_from_receive_data( receive_data=receive_data, start=14, len_data=4 ) is_auto_motor_to_disable = self.get_data_from_receive_data( receive_data=receive_data, start=18, len_data=1 ) diff_dir = self.get_data_from_receive_data( receive_data=receive_data, start=19, len_data=1 ) is_auto_send_pos_info = self.get_data_from_receive_data( receive_data=receive_data, start=20, len_data=1 ) is_dog = self.get_data_from_receive_data( receive_data=receive_data, start=21, len_data=1 ) has_been_set_motor_config = self.get_data_from_receive_data( receive_data=receive_data, start=22, len_data=1 ) self.last_mcu_other_info_data["data"] = { "is_auto_send_base_info": is_auto_send_base_info, "is_move_retry": is_move_retry, "is_data_response": is_data_response, "low_speed": low_speed, "is_test": is_test, "to_init_mode": to_init_mode, "turntable_move_to_init_mode": turntable_move_to_init_mode, "led_count": led_count, "turntable_steering_angle_ratio": turntable_steering_angle_ratio, "is_manual_check": is_manual_check, "camera_steering_angle_ratio": camera_steering_angle_ratio, "is_auto_motor_to_disable": is_auto_motor_to_disable, "diff_dir": diff_dir, "is_auto_send_pos_info": is_auto_send_pos_info, "is_dog": is_dog, "has_been_set_motor_config": has_been_set_motor_config, } self.last_mcu_other_info_data["time"] = time.time() self.last_mcu_other_info_data["num"] += 1 for k, v in self.last_mcu_other_info_data["data"].items(): print("k:{},v:{}".format(k, v)) logger.info("k:%s,v:%s", k, v) self.msg_type = "get_mcu_other_info" self.sendSocketMessage( code=0, msg="获取mcu其他配置信息", device_status=2, data=self.last_mcu_other_info_data["data"], ) self.msg_type = "mcu" def get_data_from_receive_data( self, receive_data, start, len_data, data_magnification=1 ): # data_magnification 数据放大倍数,或缩小倍数,默认为1 try: if len_data == 1: data = receive_data[start] return data * data_magnification elif len_data == 2: data = receive_data[start] << 8 | receive_data[start + 1] return data * data_magnification elif len_data == 4: data = ( receive_data[start] << 24 | receive_data[start + 1] << 16 | receive_data[start + 2] << 8 | receive_data[start + 3] ) return data * data_magnification return None except: return None def get_from_mcu_base_info(self, receive_data): # 数据缓存 self.last_mcu_info_data["time"] = time.time() self.last_mcu_info_data["num"] += 1 # print("last_mcu_info_data:{}".format(self.last_mcu_info_data["time"])) self.state_camera_motor = 3 self.state_camera_steering = 3 self.state_turntable_steering = 3 self.state_overturn_steering = 3 self.state_move_turntable_steering = 3 if len(receive_data) == 7: self.m_t = 1 laser_state = receive_data[1] self.state_camera_motor = receive_data[2] self.state_camera_steering = receive_data[3] self.state_turntable_steering = receive_data[4] self.state_overturn_steering = receive_data[5] flag = receive_data[6] message = { "_type": "show_mcu_info", "plugins_mode": "mcu", "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format( laser_state=laser_state, state_camera_motor=self.state_camera_motor, state_camera_steering=self.state_camera_steering, state_turntable_steering=self.state_turntable_steering, state_overturn_steering=self.state_overturn_steering, flag=flag, ), "data_state": { "state_camera_motor": self.state_camera_motor, "state_camera_steering": self.state_camera_steering, "state_turntable_steering": self.state_turntable_steering, "state_overturn_steering": self.state_overturn_steering, }, } self.msg_type = "get_mcu_info" self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message) self.msg_type = "mcu" # print("转盘:{},时间:{}".format(self.state_turntable_steering, time.time())) if len(receive_data) == 8: self.m_t = 2 laser_state = receive_data[1] self.state_camera_motor = receive_data[2] self.state_camera_steering = receive_data[3] self.state_turntable_steering = receive_data[4] self.state_overturn_steering = receive_data[5] self.state_move_turntable_steering = receive_data[6] flag = receive_data[7] message = { "_type": "show_mcu_info", "plugins_mode": "mcu", "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},转盘前后移动状态:{state_move_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format( laser_state=laser_state, state_camera_motor=self.state_camera_motor, state_camera_steering=self.state_camera_steering, state_turntable_steering=self.state_turntable_steering, state_overturn_steering=self.state_overturn_steering, state_move_turntable_steering=self.state_move_turntable_steering, flag=flag, ), "data_state": { "state_camera_motor": self.state_camera_motor, "state_camera_steering": self.state_camera_steering, "state_turntable_steering": self.state_turntable_steering, "state_overturn_steering": self.state_overturn_steering, "state_move_turntable_steering": self.state_move_turntable_steering, }, } # if self.state_camera_motor if all( value == 2 for value in [ self.state_camera_motor, self.state_camera_steering, self.state_turntable_steering, self.state_overturn_steering, ] ): self.init_state = True self.sendSocketMessage(msg="设备初始化完成", device_status=2) self.msg_type = "get_mcu_info" self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message) self.msg_type = "mcu" # 检查是否成功初始化 if self.is_just_init_time is False: if self.mcu_move_state == 2: self.is_just_init_time = True print("is_just_init_time") logger.info("设备初始化完成") message = { "_type": "is_just_init_time", "plugins_mode": "mcu", "data": "", } self.sendSocketMessage(msg="检查设备初始化", data=message) return def scan_serial_port(self) -> dict: # 获取所有可用串口列表 ports_dict = {} ports = serial.tools.list_ports.comports() # 遍历所有端口并打印信息 for port in ports: # logger.info("扫描到的串口信息:{}".format(port.description)) if "CH34" in port.description: ports_dict[port.name] = { "name": port.name, "device": port.device, "description": port.description, "hwid": port.hwid, "manufacturer": port.manufacturer, "product": port.product, "serial_number": port.serial_number, } if len(ports_dict) <= 0: return {} return ports_dict def remove_port(self, port_name): """移除串口""" print("remove", port_name) logger.info("串口被移除") data = { "_type": "remove_port", "plugins_mode": "auto_select_com", "data": {"port_name": port_name}, } self.sendSocketMessage(1, "串口被移除", data) def is_serial_available(self, port): try: ser = serial.Serial(port) ser.close() return True except serial.serialException: return False async def add_port_by_linkage(self, port_name): # port_value :串口基础信息 # todo 根据prot_value 信息自动进行连接 print("add port_name", port_name) logger.info("add port_name %s", port_name) print("self.port_name", self.port_name) logger.info("self.port_name %s", self.port_name) if self.port_name == port_name: self.port_name = "" # 对没有连接的设备进行尝试连接 # 如果存在串口被占用则进行断开 self.is_serial_available(port_name) return False message_data = { "_type": "show_info", "plugins_mode": "auto_select_com", "data": {"text": "开始识别接口:{}".format(port_name)}, } self.sendSocketMessage( msg="开始识别接口:{}".format(port_name), data=message_data, device_status=1 ) await asyncio.sleep(1) """ 步骤: 1、进行临时连接,并发送命令,成功后,自动连接对应设备 """ try: # 尝试使用115200波特率链接 serial_handle = serial.Serial(port=port_name, baudrate=115200, timeout=0.5) except Exception as e: message_data = { "_type": "show_info", "plugins_mode": "auto_select_com", "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)}, } self.sendSocketMessage( 1, msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name), data=message_data, device_status=-1, ) print("串口:{} 被占用".format(port_name)) logger.info("串口:%s 被占用", port_name) if self.serial_ins.serial_handle is not None: self.serial_ins.serial_handle.close() return False await asyncio.sleep(2) print("开始发送命令") logger.info("开始发送命令") data = [90, 1] try: serial_handle.flushInput() # 尝试重置输入缓冲区 except serial.SerialTimeoutException: print("超时错误:无法在规定时间内重置输入缓冲区。") logger.info("超时错误:无法在规定时间内重置输入缓冲区。") self.sendSocketMessage( 1, msg="超时错误:无法在规定时间内重置输入缓冲区。", data=None, ) serial_handle.close() return False print("尝试写入数据") logger.info("尝试写入数据") buf = bytearray(b"") buf.extend([0x55, 0x55, (0xFF & len(data))]) buf.extend(data) buf.extend([0xFF & ~sum(data)]) try: self.receive_data = b"" serial_handle.write(buf) except serial.SerialTimeoutException: print("写入数据错误") logger.info("写入数据错误") serial_handle.close() return False await asyncio.sleep(0.3) receive_data = self.read_cmd(serial_handle) # print("尝试接收命令", receive_data) # logger.info("尝试接收命令,%s", receive_data) device_id = 0 if receive_data: print("receive_data", receive_data) logger.info("receive_data %s", receive_data) if receive_data[0] == 90: connect_flag = receive_data[1] device_id = receive_data[2] else: logger.info("串口数据未监听到,继续等待...%s", port_name) return False print("关闭串口:{}".format(port_name)) logger.info("关闭串口:%s", port_name) serial_handle.close() loop = asyncio.get_event_loop() print("device_id============>>>", device_id) logger.info("device_id============>>>%s", device_id) if device_id > 0: if device_id == settings.MCU_CODE: state, _ = self.to_connect_com(port_name) if not state: return False message_data = { "_type": "show_info", "plugins_mode": "auto_select_com", "data": {"text": "MCU开始连接"}, } self.sendSocketMessage( msg="MCU开始连接", data=message_data, device_status=1 ) self.connected_ports_dict[port_name] = "MCU" message_data = { "_type": "select_port_name", "plugins_mode": "auto_select_com", "data": { "device_name": "mcu" if device_id == 1 else "remote_control", "port_name": port_name, }, } return True elif device_id == 2: # 有线接收器 self.connected_ports_dict[port_name] = "Line_Control" # print("device_id", device_id) # logger.info("device_id %s", device_id) state = await self.line_control.to_connect_com(port_name) if not state: return False self.line_control.connect_state = True return True else: print("串口无法识别") logger.info("串口无法识别") self.sendSocketMessage( code=1, msg="串口无法识别,请重新插拔拍照机USB", data=message_data, device_status=-1, ) return False # 走其他途径处理 def clearMyInstance(self): SingletonType.clear_instance() def to_connect_com(self, port_name): # 关闭串口 print("to_connect_com", port_name) logger.info("to_connect_com %s", port_name) self.close_connect() time.sleep(0.3) self.connect_state = False try: self.serial_ins = SerialIns(port_name=port_name, baud=115200, timeout=0.1) if not self.serial_ins.serial_handle: message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 打开串口失败", } self.sendSocketMessage( msg="MCU 打开串口失败", data=message_data, device_status=-1 ) self.serial_ins = None self.connect_state = False return False, None except: message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 打开串口失败", } self.sendSocketMessage( msg="MCU 打开串口失败", data=message_data, device_status=-1 ) self.serial_ins = None self.connect_state = False return False, None message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 开始连接", } self.sendSocketMessage(msg="MCU 开始连接", data=message_data, device_status=1) # =======================发送连接请求================================= cmd = 90 data = [cmd, 1] print("405 发送 连接请求 -----------------------------------------") logger.info("405 发送 连接请求 -----------------------------------------") print(self.serial_ins) logger.info(self.serial_ins) # self.serial_ins.clearn_flush() self.serial_ins.write_cmd(data) # 延迟接收数据 time.sleep(0.3) receive_data = self.serial_ins.read_cmd(out_time=1) if receive_data: print( "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data)) ) logger.info( "409 receive_data--90:%s", str(self.change_hex_to_int(receive_data)) ) else: return False, None if receive_data: # receive_data[2]=1 表示为MCU设备编号 if receive_data[0] == 90 and receive_data[2] == settings.MCU_CODE: connect_flag = receive_data[1] # 是否有初始化 try: mcu_has_been_set = receive_data[ 6 ] # 设备是否有初始化 ,1 表示已初始化 except: mcu_has_been_set = 99 # 未知状态 print("MCU初始化信息{}".format(mcu_has_been_set)) logger.info("MCU初始化信息%s", str(mcu_has_been_set)) message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 已连接", } self.sendSocketMessage( msg="MCU 已连接", data=message_data, device_status=1 ) self.connect_state = True self.is_running = True print("MCU 已连接") logger.info("MCU 已连接") self.port_name = port_name # time.sleep(2) loop = asyncio.get_event_loop() # # 异步处理 loop.create_task(self.initDevice(), name="initDevice") return True, None print("MCU 连接失败") logger.info("MCU 连接失败") # for port in self.p_list: # logger.info( # "MCU 连接失败处理list移除操作,self port name %s,p_list:%s", # port_name, # self.p_list, # ) # if port == port_name: # logger.info("MCU 连接失败处理list移除操作,移除 %s", port_name) # self.p_list.remove(port) # self.remove_port(port) message_data = { "_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败", } self.sendSocketMessage(msg="MCU 连接失败", data=message_data, device_status=-1) self.close_connect() return False, None def close_connect(self): self.port_name = "" if self.serial_ins: self.serial_ins.close_serial_port() self.is_running = False self.connect_state = False self.connected_ports_dict = {} # 已连接的ports # self.p_list = [] # self.temp_ports_dict = {} self.init_state = False print("关闭MCU") logger.info("关闭MCU") def close_lineConnect(self): """'关闭有线遥控器连接""" self.line_control.port_name = "" self.line_control.close_connect() print("关闭有线遥控器") logger.info("关闭有线遥控器") @property def mcu_move_state(self): if self.is_get_mcu_state is False: self._mcu_move_state = 2 # self.action_state = 2 else: if self.m_t == 1: if ( self.state_camera_motor == 2 and self.state_camera_steering == 2 and self.state_turntable_steering == 2 and self.state_overturn_steering == 2 ): self._mcu_move_state = 2 else: self._mcu_move_state = 1 else: if ( self.state_camera_motor == 2 and self.state_camera_steering == 2 and self.state_turntable_steering == 2 and self.state_overturn_steering == 2 and self.state_move_turntable_steering == 2 ): self._mcu_move_state = 2 else: self._mcu_move_state = 1 # self._mcu_move_state = 2 return self._mcu_move_state def to_deal_device(self, device_name, value=1, _type=0, times=1, delay=0): """ value 激光0关 1开 mp3_player value 表示0表示关,1表示开,_type 表示歌曲切换到指定歌曲 delay:延迟处理,单位为0.1秒,即delay=100时,表示延迟10秒 """ device_id = self.device_name_dict[device_name] if device_name == "buzzer": value = int(value) cmd = 3 data = [ cmd, device_id, value, _type, times, delay, ] self.add_send_data_queue(data) # if self.serial_ins: # self.serial_ins.write_cmd(data) return True def to_device_move( self, device_name, value=0, max_speed=None, up_speed=None, down_speed=None, _is_debug=0, is_relative=0, is_deviation=1, times=2, is_response=False ): """ 此处输入单位为 毫米,以及度 需要先缩小,再放大 """ self.seed += 1 if self.seed > 9000: self.seed = 1 print("移动", time.time()) logger.info("移动,%s", time.time()) speed = settings.moveSpeed() cmd = 1 device_id = self.device_name_dict[device_name] # if device_id != 1: # print("F非MCU设备,禁止处理") # return print("正在执行", device_name) logger.info("正在执行 %s", device_name) match device_name: case "camera_high_motor": # value 单位毫米 # max_speed = 10000 if max_speed is None else max_speed # up_speed = 800 if up_speed is None else up_speed # down_speed = 700 if down_speed is None else down_speed max_speed = ( speed[device_name]["max_speed"] if max_speed is None else max_speed ) up_speed = ( speed[device_name]["up_speed"] if up_speed is None else up_speed ) down_speed = ( speed[device_name]["down_speed"] if down_speed is None else down_speed ) value = value / 10 # value 单位毫米 assert 0 <= value <= 40 assert 0 <= max_speed <= 10000 case "camera_steering": # 角度为度 未放大 精确到0.1度 max_speed = 6000 if max_speed is None else max_speed up_speed = 500 if up_speed is None else up_speed down_speed = 500 if down_speed is None else down_speed assert -360 <= value <= 360 case "turntable_steering": # 角度为度 未放大 精确到0.1度 # max_speed = 6000 if max_speed is None else max_speed # up_speed = 500 if up_speed is None else up_speed # down_speed = 400 if down_speed is None else down_speed max_speed = ( speed[device_name]["max_speed"] if max_speed is None else max_speed ) up_speed = ( speed[device_name]["up_speed"] if up_speed is None else up_speed ) down_speed = ( speed[device_name]["down_speed"] if down_speed is None else down_speed ) assert -720 <= value <= 720 case "overturn_steering": # 角度为度 未放大 精确到0.1度 max_speed = 2 if max_speed is None else max_speed up_speed = 1 if up_speed is None else up_speed down_speed = 1 if down_speed is None else down_speed assert 0 <= value <= 360 case "turntable_position_motor": # value 单位毫米 max_speed = 11000 if max_speed is None else max_speed up_speed = 900 if up_speed is None else up_speed down_speed = 900 if down_speed is None else down_speed value = value / 10 # value 单位毫米 assert 0 <= value <= 900 assert 0 <= max_speed <= 15000 _dir = True if value >= 0 else False value = int(abs(value * 10)) # 此处value赋值后,单位为mm以及0.1度 print("准备执行", device_name, value) logger.info("准备执行,%s %s", device_name, value) data = [ cmd, device_id, 1 if _dir else 0, 0xFF & value >> 8, 0xFF & value, 0xFF & max_speed >> 8, 0xFF & max_speed, 0xFF & up_speed >> 8, 0xFF & up_speed, 0xFF & down_speed >> 8, 0xFF & down_speed, _is_debug, is_deviation, is_relative, times, # 重试次数 0xff & self.seed >> 8, # 随机种子 0xff & self.seed, # 随机种子 1 if is_response is True else 0, # 是否返回内容 ] self.add_send_data_queue(data) current_time = time.time() self.last_move_time = current_time def to_get_mcu_base_info(self): if self.connect_state: self.lock.acquire() # print('==========================>1111') print("-------------------to_get_mcu_base_info--------------------------") data = [self.command["get_all_info"], 1] f = True try: self.serial_ins.write_cmd(data) except: f = False pass self.lock.release() if not f: self.connect_state = False return False else: return True def check_before_action(self): if self.state != 2: print("check_before_action 设备正在运行中~") logger.info("check_before_action 设备正在运行中~") self.sendSocketMessage(code=1, msg="设备正在运行中", device_status=1) return False if self.mcu_move_state != 2: if settings.IS_LIN_SHI_TEST: return True # self.show_info("mcu 非停止状态") self.sendSocketMessage(code=1, msg="mcu 非停止状态", device_status=1) return True async def controlDevice(self, device_name, value): """控制设备移动等""" await asyncio.sleep(0.01) if not self.is_running: self.sendSocketMessage( code=1, msg="mcu设备未连接,请先连接设备", device_status=0 ) return False if not self.init_state: self.sendSocketMessage(code=1, msg="mcu设备未初始化", device_status=4) return False _is_debug = 1 if settings.IS_DEBUG == "true" else 0 is_deviation = 0 if settings.IS_DEBUG == "true" else 1 print("控制设备==>_is_debug", _is_debug) logger.info("控制设备==>_is_debug %s", _is_debug) match device_name: case "camera_high_motor": # 相机电机 print(device_name, value) self.to_device_move( device_name=device_name, value=float(value), ) self.to_device_move( device_name=device_name, value=float(value), ) case "camera_steering": print(device_name, value) # 相机舵机 self.to_device_move( device_name=device_name, value=float(value), ) self.to_device_move( device_name=device_name, value=float(value), ) case "turntable_steering": # 转盘舵机 self.to_device_move( device_name=device_name, value=float(value), ) self.to_device_move( device_name=device_name, value=float(value), ) case "turntable_position_motor": # 转盘舵机 self.to_device_move( device_name=device_name, value=float(value), ) self.to_device_move( device_name=device_name, value=float(value), ) case "overturn_steering": # 翻板舵机中位 self.to_deal_device( device_name="overturn_steering", ) case "laser_position": self.to_deal_device( device_name="laser_position", value=0 if int(value) <= 0 else 1 ) case "take_picture": # loop = asyncio.get_event_loop() # if value>0: # # 指定自动对焦 # loop.create_task( # self.smart_shooter.CameraAutofocus(), # name="CameraAutofocus", # ) # 拍照 await self.smart_shooter.CameraShooter(msg_type="run_mcu") case "to_deal_device": self.to_deal_device(device_name, value=value, _type=0, times=1) case _: pass def checkDevice(self): print("检查设备是否运行中") logger.info("检查设备是否运行中") if not self.is_running: self.sendSocketMessage( code=1, msg="mcu设备未连接,请先连接设备", device_status=0 ) return False if self.init_state is not True: self.sendSocketMessage(code=1, msg="mcu设备未初始化", device_status=4) return False if self.action_state != 2: self.sendSocketMessage( code=1, msg="当前有未完成的任务,请稍后再试", device_status=0, ) return False async def run_mcu_config( self, config_list, goods_art_no, action_info, smart_shooter ): if self.checkDevice() == False: return image_counts = 0 if config_list: action_names = [] record_ids = [] if len(config_list) > 1: if config_list[-1]["take_picture"] is True: new_init_config = copy.copy(config_list[0]) new_init_config["action_name"] = "移动到初始位" new_init_config["number_focus"] = 0 new_init_config["take_picture"] = False new_init_config["shoe_upturn"] = False new_init_config["pre_delay"] = 0.0 new_init_config["after_delay"] = 0.0 # new_init_config["point_name"] = "A" new_init_config["is_move_device"] = True new_init_config["led_switch"] = True new_init_config["turntable_angle"] = 0.0 new_init_config["is_wait"] = False new_init_config["is_need_confirm"] = False config_list.append(new_init_config) for idx, item in enumerate(config_list): is_take_picture = item["take_picture"] action_id = item["id"] record_id = -1 if is_take_picture: image_counts += 1 # 批量插入 image_deal_mode = 0 if action_info == "执行左脚程序" else 1 state, record_id = await insert_photo_records( image_deal_mode=image_deal_mode, goods_art_no=goods_art_no, image_index=idx, action_id=action_id, ) config_list[idx]["record_id"] = record_id total_len = len(config_list) self.action_state = 1 self.msg_type = "image_process" self.sendSocketMessage( code=0, msg="MCU 命令已发送完成", device_status=2, data={ "goods_art_no": goods_art_no, "image_counts": image_counts, "action_names": action_names, "current_time": datetime.datetime.now(settings.TIME_ZONE).strftime( "%Y-%m-%d %H:%M:%S" ), }, ) await self.controlDevice("laser_position", 0) # await smart_shooter.EnableCameraPreview( # enable_status=True, msg_type="smart_shooter_enable_preview" # ) self.msg_type = "mcu" self.is_runn_action = True for index, action in enumerate(config_list): # await asyncio.sleep(0.01) if self.is_stop_action == True: print("停止命令接收,立即终止") logger.info("停止命令接收,立即终止") break # action_is_take_picture = action["take_picture"] record_id = action["record_id"] image_index = -1 if record_id == -1: image_index = -1 else: image_index = index program_item = ProgramItem( websocket_manager=self.websocket_manager, action_data=action, mcu=self, goods_art_no=goods_art_no, image_index=image_index, smart_shooter=smart_shooter, record_id=record_id, is_get_mcu_state=True, ) program_item.last_move_time = self.last_move_time if self.action_state != 1: # 异常终止 print("action异常终止") logger.info("action异常终止") break self.msg_type = "photo_take" if not await program_item.run(total_len): self.sendSocketMessage( code=1, msg="{} 执行失败~".format(program_item.action_name), data={"goods_art_no": goods_art_no, "id": action_id}, device_status=0, ) self.to_deal_device(device_name="buzzer", times=3) break else: if program_item.action_name == "移动到初始位": self.msg_type = "mcu" break self.sendSocketMessage( code=0, msg="{} 执行完成~".format(program_item.action_name), data={"goods_art_no": goods_art_no, "id": record_id}, device_status=2, ) self.msg_type = "mcu" if index == total_len - 1: pass if self.is_stop_action == True: self.msg_type = "run_mcu_stop" self.sendSocketMessage( code=0, msg=f"货号:{goods_art_no},执行终止", device_status=2, ) self.is_stop_action = False self.action_state = 2 self.is_runn_action = False self.msg_type = "photo_take_finish" self.sendSocketMessage( code=0, msg=f"货号:{goods_art_no},执行完成", device_status=2, ) self.msg_type = "mcu" await self.controlDevice("laser_position", 1) self.action_state = 2 # await smart_shooter.EnableCameraPreview( # enable_status=False, msg_type="smart_shooter_enable_preview" # ) async def run_mcu_config_single( self, config_info, goods_art_no, msg_type="run_mcu_single_finish", image_index=-1, smart_shooter=None, action_id=-1, ): """独立拍照 仅作测试用""" await asyncio.sleep(0.01) if self.checkDevice() == False: return print("检查完成", config_info) logger.info("检查完成 %s", config_info) if config_info: self.action_state = 1 self.msg_type = "mcu" program_item = ProgramItem( websocket_manager=self.websocket_manager, action_data=config_info, mcu=self, goods_art_no=goods_art_no, image_index=image_index, smart_shooter=smart_shooter, record_id=action_id, is_get_mcu_state=False, ) program_item.last_move_time = self.last_move_time if self.action_state != 1: # 异常终止 print("action异常终止") logger.info("action异常终止") return program_item.smart_shooter = smart_shooter await program_item.run(3) self.msg_type = "mcu" logger.info( "发送 run_mcu_signle消息,{} 执行完成~".format(program_item.action_name) ) self.sendSocketMessage( code=0, msg="{} 执行完成~".format(program_item.action_name), data={"goods_art_no": goods_art_no, "id": action_id}, device_status=2, ) self.action_state = 2 self.msg_type = "run_mcu_single" logger.info("发送 run_mcu_signle消息执行完成") self.sendSocketMessage( code=0, msg=f"执行完成", device_status=2, ) self.msg_type = "mcu" else: self.sendSocketMessage( code=1, msg="未查询到重拍记录得配置信息,请确认", device_status=0 ) self.action_state = 2 async def only_take_photo(self, goods_art_no, image_index, record_id): await asyncio.sleep(0.1) print("only_take_photo=====>", goods_art_no, image_index, record_id) logger.info( f"only_take_photo=====> {goods_art_no} {image_index} {record_id}" ) if goods_art_no == "": print("only_take_photo 数据查询异常") logger.info("only_take_photo 数据查询异常") return # 关闭led self.controlDevice("laser_position", 0) program_item = ProgramItem( websocket_manager=self.websocket_manager, action_data={}, mcu=self, goods_art_no=goods_art_no, image_index=image_index, record_id=record_id, ) program_item.digicam_take_picture() # 打开led self.controlDevice("laser_position", 1) async def checkMcuConnection(device_ctrl: DeviceControl): if device_ctrl.is_running == True: message = { "_type": "select_port_name", "plugins_mode": "auto_select_com", "data": device_ctrl.temp_ports_dict, } device_ctrl.device_status = 2 device_ctrl.sendSocketMessage(code=0, msg="MCU连接成功", data=message) return if device_ctrl.is_init_while == True: logger.info("mcu 初始化循环已创建,请勿重复创建") device_ctrl.sendSocketMessage( code=0, msg="mcu 初始化循环已创建,请勿重复创建", data={} ) return device_ctrl.is_init_while = True """实时检测串口是否连接""" while True: await asyncio.sleep(1) if device_ctrl.mcu_exit: break ports_dict = device_ctrl.scan_serial_port() device_ctrl.temp_ports_dict = ports_dict # print("device_ctrl.p_list", device_ctrl.p_list) # logger.info("device_ctrl.p_list %s", device_ctrl.p_list) if not ports_dict: # 全部清空 移除所有串口 if device_ctrl.p_list: _p = device_ctrl.p_list.pop() device_ctrl.remove_port(_p) continue if ports_dict: for index, _i in enumerate(device_ctrl.p_list): if _i not in ports_dict: _p = device_ctrl.p_list.pop(index) device_ctrl.remove_port(_p) for _port_name, _port_value in ports_dict.items(): if _port_name not in device_ctrl.p_list: try: flag = await device_ctrl.add_port_by_linkage(_port_name) if not flag: continue device_ctrl.p_list.append(_port_name) except BaseException as e: print("串口不存在{} {}".format(_port_name, e)) logger.info("串口不存在{} {}".format(_port_name, e)) device_ctrl.is_init_while = False print("MCU断开连接,已释放") logger.info("MCU断开连接,已释放")