Explorar el Código

消息执行机制

rambo hace 4 meses
padre
commit
a4e837603b

+ 96 - 34
python/api.py

@@ -29,6 +29,33 @@ from service.online_request.module_online_data import OnlineDataRequest
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from service.online_request.module_online_data import AIGCDataRequest
+import asyncio
+from fastapi import BackgroundTasks
+import functools
+import traceback
+
+def log_exception_with_context(context_message=""):
+    """装饰器:为函数添加异常日志上下文"""
+
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            try:
+                return func(*args, **kwargs)
+            except Exception as e:
+                logger.error(f"=== 异常发生在函数: {func.__name__} ===")
+                if context_message:
+                    logger.error(f"上下文信息: {context_message}")
+                logger.error(f"函数参数: args={args}, kwargs={kwargs}")
+                logger.error(f"异常类型: {type(e).__name__}")
+                logger.error(f"异常信息: {str(e)}")
+                logger.error("完整堆栈跟踪:")
+                logger.error(traceback.format_exc())
+                raise  # 重新抛出异常
+
+        return wrapper
+
+    return decorator
 
 
 def parserGoodsDict2Aigc(return_data_check_before_detail):
@@ -39,13 +66,13 @@ def parserGoodsDict2Aigc(return_data_check_before_detail):
     return goods_no_dict
 
 
-async def sendAsyncMessage(msg="", goods_arts=[], status="",msg_type=""):
+async def sendAsyncMessage(msg="", goods_arts=[], status="",msg_type="",data=None):
     """异步发送消息"""
     data = {
         "code": 0,
         "msg": msg,
         "status": 2,
-        "data": {
+        "data": data if data is not None else {
             "status": status,
             "goods_art_nos": goods_arts,
         },
@@ -222,7 +249,7 @@ async def fromExcelHandler(params: HandlerDetail):
         temp_class_dict[key] = cls
 
     config_data["temp_class"] = temp_class_dict
-    executor = ThreadPoolExecutor(max_workers=4)
+    # executor = ThreadPoolExecutor(max_workers=4)
     # 此处对抠图进行批量处理,保证所有的图片在生成详情图之前已经完成抠图,以保证详情图生成的效率
     return_data = run_main.check_before_cutout(config_data)
     cutout_res = run_main.check_for_cutout_image_first_call_back(return_data)
@@ -230,7 +257,7 @@ async def fromExcelHandler(params: HandlerDetail):
     try:
         if cutout_res:
             return_data_check_before_detail = run_main.check_before_detail(config_data)
-            check_for_detail_first_res = run_main.check_for_detail_first_call_back(
+            check_for_detail_first_res = await run_main.check_for_detail_first_call_back(
                 return_data_check_before_detail
             )
         if isinstance(check_for_detail_first_res, partial):
@@ -295,11 +322,13 @@ async def fromExcelHandler(params: HandlerDetail):
                 {"goods_art_no": good_art, "success": False, "info": str(e)}
             )
     handler_result_folder = "/".join(handler_result_folder.split("/")[:-1])
-    return {
-        "code": 0,
-        "msg": "",
-        "data": {"output_folder": handler_result_folder, "list": handler_result},
-    }
+    await sendAsyncMessage(
+        msg="处理结束",
+        data={"output_folder": handler_result_folder, "list": handler_result},
+        status="处理结束",
+        msg_type="detail_progress",
+    )
+    return
 
 
 def group_by_style_number(data):
@@ -315,7 +344,15 @@ def group_by_style_number(data):
 
 
 @app.post("/handle_detail")
-async def handle_detail(request: Request, params: HandlerDetail):
+async def handle_detail_background(
+    request: Request, params: HandlerDetail, background_tasks: BackgroundTasks
+):
+    # background_tasks.add_task(process_handle_detail, request, params)
+    asyncio.create_task(process_handle_detail(request, params))
+    return {"code": 0, "msg": "任务已提交后台处理", "data": {"status": "processing"}}
+
+
+async def process_handle_detail(request: Request, params: HandlerDetail):
     obj = None
     token = "Bearer " + params.token
     uuid = params.uuid
@@ -446,11 +483,18 @@ async def handle_detail(request: Request, params: HandlerDetail):
                 }
             )
     if is_only_cutout == 1:
-        return {
-            "code": 0,
-            "msg": "",
-            "data": {"output_folder": handler_result_folder, "list": handler_result},
-        }
+        # return {
+        #     "code": 0,
+        #     "msg": "",
+        #     "data": {"output_folder": handler_result_folder, "list": handler_result},
+        # }
+        await sendAsyncMessage(
+            msg="处理结束",
+            data={"output_folder": handler_result_folder, "list": handler_result},
+            status="处理结束",
+            msg_type="detail_progress",
+        )
+        return
     if is_product_scene == 1:
         if product_scene_prompt == "" or product_scene_prompt == None:
             raise UnicornException("请填写场景描述")
@@ -465,12 +509,13 @@ async def handle_detail(request: Request, params: HandlerDetail):
             raise UnicornException("请选择女模特")
     try:
         return_data_check_before_detail = run_main.check_before_detail(config_data)
+        print("报错前返回的结果数据", return_data_check_before_detail)
         if is_product_scene == 1:
             goods_dict = parserGoodsDict2Aigc(return_data_check_before_detail)
             new_goods_dict = {}
             await sendAsyncMessage(
                 msg="开始处理场景图",
-                goods_arts=list(goods_dict.keys()),
+                goods_arts=[goods_art_no for goods_art_no in goods_dict.keys()],
                 status="开始处理",
                 msg_type="scene_progress",
             )
@@ -487,7 +532,7 @@ async def handle_detail(request: Request, params: HandlerDetail):
                     continue
                 aigc_clazz.center_paste_image(ceshi_image_path, save_image_path)
                 try:
-                    image_path = aigc_clazz.generateProductScene(
+                    image_path = await aigc_clazz.generateProductScene(
                         save_image_path, product_scene_prompt, save_image_path
                     )
                     goods_art_dict_info["场景图"] = image_path
@@ -545,7 +590,7 @@ async def handle_detail(request: Request, params: HandlerDetail):
                     continue
                 shutil.copy(ceshi_image_path, save_image_path)
                 try:
-                    image_path = aigc_clazz.generateUpperShoes(
+                    image_path = await aigc_clazz.generateUpperShoes(
                         save_image_path, model_id, save_image_path
                     )
                     goods_art_dict_info["模特图"] = image_path
@@ -580,14 +625,21 @@ async def handle_detail(request: Request, params: HandlerDetail):
                     )
             return_data_check_before_detail["data"]["goods_no_dict"] = new_goods_dict
         if is_detail == 0:
-            return {
-                "code": 0,
-                "msg": "",
-                "data": {
-                    "output_folder": handler_result_folder,
-                    "list": handler_result,
-                },
-            }
+            # return {
+            #     "code": 0,
+            #     "msg": "",
+            #     "data": {
+            #         "output_folder": handler_result_folder,
+            #         "list": handler_result,
+            #     },
+            # }
+            await sendAsyncMessage(
+                msg="处理结束",
+                data={"output_folder": handler_result_folder, "list": handler_result},
+                status="处理结束",
+                msg_type="detail_progress",
+            )
+            return
         check_for_detail_first_res = await run_main.check_for_detail_first_call_back(
             return_data_check_before_detail
         )
@@ -649,9 +701,12 @@ async def handle_detail(request: Request, params: HandlerDetail):
                             status="开始上传商品数据",
                             msg_type="upload_goods_progress",
                         )
-                        await onlineData.uploadGoods2ThirdParty(
-                            result_goods_no_dict, online_stores=online_stores
-                        )
+                        try:
+                            await onlineData.uploadGoods2ThirdParty(
+                                result_goods_no_dict, online_stores=online_stores
+                            )
+                        except Exception as e:
+                            print(f'上传任务出现错误:{e}')
                         await sendAsyncMessage(
                             msg="商品上传第三方成功",
                             goods_arts=[],
@@ -671,11 +726,18 @@ async def handle_detail(request: Request, params: HandlerDetail):
         print(f"详情图生成错误信息:{e}")
         handler_result_folder = ""
         handler_result.append({"goods_art_no": "", "success": False, "info": str(e)})
-    return {
-        "code": 0,
-        "msg": "",
-        "data": {"output_folder": handler_result_folder, "list": handler_result},
-    }
+    # return {
+    #     "code": 0,
+    #     "msg": "",
+    #     "data": {"output_folder": handler_result_folder, "list": handler_result},
+    # }
+    await sendAsyncMessage(
+                msg="处理结束",
+                data={"output_folder": handler_result_folder, "list": handler_result},
+                status="处理结束",
+                msg_type="detail_progress",
+            )
+    return
 
 
 @app.get("/get_device_tabs", description="获取可执行程序命令列表")

+ 0 - 1
python/error.txt

@@ -1 +0,0 @@
-{"code":999,"message":"\u751f\u6210\u8bb0\u5f55id\u4e0d\u80fd\u4e3a\u7a7a","debug":{"database":{"total":2,"items":[{"connection":"mysql","query":"select * from `accounts` where `accounts`.`id` = '1296' and `accounts`.`deleted_at` is null limit 1;","time":1.12},{"connection":"mysql","query":"select * from `aigc_userpoints_logs` where (`partner_account_id` = '1296' and `action_name` = '\u65b0\u7528\u6237\u6ce8\u518c') limit 1;","time":0.73}]},"cache":{"hit":{"keys":["18323b96c68234597b1fa8d10fecb6bbe45cadc3","18323b96c68234597b1fa8d10fecb6bbe45cadc3","18323b96c68234597b1fa8d10fecb6bbe45cadc3","18323b96c68234597b1fa8d10fecb6bbe45cadc3","18323b96c68234597b1fa8d10fecb6bbe45cadc3","ai_gc_day_sign_in2025-08-291296","18323b96c68234597b1fa8d10fecb6bbe45cadc3","18323b96c68234597b1fa8d10fecb6bbe45cadc3","64c84e952453cd25d3097c7cfb8ba8178a0d109f"],"total":9},"miss":{"keys":["64c84e952453cd25d3097c7cfb8ba8178a0d109f"],"total":1},"write":{"keys":[],"total":0},"forget":{"keys":[],"total":0}},"profiling":[{"event":"request-time","time":0.01586294174194336}],"memory":{"usage":4364888,"peak":4389336}}}

+ 0 - 1
python/index.py

@@ -11,7 +11,6 @@ import uvicorn.lifespan.on
 from multiprocessing import Process, freeze_support
 from service.init_load_source import init_load_source
 
-
 def handle_shutdown(signum, frame):
     """关闭系统应用服务"""
     # 终止事件循环

+ 10 - 2
python/mcu/capture/smart_shooter_class.py

@@ -494,7 +494,10 @@ class SmartShooter(metaclass=SingletonType):
         logger.info("构建监听,%s", self.connect_status)
         try:
             # 尝试获取当前线程的事件循环
-            self.listen_loop = asyncio.get_event_loop()
+            try:
+                self.listen_loop = asyncio.get_running_loop()
+            except RuntimeError:
+                self.listen_loop = asyncio.new_event_loop()
         except RuntimeError:
             # 如果当前线程没有事件循环,则创建一个新的
             self.listen_loop = asyncio.new_event_loop()
@@ -507,7 +510,12 @@ class SmartShooter(metaclass=SingletonType):
                 # 创建任务并立即运行,设置超时以避免阻塞
                 # future = asyncio.ensure_future(self.asyncMessageListen())
                 # 运行任务,但设置超时以避免无限等待
-                self.listen_loop.run_until_complete(self.asyncMessageListen())
+                # 使用一致的事件循环运行异步任务
+                asyncio.run(self.asyncMessageListen())
+                # 运行任务,但设置超时以避免无限等待
+                # self.listen_loop.run_until_complete(
+                #     asyncio.wait_for(future, timeout=0.1)
+                # )
             except asyncio.TimeoutError:
                 # 超时是正常的,表示没有消息需要处理
                 pass

+ 12 - 9
python/service/multi_threaded_image_saving.py

@@ -34,17 +34,20 @@ class ImageSaver:
         :param filename: 图片保存的文件名。
         :return: 返回一个Future对象,用于查询任务状态。
         """
-        future = self.executor.submit(
+        try:
+            future = self.executor.submit(
             self._save_image_worker, image, file_path, **kwargs
         )
-        with self.lock:
-            self.tasks_dict[file_path] = {
-                "is_completed": False,
-                "create_time": time.time(),
-                "is_error": False,
-                "error_info": "",
-            }
-        return future
+            with self.lock:
+                self.tasks_dict[file_path] = {
+                    "is_completed": False,
+                    "create_time": time.time(),
+                    "is_error": False,
+                    "error_info": "",
+                }
+            return future
+        except Exception as e:
+            print(f"保存图片出现异常==>save_image:{e}")
 
     def _save_image_worker(self, image: Image, file_path: str, **kwargs) -> None:
         """

+ 13 - 11
python/service/online_request/module_online_data.py

@@ -54,7 +54,7 @@ class AIGCDataRequest(object):
         resultData = self.s.post(
             url, files={"file": open(local_path, "rb")}, headers=post_headers
         ).json()
-        self.s.close()
+        
         return resultData["data"]["url"]
 
     def center_paste_image(
@@ -113,7 +113,8 @@ class AIGCDataRequest(object):
 
         return background
 
-    def generateProductScene(self, local_path, prompt, save_path):
+    async def generateProductScene(self, local_path, prompt, save_path):
+        await asyncio.sleep(0.1)
         imageUrl = self.uploadImage(local_path)
         print("imageUrl", imageUrl)
         data = {
@@ -126,6 +127,7 @@ class AIGCDataRequest(object):
         """生成场景图"""
         url = settings.DOMAIN + "/api/ai_image/inspired/command_to_image"
         resultData = self.s.post(url, data=data, headers=self.post_headers).json()
+        
         code = resultData.get("code", 0)
         message = resultData.get("message", "")
         if code != 0:
@@ -135,16 +137,16 @@ class AIGCDataRequest(object):
             raise UnicornException("场景图生成失败")
         image_url = image_arr[0]
         save_image_path = download_image_with_pil(image_url, save_path)
-        self.s.close()
         return save_image_path
 
-    def searchProgress(self, id):
+    async def searchProgress(self, id):
+        await asyncio.sleep(0.1)
         """查询进度"""
         url = settings.DOMAIN + "/api/ai_image/main/search_bacth_progress"
         data = {"site": 1, "generate_ids": [id], "type": "aigc_pro"}
         resultData = self.s.post(url, json=data, headers=self.post_headers)
         resultData = resultData.json()
-        self.s.close()
+        
         code = resultData.get("code", 0)
         message = resultData.get("message", "")
         if code != 0:
@@ -160,7 +162,7 @@ class AIGCDataRequest(object):
         result_image = result_image_urls[0] if len(result_image_urls) > 0 else None
         return status, result_image
 
-    def generateUpperShoes(self, local_path, model_id, save_path):
+    async def generateUpperShoes(self, local_path, model_id, save_path):
         """生成上脚图"""
         print("生成上脚图", local_path, model_id, save_path)
         imageUrl = self.uploadImage(local_path)
@@ -175,7 +177,7 @@ class AIGCDataRequest(object):
         """生成上脚图"""
         url = settings.DOMAIN + "/api/ai_image/main/upper_footer"
         resultData = self.s.post(url, data=data, headers=self.post_headers).json()
-        self.s.close()
+        
         code = resultData.get("code", 0)
         message = resultData.get("message", "")
         if code != 0:
@@ -190,7 +192,7 @@ class AIGCDataRequest(object):
         print("generate_id", generate_id)
         while search_times > 0:
             print(f"查询第{search_times}次")
-            status, result_image = self.searchProgress(generate_id)
+            status, result_image = await self.searchProgress(generate_id)
             if status in [-1, 2]:
                 break
             time.sleep(1)
@@ -357,7 +359,7 @@ class OnlineDataRequest(object):
         resultData = self.s.post(
             url, files={"file": open(local_path, "rb")}, headers=post_headers
         ).json()
-        self.s.close()
+        
         return resultData["data"]["url"]
 
     def get_current_menu(self):
@@ -514,8 +516,8 @@ class OnlineDataRequest(object):
         # print("上传商品api==>url", url)
         # print("上传第三方数据打印", params)
         resultData = self.s.post(url, data=postData, headers=post_headers).json()
-        self.s.close()
-        # print("上传商品api==>resultData", resultData)
+        
+        print("上传商品api==>resultData", resultData)
         return resultData
 
     def sendSocketMessage(self, code=0, msg="", data=None, device_status=2,msg_type="upload_goods_progress"):

+ 6 - 2
python/sockets/socket_server.py

@@ -10,7 +10,9 @@ import time
 from .socket_client import socket_manager
 from sqlalchemy.exc import NoResultFound
 import os, datetime
-
+import traceback
+import logging
+logger = logging.getLogger(__name__)
 conn_manager = ConnectionManager()
 active_connections = set()
 device_ctrl = DeviceControl(websocket_manager=conn_manager)
@@ -19,6 +21,8 @@ smart_shooter = SmartShooter(websocket_manager=conn_manager)
 from utils.common import message_queue
 
 
+
+
 async def updateDataRecord(PhotoFilename, id):
     await asyncio.sleep(0.01)
     create_time = datetime.datetime.fromtimestamp(os.path.getctime(PhotoFilename))
@@ -168,7 +172,7 @@ async def send_message(websocket):
             # 检查WebSocket连接状态
             if websocket.client_state.name != "CONNECTED":
                 print("WebSocket连接已断开,停止发送消息")
-                break
+                continue
             print("发送消息中。。。。。", message)
             # 发送消息
             await websocket.send_json(message)