import asyncio
import time

from sockets import ConnectionManager
from utils.common import message_queue


class BaseClass:
    """Helpers for a small framed serial protocol plus message queuing.

    Frame layout shared by ``read_cmd`` and ``write_cmd``::

        0x55 0x55 | LEN | payload (LEN bytes) | checksum

    where ``checksum == 0xFF & ~sum(payload)``.
    Example: payload ``5a 01`` is sent as ``55 55 02 5a 01 a4``.
    """

    # Two-byte marker that starts every frame.
    FRAME_HEADER = b"\x55\x55"
    # Seconds read_cmd keeps waiting for the rest of a frame while no new
    # byte arrives; the window restarts every time data is received.
    READ_STALL_TIMEOUT = 1.0
    # Pause between polls of the port while a frame is incomplete.
    READ_POLL_INTERVAL = 0.01

    # Receive buffer default, so read_cmd works even when a subclass never
    # assigned one; the first ``+=`` creates the per-instance attribute.
    receive_data = b""

    def __init__(self, websocket_manager: ConnectionManager):
        self.websocket_manager = websocket_manager

    def sendSocketMessage(self, code=0, msg="", data=None, msg_type=""):
        """Wrap the arguments in a message dict and put it on ``message_queue``.

        The dict has the keys ``code``, ``msg``, ``data`` and ``msg_type``.
        Nothing is broadcast here; delivery is left to whatever drains the
        queue (presumably the websocket side - confirm against the consumer).
        """
        payload = {"code": code, "msg": msg, "data": data, "msg_type": msg_type}
        message_queue.put(payload)

    def change_hex_to_int(self, _bytearray):
        """Return the items of ``_bytearray`` as space-separated ``hex()`` text.

        Despite the name, the result is a string such as ``"0x55 0x1"``.
        """
        return " ".join(hex(x) for x in _bytearray)

    def read_cmd(self, serial_handle, check=None):
        """Read from the port and try to extract one frame from the buffer.

        Incoming bytes are appended to ``self.receive_data``; a consumed frame
        is removed from it, and leading bytes that cannot start a frame are
        discarded.

        Args:
            serial_handle: object exposing ``read_all()`` (e.g. a pySerial port).
            check: unused; kept so existing callers keep working.

        Returns:
            The frame payload (without header, length and checksum) when a
            complete frame with a valid checksum is available.
            ``None`` when fewer than 4 bytes are buffered, when the checksum
            does not match (the frame is dropped), or when an incomplete frame
            received no new bytes for ``READ_STALL_TIMEOUT`` seconds (the
            partial frame stays buffered for the next call).
            ``False`` when reading raised; ``self.serial_handle`` is then set
            to ``None``.
        """
        deadline = time.monotonic() + self.READ_STALL_TIMEOUT
        while True:
            try:
                chunk = serial_handle.read_all()  # read whatever has arrived
                if chunk:
                    self.receive_data += chunk
                    # Data is still flowing, so restart the stall window.
                    deadline = time.monotonic() + self.READ_STALL_TIMEOUT
            except Exception as e:
                print("171串口接收报错", e)
                self.serial_handle = None
                return False

            buf = self.receive_data
            if len(buf) < 4:
                # Not even header + length + checksum yet.
                return None

            if buf[0] == 0x55 and buf[1] == 0x55:
                frame_len = buf[2] + 4  # header(2) + length(1) + payload + checksum(1)
                if len(buf) < frame_len:
                    # Frame incomplete: poll again, but never spin forever if
                    # the sender went silent in the middle of a frame.
                    if time.monotonic() >= deadline:
                        return None
                    time.sleep(self.READ_POLL_INTERVAL)
                    continue

                body = buf[3:frame_len]
                # Remove the consumed frame from the buffer.
                self.receive_data = buf[frame_len:]
                # Verify the checksum over the payload bytes.
                frame_payload, checksum = body[:-1], body[-1]
                if 0xFF & ~sum(frame_payload) == checksum:
                    return frame_payload
                return None

            # Buffer does not start with 55 55: drop the garbage and retry.
            self._resync_receive_buffer()

    def _resync_receive_buffer(self):
        """Drop leading bytes of ``self.receive_data`` that cannot start a frame.

        Afterwards the buffer starts at the first ``55 55`` it contained, or
        is a lone trailing ``0x55`` (possible first half of a header), or is
        empty.
        """
        buf = self.receive_data
        start = buf.find(self.FRAME_HEADER)
        if start >= 0:
            self.receive_data = buf[start:]
        elif buf[-1:] == b"\x55":
            self.receive_data = buf[-1:]
        else:
            self.receive_data = b""

    def write_cmd(self, serial_handle, data: list):
        """Frame ``data`` and write it to the port.

        Args:
            serial_handle: object exposing ``write()``; when falsy nothing is
                sent.
            data: payload byte values (each 0-255). Only the low 8 bits of
                ``len(data)`` fit in the length byte, so payloads longer than
                255 bytes cannot be framed correctly.

        Returns:
            ``True`` when the write call succeeded, ``False`` when it raised
            (``self.serial_handle`` is then set to ``None``, as ``read_cmd``
            does), ``None`` when ``serial_handle`` is falsy.
        """
        if not serial_handle:
            return None

        frame = bytearray(self.FRAME_HEADER)
        frame.append(0xFF & len(data))
        frame.extend(data)
        frame.append(0xFF & ~sum(data))  # e.g. 55 55 02 5a 01 a4
        try:
            serial_handle.write(frame)
        except Exception:
            # Mark the port as unusable, the same way read_cmd does.
            self.serial_handle = None
            return False
        return True