temp.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import zmq,json
  2. # def __send_tcp_message(socket, msg):
  3. # socket.send_string(json.dumps(msg, ensure_ascii=False))
  4. # rep = socket.recv()
  5. # str_msg = rep.decode("utf-8")
  6. # json_msg = json.loads(str_msg)
  7. # return json_msg
  8. # LISTEN_REQ = "tcp://127.0.0.1:54543"
  9. # SET_REQ = "tcp://127.0.0.1:54544"
  10. # context = zmq.Context()
  11. # req_socket = context.socket(zmq.REQ)
  12. # # Set the receive timeout to 2000 ms (2 seconds)
  13. # req_socket.setsockopt(zmq.RCVTIMEO, 2 * 1000)
  14. # # Set the send timeout to 2000 ms (2 seconds)
  15. # req_socket.setsockopt(zmq.SNDTIMEO, 2 * 1000)
  16. # req_socket.setsockopt(zmq.LINGER, 0) # 0 means do not wait for pending operations
  17. # req_socket.connect(SET_REQ)
  18. # req = {}
  19. # req["msg_type"] = "Request"
  20. # req["msg_id"] = "GetCamera"
  21. # req["msg_seq_num"] = 0
  22. # req["CameraSelection"] = "Single"
  23. # req["CameraKey"] = "Canon Inc.|Canon EOS 650D|12"
  24. # json_msg = __send_tcp_message(req_socket,req)
  25. # cameraInfo = json_msg.get("CameraInfo")
  26. # print("cameraInfo",json_msg)
  27. import zmq, json
  28. import asyncio, settings
  29. # # ... existing code ...
  30. from mcu.capture.smart_shooter_class import SmartShooter
  31. async def main():
  32. sm = SmartShooter(None)
  33. await sm.GetCameraInfo()
  34. if __name__ == "__main__":
  35. asyncio.run(main())