smart_shooter_class.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. import json
  2. import datetime
  3. import zmq
  4. import asyncio
  5. from PIL import Image
  6. from io import BytesIO
  7. import base64
  8. import zmq, sys
  9. class SmartShooter:
  10. SET_REQ = "tcp://127.0.0.1:54544"
  11. LISTEN_REQ = "tcp://127.0.0.1:54543"
  12. def send_tcp_message(self, socket, code):
  13. exit(1)
  14. req = {}
  15. req["msg_type"] = "Request"
  16. req["msg_id"] = "ActivateLicense"
  17. req["msg_seq_num"] = 0
  18. req["ActivationCode"] = code
  19. socket.send_string(json.dumps(req))
  20. rep = socket.recv()
  21. str_msg = rep.decode("utf-8")
  22. json_msg = json.loads(str_msg)
  23. return json_msg["msg_result"]
  24. def connect_send(self):
  25. context = zmq.Context()
  26. req_socket = context.socket(zmq.REQ)
  27. req_socket.connect(self.SET_REQ)
  28. if self.send_tcp_message(req_socket, True):
  29. print("License activated")
  30. else:
  31. print("Failed to activate license", file=sys.stderr)
  32. def connect_listen(self):
  33. context = zmq.Context()
  34. sub_socket = context.socket(zmq.SUB)
  35. sub_socket.setsockopt(zmq.SUBSCRIBE, b"")
  36. sub_socket.connect(self.LISTEN_REQ)
  37. while True:
  38. raw = sub_socket.recv()
  39. str_msg = raw.decode("utf-8")
  40. json_msg = json.loads(str_msg)
  41. # if args.nopings and json_msg["msg_id"] == "NetworkPing":
  42. # continue
  43. # print("{0}: {1}".format(datetime.datetime.now(), json_msg["msg_id"]))
  44. # if not args.quiet:
  45. # print(str_msg)