| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- from email.policy import default
- from settings import *
- from middleware import UnicornException
- import copy
- import requests
- from PIL import Image
- from io import BytesIO
- import base64,shutil
- from logger import logger
- '''前端生图接口'''
- generate_templace = "/generate"
- class CustomerTemplateService:
- def __init__(self):
- pass
- def generateTemplate(self,config_data,template_json,template_name,save_path):
- '''
- 参数:
- config_data: 配置数据
- template_json: 模板数据
- template_name: 模板名称
- save_path: 保存路径
- '''
- print("开始生成模板")
- # print("config_data",config_data)
- handler_config_data,model_image,scene_image = self.__handler_config_data(config_data)
- goods_no = list(handler_config_data.keys())[0]
- headers = {"Content-Type": "application/json"}
- json_data = {"goodsList":[handler_config_data],"canvasList":template_json}
- json_data = json.dumps(json_data,ensure_ascii=False)
- # print("json_data",json_data)
- template_result = requests.post(CUSTOMER_TEMPLATE_URL+generate_templace,data=json_data,headers=headers)
- resultJson = template_result.json()
- code = resultJson.get("code")
- msg = resultJson.get("msg")
- images = resultJson.get("images",[])
- if code != 0:
- raise UnicornException(f"详情页生成失败,请检查模板数据是否正确:{msg}")
- concat_images_array = []
- for image in images:
- canvasIndex = image.get("canvasIndex")
- dataUrl = image.get("dataUrl")
- save_name = f"{save_path}/切片图-{template_name}/{goods_no}({int(canvasIndex)+1}).png"
- match dataUrl:
- case "model":
- # 复制模特图进行拼接
- if model_image:
- model_copy_res = self.copyImage(model_image,save_name)
- if model_copy_res:
- model_image_pil = Image.open(model_image)
- concat_images_array.append(model_image_pil)
- case "scene":
- # 复制场景图进行拼接
- if scene_image:
- scene_copy_res = self.copyImage(scene_image,save_name)
- if scene_copy_res:
- scene_image_pil = Image.open(scene_image)
- concat_images_array.append(scene_image_pil)
- case _:
- pillowImage = self.save_base64_image(dataUrl,save_name)
- concat_images_array.append(pillowImage)
- print("模板生成成功")
-
- def concat_images_vertically(image_array, custom_width=None):
- """
- 按照顺序将图片数组拼接成长图,并统一图片宽度
- 参数:
- - image_array: list,存放 Pillow 图片对象的数组
- - custom_width: int,可选参数,指定统一的图片宽度(默认为第一张图的宽度)
- 返回:
- - concatenated_image: PIL.Image,拼接后的长图对象
- """
- if not image_array:
- raise ValueError("图片数组为空,无法拼接")
- # 1. 确定统一宽度
- base_width = custom_width or image_array[0].width
- # 2. 计算总高度和调整图片尺寸
- total_height = 0
- resized_images = []
- for img in image_array:
- # 调整图片宽度并保持宽高比
- width_ratio = base_width / img.width
- new_height = int(img.height * width_ratio)
- resized_img = img.resize((base_width, new_height), Image.ANTIALIAS)
- resized_images.append(resized_img)
- total_height += new_height
- # 3. 创建空白画布
- concatenated_image = Image.new("RGB", (base_width, total_height))
- # 4. 按顺序拼接图片
- y_offset = 0
- for resized_img in resized_images:
- concatenated_image.paste(resized_img, (0, y_offset))
- y_offset += resized_img.height
- return concatenated_image
- def __handler_config_data(self,config_data):
- '''
- 处理配置数据,返回一个新的数据对象
- '''
- # 深拷贝原始数据,确保不改变原数据对象
- new_config_data = copy.deepcopy(config_data)
- model_image = None
- scene_image = None
- # 如果输入是字典,则将其转换为目标结构
- if isinstance(new_config_data, dict):
- # result = []
- for key, item in new_config_data.items():
- # 提取需要添加的数据
- additional_data = {k: v for k, v in item.items() if k not in ["款号", "货号资料"]}
-
- # 遍历货号资料,将额外数据添加到每个货号对象中
- for product in item.get("货号资料", []):
- product.update(additional_data)
- # 处理 pics 字段中的 xx-抠图 转换为 Base64 并新增字段
- pics = product.get("pics", {})
- if not model_image:
- model_image = product.get("模特图", None)
- if not scene_image:
- scene_image = product.get("场景图", None)
- new_pics = {}
- for pic_key, pic_path in pics.items():
- if "-抠图" in pic_key:
- # 读取图片并转换为 Base64
- try:
- # base64_data = self.crop_image_and_convert_to_base64(pic_path)
- # 新增字段(去除 -抠图)
- new_key = pic_key.replace("-抠图", "")
- new_pics[new_key] = pic_path
- except Exception as e:
- print(f"读取图片失败: {pic_path}, 错误: {e}")
- else:
- # 非 -抠图 字段保持不变
- new_pics[pic_key] = pic_path
-
- # 更新 pics 字段
- product["pics"] = new_pics
-
- # 构建目标结构
- # result.append({key: item})
- # return result
-
- return new_config_data,model_image,scene_image
- def save_base64_image(self,base64_data, output_path):
- """
- 将 Base64 编码的图像保存到本地文件
- 参数:
- - base64_data: str,Base64 编码的图像数据(不包含前缀如 "data:image/png;base64,")
- - output_path: str,保存图像的本地路径
- """
- if "data:image/jpeg;base64," in base64_data:
- base64_data = base64_data.split(",")[1]
- try:
- # 1. 解码 Base64 数据
- image_data = base64.b64decode(base64_data)
-
- # 2. 加载图像数据
- image = Image.open(BytesIO(image_data))
- # 3. 检查路径是否存在,如果不存在则创建
- directory = os.path.dirname(output_path)
- if directory and not os.path.exists(directory):
- os.makedirs(directory)
- print(f"目录已创建: {directory}")
- # 4. 保存图像到本地
- image.save(output_path)
- print(f"图像已成功保存到 {output_path}")
- return image
- except Exception as e:
- print(f"保存图像失败: {e}")
- print(f"Base64 数据前 100 字符: {base64_data[:100]}")
- return None
-
- def crop_image_and_convert_to_base64(self,pic_path):
- """
- 使用 Pillow 裁剪图片并生成带有前缀的 Base64 图片
- 参数:
- - pic_path: str,图片文件路径
- 返回:
- - base64_data_with_prefix: str,带有前缀的 Base64 编码图片数据
- """
- try:
- # 1. 加载图片
- with Image.open(pic_path) as image:
- # 2. 获取图片的非透明区域边界框 (bounding box)
- bbox = image.getbbox()
- if not bbox:
- raise ValueError("图片可能是完全透明的,无法获取边界框")
-
- # 3. 裁剪图片
- cropped_image = image.crop(bbox)
-
- # 4. 将裁剪后的图片转换为 Base64
- buffered = BytesIO()
- cropped_image.save(buffered, format="PNG") # 保存为 PNG 格式
- base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
-
- # 5. 添加 Base64 前缀
- base64_data_with_prefix = f"data:image/png;base64,{base64_data}"
- return base64_data_with_prefix
- except Exception as e:
- print(f"处理图片失败: {pic_path}, 错误: {e}")
- return None
- def copyImage(self,src_path,limit_path):
- try:
- shutil.copy(src_path, limit_path)
- return True
- except Exception as e:
- logger.info(f"copyImage 复制模特图/场景图出错:{str(e)}",src_path,limit_path)
- return False
|