123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- # import json
- # import time
- import copy
- import random
- import time
- import requests
- import settings
- import json
- import numpy as np
- import os, io
- from PIL import Image
- from io import BytesIO
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that additionally accepts NumPy scalars and arrays."""

    def default(self, obj):
        """Return a JSON-serializable replacement for *obj*.

        NumPy arrays become nested lists via ``tolist()``; NumPy integer,
        floating and boolean scalars become the value returned by
        ``item()``. Every other object is delegated to the base encoder's
        ``default``.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, (np.integer, np.floating, np.bool_)):
            return obj.item()
        return super().default(obj)
class GetOnlineData(object):
    """Thin HTTP client for the backend API rooted at ``settings.DOMAIN``.

    JSON endpoints are called through one shared ``requests`` session
    (``self.s``) using the headers stored in ``self.post_headers``.
    Picture download and upload use one-off ``requests`` calls instead of
    the session.
    """

    # Lower-cased file extension (without the dot) -> Pillow format name.
    _IMAGE_FORMATS = {"jpg": "JPEG", "jpeg": "JPEG", "png": "PNG"}

    def __init__(self):
        """Open the shared session and build the default POST headers."""
        self.s = requests.session()
        self.refresh_headers()

    @staticmethod
    def _auth_headers():
        """Return the Authorization/Origin/Host headers from ``settings``."""
        return {
            "Authorization": settings.Headers["Authorization"],
            "Origin": settings.Headers["Origin"],
            "Host": settings.Headers["Host"],
        }

    def refresh_headers(self):
        """Rebuild ``self.post_headers`` from the current ``settings.Headers``.

        Call this after ``settings.Headers`` has changed so later requests
        pick up the new values.
        """
        headers = self._auth_headers()
        # NOTE(review): "Content-Length: 0" is kept exactly as in the
        # original headers; requests is expected to recompute it when a
        # body is sent -- confirm before relying on it.
        headers["Content-Length"] = "0"
        headers["Content-Type"] = "application/json"
        headers["Accept"] = "application/json"
        self.post_headers = headers

    def _post_json(self, path, payload=None, timeout=10):
        """POST to ``settings.DOMAIN + path`` and return the decoded JSON.

        ``payload`` is serialized with ``json.dumps`` when given; when it
        is ``None`` the request is sent without a body.
        """
        url = "{domain}{path}".format(domain=settings.DOMAIN, path=path)
        body = None if payload is None else json.dumps(payload)
        response = self.s.post(
            url=url, headers=self.post_headers, data=body, timeout=timeout
        )
        return response.json()

    @staticmethod
    def _flatten_menu(menu_dict, entries):
        """Fill *menu_dict* with ``{menu key: {mod key: mod name}}``.

        Walks *entries* and, recursively, every entry's ``"_child"`` list,
        so nested menus end up at the top level of *menu_dict*. Returns
        *menu_dict*.
        """
        for menu in entries:
            mods = {}
            menu_dict[menu["key"]] = mods
            for mod in menu["mods_arr"]:
                mods[mod["key"]] = mod["name"]
            if "_child" in menu:
                GetOnlineData._flatten_menu(menu_dict, menu["_child"])
        return menu_dict

    @classmethod
    def _image_format(cls, file_path):
        """Return the Pillow format name for *file_path*'s extension.

        The lookup is case-insensitive. Raises ``KeyError`` when the
        extension is not jpg/jpeg/png.
        """
        extension = os.path.splitext(os.path.basename(file_path))[1]
        return cls._IMAGE_FORMATS[extension[1:].lower()]

    def get_key_secret(self):
        """Return the ``"data"`` field of the key/secret endpoint response.

        Raises ``KeyError`` when the response carries no ``"data"`` field.
        """
        response_data = self._post_json("/api/ai_image/client/get_key_serect")
        return response_data["data"]

    def get_cutout_image_times(self):
        """Return the company balance payload, or ``False`` if it is absent.

        Per the original author's note this is the number of remaining
        background-removal (cutout) operations.
        """
        response_data = self._post_json(
            "/api/ai_image/client/search_company_balance"
        )
        if "data" not in response_data:
            return False
        return response_data["data"]

    def search_progress(self, generate_ids):
        """Query progress for *generate_ids* and return the ``"data"`` field."""
        response_data = self._post_json(
            "/api/ai_image/client/search_progress",
            {"generate_ids": generate_ids},
            timeout=60,
        )
        return response_data["data"]

    def download_picture(self, url, out_path):
        """Download *url*; save it to *out_path* or return it as an image.

        When *out_path* is truthy the raw bytes are written to that path
        and ``None`` is returned. Otherwise the bytes are opened with
        Pillow and the resulting ``Image`` is returned.
        """
        response = requests.get(url, timeout=30)
        content = response.content
        if not out_path:
            return Image.open(BytesIO(content))
        with open(out_path, 'wb') as f:
            f.write(content)

    def remove_background(self, images_url):
        """Submit *images_url* for background removal.

        Returns a ``(generate_ids, balance)`` tuple where ``balance`` has
        been converted with ``int``.
        """
        response_data = self._post_json(
            "/api/ai_image/client/remove_background",
            {"images": images_url},
            timeout=30,
        )
        result = response_data["data"]
        return result["generate_ids"], int(result["balance"])

    def get_current_menu(self):
        """Return the PC menu as ``{menu key: {mod key: mod name}}``.

        Best effort: when the response lacks the expected structure an
        empty dict is returned instead of raising.
        """
        url = "{domain}/api/backend/basic/get_current_menu".format(
            domain=settings.DOMAIN,
        )
        # A timeout keeps a stalled server from blocking the caller forever.
        response = self.s.get(url=url, headers=settings.Headers, timeout=30)
        response_data = response.json()
        try:
            return self._flatten_menu({}, response_data["data"]["pc_menu"])
        except (KeyError, TypeError):
            # Missing keys or unexpected value types: treat as "no menu".
            return {}

    def upload_pic(self, file_path, buffer=None):
        """Upload a picture and return the URL reported by the backend.

        Args:
            file_path: Path of a jpg/jpeg/png file. Only used when
                *buffer* is ``None``; it is also sent as the upload's
                file name.
            buffer: Optional file-like object with image data. When given
                it is uploaded as ``"1.jpg"`` with type ``image/JPEG`` and
                *file_path* is ignored.

        Raises:
            KeyError: If *file_path* has an unsupported extension, or the
                response has no ``data.url`` entry.
        """
        if buffer is None:
            image_format = self._image_format(file_path)
            if image_format == "JPEG":
                # JPEG files are re-encoded through Pillow before upload.
                buffer = io.BytesIO()
                with Image.open(file_path) as im:
                    im.save(buffer, format='JPEG')
                buffer.seek(0)
            else:
                with open(file_path, 'rb') as file:
                    buffer = io.BytesIO(file.read())
            upload_name = file_path
        else:
            image_format = "JPEG"
            upload_name = "1.jpg"
        files = [
            ('file', (upload_name, buffer, 'image/{}'.format(image_format)))
        ]
        url = "{domain}/api/backend/upload".format(
            domain=settings.DOMAIN
        )
        # Multipart upload: only the auth headers are sent so requests can
        # set the multipart Content-Type itself.
        response = requests.post(
            url=url, headers=self._auth_headers(), files=files, timeout=60
        )
        response_data = response.json()
        return response_data["data"]["url"]

    def get_keys(self):
        """Return the built-in ``(key, secret)`` pair."""
        # NOTE(review): credentials are hard-coded in source; consider
        # loading them from configuration or the environment instead.
        k = "pxnib99dbchtmdm"
        s = "ub9uj5678gs4m2bnrass1t3tn6ughlk065ianosk06akagolcr2u"
        return (k, s)

    def dispose_point(self, _type):
        """Adjust the point balance and return the full JSON response.

        Per the original author's note, ``_type`` is ``"sub"`` to deduct
        or ``"add"`` to add, one point per call.
        """
        return self._post_json(
            "/api/ai_image/client/dispose_point", {"type": _type}
        )

    def send_message(self, text):
        """Send *text* through the backend's message endpoint.

        Per the original author's note this is delivered as a DingTalk
        message. Returns the full JSON response.
        """
        return self._post_json(
            "/api/ai_image/client/send_message", {"message": text}
        )
|