# mcu_test_2.py (4.6 KB)
  1. import random
  2. import time
  3. import serial
  4. import serial.tools.list_ports
  5. from mcu.SerialIns import SerialIns
  6. import threading
  7. class Main():
  8. def __init__(self):
  9. port_name = self.list_serial_ports()
  10. if not port_name:
  11. return
  12. self.last_value = 0
  13. self.total_n = 0
  14. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  15. def list_serial_ports(self):
  16. # 获取所有可用串口列表
  17. ports = serial.tools.list_ports.comports()
  18. # 遍历所有端口并打印信息
  19. for port in ports:
  20. print(f"设备: {port.device}")
  21. print(f"名称: {port.name}")
  22. print(f"描述: {port.description}")
  23. print(f"硬件ID: {port.hwid}")
  24. print(f"制造商: {port.manufacturer}")
  25. print(f"产品ID: {port.product}")
  26. print(f"序列号: {port.serial_number}")
  27. print("-" * 40)
  28. if "CH340" in port.description:
  29. return port.name
  30. return False
  31. def encapsulation_data(self, data, len_data, data_magnification=1):
  32. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  33. data = int(data * data_magnification)
  34. if len_data == 1:
  35. return [0xFF & data]
  36. elif len_data == 2:
  37. return [0xFF & data >> 8, 0xFF & data]
  38. elif len_data == 4:
  39. return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data]
  40. def run(self):
  41. t = threading.Thread(target=self.print_all)
  42. t.start()
  43. n = 0
  44. value = 0
  45. while True:
  46. value += 1
  47. # time.sleep(0.02)
  48. v_t = random.randint(1, 10)
  49. print("倒计时 v_t:{}".format(v_t))
  50. time.sleep(0.02)
  51. n += 1
  52. send_data = []
  53. send_data.extend([0x01, 0x07, 0x01, 0x00, value, 0x05, 0x78, 0x01, 0x90, 0x00, 0x64, 0x00, 0x01, 0x00])
  54. send_data.extend(self.encapsulation_data(data=n, len_data=4))
  55. # send_data = [0x5a, 0x01]
  56. if value == 200:
  57. value = 0
  58. self.serial_ins.write_cmd(send_data)
  59. _send_data = self.serial_ins.change_hex_to_int(send_data)
  60. _send_data = _send_data.strip()
  61. # print("s buf: {}".format(_send_data))
  62. def print_all(self):
  63. while 1:
  64. time.sleep(0.01)
  65. r_data = self.serial_ins.read_cmd()
  66. if r_data:
  67. self.total_n += 1
  68. # print("g buf:{}".format(self.serial_ins.change_hex_to_int(r_data)))
  69. if r_data[0] == 100:
  70. self.print_mcu_error_data(r_data)
  71. raise 1111
  72. if r_data[1] != 1:
  73. # raise 222222
  74. continue
  75. r_data = r_data[1:]
  76. _r_data = self.serial_ins.change_hex_to_int(r_data)
  77. _r_data = _r_data.strip()
  78. # print("g buf:{}".format(_r_data))
  79. value = self.get_data_from_receive_data(receive_data=r_data, start=3, len_data=2, data_magnification=1)
  80. print("value:{},total:{}".format(value, self.total_n))
  81. if value > 300:
  82. self.last_value += 1
  83. continue
  84. if value == 1 or value == self.last_value + 1:
  85. self.last_value = value
  86. else:
  87. raise "数据接收有中断"
  88. def get_data_from_receive_data(
  89. self, receive_data, start, len_data, data_magnification=1
  90. ):
  91. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  92. try:
  93. if len_data == 1:
  94. data = receive_data[start]
  95. return data * data_magnification
  96. elif len_data == 2:
  97. data = receive_data[start] << 8 | receive_data[start + 1]
  98. return data * data_magnification
  99. elif len_data == 4:
  100. data = (
  101. receive_data[start] << 24
  102. | receive_data[start + 1] << 16
  103. | receive_data[start + 2] << 8
  104. | receive_data[start + 3]
  105. )
  106. return data * data_magnification
  107. return None
  108. except:
  109. return None
  110. # 打印下位机的错误内容
  111. def print_mcu_error_data(self, receive_data):
  112. # 扫码数据
  113. try:
  114. data = receive_data[1:].decode()
  115. print("126 print_mcu_error_data:", data)
  116. except BaseException as e:
  117. print("128 error {}".format(e))
  118. return
  119. if __name__ == '__main__':
  120. Main().run()