import json from .module_generate_goods_art_no_table import GenerateGoodsArtNoTable from func_timeout import FunctionTimedOut from .grenerate_main_image_test import GeneratePic from threading import Lock from middleware import UnicornException import settings from collections import defaultdict from .remove_bg_ali import RemoveBgALi, Picture from .deal_cutout import DealCutout import time from .image_pic_deal import OnePicDeal from natsort import natsorted, ns import os import shutil import exifread import datetime from databases import DeviceConfig, SqlQuery, CRUD from model.photo_record import PhotoRecord import requests import copy, asyncio from settings import sendSocketMessage from utils.common import message_queue from logger import logger def sendAsyncMessage(msg="", goods_arts=[], status="",progress={}): """异步发送消息""" data = { "code": 0, "msg": msg, "status": 2, "data": { "status": status, "goods_art_nos": goods_arts, }, "progress":{ "msg_type":"segment_progress", "name":"抠图", "goods_art_no":progress.get("goods_art_no",""), "status":progress.get("status"), "current":progress.get("current",0), "total":progress.get("total",0), "error":progress.get("error",0) }, "msg_type": "segment_progress", } message_queue.put_nowait(data) """ 照片自动货号匹配 将图片放置在指定文件夹下,并自动对应不同的货号进行整理 """ _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE"] class BaseDealImage(object): def __init__(self, image_dir=None, token=None): self.token = token self.goods_images_count_dict = defaultdict(int) # 数据模型 # self.data_mode_auto_deal_pics = DataModeAutoDealPics() self.image_dir = image_dir pass def run_main( self, all_goods_art_no_folder_data, callback_func=None, cutout_mode=None, resize_image_view=None, windows=None, logo_path=None, image_order_list=None, ): # 对所有缺失已抠图的进行抠图处理 self.run_cutout_image( all_goods_art_no_folder_data=all_goods_art_no_folder_data, callback_func=callback_func, cutout_mode=cutout_mode, windows=windows, ) error_num = 0 successful_num = 0 for goods_art_no_folder_data in all_goods_art_no_folder_data: if goods_art_no_folder_data["label"] != "待处理": continue if windows: if windows.state != 1: break folder_name = goods_art_no_folder_data["folder_name"] callback_func("开始处理文件夹========== {} ".format(folder_name)) if settings.IS_TEST: flag = self.shoes_run_one_folder_to_deal( goods_art_no_folder_data=goods_art_no_folder_data, resize_image_view=resize_image_view, logo_path=logo_path, image_order_list=image_order_list, callback_func=callback_func, windows=windows, ) if flag is None: callback_func("货号:{} 数据异常".format(folder_name)) else: if flag: successful_num += 1 callback_func("货号:{} 图片生成处理成功".format(folder_name)) else: error_num += 1 callback_func("货号:{} 图片生成处理失败".format(folder_name)) else: try: flag = self.shoes_run_one_folder_to_deal( goods_art_no_folder_data=goods_art_no_folder_data, resize_image_view=resize_image_view, logo_path=logo_path, image_order_list=image_order_list, callback_func=callback_func, windows=windows, ) if flag is None: callback_func("货号:{} 数据异常".format(folder_name)) else: if flag: successful_num += 1 callback_func( "货号:{} 图片生成处理成功".format(folder_name) ) else: error_num += 1 callback_func( "货号:{} 图片生成处理失败".format(folder_name) ) except BaseException as e: error_num += 1 import traceback traceback.print_exc() callback_func( "货号:{} 图片生成处理异常,原因:{}".format(folder_name, e) ) callback_func("处理成功:{}个,失败:{}".format(successful_num, error_num)) def checkImageAmount( self, image_dir: str, amount: int, todo_goods_art_no_folder_name_list=None ) -> dict: result = {"code": 0, "msg": "", "data": {}} for goods_art_no_folder in self.list_dir(image_dir): # 指定内容检查 if todo_goods_art_no_folder_name_list is not None: if goods_art_no_folder not in todo_goods_art_no_folder_name_list: continue if not os.path.isdir("{}/{}".format(image_dir, goods_art_no_folder)): continue if "软件" in goods_art_no_folder: continue if "无法" in goods_art_no_folder: continue if "原始图" not in self.list_dir( "{}/{}".format(image_dir, goods_art_no_folder) ): result["data"][goods_art_no_folder] = "文件夹下,没有 原始图 文件夹\n" continue # 计算单个文件夹原图数量 images = [ x for x in self.list_dir( "{}/{}/原始图".format(image_dir, goods_art_no_folder) ) ] image_num = 0 for pic_file_name in images: _, e = os.path.splitext(pic_file_name) if e in _Type: image_num += 1 # if self.number_pictures != 0: if image_num > amount: result["data"][goods_art_no_folder] = "货号图片大于{}张~\n".format( amount ) if image_num < 2: result["data"][goods_art_no_folder] = "货号图片小于于2张~\n" if result["data"]: result["code"] = 1 return result def check_folders_image_amount( self, all_goods_art_no_folder_data, image_order_list ): print("*****************check_folders_image_amount************************") amount = len(image_order_list) # print("图片数量检查===>", amount) message = "" for goods_art_no_folder_data in all_goods_art_no_folder_data: if goods_art_no_folder_data["label"] != "待处理": continue folder_path = goods_art_no_folder_data["folder_path"] folder_name = goods_art_no_folder_data["folder_name"] images = [x for x in self.list_dir("{}/原始图".format(folder_path))] image_num = 0 for pic_file_name in images: _, e = os.path.splitext(pic_file_name) if e in _Type: image_num += 1 # print("图片数量判断===>", image_num) # if image_num > amount: # goods_art_no_folder_data["label"] = "错误" # message += '货号{}:图片大于{}张~\n'.format(folder_name, amount) # if image_num < 2: # message += '货号{}:图片小于于2张~\n'.format(folder_name) return all_goods_art_no_folder_data, message def check_one_folder_image_amount(self, folder_data, amount: int): print("*****************check_one_folder_image_amount************************") # 计算单个文件夹原图数量 images = [ x for x in self.list_dir("{}/原始图".format(folder_data["folder_path"])) ] image_num = 0 for pic_file_name in images: _, e = os.path.splitext(pic_file_name) if e in _Type: image_num += 1 if image_num > amount: return False, "货号{}:图片大于{}张~\n".format( folder_data["folder_name"], amount ) if image_num < 2: return False, "货号{}:图片小于于2张~\n".format(folder_data["folder_name"]) return True, "" # 指定的图片顺序 def getImageOrder(self, image_order: str, resize_image_view: str): imageOrderList = ( image_order.replace(",", ",").replace(" ", "").replace("图", "").split(",") ) if len(set(imageOrderList)) != len(imageOrderList): return {"code": 1, "msg": "图片位置与顺序重复,请检查您的输入"} for val in imageOrderList: if val not in [ "正面", "侧视", "背面", "背侧", "组合", "组合2", "组合3", "组合4", "组合5", ]: return { "code": 1, "msg": "可选项为:正面,侧视,背面,背侧,组合,组合2,组合3,组合4,组合5", } # if resize_image_view not in imageOrderList: # return {"code": 1, "msg": "缩小的步骤必须是你填写的图片顺序中"} return {"code": 0, "msg": "sucess", "imageOrderList": imageOrderList} def shoes_run_one_folder_to_deal( self, goods_art_no_folder_data, image_order_list: list, resize_image_view: str, logo_path="", windows=None, callback_func=None, ): """ 操作步骤: 1、查询每个图片的角度 2、 """ is_successful = True folder_path = goods_art_no_folder_data["folder_path"] folder_name = goods_art_no_folder_data["folder_name"] all_original_images = self.get_images("{}/原始图".format(folder_path)) self.check_path("{}/800x800".format(folder_path)) self.crate_all_folders(folder_path) print("all_original_images====>", all_original_images) if not all_original_images: return None # _ = ["俯视", "侧视", "后跟", "鞋底", "内里"] for index, image_dict in enumerate(all_original_images): if index < len(image_order_list): image_dict["image_view"] = image_order_list[index] else: image_dict["image_view"] = "其他{}".format( len(image_order_list) - index + 1 ) # ====================处理所有图片的顺序==================== # ["俯视", "侧视", "后跟", "鞋底", "内里"] _config = { "俯视": 1, "侧视": 2, "后跟": 3, "鞋底": 4, "内里": 5, } r = time.time() n = 0 _index = 0 # 检查是否有重复的角度 _d_views = [] f = True for image_dict in all_original_images: n += 1 _index += 1 image_dict["old_file_name"] = image_dict["file_name"] image_dict["random_name"] = "{}-{}".format(r, n) if image_dict["image_view"] in _config: if image_dict["image_view"] not in _d_views: _d_views.append(image_dict["image_view"]) else: callback_func("货号:{} 处理失败".format(folder_name)) # self.show_progress_detail("货号图{} 存在多个{} 角度~".format(goods_art_no_folder, image_dict["image_view"])) return None image_dict["index"] = _config[image_dict["image_view"]] else: image_dict["index"] = _index all_original_images.sort(key=lambda x: x["index"]) # ==========直接进行处理============= i_n = 0 image_index = 0 # 图片顺序 is_image_deal_mode = 0 # 删除目录再新建 try: if os.path.exists("{}/阴影图处理".format(folder_path)): shutil.rmtree("{}/阴影图处理".format(folder_path),onerror=settings.handle_remove_readonly) except Exception as e: print('An exception occurred') logger.info(f"base deal 抠图前目录删除出现问题:{str(e)}") self.crate_all_folders(folder_path) print( "***************all_original_images*********************", all_original_images, ) # is_flip_800image = settings.getSysConfigs( # "basic_configs", "is_flip_800image", 1 # ) # image_deal_mode = int(is_flip_800image) # 服装固定写死 不执行检测反转 image_deal_mode = 0 for image_dict in all_original_images: if windows: if windows.state != 1: return None i_n += 1 image_index += 1 original_image_path = "{}/原始图/{}{}".format( folder_path, image_dict["file_name"], image_dict["e"] ) file_name = image_dict["file_name"] print("正在处理,货号:{}".format(folder_path)) # self.show_progress_detail("正在处理,货号:{}".format(file_name)) # 该文件在800images下没有时,则进行生成新的抠图 # 检查是否存在已抠图文件,如没有再去抠图 original_move_bg_image_path = "{}/原始图_已抠图/{}{}".format( folder_path, image_dict["file_name"], ".png" ) print( f"*****************此处判断鞋子是否为左右脚====>{image_index}<======={is_image_deal_mode}=======>**********************" ) # 此处判断鞋子是否为左右脚 if image_deal_mode == 1: if image_index == 1: is_image_deal_mode = 0 print("开始识别左右脚=========>") if OnePicDeal(self.token).check_shoe_is_right( image_path=original_move_bg_image_path ): is_image_deal_mode = 1 # 1表示要镜像,0表示不做镜像 print( "*************************进行800image 生成********************************************" ) """进行800image 生成""" generate_pic = GeneratePic() print( "*************************进行800image 结束====>>>>********************************************" ) out_pci_mode = "." + settings.getSysConfigs( "basic_configs", "image_out_format", "png" ) if out_pci_mode == ".jpg": out_path = "{}/800x800/{}{}".format(folder_path, file_name, ".jpg") elif out_pci_mode == ".png": out_path = "{}/800x800/{}{}".format(folder_path, file_name, ".png") else: out_path = "{}/800x800/{}{}".format( folder_path, file_name, out_pci_mode ) out_process_path_1 = "{}/阴影图处理/{}_{}_阴影{}".format( folder_path, file_name, image_dict["image_view"], ".png" ) out_process_path_2 = "{}/阴影图处理/{}_{}_抠图{}".format( folder_path, file_name, image_dict["image_view"], ".png" ) resize_mode = 1 max_box = None print("------------1", image_dict["image_view"], resize_image_view) if image_dict["image_view"] == resize_image_view: print(image_dict["image_view"], resize_image_view) resize_mode = 2 if ( settings.getSysConfigs("other_configs", "product_type", "鞋类") == "皮具" ): max_box = (1000, 1200) out_pic_size = ( [1600] if settings.getSysConfigs("basic_configs", "main_image_size", [1600]) == "" else settings.getSysConfigs("basic_configs", "main_image_size", [1600]) ) # 主图大小 if resize_mode == 2: print( "参数打印================》", is_image_deal_mode, resize_mode, out_pic_size, True if i_n == 1 else False, max_box, ) print("**********123456********************") curve_mask = True if "俯视" in image_order_list else False if not generate_pic.run( image_path=original_image_path, cut_image_path=original_move_bg_image_path, out_path=out_path, image_deal_mode=is_image_deal_mode, # resize_mode=resize_mode, resize_mode=1,#将这里得缩放模式改为强制不缩放 2025-10-22 out_pic_size=out_pic_size, is_logo=True if i_n == 1 else False, out_process_path_1=out_process_path_1, out_process_path_2=out_process_path_2, max_box=max_box, logo_path=logo_path, curve_mask=curve_mask, ): print("**********222222222222222222222222222********************") is_successful = False if is_successful: return True else: return False def to_upload_pic(self, file_path, is_resize=True): file_name = os.path.split(file_path)[1] e = os.path.splitext(file_name)[1][1:] im = Picture(file_path) if im.x > 500: im.resize(width=500) _ = { "jpg": "JPEG", "JPG": "JPEG", "JPEG": "JPEG", "jpeg": "JPEG", "png": "PNG", "PNG": "PNG", } e = _[e] image_io = im.save_to_io(e) goods_data = { "file_path": os.path.split(file_path)[1], "image_io": image_io, "e": e, } url = self.data_mode_image_cut.get_online_data.upload_pic(goods_data=goods_data) return url def get_images(self, path): image_list = [] # 过滤非图片数据 for _file in self.list_dir(path): file_name, e = os.path.splitext(_file) if e in _Type: image_list.append( { "file_path": "{}/{}".format(path, _file), "file_name": file_name, "e": e, } ) return image_list def crate_all_folders(self, root_path): path_list = ["800x800", "原始图_已抠图", "阴影图处理"] for i in path_list: path = "{}/{}".format(root_path, i) self.check_path(path) def run_cutout_image( self, all_goods_art_no_folder_data, callback_func=None, cutout_mode=1, windows=None, ): """ 处理所有的抠图 """ callback_func("开始处理抠图~") goods_art_nos = [ goods_art_item["folder_name"] for goods_art_item in all_goods_art_no_folder_data ] total_progress = len(all_goods_art_no_folder_data) finish_progress = 0 error_progress = 0 progress = {"status":"正在处理", "current":finish_progress, "total":total_progress, "error":error_progress} sendAsyncMessage( msg="开始处理抠图", goods_arts=goods_art_nos, status="开始处理",progress=progress ) error_goods_art_no_folder = [] for goods_art_no_folder_data in all_goods_art_no_folder_data: if goods_art_no_folder_data["label"] != "待处理": continue folder_path = goods_art_no_folder_data["folder_path"] self.crate_all_folders(folder_path) # 检查是否存在已抠图文件,如没有再去抠图 images = [x for x in self.list_dir("{}/原始图".format(folder_path))] cutImageList = [] goods_art_floder_name = goods_art_no_folder_data["folder_name"] progress = {"status":"正在处理", "current":finish_progress, "total":total_progress, "error":error_progress, "goods_art_no":goods_art_floder_name } sendAsyncMessage( msg="正在抠图", goods_arts=[goods_art_floder_name], status="处理中", progress=progress ) for pic_file_name in images: if windows: if windows.state != 1: break # 根据名称判断,没有抠图过的,进行统计 file_name, suffix = os.path.splitext(pic_file_name) if suffix in _Type: original_image_path = "{}/原始图/{}".format( folder_path, pic_file_name ) original_move_bg_image_path = "{}/原始图_已抠图/{}{}".format( folder_path, file_name, ".png" ) if not os.path.exists(original_move_bg_image_path): # 没有抠图文件,进行抠图生成 callback_func("正在抠图 货号:{}".format(file_name)) if cutout_mode == "2": cutImageList.append( { "file_name": file_name, # 文件名 "file_e": suffix, # 后缀,.jpg "file_path": original_image_path, # 完整路径 "file": "{}{}".format( file_name, suffix ), # 图片文件名,带后缀 "need_cutout": True, # 必须,需要抠图 "out_path": original_move_bg_image_path, } ) else: remove_pic_ins = RemoveBgALi() if settings.IS_TEST: im = remove_pic_ins.get_image_cut( file_path=original_image_path, out_file_path=original_move_bg_image_path, ) else: try: im = remove_pic_ins.get_image_cut( file_path=original_image_path, out_file_path=original_move_bg_image_path, ) except FunctionTimedOut as f: callback_func( "货号图{} 抠图处理超时~".format(file_name) ) error_goods_art_no_folder.append(folder_path) im = None except BaseException as e: callback_func( "货号图{} 抠图处理失败,原因{}".format( file_name, e ) ) error_goods_art_no_folder.append(folder_path) im = None if not im: callback_func( "货号图{} 抠图处理失败~".format(file_name) ) error_goods_art_no_folder.append(folder_path) continue else: callback_func("货号图{} 抠图完成~".format(file_name)) progress = { "status":"正在处理", "current":finish_progress, "total":total_progress, "error":error_progress, "goods_art_no":goods_art_floder_name } if goods_art_floder_name not in error_goods_art_no_folder: finish_progress+=1 sendAsyncMessage( msg="正在处理", goods_arts=[goods_art_floder_name], status="正在处理", progress=progress ) else: progress["status"] = "处理失败" sendAsyncMessage( msg="处理失败", goods_arts=[goods_art_floder_name], status="处理失败", progress=progress ) if cutout_mode == "2": dealCutout = DealCutout(windows=None, token=self.token) dealCutout.need_cutout_images = cutImageList dealCutout.run() while True: time.sleep(0.5) if dealCutout.state == 3: if len(dealCutout.resultData) != len(cutImageList): error_goods_art_no_folder.append(folder_path) break error_progress = len(error_goods_art_no_folder) progress = { "status":"处理完成", "current":finish_progress, "total":total_progress, "error":error_progress } if error_goods_art_no_folder: print("以下货号抠图失败~\n {}".format(error_goods_art_no_folder)) callback_func("以下货号抠图失败~\n {}".format(error_goods_art_no_folder)) sendAsyncMessage( msg=f"抠图处理失败", goods_arts=error_goods_art_no_folder, status="抠图处理失败", # progress=progress ) else: pass progress["status"] = "处理失败" if error_progress == total_progress else "处理完成" sendAsyncMessage( msg="抠图完成", goods_arts=[], status="抠图完成", progress=progress ) def checkCutoutImage(self, image_dir: str, todo_goods_art_no_folder_name_list=None): """ 进行图片检查,不合规的直接提示 """ error_goods_art_no_folder = [] self.check_path("{}/软件-处理失败".format(image_dir)) for goods_art_no_folder in self.list_dir(image_dir): # 指定内容检查 if todo_goods_art_no_folder_name_list is not None: if goods_art_no_folder not in todo_goods_art_no_folder_name_list: continue if not os.path.isdir("{}/{}".format(image_dir, goods_art_no_folder)): continue if "软件" in goods_art_no_folder: continue if "无法" in goods_art_no_folder: continue if "原始图" not in self.list_dir( "{}/{}".format(image_dir, goods_art_no_folder) ): error_goods_art_no_folder.append(goods_art_no_folder) continue self.check_path( "{}/{}/原始图_已抠图".format(image_dir, goods_art_no_folder) ) self.check_path("{}/{}/800x800".format(image_dir, goods_art_no_folder)) self.check_path("{}/{}/阴影图处理".format(image_dir, goods_art_no_folder)) if error_goods_art_no_folder: self.move_folders( path_list=[ "{}/{}".format(self.image_dir, x) for x in error_goods_art_no_folder ], target_folder="{}/软件-处理失败".format(self.image_dir), ) return False def move_folders(self, path_list, target_folder): for source_folder in path_list: shutil.move(source_folder, target_folder) def rename_folder_for_hqt(self, all_goods_art_no_folder_data): """ 步骤: 规整红蜻蜓的文件名 重新按文件名进行命名 """ goods_art_no_list = [] for goods_art_no_folder_data in all_goods_art_no_folder_data: if "@" not in goods_art_no_folder_data["folder_name"]: goods_art_no_folder_data["label"] = "待处理" goods_art_no_list.append(goods_art_no_folder_data["folder_name"]) else: goods_art_no_folder_data["label"] = "不处理" if goods_art_no_list: # goods_art_no_dict 文件夹与货号的字典 goods_art_no_dict = self.get_data_from_hqt_with_goods_art_no( goods_art_no_list=goods_art_no_list ) for goods_art_no_folder_data in all_goods_art_no_folder_data: if goods_art_no_folder_data["label"] != "待处理": continue goods_art_no_folder = goods_art_no_folder_data["folder_name"] if goods_art_no_folder in goods_art_no_dict: print(goods_art_no_folder) old_folder_path = goods_art_no_folder_data["folder_path"] new_folder_name = "{}@NUM{}".format( goods_art_no_folder, goods_art_no_dict[goods_art_no_folder]["编号"], ) new_folder_path = "{}/{}".format( goods_art_no_folder_data["root_path"], new_folder_name ) try: os.rename(old_folder_path, new_folder_path) goods_art_no_folder_data["folder_path"] = new_folder_path goods_art_no_folder_data["folder_name"] = new_folder_path goods_art_no_folder_data["label"] = "待处理" except BaseException as e: goods_art_no_folder_data["label"] = "不处理" print("521 文件夹重名命失败:{}".format(e)) # 重新规整修改图片名称 for goods_art_no_folder_data in all_goods_art_no_folder_data: if goods_art_no_folder_data["label"] != "待处理": continue goods_art_no_folder = goods_art_no_folder_data["folder_name"] _img_all = self.list_dir( "{}/原始图".format(goods_art_no_folder_data["folder_path"]) ) index = 0 for _file in _img_all: file_name, suffix = os.path.splitext(_file) if suffix in _Type: index += 1 folder_path = goods_art_no_folder_data["folder_path"] new_file_name = "{}({}){}".format( goods_art_no_folder, index, suffix ) new_path = "{}/原始图/{}".format(folder_path, new_file_name) old_path = "{}/原始图/{}".format(folder_path, _file) crop_new_path = "{}/原始图_已抠图/{}".format( folder_path, "{}({}).png".format(goods_art_no_folder, index) ) crop_old_path = "{}/原始图_已抠图/{}".format( folder_path, "{}.png".format(file_name) ) if old_path != new_path: # 存在货号命名错误的,进行修正 try: if os.path.exists(old_path): os.rename(old_path, new_path) if os.path.exists(crop_old_path): os.rename(crop_old_path, crop_new_path) except BaseException as e: goods_art_no_folder_data["label"] = "不处理" print("550 文件夹重名命失败:{}".format(e)) def cutImagePiju( self, image_dir: str, image_order="", is_check_number=True, is_filter=True, resize_image_view="后跟", callback_func=None, event=None, todo_goods_art_no_folder_name_list=None, ): """ 1、遍历文件夹,基于生成的结果图看哪些需要进行抠图等处理 2、压缩并上传平台获取抠图 3、抠图处理成白底图 4、做成800*800/200*200 :return: """ logo_path = "" res = self.getImageOrder( image_order=image_order, resize_image_view=resize_image_view ) if res["code"] != 0: callback_func(res["msg"]) return {"code": 1, "msg": res["msg"]} imageOrderList = res["imageOrderList"] """扫描文档,检查有哪些需要进行抠图等处理""" self.lock = Lock() to_do_images_total = 0 error_goods_art_no_folder = [] for goods_art_no_folder in self.list_dir(image_dir): # 指定内容检查 if todo_goods_art_no_folder_name_list is not None: if goods_art_no_folder not in todo_goods_art_no_folder_name_list: continue if not os.path.isdir("{}/{}".format(image_dir, goods_art_no_folder)): continue self.check_path("{}/{}/原始图".format(image_dir, goods_art_no_folder)) self.check_path( "{}/{}/原始图_已抠图".format(image_dir, goods_art_no_folder) ) self.check_path("{}/{}/800x800".format(image_dir, goods_art_no_folder)) self.check_path("{}/{}/阴影图处理".format(image_dir, goods_art_no_folder)) # 遍历原始图片文件夹 all_original_images = [ x for x in self.list_dir( "{}/{}/原始图".format(image_dir, goods_art_no_folder) ) ] # 检查已抠图文件夹 all_moved_images = [ os.path.splitext(x)[0] for x in self.list_dir( "{}/{}/原始图_已抠图".format(image_dir, goods_art_no_folder) ) ] all_800images = [ os.path.splitext(x)[0] for x in self.list_dir( "{}/{}/800x800".format(image_dir, goods_art_no_folder) ) ] if is_check_number and len(imageOrderList) != len(all_original_images): callback_func( "{} 文件夹下图片数量与订单数量不一致,请检查!".format( goods_art_no_folder ) ) return { "code": 1, "msg": "{} 文件夹下图片数量与订单数量不一致,请检查!".format( goods_art_no_folder ), } all_800images = [] image_num = 0 for pic_file_name in all_original_images: if pic_file_name not in all_800images: # 根据名称判断,没有抠图过的,进行统计 _, e = os.path.splitext(pic_file_name) print(e) if e in _Type: image_num += 1 print("----------》", goods_art_no_folder, pic_file_name) to_do_images_total += 1 # if image_num > 5: # error_goods_art_no_folder.append(goods_art_no_folder) # if error_goods_art_no_folder: # self.show_progress_detail("以下货号图片张数超过5张~\n {}".format(error_goods_art_no_folder)) # self.set_state(state_value=2) # return if to_do_images_total > 0: # self.progress_sign.emit({"type": "处理图片 抠图、加工等", "progress_bar_value": 0}) for goods_art_no_folder in self.list_dir(image_dir): # 指定内容检查 if todo_goods_art_no_folder_name_list is not None: if goods_art_no_folder not in todo_goods_art_no_folder_name_list: continue if not os.path.isdir("{}/{}".format(image_dir, goods_art_no_folder)): continue self.run_one_folder_to_deal( goods_art_no_folder=goods_art_no_folder, image_dir=image_dir, image_order=image_order, resize_image_view=resize_image_view, callback_func=callback_func, logo_path=logo_path, ) else: # self.show_progress_detail("没有需要处理的图片~") callback_func("没有需要处理的图片~") # self.set_state(state_value=2) return {"code": 0, "msg": "ok"} def run_one_folder_to_deal( self, goods_art_no_folder, image_dir, image_order, resize_image_view, callback_func=None, logo_path="", ): _img_all = self.list_dir("{}/{}/原始图".format(image_dir, goods_art_no_folder)) all_original_images = [] # 过滤非图片数据 index = 0 for _file in _img_all: file_name, e = os.path.splitext(_file) if e in _Type: index += 1 new_file_name = "{}({}){}".format(goods_art_no_folder, index, e) new_path = "{}/{}/原始图/{}".format( image_dir, goods_art_no_folder, new_file_name ) old_path = "{}/{}/原始图/{}".format( image_dir, goods_art_no_folder, _file ) if old_path != new_path: # 存在货号命名错误的,进行修正 try: os.rename(old_path, new_path) except: pass all_original_images.append(new_file_name) if os.path.exists( "{}/{}/原始图/镜像.txt".format(image_dir, goods_art_no_folder) ): file_mirror_mark = True else: file_mirror_mark = None # if goods_art_no_folder == "AC51016112": # print(file_mirror_mark) # raise 111 all_moved_images = [ os.path.splitext(x)[0] for x in self.list_dir( "{}/{}/原始图_已抠图".format(image_dir, goods_art_no_folder) ) ] all_800images = [ os.path.splitext(x)[0] for x in self.list_dir( "{}/{}/800x800".format(image_dir, goods_art_no_folder) ) ] all_800images = [] # 检查哪些图片没有做过抠图处理 i_n = 0 _name_list = [ "视角{}".format(x) for x in range(1, len(all_original_images) + 1) ] # if goods_art_no_folder == "AC51028001": # _name_list = ["正视", "45度", "侧视", "后视", "底视", "其他1", "其他2", "其他3"] image_index = 0 # 图片顺序 is_image_deal_mode = 0 max_box = None for file in all_original_images: i_n += 1 image_index += 1 original_image_path = "{}/{}/原始图/{}".format( image_dir, goods_art_no_folder, file ) file_name = os.path.splitext(file)[0] """ 当第三张就是为后跟 """ if file_name not in all_800images: # 所有都重新生成 # if goods_art_no_folder != "AC51016112": # continue goods_art_no_folder_path = "{}/{}".format( image_dir, goods_art_no_folder ) print("正在处理,货号:{}".format(goods_art_no_folder_path)) # self.show_progress_detail("正在处理,货号:{}".format(file_name)) callback_func("正在处理,货号:{}".format(file_name)) # 该文件在800images下没有时,则进行生成新的抠图 # 检查是否存在已抠图文件,如没有再去抠图 original_move_bg_image_path = "{}/原始图_已抠图/{}{}".format( goods_art_no_folder_path, file_name, ".png" ) image_deal_mode = 0 # 默认图片不做镜像处理 if not os.path.exists(original_move_bg_image_path): # 没有抠图文件,进行抠图生成 # self.show_progress_detail("正在抠图 货号:{}".format(file_name)) callback_func("正在抠图 货号:{}".format(file_name)) remove_pic_ins = RemoveBgALi() im = remove_pic_ins.get_image_cut( file_path=original_image_path, out_file_path=original_move_bg_image_path, ) if not im: # self.show_progress_detail("货号图{} 抠图处理失败~".format(file_name)) callback_func("货号图{} 抠图处理失败~".format(file_name)) continue if image_index == 1: is_image_deal_mode = 0 if settings.Mode == "鞋类": goods_class = "鞋" # 如果图片已存在,则需要通过加载图片判断是否为左右脚 if OnePicDeal().check_shoe_is_right( image_path=original_move_bg_image_path ): image_deal_mode = 1 # 1表示要镜像,0表示不做镜像 is_image_deal_mode = 1 if settings.Mode == "皮具": # 图片对应的商品类型 # goods_class = self.get_goods_class(goods_art_no_folder, original_image_path) max_box = (1000, 1200) # _ = {"AC51028001": "女包", # "AC51028002": "男包", # "AC51028003": "皮带", # "AC51028004": "女包"} # if goods_class in _: # goods_class = _[goods_class] # else: # goods_class = "女包" # # _ = {"女包": (1000, 1200), # "男包": (1000, 1200), # "皮带": (1000, 1000), } # max_box = _[goods_class] # 获取图片信息非必要程序,用于处理图片模式 date_time_original = self.get_date_time_original( original_image_path ) # 获取照片拍照时间 if date_time_original: # 基于照片的时间,与数据库匹配goods_art_no self.lock.acquire() _data = self.dataModeMatchPhoto.get_goods_art_no(date_time_original) self.lock.release() if _data: # 能匹配上数据库 goods_art_no, _image_index, _image_deal_mode = _data if _image_index < 10: image_index = _image_index if _image_deal_mode == 1: image_deal_mode = 1 # print(goods_art_no, image_index, image_deal_mode) if file_mirror_mark: image_deal_mode = 1 """进行800image 生成""" generate_pic = GeneratePic() out_pci_mode = "." + settings.getSysConfigs( "basic_configs", "image_out_format", "png" ) if out_pci_mode == ".jpg": out_path = "{}/800x800/{}{}".format( goods_art_no_folder_path, file_name, ".jpg" ) elif out_pci_mode == ".png": out_path = "{}/800x800/{}{}".format( goods_art_no_folder_path, file_name, ".png" ) else: out_path = "{}/800x800/{}{}".format( goods_art_no_folder_path, file_name, out_pci_mode ) out_process_path_1 = "{}/阴影图处理/{}_{}_阴影{}".format( goods_art_no_folder_path, file_name, _name_list[i_n - 1], ".png" ) out_process_path_2 = "{}/阴影图处理/{}_{}_抠图{}".format( goods_art_no_folder_path, file_name, _name_list[i_n - 1], ".png" ) print("image_index", image_index) image_index = 99 curve_mask = True if "俯视" in image_order["image_view"] else False out_pic_size = ( [1600] if settings.getSysConfigs( "basic_configs", "main_image_size", [1600] ) == "" else settings.getSysConfigs( "basic_configs", "main_image_size", [1600] ) ) # 主图大小 if generate_pic.run( image_path=original_image_path, cut_image_path=original_move_bg_image_path, out_path=out_path, image_deal_mode=is_image_deal_mode, image_index=image_index, out_pic_size=out_pic_size, is_logo=True if i_n == 1 else False, out_process_path_1=out_process_path_1, out_process_path_2=out_process_path_2, max_box=max_box, logo_path=logo_path, curve_mask=curve_mask, ): # self.show_progress_detail("货号图{} _{} 已完成800*800图片制作~".format(image_index, file_name)) callback_func( "货号图{} _{} 已完成800*800图片制作~".format( image_index, file_name ) ) else: # self.show_progress_detail("货号图{} _{} 图片生成处理失败~".format(image_index, file_name)) callback_func( "货号图{} _{} 图片生成处理失败~".format(image_index, file_name) ) # 完成处理的图片进度 self.lock.acquire() # self.set_progress() self.lock.release() def get_goods_art_no(self, date_time_original): time_array = time.strptime(date_time_original, "%Y:%m:%d %H:%M:%S") time_array = time.mktime(time_array) datetime_obj = datetime.fromtimestamp(time_array) session = SqlQuery() configModel = CRUD(DeviceConfig) result = configModel.read( session, conditions={"photo_create_time": datetime_obj}, order_by="id", ascending=True, ) session.close() if result: return result.goods_art_no, result.image_index, result.image_deal_mode else: return None def get_goods_art_no_info( self, numbers_list=None, goods_art_list=None, headers=None ): # 获取商品基础信息,入参为商品的编号 url = "{domain}/api/backend/goods_client/goods_query".format( domain=settings.APP_HOST ) data = {"goods_art_list": goods_art_list} data = json.dumps(data) _s = requests.session().post(url=url, data=data, headers=headers) response_data = _s.json() goods_number_data = {} # ["", "", "", "", "", "", "", "", "", "", "", ] if "data" not in response_data: return {} for data in response_data["data"]: goods_number_data[data["goods_art_no"]] = {} goods_number_data[data["goods_art_no"]]["商品货号"] = data[ "goods_art_no" ].upper() goods_number_data[data["goods_art_no"]]["款号"] = data[ "goods_number" ].upper() goods_number_data[data["goods_art_no"]]["商品面料"] = data["fabric"] goods_number_data[data["goods_art_no"]]["商品内里"] = data["lining"] goods_number_data[data["goods_art_no"]]["商品鞋底"] = data["sole"] goods_number_data[data["goods_art_no"]]["鞋垫"] = data["insole"] goods_number_data[data["goods_art_no"]]["颜色名称"] = data["color"] return goods_number_data def get_data_from_hqt_with_goods_art_no(self, goods_art_no_list): _goods_art_no_list = copy.deepcopy(goods_art_no_list) _list = [] # 单次请求数少于20个 goods_art_no_dict = {} while _goods_art_no_list: goods_art_no = _goods_art_no_list.pop() _list.append(goods_art_no) if len(_list) == 20 or len(_goods_art_no_list) == 0: online_goods_art_data = self.get_goods_art_no_info(goods_art_list=_list) if online_goods_art_data: for _goods_art_no in online_goods_art_data: goods_art_no_dict[_goods_art_no] = online_goods_art_data[ _goods_art_no ] _list = [] return goods_art_no_dict def get_goods_art_no_info( self, numbers_list=None, goods_art_list=None, headers=None ): # 获取商品基础信息,入参为商品的编号 url = "{domain}/api/backend/goods_client/goods_query".format( domain=settings.APP_HOST ) data = {"goods_art_list": goods_art_list} data = json.dumps(data) _s = requests.session().post(url=url, data=data, headers=headers) # _s = self.s.get(url=url, params=params, headers=settings.Headers) response_data = _s.json() goods_number_data = {} # ["", "", "", "", "", "", "", "", "", "", "", ] if "data" not in response_data: return {} for data in response_data["data"]: goods_number_data[data["goods_art_no"]] = {} goods_number_data[data["goods_art_no"]]["商品货号"] = data[ "goods_art_no" ].upper() goods_number_data[data["goods_art_no"]]["款号"] = data[ "goods_number" ].upper() goods_number_data[data["goods_art_no"]]["商品面料"] = data["fabric"] goods_number_data[data["goods_art_no"]]["商品内里"] = data["lining"] goods_number_data[data["goods_art_no"]]["商品鞋底"] = data["sole"] goods_number_data[data["goods_art_no"]]["鞋垫"] = data["insole"] goods_number_data[data["goods_art_no"]]["颜色名称"] = data["color"] return goods_number_data def get_data_from_hqt(self, goods_number_list): _goods_number_list = copy.deepcopy(goods_number_list) _list = [] # 单次请求数少于20个 goods_number_dict = {} while _goods_number_list: goods_art_no = _goods_number_list.pop() if "NUM" in goods_art_no: goods_art_no = goods_art_no.replace("NUM", "") _list.append(goods_art_no) if len(_list) == 20 or len(_goods_number_list) == 0: online_goods_art_data = self.get_goods_art_no_info(numbers_list=_list) if online_goods_art_data: for number in online_goods_art_data: goods_number_dict["NUM" + number] = online_goods_art_data[ number ] _list = [] return goods_number_dict def dealMoveImage(self, image_dir: str, callback_func=None) -> dict: if not self.check_path(image_dir=image_dir + "/历史"): return {"code": 1, "msg": "文件夹创建失败", "data": {}} # 遍历目标文件夹,获取有拍摄信息的图片,并按拍摄时间排序 files = self.list_dir(image_dir) original_photo_list = [] # 原始图片列表 for file in files: # -----图片清洗 file_path = image_dir + "/" + file if os.path.isdir(file_path): # 忽略文件夹 continue file_name, suffix = os.path.splitext(file) if suffix not in _Type: # 非图片进行移除 shutil.move(file_path, image_dir + "/历史/" + file) continue date_time_original = self.get_date_time_original( file_path ) # 获取照片拍照时间 if date_time_original: # 基于照片的时间,与数据库匹配goods_art_no _data = self.get_goods_art_no(date_time_original) if _data: # 能匹配上数据库 goods_art_no, image_index, image_deal_mode = _data print( "832 与数据库匹配goods_art_no", file_name, date_time_original, goods_art_no, ) original_photo_list.append( { "file_path": file_path, "file": file, "date_time_original": date_time_original, "goods_art_no": goods_art_no, "image_index": image_index, "real_goods_art_no": "", "real_goods_number": "", } ) else: # 匹配不上报错 # self.show_progress_detail("图片:{} 无法对应货号,不做处理".format(file)) if callback_func: callback_func("图片:{} 无对应货号".format(file)) # shutil.move(photo_dict["file_path"], self.image_dir + "/历史/" + photo_dict["file"]) continue else: shutil.move(file_path, image_dir + "/历史/" + file) if not original_photo_list: return {"code": 1, "msg": "没有任何匹配的图片", "data": {}} if settings.PROJECT == "红蜻蜓": # 批量请求货号图信息 goods_art_no_list = [x["goods_art_no"] for x in original_photo_list] goods_art_no_list = list(set(goods_art_no_list)) goods_art_no_list = [x for x in goods_art_no_list if "NUM" not in x] if goods_art_no_list: goods_art_no_dict = self.get_data_from_hqt_with_goods_art_no( goods_art_no_list=goods_art_no_list ) for i in original_photo_list: if i["goods_art_no"] in goods_art_no_dict: i["real_goods_art_no"] = i["goods_art_no"] i["real_goods_number"] = "NUM{}".format( goods_art_no_dict[i["goods_art_no"]]["编号"] ) # 批量请求编号对应信息 goods_number_list = [x["goods_art_no"] for x in original_photo_list] goods_number_list = list(set(goods_number_list)) goods_number_list = [x for x in goods_number_list if "NUM" in x] if goods_number_list: goods_number_dict = self.get_data_from_hqt( goods_number_list=goods_number_list ) for i in original_photo_list: if i["goods_art_no"] in goods_number_dict: i["real_goods_number"] = i["goods_art_no"] i["real_goods_art_no"] = goods_number_dict[i["goods_art_no"]][ "商品货号" ] # 排序需要基于拍照的文件序号进行处理 original_photo_list.sort( key=lambda x: "{}-{}-{}".format( x["goods_art_no"], x["image_index"], x["file"] ) ) # print(original_photo_list) # 对有拍摄信息的图片进行数据库比对,如有比对上,则移动至货号文件夹,否则移入历史文件夹 total_num = len(original_photo_list) # 当天日期作为文件夹 seconds = time.time() output_path = "output/{f_name}".format( f_name=time.strftime("%Y-%m-%d", time.localtime(seconds)) ) # 遍历每个匹配好的数据进行处理 n = 0 for photo_dict in original_photo_list: n += 1 # 进度条 goods_art_no = photo_dict["goods_art_no"] original_image_path = photo_dict["file_path"] # 输出货号文件夹 if photo_dict["real_goods_art_no"]: goods_art_no = "{}@{}".format( photo_dict["real_goods_art_no"], photo_dict["real_goods_number"] ) goods_art_no_path = "{output_path}/{goods_art_no}".format( output_path=output_path, goods_art_no=goods_art_no ) # 创建货号下的一系列文件夹 self.create_folder(goods_art_no_path) # 重命名并进行移动 print( "开始移动:{} {} 命名为:{}".format( goods_art_no, original_image_path, goods_art_no_path ) ) self.move_images( goods_art_no, goods_art_no_path, original_image_path ) # 货号、货号文件路径、原始图路径 time.sleep(0.2) # self.progress_sign.emit({"type": "移动原始图片", "progress_bar_value": int(n / total_num * 100)}) # self.show_progress_detail("货号{} 相关文件夹创建完成,已移动原图~".format(goods_art_no)) if callback_func: callback_func( "货号{} 相关文件夹创建完成,已移动原图~".format(goods_art_no) ) print("已完成移动处理") if n != 0: # if settings.MattingPics: # # 检查所有未处理的货号文件夹,查看是否有完成图片加工处理 # self.deal_images() # 自动生成一个货号表 print("output_path", output_path) GenerateGoodsArtNoTable.deal(output_path) # 完成处理 # self.set_state(state_value=2) return {"code": 0, "msg": "处理完成", "target_path": output_path, "data": {}} def check_path(self, image_dir: str): if not os.path.exists(image_dir): os.mkdir(image_dir) return True def get_date_time_original(self, file_path): with open(file_path, "rb") as file_data: tags = exifread.process_file(file_data) if "EXIF DateTimeOriginal" in tags: return str(tags["EXIF DateTimeOriginal"]) else: return False def create_folder(self, path): def check_folder(__path): if not os.path.exists(__path): os.makedirs(__path) return False return True # 文件夹不存在,创建货号子集文件夹 if not check_folder(path): for name in ["原始图", "原始图_已抠图", "800x800", "200images"]: other_path = path + "/" + name check_folder(other_path) def move_images(self, goods_art_no, goods_art_no_path, old_image_path): """ 步骤: 1、移动到原始图 Args: goods_art_no: goods_art_no_path: old_image_path: Returns: """ # 移动到原始图 file = os.path.split(old_image_path)[1] # 扩展名 e = os.path.splitext(file)[1] # 获取图片序列 self.goods_images_count_dict[goods_art_no] += 1 # A9999(1).jpg new_file_name = "{}({})".format( goods_art_no, self.goods_images_count_dict[goods_art_no] ) original_image_path = "{}/原始图/{}{}".format( goods_art_no_path, new_file_name, e ) # 移动图片 shutil.move(old_image_path, original_image_path) def pixianRemoveImageBg( self, file_path: str, out_file_path: str, callbackek_func=None ): url = self.dataModeMatchPhoto.get_online_data.uploadImage(local_path=file_path) remonveUrl = settings.DOMAIN + "/api/ai_image/main/remove_background" param = {"base_image": url} post_headers = { "Authorization": self.token, "Content-Length": "", "Content-Type": "application/json", "Accept": "application/json", } result = requests.post( remonveUrl, data=json.dumps(param), headers=post_headers ).json() print(result) if "code" in result and result["code"] == 0: response = requests.get(result["data"]["image"][0]) with open(out_file_path, "wb") as file: file.write(response.content) return result["data"]["image"][0] else: callbackek_func("精细化抠图处理失败 {}".format(result["message"])) return "" def list_dir(self, path): listdir = os.listdir(path) return natsorted(listdir, alg=ns.PATH)