| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import asyncio
- from sockets import ConnectionManager
- from utils.common import message_queue
- class BaseClass:
- def __init__(self, websocket_manager: ConnectionManager):
- self.websocket_manager = websocket_manager
- self.msg_type = ""
- def sendSocketMessage(self, code=0, msg="", data=None):
- data = {"code": code, "msg": msg, "data": data, "msg_type": self.msg_type}
- loop = asyncio.get_event_loop()
- loop.create_task(message_queue.put(data))
- def change_hex_to_int(self,_bytearray):
- return ' '.join([hex(x) for x in _bytearray])
- def read_cmd(self, serial_handle, check=None):
- n = 0
- while 1:
- try:
- read_d = serial_handle.read_all() # 读取接收到的数据
- self.receive_data += read_d
- except BaseException as e:
- print("171串口接收报错", e)
- self.serial_handle = None
- return False
- if len(self.receive_data) < 4:
- break
- if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
- # print("read ori ", self.change_hex_to_int(self.receive_data))
- data_len = self.receive_data[2]
- if len(self.receive_data) < data_len + 4:
- # 此处需要超时机制
- # print("数据长度不够,等待下次读取")
- # 超时退出
- # if not self.serial_handle.txdone():
- # return None
- # n += 1
- # if n > out_time_n:
- # return None
- # time.sleep(0.01)
- continue
- _data = self.receive_data[3 : data_len + 4]
- # 更新缓存区
- self.receive_data = self.receive_data[data_len + 4 :]
- # 校验数据
- if 0xFF & ~sum(_data[:-1]) == _data[-1]:
- # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
- return _data[:-1]
- else:
- return None
- else:
- # print("起始位不是 55 55 进行移除", self.receive_data[0])
- # 起始位不是 55 55 进行移除
- while self.receive_data:
- if len(self.receive_data) == 1:
- if self.receive_data[0] == 0x55:
- break
- else:
- self.receive_data = b""
- else:
- if (
- self.receive_data[0] == 0x55
- and self.receive_data[1] == 0x55
- ):
- break
- else:
- self.receive_data = self.receive_data[1:]
- def write_cmd(self, serial_handle,data: list):
- if serial_handle:
- # data = [(0xff & par1), (0xff & (par1 >> 8))]
- # self.clearn_flush()
- buf = bytearray(b"")
- buf.extend([0x55, 0x55, (0xFF & len(data))])
- buf.extend(data)
- buf.extend([0xFF & ~sum(data)])
- # 55 55 02 5a 01 a4
- # print("send buf {}".format(self.change_hex_to_int(buf)))
- try:
- serial_handle.write(buf)
- return True
- except:
- serial_handle = None
- _recv_data = b""
- return False
|