from email.policy import default from settings import * from middleware import UnicornException import copy import requests from PIL import Image from io import BytesIO import base64,shutil from logger import logger '''前端生图接口''' generate_templace = "/generate" class CustomerTemplateService: def __init__(self): pass def generateTemplate(self,config_data,template_json,template_name,save_path): ''' 参数: config_data: 配置数据 template_json: 模板数据 template_name: 模板名称 save_path: 保存路径 ''' print("开始生成模板") # print("config_data",config_data) handler_config_data,model_image,scene_image = self.__handler_config_data(config_data) goods_no = list(handler_config_data.keys())[0] headers = {"Content-Type": "application/json"} json_data = {"goodsList":[handler_config_data],"canvasList":template_json} json_data = json.dumps(json_data,ensure_ascii=False) # print("json_data",json_data) template_result = requests.post(CUSTOMER_TEMPLATE_URL+generate_templace,data=json_data,headers=headers) resultJson = template_result.json() code = resultJson.get("code") msg = resultJson.get("msg") images = resultJson.get("images",[]) if code != 0: raise UnicornException(f"详情页生成失败,请检查模板数据是否正确:{msg}") concat_images_array = [] for image in images: canvasIndex = image.get("canvasIndex") dataUrl = image.get("dataUrl") save_name = f"{save_path}/切片图-{template_name}/{goods_no}({int(canvasIndex)+1}).png" match dataUrl: case "model": # 复制模特图进行拼接 if model_image: model_copy_res = self.copyImage(model_image,save_name) if model_copy_res: model_image_pil = Image.open(model_image) concat_images_array.append(model_image_pil) case "scene": # 复制场景图进行拼接 if scene_image: scene_copy_res = self.copyImage(scene_image,save_name) if scene_copy_res: scene_image_pil = Image.open(scene_image) concat_images_array.append(scene_image_pil) case _: pillowImage = self.save_base64_image(dataUrl,save_name) concat_images_array.append(pillowImage) long_image = self.concat_images_vertically(concat_images_array) save_name = f"{save_path}/详情页-{template_name}.jpg" long_image.save(save_name,format="JPEG") print("模板生成成功") def concat_images_vertically(self,image_array, custom_width=None): """ 按照顺序将图片数组拼接成长图,并统一图片宽度 参数: - image_array: list,存放 Pillow 图片对象的数组 - custom_width: int,可选参数,指定统一的图片宽度(默认为第一张图的宽度) 返回: - concatenated_image: PIL.Image,拼接后的长图对象 """ if not image_array: raise ValueError("图片数组为空,无法拼接") # 1. 确定统一宽度 base_width = custom_width or image_array[0].width # 2. 计算总高度和调整图片尺寸 total_height = 0 resized_images = [] for img in image_array: # 调整图片宽度并保持宽高比 width_ratio = base_width / img.width new_height = int(img.height * width_ratio) resized_img = img.resize((base_width, new_height), Image.Resampling.LANCZOS) resized_images.append(resized_img) total_height += new_height # 3. 创建空白画布 concatenated_image = Image.new("RGB", (base_width, total_height)) # 4. 按顺序拼接图片 y_offset = 0 for resized_img in resized_images: concatenated_image.paste(resized_img, (0, y_offset)) y_offset += resized_img.height return concatenated_image def __handler_config_data(self,config_data): ''' 处理配置数据,返回一个新的数据对象 ''' # 深拷贝原始数据,确保不改变原数据对象 new_config_data = copy.deepcopy(config_data) model_image = None scene_image = None # 如果输入是字典,则将其转换为目标结构 if isinstance(new_config_data, dict): # result = [] for key, item in new_config_data.items(): # 提取需要添加的数据 additional_data = {k: v for k, v in item.items() if k not in ["款号", "货号资料"]} # 遍历货号资料,将额外数据添加到每个货号对象中 for product in item.get("货号资料", []): product.update(additional_data) # 处理 pics 字段中的 xx-抠图 转换为 Base64 并新增字段 pics = product.get("pics", {}) if not model_image: model_image = product.get("模特图", None) if not scene_image: scene_image = product.get("场景图", None) new_pics = {} for pic_key, pic_path in pics.items(): if "-抠图" in pic_key: # 读取图片并转换为 Base64 try: # base64_data = self.crop_image_and_convert_to_base64(pic_path) # 新增字段(去除 -抠图) new_key = pic_key.replace("-抠图", "") new_pics[new_key] = pic_path except Exception as e: print(f"读取图片失败: {pic_path}, 错误: {e}") else: # 非 -抠图 字段保持不变 new_pics[pic_key] = pic_path # 更新 pics 字段 product["pics"] = new_pics # 构建目标结构 # result.append({key: item}) # return result return new_config_data,model_image,scene_image def save_base64_image(self,base64_data, output_path): """ 将 Base64 编码的图像保存到本地文件 参数: - base64_data: str,Base64 编码的图像数据(不包含前缀如 "data:image/png;base64,") - output_path: str,保存图像的本地路径 """ if "data:image/jpeg;base64," in base64_data: base64_data = base64_data.split(",")[1] try: # 1. 解码 Base64 数据 image_data = base64.b64decode(base64_data) # 2. 加载图像数据 image = Image.open(BytesIO(image_data)) # 3. 检查路径是否存在,如果不存在则创建 directory = os.path.dirname(output_path) if directory and not os.path.exists(directory): os.makedirs(directory) print(f"目录已创建: {directory}") # 4. 保存图像到本地 image.save(output_path) print(f"图像已成功保存到 {output_path}") return image except Exception as e: print(f"保存图像失败: {e}") print(f"Base64 数据前 100 字符: {base64_data[:100]}") return None def crop_image_and_convert_to_base64(self,pic_path): """ 使用 Pillow 裁剪图片并生成带有前缀的 Base64 图片 参数: - pic_path: str,图片文件路径 返回: - base64_data_with_prefix: str,带有前缀的 Base64 编码图片数据 """ try: # 1. 加载图片 with Image.open(pic_path) as image: # 2. 获取图片的非透明区域边界框 (bounding box) bbox = image.getbbox() if not bbox: raise ValueError("图片可能是完全透明的,无法获取边界框") # 3. 裁剪图片 cropped_image = image.crop(bbox) # 4. 将裁剪后的图片转换为 Base64 buffered = BytesIO() cropped_image.save(buffered, format="PNG") # 保存为 PNG 格式 base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8") # 5. 添加 Base64 前缀 base64_data_with_prefix = f"data:image/png;base64,{base64_data}" return base64_data_with_prefix except Exception as e: print(f"处理图片失败: {pic_path}, 错误: {e}") return None def copyImage(self,src_path,limit_path): try: shutil.copy(src_path, limit_path) return True except Exception as e: logger.info(f"copyImage 复制模特图/场景图出错:{str(e)}",src_path,limit_path) return False