| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- import os
- from hashlib import sha256, md5
- from datetime import datetime
- import requests
- from natsort import ns, natsorted
def get_md5(file_path):
    """Return the lowercase hexadecimal MD5 digest of a file.

    :param file_path: path of the file to hash
    :return: the 32-character lowercase hex digest, or ``None`` when
        ``file_path`` is not an existing regular file
    """
    if not os.path.isfile(file_path):
        return None
    md5_obj = md5()
    # The context manager closes the handle even when a read fails, and hashing
    # in fixed-size chunks avoids loading a large file into memory at once.
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            md5_obj.update(chunk)
    return md5_obj.hexdigest().lower()
def get_modified_time(file_path):
    """Return the file's last-modification time as a formatted string.

    The modification timestamp is converted with ``datetime.fromtimestamp``
    (local time) and rendered as ``YYYY-MM-DD HH:MM:SS``.

    :param file_path: path of the file to inspect
    :return: the formatted modification time
    """
    modified_at = datetime.fromtimestamp(os.path.getmtime(file_path))
    return modified_at.strftime("%Y-%m-%d %H:%M:%S")
def compare_two_times(time_str1, time_str2):
    """Compare two ``YYYY-MM-DD HH:MM:SS`` timestamps.

    :param time_str1: the "left" time string
    :param time_str2: the "right" time string
    :return: ``"left_new"`` when the first time is later, ``"right_new"`` when
        the second time is later, ``"is_same"`` when they are equal
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    left = datetime.strptime(time_str1, time_format)
    right = datetime.strptime(time_str2, time_format)
    if left == right:
        return "is_same"
    return "left_new" if left > right else "right_new"
async def download_file(url, file_path):
    """Download ``url`` and write the response body to ``file_path``.

    The parent directory is created first when the path has one. Failures are
    reported with a printed message rather than raised (best-effort download).

    NOTE(review): ``requests.get`` is a blocking call, so this coroutine does
    not yield to the event loop while the download is in progress.

    :param url: address to fetch
    :param file_path: destination file path
    """
    try:
        root_path, _file_name = os.path.split(file_path)
        # A bare file name has an empty directory part, and os.makedirs("")
        # raises, so only create the directory when there is one.
        if root_path:
            check_path(root_path)
        response = requests.get(url)
        # Treat 4xx/5xx responses as failures instead of saving the error body.
        response.raise_for_status()
        with open(file_path, "wb") as f:
            f.write(response.content)
        print("下载成功:{}".format(file_path))
    except Exception:
        # ``Exception`` (not a bare ``except``) keeps the best-effort behaviour
        # without swallowing KeyboardInterrupt / SystemExit.
        print("下载失败:{}".format(file_path))
def check_path(_path):
    """Make sure ``_path`` exists, creating it (with parents) when it does not.

    :param _path: directory path to ensure
    :return: always ``True``
    """
    if os.path.exists(_path):
        return True
    # Create the full multi-level directory tree.
    print("创建目录", _path)
    os.makedirs(_path, exist_ok=True)
    return True
- # from module.view_control.MineQWidget import MineQWidget
def list_dir(path):
    """Return the entry names of ``path`` in natural path order.

    :param path: directory to list
    :return: list of entry names sorted by ``natsorted`` with ``ns.PATH``
    """
    return natsorted(os.listdir(path), alg=ns.PATH)
def get_folder(path):
    """Describe every sub-directory found directly under ``path``.

    :param path: directory to scan
    :return: list of dicts with ``folder_path``, ``folder_name``,
        ``root_path`` and ``label`` (the label marks whether the folder still
        needs processing)
    """
    entries = ((name, "{}/{}".format(path, name)) for name in list_dir(path))
    return [
        {
            "folder_path": full_path,
            "folder_name": name,
            "root_path": path,
            "label": "待处理",  # whether further processing is still needed
        }
        for name, full_path in entries
        if os.path.isdir(full_path)
    ]
def get_images(path):
    """List the image files found directly under ``path``.

    A file counts as an image when its extension (compared case-insensitively)
    is one of the known image extensions and its base name does not contain
    ``"mask"``. Sub-directories are skipped.

    :param path: directory to scan
    :return: list of dicts with ``file_path``, ``file_name`` (name without
        extension), ``file`` (name with extension), ``root_path`` and ``e``
        (the extension exactly as it appears in the file name)
    """
    # ".jpge" is kept only for backward compatibility with the earlier
    # misspelled list; ".jpeg" is the extension that was actually intended.
    image_extensions = frozenset(
        (".png", ".jpg", ".jpeg", ".jpge", ".gif", ".cr2")
    )
    image_list = []  # non-image entries are filtered out
    for _file in list_dir(path):
        file_name, e = os.path.splitext(_file)
        file_path = "{}/{}".format(path, _file)
        if os.path.isdir(file_path):
            continue
        if e.lower() in image_extensions and "mask" not in file_name:
            image_list.append(
                {
                    "file_path": file_path,
                    "file_name": file_name,
                    "file": _file,
                    "root_path": path,
                    "e": e,
                }
            )
    return image_list
def httpGetHandler(url, params=None, headers=None, timeout=None):
    """Send a GET request.

    :param url: address to request
    :param params: query parameters
    :param headers: request headers
    :param timeout: timeout passed through to ``requests.get``; the default
        ``None`` keeps the previous behaviour of waiting indefinitely
    :return: the response object, or ``None`` when the request failed
    """
    try:
        response = requests.get(
            url, params=params, headers=headers, timeout=timeout
        )
        # Raises for 4xx/5xx status codes so they are reported as failures.
        response.raise_for_status()
        return response
    except requests.exceptions.RequestException as e:
        print(f"GET请求失败: {e}")
        return None
def httpPosthandler(url, data=None, json=None, headers=None, timeout=None):
    """Send a POST request.

    :param url: address to request
    :param data: form data
    :param json: JSON payload
    :param headers: request headers
    :param timeout: timeout passed through to ``requests.post``; the default
        ``None`` keeps the previous behaviour of waiting indefinitely
    :return: the response object, or ``None`` when the request failed
    """
    try:
        response = requests.post(
            url, data=data, json=json, headers=headers, timeout=timeout
        )
        # Raises for 4xx/5xx status codes so they are reported as failures.
        response.raise_for_status()
        return response
    except requests.exceptions.RequestException as e:
        print(f"POST请求失败: {e}")
        return None
# Move the elements at the given indices of a list to a target position in that list.
def move_elements_inplace(lst, indices, target_index, is_real_same_folder=False):
    """Move the elements at ``indices`` to ``target_index``, in place.

    After the move every element of ``lst`` gets its ``image_index`` attribute
    rewritten to its new position.

    :param lst: list to reorder; its elements must accept an ``image_index``
        attribute
    :param indices: non-empty list or tuple of indices of the elements to move
    :param target_index: insertion position; ``len(lst)`` means "append"
    :param is_real_same_folder: only consulted when ``target_index`` equals
        ``len(lst)``; when true the selected elements are moved to the end
    :raises ValueError: if ``indices`` is not a non-empty list or tuple
    :raises IndexError: if an index or the target is out of range

    NOTE(review): the source this was restored from had lost its indentation.
    The re-index loop and ``return`` of the append case are placed under the
    outer ``if`` here, so appending with ``is_real_same_folder`` false only
    rewrites the indices. Confirm against the callers.
    """
    if not isinstance(indices, (list, tuple)) or len(indices) == 0:
        raise ValueError("Indices must be a non-empty list or tuple of integers.")

    # Append-to-end case.
    if target_index == len(lst):
        if is_real_same_folder:
            # Work on a sorted, de-duplicated copy: the caller's sequence is
            # left untouched and tuples (which have no ``sort``) are accepted.
            # Popping from the highest index down keeps the remaining
            # positions valid.
            popped = [lst.pop(i) for i in sorted(set(indices), reverse=True)]
            # ``popped`` is in descending order; append in ascending order so
            # the moved elements keep their original relative order.
            lst.extend(reversed(popped))
        # Rewrite the stored indices.
        for index, image_label in enumerate(lst):
            image_label.image_index = index
        return

    # Validate the indices; de-duplicate and sort so they are processed in order.
    valid_indices = sorted(set(indices))
    if not all(0 <= idx < len(lst) for idx in valid_indices):
        raise IndexError("One or more indices are out of range.")
    if not (0 <= target_index <= len(lst)):
        raise IndexError("Target index is out of range.")

    elements_to_move = [lst[idx] for idx in valid_indices]

    # When the target lies beyond every moved index it is used unchanged.
    # Otherwise it is reduced by the number of moved elements that precede it,
    # because removing them shifts the following elements forward.
    if max(valid_indices) >= target_index:
        shift_count = sum(1 for idx in valid_indices if idx < target_index)
        target_index -= shift_count

    # Remove from the highest index down so lower positions are unaffected.
    for idx in reversed(valid_indices):
        del lst[idx]

    # Insert at the target, keeping the original relative order.
    for i, element in enumerate(elements_to_move):
        lst.insert(target_index + i, element)

    # Rewrite the stored indices.
    for index, image_label in enumerate(lst):
        image_label.image_index = index
# Given an image path, return its basic info together with the info of the
# matching cut-out image when that file exists; otherwise only the basic info.
def get_cutout_image_info(source_image_path):
    """Collect path details for an image and its cut-out counterpart.

    The cut-out image is looked up at
    ``<source dir minus its last 3 characters>/原始图_已抠图/<name>.png``.
    The check that the source directory name really ends with ``原始图`` is
    disabled in the original code, so the last three characters are always
    stripped.

    :param source_image_path: path of the source image
    :return: ``None`` when the path does not exist or is a directory;
        otherwise a dict with the ``source_*`` keys and ``cutout_image_path``
        (``None`` when no cut-out file exists), plus the other
        ``cutout_image_*`` keys when it does
    """
    if not os.path.exists(source_image_path) or os.path.isdir(source_image_path):
        return None

    source_root_path, source_file = os.path.split(source_image_path)
    source_file_name, source_file_extension = os.path.splitext(source_file)

    print("---------------source_root_path--------------------")
    print(source_root_path)
    print(source_root_path[-3:])

    data = {
        "source_image_path": source_image_path,
        "source_root_path": source_root_path,
        "source_file": source_file,
        "source_file_name": source_file_name,
        "source_file_extension": source_file_extension,
    }

    cutout_image_path = "{}/原始图_已抠图/{}.png".format(
        source_root_path[:-3], source_file_name
    )
    if not os.path.exists(cutout_image_path):
        data["cutout_image_path"] = None
        return data

    cutout_root, cutout_file = os.path.split(cutout_image_path)
    cutout_name, cutout_extension = os.path.splitext(cutout_file)
    data.update(
        {
            "cutout_image_path": cutout_image_path,
            "cutout_image_root_path": cutout_root,
            "cutout_image_file": cutout_file,
            "cutout_image_file_name": cutout_name,
            "cutout_image_file_extension": cutout_extension,
        }
    )
    return data
def dynamic_parameter_issuance_get(receive_data):
    """Decode a received parameter message.

    Layout read by this function: bytes 1-2 hold the function code and bytes
    3-4 the status code (both big-endian), byte 5 the parameter count, and the
    rest the parameter records. The records are split into equal chunks of
    ``len(records) // count`` bytes. Within a chunk, bytes 0-5 are a big-endian
    magnitude, byte 6 is the sign flag (1 = positive), byte 7 the type flag
    (1 = int, otherwise float) and byte 8 the decimal precision.

    :param receive_data: indexable sequence of byte values
    :return: ``(func_code, status_code, values)``
    """
    func_code = receive_data[1] << 8 | receive_data[2]
    status_code = receive_data[3] << 8 | receive_data[4]
    par_len = receive_data[5]
    par_data = receive_data[6:]

    values = []
    if par_len <= 0:
        return func_code, status_code, values

    chunk_size = len(par_data) // par_len
    for offset in range(0, len(par_data), chunk_size):
        chunk = par_data[offset : offset + chunk_size]

        # Fold the six magnitude bytes, most significant first.
        magnitude = 0
        for position in range(6):
            magnitude = (magnitude << 8) | chunk[position]

        is_negative = chunk[6] != 1
        is_float = chunk[7] != 1
        precision = chunk[8]

        value = magnitude * -1 if is_negative else magnitude
        if is_float and precision > 0:
            # Scale the integer back to its decimal value, then round away the
            # floating-point noise introduced by the multiplication.
            value = value * 10 ** (precision * -1)
            value = round(value, precision)
        values.append(value)

    return func_code, status_code, values
def calculation_value(value=None):
    """Encode a number as a 9-byte parameter record.

    Record layout: six big-endian bytes of the absolute, scaled integer value,
    then the sign flag (1 = non-negative, 0 = negative), the type flag
    (1 = int, 0 = float) and the decimal precision (0-4).

    Floats are scaled by ``10 ** precision`` where ``precision`` is the number
    of decimal places in ``str(value)``, capped at 4; digits beyond the cap are
    truncated. The scaling uses exact decimal arithmetic so values such as
    ``0.29`` encode as 29 rather than 28, and floats whose ``str`` form uses
    scientific notation (e.g. ``1e-05``) are handled.

    Only the low 48 bits of the scaled value are emitted.

    :param value: int or float to encode (a number is required)
    :return: list of nine byte values
    :raises ValueError: if ``value`` is a non-finite float
    """
    from decimal import Decimal

    _dir = 1 if value >= 0 else 0  # sign flag
    _type = 1 if isinstance(value, int) else 0  # 1 means integer
    if _type == 1:
        abs_int_value = abs(value)
        _precision = 0
    else:
        exact = Decimal(str(value))
        if not exact.is_finite():
            raise ValueError("value must be a finite number")
        # A negative exponent is the count of decimal places; cap it at 4.
        _precision = min(max(-exact.as_tuple().exponent, 0), 4)
        # int() truncates toward zero, matching the previous cap behaviour.
        abs_int_value = int(abs(exact) * 10**_precision)

    buf = [
        0xFF & abs_int_value >> 40,
        0xFF & abs_int_value >> 32,
        0xFF & abs_int_value >> 24,
        0xFF & abs_int_value >> 16,
        0xFF & abs_int_value >> 8,
        0xFF & abs_int_value,
    ]
    buf.extend([_dir, _type, _precision])
    return buf
def read_cmd_test(receive_data):
    """Validate a received frame and return its payload.

    Expected frame: ``0x55 0x55 <len> <payload...> <checksum>``, where the
    checksum is the low byte of the bitwise complement of the payload sum.

    :param receive_data: sequence of byte values
    :return: the payload (without the checksum) when the frame is valid,
        otherwise ``False``
    """
    if len(receive_data) < 4:
        return False
    if not (receive_data[0] == 0x55 and receive_data[1] == 0x55):
        return False

    data_len = receive_data[2]
    if len(receive_data) < data_len + 4:
        return False

    framed = receive_data[3 : data_len + 4]
    payload, checksum = framed[:-1], framed[-1]
    # Verify the checksum.
    if (0xFF & ~sum(payload)) != checksum:
        return False
    return payload
# Build a dynamic-parameter message.
def dynamic_parameter_issuance_send(func_code, status_code, par_data_list=None):
    """Encode a function code, status code and optional parameters as bytes.

    The message starts with the function code and status code (two big-endian
    bytes each) and the parameter count, followed by one
    ``calculation_value`` record per parameter.

    :param func_code: function code (low 16 bits are used)
    :param status_code: status code (low 16 bits are used)
    :param par_data_list: optional list of numeric parameters
    :return: list of byte values
    """
    par_count = len(par_data_list) if par_data_list else 0
    message = [
        (func_code >> 8) & 0xFF,
        func_code & 0xFF,
        (status_code >> 8) & 0xFF,
        status_code & 0xFF,
        par_count,
    ]
    for par_data in par_data_list or ():
        message.extend(calculation_value(par_data))
    return message
def cmd_send_test(data):
    """Wrap a payload in a transmit frame.

    Frame layout: ``0x55 0x55``, the low byte of the payload length, the
    payload itself, then the low byte of the bitwise complement of the payload
    sum as checksum.

    :param data: iterable of byte values (iterated more than once)
    :return: list of byte values forming the frame
    """
    header = [0x55, 0x55, 0xFF & len(data)]
    checksum = 0xFF & ~sum(data)
    return [*header, *data, checksum]
|