| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- import random
- import time
- import serial
- import serial.tools.list_ports
- from mcu.SerialIns import SerialIns
- import threading
- class Main():
- def __init__(self):
- port_name = self.list_serial_ports()
- if not port_name:
- return
- self.last_value = 0
- self.total_n = 0
- self.serial_ins = SerialIns(port_name=port_name, baud=115200)
- def list_serial_ports(self):
- # 获取所有可用串口列表
- ports = serial.tools.list_ports.comports()
- # 遍历所有端口并打印信息
- for port in ports:
- print(f"设备: {port.device}")
- print(f"名称: {port.name}")
- print(f"描述: {port.description}")
- print(f"硬件ID: {port.hwid}")
- print(f"制造商: {port.manufacturer}")
- print(f"产品ID: {port.product}")
- print(f"序列号: {port.serial_number}")
- print("-" * 40)
- if "CH340" in port.description:
- return port.name
- return False
- def encapsulation_data(self, data, len_data, data_magnification=1):
- # data_magnification 数据放大倍数,或缩小倍数,默认为1
- data = int(data * data_magnification)
- if len_data == 1:
- return [0xFF & data]
- elif len_data == 2:
- return [0xFF & data >> 8, 0xFF & data]
- elif len_data == 4:
- return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data]
- def run(self):
- t = threading.Thread(target=self.print_all)
- t.start()
- n = 0
- value = 0
- while True:
- value += 1
- # time.sleep(0.02)
- v_t = random.randint(1, 10)
- print("倒计时 v_t:{}".format(v_t))
- time.sleep(0.02)
- n += 1
- send_data = []
- send_data.extend([0x01, 0x07, 0x01, 0x00, value, 0x05, 0x78, 0x01, 0x90, 0x00, 0x64, 0x00, 0x01, 0x00])
- send_data.extend(self.encapsulation_data(data=n, len_data=4))
- # send_data = [0x5a, 0x01]
- if value == 200:
- value = 0
- self.serial_ins.write_cmd(send_data)
- _send_data = self.serial_ins.change_hex_to_int(send_data)
- _send_data = _send_data.strip()
- # print("s buf: {}".format(_send_data))
- def print_all(self):
- while 1:
- time.sleep(0.01)
- r_data = self.serial_ins.read_cmd()
- if r_data:
- self.total_n += 1
- # print("g buf:{}".format(self.serial_ins.change_hex_to_int(r_data)))
- if r_data[0] == 100:
- self.print_mcu_error_data(r_data)
- raise 1111
- if r_data[1] != 1:
- # raise 222222
- continue
- r_data = r_data[1:]
- _r_data = self.serial_ins.change_hex_to_int(r_data)
- _r_data = _r_data.strip()
- # print("g buf:{}".format(_r_data))
- value = self.get_data_from_receive_data(receive_data=r_data, start=3, len_data=2, data_magnification=1)
- print("value:{},total:{}".format(value, self.total_n))
- if value > 300:
- self.last_value += 1
- continue
- if value == 1 or value == self.last_value + 1:
- self.last_value = value
- else:
- raise "数据接收有中断"
- def get_data_from_receive_data(
- self, receive_data, start, len_data, data_magnification=1
- ):
- # data_magnification 数据放大倍数,或缩小倍数,默认为1
- try:
- if len_data == 1:
- data = receive_data[start]
- return data * data_magnification
- elif len_data == 2:
- data = receive_data[start] << 8 | receive_data[start + 1]
- return data * data_magnification
- elif len_data == 4:
- data = (
- receive_data[start] << 24
- | receive_data[start + 1] << 16
- | receive_data[start + 2] << 8
- | receive_data[start + 3]
- )
- return data * data_magnification
- return None
- except:
- return None
- # 打印下位机的错误内容
- def print_mcu_error_data(self, receive_data):
- # 扫码数据
- try:
- data = receive_data[1:].decode()
- print("126 print_mcu_error_data:", data)
- except BaseException as e:
- print("128 error {}".format(e))
- return
- if __name__ == '__main__':
- Main().run()
|