from email.policy import default from settings import * from middleware import UnicornException import copy import requests from PIL import Image from io import BytesIO import base64,shutil from logger import logger from natsort import ns, natsorted from service.base import get_images, check_path, get_image_mask '''前端生图接口''' # generate_templace = "/generate" class CustomerTemplateService: def __init__(self): pass def parse_template_json(self,template_json): """ 解析 template_json 数据(如果它是 URL) 参数: - template_json: str,模板数据(可能是 URL 或 JSON 字符串) 返回: - dict,解析后的 JSON 数据 """ try: # 检查是否为 URL if isinstance(template_json, str) and (template_json.startswith("http://") or template_json.startswith("https://")): # 发送 GET 请求获取数据 response = requests.get(template_json) response.raise_for_status() # 检查请求是否成功 # 解析 JSON 数据 parsed_data = response.json() return parsed_data else: # 如果不是 URL,直接解析为 JSON return json.loads(template_json) except requests.exceptions.RequestException as e: print(f"网络请求失败: {e}") return None except json.JSONDecodeError as e: print(f"JSON 解析失败: {e}") return None def generateTemplate(self,config_data,template_json,template_name,save_path): ''' 参数: config_data: 配置数据 template_json: 模板数据 template_name: 模板名称 save_path: 保存路径 ''' print("开始生成模板") template_json_data = self.parse_template_json(template_json) # print("config_data",config_data) handler_config_data,model_image,scene_image = self.__handler_config_data(config_data,save_path) self.goods_no_value = handler_config_data headers = {"Content-Type": "application/json"} json_data = {"goodsList":[{self.goods_no:handler_config_data}],"canvasList":template_json_data} json_data = json.dumps(json_data,ensure_ascii=False) # print("json_data",json_data) template_result = requests.post(CUSTOMER_TEMPLATE_URL+generate_templace,data=json_data,headers=headers) resultJson = template_result.json() code = resultJson.get("code") msg = resultJson.get("msg") images = resultJson.get("images",[]) if code != 0: raise UnicornException(f"详情页生成失败,请检查模板数据是否正确:{msg}") concat_images_array = [] for image in images: canvasIndex = image.get("canvasIndex") dataUrl = image.get("dataUrl") save_name = f"{save_path}/切片图-{template_name}/{self.goods_no}({int(canvasIndex)+1}).png" match dataUrl: case "model": # 复制模特图进行拼接 if model_image: model_copy_res = self.copyImage(model_image,save_name) if model_copy_res: model_image_pil = Image.open(model_image) concat_images_array.append(model_image_pil) case "scene": # 复制场景图进行拼接 if scene_image: scene_copy_res = self.copyImage(scene_image,save_name) if scene_copy_res: scene_image_pil = Image.open(scene_image) concat_images_array.append(scene_image_pil) case _: pillowImage = self.save_base64_image(dataUrl,save_name) concat_images_array.append(pillowImage) long_image = self.concat_images_vertically(concat_images_array) if long_image: save_name = f"{save_path}/详情页-{template_name}.jpg" long_image.save(save_name,format="JPEG") self.move_other_pic(move_main_pic=True,save_path=save_path) print("模板生成成功") def create_folder(self, path): # 创建目录 if path and not os.path.exists(path): print(f"创建目录 详情页=================>>>>:{path}") os.makedirs(path) def move_other_pic(self, move_main_pic=True,save_path=""): sorted_list_800 = [] for goods_art_no_dict in self.goods_no_value["货号资料"]: if "800x800" not in goods_art_no_dict: continue if not goods_art_no_dict["800x800"]: continue goods_art_no = "" if "编号" in goods_art_no_dict: if goods_art_no_dict["编号"]: goods_art_no = goods_art_no_dict["编号"] if not goods_art_no: goods_art_no = goods_art_no_dict["货号"] sorted_list_800 = natsorted(goods_art_no_dict["800x800"], key=lambda x: x.split("(")[1].split(")")[0]) self.create_folder(save_path) # 放入一张主图 old_pic_path_1 = sorted_list_800[0] shutil.copy( old_pic_path_1, "{}/颜色图{}{}".format( save_path, goods_art_no, os.path.splitext(old_pic_path_1)[1] ), ) # 把其他主图放入作为款号图===================== if move_main_pic: for idx,pic_path in enumerate(sorted_list_800): index = idx + 1 try: split_size = pic_path.split("_")[1].split(".")[0] except: split_size = "" suffix_name = "_"+split_size if split_size else "" print("pic_path=========>",split_size) e = os.path.splitext(pic_path)[1] shutil.copy( pic_path, "{out_put_dir}/主图{goods_no}({goods_no_main_pic_number}){suffix_name}{e}".format( out_put_dir=save_path, goods_no=goods_art_no, goods_no_main_pic_number=str( index ), e=e, suffix_name=suffix_name ), ) def concat_images_vertically(self,image_array, custom_width=None): """ 按照顺序将图片数组拼接成长图,并统一图片宽度 参数: - image_array: list,存放 Pillow 图片对象的数组 - custom_width: int,可选参数,指定统一的图片宽度(默认为第一张图的宽度) 返回: - concatenated_image: PIL.Image,拼接后的长图对象 """ if not image_array: return None # 1. 确定统一宽度 base_width = custom_width or image_array[0].width # 2. 计算总高度和调整图片尺寸 total_height = 0 resized_images = [] for img in image_array: # 调整图片宽度并保持宽高比 width_ratio = base_width / img.width new_height = int(img.height * width_ratio) resized_img = img.resize((base_width, new_height), Image.Resampling.LANCZOS) resized_images.append(resized_img) total_height += new_height # 3. 创建空白画布 concatenated_image = Image.new("RGB", (base_width, total_height)) # 4. 按顺序拼接图片 y_offset = 0 for resized_img in resized_images: concatenated_image.paste(resized_img, (0, y_offset)) y_offset += resized_img.height return concatenated_image def __handler_config_data(self,config_data,scp_path): ''' 处理配置数据,返回一个新的数据对象 ''' directory = os.path.dirname(scp_path) self.create_folder(directory) # 深拷贝原始数据,确保不改变原数据对象 new_config_data = copy.deepcopy(config_data) print("传入的config数据",new_config_data) model_image = None scene_image = None self.goods_no = new_config_data.get("款号") # 如果输入是字典,则将其转换为目标结构 # 提取需要添加的数据,排除特定字段 additional_data = {k: v for k, v in new_config_data.items() if k not in ["款号", "货号资料"]} # 遍历货号资料,将额外数据添加到每个货号对象中 for product in new_config_data.get("货号资料", []): product.update(additional_data) # 处理 pics 字段中的 xx-抠图 转换为 Base64 并新增字段 pics = product.get("pics", {}) goods_art_no = product.get("货号",None) goods_art_lens = len(new_config_data.get("货号资料", [])) concat_shuffix = "" if goods_art_lens == 1 else f"_{goods_art_no}" if not model_image: model_image = product.get("模特图", None) if model_image: self.copyImage(model_image, f"{scp_path}/模特图{concat_shuffix}.jpg") if not scene_image: scene_image = product.get("场景图", None) if scene_image: self.copyImage(scene_image, f"{scp_path}/场景图{concat_shuffix}.jpg") new_pics = {} for pic_key, pic_path in pics.items(): if "-抠图" in pic_key: # 读取图片并转换为 Base64 try: base64_data = self.crop_image_and_convert_to_base64(pic_path) # 新增字段(去除 -抠图) new_key = pic_key.replace("-抠图", "") new_pics[new_key] = base64_data except Exception as e: print(f"读取图片失败: {pic_path}, 错误: {e}") else: # 非 -抠图 字段保持不变 new_pics[pic_key] = pic_path # 更新 pics 字段 product["pics"] = new_pics # 构建目标结构 # result.append({key: item}) # return result return new_config_data,model_image,scene_image def save_base64_image(self,base64_data, output_path): """ 将 Base64 编码的图像保存到本地文件 参数: - base64_data: str,Base64 编码的图像数据(不包含前缀如 "data:image/png;base64,") - output_path: str,保存图像的本地路径 """ if "data:image/jpeg;base64," in base64_data: base64_data = base64_data.split(",")[1] try: # 1. 解码 Base64 数据 image_data = base64.b64decode(base64_data) # 2. 加载图像数据 image = Image.open(BytesIO(image_data)) # 3. 检查路径是否存在,如果不存在则创建 directory = os.path.dirname(output_path) self.create_folder(directory) # 4. 保存图像到本地 image.save(output_path) print(f"图像已成功保存到 {output_path}") return image except Exception as e: print(f"保存图像失败: {e}") print(f"Base64 数据前 100 字符: {base64_data[:100]}") return None def crop_image_and_convert_to_base64(self,pic_path): """ 使用 Pillow 裁剪图片并生成带有前缀的 Base64 图片 参数: - pic_path: str,图片文件路径 返回: - base64_data_with_prefix: str,带有前缀的 Base64 编码图片数据 """ try: # 1. 加载图片 with Image.open(pic_path) as image: # 2. 获取图片的非透明区域边界框 (bounding box) bbox = image.getbbox() if not bbox: raise ValueError("图片可能是完全透明的,无法获取边界框") # 3. 裁剪图片 cropped_image = image.crop(bbox) # 4. 将裁剪后的图片转换为 Base64 buffered = BytesIO() cropped_image.save(buffered, format="PNG") # 保存为 PNG 格式 base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8") # 5. 添加 Base64 前缀 base64_data_with_prefix = f"data:image/png;base64,{base64_data}" return base64_data_with_prefix except Exception as e: print(f"处理图片失败: {pic_path}, 错误: {e}") return None def copyImage(self,src_path,limit_path): try: shutil.copy(src_path, limit_path) return True except Exception as e: logger.info(f"copyImage 复制模特图/场景图出错:{str(e)}",src_path,limit_path) return False