||
- # module_remote_control_v2
- import json
- import time, asyncio
- import settings
- from .SerialIns import SerialIns
- from .BaseClass import BaseClass
- from sockets.connect_manager import ConnectionManager
- from databases import SqlQuery, PhotoRecord, CRUD, insert_photo_records
- from .capture.module_digicam import DigiCam
- from .capture.module_watch_dog import FileEventHandler
- # from .BlueToothMode import BlueToothMode
- class RemoteControlV2(BaseClass):
- # sign_data = Signal(dict)
- def __init__(self, bluetooth_ins, websocket_manager: ConnectionManager):
- # 遥控设备处理--新版遥控器;硅胶按钮
- super().__init__(websocket_manager)
- self.msg_type = "blue_tooth"
- self.websocket_manager = websocket_manager
- # self.windows = windows
- self.serial_ins = None
- self.bluetooth_ins = bluetooth_ins
- self.port_name = ""
- self.bluetooth_address = ""
- self.connect_state = False
- self.is_running = False
- self.goods_art_no = None
- # 0 闲置;1进行中;2已完成;
- self.photo_take_state = 0
- def to_connect_com(self, port_name, is_test=False):
- if self.connect_state:
- return
- self.close_connect()
- time.sleep(0.5)
- try:
- # 原值为9600
- self.serial_ins = SerialIns(port_name=port_name, baud=115200)
- # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
- if self.serial_ins.serial_handle:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "遥控设备V2 打开串口成功",
- }
- self.sendSocketMessage(
- code=0,
- msg="遥控设备V2 打开串口成功",
- data=message,
- device_status=1,
- )
- # print(message)
- self.connect_state = True
- message = {
- "_type": "remote_control_connect",
- "plugins_mode": "remote_control",
- "data": port_name,
- }
- self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
- print(message)
- self.port_name = port_name
- self.is_running = True
- self.set_voltage_value(0)
- self.run()
- return True
- else:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "遥控设备V2 打开串口失败",
- }
- self.sendSocketMessage(
- code=1,
- msg="遥控设备V2 打开串口失败",
- data=message,
- device_status=-1,
- )
- print(message)
- self.serial_ins = None
- self.connect_state = False
- self.set_voltage_value(None)
- except:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "遥控设备V2 打开串口失败",
- }
- print(message)
- self.sendSocketMessage(
- code=1, msg="遥控设备V2 打开串口失败", data=message, device_status=-1
- )
- self.serial_ins = None
- self.connect_state = False
- self.set_voltage_value(None)
- return False
- def to_connect_bluetooth(self, address, is_test=False):
- print("to_connect_bluetooth", self.connect_state)
- if self.connect_state:
- return
- # if self.bluetooth_ins == None:
- # print("bluetooth_ins 未初始化", bluetooth_mode)
- # self.bluetooth_ins = BlueToothMode()
- # else:
- # self.bluetooth_ins = bluetooth_mode
- self.close_connect()
- if self.bluetooth_ins == None:
- print("bluetooth_ins 未初始化", self.bluetooth_ins)
- # self.bluetooth_ins = self.
- self.bluetooth_address = address
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "遥控设备V2 打开蓝牙成功",
- }
- print(message)
- self.sendSocketMessage(
- code=0, msg="遥控设备V2 打开蓝牙成功", data=message, device_status=2
- )
- self.connect_state = True
- self.is_running = True
- self.set_voltage_value(0)
- if is_test is False:
- loop = asyncio.get_event_loop()
- loop.create_task(self.run())
- def close_bluetooth_connect(self):
- if self.bluetooth_address:
- print("蓝牙断开")
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "遥控设备V2 蓝牙断开",
- }
- print(message)
- self.sendSocketMessage(
- code=1, msg="遥控设备V2 蓝牙断开", data=message, device_status=-1
- )
- self.close_connect()
- print("关闭蓝牙")
- def set_voltage_value(self, voltage_value=None, voltage_text=None):
- device_status = 2
- if self.is_running:
- flag = "接收器已连接 {}".format(
- "蓝牙" if self.bluetooth_address else "串口"
- )
- self.sendSocketMessage(
- code=0, msg="遥控设备V2 打开蓝牙成功", data=None, device_status=2
- )
- self.connect_state = True
- self.is_running = True
- else:
- flag = "接收器未连接"
- device_status = -1
- if voltage_value is None:
- if voltage_text:
- print(voltage_text)
- # 发送电量剩余消息
- self.sendSocketMessage(msg=voltage_text, device_status=device_status)
- else:
- self.sendSocketMessage(msg=flag, device_status=device_status)
- else:
- if voltage_value == 0:
- print(flag)
- self.sendSocketMessage(msg=flag, device_status=device_status)
- else:
- print("电量:{}%".format(voltage_value))
- self.sendSocketMessage(
- msg="电量:{}%".format(voltage_value), device_status=device_status
- )
- # print("打印===>", flag)
- def close_connect(self):
- self.port_name = ""
- self.bluetooth_address = ""
- # self.bluetooth_ins = None
- if self.serial_ins:
- self.serial_ins.close_serial_port()
- self.connect_state = False
- def __del__(self):
- self.close_connect()
- def play_sound(self, tip="sound_tips_3"):
- self.windows.playsound.tips_type = tip
- self.windows.playsound.start()
- def handlerAction(self, button_value):
- """处理拍照动作按键[左 右]"""
- control_program = "执行左脚程序" if button_value == 1 else "执行右脚程序"
- if self.goods_art_no == None or self.goods_art_no == "":
- input_data = {
- "data": {
- "action": control_program,
- "goods_art_no": "",
- },
- "type": "run_mcu",
- }
- self.msg_type = "blue_tooth_scan"
- self.sendSocketMessage(
- code=0,
- msg=f"准备执行[{control_program}]",
- data=input_data,
- device_status=2,
- )
- self.msg_type = "blue_tooth"
- return
- self.photo_take_state = 1
- input_data = {
- "data": {
- "action": control_program,
- "goods_art_no": self.goods_art_no,
- },
- "type": "run_mcu",
- }
- self.msg_type = "blue_tooth_scan"
- self.sendSocketMessage(
- code=0,
- msg=f"准备执行[{control_program}]",
- data=input_data,
- device_status=2,
- )
- self.goods_art_no = None
- self.msg_type = "blue_tooth"
- self.photo_take_state = 2
- async def handlerTakePhoto(self):
- """处理单独拍照"""
- await asyncio.sleep(0.1)
- print("开始单拍0")
- session = SqlQuery()
- crud = CRUD(PhotoRecord)
- record = crud.read(session=session, order_by="id", ascending=False)
- print("开始单拍0-读取数据库")
- if record == None:
- # 发送失败消息
- self.sendSocketMessage(
- code=1,
- msg="单拍失败,请先输入货号或扫码进行组合拍摄",
- data=None,
- device_status=2,
- )
- else:
- print("开始单拍1")
- if record.image_index == 19:
- self.msg_type = "photo_take"
- self.sendSocketMessage(
- code=1,
- msg="单拍失败,单个货号最多允许拍摄20张产品图",
- data=None,
- device_status=2,
- )
- self.msg_type = "blue_tooth"
- return
- image_index = record.image_index + 1
- self.photo_take_state = 1
- state,record_id=insert_photo_records(
- record.image_deal_mode,
- record.goods_art_no,
- image_index,
- record.action_id,
- )
- print("开始单拍1-插入数据")
- capture_one = DigiCam()
- try:
- watch_dog = FileEventHandler()
- if watch_dog.observer is None:
- captrure_folder_path = capture_one.getCaptureFolderPath()
- watch_dog.start_observer(captrure_folder_path)
- watch_dog.goods_art_no = record.goods_art_no
- watch_dog.image_index = image_index
- watch_dog.record_id = record_id
- print("开始单拍1-检查相机")
- # camera_is_connect = capture_one.checkCameraConnect()
- # if camera_is_connect is not True:
- # self.sendSocketMessage(1, "相机未连接,请检查", device_status=-1)
- # return
- capture_one.run_capture_action("Capture")
- print("开始单拍1-完成拍照")
- time.sleep(1)
- self.msg_type = "photo_take"
- self.sendSocketMessage(
- code=0,
- msg="{} 执行完成~".format(
- "执行右脚程序"
- if record.image_deal_mode == 1
- else "执行左脚程序"
- ),
- data={"goods_art_no": record.goods_art_no},
- device_status=2,
- )
- self.msg_type = "blue_tooth"
- except Exception as e:
- self.sendSocketMessage(1, "digicam未初始化,请检查", device_status=-1)
- self.photo_take_state = 0
- async def analysis_received_data(self):
- if not self.connect_state:
- return
- await asyncio.sleep(0.01)
- if self.bluetooth_address:
- receive_data = self.bluetooth_ins.read_cmd_one(
- address=self.bluetooth_address
- )
- # print("received data", 1)
- else:
- receive_data = self.serial_ins.read_cmd(out_time=1, check=None)
- # print("received data", 2)
- # print("self.bluetooth_ins", receive_data)
- if receive_data is False:
- self.connect_state = False
- return False
- if not receive_data:
- return
- if self.photo_take_state != 0:
- print("正在拍照", self.photo_take_state)
- return
- receive_data_parser = receive_data[0]
- # 数据 结构 command,按命令解析
- if receive_data_parser == 1:
- # self.play_sound("get_qr_code")
- # 扫码数据
- bar_code = receive_data[1:].decode()
- bar_code = bar_code.replace("\r", "")
- bar_code = bar_code.replace("\n", "")
- self.goods_art_no = bar_code
- message = {"_type": 0, "plugins_mode": "remote_control", "data": bar_code}
- print(message)
- self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
- return
- if receive_data_parser == 9:
- # 播放声音
- button_value = receive_data[1]
- data = {"button_value": button_value}
- message = {"_type": 9, "plugins_mode": "remote_control", "data": data}
- print(message)
- if button_value in [1, 2]:
- # 扫描货号
- if self.photo_take_state != 0:
- self.sendSocketMessage(1, "前置拍照未完成,请稍后", device_status=-1)
- return
- print("收到货号信息", self.goods_art_no)
- self.handlerAction(button_value)
- self.photo_take_state = 0
- if button_value in [3]:
- # 处理遥控器单拍
- self.msg_type = "handler_take_picture"
- # 0 闲置;1进行中;2已完成;
- _data = {"type": self.msg_type,"data":None}
- self.sendSocketMessage(0, "处理单拍消息", data=_data, device_status=-1)
- self.msg_type = "blue_tooth"
- if button_value in [9]:
- # 处理停止
- self.msg_type = "stop_action"
- # 0 闲置;1进行中;2已完成;
- _data = {"type": self.msg_type, "data": None}
- self.sendSocketMessage(0, "停止执行组合动作", data=_data, device_status=-1)
- self.msg_type = "blue_tooth"
- self.sendSocketMessage(code=0, msg="", data=message, device_status=2)
- if settings.IS_DEBUG == "true":
- print("收到按键", button_value)
- return
- if receive_data_parser == 10:
- voltage_value = receive_data[1]
- self.set_voltage_value(voltage_value)
- if settings.IS_TEST:
- print("遥控器V2电量:{}".format(voltage_value))
- return
- # 使用Max17048 查看电量
- if receive_data_parser == 12:
- chg_status = self.get_data_from_receive_data(
- receive_data=receive_data, start=1, len_data=1
- )
- soc_percentage = self.get_data_from_receive_data(
- receive_data=receive_data, start=2, len_data=4
- )
- soc_percentage = soc_percentage / 100
- voltage = self.get_data_from_receive_data(
- receive_data=receive_data, start=6, len_data=4
- )
- voltage = voltage / 100
- current = self.get_data_from_receive_data(
- receive_data=receive_data, start=10, len_data=4
- )
- current = current / 10000
- temperature = self.get_data_from_receive_data(
- receive_data=receive_data, start=14, len_data=4
- )
- temperature = temperature / 100
- adjusted_soc = self.get_data_from_receive_data(
- receive_data=receive_data, start=18, len_data=4
- )
- adjusted_soc = adjusted_soc / 100
- soft_vision = self.get_data_from_receive_data(
- receive_data=receive_data, start=22, len_data=1
- )
- full_status = self.get_data_from_receive_data(
- receive_data=receive_data, start=23, len_data=1
- )
- # print("is_charging:{}".format(chg_status))
- # print("Battery SOC: {:.2f}%".format(soc_percentage))
- # print("Battery Voltage: {:.3f}V".format(voltage))
- # print("Average Current: {:.3f}A".format(current))
- # print("Chip Temperature: {:.1f}°C".format(temperature))
- # print("adjusted Battery soc: {:.2f}%".format(adjusted_soc))
- # print("soft_vision:{}".format(soft_vision))
- # print("chg_status:{} full_status:{}".format(chg_status, full_status))
- if chg_status:
- t1 = "充电中"
- else:
- t1 = ""
- if full_status:
- t2 = "已充满"
- else:
- t2 = ""
- self.set_voltage_value(
- voltage_text="遥控器:{:.1f}% {:.2f}V {}{}".format(
- min(adjusted_soc / 92.5 * 100, 100), voltage, t1, t2
- )
- )
- return
- if receive_data[0] == 111:
- value = (
- receive_data[1] << 24
- | receive_data[2] << 16
- | receive_data[3] << 8
- | receive_data[4]
- )
- print("遥控器-测试 value:{}".format(value))
- # self.windows.show_label.setText("--------{}".format(value))
- if receive_data[0] == 112:
- bar_code = receive_data[1:].decode()
- bar_code = bar_code.replace("\r", "")
- bar_code = bar_code.replace("\n", "")
- print("read bar_code {}".format(bar_code))
- # 通用串口数据解析器
- def get_data_from_receive_data(
- self, receive_data, start, len_data, data_magnification=1
- ):
- # data_magnification 数据放大倍数,或缩小倍数,默认为1
- try:
- if len_data == 1:
- data = receive_data[start]
- return data * data_magnification
- elif len_data == 2:
- data = receive_data[start] << 8 | receive_data[start + 1]
- return data * data_magnification
- elif len_data == 4:
- data = (
- receive_data[start] << 24
- | receive_data[start + 1] << 16
- | receive_data[start + 2] << 8
- | receive_data[start + 3]
- )
- return data * data_magnification
- return None
- except:
- return None
- async def run(self):
- # self.show_info.emit("未连接")
- # self.data_command_sign.emit(data)
- self.is_running = True
- print("Running")
- while 1:
- await asyncio.sleep(0.01)
- if not self.connect_state:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "遥控设备V2 未连接",
- }
- print(message)
- self.sendSocketMessage(
- code=1, msg="遥控设备V2 未连接", data=message, device_status=-1
- )
- break
- await self.analysis_received_data()
- self.is_running = False
- if not self.connect_state:
- message = {
- "_type": "show_info",
- "plugins_mode": "remote_control",
- "data": "遥控设备V2 未连接",
- }
- print(message)
- self.sendSocketMessage(
- code=1, msg="遥控设备V2 未连接", data=message, device_status=-1
- )
- self.set_voltage_value(None)
|