"""BLE bridge between ESP32-S3 peripherals and the host application.

Frames exchanged with a device have the layout::

    0x55 0x55 <len> <payload: len bytes> <checksum>

where ``checksum == 0xFF & ~sum(payload)`` and the first payload byte is the
command id.  Outgoing frames are split into 20-byte chunks before being
queued for the BLE write loop.
"""

import asyncio
import threading
import time
from collections import deque

from bleak import BleakScanner, BleakClient
from networkx import is_connected  # NOTE(review): appears unused in this file

from utils.SingletonType import SingletonType
from .RemoteControlV2 import RemoteControlV2
from .BaseClass import BaseClass
from sockets.connect_manager import ConnectionManager


class BlueToothMode(BaseClass, metaclass=SingletonType):
    """Singleton that scans for ESP32-S3 BLE devices and exchanges frames.

    Per-device state lives in ``self.devices[address]`` (queues, reassembly
    buffer, connection flag); ``self.devices_name`` maps advertised name to
    address.  The asyncio side (``run`` / ``main_func``) is expected to run
    in its own thread while ``write_cmd`` / ``read_cmd`` /
    ``analysis_received_data`` are called from another thread; the queues are
    guarded by ``self._lock``.
    """

    instance = None
    init_flag = None

    # Two-byte marker that starts every frame.
    _FRAME_HEADER = b"\x55\x55"
    # Outgoing frames are queued in chunks of this many bytes.
    _CHUNK_SIZE = 20

    def __init__(self, websocket_manager: ConnectionManager):
        """Initialise shared state once; later constructions are no-ops.

        NOTE(review): ``super().__init__`` runs on every construction, before
        the ``init_flag`` guard — confirm that BaseClass tolerates this.
        """
        super().__init__(websocket_manager)
        # By design, skip initialisation when an instance already exists.
        if self.init_flag:
            return
        self.init_flag = True
        self.msg_type = "blue_tooth"
        self.remote_control_v2 = RemoteControlV2(self, websocket_manager)
        # Address of the target device once found.
        self.target_device_address = None
        self._lock = threading.Lock()
        self.connect_state = False
        self.receive_data = b""
        self.devices = {}
        self.devices_name = {}
        self.last_value = None
        self.retry_num = 0
        self.last_error = ""

    def __new__(cls, *args, **kwargs):
        """Create the instance on first use; afterwards return the saved one."""
        if not cls.instance:
            cls.instance = super().__new__(cls)
        return cls.instance

    def print_error(self, text, *args):
        """Print a message unless it equals the previously printed one.

        Extra positional ``args`` are appended to ``text`` separated by
        spaces (they used to be silently dropped, which hid exception
        details at several call sites).
        """
        message = " ".join(str(part) for part in (text, *args)) if args else text
        if message != self.last_error:
            self.last_error = message
            print(message)

    async def scan_for_esp32(self):
        """Scan for nearby BLE devices and start a session for each ESP32-S3.

        Does nothing while ``self.connect_state`` is set.  A device qualifies
        when its advertised name contains ``"ESP32-S3"`` and its address is
        not yet in ``self.devices``.
        """
        if self.connect_state:
            return
        try:
            found = await BleakScanner.discover()
        except Exception as e:
            # Exception (not BaseException) so task cancellation propagates.
            self.print_error("蓝牙疑似未打开,{}".format(e))
            self.sendSocketMessage(
                code=0, msg="蓝牙疑似未打开,{}".format(e), data=None, device_status=-1
            )
            return
        for d in found:
            if not d.name or "ESP32-S3" not in d.name:
                continue
            if d.address in self.devices:
                continue
            self.devices_name[d.name] = d.address
            self.devices[d.address] = {
                "name": d.name,
                "client": None,
                # NOTE: bounded queues — when full, the oldest chunk is
                # discarded silently, which can truncate a multi-chunk frame.
                "send_queue": deque(maxlen=20),
                "recv_queue": deque(maxlen=20),
                "receive_data": b"",
                "connect_state": True,
            }
            self.print_error(f"Found device: {d.name} - Address: {d.address}")
            asyncio.create_task(self.connect_and_listen(d.address))
            self.connect_state = True

    def connect_device(self, address, name):
        """Register a connected device with the upper-level remote controller."""
        print("注册到遥控器等上位机", address, name)
        if "remote" in name:
            self.remote_control_v2.to_connect_bluetooth(address, is_test=False)

    def disconnect_device(self, address, name):
        """Notify the remote controller (if applicable) and reset all state."""
        print("关闭蓝牙连接", address, name)
        if "remote" in name:
            self.print_error("71 关闭蓝牙连接{}-{}".format(address, name))
            self.remote_control_v2.close_bluetooth_connect()
        self.connect_state = False
        self.receive_data = b""
        self.devices = {}
        self.devices_name = {}
        self.last_value = None
        self.retry_num = 0
        self.last_error = ""

    async def handle_disconnect(self, client):
        """Keep trying to reconnect ``client`` until it reports connected."""
        self.print_error("Device was disconnected.")
        while not client.is_connected:
            try:
                self.print_error("Attempting to reconnect...")
                await client.connect()
            except Exception as e:
                self.print_error(f"Failed to reconnect: {e}")
                await asyncio.sleep(5)  # wait a while before retrying

    async def notification_handler(self, address, characteristic, data: bytearray):
        """Queue one notification payload received from ``address``."""
        with self._lock:
            entry = self.devices.get(address)
            if entry is None:
                # Device state was reset while this notification was in flight.
                return
            entry["recv_queue"].append(data)
            entry["connect_state"] = True

    async def write_characteristic(self, client, characteristic_uuid, data):
        """Write ``data`` to a GATT characteristic; failures are only logged."""
        try:
            await client.write_gatt_char(characteristic_uuid, data)
        except Exception as e:
            # Exception (not BaseException) so task cancellation propagates.
            self.print_error("write_characteristic error ", e)

    def _drop_device(self, address, entry):
        """Mark ``entry`` disconnected and run the disconnect procedure.

        Skipped when ``entry`` is no longer the live record for ``address``
        (state was already reset or replaced by a newer scan).
        """
        entry["connect_state"] = False
        if self.devices.get(address) is not entry:
            return
        with self._lock:
            self.disconnect_device(address=address, name=entry["name"])

    async def _subscribe_notifications(self, client, address):
        """Subscribe to every notify characteristic; return the write target.

        The write target is the last characteristic advertising a write
        property, falling back to the last characteristic seen (which is
        what the code historically wrote to).  Returns ``None`` when the
        device exposes no characteristics at all.
        """
        write_uuid = None
        last_uuid = None
        # NOTE(review): get_services() is deprecated in newer Bleak releases
        # in favour of the ``client.services`` property — confirm the pinned
        # Bleak version before switching.
        services = await client.get_services()
        for service in services:
            for char in service.characteristics:
                last_uuid = char.uuid
                properties = char.properties
                if "notify" in properties:
                    self.print_error(f"Subscribing to characteristic: {char.uuid}")
                    await client.start_notify(
                        char,
                        lambda sender, data: asyncio.create_task(
                            self.notification_handler(address, sender, data)
                        ),
                    )
                if "write" in properties or "write-without-response" in properties:
                    write_uuid = char.uuid
        return write_uuid if write_uuid is not None else last_uuid

    async def _run_session(self, client, address, entry):
        """Serve one established connection until it drops or state is reset."""
        if not client.is_connected:
            self.print_error("Failed to connect to the device.")
            self._drop_device(address, entry)
            return
        if self.devices.get(address) is not entry:
            return
        entry["connect_state"] = True
        self.print_error(f"Connected to {address}")

        write_uuid = await self._subscribe_notifications(client, address)

        name = entry["name"]
        self.print_error("进入一个简单的循环 保持连接", name)
        self.connect_device(address=address, name=name)
        self.retry_num += 1

        # Simple keep-alive loop that also drains the outgoing queue.
        while self.devices.get(address) is entry:
            if not client.is_connected:
                self.print_error(f"Device {address} disconnected unexpectedly.")
                self._drop_device(address, entry)
                return
            if entry["send_queue"]:
                with self._lock:
                    send_data = entry["send_queue"].popleft()
                await self.write_characteristic(client, write_uuid, send_data)
                await asyncio.sleep(0.01)
            await asyncio.sleep(0.02)

    async def connect_and_listen(self, address):
        """Connect to the ESP32 at ``address`` and relay data until dropped.

        Runs for as long as the device record created by the scan is still
        the live one.  Connection errors are logged, the disconnect
        procedure is run (so scanning can resume), and the loop retries
        after a short pause instead of letting the task die silently.
        """
        self.print_error("连接到指定地址的ESP32设备并监听通知")
        entry = self.devices.get(address)
        while entry is not None and self.devices.get(address) is entry:
            try:
                async with BleakClient(address) as client:
                    await self._run_session(client, address, entry)
            except Exception as e:
                self.print_error(
                    f"Error during connection or listening: {address},{e}"
                )
                self._drop_device(address, entry)
                await asyncio.sleep(2)  # back off before retrying

    async def main_func(self):
        """Entry coroutine: scan once, then rescan periodically forever.

        If a connection is already flagged, only reports success over the
        socket and returns.
        """
        if self.connect_state:
            print('蓝牙已连接,不可重新连接')
            message = {
                "_type": "show_info",
                "plugins_mode": "remote_control",
                "data": "遥控设备V2 打开蓝牙成功",
            }
            self.sendSocketMessage(
                code=0, msg="遥控设备V2 打开蓝牙成功", data=message, device_status=2
            )
            return
        await self.scan_for_esp32()
        # Rescan periodically to discover new devices: slowly while at least
        # one device is known, quickly otherwise.
        while True:
            await asyncio.sleep(20 if self.devices else 3)
            await self.scan_for_esp32()

    def run(self):
        """Blocking entry point: run ``main_func`` in a fresh event loop."""
        self.print_error("开启蓝牙扫描")
        asyncio.run(self.main_func())

    def write_cmd(self, address, data: list):
        """Frame ``data`` (command id first) and queue it for ``address``.

        ``data`` items must be ints in 0..255.  The length byte is masked to
        8 bits, so payloads longer than 255 bytes produce an invalid frame.
        """
        frame = [0x55, 0x55, 0xFF & len(data), *data, 0xFF & ~sum(data)]
        self.send_data(address, bytes(frame))

    def send_data(self, address, byte_list: bytes):
        """Split ``byte_list`` into 20-byte chunks and queue them for sending.

        Never raises: an unknown address or a queueing failure is logged.
        """
        entry = self.devices.get(address)
        if entry is None:
            # Previously this path raised KeyError from inside the handler.
            self.print_error(
                f"Error sending notification: {address},device not found"
            )
            return
        size = self._CHUNK_SIZE
        chunks = [byte_list[i : i + size] for i in range(0, len(byte_list), size)]
        try:
            with self._lock:
                entry["send_queue"].extend(chunks)
        except Exception as e:
            entry["connect_state"] = False
            self.print_error(f"Error sending notification: {address},{e}")

    def change_hex_to_int(self, _bytearray):
        """Render bytes as space-separated two-digit lowercase hex."""
        return " ".join(f"{x:02x}" for x in _bytearray)

    def change_hex_to_10_int(self, _bytearray):
        """Render bytes as space-separated decimal integers."""
        return " ".join(str(int(x)) for x in _bytearray)

    def read_cmd(self, time_out=10):
        """Collect every complete, non-empty payload from all known devices.

        ``time_out`` is accepted for interface compatibility and is unused.
        Returns a list of dicts with keys ``device_name``, ``address`` and
        ``receive_data``.
        """
        data_list = []
        # Snapshot: the scan thread may add names while we iterate.
        for device_name, address in list(self.devices_name.items()):
            while True:
                payload = self.read_cmd_one(address)
                if payload is None:
                    # No complete frame available (or device unavailable).
                    break
                if payload:
                    data_list.append(
                        {
                            "device_name": device_name,
                            "address": address,
                            "receive_data": payload,
                        }
                    )
        return data_list

    def read_cmd_one(self, address):
        """Extract at most one frame from the receive buffer of ``address``.

        Returns the payload (command id + data, checksum stripped) as
        ``bytes``, ``False`` when a frame failed its checksum and was
        discarded, or ``None`` when no complete frame is available or the
        device is unknown / flagged disconnected.
        """
        entry = self.devices.get(address)
        if entry is None or not entry["connect_state"]:
            return None

        # Move everything queued by the notification handler into one buffer.
        with self._lock:
            buffered = bytes(entry["receive_data"])
            queue = entry["recv_queue"]
            if queue:
                buffered += b"".join(bytes(chunk) for chunk in queue)
                queue.clear()

        # Resynchronise: drop bytes preceding the frame header.
        start = buffered.find(self._FRAME_HEADER)
        if start < 0:
            # A trailing 0x55 may be the first half of a header; keep it.
            buffered = buffered[-1:] if buffered.endswith(b"\x55") else b""
            entry["receive_data"] = buffered
            return None
        buffered = buffered[start:]

        if len(buffered) < 4:
            entry["receive_data"] = buffered
            return None
        frame_len = buffered[2] + 4  # header(2) + len(1) + payload + checksum(1)
        if len(buffered) < frame_len:
            entry["receive_data"] = buffered
            return None

        body = buffered[3:frame_len]
        entry["receive_data"] = buffered[frame_len:]
        payload, checksum = body[:-1], body[-1]
        if 0xFF & ~sum(payload) == checksum:
            return payload
        print("数据异常,丢弃")
        return False

    def _check_counter_frame(self, rec_data):
        """Handle command 111: compare the 32-bit counter with the expected one."""
        print(
            "{} receive_data-111 {}".format(
                self.retry_num, self.change_hex_to_10_int(rec_data)
            )
        )
        if len(rec_data) < 5:
            self.print_error("receive_data-111 too short")
            return
        value = (
            rec_data[1] << 24 | rec_data[2] << 16 | rec_data[3] << 8 | rec_data[4]
        )
        if self.last_value is None:
            self.last_value = value
        else:
            self.last_value += 1
        flag = self.last_value == value
        print(
            "{} value:{},last_value;{},flag:{}\n".format(
                self.retry_num, value, self.last_value, flag
            )
        )

    def analysis_received_data(self):
        """Read all pending frames and act on each by command id.

        Every frame of the batch is processed (earlier versions returned
        after the first one, losing the rest).  Commands: 1 = scanned
        barcode, 2 / 90 = dump, 99 = heartbeat (echoed back), 111 = counter
        check.
        """
        for item in self.read_cmd(time_out=5):
            address = item["address"]
            rec_data = item["receive_data"]
            command = rec_data[0]
            if command == 1:
                # Barcode text; undecodable bytes are replaced, not fatal.
                bar_code = rec_data[1:].decode(errors="replace")
                bar_code = bar_code.replace("\r", "").replace("\n", "")
                print("bar_code:", bar_code)
            elif command == 2:
                print("read receive_data {}".format(self.change_hex_to_int(rec_data)))
            elif command == 90:
                print(
                    "read receive_data-90 {}".format(self.change_hex_to_int(rec_data))
                )
            elif command == 99:
                # Heartbeat: answer on this instance, not a module global.
                self.write_cmd(address, [99])
            elif command == 111:
                self._check_counter_frame(rec_data)


if __name__ == "__main__":
    blue_mode = BlueToothMode(None)
    threading.Thread(target=blue_mode.run, args=()).start()
    print("=" * 50)
    n = 0
    time.sleep(1)
    k = 0
    last_t = time.time()
    while 1:
        time.sleep(0.001)
        s = time.time()
        address = "24:EC:4A:26:4B:BE"
        blue_mode.analysis_received_data()
        n += 1
        if n == 500:
            print(s - last_t)
            for i in range(3):
                k += 1
                # Command 111 followed by the big-endian counter and filler.
                data = [
                    111,
                    0xFF & k >> 24,
                    0xFF & k >> 16,
                    0xFF & k >> 8,
                    0xFF & k,
                ]
                data.extend(range(100, 130))
                blue_mode.write_cmd(address=address, data=data)
                print("send_value:{}".format(k))
            data = [90]
            blue_mode.write_cmd(address=address, data=data)
            n = 0
            last_t = s