# import json # import time import copy import random import time import requests import settings import json import numpy as np import os, io from PIL import Image from io import BytesIO class JsonEncoder(json.JSONEncoder): """Convert numpy classes to JSON serializable objects.""" def default(self, obj): if isinstance(obj, (np.integer, np.floating, np.bool_)): return obj.item() elif isinstance(obj, np.ndarray): return obj.tolist() else: return super(JsonEncoder, self).default(obj) class GetOnlineData(object): def __init__(self): self.s = requests.session() self.post_headers = {"Authorization": settings.Headers["Authorization"], "Origin": settings.Headers["Origin"], "Host": settings.Headers["Host"], "Content-Length": "0", "Content-Type": "application/json", "Accept": "application/json"} def refresh_headers(self): self.post_headers = {"Authorization": settings.Headers["Authorization"], "Origin": settings.Headers["Origin"], "Host": settings.Headers["Host"], "Content-Length": "0", "Content-Type": "application/json", "Accept": "application/json"} def get_key_secret(self): # 获取抠图剩余次数 url = "{domain}/api/ai_image/client/get_key_serect".format( domain=settings.DOMAIN) _s = self.s.post(url=url, headers=self.post_headers, timeout=10) response_data = _s.json() return response_data["data"] def get_cutout_image_times(self): # 获取抠图剩余次数 url = "{domain}/api/ai_image/client/search_company_balance".format( domain=settings.DOMAIN) _s = self.s.post(url=url, headers=self.post_headers, timeout=10) response_data = _s.json() if "data" not in response_data: return False else: return response_data["data"] def search_progress(self, generate_ids): # 查进度 url = "{domain}/api/ai_image/client/search_progress".format( domain=settings.DOMAIN ) data = {"generate_ids": generate_ids} _s = self.s.post(url=url, headers=self.post_headers, data=json.dumps(data), timeout=60) response_data = _s.json() return response_data["data"] def download_picture(self, url, out_path): response = requests.get(url, timeout=30) pic = response.content if out_path: with open(out_path, 'wb') as f: f.write(pic) else: return Image.open(BytesIO(pic)) def remove_background(self, images_url): url = "{domain}/api/ai_image/client/remove_background".format( domain=settings.DOMAIN ) data = {"images": images_url} _s = self.s.post(url=url, headers=self.post_headers, data=json.dumps(data), timeout=30) response_data = _s.json() return response_data["data"]["generate_ids"], int(response_data["data"]["balance"]) def get_current_menu(self): def get_menu(_menu_dict, _data): for menu in _data: _menu_dict[menu["key"]] = {} for mods in menu["mods_arr"]: _menu_dict[menu["key"]][mods["key"]] = mods["name"] if "_child" in menu: get_menu(_menu_dict, menu["_child"]) return _menu_dict url = "{domain}/api/backend/basic/get_current_menu".format( domain=settings.DOMAIN, ) _s = self.s.get(url=url, headers=settings.Headers) response_data = _s.json() try: menu_data = response_data["data"]["pc_menu"] menu_dict = {} menu_dict = get_menu(menu_dict, menu_data) except: menu_dict = {} # print(json.dumps(menu_dict,ensure_ascii=False)) # raise 1 return menu_dict def upload_pic(self, file_path, buffer=None): if buffer is None: root_path, file_name = os.path.split(file_path) e = os.path.splitext(file_name)[1] __format = {"jpg": "JPEG", "JPG": "JPEG", "JPEG": "JPEG", "jpeg": "JPEG", "png": "PNG", "PNG": "PNG", } _format = __format[e[1:]] if _format == "JPEG": buffer = io.BytesIO() im = Image.open(file_path) im.save(buffer, format='JPEG') buffer.seek(0) else: with open(file_path, 'rb') as file: buffer = io.BytesIO(file.read()) files = [ ('file', (file_path, buffer, 'image/{}'.format(_format))) ] else: files = [ ('file', ("1.jpg", buffer, 'image/{}'.format("JPEG"))) ] url = "{domain}/api/backend/upload".format( domain=settings.DOMAIN ) headers = {"Authorization": settings.Headers["Authorization"], "Origin": settings.Headers["Origin"], "Host": settings.Headers["Host"], } _s = requests.post(url=url, headers=headers, files=files, timeout=60) # print(_s.text) response_data = _s.json() return response_data["data"]["url"] def get_keys(self): k = "pxnib99dbchtmdm" s = "ub9uj5678gs4m2bnrass1t3tn6ughlk065ianosk06akagolcr2u" return (k,s) def dispose_point(self,_type): # 扣分 sub;add为增加分数,每次操作一分 url = "{domain}/api/ai_image/client/dispose_point".format( domain=settings.DOMAIN) data = {"type": _type} _s = self.s.post(url=url, headers=self.post_headers,data=json.dumps(data), timeout=10) response_data = _s.json() return response_data def send_message(self,text): # 发送钉钉消息 url = "{domain}/api/ai_image/client/send_message".format( domain=settings.DOMAIN) data = {"message": text} _s = self.s.post(url=url, headers=self.post_headers,data=json.dumps(data), timeout=10) response_data = _s.json() return response_data