import asyncio
import base64
import datetime
import json
import random
import sys
from io import BytesIO

import zmq
from PIL import Image

from utils.SingletonType import SingletonType


# Defined as a singleton so the class is not instantiated repeatedly.
class SmartShooter(metaclass=SingletonType):
    """Client for a camera-control service reached over ZeroMQ.

    Requests are sent through a short-lived REQ socket connected to
    ``SET_REQ``; events are received through a SUB socket connected to
    ``LISTEN_REQ``.  Status updates are pushed to ``self.websocket`` through
    ``websocket_manager.send_personal_message``.

    NOTE(review): the ZeroMQ calls are blocking, so every request stalls the
    asyncio event loop for up to its timeout.
    """

    SET_REQ = "tcp://127.0.0.1:54544"
    LISTEN_REQ = "tcp://127.0.0.1:54543"

    def __init__(self, websocket_manager):
        """Store the websocket manager and reset all state flags."""
        self.msg_type = "smart_shooter"
        self.websocket_manager = websocket_manager
        # Set to True to make connect_listen() leave its loop.
        self.stop_listen = False
        # Camera connection state, refreshed by GetCameraInfo().
        self.connect_status = False
        # Live-preview state (attribute name kept as-is for compatibility).
        self.perview_state = False
        # Coroutine function awaited with every decoded event message.
        self.callback_listen = None
        # True while a connect_listen() loop is running.
        self.listen_init = False
        # Websocket that receives the status messages.
        self.websocket = None
        # NOTE: connect_listen() is not scheduled from the constructor
        # (the original create_task call was commented out).

    def __send_tcp_message(self, socket, msg):
        """Send ``msg`` as JSON on ``socket`` and return the decoded reply.

        Raises ``zmq.Again`` when the socket's send/receive timeout expires.
        """
        socket.send_string(json.dumps(msg))
        return json.loads(socket.recv().decode("utf-8"))

    def __create_req(self, time_out=5) -> tuple[zmq.Socket, zmq.Context]:
        """Create a REQ socket connected to ``SET_REQ``.

        Args:
            time_out: send/receive timeout in seconds.

        Returns:
            ``(socket, context)``; the caller must close both.
        """
        context = zmq.Context()
        req_socket = context.socket(zmq.REQ)
        timeout_ms = int(time_out * 1000)
        # Receive timeout in milliseconds.
        req_socket.setsockopt(zmq.RCVTIMEO, timeout_ms)
        # Send timeout in milliseconds.
        req_socket.setsockopt(zmq.SNDTIMEO, timeout_ms)
        # LINGER 0: do not wait for unsent messages when closing.
        req_socket.setsockopt(zmq.LINGER, 0)
        req_socket.connect(self.SET_REQ)
        return req_socket, context

    def __create_listen(self) -> tuple[zmq.Socket, zmq.Context]:
        """Create a SUB socket subscribed to everything on ``LISTEN_REQ``.

        Returns:
            ``(socket, context)``; the caller must close both.
        """
        context = zmq.Context()
        listen_socket = context.socket(zmq.SUB)
        listen_socket.setsockopt(zmq.SUBSCRIBE, b"")
        # Receive timeout: 4000 ms.
        listen_socket.setsockopt(zmq.RCVTIMEO, 4000)
        # Send timeout: 4000 ms.
        listen_socket.setsockopt(zmq.SNDTIMEO, 4000)
        # LINGER 0: do not wait for unsent messages when closing.
        listen_socket.setsockopt(zmq.LINGER, 0)
        listen_socket.connect(self.LISTEN_REQ)
        return listen_socket, context

    def _request(self, payload, time_out=5):
        """Perform one request/reply round trip and always release the socket.

        Raises ``zmq.Again`` on timeout; any other error propagates as well.
        """
        socket, context = self.__create_req(time_out=time_out)
        try:
            return self.__send_tcp_message(socket, payload)
        finally:
            # Released on every path so sockets/contexts never accumulate.
            socket.close()
            context.term()

    async def _push_status(self, code, msg, device_status, data=None):
        """Send a status message to ``self.websocket``."""
        message = {
            "code": code,
            "msg": msg,
            "data": data,
            "msg_type": self.msg_type,
            "device_status": device_status,
        }
        await self.websocket_manager.send_personal_message(message, self.websocket)

    @staticmethod
    def _parse_camera_reply(reply):
        """Turn a ``GetCamera`` reply into ``(connected, message, msg_result)``."""
        msg_result = reply.get("msg_result")
        if not msg_result:
            return False, "相机未连接或软件未打开", msg_result
        camera_info = reply.get("CameraInfo")
        if not camera_info:
            return False, "相机未连接", msg_result
        # Connected when at least one camera reports Ready or Busy.
        if not any(
            item.get("CameraStatus") in ("Ready", "Busy") for item in camera_info
        ):
            return False, "相机未连接", msg_result
        return True, "相机已连接", msg_result

    async def GetCameraInfo(self, is_send=True, msg_type="") -> tuple[bool, str]:
        """Query the camera state and refresh ``self.connect_status``.

        Args:
            is_send: when True, push the outcome to the websocket.
            msg_type: message type for outgoing status messages.  An empty
                value keeps the current ``self.msg_type`` so that internal
                calls do not wipe the type set by the public entry point.

        Returns:
            ``(connected, message)``.
        """
        await asyncio.sleep(0.01)
        if msg_type:
            self.msg_type = msg_type
        request = {
            "msg_type": "Request",
            "msg_id": "GetCamera",
            "msg_seq_num": 0,
            "CameraSelection": "All",
        }
        try:
            reply = self._request(request, time_out=2)
            connected, msg_send, data = self._parse_camera_reply(reply)
        except zmq.Again:
            print("获取相机信息超时,继续监听...")
            connected, msg_send, data = False, "相机未连接或软件未打开", None
        except Exception as e:
            print("获取相机信息异常", e)
            connected, msg_send, data = False, "相机未连接或软件未打开", None

        self.connect_status = connected
        if is_send:
            await self._push_status(
                code=0 if connected else 1,
                msg=msg_send,
                device_status=2 if connected else -1,
                data=data,
            )
        return connected, msg_send

    async def SetCameraFPS(self, fps=5) -> tuple[bool, str]:
        """Set the live-view frame rate.

        Returns:
            ``(ok, message)``.
        """
        camera_states, _ = await self.GetCameraInfo(is_send=False)
        if not camera_states:
            return False, "请先连接相机"
        request = {
            "msg_type": "Request",
            "msg_id": "LiveviewFPS",
            "msg_seq_num": 0,
            "CameraSelection": "All",
            "CameraLiveviewFPS": fps,
        }
        try:
            succeeded = bool(self._request(request).get("msg_result"))
        except Exception:
            self.perview_state = False
            return False, "相机未连接或软件未打开"
        if not succeeded:
            self.perview_state = False
            return False, "设置失败"
        return True, "设置成功"

    async def EnableCameraPreview(
        self, enable_status=True, msg_type=""
    ) -> tuple[bool, str]:
        """Enable or disable the camera live view.

        Args:
            enable_status: True to enable the preview, False to disable it.
            msg_type: message type used for the outgoing status messages.

        Returns:
            ``(ok, message)``.
        """
        self.msg_type = msg_type
        await self.SetCameraFPS(5)
        camera_states, _ = await self.GetCameraInfo(is_send=False)
        if not camera_states:
            return False, "请先连接相机"
        request = {
            "msg_type": "Request",
            "msg_id": "EnableLiveview",
            "msg_seq_num": 0,
            "CameraSelection": "All",
            "Enable": enable_status,
        }
        try:
            succeeded = bool(self._request(request).get("msg_result"))
        except zmq.Again:
            print("启动预览超时,继续监听...")
            # Always hand back a tuple so callers can unpack the result.
            return False, "启动预览超时"
        except Exception:
            self.perview_state = False
            msg_send = "相机未连接或软件未打开"
            await self._push_status(code=1, msg=msg_send, device_status=-1)
            return False, msg_send

        if not succeeded:
            self.perview_state = False
            msg_send = "预览启用失败"
            # Failure is reported with code 1; the camera itself is still
            # connected, hence device_status stays 2.
            await self._push_status(code=1, msg=msg_send, device_status=2)
            return False, msg_send

        msg_send = "预览启用成功" if enable_status else "预览关闭成功"
        await self._push_status(code=0, msg=msg_send, device_status=2)
        return True, msg_send

    async def CameraAutofocus(self) -> tuple[bool, str]:
        """Trigger autofocus.

        Returns:
            ``(ok, message)``.
        """
        camera_states, _ = await self.GetCameraInfo(is_send=False)
        print("CameraAutofocus 执行对焦")
        if not camera_states:
            return False, "请先连接相机"
        request = {
            "msg_type": "Request",
            "msg_id": "Autofocus",
            "msg_seq_num": 0,
            "CameraSelection": "All",
        }
        try:
            json_msg = self._request(request)
            print("json_msg", json_msg)
            succeeded = bool(json_msg.get("msg_result"))
        except zmq.Again:
            print("对焦超时,继续监听...")
            # Always hand back a tuple so callers can unpack the result.
            return False, "对焦超时"
        except Exception:
            return False, "相机未连接或软件未打开"
        if not succeeded:
            return False, "对焦失败"
        return True, "对焦成功"

    async def CameraShooter(
        self, msg_type="", goods_art_no="", id=0, is_af=False
    ) -> tuple[bool, str]:
        """Take a photo, optionally running autofocus first.

        Args:
            msg_type: message type used for the outgoing status messages.
            goods_art_no: article number tagged onto the photo.
            id: record id tagged onto the photo.
            is_af: when True, run autofocus before shooting.

        Returns:
            ``(ok, message)``.
        """
        if is_af:
            await self.CameraAutofocus()
        self.msg_type = msg_type
        print("camera_states", msg_type)
        camera_states, _ = await self.GetCameraInfo(is_send=True)
        print("camera_states CameraShooter", camera_states)
        if not camera_states:
            return False, "请先连接相机"

        # Article number and id are joined with a comma (the listener splits
        # them again); "_" or "-" could occur inside the article number.
        if goods_art_no != "" and id != 0:
            photo_origin = f"{goods_art_no},{id}"
        else:
            photo_origin = ""
        request = {
            "msg_type": "Request",
            "msg_id": "Shoot",
            "msg_seq_num": 1,
            "CameraSelection": "All",
            "PhotoOrigin": photo_origin,
        }
        try:
            json_msg = self._request(request)
            print("CameraShooter", json_msg)
            succeeded = bool(json_msg.get("msg_result"))
        except zmq.Again:
            await self._push_status(
                code=1,
                msg="相机未连接或软件未打开",
                device_status=-1,
                data={"goods_art_no": goods_art_no, "id": id},
            )
            return False, "拍照失败"
        except Exception:
            msg_send = "相机未连接或软件未打开"
            await self._push_status(code=1, msg=msg_send, device_status=-1)
            return False, msg_send

        if not succeeded:
            msg_send = "拍照失败"
            await self._push_status(code=1, msg=msg_send, device_status=-1)
            return False, msg_send

        msg_send = "拍照成功"
        await self._push_status(code=0, msg=msg_send, device_status=2)
        return True, msg_send

    async def connect_listen(self):
        """Receive camera events and forward them to ``self.callback_listen``.

        Returns True immediately when the camera is already flagged as
        connected or a listen loop is already running.  Otherwise loops until
        ``self.stop_listen`` is set or an unexpected error occurs, then
        releases the SUB socket and returns None.
        """
        print("smart shooter connect_listen", self.connect_status, self.listen_init)
        if self.connect_status or self.listen_init:
            return True
        # Start listening.
        sub_socket, context = self.__create_listen()
        print("构建监听", self.connect_status)
        self.listen_init = True
        try:
            # The stop flag is checked on every iteration, including while
            # the camera is disconnected or no callback is registered.
            while not self.stop_listen:
                await asyncio.sleep(0.01)
                if self.callback_listen is None:
                    continue
                camera_states, _ = await self.GetCameraInfo(is_send=False)
                if not camera_states:
                    print("相机未连接")
                    await asyncio.sleep(0.01)  # wait for the camera to connect
                    continue
                try:
                    # NOTE(review): blocking receive; stalls the event loop
                    # for up to the 4 s receive timeout.
                    raw = sub_socket.recv()
                    json_msg = json.loads(raw.decode("utf-8"))
                    if json_msg.get("msg_id") == "NetworkPing":
                        continue
                    await self.callback_listen(json_msg)
                except zmq.Again:
                    print("接收超时,继续监听...")
                    continue
                except Exception as e:
                    self.connect_status = False
                    print(f"发生错误: {e}")
                    break
        finally:
            self.listen_init = False
            sub_socket.close()
            context.term()
            print("smart shooter连接断开")