import json import asyncio from models import * from .connect_manager import ConnectionManager from .message_handler import * from mcu.DeviceControl import DeviceControl, checkMcuConnection from mcu.BlueToothMode import BlueToothMode from mcu.capture.smart_shooter_class import SmartShooter import time from .socket_client import socket_manager from sqlalchemy.exc import NoResultFound import os, datetime conn_manager = ConnectionManager() active_connections = set() device_ctrl = DeviceControl(websocket_manager=conn_manager) blue_tooth = BlueToothMode(websocket_manager=conn_manager) smart_shooter = SmartShooter(websocket_manager=conn_manager) from utils.common import message_queue async def updateDataRecord(PhotoFilename, id): await asyncio.sleep(0.01) create_time = datetime.datetime.fromtimestamp(os.path.getctime(PhotoFilename)) data = {"id": id, "image_path": PhotoFilename, "photo_create_time": create_time} # record_model = PhotoRecord(**data) session = SqlQuery() record_model = CRUD(PhotoRecord) model = record_model.read(session, conditions={"id": id}) if model == None: print(f"smart shooter 拍照记录更新失败,记录id:{id},不存在") else: # 走编辑逻辑 record_model.updateConditions(session, conditions={"id": id}, **data) print(f"smart shooter 拍照记录更新成功,记录id:{id}") @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): # await websocket.accept() await conn_manager.connect(websocket) active_connections.add(websocket) smart_shooter.websocket = websocket # 启动 smart_shooter.connect_listen 服务 listen_task = None tasks = set() try: # 初始化回调函数 smart_shooter.callback_listen = MsgCallback # 创建任务来并发处理不同类型的消息 handler_task = asyncio.create_task(handler_messages(websocket)) send_task = asyncio.create_task(send_message(websocket)) loop = asyncio.get_event_loop() listen_task = loop.run_in_executor(None, smart_shooter.connect_listen) # send_task = loop.run_in_executor(None, send_message(websocket)) # 创建任务来启动 connect_listen # listen_task = asyncio.create_task(restart_smart_shooter_listener()) # 等待所有任务完成 await asyncio.gather(handler_task, listen_task, send_task) except WebSocketDisconnect: print("Client disconnected") finally: # 确保任务被正确取消 if listen_task and not listen_task.done(): listen_task.cancel() active_connections.discard(websocket) async def start_smart_shooter_listen(): """启动 smart_shooter 监听服务""" loop = asyncio.get_event_loop() # 在执行器中运行 connect_listen 方法 try: loop.create_task(None, smart_shooter.connect_listen()) except Exception as e: print(f"Smart shooter listen error: {e}") async def handler_messages(websocket): while True: try: byteDats = await websocket.receive() socket_type = byteDats.get("type") if socket_type == "websocket.disconnect": smart_shooter.stop_listen = True smart_shooter.is_init_while = False device_ctrl.close_connect() device_ctrl.close_lineConnect() device_ctrl.mcu_exit = True device_ctrl.p_list = [] device_ctrl.temp_ports_dict = {} device_ctrl.clearMyInstance() diviceList = blue_tooth.devices if len(diviceList) == 0: blue_tooth.bluetooth_exit = True blue_tooth.clearMyInstance() break diviceAddress = ( "" if len(list(diviceList.keys())) == 0 else list(diviceList.keys())[0] ) if diviceAddress != "": print(diviceList.get(diviceAddress)) diviceName = diviceList[diviceAddress]["name"] blue_tooth.disconnect_device(diviceAddress, diviceName) blue_tooth.bluetooth_exit = True blue_tooth.clearMyInstance() print("所有设备已断开连接") break print("byteDats", byteDats) # 使用create_task来避免阻塞消息处理循环 asyncio.create_task( handlerSend( conn_manager, json.dumps(byteDats), websocket, smart_shooter ) ) except Exception as e: print(e) break # async def send_message(websocket): # print("构建消息监听 send_message") # while True: # try: # # 使用wait()而不是直接get()来避免阻塞 # # 从异步队列中获取消息(在新事件循环中运行) # message = await message_queue.get() # # 发送消息 # await websocket.send_json(message) # message_queue.task_done() # except asyncio.QueueEmpty: # continue # except asyncio.TimeoutError: # # 超时继续循环,避免永久阻塞 # continue # except Exception as e: # print("socket报错",e) # break async def message_generator(): """异步生成器,用于从队列中获取消息""" while True: try: # 使用asyncio.wait_for设置合理的超时时间 message = await asyncio.wait_for(message_queue.get(), timeout=0.1) print("获取消息中",message) yield message except asyncio.TimeoutError: # 超时继续,允许其他协程运行 await asyncio.sleep(0.01) # print("超时继续,允许其他协程运行") continue except Exception as e: print("消息生成器错误", e) break async def send_message(websocket): """使用异步生成器发送消息""" print("构建消息监听 send_message") async for message in message_generator(): try: # 检查WebSocket连接状态 if websocket.client_state.name != "CONNECTED": print("WebSocket连接已断开,停止发送消息") break print("发送消息中。。。。。") # 发送消息 await websocket.send_json(message) message_queue.task_done() print("消息发送完成...") except Exception as e: print("socket报错", e) break async def MsgCallback(msg): msg_id = msg.get("msg_id") match msg_id: case "PhotoUpdated": PhotoFilename = msg.get("PhotoFilename") PhotoLocation = msg.get("PhotoLocation") PhotoOrigin = msg.get("PhotoOrigin") if (PhotoFilename != "" and PhotoFilename != None) and ( PhotoLocation == "Local Disk" ): # temp_photo_name = PhotoFilename # 更新拍照记录 print("PhotoFilename", PhotoFilename, PhotoOrigin) goods_art_no = None try: if PhotoOrigin != "" and PhotoOrigin not in ["external", "ui"]: goods_art_no, id = PhotoOrigin.split(",") # 创建任务来处理数据库更新,避免阻塞回调 await updateDataRecord(PhotoFilename, id) except Exception as e: print("拍照更新异常", e) data = conn_manager.jsonMessage( code=0, msg=f"照片获取成功", data={ "photo_file_name": PhotoFilename, "goods_art_no": goods_art_no, }, msg_type="smart_shooter_photo_take", ) await conn_manager.send_personal_message(data, smart_shooter.websocket) case "LiveviewUpdated": CameraLiveviewImage = msg.get("CameraLiveviewImage", None) # base64_to_image(CameraLiveviewImage, "liveview.jpg") # print("收到直播画面:CameraLiveviewImage") data = conn_manager.jsonMessage( code=1, msg=f"预览数据发送", data={"smart_shooter_preview": CameraLiveviewImage}, msg_type="smart_shooter_enable_preview", ) await conn_manager.send_personal_message(data, smart_shooter.websocket) # case _: # print("收到未知数据:{}".format(msg)) # @app.on_event("startup") # async def startup_event(): # loop = asyncio.get_event_loop() # loop.run_in_executor(None, await smart_shooter.connect_listen) # print("监听服务已启动") @app.on_event("shutdown") async def shutdown_event(): print("Shutting down...") # socket_manager.close() # 清理操作 for connection in list(active_connections): try: await connection.close() except Exception as e: print(f"Error closing connection: {e}") smart_shooter.stop_listen = True smart_shooter.is_init_while = False device_ctrl.close_connect() device_ctrl.close_lineConnect() device_ctrl.mcu_exit = True device_ctrl.p_list = [] device_ctrl.temp_ports_dict = {} device_ctrl.clearMyInstance() diviceList = blue_tooth.devices if len(diviceList) == 0: blue_tooth.bluetooth_exit = True blue_tooth.clearMyInstance() diviceAddress = "" if len(list(diviceList.keys()))==0 else list(diviceList.keys())[0] if diviceAddress != "": print(diviceList.get(diviceAddress)) diviceName = diviceList[diviceAddress]["name"] blue_tooth.disconnect_device(diviceAddress, diviceName) blue_tooth.bluetooth_exit = True blue_tooth.clearMyInstance() print("所有设备已断开连接")