| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- import json
- import asyncio
- from models import *
- from .connect_manager import ConnectionManager
- from .message_handler import *
- from mcu.DeviceControl import DeviceControl, checkMcuConnection
- from mcu.BlueToothMode import BlueToothMode
- from mcu.capture.smart_shooter_class import SmartShooter
- import time
- from .socket_client import socket_manager
- from sqlalchemy.exc import NoResultFound
- import os, datetime
- import traceback
- import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Connection manager shared by the websocket endpoint and by every
# controller created below (passed to them as ``websocket_manager``).
conn_manager = ConnectionManager()
# Websockets currently served by the /ws endpoint; the shutdown hook closes
# whatever is still in this set.
active_connections = set()
# Controller singletons built once at import time. The /ws endpoint
# overwrites each one's ``.websocket`` attribute with the newest client.
device_ctrl = DeviceControl(websocket_manager=conn_manager)
blue_tooth = BlueToothMode(websocket_manager=conn_manager)
smart_shooter = SmartShooter(websocket_manager=conn_manager)
- from utils.common import message_queue
async def updateDataRecord(PhotoFilename, id):
    """Store a captured photo's path and creation time on a ``PhotoRecord``.

    Looks the record up by ``id``; when it exists, its ``image_path`` and
    ``photo_create_time`` are updated. A missing record is only reported on
    stdout, nothing is created.

    Args:
        PhotoFilename: Path of the photo file; its timestamp is read with
            ``os.path.getctime``.
        id: Identifier of the ``PhotoRecord`` row to update.

    Raises:
        OSError: If ``PhotoFilename`` cannot be stat'ed (raised before any
            database session is opened).
    """
    # 10 ms pause that also hands control back to the event loop.
    # NOTE(review): presumably gives the capture software time to finish
    # writing the file -- confirm.
    await asyncio.sleep(0.01)
    create_time = datetime.datetime.fromtimestamp(os.path.getctime(PhotoFilename))
    data = {"id": id, "image_path": PhotoFilename, "photo_create_time": create_time}

    session = SqlQuery()
    try:
        record_model = CRUD(PhotoRecord)
        model = record_model.read(session, conditions={"id": id})
        if model is None:
            print(f"smart shooter 拍照记录更新失败,记录id:{id},不存在")
        else:
            # Record exists: take the edit path.
            record_model.updateConditions(session, conditions={"id": id}, **data)
            print(f"smart shooter 拍照记录更新成功,记录id:{id}")
    finally:
        # Release the session even when the lookup or the update raises.
        session.close()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one client on ``/ws`` for as long as it stays connected.

    Registers the socket with the connection manager, points the three
    device controllers at it, then runs two jobs side by side until both
    finish: the incoming-message loop (``handler_messages``) and the
    smart_shooter listener (``connect_listen``, run in the default executor).

    Args:
        websocket: The client connection handed in by the framework.
    """
    await conn_manager.connect(websocket)
    active_connections.add(websocket)
    # The controllers push their notifications to whichever socket connected last.
    smart_shooter.websocket = websocket
    device_ctrl.websocket = websocket
    blue_tooth.websocket = websocket

    handler_task = None
    listen_task = None
    try:
        # Events coming from the smart_shooter listener are routed to MsgCallback.
        smart_shooter.callback_listen = MsgCallback
        handler_task = asyncio.create_task(handler_messages(websocket))
        # connect_listen is passed as a plain callable, so it runs in a worker
        # thread and does not block the event loop.
        listen_task = asyncio.get_running_loop().run_in_executor(
            None, smart_shooter.connect_listen
        )
        await asyncio.gather(handler_task, listen_task)
    except WebSocketDisconnect:
        print("Client disconnected")
    finally:
        # Do not leave either job orphaned when the other one failed.
        # NOTE(review): cancel() cannot interrupt a worker thread that is
        # already running; the listener presumably stops through
        # smart_shooter.stop_listen -- confirm.
        for task in (handler_task, listen_task):
            if task is not None and not task.done():
                task.cancel()
        active_connections.discard(websocket)
async def start_smart_shooter_listen():
    """Run the smart_shooter listener in the default executor.

    ``smart_shooter.connect_listen`` is handed to the executor as a callable
    and awaited, so this coroutine completes when the listener returns. Any
    exception raised by the listener is reported on stdout instead of being
    propagated.
    """
    loop = asyncio.get_running_loop()
    try:
        # Pass the callable itself: calling it here would run the listener
        # inline on the event loop thread.
        await loop.run_in_executor(None, smart_shooter.connect_listen)
    except Exception as e:
        print(f"Smart shooter listen error: {e}")
# Strong references to in-flight handlerSend tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_pending_send_tasks = set()


def _release_devices_after_disconnect():
    """Reset the device controller and drop the Bluetooth link after a client left."""
    device_ctrl.close_connect()
    device_ctrl.close_lineConnect()
    device_ctrl.mcu_exit = True
    device_ctrl.p_list = []
    device_ctrl.temp_ports_dict = {}
    device_ctrl.clearMyInstance()

    devices = blue_tooth.devices
    if not devices:
        blue_tooth.bluetooth_exit = True
        blue_tooth.clearMyInstance()
        return

    # NOTE(review): only the first entry of blue_tooth.devices is
    # disconnected although the log line below says "all" -- confirm that at
    # most one device is ever connected.
    first_address = next(iter(devices))
    if first_address != "":
        print(devices.get(first_address))
        blue_tooth.disconnect_device(first_address, devices[first_address]["name"])
    blue_tooth.bluetooth_exit = True
    blue_tooth.clearMyInstance()
    print("所有设备已断开连接")


async def handler_messages(websocket):
    """Receive messages from ``websocket`` and dispatch each to ``handlerSend``.

    Every received message dict is JSON-encoded and processed in its own
    task so a slow handler does not stall the receive loop. The loop ends
    when a ``websocket.disconnect`` message arrives or when any exception is
    raised (the exception is printed, not propagated).

    Args:
        websocket: Connection object exposing an awaitable ``receive()`` that
            returns a dict with at least a ``"type"`` key.
    """
    while True:
        try:
            byteDats = await websocket.receive()
            if byteDats.get("type") == "websocket.disconnect":
                print("socket_type===>", byteDats)
                # Tell the smart_shooter listener to stop.
                smart_shooter.stop_listen = True
                smart_shooter.is_init_while = False
                if byteDats.get("code") == 1006:
                    # Close code 1006: the devices are deliberately left
                    # untouched. NOTE(review): the loop goes round again, so
                    # it only ends once the next receive() raises -- confirm
                    # this is intended rather than a plain break.
                    continue
                _release_devices_after_disconnect()
                break

            print("byteDats", byteDats)
            # Process in a separate task so the receive loop is not blocked.
            send_task = asyncio.create_task(
                handlerSend(
                    conn_manager, json.dumps(byteDats), websocket, smart_shooter
                )
            )
            _pending_send_tasks.add(send_task)
            send_task.add_done_callback(_pending_send_tasks.discard)
        except Exception as e:
            print("socket error", e)
            break
- # async def send_message(websocket):
- # print("构建消息监听 send_message")
- # while True:
- # try:
- # # 使用wait()而不是直接get()来避免阻塞
- # # 从异步队列中获取消息(在新事件循环中运行)
- # message = await message_queue.get()
- # # 发送消息
- # await websocket.send_json(message)
- # message_queue.task_done()
- # except asyncio.QueueEmpty:
- # continue
- # except asyncio.TimeoutError:
- # # 超时继续循环,避免永久阻塞
- # continue
- # except Exception as e:
- # print("socket报错",e)
- # break
async def message_generator():
    """Asynchronously yield items taken from the shared ``message_queue``.

    Each fetch waits at most 0.1 s; on timeout the generator pauses for
    10 ms and tries again. Any other exception is printed and ends the
    generator.
    """
    keep_running = True
    while keep_running:
        try:
            # Bounded wait so a quiet queue never parks this coroutine for long.
            item = await asyncio.wait_for(message_queue.get(), timeout=0.1)
            print("获取消息中",item)
            yield item
        except asyncio.TimeoutError:
            # Nothing arrived in time: let other coroutines run, then retry.
            await asyncio.sleep(0.01)
        except Exception as exc:
            print("消息生成器错误", exc)
            keep_running = False
async def send_message(websocket):
    """Forward queued messages to ``websocket`` as JSON.

    Consumes ``message_generator()`` and sends every item with
    ``websocket.send_json``. Sending stops for good as soon as the socket is
    no longer in the ``CONNECTED`` state or a send raises; the exception is
    printed, not propagated.

    Args:
        websocket: Connection exposing ``client_state`` and an awaitable
            ``send_json``.
    """
    print("构建消息监听 send_message")
    async for message in message_generator():
        try:
            # Check the connection state before every send.
            if websocket.client_state.name != "CONNECTED":
                print("WebSocket连接已断开,停止发送消息")
                # Actually stop, as the log line says; looping on would keep
                # pulling messages off the queue only to discard them.
                break
            print("发送消息中。。。。。", message)
            await websocket.send_json(message)
            print("消息发送完成...")
        except Exception as e:
            print("socket报错", e)
            break
        finally:
            # One task_done() per dequeued item, sent or not, so the queue's
            # unfinished-task counter stays balanced.
            message_queue.task_done()
async def MsgCallback(msg):
    """Relay a smart_shooter event to the current websocket client.

    Two ``msg_id`` values are handled; every other message is ignored:

    * ``"LiveviewUpdated"`` -- the ``CameraLiveviewImage`` value is pushed
      to the client as a preview message.
    * ``"PhotoUpdated"`` -- only when a file name is present and the photo
      location is ``"Local Disk"``. If ``PhotoOrigin`` carries a
      ``"<goods_art_no>,<record id>"`` pair, the photo record is updated
      first; a failure there is printed and does not stop the notification.

    Args:
        msg: Event dict as delivered by the smart_shooter listener.
    """
    event = msg.get("msg_id")

    if event == "LiveviewUpdated":
        frame = msg.get("CameraLiveviewImage", None)
        preview = conn_manager.jsonMessage(
            code=1,
            msg=f"预览数据发送",
            data={"smart_shooter_preview": frame},
            msg_type="smart_shooter_enable_preview",
        )
        await conn_manager.send_personal_message(preview, smart_shooter.websocket)
        return

    if event != "PhotoUpdated":
        return

    photo_path = msg.get("PhotoFilename")
    location = msg.get("PhotoLocation")
    origin = msg.get("PhotoOrigin")

    # Only photos that have a file name and were saved locally are reported.
    if photo_path == "" or photo_path is None or location != "Local Disk":
        return

    print("PhotoFilename", photo_path, origin)
    article_no = None
    try:
        # Origins other than "external"/"ui" are split as
        # "<goods_art_no>,<record id>".
        if origin != "" and origin not in ("external", "ui"):
            article_no, record_id = origin.split(",")
            await updateDataRecord(photo_path, record_id)
    except Exception as exc:
        print("拍照更新异常", exc)

    notice = conn_manager.jsonMessage(
        code=0,
        msg=f"照片获取成功",
        data={
            "photo_file_name": photo_path,
            "goods_art_no": article_no,
        },
        msg_type="smart_shooter_photo_take",
    )
    await conn_manager.send_personal_message(notice, smart_shooter.websocket)
- # @app.on_event("startup")
- # async def startup_event():
- # loop = asyncio.get_event_loop()
- # loop.run_in_executor(None, await smart_shooter.connect_listen)
- # print("监听服务已启动")
@app.on_event("shutdown")
async def shutdown_event():
    """Close open websockets and release the devices when the app shuts down.

    Steps, in order: close every socket in ``active_connections`` (a failure
    on one socket is printed and the rest are still closed), signal the
    smart_shooter listener to stop, reset the device controller, then
    disconnect the Bluetooth device and clear the Bluetooth controller.
    """
    print("Shutting down...")
    # Iterate over a copy: the /ws endpoint removes a socket from the set
    # once its connection ends.
    for connection in list(active_connections):
        try:
            await connection.close()
        except Exception as e:
            print(f"Error closing connection: {e}")

    smart_shooter.stop_listen = True
    smart_shooter.is_init_while = False

    device_ctrl.close_connect()
    device_ctrl.close_lineConnect()
    device_ctrl.mcu_exit = True
    device_ctrl.p_list = []
    device_ctrl.temp_ports_dict = {}
    device_ctrl.clearMyInstance()

    devices = blue_tooth.devices
    # First known address, or "" when there is none.
    # NOTE(review): only this first entry is disconnected although the log
    # line below says "all" -- confirm that at most one device is connected.
    first_address = next(iter(devices), "")
    if first_address != "":
        print(devices.get(first_address))
        blue_tooth.disconnect_device(first_address, devices[first_address]["name"])
    # Single teardown for both the empty and the non-empty case.
    blue_tooth.bluetooth_exit = True
    blue_tooth.clearMyInstance()
    print("所有设备已断开连接")
|