|
|
@@ -1,8 +1,13 @@
|
|
|
+import asyncio
|
|
|
import serial.tools.list_ports
|
|
|
import time
|
|
|
+from .SerialIns import SerialIns
|
|
|
+from sockets import socket_manager
|
|
|
# mcu命令
|
|
|
class DeviceControl:
|
|
|
-
|
|
|
+ def __init__(self):
|
|
|
+ self.serial_ins = None
|
|
|
+ self.connected_ports_dict = {} # 已连接的ports
|
|
|
def scan_serial_port(self) -> dict:
|
|
|
# 获取所有可用串口列表
|
|
|
ports_dict = {}
|
|
|
@@ -23,18 +28,31 @@ class DeviceControl:
|
|
|
return {}
|
|
|
return ports_dict
|
|
|
|
|
|
- def add_port(self, port_name, port_value=None):
|
|
|
- # port_value :串口基础信息
|
|
|
- # todo 根据prot_value 信息自动进行连接
|
|
|
- print("add", port_name)
|
|
|
- # 对没有连接的设备进行尝试连接
|
|
|
+ def remove_port(self, port_name):
|
|
|
+ '''移除串口'''
|
|
|
+ print("remove", port_name)
|
|
|
self.sign_data.emit(
|
|
|
{
|
|
|
- "_type": "show_info",
|
|
|
+ "_type": "remove_port",
|
|
|
"plugins_mode": "auto_select_com",
|
|
|
- "data": {"text": "开始识别接口:{}".format(port_name)},
|
|
|
+ "data": {"port_name": port_name},
|
|
|
}
|
|
|
)
|
|
|
+ def sendMsg(self,code=0,msg="",data=None):
|
|
|
+ asyncio.run(socket_manager.send_message(code=code, msg=msg, data=data))
|
|
|
+ # print(code,msg,data)
|
|
|
+ def add_port_by_linkage(self, port_name):
|
|
|
+ # port_value :串口基础信息
|
|
|
+ # todo 根据prot_value 信息自动进行连接
|
|
|
+ print("add", port_name)
|
|
|
+ # 对没有连接的设备进行尝试连接
|
|
|
+ message_data = {
|
|
|
+ "_type": "show_info",
|
|
|
+ "plugins_mode": "auto_select_com",
|
|
|
+ "data": {"text": "开始识别接口:{}".format(port_name)},
|
|
|
+ }
|
|
|
+
|
|
|
+ self.sendMsg(msg="开始识别接口:{}".format(port_name), data=message_data)
|
|
|
time.sleep(1)
|
|
|
"""
|
|
|
步骤:
|
|
|
@@ -42,14 +60,18 @@ class DeviceControl:
|
|
|
"""
|
|
|
try:
|
|
|
# 尝试使用115200波特率链接
|
|
|
- serial_handle = serial.Serial(port=port_name, baudrate=115200, timeout=0.5)
|
|
|
+ serial_handle = serial.Serial(
|
|
|
+ port=port_name, baudrate=115200, timeout=0.5
|
|
|
+ )
|
|
|
except:
|
|
|
- self.sign_data.emit(
|
|
|
- {
|
|
|
- "_type": "show_info",
|
|
|
- "plugins_mode": "auto_select_com",
|
|
|
- "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)},
|
|
|
- }
|
|
|
+ message_data = {
|
|
|
+ "_type": "show_info",
|
|
|
+ "plugins_mode": "auto_select_com",
|
|
|
+ "data": {"text": "串口:{} 被占用,或无法识别".format(port_name)},
|
|
|
+ }
|
|
|
+ self.sendMsg(
|
|
|
+ msg="串口:{} 被占用,或无法识别".format(port_name).format(port_name),
|
|
|
+ data=message_data,
|
|
|
)
|
|
|
print("串口:{} 被占用".format(port_name))
|
|
|
return
|
|
|
@@ -93,47 +115,28 @@ class DeviceControl:
|
|
|
|
|
|
if device_id > 0:
|
|
|
if device_id == 1:
|
|
|
- self.windows.mcu.to_connect_com(port_name, is_test=False)
|
|
|
- self.sign_data.emit(
|
|
|
- {
|
|
|
- "_type": "show_info",
|
|
|
- "plugins_mode": "auto_select_com",
|
|
|
- "data": {"text": "MCU开始连接"},
|
|
|
- }
|
|
|
- )
|
|
|
- self.connected_ports_dict[port_name] = "MCU"
|
|
|
- if device_id == 2:
|
|
|
- self.windows.remote_control.to_connect_com(port_name, is_test=False)
|
|
|
- self.sign_data.emit(
|
|
|
- {
|
|
|
- "_type": "show_info",
|
|
|
- "plugins_mode": "auto_select_com",
|
|
|
- "data": {"text": "开始连接遥控器"},
|
|
|
- }
|
|
|
- )
|
|
|
- self.connected_ports_dict[port_name] = "有线遥控器"
|
|
|
-
|
|
|
- # RemoteControlV2 新版遥控器 2024-12-02
|
|
|
- if device_id == 4:
|
|
|
- self.windows.remote_control_v2.to_connect_com(port_name, is_test=False)
|
|
|
- self.sign_data.emit(
|
|
|
- {
|
|
|
- "_type": "show_info",
|
|
|
- "plugins_mode": "auto_select_com",
|
|
|
- "data": {"text": "开始连接遥控器V2"},
|
|
|
- }
|
|
|
- )
|
|
|
- self.connected_ports_dict[port_name] = "无线遥控器V2"
|
|
|
-
|
|
|
- self.sign_data.emit(
|
|
|
- {
|
|
|
- "_type": "select_port_name",
|
|
|
+ self.to_connect_com(port_name)
|
|
|
+ message_data = {
|
|
|
+ "_type": "show_info",
|
|
|
"plugins_mode": "auto_select_com",
|
|
|
- "data": {
|
|
|
- "device_name": "mcu" if device_id == 1 else "remote_control",
|
|
|
- "port_name": port_name,
|
|
|
- },
|
|
|
+ "data": {"text": "MCU开始连接"},
|
|
|
}
|
|
|
+ self.sendMsg(
|
|
|
+ msg="MCU开始连接",
|
|
|
+ data=message_data,
|
|
|
+ )
|
|
|
+ self.connected_ports_dict[port_name] = "MCU"
|
|
|
+ message_data = {
|
|
|
+ "_type": "select_port_name",
|
|
|
+ "plugins_mode": "auto_select_com",
|
|
|
+ "data": {
|
|
|
+ "device_name": "mcu" if device_id == 1 else "remote_control",
|
|
|
+ "port_name": port_name,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ self.sendMsg(
|
|
|
+ msg="MCU连接成功",
|
|
|
+ data=message_data,
|
|
|
)
|
|
|
else:
|
|
|
print("串口无法识别")
|
|
|
@@ -143,3 +146,123 @@ class DeviceControl:
|
|
|
# 正常跳过;记录为其他列表
|
|
|
# 不正常进行尝试连接
|
|
|
# 连接不上,记录为其他列表
|
|
|
+
|
|
|
+ def read_cmd(self, serial_handle, check=None):
|
|
|
+ n = 0
|
|
|
+ while 1:
|
|
|
+ try:
|
|
|
+ read_d = serial_handle.read_all() # 读取接收到的数据
|
|
|
+ self.receive_data += read_d
|
|
|
+ except BaseException as e:
|
|
|
+ print("171串口接收报错", e)
|
|
|
+ self.serial_handle = None
|
|
|
+ return False
|
|
|
+
|
|
|
+ if len(self.receive_data) < 4:
|
|
|
+ break
|
|
|
+
|
|
|
+ if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
|
|
|
+ # print("read ori ", self.change_hex_to_int(self.receive_data))
|
|
|
+ data_len = self.receive_data[2]
|
|
|
+ if len(self.receive_data) < data_len + 4:
|
|
|
+ # 此处需要超时机制
|
|
|
+ # print("数据长度不够,等待下次读取")
|
|
|
+ # 超时退出
|
|
|
+ # if not self.serial_handle.txdone():
|
|
|
+ # return None
|
|
|
+ # n += 1
|
|
|
+ # if n > out_time_n:
|
|
|
+ # return None
|
|
|
+ # time.sleep(0.01)
|
|
|
+ continue
|
|
|
+ _data = self.receive_data[3 : data_len + 4]
|
|
|
+ # 更新缓存区
|
|
|
+ self.receive_data = self.receive_data[data_len + 4 :]
|
|
|
+ # 校验数据
|
|
|
+ if 0xFF & ~sum(_data[:-1]) == _data[-1]:
|
|
|
+ # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
|
|
|
+ return _data[:-1]
|
|
|
+ else:
|
|
|
+ return None
|
|
|
+ else:
|
|
|
+ # print("起始位不是 55 55 进行移除", self.receive_data[0])
|
|
|
+ # 起始位不是 55 55 进行移除
|
|
|
+ while self.receive_data:
|
|
|
+ if len(self.receive_data) == 1:
|
|
|
+ if self.receive_data[0] == 0x55:
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.receive_data = b""
|
|
|
+ else:
|
|
|
+ if (
|
|
|
+ self.receive_data[0] == 0x55
|
|
|
+ and self.receive_data[1] == 0x55
|
|
|
+ ):
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ self.receive_data = self.receive_data[1:]
|
|
|
+
|
|
|
+ def to_connect_com(self, port_name):
|
|
|
+ self.close_connect()
|
|
|
+ time.sleep(0.5)
|
|
|
+ try:
|
|
|
+ self.serial_ins = SerialIns(port_name=port_name, baud=115200)
|
|
|
+ # self.serial_ins = SerialIns(port_name=port_name, baud=9600)
|
|
|
+ if self.serial_ins.serial_handle:
|
|
|
+ self.sendMsg(msg="条码识别 打开串口成功")
|
|
|
+ self.connect_state = True
|
|
|
+ self.start()
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ self.sendMsg(code=1, msg="条码识别 打开串口失败")
|
|
|
+ self.serial_ins = None
|
|
|
+ self.connect_state = False
|
|
|
+ except:
|
|
|
+ self.sendMsg(code=1, msg="条码识别 打开串口失败")
|
|
|
+ # print("条码识别 打开串口失败")
|
|
|
+ self.serial_ins = None
|
|
|
+ self.connect_state = False
|
|
|
+ return False
|
|
|
+
|
|
|
+ def close_connect(self):
|
|
|
+ self.port_name = ""
|
|
|
+ if self.serial_ins:
|
|
|
+ self.serial_ins.close_serial_port()
|
|
|
+ self.connect_state = False
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ device_control = DeviceControl()
|
|
|
+ p_list = []
|
|
|
+ temp_ports_dict = {}
|
|
|
+ while 1:
|
|
|
+ time.sleep(1)
|
|
|
+ ports_dict = device_control.scan_serial_port()
|
|
|
+ temp_ports_dict = ports_dict
|
|
|
+
|
|
|
+ if not ports_dict:
|
|
|
+ # 全部清空 移除所有串口
|
|
|
+ if p_list:
|
|
|
+ _p = p_list.pop()
|
|
|
+ device_control.remove_port(_p)
|
|
|
+ continue
|
|
|
+
|
|
|
+ if ports_dict:
|
|
|
+ # print(plist)
|
|
|
+ for index, _i in enumerate(p_list):
|
|
|
+ if _i not in ports_dict:
|
|
|
+ _p = p_list.pop(index)
|
|
|
+ device_control.remove_port(_p)
|
|
|
+
|
|
|
+ for _port_name, _port_value in ports_dict.items():
|
|
|
+ if _port_name not in p_list:
|
|
|
+ # try:
|
|
|
+ p_list.append(_port_name)
|
|
|
+ device_control.add_port_by_linkage(_port_name)
|
|
|
+ # except BaseException as e:
|
|
|
+ # print(e.__traceback__.tb_frame.f_globals["__file__"]) # 发生异常所在的文件
|
|
|
+ # print(e.__traceback__.tb_lineno) # 发生异常所在的行数
|
|
|
+ # print("串口不存在{} {}".format(_port_name, e))
|
|
|
+
|
|
|
+ # threading.Thread(target=self.add_port, args=(_port_name, _port_value)).start()
|
|
|
+ # self.add_port(_p)
|