socket_server.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. import json
  2. import asyncio
  3. from models import *
  4. from .connect_manager import ConnectionManager
  5. from .message_handler import *
  6. from mcu.DeviceControl import DeviceControl, checkMcuConnection
  7. from mcu.BlueToothMode import BlueToothMode
  8. from mcu.capture.smart_shooter_class import SmartShooter
  9. import time
  10. from .socket_client import socket_manager
  11. from sqlalchemy.exc import NoResultFound
  12. import os, datetime
  13. import traceback
  14. import logging
# Module-level logger; the handlers in this module currently report via print().
logger = logging.getLogger(__name__)
# Connection manager shared by the endpoint and by every device controller below.
conn_manager = ConnectionManager()
# Sockets accepted by the /ws endpoint; closed one by one in the shutdown hook.
active_connections = set()
# Hardware controllers, each constructed with the shared connection manager.
device_ctrl = DeviceControl(websocket_manager=conn_manager)
blue_tooth = BlueToothMode(websocket_manager=conn_manager)
smart_shooter = SmartShooter(websocket_manager=conn_manager)
  21. from utils.common import message_queue
  22. async def updateDataRecord(PhotoFilename, id):
  23. if id == None:
  24. return
  25. await asyncio.sleep(0.01)
  26. create_time = datetime.datetime.fromtimestamp(os.path.getctime(PhotoFilename))
  27. data = {"id": id, "image_path": PhotoFilename, "photo_create_time": create_time}
  28. # record_model = PhotoRecord(**data)
  29. session = SqlQuery()
  30. record_model = CRUD(PhotoRecord)
  31. model = record_model.read(session, conditions={"id": id,"delete_time": None})
  32. if model == None:
  33. print(f"smart shooter 拍照记录更新失败,记录id:{id},不存在")
  34. else:
  35. # 走编辑逻辑
  36. settings.syncPhotoRecord(data,action_type=3)
  37. record_model.updateConditions(session, conditions={"id": id}, **data)
  38. print(f"smart shooter 拍照记录更新成功,记录id:{id}")
  39. session.close()
  40. @app.websocket("/ws")
  41. async def websocket_endpoint(websocket: WebSocket):
  42. # await websocket.accept()
  43. await conn_manager.connect(websocket)
  44. active_connections.add(websocket)
  45. smart_shooter.websocket = websocket
  46. device_ctrl.websocket = websocket
  47. blue_tooth.websocket = websocket
  48. # 启动 smart_shooter.connect_listen 服务
  49. listen_task = None
  50. tasks = set()
  51. try:
  52. # 初始化回调函数
  53. smart_shooter.callback_listen = MsgCallback
  54. # 创建任务来并发处理不同类型的消息
  55. handler_task = asyncio.create_task(handler_messages(websocket))
  56. # send_task = asyncio.create_task(send_message(websocket))
  57. loop = asyncio.get_event_loop()
  58. listen_task = loop.run_in_executor(None, smart_shooter.connect_listen)
  59. # send_task = loop.run_in_executor(None, send_message(websocket))
  60. # 创建任务来启动 connect_listen
  61. # listen_task = asyncio.create_task(restart_smart_shooter_listener())
  62. # 等待所有任务完成
  63. await asyncio.gather(handler_task, listen_task)
  64. except WebSocketDisconnect:
  65. print("Client disconnected")
  66. finally:
  67. # 确保任务被正确取消
  68. if listen_task and not listen_task.done():
  69. listen_task.cancel()
  70. active_connections.discard(websocket)
  71. async def start_smart_shooter_listen():
  72. """启动 smart_shooter 监听服务"""
  73. loop = asyncio.get_event_loop()
  74. # 在执行器中运行 connect_listen 方法
  75. try:
  76. loop.create_task(None, smart_shooter.connect_listen())
  77. except Exception as e:
  78. print(f"Smart shooter listen error: {e}")
  79. async def handler_messages(websocket):
  80. while True:
  81. try:
  82. byteDats = await websocket.receive()
  83. socket_type = byteDats.get("type")
  84. if socket_type == "websocket.disconnect":
  85. print("socket_type===>", byteDats)
  86. smart_shooter.stop_listen = True
  87. smart_shooter.is_init_while = False
  88. if byteDats.get("code") == 1006:
  89. continue
  90. device_ctrl.close_connect()
  91. device_ctrl.close_lineConnect()
  92. device_ctrl.mcu_exit = True
  93. device_ctrl.p_list = []
  94. device_ctrl.temp_ports_dict = {}
  95. device_ctrl.clearMyInstance()
  96. diviceList = blue_tooth.devices
  97. if len(diviceList) == 0:
  98. blue_tooth.bluetooth_exit = True
  99. blue_tooth.clearMyInstance()
  100. break
  101. diviceAddress = (
  102. ""
  103. if len(list(diviceList.keys())) == 0
  104. else list(diviceList.keys())[0]
  105. )
  106. if diviceAddress != "":
  107. print(diviceList.get(diviceAddress))
  108. diviceName = diviceList[diviceAddress]["name"]
  109. blue_tooth.disconnect_device(diviceAddress, diviceName)
  110. blue_tooth.bluetooth_exit = True
  111. blue_tooth.clearMyInstance()
  112. print("所有设备已断开连接")
  113. break
  114. print("byteDats", byteDats)
  115. # 使用create_task来避免阻塞消息处理循环
  116. asyncio.create_task(
  117. handlerSend(
  118. conn_manager, json.dumps(byteDats), websocket, smart_shooter
  119. )
  120. )
  121. except Exception as e:
  122. print("socket error",e)
  123. break
  124. # async def send_message(websocket):
  125. # print("构建消息监听 send_message")
  126. # while True:
  127. # try:
  128. # # 使用wait()而不是直接get()来避免阻塞
  129. # # 从异步队列中获取消息(在新事件循环中运行)
  130. # message = await message_queue.get()
  131. # # 发送消息
  132. # await websocket.send_json(message)
  133. # message_queue.task_done()
  134. # except asyncio.QueueEmpty:
  135. # continue
  136. # except asyncio.TimeoutError:
  137. # # 超时继续循环,避免永久阻塞
  138. # continue
  139. # except Exception as e:
  140. # print("socket报错",e)
  141. # break
  142. async def message_generator():
  143. """异步生成器,用于从队列中获取消息"""
  144. while True:
  145. try:
  146. # 使用asyncio.wait_for设置合理的超时时间
  147. message = await asyncio.wait_for(message_queue.get(), timeout=0.1)
  148. print("获取消息中",message)
  149. yield message
  150. except asyncio.TimeoutError:
  151. # 超时继续,允许其他协程运行
  152. await asyncio.sleep(0.01)
  153. # print("超时继续,允许其他协程运行")
  154. continue
  155. except Exception as e:
  156. print("消息生成器错误", e)
  157. break
  158. async def send_message(websocket):
  159. """使用异步生成器发送消息"""
  160. print("构建消息监听 send_message")
  161. async for message in message_generator():
  162. try:
  163. # 检查WebSocket连接状态
  164. if websocket.client_state.name != "CONNECTED":
  165. print("WebSocket连接已断开,停止发送消息")
  166. continue
  167. print("发送消息中。。。。。", message)
  168. # 发送消息
  169. await websocket.send_json(message)
  170. message_queue.task_done()
  171. print("消息发送完成...")
  172. except Exception as e:
  173. print("socket报错", e)
  174. break
  175. async def MsgCallback(msg):
  176. msg_id = msg.get("msg_id")
  177. match msg_id:
  178. case "PhotoUpdated":
  179. PhotoFilename = msg.get("PhotoFilename")
  180. PhotoLocation = msg.get("PhotoLocation")
  181. PhotoOrigin = msg.get("PhotoOrigin")
  182. if (PhotoFilename != "" and PhotoFilename != None) and (
  183. PhotoLocation == "Local Disk"
  184. ):
  185. # temp_photo_name = PhotoFilename
  186. # 更新拍照记录
  187. print("PhotoFilename", PhotoFilename, PhotoOrigin)
  188. goods_art_no = None
  189. id = None
  190. try:
  191. if PhotoOrigin != "" and PhotoOrigin not in ["external", "ui"]:
  192. goods_art_no, id = PhotoOrigin.split(",")
  193. # 创建任务来处理数据库更新,避免阻塞回调
  194. await updateDataRecord(PhotoFilename, id)
  195. except Exception as e:
  196. print("拍照更新异常", e)
  197. data = conn_manager.jsonMessage(
  198. code=0,
  199. msg=f"照片获取成功",
  200. data={
  201. "photo_file_name": PhotoFilename,
  202. "goods_art_no": goods_art_no,
  203. },
  204. msg_type="smart_shooter_photo_take",
  205. )
  206. await conn_manager.send_personal_message(data, smart_shooter.websocket)
  207. case "LiveviewUpdated":
  208. CameraLiveviewImage = msg.get("CameraLiveviewImage", None)
  209. # base64_to_image(CameraLiveviewImage, "liveview.jpg")
  210. # print("收到直播画面:CameraLiveviewImage")
  211. data = conn_manager.jsonMessage(
  212. code=1,
  213. msg=f"预览数据发送",
  214. data={"smart_shooter_preview": CameraLiveviewImage},
  215. msg_type="smart_shooter_enable_preview",
  216. )
  217. await conn_manager.send_personal_message(data, smart_shooter.websocket)
  218. # case _:
  219. # print("收到未知数据:{}".format(msg))
  220. # @app.on_event("startup")
  221. # async def startup_event():
  222. # loop = asyncio.get_event_loop()
  223. # loop.run_in_executor(None, await smart_shooter.connect_listen)
  224. # print("监听服务已启动")
  225. @app.on_event("shutdown")
  226. async def shutdown_event():
  227. print("Shutting down...")
  228. # socket_manager.close()
  229. # 清理操作
  230. for connection in list(active_connections):
  231. try:
  232. await connection.close()
  233. except Exception as e:
  234. print(f"Error closing connection: {e}")
  235. smart_shooter.stop_listen = True
  236. smart_shooter.is_init_while = False
  237. device_ctrl.close_connect()
  238. device_ctrl.close_lineConnect()
  239. device_ctrl.mcu_exit = True
  240. device_ctrl.p_list = []
  241. device_ctrl.temp_ports_dict = {}
  242. device_ctrl.clearMyInstance()
  243. diviceList = blue_tooth.devices
  244. if len(diviceList) == 0:
  245. blue_tooth.bluetooth_exit = True
  246. blue_tooth.clearMyInstance()
  247. diviceAddress = "" if len(list(diviceList.keys()))==0 else list(diviceList.keys())[0]
  248. if diviceAddress != "":
  249. print(diviceList.get(diviceAddress))
  250. diviceName = diviceList[diviceAddress]["name"]
  251. blue_tooth.disconnect_device(diviceAddress, diviceName)
  252. blue_tooth.bluetooth_exit = True
  253. blue_tooth.clearMyInstance()
  254. print("所有设备已断开连接")