import base64
import requests
import settings
import json, asyncio
import numpy as np
from utils.common import message_queue
from middleware import UnicornException
from PIL import Image
import io, time
from logger import logger
# logger = logging.getLogger(__name__)
class JsonEncoder(json.JSONEncoder):
"""Convert numpy classes to JSON serializable objects."""
def default(self, obj):
if isinstance(obj, (np.integer, np.floating, np.bool_)):
return obj.item()
elif isinstance(obj, np.ndarray):
return obj.tolist()
else:
return super(JsonEncoder, self).default(obj)
def download_image_with_pil(url, save_path):
"""通过url保存图片"""
try:
# 发送请求获取图片数据
response = requests.get(url)
image_data = response.content
# 使用PIL处理下载的图片
image = Image.open(io.BytesIO(image_data))
image = image.convert("RGB")
# 根据当前时间生成文件名
# 保存图片
image.save(save_path)
return save_path
except Exception as e:
print("保存错误",e)
return "error"
class AIGCDataRequest(object):
def __init__(self, token):
self.s = requests.session()
self.token = token
self.post_headers = {
"Authorization": token,
}
def uploadImage(self, local_path: str) -> str:
post_headers = {"Authorization": self.token}
url = settings.DOMAIN + "/api/upload"
resultData = self.s.post(
url, files={"file": open(local_path, "rb")}, headers=post_headers
).json()
return resultData["data"]["url"]
def center_paste_image(
self, source_image_path, output_path, width=900, ratio=(3, 4)
):
"""
将一张PNG透明图片居中粘贴到指定尺寸和比例的背景上,保持透明度
Args:
source_image_path: 源PNG图片路径
output_path: 输出图片路径
width: 目标图片宽度
ratio: 目标图片比例 (宽:高)
"""
# 计算目标尺寸 (900 x 1200)
target_width = width
target_height = int(width * ratio[1] / ratio[0])
# 创建透明背景图片(RGBA模式)
background = Image.new(
"RGBA", (target_width, target_height), (255, 255, 255, 0)
)
# 打开源图片
source_img = Image.open(source_image_path)
# 确保源图片是RGBA模式以保持透明度
if source_img.mode != "RGBA":
source_img = source_img.convert("RGBA")
# 调整源图片大小以适应目标尺寸,保持原图比例
source_width, source_height = source_img.size
scale = min(target_width / source_width, target_height / source_height)
new_width = int(source_width * scale)
new_height = int(source_height * scale)
# 调整源图片尺寸
source_img_resized = source_img.resize(
(new_width, new_height), Image.Resampling.LANCZOS
)
# 计算居中粘贴位置
paste_x = (target_width - new_width) // 2
paste_y = (target_height - new_height) // 2
# 将调整后的源图片居中粘贴到背景上,保留透明度
background.paste(source_img_resized, (paste_x, paste_y), source_img_resized)
# 如果需要保存为PNG格式以保持透明度
if output_path.lower().endswith(".png"):
background.save(output_path, format="PNG")
else:
# 如果保存为JPG等不支持透明度的格式,转换为RGB并使用白色背景
background = background.convert("RGB")
background.save(output_path)
source_img.close()
return background
def generateProductScene(self, local_path, prompt, save_path):
imageUrl = self.uploadImage(local_path)
print("imageUrl", imageUrl)
data = {
"site": 1,
"base_image": imageUrl,
"keyword": prompt,
"model_type": 1,
"gemini_model": "gemini-2.5-flash-image-preview",
}
"""生成场景图"""
try:
url = settings.DOMAIN + "/api/ai_image/inspired/command_to_image"
resultData = self.s.post(url, data=data, headers=self.post_headers, timeout=80).json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
image_arr = resultData.get("data", None).get("image", [])
if len(image_arr) == 0:
raise UnicornException("场景图生成失败")
image_url = image_arr[0]
save_image_path = download_image_with_pil(image_url, save_path)
return save_image_path
except:
raise UnicornException("场景图生成失败")
def generateProductSceneQW(self, local_path, prompt, save_path):
'''千问生成场景图'''
imageUrl = self.uploadImage(local_path)
data = {
"machine_type": 0, # 0鞋;1服装
"generate_type": 0, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图
"base_image": imageUrl,
"prompt": prompt
}
"""生成场景图"""
url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate"
resultData = self.s.post(url, data=data, headers=self.post_headers).json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
image_url = resultData.get("data", None).get("image_url", '')
if image_url == "" or image_url is None:
raise UnicornException("场景图生成失败")
save_image_path = download_image_with_pil(image_url, save_path)
return save_image_path
def generateModelShoesQW(self, local_path, model_id, save_path):
'''千问生成场景图'''
imageUrl = self.uploadImage(local_path)
data = {
"machine_type": 0, # 0鞋;1服装
"generate_type": 1, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图
"base_image": imageUrl,
"model_template_id": model_id,
}
"""生成场景图"""
url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate"
resultData = self.s.post(url, data=data, headers=self.post_headers).json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
image_url = resultData.get("data", None).get("image_url", '')
if image_url == "" or image_url is None:
raise UnicornException("模特图生成失败")
save_image_path = download_image_with_pil(image_url, save_path)
return save_image_path
def searchProgress(self, id):
"""查询进度"""
try:
url = settings.DOMAIN + "/api/ai_image/main/search_bacth_progress"
data = {"site": 1, "generate_ids": [id], "type": "aigc_pro"}
resultData = self.s.post(url, json=data, headers=self.post_headers, timeout=10)
resultData = resultData.json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
data_result = resultData.get("data", [])
print("查询进度", data_result)
logger.info(f"查询进度:{data_result}")
if len(data_result) == 0:
return -1, None
data_item = data_result[0]
status = data_item.get("status", -1)
result_image = None
# 只有在完成状态下才返回图片URL
if status == 2: # 完成
result_image_urls = data_item.get("result_image_urls", [])
result_image_urls = [] if result_image_urls is None else result_image_urls
result_image = result_image_urls[0] if len(result_image_urls) > 0 else None
return status, result_image
except requests.Timeout:
print("查询进度超时")
return -1, None
except Exception as e:
print(f"查询进度异常: {e}")
return -1, None
def generateUpperShoes(self, local_path, model_id, save_path):
"""生成上脚图"""
print("生成上脚图", local_path, model_id, save_path)
imageUrl = self.uploadImage(local_path)
data = {
"site": 1,
"model_template_id": model_id,
"base_image": imageUrl,
"creatClass": "鞋子上脚图-鞋子上脚图",
"creatTimer": 40,
"pname": "OnFeetImage",
}
"""生成上脚图"""
url = settings.DOMAIN + "/api/ai_image/main/upper_footer"
resultData = self.s.post(url, data=data, headers=self.post_headers).json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
generate_ids = resultData.get("data", None).get("generate_ids", [])
if len(generate_ids) == 0:
raise UnicornException("模特图生成失败")
generate_id = generate_ids[0]
search_times = 80
status = 0
result_image = None
print("generate_id", generate_id)
while search_times > 0:
print(f"模特图查询第{search_times}次")
logger.info(f"模特图查询第{search_times}次")
status, result_image = self.searchProgress(generate_id)
# status: -1=失败, 0=排队中, 1=进行中, 2=完成
if status == 2: # 完成
break
if status == -1: # 失败
break
# status为0(排队中)或1(进行中)时继续查询
time.sleep(1)
search_times -= 1
# 循环结束后检查最终状态
if status == -1 or (status != 2 and search_times <= 0):
raise UnicornException("模特图生成失败")
save_image_path = download_image_with_pil(result_image, save_path)
print("上脚图save_image_path",result_image, save_image_path)
return save_image_path
def generateModelFitting(self, local_path, model_id, face_type, save_path):
"""生成上脚图"""
imageUrl = self.uploadImage(local_path)
data = {
"site": 1,
"scene_key": model_id,
"topsImg": imageUrl,
"generate_type": 1,
"face_type": face_type,
"pname": "ModelFitting_clothing",
}
print("生成上脚图", local_path, model_id, save_path)
print("生成上脚图==>data", data)
"""生成上脚图"""
url = settings.DOMAIN + "/api/ai_image/clothing/model_fitting_vk"
resultData = self.s.post(url, data=data, headers=self.post_headers)
# print("模特图resultData", resultData.content)
resultData = resultData.json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
generate_ids = resultData.get("data", None).get("generate_ids", [])
if len(generate_ids) == 0:
raise UnicornException("模特图生成失败")
generate_id = generate_ids[0]
search_times = 80
status = 0
result_image = None
while search_times > 0:
print(f"查询第{search_times}次")
status, result_image = self.searchProgress(generate_id)
# status: -1=失败, 0=排队中, 1=进行中, 2=完成
if status == 2: # 完成
break
if status == -1: # 失败
break
# status为0(排队中)或1(进行中)时继续查询
time.sleep(1)
search_times -= 1
# 循环结束后检查最终状态
if status == -1 or (status != 2 and search_times <= 0):
raise UnicornException("模特图生成失败")
save_image_path = download_image_with_pil(result_image, save_path)
print("上脚图save_image_path",result_image, save_image_path)
return save_image_path
def generateProductSceneQW(self, local_path, prompt, save_path):
"""千问生成场景图"""
imageUrl = self.uploadImage(local_path)
data = {
"machine_type": 1, # 0鞋;1服装
"generate_type": 0, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图
"base_image": imageUrl,
"prompt": prompt,
}
"""生成场景图"""
url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate"
resultData = self.s.post(url, data=data, headers=self.post_headers).json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
image_url = resultData.get("data", None).get("image_url", "")
if image_url == "" or image_url is None:
raise UnicornException("场景图生成失败")
save_image_path = download_image_with_pil(image_url, save_path)
return save_image_path
def generateModelclothingQW(self, local_path, model_id, save_path):
"""千问生成场景图"""
imageUrl = self.uploadImage(local_path)
data = {
"machine_type": 1, # 0鞋;1服装
"generate_type": 1, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图
"base_image": imageUrl,
"model_template_id": model_id,
}
"""生成场景图"""
url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate"
resultData = self.s.post(url, data=data, headers=self.post_headers).json()
code = resultData.get("code", 0)
message = resultData.get("message", "")
if code != 0:
raise UnicornException(message)
image_url = resultData.get("data", None).get("image_url", "")
if image_url == "" or image_url is None:
raise UnicornException("模特图生成失败")
save_image_path = download_image_with_pil(image_url, save_path)
return save_image_path
class OnlineDataRequest(object):
def __init__(self, token):
self.s = requests.session()
self.token = token
self.post_headers = {
"Authorization": token,
# "Origin": settings.Headers["Origin"],
# "Host": settings.Headers["Host"],
"Content-Length": "0",
"Content-Type": "application/json",
"Accept": "application/json",
}
print("28 Authorization:", self.post_headers["Authorization"])
def refresh_headers(self, token):
self.post_headers = {
"Authorization": token,
# "Origin": settings.Headers["Origin"],
# "Host": settings.Headers["Host"],
"Content-Length": "0",
"Content-Type": "application/json",
"Accept": "application/json",
}
def auth_user(self):
# 用户登录
url = "{domain}/api/auth/user".format(domain=settings.DOMAIN)
s = requests.session()
_s = s.get(url=url, headers=settings.Headers)
response_data = _s.json()
return response_data
def logout(self):
url = "{domain}/api/auth/logout".format(domain=settings.DOMAIN)
s = requests.session()
_s = s.post(url=url, headers=settings.Headers)
def get_change_bar_code(self, code):
url = "{domain}/api/hct/open/sting_search_goods?string={code}".format(
domain=settings.DOMAIN, code=code
)
try:
s = requests.get(url)
goods_art_no = s.json()["data"]["goods_art_no"]
return goods_art_no
except BaseException as e:
print(e)
return
def get_goods_art_no_info(self, numbers_list=None, goods_art_list=None, token=None):
# 获取商品基础信息,入参为商品的编号
url = "{domain}/api/backend/produce/goods/info".format(domain=settings.DOMAIN)
print("执行 get_goods_art_no_info ", url)
if numbers_list:
data = {"numbers": numbers_list}
print("请求编码:", numbers_list)
else:
data = {"goods_art_nos": goods_art_list}
print("请求货号:", goods_art_list)
print("请求货号=====>", self.token)
print("执行 get_goods_art_no_info----------------", data)
post_headers = {
"Authorization": token,
# "Origin": settings.Headers["Origin"],
# "Host": settings.Headers["Host"],
"Content-Length": "",
"Content-Type": "application/json",
"Accept": "application/json",
}
data = json.dumps(data)
post_headers["Content-Length"] = str(len(data))
_s = self.s.post(url=url, data=data, headers=post_headers)
# _s = self.s.get(url=url, params=params, headers=settings.Headers)
response_data = _s.json()
# print(response_data)
# print("\n")
goods_number_data = {}
# ["", "", "", "", "", "", "", "", "", "", "", ]
if "data" not in response_data:
return {}
for data in response_data["data"]:
if numbers_list:
number = data["number"]
else:
number = data["goods_art_no"].upper()
goods_number_data[number] = {}
goods_number_data[number]["商品面料"] = data["fabric"]
goods_number_data[number]["商品内里"] = data["lining"]
goods_number_data[number]["商品鞋底"] = data["sole"]
goods_number_data[number]["后帮高"] = data["back_height"]
goods_number_data[number]["前掌宽"] = data["forefoot_width"]
goods_number_data[number]["鞋跟高"] = data["heel_height"]
goods_number_data[number]["FAB介绍"] = data["fab_info"]
goods_number_data[number]["编号"] = data["number"]
goods_number_data[number]["商品货号"] = data["goods_art_no"].upper()
goods_number_data[number]["款号"] = data["goods_number"].upper()
goods_number_data[number]["颜色名称"] = data["color"]
goods_number_data[number]["所属企划"] = data["projects"][0]
goods_number_data[number]["设计方名称"] = data["purchasing_unit"]
goods_number_data[number]["供应商"] = data["supplier_name"]
goods_number_data[number]["供应商编码"] = data["supplier_code"].lstrip("0")
goods_number_data[number]["供应商货号"] = data["supplier_goods_artno"]
goods_number_data[number]["销售工厂"] = data["sales_factory_name"]
goods_number_data[number]["销售组织"] = data["man_org_name"]
goods_number_data[number]["是否SAP"] = data["source"]
goods_number_data[number]["OEM报价"] = data["oem_price"]
goods_number_data[number]["出厂价"] = data["ex_factory_price"]
goods_number_data[number]["首单货期"] = data["earliest_delivery_date"]
goods_number_data[number]["包装"] = data["package_specification"]
goods_number_data[number]["创建日期"] = data["created_at"]
goods_number_data[number]["货号图"] = data["image"]
return goods_number_data
def get_on_goods_all_art(self, number):
# 获取商品基础信息,入参为商品的编号
url = "{domain}/api/backend/produce/goods/query/numbers?number={number}".format(
domain=settings.DOMAIN, number=number
)
_s = self.s.get(url=url, headers=self.post_headers)
response_data = _s.json()
print(number, response_data)
"""
14250230 {'data': {'goods_number': 'AC5200117', 'brother_goods_arts': [{'number': '14250232', 'goods_art_no': 'AC52001173', 'color': '杏色'}, {'number': '14250231', 'goods_art_no': 'AC52001172', 'color': '灰色'}, {'number': '14250230', 'goods_art_no': 'AC52001171', 'color': '黑色'}]}, 'code': 0, 'message': 'success'}
"""
return (
response_data["data"]["goods_number"],
response_data["data"]["brother_goods_arts"],
response_data["data"]["goods_art_no"],
)
def get_views(self, image_url):
url = "http://{}/shoes_category".format(settings.VIEW_DEAL_DOMAIN)
data = {
"train_path": "./datasets/Shoes_Dataset/Train/angle",
"model_filename": "./models/0320/output0320.pth",
"validate_path": image_url,
}
_s = requests.post(
url=url,
data=json.dumps(data),
)
response_data = _s.json()
return response_data["classify_result"]
def uploadImage(self, local_path: str) -> str:
post_headers = {"Authorization": self.token}
url = settings.DOMAIN + "/api/upload"
resultData = self.s.post(
url, files={"file": open(local_path, "rb")}, headers=post_headers
).json()
return resultData["data"]["url"]
def get_current_menu(self):
def get_menu(_menu_dict, _data):
for menu in _data:
_menu_dict[menu["key"]] = {}
for mods in menu["mods_arr"]:
_menu_dict[menu["key"]][mods["key"]] = mods["name"]
if "_child" in menu:
get_menu(_menu_dict, menu["_child"])
return _menu_dict
url = "{domain}/api/backend/basic/get_current_menu".format(
domain=settings.DOMAIN,
)
_s = self.s.get(url=url, headers=settings.Headers)
response_data = _s.json()
try:
menu_data = response_data["data"]["pc_menu"]
menu_dict = {}
menu_dict = get_menu(menu_dict, menu_data)
except:
menu_dict = {}
# print(json.dumps(menu_dict,ensure_ascii=False))
# raise 1
return menu_dict
# 获取所有资源的配置
def get_resource_config(self):
url = "{domain}/api/openai/query_client_addons".format(domain=settings.DOMAIN)
_s = self.s.get(
url=url, headers=self.post_headers, params={"type": "client_camera"}
)
response_data = _s.json()
return response_data
# 拍照日志上报
def add_auto_photo_logs(self, data):
url = "{domain}/api/openai/add_auto_photo_logs".format(domain=settings.DOMAIN)
post_data = {
"goods_no": data["goods_art_no"],
"take_photo_created_at": data["take_photo_created_at"],
"photo_created_at": data["photo_create_time"],
"image_dispose_mode": data["image_deal_mode"],
"photo_serial_number": data["image_index"],
}
post_data = json.dumps(post_data)
_s = self.s.post(url=url, headers=self.post_headers, data=post_data)
response_data = _s.json()
if settings.IS_TEST:
print("209-----拍照日志上报 add_auto_photo_logs")
print(response_data)
return response_data
def upload_pic_list_data(self, data):
url = "{domain}/api/backend/goods/save/images".format(domain=settings.DOMAIN)
data = json.dumps(data)
self.post_headers["Content-Length"] = str(len(data))
_s = self.s.post(url=url, data=data, headers=self.post_headers)
response_data = _s.json()
try:
if response_data["code"] == 0 and response_data["message"] == "success":
return True
else:
print(data)
print(response_data)
return False
except BaseException as e:
print(data)
print(e)
print(response_data)
return False
def upload_pic(self, goods_data):
# 检查货号图是否存在
url = "{domain}/api/backend/upload".format(domain=settings.DOMAIN)
# print(url)
headers = {
"Authorization": settings.Headers["Authorization"],
"User-Agent": settings.Headers["User-Agent"],
"Origin": settings.Headers["Origin"],
"Host": settings.Headers["Host"],
}
files = [
(
"file",
(
goods_data["file_path"],
goods_data["image_io"],
"image/{}".format(goods_data["e"]),
),
)
]
_s = requests.post(url=url, headers=headers, files=files)
response_data = _s.json()
return response_data["data"]["url"]
# 查询是否已有详情图
def check_detail_image(self, goods_art_no, token):
url = "{domain}/api/backend/goods/check_detail_image?number={number}".format(
domain=settings.DOMAIN, number=goods_art_no
)
_s = self.s.get(url=url, headers=self.post_headers)
response_data = _s.json()
# print(response_data)
return response_data["data"]["hasDetailImage"]
# 调用API识别是否是拖鞋
def yolo_shoes_category(self, image_url):
url = "{domain}/api/ai_image/main/yolo_shoes_category".format(
domain=settings.DOMAIN
)
post_data = {
"image_url": image_url,
}
post_data = json.dumps(post_data)
_s = self.s.post(url=url, headers=self.post_headers, data=post_data)
response_data = _s.json()
if settings.IS_TEST:
print("278-----yolo_shoes_category")
print(response_data)
r_data = None
try:
r_data = response_data["data"]["category"]
except BaseException as e:
print("285", e)
return r_data
# 图片上传by IO
def upload_image_by_io(self, image_io) -> str:
post_headers = {"Authorization": settings.Authorization}
url = settings.DOMAIN + "/api/upload"
resultData = self.s.post(
url, files={"file": image_io}, headers=post_headers
).json()
return resultData["data"]["url"]
def upload_goods_api(self, params):
"""上传商品api"""
post_headers = {
"Authorization": self.token,
"Content-Type": "application/json",
}
url = settings.DOMAIN + "/api/ai_image/camera_machine/publish_goods"
postData = json.dumps(params)
# print("上传商品api==>url", url)
# print("上传第三方数据打印", params)
resultData = self.s.post(url, data=postData, headers=post_headers).json()
print("上传商品api==>resultData", resultData)
return resultData
def sendSocketMessage(self, code=0, msg="", data=None, device_status=2,msg_type="upload_goods_progress"):
data = {
"code": code,
"msg": msg,
"status": device_status,
"data": data,
"msg_type": msg_type,
}
loop = asyncio.get_event_loop()
loop.create_task(message_queue.put(data))
def uploadGoods2ThirdParty(self, goods_no_dict=None, online_stores=[]):
params = []
message_type = "upload_goods_progress"
if goods_no_dict == None:
return
success_goods_arts = []
for store in online_stores:
for goods_no in goods_no_dict.keys():
goods_data = goods_no_dict[goods_no]
detail_path = goods_data.get("detail_path", "")
if detail_path == "":
continue
goods_title = goods_data.get("商品标题", "")
if goods_title == "":
continue
goods_price = goods_data.get("商品价格", 0)
if goods_price == '':
goods_price = 0
if goods_price == 0:
continue
skuList = []
itemImageInfoList = []
itemSkuImageList = []
sku_list_basic = goods_data.get("货号资料", [])
quantity = 9999
skuPropValueList = []
for skuIdx, sku_data in enumerate(sku_list_basic):
sku_goods_art_no = sku_data.get("货号", "")
color_name = sku_data.get("颜色名称", "")
size = sku_data.get("尺码", "")
# 尺码
mainImages = sku_data.get("800x800", [])
if not mainImages:
continue
success_goods_arts.append(sku_goods_art_no)
mainImagePath = mainImages[0]
imageUrl = self.uploadImage(local_path=mainImagePath)
skuItemData = {
"skuNo": sku_goods_art_no,
"originalPrice": float(goods_price),
"newSkuWeight": int(1),
"skuMainImageUrl": str(imageUrl),
"skuName": f"颜色:{color_name};尺码:{size}",
"sellingPrice": float(goods_price),
"quantity": int(quantity),
"showOrder": int(skuIdx + 1),
}
skuList.append(skuItemData)
itemImage = {
"imageUrl": str(imageUrl),
"imageType": 0,
"imageItem": int(skuIdx),
"imageIndex": 10,
}
itemImageInfoList.append(itemImage)
imageJson = {
"imageUrl": str(imageUrl),
"imageType": 1,
"showOrder": 1,
}
skuPropValueList.append(
{
"imageJson": imageJson,
"propValue": str(color_name),
"showOrder": 1,
}
)
itemSkuImageList.append(
{
"propName": "颜色",
"isImageProp": 1,
"propShowOrder": 1,
"skuPropValueList": skuPropValueList,
}
)
detailImageUrl = self.uploadImage(local_path=detail_path)
category_info = "流行男鞋>>休闲鞋>>时尚休闲鞋"
itemData = {
"catePathName": category_info, # 分类
"itemName": str(goods_title), # 商品标题
"itemNo": str(goods_no),
"brandName": store, # 品牌名称
"sellingPrice": float(goods_price), # 售价(未划线价)
"originalPrice": float(goods_price), # 划线价
"quantity": int(quantity), # 库存数量
"propInfoList": [
{"propName": "品牌", "propIndex": 2, "propValue": "Vali"},
{
"propName": "平台类目",
"propIndex": 3,
"propValue": category_info,
},
],
"skuList": skuList,
"itemImageInfoList": itemImageInfoList,
"itemSkuImageList": itemSkuImageList,
"wapDescription": f'
',
"pcDescription": f'
',
}
params.append(itemData)
json_params = str(params) # 直接转换为字符串表示
print("json_params", json_params)
# 使用base64编码
encoded = base64.b64encode(json_params.encode("utf-8")).decode("utf-8")
self.upload_goods_api({"bizcontent": encoded, "online_stores": online_stores})
print("商品上传第三方成功")
return True
class GetOnlineDataHLM(OnlineDataRequest):
def __init__(self, token):
super().__init__(token)
self.token = token
def upload_pic(self, goods_data, token):
# 检查货号图是否存在
url = "{domain}/api/backend/upload".format(domain=settings.DOMAIN)
# print(url)
headers = {
"Authorization": self.token,
# 'User-Agent': settings.Headers["User-Agent"],
# 'Origin': settings.Headers["Origin"],
# 'Host': settings.Headers["Host"],
}
files = [
(
"file",
(
goods_data["file_path"],
goods_data["image_io"],
"image/{}".format(goods_data["e"]),
),
)
]
_s = requests.post(url=url, headers=headers, files=files)
response_data = _s.json()
return response_data["data"]["url"]
def upload_pic_list_data(self, data, token):
url = "{domain}/api/backend/goods/save/images".format(domain=settings.DOMAIN)
data = json.dumps(data)
self.post_headers["Content-Length"] = str(len(data))
_s = self.s.post(url=url, data=data, headers=self.post_headers)
response_data = _s.json()
try:
if response_data["code"] == 0 and response_data["message"] == "success":
return True
else:
print(response_data)
return False
except BaseException as e:
print(e)
print(response_data)
return False
def get_goods_art_no_info(self, numbers_list=None, goods_art_list=None, token=None):
# 获取商品基础信息,入参为商品的编号
url = "{domain}/api/backend/goods_client/goods_query".format(
domain=settings.DOMAIN
)
data = {"goods_art_list": goods_art_list}
print("url:", url)
print("请求货号:", goods_art_list)
post_headers = {
"Authorization": token,
# "Origin": settings.Headers["Origin"],
# "Host": settings.Headers["Host"],
"Content-Length": "",
"Content-Type": "application/json",
"Accept": "application/json",
}
data = json.dumps(data)
print(post_headers)
print(data)
# post_headers["Content-Length"] = str(len(data))
_s = self.s.post(url=url, data=data, headers=post_headers)
# _s = self.s.get(url=url, params=params, headers=settings.Headers)
response_data = _s.json()
print(response_data)
print("\n")
goods_number_data = {}
# ["", "", "", "", "", "", "", "", "", "", "", ]
if "data" not in response_data:
return {}
for data in response_data["data"]:
goods_art_no = str(data["goods_art_no"])
goods_number_data[goods_art_no] = {}
goods_number_data[goods_art_no]["商品货号"] = data[
"goods_art_no"
].upper()
goods_number_data[goods_art_no]["款号"] = data[
"goods_number"
].upper()
goods_number_data[goods_art_no]["商品面料"] = data["fabric"]
goods_number_data[goods_art_no]["商品内里"] = data["lining"]
goods_number_data[goods_art_no]["商品鞋底"] = data["sole"]
goods_number_data[goods_art_no]["鞋垫"] = data["insole"]
goods_number_data[goods_art_no]["颜色名称"] = data["color"]
goods_number_data[goods_art_no]["商品标题"] = data["goods_title"]
goods_number_data[goods_art_no]["商品价格"] = data["retail_price"]
goods_number_data[goods_art_no]["尺码"] = data["size"]
goods_number_data[goods_art_no]["性别"] = data["gender"]
goods_number_data[goods_art_no]["token"] = self.token
print("货号数据:", goods_number_data)
return goods_number_data
def uploadImage(self, local_path: str) -> str:
post_headers = {"Authorization": settings.Authorization}
url = settings.DOMAIN + "/api/upload"
resultData = self.s.post(
url, files={"file": open(local_path, "rb")}, headers=post_headers
).json()
return resultData["data"]["url"]
# ============pixian抠图处理==========================
def dispose_point(self, _type):
# 扣分 sub;add为增加分数,每次操作一分
url = "{domain}/api/ai_image/client/dispose_point".format(
domain=settings.DOMAIN
)
data = {"type": _type}
_s = self.s.post(
url=url, headers=self.post_headers, data=json.dumps(data), timeout=10
)
response_data = _s.json()
return response_data
def send_message(self, text):
# 发送钉钉消息
url = "{domain}/api/ai_image/client/send_message".format(domain=settings.DOMAIN)
data = {"message": text}
_s = self.s.post(
url=url, headers=self.post_headers, data=json.dumps(data), timeout=10
)
response_data = _s.json()
return response_data
def get_cutout_image_times(self):
# 获取抠图剩余次数
url = "{domain}/api/ai_image/client/search_company_balance".format(
domain=settings.DOMAIN
)
_s = self.s.post(url=url, headers=self.post_headers, timeout=10)
response_data = _s.json()
if "data" not in response_data:
return False
else:
return response_data["data"]
def get_key_secret(self):
# 获取抠图剩余次数
url = "{domain}/api/ai_image/client/get_key_serect".format(
domain=settings.DOMAIN
)
_s = self.s.post(url=url, headers=self.post_headers, timeout=10)
response_data = _s.json()
return response_data["data"]