| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439 |
- import asyncio
- import time
- from bleak import BleakScanner, BleakClient
- import threading
- from collections import deque
- from networkx import is_connected
- from utils.SingletonType import SingletonType
- from .RemoteControlV2 import RemoteControlV2
- from .BaseClass import BaseClass
- from sockets.connect_manager import ConnectionManager
- class BlueToothMode(BaseClass,metaclass=SingletonType):
- instance = None
- init_flag = None
def __init__(self, websocket_manager: ConnectionManager):
    """Initialise the Bluetooth state the first time the instance is set up.

    The base-class initialiser runs on every call.  Everything after the
    ``init_flag`` guard runs only while ``init_flag`` is still falsy, so an
    already initialised instance keeps its current state.

    Args:
        websocket_manager: Passed to the base class and to the
            ``RemoteControlV2`` helper.
    """
    super().__init__(websocket_manager)
    if not self.init_flag:
        self.init_flag = True
        self.msg_type = "blue_tooth"
        self.remote_control_v2 = RemoteControlV2(self, websocket_manager)
        # Address of the target device once one has been found.
        self.target_device_address = None
        self._lock = threading.Lock()
        self.connect_state = False
        self.receive_data = b""
        # address -> per-device record, and advertised name -> address.
        self.devices, self.devices_name = {}, {}
        self.last_value = None
        self.retry_num = 0
        self.last_error = ""
def __new__(cls, *args, **kwargs):
    """Return the stored instance, creating and caching it on first use.

    Constructor arguments are accepted but not used here.
    """
    existing = cls.instance
    if existing:
        return existing
    cls.instance = super().__new__(cls)
    return cls.instance
def print_error(self, text, *args):
    """Print a status/error message unless it repeats the previous one.

    Extra positional arguments are appended to ``text`` separated by single
    spaces (the same way ``print`` joins its arguments), so callers that
    pass an exception or a device name no longer lose that detail.

    Args:
        text: Main message.
        *args: Optional extra values to append to the message.
    """
    message = " ".join(str(part) for part in (text, *args))
    if message == self.last_error:
        # Same message as last time: stay quiet to avoid flooding stdout
        # from the polling loops.
        return
    self.last_error = message
    print(message)
async def scan_for_esp32(self):
    """Scan for nearby BLE devices and start a session for each new ESP32-S3.

    Does nothing while ``connect_state`` is already set.  Every discovered
    device whose advertised name contains ``"ESP32-S3"`` and whose address
    is not yet in ``self.devices`` gets a fresh device record and a
    background ``connect_and_listen`` task.
    """
    if self.connect_state:
        return
    print("Scanning for ESP32 devices...*****************")
    try:
        discovered = await BleakScanner.discover()
    except Exception as e:
        # Catch Exception rather than BaseException so task cancellation
        # and KeyboardInterrupt still propagate.
        self.print_error("蓝牙疑似未打开,{}".format(e))
        return

    for d in discovered:
        name = d.name
        # Only named ESP32-S3 peripherals that are not tracked yet.
        if not name or "ESP32-S3" not in name or d.address in self.devices:
            continue
        self.devices_name[name] = d.address
        self.devices[d.address] = {
            "name": name,
            "client": None,
            "send_queue": deque(maxlen=20),
            "recv_queue": deque(maxlen=20),
            "receive_data": b"",
            "connect_state": True,
        }
        self.print_error(f"Found device: {name} - Address: {d.address}")
        task = asyncio.create_task(self.connect_and_listen(d.address))
        # The event loop only keeps weak references to tasks, so hold a
        # strong reference until the task finishes.  The set is created
        # lazily because it is not part of the state set up in __init__.
        tasks = getattr(self, "_listen_tasks", None)
        if tasks is None:
            tasks = self._listen_tasks = set()
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        self.connect_state = True
# Register the device with the remote-control host layer.
def connect_device(self, address, name):
    """Announce a connected device and hand remote controls to RemoteControlV2.

    Args:
        address: BLE address of the device.
        name: Advertised name; only names containing ``"remote"`` are
            forwarded to ``remote_control_v2``.
    """
    print("注册到遥控器等上位机", address, name)
    if "remote" not in name:
        return
    self.remote_control_v2.to_connect_bluetooth(address, is_test=False)
# Close the connection.
def disconnect_device(self, address, name):
    """Tear down a device session and reset the shared Bluetooth state.

    For names containing ``"remote"`` the RemoteControlV2 helper is told to
    close its Bluetooth connection.  Afterwards the connection flag, the
    receive buffer, both device tables, the sequence tracker, the retry
    counter and the last logged message are all reset.

    Args:
        address: BLE address of the device being dropped.
        name: Advertised name of that device.
    """
    print("关闭蓝牙连接", address, name)
    is_remote = "remote" in name
    if is_remote:
        self.print_error("71 关闭蓝牙连接{}-{}".format(address, name))
        self.remote_control_v2.close_bluetooth_connect()
    # Back to the same values __init__ starts with.
    self.connect_state = False
    self.receive_data = b""
    self.devices, self.devices_name = {}, {}
    self.last_value = None
    self.retry_num = 0
    self.last_error = ""
async def handle_disconnect(self, client):
    """Keep trying to reconnect ``client`` until it reports connected.

    Args:
        client: Object exposing ``is_connected`` and an awaitable
            ``connect()``.
    """
    self.print_error("Device was disconnected.")
    while True:
        if client.is_connected:
            return
        try:
            self.print_error("Attempting to reconnect...")
            await client.connect()
        except Exception as exc:
            self.print_error(f"Failed to reconnect: {exc}")
            # Back off for a while before the next attempt.
            await asyncio.sleep(5)
async def notification_handler(self, address, characteristic, data: bytearray):
    """Queue a received notification payload for the device at ``address``.

    Notifications for an address that is no longer in ``self.devices``
    (for example because ``disconnect_device`` has cleared the table) are
    dropped instead of raising ``KeyError`` inside a background task.

    Args:
        address: BLE address the notification came from.
        characteristic: Source characteristic (unused).
        data: Raw notification payload.
    """
    with self._lock:
        device = self.devices.get(address)
        if device is None:
            return
        device["recv_queue"].append(data)
        # Receiving data marks the link as alive.
        device["connect_state"] = True
async def write_characteristic(self, client, characteristic_uuid, data):
    """Write ``data`` to a GATT characteristic, logging failures.

    Write errors are reported through ``print_error`` (with the exception
    text included in the message) and not re-raised.  Cancellation and
    other ``BaseException`` subclasses propagate.

    Args:
        client: Connected client exposing ``write_gatt_char``.
        characteristic_uuid: Identifier of the characteristic to write.
        data: Bytes to send.
    """
    try:
        await client.write_gatt_char(characteristic_uuid, data)
    except Exception as e:
        self.print_error(f"write_characteristic error {e}")
async def connect_and_listen(self, address):
    """Connect to the device at ``address``, subscribe to notifications and pump its send queue.

    The coroutine runs for as long as ``address`` is registered in
    ``self.devices``.  Inside a session it subscribes to every
    characteristic that advertises ``notify``, registers the device via
    ``connect_device`` and then forwards queued outgoing chunks one by one.

    Changes compared with the previous version:

    * the device record is marked disconnected *before*
      ``disconnect_device`` runs, because that call replaces
      ``self.devices`` and the old order raised ``KeyError``;
    * outgoing data goes to the last characteristic that advertises a write
      property (falling back to the last one enumerated) instead of a
      leaked loop variable;
    * connection errors are logged and the shared state is reset, instead
      of killing the task with ``connect_state`` left set.

    Args:
        address: BLE address of the device to serve.
    """
    self.print_error("""连接到指定地址的ESP32设备并监听通知""")
    while address in self.devices:
        try:
            async with BleakClient(address) as client:
                device = self.devices.get(address)
                if device is None:
                    break
                if not client.is_connected:
                    self.print_error("Failed to connect to the device.")
                    device["connect_state"] = False
                    self.disconnect_device(address=address, name=device["name"])
                    continue
                device["connect_state"] = True
                self.print_error(f"Connected to {address}")

                # Discover services; subscribe to notifying characteristics
                # and remember where outgoing data should be written.
                # NOTE(review): newer bleak releases expose the resolved
                # services as ``client.services`` - confirm against the
                # installed version before switching.
                services = await client.get_services()
                last_char = None
                write_char = None
                for service in services:
                    for char in service.characteristics:
                        last_char = char
                        properties = char.properties
                        if "write" in properties or "write-without-response" in properties:
                            write_char = char
                        if "notify" in properties:
                            self.print_error(
                                f"Subscribing to characteristic: {char.uuid}"
                            )
                            await client.start_notify(
                                char,
                                lambda sender, data: asyncio.create_task(
                                    self.notification_handler(address, sender, data)
                                ),
                            )
                if write_char is None:
                    # No characteristic advertises a write property: keep
                    # the earlier behaviour of using the last one seen.
                    write_char = last_char

                # Simple keep-alive loop for the lifetime of the connection.
                self.print_error("进入一个简单的循环 保持连接", device["name"])
                self.connect_device(address=address, name=device["name"])
                self.retry_num += 1
                while True:
                    if not client.is_connected:
                        self.print_error(
                            f"Device {address} disconnected unexpectedly."
                        )
                        with self._lock:
                            # Mark the record first: disconnect_device()
                            # replaces self.devices with an empty dict.
                            device["connect_state"] = False
                            self.disconnect_device(
                                address=address, name=device["name"]
                            )
                        break
                    if self.devices.get(address) is not device:
                        # The device table was reset elsewhere.
                        break
                    if device["send_queue"]:
                        with self._lock:
                            send_data = device["send_queue"].popleft()
                        if write_char is None:
                            self.print_error(
                                f"No characteristic available for writing on {address}"
                            )
                        else:
                            await self.write_characteristic(
                                client, write_char.uuid, send_data
                            )
                        await asyncio.sleep(0.01)
                    await asyncio.sleep(0.02)
        except Exception as e:
            self.print_error(f"Error during connection or listening: {e}")
            device = self.devices.get(address)
            if device is not None:
                device["connect_state"] = False
                self.disconnect_device(address=address, name=device["name"])
            # Give the adapter a moment before the loop condition is
            # evaluated again.
            await asyncio.sleep(2)
async def main_func(self):
    """Entry coroutine: scan once, then keep rescanning forever.

    When ``connect_state`` is already ``True`` a "show_info" message is
    sent through ``sendSocketMessage`` and the coroutine returns without
    scanning.  Otherwise it scans immediately and then rescans every
    20 seconds while devices are registered, or every 3 seconds while
    none are.
    """
    already_connected = self.connect_state == True  # noqa: E712
    if already_connected:
        print('蓝牙已连接,不可重新连接')
        info_text = "遥控设备V2 打开蓝牙成功"
        message = {
            "_type": "show_info",
            "plugins_mode": "remote_control",
            "data": info_text,
        }
        self.sendSocketMessage(code=0, msg=info_text, data=message)
        return

    await self.scan_for_esp32()
    # Rescan periodically so newly powered devices are picked up.
    while True:
        delay = 20 if self.devices else 3
        await asyncio.sleep(delay)
        await self.scan_for_esp32()
def run(self):
    """Blocking entry point: log the start, then run ``main_func`` in a new event loop."""
    self.print_error("开启蓝牙扫描")
    scan_forever = self.main_func()
    asyncio.run(scan_forever)
def write_cmd(self, address, data: list):
    """Wrap ``data`` in a protocol frame and queue it for ``address``.

    Frame layout: ``0x55 0x55``, the low byte of ``len(data)``, the payload
    values, then the low byte of the bitwise complement of the payload sum.

    Args:
        address: BLE address of the destination device.
        data: Payload values (each 0-255).
    """
    length_byte = len(data) & 0xFF
    checksum = ~sum(data) & 0xFF
    frame = bytes([0x55, 0x55, length_byte, *data, checksum])
    self.send_data(address, frame)
def send_data(self, address, byte_list: bytes):
    """Split ``byte_list`` into 20-byte chunks and queue them for ``address``.

    An address that is not in ``self.devices`` is reported through
    ``print_error`` and ignored.  Previously the error handler indexed
    ``self.devices[address]`` again and raised ``KeyError`` itself.

    Args:
        address: BLE address of the destination device.
        byte_list: Complete frame to send.
    """
    chunk_size = 20
    device = self.devices.get(address)
    if device is None:
        self.print_error(
            f"Error sending notification: {address},device not found"
        )
        return
    chunks = [
        byte_list[i : i + chunk_size] for i in range(0, len(byte_list), chunk_size)
    ]
    try:
        with self._lock:
            device["send_queue"].extend(chunks)
    except Exception as e:
        device["connect_state"] = False
        self.print_error(f"Error sending notification: {address},{e}")
def change_hex_to_int(self, _bytearray):
    """Render byte values as space-separated two-digit lowercase hex."""
    hex_pairs = ("%02x" % value for value in _bytearray)
    return " ".join(hex_pairs)
def change_hex_to_10_int(self, _bytearray):
    """Render byte values as space-separated decimal numbers."""
    decimals = map(str, map(int, _bytearray))
    return " ".join(decimals)
def read_cmd(self, time_out=10):
    """Collect every frame currently available from all known devices.

    For each device, ``read_cmd_one`` is called until it reports that
    nothing more can be parsed (it returns ``None`` while the receive
    queue is empty).  Compared with the previous loop this:

    * stops when a device is marked disconnected with data still queued
      (the old loop never terminated in that case);
    * also returns frames already sitting in the reassembly buffer when
      the receive queue is empty;
    * iterates over a snapshot and skips names whose device record is
      gone, because another thread can change the tables.

    Args:
        time_out: Unused; kept for backward compatibility.

    Returns:
        A list of dicts with ``device_name``, ``address`` and
        ``receive_data`` (the checked payload) for every non-empty frame.
    """
    data_list = []
    for device_name, address in list(self.devices_name.items()):
        device = self.devices.get(address)
        if device is None:
            continue
        while device["connect_state"] and self.devices.get(address) is device:
            receive_data = self.read_cmd_one(address)
            if receive_data:
                data_list.append(
                    {
                        "device_name": device_name,
                        "address": address,
                        "receive_data": receive_data,
                    }
                )
            elif receive_data is None and not device["recv_queue"]:
                # Nothing parsed and nothing left to feed the parser.
                break
    return data_list
def read_cmd_one(self, address):
    """Try to extract one frame from the byte stream of ``address``.

    Each pass moves at most one chunk from the device's ``recv_queue`` into
    its ``receive_data`` buffer and then tries to parse a frame of the form
    ``0x55 0x55 <len> <payload...> <checksum>``, where the checksum is the
    low byte of the bitwise complement of the payload sum.  Bytes in front
    of a ``0x55 0x55`` header are discarded.

    Args:
        address: BLE address of the device to read from.

    Returns:
        The payload bytes of one valid frame; ``False`` when a complete
        frame failed its checksum and was dropped; ``None`` when the device
        is unknown or not connected, or when no complete frame is
        available yet.
    """
    # Fetch the record once: another thread may replace self.devices.
    device = self.devices.get(address)
    if device is None or not device["connect_state"]:
        return None

    header = b"\x55\x55"
    while True:
        with self._lock:
            buffer = device["receive_data"]
            if device["recv_queue"]:
                buffer = buffer + device["recv_queue"].popleft()
                device["receive_data"] = buffer
        if len(buffer) < 4:
            # Empty, or too short to hold header + length + checksum.
            return None

        if buffer[:2] == header:
            frame_end = buffer[2] + 4
            if len(buffer) < frame_end:
                # Frame not fully received yet.
                return None
            body = buffer[3:frame_end]
            with self._lock:
                # Consume the frame from the buffer.
                device["receive_data"] = buffer[frame_end:]
            payload, checksum = body[:-1], body[-1]
            if 0xFF & ~sum(payload) == checksum:
                return payload
            print("数据异常,丢弃")
            return False

        # Resynchronise: drop everything before the next header.  Without
        # a header, keep only a trailing 0x55 that may start one.
        start = buffer.find(header)
        if start >= 0:
            buffer = buffer[start:]
        elif buffer.endswith(b"\x55"):
            buffer = buffer[-1:]
        else:
            buffer = b""
        with self._lock:
            device["receive_data"] = buffer
def analysis_received_data(self):
    """Read all pending frames and act on each one according to its command byte.

    The first payload byte selects the action:

    * ``1``   - the remaining bytes are decoded as text, CR/LF are removed
      and the result is printed as a bar code;
    * ``2``   - the frame is printed in hex;
    * ``90``  - the frame is printed in hex;
    * ``99``  - a ``[99]`` frame is written back to the sending device;
    * ``111`` - bytes 1-4 are read as a big-endian counter and compared
      with the locally tracked ``last_value``.

    Fixes compared with the previous version: the ``99`` reply goes through
    ``self`` instead of a module-level variable that only exists when the
    file runs as a script; every frame of the batch is processed instead of
    returning after the first handled one; malformed frames (undecodable
    text, truncated counter) no longer raise.
    """
    frames = self.read_cmd(time_out=5)
    if not frames:
        return

    for frame in frames:
        address = frame["address"]
        rec_data = frame["receive_data"]
        command = rec_data[0]

        if command == 1:
            # Scanned bar code.
            bar_code = rec_data[1:].decode(errors="replace")
            bar_code = bar_code.replace("\r", "").replace("\n", "")
            print("bar_code:", bar_code)
        elif command == 2:
            print("read receive_data {}".format(self.change_hex_to_int(rec_data)))
        elif command == 90:
            print(
                "read receive_data-90 {}".format(self.change_hex_to_int(rec_data))
            )
        elif command == 99:
            # Answer with the same command byte.
            self.write_cmd(address, [99])
        elif command == 111:
            print(
                "{} receive_data-111 {}".format(
                    self.retry_num, self.change_hex_to_10_int(rec_data)
                )
            )
            if len(rec_data) < 5:
                # Too short to carry the 4-byte counter.
                print("数据异常,丢弃")
                continue
            value = int.from_bytes(rec_data[1:5], "big")
            if self.last_value is None:
                self.last_value = value
            else:
                self.last_value += 1
            # True while the received counter matches the expected one.
            flag = self.last_value == value
            print(
                "{} value:{},last_value;{},flag:{}\n".format(
                    self.retry_num, value, self.last_value, flag
                )
            )
if __name__ == "__main__":
    # Manual exercise script: start the Bluetooth worker in a background
    # thread, poll for received frames roughly every millisecond and, every
    # 500 polls, send three counter frames (command 111) followed by one
    # command-90 frame to a fixed device address.
    blue_mode = BlueToothMode(None)
    worker = threading.Thread(target=blue_mode.run, args=())
    worker.start()
    print("=" * 50)

    address = "24:EC:4A:26:4B:BE"
    n = 0  # polls since the last send burst
    time.sleep(1)
    k = 0  # running counter carried in the 111 frames
    last_t = time.time()
    while True:
        time.sleep(0.001)
        s = time.time()
        blue_mode.analysis_received_data()
        n += 1
        if n != 500:
            continue
        print(s - last_t)
        for i in range(3):
            k += 1
            # Command byte, 32-bit big-endian counter, then filler bytes.
            data = [111]
            data.extend((k >> shift) & 0xFF for shift in (24, 16, 8, 0))
            data.extend(range(100, 130))
            blue_mode.write_cmd(address=address, data=data)
            print("send_value:{}".format(k))
        data = [90]
        blue_mode.write_cmd(address=address, data=data)
        n = 0
        last_t = s
|