| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- import asyncio
- import serial.tools.list_ports
- import time, json
- from .SerialIns import SerialIns
- from utils.SingletonType import SingletonType
- from .BaseClass import BaseClass
- from sockets import ConnectionManager
- # mcu命令
- class DeviceControl(BaseClass,metaclass=SingletonType):
- def __init__(self, websocket_manager: ConnectionManager):
- super().__init__(websocket_manager)
- self.serial_ins = None
- self.connected_ports_dict = {} # 已连接的ports
- self.p_list = []
- self.temp_ports_dict = {}
- self.is_running = False
- def scan_serial_port(self) -> dict:
- # 获取所有可用串口列表
- ports_dict = {}
- ports = serial.tools.list_ports.comports()
- # 遍历所有端口并打印信息
- for port in ports:
- if "CH340" in port.description:
- ports_dict[port.name] = {
- "name": port.name,
- "device": port.device,
- "description": port.description,
- "hwid": port.hwid,
- "manufacturer": port.manufacturer,
- "product": port.product,
- "serial_number": port.serial_number,
- }
- if len(ports_dict) <= 0:
- return {}
- return ports_dict
- def remove_port(self, port_name):
- """移除串口"""
- print("remove", port_name)
- data = {
- "_type": "remove_port",
- "plugins_mode": "auto_select_com",
- "data": {"port_name": port_name},
- }
- self.sendSocketMessage(1, "串口被移除", data)
- def add_port_by_linkage(self, port_name):
- # port_value :串口基础信息
- # todo 根据prot_value 信息自动进行连接
- print("add", port_name)
- # 对没有连接的设备进行尝试连接
- message_data = {
- "_type": "show_info",
- "plugins_mode": "auto_select_com",
- "data": {"text": "开始识别接口:{}".format(port_name)},
- }
- self.sendSocketMessage(msg="开始识别接口:{}".format(port_name), data=message_data)
- time.sleep(1)
- """
- 步骤:
- 1、进行临时连接,并发送命令,成功后,自动连接对应设备
- """
- try:
- # 尝试使用115200波特率链接
- serial_handle = serial.Serial(port=port_name, baudrate=115200, timeout=0.5)
- except:
- message_data = {
- "_type": "show_info",
- "plugins_mode": "auto_select_com",
- "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)},
- }
- self.sendSocketMessage(
- 1,
- msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name),
- data=message_data,
- )
- print("串口:{} 被占用".format(port_name))
- return
- time.sleep(2)
- print("开始发送命令")
- data = [90, 1]
- try:
- serial_handle.flushInput() # 尝试重置输入缓冲区
- except serial.SerialTimeoutException:
- print("超时错误:无法在规定时间内重置输入缓冲区。")
- self.sendSocketMessage(
- 1,
- msg="超时错误:无法在规定时间内重置输入缓冲区。",
- data=None,
- )
- serial_handle.close()
- return
- print("尝试写入数据")
- buf = bytearray(b"")
- buf.extend([0x55, 0x55, (0xFF & len(data))])
- buf.extend(data)
- buf.extend([0xFF & ~sum(data)])
- try:
- self.receive_data = b""
- serial_handle.write(buf)
- except serial.SerialTimeoutException:
- print("写入数据错误")
- serial_handle.close()
- return
- time.sleep(0.3)
- print("尝试接收命令")
- receive_data = self.read_cmd(serial_handle)
- device_id = 0
- if receive_data:
- print("receive_data", receive_data)
- if receive_data[0] == 90:
- connect_flag = receive_data[1]
- device_id = receive_data[2]
- print("关闭串口:{}".format(port_name))
- serial_handle.close()
- if device_id > 0:
- if device_id == 1:
- self.to_connect_com(port_name)
- message_data = {
- "_type": "show_info",
- "plugins_mode": "auto_select_com",
- "data": {"text": "MCU开始连接"},
- }
- self.sendSocketMessage(
- msg="MCU开始连接",
- data=message_data,
- )
- self.connected_ports_dict[port_name] = "MCU"
- message_data = {
- "_type": "select_port_name",
- "plugins_mode": "auto_select_com",
- "data": {
- "device_name": "mcu" if device_id == 1 else "remote_control",
- "port_name": port_name,
- },
- }
- self.sendSocketMessage(
- msg="MCU连接成功",
- data=message_data,
- )
- else:
- print("串口无法识别")
- # 走其他途径处理
- # 检查当前MCU链接是否正常
- # 正常跳过;记录为其他列表
- # 不正常进行尝试连接
- # 连接不上,记录为其他列表
- def to_connect_com(self, port_name):
- # 关闭串口
- print("to_connect_com", port_name)
- self.close_connect()
- time.sleep(0.3)
- self.connect_state = False
- try:
- self.serial_ins = SerialIns(port_name=port_name, baud=115200, timeout=0.1)
- if not self.serial_ins.serial_handle:
- message_data= {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 打开串口失败",
- }
- self.sendSocketMessage(
- msg="MCU 打开串口失败",
- data=message_data,
- )
- self.serial_ins = None
- self.connect_state = False
- return False
- except:
- message_data={
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 打开串口失败",
- }
- self.sendSocketMessage(
- msg="MCU 打开串口失败",
- data=message_data,
- )
- self.serial_ins = None
- self.connect_state = False
- return False
- message_data={"_type": "show_info", "plugins_mode": "mcu", "data": "MCU 开始连接"}
- self.sendSocketMessage(
- msg="MCU 开始连接",
- data=message_data,
- )
- # =======================发送连接请求=================================
- cmd = 90
- data = [cmd, 1]
- print("405 发送 连接请求 -----------------------------------------")
- print(self.serial_ins)
- # self.serial_ins.clearn_flush()
- self.serial_ins.write_cmd(data)
- # 延迟接收数据
- time.sleep(0.3)
- receive_data = self.serial_ins.read_cmd(out_time=1)
- if receive_data:
- print(
- "409 receive_data--90:{}".format(self.change_hex_to_int(receive_data))
- )
- if receive_data:
- # receive_data[2]=1 表示为MCU设备编号
- if receive_data[0] == 90 and receive_data[2] == 1:
- connect_flag = receive_data[1]
- # 是否有初始化
- try:
- mcu_has_been_set = receive_data[
- 6
- ] # 设备是否有初始化 ,1 表示已初始化
- except:
- mcu_has_been_set = 99 # 未知状态
- print("MCU初始化信息{}".format(mcu_has_been_set))
- message_data = {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 已连接",
- }
- self.sendSocketMessage(
- msg="MCU 已连接",
- data=message_data,
- )
- self.connect_state = True
- if not self.is_running:
- print("MCU start")
- self.is_running = True
- print("MCU 已连接")
- self.port_name = port_name
- return
- print("MCU 连接失败")
- message_data = {
- "_type": "show_info",
- "plugins_mode": "mcu",
- "data": "MCU 连接失败",
- }
- self.sendSocketMessage(
- msg="MCU 连接失败",
- data=message_data,
- )
- self.close_connect()
- def close_connect(self):
- self.port_name = ""
- if self.serial_ins:
- self.serial_ins.close_serial_port()
- self.connect_state = False
- async def checkMcuConnection(device_ctrl: DeviceControl):
- """实时检测串口是否连接"""
- while True:
- await asyncio.sleep(0.5)
- ports_dict = device_ctrl.scan_serial_port()
- device_ctrl.temp_ports_dict = ports_dict
- if not ports_dict:
- # 全部清空 移除所有串口
- if device_ctrl.p_list:
- _p = device_ctrl.p_list.pop()
- device_ctrl.remove_port(_p)
- print("串口未连接,请检查")
- device_ctrl.sendSocketMessage(code=1, msg="串口未连接,请检查")
- continue
- if ports_dict:
- for index, _i in enumerate(device_ctrl.p_list):
- if _i not in ports_dict:
- _p = device_ctrl.p_list.pop(index)
- device_ctrl.remove_port(_p)
- for _port_name, _port_value in ports_dict.items():
- if _port_name not in device_ctrl.p_list:
- # try:
- device_ctrl.p_list.append(_port_name)
- device_ctrl.add_port_by_linkage(_port_name)
- # except BaseException as e:
- # print("串口不存在{} {}".format(_port_name, e))
|