utils_func.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. import os
  2. from hashlib import sha256, md5
  3. from datetime import datetime
  4. import requests
  5. from natsort import ns, natsorted
  6. def get_md5(file_path):
  7. data_md5 = None
  8. if os.path.isfile(file_path):
  9. f = open(file_path, 'rb')
  10. md5_obj = md5()
  11. md5_obj.update(f.read())
  12. hash_code = md5_obj.hexdigest()
  13. f.close()
  14. data_md5 = str(hash_code).lower()
  15. return data_md5
  16. def get_modified_time(file_path):
  17. # 获取文件最后修改的时间戳
  18. timestamp = os.path.getmtime(file_path)
  19. # 将时间戳转换为datetime对象,并格式化为指定格式的字符串
  20. modified_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
  21. return modified_time
  22. def compare_two_times(time_str1, time_str2):
  23. # 定义时间格式
  24. time_format = "%Y-%m-%d %H:%M:%S"
  25. # 将时间字符串解析为datetime对象
  26. time1 = datetime.strptime(time_str1, time_format)
  27. time2 = datetime.strptime(time_str2, time_format)
  28. # 比较两个时间
  29. if time1 > time2:
  30. # print(f"{time_str1} 比 {time_str2} 新。")
  31. return "left_new"
  32. elif time1 < time2:
  33. # print(f"{time_str2} 比 {time_str1} 新。")
  34. return "right_new"
  35. else:
  36. # print("两个时间相等。")
  37. return "is_same"
  38. async def download_file(url, file_path):
  39. try:
  40. root_path, file_name = os.path.split(file_path)
  41. check_path(root_path)
  42. response = requests.get(url)
  43. _content = response.content
  44. with open(file_path, 'wb') as f:
  45. f.write(_content)
  46. print("下载成功:{}".format(file_path))
  47. except:
  48. print("下载失败:{}".format(file_path))
  49. def check_path(_path):
  50. if not os.path.exists(_path):
  51. # 创建多级目录
  52. print("创建目录", _path)
  53. os.makedirs(_path, exist_ok=True)
  54. # os.mkdir(_path)
  55. return True
  56. # from module.view_control.MineQWidget import MineQWidget
  57. def list_dir(path):
  58. listdir = os.listdir(path)
  59. return natsorted(listdir, alg=ns.PATH)
  60. def get_folder(path):
  61. folder_list = []
  62. for _file in list_dir(path):
  63. file_path = "{}/{}".format(path, _file)
  64. if os.path.isdir(file_path):
  65. folder_list.append(
  66. {
  67. "folder_path": file_path,
  68. "folder_name": _file,
  69. "root_path": path,
  70. "label": "待处理", # 是否需要继续处理
  71. }
  72. )
  73. return folder_list
  74. def get_images(path):
  75. _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"]
  76. image_list = [] # 过滤非图片数据
  77. for _file in list_dir(path):
  78. file_name, e = os.path.splitext(_file)
  79. file_path = "{}/{}".format(path, _file)
  80. if os.path.isdir(file_path):
  81. continue
  82. if e in _Type and "mask" not in file_name:
  83. image_list.append(
  84. {
  85. "file_path": file_path,
  86. "file_name": file_name,
  87. "file": _file,
  88. "root_path": path,
  89. "e": e,
  90. }
  91. )
  92. return image_list
  93. def httpGetHandler(url, params=None, headers=None):
  94. """
  95. 发送GET请求
  96. :param endpoint: 请求的端点
  97. :param params: 查询参数
  98. :param headers: 请求头
  99. :return: 响应对象
  100. """
  101. try:
  102. response = requests.get(url, params=params, headers=headers)
  103. response.raise_for_status() # 如果响应状态码不是200,抛出异常
  104. return response
  105. except requests.exceptions.RequestException as e:
  106. print(f"GET请求失败: {e}")
  107. return None
  108. def httpPosthandler(url, data=None, json=None, headers=None):
  109. """
  110. 发送POST请求
  111. :param endpoint: 请求的端点
  112. :param data: 表单数据
  113. :param json: JSON数据
  114. :param headers: 请求头
  115. :return: 响应对象
  116. """
  117. try:
  118. response = requests.post(url, data=data, json=json, headers=headers)
  119. response.raise_for_status() # 如果响应状态码不是200,抛出异常
  120. return response
  121. except requests.exceptions.RequestException as e:
  122. print(f"POST请求失败: {e}")
  123. return None
  124. # 指定一个列表,指定一个需要移动的数据列表,移动到指定列表中的位置
  125. def move_elements_inplace(lst, indices, target_index, is_real_same_folder=False):
  126. """
  127. 在原地将列表中指定位置的两个元素移动到目标位置。
  128. :param lst: 要操作的列表
  129. :param indices: 一个包含两个整数的元组或列表,表示要移动的元素的索引
  130. :param target_index: 目标插入位置的索引
  131. """
  132. if not isinstance(indices, (list, tuple)) or len(indices) == 0:
  133. raise ValueError("Indices must be a non-empty list or tuple of integers.")
  134. # 添加到末尾
  135. if target_index == len(lst):
  136. if is_real_same_folder:
  137. # 倒序取出所有内容,并添加到末尾
  138. indices.sort(reverse=True)
  139. temp = []
  140. for i in indices:
  141. temp.append(lst.pop(i))
  142. while temp:
  143. lst.append(temp.pop(-1))
  144. # 重新写入索引
  145. for index, image_label in enumerate(lst):
  146. image_label.image_index = index
  147. return
  148. # 检查索引是否有效,并排序以确保按照正确的顺序处理
  149. valid_indices = sorted(set(indices)) # 去重并排序
  150. if not all(0 <= idx < len(lst) for idx in valid_indices):
  151. raise IndexError("One or more indices are out of range.")
  152. if not (0 <= target_index <= len(lst)):
  153. raise IndexError("Target index is out of range.")
  154. elements_to_move = [lst[idx] for idx in valid_indices]
  155. # 如果目标位置在所有待移动元素的最大索引之后,则不需要调整目标位置
  156. max_idx = max(valid_indices)
  157. if max_idx < target_index:
  158. pass
  159. else:
  160. # 计算需要减少的位置数(因为删除元素后,后续元素会向前移动)
  161. shift_count = sum(1 for idx in valid_indices if idx < target_index)
  162. target_index -= shift_count
  163. # 移除元素(从大索引开始以避免影响小索引处的元素)
  164. for idx in reversed(valid_indices):
  165. del lst[idx]
  166. # 插入元素到目标位置,保持它们原来的相对顺序
  167. for i, element in enumerate(elements_to_move):
  168. lst.insert(target_index + i, element)
  169. # 重新写入索引
  170. for index, image_label in enumerate(lst):
  171. image_label.image_index = index
  172. # 给定一个图片路径,如果 是原始图下的则返回对应已扣图等信息。否则只返回基础信息
  173. def get_cutout_image_info(source_image_path):
  174. if not os.path.exists(source_image_path):
  175. return None
  176. if os.path.isdir(source_image_path):
  177. return None
  178. data = {}
  179. source_root_path, source_file = os.path.split(source_image_path)
  180. source_file_name, source_file_extension = os.path.splitext(source_file)
  181. print("---------------source_root_path--------------------")
  182. print(source_root_path)
  183. print(source_root_path[-3:])
  184. # if source_root_path[-3:] != "原始图":
  185. # return None
  186. data["source_image_path"] = source_image_path
  187. data["source_root_path"] = source_root_path
  188. data["source_file"] = source_file
  189. data["source_file_name"] = source_file_name
  190. data["source_file_extension"] = source_file_extension
  191. cutout_image_path = "{}/原始图_已抠图/{}.png".format(
  192. source_root_path[:-3], source_file_name
  193. )
  194. if not os.path.exists(cutout_image_path):
  195. data["cutout_image_path"] = None
  196. return data
  197. cutout_image_root_path, cutout_image_file = os.path.split(cutout_image_path)
  198. cutout_image_file_name, cutout_image_file_extension = os.path.splitext(
  199. cutout_image_file
  200. )
  201. data["cutout_image_path"] = cutout_image_path
  202. data["cutout_image_root_path"] = cutout_image_root_path
  203. data["cutout_image_file"] = cutout_image_file
  204. data["cutout_image_file_name"] = cutout_image_file_name
  205. data["cutout_image_file_extension"] = cutout_image_file_extension
  206. return data