Bläddra i källkod

```
feat(mcu): 添加MCU测试脚本和优化串口通信

添加新的mcu_test_2.py测试脚本,包含完整的串口通信测试功能,
支持数据封装、接收处理、错误检测等特性。同时在原测试脚本中
添加数据计数功能,移除调试用的sleep延迟,并注释掉部分调试
打印语句以优化性能。
```

rambo 1 vecka sedan
förälder
incheckning
ffa7faa7ea
3 ändrade filer med 137 tillägg och 2 borttagningar
  1. 1 0
      python/mcu/SerialIns.py
  2. 5 2
      python/mcu_test.py
  3. 131 0
      python/mcu_test_2.py

+ 1 - 0
python/mcu/SerialIns.py

@@ -72,6 +72,7 @@ class SerialIns(object):
             # logger.info("正在发送命令======>>>>> send buf  %s", self.change_hex_to_int(buf))
             try:
                 self.serial_handle.write(buf)
+                # time.sleep(0.001)
                 return True
             except:
                 self.serial_handle = None

+ 5 - 2
python/mcu_test.py

@@ -12,6 +12,7 @@ class Main():
         port_name = self.list_serial_ports()
         if not port_name:
             return
+        self.total_n = 0
         self.last_value = 0
         self.serial_ins = SerialIns(port_name=port_name, baud=115200)
     def encapsulation_data(self, data, len_data, data_magnification=1):
@@ -55,6 +56,7 @@ class Main():
             if value == 200:
                 value = 0
             self.serial_ins.write_cmd(send_data)
+            
             _send_data = self.serial_ins.change_hex_to_int(send_data)
             _send_data = _send_data.strip()
             # print("s buf:  {}".format(_send_data))
@@ -85,15 +87,16 @@ class Main():
         while 1:
             time.sleep(0.01)
             r_data = self.serial_ins.read_cmd()
-            print("接收数据:{}".format(r_data))
+            # print("接收数据:{}".format(r_data))
             if r_data:
+                self.total_n += 1
                 r_data = r_data[1:]
                 _r_data = self.serial_ins.change_hex_to_int(r_data)
                 _r_data = _r_data.strip()
                 print("g buf:{}".format(_r_data))
 
                 value = self.get_data_from_receive_data(receive_data=r_data, start=3, len_data=2, data_magnification=1)
-                print(value)
+                print("value:{},total:{}".format(value, self.total_n))
                 if value == 1 or value == self.last_value + 1:
                     self.last_value = value
                 else:

+ 131 - 0
python/mcu_test_2.py

@@ -0,0 +1,131 @@
+import random
+import time
+
+import serial
+import serial.tools.list_ports
+from mcu.SerialIns import SerialIns
+import threading
+
+
+class Main():
+    def __init__(self):
+        port_name = self.list_serial_ports()
+        if not port_name:
+            return
+        self.last_value = 0
+        self.total_n = 0
+        self.serial_ins = SerialIns(port_name=port_name, baud=115200)
+
+    def list_serial_ports(self):
+        # 获取所有可用串口列表
+        ports = serial.tools.list_ports.comports()
+        # 遍历所有端口并打印信息
+        for port in ports:
+            print(f"设备: {port.device}")
+            print(f"名称: {port.name}")
+            print(f"描述: {port.description}")
+            print(f"硬件ID: {port.hwid}")
+            print(f"制造商: {port.manufacturer}")
+            print(f"产品ID: {port.product}")
+            print(f"序列号: {port.serial_number}")
+            print("-" * 40)
+            if "CH340" in port.description:
+                return port.name
+        return False
+    def encapsulation_data(self, data, len_data, data_magnification=1):
+        # data_magnification 数据放大倍数,或缩小倍数,默认为1
+        data = int(data * data_magnification)
+        if len_data == 1:
+            return [0xFF & data]
+        elif len_data == 2:
+            return [0xFF & data >> 8, 0xFF & data]
+        elif len_data == 4:
+            return [0xFF & data >> 24, 0xFF & data >> 16, 0xFF & data >> 8, 0xFF & data]
+    def run(self):
+        t = threading.Thread(target=self.print_all)
+        t.start()
+
+        n = 0
+        value = 0
+        while True:
+            value += 1
+            # time.sleep(0.02)
+            v_t = random.randint(1, 10)
+            print("倒计时 v_t:{}".format(v_t))
+            time.sleep(0.02)
+
+            n += 1
+            send_data = []
+            send_data.extend([0x01, 0x07, 0x01, 0x00, value, 0x05, 0x78, 0x01, 0x90, 0x00, 0x64, 0x00, 0x01, 0x00])
+            send_data.extend(self.encapsulation_data(data=n, len_data=4))
+            # send_data = [0x5a, 0x01]
+            if value == 200:
+                value = 0
+            self.serial_ins.write_cmd(send_data)
+            _send_data = self.serial_ins.change_hex_to_int(send_data)
+            _send_data = _send_data.strip()
+            # print("s buf:  {}".format(_send_data))
+
+    def print_all(self):
+        while 1:
+            time.sleep(0.01)
+            r_data = self.serial_ins.read_cmd()
+            if r_data:
+                self.total_n += 1
+                # print("g buf:{}".format(self.serial_ins.change_hex_to_int(r_data)))
+                if r_data[0] == 100:
+                    self.print_mcu_error_data(r_data)
+                    raise 1111
+                if r_data[1] != 1:
+                    # raise 222222
+                    continue
+
+                r_data = r_data[1:]
+                _r_data = self.serial_ins.change_hex_to_int(r_data)
+                _r_data = _r_data.strip()
+                # print("g buf:{}".format(_r_data))
+                value = self.get_data_from_receive_data(receive_data=r_data, start=3, len_data=2, data_magnification=1)
+                print("value:{},total:{}".format(value, self.total_n))
+                if value > 300:
+                    self.last_value += 1
+                    continue
+                if value == 1 or value == self.last_value + 1:
+                    self.last_value = value
+                else:
+                    raise "数据接收有中断"
+    def get_data_from_receive_data(
+        self, receive_data, start, len_data, data_magnification=1
+    ):
+        # data_magnification 数据放大倍数,或缩小倍数,默认为1
+        try:
+            if len_data == 1:
+                data = receive_data[start]
+                return data * data_magnification
+            elif len_data == 2:
+                data = receive_data[start] << 8 | receive_data[start + 1]
+                return data * data_magnification
+            elif len_data == 4:
+                data = (
+                    receive_data[start] << 24
+                    | receive_data[start + 1] << 16
+                    | receive_data[start + 2] << 8
+                    | receive_data[start + 3]
+                )
+                return data * data_magnification
+            return None
+        except:
+            return None
+    # 打印下位机的错误内容
+    def print_mcu_error_data(self, receive_data):
+        # 扫码数据
+        try:
+            data = receive_data[1:].decode()
+            print("126  print_mcu_error_data:", data)
+        except BaseException as e:
+            print("128 error {}".format(e))
+        return
+
+
+if __name__ == '__main__':
+    Main().run()
+