| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- import json
- import datetime
- import random
- import zmq
- import asyncio
- from PIL import Image
- from io import BytesIO
- import base64
- import zmq, sys
- from utils.SingletonType import SingletonType
- from mcu.BaseClass import BaseClass
# Defined as a singleton so the class is never instantiated more than once.
class SmartShooter(BaseClass, metaclass=SingletonType):
    """Client for a camera-control application reachable over local ZeroMQ.

    Requests (``GetCamera``, ``EnableLiveview``, ``Autofocus``, ``Shoot``) are
    sent as JSON over a REQ socket connected to ``SET_REQ``; events are read
    from a SUB socket connected to ``LISTEN_REQ``.  Results are reported to
    the front end through ``sendSocketMessage`` (not defined in this class --
    presumably inherited from ``BaseClass``).

    NOTE(review): the ZeroMQ calls used here are blocking.  A request may
    stall the event loop for up to the 5 s socket timeout.
    """

    SET_REQ = "tcp://127.0.0.1:54544"
    LISTEN_REQ = "tcp://127.0.0.1:54543"

    def __init__(self, websocket_manager):
        """Store the websocket manager and reset all state flags.

        The listener is *not* started here; ``connect_listen`` has to be
        scheduled by the caller.
        """
        super().__init__(websocket_manager)
        self.msg_type = "smart_shooter"
        self.websocket_manager = websocket_manager
        # Set to True to make connect_listen() leave its loop.
        self.stop_listen = False
        # Camera connection state, refreshed by GetCameraInfo().
        self.connect_status = False
        # Live-preview state, refreshed by EnableCameraPreview().
        self.perview_state = False
        # Async callable awaited with every event dict received by
        # connect_listen(); while it is None the listener idles.
        self.callback_listen = None

    def __send_tcp_message(self, socket, msg):
        """Send ``msg`` as JSON on ``socket`` and return the decoded JSON reply.

        Blocking call; raises whatever the socket or ``json`` raises (for
        example ``zmq.Again`` when a socket timeout expires).
        """
        socket.send_string(json.dumps(msg))
        reply = socket.recv()
        return json.loads(reply.decode("utf-8"))

    def __create_req(self) -> tuple[zmq.Socket, zmq.Context]:
        """Create a REQ socket connected to ``SET_REQ``.

        Returns ``(socket, context)``; the caller owns both and must close the
        socket and terminate the context.
        """
        context = zmq.Context()
        req_socket = context.socket(zmq.REQ)
        # Receive timeout: 5000 ms.
        req_socket.setsockopt(zmq.RCVTIMEO, 5000)
        # Send timeout: 5000 ms.
        req_socket.setsockopt(zmq.SNDTIMEO, 5000)
        # LINGER 0: do not wait for unfinished operations on close.
        req_socket.setsockopt(zmq.LINGER, 0)
        req_socket.connect(self.SET_REQ)
        return req_socket, context

    def __create_listen(self) -> tuple[zmq.Socket, zmq.Context]:
        """Create a SUB socket subscribed to everything on ``LISTEN_REQ``.

        Returns ``(socket, context)``; the caller owns both and must close the
        socket and terminate the context.
        """
        context = zmq.Context()
        listen_socket = context.socket(zmq.SUB)
        # Empty prefix: subscribe to every message.
        listen_socket.setsockopt(zmq.SUBSCRIBE, b"")
        # Receive timeout: 5000 ms.
        listen_socket.setsockopt(zmq.RCVTIMEO, 5000)
        # Send timeout: 5000 ms.
        listen_socket.setsockopt(zmq.SNDTIMEO, 5000)
        # LINGER 0: do not wait for unfinished operations on close.
        listen_socket.setsockopt(zmq.LINGER, 0)
        listen_socket.connect(self.LISTEN_REQ)
        return listen_socket, context

    def __request(self, req):
        """Run one request/reply round trip and always release the socket.

        A fresh socket and context are created per request and closed in
        ``finally`` so that neither successful nor failed requests leak them.
        """
        socket, context = self.__create_req()
        try:
            return self.__send_tcp_message(socket, req)
        finally:
            socket.close()
            context.term()

    async def GetCameraInfo(self, is_send=True, msg_type=""):
        """Query the camera state and update ``connect_status``.

        Args:
            is_send: when true, report the outcome via ``sendSocketMessage``
                (``device_status`` 2 when connected, -1 otherwise).
            msg_type: stored on ``self.msg_type`` before anything is sent.

        Returns:
            ``(connected, message)``.  ``connected`` is True only when the
            reply has a truthy ``msg_result`` and the first ``CameraInfo``
            entry has ``CameraStatus == "Ready"``.
        """
        await asyncio.sleep(0.01)
        self.msg_type = msg_type
        req = {
            "msg_type": "Request",
            "msg_id": "GetCamera",
            "msg_seq_num": 0,
            "CameraSelection": "All",
        }
        connected = False
        msg_send = "相机未连接或软件未打开"
        try:
            json_msg = self.__request(req)
            if json_msg.get("msg_result"):
                camera_info = json_msg.get("CameraInfo")
                # Only the first listed camera is inspected.
                if camera_info and camera_info[0].get("CameraStatus") == "Ready":
                    connected = True
                    msg_send = "相机已连接"
                else:
                    msg_send = "相机未连接"
        except Exception:
            # Timeout, transport error or malformed reply: treat as offline.
            connected = False
            msg_send = "相机未连接或软件未打开"
        self.connect_status = connected
        if is_send:
            self.sendSocketMessage(
                code=0,
                msg=msg_send,
                device_status=2 if connected else -1,
            )
        return connected, msg_send

    async def EnableCameraPreview(self, enable_status=True, msg_type=""):
        """Enable or disable the camera live view.

        Args:
            enable_status: value sent as the request's ``Enable`` field.
            msg_type: stored on ``self.msg_type`` for the outgoing messages.

        Returns:
            ``(ok, message)``.  Returns ``(False, "请先连接相机")`` without
            sending anything when the camera is not connected.
        """
        self.msg_type = msg_type
        # Forward msg_type so the status probe does not reset it to "".
        camera_states, _ = await self.GetCameraInfo(is_send=False, msg_type=msg_type)
        if not camera_states:
            return False, "请先连接相机"
        req = {
            "msg_type": "Request",
            "msg_id": "EnableLiveview",
            "msg_seq_num": 0,
            "CameraSelection": "All",
            "Enable": enable_status,
        }
        try:
            json_msg = self.__request(req)
            ok = bool(json_msg.get("msg_result"))
        except Exception:
            ok = False
            msg_send = "相机未连接或软件未打开"
        else:
            if ok:
                msg_send = "预览启用成功" if enable_status else "预览关闭成功"
            else:
                msg_send = "预览启用失败"
        # Record the requested state on success; any failure marks it off.
        self.perview_state = bool(enable_status) if ok else False
        self.sendSocketMessage(
            code=0,
            msg=msg_send,
            device_status=2,
        )
        return ok, msg_send

    async def CameraAutofocus(self):
        """Trigger autofocus.

        Returns:
            ``(ok, message)``.  Nothing is sent through ``sendSocketMessage``.
        """
        # Pass the current msg_type through so the probe leaves it untouched.
        camera_states, _ = await self.GetCameraInfo(
            is_send=False, msg_type=self.msg_type
        )
        if not camera_states:
            return False, "请先连接相机"
        req = {
            "msg_type": "Request",
            "msg_id": "Autofocus",
            "msg_seq_num": 0,
            "CameraSelection": "All",
        }
        try:
            json_msg = self.__request(req)
            print("json_msg", json_msg)
            if not json_msg.get("msg_result"):
                return False, "对焦失败"
            return True, "对焦成功"
        except Exception:
            return False, "相机未连接或软件未打开"

    async def CameraShooter(self, msg_type=""):
        """Trigger a shot.

        Args:
            msg_type: stored on ``self.msg_type`` for the outgoing messages.

        Returns:
            ``(ok, message)``.  Returns ``(False, "请先连接相机")`` without
            sending anything when the camera is not connected.
        """
        self.msg_type = msg_type
        # Forward msg_type so the status probe does not reset it to "".
        camera_states, _ = await self.GetCameraInfo(is_send=False, msg_type=msg_type)
        if not camera_states:
            return False, "请先连接相机"
        req = {
            "msg_type": "Request",
            "msg_id": "Shoot",
            "msg_seq_num": 0,
            "CameraSelection": "All",
        }
        try:
            json_msg = self.__request(req)
            ok = bool(json_msg.get("msg_result"))
            msg_send = "拍照成功" if ok else "拍照失败"
        except Exception:
            ok = False
            msg_send = "相机未连接或软件未打开"
        self.sendSocketMessage(
            code=0,
            msg=msg_send,
            device_status=2,
        )
        return ok, msg_send

    async def connect_listen(self):
        """Forward events from the SUB socket to ``callback_listen``.

        Returns True immediately when ``connect_status`` is already set.
        Otherwise loops until ``stop_listen`` becomes true or an unexpected
        error occurs, then releases the socket and returns None.
        ``NetworkPing`` messages are dropped.

        NOTE(review): ``connect_status`` is also set by ``GetCameraInfo``, so
        any earlier successful probe makes this method return without
        listening -- confirm that this guard is intended.
        """
        if self.connect_status:
            return True
        # Open the event subscription.
        sub_socket, context = self.__create_listen()
        print("构建监听")
        try:
            while True:
                await asyncio.sleep(0.01)
                # Checked first so the loop can be stopped even while no
                # callback is registered or the camera is offline.
                if self.stop_listen:
                    break
                if self.callback_listen is None:
                    continue
                camera_states, _ = await self.GetCameraInfo(
                    is_send=False, msg_type=self.msg_type
                )
                if not camera_states:
                    print("相机未连接")
                    await asyncio.sleep(5)  # wait for the camera to connect
                    continue
                try:
                    self.connect_status = True
                    # NOTE(review): blocking receive, up to the 5 s RCVTIMEO.
                    raw = sub_socket.recv()
                    json_msg = json.loads(raw.decode("utf-8"))
                    if json_msg.get("msg_id") == "NetworkPing":
                        continue
                    await self.callback_listen(json_msg)
                except zmq.Again:
                    print("接收超时,继续监听...")
                    continue
                except Exception as e:
                    self.connect_status = False
                    print(f"发生错误: {e}")
                    break
        finally:
            # Release the subscription on every exit path, including
            # cancellation of the surrounding task.
            sub_socket.close()
            context.term()
        print("smart shooter连接断开")
|