BlueToothMode.py 18 KB


  1. import asyncio
  2. import time
  3. from bleak import BleakScanner, BleakClient
  4. import threading
  5. from collections import deque
  6. from networkx import is_connected
  7. from utils.SingletonType import SingletonType
  8. from .RemoteControlV2 import RemoteControlV2
  9. from .BaseClass import BaseClass
  10. from sockets.connect_manager import ConnectionManager
  11. class BlueToothMode(BaseClass,metaclass=SingletonType):
  12. instance = None
  13. init_flag = None
  14. def __init__(self, websocket_manager: ConnectionManager):
  15. super().__init__(websocket_manager)
  16. """此处设计为,如果已经存在实例时,不再执行初始化"""
  17. if self.init_flag:
  18. return
  19. else:
  20. self.init_flag = True
  21. self.msg_type = "blue_tooth"
  22. self.remote_control_v2 = RemoteControlV2(self, websocket_manager)
  23. # 用于存储找到的目标设备的地址
  24. self.target_device_address = None
  25. self._lock = threading.Lock()
  26. self.connect_state = False
  27. self.receive_data = b""
  28. self.devices = {}
  29. self.devices_name = {}
  30. self.last_value = None
  31. self.retry_num = 0
  32. self.last_error = ""
  33. def __new__(cls, *args, **kwargs):
  34. """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
  35. if not cls.instance:
  36. cls.instance = super().__new__(cls)
  37. return cls.instance
  38. def print_error(self, text, *args):
  39. if text != self.last_error:
  40. self.last_error = text
  41. print(text)
  42. async def scan_for_esp32(self):
  43. """扫描附近的BLE设备,并寻找ESP32"""
  44. if self.connect_state == True:
  45. return
  46. # self.sendSocketMessage(
  47. # code=0, msg="遥控设备V2 未连接", data=None, device_status=-1
  48. # )
  49. # print("Scanning for ESP32 devices...*****************")
  50. try:
  51. devices = await BleakScanner.discover()
  52. except BaseException as e:
  53. self.print_error("蓝牙疑似未打开,{}".format(e))
  54. self.sendSocketMessage(
  55. code=0, msg="蓝牙疑似未打开,{}".format(e), data=None, device_status=-1
  56. )
  57. return
  58. for d in devices:
  59. # print(f"Device: {d.name} - Address: {d.address}")
  60. # 假设ESP32的广播名称包含"ESP32-S3"字符串
  61. if d.name:
  62. if "ESP32-S3" in d.name:
  63. if d.address not in self.devices:
  64. self.devices_name[d.name] = d.address
  65. self.devices[d.address] = {
  66. "name": d.name,
  67. "client": None,
  68. "send_queue": deque(maxlen=20),
  69. "recv_queue": deque(maxlen=20),
  70. "receive_data": b"",
  71. "connect_state": True,
  72. }
  73. self.print_error(
  74. f"Found device: {d.name} - Address: {d.address}"
  75. )
  76. asyncio.create_task(self.connect_and_listen(d.address))
  77. self.connect_state = True
  78. # 注册到遥控器等上位机
  79. def connect_device(self, address, name):
  80. print("注册到遥控器等上位机", address, name)
  81. if "remote" in name:
  82. self.remote_control_v2.to_connect_bluetooth(address, is_test=False)
  83. # 关闭连接
  84. def disconnect_device(self, address, name):
  85. print("关闭蓝牙连接", address, name)
  86. if "remote" in name:
  87. self.print_error("71 关闭蓝牙连接{}-{}".format(address, name))
  88. self.remote_control_v2.close_bluetooth_connect()
  89. self.connect_state = False
  90. self.receive_data = b""
  91. self.devices = {}
  92. self.devices_name = {}
  93. self.last_value = None
  94. self.retry_num = 0
  95. self.last_error = ""
  96. async def handle_disconnect(self, client):
  97. """处理断开连接事件"""
  98. self.print_error("Device was disconnected.")
  99. # 尝试重新连接
  100. while not client.is_connected:
  101. try:
  102. self.print_error("Attempting to reconnect...")
  103. await client.connect()
  104. except Exception as e:
  105. self.print_error(f"Failed to reconnect: {e}")
  106. await asyncio.sleep(5) # 等待一段时间再重试
  107. async def notification_handler(self, address, characteristic, data: bytearray):
  108. """处理接收到的通知数据"""
  109. with self._lock:
  110. #
  111. # if data == bytearray(b'UU\x01c\x9c'):
  112. # self.devices[address]['send_queue'].append(bytearray(b'UU\x01c\x9c'))
  113. # pass
  114. # else:
  115. # # print("60 data:", address, data)
  116. self.devices[address]["recv_queue"].append(data)
  117. self.devices[address]["connect_state"] = True
  118. async def write_characteristic(self, client, characteristic_uuid, data):
  119. """向指定特征写入数据"""
  120. try:
  121. await client.write_gatt_char(characteristic_uuid, data)
  122. except BaseException as e:
  123. self.print_error("write_characteristic error ", e)
  124. async def connect_and_listen(self, address):
  125. """连接到指定地址的ESP32设备并监听通知"""
  126. self.print_error("""连接到指定地址的ESP32设备并监听通知""")
  127. while True:
  128. # try:
  129. if len(self.devices) == 0:
  130. break
  131. async with BleakClient(address) as client:
  132. if not client.is_connected:
  133. self.print_error("Failed to connect to the device.")
  134. self.devices[address]["connect_state"] = False
  135. self.disconnect_device(
  136. address=address, name=self.devices[address]["name"]
  137. )
  138. continue
  139. if len(self.devices) == 0:
  140. break
  141. self.devices[address]["connect_state"] = True
  142. self.print_error(f"Connected to {address}")
  143. # 获取服务和特征(假设你已经知道要监听的特征UUID)
  144. services = await client.get_services()
  145. for service in services:
  146. for char in service.characteristics:
  147. if "notify" in char.properties:
  148. self.print_error(
  149. f"Subscribing to characteristic: {char.uuid}"
  150. )
  151. await client.start_notify(
  152. char,
  153. lambda char, data: asyncio.create_task(
  154. self.notification_handler(address, char, data)
  155. ),
  156. )
  157. # 进入一个简单的循环,保持连接
  158. self.print_error(
  159. "进入一个简单的循环 保持连接", self.devices[address]["name"]
  160. )
  161. self.connect_device(
  162. address=address, name=self.devices[address]["name"]
  163. )
  164. self.retry_num += 1
  165. while True:
  166. if not client.is_connected:
  167. self.print_error(
  168. f"Device {address} disconnected unexpectedly."
  169. )
  170. with self._lock:
  171. self.disconnect_device(
  172. address=address, name=self.devices[address]["name"]
  173. )
  174. if len(self.devices) == 0:
  175. break
  176. self.devices[address]["connect_state"] = False
  177. break
  178. if len(self.devices) == 0:
  179. break
  180. if self.devices[address]["send_queue"]:
  181. with self._lock:
  182. send_data = self.devices[address][
  183. "send_queue"
  184. ].popleft()
  185. # print("-----------> send_data:", self.change_hex_to_10_int(send_data))
  186. await self.write_characteristic(
  187. client, char.uuid, send_data
  188. )
  189. await asyncio.sleep(0.01)
  190. await asyncio.sleep(0.02)
  191. # except Exception as e:
  192. # with self._lock:
  193. # self.disconnect_device(
  194. # address=address, name=self.devices[address]["name"]
  195. # )
  196. # self.devices[address]["connect_state"] = False
  197. # print(f"Error during connection or listening: {e}")
  198. # await asyncio.sleep(2) # 发生错误时等待一段时间再重试
  199. async def main_func(self):
  200. """主函数"""
  201. # address = "24:EC:4A:26:4B:BE"
  202. # _name = "ESP32-S3-remote"
  203. # self.devices[address] = {'name': _name,
  204. # 'client': None,
  205. # 'send_queue': [],
  206. # 'recv_queue': [],
  207. # "receive_data": b"",
  208. # "connect_state": True,
  209. # }
  210. # self.devices_name[_name] = address
  211. # asyncio.create_task(self.connect_and_listen(address))
  212. if self.connect_state == True:
  213. print('蓝牙已连接,不可重新连接')
  214. message = {
  215. "_type": "show_info",
  216. "plugins_mode": "remote_control",
  217. "data": "遥控设备V2 打开蓝牙成功",
  218. }
  219. self.sendSocketMessage(
  220. code=0, msg="遥控设备V2 打开蓝牙成功", data=message, device_status=2
  221. )
  222. return
  223. await self.scan_for_esp32()
  224. # 定期重新扫描以发现新设备
  225. while True:
  226. if self.devices:
  227. await asyncio.sleep(20)
  228. else:
  229. await asyncio.sleep(3)
  230. await self.scan_for_esp32()
  231. def run(self):
  232. self.print_error("开启蓝牙扫描")
  233. asyncio.run(self.main_func())
  234. def write_cmd(self, address, data: list):
  235. buf = []
  236. buf.extend([0x55, 0x55, (0xFF & len(data))])
  237. buf.extend(data)
  238. buf.extend([0xFF & ~sum(data)])
  239. self.send_data(address, bytes(buf))
  240. def send_data(self, address, byte_list: bytes):
  241. chunk_size = 20
  242. chunks = [
  243. byte_list[i : i + chunk_size] for i in range(0, len(byte_list), chunk_size)
  244. ]
  245. try:
  246. with self._lock:
  247. for chunk in chunks:
  248. self.devices[address]["send_queue"].append(chunk)
  249. except Exception as e:
  250. self.devices[address]["connect_state"] = False
  251. self.print_error(f"Error sending notification: {address},{e}")
  252. def change_hex_to_int(self, _bytearray):
  253. return " ".join([hex(x)[2:].zfill(2) for x in _bytearray])
  254. def change_hex_to_10_int(self, _bytearray):
  255. return " ".join([str(int(x)) for x in _bytearray])
  256. def read_cmd(self, time_out=10):
  257. data_list = []
  258. for device_name in self.devices_name:
  259. address = self.devices_name[device_name]
  260. # print(self.devices[address]['connect_state'])
  261. while self.devices[address]["recv_queue"]:
  262. receive_data = self.read_cmd_one(address)
  263. if receive_data:
  264. data_list.append(
  265. {
  266. "device_name": device_name,
  267. "address": address,
  268. "receive_data": receive_data,
  269. }
  270. )
  271. return data_list
  272. def read_cmd_one(self, address):
  273. # 获取所有缓冲区的字节
  274. if not self.devices[address]["connect_state"]:
  275. return
  276. # print("读取数据....")
  277. while 1:
  278. receive_data = self.devices[address]["receive_data"]
  279. # print("166 receive_data", receive_data)
  280. if self.devices[address]["recv_queue"]:
  281. with self._lock:
  282. read_d = self.devices[address]["recv_queue"].popleft()
  283. # print("170 read_d", read_d)
  284. receive_data += read_d
  285. self.devices[address]["receive_data"] = receive_data
  286. if not receive_data:
  287. return None
  288. if len(receive_data) < 4:
  289. break
  290. # print("--------receive_data",self.change_hex_to_10_int(receive_data))
  291. if receive_data[0] == 0x55 and receive_data[1] == 0x55:
  292. data_len = receive_data[2]
  293. if len(receive_data) < data_len + 4:
  294. # n += 1
  295. # if n > time_out:
  296. # time.sleep(0.001)
  297. # return False
  298. # continue
  299. return
  300. _data = receive_data[3 : data_len + 4]
  301. # 更新缓存区
  302. with self._lock:
  303. self.devices[address]["receive_data"] = receive_data[data_len + 4 :]
  304. receive_data = receive_data[data_len + 4 :]
  305. # 校验数据
  306. if 0xFF & ~sum(_data[:-1]) == _data[-1]:
  307. return _data[:-1]
  308. else:
  309. print("数据异常,丢弃")
  310. return False
  311. else:
  312. # 起始位不是 55 55 进行移除
  313. while receive_data:
  314. if len(receive_data) == 1:
  315. if receive_data[0] == 0x55:
  316. break
  317. else:
  318. with self._lock:
  319. self.devices[address]["receive_data"] = b""
  320. receive_data = b""
  321. else:
  322. if receive_data[0] == 0x55 and receive_data[1] == 0x55:
  323. break
  324. else:
  325. with self._lock:
  326. self.devices[address]["receive_data"] = receive_data[1:]
  327. receive_data = receive_data[1:]
  328. def analysis_received_data(self):
  329. receive_data = self.read_cmd(time_out=5)
  330. if not receive_data:
  331. return
  332. else:
  333. # print("receive_data:", receive_data)
  334. # print("read2 receive_data {}".format(self.change_hex_to_int(receive_data)))
  335. pass
  336. for data in receive_data:
  337. address = data["address"]
  338. rec_data = data["receive_data"]
  339. # print("read address {}".format(address))
  340. # print("read receive_data {}".format(self.change_hex_to_int(rec_data)))
  341. # 数据 结构 command,按命令解析
  342. if rec_data[0] == 1:
  343. # 扫码数据
  344. bar_code = rec_data[1:].decode()
  345. bar_code = bar_code.replace("\r", "")
  346. bar_code = bar_code.replace("\n", "")
  347. print("bar_code:", bar_code)
  348. return
  349. if rec_data[0] == 2:
  350. print("read receive_data {}".format(self.change_hex_to_int(rec_data)))
  351. return
  352. if rec_data[0] == 90:
  353. print(
  354. "read receive_data-90 {}".format(self.change_hex_to_int(rec_data))
  355. )
  356. return
  357. if rec_data[0] == 99:
  358. blue_mode.write_cmd(address, [99])
  359. # print("发送心跳包")
  360. return
  361. if rec_data[0] == 111:
  362. print(
  363. "{} receive_data-111 {}".format(
  364. self.retry_num, self.change_hex_to_10_int(rec_data)
  365. )
  366. )
  367. value = (
  368. rec_data[1] << 24
  369. | rec_data[2] << 16
  370. | rec_data[3] << 8
  371. | rec_data[4]
  372. )
  373. if self.last_value is None:
  374. self.last_value = value
  375. else:
  376. self.last_value += 1
  377. if self.last_value == value:
  378. flag = True
  379. else:
  380. flag = False
  381. print(
  382. "{} value:{},last_value;{},flag:{}\n".format(
  383. self.retry_num, value, self.last_value, flag
  384. )
  385. )
  386. if __name__ == "__main__":
  387. blue_mode = BlueToothMode(None)
  388. threading.Thread(target=blue_mode.run, args=()).start()
  389. print("=" * 50)
  390. n = 0
  391. time.sleep(1)
  392. k = 0
  393. last_t = time.time()
  394. while 1:
  395. time.sleep(0.001)
  396. s = time.time()
  397. address = "24:EC:4A:26:4B:BE"
  398. blue_mode.analysis_received_data()
  399. n += 1
  400. if n == 500:
  401. print(s - last_t)
  402. for i in range(3):
  403. k += 1
  404. data = [
  405. 111,
  406. 0xFF & k >> 24,
  407. 0xFF & k >> 16,
  408. 0xFF & k >> 8,
  409. 0xFF & k,
  410. ]
  411. data.extend([x for x in range(100, 130)])
  412. blue_mode.write_cmd(address=address, data=data)
  413. print("send_value:{}".format(k))
  414. data = [90]
  415. blue_mode.write_cmd(address=address, data=data)
  416. n = 0
  417. last_t = s