| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659 |
- import asyncio
- import datetime
- import serial.tools.list_ports
- import time, json
- from .SerialIns import SerialIns
- from utils.SingletonType import SingletonType
- from .BaseClass import BaseClass
- from sockets import ConnectionManager
- from collections import defaultdict
- import threading
- import settings
- from .ProgramItem import ProgramItem
- from .capture.module_digicam import DigiCam
- from databases import insert_photo_records
- from .McuDeviationSet import McuDeviationSet
- from .OtherSet import OtherSet
- from .DebugUart import DebugUart
- from .LineControl import LineControl
- import copy
- # mcu命令
- class DeviceControl(BaseClass, metaclass=SingletonType):
- lock = threading.Lock()
- def __init__(self, websocket_manager: ConnectionManager):
- super().__init__(websocket_manager)
- self.msg_type = "mcu"
- self.mcu_deviation_set = McuDeviationSet(self)
- self.mcu_other_set = OtherSet(self)
- self.debug_uart = DebugUart(self)
- self.debug_uart = DebugUart(self)
- self.line_control = LineControl(websocket_manager)
- self.m_t = 1
- # 0未开始 1进行中 2已结束 99异常
- self.action_state = 2
- self._mcu_move_state = 0
- self.state_camera_motor = 3
- self.state_camera_steering = 3
- self.state_turntable_steering = 3
- self.state_overturn_steering = 3
- self.state_move_turntable_steering = 3
- self.last_from_mcu_move_respond_data = None
- self.camera_motor_speed = 0
- self.camera_motor_value = 0
- self.init_state = False
- self.port_name = ""
- self.mcu_exit = False
- self.t_n = 0
- self.serial_ins = None
- self.connected_ports_dict = {} # 已连接的ports
- self.p_list = []
- self.temp_ports_dict = {}
- self.is_running = False
- self.is_runn_action = False
- self.is_stop_action = False
- self.connect_state = False
- self.device_name_dict = {
- "camera_steering": 0,
- "camera_high_motor": 1,
- "turntable_steering": 2,
- "overturn_steering": 3,
- "laser_position": 4,
- "buzzer": 5,
- "split": 6,
- "turntable_position_motor": 7,
- "mp3_player": 8,
- "mcu": 99,
- }
- # 最近的mcu基础信息,用于获取数据状态检查
- self.last_mcu_info_data = {
- "num": 0,
- "time": time.time(),
- "data": None,
- }
- # 最近的mcu的其他配置
- self.last_mcu_other_info_data = {
- "num": 0,
- "time": time.time(),
- "data": {},
- }
- self.command = {
- "to_device_move": 1, # 设备运动
- "to_init_device": 2, # 初始化设备
- "to_deal_other_device": 3, # 处理其他设备
- "get_all_info": 29, # 获取所有信息
- "set_deviation": 40, # 设置偏移量
- "get_deviation": 41, # 读取偏移量
- "signal_forwarding": 91, # 信号转发处理
- "signal_forwarding_return": 92, # 信号转发返回
- "get_other_info": 44, # 获取其他信息
- "open_rgb_led": 43, ## RGB灯的处理与通讯
- "set_other_info": 45, # 设置其他信息
- "query_remote_control_battery": 47, # 查询遥控器电量
- "set_turntable_mode": 48, # 设置转盘通讯方式 1、串口、2、无线、3 混合
- "stop_mcu": 93, # 停止运行mcu
- }
- # self.window = window
- self.last_push_time = defaultdict(float)
- self.is_running = False
- self.is_wait_connect = False # 等待链接
- self.send_data_queue = [] # 发送队列
- # self.lock = Lock()
- # 是否是刚进行完初始化;首次初始化,需要运动到指定第一个指定位置
- self.is_just_init_time = False
- # self.init()
- # ===========注册命令函数============
- self.deal_code_func_dict = {
- 29: self.get_from_mcu_base_info, # 获取基本情况
- 32: self.get_from_mcu_button, # 获取按键信息
- 42: self.get_from_mcu_deviation_info, # 获取偏移量信息
- 44: self.get_from_mcu_other_info, # 获取其他配置参数
- 90: self.get_from_mcu_connect_info, # 获取链接电脑信号
- 92: self.get_from_mcu_move_respond_data, # 获取MCU响应
- 100: self.print_mcu_error_data, # 打印下位机的错误内容
- }
- async def sendCommand(self,command):
- await asyncio.sleep(0.01)
- loop = asyncio.get_event_loop()
- loop.create_task(self.debug_uart.set(command), name="sendCommand2")
- async def getMcuOtherInfo(self):
- await asyncio.sleep(0.01)
- self.get_other_info()
- async def setMcuOtherInfo(self,data):
- await asyncio.sleep(0.01)
- for k, v in data.items():
- print("k:{},v:{}".format(k, v))
- data[k] = self.setOtherMaxMinValue(k, v)
- self.set_to_mcu_other_info(data)
- self.msg_type = "set_other_mcu_info"
- self.sendSocketMessage(
- code=0,
- msg="设置mcu其他配置信息完成",
- device_status=2,
- )
- self.msg_type = "mcu"
- def setOtherMaxMinValue(self,item, value):
- value = int(value)
- parameter_limits = {
- "is_auto_send_base_info": (0, 10000),
- "is_move_retry": (0, 10000),
- "is_data_response": (0, 10000),
- "low_speed": (0, 10000), # 假设低速范围是0到1000
- "is_test": (0, 10000),
- "to_init_mode": (0, 10000),
- "turntable_move_to_init_mode": (0, 10000), # 假设转盘通讯方式范围是0到3
- "led_count": (0, 10000), # 假设LED数量范围是0到10
- "turntable_steering_angle_ratio": (0, 10000), # 假设角度比例范围是0到100
- "is_manual_check": (0, 10000),
- "camera_steering_angle_ratio": (0, 100000), # 假设角度比例范围是0到100
- "is_auto_motor_to_disable": (0, 10000),
- "diff_dir": (0, 10000),
- "is_auto_send_pos_info": (0, 10000),
- "is_dog": (0, 10000),
- "has_been_set_motor_config": (0, 10000),
- }
- # 获取参数的最小值和最大值
- min_value, max_value = parameter_limits.get(item, (0, 10000))
- value = max(min_value, min(max_value, value))
- return value
- async def initDevice(self,is_force=False):
- if not self.is_running:
- self.sendSocketMessage(
- code=1, msg="mcu设备未连接,请先连接设备", device_status=0
- )
- return False
- if self.init_state ==True:
- print("已经初始化过,请勿重复初始化")
- self.sendSocketMessage(msg="设备初始化完成", device_status=2)
- return False
- self.serial_ins.clearn_flush()
- self.to_init_device_origin_point(device_name="mcu",is_force=is_force)
- print("MCU 开始循环~")
- while 1:
- await asyncio.sleep(0.01)
- if not self.serial_ins or not self.connect_state:
- break
- try:
- # print("mcu send_cmd")
- self.send_cmd()
- # time.sleep(0.01)
- self.get_basic_info_mcu()
- # self.close_other_window()
- except BaseException as e:
- print("121231298908", e)
- break
- self.is_running = False
- self.connect_state = False
- print("MCU 循环退出~")
- # self.sign_data.emit(
- # {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"}
- # )
- message = {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"}
- self.sendSocketMessage(code=1, msg="MCU 连接失败", data=message,device_status=-1)
- self.close_connect()
- async def initControlLine(self,serial_handle):
- '''实例化有线控制设备'''
- self.line_control.connect_state = True
- self.line_control.serial_ins = serial_handle
- await self.line_control.run()
- # if self.init_state == True:
- # print("已经初始化过,请勿重复初始化")
- # self.sendSocketMessage(msg="设备初始化完成", device_status=2)
- # return False
- # self.serial_ins.clearn_flush()
- # self.to_init_device_origin_point(device_name="mcu", is_force=is_force)
- # print("MCU 开始循环~")
- # while 1:
- # await asyncio.sleep(0.01)
- # if not self.serial_ins or not self.connect_state:
- # break
- # try:
- # # print("mcu send_cmd")
- # self.send_cmd()
- # # time.sleep(0.01)
- # self.get_basic_info_mcu()
- # # self.close_other_window()
- # except BaseException as e:
- # print("121231298908", e)
- # break
- # self.is_running = False
- # self.connect_state = False
- # print("MCU 循环退出~")
- # # self.sign_data.emit(
- # # {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"}
- # # )
- # message = {"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 连接失败"}
- # self.sendSocketMessage(
- # code=1, msg="MCU 连接失败", data=message, device_status=-1
- # )
- # self.close_connect()
- def stop_mcu(self):
- buf = [self.command["stop_mcu"]]
- buf.extend(self.encapsulation_data(data=1, len_data=1))
- self.add_send_data_queue(buf)
- # 设置转盘通讯方式 1、串口、2、无线、3 混合
- def to_set_turntable_mode(self, mode=1):
- buf = [self.command["set_turntable_mode"]]
- buf.extend(self.encapsulation_data(data=mode, len_data=1))
- self.add_send_data_queue(buf)
- def query_remote_control_battery(self):
- """查询遥控器电量"""
- buf = [self.command["query_remote_control_battery"]]
- buf.extend(self.encapsulation_data(data=1, len_data=1))
- buf.extend(self.encapsulation_data(data=1, len_data=1))
- buf.extend(self.encapsulation_data(data=1, len_data=1))
- self.add_send_data_queue(buf)
- def encapsulation_data(self, data, len_data, data_magnification=1):
- # data_magnification 数据放大倍数,或缩小倍数,默认为1
- data = int(data * data_magnification)
- if len_data == 1:
- return [0xFF & data]
- elif len_data == 2:
- return [0xFF & data >> 8, 0xFF & data]
- elif len_data == 4:
- return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data]
- def open_rgb_led(
- self,
- color_name,
- led_command=1,
- brightness=80,
- enable=True,
- mode="loop",
- times=2,
- interval=0.1,
- ):
- color_name_value = {
- "红色": (156, 6, 3),
- "黄色": (255, 206, 25),
- "绿色": (0, 128, 0),
- "蓝色": (0, 25, 255),
- "白色": (255, 250, 227),
- }
- if color_name in color_name_value:
- buf = [self.command["open_rgb_led"]]
- buf.append(1 if enable else 0)
- buf.append(led_command)
- buf.extend(color_name_value[color_name])
- buf.extend(
- [
- brightness,
- 1 if mode == "loop" else 2,
- times,
- int(interval * 10),
- ]
- )
- self.add_send_data_queue(buf)
- async def getDeviationInfo(self):
- await asyncio.sleep(0.01)
- try:
- # 发送获取偏移量
- data = [self.command["get_deviation"], 1]
- self.add_send_data_queue(data)
- # if self.serial_ins:
- # self.serial_ins.write_cmd(data)
- print("发送获取偏移量")
- except Exception as e:
- print(e)
- print("getDeviationInfo", "暂未获取到self.command")
- def set_deviation(self, device_name, _type=0, deviation=0):
- # turntable----0 angle_ratio 1 turntable_steering_deviation
- # overturn ----0 middle 1 high
- if device_name == "camera_high_motor":
- deviation = deviation / 10 # deviation 原单位为mm
- if device_name == "turntable_position_motor":
- deviation = deviation / 10 # deviation 原单位为mm
- if device_name == "camera_steering":
- pass
- if device_name == "turntable_steering":
- pass
- if device_name == "overturn_steering":
- pass
- device_id = self.device_name_dict[device_name]
- _dir = 1 if deviation >= 0 else 0
- deviation = int(abs(deviation * 10))
- data = [
- self.command["set_deviation"],
- device_id,
- _type,
- _dir,
- 0xFF & deviation >> 8,
- 0xFF & deviation,
- ]
- self.add_send_data_queue(data)
- # self.controlDevice(device_name, deviation)
- async def set_deviation_cmd(self,value,action_name,type):
- await asyncio.sleep(0.01)
- name_sets = [
- "相机电机", # min 0 max 400,步长1
- "相机舵机", # min -40 max 40,步长0.1
- "转盘舵机", # min -720 max 720,步长1
- "转盘前后电机", # min 0 max 950,步长1
- "翻板舵机中位", # min 0 max 180,步长0.5
- "翻板舵机高位", # min 0 max 180,步长0.5
- "翻板舵机上升速度", # min 1 max 10,步长1
- "翻板舵机下降速度", # min 1 max 10,步长1
- ]
- if action_name not in name_sets:
- self.msg_type = f"{type}_deviation"
- self.sendSocketMessage(msg="设置参数有误,请检查", device_status=0,code=1)
- self.msg_type = "mcu"
- return
- # 发送获取偏移量
- if type == "move":
- self.mcu_deviation_set.change_value(value, action_name)
- elif type == "set":
- self.mcu_deviation_set.set_deviation(action_name)
- self.msg_type = f"{type}_deviation"
- self.sendSocketMessage(msg=f"{action_name} 设置成功", device_status=2,code=0)
- self.msg_type = "mcu"
- def get_other_info(self):
- # 发送获取偏移量
- data = [self.command["get_other_info"], 1]
- self.add_send_data_queue(data)
- print("发送获取其他信息")
- def add_send_data_queue(self, data):
- self.lock.acquire()
- if self.serial_ins:
- print("send_data_queue append :{}".format(data))
- self.send_data_queue.append(data)
- self.lock.release()
- def send_all_cmd(self):
- while True:
- if self.send_data_queue:
- self.sendSocketMessage(msg="正在发送命令", device_status=1)
- data = self.send_data_queue.pop(0)
- self.serial_ins.write_cmd(data)
- self.sendSocketMessage(msg="命令发送完成", device_status=2)
- else:
- break
- def send_cmd(self):
- self.lock.acquire()
- if self.send_data_queue:
- self.sendSocketMessage(msg="正在发送命令", device_status=1)
- data = self.send_data_queue.pop(0)
- self.serial_ins.write_cmd(data)
- self.sendSocketMessage(msg="命令发送完成", device_status=2)
- # else:
- # self.t_n += 1
- # # 加大发送获取基础数据的时间间隔
- # # 默认为0.01秒一个循环,每隔1.5秒发送数据
- # if self.t_n > 150:
- # self.t_n = 0
- # data = [self.command["get_all_info"], 1]
- # self.serial_ins.write_cmd(data)
- self.lock.release()
- def print_mcu_error_data(self, receive_data):
- # 扫码数据
- try:
- data = receive_data[1:].decode()
- if "设备初始化完成" in data:
- self.init_state = True
- self.sendSocketMessage(msg=data, device_status=2)
- print("115 print_mcu_error_data:", data)
- except BaseException as e:
- print("117 error {}".format(e))
- return
- def get_from_mcu_move_respond_data(self, receive_data):
- self.last_from_mcu_move_respond_data = receive_data
- def get_from_mcu_connect_info(self, receive_data):
- connect_flag = receive_data[1]
- device_id = receive_data[2]
- try:
- mcu_has_been_set = receive_data[6] # 设备是否有初始化 ,1 表示已初始化
- except:
- mcu_has_been_set = 99 # 未知状态
- # self.self_sign.emit({"type": "connect_sign", "data": connect_flag})
- message = {"type": "connect_sign", "data": connect_flag}
- self.sendSocketMessage(msg="接收链接信息", data=message)
- print("接收链接信息")
- return
- def to_init_device_origin_point(self, device_name, is_force=False):
- device_id = self.device_name_dict[device_name]
- cmd = 2
- data = [cmd, device_id, 0 if is_force is False else 1]
- self.open_rgb_led(color_name="红色")
- self.add_send_data_queue(data)
- if device_name == "mcu":
- # 重置初始化标记为 从未初始化
- self.is_just_init_time = False
- return True
- # 设置其他信息
- def set_to_mcu_other_info(self, data):
- is_auto_send_base_info = data["is_auto_send_base_info"]
- is_move_retry = data["is_move_retry"]
- is_data_response = data["is_data_response"]
- low_speed = data["low_speed"]
- is_test = data["is_test"]
- to_init_mode = data["to_init_mode"]
- turntable_move_to_init_mode = data["turntable_move_to_init_mode"]
- led_count = data["led_count"]
- turntable_steering_angle_ratio = data["turntable_steering_angle_ratio"]
- is_manual_check = data["is_manual_check"]
- camera_steering_angle_ratio = data["camera_steering_angle_ratio"]
- is_auto_motor_to_disable = data["is_auto_motor_to_disable"]
- diff_dir = data["diff_dir"]
- is_auto_send_pos_info = data["is_auto_send_pos_info"]
- is_dog = data["is_dog"]
- has_been_set_motor_config = data["has_been_set_motor_config"]
- buf = [self.command["set_other_info"]]
- buf.extend(self.encapsulation_data(data=is_auto_send_base_info, len_data=1))
- buf.extend(self.encapsulation_data(data=is_move_retry, len_data=1))
- buf.extend(self.encapsulation_data(data=is_data_response, len_data=1))
- buf.extend(self.encapsulation_data(data=low_speed, len_data=2))
- buf.extend(self.encapsulation_data(data=is_test, len_data=1))
- buf.extend(self.encapsulation_data(data=to_init_mode, len_data=1))
- buf.extend(
- self.encapsulation_data(data=turntable_move_to_init_mode, len_data=1)
- )
- buf.extend(self.encapsulation_data(data=led_count, len_data=2))
- buf.extend(
- self.encapsulation_data(data=turntable_steering_angle_ratio, len_data=2)
- )
- buf.extend(self.encapsulation_data(data=is_manual_check, len_data=1))
- buf.extend(
- self.encapsulation_data(data=camera_steering_angle_ratio, len_data=4)
- )
- buf.extend(self.encapsulation_data(data=is_auto_motor_to_disable, len_data=1))
- buf.extend(self.encapsulation_data(data=diff_dir, len_data=1))
- buf.extend(self.encapsulation_data(data=is_auto_send_pos_info, len_data=1))
- buf.extend(self.encapsulation_data(data=is_dog, len_data=1))
- buf.extend(self.encapsulation_data(data=has_been_set_motor_config, len_data=1))
- self.add_send_data_queue(buf)
- def cleanAllReceiveData(self):
- while True:
- receive_data = self.serial_ins.read_cmd(out_time=1)
- if not receive_data:
- break
- def get_basic_info_mcu(self):
- receive_data = self.serial_ins.read_cmd(out_time=1)
- if receive_data is False:
- print("------------------------------------------------4657564654")
- self.connect_state = False
- return False
- if not receive_data:
- return
- # print("receive_data", receive_data)
- # 数据 结构 command,按命令解析
- # command 0(9) 相机高度1-2 相机角度3-4 转盘角度5-6 灯光状态7 激光指示器状态8,运行状态9
- command = receive_data[0]
- # receive_data_temp = receive_data[2:]
- # receive_data_temp_text = " ".join([hex(x) for x in receive_data_temp])
- # print("command",command)
- # print("receive_data", receive_data_temp_text)
- if command in self.deal_code_func_dict:
- self.deal_code_func_dict[command](receive_data)
- def get_from_mcu_button(self, receive_data):
- button_name = receive_data[1]
- self.deal_mcu_button(button_name)
- def deal_mcu_button(self, button_name):
- # 防止重复点击
- s = time.time()
- if s - self.last_push_time[button_name] > 0.1:
- self.last_push_time[button_name] = s
- # print("button_name", button_name)
- else:
- self.last_push_time[button_name] = s
- return
- if button_name == 1:
- # 自动执行全部
- # self.sign_data.emit(
- # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"}
- # )
- message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_1"}
- self.sendSocketMessage(code=0, msg="", data=message)
- if button_name == 2:
- # self.sign_data.emit(
- # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"}
- # )
- message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_2"}
- self.sendSocketMessage(code=0, msg="", data=message)
- if button_name == 3:
- # self.sign_data.emit(
- # {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"}
- # )
- message = {"_type": "mcu_button", "plugins_mode": "mcu", "data": "todo_3"}
- self.sendSocketMessage(code=0, msg="", data=message)
- # 获取偏移量信息
- def get_from_mcu_deviation_info(self, receive_data):
- if len(receive_data) == 18:
- camera_high_motor_deviation_dir = receive_data[1]
- camera_high_motor_deviation = receive_data[2] << 8 | receive_data[3]
- camera_high_motor_deviation = (
- camera_high_motor_deviation * -1
- if camera_high_motor_deviation_dir == 0
- else camera_high_motor_deviation
- )
- camera_steering_deviation_dir = receive_data[4]
- camera_steering_deviation = (receive_data[5] << 8 | receive_data[6]) * 0.1
- camera_steering_deviation = (
- camera_steering_deviation * -1
- if camera_steering_deviation_dir == 0
- else camera_steering_deviation
- )
- turntable_steering_deviation_dir = receive_data[7]
- turntable_steering_deviation = (
- receive_data[8] << 8 | receive_data[9]
- ) * 0.1
- turntable_steering_deviation = (
- turntable_steering_deviation * -1
- if turntable_steering_deviation_dir == 0
- else turntable_steering_deviation
- )
- overturn_steering_middle_dir = receive_data[10]
- overturn_steering_middle = (receive_data[11] << 8 | receive_data[12]) * 0.1
- overturn_steering_middle = (
- overturn_steering_middle * -1
- if overturn_steering_middle_dir == 0
- else overturn_steering_middle
- )
- overturn_steering_high_dir = receive_data[13]
- overturn_steering_high = (receive_data[14] << 8 | receive_data[15]) * 0.1
- overturn_steering_high = (
- overturn_steering_middle * -1
- if overturn_steering_high_dir == 0
- else overturn_steering_high
- )
- overturn_steering_up_speed = receive_data[16]
- overturn_steering_down_speed = receive_data[17]
- # self.sign_data.emit(
- # {
- # "_type": "get_deviation_data",
- # "plugins_mode": "mcu",
- # "data": {
- # "camera_high_motor_deviation": camera_high_motor_deviation,
- # "camera_steering_deviation": camera_steering_deviation,
- # "turntable_steering_deviation": turntable_steering_deviation,
- # "overturn_steering_middle": overturn_steering_middle,
- # "overturn_steering_high": overturn_steering_high,
- # "overturn_steering_up_speed": overturn_steering_up_speed,
- # "overturn_steering_down_speed": overturn_steering_down_speed,
- # },
- # }
- # )
- # message = {
- # "_type": "get_deviation_data",
- # "plugins_mode": "mcu",
- # "data": {
- # "camera_high_motor_deviation": camera_high_motor_deviation,
- # "camera_steering_deviation": camera_steering_deviation,
- # "turntable_steering_deviation": turntable_steering_deviation,
- # "overturn_steering_middle": overturn_steering_middle,
- # "overturn_steering_high": overturn_steering_high,
- # "overturn_steering_up_speed": overturn_steering_up_speed,
- # "overturn_steering_down_speed": overturn_steering_down_speed,
- # },
- # }
- get_deviation_data = {
- "camera_high_motor_deviation": camera_high_motor_deviation,
- "camera_steering_deviation": camera_steering_deviation,
- "turntable_steering_deviation": turntable_steering_deviation,
- "turntable_front_end_deviation": 300,
- "overturn_steering_middle": overturn_steering_middle,
- "overturn_steering_high": overturn_steering_high,
- "overturn_steering_up_speed": overturn_steering_up_speed,
- "overturn_steering_down_speed": overturn_steering_down_speed,
- }
- self.initDeviationInfo(get_deviation_data)
- self.msg_type = "get_deviation_data"
- self.sendSocketMessage(msg="接收偏移量信息", data=get_deviation_data)
- self.msg_type = "mcu"
- print("接收偏移量信息")
- return
- def initDeviationInfo(self, get_deviation_data):
- # 初始化偏移量信息
- camera_high_motor_deviation = get_deviation_data["camera_high_motor_deviation"]
- camera_steering_deviation = get_deviation_data["camera_steering_deviation"]
- turntable_steering_deviation = get_deviation_data["turntable_steering_deviation"]
- overturn_steering_middle=get_deviation_data["overturn_steering_middle"]
- overturn_steering_high = get_deviation_data["overturn_steering_high"]
- overturn_steering_up_speed = get_deviation_data["overturn_steering_up_speed"]
- overturn_steering_down_speed = get_deviation_data["overturn_steering_down_speed"]
- self.mcu_deviation_set.last_value["相机电机"] = camera_high_motor_deviation
- self.mcu_deviation_set.last_value["相机舵机"] = camera_steering_deviation
- self.mcu_deviation_set.last_value["转盘舵机"] = turntable_steering_deviation
- self.mcu_deviation_set.last_value["翻板舵机中位"] = overturn_steering_middle
- self.mcu_deviation_set.last_value["翻板舵机高位"] = overturn_steering_high
- self.mcu_deviation_set.last_value["翻板舵机上升速度"] = (
- overturn_steering_up_speed
- )
- self.mcu_deviation_set.last_value["翻板舵机下降速度"] = (
- overturn_steering_down_speed
- )
- # 获取其他信息
- def get_from_mcu_other_info(self, receive_data):
- # TODO 已完成设置
- is_auto_send_base_info = self.get_data_from_receive_data(
- receive_data=receive_data, start=1, len_data=1
- )
- is_move_retry = self.get_data_from_receive_data(
- receive_data=receive_data, start=2, len_data=1
- )
- is_data_response = self.get_data_from_receive_data(
- receive_data=receive_data, start=3, len_data=1
- )
- low_speed = self.get_data_from_receive_data(
- receive_data=receive_data, start=4, len_data=2
- )
- is_test = self.get_data_from_receive_data(
- receive_data=receive_data, start=6, len_data=1
- )
- to_init_mode = self.get_data_from_receive_data(
- receive_data=receive_data, start=7, len_data=1
- )
- turntable_move_to_init_mode = self.get_data_from_receive_data(
- receive_data=receive_data, start=8, len_data=1
- )
- led_count = self.get_data_from_receive_data(
- receive_data=receive_data, start=9, len_data=2
- )
- turntable_steering_angle_ratio = self.get_data_from_receive_data(
- receive_data=receive_data, start=11, len_data=2
- )
- is_manual_check = self.get_data_from_receive_data(
- receive_data=receive_data, start=13, len_data=1
- )
- camera_steering_angle_ratio = self.get_data_from_receive_data(
- receive_data=receive_data, start=14, len_data=4
- )
- is_auto_motor_to_disable = self.get_data_from_receive_data(
- receive_data=receive_data, start=18, len_data=1
- )
- diff_dir = self.get_data_from_receive_data(
- receive_data=receive_data, start=19, len_data=1
- )
- is_auto_send_pos_info = self.get_data_from_receive_data(
- receive_data=receive_data, start=20, len_data=1
- )
- is_dog = self.get_data_from_receive_data(
- receive_data=receive_data, start=21, len_data=1
- )
- has_been_set_motor_config = self.get_data_from_receive_data(
- receive_data=receive_data, start=22, len_data=1
- )
- self.last_mcu_other_info_data["data"] = {
- "is_auto_send_base_info": is_auto_send_base_info,
- "is_move_retry": is_move_retry,
- "is_data_response": is_data_response,
- "low_speed": low_speed,
- "is_test": is_test,
- "to_init_mode": to_init_mode,
- "turntable_move_to_init_mode": turntable_move_to_init_mode,
- "led_count": led_count,
- "turntable_steering_angle_ratio": turntable_steering_angle_ratio,
- "is_manual_check": is_manual_check,
- "camera_steering_angle_ratio": camera_steering_angle_ratio,
- "is_auto_motor_to_disable": is_auto_motor_to_disable,
- "diff_dir": diff_dir,
- "is_auto_send_pos_info": is_auto_send_pos_info,
- "is_dog": is_dog,
- "has_been_set_motor_config": has_been_set_motor_config,
- }
- self.last_mcu_other_info_data["time"] = time.time()
- self.last_mcu_other_info_data["num"] += 1
- for k, v in self.last_mcu_other_info_data["data"].items():
- print("k:{},v:{}".format(k, v))
- self.msg_type = "get_mcu_other_info"
- self.sendSocketMessage(
- code=0,
- msg="获取mcu其他配置信息",
- device_status=2,
- data=self.last_mcu_other_info_data["data"],
- )
- self.msg_type = "mcu"
- def get_data_from_receive_data(
- self, receive_data, start, len_data, data_magnification=1
- ):
- # data_magnification 数据放大倍数,或缩小倍数,默认为1
- try:
- if len_data == 1:
- data = receive_data[start]
- return data * data_magnification
- elif len_data == 2:
- data = receive_data[start] << 8 | receive_data[start + 1]
- return data * data_magnification
- elif len_data == 4:
- data = (
- receive_data[start] << 24
- | receive_data[start + 1] << 16
- | receive_data[start + 2] << 8
- | receive_data[start + 3]
- )
- return data * data_magnification
- return None
- except:
- return None
- def get_from_mcu_base_info(self, receive_data):
- # 数据缓存
- self.last_mcu_info_data["time"] = time.time()
- self.last_mcu_info_data["num"] += 1
- # print("last_mcu_info_data:{}".format(self.last_mcu_info_data["time"]))
- self.state_camera_motor = 3
- self.state_camera_steering = 3
- self.state_turntable_steering = 3
- self.state_overturn_steering = 3
- self.state_move_turntable_steering = 3
- if len(receive_data) == 7:
- self.m_t = 1
- laser_state = receive_data[1]
- self.state_camera_motor = receive_data[2]
- self.state_camera_steering = receive_data[3]
- self.state_turntable_steering = receive_data[4]
- self.state_overturn_steering = receive_data[5]
- flag = receive_data[6]
- message = {
- "_type": "show_mcu_info",
- "plugins_mode": "mcu",
- "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format(
- laser_state=laser_state,
- state_camera_motor=self.state_camera_motor,
- state_camera_steering=self.state_camera_steering,
- state_turntable_steering=self.state_turntable_steering,
- state_overturn_steering=self.state_overturn_steering,
- flag=flag,
- ),
- "data_state": {
- "state_camera_motor": self.state_camera_motor,
- "state_camera_steering": self.state_camera_steering,
- "state_turntable_steering": self.state_turntable_steering,
- "state_overturn_steering": self.state_overturn_steering,
- },
- }
- self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message)
- # print("转盘:{},时间:{}".format(self.state_turntable_steering, time.time()))
- if len(receive_data) == 8:
- self.m_t = 2
- laser_state = receive_data[1]
- self.state_camera_motor = receive_data[2]
- self.state_camera_steering = receive_data[3]
- self.state_turntable_steering = receive_data[4]
- self.state_overturn_steering = receive_data[5]
- self.state_move_turntable_steering = receive_data[6]
- flag = receive_data[7]
- message = {
- "_type": "show_mcu_info",
- "plugins_mode": "mcu",
- "data": "激光状态;{laser_state},相机高度状态:{state_camera_motor},相机角度状态:{state_camera_steering},转盘状态:{state_turntable_steering},转盘前后移动状态:{state_move_turntable_steering},翻板状态:{state_overturn_steering},flag:{flag}".format(
- laser_state=laser_state,
- state_camera_motor=self.state_camera_motor,
- state_camera_steering=self.state_camera_steering,
- state_turntable_steering=self.state_turntable_steering,
- state_overturn_steering=self.state_overturn_steering,
- state_move_turntable_steering=self.state_move_turntable_steering,
- flag=flag,
- ),
- "data_state": {
- "state_camera_motor": self.state_camera_motor,
- "state_camera_steering": self.state_camera_steering,
- "state_turntable_steering": self.state_turntable_steering,
- "state_overturn_steering": self.state_overturn_steering,
- "state_move_turntable_steering": self.state_move_turntable_steering,
- },
- }
- # if self.state_camera_motor
- if all(value == 2 for value in [self.state_camera_motor, self.state_camera_steering, self.state_turntable_steering, self.state_overturn_steering]):
- self.init_state = True
- self.sendSocketMessage(msg="设备初始化完成", device_status=2)
- self.sendSocketMessage(msg="获取mcu设备运行状态信息", data=message)
- # 检查是否成功初始化
- if self.is_just_init_time is False:
- if self.mcu_move_state == 2:
- self.is_just_init_time = True
- print("is_just_init_time")
- message = {
- "_type": "is_just_init_time",
- "plugins_mode": "mcu",
- "data": "",
- }
- self.sendSocketMessage(msg="检查设备初始化", data=message)
- return
- # def init(self, sign_data):
- # if sign_data["type"] == "connect_sign":
- # if sign_data["data"] == 0:
- # self.is_running = False
- # else:
- # self.is_running = True
- def scan_serial_port(self) -> dict:
- # 获取所有可用串口列表
- ports_dict = {}
- ports = serial.tools.list_ports.comports()
- # 遍历所有端口并打印信息
- for port in ports:
- if "CH340" in port.description:
- ports_dict[port.name] = {
- "name": port.name,
- "device": port.device,
- "description": port.description,
- "hwid": port.hwid,
- "manufacturer": port.manufacturer,
- "product": port.product,
- "serial_number": port.serial_number,
- }
- if len(ports_dict) <= 0:
- return {}
- return ports_dict
- def remove_port(self, port_name):
- """移除串口"""
- print("remove", port_name)
- data = {
- "_type": "remove_port",
- "plugins_mode": "auto_select_com",
- "data": {"port_name": port_name},
- }
- self.sendSocketMessage(1, "串口被移除", data)
- async def add_port_by_linkage(self, port_name):
- # port_value :串口基础信息
- # todo 根据prot_value 信息自动进行连接
- print("add", port_name)
- # 对没有连接的设备进行尝试连接
- message_data = {
- "_type": "show_info",
- "plugins_mode": "auto_select_com",
- "data": {"text": "开始识别接口:{}".format(port_name)},
- }
- self.sendSocketMessage(
- msg="开始识别接口:{}".format(port_name), data=message_data, device_status=1
- )
- await asyncio.sleep(1)
- """
- 步骤:
- 1、进行临时连接,并发送命令,成功后,自动连接对应设备
- """
- try:
- # 尝试使用115200波特率链接
- serial_handle = serial.Serial(port=port_name, baudrate=115200, timeout=0.5)
- except:
- message_data = {
- "_type": "show_info",
- "plugins_mode": "auto_select_com",
- "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)},
- }
- self.sendSocketMessage(
- 1,
- msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name),
- data=message_data,
- device_status=-1,
- )
- print("串口:{} 被占用".format(port_name))
- return
- await asyncio.sleep(2)
- print("开始发送命令")
- data = [90, 1]
- try:
- serial_handle.flushInput() # 尝试重置输入缓冲区
- except serial.SerialTimeoutException:
- print("超时错误:无法在规定时间内重置输入缓冲区。")
- self.sendSocketMessage(
- 1,
- msg="超时错误:无法在规定时间内重置输入缓冲区。",
- data=None,
- )
- serial_handle.close()
- return
- print("尝试写入数据")
- buf = bytearray(b"")
- buf.extend([0x55, 0x55, (0xFF & len(data))])
- buf.extend(data)
- buf.extend([0xFF & ~sum(data)])
- try:
- self.receive_data = b""
- serial_handle.write(buf)
- except serial.SerialTimeoutException:
- print("写入数据错误")
- serial_handle.close()
- return
- await asyncio.sleep(0.3)
- print("尝试接收命令")
- receive_data = self.read_cmd(serial_handle)
- device_id = 0
- if receive_data:
- print("receive_data", receive_data)
- if receive_data[0] == 90:
- connect_flag = receive_data[1]
- device_id = receive_data[2]
- print("关闭串口:{}".format(port_name))
- serial_handle.close()
- loop = asyncio.get_event_loop()
- if device_id > 0:
- if device_id == 1:
- await self.to_connect_com(port_name)
- message_data = {
- "_type": "show_info",
- "plugins_mode": "auto_select_com",
- "data": {"text": "MCU开始连接"},
- }
- self.sendSocketMessage(
- msg="MCU开始连接", data=message_data, device_status=1
- )
- self.connected_ports_dict[port_name] = "MCU"
- message_data = {
- "_type": "select_port_name",
- "plugins_mode": "auto_select_com",
- "data": {
- "device_name": "mcu" if device_id == 1 else "remote_control",
- "port_name": port_name,
- },
- }
- self.sendSocketMessage(
- msg="MCU连接成功", data=message_data, device_status=2
- )
- await asyncio.sleep(2)
- # await self.initDevice()
- loop.create_task(
- self.initDevice(port_name),
- name="initDevice",
- )
- elif device_id == 2:
- # 有线接收器
- print("device_id", device_id)
- loop.create_task(
- self.line_control.to_connect_com(port_name),
- name="line_control_to_connect_com",
- )
- # message_data = {
- # "_type": "show_info",
- # "plugins_mode": "auto_select_com",
- # "data": {"text": "有线控制器开始连接"},
- # }
- # self.sendSocketMessage(
- # msg="有线控制器开始连接", data=message_data, device_status=1
- # )
- # self.connected_ports_dict[port_name] = "MCU"
- # message_data = {
- # "_type": "select_port_name",
- # "plugins_mode": "auto_select_com",
- # "data": {
- # "device_name": "mcu" if device_id == 1 else "remote_control",
- # "port_name": port_name,
- # },
- # }
- # self.sendSocketMessage(
- # msg="有线控制器开始连接", data=message_data, device_status=2
- # )
- # await asyncio.sleep(2)
- # await self.initControlLine(serial_handle)
- # async def getBaseInfo():
- # while True:
- # await asyncio.sleep(1)
- # # 异步循环获取设备信息
- # self.to_get_mcu_base_info()
- # asyncio.gather(getBaseInfo())
- else:
- print("串口无法识别")
- self.sendSocketMessage(
- code=1,
- msg="串口无法识别,请重新插拔拍照机USB", data=message_data, device_status=-1
- )
- # 走其他途径处理
- # 检查当前MCU链接是否正常
- # 正常跳过;记录为其他列表
- # 不正常进行尝试连接
- # 连接不上,记录为其他列表
- def clearMyInstance(self):
- SingletonType.clear_instance()
- async def to_connect_com(self, port_name):
- # 关闭串口
- print("to_connect_com", port_name)
- self.close_connect()
- await asyncio.sleep(0.3)
- self.connect_state = False
- try:
- self.serial_ins = SerialIns(port_name=port_name, baud=115200, timeout=0.1)
- if not self.serial_ins.serial_handle:
- message_data = {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 打开串口失败",
- }
- self.sendSocketMessage(
- msg="MCU 打开串口失败", data=message_data, device_status=-1
- )
- self.serial_ins = None
- self.connect_state = False
- return False,None
- except:
- message_data = {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 打开串口失败",
- }
- self.sendSocketMessage(
- msg="MCU 打开串口失败", data=message_data, device_status=-1
- )
- self.serial_ins = None
- self.connect_state = False
- return False, None
- message_data = {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 开始连接",
- }
- self.sendSocketMessage(msg="MCU 开始连接", data=message_data, device_status=1)
- # =======================发送连接请求=================================
- cmd = 90
- data = [cmd, 1]
- print("405 发送 连接请求 -----------------------------------------")
- print(self.serial_ins)
- # self.serial_ins.clearn_flush()
- self.serial_ins.write_cmd(data)
- # 延迟接收数据
- await asyncio.sleep(0.3)
- receive_data = self.serial_ins.read_cmd(out_time=1)
- if receive_data:
- print(
- "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data))
- )
- if receive_data:
- # receive_data[2]=1 表示为MCU设备编号
- if receive_data[0] == 90 and receive_data[2] == 1:
- connect_flag = receive_data[1]
- # 是否有初始化
- try:
- mcu_has_been_set = receive_data[
- 6
- ] # 设备是否有初始化 ,1 表示已初始化
- except:
- mcu_has_been_set = 99 # 未知状态
- print("MCU初始化信息{}".format(mcu_has_been_set))
- message_data = {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 已连接",
- }
- self.sendSocketMessage(
- msg="MCU 已连接", data=message_data, device_status=1
- )
- self.connect_state = True
- self.is_running = True
- print("MCU 已连接")
- self.port_name = port_name
- return
- print("MCU 连接失败")
- message_data = {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 连接失败",
- }
- self.sendSocketMessage(msg="MCU 连接失败", data=message_data, device_status=-1)
- self.close_connect()
- def close_connect(self):
- self.port_name = ""
- if self.serial_ins:
- self.serial_ins.close_serial_port()
- self.is_running = False
- self.connect_state = False
- self.connected_ports_dict = {} # 已连接的ports
- self.p_list = []
- self.temp_ports_dict = {}
- self.init_state = False
- print("关闭MCU")
- @property
- def mcu_move_state(self):
- if self.m_t == 1:
- if (
- self.state_camera_motor == 2
- and self.state_camera_steering == 2
- and self.state_turntable_steering == 2
- and self.state_overturn_steering == 2
- ):
- self._mcu_move_state = 2
- else:
- self._mcu_move_state = 1
- else:
- if (
- self.state_camera_motor == 2
- and self.state_camera_steering == 2
- and self.state_turntable_steering == 2
- and self.state_overturn_steering == 2
- and self.state_move_turntable_steering == 2
- ):
- self._mcu_move_state = 2
- else:
- self._mcu_move_state = 1
- # self._mcu_move_state = 2
- return self._mcu_move_state
- def to_deal_device(self, device_name, value=1, _type=0, times=1, delay=0):
- """
- value 激光0关 1开
- mp3_player value 表示0表示关,1表示开,_type 表示歌曲切换到指定歌曲
- delay:延迟处理,单位为0.1秒,即delay=100时,表示延迟10秒
- """
- device_id = self.device_name_dict[device_name]
- if device_name == "buzzer":
- value = int(value)
- cmd = 3
- data = [
- cmd,
- device_id,
- value,
- _type,
- times,
- delay,
- ]
- self.add_send_data_queue(data)
- # if self.serial_ins:
- # self.serial_ins.write_cmd(data)
- return True
- def to_device_move(
- self,
- device_name,
- value=0,
- max_speed=None,
- up_speed=None,
- down_speed=None,
- _is_debug=0,
- is_relative=0,
- is_deviation=1,
- ):
- """
- 此处输入单位为 毫米,以及度 需要先缩小,再放大
- """
- print("移动",time.time())
- speed = settings.moveSpeed()
- cmd = 1
- device_id = self.device_name_dict[device_name]
- if device_id != 1:
- print("F非MCU设备,禁止处理")
- return
- print("正在执行",device_name)
- match device_name:
- case "camera_high_motor":
- # value 单位毫米
- # max_speed = 10000 if max_speed is None else max_speed
- # up_speed = 800 if up_speed is None else up_speed
- # down_speed = 700 if down_speed is None else down_speed
- max_speed = (
- speed[device_name]["max_speed"] if max_speed is None else max_speed
- )
- up_speed = (
- speed[device_name]["up_speed"] if up_speed is None else up_speed
- )
- down_speed = (
- speed[device_name]["down_speed"]
- if down_speed is None
- else down_speed
- )
- value = value / 10 # value 单位毫米
- assert 0 <= value <= 40
- assert 0 <= max_speed <= 10000
- case "camera_steering":
- # 角度为度 未放大 精确到0.1度
- max_speed = 6000 if max_speed is None else max_speed
- up_speed = 500 if up_speed is None else up_speed
- down_speed = 500 if down_speed is None else down_speed
- assert -360 <= value <= 360
- case "turntable_steering":
- # 角度为度 未放大 精确到0.1度
- # max_speed = 6000 if max_speed is None else max_speed
- # up_speed = 500 if up_speed is None else up_speed
- # down_speed = 400 if down_speed is None else down_speed
- max_speed = (
- speed[device_name]["max_speed"] if max_speed is None else max_speed
- )
- up_speed = (
- speed[device_name]["up_speed"] if up_speed is None else up_speed
- )
- down_speed = (
- speed[device_name]["down_speed"]
- if down_speed is None
- else down_speed
- )
- assert -720 <= value <= 720
- case "overturn_steering":
- # 角度为度 未放大 精确到0.1度
- max_speed = 2 if max_speed is None else max_speed
- up_speed = 1 if up_speed is None else up_speed
- down_speed = 1 if down_speed is None else down_speed
- assert 0 <= value <= 360
- case "turntable_position_motor":
- # value 单位毫米
- max_speed = 11000 if max_speed is None else max_speed
- up_speed = 900 if up_speed is None else up_speed
- down_speed = 900 if down_speed is None else down_speed
- value = value / 10 # value 单位毫米
- assert 0 <= value <= 900
- assert 0 <= max_speed <= 15000
- _dir = True if value >= 0 else False
- value = int(abs(value * 10)) # 此处value赋值后,单位为mm以及0.1度
- print("准备执行",device_name, value)
- data = [
- cmd,
- device_id,
- 1 if _dir else 0,
- 0xFF & value >> 8,
- 0xFF & value,
- 0xFF & max_speed >> 8,
- 0xFF & max_speed,
- 0xFF & up_speed >> 8,
- 0xFF & up_speed,
- 0xFF & down_speed >> 8,
- 0xFF & down_speed,
- _is_debug,
- is_deviation,
- is_relative,
- ]
- self.add_send_data_queue(data)
- def to_get_mcu_base_info(self):
- if self.connect_state:
- self.lock.acquire()
- # print('==========================>1111')
- # print("-------------------to_get_mcu_base_info--------------------------")
- data = [self.command["get_all_info"], 1]
- f = True
- try:
- self.serial_ins.write_cmd(data)
- except:
- f = False
- pass
- self.lock.release()
- if not f:
- self.connect_state = False
- return False
- else:
- return True
- def check_before_action(self):
- if self.state != 2:
- print("check_before_action 设备正在运行中~")
- self.sendSocketMessage(
- code=1, msg="设备正在运行中", device_status=1
- )
- return False
- if self.mcu_move_state != 2:
- if settings.IS_LIN_SHI_TEST:
- return True
- # self.show_info("mcu 非停止状态")
- self.sendSocketMessage(code=1, msg="mcu 非停止状态", device_status=1)
- return True
- def controlDevice(self, device_name, value):
- '''控制设备移动等'''
- if not self.is_running:
- self.sendSocketMessage(
- code=1, msg="mcu设备未连接,请先连接设备", device_status=0
- )
- return False
- if not self.init_state:
- self.sendSocketMessage(
- code=1, msg="mcu设备未初始化", device_status=4
- )
- return False
- _is_debug = 1 if settings.IS_DEBUG == "true" else 0
- is_deviation = 0 if settings.IS_DEBUG == "true" else 1
- print("控制设备==>_is_debug", _is_debug)
- match device_name:
- case "camera_high_motor":
- # 相机电机
- print(device_name, value)
- self.to_device_move(
- device_name=device_name,
- value=float(value),
- # max_speed=1400,
- # up_speed=400,
- # down_speed=100,
- # _is_debug=_is_debug,
- # is_deviation=is_deviation,
- )
- case "camera_steering":
- print(device_name, value)
- # 相机舵机
- self.to_device_move(
- device_name=device_name,
- value=float(value),
- # _is_debug=_is_debug,
- # is_deviation=0,
- )
- case "turntable_steering":
- # 转盘舵机
- self.to_device_move(
- device_name=device_name,
- value=float(value),
- # _is_debug=_is_debug,
- # is_deviation=0,
- )
- case "turntable_position_motor":
- # 转盘舵机
- self.to_device_move(
- device_name=device_name,
- value=float(value),
- # max_speed=1400,
- # up_speed=400,
- # down_speed=100,
- # _is_debug=_is_debug,
- # is_deviation=is_deviation,
- )
- case "overturn_steering":
- # 翻板舵机中位
- self.to_deal_device(
- device_name="overturn_steering",
- )
- case "laser_position":
- self.to_deal_device(
- device_name="laser_position", value=0 if int(value) <= 0 else 1
- )
- case "take_picture":
- capture_one = DigiCam()
- try:
- # camera_is_connect = capture_one.checkCameraConnect()
- # if camera_is_connect is not True:
- # self.sendSocketMessage(1,"相机未连接,请检查",device_status=-1)
- # return
- capture_one.getCaptureFolderPath()
- if value > 0:
- capture_one.auto_focus()
- capture_one.run_capture_action("Capture")
- except:
- self.sendSocketMessage(1,"digicam未初始化,请检查",device_status=-1)
- case "to_deal_device":
- self.to_deal_device(device_name, value=value, _type=0, times=1)
- case _:
- pass
- # case "photograph":
- # self.photograph(goods_art_no=None)
- def checkDevice(self):
- if not self.is_running:
- self.sendSocketMessage(
- code=1, msg="mcu设备未连接,请先连接设备", device_status=0
- )
- return False
- if self.init_state is not True:
- self.sendSocketMessage(code=1, msg="mcu设备未初始化", device_status=4)
- return False
- if self.action_state != 2:
- self.sendSocketMessage(
- code=1,
- msg="当前有未完成的任务,请稍后再试",
- device_status=0,
- )
- return False
- async def run_mcu_config(self, config_list, goods_art_no, action_info):
- if self.checkDevice() == False:
- return
- image_counts = 0
- if config_list:
- action_names = []
- if len(config_list) > 1:
- if config_list[-1]["take_picture"] is True:
- new_init_config = copy.copy(config_list[0])
- new_init_config["action_name"] = "移动到初始位"
- new_init_config["number_focus"] = 0
- new_init_config["take_picture"] = False
- new_init_config["shoe_upturn"] = False
- new_init_config["pre_delay"] = 0.0
- new_init_config["after_delay"] = 0.0
- new_init_config["led_switch"] = True
- new_init_config["turntable_angle"] = 0.0
- new_init_config["is_wait"] = False
- new_init_config["is_need_confirm"] = False
- config_list.append(new_init_config)
- for idx, item in enumerate(config_list):
- is_take_picture = item["take_picture"]
- action_id = item["id"]
- record_id = -1
- if is_take_picture:
- action_names.append(item["action_name"])
- image_counts += 1
- # 批量插入
- image_deal_mode = 0 if action_info == "执行左脚程序" else 1
- state,record_id = insert_photo_records(
- image_deal_mode=image_deal_mode,
- goods_art_no=goods_art_no,
- image_index=idx,
- action_id=action_id,
- )
- config_list[idx]["record_id"] = record_id
- total_len = len(config_list)
- self.action_state = 1
- self.msg_type = "image_process"
- self.sendSocketMessage(
- code=0,
- msg="MCU 命令已发送完成",
- device_status=2,
- data={
- "goods_art_no": goods_art_no,
- "image_counts": image_counts,
- "action_names": action_names,
- "current_time": datetime.datetime.now(settings.TIME_ZONE).strftime(
- "%Y-%m-%d %H:%M:%S"
- ),
- },
- )
- self.controlDevice("laser_position", 0)
- self.msg_type = "mcu"
- self.is_runn_action = True
- for index, action in enumerate(config_list):
- await asyncio.sleep(0.1)
- if self.is_stop_action == True:
- self.is_stop_action = False
- break
- # action_is_take_picture = action["take_picture"]
- record_id = action["record_id"]
- image_index = -1
- if record_id == -1:
- image_index = -1
- else:
- image_index = index
- program_item = ProgramItem(
- websocket_manager=self.websocket_manager,
- action_data=action,
- mcu=self,
- goods_art_no=goods_art_no,
- image_index=image_index,
- record_id=record_id,
- )
- print("self.action_state===>", self.action_state)
- if self.action_state != 1:
- # 异常终止
- print("action异常终止")
- break
- self.msg_type = "photo_take"
- if not program_item.run(total_len):
- self.sendSocketMessage(
- code=1,
- msg="{} 执行失败~".format(program_item.action_name),
- device_status=0,
- )
- self.to_deal_device(device_name="buzzer", times=3)
- break
- else:
- # self.show_info("{}执行完成~".format(action.action_name))
- self.sendSocketMessage(
- code=0,
- msg="{} 执行完成~".format(program_item.action_name),
- data={"goods_art_no": goods_art_no},
- device_status=2,
- )
- self.msg_type = "mcu"
- # 在第三张图时检查是否有对应图片生成
- # if index == 3:
- # # if not self.image_process_data.check_photo_is_get():
- # self.sendSocketMessage(
- # code=1,
- # msg="未获取到图片数据",
- # device_status=0,
- # )
- # self.action_state = 2
- # return
- if index == total_len - 1:
- # 最后一个初始化处理
- pass
- # self.action_state = 2
- self.action_state = 2
- self.is_runn_action = False
- self.msg_type = "photo_take_finish"
- self.sendSocketMessage(
- code=0,
- msg=f"货号:{goods_art_no},执行完成",
- device_status=2,
- )
- self.msg_type = "mcu"
- self.controlDevice("laser_position", 1)
- async def run_mcu_config_single(self, config_info, goods_art_no,msg_type="run_mcu_single_finish",image_index=-1,record_id=-1):
- '''独立拍照 仅作测试用'''
- if self.checkDevice() == False:
- return
- if config_info:
- self.action_state = 1
- self.msg_type = "mcu"
- await asyncio.sleep(0.1)
- program_item = ProgramItem(
- websocket_manager=self.websocket_manager,
- action_data=config_info,
- mcu=self,
- goods_art_no=goods_art_no,
- image_index=image_index,
- record_id=record_id,
- )
- print("self.action_state===>", self.action_state)
- if self.action_state != 1:
- # 异常终止
- print("action异常终止")
- return
- self.msg_type = "photo_take"
- # if not program_item.run_only_mcu(1):
- # self.sendSocketMessage(
- # code=1,
- # msg="{} 执行失败~".format(program_item.action_name),
- # device_status=0,
- # )
- # self.to_deal_device(device_name="buzzer", times=3)
- # return
- # else:
- program_item.run(3)
- # self.show_info("{}执行完成~".format(action.action_name))
- self.sendSocketMessage(
- code=0,
- msg="{} 执行完成~".format(program_item.action_name),
- data={"goods_art_no": goods_art_no},
- device_status=2,
- )
- self.msg_type = "mcu"
- self.action_state = 2
- self.msg_type = msg_type
- self.sendSocketMessage(
- code=0,
- msg=f"执行完成",
- device_status=2,
- )
- self.msg_type = "mcu"
- async def checkMcuConnection(device_ctrl: DeviceControl):
- if device_ctrl.is_running == True:
- message = {
- "_type": "select_port_name",
- "plugins_mode": "auto_select_com",
- "data": device_ctrl.temp_ports_dict,
- }
- device_ctrl.device_status = 2
- device_ctrl.sendSocketMessage(code=0, msg="MCU连接成功", data=message)
- return
- """实时检测串口是否连接"""
- while True:
- await asyncio.sleep(0.5)
- if device_ctrl.mcu_exit:
- break
- ports_dict = device_ctrl.scan_serial_port()
- device_ctrl.temp_ports_dict = ports_dict
- if not ports_dict:
- # 全部清空 移除所有串口
- if device_ctrl.p_list:
- _p = device_ctrl.p_list.pop()
- device_ctrl.remove_port(_p)
- print("串口未连接,请检查")
- device_ctrl.sendSocketMessage(code=1, msg="串口未连接,请检查",device_status=-1)
- continue
- if ports_dict:
- for index, _i in enumerate(device_ctrl.p_list):
- if _i not in ports_dict:
- _p = device_ctrl.p_list.pop(index)
- device_ctrl.remove_port(_p)
- for _port_name, _port_value in ports_dict.items():
- if _port_name not in device_ctrl.p_list:
- try:
- device_ctrl.p_list.append(_port_name)
- await device_ctrl.add_port_by_linkage(_port_name)
- except BaseException as e:
- print("串口不存在{} {}".format(_port_name, e))
- print("MCU断开连接,已释放")
|