# import json # import time import copy import random import time import requests import json import numpy as np import os, io from PIL import Image from io import BytesIO import configparser, json # ===============默认数据配置===================== config = configparser.ConfigParser() config_name = "config.ini" config.read(config_name) debug = config.get("app", "debug") env = config.get("app", "env") print("debug====>", debug) print("debug====>", debug,123) origin = config.get(env, "origin") host = config.get(env, "host") domain = config.get(env, "domain") Origin = origin Host = host class JsonEncoder(json.JSONEncoder): """Convert numpy classes to JSON serializable objects.""" def default(self, obj): if isinstance(obj, (np.integer, np.floating, np.bool_)): return obj.item() elif isinstance(obj, np.ndarray): return obj.tolist() else: return super(JsonEncoder, self).default(obj) class GetOnlineData(object): def __init__(self, token): self.token = "Bearer " + token self.s = requests.session() self.post_headers = { "Authorization": self.token, "Origin": Origin, "Host": Host, "Content-Length": "0", "Content-Type": "application/json", "Accept": "application/json", } def refresh_headers(self): self.post_headers = { "Authorization": self.token, "Origin": Origin, "Host": Host, "Content-Length": "0", "Content-Type": "application/json", "Accept": "application/json", } def get_key_secret(self): # 获取抠图剩余次数 url = "{domain}/api/ai_image/client/get_key_serect".format(domain=domain) _s = self.s.post(url=url, headers=self.post_headers, timeout=10) response_data = _s.json() return response_data["data"] def get_cutout_image_times(self): # 获取抠图剩余次数 print("domain", domain) url = "{domain}/api/ai_image/client/search_company_balance".format( domain=domain ) _s = self.s.post(url=url, headers=self.post_headers, timeout=10) response_data = _s.json() if "data" not in response_data: return False else: return response_data["data"] def search_progress(self, generate_ids): # 查进度 url = "{domain}/api/ai_image/client/search_progress".format(domain=domain) data = {"generate_ids": generate_ids} _s = self.s.post( url=url, headers=self.post_headers, data=json.dumps(data), timeout=60 ) response_data = _s.json() return response_data["data"] def download_picture(self, url, out_path): response = requests.get(url, timeout=30) pic = response.content if out_path: with open(out_path, "wb") as f: f.write(pic) else: return Image.open(BytesIO(pic)) def remove_background(self, images_url): url = "{domain}/api/ai_image/client/remove_background".format(domain=domain) data = {"images": images_url} _s = self.s.post( url=url, headers=self.post_headers, data=json.dumps(data), timeout=30 ) response_data = _s.json() return response_data["data"]["generate_ids"], int( response_data["data"]["balance"] ) def get_current_menu(self): def get_menu(_menu_dict, _data): for menu in _data: _menu_dict[menu["key"]] = {} for mods in menu["mods_arr"]: _menu_dict[menu["key"]][mods["key"]] = mods["name"] if "_child" in menu: get_menu(_menu_dict, menu["_child"]) return _menu_dict url = "{domain}/api/backend/basic/get_current_menu".format( domain=domain, ) Headers = { "Authorization": self.token, "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36", "Origin": Origin, "Host": Host, "Content-Type": "application/json;charset=UTF-8", } _s = self.s.get(url=url, headers=Headers) response_data = _s.json() try: menu_data = response_data["data"]["pc_menu"] menu_dict = {} menu_dict = get_menu(menu_dict, menu_data) except: menu_dict = {} # print(json.dumps(menu_dict,ensure_ascii=False)) # raise 1 return menu_dict def upload_pic(self, file_path, buffer=None): if buffer is None: root_path, file_name = os.path.split(file_path) e = os.path.splitext(file_name)[1] __format = { "jpg": "JPEG", "JPG": "JPEG", "JPEG": "JPEG", "jpeg": "JPEG", "png": "PNG", "PNG": "PNG", } _format = __format[e[1:]] if _format == "JPEG": buffer = io.BytesIO() im = Image.open(file_path) im.save(buffer, format="JPEG") buffer.seek(0) else: with open(file_path, "rb") as file: buffer = io.BytesIO(file.read()) files = [("file", (file_path, buffer, "image/{}".format(_format)))] else: files = [("file", ("1.jpg", buffer, "image/{}".format("JPEG")))] url = "{domain}/api/backend/upload".format(domain=domain) headers = { "Authorization": self.token, "Origin": Origin, "Host": Host, } _s = requests.post(url=url, headers=headers, files=files, timeout=60) # print(_s.text) response_data = _s.json() return response_data["data"]["url"] def get_keys(self): k = "pxnib99dbchtmdm" s = "ub9uj5678gs4m2bnrass1t3tn6ughlk065ianosk06akagolcr2u" return (k, s) def dispose_point(self, _type): # 扣分 sub;add为增加分数,每次操作一分 url = "{domain}/api/ai_image/client/dispose_point".format(domain=domain) data = {"type": _type} _s = self.s.post( url=url, headers=self.post_headers, data=json.dumps(data), timeout=10 ) response_data = _s.json() return response_data def send_message(self, text): # 发送钉钉消息 url = "{domain}/api/ai_image/client/send_message".format(domain=domain) data = {"message": text} _s = self.s.post( url=url, headers=self.post_headers, data=json.dumps(data), timeout=10 ) response_data = _s.json() return response_data