import time

# from import_qt_mode import *
import asyncio
from .SerialIns import SerialIns
from .BaseClass import BaseClass
from utils.SingletonType import SingletonType
from databases import SqlQuery, PhotoRecord, DeviceConfig, CRUD, insert_photo_records
from .capture.module_digicam import DigiCam
from .capture.module_watch_dog import FileEventHandler
from sockets.connect_manager import ConnectionManager


class LineControl(BaseClass):
    """Wired remote controller attached through a serial port.

    Frames read from the port are dispatched on their first byte: ``1``
    carries a scanned barcode, ``9`` carries a button press.  Results are
    forwarded to clients through ``sendSocketMessage`` (inherited from
    ``BaseClass``), which is called with ``self.msg_type`` temporarily
    switched for some events.
    """

    # First byte of a received frame.
    _FRAME_SCAN = 1
    _FRAME_BUTTON = 9

    # Button codes carried in the second byte of a button frame.
    _BUTTONS_RUN_PROGRAM = (1, 2)  # 1 = left-foot program, 2 = right-foot program
    _BUTTON_SINGLE_SHOT = 3
    _BUTTON_STOP = 9

    # Value ``msg_type`` is restored to after a temporary override.
    _DEFAULT_MSG_TYPE = "line_control"

    def __init__(self, websocket_manager: ConnectionManager):
        super().__init__(websocket_manager)
        self.serial_ins = None
        self.connect_state = False
        self.is_running = False
        self.msg_type = "line_control"
        # Last scanned barcode; consumed by the next left/right button press.
        self.goods_art_no = None
        # 0 idle; 1 in progress; 2 finished
        self.photo_take_state = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _send_show_info(self, code, text, device_status, **extra):
        """Send a ``show_info`` message whose payload is ``{"msg": text, **extra}``."""
        message = {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": {"msg": text, **extra},
        }
        self.sendSocketMessage(
            code=code,
            msg=text,
            data=message,
            device_status=device_status,
        )

    def _release_serial(self):
        """Close the port if it is still flagged open, then forget the handle."""
        try:
            self.close_connect()
        except Exception as exc:
            # Best effort: a failing close must not mask the error that
            # brought us here.
            print("有线遥控器 关闭串口异常", exc)
        self.serial_ins = None
        self.connect_state = False

    def _report_open_failed(self):
        """Notify clients that the port could not be opened and reset state."""
        try:
            self._send_show_info(1, "有线遥控器 打开串口失败", -1)
        finally:
            self._release_serial()

    def _send_action(self, msg_type, text):
        """Send one event under a temporary ``msg_type``, then restore the default."""
        self.msg_type = msg_type
        try:
            self.sendSocketMessage(
                0, text, data={"type": msg_type, "data": None}, device_status=-1
            )
        finally:
            self.msg_type = self._DEFAULT_MSG_TYPE

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    async def to_connect_com(self, port_name):
        """Open ``port_name`` and listen on it until the connection drops.

        Any previously open port is closed first.  On a successful open the
        coroutine stays inside :meth:`run` for the lifetime of the
        connection.

        Returns:
            ``True`` once the listening loop has ended after a successful
            open; ``False`` if the port could not be opened or an error was
            raised while opening or listening (both are reported to clients
            as an open failure).

        Raises:
            asyncio.CancelledError: re-raised after the port is released, so
                task cancellation is not mistaken for an open failure.
        """
        self.close_connect()
        await asyncio.sleep(0.5)
        try:
            # Baud rate was previously 9600.
            self.serial_ins = SerialIns(port_name=port_name, baud=115200)
            if self.serial_ins.serial_handle:
                self.connect_state = True
                self._send_show_info(
                    0, "有线遥控器 打开串口成功", 2, port_name=port_name
                )
                # Listen for frames until disconnected.
                await self.run()
                return True
            self._report_open_failed()
        except asyncio.CancelledError:
            self._release_serial()
            raise
        except Exception:
            self._report_open_failed()
        return False

    def close_connect(self):
        """Close the serial port if the controller is flagged as connected."""
        if not self.connect_state:
            return
        try:
            # ``serial_ins`` can be None while ``connect_state`` is True
            # (see ``to_connect_linecontrol``).
            if self.serial_ins is not None:
                self.serial_ins.close_serial_port()
        finally:
            self.connect_state = False

    def __del__(self):
        try:
            self.close_connect()
        except Exception:
            # Finalizers must not raise; attributes may be missing if
            # ``__init__`` did not complete.
            pass

    async def to_connect_linecontrol(self):
        """Mark the controller connected and start the listening loop.

        Does nothing when already connected.  No port is opened here; the
        loop reads from whatever ``self.serial_ins`` currently holds.
        """
        print("to_connect_linecontrol 连接有线控制器,连接状态", self.connect_state)
        if self.connect_state:
            return
        message = {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": "有线遥控器 打开成功",
        }
        self.close_connect()
        print(message)
        # NOTE(review): the status text mentions Bluetooth although this is
        # the wired controller - confirm the intended wording.
        self.sendSocketMessage(
            code=0, msg="有线遥控器 打开蓝牙成功", data=message, device_status=2
        )
        self.connect_state = True
        self.is_running = True
        await self.run()

    # ------------------------------------------------------------------
    # Frame handling
    # ------------------------------------------------------------------
    def handlerAction(self, button_value):
        """Handle a left/right program button press.

        Sends a ``run_mcu`` request under the temporary message type
        ``blue_tooth_scan``.  When a barcode has been scanned it is included
        in the request and then cleared, and ``photo_take_state`` goes
        1 -> 2; otherwise an empty ``goods_art_no`` is sent and
        ``photo_take_state`` is left untouched.

        Args:
            button_value: ``1`` selects the left-foot program, anything else
                the right-foot program.
        """
        control_program = "执行左脚程序" if button_value == 1 else "执行右脚程序"
        has_goods = self.goods_art_no is not None and self.goods_art_no != ""
        if has_goods:
            self.photo_take_state = 1
        input_data = {
            "data": {
                "action": control_program,
                "goods_art_no": self.goods_art_no if has_goods else "",
            },
            "type": "run_mcu",
        }
        self.msg_type = "blue_tooth_scan"
        try:
            self.sendSocketMessage(
                code=0,
                msg=f"准备执行[{control_program}]",
                data=input_data,
                device_status=2,
            )
        finally:
            # Always return to this plugin's own message type.
            self.msg_type = self._DEFAULT_MSG_TYPE
        if has_goods:
            self.goods_art_no = None
            self.photo_take_state = 2

    def _handle_scan_frame(self, frame):
        """Store the barcode carried by a scan frame and forward it to clients."""
        try:
            bar_code = frame[1:].decode()
        except UnicodeDecodeError:
            # A corrupted frame must not terminate the listening loop.
            print("有线控制器 扫码数据解码失败", frame)
            return
        bar_code = bar_code.replace("\r", "").replace("\n", "")
        message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
        print("有线控制器 扫码数据1", message)
        self.goods_art_no = bar_code
        self.sendSocketMessage(code=0, msg="", data=message, device_status=2)

    def _handle_button_frame(self, frame):
        """Dispatch a button frame and forward the raw button value to clients."""
        if len(frame) < 2:
            # Truncated frame: there is no button value to act on.
            return
        button_value = frame[1]
        message = {
            "_type": 9,
            "plugins_mode": "remote_control",
            "data": {"button_value": button_value},
        }
        if button_value in self._BUTTONS_RUN_PROGRAM:
            if self.photo_take_state != 0:
                self.sendSocketMessage(1, "前置拍照未完成,请稍后", device_status=-1)
                return
            print("收到货号信息", self.goods_art_no)
            try:
                self.handlerAction(button_value)
            finally:
                # Return to idle even if the action failed, so later presses
                # are not rejected forever.
                self.photo_take_state = 0
        elif button_value == self._BUTTON_SINGLE_SHOT:
            # Single shot requested from the remote.
            self._send_action("handler_take_picture", "处理单拍消息")
        elif button_value == self._BUTTON_STOP:
            # Stop the running action sequence.
            self._send_action("stop_action", "停止执行组合动作")
        self.sendSocketMessage(code=0, msg="", data=message, device_status=2)

    async def analysis_received_data(self):
        """Read one frame from the serial port and dispatch it.

        Returns:
            ``False`` when the read reports failure (or no port object is
            available); ``connect_state`` is cleared in that case so that
            :meth:`run` stops.  ``None`` otherwise.
        """
        await asyncio.sleep(0.01)
        if not self.connect_state:
            return None
        if self.serial_ins is None:
            # Flagged connected without a port object: treat as a lost link.
            self.connect_state = False
            return False
        receive_data = self.serial_ins.read_cmd(out_time=1, check=0x6B)
        if receive_data is False:
            self.connect_state = False
            return False
        if not receive_data:
            return None
        print("有线控制器receive_data", receive_data)
        # The first byte selects the frame type.
        frame_type = receive_data[0]
        if frame_type == self._FRAME_SCAN:
            self._handle_scan_frame(receive_data)
        elif frame_type == self._FRAME_BUTTON:
            self._handle_button_frame(receive_data)
        return None

    async def run(self):
        """Poll the serial port until ``connect_state`` becomes false.

        ``is_running`` is True while the loop is active and is reset even if
        the loop is interrupted by an exception.  A single "not connected"
        notification is sent when the loop ends because of a disconnect.
        """
        self.is_running = True
        try:
            while True:
                await asyncio.sleep(0.06)
                if not self.connect_state:
                    break
                await self.analysis_received_data()
        finally:
            self.is_running = False
        self._send_show_info(1, "有线遥控器 未连接", -1)