import time # from import_qt_mode import * import asyncio from .SerialIns import SerialIns from .BaseClass import BaseClass from utils.SingletonType import SingletonType from databases import SqlQuery, PhotoRecord, DeviceConfig, CRUD, insert_photo_records from .capture.module_digicam import DigiCam from .capture.module_watch_dog import FileEventHandler class LineControl(BaseClass, metaclass=SingletonType): # sign_data = Signal(dict) def __init__(self, windows=None): super().__init__() self.windows = windows self.serial_ins = None self.connect_state = False self.is_running = False self.msg_type = "line_control" self.goods_art_no = None # 0 闲置;1进行中;2已完成; self.photo_take_state = 0 async def to_connect_com(self, port_name): self.close_connect() await asyncio.sleep(0.5) try: # 原值为9600 self.serial_ins = SerialIns(port_name=port_name, baud=115200) if self.serial_ins.serial_handle: self.connect_state = True message = { "_type": "show_info", "plugins_mode": "remote_control", "data": {"msg": "有线遥控器 打开串口成功", "port_name": port_name}, } self.sendSocketMessage( code=0, msg="有线遥控器 打开串口成功", data=message, device_status=2, ) # 循环监听消息 await self.run() return True else: message = { "_type": "show_info", "plugins_mode": "remote_control", "data": {"msg": "有线遥控器 打开串口失败"}, } self.sendSocketMessage( code=1, msg="有线遥控器 打开串口失败", data=message, device_status=-1, ) self.serial_ins = None self.connect_state = False except: message = { "_type": "show_info", "plugins_mode": "remote_control", "data": {"msg": "有线遥控器 打开串口失败"}, } self.sendSocketMessage( code=1, msg="有线遥控器 打开串口失败", data=message, device_status=-1, ) self.serial_ins = None self.connect_state = False return False def close_connect(self): if self.connect_state: self.serial_ins.close_serial_port() self.connect_state = False def __del__(self): self.close_connect() def handlerAction(self, button_value): """处理拍照动作按键[左 右]""" control_program = "执行左脚程序" if button_value == 1 else "执行右脚程序" if self.goods_art_no == None or self.goods_art_no == "": input_data = { "data": { "action": control_program, "goods_art_no": "", }, "type": "run_mcu", } self.msg_type = "blue_tooth_scan" self.sendSocketMessage( code=0, msg=f"准备执行[{control_program}]", data=input_data, device_status=2, ) self.msg_type = "blue_tooth" return self.photo_take_state = 1 input_data = { "data": { "action": control_program, "goods_art_no": self.goods_art_no, }, "type": "run_mcu", } self.msg_type = "blue_tooth_scan" self.sendSocketMessage( code=0, msg=f"准备执行[{control_program}]", data=input_data, device_status=2, ) self.goods_art_no = None self.msg_type = "blue_tooth" self.photo_take_state = 2 async def handlerTakePhoto(self): """处理单独拍照""" await asyncio.sleep(0.1) print("开始单拍0") session = SqlQuery() crud = CRUD(PhotoRecord) record = crud.read(session=session, order_by="id", ascending=False) print("开始单拍0-读取数据库") if record == None: # 发送失败消息 self.sendSocketMessage( code=1, msg="单拍失败,请先输入货号或扫码进行组合拍摄", data=None, device_status=2, ) else: print("开始单拍1") if record.image_index == 19: self.msg_type = "photo_take" self.sendSocketMessage( code=1, msg="单拍失败,单个货号最多允许拍摄20张产品图", data=None, device_status=2, ) self.msg_type = "blue_tooth" return deviceConfig = CRUD(DeviceConfig) deviceConfigData = deviceConfig.read( session=session, conditions={"id": record.action_id} ) select_tab_id = deviceConfigData.tab_id AllTabConfig = deviceConfig.read_all( session=session, conditions={"tab_id": select_tab_id} ) action_id = 0 if AllTabConfig[len(AllTabConfig) - 1].take_picture == True: action_id = AllTabConfig[0].id else: action_id = AllTabConfig[len(AllTabConfig) - 1].id image_index = record.image_index + 1 self.photo_take_state = 1 state, record_id = insert_photo_records( record.image_deal_mode, record.goods_art_no, image_index, action_id, ) print("开始单拍1-插入数据") capture_one = DigiCam() try: watch_dog = FileEventHandler() if watch_dog.observer is None: captrure_folder_path = capture_one.getCaptureFolderPath() watch_dog.start_observer(captrure_folder_path) watch_dog.goods_art_no = record.goods_art_no watch_dog.image_index = image_index watch_dog.record_id = record_id print("开始单拍1-检查相机") # camera_is_connect = capture_one.checkCameraConnect() # if camera_is_connect is not True: # self.sendSocketMessage(1, "相机未连接,请检查", device_status=-1) # return capture_one.run_capture_action("Capture") print("开始单拍1-完成拍照") await asyncio.sleep(1) self.msg_type = "photo_take" self.sendSocketMessage( code=0, msg="{} 执行完成~".format( "执行右脚程序" if record.image_deal_mode == 1 else "执行左脚程序" ), data={"goods_art_no": record.goods_art_no}, device_status=2, ) self.msg_type = "blue_tooth" except Exception as e: self.sendSocketMessage(1, "digicam未初始化,请检查", device_status=-1) self.photo_take_state = 0 async def analysis_received_data(self): await asyncio.sleep(0.01) if not self.connect_state: return receive_data = self.serial_ins.read_cmd(out_time=1, check=0x6B) if receive_data is False: self.connect_state = False return False if not receive_data: return else: print( "read receive_data {}".format( self.serial_ins.change_hex_to_int(receive_data) ) ) pass # 数据 结构 command,按命令解析 if receive_data[0] == 1: # 扫码数据 bar_code = receive_data[1:].decode() bar_code = bar_code.replace("\r", "") bar_code = bar_code.replace("\n", "") self.sign_data.emit( {"_type": 0, "plugins_mode": "remote_control", "data": bar_code} ) return if receive_data[0] == 9: switch_1, switch_2, button_value = receive_data[1:] data = { "switch_1": True if switch_1 == 1 else False, "switch_2": True if switch_2 == 1 else False, "button_value": button_value, } self.sign_data.emit( {"_type": 9, "plugins_mode": "remote_control", "data": data} ) return pass async def run(self): # self.show_info.emit("未连接") # self.data_command_sign.emit(data) self.is_running = True while self.is_running: await asyncio.sleep(0.06) if not self.connect_state: self.sign_data.emit( { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备 未连接", } ) break await self.analysis_received_data() self.is_running = False if not self.connect_state: self.sign_data.emit( { "_type": "show_info", "plugins_mode": "remote_control", "data": "遥控设备 未连接", } )