# BlueToothMode.py
# BLE transport for ESP32-S3 devices: scanning, connection handling and
# 0x55 0x55 frame encoding/decoding.
import asyncio
import threading
import time
from collections import deque
from typing import Any, Dict, List, Union

from bleak import BleakScanner, BleakClient

from utils.SingletonType import SingletonType
from .RemoteControlV2 import RemoteControlV2
from .BaseClass import BaseClass
from sockets.connect_manager import ConnectionManager
  10. class BlueToothMode(BaseClass,metaclass=SingletonType):
  11. instance = None
  12. init_flag = None
  13. def __init__(self, websocket_manager: ConnectionManager):
  14. super().__init__(websocket_manager)
  15. """此处设计为,如果已经存在实例时,不再执行初始化"""
  16. if self.init_flag:
  17. return
  18. else:
  19. self.init_flag = True
  20. self.remote_control_v2 = RemoteControlV2(self, websocket_manager)
  21. # 用于存储找到的目标设备的地址
  22. self.target_device_address = None
  23. self._lock = threading.Lock()
  24. self.connect_state = False
  25. self.receive_data = b""
  26. self.devices = {}
  27. self.devices_name = {}
  28. self.last_value = None
  29. self.retry_num = 0
  30. self.last_error = ""
  31. def __new__(cls, *args, **kwargs):
  32. """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
  33. if not cls.instance:
  34. cls.instance = super().__new__(cls)
  35. return cls.instance
  36. def print_error(self, text, *args):
  37. if text != self.last_error:
  38. self.last_error = text
  39. print(text)
  40. async def scan_for_esp32(self):
  41. """扫描附近的BLE设备,并寻找ESP32"""
  42. print("Scanning for ESP32 devices...*****************")
  43. try:
  44. devices = await BleakScanner.discover()
  45. except BaseException as e:
  46. self.print_error("蓝牙疑似未打开,{}".format(e))
  47. return
  48. for d in devices:
  49. # print(f"Device: {d.name} - Address: {d.address}")
  50. # 假设ESP32的广播名称包含"ESP32-S3"字符串
  51. if d.name:
  52. if "ESP32-S3" in d.name:
  53. if d.address not in self.devices:
  54. self.devices_name[d.name] = d.address
  55. self.devices[d.address] = {
  56. "name": d.name,
  57. "client": None,
  58. "send_queue": deque(maxlen=20),
  59. "recv_queue": deque(maxlen=20),
  60. "receive_data": b"",
  61. "connect_state": True,
  62. }
  63. self.print_error(
  64. f"Found device: {d.name} - Address: {d.address}"
  65. )
  66. asyncio.create_task(self.connect_and_listen(d.address))
  67. self.connect_state = True
  68. # 注册到遥控器等上位机
  69. def connect_device(self, address, name):
  70. print("注册到遥控器等上位机", address, name)
  71. if "remote" in name:
  72. self.remote_control_v2.to_connect_bluetooth(address, is_test=False)
  73. # 关闭连接
  74. def disconnect_device(self, address, name):
  75. print("关闭蓝牙连接", address, name)
  76. if "remote" in name:
  77. self.print_error("71 关闭蓝牙连接{}-{}".format(address, name))
  78. self.remote_control_v2.close_bluetooth_connect()
  79. async def handle_disconnect(self, client):
  80. """处理断开连接事件"""
  81. self.print_error("Device was disconnected.")
  82. # 尝试重新连接
  83. while not client.is_connected:
  84. try:
  85. self.print_error("Attempting to reconnect...")
  86. await client.connect()
  87. except Exception as e:
  88. self.print_error(f"Failed to reconnect: {e}")
  89. await asyncio.sleep(5) # 等待一段时间再重试
  90. async def notification_handler(self, address, characteristic, data: bytearray):
  91. """处理接收到的通知数据"""
  92. with self._lock:
  93. #
  94. # if data == bytearray(b'UU\x01c\x9c'):
  95. # self.devices[address]['send_queue'].append(bytearray(b'UU\x01c\x9c'))
  96. # pass
  97. # else:
  98. # # print("60 data:", address, data)
  99. self.devices[address]["recv_queue"].append(data)
  100. self.devices[address]["connect_state"] = True
  101. async def write_characteristic(self, client, characteristic_uuid, data):
  102. """向指定特征写入数据"""
  103. try:
  104. await client.write_gatt_char(characteristic_uuid, data)
  105. except BaseException as e:
  106. self.print_error("write_characteristic error ", e)
  107. async def connect_and_listen(self, address):
  108. """连接到指定地址的ESP32设备并监听通知"""
  109. self.print_error("""连接到指定地址的ESP32设备并监听通知""")
  110. while True:
  111. # try:
  112. async with BleakClient(address) as client:
  113. if not client.is_connected:
  114. self.print_error("Failed to connect to the device.")
  115. self.devices[address]["connect_state"] = False
  116. self.disconnect_device(
  117. address=address, name=self.devices[address]["name"]
  118. )
  119. continue
  120. self.devices[address]["connect_state"] = True
  121. self.print_error(f"Connected to {address}")
  122. # 获取服务和特征(假设你已经知道要监听的特征UUID)
  123. services = await client.get_services()
  124. for service in services:
  125. for char in service.characteristics:
  126. if "notify" in char.properties:
  127. self.print_error(
  128. f"Subscribing to characteristic: {char.uuid}"
  129. )
  130. await client.start_notify(
  131. char,
  132. lambda char, data: asyncio.create_task(
  133. self.notification_handler(address, char, data)
  134. ),
  135. )
  136. # 进入一个简单的循环,保持连接
  137. self.print_error(
  138. "进入一个简单的循环 保持连接", self.devices[address]["name"]
  139. )
  140. self.connect_device(
  141. address=address, name=self.devices[address]["name"]
  142. )
  143. self.retry_num += 1
  144. while True:
  145. if not client.is_connected:
  146. self.print_error(
  147. f"Device {address} disconnected unexpectedly."
  148. )
  149. with self._lock:
  150. self.disconnect_device(
  151. address=address, name=self.devices[address]["name"]
  152. )
  153. self.devices[address]["connect_state"] = False
  154. break
  155. if self.devices[address]["send_queue"]:
  156. with self._lock:
  157. send_data = self.devices[address][
  158. "send_queue"
  159. ].popleft()
  160. # print("-----------> send_data:", self.change_hex_to_10_int(send_data))
  161. await self.write_characteristic(
  162. client, char.uuid, send_data
  163. )
  164. await asyncio.sleep(0.01)
  165. await asyncio.sleep(0.02)
  166. # except Exception as e:
  167. # with self._lock:
  168. # self.disconnect_device(
  169. # address=address, name=self.devices[address]["name"]
  170. # )
  171. # self.devices[address]["connect_state"] = False
  172. # print(f"Error during connection or listening: {e}")
  173. # await asyncio.sleep(2) # 发生错误时等待一段时间再重试
  174. async def main_func(self):
  175. """主函数"""
  176. # address = "24:EC:4A:26:4B:BE"
  177. # _name = "ESP32-S3-remote"
  178. # self.devices[address] = {'name': _name,
  179. # 'client': None,
  180. # 'send_queue': [],
  181. # 'recv_queue': [],
  182. # "receive_data": b"",
  183. # "connect_state": True,
  184. # }
  185. # self.devices_name[_name] = address
  186. # asyncio.create_task(self.connect_and_listen(address))
  187. await self.scan_for_esp32()
  188. # 定期重新扫描以发现新设备
  189. while True:
  190. if self.devices:
  191. await asyncio.sleep(20)
  192. else:
  193. await asyncio.sleep(3)
  194. await self.scan_for_esp32()
  195. def run(self):
  196. self.print_error("开启蓝牙扫描")
  197. asyncio.run(self.main_func())
  198. def write_cmd(self, address, data: list):
  199. buf = []
  200. buf.extend([0x55, 0x55, (0xFF & len(data))])
  201. buf.extend(data)
  202. buf.extend([0xFF & ~sum(data)])
  203. self.send_data(address, bytes(buf))
  204. def send_data(self, address, byte_list: bytes):
  205. chunk_size = 20
  206. chunks = [
  207. byte_list[i : i + chunk_size] for i in range(0, len(byte_list), chunk_size)
  208. ]
  209. try:
  210. with self._lock:
  211. for chunk in chunks:
  212. self.devices[address]["send_queue"].append(chunk)
  213. except Exception as e:
  214. self.devices[address]["connect_state"] = False
  215. self.print_error(f"Error sending notification: {address},{e}")
  216. def change_hex_to_int(self, _bytearray):
  217. return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
  218. def change_hex_to_10_int(self, _bytearray):
  219. return " ".join([str(int(x)) for x in _bytearray])
  220. def read_cmd(self, time_out=10):
  221. data_list = []
  222. for device_name in self.devices_name:
  223. address = self.devices_name[device_name]
  224. # print(self.devices[address]['connect_state'])
  225. while self.devices[address]["recv_queue"]:
  226. receive_data = self.read_cmd_one(address)
  227. if receive_data:
  228. data_list.append(
  229. {
  230. "device_name": device_name,
  231. "address": address,
  232. "receive_data": receive_data,
  233. }
  234. )
  235. return data_list
  236. def read_cmd_one(self, address):
  237. # 获取所有缓冲区的字节
  238. if not self.devices[address]["connect_state"]:
  239. return
  240. # print("读取数据....")
  241. while 1:
  242. receive_data = self.devices[address]["receive_data"]
  243. # print("166 receive_data", receive_data)
  244. if self.devices[address]["recv_queue"]:
  245. with self._lock:
  246. read_d = self.devices[address]["recv_queue"].popleft()
  247. # print("170 read_d", read_d)
  248. receive_data += read_d
  249. self.devices[address]["receive_data"] = receive_data
  250. if not receive_data:
  251. return None
  252. if len(receive_data) < 4:
  253. break
  254. # print("--------receive_data",self.change_hex_to_10_int(receive_data))
  255. if receive_data[0] == 0x55 and receive_data[1] == 0x55:
  256. data_len = receive_data[2]
  257. if len(receive_data) < data_len + 4:
  258. # n += 1
  259. # if n > time_out:
  260. # time.sleep(0.001)
  261. # return False
  262. # continue
  263. return
  264. _data = receive_data[3 : data_len + 4]
  265. # 更新缓存区
  266. with self._lock:
  267. self.devices[address]["receive_data"] = receive_data[data_len + 4 :]
  268. receive_data = receive_data[data_len + 4 :]
  269. # 校验数据
  270. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  271. return _data[:-1]
  272. else:
  273. print("数据异常,丢弃")
  274. return False
  275. else:
  276. # 起始位不是 55 55 进行移除
  277. while receive_data:
  278. if len(receive_data) == 1:
  279. if receive_data[0] == 0x55:
  280. break
  281. else:
  282. with self._lock:
  283. self.devices[address]["receive_data"] = b""
  284. receive_data = b""
  285. else:
  286. if receive_data[0] == 0x55 and receive_data[1] == 0x55:
  287. break
  288. else:
  289. with self._lock:
  290. self.devices[address]["receive_data"] = receive_data[1:]
  291. receive_data = receive_data[1:]
  292. def analysis_received_data(self):
  293. receive_data = self.read_cmd(time_out=5)
  294. if not receive_data:
  295. return
  296. else:
  297. # print("receive_data:", receive_data)
  298. # print("read2 receive_data {}".format(self.change_hex_to_int(receive_data)))
  299. pass
  300. for data in receive_data:
  301. address = data["address"]
  302. rec_data = data["receive_data"]
  303. # print("read address {}".format(address))
  304. # print("read receive_data {}".format(self.change_hex_to_int(rec_data)))
  305. # 数据 结构 command,按命令解析
  306. if rec_data[0] == 1:
  307. # 扫码数据
  308. bar_code = rec_data[1:].decode()
  309. bar_code = bar_code.replace("\r", "")
  310. bar_code = bar_code.replace("\n", "")
  311. print("bar_code:", bar_code)
  312. return
  313. if rec_data[0] == 2:
  314. print("read receive_data {}".format(self.change_hex_to_int(rec_data)))
  315. return
  316. if rec_data[0] == 90:
  317. print(
  318. "read receive_data-90 {}".format(self.change_hex_to_int(rec_data))
  319. )
  320. return
  321. if rec_data[0] == 99:
  322. blue_mode.write_cmd(address, [99])
  323. # print("发送心跳包")
  324. return
  325. if rec_data[0] == 111:
  326. print(
  327. "{} receive_data-111 {}".format(
  328. self.retry_num, self.change_hex_to_10_int(rec_data)
  329. )
  330. )
  331. value = (
  332. rec_data[1] << 24
  333. | rec_data[2] << 16
  334. | rec_data[3] << 8
  335. | rec_data[4]
  336. )
  337. if self.last_value is None:
  338. self.last_value = value
  339. else:
  340. self.last_value += 1
  341. if self.last_value == value:
  342. flag = True
  343. else:
  344. flag = False
  345. print(
  346. "{} value:{},last_value;{},flag:{}\n".format(
  347. self.retry_num, value, self.last_value, flag
  348. )
  349. )
  350. if __name__ == "__main__":
  351. blue_mode = BlueToothMode(None)
  352. threading.Thread(target=blue_mode.run, args=()).start()
  353. print("=" * 50)
  354. n = 0
  355. time.sleep(1)
  356. k = 0
  357. last_t = time.time()
  358. while 1:
  359. time.sleep(0.001)
  360. s = time.time()
  361. address = "24:EC:4A:26:4B:BE"
  362. blue_mode.analysis_received_data()
  363. n += 1
  364. if n == 500:
  365. print(s - last_t)
  366. for i in range(3):
  367. k += 1
  368. data = [
  369. 111,
  370. 0xFF & k >> 24,
  371. 0xFF & k >> 16,
  372. 0xFF & k >> 8,
  373. 0xFF & k,
  374. ]
  375. data.extend([x for x in range(100, 130)])
  376. blue_mode.write_cmd(address=address, data=data)
  377. print("send_value:{}".format(k))
  378. data = [90]
  379. blue_mode.write_cmd(address=address, data=data)
  380. n = 0
  381. last_t = s