import time
import asyncio
from .SerialIns import SerialIns
from .BaseClass import BaseClass
from utils.SingletonType import SingletonType
from databases import SqlQuery, PhotoRecord, DeviceConfig, CRUD, insert_photo_records
from .capture.module_digicam import DigiCam
from .capture.module_watch_dog import FileEventHandler
from sockets.connect_manager import ConnectionManager
import settings
import os
import logging

logger = logging.getLogger(__name__)

# Value that ``msg_type`` holds whenever no special message is being sent.
_DEFAULT_MSG_TYPE = "blue_tooth"

# Maps characters that are replaced in scan-file names to "_".
_INVALID_FILENAME_CHARS = str.maketrans(dict.fromkeys('<>:"/\\|?*', "_"))


class LineControl(BaseClass):
    """Wired remote-control handler that listens on a serial port.

    Frames read from the serial port are dispatched on their first byte:
    ``1`` carries scanned bar-code text, ``9`` carries a button value
    (1/2 = left/right foot program, 3 = single shot, 9 = stop).  Results are
    forwarded through ``sendSocketMessage``, which is not defined in this
    file (presumably inherited from ``BaseClass`` -- confirm).
    """

    def __init__(self, websocket_manager: ConnectionManager):
        super().__init__(websocket_manager)
        self.serial_ins = None
        self.connect_state = False
        self.is_running = False
        # NOTE(review): presumably read by sendSocketMessage to tag outgoing
        # messages -- confirm against BaseClass.
        self.msg_type = _DEFAULT_MSG_TYPE
        self.port_name = None
        self.goods_art_no = None
        # 0 = idle; 1 = in progress; 2 = finished
        self.photo_take_state = 0
        # Strong reference to the listener task; the event loop itself only
        # keeps weak references, so an unreferenced task may be collected.
        self._listen_task = None

    @staticmethod
    def _show_info(data):
        """Build a ``show_info`` message envelope around *data*."""
        return {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": data,
        }

    def _report_open_failure(self) -> None:
        """Announce that the serial port could not be opened and reset state."""
        try:
            self.sendSocketMessage(
                code=1,
                msg="有线遥控器 打开串口失败",
                data=self._show_info({"msg": "有线遥控器 打开串口失败"}),
                device_status=-1,
            )
        finally:
            # Reset even if the notification itself fails.
            self.serial_ins = None
            self.connect_state = False

    def _send_as(self, msg_type, *args, **kwargs) -> None:
        """Call ``sendSocketMessage`` with ``msg_type`` temporarily replaced.

        ``msg_type`` is always restored to its default afterwards, even when
        the send raises.
        """
        self.msg_type = msg_type
        try:
            self.sendSocketMessage(*args, **kwargs)
        finally:
            self.msg_type = _DEFAULT_MSG_TYPE

    async def to_connect_com(self, port_name) -> bool:
        """Open *port_name* and start the background listener loop.

        Any existing connection is closed first.  A success or failure
        message is sent through ``sendSocketMessage``.

        Returns:
            True when the port was opened and the listener task was
            scheduled, False otherwise.
        """
        self.port_name = port_name
        self.close_connect()
        await asyncio.sleep(0.5)
        try:
            # Baud rate was previously 9600.
            self.serial_ins = SerialIns(port_name=port_name, baud=115200)
            if not self.serial_ins.serial_handle:
                self._report_open_failure()
                return False

            self.connect_state = True
            self.sendSocketMessage(
                code=0,
                msg="有线遥控器 打开串口成功",
                data=self._show_info(
                    {"msg": "有线遥控器 打开串口成功", "port_name": port_name}
                ),
                device_status=2,
            )
            # Start listening for frames; keep the task referenced.
            self._listen_task = asyncio.get_running_loop().create_task(self.run())
            return True
        except Exception:
            logger.exception("Failed to open serial port %s", port_name)
            self._report_open_failure()
            return False

    def close_connect(self) -> None:
        """Close the serial port if the controller is marked as connected."""
        if not self.connect_state:
            return
        try:
            # connect_state can be True without a serial instance (see
            # to_connect_linecontrol), so guard against None.
            if self.serial_ins is not None:
                self.serial_ins.close_serial_port()
        finally:
            self.connect_state = False

    def __del__(self):
        # __init__ may have failed before the attributes were assigned.
        if getattr(self, "connect_state", False):
            self.close_connect()

    async def to_connect_linecontrol(self):
        """Mark the wired controller as connected and run the listener loop.

        Does nothing when already connected.  This coroutine does not return
        until ``run`` finishes.

        NOTE(review): no serial port is opened here; if ``serial_ins`` is
        still None the listener reports "not connected" and exits on its
        first poll.
        """
        print("to_connect_linecontrol 连接有线控制器,连接状态", self.connect_state)
        if self.connect_state:
            return
        message = self._show_info("有线遥控器 打开成功")
        self.close_connect()
        print(message)
        self.sendSocketMessage(
            code=0, msg="有线遥控器 打开蓝牙成功", data=message, device_status=2
        )
        self.connect_state = True
        self.is_running = True
        await self.run()

    def handlerAction(self, button_value):
        """Handle a photo-action button press (1 = left, anything else = right).

        Sends a ``run_mcu`` request carrying the pending goods number, or an
        empty string when none has been scanned.  When a goods number was
        pending it is consumed and ``photo_take_state`` goes 1 -> 2.
        """
        control_program = "执行左脚程序" if button_value == 1 else "执行右脚程序"
        goods_art_no = self.goods_art_no or ""
        has_goods = bool(goods_art_no)

        if has_goods:
            self.photo_take_state = 1
        input_data = {
            "data": {
                "action": control_program,
                "goods_art_no": goods_art_no,
            },
            "type": "run_mcu",
        }
        self._send_as(
            "blue_tooth_scan",
            code=0,
            msg=f"准备执行[{control_program}]",
            data=input_data,
            device_status=2,
        )
        if has_goods:
            self.goods_art_no = None
            self.photo_take_state = 2

    def _handle_scan_frame(self, payload) -> None:
        """Store and forward the bar code carried by a command-1 frame."""
        try:
            bar_code = payload.decode()
        except UnicodeDecodeError:
            # A corrupted frame must not kill the listener loop.
            logger.warning("Discarding undecodable scan frame: %r", payload)
            return
        bar_code = bar_code.replace("\r", "").replace("\n", "")
        message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
        print("有线控制器 扫码数据1", message)
        self.goods_art_no = bar_code
        self.sendSocketMessage(code=0, msg="", data=message, device_status=2)

    def _handle_button_frame(self, receive_data) -> None:
        """Dispatch a command-9 frame on the button value in its second byte."""
        if len(receive_data) < 2:
            logger.warning("Discarding truncated button frame: %r", receive_data)
            return
        button_value = receive_data[1]
        message = {
            "_type": 9,
            "plugins_mode": "remote_control",
            "data": {"button_value": button_value},
        }
        if button_value in (1, 2):
            # Left/right foot program for the scanned goods number.
            if self.photo_take_state != 0:
                self.sendSocketMessage(1, "前置拍照未完成,请稍后", device_status=-1)
                return
            print("收到货号信息", self.goods_art_no)
            self.saveScanData(self.goods_art_no)
            try:
                self.handlerAction(button_value)
            finally:
                # Return to idle even if the action failed, so later button
                # presses are not rejected indefinitely.
                self.photo_take_state = 0
        elif button_value == 3:
            # Single shot requested from the remote.
            self._send_as(
                "handler_take_picture",
                0,
                "处理单拍消息",
                data={"type": "handler_take_picture", "data": None},
                device_status=-1,
            )
        elif button_value == 9:
            # Stop the running action sequence.
            self._send_as(
                "stop_action",
                0,
                "停止执行组合动作",
                data={"type": "stop_action", "data": None},
                device_status=-1,
            )
        self.sendSocketMessage(code=0, msg="", data=message, device_status=2)

    async def analysis_received_data(self):
        """Read one frame from the serial port and dispatch it.

        Returns:
            False when the connection is found to be lost (``read_cmd``
            returned False, or there is no serial instance); None otherwise.
        """
        await asyncio.sleep(0.01)
        if not self.connect_state:
            return
        if self.serial_ins is None:
            # Marked connected without an open port: treat as disconnected.
            self.connect_state = False
            return False

        receive_data = self.serial_ins.read_cmd(out_time=1, check=0x6B)
        if receive_data is False:
            self.connect_state = False
            return False
        if not receive_data:
            return

        print("有线控制器receive_data", receive_data)
        # Frame layout: first byte is the command, the rest is its payload.
        command = receive_data[0]
        if command == 1:
            self._handle_scan_frame(receive_data[1:])
        elif command == 9:
            self._handle_button_frame(receive_data)
        return

    def saveScanData(self, scan_data):
        """Persist *scan_data* as the only ``.txt`` file in ``settings.SCAN_DIR``.

        The file is named after the scan data (with ``<>:"/\\|?*`` replaced
        by ``_``) and contains the raw scan data.  An existing single
        ``.txt`` file is renamed; several existing ones are deleted.
        Failures are logged and never raised.
        """
        if not scan_data:
            return
        scan_path = settings.SCAN_DIR
        if not scan_path:
            return
        try:
            safe_scan_data = scan_data.translate(_INVALID_FILENAME_CHARS)
            if not safe_scan_data.strip():
                logger.info("扫码文件,无效的文件名")
                return

            txt_files = [
                name for name in os.listdir(scan_path) if name.lower().endswith(".txt")
            ]
            new_txt_path = os.path.join(scan_path, safe_scan_data + ".txt")

            if len(txt_files) == 1:
                # Reuse the single existing file by renaming it.
                try:
                    os.rename(os.path.join(scan_path, txt_files[0]), new_txt_path)
                except OSError as e:
                    # Fall through: the file is created below instead.
                    logger.warning("扫码文件,重命名或写入文件失败: %s", e)
            else:
                # Zero or several files: remove whatever is there.
                for txt_file in txt_files:
                    try:
                        os.remove(os.path.join(scan_path, txt_file))
                    except OSError as e:
                        logger.warning("扫码文件,删除文件失败 %s: %s", txt_file, e)

            with open(new_txt_path, "w", encoding="utf-8") as f:
                f.write(scan_data)
        except Exception as e:
            logger.warning("扫码文件,保存文件时发生错误: %s", e)

    async def run(self):
        """Poll the serial port until the connection is lost.

        Sends a "not connected" message when the loop ends because
        ``connect_state`` became False.  ``is_running`` is cleared and the
        port closed on every exit path, including errors and cancellation.
        """
        self.is_running = True
        try:
            while True:
                await asyncio.sleep(0.06)
                if not self.connect_state:
                    self.sendSocketMessage(
                        code=1,
                        msg="有线遥控器 未连接",
                        data=self._show_info({"msg": "有线遥控器 未连接"}),
                        device_status=-1,
                    )
                    break
                await self.analysis_received_data()
        finally:
            self.is_running = False
            self.close_connect()