import json import datetime import random import zmq import asyncio from PIL import Image from io import BytesIO import base64,threading import zmq, sys, time from utils.SingletonType import SingletonType import settings import logging from utils.common import message_queue logger = logging.getLogger(__name__) # 定义为单例模式,避免被重复实例化 class SmartShooter(metaclass=SingletonType): SET_REQ = "tcp://127.0.0.1:54544" LISTEN_REQ = "tcp://127.0.0.1:54543" def __init__(self, websocket_manager): self.msg_type = "smart_shooter" self.websocket_manager = websocket_manager # 是否停止监听 self.stop_listen = False # 相机连接状态 self.connect_status = False # 实时预览状态 self.perview_state = False self.callback_listen = None self.listen_init = None self.websocket = None def __send_tcp_message(self, socket, msg): # await asyncio.sleep(0.01) socket.send_string(json.dumps(msg)) rep = socket.recv() str_msg = rep.decode("utf-8") json_msg = json.loads(str_msg) return json_msg def __create_req(self, time_out=5) -> tuple[zmq.Socket, zmq.Context]: context = zmq.Context() req_socket = context.socket(zmq.REQ) # 设置发送超时为 5000 毫秒(5 秒) req_socket.setsockopt(zmq.RCVTIMEO, time_out * 1000) # 设置接收超时为 5000 毫秒(5 秒) req_socket.setsockopt(zmq.SNDTIMEO, time_out * 1000) req_socket.setsockopt(zmq.LINGER, 0) # 设置为 0 表示不等待未完成的操作 req_socket.connect(self.SET_REQ) return req_socket, context def __create_listen(self) -> tuple[zmq.Socket, zmq.Context]: context = zmq.Context() listen_socket = context.socket(zmq.SUB) listen_socket.setsockopt(zmq.SUBSCRIBE, b"") # 设置发送超时为 5000 毫秒(5 秒) listen_socket.setsockopt(zmq.RCVTIMEO, 4000) # 设置接收超时为 5000 毫秒(5 秒) listen_socket.setsockopt(zmq.SNDTIMEO, 4000) listen_socket.setsockopt(zmq.LINGER, 0) # 设置为 0 表示不等待未完成的操作 listen_socket.connect(self.LISTEN_REQ) return listen_socket, context async def GetCameraProperty(self): '''获取相机属性''' await asyncio.sleep(0.01) """ 实时获取相机信息,是否连接、软件是否被打开 """ socket, context = self.__create_req(time_out=2) try: req = {} req["msg_type"] = "Request" req["msg_id"] = "GetCamera" req["msg_seq_num"] = 0 req["CameraSelection"] = "All" json_msg = self.__send_tcp_message(socket, req) msg_result = json_msg.get("msg_result") if not msg_result: self.connect_status = False msg_send = "相机未连接或软件未打开" return False, msg_send cameraInfo = json_msg.get("CameraInfo") if cameraInfo == None or len(cameraInfo) == 0: self.connect_status = False msg_send = "相机未连接" return False, "相机未连接" # 链接的相机 CameraStatus = False CameraIndex =-1 for cam_idx,item in enumerate(cameraInfo): CameraStatus = item.get("CameraStatus") in ["Ready", "Busy"] if CameraStatus == True: CameraIndex = cam_idx break # CameraStatus = connect_camera.get("CameraStatus") if not CameraStatus: self.connect_status = False msg_send = "相机未连接" return False, msg_send self.connect_status = True msg_send = "相机已连接" return True, cameraInfo[CameraIndex].get("CameraPropertyInfo") except zmq.Again: msg_send = "相机未连接或软件未打开" return False, msg_send except Exception as e: self.connect_status = False socket.close() context.term() msg_send = "相机未连接或软件未打开" return False, msg_send async def GetCameraInfo(self, is_send=True, msg_type=""): await asyncio.sleep(0.01) self.msg_type = msg_type """ 实时获取相机信息,是否连接、软件是否被打开 """ socket, context = self.__create_req(time_out=2) try: req = {} req["msg_type"] = "Request" req["msg_id"] = "GetCamera" req["msg_seq_num"] = 0 req["CameraSelection"] = "All" json_msg = self.__send_tcp_message(socket, req) msg_result = json_msg.get("msg_result") if not msg_result: self.connect_status = False msg_send = "相机未连接或软件未打开" if is_send: message = { "code": 1, "msg": msg_send, "data": msg_result, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message( message, self.websocket ) return False, msg_send cameraInfo = json_msg.get("CameraInfo") if cameraInfo == None or len(cameraInfo) == 0: self.connect_status = False msg_send = "相机未连接" if is_send: message = { "code": 1, "msg": msg_send, "data": msg_result, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message( message, self.websocket ) return False, "相机未连接" # 链接的相机 CameraStatus = any( item.get("CameraStatus") in ["Ready", "Busy"] for item in cameraInfo ) if not CameraStatus: self.connect_status = False msg_send = "相机未连接" if is_send: message = { "code": 1, "msg": msg_send, "data": msg_result, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message( message, self.websocket ) return False, msg_send self.connect_status = True msg_send = "相机已连接" if is_send: message = { "code": 0, "msg": msg_send, "data": msg_result, "msg_type": self.msg_type, "device_status": 2, } await self.websocket_manager.send_personal_message( message, self.websocket ) # print("相机已连接状态信息---->", cameraInfo) return True, "相机已连接" except zmq.Again: print("获取相机信息超时,继续监听...") msg_send = "相机未连接或软件未打开" if is_send: message = { "code": 1, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message( message, self.websocket ) return False, msg_send except Exception as e: print("拍照异常", e) self.connect_status = False socket.close() context.term() msg_send = "相机未连接或软件未打开" if is_send: message = { "code": 1, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message( message, self.websocket ) return False, msg_send async def SetCameraFPS(self, fps=5): """ 激活相机预览 """ camera_states, _ = await self.GetCameraInfo(is_send=False) if not camera_states: return False, "请先连接相机" try: socket, context = self.__create_req() req = {} req["msg_type"] = "Request" req["msg_id"] = "LiveviewFPS" req["msg_seq_num"] = 0 req["CameraSelection"] = "All" req["CameraLiveviewFPS"] = fps json_msg = self.__send_tcp_message(socket, req) msg_result = json_msg.get("msg_result") if not msg_result: self.perview_state = False return False, "设置失败" return True, "设置失败" except: self.perview_state = False socket.close() context.term() return False, "相机未连接或软件未打开" async def setCameraProperty(self, property="ISO", value=0): # SetProperty camera_states, _ = await self.GetCameraInfo(is_send=False) if not camera_states: return False, "请先连接相机" try: socket, context = self.__create_req() req = {} req["msg_type"] = "Request" req["msg_id"] = "SetProperty" req["msg_seq_num"] = 0 req["CameraSelection"] = "All" req["CameraPropertyType"] = str(property) req["CameraPropertyValue"] = str(value) json_msg = self.__send_tcp_message(socket, req) msg_result = json_msg.get("msg_result") if not msg_result: return False, f"{property}设置失败" return True, f"{property}设置成功" except zmq.Again: print(f"设置{property}超时,继续监听...") except: self.perview_state = False socket.close() context.term() msg_send = "相机未连接或软件未打开" return False, msg_send async def EnableCameraPreview(self, enable_status=True, msg_type=""): self.msg_type = msg_type await self.SetCameraFPS(5) """ 激活相机预览 """ camera_states, _ = await self.GetCameraInfo(is_send=False) if not camera_states: return False, "请先连接相机" try: camera_configs = settings.getSysConfigs( "camera_configs", "iso_config", {"low": 100, "high": 6400, "mode": "un_auto"}, ) low_iso = camera_configs.get("low", 100) high_iso = camera_configs.get("high", 6400) print("LOW_ISO", low_iso) print("HIGH_ISO", high_iso) # 等于auto就不设置 if enable_status == True: if str(high_iso).lower() != "auto": await self.setCameraProperty(property="ISO", value=str(high_iso)) else: print("high_iso 等于auto就不设置") if enable_status == False: if str(low_iso).lower() != "auto": await self.setCameraProperty(property="ISO", value=str(low_iso)) else: print("low_iso 等于auto就不设置") socket, context = self.__create_req() req = {} req["msg_type"] = "Request" req["msg_id"] = "EnableLiveview" req["msg_seq_num"] = 0 req["CameraSelection"] = "All" req["Enable"] = enable_status json_msg = self.__send_tcp_message(socket, req) msg_result = json_msg.get("msg_result") if not msg_result: self.perview_state = False msg_send = "预览启用失败" self.sendSocketMessage( code=0, msg=msg_send, device_status=2, ) return False, "预览启用失败" msg_send = "预览启用成功" if enable_status else "预览关闭成功" message = { "code": 0, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": 2, } await self.websocket_manager.send_personal_message(message, self.websocket) return True, "预览启用成功" if enable_status else "预览关闭成功" except zmq.Again: print("启动预览超时,继续监听...") except: self.perview_state = False socket.close() context.term() msg_send = "相机未连接或软件未打开" message = { "code": 1, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message(message, self.websocket) return False, "相机未连接或软件未打开" async def CameraAutofocus(self): """ 相机自动对焦 """ camera_states, _ = await self.GetCameraInfo(is_send=False) print("CameraAutofocus 执行对焦") if not camera_states: return False, "请先连接相机" try: socket, context = self.__create_req() req = {} req["msg_type"] = "Request" req["msg_id"] = "Autofocus" req["msg_seq_num"] = 0 req["CameraSelection"] = "All" json_msg = self.__send_tcp_message(socket, req) print("json_msg", json_msg) msg_result = json_msg.get("msg_result") if not msg_result: msg_send = "对焦失败" message = { "code": 1, "msg": msg_send, "data": None, "msg_type": "smart_shooter_photo_take", "device_status": -1, } await self.websocket_manager.send_personal_message( message, self.websocket ) return False, "对焦失败" return True, "对焦成功" except zmq.Again: print("对焦超时,继续监听...") except: socket.close() context.term() return False, "相机未连接或软件未打开" async def CameraShooter(self, msg_type="", goods_art_no="", id=0, is_af=False,delay=1.5): # 这里延迟一秒钟 等待前置命令完成 await asyncio.sleep(delay) # 对焦混用 if is_af: await self.CameraAutofocus() self.msg_type = msg_type print("camera_states", msg_type) """ 执行拍照 """ camera_states, _ = await self.GetCameraInfo(is_send=True) print("camera_states CameraShooter", camera_states) if not camera_states: return False, "请先连接相机" try: socket, context = self.__create_req() req = {} req["msg_type"] = "Request" req["msg_id"] = "Shoot" req["msg_seq_num"] = 1 req["CameraSelection"] = "All" if goods_art_no != "" and id != 0: # 此处用逗号分割,货号和id,需要在监听部分进行切割保存处理,如果使用下划线或者减号,可能货号中存在对应符号 req["PhotoOrigin"] = f"{goods_art_no},{id}" else: req["PhotoOrigin"] = "" json_msg = self.__send_tcp_message(socket, req) print("CameraShooter", json_msg) msg_result = json_msg.get("msg_result") if not msg_result: msg_send = "拍照失败" message = { "code": 1, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message( message, self.websocket ) return False, msg_send msg_send = "拍照成功" message = { "code": 0, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": 2, } msg_send = "相机未连接或软件未打开" message = { "code": 1, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message(message, self.websocket) return True, "拍照成功" except zmq.Again: msg_send = "相机未连接或软件未打开" message = { "code": 1, "msg": msg_send, "data": {goods_art_no: goods_art_no, "id": id}, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message(message, self.websocket) return True, "拍照失败" except: socket.close() context.term() msg_send = "相机未连接或软件未打开" message = { "code": 1, "msg": msg_send, "data": None, "msg_type": self.msg_type, "device_status": -1, } await self.websocket_manager.send_personal_message(message, self.websocket) return False, msg_send async def asyncMessageListen(self): if self.websocket.client_state.name != "CONNECTED": print("WebSocket连接已断开,停止发送消息") return try: # 使用非阻塞方式检查队列中是否有项目 if not message_queue.empty(): message = message_queue.get_nowait() print("发送消息中。。。。。", message) await self.websocket_manager.send_personal_message( message, self.websocket ) message_queue.task_done() except Exception as e: # 处理可能的异常,如队列为空等 pass def connect_listen(self): print("smart shooter connect_listen", self.connect_status, self.listen_init) if self.connect_status == True or self.listen_init == True: return True # 发起监听 sub_socket, context = self.__create_listen() print("构建监听", self.connect_status) logger.info("构建监听,%s", self.connect_status) try: # 尝试获取当前线程的事件循环 try: self.listen_loop = asyncio.get_running_loop() except RuntimeError: self.listen_loop = asyncio.new_event_loop() except RuntimeError: # 如果当前线程没有事件循环,则创建一个新的 self.listen_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.listen_loop) while True: self.listen_init = True if self.callback_listen == None: continue try: # 创建任务并立即运行,设置超时以避免阻塞 # future = asyncio.ensure_future(self.asyncMessageListen()) # 运行任务,但设置超时以避免无限等待 # 使用一致的事件循环运行异步任务 # asyncio.run(self.asyncMessageListen()) # 运行任务,但设置超时以避免无限等待 self.listen_loop.run_until_complete(self.asyncMessageListen()) except asyncio.TimeoutError: # 超时是正常的,表示没有消息需要处理 pass except Exception as e: # 处理其他可能的异常 print(f"Error handling async message-asyncMessageListen: {e}") # camera_states, camera_msg = await self.GetCameraInfo(is_send=False) # if not camera_states: # print("相机未连接回调打印", camera_states, camera_msg) # await asyncio.sleep(0.01) # 等待相机连接 # continue if self.stop_listen: break try: self.connect_status = True raw = sub_socket.recv() str_msg = raw.decode("utf-8") json_msg = json.loads(str_msg) if json_msg["msg_id"] == "NetworkPing": continue # self.callback_listen(json_msg) asyncio.run(self.callback_listen(json_msg)) except zmq.Again: print("接收超时,继续监听...") logger.info("接收超时,继续监听...") continue except Exception as e: self.connect_status = False print(f"发生错误: {e}") break self.listen_init = False self.connect_status = False self.stop_listen = False sub_socket.close() context.term() print("smart shooter连接断开")