| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- import time
- # from import_qt_mode import *
- import asyncio
- from .SerialIns import SerialIns
- from .BaseClass import BaseClass
- from utils.SingletonType import SingletonType
- from databases import SqlQuery, PhotoRecord, DeviceConfig, CRUD, insert_photo_records
- from .capture.module_digicam import DigiCam
- from .capture.module_watch_dog import FileEventHandler
- from sockets.connect_manager import ConnectionManager
- import settings,os
- import logging
- logger = logging.getLogger(__name__)
- class LineControl(BaseClass):
- # sign_data = Signal(dict)
- def __init__(self, websocket_manager: ConnectionManager):
- super().__init__(websocket_manager)
- self.serial_ins = None
- self.connect_state = False
- self.is_running = False
- self.msg_type = "blue_tooth"
- self.port_name = None
- self.goods_art_no = None
- # 0 闲置;1进行中;2已完成;
- self.photo_take_state = 0
- async def to_connect_com(self, port_name):
- self.port_name = port_name
- self.close_connect()
- await asyncio.sleep(0.5)
- try:
- # 原值为9600
- self.serial_ins = SerialIns(port_name=port_name, baud=115200)
- if self.serial_ins.serial_handle:
- self.connect_state = True
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": {"msg": "有线遥控器 打开串口成功", "port_name": port_name},
- }
- self.sendSocketMessage(
- code=0,
- msg="有线遥控器 打开串口成功",
- data=message,
- device_status=2,
- )
- # 循环监听消息
- loop = asyncio.get_event_loop()
- loop.create_task(self.run())
- return True
- else:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": {"msg": "有线遥控器 打开串口失败"},
- }
- self.sendSocketMessage(
- code=1,
- msg="有线遥控器 打开串口失败",
- data=message,
- device_status=-1,
- )
- self.serial_ins = None
- self.connect_state = False
- return False
- except:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": {"msg": "有线遥控器 打开串口失败"},
- }
- self.sendSocketMessage(
- code=1,
- msg="有线遥控器 打开串口失败",
- data=message,
- device_status=-1,
- )
- self.serial_ins = None
- self.connect_state = False
- return False
- def close_connect(self):
- if self.connect_state:
- self.serial_ins.close_serial_port()
- self.connect_state = False
- def __del__(self):
- self.close_connect()
- async def to_connect_linecontrol(self):
- """连接有线控制器"""
- print("to_connect_linecontrol 连接有线控制器,连接状态", self.connect_state)
- if self.connect_state:
- return
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "有线遥控器 打开成功",
- }
- self.close_connect()
- print(message)
- self.sendSocketMessage(
- code=0, msg="有线遥控器 打开蓝牙成功", data=message, device_status=2
- )
- self.connect_state = True
- self.is_running = True
- await self.run()
- def handlerAction(self, button_value):
- """处理拍照动作按键[左 右]"""
- control_program = "执行左脚程序" if button_value == 1 else "执行右脚程序"
- match button_value:
- case 1:
- control_program = "执行左脚程序"
- case 2:
- control_program = "执行右脚程序"
- if self.goods_art_no == None or self.goods_art_no == "":
- input_data = {
- "data": {
- "action": control_program,
- "goods_art_no": "",
- },
- "type": "run_mcu",
- }
- self.msg_type = "blue_tooth_scan"
- self.sendSocketMessage(
- code=0,
- msg=f"准备执行[{control_program}]",
- data=input_data,
- device_status=2,
- )
- self.msg_type = "blue_tooth"
- return
- self.photo_take_state = 1
- input_data = {
- "data": {
- "action": control_program,
- "goods_art_no": self.goods_art_no,
- },
- "type": "run_mcu",
- }
- self.msg_type = "blue_tooth_scan"
- self.sendSocketMessage(
- code=0,
- msg=f"准备执行[{control_program}]",
- data=input_data,
- device_status=2,
- )
- self.goods_art_no = None
- self.msg_type = "blue_tooth"
- self.photo_take_state = 2
- async def analysis_received_data(self,):
- await asyncio.sleep(0.01)
- if not self.connect_state:
- return
- receive_data = self.serial_ins.read_cmd(out_time=1, check=0x6B)
- if receive_data is False:
- self.connect_state = False
- return False
- if not receive_data:
- return
- else:
- print("有线控制器receive_data", receive_data)
- # print(
- # "有线控制器 read receive_data {}".format(
- # self.serial_ins.change_hex_to_int(receive_data)
- # )
- # )
- pass
- # 数据 结构 command,按命令解析
- if receive_data[0] == 1:
- # 扫码数据
- bar_code = receive_data[1:].decode()
- bar_code = bar_code.replace("\r", "")
- bar_code = bar_code.replace("\n", "")
- # self.sign_data.emit(
- # {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
- # )
- message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
- print("有线控制器 扫码数据1", message)
- self.goods_art_no = bar_code
- self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
- return
- if receive_data[0] == 9:
- button_value = receive_data[1]
- data = {"button_value": button_value}
- message = {"_type": 9, "plugins_mode": "remote_control", "data": data}
- if button_value in [1, 2]:
- # 扫描货号
- if self.photo_take_state != 0:
- self.sendSocketMessage(1, "前置拍照未完成,请稍后", device_status=-1)
- return
- print("收到货号信息", self.goods_art_no)
- self.saveScanData(self.goods_art_no)
- self.handlerAction(button_value)
- self.photo_take_state = 0
- if button_value in [3]:
- # 处理遥控器单拍
- self.msg_type = "handler_take_picture"
- # 0 闲置;1进行中;2已完成;
- _data = {"type": self.msg_type, "data": None}
- self.sendSocketMessage(0, "处理单拍消息", data=_data, device_status=-1)
- self.msg_type = "blue_tooth"
- if button_value in [9]:
- # 处理停止
- self.msg_type = "stop_action"
- # 0 闲置;1进行中;2已完成;
- _data = {"type": self.msg_type, "data": None}
- self.sendSocketMessage(
- 0, "停止执行组合动作", data=_data, device_status=-1
- )
- self.msg_type = "blue_tooth"
- self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
- return
- pass
- def saveScanData(self, scan_data):
- """保存扫码数据"""
- if scan_data == None or scan_data == "":
- return
- scan_path = settings.SCAN_DIR
- if scan_path ==None or scan_path == "":
- return
- try:
- # 验证文件名的有效性
- # 替换可能导致问题的字符
- invalid_chars = '<>:"/\\|?*'
- safe_scan_data = scan_data
- for char in invalid_chars:
- safe_scan_data = safe_scan_data.replace(char, '_')
- # 确保文件名不为空
- if not safe_scan_data.strip():
- logger.info(f"扫码文件,无效的文件名")
- return
-
- # 获取目录下所有txt文件
- txt_files = [f for f in os.listdir(scan_path) if f.lower().endswith('.txt')]
- txt_filename = safe_scan_data + ".txt"
- new_txt_path = os.path.join(scan_path, txt_filename)
- # 如果只有一个txt文件,则重命名并覆盖内容
- if len(txt_files) == 1:
- try:
- old_txt_path = os.path.join(scan_path, txt_files[0])
- os.rename(old_txt_path, new_txt_path)
-
- # 覆盖文件内容
- with open(new_txt_path, 'w', encoding='utf-8') as f:
- f.write(scan_data)
-
- except Exception as e:
- logger.info(f"扫码文件,重命名或写入文件失败: {e}")
-
- # 如果重命名失败,创建新文件
- with open(new_txt_path, 'w', encoding='utf-8') as f:
- f.write(scan_data)
-
- # 如果有多个txt文件,删除它们并创建新文件
- elif len(txt_files) > 1:
- for txt_file in txt_files:
- try:
- txt_file_path = os.path.join(scan_path, txt_file)
- os.remove(txt_file_path)
- except Exception as e:
- logger.info(f"扫码文件,删除文件失败 {txt_file}: {e}")
-
- # 创建新的txt文件
- with open(new_txt_path, 'w', encoding='utf-8') as f:
- f.write(scan_data)
-
- # 如果没有txt文件,直接创建新文件
- else:
- with open(new_txt_path, 'w', encoding='utf-8') as f:
- f.write(scan_data)
-
- except Exception as e:
- logger.info(f"扫码文件,保存文件时发生错误: {e}")
- async def run(self):
- self.is_running = True
- while True:
- await asyncio.sleep(0.06)
- if not self.connect_state:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": {"msg": "有线遥控器 未连接"},
- }
- self.sendSocketMessage(
- code=1,
- msg="有线遥控器 未连接",
- data=message,
- device_status=-1,
- )
- break
- await self.analysis_received_data()
- self.is_running = False
- self.close_connect()
|