소스 검색

```
feat(mcu): 添加时间追踪日志优化消息发送流程

- 在BaseClass.py中添加time模块导入,增加时间追踪日志用于调试消息发送流程
- 重构sendSocketMessage方法,添加详细的时间戳记录不同阶段的执行时间
- 在DeviceControl.py中注释掉智能射击器预览功能,暂时禁用相关调用

fix(smart_shooter): 修复异步回调和事件循环管理问题

- 修改SmartShooter类移除未使用的main_loop属性
- 重构connect_listen方法,简化ZMQ消息监听逻辑
- 将回调提交到主事件循环使用run_coroutine_threadsafe确保线程安全
- 修复自动对焦过程中EnableCameraPreview方法的异步调用问题

refactor(sockets): 优化websocket连接管理和消息发送机制

- 在ConnectionManager中添加时间追踪日志记录websocket消息发送过程
- 改进socket_server.py中的事件循环管理,正确设置smart_shooter主循环引用
- 修复send_message函数中WebSocket断开连接时的处理逻辑,使用break替代continue
- 添加正确的任务清理机制确保资源正确释放
```

rambo 1 개월 전
부모
커밋
b7e1223522
5개의 변경된 파일82개의 추가작업 그리고 81개의 파일을 삭제
  1. 18 6
      python/mcu/BaseClass.py
  2. 6 6
      python/mcu/DeviceControl.py
  3. 42 61
      python/mcu/capture/smart_shooter_class.py
  4. 7 4
      python/sockets/connect_manager.py
  5. 9 4
      python/sockets/socket_server.py

+ 18 - 6
python/mcu/BaseClass.py

@@ -1,4 +1,4 @@
-import asyncio
+import asyncio,time
 from sockets import ConnectionManager
 from utils.common import message_queue
 from mcu.capture.smart_shooter_class import SmartShooter
@@ -17,20 +17,32 @@ class BaseClass:
         # self.device_status = 2
 
     def sendSocketMessage(self, code=0, msg="", data=None, device_status=2):
-        data = {
+        t_start = time.time()
+        payload = {
             "code": code,
             "msg": msg,
             "status": device_status,
             "data": data,
             "msg_type": self.msg_type,
         }
+        
+        print(f"[T1: {t_start:.4f}] sendSocketMessage 调用, msg={msg}")
+
         loop = asyncio.get_event_loop()
         if self.websocket == None:
-            loop.create_task(message_queue.put(data))
+            print(f"[T1: {time.time()-t_start:.4f}s] 走队列路径")
+            loop.create_task(message_queue.put(payload))
         else:
-            loop.create_task(
-                self.websocket_manager.send_personal_message(data, self.websocket)
-            )
+            print(f"[T1: {time.time()-t_start:.4f}s] 走直接发送路径")
+            
+            async def _do_send():
+                t2 = time.time()
+                print(f"[T2: {t2-t_start:.4f}s] 任务开始执行, 准备调用 send_personal_message")
+                await self.websocket_manager.send_personal_message(payload, self.websocket)
+                t3 = time.time()
+                print(f"[T3: {t3-t_start:.4f}s] send_personal_message 完成, 总耗时: {t3-t2:.4f}s")
+                
+            loop.create_task(_do_send())
         print("\033[1;32;40m 发送消息===>sendSocketMessage \033[0m", data)
     async def asyncSendSocketMessage(self, code=0, msg="", data=None, device_status=2):
         data = {

+ 6 - 6
python/mcu/DeviceControl.py

@@ -1655,9 +1655,9 @@ class DeviceControl(BaseClass, metaclass=SingletonType):
                 },
             )
             await self.controlDevice("laser_position", 0)
-            await smart_shooter.EnableCameraPreview(
-                    enable_status=True, msg_type="smart_shooter_enable_preview"
-                )
+            # await smart_shooter.EnableCameraPreview(
+            #         enable_status=True, msg_type="smart_shooter_enable_preview"
+            #     )
             self.msg_type = "mcu"
             self.is_runn_action = True
             for index, action in enumerate(config_list):
@@ -1731,9 +1731,9 @@ class DeviceControl(BaseClass, metaclass=SingletonType):
             self.msg_type = "mcu"
             await self.controlDevice("laser_position", 1)
         self.action_state = 2
-        await smart_shooter.EnableCameraPreview(
-                    enable_status=False, msg_type="smart_shooter_enable_preview"
-                )
+        # await smart_shooter.EnableCameraPreview(
+        #             enable_status=False, msg_type="smart_shooter_enable_preview"
+        #         )
     async def run_mcu_config_single(
         self,
         config_info,

+ 42 - 61
python/mcu/capture/smart_shooter_class.py

@@ -30,6 +30,7 @@ class SmartShooter(metaclass=SingletonType):
         self.callback_listen = None
         self.listen_init = None
         self.websocket = None
+        # self.main_loop = None
 
     def __send_tcp_message(self, socket, msg):
         # await asyncio.sleep(0.01)
@@ -393,13 +394,13 @@ class SmartShooter(metaclass=SingletonType):
         await asyncio.sleep(delay)
         # 对焦混用
         if is_af:
-            self.EnableCameraPreview(enable_status=True, msg_type=msg_type)
+            await self.EnableCameraPreview(enable_status=True, msg_type=msg_type)
             start_time = time.time()
             await self.CameraAutofocus()
             end_time = time.time()
             elapsed_time = end_time - start_time
             logger.info(f"自动对焦耗时  {elapsed_time:.4f} 秒")
-            self.EnableCameraPreview(enable_status=False, msg_type=msg_type)
+            await self.EnableCameraPreview(enable_status=False, msg_type=msg_type)
         self.msg_type = msg_type
         print("camera_states", msg_type)
         """
@@ -496,68 +497,48 @@ class SmartShooter(metaclass=SingletonType):
             pass
 
     def connect_listen(self):
-        print("smart shooter connect_listen", self.connect_status, self.listen_init)
+        print("smart shooter connect_listen START")
         if self.connect_status == True or self.listen_init == True:
             return True
-        # 发起监听
+        
         sub_socket, context = self.__create_listen()
         print("构建监听", self.connect_status)
-        logger.info("构建监听,%s", self.connect_status)
+        
+        # 不再需要 self.listen_loop,我们只使用主循环
+        
         try:
-            # 尝试获取当前线程的事件循环
-            try:
-                self.listen_loop = asyncio.get_running_loop()
-            except RuntimeError:
-                self.listen_loop = asyncio.new_event_loop()
-        except RuntimeError:
-            # 如果当前线程没有事件循环,则创建一个新的
-            self.listen_loop = asyncio.new_event_loop()
-            asyncio.set_event_loop(self.listen_loop)
-        while True:
-            self.listen_init = True
-            if self.callback_listen == None:
-                continue
-            try:
-                # 创建任务并立即运行,设置超时以避免阻塞
-                # future = asyncio.ensure_future(self.asyncMessageListen())
-                # 运行任务,但设置超时以避免无限等待
-                # 使用一致的事件循环运行异步任务
-                # asyncio.run(self.asyncMessageListen())
-                # 运行任务,但设置超时以避免无限等待
-                self.listen_loop.run_until_complete(self.asyncMessageListen())
-            except asyncio.TimeoutError:
-                # 超时是正常的,表示没有消息需要处理
-                pass
-            except Exception as e:
-                # 处理其他可能的异常
-                print(f"Error handling async message-asyncMessageListen: {e}")
-            # camera_states, camera_msg = await self.GetCameraInfo(is_send=False)
-            # if not camera_states:
-            #     print("相机未连接回调打印", camera_states, camera_msg)
-            #     await asyncio.sleep(0.01)  # 等待相机连接
-            #     continue
-            if self.stop_listen:
-                break
-            try:
-                self.connect_status = True
-                raw = sub_socket.recv()
-                str_msg = raw.decode("utf-8")
-                json_msg = json.loads(str_msg)
-                if json_msg["msg_id"] == "NetworkPing":
+            while True:
+                self.listen_init = True
+                
+                if self.stop_listen:
+                    break
+                
+                # 1. 阻塞接收 ZMQ 消息 (这是唯一的阻塞点,但在子线程,所以没问题)
+                try:
+                    raw = sub_socket.recv() # 这里会阻塞直到有消息或超时
+                    str_msg = raw.decode("utf-8")
+                    json_msg = json.loads(str_msg)
+                    
+                    if json_msg.get("msg_id") == "NetworkPing":
+                        continue
+                    
+                    # 2. 将回调提交到【主事件循环】
+                    if hasattr(self, 'main_loop') and self.main_loop:
+                        # 非阻塞提交,立即返回
+                        asyncio.run_coroutine_threadsafe(self.callback_listen(json_msg), self.main_loop)
+                    else:
+                        print("Error: main_loop not set in SmartShooter")
+                        
+                except zmq.Again:
+                    # 超时,继续循环
                     continue
-                # self.callback_listen(json_msg)
-                asyncio.run(self.callback_listen(json_msg))
-            except zmq.Again:
-                # print("接收超时,继续监听...")
-                # logger.info("接收超时,继续监听...")
-                continue
-            except Exception as e:
-                self.connect_status = False
-                print(f"发生错误: {e}")
-                break
-        self.listen_init = False
-        self.connect_status = False
-        self.stop_listen = False
-        sub_socket.close()
-        context.term()
-        print("smart shooter连接断开")
+                except Exception as e:
+                    print(f"ZMQ Error: {e}")
+                    break
+                    
+        finally:
+            self.listen_init = False
+            self.connect_status = False
+            sub_socket.close()
+            context.term()
+            print("smart shooter连接断开")

+ 7 - 4
python/sockets/connect_manager.py

@@ -1,6 +1,6 @@
 from models import WebSocket
 from logger import logger
-import json, asyncio
+import json, asyncio,time
 from starlette.websockets import WebSocketState
 class ConnectionManager:
     is_connected = False
@@ -27,11 +27,14 @@ class ConnectionManager:
     async def send_personal_message(self, message: str, websocket: WebSocket):
         '''向用户发送消息'''
         # await websocket.send_json(message)
+        t_send_start = time.time()
         try:
-          await websocket.send_json(message)
+            print(f"[T4: {time.time()-t_send_start:.4f}s] 开始 websocket.send_json")
+            await websocket.send_json(message)
+            print(f"[T5: {time.time()-t_send_start:.4f}s] websocket.send_json 返回")
         except Exception as e:
-          logger.info(f"socket 消息发送异常:{str(e)}")
-          await asyncio.sleep(0.001)
+            logger.info(f"socket 消息发送异常:{str(e)}")
+            await asyncio.sleep(0.001)
 
     async def broadcast(self, message: str):
         """广播消息"""

+ 9 - 4
python/sockets/socket_server.py

@@ -41,6 +41,8 @@ async def updateDataRecord(PhotoFilename, id):
 @app.websocket("/ws")
 async def websocket_endpoint(websocket: WebSocket):
     # await websocket.accept()
+    main_loop = asyncio.get_running_loop()
+    smart_shooter.main_loop = main_loop  # <--- 添加这一行
     await conn_manager.connect(websocket)
     active_connections.add(websocket)
     smart_shooter.websocket = websocket
@@ -49,19 +51,21 @@ async def websocket_endpoint(websocket: WebSocket):
     # 启动 smart_shooter.connect_listen 服务
     listen_task = None
     tasks = set()
+    send_task = None # <--- 新增
     try:
         # 初始化回调函数
         smart_shooter.callback_listen = MsgCallback
         # 创建任务来并发处理不同类型的消息
         handler_task = asyncio.create_task(handler_messages(websocket))
         # send_task = asyncio.create_task(send_message(websocket))
+        send_task = asyncio.create_task(send_message(websocket)) # <--- 启动消费者
         loop = asyncio.get_event_loop()
         listen_task = loop.run_in_executor(None, smart_shooter.connect_listen)
         # send_task = loop.run_in_executor(None, send_message(websocket))
         # 创建任务来启动 connect_listen
         # listen_task = asyncio.create_task(restart_smart_shooter_listener())
         # 等待所有任务完成
-        await asyncio.gather(handler_task, listen_task)
+        await asyncio.gather(handler_task,listen_task)
 
     except WebSocketDisconnect:
         print("Client disconnected")
@@ -69,6 +73,8 @@ async def websocket_endpoint(websocket: WebSocket):
         # 确保任务被正确取消
         if listen_task and not listen_task.done():
             listen_task.cancel()
+        if send_task and not send_task.done():
+            send_task.cancel() # <--- 清理
         active_connections.discard(websocket)
 
 
@@ -152,8 +158,7 @@ async def message_generator():
     while True:
         try:
             # 使用asyncio.wait_for设置合理的超时时间
-            message = await asyncio.wait_for(message_queue.get(), timeout=0.1)
-            print("获取消息中",message)
+            message = await message_queue.get()
             yield message
         except asyncio.TimeoutError:
             # 超时继续,允许其他协程运行
@@ -173,7 +178,7 @@ async def send_message(websocket):
             # 检查WebSocket连接状态
             if websocket.client_state.name != "CONNECTED":
                 print("WebSocket连接已断开,停止发送消息")
-                continue
+                break # 退出循环
             print("发送消息中。。。。。", message)
             # 发送消息
             await websocket.send_json(message)