import base64 import requests import settings import json, asyncio import numpy as np from utils.common import message_queue from middleware import UnicornException from PIL import Image import io, time from logger import logger # logger = logging.getLogger(__name__) class JsonEncoder(json.JSONEncoder): """Convert numpy classes to JSON serializable objects.""" def default(self, obj): if isinstance(obj, (np.integer, np.floating, np.bool_)): return obj.item() elif isinstance(obj, np.ndarray): return obj.tolist() else: return super(JsonEncoder, self).default(obj) def download_image_with_pil(url, save_path): """通过url保存图片""" try: # 发送请求获取图片数据 response = requests.get(url) image_data = response.content # 使用PIL处理下载的图片 image = Image.open(io.BytesIO(image_data)) image = image.convert("RGB") # 根据当前时间生成文件名 # 保存图片 image.save(save_path) return save_path except Exception as e: print("保存错误",e) return "error" class AIGCDataRequest(object): def __init__(self, token): self.s = requests.session() self.token = token self.post_headers = { "Authorization": token, } def uploadImage(self, local_path: str) -> str: post_headers = {"Authorization": self.token} url = settings.DOMAIN + "/api/upload" resultData = self.s.post( url, files={"file": open(local_path, "rb")}, headers=post_headers ).json() return resultData["data"]["url"] def center_paste_image( self, source_image_path, output_path, width=900, ratio=(3, 4) ): """ 将一张PNG透明图片居中粘贴到指定尺寸和比例的背景上,保持透明度 Args: source_image_path: 源PNG图片路径 output_path: 输出图片路径 width: 目标图片宽度 ratio: 目标图片比例 (宽:高) """ # 计算目标尺寸 (900 x 1200) target_width = width target_height = int(width * ratio[1] / ratio[0]) # 创建透明背景图片(RGBA模式) background = Image.new( "RGBA", (target_width, target_height), (255, 255, 255, 0) ) # 打开源图片 source_img = Image.open(source_image_path) # 确保源图片是RGBA模式以保持透明度 if source_img.mode != "RGBA": source_img = source_img.convert("RGBA") # 调整源图片大小以适应目标尺寸,保持原图比例 source_width, source_height = source_img.size scale = min(target_width / source_width, target_height / source_height) new_width = int(source_width * scale) new_height = int(source_height * scale) # 调整源图片尺寸 source_img_resized = source_img.resize( (new_width, new_height), Image.Resampling.LANCZOS ) # 计算居中粘贴位置 paste_x = (target_width - new_width) // 2 paste_y = (target_height - new_height) // 2 # 将调整后的源图片居中粘贴到背景上,保留透明度 background.paste(source_img_resized, (paste_x, paste_y), source_img_resized) # 如果需要保存为PNG格式以保持透明度 if output_path.lower().endswith(".png"): background.save(output_path, format="PNG") else: # 如果保存为JPG等不支持透明度的格式,转换为RGB并使用白色背景 background = background.convert("RGB") background.save(output_path) source_img.close() return background def generateProductScene(self, local_path, prompt, save_path): imageUrl = self.uploadImage(local_path) print("imageUrl", imageUrl) data = { "site": 1, "base_image": imageUrl, "keyword": prompt, "model_type": 1, "gemini_model": "gemini-2.5-flash-image-preview", } """生成场景图""" try: url = settings.DOMAIN + "/api/ai_image/inspired/command_to_image" resultData = self.s.post(url, data=data, headers=self.post_headers, timeout=80).json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) image_arr = resultData.get("data", None).get("image", []) if len(image_arr) == 0: raise UnicornException("场景图生成失败") image_url = image_arr[0] save_image_path = download_image_with_pil(image_url, save_path) return save_image_path except: raise UnicornException("场景图生成失败") def generateProductSceneQW(self, local_path, prompt, save_path): '''千问生成场景图''' imageUrl = self.uploadImage(local_path) data = { "machine_type": 0, # 0鞋;1服装 "generate_type": 0, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图 "base_image": imageUrl, "prompt": prompt } """生成场景图""" url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate" resultData = self.s.post(url, data=data, headers=self.post_headers).json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) image_url = resultData.get("data", None).get("image_url", '') if image_url == "" or image_url is None: raise UnicornException("场景图生成失败") save_image_path = download_image_with_pil(image_url, save_path) return save_image_path def generateModelShoesQW(self, local_path, model_id, save_path): '''千问生成场景图''' imageUrl = self.uploadImage(local_path) data = { "machine_type": 0, # 0鞋;1服装 "generate_type": 1, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图 "base_image": imageUrl, "model_template_id": model_id, } """生成场景图""" url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate" resultData = self.s.post(url, data=data, headers=self.post_headers).json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) image_url = resultData.get("data", None).get("image_url", '') if image_url == "" or image_url is None: raise UnicornException("模特图生成失败") save_image_path = download_image_with_pil(image_url, save_path) return save_image_path def searchProgress(self, id): """查询进度""" try: url = settings.DOMAIN + "/api/ai_image/main/search_bacth_progress" data = {"site": 1, "generate_ids": [id], "type": "aigc_pro"} resultData = self.s.post(url, json=data, headers=self.post_headers, timeout=10) resultData = resultData.json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) data_result = resultData.get("data", []) print("查询进度", data_result) logger.info(f"查询进度:{data_result}") if len(data_result) == 0: return -1, None data_item = data_result[0] status = data_item.get("status", -1) result_image = None # 只有在完成状态下才返回图片URL if status == 2: # 完成 result_image_urls = data_item.get("result_image_urls", []) result_image_urls = [] if result_image_urls is None else result_image_urls result_image = result_image_urls[0] if len(result_image_urls) > 0 else None return status, result_image except requests.Timeout: print("查询进度超时") return -1, None except Exception as e: print(f"查询进度异常: {e}") return -1, None def generateUpperShoes(self, local_path, model_id, save_path): """生成上脚图""" print("生成上脚图", local_path, model_id, save_path) imageUrl = self.uploadImage(local_path) data = { "site": 1, "model_template_id": model_id, "base_image": imageUrl, "creatClass": "鞋子上脚图-鞋子上脚图", "creatTimer": 40, "pname": "OnFeetImage", } """生成上脚图""" url = settings.DOMAIN + "/api/ai_image/main/upper_footer" resultData = self.s.post(url, data=data, headers=self.post_headers).json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) generate_ids = resultData.get("data", None).get("generate_ids", []) if len(generate_ids) == 0: raise UnicornException("模特图生成失败") generate_id = generate_ids[0] search_times = 80 status = 0 result_image = None print("generate_id", generate_id) while search_times > 0: print(f"模特图查询第{search_times}次") logger.info(f"模特图查询第{search_times}次") status, result_image = self.searchProgress(generate_id) # status: -1=失败, 0=排队中, 1=进行中, 2=完成 if status == 2: # 完成 break if status == -1: # 失败 break # status为0(排队中)或1(进行中)时继续查询 time.sleep(1) search_times -= 1 # 循环结束后检查最终状态 if status == -1 or (status != 2 and search_times <= 0): raise UnicornException("模特图生成失败") save_image_path = download_image_with_pil(result_image, save_path) print("上脚图save_image_path",result_image, save_image_path) return save_image_path def generateModelFitting(self, local_path, model_id, face_type, save_path): """生成上脚图""" imageUrl = self.uploadImage(local_path) data = { "site": 1, "scene_key": model_id, "topsImg": imageUrl, "generate_type": 1, "face_type": face_type, "pname": "ModelFitting_clothing", } print("生成上脚图", local_path, model_id, save_path) print("生成上脚图==>data", data) """生成上脚图""" url = settings.DOMAIN + "/api/ai_image/clothing/model_fitting_vk" resultData = self.s.post(url, data=data, headers=self.post_headers) # print("模特图resultData", resultData.content) resultData = resultData.json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) generate_ids = resultData.get("data", None).get("generate_ids", []) if len(generate_ids) == 0: raise UnicornException("模特图生成失败") generate_id = generate_ids[0] search_times = 80 status = 0 result_image = None while search_times > 0: print(f"查询第{search_times}次") status, result_image = self.searchProgress(generate_id) # status: -1=失败, 0=排队中, 1=进行中, 2=完成 if status == 2: # 完成 break if status == -1: # 失败 break # status为0(排队中)或1(进行中)时继续查询 time.sleep(1) search_times -= 1 # 循环结束后检查最终状态 if status == -1 or (status != 2 and search_times <= 0): raise UnicornException("模特图生成失败") save_image_path = download_image_with_pil(result_image, save_path) print("上脚图save_image_path",result_image, save_image_path) return save_image_path def generateProductSceneQW(self, local_path, prompt, save_path): """千问生成场景图""" imageUrl = self.uploadImage(local_path) data = { "machine_type": 1, # 0鞋;1服装 "generate_type": 0, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图 "base_image": imageUrl, "prompt": prompt, } """生成场景图""" url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate" resultData = self.s.post(url, data=data, headers=self.post_headers).json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) image_url = resultData.get("data", None).get("image_url", "") if image_url == "" or image_url is None: raise UnicornException("场景图生成失败") save_image_path = download_image_with_pil(image_url, save_path) return save_image_path def generateModelclothingQW(self, local_path, model_id, save_path): """千问生成场景图""" imageUrl = self.uploadImage(local_path) data = { "machine_type": 1, # 0鞋;1服装 "generate_type": 1, # 生成类型,这里指代得是场景图还是模特图;0场景图;1模特图 "base_image": imageUrl, "model_template_id": model_id, } """生成场景图""" url = settings.DOMAIN + "/api/ai_image/main/image_edit_generate" resultData = self.s.post(url, data=data, headers=self.post_headers).json() code = resultData.get("code", 0) message = resultData.get("message", "") if code != 0: raise UnicornException(message) image_url = resultData.get("data", None).get("image_url", "") if image_url == "" or image_url is None: raise UnicornException("模特图生成失败") save_image_path = download_image_with_pil(image_url, save_path) return save_image_path class OnlineDataRequest(object): def __init__(self, token): self.s = requests.session() self.token = token self.post_headers = { "Authorization": token, # "Origin": settings.Headers["Origin"], # "Host": settings.Headers["Host"], "Content-Length": "0", "Content-Type": "application/json", "Accept": "application/json", } print("28 Authorization:", self.post_headers["Authorization"]) def refresh_headers(self, token): self.post_headers = { "Authorization": token, # "Origin": settings.Headers["Origin"], # "Host": settings.Headers["Host"], "Content-Length": "0", "Content-Type": "application/json", "Accept": "application/json", } def auth_user(self): # 用户登录 url = "{domain}/api/auth/user".format(domain=settings.DOMAIN) s = requests.session() _s = s.get(url=url, headers=settings.Headers) response_data = _s.json() return response_data def logout(self): url = "{domain}/api/auth/logout".format(domain=settings.DOMAIN) s = requests.session() _s = s.post(url=url, headers=settings.Headers) def get_change_bar_code(self, code): url = "{domain}/api/hct/open/sting_search_goods?string={code}".format( domain=settings.DOMAIN, code=code ) try: s = requests.get(url) goods_art_no = s.json()["data"]["goods_art_no"] return goods_art_no except BaseException as e: print(e) return def get_goods_art_no_info(self, numbers_list=None, goods_art_list=None, token=None): # 获取商品基础信息,入参为商品的编号 url = "{domain}/api/backend/produce/goods/info".format(domain=settings.DOMAIN) print("执行 get_goods_art_no_info ", url) if numbers_list: data = {"numbers": numbers_list} print("请求编码:", numbers_list) else: data = {"goods_art_nos": goods_art_list} print("请求货号:", goods_art_list) print("请求货号=====>", self.token) print("执行 get_goods_art_no_info----------------", data) post_headers = { "Authorization": token, # "Origin": settings.Headers["Origin"], # "Host": settings.Headers["Host"], "Content-Length": "", "Content-Type": "application/json", "Accept": "application/json", } data = json.dumps(data) post_headers["Content-Length"] = str(len(data)) _s = self.s.post(url=url, data=data, headers=post_headers) # _s = self.s.get(url=url, params=params, headers=settings.Headers) response_data = _s.json() # print(response_data) # print("\n") goods_number_data = {} # ["", "", "", "", "", "", "", "", "", "", "", ] if "data" not in response_data: return {} for data in response_data["data"]: if numbers_list: number = data["number"] else: number = data["goods_art_no"].upper() goods_number_data[number] = {} goods_number_data[number]["商品面料"] = data["fabric"] goods_number_data[number]["商品内里"] = data["lining"] goods_number_data[number]["商品鞋底"] = data["sole"] goods_number_data[number]["后帮高"] = data["back_height"] goods_number_data[number]["前掌宽"] = data["forefoot_width"] goods_number_data[number]["鞋跟高"] = data["heel_height"] goods_number_data[number]["FAB介绍"] = data["fab_info"] goods_number_data[number]["编号"] = data["number"] goods_number_data[number]["商品货号"] = data["goods_art_no"].upper() goods_number_data[number]["款号"] = data["goods_number"].upper() goods_number_data[number]["颜色名称"] = data["color"] goods_number_data[number]["所属企划"] = data["projects"][0] goods_number_data[number]["设计方名称"] = data["purchasing_unit"] goods_number_data[number]["供应商"] = data["supplier_name"] goods_number_data[number]["供应商编码"] = data["supplier_code"].lstrip("0") goods_number_data[number]["供应商货号"] = data["supplier_goods_artno"] goods_number_data[number]["销售工厂"] = data["sales_factory_name"] goods_number_data[number]["销售组织"] = data["man_org_name"] goods_number_data[number]["是否SAP"] = data["source"] goods_number_data[number]["OEM报价"] = data["oem_price"] goods_number_data[number]["出厂价"] = data["ex_factory_price"] goods_number_data[number]["首单货期"] = data["earliest_delivery_date"] goods_number_data[number]["包装"] = data["package_specification"] goods_number_data[number]["创建日期"] = data["created_at"] goods_number_data[number]["货号图"] = data["image"] return goods_number_data def get_on_goods_all_art(self, number): # 获取商品基础信息,入参为商品的编号 url = "{domain}/api/backend/produce/goods/query/numbers?number={number}".format( domain=settings.DOMAIN, number=number ) _s = self.s.get(url=url, headers=self.post_headers) response_data = _s.json() print(number, response_data) """ 14250230 {'data': {'goods_number': 'AC5200117', 'brother_goods_arts': [{'number': '14250232', 'goods_art_no': 'AC52001173', 'color': '杏色'}, {'number': '14250231', 'goods_art_no': 'AC52001172', 'color': '灰色'}, {'number': '14250230', 'goods_art_no': 'AC52001171', 'color': '黑色'}]}, 'code': 0, 'message': 'success'} """ return ( response_data["data"]["goods_number"], response_data["data"]["brother_goods_arts"], response_data["data"]["goods_art_no"], ) def get_views(self, image_url): url = "http://{}/shoes_category".format(settings.VIEW_DEAL_DOMAIN) data = { "train_path": "./datasets/Shoes_Dataset/Train/angle", "model_filename": "./models/0320/output0320.pth", "validate_path": image_url, } _s = requests.post( url=url, data=json.dumps(data), ) response_data = _s.json() return response_data["classify_result"] def uploadImage(self, local_path: str) -> str: post_headers = {"Authorization": self.token} url = settings.DOMAIN + "/api/upload" resultData = self.s.post( url, files={"file": open(local_path, "rb")}, headers=post_headers ).json() return resultData["data"]["url"] def get_current_menu(self): def get_menu(_menu_dict, _data): for menu in _data: _menu_dict[menu["key"]] = {} for mods in menu["mods_arr"]: _menu_dict[menu["key"]][mods["key"]] = mods["name"] if "_child" in menu: get_menu(_menu_dict, menu["_child"]) return _menu_dict url = "{domain}/api/backend/basic/get_current_menu".format( domain=settings.DOMAIN, ) _s = self.s.get(url=url, headers=settings.Headers) response_data = _s.json() try: menu_data = response_data["data"]["pc_menu"] menu_dict = {} menu_dict = get_menu(menu_dict, menu_data) except: menu_dict = {} # print(json.dumps(menu_dict,ensure_ascii=False)) # raise 1 return menu_dict # 获取所有资源的配置 def get_resource_config(self): url = "{domain}/api/openai/query_client_addons".format(domain=settings.DOMAIN) _s = self.s.get( url=url, headers=self.post_headers, params={"type": "client_camera"} ) response_data = _s.json() return response_data # 拍照日志上报 def add_auto_photo_logs(self, data): url = "{domain}/api/openai/add_auto_photo_logs".format(domain=settings.DOMAIN) post_data = { "goods_no": data["goods_art_no"], "take_photo_created_at": data["take_photo_created_at"], "photo_created_at": data["photo_create_time"], "image_dispose_mode": data["image_deal_mode"], "photo_serial_number": data["image_index"], } post_data = json.dumps(post_data) _s = self.s.post(url=url, headers=self.post_headers, data=post_data) response_data = _s.json() if settings.IS_TEST: print("209-----拍照日志上报 add_auto_photo_logs") print(response_data) return response_data def upload_pic_list_data(self, data): url = "{domain}/api/backend/goods/save/images".format(domain=settings.DOMAIN) data = json.dumps(data) self.post_headers["Content-Length"] = str(len(data)) _s = self.s.post(url=url, data=data, headers=self.post_headers) response_data = _s.json() try: if response_data["code"] == 0 and response_data["message"] == "success": return True else: print(data) print(response_data) return False except BaseException as e: print(data) print(e) print(response_data) return False def upload_pic(self, goods_data): # 检查货号图是否存在 url = "{domain}/api/backend/upload".format(domain=settings.DOMAIN) # print(url) headers = { "Authorization": settings.Headers["Authorization"], "User-Agent": settings.Headers["User-Agent"], "Origin": settings.Headers["Origin"], "Host": settings.Headers["Host"], } files = [ ( "file", ( goods_data["file_path"], goods_data["image_io"], "image/{}".format(goods_data["e"]), ), ) ] _s = requests.post(url=url, headers=headers, files=files) response_data = _s.json() return response_data["data"]["url"] # 查询是否已有详情图 def check_detail_image(self, goods_art_no, token): url = "{domain}/api/backend/goods/check_detail_image?number={number}".format( domain=settings.DOMAIN, number=goods_art_no ) _s = self.s.get(url=url, headers=self.post_headers) response_data = _s.json() # print(response_data) return response_data["data"]["hasDetailImage"] # 调用API识别是否是拖鞋 def yolo_shoes_category(self, image_url): url = "{domain}/api/ai_image/main/yolo_shoes_category".format( domain=settings.DOMAIN ) post_data = { "image_url": image_url, } post_data = json.dumps(post_data) _s = self.s.post(url=url, headers=self.post_headers, data=post_data) response_data = _s.json() if settings.IS_TEST: print("278-----yolo_shoes_category") print(response_data) r_data = None try: r_data = response_data["data"]["category"] except BaseException as e: print("285", e) return r_data # 图片上传by IO def upload_image_by_io(self, image_io) -> str: post_headers = {"Authorization": settings.Authorization} url = settings.DOMAIN + "/api/upload" resultData = self.s.post( url, files={"file": image_io}, headers=post_headers ).json() return resultData["data"]["url"] def upload_goods_api(self, params): """上传商品api""" post_headers = { "Authorization": self.token, "Content-Type": "application/json", } url = settings.DOMAIN + "/api/ai_image/camera_machine/publish_goods" postData = json.dumps(params) # print("上传商品api==>url", url) # print("上传第三方数据打印", params) resultData = self.s.post(url, data=postData, headers=post_headers).json() print("上传商品api==>resultData", resultData) return resultData def sendSocketMessage(self, code=0, msg="", data=None, device_status=2,msg_type="upload_goods_progress"): data = { "code": code, "msg": msg, "status": device_status, "data": data, "msg_type": msg_type, } loop = asyncio.get_event_loop() loop.create_task(message_queue.put(data)) def uploadGoods2ThirdParty(self, goods_no_dict=None, online_stores=[]): params = [] message_type = "upload_goods_progress" if goods_no_dict == None: return success_goods_arts = [] for store in online_stores: for goods_no in goods_no_dict.keys(): goods_data = goods_no_dict[goods_no] detail_path = goods_data.get("detail_path", "") if detail_path == "": continue goods_title = goods_data.get("商品标题", "") if goods_title == "": continue goods_price = goods_data.get("商品价格", 0) if goods_price == '': goods_price = 0 if goods_price == 0: continue skuList = [] itemImageInfoList = [] itemSkuImageList = [] sku_list_basic = goods_data.get("货号资料", []) quantity = 9999 skuPropValueList = [] for skuIdx, sku_data in enumerate(sku_list_basic): sku_goods_art_no = sku_data.get("货号", "") color_name = sku_data.get("颜色名称", "") size = sku_data.get("尺码", "") # 尺码 mainImages = sku_data.get("800x800", []) if not mainImages: continue success_goods_arts.append(sku_goods_art_no) mainImagePath = mainImages[0] imageUrl = self.uploadImage(local_path=mainImagePath) skuItemData = { "skuNo": sku_goods_art_no, "originalPrice": float(goods_price), "newSkuWeight": int(1), "skuMainImageUrl": str(imageUrl), "skuName": f"颜色:{color_name};尺码:{size}", "sellingPrice": float(goods_price), "quantity": int(quantity), "showOrder": int(skuIdx + 1), } skuList.append(skuItemData) itemImage = { "imageUrl": str(imageUrl), "imageType": 0, "imageItem": int(skuIdx), "imageIndex": 10, } itemImageInfoList.append(itemImage) imageJson = { "imageUrl": str(imageUrl), "imageType": 1, "showOrder": 1, } skuPropValueList.append( { "imageJson": imageJson, "propValue": str(color_name), "showOrder": 1, } ) itemSkuImageList.append( { "propName": "颜色", "isImageProp": 1, "propShowOrder": 1, "skuPropValueList": skuPropValueList, } ) detailImageUrl = self.uploadImage(local_path=detail_path) category_info = "流行男鞋>>休闲鞋>>时尚休闲鞋" itemData = { "catePathName": category_info, # 分类 "itemName": str(goods_title), # 商品标题 "itemNo": str(goods_no), "brandName": store, # 品牌名称 "sellingPrice": float(goods_price), # 售价(未划线价) "originalPrice": float(goods_price), # 划线价 "quantity": int(quantity), # 库存数量 "propInfoList": [ {"propName": "品牌", "propIndex": 2, "propValue": "Vali"}, { "propName": "平台类目", "propIndex": 3, "propValue": category_info, }, ], "skuList": skuList, "itemImageInfoList": itemImageInfoList, "itemSkuImageList": itemSkuImageList, "wapDescription": f'', "pcDescription": f'', } params.append(itemData) json_params = str(params) # 直接转换为字符串表示 print("json_params", json_params) # 使用base64编码 encoded = base64.b64encode(json_params.encode("utf-8")).decode("utf-8") self.upload_goods_api({"bizcontent": encoded, "online_stores": online_stores}) print("商品上传第三方成功") return True class GetOnlineDataHLM(OnlineDataRequest): def __init__(self, token): super().__init__(token) self.token = token def upload_pic(self, goods_data, token): # 检查货号图是否存在 url = "{domain}/api/backend/upload".format(domain=settings.DOMAIN) # print(url) headers = { "Authorization": self.token, # 'User-Agent': settings.Headers["User-Agent"], # 'Origin': settings.Headers["Origin"], # 'Host': settings.Headers["Host"], } files = [ ( "file", ( goods_data["file_path"], goods_data["image_io"], "image/{}".format(goods_data["e"]), ), ) ] _s = requests.post(url=url, headers=headers, files=files) response_data = _s.json() return response_data["data"]["url"] def upload_pic_list_data(self, data, token): url = "{domain}/api/backend/goods/save/images".format(domain=settings.DOMAIN) data = json.dumps(data) self.post_headers["Content-Length"] = str(len(data)) _s = self.s.post(url=url, data=data, headers=self.post_headers) response_data = _s.json() try: if response_data["code"] == 0 and response_data["message"] == "success": return True else: print(response_data) return False except BaseException as e: print(e) print(response_data) return False def get_goods_art_no_info(self, numbers_list=None, goods_art_list=None, token=None): # 获取商品基础信息,入参为商品的编号 url = "{domain}/api/backend/goods_client/goods_query".format( domain=settings.DOMAIN ) data = {"goods_art_list": goods_art_list} print("url:", url) print("请求货号:", goods_art_list) post_headers = { "Authorization": token, # "Origin": settings.Headers["Origin"], # "Host": settings.Headers["Host"], "Content-Length": "", "Content-Type": "application/json", "Accept": "application/json", } data = json.dumps(data) print(post_headers) print(data) # post_headers["Content-Length"] = str(len(data)) _s = self.s.post(url=url, data=data, headers=post_headers) # _s = self.s.get(url=url, params=params, headers=settings.Headers) response_data = _s.json() print(response_data) print("\n") goods_number_data = {} # ["", "", "", "", "", "", "", "", "", "", "", ] if "data" not in response_data: return {} for data in response_data["data"]: goods_art_no = str(data["goods_art_no"]) goods_number_data[goods_art_no] = {} goods_number_data[goods_art_no]["商品货号"] = data[ "goods_art_no" ].upper() goods_number_data[goods_art_no]["款号"] = data[ "goods_number" ].upper() goods_number_data[goods_art_no]["商品面料"] = data["fabric"] goods_number_data[goods_art_no]["商品内里"] = data["lining"] goods_number_data[goods_art_no]["商品鞋底"] = data["sole"] goods_number_data[goods_art_no]["鞋垫"] = data["insole"] goods_number_data[goods_art_no]["颜色名称"] = data["color"] goods_number_data[goods_art_no]["商品标题"] = data["goods_title"] goods_number_data[goods_art_no]["商品价格"] = data["retail_price"] goods_number_data[goods_art_no]["尺码"] = data["size"] goods_number_data[goods_art_no]["性别"] = data["gender"] goods_number_data[goods_art_no]["token"] = self.token print("货号数据:", goods_number_data) return goods_number_data def uploadImage(self, local_path: str) -> str: post_headers = {"Authorization": settings.Authorization} url = settings.DOMAIN + "/api/upload" resultData = self.s.post( url, files={"file": open(local_path, "rb")}, headers=post_headers ).json() return resultData["data"]["url"] # ============pixian抠图处理========================== def dispose_point(self, _type): # 扣分 sub;add为增加分数,每次操作一分 url = "{domain}/api/ai_image/client/dispose_point".format( domain=settings.DOMAIN ) data = {"type": _type} _s = self.s.post( url=url, headers=self.post_headers, data=json.dumps(data), timeout=10 ) response_data = _s.json() return response_data def send_message(self, text): # 发送钉钉消息 url = "{domain}/api/ai_image/client/send_message".format(domain=settings.DOMAIN) data = {"message": text} _s = self.s.post( url=url, headers=self.post_headers, data=json.dumps(data), timeout=10 ) response_data = _s.json() return response_data def get_cutout_image_times(self): # 获取抠图剩余次数 url = "{domain}/api/ai_image/client/search_company_balance".format( domain=settings.DOMAIN ) _s = self.s.post(url=url, headers=self.post_headers, timeout=10) response_data = _s.json() if "data" not in response_data: return False else: return response_data["data"] def get_key_secret(self): # 获取抠图剩余次数 url = "{domain}/api/ai_image/client/get_key_serect".format( domain=settings.DOMAIN ) _s = self.s.post(url=url, headers=self.post_headers, timeout=10) response_data = _s.json() return response_data["data"]