| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 |
- import copy
- import os.path
- from .other.module_online_data import GetOnlineData
- import time
- from .other.log import MyLogger
- import os
- from .remove_bg_pixian import RemoveBgPiXian
- from PIL import Image
- from .other.pic import Picture
- from .other.remove_bg_ali import RemoveBgALi
- import cv2
- import numpy as np
- from middleware import UnicornException
class Base(object):
    """Common state and helpers shared by the per-image workers in this file.

    Keeps the image descriptor, a lock, the window/config object, the
    online-data client and a logger, and offers logging, output-directory
    creation, run-once bookkeeping and point (balance) adjustment with retry.
    """

    # Logger method names accepted by ``add_log``; anything else falls back
    # to ``info``.  Assumes the logger follows the stdlib ``logging`` naming
    # -- TODO confirm against MyLogger.
    _LOG_LEVELS = ("debug", "info", "warning", "error", "critical")

    def __init__(self, image_data, lock, windows, num, token):
        """Store collaborators and derive the file name used in log lines.

        Args:
            image_data: Mapping providing at least ``"file_path"`` and
                ``"file_name"``.
            lock: Context manager entered by ``send_info`` around the
                refund bookkeeping.
            windows: Stored as-is; this class does not read it.
            num: Ordinal of the image, used as a log prefix.
            token: Forwarded to ``GetOnlineData``.
        """
        self.lock = lock
        self.image_data = image_data
        self.num = num
        self.windows = windows
        self.get_online_data = GetOnlineData(token)
        self.file_path = image_data["file_path"]
        self.file_name = image_data["file_name"]
        self.file = os.path.split(self.file_path)[1]
        # Keys already consumed by ``is_once``.
        self.is_once_data = {}
        self.logger = MyLogger().logger

    def add_log(self, text, _type="info"):
        """Log ``text`` prefixed with the image ordinal and file name.

        Args:
            text: Message body.
            _type: Logger method name (one of ``_LOG_LEVELS``).  Unknown
                names, or levels the logger does not expose, fall back to
                ``info``.
        """
        emit = None
        if _type in self._LOG_LEVELS:
            emit = getattr(self.logger, _type, None)
        if not callable(emit):
            emit = self.logger.info
        emit("第{}个,图片名称:{},内容:{}".format(self.num, self.file, text))

    def show_image_info(self, data):
        """Tag ``data`` with the current file path.

        The UI signal that used to carry this payload is disabled, so the
        only remaining effect is adding ``"file_path"`` to the caller's
        dict (mutated in place, as before).
        """
        data["file_path"] = self.file_path

    def send_info(
        self,
        text="",
        is_success=None,
        _type="show_text_browser",
        need_point_return=False,
    ):
        """Record a processing outcome and refund the point when asked to.

        The UI signals this method used to emit are disabled, so ``text``
        and ``_type`` are accepted only for backward compatibility and are
        not displayed anywhere.

        Args:
            text: Message for the (disabled) text browser.
            is_success: ``True``/``False`` for a finished item, ``None`` for
                a purely informational call.
            _type: Unused.
            need_point_return: When the item failed, give the deducted point
                back.  The refund is attempted at most once per instance.
        """
        failed = is_success is not None and not is_success
        with self.lock:
            # ``is_once`` records the key, so it must only be evaluated when
            # a refund is really due.
            if failed and need_point_return and self.is_once("add_point"):
                self.add_log("扣分返回")
                self.dispose_point(_type="add")
                self.refresh_times(cumulative_frequency_times_change=-1)

    def check_path(self, _path):
        """Ensure directory ``_path`` exists (parents included).

        ``exist_ok=True`` keeps concurrent workers creating the same folder
        from failing.

        Returns:
            Always ``True``.
        """
        os.makedirs(_path, exist_ok=True)
        return True

    def is_once(self, key):
        """Return ``True`` the first time ``key`` is seen, ``False`` after."""
        if key in self.is_once_data:
            return False
        self.is_once_data[key] = 0
        return True

    def refresh_times(self, cumulative_frequency_times_change=0):
        """Placeholder for the usage-counter UI refresh.

        The signal emission is disabled, so this is currently a no-op kept
        for interface compatibility.
        """
        return None

    def dispose_point(self, _type):
        """Adjust the balance through the online service, retrying on error.

        Makes up to three attempts, pausing 0.5 s between them.  An attempt
        counts as successful only when the response contains
        ``["data"]["balance"]``.

        Args:
            _type: Operation name forwarded to the service (``"sub"`` and
                ``"add"`` are the values used in this file).

        Returns:
            ``True`` on success, ``False`` when every attempt failed.
        """
        attempts = 3
        for attempt in range(attempts):
            try:
                response = self.get_online_data.dispose_point(_type)
                # Lookup validates the response shape; the value is unused.
                response["data"]["balance"]
                return True
            except Exception:
                # No point sleeping after the final attempt.
                if attempt < attempts - 1:
                    time.sleep(0.5)
        return False
class DealOneImage(Base):
    """Fine cut-out of one pre-processed image through the PiXian service."""

    # status code -> (seconds to wait before the retry, log template)
    _RETRY_POLICY = {
        402: (6, "抠图失败:{},延迟6秒"),
        429: (10, "抠图失败:{},延迟10秒"),
    }

    def __init__(self, image_data, lock, windows, num, token):
        super().__init__(image_data, lock, windows, num, token)
        self.r_pixian = RemoveBgPiXian()

    def run(self, image_data, upload_pic_dict):
        """Cut out one image and save it as ``<root_path>/已扣图/<file_name>.png``.

        Args:
            image_data: Mapping with ``"file_path"``, ``"root_path"`` and
                ``"file_name"``; receives ``"status"`` and
                ``"success_path"`` before being returned.
            upload_pic_dict: Mapping ``file_path -> {"image_deal_info": ...,
                "_im": ...}`` as produced by the pre-processing step.

        Returns:
            ``image_data`` with ``"status"`` (bool) and ``"success_path"``
            (empty string when the result could not be composed or saved).

        Raises:
            UnicornException: balance query failed, balance exhausted, the
                image was never pre-processed, or the cut-out failed.
        """
        self.file_path = image_data["file_path"]
        self.file = os.path.split(self.file_path)[1]
        self.root_path = image_data["root_path"]
        self.file_name = image_data["file_name"]

        self.add_log("开始处理")
        self._require_balance()

        if self.file_path not in upload_pic_dict:
            # Never pre-processed: nothing to cut.
            raise UnicornException("处理失败")

        started = time.time()
        uploaded = upload_pic_dict[self.file_path]
        image_deal_info = uploaded["image_deal_info"]
        original_im = uploaded["_im"]

        self.add_log("抠图中")
        out_root_path = "{}/已扣图".format(self.root_path)
        self.check_path(out_root_path)

        second_cut_image = self.runPiXian(0, original_im=original_im)
        if second_cut_image is None:
            raise UnicornException("抠图失败")

        cut_status = False
        success_path = ""
        try:
            out_path = "{}/{}.png".format(out_root_path, self.file_name)
            if image_deal_info["二次抠图是否缩放"]:
                # The crop was downscaled before upload; bring the result
                # back to the size of the full-resolution crop.
                self.add_log(text="图片尺寸进行还原")
                full_res_crop = image_deal_info["抠图扩边后PIL对象"]
                second_cut_image = self.picture_resize_to_original(
                    second_cut_image, full_res_crop
                )
            # Paste the cut-out back at its position on a transparent canvas
            # of the original image size.
            canvas = Image.new(
                mode="RGBA", size=image_deal_info["原始图片大小"], color=(0, 0, 0, 0)
            )
            left, top = image_deal_info["抠图扩边后位置"][:2]
            canvas.paste(second_cut_image, box=(left, top))
            if self.windows.output_type == 1:
                # output_type 1: flatten onto an opaque white background.
                background = Image.new("RGBA", canvas.size, (255, 255, 255, 255))
                canvas = Image.alpha_composite(background, canvas)
            canvas.save(out_path)
        except Exception as e:
            text = "{} 图片处理错误,代码44".format(e)
            self.add_log(text)
            self.send_info(text=text, is_success=False, need_point_return=True)
        else:
            # Only report a path/success once the file is really on disk.
            cut_status = True
            success_path = out_path

        self.add_log(text="本张耗时:{}".format(time.time() - started))
        if cut_status:
            self.send_info(text="抠图已完成", is_success=True)

        image_data["status"] = cut_status
        image_data["success_path"] = success_path
        return image_data

    def _require_balance(self):
        """Raise ``UnicornException`` unless a positive balance is reported.

        A failed query and an exhausted balance produce different messages;
        a missing ``"balance"`` field is treated as a failed query.
        """
        try:
            balance = self.get_online_data.get_cutout_image_times().get("balance")
        except Exception as e:
            self.add_log("查询balance失败")
            raise UnicornException("查询balance失败") from e
        if balance is None:
            self.add_log("查询balance失败")
            raise UnicornException("查询balance失败")
        self.add_log("查询balance:{}成功".format(balance))
        if balance <= 0:
            self.add_log("次数不足,处理失败")
            raise UnicornException("次数不足,处理失败")

    def runPiXian(self, num, original_im):
        """Run the PiXian cut-out, retrying once on status 402 / 429.

        A fresh key is fetched on every attempt.  The point is deducted at
        most once per instance (guarded by ``is_once("sub_point")``).

        Args:
            num: Number of attempts already made (callers pass ``0``).
            original_im: Image object handed to ``run_by_image_im``.

        Returns:
            The ``"im"`` entry of the service result on status 200, or
            ``None`` when the cut-out failed (429 twice, or an unexpected
            status).

        Raises:
            UnicornException: empty key/secret, point deduction failed, or
                status 402 on the second attempt.
        """
        num += 1
        api_info = self.get_online_data.get_key_secret()["api_info"]
        key = (api_info["api_key"], api_info["api_serect"])
        # A 2-tuple is always truthy, so check the components instead.
        if not all(key):
            raise UnicornException("处理失败,请联系管理员")
        self.add_log("查询key成功")

        if self.is_once("sub_point"):
            # Deduct the point for this cut-out.
            self.refresh_times(cumulative_frequency_times_change=1)
            if not self.dispose_point(_type="sub"):
                self.add_log(text="多次获取调用余额扣减失败")
                raise UnicornException("多次获取调用余额扣减失败")

        pixian_cutout_data = self.r_pixian.run_by_image_im(original_im, key)
        status = pixian_cutout_data["status_code"]

        if status == 200:
            self.add_log(text="调用抠图完成")
            return pixian_cutout_data["im"]

        if status in self._RETRY_POLICY:
            if num >= 2:
                self.cutoutFail(pixian_cutout_data)
                if status == 402:
                    raise UnicornException("多次获取调用余额扣减失败")
                return None
            delay, template = self._RETRY_POLICY[status]
            self.add_log(text=template.format(status))
            time.sleep(delay)
            # Propagate the retry's result; previously it was dropped and the
            # method fell through to an unbound local.
            return self.runPiXian(num, original_im=original_im)

        # Any other status: report and refund.
        self.show_image_info({"text": "出错/超时", "info": "抠图异常"})
        if "message" in pixian_cutout_data:
            text = "抠图异常,code:{},message:{}".format(
                status, pixian_cutout_data["message"]
            )
        else:
            text = "抠图异常,code:{}".format(status)
        self.add_log(text)
        # Refund through send_info so it shares the once-only guard used by
        # cutoutFail.
        self.send_info(text=text, is_success=False, need_point_return=True)
        return None

    def cutoutFail(self, pixian_cutout_data):
        """Report a cut-out that failed after retrying and refund the point."""
        info = "多次抠图失败:{}".format(pixian_cutout_data["status_code"])
        self.show_image_info({"text": "出错/超时", "info": info})
        self.add_log(text=info)
        self.send_info(
            text="处理失败,请联系管理员",
            is_success=False,
            need_point_return=True,
        )

    def picture_resize_to_original(self, _img, original_im):
        """Upscale a cut-out to ``original_im``'s size and restore its detail.

        The upscaled cut-out's fully opaque area is shrunk with a 10x10
        elliptical erosion; inside that area the pixels are replaced by the
        ones from ``original_im``, while the edge band keeps the upscaled
        cut-out (including its alpha).

        Args:
            _img: Cut-out PIL image with an alpha channel (converted to RGBA
                when it is in another mode).
            original_im: Full-resolution PIL image of the same region.

        Returns:
            The merged RGBA image, sized like ``original_im``.
        """
        if _img.mode != "RGBA":
            _img = _img.convert("RGBA")
        _img = _img.resize(original_im.size)

        # Binary mask: 255 where the upscaled cut-out is fully opaque.
        alpha = np.asarray(_img.split()[3])
        mask = np.where(alpha == 255, 255, 0).astype(np.uint8)

        # Shrink the selection so the seam stays inside the opaque area.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (10, 10))
        eroded = cv2.morphologyEx(mask, cv2.MORPH_ERODE, kernel)
        core_mask = Image.fromarray(eroded)  # 2-D uint8 array -> mode "L"

        # Original pixels inside the eroded selection, transparent elsewhere.
        transparent_im = Image.new("RGBA", original_im.size, (0, 0, 0, 0))
        transparent_im.paste(original_im, (0, 0), core_mask)

        # Lay the sharp core over the upscaled cut-out.
        _img.paste(transparent_im, (0, 0), transparent_im)
        return _img
class DealOneImageBeforehand(Base):
    """Pre-processing and Ali-based cut-out of one image."""

    def __init__(self, image_data, lock, windows, num, token):
        super().__init__(image_data, lock, windows, num, token)
        self.r_ali = RemoveBgALi()

    def run(self, upload_pic_dict):
        """Prepare the image for the fine cut-out and register the result.

        Args:
            upload_pic_dict: Mapping updated in place with
                ``file_path -> {"image_deal_info": ..., "_im": ...}``.

        Returns:
            The same ``upload_pic_dict``.

        Raises:
            UnicornException: any failure while preparing the image; the
                original exception is chained as the cause.
        """
        try:
            cut_image, image_deal_info = self.get_image_cut()
        except Exception as e:
            self.add_log("上传出错:{}".format(e))
            raise UnicornException("上传出错") from e
        upload_pic_dict[self.file_path] = {
            "image_deal_info": image_deal_info,
            "_im": cut_image,
        }
        return upload_pic_dict

    def get_image_cut(self):
        """Crop the image to its subject and cap its size for upload.

        Images under 1,000,000 pixels are used as they are.  Larger ones
        are pre-cut with the Ali service, cropped to the subject's bounding
        box plus a 2.5 % margin (clamped to the image), and downscaled when
        the crop still exceeds 12,000,000 pixels.

        Returns:
            ``(cut_image, image_deal_info)`` where ``image_deal_info``
            records the original size, crop box, crop size, whether the
            crop was downscaled and, if so, the full-resolution crop.

        Raises:
            ValueError: the pre-cut returned no image or an empty one.
        """
        source_im = self._load_oriented_rgb()
        width, height = source_im.size
        image_deal_info = {"原始图片大小": (width, height)}

        if width * height < 1000000:
            cut_image = source_im
            image_deal_info["抠图扩边后位置"] = (0, 0, width, height)
        else:
            self.add_log("开始预抠图处理")
            rough_cut = self.r_ali.get_image_cut(
                file_path=None, out_file_path=None, original_im=source_im
            )
            self.add_log("预抠图处理结束")
            if rough_cut is None:
                raise ValueError("pre-cut returned no image")
            bbox = rough_cut.getbbox()
            if bbox is None:
                raise ValueError("pre-cut result is empty")
            x1, y1, x2, y2 = bbox
            image_deal_info["鞋子原始位置"] = (x1, y1, x2, y2)
            o_w, o_h = rough_cut.size
            image_deal_info["鞋子原始抠图后大小"] = (o_w, o_h)

            # Grow the box by 2.5 % of its size on each side, clamped to
            # the image bounds.
            out_px = 0.025
            pad_w = int(out_px * (x2 - x1))
            pad_h = int(out_px * (y2 - y1))
            crop_box = (
                max(x1 - pad_w, 0),
                max(y1 - pad_h, 0),
                min(x2 + pad_w, o_w),
                min(y2 + pad_h, o_h),
            )
            image_deal_info["抠图扩边后位置"] = crop_box
            cut_image = source_im.crop(crop_box)

        image_deal_info["抠图扩边后图片大小"] = cut_image.size
        x, y = cut_image.size
        max_size = 12000000
        if x * y > max_size:
            # Scale each side by sqrt(area ratio) so the *area* lands on
            # max_size; scaling by the plain ratio shrank it quadratically.
            scale = (max_size / (x * y)) ** 0.5
            size = (int(x * scale), int(y * scale))
            self.add_log(
                text="图片进行压缩,压缩前:{},压缩后:{}".format(
                    image_deal_info["抠图扩边后图片大小"], size
                )
            )
            # ``resize`` returns a new image, so the full-resolution crop
            # can be kept by reference without copying it.
            image_deal_info["抠图扩边后PIL对象"] = cut_image
            cut_image = cut_image.resize(size=size)
            image_deal_info["二次抠图是否缩放"] = True
        else:
            image_deal_info["二次抠图是否缩放"] = False
        return cut_image, image_deal_info

    def get_image_cut_noraml(self, image_data):
        """Normal-mode cut-out: one Ali call, saved as a single PNG.

        Args:
            image_data: Mapping with ``"file_path"``, ``"root_path"`` and
                ``"file_name"``.

        Returns:
            ``image_data`` with ``"status"`` and ``"success_path"`` set
            (``False`` / ``""`` when the service returned no image).

        Raises:
            UnicornException: balance query failed, balance exhausted or
                the point could not be deducted.  Other exceptions from
                the cut-out or the save propagate after the point has been
                refunded.
        """
        self._bind_image(image_data)
        source_im = self._load_oriented_rgb()
        self.add_log("开始预抠图处理")
        self._charge_point()

        success_path = ""
        try:
            cut_image = self.r_ali.get_image_cut(
                file_path=None, out_file_path=None, original_im=source_im
            )
            if cut_image is not None:
                self.add_log("预抠图处理结束")
                save_root_path = "{}/已扣图".format(self.root_path)
                self.check_path(save_root_path)
                success_path = "{}/{}.png".format(save_root_path, self.file_name)
                self._apply_output_background(cut_image).save(success_path)
        except Exception:
            # The point was already deducted; give it back before failing.
            self.dispose_point(_type="add")
            raise

        cut_status = cut_image is not None
        if not cut_status:
            self.dispose_point(_type="add")
        image_data["status"] = cut_status
        image_data["success_path"] = success_path
        return image_data

    def get_image_cut_cloths(self, image_data):
        """Clothing cut-out: one PNG per returned part, in a per-image folder.

        Each item returned by the service supplies ``"cn_name"`` (used as
        the file name) and ``"image_obj"`` (the image to save).

        Args:
            image_data: Mapping with ``"file_path"``, ``"root_path"`` and
                ``"file_name"``.

        Returns:
            ``image_data`` with ``"status"`` and ``"success_path"`` (the
            output folder, or ``""`` when nothing was returned).

        Raises:
            UnicornException: balance query failed, balance exhausted or
                the point could not be deducted.  Other exceptions from
                the cut-out or the save propagate after the point has been
                refunded.
        """
        self._bind_image(image_data)
        source_im = self._load_oriented_rgb()
        self.add_log("开始预抠图处理")
        self._charge_point()

        success_path = ""
        try:
            cut_images = self.r_ali.get_image_cut_cloths(
                file_path=None, out_file_path=None, original_im=source_im
            )
            if cut_images:
                self.add_log("预抠图处理结束")
                save_root_path = "{}/已扣图".format(self.root_path)
                self.check_path(save_root_path)
                save_file_path = "{}/{}".format(save_root_path, self.file_name)
                self.check_path(save_file_path)
                success_path = save_file_path
                for item in cut_images:
                    part = self._apply_output_background(item.get("image_obj"))
                    part.save("{}/{}.png".format(save_file_path, item.get("cn_name")))
        except Exception:
            # The point was already deducted; give it back before failing.
            self.dispose_point(_type="add")
            raise

        cut_status = bool(cut_images)
        if not cut_status:
            self.dispose_point(_type="add")
        image_data["status"] = cut_status
        image_data["success_path"] = success_path
        return image_data

    def get_image_orientation(self, img):
        """Return ``img`` transposed according to its EXIF orientation tag.

        Handles orientations 2-8; orientation 1, an unknown value or no
        EXIF data returns the image unchanged.  Uses the public
        ``getexif()`` when available so formats without the private
        ``_getexif`` no longer raise.
        """
        reader = getattr(img, "getexif", None) or getattr(img, "_getexif", None)
        exif = reader() if reader is not None else None
        # 0x0112 (274) is the EXIF Orientation tag.
        orientation = exif.get(0x0112) if exif else None
        if orientation is None:
            print("没有EXIF数据或没有方向信息")
            return img

        transposes = {
            2: Image.FLIP_LEFT_RIGHT,   # mirrored horizontally
            3: Image.ROTATE_180,        # upside down
            4: Image.FLIP_TOP_BOTTOM,   # mirrored vertically
            5: Image.TRANSPOSE,         # mirrored along the main diagonal
            6: Image.ROTATE_270,        # needs 90 degrees clockwise
            7: Image.TRANSVERSE,        # mirrored along the anti-diagonal
            8: Image.ROTATE_90,         # needs 90 degrees counter-clockwise
        }
        method = transposes.get(orientation)
        return img if method is None else img.transpose(method)

    def _bind_image(self, image_data):
        """Point this worker at the image described by ``image_data``."""
        self.file_path = image_data["file_path"]
        self.file = os.path.split(self.file_path)[1]
        self.root_path = image_data["root_path"]
        self.file_name = image_data["file_name"]

    def _load_oriented_rgb(self):
        """Load ``self.file_path``, apply EXIF orientation, convert to RGB."""
        # Orientation is applied before the mode conversion, as before.
        oriented = self.get_image_orientation(Picture(self.file_path).im)
        return oriented.convert("RGB")

    def _charge_point(self):
        """Check the balance and deduct one point, raising on failure."""
        remaining_times = self.get_online_data.get_cutout_image_times().get("balance")
        self.add_log("remaining_times {}".format(remaining_times))
        if remaining_times is None:
            raise UnicornException("查询balance失败")
        if remaining_times <= 0:
            raise UnicornException("次数不足,处理失败")
        # Use the retrying wrapper so a failed deduction is not ignored.
        if not self.dispose_point(_type="sub"):
            self.add_log(text="多次获取调用余额扣减失败")
            raise UnicornException("多次获取调用余额扣减失败")

    def _apply_output_background(self, image):
        """Flatten ``image`` onto white when ``windows.output_type == 1``."""
        if self.windows.output_type != 1:
            return image
        # alpha_composite requires both operands to be RGBA.
        rgba = image if image.mode == "RGBA" else image.convert("RGBA")
        background = Image.new("RGBA", rgba.size, (255, 255, 255, 255))
        return Image.alpha_composite(background, rgba)
|