mcu_test.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import time
  2. import serial
  3. import serial.tools.list_ports
  4. # from mcu.base_mode.base import *
  5. from mcu.SerialIns import SerialIns
  6. import asyncio
  7. import random
  8. class Main():
  9. def __init__(self):
  10. port_name = self.list_serial_ports()
  11. if not port_name:
  12. return
  13. self.total_n = 0
  14. self.last_value = 0
  15. self.serial_ins = SerialIns(port_name=port_name, baud=115200)
  16. def encapsulation_data(self, data, len_data, data_magnification=1):
  17. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  18. data = int(data * data_magnification)
  19. if len_data == 1:
  20. return [0xFF & data]
  21. elif len_data == 2:
  22. return [0xFF & data >> 8, 0xFF & data]
  23. elif len_data == 4:
  24. return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data]
  25. def list_serial_ports(self):
  26. # 获取所有可用串口列表
  27. ports = serial.tools.list_ports.comports()
  28. # 遍历所有端口并打印信息
  29. for port in ports:
  30. print(f"设备: {port.device}")
  31. print(f"名称: {port.name}")
  32. print(f"描述: {port.description}")
  33. print(f"硬件ID: {port.hwid}")
  34. print(f"制造商: {port.manufacturer}")
  35. print(f"产品ID: {port.product}")
  36. print(f"序列号: {port.serial_number}")
  37. print("-" * 40)
  38. if "CH340" in port.description:
  39. return port.name
  40. return False
  41. def run(self):
  42. n = 0
  43. value = 0
  44. while True:
  45. value += 1
  46. random_int = random.randint(1, 10)
  47. time.sleep(random_int)
  48. n += 1
  49. send_data = []
  50. send_data.extend([0x01, 0x07, 0x01, 0x00, value, 0x05, 0x78, 0x01, 0x90, 0x00, 0x64, 0x00, 0x01, 0x00])
  51. send_data.extend(self.encapsulation_data(data=n, len_data=4))
  52. # send_data = [0x5a, 0x01]
  53. if value == 200:
  54. value = 0
  55. self.serial_ins.write_cmd(send_data)
  56. _send_data = self.serial_ins.change_hex_to_int(send_data)
  57. _send_data = _send_data.strip()
  58. # print("s buf: {}".format(_send_data))
  59. def get_data_from_receive_data(
  60. self, receive_data, start, len_data, data_magnification=1
  61. ):
  62. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  63. try:
  64. if len_data == 1:
  65. data = receive_data[start]
  66. return data * data_magnification
  67. elif len_data == 2:
  68. data = receive_data[start] << 8 | receive_data[start + 1]
  69. return data * data_magnification
  70. elif len_data == 4:
  71. data = (
  72. receive_data[start] << 24
  73. | receive_data[start + 1] << 16
  74. | receive_data[start + 2] << 8
  75. | receive_data[start + 3]
  76. )
  77. return data * data_magnification
  78. return None
  79. except:
  80. return None
  81. def print_all(self):
  82. print("开始打印数据")
  83. while 1:
  84. time.sleep(0.01)
  85. r_data = self.serial_ins.read_cmd()
  86. # print("接收数据:{}".format(r_data))
  87. if r_data:
  88. self.total_n += 1
  89. r_data = r_data[1:]
  90. _r_data = self.serial_ins.change_hex_to_int(r_data)
  91. _r_data = _r_data.strip()
  92. print("g buf:{}".format(_r_data))
  93. value = self.get_data_from_receive_data(receive_data=r_data, start=3, len_data=2, data_magnification=1)
  94. print("value:{},total:{}".format(value, self.total_n))
  95. if value == 1 or value == self.last_value + 1:
  96. self.last_value = value
  97. else:
  98. print("数据接收有中断")
  99. raise "数据接收有中断"
  100. else:
  101. # print("数据未获取到")
  102. pass
  103. if __name__ == '__main__':
  104. main = Main()
  105. async def run_tasks():
  106. loop = asyncio.get_event_loop()
  107. # 创建任务
  108. task1 = loop.run_in_executor(None, main.run)
  109. task2 = loop.run_in_executor(None, main.print_all)
  110. # 等待两个任务(它们会无限运行,除非发生异常)
  111. await asyncio.gather(task1, task2)
  112. try:
  113. asyncio.run(run_tasks())
  114. except KeyboardInterrupt:
  115. print("程序被用户中断")