# socket_server.py
  1. import json
  2. import asyncio
  3. from models import *
  4. from .connect_manager import ConnectionManager
  5. from .message_handler import *
  6. from mcu.DeviceControl import DeviceControl, checkMcuConnection
  7. from mcu.BlueToothMode import BlueToothMode
  8. from mcu.capture.smart_shooter_class import SmartShooter
  9. import time
  10. from .socket_client import socket_manager
  11. from sqlalchemy.exc import NoResultFound
  12. import os, datetime
  13. import traceback
  14. import logging
  15. from utils import common
  16. from utils.common import message_queue
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Connection manager instance shared with every device helper created below.
conn_manager = ConnectionManager()
# WebSocket objects added by websocket_endpoint and closed by shutdown_event.
active_connections = set()
# Device helpers; each is constructed with conn_manager as its websocket_manager.
device_ctrl = DeviceControl(websocket_manager=conn_manager)
blue_tooth = BlueToothMode(websocket_manager=conn_manager)
smart_shooter = SmartShooter(websocket_manager=conn_manager)
  23. async def updateDataRecord(PhotoFilename, id):
  24. await asyncio.sleep(0.01)
  25. create_time = datetime.datetime.fromtimestamp(os.path.getctime(PhotoFilename))
  26. data = {"id": id, "image_path": PhotoFilename, "photo_create_time": create_time}
  27. # record_model = PhotoRecord(**data)
  28. session = SqlQuery()
  29. record_model = CRUD(PhotoRecord)
  30. model = record_model.read(session, conditions={"id": id, "delete_time": None})
  31. if model == None:
  32. print(f"smart shooter 拍照记录更新失败,记录id:{id},不存在")
  33. session.close()
  34. return None
  35. else:
  36. # 走编辑逻辑
  37. settings.syncPhotoRecord(data, action_type=3)
  38. record_data = record_model.updateConditions(session, conditions={"id": id}, **data)
  39. session.close()
  40. print(f"smart shooter 拍照记录更新成功,记录id:{id}")
  41. return record_data
  42. @app.websocket("/ws")
  43. async def websocket_endpoint(websocket: WebSocket):
  44. # await websocket.accept()
  45. main_loop = asyncio.get_running_loop()
  46. smart_shooter.main_loop = main_loop # <--- 添加这一行
  47. await conn_manager.connect(websocket)
  48. active_connections.add(websocket)
  49. smart_shooter.websocket = websocket
  50. device_ctrl.websocket = websocket
  51. blue_tooth.websocket = websocket
  52. common.websocket_manager = conn_manager
  53. common.websocket = websocket
  54. # 启动 smart_shooter.connect_listen 服务
  55. listen_task = None
  56. tasks = set()
  57. send_task = None # <--- 新增
  58. try:
  59. # 初始化回调函数
  60. smart_shooter.callback_listen = MsgCallback
  61. # 创建任务来并发处理不同类型的消息
  62. handler_task = asyncio.create_task(handler_messages(websocket))
  63. # send_task = asyncio.create_task(send_message(websocket))
  64. send_task = asyncio.create_task(send_message(websocket)) # <--- 启动消费者
  65. loop = asyncio.get_event_loop()
  66. listen_task = loop.run_in_executor(None, smart_shooter.connect_listen)
  67. # send_task = loop.run_in_executor(None, send_message(websocket))
  68. # 创建任务来启动 connect_listen
  69. # listen_task = asyncio.create_task(restart_smart_shooter_listener())
  70. # 等待所有任务完成
  71. await asyncio.gather(handler_task, listen_task, return_exceptions=True)
  72. except WebSocketDisconnect:
  73. print("Client disconnected")
  74. except asyncio.CancelledError:
  75. print("Connection cancelled")
  76. finally:
  77. # 确保任务被正确取消和清理
  78. tasks_to_cancel = []
  79. if handler_task and not handler_task.done():
  80. tasks_to_cancel.append(handler_task)
  81. if send_task and not send_task.done():
  82. tasks_to_cancel.append(send_task)
  83. # 取消所有待处理的任务
  84. for task in tasks_to_cancel:
  85. task.cancel()
  86. # 等待任务取消完成
  87. if tasks_to_cancel:
  88. await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
  89. # 清理连接
  90. active_connections.discard(websocket)
  91. print("WebSocket connection cleaned up")
  92. async def start_smart_shooter_listen():
  93. """启动 smart_shooter 监听服务"""
  94. loop = asyncio.get_event_loop()
  95. # 在执行器中运行 connect_listen 方法
  96. try:
  97. loop.create_task(None, smart_shooter.connect_listen())
  98. except Exception as e:
  99. print(f"Smart shooter listen error: {e}")
  100. async def handler_messages(websocket):
  101. while True:
  102. try:
  103. byteDats = await websocket.receive()
  104. socket_type = byteDats.get("type")
  105. if socket_type == "websocket.disconnect":
  106. print("socket_type===>", byteDats)
  107. if byteDats.get("code") == 1006:
  108. continue
  109. logger.info("socket强制断开")
  110. diviceList = blue_tooth.devices
  111. if len(diviceList) == 0:
  112. blue_tooth.bluetooth_exit = True
  113. blue_tooth.clearMyInstance()
  114. break
  115. diviceAddress = (
  116. ""
  117. if len(list(diviceList.keys())) == 0
  118. else list(diviceList.keys())[0]
  119. )
  120. if diviceAddress != "":
  121. print(diviceList.get(diviceAddress))
  122. diviceName = diviceList[diviceAddress]["name"]
  123. blue_tooth.disconnect_device(diviceAddress, diviceName)
  124. blue_tooth.bluetooth_exit = True
  125. blue_tooth.clearMyInstance()
  126. print("所有设备已断开连接")
  127. break
  128. print("byteDats", byteDats)
  129. # 使用create_task来避免阻塞消息处理循环
  130. asyncio.create_task(
  131. handlerSend(
  132. conn_manager, json.dumps(byteDats), websocket, smart_shooter
  133. )
  134. )
  135. except Exception as e:
  136. print("socket error", e)
  137. break
  138. async def message_generator():
  139. """异步生成器,用于从队列中获取消息"""
  140. while True:
  141. try:
  142. # 使用asyncio.wait_for设置合理的超时时间
  143. message = await message_queue.get()
  144. yield message
  145. except asyncio.TimeoutError:
  146. # 超时继续,允许其他协程运行
  147. await asyncio.sleep(0.01)
  148. # print("超时继续,允许其他协程运行")
  149. continue
  150. except Exception as e:
  151. print("消息生成器错误", e)
  152. break
  153. async def send_message(websocket):
  154. """使用异步生成器发送消息"""
  155. print("构建消息监听 send_message")
  156. while True:
  157. try:
  158. # 1. 异步等待消息,不会阻塞其他协程
  159. data = await message_queue.get()
  160. # 2. 发送消息
  161. if common.websocket_manager:
  162. # 假设 broadcast 或 send_personal_message 是异步方法
  163. await common.websocket_manager.send_personal_message(data, common.websocket)
  164. # 或者根据你的 ConnectionManager 实现调用具体发送方法
  165. message_queue.task_done()
  166. except Exception as e:
  167. print(f"消息消费错误: {e}")
  168. await asyncio.sleep(1) # 防止死循环报错
  169. async def getActionInfo(record_info):
  170. await asyncio.sleep(0.01)
  171. if not record_info:
  172. return None
  173. action_id = record_info.action_id
  174. if not action_id:
  175. return None
  176. session = SqlQuery()
  177. device_model = CRUD(DeviceConfig)
  178. model = device_model.read(session, conditions={"id": action_id})
  179. if not model:
  180. session.close()
  181. return None
  182. return model
  183. async def MsgCallback(msg):
  184. msg_id = msg.get("msg_id")
  185. match msg_id:
  186. case "PhotoUpdated":
  187. PhotoFilename = msg.get("PhotoFilename")
  188. PhotoLocation = msg.get("PhotoLocation")
  189. PhotoOrigin = msg.get("PhotoOrigin")
  190. if (PhotoFilename != "" and PhotoFilename != None) and (
  191. PhotoLocation == "Local Disk"
  192. ):
  193. # temp_photo_name = PhotoFilename
  194. # 更新拍照记录
  195. logger.info(f"PhotoUpdated,{PhotoFilename}--{PhotoOrigin}")
  196. goods_art_no = None
  197. id = None
  198. try:
  199. if PhotoOrigin != "" and PhotoOrigin not in ["external", "ui"]:
  200. goods_art_no, id = PhotoOrigin.split(",")
  201. # 创建任务来处理数据库更新,避免阻塞回调
  202. recordResult = await updateDataRecord(PhotoFilename, id)
  203. except Exception as e:
  204. print("拍照更新异常", e)
  205. recordResult = None
  206. actionModel = None
  207. if recordResult:
  208. actionModel = await getActionInfo(recordResult)
  209. data = conn_manager.jsonMessage(
  210. code=0,
  211. msg=f"照片获取成功",
  212. data={
  213. "photo_file_name": PhotoFilename,
  214. "goods_art_no": goods_art_no,
  215. "action_name": actionModel.action_name if actionModel else None,
  216. },
  217. msg_type="smart_shooter_photo_take",
  218. )
  219. await conn_manager.send_personal_message(data, smart_shooter.websocket)
  220. case "LiveviewUpdated":
  221. CameraLiveviewImage = msg.get("CameraLiveviewImage", None)
  222. # base64_to_image(CameraLiveviewImage, "liveview.jpg")
  223. # print("收到直播画面:CameraLiveviewImage")
  224. data = conn_manager.jsonMessage(
  225. code=1,
  226. msg=f"预览数据发送",
  227. data={"smart_shooter_preview": CameraLiveviewImage},
  228. msg_type="smart_shooter_enable_preview",
  229. )
  230. await conn_manager.send_personal_message(data, smart_shooter.websocket)
  231. # case _:
  232. # print("收到未知数据:{}".format(msg))
  233. # @app.on_event("startup")
  234. # async def startup_event():
  235. # loop = asyncio.get_event_loop()
  236. # loop.run_in_executor(None, await smart_shooter.connect_listen)
  237. # print("监听服务已启动")
  238. @app.on_event("shutdown")
  239. async def shutdown_event():
  240. print("Shutting down...")
  241. # socket_manager.close()
  242. # 清理操作
  243. for connection in list(active_connections):
  244. try:
  245. await connection.close()
  246. except Exception as e:
  247. print(f"Error closing connection: {e}")
  248. smart_shooter.stop_listen = True
  249. smart_shooter.is_init_while = False
  250. device_ctrl.close_connect()
  251. device_ctrl.close_lineConnect()
  252. device_ctrl.mcu_exit = True
  253. device_ctrl.p_list = []
  254. device_ctrl.temp_ports_dict = {}
  255. device_ctrl.clearMyInstance()
  256. diviceList = blue_tooth.devices
  257. if len(diviceList) == 0:
  258. blue_tooth.bluetooth_exit = True
  259. blue_tooth.clearMyInstance()
  260. diviceAddress = "" if len(list(diviceList.keys())) == 0 else list(diviceList.keys())[0]
  261. if diviceAddress != "":
  262. print(diviceList.get(diviceAddress))
  263. diviceName = diviceList[diviceAddress]["name"]
  264. blue_tooth.disconnect_device(diviceAddress, diviceName)
  265. blue_tooth.bluetooth_exit = True
  266. blue_tooth.clearMyInstance()
  267. print("所有设备已断开连接")