socket_server.py 8.8 KB


  1. import json
  2. import asyncio
  3. from models import *
  4. from .connect_manager import ConnectionManager
  5. from .message_handler import *
  6. from mcu.DeviceControl import DeviceControl, checkMcuConnection
  7. from mcu.BlueToothMode import BlueToothMode
  8. from mcu.capture.smart_shooter_class import SmartShooter
  9. import time
  10. from .socket_client import socket_manager
  11. from sqlalchemy.exc import NoResultFound
  12. import os, datetime
# Shared connection manager; it is passed to each helper below as its
# `websocket_manager`.
conn_manager = ConnectionManager()
# Websockets added by websocket_endpoint and closed in shutdown_event.
active_connections = set()
# Module-level helper instances, all built around the shared manager.
device_ctrl = DeviceControl(websocket_manager=conn_manager)
blue_tooth = BlueToothMode(websocket_manager=conn_manager)
smart_shooter = SmartShooter(websocket_manager=conn_manager)
  18. from utils.common import message_queue
  19. async def updateDataRecord(PhotoFilename, id):
  20. await asyncio.sleep(0.01)
  21. create_time = datetime.datetime.fromtimestamp(os.path.getctime(PhotoFilename))
  22. data = {"id": id, "image_path": PhotoFilename, "photo_create_time": create_time}
  23. # record_model = PhotoRecord(**data)
  24. session = SqlQuery()
  25. record_model = CRUD(PhotoRecord)
  26. model = record_model.read(session, conditions={"id": id})
  27. if model == None:
  28. print(f"smart shooter 拍照记录更新失败,记录id:{id},不存在")
  29. else:
  30. # 走编辑逻辑
  31. record_model.updateConditions(session, conditions={"id": id}, **data)
  32. print(f"smart shooter 拍照记录更新成功,记录id:{id}")
  33. @app.websocket("/ws")
  34. async def websocket_endpoint(websocket: WebSocket):
  35. # await websocket.accept()
  36. await conn_manager.connect(websocket)
  37. active_connections.add(websocket)
  38. smart_shooter.websocket = websocket
  39. # 启动 smart_shooter.connect_listen 服务
  40. listen_task = None
  41. tasks = set()
  42. try:
  43. # 初始化回调函数
  44. smart_shooter.callback_listen = MsgCallback
  45. # 创建任务来并发处理不同类型的消息
  46. handler_task = asyncio.create_task(handler_messages(websocket))
  47. send_task = asyncio.create_task(send_message(websocket))
  48. loop = asyncio.get_event_loop()
  49. listen_task = loop.run_in_executor(None, smart_shooter.connect_listen)
  50. # 创建任务来启动 connect_listen
  51. # listen_task = asyncio.create_task(restart_smart_shooter_listener())
  52. # 等待所有任务完成
  53. await asyncio.gather(handler_task, send_task, listen_task)
  54. except WebSocketDisconnect:
  55. print("Client disconnected")
  56. finally:
  57. # 确保任务被正确取消
  58. if listen_task and not listen_task.done():
  59. listen_task.cancel()
  60. active_connections.discard(websocket)
  61. async def restart_smart_shooter_listener():
  62. """重启smart shooter监听器"""
  63. loop = asyncio.get_event_loop()
  64. if not hasattr(smart_shooter, "_listen_tasks"):
  65. smart_shooter._listen_tasks = set()
  66. while True:
  67. if getattr(smart_shooter, "listen_init", False):
  68. print("listen_init=====>", getattr(smart_shooter, "listen_init", False))
  69. break
  70. listen_task = loop.run_in_executor(None, smart_shooter.connect_listen)
  71. smart_shooter._listen_tasks.add(listen_task)
  72. # if smart_shooter.listen_init == False:
  73. # break
  74. try:
  75. await listen_task
  76. # 正常结束后等待重启
  77. await asyncio.sleep(1)
  78. except Exception as e:
  79. await asyncio.sleep(1)
  80. finally:
  81. smart_shooter._listen_tasks.discard(listen_task)
  82. async def start_smart_shooter_listen():
  83. """启动 smart_shooter 监听服务"""
  84. loop = asyncio.get_event_loop()
  85. # 在执行器中运行 connect_listen 方法
  86. try:
  87. loop.create_task(None, smart_shooter.connect_listen())
  88. except Exception as e:
  89. print(f"Smart shooter listen error: {e}")
  90. async def handler_messages(websocket):
  91. while True:
  92. try:
  93. byteDats = await websocket.receive()
  94. socket_type = byteDats.get("type")
  95. if socket_type == "websocket.disconnect":
  96. smart_shooter.stop_listen = True
  97. smart_shooter.is_init_while = False
  98. device_ctrl.close_connect()
  99. device_ctrl.mcu_exit = True
  100. device_ctrl.p_list = []
  101. device_ctrl.temp_ports_dict = {}
  102. device_ctrl.clearMyInstance()
  103. diviceList = blue_tooth.devices
  104. if len(diviceList) == 0:
  105. blue_tooth.bluetooth_exit = True
  106. blue_tooth.clearMyInstance()
  107. break
  108. diviceAddress = (
  109. ""
  110. if len(list(diviceList.keys())) == 0
  111. else list(diviceList.keys())[0]
  112. )
  113. if diviceAddress != "":
  114. print(diviceList.get(diviceAddress))
  115. diviceName = diviceList[diviceAddress]["name"]
  116. blue_tooth.disconnect_device(diviceAddress, diviceName)
  117. blue_tooth.bluetooth_exit = True
  118. blue_tooth.clearMyInstance()
  119. print("所有设备已断开连接")
  120. break
  121. print("byteDats", byteDats)
  122. # 使用create_task来避免阻塞消息处理循环
  123. asyncio.create_task(
  124. handlerSend(
  125. conn_manager, json.dumps(byteDats), websocket, smart_shooter
  126. )
  127. )
  128. except Exception as e:
  129. print(e)
  130. break
  131. async def send_message(websocket):
  132. while True:
  133. try:
  134. # 使用wait()而不是直接get()来避免阻塞
  135. message = await asyncio.wait_for(message_queue.get(), timeout=1.0)
  136. await websocket.send_json(message)
  137. except asyncio.TimeoutError:
  138. # 超时继续循环,避免永久阻塞
  139. continue
  140. except Exception as e:
  141. print("socket报错",e)
  142. break
  143. async def MsgCallback(msg):
  144. msg_id = msg.get("msg_id")
  145. match msg_id:
  146. case "PhotoUpdated":
  147. PhotoFilename = msg.get("PhotoFilename")
  148. PhotoLocation = msg.get("PhotoLocation")
  149. PhotoOrigin = msg.get("PhotoOrigin")
  150. if (PhotoFilename != "" and PhotoFilename != None) and (
  151. PhotoLocation == "Local Disk"
  152. ):
  153. # temp_photo_name = PhotoFilename
  154. # 更新拍照记录
  155. print("PhotoFilename", PhotoFilename, PhotoOrigin)
  156. if PhotoOrigin != "" and PhotoOrigin != "external":
  157. goods_art_no, id = PhotoOrigin.split(",")
  158. # 创建任务来处理数据库更新,避免阻塞回调
  159. asyncio.create_task(updateDataRecord(PhotoFilename, id))
  160. data = conn_manager.jsonMessage(
  161. code=0,
  162. msg=f"照片获取成功",
  163. data={"photo_file_name": PhotoFilename},
  164. msg_type="smart_shooter_photo_take",
  165. )
  166. await conn_manager.send_personal_message(data, smart_shooter.websocket)
  167. case "LiveviewUpdated":
  168. CameraLiveviewImage = msg.get("CameraLiveviewImage", None)
  169. # base64_to_image(CameraLiveviewImage, "liveview.jpg")
  170. # print("收到直播画面:CameraLiveviewImage")
  171. data = conn_manager.jsonMessage(
  172. code=1,
  173. msg=f"预览数据发送",
  174. data={"smart_shooter_preview": CameraLiveviewImage},
  175. msg_type="smart_shooter_enable_preview",
  176. )
  177. await conn_manager.send_personal_message(data, smart_shooter.websocket)
  178. # @app.on_event("startup")
  179. # async def startup_event():
  180. # loop = asyncio.get_event_loop()
  181. # loop.run_in_executor(None, await smart_shooter.connect_listen)
  182. # print("监听服务已启动")
  183. @app.on_event("shutdown")
  184. async def shutdown_event():
  185. print("Shutting down...")
  186. # socket_manager.close()
  187. # 清理操作
  188. for connection in list(active_connections):
  189. try:
  190. await connection.close()
  191. except Exception as e:
  192. print(f"Error closing connection: {e}")
  193. smart_shooter.stop_listen = True
  194. smart_shooter.is_init_while = False
  195. device_ctrl.close_connect()
  196. device_ctrl.mcu_exit = True
  197. device_ctrl.p_list = []
  198. device_ctrl.temp_ports_dict = {}
  199. device_ctrl.clearMyInstance()
  200. diviceList = blue_tooth.devices
  201. if len(diviceList) == 0:
  202. blue_tooth.bluetooth_exit = True
  203. blue_tooth.clearMyInstance()
  204. diviceAddress = "" if len(list(diviceList.keys()))==0 else list(diviceList.keys())[0]
  205. if diviceAddress != "":
  206. print(diviceList.get(diviceAddress))
  207. diviceName = diviceList[diviceAddress]["name"]
  208. blue_tooth.disconnect_device(diviceAddress, diviceName)
  209. blue_tooth.bluetooth_exit = True
  210. blue_tooth.clearMyInstance()
  211. print("所有设备已断开连接")