|
|
@@ -4,6 +4,10 @@ import settings
|
|
|
import json, asyncio
|
|
|
import numpy as np
|
|
|
from utils.common import message_queue
|
|
|
+from middleware import UnicornException
|
|
|
+from PIL import Image
|
|
|
+import io, time
|
|
|
+
|
|
|
|
|
|
class JsonEncoder(json.JSONEncoder):
|
|
|
"""Convert numpy classes to JSON serializable objects."""
|
|
|
@@ -17,6 +21,183 @@ class JsonEncoder(json.JSONEncoder):
|
|
|
return super(JsonEncoder, self).default(obj)
|
|
|
|
|
|
|
|
|
+def download_image_with_pil(url, save_path):
|
|
|
+ """通过url保存图片"""
|
|
|
+ try:
|
|
|
+ # 发送请求获取图片数据
|
|
|
+ response = requests.get(url)
|
|
|
+ image_data = response.content
|
|
|
+ # 使用PIL处理下载的图片
|
|
|
+ image = Image.open(io.BytesIO(image_data))
|
|
|
+ image = image.convert("RGB")
|
|
|
+ # 根据当前时间生成文件名
|
|
|
+ # 保存图片
|
|
|
+ image.save(save_path)
|
|
|
+ return save_path
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print("保存错误",e)
|
|
|
+ return "error"
|
|
|
+
|
|
|
+
|
|
|
+class AIGCDataRequest(object):
|
|
|
+ def __init__(self, token):
|
|
|
+ self.s = requests.session()
|
|
|
+ self.token = token
|
|
|
+ self.post_headers = {
|
|
|
+ "Authorization": token,
|
|
|
+ }
|
|
|
+
|
|
|
+ def uploadImage(self, local_path: str) -> str:
|
|
|
+ post_headers = {"Authorization": self.token}
|
|
|
+ url = settings.DOMAIN + "/api/upload"
|
|
|
+ resultData = self.s.post(
|
|
|
+ url, files={"file": open(local_path, "rb")}, headers=post_headers
|
|
|
+ ).json()
|
|
|
+ return resultData["data"]["url"]
|
|
|
+
|
|
|
+ def center_paste_image(
|
|
|
+ self, source_image_path, output_path, width=900, ratio=(3, 4)
|
|
|
+ ):
|
|
|
+ """
|
|
|
+ 将一张PNG透明图片居中粘贴到指定尺寸和比例的背景上,保持透明度
|
|
|
+
|
|
|
+ Args:
|
|
|
+ source_image_path: 源PNG图片路径
|
|
|
+ output_path: 输出图片路径
|
|
|
+ width: 目标图片宽度
|
|
|
+ ratio: 目标图片比例 (宽:高)
|
|
|
+ """
|
|
|
+ # 计算目标尺寸 (900 x 1200)
|
|
|
+ target_width = width
|
|
|
+ target_height = int(width * ratio[1] / ratio[0])
|
|
|
+
|
|
|
+ # 创建透明背景图片(RGBA模式)
|
|
|
+ background = Image.new(
|
|
|
+ "RGBA", (target_width, target_height), (255, 255, 255, 0)
|
|
|
+ )
|
|
|
+
|
|
|
+ # 打开源图片
|
|
|
+ source_img = Image.open(source_image_path)
|
|
|
+
|
|
|
+ # 确保源图片是RGBA模式以保持透明度
|
|
|
+ if source_img.mode != "RGBA":
|
|
|
+ source_img = source_img.convert("RGBA")
|
|
|
+
|
|
|
+ # 调整源图片大小以适应目标尺寸,保持原图比例
|
|
|
+ source_width, source_height = source_img.size
|
|
|
+ scale = min(target_width / source_width, target_height / source_height)
|
|
|
+ new_width = int(source_width * scale)
|
|
|
+ new_height = int(source_height * scale)
|
|
|
+
|
|
|
+ # 调整源图片尺寸
|
|
|
+ source_img_resized = source_img.resize(
|
|
|
+ (new_width, new_height), Image.Resampling.LANCZOS
|
|
|
+ )
|
|
|
+
|
|
|
+ # 计算居中粘贴位置
|
|
|
+ paste_x = (target_width - new_width) // 2
|
|
|
+ paste_y = (target_height - new_height) // 2
|
|
|
+
|
|
|
+ # 将调整后的源图片居中粘贴到背景上,保留透明度
|
|
|
+ background.paste(source_img_resized, (paste_x, paste_y), source_img_resized)
|
|
|
+
|
|
|
+ # 如果需要保存为PNG格式以保持透明度
|
|
|
+ if output_path.lower().endswith(".png"):
|
|
|
+ background.save(output_path, format="PNG")
|
|
|
+ else:
|
|
|
+ # 如果保存为JPG等不支持透明度的格式,转换为RGB并使用白色背景
|
|
|
+ background = background.convert("RGB")
|
|
|
+ background.save(output_path)
|
|
|
+
|
|
|
+ return background
|
|
|
+
|
|
|
+ def generateProductScene(self, local_path, prompt, save_path):
|
|
|
+ imageUrl = self.uploadImage(local_path)
|
|
|
+ print("imageUrl", imageUrl)
|
|
|
+ data = {
|
|
|
+ "site": 1,
|
|
|
+ "base_image": imageUrl,
|
|
|
+ "keyword": prompt,
|
|
|
+ "model_type": 1,
|
|
|
+ "gemini_model": "gemini-2.5-flash-image-preview",
|
|
|
+ }
|
|
|
+ """生成场景图"""
|
|
|
+ url = settings.DOMAIN + "/api/ai_image/inspired/command_to_image"
|
|
|
+ resultData = self.s.post(url, data=data, headers=self.post_headers).json()
|
|
|
+ code = resultData.get("code", 0)
|
|
|
+ message = resultData.get("message", "")
|
|
|
+ if code != 0:
|
|
|
+ raise UnicornException(message)
|
|
|
+ image_arr = resultData.get("data", None).get("image", [])
|
|
|
+ if len(image_arr) == 0:
|
|
|
+ raise UnicornException("场景图生成失败")
|
|
|
+ image_url = image_arr[0]
|
|
|
+ save_image_path = download_image_with_pil(image_url, save_path)
|
|
|
+ return save_image_path
|
|
|
+
|
|
|
+ def searchProgress(self, id):
|
|
|
+ """查询进度"""
|
|
|
+ url = settings.DOMAIN + "/api/ai_image/main/search_bacth_progress"
|
|
|
+ data = {"site": 1, "generate_ids": [id], "type": "aigc_pro"}
|
|
|
+ resultData = self.s.post(url, json=data, headers=self.post_headers)
|
|
|
+ resultData = resultData.json()
|
|
|
+ code = resultData.get("code", 0)
|
|
|
+ message = resultData.get("message", "")
|
|
|
+ if code != 0:
|
|
|
+ raise UnicornException(message)
|
|
|
+ data_result = resultData.get("data", [])
|
|
|
+ if len(data_result) == 0:
|
|
|
+ return -1, None
|
|
|
+ data_item = data_result[0]
|
|
|
+ status = data_item.get("status", -1)
|
|
|
+ if status in [0,1]:
|
|
|
+ return status, None
|
|
|
+ result_image_urls = data_item.get("result_image_urls", [])
|
|
|
+ result_image = result_image_urls[0] if len(result_image_urls) > 0 else None
|
|
|
+ return status, result_image
|
|
|
+
|
|
|
+ def generateUpperShoes(self, local_path, model_id, save_path):
|
|
|
+ """生成上脚图"""
|
|
|
+ print("生成上脚图", local_path, model_id, save_path)
|
|
|
+ imageUrl = self.uploadImage(local_path)
|
|
|
+ data = {
|
|
|
+ "site": 1,
|
|
|
+ "model_template_id": model_id,
|
|
|
+ "base_image": imageUrl,
|
|
|
+ "creatClass": "鞋子上脚图-鞋子上脚图",
|
|
|
+ "creatTimer": 40,
|
|
|
+ "pname": "OnFeetImage",
|
|
|
+ }
|
|
|
+ """生成上脚图"""
|
|
|
+ url = settings.DOMAIN + "/api/ai_image/main/upper_footer"
|
|
|
+ resultData = self.s.post(url, data=data, headers=self.post_headers).json()
|
|
|
+ code = resultData.get("code", 0)
|
|
|
+ message = resultData.get("message", "")
|
|
|
+ if code != 0:
|
|
|
+ raise UnicornException(message)
|
|
|
+ generate_ids = resultData.get("data", None).get("generate_ids", [])
|
|
|
+ if len(generate_ids) == 0:
|
|
|
+ raise UnicornException("模特图生成失败")
|
|
|
+ generate_id = generate_ids[0]
|
|
|
+ search_times = 60
|
|
|
+ status = 0
|
|
|
+ result_image = None
|
|
|
+ print("generate_id", generate_id)
|
|
|
+ while search_times > 0:
|
|
|
+ print(f"查询第{search_times}次")
|
|
|
+ status, result_image = self.searchProgress(generate_id)
|
|
|
+ if status in [-1, 2]:
|
|
|
+ break
|
|
|
+ time.sleep(1)
|
|
|
+ search_times -= 1
|
|
|
+ if not result_image:
|
|
|
+ raise UnicornException("模特图生成失败")
|
|
|
+ save_image_path = download_image_with_pil(result_image, save_path)
|
|
|
+ print("上脚图save_image_path",result_image, save_image_path)
|
|
|
+ return save_image_path
|
|
|
+
|
|
|
+
|
|
|
class OnlineDataRequest(object):
|
|
|
def __init__(self, token):
|
|
|
self.s = requests.session()
|
|
|
@@ -318,12 +499,12 @@ class OnlineDataRequest(object):
|
|
|
return resultData["data"]["url"]
|
|
|
|
|
|
def upload_goods_api(self, params):
|
|
|
- '''上传商品api'''
|
|
|
+ """上传商品api"""
|
|
|
post_headers = {
|
|
|
"Authorization": self.token,
|
|
|
"Content-Type": "application/json",
|
|
|
}
|
|
|
- url = settings.DOMAIN+"/api/ai_image/camera_machine/publish_goods"
|
|
|
+ url = settings.DOMAIN + "/api/ai_image/camera_machine/publish_goods"
|
|
|
postData = json.dumps(params)
|
|
|
print("上传商品api==>url", url)
|
|
|
print("上传第三方数据打印", params)
|
|
|
@@ -551,6 +732,8 @@ class GetOnlineDataHLM(OnlineDataRequest):
|
|
|
goods_number_data[data["goods_art_no"]]["颜色名称"] = data["color"]
|
|
|
goods_number_data[data["goods_art_no"]]["商品标题"] = data["goods_title"]
|
|
|
goods_number_data[data["goods_art_no"]]["商品价格"] = data["retail_price"]
|
|
|
+ goods_number_data[data["goods_art_no"]]["性别"] = data["gender"]
|
|
|
+ goods_number_data[data["goods_art_no"]]["token"] = self.token
|
|
|
|
|
|
return goods_number_data
|
|
|
|