import copy
import hashlib
import io
import math
import os
import random
import threading
import time
from concurrent.futures import as_completed, ThreadPoolExecutor

from PIL import Image
from PySide6.QtWidgets import *
from PySide6.QtCore import *

from module.remove_bg_pixian import RemoveBgPiXian
from module.other.module_online_data import GetOnlineData
from module.other.remove_bg_ali import RemoveBgALi
from module.other.pic import Picture
from module.other.log import logger
from module.deal_one_image import DealOneImage


class DealCutout(QThread):
    """Background thread that pre-crops images and runs background removal.

    Progress is reported to the UI exclusively through ``signal_data``, which
    carries dicts of the form ``{"_type": <kind>, "data": <payload>}``.

    ``self.lock`` is a plain (non-reentrant) ``threading.Lock``.  The helpers
    ``send_info``, ``show_image_info`` and ``add_log`` acquire it themselves,
    so they must never be called while the lock is already held.
    """

    signal_data = Signal(dict)

    def __init__(self, windows):
        """Create the worker.

        Args:
            windows: owner object; this class reads ``windows.remaining_times``.
        """
        super().__init__()
        self.windows = windows
        self.lock = threading.Lock()
        self.r_ali = RemoveBgALi()
        self.r_pixian = RemoveBgPiXian()
        # Iterated in run() as a sequence of per-image dicts.
        # NOTE(review): initialised as a dict but consumed like a list of
        # dicts - presumably the owner replaces it before start(); confirm.
        self.need_cutout_images = {}
        self.state = 2  # 1 = running, 2 = stopped
        self.get_online_data = GetOnlineData()
        # Number of pre-processed images waiting to be consumed (back-pressure).
        self.is_upload_pic_num = 0
        # Running counter of images whose processing has started.
        self.is_deal_num = 0
        # file_path -> {"url", "image_deal_info", "flag", "_im"}
        self.upload_pic_dict = {}

    def run(self):
        """Queue one processing task and one pre-processing task per image.

        Returns as soon as everything is queued; the pool threads keep
        working in the background.
        """
        self.get_online_data.refresh_headers()
        executor = ThreadPoolExecutor(max_workers=4)
        executor_pic_upload = ThreadPoolExecutor(max_workers=2)
        self.state = 1
        self.is_upload_pic_num = 0
        self.is_deal_num = 0
        num = 0
        for image_data in self.need_cutout_images:
            if not image_data["need_cutout"]:
                continue
            num += 1
            worker = DealOneImage(image_data=image_data, lock=self.lock, windows=self, num=num)
            # The futures are not awaited, so surface task errors via callback
            # instead of losing them silently.
            executor.submit(worker.run).add_done_callback(self._log_future_error)
            executor_pic_upload.submit(
                self.upload_pics, image_data=image_data
            ).add_done_callback(self._log_future_error)
        # wait=False: do not block this thread; already-queued tasks still run
        # and the pool threads exit once the queues are drained.
        executor.shutdown(wait=False)
        executor_pic_upload.shutdown(wait=False)

    @staticmethod
    def _log_future_error(future):
        """Log an exception that escaped from a pool task."""
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            logger.error("worker task failed: {}".format(error))

    def send_info(self, text="", is_success=None, _type="show_text_browser"):
        """Emit a progress counter update and/or a text message.

        Args:
            text: message for the text browser; nothing is emitted when empty.
            is_success: ``True``/``False`` emits a "schedule" update counting
                one success/failure; ``None`` emits no counter update.
            _type: accepted for compatibility; currently unused - the text
                message is always emitted as "show_text_browser".
        """
        with self.lock:
            if is_success is not None:
                processing_failed = 0 if is_success else 1
                processing_successfully = 1 if is_success else 0
                self.signal_data.emit({"_type": "schedule",
                                       "data": {"processing_failed": processing_failed,
                                                "processing_successfully": processing_successfully,
                                                }})
            if text:
                self.signal_data.emit({"_type": "show_text_browser", "data": text, })

    def show_image_info(self, data):
        """Emit a per-image status update ("show_image_item_info")."""
        with self.lock:
            self.signal_data.emit({"_type": "show_image_item_info", "data": data, })

    def _show_status(self, file_path, text, info=""):
        """Build the per-image status payload and emit it."""
        self.show_image_info({"text": text, "info": info, "file_path": file_path})

    def add_log(self, text, _type="info"):
        """Write ``text`` to the log at info level (``_type`` is unused)."""
        with self.lock:
            logger.info(text)

    def _release_upload_slot(self):
        """Give back one back-pressure slot taken by the pre-processing step."""
        with self.lock:
            self.is_upload_pic_num -= 1

    def _restore_and_save(self, second_cut_image, image_deal_info, out_path):
        """Paste the cut-out back onto a transparent canvas and save it.

        The cut-out is first resized back to its pre-downscale size when
        pre-processing had to shrink it, then pasted at the position of the
        crop on a canvas of the original image size.
        """
        if image_deal_info["二次抠图是否缩放"]:
            print("图片尺寸还原")
            second_cut_image = second_cut_image.resize(image_deal_info["抠图扩边后图片大小"])
        # Blank transparent canvas of the original size.
        canvas = Image.new(mode="RGBA", size=image_deal_info["原始图片大小"], color=(0, 0, 0, 0))
        position = image_deal_info["抠图扩边后位置"]
        canvas.paste(second_cut_image, box=(position[0], position[1]))
        canvas.save(out_path)

    def run_with_thread222(self, image_data):
        """Legacy pipeline: submit the uploaded image URL to the online
        service, poll for the result, download and save it.

        Args:
            image_data: dict with at least "file_path", "file_name" and
                "root_path".
        """
        with self.lock:
            self.is_deal_num += 1
            # Snapshot under the lock so log lines keep this item's number.
            deal_index = self.is_deal_num
        file_path = image_data["file_path"]
        file_name = image_data["file_name"]
        self.add_log("开始处理第{}个,图片地址:{}".format(deal_index, file_path))
        self.send_info("\n开始处理:{}".format(file_path[-20:]))
        self._show_status(file_path, "处理中")
        if self.windows.remaining_times <= 0:
            self.send_info(text="次数不足,处理失败", is_success=False)
            return

        # Wait (up to 90 s) for the upload step to publish its result.
        for _ in range(90):
            if self.state != 1:
                self.send_info(text="{} 用户主动终止".format(file_name), is_success=False)
                return
            time.sleep(1)
            if file_path in self.upload_pic_dict:
                break
        else:
            text = "{} 图片上传处理超时".format(file_name)
            self.send_info(text=text, is_success=False)
            logger.warning(text)
            self._show_status(file_path, "出错/超时", "处理超时")
            return

        s = time.time()
        upload_result = self.upload_pic_dict[file_path]
        # Release the slot as soon as the result is picked up - also when the
        # upload failed; otherwise failed uploads would block later ones.
        self._release_upload_slot()
        if not upload_result["flag"]:
            self.add_log("第{}个 未查到上传的图片地址".format(deal_index))
            self._show_status(file_path, "出错/超时", "上传错误")
            self.send_info(text="{} 上传错误".format(file_name), is_success=False)
            return
        image_url = upload_result["url"]
        image_deal_info = upload_result["image_deal_info"]
        self._show_status(file_path, "开始抠图")
        self.add_log("第{}个 开始抠图".format(deal_index))

        out_root_path = "{}/已扣图".format(image_data["root_path"])
        self.check_path(out_root_path)
        print(image_url)
        try:
            generate_ids, remaining_times = self.get_online_data.remove_background(images_url=[image_url])
        except Exception as e:
            print(e)
            text = "{} 提交抠图请求错误".format(file_name)
            print(text)
            self.add_log("第{}个 提交抠图请求错误".format(deal_index))
            self.send_info(text=text, is_success=False)
            self._show_status(file_path, "出错/超时", text)
            return
        self.add_log("第{}个 上报抠图任务成功,任务id:{}".format(deal_index, generate_ids))
        with self.lock:
            self.signal_data.emit({"_type": "refresh_times",
                                   "data": {"remaining_times": remaining_times,
                                            "is_online": False
                                            }})

        # Poll the task status, roughly once per second for up to 300 rounds.
        time.sleep(2)
        generate_ids = [str(x) for x in generate_ids]
        n = 60 * 5
        with self.lock:
            print(generate_ids)
            print(file_path, "查询抠图进度")
        self.add_log("第{}个 查询抠图进度,id:{}".format(deal_index, generate_ids))
        while True:
            try:
                data = self.get_online_data.search_progress(generate_ids=generate_ids)
                pixian_status = data[0]["status"]
            except Exception as e:
                print(e)
                text = "{} 查询抠图进度错误".format(file_name)
                self.add_log("第{}个 查询抠图进度错误,id:{}".format(deal_index, generate_ids))
                print(text)
                logger.error("{};{}".format(text, e))
                self.send_info(text=text, is_success=False)
                self._show_status(file_path, "出错/超时", text)
                return
            time.sleep(1)
            n -= 1
            if self.state != 1:
                self.send_info(text="{} 用户主动终止".format(file_name), is_success=False)
                return
            if n <= 0:
                text = "{} 抠图300秒超时".format(file_name)
                self.send_info(text=text, is_success=False)
                self._show_status(file_path, "出错/超时", text)
                return
            if pixian_status == -1:
                text = "服务端抠图出错"
                self.send_info(text=text, is_success=False)
                self._show_status(file_path, "出错/超时", text)
                return
            if pixian_status == 2:
                result_image_url = data[0]["result_image_urls"][0]
                try:
                    second_cut_image = self.get_online_data.download_picture(url=result_image_url, out_path=None)
                except Exception as e:
                    print(e)
                    text = "{} 图片数据流获取错误".format(file_name)
                    print(text)
                    logger.error("{};{}".format(text, e))
                    self.send_info(text=text, is_success=False)
                    self._show_status(file_path, "出错/超时", text)
                    return
                break

        # Restore size/position and write the final PNG.
        try:
            out_path = "{}/{}.png".format(out_root_path, file_name)
            self._restore_and_save(second_cut_image, image_deal_info, out_path)
        except Exception as e:
            print(e)
            text = "{} 图片处理错误,代码44".format(file_name)
            print(text)
            logger.error("{};{}".format(text, e))
            self.send_info(text=text, is_success=False)
            self._show_status(file_path, "出错/超时", text)
            return
        with self.lock:
            print("耗时2:", time.time() - s)
        self.add_log("第{}个 抠图已完成,id:{},耗时:{}".format(deal_index, generate_ids, time.time() - s))
        self.send_info(text="{} 抠图已完成".format(file_name), is_success=True)
        self._show_status(file_path, "已完成")

    def get_keys(self):
        """Fetch an API key, trying up to three times.

        Returns:
            The key, or ``None`` when the worker was stopped or every attempt
            came back empty.  Waits 10 s between attempts.
        """
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            if self.state != 1:
                return None
            key = self.get_online_data.get_keys()
            # Check the key before the attempt budget so a key obtained on
            # the last attempt is not thrown away.
            if key:
                return key
            if attempt < max_attempts:
                time.sleep(10)
        return None

    def run_with_thread(self, image_data):
        """Process one image: wait for its pre-processed crop, run the
        cut-out with a freshly fetched key, then save the restored PNG.

        Args:
            image_data: dict with at least "file_path", "file_name" and
                "root_path".
        """
        # TODO: encrypt the key; on HTTP 429 retry with a new key.
        with self.lock:
            self.is_deal_num += 1
            is_deal_num = copy.copy(self.is_deal_num)
        file_path = image_data["file_path"]
        file_name = image_data["file_name"]
        self.add_log("开始处理第{}个,图片地址:{}".format(is_deal_num, file_path))
        self._show_status(file_path, "处理中")
        if self.windows.remaining_times <= 0:
            self.send_info(text="次数不足,处理失败", is_success=False)
            return

        # Wait (up to 60 s) for the pre-processing step to publish its result.
        # NOTE(review): on abort/timeout the slot taken by upload_pics is not
        # given back here - confirm whether that needs handling.
        for _ in range(60):
            if self.state != 1:
                self.send_info(text="{} 用户主动终止".format(file_name), is_success=False)
                return
            if file_path in self.upload_pic_dict:
                break
            time.sleep(1)
        else:
            text = "{} 处理超时".format(file_name)
            self.send_info(text=text, is_success=False)
            logger.warning(text)
            self._show_status(file_path, "出错/超时", "处理超时")
            return

        s = time.time()
        upload_result = self.upload_pic_dict[file_path]
        # Release the slot as soon as the result is picked up - also when
        # pre-processing failed; otherwise four failures would stall every
        # later upload_pics call.
        self._release_upload_slot()
        if not upload_result["flag"]:
            self.add_log("第{}个 未查到上传的图片地址".format(is_deal_num))
            self._show_status(file_path, "出错/超时", "上传错误")
            self.send_info(text="{} 上传错误".format(file_name), is_success=False)
            return
        image_deal_info = upload_result["image_deal_info"]
        _im = upload_result["_im"]
        self._show_status(file_path, "开始抠图")
        self.add_log("第{}个 开始抠图".format(is_deal_num))

        out_root_path = "{}/已扣图".format(image_data["root_path"])
        self.check_path(out_root_path)

        # Up to two cut-out attempts, each with a freshly fetched key.
        max_attempts = 2
        for attempt in range(1, max_attempts + 1):
            key = self.get_keys()
            if not key:
                self._show_status(file_path, "出错/超时", "多次获取key失败")
                self.send_info(text="{} 处理失败,请联系管理员".format(file_name), is_success=False)
                return
            second_cut_image, detail = self.r_pixian.run_by_image_im(_im, key)
            if second_cut_image is not None:
                break
            # TODO: inspect the returned code before retrying.
            if attempt >= max_attempts:
                self._show_status(file_path, "出错/超时", "多次抠图失败:{}".format(detail))
                self.send_info(text="{} 处理失败,请联系管理员".format(file_name), is_success=False)
                return
            time.sleep(6)

        # Restore size/position and write the final PNG.
        try:
            out_path = "{}/{}.png".format(out_root_path, file_name)
            self._restore_and_save(second_cut_image, image_deal_info, out_path)
        except Exception as e:
            print(e)
            text = "{} 图片处理错误,代码44".format(file_name)
            print(text)
            logger.error("{};{}".format(text, e))
            self.send_info(text=text, is_success=False)
            self._show_status(file_path, "出错/超时", text)
            return
        with self.lock:
            print("耗时2:", time.time() - s)
        self.send_info(text="{} 抠图已完成".format(file_name), is_success=True)
        self._show_status(file_path, "已完成")

    def _acquire_upload_slot(self):
        """Block until fewer than four results are pending, then take a slot.

        Returns:
            ``True`` once a slot was taken, ``False`` if the worker was
            stopped while waiting.
        """
        while True:
            if self.state != 1:
                return False
            with self.lock:
                if self.is_upload_pic_num < 4:
                    self.is_upload_pic_num += 1
                    return True
            time.sleep(1)

    def _store_upload_result(self, file_path, url, image_deal_info, flag, cut_image):
        """Publish the pre-processing result for the processing step."""
        with self.lock:
            self.upload_pic_dict[file_path] = {"url": url,
                                               "image_deal_info": image_deal_info,
                                               "flag": flag,
                                               "_im": cut_image,
                                               }

    def _report_upload_error(self, image_data, error):
        """Report a failed pre-processing/upload step to the UI and the log."""
        print(error)
        self._show_status(image_data["file_path"], "上传出错", "{}".format(error))
        text = "{} 图片上传".format(image_data["file_name"])
        logger.error("{};{}".format(text, error))
        # is_success=None: the failure is counted later by the processing step.
        self.send_info(text=text, is_success=None)

    def upload_pics(self, image_data):
        """Pre-process one image and publish the crop in ``upload_pic_dict``.

        No network upload happens in this variant: the stored "url" is an
        empty string and the crop itself is handed over under "_im".
        """
        # TODO: add a retry mechanism for errors from the Ali call.
        if not self._acquire_upload_slot():
            return
        file_path = image_data["file_path"]
        image_deal_info = {}
        url = ""
        try:
            self._show_status(file_path, "开始上传")
            cut_image, image_deal_info = self.get_image_cut(file_path)
            flag = True
            self._show_status(file_path, "上传成功")
        except Exception as e:
            self._report_upload_error(image_data, e)
            flag = False
            cut_image = ""
        self._store_upload_result(file_path, url, image_deal_info, flag, cut_image)

    def upload_pics2222(self, image_data):
        """Legacy variant: pre-process one image, upload it as JPEG and
        publish the resulting URL in ``upload_pic_dict``."""
        if not self._acquire_upload_slot():
            return
        file_path = image_data["file_path"]
        image_deal_info = {}
        try:
            self._show_status(file_path, "开始上传")
            cut_image, image_deal_info = self.get_image_cut(file_path)
            buffer = io.BytesIO()
            cut_image.save(buffer, format='JPEG')
            buffer.seek(0)
            url = self.get_online_data.upload_pic(file_path=None, buffer=buffer)
            flag = True
            self._show_status(file_path, "上传成功")
        except Exception as e:
            self._report_upload_error(image_data, e)
            flag = False
            url = ""
            cut_image = ""
        self._store_upload_result(file_path, url, image_deal_info, flag, cut_image)

    def get_image_cut(self, file_path):
        """Load an image and crop it to the subject plus a 6 % margin.

        Images under one megapixel are passed through unchanged.  Larger ones
        get a rough cut-out from ``self.r_ali``; its bounding box, widened by
        6 % per side and clamped to the image, is cropped from the original.
        A crop above 32,000,000 pixels is scaled down to fit that budget.

        Returns:
            ``(cut_image, image_deal_info)`` where ``image_deal_info`` records
            the original size, the crop box, the crop size and whether the
            crop was downscaled.

        Raises:
            ValueError: if the rough cut-out has no bounding box.
        """
        original_pic = Picture(file_path)
        original_pic.im = original_pic.im.convert("RGB")
        image_deal_info = {"原始图片大小": (original_pic.x, original_pic.y)}

        if original_pic.x * original_pic.y < 1000000:
            cut_image = original_pic.im
            image_deal_info["抠图扩边后图片大小"] = cut_image.size
            image_deal_info["二次抠图是否缩放"] = False
            image_deal_info["抠图扩边后位置"] = (0, 0, original_pic.x, original_pic.y)
            return cut_image, image_deal_info

        rough_cut = self.r_ali.get_image_cut(file_path=None, out_file_path=None, original_im=original_pic.im)
        bbox = rough_cut.getbbox()
        if bbox is None:
            # Nothing left after the rough cut-out; fail with a clear message
            # rather than an opaque unpacking TypeError.
            raise ValueError("未检测到主体,无法裁剪")
        x1, y1, x2, y2 = bbox
        image_deal_info["鞋子原始位置"] = (x1, y1, x2, y2)
        o_w, o_h = rough_cut.size
        image_deal_info["鞋子原始抠图后大小"] = (o_w, o_h)

        # Widen the box by 6 % of its width/height per side, clamped to the image.
        out_px = 0.06
        margin_w, margin_h = int(out_px * (x2 - x1)), int(out_px * (y2 - y1))
        crop_box = (max(x1 - margin_w, 0),
                    max(y1 - margin_h, 0),
                    min(x2 + margin_w, o_w),
                    min(y2 + margin_h, o_h))
        image_deal_info["抠图扩边后位置"] = crop_box
        cut_image = original_pic.im.crop(crop_box)
        image_deal_info["抠图扩边后图片大小"] = cut_image.size

        x, y = cut_image.size
        max_size = 32000000
        if x * y > max_size:
            # Area scales with the square of the side ratio, so take the
            # square root to land on (just under) max_size pixels.
            r = math.sqrt(max_size / (x * y))
            size = (max(1, int(x * r)), max(1, int(y * r)))
            print(size)
            cut_image = cut_image.resize(size=size)
            image_deal_info["二次抠图是否缩放"] = True
        else:
            image_deal_info["二次抠图是否缩放"] = False
        return cut_image, image_deal_info

    def check_path(self, _path):
        """Ensure directory ``_path`` exists (parents included); return True.

        ``exist_ok=True`` keeps concurrent callers from racing each other.
        """
        os.makedirs(_path, exist_ok=True)
        return True

    def md5_of_image(self, image_path):
        """Return the hex MD5 digest of the file at ``image_path``."""
        digest = hashlib.md5()
        with open(image_path, 'rb') as f:
            # Hash in chunks so large images are not loaded into memory whole.
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                digest.update(chunk)
        return digest.hexdigest()