| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- # import json
- # import time
- import copy
- import random
- import time
- import requests
- import json
- import numpy as np
- import os, io
- from PIL import Image
- from io import BytesIO
- import configparser, json
- from models import UnicornException
# =============== Default data configuration ===============
config_name = "config.ini"
config = configparser.ConfigParser()
# A relative path, so the file is looked up from the current working directory.
config.read(config_name)

# The [app] section holds the debug setting and the name of the active
# environment section; both are kept as the raw strings configparser returns.
debug = config.get("app", "debug")
env = config.get("app", "env")

# Request settings are read from the section named by `env`.
origin, host, domain = (
    config.get(env, option) for option in ("origin", "host", "domain")
)

# Capitalised aliases referenced when HTTP headers are built further down.
Origin, Host = origin, host
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that also accepts NumPy scalars and arrays."""

    def default(self, obj):
        """Return a JSON-serializable replacement for ``obj``.

        NumPy arrays are converted with ``tolist()``; NumPy integer,
        floating and boolean scalars are unwrapped with ``item()``. Any
        other object is handed to the base class implementation.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, (np.integer, np.floating, np.bool_)):
            return obj.item()
        return super().default(obj)
class GetOnlineData(object):
    """HTTP client for the backend API rooted at the module-level ``domain``.

    Every request carries ``Authorization: Bearer <token>`` plus the
    module-level ``Origin`` and ``Host`` values. Some calls go through the
    shared ``requests`` session ``self.s`` with ``self.post_headers``;
    others build their own headers and use ``requests`` directly.
    """

    # Lower-cased file suffix -> Pillow format name accepted by upload_pic.
    _UPLOAD_FORMATS = {"jpg": "JPEG", "jpeg": "JPEG", "png": "PNG"}

    def __init__(self, token):
        """Store the bearer token, open a session and build the headers.

        Args:
            token: Raw token string; ``"Bearer "`` is prepended here.
        """
        self.token = "Bearer " + token
        self.s = requests.session()
        # Single source of truth for the header dict (was duplicated here).
        self.refresh_headers()

    def refresh_headers(self):
        """Rebuild ``self.post_headers`` from the current ``self.token``."""
        self.post_headers = {
            "Authorization": self.token,
            "Origin": Origin,
            "Host": Host,
            "Content-Length": "0",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _api_url(path):
        """Join the module-level ``domain`` (read at call time) with ``path``."""
        return "{domain}{path}".format(domain=domain, path=path)

    def _session_post(self, path, payload=None, timeout=10):
        """POST through ``self.s`` with ``self.post_headers``; return parsed JSON.

        ``payload`` is JSON-encoded when given; ``None`` sends no body.
        """
        body = None if payload is None else json.dumps(payload)
        reply = self.s.post(
            url=self._api_url(path),
            headers=self.post_headers,
            data=body,
            timeout=timeout,
        )
        return reply.json()

    def _checked_post(self, path, data):
        """POST ``data`` as JSON without the session and unwrap the reply.

        Returns:
            The ``"data"`` field of the JSON response.

        Raises:
            UnicornException: when the response ``"code"`` is not ``0``;
                the message is the response ``"message"`` field.
        """
        headers = {
            "Authorization": self.token,
            "Origin": Origin,
            "Host": Host,
            "Content-Type": "application/json;charset=UTF-8",
        }
        reply = requests.post(
            url=self._api_url(path),
            headers=headers,
            data=json.dumps(data),
            timeout=60,
        )
        response_data = reply.json()
        if response_data["code"] != 0:
            raise UnicornException(response_data["message"])
        return response_data["data"]

    @staticmethod
    def _flatten_menu(menu_dict, entries):
        """Fill ``menu_dict`` as ``{menu key: {mod key: mod name}}``.

        Entries nested under ``"_child"`` are flattened into the same
        top-level mapping. Returns ``menu_dict``.
        """
        for entry in entries:
            menu_dict[entry["key"]] = {}
            for mod in entry["mods_arr"]:
                menu_dict[entry["key"]][mod["key"]] = mod["name"]
            if "_child" in entry:
                GetOnlineData._flatten_menu(menu_dict, entry["_child"])
        return menu_dict

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def get_key_secret(self):
        """Return the ``"data"`` field of the ``get_key_serect`` endpoint.

        NOTE(review): presumably an API key/secret pair -- confirm against
        the backend. The route spelling ("serect") is what the code sends.
        """
        return self._session_post("/api/ai_image/client/get_key_serect")["data"]

    def get_cutout_image_times(self):
        """Return the remaining background-removal quota.

        Returns:
            The ``"data"`` field of the response, or ``False`` when the
            response has no ``"data"`` key.
        """
        # Debug output kept as-is; callers may rely on seeing it.
        print("domain", domain)
        response_data = self._session_post(
            "/api/ai_image/client/search_company_balance"
        )
        if "data" not in response_data:
            return False
        return response_data["data"]

    def search_progress_by_ids(self, generate_ids):
        """Query progress for ``generate_ids`` via the client endpoint.

        This is the method that was originally also named
        ``search_progress`` and was therefore shadowed by the later
        definition; it is kept reachable under a distinct name.

        Returns:
            The ``"data"`` field of the response.
        """
        response_data = self._session_post(
            "/api/ai_image/client/search_progress",
            {"generate_ids": generate_ids},
            timeout=60,
        )
        return response_data["data"]

    def download_picture(self, url, out_path):
        """Download ``url`` and either save it or return it as an image.

        Args:
            url: Address to fetch.
            out_path: Destination file path. When falsy, nothing is
                written and a PIL image is returned instead.

        Returns:
            ``None`` when ``out_path`` is given, otherwise the opened
            ``PIL.Image``.
        """
        # NOTE(review): the HTTP status is not checked, so an error page
        # would be written/decoded as if it were the picture.
        response = requests.get(url, timeout=30)
        pic = response.content
        if out_path:
            with open(out_path, "wb") as f:
                f.write(pic)
            return None
        return Image.open(BytesIO(pic))

    def remove_background(self, images_url):
        """Submit images for background removal.

        Args:
            images_url: Value sent as the ``"images"`` field.

        Returns:
            Tuple ``(generate_ids, balance)`` where ``balance`` is
            converted to ``int``.
        """
        response_data = self._session_post(
            "/api/ai_image/client/remove_background",
            {"images": images_url},
            timeout=30,
        )
        result = response_data["data"]
        return result["generate_ids"], int(result["balance"])

    def get_current_menu(self):
        """Return the current PC menu as ``{menu key: {mod key: mod name}}``.

        Returns an empty dict when the response does not have the expected
        ``data -> pc_menu`` structure.
        """
        headers = {
            "Authorization": self.token,
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
            "Origin": Origin,
            "Host": Host,
            "Content-Type": "application/json;charset=UTF-8",
        }
        # A timeout keeps a stalled server from blocking the caller forever.
        reply = self.s.get(
            url=self._api_url("/api/backend/basic/get_current_menu"),
            headers=headers,
            timeout=30,
        )
        response_data = reply.json()
        try:
            return self._flatten_menu({}, response_data["data"]["pc_menu"])
        except (LookupError, TypeError):
            # Missing keys or unexpected value types: best-effort empty menu.
            return {}

    def upload_pic(self, file_path, buffer=None):
        """Upload an image and return its URL.

        Args:
            file_path: Path of a ``.jpg``/``.jpeg``/``.png`` file (suffix
                matched case-insensitively). Ignored when ``buffer`` is
                given.
            buffer: Optional file-like object with image bytes; it is
                uploaded as ``1.jpg`` with type ``image/JPEG``.

        Returns:
            ``response["data"]["url"]`` from the upload endpoint.

        Raises:
            KeyError: when ``buffer`` is ``None`` and the file suffix is
                not a supported image type.
        """
        if buffer is None:
            file_name = os.path.split(file_path)[1]
            extension = os.path.splitext(file_name)[1]
            try:
                image_format = self._UPLOAD_FORMATS[extension[1:].lower()]
            except KeyError:
                raise KeyError(
                    "unsupported image extension: {!r}".format(extension)
                ) from None
            if image_format == "JPEG":
                # Re-encode through Pillow; the context manager releases
                # the underlying file handle once the data is copied.
                buffer = io.BytesIO()
                with Image.open(file_path) as im:
                    im.save(buffer, format="JPEG")
                buffer.seek(0)
            else:
                with open(file_path, "rb") as file:
                    buffer = io.BytesIO(file.read())
            files = [("file", (file_path, buffer, "image/{}".format(image_format)))]
        else:
            files = [("file", ("1.jpg", buffer, "image/{}".format("JPEG")))]
        headers = {
            "Authorization": self.token,
            "Origin": Origin,
            "Host": Host,
        }
        reply = requests.post(
            url=self._api_url("/api/backend/upload"),
            headers=headers,
            files=files,
            timeout=60,
        )
        response_data = reply.json()
        return response_data["data"]["url"]

    def model_form_segment(self, data=None):
        """Submit a mannequin ("model form") segmentation task.

        Args:
            data: JSON-serializable request body.

        Returns:
            The ``"data"`` field of the response.

        Raises:
            UnicornException: when the response ``"code"`` is not ``0``.
        """
        return self._checked_post("/api/ai_image/v2/model_form_segment", data)

    def search_progress(self, data=None):
        """Query batch progress.

        Args:
            data: JSON-serializable request body.

        Returns:
            The ``"data"`` field of the response.

        Raises:
            UnicornException: when the response ``"code"`` is not ``0``.
        """
        # The route spelling ("bacth") is what the code sends; keep it.
        return self._checked_post("/api/ai_image/main/search_bacth_progress", data)

    def get_keys(self):
        """Return the hard-coded ``(key, secret)`` pair."""
        # SECURITY NOTE(review): credentials are embedded in source; they
        # should be moved to configuration/secret storage and rotated.
        k = "pxnib99dbchtmdm"
        s = "ub9uj5678gs4m2bnrass1t3tn6ughlk065ianosk06akagolcr2u"
        return (k, s)

    def dispose_point(self, _type):
        """Adjust the point balance by one.

        Args:
            _type: ``"sub"`` to deduct a point, ``"add"`` to add one
                (per the original comment).

        Returns:
            The full parsed JSON response.
        """
        return self._session_post(
            "/api/ai_image/client/dispose_point", {"type": _type}
        )

    def send_message(self, text):
        """Send ``text`` as a DingTalk message via the backend.

        Returns:
            The full parsed JSON response.
        """
        return self._session_post(
            "/api/ai_image/client/send_message", {"message": text}
        )
|