| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- from email.policy import default
- from settings import *
- from middleware import UnicornException
- import copy
- import requests
- from PIL import Image
- from io import BytesIO
- import base64,shutil
- from logger import logger
- from natsort import ns, natsorted
- from service.base import get_images, check_path, get_image_mask
- '''前端生图接口'''
- #
- generate_templace = "/generate"
- class CustomerTemplateService:
- def __init__(self):
- pass
-
- def parse_template_json(self,template_json):
- """
- 解析 template_json 数据(如果它是 URL)
- 参数:
- - template_json: str,模板数据(可能是 URL 或 JSON 字符串)
- 返回:
- - dict,解析后的 JSON 数据
- """
- try:
- # 检查是否为 URL
- if isinstance(template_json, str) and (template_json.startswith("http://") or template_json.startswith("https://")):
- # 发送 GET 请求获取数据
- response = requests.get(template_json)
- response.raise_for_status() # 检查请求是否成功
- # 解析 JSON 数据
- parsed_data = response.json()
- return parsed_data
- else:
- # 如果不是 URL,直接解析为 JSON
- return json.loads(template_json)
- except requests.exceptions.RequestException as e:
- print(f"网络请求失败: {e}")
- return None
- except json.JSONDecodeError as e:
- print(f"JSON 解析失败: {e}")
- return None
-
- def generateTemplate(self,config_data,template_json,template_name,save_path):
- '''
- 参数:
- config_data: 配置数据
- template_json: 模板数据
- template_name: 模板名称
- save_path: 保存路径
- '''
- print("开始生成模板")
- template_json_data = self.parse_template_json(template_json)
- # print("config_data",config_data)
- handler_config_data,model_image,scene_image = self.__handler_config_data(config_data,save_path)
- self.goods_no_value = handler_config_data
- headers = {"Content-Type": "application/json"}
- json_data = {"goodsList":[{self.goods_no:handler_config_data}],"canvasList":template_json_data}
- json_data = json.dumps(json_data,ensure_ascii=False)
- # print("json_data",json_data)
- template_result = requests.post(CUSTOMER_TEMPLATE_URL+generate_templace,data=json_data,headers=headers)
- resultJson = template_result.json()
- code = resultJson.get("code")
- msg = resultJson.get("msg")
- images = resultJson.get("images",[])
- if code != 0:
- raise UnicornException(f"详情页生成失败,请检查模板数据是否正确:{msg}")
- concat_images_array = []
- for image in images:
- canvasIndex = image.get("canvasIndex")
- dataUrl = image.get("dataUrl")
- save_name = f"{save_path}/切片图-{template_name}/{self.goods_no}({int(canvasIndex)+1}).png"
- match dataUrl:
- case "model":
- # 复制模特图进行拼接
- if model_image:
- model_copy_res = self.copyImage(model_image,save_name)
- if model_copy_res:
- model_image_pil = Image.open(model_image)
- concat_images_array.append(model_image_pil)
- case "scene":
- # 复制场景图进行拼接
- if scene_image:
- scene_copy_res = self.copyImage(scene_image,save_name)
- if scene_copy_res:
- scene_image_pil = Image.open(scene_image)
- concat_images_array.append(scene_image_pil)
- case _:
- pillowImage = self.save_base64_image(dataUrl,save_name)
- concat_images_array.append(pillowImage)
- long_image = self.concat_images_vertically(concat_images_array)
- if long_image:
- save_name = f"{save_path}/详情页-{template_name}.jpg"
- long_image.save(save_name,format="JPEG")
- self.move_other_pic(move_main_pic=True,save_path=save_path)
- print("模板生成成功")
- try:
- # 删除 800x800 目录及其内容
- directory_to_remove = os.path.join(save_path, "800x800")
- if os.path.exists(directory_to_remove):
- shutil.rmtree(directory_to_remove)
- print(f"目录 {directory_to_remove} 已成功删除")
- except Exception as e:
- print(f"删除目录失败: {e}")
- def create_folder(self, path):
- # 创建目录
- if path and not os.path.exists(path):
- print(f"创建目录 详情页=================>>>>:{path}")
- os.makedirs(path)
- def move_other_pic(self, move_main_pic=True,save_path=""):
- sorted_list_800 = []
- for goods_art_no_dict in self.goods_no_value["货号资料"]:
- if "800x800" not in goods_art_no_dict:
- continue
- if not goods_art_no_dict["800x800"]:
- continue
- goods_art_no = ""
- if "编号" in goods_art_no_dict:
- if goods_art_no_dict["编号"]:
- goods_art_no = goods_art_no_dict["编号"]
- if not goods_art_no:
- goods_art_no = goods_art_no_dict["货号"]
- sorted_list_800 = natsorted(goods_art_no_dict["800x800"], key=lambda x: x.split("(")[1].split(")")[0])
- self.create_folder(save_path)
- # 放入一张主图
- old_pic_path_1 = sorted_list_800[0]
- shutil.copy(
- old_pic_path_1,
- "{}/颜色图{}{}".format(
- save_path, goods_art_no, os.path.splitext(old_pic_path_1)[1]
- ),
- )
- # 把其他主图放入作为款号图=====================
- if move_main_pic:
- for idx,pic_path in enumerate(sorted_list_800):
- index = idx + 1
- try:
- split_size = pic_path.split("_")[1].split(".")[0]
- except:
- split_size = ""
- suffix_name = "_"+split_size if split_size else ""
- print("pic_path=========>",split_size)
- e = os.path.splitext(pic_path)[1]
- shutil.copy(
- pic_path,
- "{out_put_dir}/主图{goods_no}({goods_no_main_pic_number}){suffix_name}{e}".format(
- out_put_dir=save_path,
- goods_no=goods_art_no,
- goods_no_main_pic_number=str(
- index
- ),
- e=e,
- suffix_name=suffix_name
- ),
- )
- def concat_images_vertically(self,image_array, custom_width=None):
- """
- 按照顺序将图片数组拼接成长图,并统一图片宽度
- 参数:
- - image_array: list,存放 Pillow 图片对象的数组
- - custom_width: int,可选参数,指定统一的图片宽度(默认为第一张图的宽度)
- 返回:
- - concatenated_image: PIL.Image,拼接后的长图对象
- """
- if not image_array:
- return None
- # 1. 确定统一宽度
- base_width = custom_width or image_array[0].width
- # 2. 计算总高度和调整图片尺寸
- total_height = 0
- resized_images = []
- for img in image_array:
- # 调整图片宽度并保持宽高比
- width_ratio = base_width / img.width
- new_height = int(img.height * width_ratio)
- resized_img = img.resize((base_width, new_height), Image.Resampling.LANCZOS)
- resized_images.append(resized_img)
- total_height += new_height
- # 3. 创建空白画布
- concatenated_image = Image.new("RGB", (base_width, total_height))
- # 4. 按顺序拼接图片
- y_offset = 0
- for resized_img in resized_images:
- concatenated_image.paste(resized_img, (0, y_offset))
- y_offset += resized_img.height
- return concatenated_image
- def __handler_config_data(self,config_data,scp_path):
- '''
- 处理配置数据,返回一个新的数据对象
- '''
- directory = os.path.dirname(scp_path)
- self.create_folder(directory)
- # 深拷贝原始数据,确保不改变原数据对象
- new_config_data = copy.deepcopy(config_data)
- print("传入的config数据",new_config_data)
- model_image = None
- scene_image = None
- self.goods_no = new_config_data.get("款号")
- # 如果输入是字典,则将其转换为目标结构
- # 提取需要添加的数据,排除特定字段
- additional_data = {k: v for k, v in new_config_data.items() if k not in ["款号", "货号资料"]}
- # 遍历货号资料,将额外数据添加到每个货号对象中
- for product in new_config_data.get("货号资料", []):
- product.update(additional_data)
- # 处理 pics 字段中的 xx-抠图 转换为 Base64 并新增字段
- pics = product.get("pics", {})
- goods_art_no = product.get("货号",None)
- goods_art_lens = len(new_config_data.get("货号资料", []))
- concat_shuffix = "" if goods_art_lens == 1 else f"_{goods_art_no}"
- if model_image is None:
- model_image_path = product.get("模特图", None)
- if model_image_path:
- model_image = model_image_path
- self.copyImage(model_image_path, f"{scp_path}/模特图{concat_shuffix}.jpg")
- if scene_image is None:
- scene_image_path = product.get("场景图", None)
- if scene_image_path:
- scene_image = scene_image_path
- self.copyImage(scene_image_path, f"{scp_path}/场景图{concat_shuffix}.jpg")
- new_pics = {}
- for pic_key, pic_path in pics.items():
- if "-抠图" in pic_key:
- # 读取图片并转换为 Base64
- try:
- base64_data = self.crop_image_and_convert_to_base64(pic_path)
- # 新增字段(去除 -抠图)
- new_key = pic_key.replace("-抠图", "")
- new_pics[new_key] = base64_data
- except Exception as e:
- print(f"读取图片失败: {pic_path}, 错误: {e}")
- else:
- # 非 -抠图 字段保持不变
- new_pics[pic_key] = pic_path
- # 更新 pics 字段
- product["pics"] = new_pics
-
- # 构建目标结构
- # result.append({key: item})
- # return result
- return new_config_data,model_image,scene_image
- def save_base64_image(self,base64_data, output_path):
- """
- 将 Base64 编码的图像保存到本地文件
- 参数:
- - base64_data: str,Base64 编码的图像数据(不包含前缀如 "data:image/png;base64,")
- - output_path: str,保存图像的本地路径
- """
- if "data:image/jpeg;base64," in base64_data:
- base64_data = base64_data.split(",")[1]
- try:
- # 1. 解码 Base64 数据
- image_data = base64.b64decode(base64_data)
-
- # 2. 加载图像数据
- image = Image.open(BytesIO(image_data))
- # 3. 检查路径是否存在,如果不存在则创建
- directory = os.path.dirname(output_path)
- self.create_folder(directory)
- # 4. 保存图像到本地
- image.save(output_path)
- print(f"图像已成功保存到 {output_path}")
- return image
- except Exception as e:
- print(f"保存图像失败: {e}")
- print(f"Base64 数据前 100 字符: {base64_data[:100]}")
- return None
-
- def crop_image_and_convert_to_base64(self,pic_path):
- """
- 使用 Pillow 裁剪图片并生成带有前缀的 Base64 图片
- 参数:
- - pic_path: str,图片文件路径
- 返回:
- - base64_data_with_prefix: str,带有前缀的 Base64 编码图片数据
- """
- try:
- # 1. 加载图片
- with Image.open(pic_path) as image:
- # 2. 获取图片的非透明区域边界框 (bounding box)
- bbox = image.getbbox()
- if not bbox:
- raise ValueError("图片可能是完全透明的,无法获取边界框")
-
- # 3. 裁剪图片
- cropped_image = image.crop(bbox)
-
- # 4. 将裁剪后的图片转换为 Base64
- buffered = BytesIO()
- cropped_image.save(buffered, format="PNG") # 保存为 PNG 格式
- base64_data = base64.b64encode(buffered.getvalue()).decode("utf-8")
-
- # 5. 添加 Base64 前缀
- base64_data_with_prefix = f"data:image/png;base64,{base64_data}"
- return base64_data_with_prefix
- except Exception as e:
- print(f"处理图片失败: {pic_path}, 错误: {e}")
- return None
- def copyImage(self,src_path,limit_path):
- try:
- shutil.copy(src_path, limit_path)
- return True
- except Exception as e:
- logger.info(f"copyImage 复制模特图/场景图出错:{str(e)}",src_path,limit_path)
- return False
|