import os import copy import configparser # from module.base_mode.module_aes import Aes import exifread from natsort import ns, natsorted import shutil from hashlib import sha256, md5 import requests from datetime import datetime # 获取digicam的路径 def check_install_path(other): path_list = [] path_list.append(other) path_list.append(r"D:\Program Files (x86)\digiCamControl\CameraControl.exe") path_list.append(r"C:\Program Files (x86)\digiCamControl\CameraControl.exe") path_list.append(r"D:\Program Files\digiCamControl\CameraControl.exe") path_list.append(r"C:\Program Files\digiCamControl\CameraControl.exe") for i in path_list: if os.path.exists(i): return i return "" # 输入文件夹,并检查是否是一个正常的图片文件夹。 def check_goods_folder(folder_path): all_files = os.listdir(folder_path) for file in all_files: file_path = "{}/{}".format(folder_path, file) if not os.path.isdir(file_path): continue if "原始图" in os.listdir(file_path): return folder_path # 上述检查不通过,可能是选择目录错误 if "原始图" in all_files: root_path, _ = os.path.split(folder_path) return root_path return None def download_file(url, file_path): try: root_path, file_name = os.path.split(file_path) check_path(root_path) response = requests.get(url) _content = response.content with open(file_path, 'wb') as f: f.write(_content) print("下载成功:{}".format(file_path)) except: print("下载失败:{}".format(file_path)) def calculate_sha256(file_path): """Calculate the sha256 hash of the given file.""" sha256_hash = sha256() try: with open(file_path, "rb") as f: # Read and update hash string value in blocks of 4K for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) return sha256_hash.hexdigest() except FileNotFoundError: print(f"The file {file_path} does not exist.") return None except Exception as e: print(f"An error occurred: {e}") return None def get_modified_time(file_path): # 获取文件最后修改的时间戳 timestamp = os.path.getmtime(file_path) # 将时间戳转换为datetime对象,并格式化为指定格式的字符串 modified_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') return modified_time def download_file_to_io(url): try: # 发送GET请求 response = requests.get(url) # 检查请求是否成功 response.raise_for_status() # 如果响应状态码不是200,会抛出HTTPError异常 # 返回响应内容,以文本形式(对于HTML、JSON等文本内容) return response.text # 或者返回原始的二进制内容(对于图片、文件等非文本内容) # return response.content except requests.RequestException as e: print(f"An error occurred: {e}") return None def get_md5(file_path): data_md5 = None if os.path.isfile(file_path): f = open(file_path, 'rb') md5_obj = md5() md5_obj.update(f.read()) hash_code = md5_obj.hexdigest() f.close() data_md5 = str(hash_code).lower() return data_md5 def list_dir(path): listdir = os.listdir(path) return natsorted(listdir, alg=ns.PATH) # 通用串口数据解析器 def get_data_from_receive_data(receive_data, start, len_data, data_magnification=1): # data_magnification 数据放大倍数,或缩小倍数,默认为1 try: if len_data == 1: data = receive_data[start] return data * data_magnification elif len_data == 2: data = receive_data[start] << 8 | receive_data[start + 1] return data * data_magnification elif len_data == 4: data = receive_data[start] << 24 | receive_data[start + 1] << 16 | receive_data[start + 2] << 8 | \ receive_data[start + 3] return data * data_magnification return None except: return None def get_images(path): _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"] image_list = [] # 过滤非图片数据 for _file in list_dir(path): file_name, e = os.path.splitext(_file) file_path = "{}/{}".format(path, _file) if os.path.isdir(file_path): continue if e in _Type and "mask" not in file_name: image_list.append( { "file_path": file_path, "file_name": file_name, "file": _file, "root_path": path, "e": e, } ) return image_list def get_image_mask(path): _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"] image_list = [] # 过滤非图片数据 for _file in list_dir(path): file_name, e = os.path.splitext(_file) file_path = "{}/{}".format(path, _file) if os.path.isdir(file_path): continue if e in _Type and file_name == "mask": image_list.append( { "file_path": file_path, "file_name": file_name, "file": _file, "root_path": path, "e": e, } ) return image_list # 删除文件夹下的所有文件 def remove_all_file(directory): try: shutil.rmtree(directory) os.makedirs(directory) except Exception as e: print(f'Failed to clear directory {directory}. Reason: {e}') # 删除文件 def remove_files(file_path): try: if os.path.exists(file_path): os.remove(file_path) return True except BaseException as e: print("base 188", e) return False def get_folder(path): folder_list = [] for _file in list_dir(path): file_path = "{}/{}".format(path, _file) if os.path.isdir(file_path): folder_list.append( {"folder_path": file_path, "folder_name": _file, "root_path": path, "label": "待处理", # 是否需要继续处理 } ) return folder_list # 获取所有货号颜色 文件夹 def get_all_goods_art_no_folder(path, goods_art_nos): folder_list = [] if not os.path.exists(path): return folder_list if os.path.isfile(path): return folder_list temp_folder_list = get_folder(path) # for goods_art_no in goods_art_nos: for folder_data in temp_folder_list: folder_path = folder_data["folder_path"] _p = "{}/原始图".format(folder_path) if os.path.exists(_p): folder_data["folder_path"] = f"{folder_path}" folder_list.append(folder_data) return folder_list def check_move_goods_art_no_folder(path, goods_art_nos,limit_folder): """ 检查目录是否存在,如果存在,直接移动到当前日期下的目录 """ folder_list = {} if not os.path.exists(path): return folder_list if os.path.isfile(path): return folder_list temp_folder_list = get_folder(path) for goods_art_no in goods_art_nos: for folder_data in temp_folder_list: folder_path = folder_data["folder_path"] _p = "{}/{}/原始图".format(folder_path, goods_art_no) if os.path.exists(_p): folder_data["folder_path"] = f"{folder_path}/{goods_art_no}" # 整个目录移动到目标目录 folder_list[goods_art_no] = folder_data if not os.path.exists(f"{limit_folder}/{goods_art_no}"): # 目标不存在 folder_list[goods_art_no] = folder_data shutil.move(folder_data["folder_path"], limit_folder) else: # 如果希望覆盖 print(f"目标目录 {limit_folder}/{goods_art_no} 已存在,跳过移动") return folder_list def get_date_time_original(file_path): with open(file_path, "rb") as file_data: tags = exifread.process_file(file_data) if "EXIF DateTimeOriginal" in tags: return str(tags["EXIF DateTimeOriginal"]) else: return False def get_data_from_hqt(goods_number_list, get_online_data_ins): _goods_number_list = copy.deepcopy(goods_number_list) _list = [] # 单次请求数少于20个 goods_number_dict = {} while _goods_number_list: goods_art_no = _goods_number_list.pop() if "NUM" in goods_art_no: goods_art_no = goods_art_no.replace("NUM", "") _list.append(goods_art_no) if len(_list) == 20 or len(_goods_number_list) == 0: online_goods_art_data = get_online_data_ins.get_goods_art_no_info( numbers_list=_list ) if online_goods_art_data: for number in online_goods_art_data: goods_number_dict["NUM" + number] = online_goods_art_data[number] _list = [] return goods_number_dict def get_data_from_hqt_with_goods_art_no(goods_art_no_list, get_online_data_ins): _goods_art_no_list = copy.deepcopy(goods_art_no_list) _list = [] # 单次请求数少于20个 goods_art_no_dict = {} while _goods_art_no_list: goods_art_no = _goods_art_no_list.pop() _list.append(goods_art_no) if len(_list) == 20 or len(_goods_art_no_list) == 0: online_goods_art_data = get_online_data_ins.get_goods_art_no_info( goods_art_list=_list ) if online_goods_art_data: for _goods_art_no in online_goods_art_data: goods_art_no_dict[_goods_art_no] = online_goods_art_data[ _goods_art_no ] _list = [] return goods_art_no_dict def load_config_form_ini(config_path): config = configparser.ConfigParser() if os.path.exists(config_path): try: config.read(config_path, encoding="utf-8") except: config.read(config_path) return config def set_config_to_int(config_ins, config_path, data_dict, section="basicSetup"): for i in data_dict: config_ins.set(section=section, option=i, value=data_dict[i]) try: config_ins.write(open(config_path, "w", encoding="utf-8")) except: config_ins.write(open(config_path, "w")) # try: # config_ins.read(config_path, encoding="utf-8") # except: # config_ins.read(config_path) return config_ins def get_config(config_ins, key, section="basicSetup"): config_dict1 = config_ins.items(section) __config_dict = {} for i, k in config_dict1: __config_dict[i] = k if key in __config_dict: return __config_dict[key] else: return None def get_config_by_items(config_dict): __config_dict = {} for i, k in config_dict: __config_dict[i] = k return __config_dict def get_dict_value(_dict, key, default=None): if key in _dict: return _dict[key] else: return default # def set_key(authorization, SecretKey, SecretIv): # # --------------------用户身份-------------- # if authorization: # authorization = Aes().get_key(authorization, SecretKey, SecretIv) # with open("key", "w") as f: # f.write(authorization) # return authorization def print_dic(_dict): for i, v in _dict.items(): print("{}:{}".format(i, v)) print("\n") def check_path(_path): if not os.path.exists(_path): # 创建多级目录 os.makedirs(_path, exist_ok=True) # os.mkdir(_path) return True def move_folders(path_list, target_folder): if not os.path.exists(target_folder): os.makedirs(target_folder) for source_folder in path_list: shutil.move(source_folder, target_folder) # 给定一个图片路径,如果 是原始图下的则返回对应已扣图等信息。否则只返回基础信息 def get_cutout_image_info(source_image_path): if not os.path.exists(source_image_path): return None if os.path.isdir(source_image_path): return None data = {} source_root_path, source_file = os.path.split(source_image_path) source_file_name, source_file_extension = os.path.splitext(source_file) print("---------------source_root_path--------------------") print(source_root_path) print(source_root_path[-3:]) # if source_root_path[-3:] != "原始图": # return None data["source_image_path"] = source_image_path data["source_root_path"] = source_root_path data["source_file"] = source_file data["source_file_name"] = source_file_name data["source_file_extension"] = source_file_extension cutout_image_path = "{}/原始图_已抠图/{}.png".format(source_root_path[:-3], source_file_name) if not os.path.exists(cutout_image_path): data["cutout_image_path"] = None return data cutout_image_root_path, cutout_image_file = os.path.split(cutout_image_path) cutout_image_file_name, cutout_image_file_extension = os.path.splitext(cutout_image_file) data["cutout_image_path"] = cutout_image_path data["cutout_image_root_path"] = cutout_image_root_path data["cutout_image_file"] = cutout_image_file data["cutout_image_file_name"] = cutout_image_file_name data["cutout_image_file_extension"] = cutout_image_file_extension return data # 指定一个列表,指定一个需要移动的数据列表,移动到指定列表中的位置 def move_elements_inplace(lst, indices, target_index, is_real_same_folder=False): """ 在原地将列表中指定位置的两个元素移动到目标位置。 :param lst: 要操作的列表 :param indices: 一个包含两个整数的元组或列表,表示要移动的元素的索引 :param target_index: 目标插入位置的索引 """ if not isinstance(indices, (list, tuple)) or len(indices) == 0: raise ValueError("Indices must be a non-empty list or tuple of integers.") # 添加到末尾 if target_index == len(lst): if is_real_same_folder: # 倒序取出所有内容,并添加到末尾 indices.sort(reverse=True) temp = [] for i in indices: temp.append(lst.pop(i)) while temp: lst.append(temp.pop(-1)) # 重新写入索引 for index, image_label in enumerate(lst): image_label.image_index = index return # 检查索引是否有效,并排序以确保按照正确的顺序处理 valid_indices = sorted(set(indices)) # 去重并排序 if not all(0 <= idx < len(lst) for idx in valid_indices): raise IndexError("One or more indices are out of range.") if not (0 <= target_index <= len(lst)): raise IndexError("Target index is out of range.") elements_to_move = [lst[idx] for idx in valid_indices] # 如果目标位置在所有待移动元素的最大索引之后,则不需要调整目标位置 max_idx = max(valid_indices) if max_idx < target_index: pass else: # 计算需要减少的位置数(因为删除元素后,后续元素会向前移动) shift_count = sum(1 for idx in valid_indices if idx < target_index) target_index -= shift_count # 移除元素(从大索引开始以避免影响小索引处的元素) for idx in reversed(valid_indices): del lst[idx] # 插入元素到目标位置,保持它们原来的相对顺序 for i, element in enumerate(elements_to_move): lst.insert(target_index + i, element) # 重新写入索引 for index, image_label in enumerate(lst): image_label.image_index = index # 比较两个相同格式的时间字符串 def compare_two_times(time_str1, time_str2): # 定义时间格式 time_format = "%Y-%m-%d %H:%M:%S" # 将时间字符串解析为datetime对象 time1 = datetime.strptime(time_str1, time_format) time2 = datetime.strptime(time_str2, time_format) # 比较两个时间 if time1 > time2: # print(f"{time_str1} 比 {time_str2} 新。") return "left_new" elif time1 < time2: # print(f"{time_str2} 比 {time_str1} 新。") return "right_new" else: # print("两个时间相等。") return "is_same"