# module_remote_control_v2
import json
import time
import asyncio
import settings
from .SerialIns import SerialIns
from .BaseClass import BaseClass
from sockets.connect_manager import ConnectionManager
from databases import SqlQuery, PhotoRecord, CRUD, insert_photo_records
from .capture.module_digicam import DigiCam
from .capture.module_watch_dog import FileEventHandler

# from .BlueToothMode import BlueToothMode


class RemoteControlV2(BaseClass):
    """Handler for the V2 remote control (new remote with silicone buttons).

    Frames are read either from a serial port (``SerialIns``) or from the
    injected bluetooth instance, decoded by their first byte (the command),
    and forwarded to the websocket side through ``sendSocketMessage``.
    """

    # sign_data = Signal(dict)

    def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
        """Store the transports and reset all connection / capture state.

        Args:
            bluetooth_ins: Object used for bluetooth reads; this class only
                calls ``read_cmd_one(address=...)`` on it.
            websocket_manager: Connection manager handed to ``BaseClass``.
        """
        super().__init__(websocket_manager)
        self.msg_type = "blue_tooth"
        self.websocket_manager = websocket_manager
        # self.windows = windows
        self.serial_ins = None
        self.bluetooth_ins = bluetooth_ins
        self.port_name = ""
        self.bluetooth_address = ""
        self.connect_state = False
        self.is_running = False
        self.goods_art_no = None
        # 0 idle; 1 in progress; 2 finished
        self.photo_take_state = 0
        # Strong reference to the polling task created by _schedule_run().
        self._run_task = None

    def _schedule_run(self):
        """Start ``run()`` as a task on the event loop unless one is alive.

        The task object is kept on the instance: the event loop only holds a
        weak reference to tasks, and the reference also lets a reconnect
        reuse a still-running loop instead of stacking a second one.
        """
        task = getattr(self, "_run_task", None)
        if task is not None and not task.done():
            return
        loop = asyncio.get_event_loop()
        self._run_task = loop.create_task(self.run())

    def _send_as(self, msg_type, *args, **kwargs):
        """Send one socket message under a temporary ``msg_type``.

        ``msg_type`` is always put back to "blue_tooth", even when
        ``sendSocketMessage`` raises.
        """
        self.msg_type = msg_type
        try:
            self.sendSocketMessage(*args, **kwargs)
        finally:
            self.msg_type = "blue_tooth"

    def _report_serial_open_failure(self):
        """Publish the "serial open failed" message and reset serial state."""
        message = {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": "遥控设备V2 打开串口失败",
        }
        print(message)
        self.sendSocketMessage(
            code=1,
            msg="遥控设备V2 打开串口失败",
            data=message,
            device_status=-1,
        )
        self.serial_ins = None
        self.connect_state = False
        self.set_voltage_value(None)

    def _report_disconnected(self):
        """Publish the "not connected" message."""
        message = {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": "遥控设备V2 未连接",
        }
        print(message)
        self.sendSocketMessage(
            code=1, msg="遥控设备V2 未连接", data=message, device_status=-1
        )

    def to_connect_com(self, port_name, is_test=False):
        """Open the serial port and start polling it.

        Args:
            port_name: Serial port name passed to ``SerialIns``.
            is_test: When True the polling task is not scheduled, matching
                ``to_connect_bluetooth``.

        Returns:
            True when the port was opened, False when opening failed, and
            None when a connection is already active (nothing is done).
        """
        if self.connect_state:
            return
        self.close_connect()
        time.sleep(0.5)
        try:
            # Baud rate used to be 9600.
            self.serial_ins = SerialIns(port_name=port_name, baud=115200)
            # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
            if self.serial_ins.serial_handle:
                message = {
                    "_type": "show_info",
                    "plugins_mode": "remote_control",
                    "data": "遥控设备V2 打开串口成功",
                }
                self.sendSocketMessage(
                    code=0,
                    msg="遥控设备V2 打开串口成功",
                    data=message,
                    device_status=1,
                )
                # print(message)
                self.connect_state = True
                message = {
                    "_type": "remote_control_connect",
                    "plugins_mode": "remote_control",
                    "data": port_name,
                }
                self.sendSocketMessage(
                    code=0, msg="", data=message, device_status=2
                )
                print(message)
                self.port_name = port_name
                self.is_running = True
                self.set_voltage_value(0)
                # run() is a coroutine: calling it bare only creates a
                # coroutine object that never executes, so it has to be
                # scheduled on the event loop.
                if is_test is False:
                    self._schedule_run()
                return True
            self._report_serial_open_failure()
        except Exception as exc:
            print("to_connect_com error", exc)
            self._report_serial_open_failure()
        return False

    def to_connect_bluetooth(self, address, is_test=False):
        """Mark the bluetooth remote at ``address`` as connected and poll it.

        Args:
            address: Bluetooth address later passed to
                ``bluetooth_ins.read_cmd_one``.
            is_test: When True the polling task is not scheduled.
        """
        print("to_connect_bluetooth", self.connect_state)
        if self.connect_state:
            return
        # if self.bluetooth_ins == None:
        #     print("bluetooth_ins 未初始化", bluetooth_mode)
        #     self.bluetooth_ins = BlueToothMode()
        # else:
        #     self.bluetooth_ins = bluetooth_mode
        self.close_connect()
        if self.bluetooth_ins is None:
            # Only logged; the connection is still marked as open below.
            print("bluetooth_ins 未初始化", self.bluetooth_ins)
        # self.bluetooth_ins = self.
        self.bluetooth_address = address
        message = {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": "遥控设备V2 打开蓝牙成功",
        }
        print(message)
        self.sendSocketMessage(
            code=0, msg="遥控设备V2 打开蓝牙成功", data=message, device_status=2
        )
        self.connect_state = True
        self.is_running = True
        self.set_voltage_value(0)
        if is_test is False:
            self._schedule_run()

    def close_bluetooth_connect(self):
        """Report the bluetooth disconnect and close the connection.

        Does nothing when no bluetooth address is set.
        """
        if self.bluetooth_address:
            print("蓝牙断开")
            message = {
                "_type": "show_info",
                "plugins_mode": "remote_control",
                "data": "遥控设备V2 蓝牙断开",
            }
            print(message)
            self.sendSocketMessage(
                code=1, msg="遥控设备V2 蓝牙断开", data=message, device_status=-1
            )
            self.close_connect()
            print("关闭蓝牙")

    def set_voltage_value(self, voltage_value=None, voltage_text=None):
        """Publish the receiver / battery status line.

        Args:
            voltage_value: Battery percentage. ``None`` or ``0`` publishes
                the connection flag instead of a percentage.
            voltage_text: Preformatted battery text; only used when
                ``voltage_value`` is None.
        """
        if self.is_running:
            device_status = 2
            flag = "接收器已连接 {}".format(
                "蓝牙" if self.bluetooth_address else "串口"
            )
            # NOTE(review): this text says "bluetooth opened" even on a
            # serial connection, and connect_state is forced back to True
            # whenever the loop is flagged as running. Both are kept as-is
            # because consumers may depend on them - confirm and clean up.
            self.sendSocketMessage(
                code=0, msg="遥控设备V2 打开蓝牙成功", data=None, device_status=2
            )
            self.connect_state = True
            self.is_running = True
        else:
            device_status = -1
            flag = "接收器未连接"

        if voltage_value is None:
            if voltage_text:
                print(voltage_text)
                # Send the remaining-battery message.
                self.sendSocketMessage(
                    msg=voltage_text, device_status=device_status
                )
            else:
                self.sendSocketMessage(msg=flag, device_status=device_status)
        elif voltage_value == 0:
            print(flag)
            self.sendSocketMessage(msg=flag, device_status=device_status)
        else:
            battery_text = "电量:{}%".format(voltage_value)
            print(battery_text)
            self.sendSocketMessage(
                msg=battery_text, device_status=device_status
            )
        # print("打印===>", flag)

    def close_connect(self):
        """Clear both transport identifiers and close the serial port."""
        self.port_name = ""
        self.bluetooth_address = ""
        # self.bluetooth_ins = None
        if self.serial_ins:
            self.serial_ins.close_serial_port()
        self.connect_state = False

    def __del__(self):
        # Best-effort cleanup: the instance may be only partly initialised
        # (e.g. __init__ raised), and a finalizer must not raise.
        try:
            self.close_connect()
        except Exception:
            pass

    def play_sound(self, tip="sound_tips_3"):
        """Play a sound tip through ``self.windows`` when one is attached.

        ``windows`` is not assigned anywhere in this class, so the call is
        skipped instead of raising AttributeError when it is missing.
        """
        windows = getattr(self, "windows", None)
        if windows is None:
            return
        windows.playsound.tips_type = tip
        windows.playsound.start()

    def handlerAction(self, button_value):
        """Handle the capture-action buttons (left / right).

        Button 1 selects the left-foot program, any other value the
        right-foot program. The "run_mcu" request is always sent; only when
        a goods number is pending is it consumed and ``photo_take_state``
        advanced to 2.
        """
        control_program = "执行左脚程序" if button_value == 1 else "执行右脚程序"
        goods_art_no = self.goods_art_no
        no_goods = goods_art_no is None or goods_art_no == ""
        input_data = {
            "data": {
                "action": control_program,
                "goods_art_no": "" if no_goods else goods_art_no,
            },
            "type": "run_mcu",
        }
        if not no_goods:
            self.photo_take_state = 1
        self._send_as(
            "blue_tooth_scan",
            code=0,
            msg=f"准备执行[{control_program}]",
            data=input_data,
            device_status=2,
        )
        if no_goods:
            return
        self.goods_art_no = None
        self.photo_take_state = 2

    async def handlerTakePhoto(self):
        """Take one extra photo for the most recent photo record.

        Reads the newest ``PhotoRecord``, inserts a follow-up record with
        the next image index, triggers the camera and reports the outcome.
        ``photo_take_state`` is 1 while the shot is in progress and is
        always put back to 0 afterwards.
        """
        await asyncio.sleep(0.1)
        print("开始单拍0")
        session = SqlQuery()
        crud = CRUD(PhotoRecord)
        record = crud.read(session=session, order_by="id", ascending=False)
        print("开始单拍0-读取数据库")
        if record is None:
            # Nothing to attach the shot to: report the failure.
            self.sendSocketMessage(
                code=1,
                msg="单拍失败,请先输入货号或扫码进行组合拍摄",
                data=None,
                device_status=2,
            )
            return
        print("开始单拍1")
        if record.image_index == 19:
            self.sendSocketMessage(
                code=1,
                msg="单拍失败,单个货号最多允许拍摄20张产品图",
                data=None,
                device_status=2,
            )
            return
        image_index = record.image_index + 1
        self.photo_take_state = 1
        try:
            insert_photo_records(
                record.image_deal_mode,
                record.goods_art_no,
                image_index,
                record.action_id,
            )
            print("开始单拍1-插入数据")
            capture_one = DigiCam()
            try:
                capture_folder_path = capture_one.getCaptureFolderPath()
                watch_dog = FileEventHandler()
                watch_dog.goods_art_no = record.goods_art_no
                watch_dog.image_index = image_index
                watch_dog.start_observer(capture_folder_path)
                print("开始单拍1-检查相机")
                if capture_one.checkCameraConnect() is not True:
                    self.sendSocketMessage(
                        1, "相机未连接,请检查", device_status=-1
                    )
                    return
                capture_one.run_capture_action("Capture")
                print("开始单拍1-完成拍照")
                # Non-blocking wait: time.sleep() here would stall every
                # other task on the event loop for a full second.
                await asyncio.sleep(1)
                self._send_as(
                    "photo_take",
                    code=0,
                    msg="{} 执行完成~".format(
                        "执行右脚程序"
                        if record.image_deal_mode == 1
                        else "执行左脚程序"
                    ),
                    data={"goods_art_no": record.goods_art_no},
                    device_status=2,
                )
            except Exception as exc:
                print("handlerTakePhoto error", exc)
                self.sendSocketMessage(
                    1, "digicam未初始化,请检查", device_status=-1
                )
        finally:
            # Always return to idle; otherwise analysis_received_data()
            # would ignore every later frame as "photo in progress".
            self.photo_take_state = 0

    @staticmethod
    def _decode_bar_code(payload):
        """Decode a barcode payload and strip CR/LF; None if undecodable."""
        try:
            text = payload.decode()
        except UnicodeDecodeError as exc:
            print("bar_code decode failed", exc)
            return None
        return text.replace("\r", "").replace("\n", "")

    def _handle_button(self, receive_data):
        """Handle a command-9 frame: byte 1 is the pressed button value."""
        button_value = receive_data[1]
        message = {
            "_type": 9,
            "plugins_mode": "remote_control",
            "data": {"button_value": button_value},
        }
        print(message)
        if button_value in (1, 2):
            # Left / right capture program for the scanned goods number.
            if self.photo_take_state != 0:
                self.sendSocketMessage(
                    1, "前置拍照未完成,请稍后", device_status=-1
                )
                return
            print("收到货号信息", self.goods_art_no)
            try:
                self.handlerAction(button_value)
            finally:
                self.photo_take_state = 0
        if button_value == 3:
            # Single shot requested from the remote.
            self._send_as(
                "handler_take_picture",
                0,
                "处理单拍消息",
                data={"type": "handler_take_picture", "data": None},
                device_status=-1,
            )
        if button_value == 9:
            # Stop request.
            self._send_as(
                "stop_action",
                0,
                "停止执行组合动作",
                data={"type": "stop_action", "data": None},
                device_status=-1,
            )
        self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
        if settings.IS_DEBUG == "true":
            print("收到按键", button_value)

    def _handle_battery_report(self, receive_data):
        """Handle a command-12 battery frame (original comment: Max17048).

        Fields read, as byte offset / length: charging flag 1/1, voltage
        6/4 (divided by 100), adjusted SOC 18/4 (divided by 100), full flag
        23/1. A frame too short to hold the 4-byte fields is dropped.
        """
        chg_status = self.get_data_from_receive_data(
            receive_data=receive_data, start=1, len_data=1
        )
        voltage = self.get_data_from_receive_data(
            receive_data=receive_data, start=6, len_data=4
        )
        adjusted_soc = self.get_data_from_receive_data(
            receive_data=receive_data, start=18, len_data=4
        )
        full_status = self.get_data_from_receive_data(
            receive_data=receive_data, start=23, len_data=1
        )
        if voltage is None or adjusted_soc is None:
            print("battery frame too short", len(receive_data))
            return
        voltage = voltage / 100
        adjusted_soc = adjusted_soc / 100
        charging_text = "充电中" if chg_status else ""
        full_text = "已充满" if full_status else ""
        # The reading is rescaled so that 92.5 shows as 100%, capped at 100.
        self.set_voltage_value(
            voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
                min(adjusted_soc / 92.5 * 100, 100),
                voltage,
                charging_text,
                full_text,
            )
        )

    async def analysis_received_data(self):
        """Read one frame from the active transport and dispatch it.

        Returns:
            False when the transport reported a failure (the connection is
            then marked closed); None in every other case.
        """
        if not self.connect_state:
            return
        await asyncio.sleep(0.01)
        if self.bluetooth_address:
            receive_data = self.bluetooth_ins.read_cmd_one(
                address=self.bluetooth_address
            )
        elif self.serial_ins is not None:
            receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
        else:
            # No transport available (e.g. closed while we were sleeping).
            return
        if receive_data is False:
            self.connect_state = False
            return False
        if not receive_data:
            return
        if self.photo_take_state != 0:
            print("正在拍照", self.photo_take_state)
            return

        # Frame layout: first byte is the command, the rest its payload.
        command = receive_data[0]
        if command in (9, 10) and len(receive_data) < 2:
            print("frame too short for command", command)
            return

        if command == 1:
            # Scanned barcode.
            bar_code = self._decode_bar_code(receive_data[1:])
            if bar_code is None:
                return
            self.goods_art_no = bar_code
            message = {
                "_type": 0,
                "plugins_mode": "remote_control",
                "data": bar_code,
            }
            print(message)
            self.sendSocketMessage(
                code=0, msg="", data=message, device_status=2
            )
            return
        if command == 9:
            self._handle_button(receive_data)
            return
        if command == 10:
            voltage_value = receive_data[1]
            self.set_voltage_value(voltage_value)
            if settings.IS_TEST:
                print("遥控器V2电量:{}".format(voltage_value))
            return
        if command == 12:
            self._handle_battery_report(receive_data)
            return
        if command == 111:
            value = self.get_data_from_receive_data(
                receive_data=receive_data, start=1, len_data=4
            )
            if value is not None:
                print("遥控器-测试 value:{}".format(value))
            # self.windows.show_label.setText("--------{}".format(value))
        if command == 112:
            bar_code = self._decode_bar_code(receive_data[1:])
            if bar_code is not None:
                print("read bar_code {}".format(bar_code))

    # Generic serial data parser
    def get_data_from_receive_data(
        self, receive_data, start, len_data, data_magnification=1
    ):
        """Read a big-endian unsigned integer out of a received frame.

        Args:
            receive_data: Indexable sequence of byte values.
            start: Index of the most significant byte.
            len_data: Field width in bytes; only 1, 2 and 4 are supported.
            data_magnification: Factor the value is multiplied by
                (default 1).

        Returns:
            The scaled value, or None when the width is unsupported or the
            frame is too short / not indexable.
        """
        if len_data not in (1, 2, 4):
            return None
        try:
            value = 0
            for offset in range(len_data):
                value = value << 8 | receive_data[start + offset]
            return value * data_magnification
        except (IndexError, TypeError):
            return None

    async def run(self):
        """Poll the active transport until the connection is closed."""
        # self.show_info.emit("未连接")
        # self.data_command_sign.emit(data)
        self.is_running = True
        print("Running")
        try:
            while True:
                await asyncio.sleep(0.01)
                if not self.connect_state:
                    self._report_disconnected()
                    break
                await self.analysis_received_data()
        finally:
            # Keep the flag truthful even if a frame handler raised.
            self.is_running = False
        # NOTE(review): together with the report inside the loop this sends
        # the "not connected" message twice; kept to preserve the emitted
        # message sequence.
        if not self.connect_state:
            self._report_disconnected()
        self.set_voltage_value(None)