from re import search
from natsort.natsort import order_by_index
from sqlalchemy import func
from models import *
import requests
import json
from logger import logger
from serial.tools import list_ports
from model import PhotoRecord
import settings, datetime
import pandas as pd
from utils.hlm_http_request import forward_request
from utils.utils_func import check_path
from sockets.socket_client import socket_manager
from mcu.DeviceControl import DeviceControl
import time, shutil, os
from sqlalchemy import and_, asc, desc
from functools import partial
from service.deal_image import DealImage
from databases import DeviceConfig, SysConfigs, SqlQuery, CRUD, select, DeviceConfigTabs
from service.run_main import RunMain
import importlib
from service.auto_deal_pics.upload_pic import UploadPic
from service.OnePicTest import OnePicTest
from service.base import check_move_goods_art_no_folder
import win32api, win32gui, win32con
from win32gui import EnumWindows, GetWindowText
from service.online_request.module_online_data import OnlineDataRequest
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from service.online_request.module_online_data import AIGCDataRequest
import asyncio
from fastapi import BackgroundTasks
import functools
import traceback,stat
import concurrent.futures
from contextlib import contextmanager

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_BACKGROUND_TASKS = set()

# Upper bound, in seconds, for generating one model image.
_MODEL_IMAGE_TIMEOUT_SECONDS = 70

# Display name attached to a progress payload for each known message type.
_PROGRESS_NAMES = {
    "segment_progress": "抠图",
    "scene_progress": "场景图",
    "upper_footer_progress": "模特图",
    "detail_progress": "详情页",
    "upload_goods_progress": "上传第三方商品",
}


@contextmanager
def _open_session():
    """Yield a database session and close it on every exit path."""
    session = SqlQuery()
    try:
        yield session
    finally:
        session.close()


def _today_output_dir():
    """Return the relative output directory for the current local date."""
    return "output/{}".format(time.strftime("%Y-%m-%d"))


def _logo_dir():
    """Return the logo directory under the working directory (forward slashes)."""
    return "{}/data/logo/".format(os.getcwd()).replace("\\", "/")


def _make_progress(status, current, total, error, goods_art_no=None):
    """Build a progress payload; ``goods_art_no`` is included only when given."""
    progress = {"status": status, "current": current, "total": total, "error": error}
    if goods_art_no is not None:
        progress["goods_art_no"] = goods_art_no
    return progress


def _discard_generated_image(image_path):
    """Best-effort removal of a partially generated image.

    Does nothing when no path has been computed yet or the file does not exist,
    so a failure early in an item never deletes another item's output.
    """
    if not image_path or not os.path.isfile(image_path):
        return
    try:
        os.remove(image_path)
    except OSError as err:
        logger.warning(f"failed to remove partial image {image_path}: {err}")


def log_exception_with_context(context_message=""):
    """Decorator factory: log detailed context for any exception, then re-raise.

    :param context_message: optional text logged alongside the exception.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.error(f"=== 异常发生在函数: {func.__name__} ===")
                if context_message:
                    logger.error(f"上下文信息: {context_message}")
                logger.error(f"函数参数: args={args}, kwargs={kwargs}")
                logger.error(f"异常类型: {type(e).__name__}")
                logger.error(f"异常信息: {str(e)}")
                logger.error("完整堆栈跟踪:")
                logger.error(traceback.format_exc())
                raise  # propagate to the caller unchanged

        return wrapper

    return decorator


def parserGoodsDict2Aigc(return_data_check_before_detail):
    """Return ``data.goods_no_dict`` from a pre-detail check result ({} if absent)."""
    return return_data_check_before_detail.get("data", {}).get("goods_no_dict", {})


async def sendAsyncMessage(msg="", goods_arts=None, status="", msg_type="", data=None, progress=None):
    """Put one message on the module-level ``message_queue``.

    :param msg: human-readable message text.
    :param goods_arts: article numbers used in the default ``data`` payload.
    :param status: status text used in the default ``data`` payload.
    :param msg_type: message type; also selects the progress display name.
    :param data: explicit payload; when None a ``{status, goods_art_nos}`` dict is sent.
    :param progress: optional progress dict; it is tagged in place with
        ``msg_type`` and, for known types, ``name``.
    """
    if goods_arts is None:
        goods_arts = []
    if progress is not None:
        progress["msg_type"] = msg_type
        name = _PROGRESS_NAMES.get(msg_type)
        if name:
            progress["name"] = name
    if data is None:
        data = {"status": status, "goods_art_nos": goods_arts}
    await message_queue.put(
        {
            "code": 0,
            "msg": msg,
            "status": 2,
            "data": data,
            "progress": progress,
            "msg_type": msg_type,
        }
    )


@app.get("/")
async def index():
    """Liveness endpoint."""
    return {"message": "Hello World"}


@app.get("/scan_serials", description="扫描可用的设备端口")
async def scanSerials():
    """List serial ports and print them; the response body is a fixed placeholder."""
    ports = list_ports.comports()
    print("Scanning", ports)
    return {"message": "Hello World"}


@app.api_route(
    "/forward_request", methods=["GET", "POST"], description="代理转发hlm项目得请求"
)
async def forwardRequest(request: HlmForwardRequest):
    """Forward an HTTP request to ``request.target_url``.

    :param request: describes the method, target URL, headers and parameters.
    :return: whatever ``forward_request`` returns for the target call.
    :raises UnicornException: for unsupported methods, an empty target URL, or
        any failure while forwarding.
    """
    try:
        method = request.method.upper()
        if method == "GET":
            params = request.query_params
        elif method == "POST":
            # json.dump() needs a file object; the serialized string is wanted here.
            params = json.dumps(request.query_params)
        else:
            raise UnicornException("仅支持GET和POST方法")
        target_url = request.target_url
        if not target_url:
            raise UnicornException("目标url地址不能为空")
        return forward_request(
            target_url, params=params, method=method, headers=request.headers
        )
    except UnicornException:
        # Already a domain error; do not wrap it a second time.
        raise
    except Exception as e:
        raise UnicornException(str(e))


def __createExcelGoodsArray(excel_path):
    """Read the first sheet of an Excel file and build the article-number lists.

    :param excel_path: path of the workbook; it must contain the columns
        文件夹名称, 商品货号 and 款号.
    :return: ``(article_numbers, dataframe, groups)`` where ``article_numbers``
        has one string per row and ``groups`` holds, once per distinct group,
        the 商品货号 values of all rows sharing a 款号.
    :raises UnicornException: when a required column is missing.
    """
    excel_df = pd.read_excel(excel_path, sheet_name=0, header=0)
    for column in ("文件夹名称", "商品货号", "款号"):
        if column not in excel_df.columns:
            raise UnicornException(f"缺失 [{column}] 列")
    rows_by_style = excel_df.groupby(excel_df["款号"])
    # Flat list used for cutout.
    goods_art_no_arrays = []
    # Grouped by style so that one detail page can cover several colours.
    goods_art_no_group_arrays = []
    for _, goods_row in excel_df.iterrows():
        goods_art_no_arrays.append(str(goods_row["商品货号"]))
        # Group keys are the raw cell values; looking up str(value) raises
        # KeyError when the column is numeric.
        style_rows = rows_by_style.get_group(goods_row["款号"])
        goods_art_groups = style_rows["商品货号"].tolist()
        if goods_art_groups not in goods_art_no_group_arrays:
            goods_art_no_group_arrays.append(goods_art_groups)
    return goods_art_no_arrays, excel_df, goods_art_no_group_arrays


def group_by_style_number(data):
    """Map each 款号 value to the list of keys of ``data`` that carry it."""
    result = {}
    for goods_id, info in data.items():
        result.setdefault(info["款号"], []).append(goods_id)
    return result


def _pending_progress(msg_type):
    """Return the initial "waiting" progress entry for ``msg_type``."""
    return {
        "msg_type": msg_type,
        "name": _PROGRESS_NAMES[msg_type],
        "status": "等待处理",
        "current": 0,
        "total": 0,
        "error": 0,
    }


@app.post("/handle_detail")
async def handle_detail_background(
    request: Request, params: HandlerDetail, background_tasks: BackgroundTasks
):
    """Validate a detail-generation request and start it in the background.

    Returns the planned progress stages when ``params.is_check == 1``; otherwise
    schedules ``process_handle_detail`` and returns immediately.

    :raises UnicornException: when validation of the article numbers fails.
    """
    goods_art_no_arrays = params.goods_art_no
    is_check = params.is_check
    token = "Bearer " + params.token
    run_main = RunMain(None, token, params.uuid)
    online_stores = params.online_stores  # store names for third-party upload
    is_product_scene = params.is_product_scene
    is_upper_footer = params.is_upper_footer
    if online_stores:
        # Uploading to a third party requires the detail page.
        params.is_detail = 1
    is_detail = params.is_detail
    excel_path = (params.excel_path or "").strip()
    try:
        print("excel信息打印输出:", excel_path)
        if excel_path:
            goods_art_no_arrays, excel_df, _ = __createExcelGoodsArray(excel_path)
            await _process_excel_mode(goods_art_no_arrays, excel_df)
        else:
            await _process_non_excel_mode(params, goods_art_no_arrays)
        remote_data = run_main.data_mode_generate_detail.get_goods_art_no_info(
            goods_art_list=goods_art_no_arrays
        )
        if remote_data == {}:
            raise UnicornException("所有货号在商品档案资料中不存在,请检查货号是否正确")
        error_goods_art_no = [
            goods_art_no
            for goods_art_no in goods_art_no_arrays
            if not remote_data.get(goods_art_no)
        ]
        if error_goods_art_no:
            raise UnicornException(
                "以下货号在商品档案资料中不存在,请检查货号是否正确:{}".format(error_goods_art_no)
            )
    except UnicornException:
        raise
    except Exception as e:
        raise UnicornException(str(e))

    progress = [_pending_progress("segment_progress")]
    if is_product_scene == 1:
        progress.append(_pending_progress("scene_progress"))
    if is_upper_footer == 1:
        progress.append(_pending_progress("upper_footer_progress"))
    if is_detail == 1 or excel_path:
        progress.append(_pending_progress("detail_progress"))
    if online_stores:
        progress.append(_pending_progress("upload_goods_progress"))
    if is_check == 1:
        return {
            "code": 0,
            "msg": "检测通过",
            "data": {"progress": progress},
        }
    task = asyncio.create_task(process_handle_detail(request, params))
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return {"code": 0, "msg": "任务已提交后台处理", "data": {"status": "processing"}}


async def process_handle_detail(request: Request, params: HandlerDetail):
    """Run cutout, optional scene/model images, detail pages and upload.

    Progress and the final result are reported through ``sendAsyncMessage``;
    failures are reported the same way instead of being raised.

    :return: always True.
    """
    # Initialised before ``try`` so the exception handlers can always read them.
    handler_result = []
    handler_result_folder = ""
    try:
        token = "Bearer " + params.token
        aigc_clazz = AIGCDataRequest(token)
        run_main = RunMain(None, token, params.uuid)
        onlineData = OnlineDataRequest(token)

        goods_art_no_arrays = params.goods_art_no
        online_stores = params.online_stores
        is_detail = params.is_detail
        is_product_scene = params.is_product_scene
        is_upper_footer = params.is_upper_footer
        upper_footer_params = params.upper_footer_params
        product_scene_prompt = params.product_scene_prompt

        excel_path = (params.excel_path or "").strip()
        if excel_path:
            is_detail = 1  # Excel mode always produces detail pages
            goods_art_no_arrays, excel_df, _ = __createExcelGoodsArray(excel_path)
            move_folder_array = await _process_excel_mode(goods_art_no_arrays, excel_df)
        else:
            move_folder_array = await _process_non_excel_mode(params, goods_art_no_arrays)

        config_data = await _build_config_data(params, goods_art_no_arrays)

        handler_result_folder, handler_result = await _process_cutout(
            run_main, config_data, goods_art_no_arrays, move_folder_array
        )

        return_data_check_before_detail = run_main.check_before_detail(config_data)

        success_handler = (
            return_data_check_before_detail.get("data", {})
            .get("config_data", {})
            .get("success_handler", [])
        )
        failed_items = [item for item in success_handler if item.get("success") == False]
        if failed_items:
            await sendAsyncMessage(
                msg="处理结束",
                data={"output_folder": handler_result_folder, "list": failed_items},
                status="处理结束",
                msg_type="detail_result_progress",
            )
        # Stop when every checked item failed.
        if failed_items and len(failed_items) == len(success_handler):
            await sendAsyncMessage(
                msg="检查失败",
                goods_arts=[],
                status="检查失败",
                msg_type="segment_progress",
                progress=_make_progress("处理失败", 0, len(success_handler), len(failed_items)),
            )
            return True

        if is_product_scene == 1:
            return_data_check_before_detail = await _process_scene_images(
                aigc_clazz, run_main, return_data_check_before_detail, product_scene_prompt
            )
        if is_upper_footer == 1:
            return_data_check_before_detail = await _process_model_images(
                aigc_clazz, run_main, return_data_check_before_detail, upper_footer_params
            )
        if is_detail == 1:
            handler_result_folder, handler_result = await _process_detail_pages(
                run_main,
                return_data_check_before_detail,
                onlineData,
                online_stores,
                goods_art_no_arrays,
                handler_result_folder,
            )
            if online_stores:
                await _upload_to_third_party(
                    onlineData, return_data_check_before_detail, config_data, online_stores
                )
        # NOTE(review): the original sent this identical message in both branches
        # of a trailing if/else; a single send is assumed equivalent. Confirm
        # which `if` that `else` belonged to.
        await sendAsyncMessage(
            msg="处理结束",
            data={"output_folder": handler_result_folder, "list": handler_result},
            status="处理结束",
            msg_type="detail_result_progress",
        )
    except UnicornException as e:
        await _handle_exception(e.msg, handler_result_folder)
    except Exception as e:
        await _handle_exception(str(e), handler_result_folder)
    return True


async def _process_non_excel_mode(params, goods_art_no_arrays):
    """Validate photo records and stage images for requests without an Excel file.

    :return: the mapping returned by ``check_move_goods_art_no_folder``; article
        numbers missing from it are staged here.
    :raises UnicornException: for an empty article number, missing photo
        records, or fewer photos than views in ``params.template_image_order``.
    """
    limit_path = _today_output_dir()
    check_path(limit_path)
    move_folder_array = check_move_goods_art_no_folder(
        "output", goods_art_no_arrays, limit_path
    )
    photo_records = CRUD(PhotoRecord)
    for goods_art_no in goods_art_no_arrays:
        if not goods_art_no:
            raise UnicornException("货号不能为空")
        with _open_session() as session:
            images = photo_records.read_all(
                session, conditions={"goods_art_no": goods_art_no}
            )
        if not images:
            raise UnicornException(
                f"商品货号【{goods_art_no}】未查询到拍摄记录,请检查货号是否正确"
            )
        detail_counts = len((params.template_image_order or "").split(","))
        image_counts = len(images)
        if image_counts < detail_counts:
            raise UnicornException(
                f"货号:[{goods_art_no}],实际照片数量:{image_counts}张,小于详情图要求数量:{detail_counts}张"
            )
        if move_folder_array.get(goods_art_no) is None:
            await _process_image_copy_and_move(goods_art_no, images, False)
    return move_folder_array


async def _process_excel_mode(goods_art_no_arrays, excel_df):
    """Validate photo records and stage images for every row of the Excel sheet.

    :return: the mapping returned by ``check_move_goods_art_no_folder``.
    :raises UnicornException: for an empty article number or missing photo records.
    """
    limit_path = _today_output_dir()
    check_path(limit_path)
    move_folder_array = check_move_goods_art_no_folder(
        "output", goods_art_no_arrays, limit_path
    )
    photo_records = CRUD(PhotoRecord)
    with _open_session() as session:
        for _, row in excel_df.iterrows():
            goods_art_no_image_dir = str(row["文件夹名称"])
            goods_art_no = str(row["商品货号"])
            print("货号数据", goods_art_no)
            if not goods_art_no:
                raise UnicornException("货号不能为空")
            images = photo_records.read_all(
                session, conditions={"goods_art_no": goods_art_no}
            )
            if not images:
                raise UnicornException(
                    f"商品货号【{goods_art_no}】未查询到拍摄记录,请检查表格中的货号数据列"
                )
            # Only stage images when the article folder does not exist yet.
            if move_folder_array.get(goods_art_no) is None:
                await _process_image_copy_and_move(goods_art_no_image_dir, images, True)
    return move_folder_array


async def _process_image_copy_and_move(goods_art_no, images, is_excel):
    """Copy shot images into ``<cwd>/data/`` and hand them to ``DealImage``.

    :param goods_art_no: value passed to ``DealImage.dealMoveImage``.
    :param images: photo records providing ``image_path`` and ``goods_art_no``.
    :param is_excel: when true the staging directory is removed afterwards.
    :raises UnicornException: when a record has no image or the move fails.
    """
    image_dir = "{}/data/".format(os.getcwd()).replace("\\", "/")
    check_path(image_dir)
    for idx, item_img in enumerate(images):
        if not item_img.image_path:
            raise UnicornException(
                f"货号【{goods_art_no}】存在没有拍摄完成的图片,请重拍或删除后重试"
            )
        target_path = image_dir + str(item_img.goods_art_no) + "_" + str(idx) + ".jpg"
        if not os.path.exists(target_path):
            shutil.copy(item_img.image_path, target_path)
    res_flag, path = DealImage(image_dir).dealMoveImage(
        image_dir=image_dir, callback_func=None, goods_art_no=goods_art_no
    )
    if not res_flag:
        raise UnicornException(path)
    if is_excel:
        try:
            shutil.rmtree(image_dir, onerror=settings.handle_remove_readonly)
        except Exception as e:
            # Cleanup is best effort; a leftover directory is not fatal.
            logger.info(f"删除图片失败:{str(e)}")


async def _build_config_data(params, goods_art_no_arrays):
    """Assemble the configuration dict passed to the ``RunMain`` checks.

    :raises UnicornException: when a template class cannot be imported.
    """
    temp_class_paths = {}
    temp_name_list = []
    for temp_item in params.temp_list:
        temp_class_paths[temp_item.template_id] = temp_item.template_local_classes
        temp_name_list.append(temp_item.template_id)
    cutout_mode = (
        "1"
        if settings.getSysConfigs("other_configs", "cutout_mode", "普通抠图") == "普通抠图"
        else "2"
    )
    limit_path = _today_output_dir()
    config_data = {
        "image_dir": limit_path,
        "image_order": (
            params.template_image_order
            or "俯视,侧视,后跟,鞋底,内里,组合,组合2,组合3,组合4,组合5"
        ),
        "goods_art_no": "",
        "goods_art_nos": goods_art_no_arrays,
        "is_check_number": False,
        "resize_image_view": "后跟",
        "cutout_mode": cutout_mode,
        "logo_path": params.logo_path,
        "special_goods_art_no_folder_line": "",
        # A None or blank path means Excel mode is off.
        "is_use_excel": bool(params.excel_path and params.excel_path.strip()),
        "excel_path": params.excel_path,
        "is_check_color_is_all": False,
        "cutout_is_pass": True,
        "assigned_page_dict": {},
        "detail_is_pass": True,
        "upload_is_pass": False,
        "upload_is_enable": False,
        "is_filter": False,
        "temp_class": temp_class_paths,
        "temp_name": params.temp_name,
        "temp_name_list": temp_name_list,
        "target_error_folder": f"{limit_path}/软件-生成详情错误",
        "success_handler": [],
    }
    # Resolve dotted class paths to the classes themselves.
    try:
        temp_class_dict = {}
        for key, class_path in temp_class_paths.items():
            module_path, class_name = class_path.rsplit(".", 1)
            module = importlib.import_module(module_path)
            temp_class_dict[key] = getattr(module, class_name)
    except Exception as err:
        raise UnicornException("详情页模板不存在或未下载完成,请重启软件后重试") from err
    config_data["temp_class"] = temp_class_dict
    return config_data


async def _process_cutout(run_main, config_data, goods_art_no_arrays, move_folder_array):
    """Run the cutout checks and report completion.

    :return: ``(output_folder, results)``; both are empty when the cutout
        callback returns a falsy value.
    """
    handler_result = []
    handler_result_folder = ""
    return_data = run_main.check_before_cutout(config_data)
    cutout_res = run_main.check_for_cutout_image_first_call_back(return_data)
    if cutout_res:
        sys_path = os.getcwd().replace("\\", "/")
        handler_result_folder = f"{sys_path}/{config_data['image_dir']}"
        handler_result = [
            {"goods_art_no": goods_art_item, "success": True, "info": "处理成功"}
            for goods_art_item in goods_art_no_arrays
        ]
        # NOTE(review): the collapsed source does not show whether this check
        # was nested under `if cutout_res`; nesting is assumed so that
        # "抠图完成" is only reported after a successful cutout. Confirm.
        if len(move_folder_array.keys()) == len(goods_art_no_arrays):
            total = len(goods_art_no_arrays)
            await sendAsyncMessage(
                msg="抠图完成",
                data={"output_folder": handler_result_folder, "list": handler_result},
                status="抠图完成",
                msg_type="segment_progress",
                progress=_make_progress("处理完成", total, total, 0),
            )
    return handler_result_folder, handler_result


async def _process_scene_images(aigc_clazz, run_main, return_data_check_before_detail, product_scene_prompt):
    """Generate a scene image per article and report progress.

    Articles whose generation fails are left out of the returned
    ``goods_no_dict`` (when at least one article succeeded).

    :raises UnicornException: when ``product_scene_prompt`` is empty.
    """
    if not product_scene_prompt:
        raise UnicornException("请填写场景描述")
    goods_dict = parserGoodsDict2Aigc(return_data_check_before_detail)
    new_goods_dict = {}
    total = len(goods_dict)
    finished = 0
    errors = 0
    await sendAsyncMessage(
        msg="开始处理场景图",
        goods_arts=list(goods_dict.keys()),
        status="开始处理",
        msg_type="scene_progress",
        progress=_make_progress("正在处理", finished, total, errors),
    )
    for goods_art_no in goods_dict:
        # Reset per item so a failure never touches another item's file.
        save_image_path = None
        finished += 1
        try:
            await sendAsyncMessage(
                msg="正在处理场景图",
                goods_arts=[goods_art_no],
                status="正在处理场景图",
                msg_type="scene_progress",
                progress=_make_progress("正在处理", finished, total, errors, goods_art_no),
            )
            goods_art_dict_info = goods_dict[goods_art_no]
            first_goods_art_no_info = goods_art_dict_info.get("货号资料", [])[0]
            first_pics = first_goods_art_no_info.get("pics")
            ceshi_image_path = first_pics.get("侧视-抠图")
            save_root_path = ceshi_image_path.split("阴影图处理")[0]
            save_image_path = f"{save_root_path}场景图.jpg"
            if os.path.isfile(save_image_path):
                # Reuse an image generated by an earlier run.
                goods_art_dict_info["场景图"] = save_image_path
                new_goods_dict[goods_art_no] = goods_art_dict_info
                continue
            aigc_clazz.center_paste_image(ceshi_image_path, save_image_path)
            image_path = aigc_clazz.generateProductScene(
                save_image_path, product_scene_prompt, save_image_path
            )
            goods_art_dict_info["场景图"] = image_path
            new_goods_dict[goods_art_no] = goods_art_dict_info
            await sendAsyncMessage(
                msg="场景图处理完成",
                goods_arts=[goods_art_no],
                status="场景图处理完成",
                msg_type="scene_progress",
                progress=_make_progress("正在处理", finished, total, errors, goods_art_no),
            )
        except Exception as e:
            logger.error(f"scene image failed for {goods_art_no}: {e}")
            _discard_generated_image(save_image_path)
            finished -= 1
            errors += 1
            await sendAsyncMessage(
                msg="场景图处理失败",
                goods_arts=[goods_art_no],
                status="场景图处理失败",
                msg_type="scene_progress",
                progress=_make_progress("处理失败", finished, total, errors, goods_art_no),
            )
    status_text = "处理完成" if finished > 0 else "处理失败"
    await sendAsyncMessage(
        msg="场景图处理结束",
        goods_arts=[],
        status="场景图处理结束",
        msg_type="scene_progress",
        progress=_make_progress(status_text, finished, total, errors),
    )
    if new_goods_dict:
        return_data_check_before_detail["data"]["goods_no_dict"] = new_goods_dict
    return return_data_check_before_detail


async def _process_model_images(aigc_clazz, run_main, return_data_check_before_detail, upper_footer_params):
    """Generate a model image per article and report progress.

    The model is chosen from ``man_id`` / ``women_id`` depending on whether the
    article's 性别 value contains "男".

    :raises UnicornException: when the model parameters are missing.
    """
    if not upper_footer_params:
        raise UnicornException("请选择模特")
    man_id = upper_footer_params.get("man_id")
    women_id = upper_footer_params.get("women_id")
    if not man_id:
        raise UnicornException("请选择男模特")
    if not women_id:
        raise UnicornException("请选择女模特")
    goods_dict = parserGoodsDict2Aigc(return_data_check_before_detail)
    new_goods_dict = {}
    total = len(goods_dict)
    finished = 0
    errors = 0
    await sendAsyncMessage(
        msg="开始处理模特图",
        goods_arts=list(goods_dict.keys()),
        status="开始处理模特图",
        msg_type="upper_footer_progress",
        progress=_make_progress("正在处理", finished, total, errors),
    )
    for goods_art_no in goods_dict:
        # Reset per item so a failure never touches another item's file.
        save_image_path = None
        finished += 1
        try:
            await sendAsyncMessage(
                msg="正在处理模特图",
                goods_arts=[goods_art_no],
                status="正在处理模特图",
                msg_type="upper_footer_progress",
                progress=_make_progress("正在处理", finished, total, errors, goods_art_no),
            )
            goods_art_dict_info = goods_dict[goods_art_no]
            first_goods_art_no_info = goods_art_dict_info.get("货号资料", [])[0]
            first_pics = first_goods_art_no_info.get("pics")
            gender = goods_art_dict_info.get("性别", "女")
            model_id = man_id if "男" in gender else women_id
            ceshi_image_path = first_pics.get("侧视-抠图")
            save_root_path = ceshi_image_path.split("阴影图处理")[0]
            save_image_path = f"{save_root_path}模特图.jpg"
            if os.path.isfile(save_image_path):
                # Reuse an image generated by an earlier run.
                goods_art_dict_info["模特图"] = save_image_path
                new_goods_dict[goods_art_no] = goods_art_dict_info
                continue
            shutil.copy(ceshi_image_path, save_image_path)
            # A `with` block would join the worker on exit and defeat the
            # timeout, so the executor is shut down without waiting. The worker
            # itself cannot be cancelled and may still finish later.
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(
                    aigc_clazz.generateUpperShoes, save_image_path, model_id, save_image_path
                )
                image_path = future.result(timeout=_MODEL_IMAGE_TIMEOUT_SECONDS)
            finally:
                executor.shutdown(wait=False)
            goods_art_dict_info["模特图"] = image_path
            new_goods_dict[goods_art_no] = goods_art_dict_info
            await sendAsyncMessage(
                msg="模特图处理成功",
                goods_arts=[goods_art_no],
                status="模特图处理成功",
                msg_type="upper_footer_progress",
                progress=_make_progress("正在处理", finished, total, errors, goods_art_no),
            )
        except Exception as e:  # includes concurrent.futures.TimeoutError
            logger.error(f"model image failed for {goods_art_no}: {e}")
            _discard_generated_image(save_image_path)
            finished -= 1
            errors += 1
            await sendAsyncMessage(
                msg="模特图处理失败",
                goods_arts=[goods_art_no],
                status="模特图处理失败",
                msg_type="upper_footer_progress",
                progress=_make_progress("处理失败", finished, total, errors, goods_art_no),
            )
    status_text = "处理完成" if finished > 0 else "处理失败"
    await sendAsyncMessage(
        msg="模特图处理结束",
        goods_arts=[],
        status="模特图处理结束",
        msg_type="upper_footer_progress",
        progress=_make_progress(status_text, finished, total, errors),
    )
    if new_goods_dict:
        return_data_check_before_detail["data"]["goods_no_dict"] = new_goods_dict
    return return_data_check_before_detail


async def _process_detail_pages(run_main, return_data_check_before_detail, onlineData, online_stores, goods_art_no_arrays, handler_result_folder):
    """Run detail-page generation and derive the result folder.

    :return: ``(result_folder, success_handler)``; the folder is "" when the
        configuration reports no ``out_put_dir``, otherwise two directory
        levels above ``<cwd>/<out_put_dir>``.
    """
    check_for_detail_first_res = run_main.check_for_detail_first_call_back(
        return_data_check_before_detail
    )
    print("<======>check_for_detail_first_res<======>", check_for_detail_first_res)
    if isinstance(check_for_detail_first_res, partial):
        result = check_for_detail_first_res()
        try:
            config_data = result["config_data"]
        except Exception:
            # The callback may return the configuration itself.
            config_data = result
    else:
        config_data = check_for_detail_first_res
    out_put_dir = config_data.get("out_put_dir")
    success_handler = config_data.get("success_handler")
    if not out_put_dir:
        # No output directory reported: treated as a failed run.
        return "", success_handler
    out_put_dir_path = "{}/{}".format(os.getcwd(), out_put_dir).replace("\\", "/")
    handler_result_folder = os.path.dirname(out_put_dir_path)
    if handler_result_folder:
        handler_result_folder = os.path.dirname(handler_result_folder)
    return handler_result_folder, success_handler


async def _upload_to_third_party(onlineData, return_data_check_before_detail, config_data, online_stores):
    """Attach detail-page paths to each article and upload to the given stores.

    Upload errors are logged and reported through the progress message; they
    are not raised.
    """
    result_goods_no_dict = return_data_check_before_detail["data"]["goods_no_dict"]
    for goods_no in result_goods_no_dict:
        for detail_path in config_data["all_detail_path_list"]:
            if goods_no in detail_path:
                detail_path_replace = detail_path.replace("\\", "/")
                result_goods_no_dict[goods_no]["detail_path"] = f"{detail_path_replace}/详情页.jpg"
    total = len(result_goods_no_dict)
    await sendAsyncMessage(
        msg="开始上传商品数据",
        goods_arts=[],
        status="开始上传商品数据",
        msg_type="upload_goods_progress",
        progress=_make_progress("开始处理", 0, total, 0),
    )
    failed = False
    try:
        onlineData.uploadGoods2ThirdParty(result_goods_no_dict, online_stores=online_stores)
    except Exception as e:
        failed = True
        logger.error(f"上传任务出现错误:{e}")
    if failed:
        # Report the failure instead of claiming success.
        progress = _make_progress("处理失败", 0, total, total)
        result_text = "商品上传第三方失败"
    else:
        progress = _make_progress("处理完成", total, total, 0)
        result_text = "商品上传第三方成功"
    await sendAsyncMessage(
        msg=result_text,
        goods_arts=[],
        status=result_text,
        msg_type="upload_goods_progress",
        progress=progress,
    )


async def _handle_exception(error_msg, handler_result_folder):
    """Report a failed run as a single unsuccessful result entry."""
    handler_result = [{"goods_art_no": "", "success": False, "info": error_msg}]
    await sendAsyncMessage(
        msg="处理结束",
        data={"output_folder": handler_result_folder, "list": handler_result},
        status="处理结束",
        msg_type="detail_result_progress",
    )


@app.get("/get_device_tabs", description="获取可执行程序命令列表")
def get_device_tabs(type: int):
    """Return the tabs of one mode type plus the stored ``action_configs`` value."""
    with _open_session() as session:
        statement = (
            select(DeviceConfigTabs)
            .where(DeviceConfigTabs.mode_type == type)
            .order_by(asc("id"))
        )
        result = session.exec(statement).all()
        action_configs = CRUD(SysConfigs).read(session, conditions={"key": "action_configs"})
    if action_configs is None:
        return {"code": 1, "msg": "配置不存在", "data": None}
    return {
        "code": 0,
        "msg": "",
        "data": {"tabs": result, "select_configs": json.loads(action_configs.value)},
    }


@app.post("/update_tab_name", description="更改tab名称")
def update_tab_name(params: DeviceConfigTabsReq):
    """Rename a tab; an empty name is rejected."""
    if params.mode_name == "":
        return {"code": 1, "msg": "名称不能为空", "data": {}}
    with _open_session() as session:
        CRUD(DeviceConfigTabs).updateConditions(
            session, conditions={"id": params.id}, mode_name=params.mode_name
        )
    return {
        "code": 0,
        "msg": "",
        "data": None,
    }


@app.post("/get_device_configs", description="获取可执行程序命令列表")
def get_device_configs(params: ModelGetDeviceConfig):
    """List the device configs of a tab ordered by ``action_index``."""
    with _open_session() as session:
        config_list = CRUD(DeviceConfig).read_all(
            session,
            conditions={"tab_id": params.tab_id},
            order_by="action_index",
            ascending=True,
        )
    return {
        "code": 0,
        "msg": "",
        "data": {"list": config_list},
    }


@app.post("/device_config_detail", description="获取可执行程序详情")
def device_config_detail(params: ModelGetDeviceConfigDetail):
    """Return one device config by id."""
    with _open_session() as session:
        model = CRUD(DeviceConfig).read(session, conditions={"id": params.id})
    if model is None:
        return {"code": 1, "msg": "数据不存在", "data": None}
    return {"code": 0, "msg": "", "data": model}


@app.post("/device_config_detail_query", description="通过条件获取可执行程序详情")
def device_config_detail_query():
    """Return the "侧视" config of the configured left tab, else any config of it."""
    with _open_session() as session:
        action_configs = CRUD(SysConfigs).read(session, conditions={"key": "action_configs"})
        if action_configs is None:
            return {"code": 1, "msg": "配置不存在", "data": None}
        left_config = json.loads(action_configs.value).get("left")
        config_model = CRUD(DeviceConfig)
        model = config_model.read(
            session, conditions={"tab_id": left_config, "action_name": "侧视"}
        )
        if model is None:
            model = config_model.read(session, conditions={"tab_id": left_config})
    return {"code": 0, "msg": "", "data": model}


# NOTE(review): this handler reuses the name `get_device_configs` and therefore
# shadows the listing handler above at module level. Both routes stay
# registered; the name is kept only to leave the module interface unchanged.
@app.post("/remove_config", description="删除一条可执行命令")
def get_device_configs(params: ModelGetDeviceConfigDetail):
    """Delete one device config unless it is a system entry or the last of its tab."""
    action_id = params.id
    with _open_session() as session:
        config_model = CRUD(DeviceConfig)
        model = config_model.read(session, conditions={"id": action_id})
        if model is None:
            return {"code": 1, "msg": "数据不存在", "data": None}
        if model.is_system == True:
            return {"code": 1, "msg": "系统配置不允许删除", "data": None}
        config_array = config_model.read_all(session, conditions={"tab_id": model.tab_id})
        if len(config_array) == 1:
            return {"code": 1, "msg": "请至少保留一个配置", "data": None}
        config_model.delete(session, obj_id=action_id)
    return {"code": 0, "msg": "删除成功", "data": None}


@app.post("/save_device_config", description="创建或修改一条可执行命令")
def save_device_config(params: SaveDeviceConfig):
    """Create a device config (id None or 0) or update an existing one."""
    action_id = params.id
    with _open_session() as session:
        device_config = CRUD(DeviceConfig)
        if action_id is None or action_id == 0:
            params.id = None
            saved = device_config.create(session, obj_in=params)
        else:
            if device_config.read(session, conditions={"id": action_id}) is None:
                return {"code": 1, "msg": "数据不存在", "data": None}
            saved = device_config.update(session, obj_id=action_id, **params.__dict__)
    return {"code": 0, "msg": "操作成功", "data": saved}


@app.post("/reset_config", description="创建或修改一条可执行命令")
def reset_config(params: ModelGetDeviceConfig):
    """Replace all configs of a tab with the entries of ``action.json``."""
    tab_id = params.tab_id
    if tab_id is None or tab_id == "":
        return {"code": 1, "msg": "参数错误", "data": None}
    with _open_session() as session:
        device_config = CRUD(DeviceConfig)
        first_config = device_config.read(session, conditions={"tab_id": tab_id})
        if first_config is None:
            return {"code": 1, "msg": "数据不存在", "data": None}
        # Read the flag before the rows are deleted.
        is_system = first_config.is_system
        res = device_config.deleteConditions(session, conditions={"tab_id": tab_id})
        if res is False:
            return {"code": 1, "msg": "操作失败", "data": None}
        with open("action.json", encoding="utf-8") as action_file:
            actions = json.load(action_file)
        for data in actions:
            data["tab_id"] = tab_id
            data["is_system"] = is_system
            session.add(DeviceConfig(**data))
        session.commit()
    return {"code": 0, "msg": "操作成功", "data": None}


@app.get("/get_photo_records", description="获取拍照记录")
def get_photo_records(page: int = 1, size: int = 5):
    """Return one page of article numbers, each with its photo records."""
    records = []
    with _open_session() as session:
        print("准备查询拍摄记录", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        statement = (
            select(PhotoRecord)
            .offset((page - 1) * size)
            .limit(size)
            .order_by(desc("id"))
            .group_by("goods_art_no")
        )
        result = session.exec(statement).all()
        print("group 完成 ", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        for item in result:
            query = (
                select(PhotoRecord, DeviceConfig.action_name)
                .where(PhotoRecord.goods_art_no == item.goods_art_no)
                .join(DeviceConfig, PhotoRecord.action_id == DeviceConfig.id)
            )
            records.append(
                {
                    "goods_art_no": item.goods_art_no,
                    "action_time": item.create_time,
                    "items": session.exec(query).mappings().all(),
                }
            )
    print("循环查询 完成 ", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    return {
        "code": 0,
        "msg": "",
        "data": {"list": records, "page": page, "size": size},
    }


@app.get("/get_last_photo_record", description="获取最后一条拍照记录")
def get_last_photo_record():
    """Return the newest photo record that has an image path."""
    with _open_session() as session:
        statement = (
            select(PhotoRecord)
            .where(PhotoRecord.image_path != None)
            .order_by(desc("photo_create_time"))
        )
        result = session.exec(statement).first()
    return {
        "code": 0,
        "msg": "",
        "data": result,
    }


@app.get("/get_photo_record_detail", description="通过货号获取拍照记录详情")
def get_photo_record_detail(goods_art_no: str = None):
    """Return all photo records of one article number."""
    if goods_art_no is None:
        return {"code": 1, "msg": "参数错误", "data": None}
    with _open_session() as session:
        items = CRUD(PhotoRecord).read_all(session, conditions={"goods_art_no": goods_art_no})
    return {
        "code": 0,
        "msg": "",
        "data": {"list": items},
    }


@app.post("/delect_goods_arts", description="通过货号删除记录")
def delect_goods_arts(params: PhotoRecordDelete):
    """Delete the photo records of every article number in the request."""
    with _open_session() as session:
        photos = CRUD(PhotoRecord)
        for item in params.goods_art_nos:
            photos.deleteConditions(session, conditions={"goods_art_no": item})
    return {
        "code": 0,
        "msg": "操作成功",
        "data": None,
    }


def query_config_by_key(key_name):
    """Return the entry of ``sys_configs.json`` whose "key" equals ``key_name``.

    :param key_name: key to look for.
    :return: the matching dict, or None when nothing matches or the file
        cannot be read.
    """
    try:
        with open("sys_configs.json", encoding="utf-8") as config_file:
            configs = json.load(config_file)
        for config in configs:
            if config.get("key") == key_name:
                return config
        return None
    except Exception as e:
        print(f"查询配置时出错: {e}")
        return None


@app.get("/get_sys_config", description="查询系统配置")
def get_sys_config(key: str = None):
    """Return a system config value, seeding it from ``sys_configs.json`` if absent."""
    if key is None:
        return {"code": 1, "msg": "参数错误", "data": None}
    with _open_session() as session:
        item = CRUD(SysConfigs).read(session, conditions={"key": key})
        search_data = None if item is None else json.loads(item.value)
        if search_data is None:
            sys_config_json = query_config_by_key(key)
            if sys_config_json is not None:
                session.add(SysConfigs(**sys_config_json))
                session.commit()
                search_data = json.loads(sys_config_json.get("value"))
    return {
        "code": 0,
        "msg": "",
        "data": search_data,
    }


@app.post("/update_left_right_config", description="更新左右脚配置")
def update_left_right_config(params: LeftRightParams):
    """Set ``action_configs[params.type]`` to ``params.id``."""
    with _open_session() as session:
        sys_config = CRUD(SysConfigs)
        model = sys_config.read(session, conditions={"key": "action_configs"})
        if model is None:
            return {"code": 1, "msg": "配置不存在", "data": None}
        config_value = json.loads(model.value)
        config_value[params.type] = params.id
        sys_config.updateConditions(
            session,
            conditions={"key": "action_configs"},
            key="action_configs",
            value=json.dumps(config_value),
        )
    return {"code": 0, "msg": "操作成功", "data": None}


@app.post("/update_record", description="更新拍照记录")
def update_record(params: RecordUpdate):
    """Update one photo record with the fields of the request."""
    with _open_session() as session:
        photo_record = CRUD(PhotoRecord)
        if photo_record.read(session, conditions={"id": params.id}) is None:
            return {"code": 1, "msg": "记录不存在", "data": None}
        saved = photo_record.update(session, obj_id=params.id, **params.__dict__)
    return {"code": 0, "msg": "操作成功", "data": saved}


@app.post("/update_sys_configs", description="创建或修改系统配置")
def save_sys_configs(params: SysConfigParams):
    """Update an existing system config identified by ``params.key``."""
    with _open_session() as session:
        sys_config = CRUD(SysConfigs)
        if sys_config.read(session, conditions={"key": params.key}) is None:
            return {"code": 1, "msg": "配置不存在", "data": None}
        saved = sys_config.updateConditions(
            session, conditions={"key": params.key}, **params.__dict__
        )
    return {"code": 0, "msg": "操作成功", "data": saved}


@app.post("/create_main_image", description="创建主图测试")
def create_main_image(params: MaineImageTest):
    """Run the main-image test for one file and return the output path."""
    main_out_path = OnePicTest(pic_path=params.file_path).HandlerMainImage()
    return {"code": 0, "msg": "操作成功", "data": {"main_out_path": main_out_path}}


def insertEmptyLogoList(session):
    """Persist an empty ``logo_configs`` entry and return an unbound copy of it."""
    session.add(SysConfigs(key="logo_configs", value="[]"))
    session.commit()
    item = SysConfigs()
    item.key = "logo_configs"
    item.value = "[]"
    return item


@app.get("/logo_list", description="logo列表")
def logo_list():
    """List the full paths of all files in the logo directory."""
    logo_dir = _logo_dir()
    check_path(logo_dir)
    logos = [f"{logo_dir}{logo_item}" for logo_item in os.listdir(logo_dir)]
    return {"code": 0, "msg": "操作成功", "data": logos}


@app.post("/add_logo", description="添加logo")
def add_logo(params: LogoParams):
    """Copy a logo file into the logo directory and return the new path."""
    logo_path = params.logo_path.replace("\\", "/")
    with _open_session() as session:
        item = CRUD(SysConfigs).read(session, conditions={"key": "logo_configs"})
        if item is None:
            insertEmptyLogoList(session)
    if not os.path.isfile(logo_path):
        return {"code": 1, "msg": "logo文件不存在", "data": None}
    logo_dir = _logo_dir()
    check_path(logo_dir)
    logo_path_info = logo_dir + os.path.basename(logo_path)
    shutil.copy(logo_path, logo_path_info)
    return {
        "code": 0,
        "msg": "",
        "data": {"logo": logo_path_info},
    }


@app.post("/delete_logo", description="删除logo")
def delete_logo(params: LogoParamsupdate):
    """Delete a logo file and return the remaining logo paths."""
    logo_path = params.path
    if not os.path.isfile(logo_path):
        return {"code": 1, "msg": "logo文件不存在", "data": None}
    # NOTE(review): the path comes from the client and is removed as given,
    # whether or not it lies inside the logo directory. Consider restricting it.
    os.remove(logo_path)
    logo_dir = _logo_dir()
    check_path(logo_dir)
    logos = [f"{logo_dir}{logo_item}" for logo_item in os.listdir(logo_dir)]
    return {"code": 0, "msg": "操作成功", "data": logos}


@app.post("/close_other_window", description="关闭窗口")
def close_other_window():
    """Post WM_CLOSE to the first window titled "digiCamControl by Duka Istvan"."""
    hwnd_list = []

    def callback(hwnd, _):
        if GetWindowText(hwnd) == "digiCamControl by Duka Istvan":
            hwnd_list.append(hwnd)

    EnumWindows(callback, None)
    if hwnd_list:
        win32gui.PostMessage(hwnd_list[0], win32con.WM_CLOSE, 0, 0)
        return {"code": 0, "msg": "关闭成功", "data": {"status": True}}
    return {"code": 0, "msg": "关闭失败", "data": {"status": False}}


# NOTE(review): this definition sits at the end of the visible chunk and may
# continue beyond it; its statements are reproduced unchanged.
def syncUserJsonConfigs(token):
    hlm_token = token
    headers = {
        "Authorization": f"Bearer {hlm_token}",
        "content-type": "application/json",
    }
    # Extra query parameter machine_type: camera machine type, 0 shoes, 1 clothing.
    url = settings.DOMAIN + f"/api/ai_image/camera_machine/get_all_user_configs?machine_type={MACHINE_TYPE}"
    result = requests.get(url=url, headers=headers)
    sys_configs = result.json().get("data", {}).get("configs")
    session = SqlQuery()
    sysConfigs = CRUD(SysConfigs)
    if sys_configs:
        sysConfigs.deleteConditions(session, {})
        configList = []
        for config_keys in sys_configs.keys():
            sys_configs[config_keys]
            configList.append(
                {
                    "key": config_keys,
                    "value": json.dumps(sys_configs[config_keys], ensure_ascii=False),
                }
            )
        batch_insert_sys_configs(session, configList)
session.close() @app.post("/sync_sys_configs", description="同步线上配置到本地") def sync_sys_configs(params: SyncLocalConfigs): hlm_token = params.token headers = { "Authorization": f"Bearer {hlm_token}", "content-type": "application/json", } # 追加配置参数 machine_type 拍照机设备类型;0鞋;1服装 url = settings.DOMAIN + f"/api/ai_image/camera_machine/get_all_user_configs?machine_type={MACHINE_TYPE}" result = requests.get(url=url, headers=headers) sys_configs = result.json().get("data", {}).get("configs") session = SqlQuery() sysConfigs = CRUD(SysConfigs) if sys_configs: sysConfigs.deleteConditions(session, {}) configList = [] for config_keys in sys_configs.keys(): sys_configs[config_keys] configList.append( { "key": config_keys, "value": json.dumps(sys_configs[config_keys], ensure_ascii=False), } ) batch_insert_sys_configs(session, configList) else: all_configs = sysConfigs.read_all(session) localConfigData = {} for local_config in all_configs: localConfigData[local_config.key] = json.loads(local_config.value) data_json = json.dumps({"configs": localConfigData,"machine_type":MACHINE_TYPE}, ensure_ascii=False) # 同步本地到线上 url = settings.DOMAIN + "/api/ai_image/camera_machine/update_all_user_configs" requests.post(url=url, headers=headers, data=data_json) session.close() return {"code": 0, "msg": "操作成功", "data": None} @app.post("/sync_actions", description="同步左右脚配置到本地") def sync_action_configs(params: SyncLocalConfigs): hlm_token = params.token headers = { "Authorization": f"Bearer {hlm_token}", "content-type": "application/json", } url = settings.DOMAIN + f"/api/ai_image/camera_machine/get_all_user_tabs?machine_type={MACHINE_TYPE}" result = requests.get(url=url, headers=headers) session = SqlQuery() deviceConfigs = CRUD(DeviceConfig) deviceConfigTabs = CRUD(DeviceConfigTabs) tabs = result.json().get("data", {}).get("tabs") actions = result.json().get("data", {}).get("actions") if tabs: # 先删除再创建 deviceConfigTabs.deleteConditions(session, {}) deviceConfigs.deleteConditions(session, {}) 
batch_insert_device_configsNew(session, tabs, actions) else: all_actions = deviceConfigs.read_all(session) all_tabs = deviceConfigTabs.read_all(session) all_tabs_json = [item.model_dump(mode='json') for item in all_tabs] all_actions_json = [item.model_dump(mode="json") for item in all_actions] data_json = json.dumps( {"tabs": all_tabs_json, "actions": all_actions_json,"machine_type":MACHINE_TYPE}, ensure_ascii=False ) sync_url = settings.DOMAIN + "/api/ai_image/camera_machine/sync_actions" result = requests.post(url=sync_url, headers=headers, data=data_json) tabs = result.json().get("data", {}).get("tabs") actions = result.json().get("data", {}).get("actions") insert_action_ids = result.json().get("data", {}).get("insert_action_ids") if tabs: deviceConfigTabs.deleteConditions(session, {}) deviceConfigs.deleteConditions(session, {}) batch_insert_device_configsNew(session, tabs, actions) if insert_action_ids: for action_item in insert_action_ids: photos = CRUD(PhotoRecord) old_id = action_item.get("old_id") new_id = action_item.get("new_id") kwargs = {"action_id": new_id} photos.updateConditionsAll( session, conditions={"action_id": old_id}, **kwargs ) # 因为左右脚线上id可能会发生变化 所以需要重新同步一下本地得配置信息 # syncUserJsonConfigs(hlm_token) session.close() return {"code": 0, "msg": "操作成功", "data": None}