Jelajahi Sumber

socket消息发送

rambo 4 bulan lalu
induk
melakukan
3292ecaa25

+ 58 - 9
python/api.py

@@ -39,6 +39,21 @@ def parserGoodsDict2Aigc(return_data_check_before_detail):
     return goods_no_dict
 
 
+async def sendAsyncMessage(msg="", goods_arts=[], status="",msg_type=""):
+    """异步发送消息"""
+    data = {
+        "code": 0,
+        "msg": msg,
+        "status": 2,
+        "data": {
+            "status": status,
+            "goods_art_nos": goods_arts,
+        },
+        "msg_type": msg_type,
+    }
+    await message_queue.put(data)
+
+
 @app.get("/")
 async def index():
     # await socket_manager.send_message(msg="测试")
@@ -448,12 +463,17 @@ async def handle_detail(request: Request, params: HandlerDetail):
         women_id = upper_footer_params.get("women_id")
         if not women_id:
             raise UnicornException("请选择女模特")
-    handler_result = []
     try:
         return_data_check_before_detail = run_main.check_before_detail(config_data)
         if is_product_scene == 1:
             goods_dict = parserGoodsDict2Aigc(return_data_check_before_detail)
             new_goods_dict = {}
+            await sendAsyncMessage(
+                msg="开始处理场景图",
+                goods_arts=list(goods_dict.keys()),
+                status="开始处理",
+                msg_type="scene_progress",
+            )
             for goods_art_no in goods_dict.keys():
                 goods_art_dict_info = goods_dict[goods_art_no]
                 first_goods_art_no_info = goods_art_dict_info.get("货号资料", [])[0]
@@ -476,22 +496,39 @@ async def handle_detail(request: Request, params: HandlerDetail):
                         {
                             "goods_art_no": goods_art_no,
                             "success": True,
-                            "info": "处理成功",
+                            "info": "场景图处理成功",
                         }
                     )
+                    await sendAsyncMessage(
+                        msg="场景图处理完成",
+                        goods_arts=[goods_art_no],
+                        status="场景图处理完成",
+                        msg_type="scene_progress",
+                    )
                 except:
-                    print('An exception occurred')
                     handler_result.append(
                         {
                             "goods_art_no": goods_art_no,
                             "success": False,
-                            "info": "处理失败",
+                            "info": "场景图处理失败",
                         }
                     )
+                    await sendAsyncMessage(
+                        msg="场景图处理失败",
+                        goods_arts=[goods_art_no],
+                        status="场景图处理失败",
+                        msg_type="scene_progress",
+                    )
             return_data_check_before_detail["data"]["goods_no_dict"] = new_goods_dict
         if is_upper_footer == 1:
             goods_dict = parserGoodsDict2Aigc(return_data_check_before_detail)
             new_goods_dict = {}
+            await sendAsyncMessage(
+                msg="开始处理模特图",
+                goods_arts=list(goods_dict.keys()),
+                status="开始处理模特图",
+                msg_type="upper_footer_progress",
+            )
             for goods_art_no in goods_dict.keys():
                 goods_art_dict_info = goods_dict[goods_art_no]
                 first_goods_art_no_info = goods_art_dict_info.get("货号资料", [])[0]
@@ -516,17 +553,29 @@ async def handle_detail(request: Request, params: HandlerDetail):
                         {
                             "goods_art_no": goods_art_no,
                             "success": True,
-                            "info": "处理成功",
+                            "info": "模特图处理成功",
                         }
                     )
+                    await sendAsyncMessage(
+                        msg="模特图处理成功",
+                        goods_arts=[goods_art_no],
+                        status="模特图处理成功",
+                        msg_type="upper_footer_progress",
+                    )
                 except:
                     handler_result.append(
                         {
                             "goods_art_no": goods_art_no,
                             "success": False,
-                            "info": "处理失败",
+                            "info": "模特图处理失败",
                         }
                     )
+                    await sendAsyncMessage(
+                        msg="模特图处理失败",
+                        goods_arts=[goods_art_no],
+                        status="模特图处理失败",
+                        msg_type="upper_footer_progress",
+                    )
             return_data_check_before_detail["data"]["goods_no_dict"] = new_goods_dict
         if is_detail == 0:
             return {
@@ -603,9 +652,9 @@ async def handle_detail(request: Request, params: HandlerDetail):
     except UnicornException as e:
         handler_result_folder = ""
         handler_result = e.msg
-    # except Exception as e:
-    #     handler_result_folder = ""
-    #     handler_result.append({"goods_art_no": "", "success": False, "info": str(e)})
+    except Exception as e:
+        handler_result_folder = ""
+        handler_result.append({"goods_art_no": "", "success": False, "info": str(e)})
     return {
         "code": 0,
         "msg": "",

+ 28 - 1
python/mcu/capture/smart_shooter_class.py

@@ -5,11 +5,12 @@ import zmq
 import asyncio
 from PIL import Image
 from io import BytesIO
-import base64
+import base64,threading
 import zmq, sys, time
 from utils.SingletonType import SingletonType
 import settings
 import logging
+from utils.common import message_queue
 
 logger = logging.getLogger(__name__)
 # 定义为单例模式,避免被重复实例化
@@ -466,6 +467,16 @@ class SmartShooter(metaclass=SingletonType):
             }
             await self.websocket_manager.send_personal_message(message, self.websocket)
             return False, msg_send
+    async def asyncMessageListen(self):
+        if self.websocket.client_state.name != "CONNECTED":
+            print("WebSocket连接已断开,停止发送消息")
+            return
+        message = await asyncio.wait_for(message_queue.get(), timeout=0.1)
+        print("发送消息中。。。。。", message)
+        await self.websocket_manager.send_personal_message(
+                    message, self.websocket
+                )
+        message_queue.task_done()
 
     def connect_listen(self):
         print("smart shooter connect_listen", self.connect_status, self.listen_init)
@@ -475,10 +486,26 @@ class SmartShooter(metaclass=SingletonType):
         sub_socket, context = self.__create_listen()
         print("构建监听", self.connect_status)
         logger.info("构建监听,%s", self.connect_status)
+        # 创建并保存事件循环引用
+        self.listen_loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.listen_loop)
         while True:
             self.listen_init = True
             if self.callback_listen == None:
                 continue
+            try:
+                # 创建任务并立即运行,设置超时以避免阻塞
+                future = asyncio.ensure_future(self.asyncMessageListen())
+                # 运行任务,但设置超时以避免无限等待
+                self.listen_loop.run_until_complete(
+                    asyncio.wait_for(future, timeout=0.1)
+                )
+            except asyncio.TimeoutError:
+                # 超时是正常的,表示没有消息需要处理
+                pass
+            except Exception as e:
+                # 处理其他可能的异常
+                print(f"处理异步消息时出错: {e}")
             # camera_states, camera_msg = await self.GetCameraInfo(is_send=False)
             # if not camera_states:
             #     print("相机未连接回调打印", camera_states, camera_msg)

File diff ditekan karena terlalu besar
+ 452 - 191
python/service/base_deal.py


+ 73 - 73
python/service/run_main.py

@@ -270,28 +270,28 @@ class RunMain:
             goods_art_no_folder_data["folder_name"]
             for goods_art_no_folder_data in all_goods_art_no_folder_data
         ]
-        try:
-            loop = asyncio.get_event_loop()
-            loop.create_task(sendSocketMessage(
-                code=0,
-                msg="开始处理抠图",
-                data={
-                    "status": "进行中",
-                    "goods_art_nos": goods_arts,
-                },
-                msg_type="segment_progress",
-            ))
-        except:
-            print('An exception occurred')
-            asyncio.run(sendSocketMessage(
-                code=0,
-                msg="开始处理抠图",
-                data={
-                    "status": "进行中",
-                    "goods_art_nos": goods_arts,
-                },
-                msg_type="segment_progress",
-            ))
+        # try:
+        #     loop = asyncio.get_event_loop()
+        #     loop.create_task(sendSocketMessage(
+        #         code=0,
+        #         msg="开始处理抠图",
+        #         data={
+        #             "status": "进行中",
+        #             "goods_art_nos": goods_arts,
+        #         },
+        #         msg_type="segment_progress",
+        #     ))
+        # except:
+        #     print('An exception occurred')
+        #     asyncio.run(sendSocketMessage(
+        #         code=0,
+        #         msg="开始处理抠图",
+        #         data={
+        #             "status": "进行中",
+        #             "goods_art_nos": goods_arts,
+        #         },
+        #         msg_type="segment_progress",
+        #     ))
         if do_next:
             all_goods_art_no_folder_data = [
                 x for x in all_goods_art_no_folder_data if x["label"] == "待处理"
@@ -312,32 +312,32 @@ class RunMain:
             return new_func
         else:
             print("已结束抠图处理")
-            try:
-                loop = asyncio.get_event_loop()
-                loop.create_task(
-                    sendSocketMessage(
-                        code=0,
-                        msg="抠图结束",
-                        data={
-                            "status": "已完成",
-                            "goods_art_nos": goods_arts,
-                        },
-                        msg_type="segment_progress",
-                    )
-                )
-            except:
-                print('An exception occurred')
-                asyncio.run(
-                    sendSocketMessage(
-                        code=0,
-                        msg="抠图结束",
-                        data={
-                            "status": "已完成",
-                            "goods_art_nos": goods_arts,
-                        },
-                        msg_type="segment_progress",
-                    )
-                )
+            # try:
+            #     loop = asyncio.get_event_loop()
+            #     loop.create_task(
+            #         sendSocketMessage(
+            #             code=0,
+            #             msg="抠图结束",
+            #             data={
+            #                 "status": "已完成",
+            #                 "goods_art_nos": goods_arts,
+            #             },
+            #             msg_type="segment_progress",
+            #         )
+            #     )
+            # except:
+            #     print('An exception occurred')
+            #     asyncio.run(
+            #         sendSocketMessage(
+            #             code=0,
+            #             msg="抠图结束",
+            #             data={
+            #                 "status": "已完成",
+            #                 "goods_art_nos": goods_arts,
+            #             },
+            #             msg_type="segment_progress",
+            #         )
+            #     )
             return True
 
     def do_run_cutout_image(
@@ -395,31 +395,31 @@ class RunMain:
             page="抠图结束",
             data=goods_arts,
         )
-        try:
-            loop = asyncio.get_event_loop()
-            loop.create_task(
-                    sendSocketMessage(
-                        code=0,
-                        msg="抠图结束",
-                        data={
-                            "status": "已完成",
-                            "goods_art_nos": goods_arts,
-                        },
-                        msg_type="segment_progress",
-                    )
-                )
-        except:
-            asyncio.run(
-                sendSocketMessage(
-                    code=0,
-                    msg="抠图结束",
-                    data={
-                        "status": "已完成",
-                        "goods_art_nos": goods_arts,
-                    },
-                    msg_type="segment_progress",
-                )
-            )
+        # try:
+        #     loop = asyncio.get_event_loop()
+        #     loop.create_task(
+        #             sendSocketMessage(
+        #                 code=0,
+        #                 msg="抠图结束",
+        #                 data={
+        #                     "status": "已完成",
+        #                     "goods_art_nos": goods_arts,
+        #                 },
+        #                 msg_type="segment_progress",
+        #             )
+        #         )
+        # except:
+        #     asyncio.run(
+        #         sendSocketMessage(
+        #             code=0,
+        #             msg="抠图结束",
+        #             data={
+        #                 "status": "已完成",
+        #                 "goods_art_nos": goods_arts,
+        #             },
+        #             msg_type="segment_progress",
+        #         )
+        #     )
         callback_func("已结束抠图处理")
         return True
 

+ 2 - 2
python/sockets/socket_server.py

@@ -50,14 +50,14 @@ async def websocket_endpoint(websocket: WebSocket):
         smart_shooter.callback_listen = MsgCallback
         # 创建任务来并发处理不同类型的消息
         handler_task = asyncio.create_task(handler_messages(websocket))
-        send_task = asyncio.create_task(send_message(websocket))
+        # send_task = asyncio.create_task(send_message(websocket))
         loop = asyncio.get_event_loop()
         listen_task = loop.run_in_executor(None, smart_shooter.connect_listen)
         # send_task = loop.run_in_executor(None, send_message(websocket))
         # 创建任务来启动 connect_listen
         # listen_task = asyncio.create_task(restart_smart_shooter_listener())
         # 等待所有任务完成
-        await asyncio.gather(handler_task, listen_task, send_task)
+        await asyncio.gather(handler_task, listen_task)
 
     except WebSocketDisconnect:
         print("Client disconnected")

Beberapa file tidak ditampilkan karena terlalu banyak file yang berubah dalam diff ini