import asyncio
import base64
import datetime
import json
import logging
import random
import sys
import threading
import time
from io import BytesIO

import zmq
from PIL import Image

from utils.SingletonType import SingletonType
import settings
from databases import SqlQuery, CRUD, SysConfigs
from utils.common import message_queue
from models import SysConfigParams

logger = logging.getLogger(__name__)


# Defined as a singleton so the class is never instantiated more than once.
class SmartShooter(metaclass=SingletonType):
    """ZeroMQ client for a camera-control service, with websocket reporting.

    Requests are JSON documents sent over a short-lived REQ socket to
    ``SET_REQ``; unsolicited events are read from a SUB socket connected to
    ``LISTEN_REQ``.  Most public coroutines return a ``(success, payload)``
    tuple where ``payload`` is usually a user-facing message.

    NOTE(review): the request helpers use blocking ZeroMQ calls inside
    ``async`` methods, so the event loop is stalled for up to the socket
    timeout while a request is in flight.
    """

    SET_REQ = "tcp://127.0.0.1:54544"
    LISTEN_REQ = "tcp://127.0.0.1:54543"

    def __init__(self, websocket_manager):
        """Store the websocket manager and reset all state flags.

        Args:
            websocket_manager: object exposing an awaitable
                ``send_personal_message(message, websocket)``.
        """
        self.msg_type = "smart_shooter"
        self.websocket_manager = websocket_manager
        # Set to True to make the listen loop in connect_listen() exit.
        self.stop_listen = False
        # Camera connection status, updated by the GetCamera* methods.
        self.connect_status = False
        # Live-preview status.
        # NOTE(review): only ever set to False in this class — confirm
        # whether a successful EnableLiveview should set it to True.
        self.perview_state = False
        # Coroutine function invoked with every event received by the listener.
        self.callback_listen = None
        # True while the listen loop is running.
        self.listen_init = None
        # Websocket that status messages are pushed to (None = do not send).
        self.websocket = None
        # self.main_loop is intentionally not initialised here; it is expected
        # to be assigned from outside before connect_listen() is started.

    def __send_tcp_message(self, socket, msg):
        """Send ``msg`` as JSON on ``socket`` and return the decoded JSON reply.

        Raises:
            zmq.Again: when the socket's send/receive timeout expires.
        """
        socket.send_string(json.dumps(msg))
        reply = socket.recv()
        return json.loads(reply.decode("utf-8"))

    def __create_req(self, time_out=5) -> tuple[zmq.Socket, zmq.Context]:
        """Create a REQ socket connected to ``SET_REQ``.

        Args:
            time_out: send/receive timeout in seconds.

        Returns:
            ``(socket, context)``; the caller must close the socket and
            terminate the context.
        """
        context = zmq.Context()
        req_socket = context.socket(zmq.REQ)
        # Receive timeout, in milliseconds.
        req_socket.setsockopt(zmq.RCVTIMEO, time_out * 1000)
        # Send timeout, in milliseconds.
        req_socket.setsockopt(zmq.SNDTIMEO, time_out * 1000)
        # LINGER 0: do not wait for unfinished operations when closing.
        req_socket.setsockopt(zmq.LINGER, 0)
        req_socket.connect(self.SET_REQ)
        return req_socket, context

    def _request(self, req, time_out=5):
        """Send one request on a fresh REQ socket and always release it.

        Centralising the socket lifetime here guarantees the socket and its
        context are closed on success, on timeout and on any other error.

        Raises:
            zmq.Again: when the request times out.
        """
        socket, context = self.__create_req(time_out=time_out)
        try:
            return self.__send_tcp_message(socket, req)
        finally:
            socket.close()
            context.term()

    @staticmethod
    def _build_request(msg_id, CameraKey=None, msg_seq_num=0):
        """Build the common request envelope, targeting one camera or all."""
        req = {
            "msg_type": "Request",
            "msg_id": msg_id,
            "msg_seq_num": msg_seq_num,
            "CameraSelection": "All",
        }
        if CameraKey is not None:
            req["CameraSelection"] = "Single"
            req["CameraKey"] = CameraKey
        return req

    @staticmethod
    def _status_message(code, msg, msg_type, device_status, data=None, **extra):
        """Build a websocket status payload; ``extra`` keys go after ``data``."""
        return {
            "code": code,
            "msg": msg,
            "data": data,
            **extra,
            "msg_type": msg_type,
            "device_status": device_status,
        }

    async def sendMessageSocket(self, message):
        """Push ``message`` to the current websocket, if one is attached."""
        if self.websocket_manager and self.websocket:
            await self.websocket_manager.send_personal_message(message, self.websocket)

    def __create_listen(self) -> tuple[zmq.Socket, zmq.Context]:
        """Create a SUB socket subscribed to everything on ``LISTEN_REQ``.

        Returns:
            ``(socket, context)``; the caller must close the socket and
            terminate the context.
        """
        context = zmq.Context()
        listen_socket = context.socket(zmq.SUB)
        listen_socket.setsockopt(zmq.SUBSCRIBE, b"")
        # Receive timeout of 5000 ms so the listen loop can poll stop_listen.
        listen_socket.setsockopt(zmq.RCVTIMEO, 5000)
        # Send timeout of 5000 ms.
        listen_socket.setsockopt(zmq.SNDTIMEO, 5000)
        # LINGER 0: do not wait for unfinished operations when closing.
        listen_socket.setsockopt(zmq.LINGER, 0)
        listen_socket.connect(self.LISTEN_REQ)
        return listen_socket, context

    async def GetCameraProperty(self, CameraKey=None):
        """Query the camera list and return the first connected camera's properties.

        A camera counts as connected when its ``CameraStatus`` is ``"Ready"``
        or ``"Busy"``.

        Args:
            CameraKey: when given, only that camera is queried.

        Returns:
            ``(True, CameraPropertyInfo)`` on success, otherwise
            ``(False, message)``.
        """
        await asyncio.sleep(0.01)
        try:
            json_msg = self._request(
                self._build_request("GetCamera", CameraKey), time_out=2
            )
            if not json_msg.get("msg_result"):
                self.connect_status = False
                return False, "相机未连接或软件未打开"
            cameraInfo = json_msg.get("CameraInfo")
            if not cameraInfo:
                self.connect_status = False
                return False, "相机未连接"
            connected = next(
                (
                    item
                    for item in cameraInfo
                    if item.get("CameraStatus") in ("Ready", "Busy")
                ),
                None,
            )
            if connected is None:
                self.connect_status = False
                return False, "相机未连接"
            self.connect_status = True
            return True, connected.get("CameraPropertyInfo")
        except zmq.Again:
            # Timeout: connect_status is deliberately left untouched here,
            # matching the previous behaviour of this method.
            return False, "相机未连接或软件未打开"
        except Exception:
            logger.exception("GetCameraProperty failed")
            self.connect_status = False
            return False, "相机未连接或软件未打开"

    def get_iso_range(self, camera_info):
        """Return the adjustable ISO range from one camera-info entry.

        Args:
            camera_info: dict holding a ``CameraPropertyInfo`` list of
                property dicts.

        Returns:
            The ``CameraPropertyRange`` of the property whose
            ``CameraPropertyType`` is ``"ISO"``, or ``None`` if absent.
        """
        # ``or []`` also covers the key being present with a None value.
        for prop in camera_info.get("CameraPropertyInfo") or []:
            if prop.get("CameraPropertyType") == "ISO":
                return prop.get("CameraPropertyRange")
        return None

    async def GetCameraInfo(
        self, is_send=True, msg_type="", CameraKey=None, isMultCameraMode=False
    ):
        """Check whether the camera software is reachable and a camera is connected.

        Updates ``self.connect_status`` and, when ``is_send`` is true, pushes
        a status message to the websocket.

        Args:
            is_send: push the result to the websocket.
            msg_type: ``msg_type`` field of the pushed message.
            CameraKey: when given, only that camera is queried.
            isMultCameraMode: currently unused; kept for interface
                compatibility.

        Returns:
            ``(True, "相机已连接")`` when at least one camera is Ready/Busy,
            otherwise ``(False, message)``.
        """
        await asyncio.sleep(0.001)
        offline_msg = (
            f"请检查{CameraKey},相机是否连接" if CameraKey else "请检查相机是否连接"
        )
        try:
            json_msg = self._request(
                self._build_request("GetCamera", CameraKey), time_out=5
            )
            msg_result = json_msg.get("msg_result")
            if not msg_result:
                self.connect_status = False
                if is_send:
                    await self.sendMessageSocket(
                        self._status_message(
                            1,
                            offline_msg,
                            msg_type,
                            -1,
                            data=msg_result,
                            CameraKey=CameraKey,
                        )
                    )
                return False, offline_msg

            cameraInfo = json_msg.get("CameraInfo")
            if not cameraInfo:
                self.connect_status = False
                if is_send:
                    await self.sendMessageSocket(
                        self._status_message(
                            1, offline_msg, msg_type, -1, data=msg_result
                        )
                    )
                return False, "相机未连接"

            # Connected means the camera reports Ready or Busy.
            any_connected = any(
                item.get("CameraStatus") in ("Ready", "Busy") for item in cameraInfo
            )
            CameraLists = [
                {
                    "CameraSelection": item.get("CameraSelection"),
                    "CameraKey": item.get("CameraKey"),
                    "CameraName": item.get("CameraName"),
                    "CameraStatus": item.get("CameraStatus") in ("Ready", "Busy"),
                    "CameraISO": self.get_iso_range(item),
                }
                for item in cameraInfo
            ]
            if not any_connected:
                self.connect_status = False
                if is_send:
                    await self.sendMessageSocket(
                        self._status_message(
                            1,
                            offline_msg,
                            msg_type,
                            -1,
                            data=msg_result,
                            CameraKey=CameraKey,
                        )
                    )
                return False, offline_msg

            self.connect_status = True
            online_msg = f"相机{CameraKey}已连接" if CameraKey else "相机已连接"
            if is_send:
                await self.sendMessageSocket(
                    self._status_message(
                        0,
                        online_msg,
                        msg_type,
                        2,
                        data=msg_result,
                        CameraLists=CameraLists,
                    )
                )
            return True, "相机已连接"
        except zmq.Again as e:
            # Timeout: no websocket message is sent on this path.
            self.connect_status = False
            print("获取相机信息超时,继续监听...", e)
            return False, offline_msg
        except Exception as e:
            print("相机状态获取异常", e)
            self.connect_status = False
            if is_send:
                await self.sendMessageSocket(
                    self._status_message(1, offline_msg, msg_type, -1)
                )
            return False, offline_msg

    async def SetCameraFPS(self, fps=5, CameraKey=None):
        """Set the live-view frame rate.

        Args:
            fps: value sent as ``CameraLiveviewFPS``.
            CameraKey: when given, only that camera is addressed.

        Returns:
            ``(True, "设置成功")`` on success, otherwise ``(False, message)``.
        """
        camera_states, _ = await self.GetCameraInfo(is_send=False, CameraKey=CameraKey)
        if not camera_states:
            return False, "请先连接相机"
        try:
            req = self._build_request("LiveviewFPS", CameraKey)
            req["CameraLiveviewFPS"] = fps
            json_msg = self._request(req)
            if not json_msg.get("msg_result"):
                self.perview_state = False
                return False, "设置失败"
            # The success path previously reported "设置失败" by mistake.
            return True, "设置成功"
        except Exception:
            # Exception (not a bare except) so task cancellation propagates.
            logger.exception("SetCameraFPS failed")
            self.perview_state = False
            return False, "相机未连接或软件未打开"

    async def setCameraProperty(self, property="ISO", value=0, CameraKey=None):
        """Set one camera property through a ``SetProperty`` request.

        Args:
            property: property type name; sent as a string.
            value: property value; sent as a string.
            CameraKey: when given, only that camera is addressed.

        Returns:
            ``(True, message)`` on success, otherwise ``(False, message)``.
            Every path returns a tuple, including timeouts.
        """
        camera_states, _ = await self.GetCameraInfo(is_send=False, CameraKey=CameraKey)
        if not camera_states:
            return False, "请先连接相机"
        try:
            req = self._build_request("SetProperty", CameraKey)
            req["CameraPropertyType"] = str(property)
            req["CameraPropertyValue"] = str(value)
            print("发送设置属性请求", req)
            json_msg = self._request(req)
            if not json_msg.get("msg_result"):
                return False, f"{property}设置失败"
            return True, f"{property}设置成功"
        except zmq.Again:
            print(f"设置{property}超时,继续监听...")
            # Previously fell through and returned None, breaking callers
            # that unpack the result.
            return False, f"设置{property}超时"
        except Exception:
            logger.exception("setCameraProperty failed")
            self.perview_state = False
            return False, "相机未连接或软件未打开"

    def getConfigIso(self, CameraKey=None):
        """Look up the configured low/high ISO for a camera.

        Reads ``camera_configs`` / ``iso_config`` from ``settings`` and picks
        the first non-empty entry whose ``CameraKey`` equals ``CameraKey``.

        Returns:
            ``(low_iso, high_iso)``; defaults are ``100`` and ``6400`` when
            nothing is configured.
        """
        camera_configs = settings.getSysConfigs(
            "camera_configs",
            "iso_config",
            None,
        )
        # The lookup is called with a None default; treat that (or any
        # non-mapping value) as "nothing configured".
        if not isinstance(camera_configs, dict):
            camera_configs = {}
        iso = {}
        for item_config in camera_configs.values():
            if not item_config:
                continue
            if item_config.get("CameraKey") == CameraKey:
                iso = item_config.get("iso") or {}
                break
        return iso.get("low", 100), iso.get("high", 6400)

    def initConfigIsoSettings(self, CameraLists=None, isMultCameraMode=False):
        """Initialise or refresh the stored per-camera ISO configuration.

        When slot ``"A"`` is already configured, only single-camera mode
        rewrites it (and only if the camera key changed); otherwise slots
        ``"A"``, ``"B"`` and ``"C"`` are created from ``CameraLists``.

        NOTE(review): the nesting of the ``return None`` that follows the
        single-camera sync was reconstructed from whitespace-collapsed
        source; it is placed so that an already-configured setup is never
        re-initialised. Confirm against the original file.

        Args:
            CameraLists: camera summary dicts as built by GetCameraInfo.
            isMultCameraMode: True for multi-camera installations.
        """
        if not CameraLists:
            return None
        camera_configs = (
            settings.getSysConfigs(
                "camera_configs",
                "iso_config",
                None,
            )
            or {}
        )
        temp_A_point = camera_configs.get("A", None)
        if temp_A_point is not None:
            print("已配置无需更新")
            itemSettings = CameraLists[0]
            OldCameraKey = temp_A_point.get("CameraKey", None)
            if OldCameraKey == itemSettings.get("CameraKey", None):
                print("相机无变动。无需自动更改配置")
                return None
            basic_iso = temp_A_point.get("iso", {"low": 100, "high": 6400})
            if not isMultCameraMode:
                print("相机发生变动。需要更改配置")
                # Single-camera users get their camera info re-synced every
                # time, keeping the ISO values already stored for slot A.
                points = {"A": {**itemSettings, "iso": basic_iso}}
                sys_iso_config: SysConfigParams = {
                    "key": "camera_configs",
                    "value": {"iso_config": points},
                }
                print("单相机用户同步相机配置", sys_iso_config)
                settings.updateSysConfigs(params=sys_iso_config)
                # Sync the local configuration to the online store.
                settings.sync_sys_configs2Online()
            return None

        low_iso = camera_configs.get("low", 100)
        high_iso = camera_configs.get("high", 6400)
        points = {}
        for idx, slot in enumerate(("A", "B", "C")):
            # Slots beyond the number of detected cameras only get ISO limits.
            camera = CameraLists[idx] if idx < len(CameraLists) else {}
            points[slot] = {**camera, "iso": {"low": low_iso, "high": high_iso}}
        sys_iso_config: SysConfigParams = {
            "key": "camera_configs",
            "value": {"iso_config": points},
        }
        print("首次初始化", sys_iso_config)
        settings.updateSysConfigs(params=sys_iso_config)
        # Sync the local configuration to the online store.
        settings.sync_sys_configs2Online()

    async def EnableCameraPreview(
        self, enable_status=True, msg_type="", CameraKey=None
    ):
        """Enable or disable the camera live view.

        Sets the live-view FPS to 5, then applies the configured high ISO
        when enabling or low ISO when disabling (skipped when the configured
        value is ``"auto"``), then sends ``EnableLiveview``.  The outcome is
        pushed to the websocket.

        NOTE(review): the "preview failed" payloads use ``code`` 0 while
        other failure payloads in this class use 1; kept as-is because the
        consumer's expectations are not visible here.

        Returns:
            ``(success, message)``; every path returns a tuple, including
            timeouts.
        """
        print("收到得msg_type====>>>", msg_type)
        self.msg_type = msg_type
        await self.SetCameraFPS(5, CameraKey=CameraKey)
        camera_states, _ = await self.GetCameraInfo(is_send=False, CameraKey=CameraKey)
        if not camera_states:
            message = self._status_message(0, "预览启用失败", msg_type, -1)
            logger.error(message)
            await self.sendMessageSocket(message)
            return False, "请先连接相机"
        try:
            low_iso, high_iso = self.getConfigIso(CameraKey=CameraKey)
            print("LOW_ISO", low_iso)
            print("HIGH_ISO", high_iso)
            # Equality (not truthiness) is kept so non-bool values behave as
            # before: anything equal to neither True nor False skips both.
            if enable_status == True:  # noqa: E712
                if str(high_iso).lower() != "auto":
                    await self.setCameraProperty(
                        property="ISO", value=str(high_iso), CameraKey=CameraKey
                    )
                else:
                    print("high_iso 等于auto就不设置")
            elif enable_status == False:  # noqa: E712
                if str(low_iso).lower() != "auto":
                    await self.setCameraProperty(
                        property="ISO", value=str(low_iso), CameraKey=CameraKey
                    )
                else:
                    print("low_iso 等于auto就不设置")

            req = self._build_request("EnableLiveview", CameraKey)
            req["Enable"] = enable_status
            json_msg = self._request(req)
            if not json_msg.get("msg_result"):
                self.perview_state = False
                message = self._status_message(0, "预览启用失败", msg_type, -1)
                logger.error(message)
                await self.sendMessageSocket(message)
                return False, "预览启用失败"

            msg_send = "预览启用成功" if enable_status else "预览关闭成功"
            message = self._status_message(0, msg_send, msg_type, 2)
            logger.info(message)
            await self.sendMessageSocket(message)
            return True, msg_send
        except zmq.Again:
            print("启动预览超时,继续监听...")
            # Previously fell through and returned None.
            return False, "启动预览超时"
        except Exception:
            self.perview_state = False
            message = self._status_message(1, "相机未连接或软件未打开", msg_type, -1)
            logger.error(message)
            await self.sendMessageSocket(message)
            return False, "相机未连接或软件未打开"

    async def CameraAutofocus(self, CameraKey=None):
        """Trigger autofocus and log how long the request took.

        Returns:
            ``(True, "对焦成功")`` on success, otherwise ``(False, message)``.
            Every path returns a tuple, including timeouts.
        """
        camera_states, _ = await self.GetCameraInfo(is_send=False, CameraKey=CameraKey)
        print("CameraAutofocus 执行对焦")
        if not camera_states:
            return False, "请先连接相机"
        try:
            req = self._build_request("Autofocus", CameraKey)
            start_time = time.time()
            json_msg = self._request(req)
            end_time = time.time()
            logger.info(f"对焦通信耗时: {end_time - start_time:.4f} 秒")
            print("json_msg", json_msg)
            if not json_msg.get("msg_result"):
                return False, "对焦失败"
            return True, "对焦成功"
        except zmq.Again:
            print("对焦超时,继续监听...")
            # Previously fell through and returned None.
            return False, "对焦超时"
        except Exception:
            logger.exception("CameraAutofocus failed")
            return False, "相机未连接或软件未打开"

    async def CameraShooter(
        self, msg_type="", goods_art_no="", id=0, is_af=False, delay=0.5, CameraKey=None
    ):
        """Take a photo, optionally autofocusing first.

        Args:
            msg_type: ``msg_type`` used for the pushed result messages.
            goods_art_no: article number; with ``id`` it is embedded in
                ``PhotoOrigin`` as ``"<goods_art_no>,<id>"``.
            id: record id; ``0`` means "no origin".
            is_af: run autofocus before shooting.
            delay: seconds to wait first so preceding commands can finish.
            CameraKey: when given, only that camera is addressed.

        Returns:
            ``(True, "拍照成功")`` on success, otherwise ``(False, message)``.
        """
        # Give preceding commands time to complete.
        await asyncio.sleep(delay)
        if is_af:
            await self.CameraAutofocus(CameraKey=CameraKey)
        self.msg_type = msg_type
        print("camera_states", msg_type)
        camera_states, _ = await self.GetCameraInfo(is_send=True, CameraKey=CameraKey)
        print("camera_states CameraShooter", camera_states)
        if not camera_states:
            return False, "请先连接相机"
        try:
            low_iso, high_iso = self.getConfigIso(CameraKey=CameraKey)
            print("LOW_ISO", low_iso)
            print("HIGH_ISO", high_iso)
            if str(low_iso).lower() != "auto":
                await self.setCameraProperty(
                    property="ISO", value=str(low_iso), CameraKey=CameraKey
                )
            else:
                print("low_iso 等于auto就不设置")

            req = self._build_request("Shoot", CameraKey, msg_seq_num=1)
            if goods_art_no != "" and id != 0:
                # A comma separates article number and id because the article
                # number itself may contain "_" or "-"; the listener side is
                # expected to split on the comma.
                req["PhotoOrigin"] = f"{goods_art_no},{id}"
            else:
                req["PhotoOrigin"] = ""
            print("发送拍照请求", req)
            json_msg = self._request(req)
            print("CameraShooter", json_msg)
            if not json_msg.get("msg_result"):
                msg_send = "拍照失败"
                await self.sendMessageSocket(
                    self._status_message(1, msg_send, self.msg_type, -1)
                )
                return False, msg_send
            await self.sendMessageSocket(
                self._status_message(0, "拍照成功", self.msg_type, 2)
            )
            return True, "拍照成功"
        except zmq.Again:
            print("拍照超时")
            await self.sendMessageSocket(
                self._status_message(
                    1,
                    "相机未连接或软件未打开",
                    self.msg_type,
                    -1,
                    # The key is the literal "goods_art_no"; it used to be the
                    # variable's value by mistake.
                    data={"goods_art_no": goods_art_no, "id": id},
                )
            )
            # A timed-out shot is a failure; the flag used to be True.
            return False, "拍照失败"
        except Exception as e:
            print("拍照出错", e)
            msg_send = "相机未连接或软件未打开"
            await self.sendMessageSocket(
                self._status_message(1, msg_send, self.msg_type, -1)
            )
            return False, msg_send

    async def asyncMessageListen(self):
        """Forward at most one queued message from ``message_queue`` to the websocket.

        Does nothing when no websocket is attached or it is not in the
        ``CONNECTED`` state.
        """
        if self.websocket is None or self.websocket.client_state.name != "CONNECTED":
            print("WebSocket连接已断开,停止发送消息")
            return
        try:
            # Non-blocking check so this coroutine never waits on the queue.
            if message_queue.empty():
                return
            message = message_queue.get_nowait()
            try:
                print("发送消息中。。。。。", message)
                await self.sendMessageSocket(message)
            finally:
                # Mark the item done even if sending failed, so anything
                # joining the queue is not blocked forever.
                message_queue.task_done()
        except Exception:
            # Best effort: a failed forward must not break the caller's loop.
            logger.exception("asyncMessageListen failed to forward a message")

    def connect_listen(self):
        """Run the blocking event-listen loop (intended for a worker thread).

        Each received event, except ``NetworkPing``, is handed to
        ``self.callback_listen`` on ``self.main_loop`` via
        ``asyncio.run_coroutine_threadsafe``.  The loop ends when
        ``self.stop_listen`` is set or on any non-timeout error.

        NOTE(review): the loop is also skipped when ``connect_status`` is
        already True, not only when a listener is running — confirm that
        this is intended.

        Returns:
            ``True`` immediately if skipped; ``None`` after the loop exits.
        """
        print("smart shooter connect_listen START")
        if self.connect_status == True or self.listen_init == True:  # noqa: E712
            return True
        sub_socket, context = self.__create_listen()
        print("构建监听", self.connect_status)
        try:
            while True:
                self.listen_init = True
                if self.stop_listen:
                    break
                try:
                    # Blocks until a message arrives or RCVTIMEO expires.
                    raw = sub_socket.recv()
                    json_msg = json.loads(raw.decode("utf-8"))
                    if json_msg.get("msg_id") == "NetworkPing":
                        continue
                    # main_loop is assigned from outside and may be missing.
                    main_loop = getattr(self, "main_loop", None)
                    if main_loop:
                        # Schedules the callback on the main loop and returns
                        # immediately without waiting for its result.
                        asyncio.run_coroutine_threadsafe(
                            self.callback_listen(json_msg), main_loop
                        )
                    else:
                        print("Error: main_loop not set in SmartShooter")
                except zmq.Again:
                    # Receive timeout: loop again and re-check stop_listen.
                    continue
                except Exception as e:
                    print(f"ZMQ Error: {e}")
                    break
        finally:
            self.listen_init = False
            self.connect_status = False
            sub_socket.close()
            context.term()
            print("smart shooter连接断开")