| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- import os
- from natsort import natsorted,ns
- import shutil
- import exifread
- import time
- import datetime
- from databases import PhotoRecord,SqlQuery,CRUD
- import settings
- import copy
- import json
- import requests
- from service.pic_deal import Picture
- import xlsxwriter
- from PIL import Image
- from .base_deal import BaseDealImage
- from middleware import UnicornException
- _Type = ['.png', '.PNG', '.jpg', '.JPG', '.gif', '.GIF', ".jpge", ".JPGE"]
- class DealImage(BaseDealImage):
- def __init__(self, image_dir=None):
- super().__init__()
- self.image_dir = image_dir
- self.header = None
- def check_path(self, image_dir: str):
- if not os.path.exists(image_dir):
- os.mkdir(image_dir)
- return True
- def list_dir(self, path):
- listdir = os.listdir(path)
- return natsorted(listdir, alg=ns.PATH)
- def get_date_time_original(self, file_path):
- with open(file_path, 'rb') as file_data:
- tags = exifread.process_file(file_data)
- if "EXIF DateTimeOriginal" in tags:
- return str(tags["EXIF DateTimeOriginal"])
- else:
- return False
- def get_goods_art_no(self, goods_art_no):
- # time_array = time.strptime(date_time_original, "%Y:%m:%d %H:%M:%S")
- # time_array = time.mktime(time_array)
- # datetime_obj = datetime.datetime.fromtimestamp(time_array)
- session = SqlQuery()
- configModel = CRUD(PhotoRecord)
- result = configModel.read(
- session,
- conditions={"goods_art_no": goods_art_no},
- order_by="id",
- ascending=True,
- )
- if result:
- return result.goods_art_no, result.image_index, result.image_deal_mode
- else:
- return None
- def get_data_from_hqt_with_goods_art_no(self, goods_art_no_list):
- _goods_art_no_list = copy.deepcopy(goods_art_no_list)
- _list = []
- # 单次请求数少于20个
- goods_art_no_dict = {}
- while _goods_art_no_list:
- goods_art_no = _goods_art_no_list.pop()
- _list.append(goods_art_no)
- if len(_list) == 20 or len(_goods_art_no_list) == 0:
- online_goods_art_data = self.get_goods_art_no_info(
- goods_art_list=_list
- )
- if online_goods_art_data:
- for _goods_art_no in online_goods_art_data:
- goods_art_no_dict[_goods_art_no] = online_goods_art_data[
- _goods_art_no
- ]
- _list = []
- return goods_art_no_dict
- def get_goods_art_no_info(self, numbers_list=None, goods_art_list=None):
- # 获取商品基础信息,入参为商品的编号
- url = "{domain}/api/backend/goods_client/goods_query".format(
- domain=settings.DOMAIN
- )
- data = {
- 'goods_art_list': goods_art_list
- }
- data = json.dumps(data)
- _s = requests.session().post(url=url, data=data, headers=self.header)
- response_data = _s.json()
- goods_number_data = {}
- # ["", "", "", "", "", "", "", "", "", "", "", ]
- if "data" not in response_data:
- return {}
- for data in response_data["data"]:
- goods_number_data[data["goods_art_no"]] = {}
- goods_number_data[data["goods_art_no"]]["商品货号"] = data["goods_art_no"].upper()
- goods_number_data[data["goods_art_no"]]["款号"] = data["goods_number"].upper()
- goods_number_data[data["goods_art_no"]]["商品面料"] = data["fabric"]
- goods_number_data[data["goods_art_no"]]["商品内里"] = data["lining"]
- goods_number_data[data["goods_art_no"]]["商品鞋底"] = data["sole"]
- goods_number_data[data["goods_art_no"]]["鞋垫"] = data["insole"]
- goods_number_data[data["goods_art_no"]]["颜色名称"] = data["color"]
- return goods_number_data
- def get_data_from_hqt(self, goods_number_list):
- _goods_number_list = copy.deepcopy(goods_number_list)
- _list = []
- # 单次请求数少于20个
- goods_number_dict = {}
- while _goods_number_list:
- goods_art_no = _goods_number_list.pop()
- if "NUM" in goods_art_no:
- goods_art_no = goods_art_no.replace("NUM", "")
- _list.append(goods_art_no)
- if len(_list) == 20 or len(_goods_number_list) == 0:
- online_goods_art_data = self.get_goods_art_no_info(
- numbers_list=_list
- )
- if online_goods_art_data:
- for number in online_goods_art_data:
- goods_number_dict["NUM" + number] = online_goods_art_data[
- number
- ]
- _list = []
- return goods_number_dict
- def create_folder(self, path):
- def check_folder(__path):
- if not os.path.exists(__path):
- os.makedirs(__path)
- return False
- return True
- # 文件夹不存在,创建货号子集文件夹
- if not check_folder(path):
- for name in ["原始图", "原始图_已抠图", "800x800", "200images"]:
- other_path = path + "/" + name
- check_folder(other_path)
- def move_images(self, goods_art_no, goods_art_no_path, old_image_path):
- """
- 步骤:
- 1、移动到原始图
- Args:
- goods_art_no:
- goods_art_no_path:
- old_image_path:
- Returns:
- """
- # 移动到原始图
- file = os.path.split(old_image_path)[1]
- # 扩展名
- e = os.path.splitext(file)[1]
- print("self.goods_images_count_dict", self.goods_images_count_dict)
- # 获取图片序列
- self.goods_images_count_dict[goods_art_no] += 1
- # A9999(1).jpg
- new_file_name = "{}({})".format(goods_art_no, self.goods_images_count_dict[goods_art_no])
- original_image_path = "{}/原始图/{}{}".format(goods_art_no_path, new_file_name, e)
- # 移动图片
- shutil.move(old_image_path, original_image_path)
- def dealMoveImage(self, image_dir: str, callback_func=None,goods_art_no=None) -> dict:
- if not self.check_path(image_dir=image_dir + "/历史"):
- raise UnicornException("文件夹创建失败")
- # 遍历目标文件夹,获取有拍摄信息的图片,并按拍摄时间排序
- files = self.list_dir(image_dir)
- print("files", files)
- original_photo_list = [] # 原始图片列表
- for file in files:
- # -----图片清洗
- file_path = image_dir + "/" + file
- if os.path.isdir(file_path): # 忽略文件夹
- continue
- file_name, suffix = os.path.splitext(file)
- if suffix not in _Type: # 非图片进行移除
- shutil.move(file_path, image_dir + "/历史/" + file)
- continue
- date_time_original = self.get_date_time_original(file_path) # 获取照片拍照时间
- if date_time_original:
- # 基于照片的时间,与数据库匹配goods_art_no
- _data = self.get_goods_art_no(goods_art_no)
- if _data:
- # 能匹配上数据库
- goods_art_no, image_index, image_deal_mode = _data
- print("832 与数据库匹配goods_art_no", file_name, date_time_original, goods_art_no)
- original_photo_list.append({"file_path": file_path,
- "file": file,
- "date_time_original": date_time_original,
- "goods_art_no": goods_art_no,
- "image_index": image_index,
- "real_goods_art_no": "",
- "real_goods_number": "",
- })
- else:
- # 匹配不上报错
- # self.show_progress_detail("图片:{} 无法对应货号,不做处理".format(file))
- if callback_func:
- callback_func("图片:{} 无对应货号".format(file))
- # shutil.move(photo_dict["file_path"], self.image_dir + "/历史/" + photo_dict["file"])
- continue
- else:
- shutil.move(file_path, image_dir + "/历史/" + file)
- if not original_photo_list:
- raise UnicornException("没有任何匹配的图片")
- return False, "没有任何匹配的图片"
- # 暂不处理红蜻蜓
- # if settings.PROJECT == "红蜻蜓":
- # # 批量请求货号图信息
- # goods_art_no_list = [x["goods_art_no"] for x in original_photo_list]
- # goods_art_no_list = list(set(goods_art_no_list))
- # goods_art_no_list = [x for x in goods_art_no_list if "NUM" not in x]
- # if goods_art_no_list:
- # goods_art_no_dict = self.get_data_from_hqt_with_goods_art_no(
- # goods_art_no_list=goods_art_no_list)
- # for i in original_photo_list:
- # if i["goods_art_no"] in goods_art_no_dict:
- # i["real_goods_art_no"] = i["goods_art_no"]
- # i["real_goods_number"] = "NUM{}".format(goods_art_no_dict[i["goods_art_no"]]["编号"])
- # # 批量请求编号对应信息
- # goods_number_list = [x["goods_art_no"] for x in original_photo_list]
- # goods_number_list = list(set(goods_number_list))
- # goods_number_list = [x for x in goods_number_list if "NUM" in x]
- # if goods_number_list:
- # goods_number_dict = self.get_data_from_hqt(goods_number_list=goods_number_list)
- # for i in original_photo_list:
- # if i["goods_art_no"] in goods_number_dict:
- # i["real_goods_number"] = i["goods_art_no"]
- # i["real_goods_art_no"] = goods_number_dict[i["goods_art_no"]]["商品货号"]
- # 排序需要基于拍照的文件序号进行处理
- original_photo_list.sort(
- key=lambda x: "{}-{}-{}".format(x["goods_art_no"], x["image_index"], x["file"]))
- # print(original_photo_list)
- # 对有拍摄信息的图片进行数据库比对,如有比对上,则移动至货号文件夹,否则移入历史文件夹
- total_num = len(original_photo_list)
- # 当天日期作为文件夹
- seconds = time.time()
- output_path = "output/{f_name}".format(f_name=time.strftime("%Y-%m-%d", time.localtime(seconds)))
- # 遍历每个匹配好的数据进行处理
- n = 0
- for photo_dict in original_photo_list:
- n += 1
- # 进度条
- goods_art_no = photo_dict["goods_art_no"]
- original_image_path = photo_dict["file_path"]
- # 输出货号文件夹
- if photo_dict["real_goods_art_no"]:
- goods_art_no = "{}@{}".format(photo_dict["real_goods_art_no"], photo_dict["real_goods_number"])
- goods_art_no_path = "{output_path}/{goods_art_no}".format(output_path=output_path,
- goods_art_no=goods_art_no)
- # 创建货号下的一系列文件夹
- self.create_folder(goods_art_no_path)
- # 重命名并进行移动
- print("开始移动:{} {} 命名为:{}".format(goods_art_no, original_image_path, goods_art_no_path))
- self.move_images(goods_art_no, goods_art_no_path, original_image_path) # 货号、货号文件路径、原始图路径
- time.sleep(0.2)
- # self.progress_sign.emit({"type": "移动原始图片", "progress_bar_value": int(n / total_num * 100)})
- # self.show_progress_detail("货号{} 相关文件夹创建完成,已移动原图~".format(goods_art_no))
- if callback_func:
- callback_func("货号{} 相关文件夹创建完成,已移动原图~".format(goods_art_no))
- print("已完成移动处理")
- if n != 0:
- # if settings.MattingPics:
- # # 检查所有未处理的货号文件夹,查看是否有完成图片加工处理
- # self.deal_images()
- # 自动生成一个货号表
- print("output_path", output_path)
- self.deal(output_path)
- # 完成处理
- # self.set_state(state_value=2)
- return True, output_path
- def deal(cls, dir_path):
- # print("dir_path", dir_path)
- out_excel_data = []
- for goods_art_no_folder in os.listdir(dir_path): # 遍历货号文件夹集合
- if not os.path.isdir(
- "{}/{}".format(dir_path, goods_art_no_folder)
- ): # 非文件夹进行过滤
- continue
- if "软件" in goods_art_no_folder:
- continue
- # print("goods_art_no_folder", goods_art_no_folder)
- # 如果存在800的主图,则优先进行使用
- big_image_folder_path = "{}/{}/800x800".format(
- dir_path, goods_art_no_folder
- )
- if not os.path.exists(big_image_folder_path):
- os.mkdir(big_image_folder_path)
- all_big_images = os.listdir(big_image_folder_path)
- goods_pic_total = len(all_big_images)
- _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE"]
- thumb_image_file_path = None
- # print("all_big_images",all_big_images)
- if all_big_images:
- for _file in all_big_images:
- # print(_file)
- file_name, e = os.path.splitext(_file)
- # print(file_name, e)
- if e in _Type:
- thumb_image_file_path = "{}/{}/800x800/{}".format(
- dir_path, goods_art_no_folder, _file
- )
- break
- # 如果不存在主图则进行使用原始图
- if thumb_image_file_path is None:
- _path = "{}/{}/原始图".format(dir_path, goods_art_no_folder)
- if not os.path.exists(_path):
- continue
- all_original_images = os.listdir(_path) # 遍历货号原始图文件夹
- goods_pic_total = len(all_original_images)
- if not all_original_images:
- continue
- image_file = all_original_images[0] # 取第一个货号图
- image_file_path = "{}/{}/原始图/{}".format(
- dir_path, goods_art_no_folder, image_file
- )
- if not os.path.exists(
- "{}/{}/200images".format(dir_path, goods_art_no_folder)
- ):
- os.mkdir("{}/{}/200images".format(dir_path, goods_art_no_folder))
- thumb_image_file_path = "{}/{}/200images/{}".format(
- dir_path, goods_art_no_folder, image_file
- )
- if not os.path.exists(thumb_image_file_path):
- # 开始触发进行压缩生成文件
- shutil.copy(image_file_path, thumb_image_file_path) # 复制文件
- pic = Picture(thumb_image_file_path)
- pic.resize(width=600)
- pic.save_img(thumb_image_file_path)
- # print("thumb_image_file_path", thumb_image_file_path)
- goods_number = ""
- if "@" in goods_art_no_folder:
- _ = goods_art_no_folder.split("@")
- goods_art_no_folder = _[0]
- goods_number = _[1].replace("NUM", "")
- out_excel_data.append(
- [
- goods_number,
- goods_art_no_folder,
- thumb_image_file_path,
- goods_pic_total,
- ]
- )
- if out_excel_data:
- out_excel_path = "{}/货号表-{}.xlsx".format(dir_path, time.time())
- options = {
- "default_format_properties": {
- "align": "left",
- "valign": "vcenter",
- "text_wrap": True,
- }
- }
- book = xlsxwriter.Workbook(filename=out_excel_path, options=options)
- sheet = book.add_worksheet("sheet1")
- # sheet.freeze_panes(1, 2)
- sheet.set_column("B:B", 17)
- sheet.set_column("D:D", 20)
- sheet.write_row("A1", ["编号", "原货号", "新货号", "缩略图", "原始图张数"])
- for index, data in enumerate(out_excel_data):
- # print(data)
- goods_number, goods_art_no, image_file, goods_pic_total = data
- try:
- im = Image.open(image_file)
- im_x, im_y = im.size
- image_width = 100
- image_height = int(im_y * image_width / im_x)
- sheet.set_row(index + 1, 95)
- x_scale = round(
- image_width / im_x, 2
- ) # 固定宽度/要插入的原始图片宽
- y_scale = round(
- image_height / im_y, 2
- ) # 固定高度/要插入的原始图片高
- sheet.insert_image(
- index + 1,
- 3,
- image_file,
- {
- "x_scale": x_scale,
- "y_scale": y_scale,
- "x_offset": 5,
- "y_offset": 5,
- "object_position":1,
- },
- )
- except:
- pass
- sheet.write_row("A{}".format(index + 2), [goods_number])
- sheet.write_row("B{}".format(index + 2), [goods_art_no])
- sheet.write_row("E{}".format(index + 2), [goods_pic_total])
|