module_online_data.py 8.0 KB


  1. # import json
  2. # import time
  3. import copy
  4. import random
  5. import time
  6. import requests
  7. import json
  8. import numpy as np
  9. import os, io
  10. from PIL import Image
  11. from io import BytesIO
  12. import configparser, json
  13. from models import UnicornException
  14. # ===============默认数据配置=====================
  15. config = configparser.ConfigParser()
  16. config_name = "config.ini"
  17. config.read(config_name)
  18. debug = config.get("app", "debug")
  19. env = config.get("app", "env")
  20. origin = config.get(env, "origin")
  21. host = config.get(env, "host")
  22. domain = config.get(env, "domain")
  23. Origin = origin
  24. Host = host
  25. class JsonEncoder(json.JSONEncoder):
  26. """Convert numpy classes to JSON serializable objects."""
  27. def default(self, obj):
  28. if isinstance(obj, (np.integer, np.floating, np.bool_)):
  29. return obj.item()
  30. elif isinstance(obj, np.ndarray):
  31. return obj.tolist()
  32. else:
  33. return super(JsonEncoder, self).default(obj)
  34. class GetOnlineData(object):
  35. def __init__(self, token):
  36. self.token = "Bearer " + token
  37. self.s = requests.session()
  38. self.post_headers = {
  39. "Authorization": self.token,
  40. "Origin": Origin,
  41. "Host": Host,
  42. "Content-Length": "0",
  43. "Content-Type": "application/json",
  44. "Accept": "application/json",
  45. }
  46. def refresh_headers(self):
  47. self.post_headers = {
  48. "Authorization": self.token,
  49. "Origin": Origin,
  50. "Host": Host,
  51. "Content-Length": "0",
  52. "Content-Type": "application/json",
  53. "Accept": "application/json",
  54. }
  55. def get_key_secret(self):
  56. # 获取抠图剩余次数
  57. url = "{domain}/api/ai_image/client/get_key_serect".format(domain=domain)
  58. _s = self.s.post(url=url, headers=self.post_headers, timeout=10)
  59. response_data = _s.json()
  60. return response_data["data"]
  61. def get_cutout_image_times(self):
  62. # 获取抠图剩余次数
  63. print("domain", domain)
  64. url = "{domain}/api/ai_image/client/search_company_balance".format(
  65. domain=domain
  66. )
  67. _s = self.s.post(url=url, headers=self.post_headers, timeout=10)
  68. response_data = _s.json()
  69. if "data" not in response_data:
  70. return False
  71. else:
  72. return response_data["data"]
  73. def search_progress(self, generate_ids):
  74. # 查进度
  75. url = "{domain}/api/ai_image/client/search_progress".format(domain=domain)
  76. data = {"generate_ids": generate_ids}
  77. _s = self.s.post(
  78. url=url, headers=self.post_headers, data=json.dumps(data), timeout=60
  79. )
  80. response_data = _s.json()
  81. return response_data["data"]
  82. def download_picture(self, url, out_path):
  83. response = requests.get(url, timeout=30)
  84. pic = response.content
  85. if out_path:
  86. with open(out_path, "wb") as f:
  87. f.write(pic)
  88. else:
  89. return Image.open(BytesIO(pic))
  90. def remove_background(self, images_url):
  91. url = "{domain}/api/ai_image/client/remove_background".format(domain=domain)
  92. data = {"images": images_url}
  93. _s = self.s.post(
  94. url=url, headers=self.post_headers, data=json.dumps(data), timeout=30
  95. )
  96. response_data = _s.json()
  97. return response_data["data"]["generate_ids"], int(
  98. response_data["data"]["balance"]
  99. )
  100. def get_current_menu(self):
  101. def get_menu(_menu_dict, _data):
  102. for menu in _data:
  103. _menu_dict[menu["key"]] = {}
  104. for mods in menu["mods_arr"]:
  105. _menu_dict[menu["key"]][mods["key"]] = mods["name"]
  106. if "_child" in menu:
  107. get_menu(_menu_dict, menu["_child"])
  108. return _menu_dict
  109. url = "{domain}/api/backend/basic/get_current_menu".format(
  110. domain=domain,
  111. )
  112. Headers = {
  113. "Authorization": self.token,
  114. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
  115. "Origin": Origin,
  116. "Host": Host,
  117. "Content-Type": "application/json;charset=UTF-8",
  118. }
  119. _s = self.s.get(url=url, headers=Headers)
  120. response_data = _s.json()
  121. try:
  122. menu_data = response_data["data"]["pc_menu"]
  123. menu_dict = {}
  124. menu_dict = get_menu(menu_dict, menu_data)
  125. except:
  126. menu_dict = {}
  127. # print(json.dumps(menu_dict,ensure_ascii=False))
  128. # raise 1
  129. return menu_dict
  130. def upload_pic(self, file_path, buffer=None):
  131. if buffer is None:
  132. root_path, file_name = os.path.split(file_path)
  133. e = os.path.splitext(file_name)[1]
  134. __format = {
  135. "jpg": "JPEG",
  136. "JPG": "JPEG",
  137. "JPEG": "JPEG",
  138. "jpeg": "JPEG",
  139. "png": "PNG",
  140. "PNG": "PNG",
  141. }
  142. _format = __format[e[1:]]
  143. if _format == "JPEG":
  144. buffer = io.BytesIO()
  145. im = Image.open(file_path)
  146. im.save(buffer, format="JPEG")
  147. buffer.seek(0)
  148. else:
  149. with open(file_path, "rb") as file:
  150. buffer = io.BytesIO(file.read())
  151. files = [("file", (file_path, buffer, "image/{}".format(_format)))]
  152. else:
  153. files = [("file", ("1.jpg", buffer, "image/{}".format("JPEG")))]
  154. url = "{domain}/api/backend/upload".format(domain=domain)
  155. headers = {
  156. "Authorization": self.token,
  157. "Origin": Origin,
  158. "Host": Host,
  159. }
  160. _s = requests.post(url=url, headers=headers, files=files, timeout=60)
  161. response_data = _s.json()
  162. return response_data["data"]["url"]
  163. def model_form_segment(self, data=None):
  164. '''人台抠图任务api'''
  165. url = "{domain}/api/ai_image/v2/model_form_segment".format(domain=domain)
  166. headers = {
  167. "Authorization": self.token,
  168. "Origin": Origin,
  169. "Host": Host,
  170. "Content-Type": "application/json;charset=UTF-8",
  171. }
  172. paramsData = json.dumps(data)
  173. _s = requests.post(url=url, headers=headers, data=paramsData, timeout=60)
  174. response_data = _s.json()
  175. responseCode = response_data['code']
  176. if responseCode != 0:
  177. raise UnicornException(response_data["message"])
  178. return response_data["data"]
  179. def search_progress(self, data=None):
  180. '''进度查询'''
  181. url = "{domain}/api/ai_image/main/search_bacth_progress".format(domain=domain)
  182. headers = {
  183. "Authorization": self.token,
  184. "Origin": Origin,
  185. "Host": Host,
  186. "Content-Type": "application/json;charset=UTF-8",
  187. }
  188. paramsData = json.dumps(data)
  189. _s = requests.post(url=url, headers=headers, data=paramsData, timeout=60)
  190. response_data = _s.json()
  191. responseCode = response_data['code']
  192. if responseCode != 0:
  193. raise UnicornException(response_data["message"])
  194. return response_data["data"]
  195. def get_keys(self):
  196. k = "pxnib99dbchtmdm"
  197. s = "ub9uj5678gs4m2bnrass1t3tn6ughlk065ianosk06akagolcr2u"
  198. return (k, s)
  199. def dispose_point(self, _type):
  200. # 扣分 sub;add为增加分数,每次操作一分
  201. url = "{domain}/api/ai_image/client/dispose_point".format(domain=domain)
  202. data = {"type": _type}
  203. _s = self.s.post(
  204. url=url, headers=self.post_headers, data=json.dumps(data), timeout=10
  205. )
  206. response_data = _s.json()
  207. return response_data
  208. def send_message(self, text):
  209. # 发送钉钉消息
  210. url = "{domain}/api/ai_image/client/send_message".format(domain=domain)
  211. data = {"message": text}
  212. _s = self.s.post(
  213. url=url, headers=self.post_headers, data=json.dumps(data), timeout=10
  214. )
  215. response_data = _s.json()
  216. return response_data