import asyncio
import base64
import datetime
import json
import random
import sys
from io import BytesIO

import zmq
from PIL import Image

from utils.SingletonType import SingletonType
from mcu.BaseClass import BaseClass


# Singleton so that the class is instantiated only once.
class SmartShooter(BaseClass, metaclass=SingletonType):
    """Client for the SmartShooter camera-control application over ZeroMQ.

    Requests are sent through a REQ socket connected to ``SET_REQ``; events
    are received through a SUB socket connected to ``LISTEN_REQ``.

    NOTE(review): every ZeroMQ call in this class is synchronous and is made
    directly from coroutines, so a call can stall the event loop for up to
    ``SOCKET_TIMEOUT_MS``. Moving the I/O off the loop is a design decision
    that is intentionally not made here.
    """

    SET_REQ = "tcp://127.0.0.1:54544"
    LISTEN_REQ = "tcp://127.0.0.1:54543"
    # Send/receive timeout applied to every socket, in milliseconds.
    SOCKET_TIMEOUT_MS = 5000

    def __init__(self, websocket_manager):
        super().__init__(websocket_manager)
        self.msg_type = "smart_shooter"
        self.websocket_manager = websocket_manager
        # Set to True to ask connect_listen() to stop.
        self.stop_listen = False
        # Camera connection state, refreshed by GetCameraInfo().
        self.connect_status = False
        # Live-preview state, refreshed by EnableCameraPreview().
        self.perview_state = False
        # Coroutine function awaited with each decoded event in
        # connect_listen(); events are ignored while this is None.
        self.callback_listen = None
        # connect_listen() is not scheduled here; the caller must start it.

    def __send_tcp_message(self, socket, msg):
        """Send ``msg`` as JSON on ``socket`` and return the decoded JSON reply.

        Raises whatever the socket raises (e.g. ``zmq.Again`` on timeout) and
        ``json.JSONDecodeError`` / ``UnicodeDecodeError`` on a malformed reply.
        """
        socket.send_string(json.dumps(msg))
        rep = socket.recv()
        str_msg = rep.decode("utf-8")
        json_msg = json.loads(str_msg)
        return json_msg

    def __open_socket(self, socket_type, endpoint):
        """Create a context and a configured socket connected to ``endpoint``.

        SUB sockets are subscribed to every topic. If configuration or the
        connect call fails, the context (and any socket created on it) is
        destroyed before the exception is re-raised, so nothing is leaked.
        """
        context = zmq.Context()
        try:
            sock = context.socket(socket_type)
            if socket_type == zmq.SUB:
                sock.setsockopt(zmq.SUBSCRIBE, b"")
            # Receive timeout.
            sock.setsockopt(zmq.RCVTIMEO, self.SOCKET_TIMEOUT_MS)
            # Send timeout.
            sock.setsockopt(zmq.SNDTIMEO, self.SOCKET_TIMEOUT_MS)
            # 0 = do not wait for pending messages when the socket is closed.
            sock.setsockopt(zmq.LINGER, 0)
            sock.connect(endpoint)
        except Exception:
            context.destroy(linger=0)
            raise
        return sock, context

    def __create_req(self) -> tuple[zmq.Socket, zmq.Context]:
        """Return a ``(socket, context)`` pair with a REQ socket on ``SET_REQ``."""
        return self.__open_socket(zmq.REQ, self.SET_REQ)

    def __create_listen(self) -> tuple[zmq.Socket, zmq.Context]:
        """Return a ``(socket, context)`` pair with a SUB socket on ``LISTEN_REQ``."""
        return self.__open_socket(zmq.SUB, self.LISTEN_REQ)

    @staticmethod
    def __build_request(msg_id, **extra):
        """Build the request envelope shared by all commands.

        ``extra`` is appended after the common fields (e.g. ``Enable=True``).
        """
        req = {
            "msg_type": "Request",
            "msg_id": msg_id,
            "msg_seq_num": 0,
            "CameraSelection": "All",
        }
        req.update(extra)
        return req

    def __request(self, req):
        """Send one request on a fresh REQ socket and return the JSON reply.

        The socket and its context are always released, whether the exchange
        succeeds or raises.
        """
        req_socket, context = self.__create_req()
        try:
            return self.__send_tcp_message(req_socket, req)
        finally:
            req_socket.close()
            context.term()

    def __notify(self, msg, device_status):
        """Forward a status message through the inherited ``sendSocketMessage``."""
        self.sendSocketMessage(
            code=0,
            msg=msg,
            device_status=device_status,
        )

    def __query_camera_status(self):
        """Ask SmartShooter for its cameras and return ``(connected, message)``.

        ``connected`` is True only when the reply reports success and the
        first listed camera has ``CameraStatus == "Ready"``. Any exception
        (timeout, malformed reply, ...) is reported as "not connected".
        """
        try:
            reply = self.__request(self.__build_request("GetCamera"))
            if not reply.get("msg_result"):
                return False, "相机未连接或软件未打开"
            cameras = reply.get("CameraInfo")
            if not cameras:
                return False, "相机未连接"
            # Only the first reported camera is considered.
            if cameras[0].get("CameraStatus") != "Ready":
                return False, "相机未连接"
        except Exception:
            return False, "相机未连接或软件未打开"
        return True, "相机已连接"

    async def GetCameraInfo(self, is_send=True, msg_type=""):
        """Check whether the software is reachable and a camera is ready.

        Args:
            is_send: when true, the outcome is also pushed through
                ``sendSocketMessage`` (``device_status`` 2 on success,
                -1 on failure).
            msg_type: stored in ``self.msg_type`` before anything is sent.

        Returns:
            ``(connected, message)``; ``self.connect_status`` is updated to
            ``connected``.
        """
        await asyncio.sleep(0.01)
        self.msg_type = msg_type
        connected, msg_send = self.__query_camera_status()
        self.connect_status = connected
        if is_send:
            self.__notify(msg_send, 2 if connected else -1)
        return connected, msg_send

    async def EnableCameraPreview(self, enable_status=True, msg_type=""):
        """Enable or disable the camera live view.

        Args:
            enable_status: value sent as the request's ``Enable`` field.
            msg_type: stored in ``self.msg_type`` for the notification.

        Returns:
            ``(ok, message)``. When the camera is not ready the request is not
            sent and no notification is emitted.
        """
        self.msg_type = msg_type
        # msg_type is passed through so that the camera check does not reset
        # self.msg_type to "" before the notification below is sent.
        camera_ready, _ = await self.GetCameraInfo(
            is_send=False, msg_type=msg_type
        )
        if not camera_ready:
            return False, "请先连接相机"

        try:
            reply = self.__request(
                self.__build_request("EnableLiveview", Enable=enable_status)
            )
            succeeded = bool(reply.get("msg_result"))
            if not succeeded:
                msg_send = "预览启用失败"
            elif enable_status:
                msg_send = "预览启用成功"
            else:
                msg_send = "预览关闭成功"
        except Exception:
            succeeded = False
            msg_send = "相机未连接或软件未打开"

        # Track the preview state on success as well as on failure.
        self.perview_state = bool(enable_status) if succeeded else False
        self.__notify(msg_send, 2)
        return succeeded, msg_send

    async def CameraAutofocus(self):
        """Trigger camera autofocus.

        Returns:
            ``(ok, message)``. No notification is sent by this method.
        """
        camera_ready, _ = await self.GetCameraInfo(is_send=False)
        if not camera_ready:
            return False, "请先连接相机"

        try:
            json_msg = self.__request(self.__build_request("Autofocus"))
            print("json_msg", json_msg)
            if not json_msg.get("msg_result"):
                return False, "对焦失败"
            return True, "对焦成功"
        except Exception:
            return False, "相机未连接或软件未打开"

    async def CameraShooter(self, msg_type=""):
        """Trigger a shot.

        Args:
            msg_type: stored in ``self.msg_type`` for the notification.

        Returns:
            ``(ok, message)``. When the camera is not ready the request is not
            sent and no notification is emitted.
        """
        self.msg_type = msg_type
        # See EnableCameraPreview: keep self.msg_type intact across the check.
        camera_ready, _ = await self.GetCameraInfo(
            is_send=False, msg_type=msg_type
        )
        if not camera_ready:
            return False, "请先连接相机"

        try:
            reply = self.__request(self.__build_request("Shoot"))
            succeeded = bool(reply.get("msg_result"))
            msg_send = "拍照成功" if succeeded else "拍照失败"
        except Exception:
            succeeded = False
            msg_send = "相机未连接或软件未打开"

        self.__notify(msg_send, 2)
        return succeeded, msg_send

    async def connect_listen(self):
        """Listen for SmartShooter events and forward them to ``callback_listen``.

        Returns True immediately when ``self.connect_status`` is already set;
        otherwise runs until ``stop_listen`` is set or an unexpected error
        occurs, and then returns None. ``NetworkPing`` events are dropped.

        NOTE(review): ``connect_status`` is also set by GetCameraInfo(), so a
        successful camera check before this call makes it return early without
        listening - confirm that this guard is intended.
        """
        if self.connect_status:
            return True

        # Start listening.
        sub_socket, context = self.__create_listen()
        print("构建监听")
        try:
            # Checked on every iteration so that a stop request is honoured
            # even while the camera is disconnected or no callback is set.
            while not self.stop_listen:
                await asyncio.sleep(0.01)
                if self.callback_listen is None:
                    continue

                camera_ready, _ = await self.GetCameraInfo(is_send=False)
                if not camera_ready:
                    print("相机未连接")
                    await asyncio.sleep(5)  # Wait for the camera to connect.
                    continue

                if self.stop_listen:
                    break

                try:
                    self.connect_status = True
                    raw = sub_socket.recv()
                    json_msg = json.loads(raw.decode("utf-8"))
                    # .get(): an event without "msg_id" is forwarded instead
                    # of terminating the listener with a KeyError.
                    if json_msg.get("msg_id") == "NetworkPing":
                        continue
                    await self.callback_listen(json_msg)
                except zmq.Again:
                    print("接收超时,继续监听...")
                    continue
                except Exception as e:
                    self.connect_status = False
                    print(f"发生错误: {e}")
                    break
        finally:
            # Also runs when the task is cancelled, so the socket is released.
            sub_socket.close()
            context.term()
            print("smart shooter连接断开")