| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import asyncio
- from sockets import ConnectionManager
- from utils.common import message_queue
- from mcu.capture.smart_shooter_class import SmartShooter
- class BaseClass:
- def __init__(
- self, websocket_manager: ConnectionManager, smart_shooter: SmartShooter = None
- ):
- self.websocket_manager = websocket_manager
- self.smart_shooter = smart_shooter
- self.msg_type = ""
- self.websocket = None
- # -1连接失败 0未连接 1连接中 2连接成功 3端口占用
- # self.device_status = 2
- def sendSocketMessage(self, code=0, msg="", data=None, device_status=2):
- data = {
- "code": code,
- "msg": msg,
- "status": device_status,
- "data": data,
- "msg_type": self.msg_type,
- }
- loop = asyncio.get_event_loop()
- if self.websocket == None:
- loop.create_task(message_queue.put(data))
- else:
- loop.create_task(
- self.websocket_manager.send_personal_message(data, self.websocket)
- )
- print("\033[1;32;40m 发送消息===>sendSocketMessage \033[0m", data)
- async def asyncSendSocketMessage(self, code=0, msg="", data=None, device_status=2):
- data = {
- "code": code,
- "msg": msg,
- "status": device_status,
- "data": data,
- "msg_type": self.msg_type,
- }
- if self.websocket == None:
- await asyncio.wait_for(message_queue.put(data), timeout=0.1)
- else:
- await self.websocket_manager.send_personal_message(data, self.websocket)
- def change_hex_to_int(self, _bytearray):
- return " ".join([hex(x) for x in _bytearray])
- def read_cmd(self, serial_handle, check=None):
- n = 0
- while 1:
- try:
- read_d = serial_handle.read_all() # 读取接收到的数据
- self.receive_data += read_d
- except BaseException as e:
- print("171串口接收报错", e)
- self.serial_handle = None
- return False
- if len(self.receive_data) < 4:
- break
- if self.receive_data[0] == 0x55 and self.receive_data[1] == 0x55:
- # print("read ori ", self.change_hex_to_int(self.receive_data))
- data_len = self.receive_data[2]
- if len(self.receive_data) < data_len + 4:
- # 此处需要超时机制
- # print("数据长度不够,等待下次读取")
- # 超时退出
- # if not self.serial_handle.txdone():
- # return None
- # n += 1
- # if n > out_time_n:
- # return None
- # time.sleep(0.01)
- continue
- _data = self.receive_data[3 : data_len + 4]
- # 更新缓存区
- self.receive_data = self.receive_data[data_len + 4 :]
- # 校验数据
- if 0xFF & ~sum(_data[:-1]) == _data[-1]:
- # print("receive_data:", self.change_hex_to_int(self.receive_data[:-1]))
- return _data[:-1]
- else:
- return None
- else:
- # print("起始位不是 55 55 进行移除", self.receive_data[0])
- # 起始位不是 55 55 进行移除
- while self.receive_data:
- if len(self.receive_data) == 1:
- if self.receive_data[0] == 0x55:
- break
- else:
- self.receive_data = b""
- else:
- if (
- self.receive_data[0] == 0x55
- and self.receive_data[1] == 0x55
- ):
- break
- else:
- self.receive_data = self.receive_data[1:]
- def write_cmd(self, serial_handle, data: list):
- if serial_handle:
- # data = [(0xff & par1), (0xff & (par1 >> 8))]
- # self.clearn_flush()
- buf = bytearray(b"")
- buf.extend([0x55, 0x55, (0xFF & len(data))])
- buf.extend(data)
- buf.extend([0xFF & ~sum(data)])
- # 55 55 02 5a 01 a4
- # print("send buf {}".format(self.change_hex_to_int(buf)))
- try:
- serial_handle.write(buf)
- return True
- except:
- serial_handle = None
- _recv_data = b""
- return False
|