||
- import base64
- import requests
- import settings
- import json, asyncio
- import numpy as np
- from utils.common import message_queue
- from middleware import UnicornException
- from PIL import Image
- import io, time
- from logger import logger
- # logger = logging.getLogger(__name__)
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that also accepts NumPy scalars and arrays."""

    def default(self, obj):
        """Return a JSON-serializable equivalent of ``obj``.

        NumPy arrays become (nested) lists and NumPy integer, floating and
        boolean scalars become the matching Python scalar. Anything else is
        delegated to ``json.JSONEncoder.default``, which raises ``TypeError``.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, (np.integer, np.floating, np.bool_)):
            return obj.item()
        return super().default(obj)
def download_image_with_pil(url, save_path, timeout=30):
    """Download the image at ``url`` and save it to ``save_path`` as RGB.

    Args:
        url: Address of the image to fetch.
        save_path: Destination path handed to ``PIL.Image.Image.save``.
        timeout: Seconds passed to ``requests.get`` so that a stalled server
            cannot block the caller forever.

    Returns:
        ``save_path`` on success. On any failure (network error, non-2xx
        status, undecodable image, write error) the literal string
        ``"error"`` is returned; this sentinel is kept for existing callers.
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Reject 4xx/5xx responses up front instead of feeding an error page
        # to PIL and reporting a misleading decode failure.
        response.raise_for_status()
        # The context manager releases the decoder's resources once saved.
        with Image.open(io.BytesIO(response.content)) as image:
            image.convert("RGB").save(save_path)
        return save_path
    except Exception as e:
        print("保存错误", e)
        return "error"
class AIGCDataRequest(object):
    """HTTP client for the AI image-generation endpoints under ``settings.DOMAIN``.

    All requests go through one ``requests`` session and carry the token
    supplied at construction time in the ``Authorization`` header.
    """

    # Polling budget used while waiting for an asynchronous generation job.
    _MAX_POLLS = 80
    _POLL_INTERVAL_SECONDS = 1

    def __init__(self, token):
        """Create the HTTP session and remember ``token`` for later calls."""
        self.s = requests.session()
        self.token = token
        self.post_headers = {
            "Authorization": token,
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _unwrap(resultData, failure_message):
        """Return the ``data`` object of an API response, or raise.

        A non-zero ``code`` raises ``UnicornException`` with the server's
        ``message``. A missing or empty ``data`` raises ``UnicornException``
        with ``failure_message`` instead of failing later with an
        ``AttributeError`` on ``None``.
        """
        if resultData.get("code", 0) != 0:
            raise UnicornException(resultData.get("message", ""))
        payload = resultData.get("data")
        if not payload:
            raise UnicornException(failure_message)
        return payload

    def _image_edit_generate(self, data, failure_message, save_path):
        """POST ``data`` to ``image_edit_generate`` and download the result."""
        url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate"
        resultData = self.s.post(url, data=data, headers=self.post_headers).json()
        image_url = self._unwrap(resultData, failure_message).get("image_url", "")
        if not image_url:
            raise UnicornException(failure_message)
        return download_image_with_pil(image_url, save_path)

    def _wait_for_result(self, generate_id, label, log_to_logger=False):
        """Poll ``searchProgress`` until the job finishes and return its image.

        Polls at most ``_MAX_POLLS`` times, sleeping ``_POLL_INTERVAL_SECONDS``
        between attempts. ``label`` is the prefix of the progress line that is
        printed on every attempt.

        Raises:
            UnicornException: if the job reports status ``-1`` or is still
                unfinished when the polling budget is exhausted.
        """
        for remaining in range(self._MAX_POLLS, 0, -1):
            progress_line = f"{label}{remaining}次"
            print(progress_line)
            if log_to_logger:
                logger.info(progress_line)
            status, result_image = self.searchProgress(generate_id)
            # status: -1 = failed, 0 = queued, 1 = running, 2 = finished
            if status == 2:
                return result_image
            if status == -1:
                break
            time.sleep(self._POLL_INTERVAL_SECONDS)
        raise UnicornException("模特图生成失败")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def uploadImage(self, local_path: str) -> str:
        """Upload the file at ``local_path`` to ``/api/upload``.

        Returns:
            The ``data.url`` value of the JSON response.
        """
        post_headers = {"Authorization": self.token}
        url = settings.DOMAIN + "/api/upload"
        # Open inside ``with`` so the handle is closed even if the POST fails.
        with open(local_path, "rb") as image_file:
            resultData = self.s.post(
                url, files={"file": image_file}, headers=post_headers
            ).json()
        return resultData["data"]["url"]

    def center_paste_image(
        self, source_image_path, output_path, width=900, ratio=(3, 4)
    ):
        """Paste an image, centered and scaled to fit, onto a fixed-size canvas.

        Args:
            source_image_path: Path of the source image (converted to RGBA if
                it is in another mode).
            output_path: Where the composed image is written.
            width: Canvas width in pixels.
            ratio: Canvas aspect ratio as ``(width, height)``; the canvas
                height is ``int(width * ratio[1] / ratio[0])``.

        Returns:
            The composed ``PIL.Image.Image``. It is RGBA when ``output_path``
            ends with ``.png`` and converted to RGB otherwise.
        """
        target_width = width
        target_height = int(width * ratio[1] / ratio[0])

        # Canvas pixels are white with zero alpha, so the canvas is transparent
        # in PNG output and white once the alpha channel is dropped.
        background = Image.new(
            "RGBA", (target_width, target_height), (255, 255, 255, 0)
        )

        # ``with`` closes the source file even if conversion or resizing fails.
        with Image.open(source_image_path) as opened:
            source_img = opened if opened.mode == "RGBA" else opened.convert("RGBA")
            source_width, source_height = source_img.size
            # Largest uniform scale at which the source still fits the canvas.
            scale = min(target_width / source_width, target_height / source_height)
            # Clamp to 1 px so an extreme aspect ratio cannot request size 0.
            new_width = max(1, int(source_width * scale))
            new_height = max(1, int(source_height * scale))
            source_img_resized = source_img.resize(
                (new_width, new_height), Image.Resampling.LANCZOS
            )

        paste_x = (target_width - new_width) // 2
        paste_y = (target_height - new_height) // 2
        # Passing the image as its own mask keeps its transparency.
        background.paste(source_img_resized, (paste_x, paste_y), source_img_resized)

        if output_path.lower().endswith(".png"):
            background.save(output_path, format="PNG")
        else:
            # Other formats are written without an alpha channel.
            background = background.convert("RGB")
            background.save(output_path)
        return background

    def generateProductScene(self, local_path, prompt, save_path):
        """Generate a product scene image and save it to ``save_path``.

        Uploads ``local_path``, submits it with ``prompt`` to the
        ``command_to_image`` endpoint and downloads the first returned image.

        Returns:
            Whatever ``download_image_with_pil`` returns for the result.

        Raises:
            UnicornException: with the server's message when the API reports
                a non-zero code, otherwise with a generic failure message.
        """
        imageUrl = self.uploadImage(local_path)
        print("imageUrl", imageUrl)
        data = {
            "site": 1,
            "base_image": imageUrl,
            "keyword": prompt,
            "model_type": 1,
            "gemini_model": "gemini-2.5-flash-image-preview",
        }
        url = settings.DOMAIN + "/api/ai_image/inspired/command_to_image"
        try:
            resultData = self.s.post(
                url, data=data, headers=self.post_headers, timeout=80
            ).json()
            image_arr = self._unwrap(resultData, "场景图生成失败").get("image", [])
            if not image_arr:
                raise UnicornException("场景图生成失败")
            return download_image_with_pil(image_arr[0], save_path)
        except UnicornException:
            # Keep the server-provided message instead of masking it.
            raise
        except Exception as e:
            raise UnicornException("场景图生成失败") from e

    def generateProductSceneQW(self, local_path, prompt, save_path, machine_type=1):
        """Generate a scene image through the ``image_edit_generate`` endpoint.

        The original file defined this method twice: first with
        ``machine_type`` 0 (shoes), then with 1 (clothing). The second
        definition silently replaced the first, so every caller got 1. That
        effective behaviour is kept as the default; pass ``machine_type=0``
        for the shoe variant.

        Args:
            local_path: Local image to upload as the base image.
            prompt: Text prompt sent to the service.
            save_path: Where the generated image is stored.
            machine_type: 0 for shoes, 1 for clothing.

        Raises:
            UnicornException: on a non-zero API code or a missing image URL.
        """
        imageUrl = self.uploadImage(local_path)
        data = {
            "machine_type": machine_type,  # 0 = shoes, 1 = clothing
            "generate_type": 0,  # 0 = scene image, 1 = model image
            "base_image": imageUrl,
            "prompt": prompt,
        }
        return self._image_edit_generate(data, "场景图生成失败", save_path)

    def generateModelShoesQW(self, local_path, model_id, save_path):
        """Generate a model image for shoes via ``image_edit_generate``.

        Raises:
            UnicornException: on a non-zero API code or a missing image URL.
        """
        imageUrl = self.uploadImage(local_path)
        data = {
            "machine_type": 0,  # 0 = shoes, 1 = clothing
            "generate_type": 1,  # 0 = scene image, 1 = model image
            "base_image": imageUrl,
            "model_template_id": model_id,
        }
        return self._image_edit_generate(data, "模特图生成失败", save_path)

    def searchProgress(self, id):
        """Query the progress of one generation job.

        Args:
            id: Generation id returned when the job was submitted.

        Returns:
            ``(status, result_image)``. ``status`` is the value reported by
            the server, or ``-1`` when the result list is empty, the request
            times out, or any other error occurs. ``result_image`` is the
            first entry of ``result_image_urls`` when ``status`` is 2 and
            ``None`` otherwise.
        """
        try:
            url = settings.DOMAIN + "/api/ai_image/main/search_bacth_progress"
            data = {"site": 1, "generate_ids": [id], "type": "aigc_pro"}
            resultData = self.s.post(
                url, json=data, headers=self.post_headers, timeout=10
            ).json()
            if resultData.get("code", 0) != 0:
                raise UnicornException(resultData.get("message", ""))
            data_result = resultData.get("data", [])
            print("查询进度", data_result)
            logger.info(f"查询进度:{data_result}")
            if len(data_result) == 0:
                return -1, None
            data_item = data_result[0]
            status = data_item.get("status", -1)
            result_image = None
            # The image URL is only reported once the job has finished.
            if status == 2:
                result_image_urls = data_item.get("result_image_urls") or []
                result_image = result_image_urls[0] if result_image_urls else None
            return status, result_image
        except requests.Timeout:
            print("查询进度超时")
            return -1, None
        except Exception as e:
            print(f"查询进度异常: {e}")
            return -1, None

    def generateUpperShoes(self, local_path, model_id, save_path):
        """Generate an on-feet image for a shoe and save it to ``save_path``.

        Submits the job to ``upper_footer`` and polls ``searchProgress``
        until it finishes.

        Raises:
            UnicornException: when the API rejects the job, returns no
                generation id, reports failure, or does not finish in time.
        """
        print("生成上脚图", local_path, model_id, save_path)
        imageUrl = self.uploadImage(local_path)
        data = {
            "site": 1,
            "model_template_id": model_id,
            "base_image": imageUrl,
            "creatClass": "鞋子上脚图-鞋子上脚图",
            "creatTimer": 40,
            "pname": "OnFeetImage",
        }
        url = settings.DOMAIN + "/api/ai_image/main/upper_footer"
        resultData = self.s.post(url, data=data, headers=self.post_headers).json()
        generate_ids = self._unwrap(resultData, "模特图生成失败").get("generate_ids", [])
        if not generate_ids:
            raise UnicornException("模特图生成失败")
        generate_id = generate_ids[0]
        print("generate_id", generate_id)
        result_image = self._wait_for_result(
            generate_id, "模特图查询第", log_to_logger=True
        )
        save_image_path = download_image_with_pil(result_image, save_path)
        print("上脚图save_image_path", result_image, save_image_path)
        return save_image_path

    def generateModelFitting(self, local_path, model_id, face_type, save_path):
        """Generate a clothing model-fitting image and save it to ``save_path``.

        Submits the job to ``model_fitting_vk`` and polls ``searchProgress``
        until it finishes.

        Raises:
            UnicornException: when the API rejects the job, returns no
                generation id, reports failure, or does not finish in time.
        """
        imageUrl = self.uploadImage(local_path)
        data = {
            "site": 1,
            "scene_key": model_id,
            "topsImg": imageUrl,
            "generate_type": 1,
            "face_type": face_type,
            "pname": "ModelFitting_clothing",
        }
        print("生成上脚图", local_path, model_id, save_path)
        print("生成上脚图==>data", data)
        url = settings.DOMAIN + "/api/ai_image/clothing/model_fitting_vk"
        resultData = self.s.post(url, data=data, headers=self.post_headers).json()
        generate_ids = self._unwrap(resultData, "模特图生成失败").get("generate_ids", [])
        if not generate_ids:
            raise UnicornException("模特图生成失败")
        result_image = self._wait_for_result(generate_ids[0], "查询第")
        save_image_path = download_image_with_pil(result_image, save_path)
        print("上脚图save_image_path", result_image, save_image_path)
        return save_image_path

    def generateModelclothingQW(self, local_path, model_id, save_path):
        """Generate a model image for clothing via ``image_edit_generate``.

        Raises:
            UnicornException: on a non-zero API code or a missing image URL.
        """
        imageUrl = self.uploadImage(local_path)
        data = {
            "machine_type": 1,  # 0 = shoes, 1 = clothing
            "generate_type": 1,  # 0 = scene image, 1 = model image
            "base_image": imageUrl,
            "model_template_id": model_id,
        }
        return self._image_edit_generate(data, "模特图生成失败", save_path)
class OnlineDataRequest(object):
    """HTTP client for the goods / upload / AI endpoints under ``settings.DOMAIN``."""

    def __init__(self, token):
        """Create the HTTP session and the default JSON headers for ``token``."""
        self.s = requests.session()
        self.token = token
        self.post_headers = self._json_headers(token)

    @staticmethod
    def _json_headers(token):
        """Build the default header set for JSON requests.

        ``Content-Length`` stays at ``"0"`` as in the original headers and is
        never edited by hand. ``requests`` is expected to compute the real
        length for any request that carries a body (NOTE(review): confirm
        against the requests version in use).
        """
        return {
            "Authorization": token,
            "Content-Length": "0",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    def refresh_headers(self, token):
        """Rebuild ``post_headers`` for a new ``token``.

        NOTE(review): ``self.token`` is left unchanged, as before, so
        ``uploadImage`` keeps using the token given at construction time.
        Confirm whether that is intended.
        """
        self.post_headers = self._json_headers(token)

    def auth_user(self):
        """Fetch the current user from ``/api/auth/user`` and return the JSON."""
        url = "{domain}/api/auth/user".format(domain=settings.DOMAIN)
        # A throw-away session, closed as soon as the call completes.
        with requests.session() as session:
            return session.get(url=url, headers=settings.Headers).json()

    def logout(self):
        """POST to ``/api/auth/logout``; the response is ignored."""
        url = "{domain}/api/auth/logout".format(domain=settings.DOMAIN)
        with requests.session() as session:
            session.post(url=url, headers=settings.Headers)

    def get_change_bar_code(self, code):
        """Resolve a scanned string to a goods article number.

        Returns:
            ``data.goods_art_no`` from the response, or ``None`` when the
            request fails or the response has no such field.
        """
        url = "{domain}/api/hct/open/sting_search_goods".format(domain=settings.DOMAIN)
        try:
            # ``params`` lets requests URL-encode the scanned value.
            response = requests.get(url, params={"string": code})
            return response.json()["data"]["goods_art_no"]
        except Exception as e:
            print(e)
            return None

    def get_goods_art_no_info(self, numbers_list=None, goods_art_list=None, token=None):
        """Fetch basic goods information by number or by article number.

        Args:
            numbers_list: Goods numbers to query; takes precedence when truthy.
            goods_art_list: Article numbers, used when ``numbers_list`` is empty.
            token: Authorization token; falls back to ``self.token`` when
                omitted so the request is not sent unauthenticated.

        Returns:
            A dict keyed by goods number (when queried by number) or by
            upper-cased article number, each value being a dict of goods
            attributes. Empty when the response carries no ``data``.
        """
        url = "{domain}/api/backend/produce/goods/info".format(domain=settings.DOMAIN)
        print("执行 get_goods_art_no_info ", url)
        if numbers_list:
            data = {"numbers": numbers_list}
            print("请求编码:", numbers_list)
        else:
            data = {"goods_art_nos": goods_art_list}
            print("请求货号:", goods_art_list)
        print("执行 get_goods_art_no_info----------------", data)
        post_headers = {
            "Authorization": token if token is not None else self.token,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        response_data = self.s.post(
            url=url, data=json.dumps(data), headers=post_headers
        ).json()
        if not response_data.get("data"):
            return {}

        goods_number_data = {}
        for item in response_data["data"]:
            number = item["number"] if numbers_list else item["goods_art_no"].upper()
            goods_number_data[number] = {
                "商品面料": item["fabric"],
                "商品内里": item["lining"],
                "商品鞋底": item["sole"],
                "后帮高": item["back_height"],
                "前掌宽": item["forefoot_width"],
                "鞋跟高": item["heel_height"],
                "FAB介绍": item["fab_info"],
                "编号": item["number"],
                "商品货号": item["goods_art_no"].upper(),
                "款号": item["goods_number"].upper(),
                "颜色名称": item["color"],
                "所属企划": item["projects"][0],
                "设计方名称": item["purchasing_unit"],
                "供应商": item["supplier_name"],
                "供应商编码": item["supplier_code"].lstrip("0"),
                "供应商货号": item["supplier_goods_artno"],
                "销售工厂": item["sales_factory_name"],
                "销售组织": item["man_org_name"],
                "是否SAP": item["source"],
                "OEM报价": item["oem_price"],
                "出厂价": item["ex_factory_price"],
                "首单货期": item["earliest_delivery_date"],
                "包装": item["package_specification"],
                "创建日期": item["created_at"],
                "货号图": item["image"],
            }
        return goods_number_data

    def get_on_goods_all_art(self, number):
        """Return the style number and sibling articles for a goods number.

        Returns:
            ``(goods_number, brother_goods_arts, goods_art_no)``. The third
            element is ``None`` when the response has no ``goods_art_no``.
        """
        url = "{domain}/api/backend/produce/goods/query/numbers?number={number}".format(
            domain=settings.DOMAIN, number=number
        )
        response_data = self.s.get(url=url, headers=self.post_headers).json()
        print(number, response_data)
        # Sample response recorded in the original source:
        # 14250230 {'data': {'goods_number': 'AC5200117', 'brother_goods_arts':
        #   [{'number': '14250232', 'goods_art_no': 'AC52001173', 'color': '杏色'},
        #    {'number': '14250231', 'goods_art_no': 'AC52001172', 'color': '灰色'},
        #    {'number': '14250230', 'goods_art_no': 'AC52001171', 'color': '黑色'}]},
        #   'code': 0, 'message': 'success'}
        # That sample has no top-level ``goods_art_no``, so it is read with
        # ``.get`` instead of raising ``KeyError``.
        payload = response_data["data"]
        return (
            payload["goods_number"],
            payload["brother_goods_arts"],
            payload.get("goods_art_no"),
        )

    def get_views(self, image_url):
        """Ask the view-classification service for the angle of ``image_url``."""
        url = "http://{}/shoes_category".format(settings.VIEW_DEAL_DOMAIN)
        data = {
            "train_path": "./datasets/Shoes_Dataset/Train/angle",
            "model_filename": "./models/0320/output0320.pth",
            "validate_path": image_url,
        }
        response_data = requests.post(url=url, data=json.dumps(data)).json()
        return response_data["classify_result"]

    def uploadImage(self, local_path: str) -> str:
        """Upload the file at ``local_path`` and return the response's ``data.url``."""
        post_headers = {"Authorization": self.token}
        url = settings.DOMAIN + "/api/upload"
        # Open inside ``with`` so the handle is closed even if the POST fails.
        with open(local_path, "rb") as image_file:
            resultData = self.s.post(
                url, files={"file": image_file}, headers=post_headers
            ).json()
        return resultData["data"]["url"]

    def get_current_menu(self):
        """Return the current PC menu as ``{menu_key: {mod_key: mod_name}}``.

        Child menus (``_child``) are flattened into the same dict. An empty
        dict is returned when the response does not have the expected shape.
        """

        def get_menu(_menu_dict, _data):
            for menu in _data:
                _menu_dict[menu["key"]] = {
                    mods["key"]: mods["name"] for mods in menu["mods_arr"]
                }
                if "_child" in menu:
                    get_menu(_menu_dict, menu["_child"])
            return _menu_dict

        url = "{domain}/api/backend/basic/get_current_menu".format(
            domain=settings.DOMAIN,
        )
        response_data = self.s.get(url=url, headers=settings.Headers).json()
        try:
            return get_menu({}, response_data["data"]["pc_menu"])
        except Exception:
            # Best effort: an unexpected payload yields an empty menu.
            return {}

    def get_resource_config(self):
        """Fetch the ``client_camera`` add-on configuration and return the JSON."""
        url = "{domain}/api/openai/query_client_addons".format(domain=settings.DOMAIN)
        response = self.s.get(
            url=url, headers=self.post_headers, params={"type": "client_camera"}
        )
        return response.json()

    def add_auto_photo_logs(self, data):
        """Report one photo-capture log entry and return the JSON response."""
        url = "{domain}/api/openai/add_auto_photo_logs".format(domain=settings.DOMAIN)
        post_data = json.dumps(
            {
                "goods_no": data["goods_art_no"],
                "take_photo_created_at": data["take_photo_created_at"],
                "photo_created_at": data["photo_create_time"],
                "image_dispose_mode": data["image_deal_mode"],
                "photo_serial_number": data["image_index"],
            }
        )
        response_data = self.s.post(
            url=url, headers=self.post_headers, data=post_data
        ).json()
        if settings.IS_TEST:
            print("209-----拍照日志上报 add_auto_photo_logs")
            print(response_data)
        return response_data

    def upload_pic_list_data(self, data):
        """Save a list of goods images.

        Returns:
            ``True`` when the response has ``code == 0`` and
            ``message == "success"``, ``False`` otherwise.
        """
        url = "{domain}/api/backend/goods/save/images".format(domain=settings.DOMAIN)
        payload = json.dumps(data)
        # The shared ``post_headers`` is deliberately not edited here: writing
        # this body's length into it left a stale Content-Length behind for
        # later requests that have no body.
        response_data = self.s.post(
            url=url, data=payload, headers=self.post_headers
        ).json()
        try:
            succeeded = (
                response_data["code"] == 0 and response_data["message"] == "success"
            )
        except (KeyError, TypeError) as e:
            print(payload)
            print(e)
            print(response_data)
            return False
        if not succeeded:
            print(payload)
            print(response_data)
        return succeeded

    def upload_pic(self, goods_data):
        """Upload an in-memory image described by ``goods_data``.

        ``goods_data`` must provide ``file_path``, ``image_io`` and ``e``
        (used as the image subtype in the MIME type).

        Returns:
            The ``data.url`` value of the JSON response.
        """
        url = "{domain}/api/backend/upload".format(domain=settings.DOMAIN)
        headers = {
            "Authorization": settings.Headers["Authorization"],
            "User-Agent": settings.Headers["User-Agent"],
            "Origin": settings.Headers["Origin"],
            "Host": settings.Headers["Host"],
        }
        files = [
            (
                "file",
                (
                    goods_data["file_path"],
                    goods_data["image_io"],
                    "image/{}".format(goods_data["e"]),
                ),
            )
        ]
        response_data = requests.post(url=url, headers=headers, files=files).json()
        return response_data["data"]["url"]

    def check_detail_image(self, goods_art_no, token):
        """Return ``data.hasDetailImage`` for ``goods_art_no``.

        ``token`` is accepted for interface compatibility but unused; the
        request is sent with ``self.post_headers``.
        """
        url = "{domain}/api/backend/goods/check_detail_image?number={number}".format(
            domain=settings.DOMAIN, number=goods_art_no
        )
        response_data = self.s.get(url=url, headers=self.post_headers).json()
        return response_data["data"]["hasDetailImage"]

    def yolo_shoes_category(self, image_url):
        """Classify the shoe in ``image_url`` via the ``yolo_shoes_category`` API.

        Returns:
            ``data.category`` from the response, or ``None`` if it is absent.
        """
        url = "{domain}/api/ai_image/main/yolo_shoes_category".format(
            domain=settings.DOMAIN
        )
        post_data = json.dumps({"image_url": image_url})
        response_data = self.s.post(
            url=url, headers=self.post_headers, data=post_data
        ).json()
        if settings.IS_TEST:
            print("278-----yolo_shoes_category")
            print(response_data)
        try:
            return response_data["data"]["category"]
        except Exception as e:
            print("285", e)
            return None

    def upload_image_by_io(self, image_io) -> str:
        """Upload an already-open file-like object and return ``data.url``."""
        post_headers = {"Authorization": settings.Authorization}
        url = settings.DOMAIN + "/api/upload"
        resultData = self.s.post(
            url, files={"file": image_io}, headers=post_headers
        ).json()
        return resultData["data"]["url"]

    def upload_goods_api(self, params):
        """POST ``params`` as JSON to ``publish_goods`` and return the JSON reply."""
        post_headers = {
            "Authorization": self.token,
            "Content-Type": "application/json",
        }
        url = settings.DOMAIN + "/api/ai_image/camera_machine/publish_goods"
        resultData = self.s.post(
            url, data=json.dumps(params), headers=post_headers
        ).json()
        print("上传商品api==>resultData", resultData)
        return resultData

    def sendSocketMessage(self, code=0, msg="", data=None, device_status=2, msg_type="upload_goods_progress"):
        """Queue a status message on ``message_queue`` via the event loop."""
        message = {
            "code": code,
            "msg": msg,
            "status": device_status,
            "data": data,
            "msg_type": msg_type,
        }
        loop = asyncio.get_event_loop()
        loop.create_task(message_queue.put(message))

    def uploadGoods2ThirdParty(self, goods_no_dict=None, online_stores=None):
        """Publish goods to third-party stores through ``upload_goods_api``.

        Args:
            goods_no_dict: Mapping of goods number to goods data. Entries
                without a detail path, a title or a non-zero price are skipped.
            online_stores: Store names; one item is built per store and goods.
                ``None`` is treated as an empty list.

        Returns:
            ``None`` when ``goods_no_dict`` is ``None``, otherwise ``True``
            after the upload call returns.

        NOTE(review): the original indentation was lost. The nesting below
        (colour/size properties appended once per goods after the SKU loop,
        one upload after all loops) is reconstructed from the data flow.
        Confirm against the original file.
        """
        if goods_no_dict is None:
            return
        online_stores = [] if online_stores is None else online_stores

        params = []
        quantity = 9999
        category_info = "流行男鞋>>休闲鞋>>时尚休闲鞋"
        for store in online_stores:
            for goods_no, goods_data in goods_no_dict.items():
                detail_path = goods_data.get("detail_path", "")
                if detail_path == "":
                    continue
                goods_title = goods_data.get("商品标题", "")
                if goods_title == "":
                    continue
                goods_price = goods_data.get("商品价格", 0)
                if goods_price == '':
                    goods_price = 0
                if goods_price == 0:
                    continue

                skuList = []
                itemImageInfoList = []
                itemSkuImageList = []
                skuPropValueList = []
                # Pre-set so the size property below is defined even when the
                # goods has no SKU rows (previously a NameError).
                size = 37
                for skuIdx, sku_data in enumerate(goods_data.get("货号资料", [])):
                    sku_goods_art_no = sku_data.get("货号", "")
                    color_name = sku_data.get("颜色名称", "")
                    size = sku_data.get("尺码", 37)
                    mainImages = sku_data.get("800x800", [])
                    if not mainImages:
                        continue
                    imageUrl = self.uploadImage(local_path=mainImages[0])
                    skuList.append(
                        {
                            "skuNo": sku_goods_art_no,
                            "originalPrice": float(goods_price),
                            "newSkuWeight": int(1),
                            "skuMainImageUrl": str(imageUrl),
                            "skuName": f"颜色:{color_name};尺寸:{size}",
                            "sellingPrice": float(goods_price),
                            "quantity": int(quantity),
                            "showOrder": int(skuIdx + 1),
                        }
                    )
                    itemImageInfoList.append(
                        {
                            "imageUrl": str(imageUrl),
                            "imageType": 0,
                            "imageItem": int(skuIdx),
                            "imageIndex": 10,
                        }
                    )
                    skuPropValueList.append(
                        {
                            "imageJson": [
                                {
                                    "imageUrl": str(imageUrl),
                                    "imageType": 1,
                                    "showOrder": 1,
                                }
                            ],
                            "propValue": str(color_name),
                            "showOrder": 1,
                        }
                    )

                itemSkuImageList.append(
                    {
                        "propName": "颜色",
                        "isImageProp": 1,
                        "propShowOrder": 1,
                        "skuPropValueList": skuPropValueList,
                    }
                )
                itemSkuImageList.append(
                    {
                        "propName": "尺寸",
                        "value": None,
                        "isImageProp": 0,
                        "propShowOrder": 1,
                        "showOrder": 0,
                        "propValue": size,
                        "skuPropValueList": [
                            {"propValue": str(37), "showOrder": 1}
                        ],
                    }
                )
                detailImageUrl = self.uploadImage(local_path=detail_path)
                params.append(
                    {
                        "catePathName": category_info,  # category path
                        "itemName": str(goods_title),  # goods title
                        "itemNo": str(goods_no),
                        "brandName": store,  # brand name
                        "sellingPrice": float(goods_price),  # selling price
                        "originalPrice": float(goods_price),  # strike-through price
                        "quantity": int(quantity),  # stock quantity
                        "propInfoList": [
                            {"propName": "品牌", "propIndex": 2, "propValue": "Vali"},
                            {
                                "propName": "平台类目",
                                "propIndex": 3,
                                "propValue": category_info,
                            },
                        ],
                        "skuList": skuList,
                        "itemImageInfoList": itemImageInfoList,
                        "itemSkuImageList": itemSkuImageList,
                        "wapDescription": f'<img src="{detailImageUrl}"/>',
                        "pcDescription": f'<img src="{detailImageUrl}"/>',
                    }
                )

        # NOTE(review): this is the Python ``str()`` of the list, not JSON.
        # Kept byte-for-byte because the receiving service's expectations are
        # not visible here. Confirm before switching to ``json.dumps``.
        json_params = str(params)
        print("json_params", json_params)
        encoded = base64.b64encode(json_params.encode("utf-8")).decode("utf-8")
        self.upload_goods_api({"bizcontent": encoded, "online_stores": online_stores})
        print("商品上传第三方成功")
        return True
class GetOnlineDataHLM(OnlineDataRequest):
    """``OnlineDataRequest`` variant using the ``goods_client`` and cut-out endpoints."""

    def __init__(self, token):
        """Initialise the base client; it stores ``token`` on ``self.token``."""
        super().__init__(token)

    def upload_pic(self, goods_data, token):
        """Upload an in-memory image described by ``goods_data``.

        ``goods_data`` must provide ``file_path``, ``image_io`` and ``e``
        (used as the image subtype in the MIME type). ``token`` is accepted
        for interface compatibility but unused; ``self.token`` is sent.

        Returns:
            The ``data.url`` value of the JSON response.
        """
        url = "{domain}/api/backend/upload".format(domain=settings.DOMAIN)
        headers = {
            "Authorization": self.token,
        }
        files = [
            (
                "file",
                (
                    goods_data["file_path"],
                    goods_data["image_io"],
                    "image/{}".format(goods_data["e"]),
                ),
            )
        ]
        response_data = requests.post(url=url, headers=headers, files=files).json()
        return response_data["data"]["url"]

    def upload_pic_list_data(self, data, token):
        """Save a list of goods images.

        ``token`` is accepted for interface compatibility but unused.

        Returns:
            ``True`` when the response has ``code == 0`` and
            ``message == "success"``, ``False`` otherwise.
        """
        url = "{domain}/api/backend/goods/save/images".format(domain=settings.DOMAIN)
        payload = json.dumps(data)
        # The shared ``post_headers`` is deliberately not edited here: writing
        # this body's length into it left a stale Content-Length behind for
        # later requests that have no body.
        response_data = self.s.post(
            url=url, data=payload, headers=self.post_headers
        ).json()
        try:
            succeeded = (
                response_data["code"] == 0 and response_data["message"] == "success"
            )
        except (KeyError, TypeError) as e:
            print(e)
            print(response_data)
            return False
        if not succeeded:
            print(response_data)
        return succeeded

    def get_goods_art_no_info(self, numbers_list=None, goods_art_list=None, token=None):
        """Fetch goods information for the given article numbers.

        Args:
            numbers_list: Accepted for interface compatibility; not used by
                this endpoint.
            goods_art_list: Article numbers to query.
            token: Authorization token; falls back to ``self.token`` when
                omitted so the request is not sent unauthenticated.

        Returns:
            A dict keyed by article number (as ``str``) whose values are
            dicts of goods attributes plus ``self.token`` under ``"token"``.
            Empty when the response carries no ``data``.
        """
        url = "{domain}/api/backend/goods_client/goods_query".format(
            domain=settings.DOMAIN
        )
        print("url:", url)
        print("请求货号:", goods_art_list)
        # The headers are not printed: they contain the authorization token.
        post_headers = {
            "Authorization": token if token is not None else self.token,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        data = json.dumps({"goods_art_list": goods_art_list})
        print(data)
        response_data = self.s.post(url=url, data=data, headers=post_headers).json()
        print(response_data)
        print("\n")
        if not response_data.get("data"):
            return {}

        goods_number_data = {}
        for item in response_data["data"]:
            goods_art_no = str(item["goods_art_no"])
            goods_number_data[goods_art_no] = {
                "商品货号": item["goods_art_no"].upper(),
                "款号": item["goods_number"].upper(),
                "商品面料": item["fabric"],
                "商品内里": item["lining"],
                "商品鞋底": item["sole"],
                "鞋垫": item["insole"],
                "颜色名称": item["color"],
                "商品标题": item["goods_title"],
                "商品价格": item["retail_price"],
                "尺码": item["size"],
                "性别": item["gender"],
                "token": self.token,
            }
        print("货号数据:", goods_number_data)
        return goods_number_data

    def uploadImage(self, local_path: str) -> str:
        """Upload ``local_path`` using ``settings.Authorization``; return ``data.url``."""
        post_headers = {"Authorization": settings.Authorization}
        url = settings.DOMAIN + "/api/upload"
        # Open inside ``with`` so the handle is closed even if the POST fails.
        with open(local_path, "rb") as image_file:
            resultData = self.s.post(
                url, files={"file": image_file}, headers=post_headers
            ).json()
        return resultData["data"]["url"]

    # ============ pixian cut-out handling ============
    def dispose_point(self, _type):
        """Adjust the cut-out point balance.

        Args:
            _type: ``"sub"`` to deduct or ``"add"`` to add; one point per call
                according to the original comment.

        Returns:
            The JSON response.
        """
        url = "{domain}/api/ai_image/client/dispose_point".format(
            domain=settings.DOMAIN
        )
        response = self.s.post(
            url=url,
            headers=self.post_headers,
            data=json.dumps({"type": _type}),
            timeout=10,
        )
        return response.json()

    def send_message(self, text):
        """Send ``text`` through the ``send_message`` endpoint (DingTalk per the original comment)."""
        url = "{domain}/api/ai_image/client/send_message".format(domain=settings.DOMAIN)
        response = self.s.post(
            url=url,
            headers=self.post_headers,
            data=json.dumps({"message": text}),
            timeout=10,
        )
        return response.json()

    def get_cutout_image_times(self):
        """Query the remaining cut-out balance.

        Returns:
            The response's ``data`` value, or ``False`` when it is absent.
        """
        url = "{domain}/api/ai_image/client/search_company_balance".format(
            domain=settings.DOMAIN
        )
        response_data = self.s.post(
            url=url, headers=self.post_headers, timeout=10
        ).json()
        if "data" not in response_data:
            return False
        return response_data["data"]

    def get_key_secret(self):
        """Fetch the key/secret pair from ``get_key_serect`` and return ``data``."""
        url = "{domain}/api/ai_image/client/get_key_serect".format(
            domain=settings.DOMAIN
        )
        response_data = self.s.post(
            url=url, headers=self.post_headers, timeout=10
        ).json()
        return response_data["data"]
|