| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- import os
- from hashlib import sha256, md5
- from datetime import datetime
- import requests
- from natsort import ns, natsorted
- def get_md5(file_path):
- data_md5 = None
- if os.path.isfile(file_path):
- f = open(file_path, 'rb')
- md5_obj = md5()
- md5_obj.update(f.read())
- hash_code = md5_obj.hexdigest()
- f.close()
- data_md5 = str(hash_code).lower()
- return data_md5
- def get_modified_time(file_path):
- # 获取文件最后修改的时间戳
- timestamp = os.path.getmtime(file_path)
- # 将时间戳转换为datetime对象,并格式化为指定格式的字符串
- modified_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
- return modified_time
- def compare_two_times(time_str1, time_str2):
- # 定义时间格式
- time_format = "%Y-%m-%d %H:%M:%S"
- # 将时间字符串解析为datetime对象
- time1 = datetime.strptime(time_str1, time_format)
- time2 = datetime.strptime(time_str2, time_format)
- # 比较两个时间
- if time1 > time2:
- # print(f"{time_str1} 比 {time_str2} 新。")
- return "left_new"
- elif time1 < time2:
- # print(f"{time_str2} 比 {time_str1} 新。")
- return "right_new"
- else:
- # print("两个时间相等。")
- return "is_same"
- async def download_file(url, file_path):
- try:
- root_path, file_name = os.path.split(file_path)
- check_path(root_path)
- response = requests.get(url)
- _content = response.content
- with open(file_path, 'wb') as f:
- f.write(_content)
- print("下载成功:{}".format(file_path))
- except:
- print("下载失败:{}".format(file_path))
- def check_path(_path):
- if not os.path.exists(_path):
- # 创建多级目录
- os.makedirs(_path, exist_ok=True)
- # os.mkdir(_path)
- return True
- # from module.view_control.MineQWidget import MineQWidget
- def list_dir(path):
- listdir = os.listdir(path)
- return natsorted(listdir, alg=ns.PATH)
- def get_folder(path):
- folder_list = []
- for _file in list_dir(path):
- file_path = "{}/{}".format(path, _file)
- if os.path.isdir(file_path):
- folder_list.append(
- {
- "folder_path": file_path,
- "folder_name": _file,
- "root_path": path,
- "label": "待处理", # 是否需要继续处理
- }
- )
- return folder_list
- def get_images(path):
- _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"]
- image_list = [] # 过滤非图片数据
- for _file in list_dir(path):
- file_name, e = os.path.splitext(_file)
- file_path = "{}/{}".format(path, _file)
- if os.path.isdir(file_path):
- continue
- if e in _Type and "mask" not in file_name:
- image_list.append(
- {
- "file_path": file_path,
- "file_name": file_name,
- "file": _file,
- "root_path": path,
- "e": e,
- }
- )
- return image_list
- def httpGetHandler(url, params=None, headers=None):
- """
- 发送GET请求
- :param endpoint: 请求的端点
- :param params: 查询参数
- :param headers: 请求头
- :return: 响应对象
- """
- try:
- response = requests.get(url, params=params, headers=headers)
- response.raise_for_status() # 如果响应状态码不是200,抛出异常
- return response
- except requests.exceptions.RequestException as e:
- print(f"GET请求失败: {e}")
- return None
- def httpPosthandler(url, data=None, json=None, headers=None):
- """
- 发送POST请求
- :param endpoint: 请求的端点
- :param data: 表单数据
- :param json: JSON数据
- :param headers: 请求头
- :return: 响应对象
- """
- try:
- response = requests.post(url, data=data, json=json, headers=headers)
- response.raise_for_status() # 如果响应状态码不是200,抛出异常
- return response
- except requests.exceptions.RequestException as e:
- print(f"POST请求失败: {e}")
- return None
|