| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510 |
- import os
- import copy
- import configparser
- # from module.base_mode.module_aes import Aes
- import exifread
- from natsort import ns, natsorted
- import shutil
- from hashlib import sha256, md5
- import requests
- from datetime import datetime
- from settings import handle_remove_readonly
# Locate the digiCamControl executable.
def check_install_path(other):
    """Return the path of the digiCamControl ``CameraControl.exe`` executable.

    The caller-supplied path is probed first, followed by the default
    install locations on the D: and C: drives.

    :param other: Preferred executable path. May be ``None`` or empty, in
        which case only the default locations are probed.
    :return: The first candidate that exists on disk, or ``""`` when none
        of the candidates exists.
    """
    default_paths = [
        r"D:\Program Files (x86)\digiCamControl\CameraControl.exe",
        r"C:\Program Files (x86)\digiCamControl\CameraControl.exe",
        r"D:\Program Files\digiCamControl\CameraControl.exe",
        r"C:\Program Files\digiCamControl\CameraControl.exe",
    ]
    # Skip a missing custom path: os.path.exists(None) raises TypeError.
    candidates = ([other] if other else []) + default_paths
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return ""
# Given a folder, check whether it is a regular goods image folder.
def check_goods_folder(folder_path):
    """Resolve the root folder that holds goods sub-folders.

    A folder qualifies when at least one of its sub-directories contains an
    entry named "原始图". If instead the folder itself contains "原始图",
    the caller is assumed to have picked one level too deep and the parent
    folder is returned.

    :param folder_path: Directory selected by the caller.
    :return: ``folder_path``, its parent, or ``None`` when neither layout
        matches.
    """
    entries = os.listdir(folder_path)
    for entry in entries:
        entry_path = "{}/{}".format(folder_path, entry)
        if os.path.isdir(entry_path) and "原始图" in os.listdir(entry_path):
            return folder_path
    # The check above failed; the wrong directory level may have been chosen.
    if "原始图" not in entries:
        return None
    parent_path, _ = os.path.split(folder_path)
    return parent_path
def download_file(url, file_path):
    """Download ``url`` and save the response body to ``file_path``.

    The operation is best-effort: any failure is reported on stdout and
    swallowed, and nothing is returned either way.

    :param url: Address to fetch with an HTTP GET.
    :param file_path: Destination file; its parent directory is created
        when it does not exist yet.
    """
    try:
        root_path, _ = os.path.split(file_path)
        # A bare file name has no directory part; creating "" would fail.
        if root_path:
            check_path(root_path)
        response = requests.get(url)
        # Do not store an HTTP error page as if it were the requested file.
        response.raise_for_status()
        with open(file_path, 'wb') as f:
            f.write(response.content)
        print("下载成功:{}".format(file_path))
    except Exception as e:
        print("下载失败:{}".format(file_path))
        print(f"An error occurred: {e}")
def calculate_sha256(file_path):
    """Return the SHA-256 hex digest of a file.

    :param file_path: Path of the file to hash.
    :return: The hexadecimal digest, or ``None`` when the file is missing
        or any other error occurs (a message is printed in both cases).
    """
    digest = sha256()
    try:
        with open(file_path, "rb") as stream:
            # Feed the hash 4 KiB at a time so large files stay off the heap.
            while True:
                chunk = stream.read(4096)
                if chunk == b"":
                    break
                digest.update(chunk)
        return digest.hexdigest()
    except FileNotFoundError:
        print(f"The file {file_path} does not exist.")
    except Exception as e:
        print(f"An error occurred: {e}")
    return None
def get_modified_time(file_path):
    """Return a file's last-modification time as ``YYYY-MM-DD HH:MM:SS``.

    :param file_path: Path of the file to inspect.
    :return: The modification time formatted in local time.
    """
    # os.path.getmtime yields a POSIX timestamp; convert it to local time.
    modified_at = datetime.fromtimestamp(os.path.getmtime(file_path))
    return modified_at.strftime('%Y-%m-%d %H:%M:%S')
def download_file_to_io(url):
    """Fetch ``url`` with an HTTP GET and return the response body as text.

    :param url: Address to fetch.
    :return: The decoded response text, or ``None`` when the request fails
        or the server answers with an error status (the error is printed).
    """
    try:
        reply = requests.get(url)
        # Raises HTTPError for 4xx/5xx responses.
        reply.raise_for_status()
        # Text form suits HTML/JSON; use reply.content for binary payloads.
        body = reply.text
    except requests.RequestException as e:
        print(f"An error occurred: {e}")
        return None
    return body
def get_md5(file_path):
    """Return the lowercase MD5 hex digest of a file.

    :param file_path: Path of the file to hash.
    :return: The hexadecimal digest, or ``None`` when ``file_path`` is not
        an existing regular file.
    """
    if not os.path.isfile(file_path):
        return None
    md5_obj = md5()
    # The context manager closes the handle even if reading fails, and
    # chunked reads avoid loading a large file into memory at once.
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b""):
            md5_obj.update(chunk)
    # hexdigest() already returns a lowercase str.
    return md5_obj.hexdigest()
def list_dir(path):
    """List the entries of ``path`` in natural (human) sort order.

    :param path: Directory to list.
    :return: Entry names ordered by ``natsorted`` with the ``ns.PATH``
        algorithm.
    """
    return natsorted(os.listdir(path), alg=ns.PATH)
# Generic serial-port data parser.
def get_data_from_receive_data(receive_data, start, len_data, data_magnification=1):
    """Decode a big-endian unsigned value from a received frame.

    :param receive_data: Indexable sequence of byte values (e.g. ``bytes``
        or a list of ints).
    :param start: Index of the most significant byte.
    :param len_data: Field width in bytes; only 1, 2 and 4 are supported.
    :param data_magnification: Factor the decoded value is multiplied by
        (scale up or down); defaults to 1.
    :return: The scaled value, or ``None`` for an unsupported width or when
        the field cannot be read (for example it runs past the end).
    """
    if len_data not in (1, 2, 4):
        return None
    try:
        value = 0
        # Fold the bytes most-significant first: value = value * 256 + byte.
        for offset in range(len_data):
            value = value << 8 | receive_data[start + offset]
        return value * data_magnification
    except Exception:
        # Malformed or truncated frames yield None instead of raising, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        return None
def get_images(path):
    """Collect the image files directly inside ``path``, skipping masks.

    Extensions are matched case-insensitively. ``.jpeg`` is recognised, and
    the historical misspelling ``.jpge`` is still accepted so previously
    matched files keep matching.

    :param path: Directory to scan (not recursive).
    :return: One dict per image, in ``list_dir`` order, with the keys
        ``file_path``, ``file_name`` (stem), ``file`` (name with
        extension), ``root_path`` and ``e`` (extension as found on disk).
        Files whose stem contains ``"mask"`` are excluded.
    """
    image_suffixes = {".png", ".jpg", ".jpeg", ".jpge", ".gif", ".cr2"}
    image_list = []  # non-image entries are filtered out
    for _file in list_dir(path):
        file_path = "{}/{}".format(path, _file)
        if os.path.isdir(file_path):
            continue
        file_name, e = os.path.splitext(_file)
        if e.lower() in image_suffixes and "mask" not in file_name:
            image_list.append(
                {
                    "file_path": file_path,
                    "file_name": file_name,
                    "file": _file,
                    "root_path": path,
                    "e": e,
                }
            )
    return image_list
def get_image_mask(path):
    """Collect the mask image files directly inside ``path``.

    A mask is an image whose stem is exactly ``"mask"``. Extensions are
    matched case-insensitively. ``.jpeg`` is recognised, and the historical
    misspelling ``.jpge`` is still accepted.

    :param path: Directory to scan (not recursive).
    :return: One dict per mask image, in ``list_dir`` order, with the keys
        ``file_path``, ``file_name`` (stem), ``file`` (name with
        extension), ``root_path`` and ``e`` (extension as found on disk).
    """
    image_suffixes = {".png", ".jpg", ".jpeg", ".jpge", ".gif", ".cr2"}
    image_list = []  # non-image entries are filtered out
    for _file in list_dir(path):
        file_path = "{}/{}".format(path, _file)
        if os.path.isdir(file_path):
            continue
        file_name, e = os.path.splitext(_file)
        if e.lower() in image_suffixes and file_name == "mask":
            image_list.append(
                {
                    "file_path": file_path,
                    "file_name": file_name,
                    "file": _file,
                    "root_path": path,
                    "e": e,
                }
            )
    return image_list
# Delete everything inside a folder.
def remove_all_file(directory):
    """Empty ``directory`` by removing the whole tree and recreating it.

    Removal errors are routed to ``handle_remove_readonly``; any exception
    that still escapes is printed and swallowed.

    :param directory: Directory to clear.
    """
    try:
        shutil.rmtree(directory, onerror=handle_remove_readonly)
        os.makedirs(directory)
    except Exception as err:
        print(f'Failed to clear directory {directory}. Reason: {err}')
# Delete a single file.
def remove_files(file_path):
    """Delete ``file_path`` if it exists.

    :param file_path: Path of the file to delete.
    :return: ``True`` when the file was removed or was already absent,
        ``False`` when the removal failed (the error is printed).
    """
    try:
        if os.path.exists(file_path):
            os.remove(file_path)
        return True
    except Exception as e:
        # Exception rather than BaseException, so Ctrl-C and interpreter
        # shutdown are not swallowed.
        print("base 188", e)
        return False
def get_folder(path):
    """List the sub-directories directly inside ``path``.

    :param path: Directory to scan (not recursive).
    :return: One dict per sub-directory, in ``list_dir`` order, with the
        keys ``folder_path``, ``folder_name``, ``root_path`` and ``label``
        (initialised to "待处理", i.e. still to be processed).
    """
    named_paths = (
        ("{}/{}".format(path, entry), entry) for entry in list_dir(path)
    )
    return [
        {
            "folder_path": entry_path,
            "folder_name": entry,
            "root_path": path,
            "label": "待处理",  # whether further processing is still needed
        }
        for entry_path, entry in named_paths
        if os.path.isdir(entry_path)
    ]
# Collect every goods-art-number/colour folder.
def get_all_goods_art_no_folder(path, goods_art_nos):
    """Return the sub-folders of ``path`` that contain a "原始图" entry.

    :param path: Directory whose sub-folders are inspected.
    :param goods_art_nos: Accepted for interface compatibility; not used.
    :return: The ``get_folder`` dicts of the matching sub-folders, or an
        empty list when ``path`` does not exist or is a file.
    """
    if not os.path.exists(path) or os.path.isfile(path):
        return []
    return [
        folder_data
        for folder_data in get_folder(path)
        if os.path.exists("{}/原始图".format(folder_data["folder_path"]))
    ]
def check_move_goods_art_no_folder(path, goods_art_nos,limit_folder):
    """
    Look for existing goods folders and move them into ``limit_folder``.

    For every goods art number and every sub-folder name found in ``path``,
    the relative path ``output/<folder_name>/<goods_art_no>/原始图`` is
    probed. When it exists, ``output/<folder_name>/<goods_art_no>`` is moved
    into ``limit_folder`` unless ``<limit_folder>/<goods_art_no>`` is already
    there.

    :param path: Directory whose sub-folder names are enumerated.
    :param goods_art_nos: Iterable of goods art numbers to look for.
    :param limit_folder: Destination directory for the moved folders.
    :return: Dict mapping each goods art number that was found to its
        folder dict (with ``folder_path`` rewritten to the ``output/...``
        location). Empty when ``path`` is missing or is a file.

    NOTE(review): block nesting was reconstructed from an
    indentation-stripped copy of this file - confirm against the original.
    NOTE(review): the lookup uses the hard-coded relative prefix "output/"
    rather than ``path`` - presumably ``path`` is that same directory;
    verify against callers.
    """
    folder_list = {}
    if not os.path.exists(path):
        print("path不存在", path)
        return folder_list
    if os.path.isfile(path):
        print("path是文件", path)
        return folder_list
    temp_folder_list = get_folder(path)
    for goods_art_no in goods_art_nos:
        for folder_data in temp_folder_list:
            # folder_path = folder_data["folder_path"]
            folder_name = folder_data["folder_name"]
            _p = "output/{}/{}/原始图".format(folder_name, goods_art_no)
            if os.path.exists(_p):
                # NOTE(review): folder_data is the shared dict from
                # temp_folder_list, so a later match under the same folder
                # overwrites the path seen by earlier results - confirm
                # this is intended.
                folder_data["folder_path"] = f"output/{folder_name}/{goods_art_no}"
                # Move the whole directory to the target directory.
                folder_list[goods_art_no] = folder_data
                if not os.path.exists(f"{limit_folder}/{goods_art_no}"):
                    # The target does not exist yet.
                    try:
                        shutil.move(folder_data["folder_path"], limit_folder)
                        folder_list[goods_art_no] = folder_data
                        print("移动目录", folder_data["folder_path"], limit_folder)
                    except:
                        # A failed move is skipped silently; the entry
                        # recorded above stays in the result.
                        continue
                else:
                    # Overwriting would go here if it were wanted.
                    print(f"目标目录 {limit_folder}/{goods_art_no} 已存在,跳过移动")
    return folder_list
def get_date_time_original(file_path):
    """Read the EXIF ``DateTimeOriginal`` tag of an image file.

    :param file_path: Path of the image to inspect.
    :return: The tag value as a string, or ``False`` when the tag is
        absent.
    """
    tag_name = "EXIF DateTimeOriginal"
    with open(file_path, "rb") as image_file:
        exif_tags = exifread.process_file(image_file)
    if tag_name not in exif_tags:
        return False
    return str(exif_tags[tag_name])
def get_data_from_hqt(goods_number_list, get_online_data_ins):
    """Fetch online data for goods numbers in batches of at most 20.

    Every "NUM" substring is stripped from each number before the request
    and the "NUM" prefix is put back on the keys of the result. Numbers are
    sent starting from the end of the input list, which is left unmodified.

    :param goods_number_list: List of goods number strings.
    :param get_online_data_ins: Object exposing
        ``get_goods_art_no_info(numbers_list=...)`` that returns a mapping
        (or a falsy value when nothing was found).
    :return: Dict mapping ``"NUM" + number`` to the data returned for it.
    """
    # Walk the list back to front, as the original pop()-based loop did.
    pending = [number.replace("NUM", "") for number in reversed(goods_number_list)]
    goods_number_dict = {}
    # Each request carries at most 20 numbers.
    for begin in range(0, len(pending), 20):
        batch = pending[begin:begin + 20]
        online_data = get_online_data_ins.get_goods_art_no_info(numbers_list=batch)
        if not online_data:
            continue
        for number in online_data:
            goods_number_dict["NUM" + number] = online_data[number]
    return goods_number_dict
def get_data_from_hqt_with_goods_art_no(goods_art_no_list, get_online_data_ins):
    """Fetch online data for goods art numbers in batches of at most 20.

    Art numbers are sent starting from the end of the input list, which is
    left unmodified.

    :param goods_art_no_list: List of goods art numbers.
    :param get_online_data_ins: Object exposing
        ``get_goods_art_no_info(goods_art_list=...)`` that returns a
        mapping (or a falsy value when nothing was found).
    :return: Dict mapping each returned goods art number to its data.
    """
    # Walk the list back to front, as the original pop()-based loop did.
    pending = list(reversed(goods_art_no_list))
    goods_art_no_dict = {}
    # Each request carries at most 20 art numbers.
    for begin in range(0, len(pending), 20):
        batch = pending[begin:begin + 20]
        online_data = get_online_data_ins.get_goods_art_no_info(goods_art_list=batch)
        if not online_data:
            continue
        for art_no in online_data:
            goods_art_no_dict[art_no] = online_data[art_no]
    return goods_art_no_dict
def load_config_form_ini(config_path):
    """Load an INI file into a ``ConfigParser``.

    The file is read as UTF-8 first; if that fails it is read again with
    the platform default encoding.

    :param config_path: Path of the INI file.
    :return: The populated parser, or an empty parser when ``config_path``
        does not exist.
    """
    config = configparser.ConfigParser()
    if os.path.exists(config_path):
        try:
            config.read(config_path, encoding="utf-8")
        except Exception:
            # Typically a file saved in a legacy code page. Exception (not
            # a bare except) keeps Ctrl-C and interpreter exit working.
            config.read(config_path)
    return config
def set_config_to_int(config_ins, config_path, data_dict, section="basicSetup"):
    """Update options in a parser section and persist the parser to disk.

    :param config_ins: ``ConfigParser`` instance to update; ``section``
        must already exist in it.
    :param config_path: INI file to (over)write.
    :param data_dict: Mapping of option name to string value.
    :param section: Section receiving the options.
    :return: The same ``config_ins`` instance.
    """
    for option, value in data_dict.items():
        config_ins.set(section=section, option=option, value=value)
    # Context managers make sure the file is flushed and closed; the
    # original left the handles open.
    try:
        with open(config_path, "w", encoding="utf-8") as config_file:
            config_ins.write(config_file)
    except Exception:
        # Retry with the platform default encoding.
        with open(config_path, "w") as config_file:
            config_ins.write(config_file)
    return config_ins
def get_config(config_ins, key, section="basicSetup"):
    """Look up one option value in a parser section.

    :param config_ins: ``ConfigParser`` instance to query.
    :param key: Option name, compared exactly against the names returned
        by ``config_ins.items(section)``.
    :param section: Section to read.
    :return: The option value, or ``None`` when the option is absent.
    """
    section_values = dict(config_ins.items(section))
    return section_values.get(key)
def get_config_by_items(config_dict):
    """Turn an iterable of ``(name, value)`` pairs into a dict.

    :param config_dict: Iterable of two-element items, e.g. the result of
        ``ConfigParser.items(section)``.
    :return: Dict built from the pairs; later duplicates win.
    """
    return {name: value for name, value in config_dict}
def get_dict_value(_dict, key, default=None):
    """Return ``_dict[key]`` when the key is present, else ``default``.

    :param _dict: Mapping to query.
    :param key: Key to look up.
    :param default: Value returned when ``key`` is missing.
    :return: The stored value (even if it is ``None``) or ``default``.
    """
    return _dict[key] if key in _dict else default
- # def set_key(authorization, SecretKey, SecretIv):
- # # --------------------用户身份--------------
- # if authorization:
- # authorization = Aes().get_key(authorization, SecretKey, SecretIv)
- # with open("key", "w") as f:
- # f.write(authorization)
- # return authorization
def print_dic(_dict):
    """Print every ``key:value`` pair on its own line, then a blank gap.

    :param _dict: Mapping to dump to stdout.
    """
    for key, value in _dict.items():
        print(f"{key}:{value}")
    print("\n")
def check_path(_path):
    """Make sure ``_path`` exists, creating it when needed.

    :param _path: Directory path to ensure.
    :return: Always ``True``.
    """
    path_missing = not os.path.exists(_path)
    if path_missing:
        # Creates intermediate directories as well.
        os.makedirs(_path, exist_ok=True)
    return True
def move_folders(path_list, target_folder):
    """Move every path in ``path_list`` into ``target_folder``.

    :param path_list: Iterable of source paths to move.
    :param target_folder: Destination directory; created when missing.
    """
    target_exists = os.path.exists(target_folder)
    if not target_exists:
        os.makedirs(target_folder)
    for source_path in path_list:
        shutil.move(source_path, target_folder)
# Given an image path, return its cut-out information when the image lives
# under "原始图"; otherwise only the basic information is returned.
def get_cutout_image_info(source_image_path):
    """Describe a source image and its matching cut-out image, if any.

    The cut-out is looked up at ``<root minus last 3 chars>/原始图_已抠图/
    <stem>.png``, where root is the directory of the source image. The
    check that root actually ends with "原始图" is disabled, so the lookup
    path is only meaningful for images stored in such a folder.

    :param source_image_path: Path of the source image file.
    :return: ``None`` when the path is missing or is a directory. Otherwise
        a dict with the ``source_*`` keys and ``cutout_image_path`` (which
        is ``None`` when no cut-out exists), plus the other ``cutout_*``
        keys when the cut-out file exists.
    """
    if not os.path.exists(source_image_path) or os.path.isdir(source_image_path):
        return None

    source_root_path, source_file = os.path.split(source_image_path)
    source_file_name, source_file_extension = os.path.splitext(source_file)

    print("---------------source_root_path--------------------")
    print(source_root_path)
    print(source_root_path[-3:])

    data = {
        "source_image_path": source_image_path,
        "source_root_path": source_root_path,
        "source_file": source_file,
        "source_file_name": source_file_name,
        "source_file_extension": source_file_extension,
    }

    # Drop the trailing 3 characters (the "原始图" folder name) from the root.
    cutout_image_path = "{}/原始图_已抠图/{}.png".format(source_root_path[:-3], source_file_name)
    if not os.path.exists(cutout_image_path):
        data["cutout_image_path"] = None
        return data

    cutout_root, cutout_file = os.path.split(cutout_image_path)
    cutout_stem, cutout_extension = os.path.splitext(cutout_file)
    data.update(
        {
            "cutout_image_path": cutout_image_path,
            "cutout_image_root_path": cutout_root,
            "cutout_image_file": cutout_file,
            "cutout_image_file_name": cutout_stem,
            "cutout_image_file_extension": cutout_extension,
        }
    )
    return data
# Given a list and the indices of the items to move, move those items to
# the given position within the same list.
def move_elements_inplace(lst, indices, target_index, is_real_same_folder=False):
    """
    Move the elements at ``indices`` to ``target_index``, in place.

    After the move every element of ``lst`` gets its ``image_index``
    attribute rewritten to its new position.

    :param lst: List to reorder; its elements must accept an
        ``image_index`` attribute assignment.
    :param indices: Non-empty list or tuple of indices of the elements to
        move (the code accepts any number of indices, not only two).
    :param target_index: Insertion position, from 0 to ``len(lst)``.
    :param is_real_same_folder: When true and ``target_index == len(lst)``,
        a dedicated append-to-end path is taken.
    :raises ValueError: If ``indices`` is not a non-empty list or tuple.
    :raises IndexError: If an index or ``target_index`` is out of range
        (checked on the general path only).

    NOTE(review): block nesting was reconstructed from an
    indentation-stripped copy of this file - in particular confirm that the
    early ``return`` belongs to the ``is_real_same_folder`` branch.
    """
    if not isinstance(indices, (list, tuple)) or len(indices) == 0:
        raise ValueError("Indices must be a non-empty list or tuple of integers.")
    # Append to the end.
    if target_index == len(lst):
        if is_real_same_folder:
            # Pop everything from the highest index down, then append it
            # back at the end in the original relative order.
            # NOTE(review): sorts the caller's ``indices`` in place, and
            # tuples have no .sort() although the check above accepts
            # them - confirm callers always pass a list here.
            indices.sort(reverse=True)
            temp = []
            for i in indices:
                temp.append(lst.pop(i))
            while temp:
                lst.append(temp.pop(-1))
            # Rewrite the stored indices.
            for index, image_label in enumerate(lst):
                image_label.image_index = index
            return
    # Validate the indices; de-duplicate and sort them so they are handled
    # in a well-defined order.
    valid_indices = sorted(set(indices))  # de-duplicated and sorted
    if not all(0 <= idx < len(lst) for idx in valid_indices):
        raise IndexError("One or more indices are out of range.")
    if not (0 <= target_index <= len(lst)):
        raise IndexError("Target index is out of range.")
    elements_to_move = [lst[idx] for idx in valid_indices]
    # If the target lies after the largest index being moved, the target
    # is left unadjusted.
    # NOTE(review): in that case the deletions below still shorten the list
    # in front of the target, so the elements land len(valid_indices)
    # positions later than the same target would give on the other branch -
    # confirm this is the intended drop semantics.
    max_idx = max(valid_indices)
    if max_idx < target_index:
        pass
    else:
        # Number of positions to subtract: removed elements in front of
        # the target shift everything after them forward.
        shift_count = sum(1 for idx in valid_indices if idx < target_index)
        target_index -= shift_count
    # Remove the elements, highest index first, so that the lower indices
    # stay valid.
    for idx in reversed(valid_indices):
        del lst[idx]
    # Insert the elements at the target, keeping their relative order.
    for i, element in enumerate(elements_to_move):
        lst.insert(target_index + i, element)
    # Rewrite the stored indices.
    for index, image_label in enumerate(lst):
        image_label.image_index = index
# Compare two time strings that share the same format.
def compare_two_times(time_str1, time_str2):
    """Tell which of two ``YYYY-MM-DD HH:MM:SS`` timestamps is newer.

    :param time_str1: Left-hand time string.
    :param time_str2: Right-hand time string.
    :return: ``"left_new"`` when the first is later, ``"right_new"`` when
        the second is later, ``"is_same"`` when they are equal.
    :raises ValueError: If either string does not match the format.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    left = datetime.strptime(time_str1, time_format)
    right = datetime.strptime(time_str2, time_format)
    if left == right:
        return "is_same"
    return "left_new" if left > right else "right_new"
|