base.py 17 KB


  1. import os
  2. import copy
  3. import configparser
  4. # from module.base_mode.module_aes import Aes
  5. import exifread
  6. from natsort import ns, natsorted
  7. import shutil
  8. from hashlib import sha256, md5
  9. import requests
  10. from datetime import datetime
  11. # 获取digicam的路径
  12. def check_install_path(other):
  13. path_list = []
  14. path_list.append(other)
  15. path_list.append(r"D:\Program Files (x86)\digiCamControl\CameraControl.exe")
  16. path_list.append(r"C:\Program Files (x86)\digiCamControl\CameraControl.exe")
  17. path_list.append(r"D:\Program Files\digiCamControl\CameraControl.exe")
  18. path_list.append(r"C:\Program Files\digiCamControl\CameraControl.exe")
  19. for i in path_list:
  20. if os.path.exists(i):
  21. return i
  22. return ""
  23. # 输入文件夹,并检查是否是一个正常的图片文件夹。
  24. def check_goods_folder(folder_path):
  25. all_files = os.listdir(folder_path)
  26. for file in all_files:
  27. file_path = "{}/{}".format(folder_path, file)
  28. if not os.path.isdir(file_path):
  29. continue
  30. if "原始图" in os.listdir(file_path):
  31. return folder_path
  32. # 上述检查不通过,可能是选择目录错误
  33. if "原始图" in all_files:
  34. root_path, _ = os.path.split(folder_path)
  35. return root_path
  36. return None
  37. def download_file(url, file_path):
  38. try:
  39. root_path, file_name = os.path.split(file_path)
  40. check_path(root_path)
  41. response = requests.get(url)
  42. _content = response.content
  43. with open(file_path, 'wb') as f:
  44. f.write(_content)
  45. print("下载成功:{}".format(file_path))
  46. except:
  47. print("下载失败:{}".format(file_path))
  48. def calculate_sha256(file_path):
  49. """Calculate the sha256 hash of the given file."""
  50. sha256_hash = sha256()
  51. try:
  52. with open(file_path, "rb") as f:
  53. # Read and update hash string value in blocks of 4K
  54. for byte_block in iter(lambda: f.read(4096), b""):
  55. sha256_hash.update(byte_block)
  56. return sha256_hash.hexdigest()
  57. except FileNotFoundError:
  58. print(f"The file {file_path} does not exist.")
  59. return None
  60. except Exception as e:
  61. print(f"An error occurred: {e}")
  62. return None
  63. def get_modified_time(file_path):
  64. # 获取文件最后修改的时间戳
  65. timestamp = os.path.getmtime(file_path)
  66. # 将时间戳转换为datetime对象,并格式化为指定格式的字符串
  67. modified_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
  68. return modified_time
  69. def download_file_to_io(url):
  70. try:
  71. # 发送GET请求
  72. response = requests.get(url)
  73. # 检查请求是否成功
  74. response.raise_for_status() # 如果响应状态码不是200,会抛出HTTPError异常
  75. # 返回响应内容,以文本形式(对于HTML、JSON等文本内容)
  76. return response.text
  77. # 或者返回原始的二进制内容(对于图片、文件等非文本内容)
  78. # return response.content
  79. except requests.RequestException as e:
  80. print(f"An error occurred: {e}")
  81. return None
  82. def get_md5(file_path):
  83. data_md5 = None
  84. if os.path.isfile(file_path):
  85. f = open(file_path, 'rb')
  86. md5_obj = md5()
  87. md5_obj.update(f.read())
  88. hash_code = md5_obj.hexdigest()
  89. f.close()
  90. data_md5 = str(hash_code).lower()
  91. return data_md5
  92. def list_dir(path):
  93. listdir = os.listdir(path)
  94. return natsorted(listdir, alg=ns.PATH)
  95. # 通用串口数据解析器
  96. def get_data_from_receive_data(receive_data, start, len_data, data_magnification=1):
  97. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  98. try:
  99. if len_data == 1:
  100. data = receive_data[start]
  101. return data * data_magnification
  102. elif len_data == 2:
  103. data = receive_data[start] << 8 | receive_data[start + 1]
  104. return data * data_magnification
  105. elif len_data == 4:
  106. data = receive_data[start] << 24 | receive_data[start + 1] << 16 | receive_data[start + 2] << 8 | \
  107. receive_data[start + 3]
  108. return data * data_magnification
  109. return None
  110. except:
  111. return None
  112. def get_images(path):
  113. _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"]
  114. image_list = [] # 过滤非图片数据
  115. for _file in list_dir(path):
  116. file_name, e = os.path.splitext(_file)
  117. file_path = "{}/{}".format(path, _file)
  118. if os.path.isdir(file_path):
  119. continue
  120. if e in _Type and "mask" not in file_name:
  121. image_list.append(
  122. {
  123. "file_path": file_path,
  124. "file_name": file_name,
  125. "file": _file,
  126. "root_path": path,
  127. "e": e,
  128. }
  129. )
  130. return image_list
  131. def get_image_mask(path):
  132. _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"]
  133. image_list = [] # 过滤非图片数据
  134. for _file in list_dir(path):
  135. file_name, e = os.path.splitext(_file)
  136. file_path = "{}/{}".format(path, _file)
  137. if os.path.isdir(file_path):
  138. continue
  139. if e in _Type and file_name == "mask":
  140. image_list.append(
  141. {
  142. "file_path": file_path,
  143. "file_name": file_name,
  144. "file": _file,
  145. "root_path": path,
  146. "e": e,
  147. }
  148. )
  149. return image_list
  150. # 删除文件夹下的所有文件
  151. def remove_all_file(directory):
  152. try:
  153. shutil.rmtree(directory)
  154. os.makedirs(directory)
  155. except Exception as e:
  156. print(f'Failed to clear directory {directory}. Reason: {e}')
  157. # 删除文件
  158. def remove_files(file_path):
  159. try:
  160. if os.path.exists(file_path):
  161. os.remove(file_path)
  162. return True
  163. except BaseException as e:
  164. print("base 188", e)
  165. return False
  166. def get_folder(path):
  167. folder_list = []
  168. for _file in list_dir(path):
  169. file_path = "{}/{}".format(path, _file)
  170. if os.path.isdir(file_path):
  171. folder_list.append(
  172. {"folder_path": file_path,
  173. "folder_name": _file,
  174. "root_path": path,
  175. "label": "待处理", # 是否需要继续处理
  176. }
  177. )
  178. return folder_list
  179. # 获取所有货号颜色 文件夹
  180. def get_all_goods_art_no_folder(path, goods_art_nos):
  181. folder_list = []
  182. if not os.path.exists(path):
  183. return folder_list
  184. if os.path.isfile(path):
  185. return folder_list
  186. temp_folder_list = get_folder(path)
  187. # for goods_art_no in goods_art_nos:
  188. for folder_data in temp_folder_list:
  189. folder_path = folder_data["folder_path"]
  190. _p = "{}/原始图".format(folder_path)
  191. if os.path.exists(_p):
  192. folder_data["folder_path"] = f"{folder_path}"
  193. folder_list.append(folder_data)
  194. return folder_list
  195. def check_move_goods_art_no_folder(path, goods_art_nos,limit_folder):
  196. """
  197. 检查目录是否存在,如果存在,直接移动到当前日期下的目录
  198. """
  199. folder_list = {}
  200. if not os.path.exists(path):
  201. return folder_list
  202. if os.path.isfile(path):
  203. return folder_list
  204. temp_folder_list = get_folder(path)
  205. for goods_art_no in goods_art_nos:
  206. for folder_data in temp_folder_list:
  207. folder_path = folder_data["folder_path"]
  208. _p = "{}/{}/原始图".format(folder_path, goods_art_no)
  209. if os.path.exists(_p):
  210. folder_data["folder_path"] = f"{folder_path}/{goods_art_no}"
  211. # 整个目录移动到目标目录
  212. folder_list[goods_art_no] = folder_data
  213. if not os.path.exists(f"{limit_folder}/{goods_art_no}"):
  214. # 目标不存在
  215. folder_list[goods_art_no] = folder_data
  216. shutil.move(folder_data["folder_path"], limit_folder)
  217. else:
  218. # 如果希望覆盖
  219. print(f"目标目录 {limit_folder}/{goods_art_no} 已存在,跳过移动")
  220. return folder_list
  221. def get_date_time_original(file_path):
  222. with open(file_path, "rb") as file_data:
  223. tags = exifread.process_file(file_data)
  224. if "EXIF DateTimeOriginal" in tags:
  225. return str(tags["EXIF DateTimeOriginal"])
  226. else:
  227. return False
  228. def get_data_from_hqt(goods_number_list, get_online_data_ins):
  229. _goods_number_list = copy.deepcopy(goods_number_list)
  230. _list = []
  231. # 单次请求数少于20个
  232. goods_number_dict = {}
  233. while _goods_number_list:
  234. goods_art_no = _goods_number_list.pop()
  235. if "NUM" in goods_art_no:
  236. goods_art_no = goods_art_no.replace("NUM", "")
  237. _list.append(goods_art_no)
  238. if len(_list) == 20 or len(_goods_number_list) == 0:
  239. online_goods_art_data = get_online_data_ins.get_goods_art_no_info(
  240. numbers_list=_list
  241. )
  242. if online_goods_art_data:
  243. for number in online_goods_art_data:
  244. goods_number_dict["NUM" + number] = online_goods_art_data[number]
  245. _list = []
  246. return goods_number_dict
  247. def get_data_from_hqt_with_goods_art_no(goods_art_no_list, get_online_data_ins):
  248. _goods_art_no_list = copy.deepcopy(goods_art_no_list)
  249. _list = []
  250. # 单次请求数少于20个
  251. goods_art_no_dict = {}
  252. while _goods_art_no_list:
  253. goods_art_no = _goods_art_no_list.pop()
  254. _list.append(goods_art_no)
  255. if len(_list) == 20 or len(_goods_art_no_list) == 0:
  256. online_goods_art_data = get_online_data_ins.get_goods_art_no_info(
  257. goods_art_list=_list
  258. )
  259. if online_goods_art_data:
  260. for _goods_art_no in online_goods_art_data:
  261. goods_art_no_dict[_goods_art_no] = online_goods_art_data[
  262. _goods_art_no
  263. ]
  264. _list = []
  265. return goods_art_no_dict
  266. def load_config_form_ini(config_path):
  267. config = configparser.ConfigParser()
  268. if os.path.exists(config_path):
  269. try:
  270. config.read(config_path, encoding="utf-8")
  271. except:
  272. config.read(config_path)
  273. return config
  274. def set_config_to_int(config_ins, config_path, data_dict, section="basicSetup"):
  275. for i in data_dict:
  276. config_ins.set(section=section, option=i, value=data_dict[i])
  277. try:
  278. config_ins.write(open(config_path, "w", encoding="utf-8"))
  279. except:
  280. config_ins.write(open(config_path, "w"))
  281. # try:
  282. # config_ins.read(config_path, encoding="utf-8")
  283. # except:
  284. # config_ins.read(config_path)
  285. return config_ins
  286. def get_config(config_ins, key, section="basicSetup"):
  287. config_dict1 = config_ins.items(section)
  288. __config_dict = {}
  289. for i, k in config_dict1:
  290. __config_dict[i] = k
  291. if key in __config_dict:
  292. return __config_dict[key]
  293. else:
  294. return None
  295. def get_config_by_items(config_dict):
  296. __config_dict = {}
  297. for i, k in config_dict:
  298. __config_dict[i] = k
  299. return __config_dict
  300. def get_dict_value(_dict, key, default=None):
  301. if key in _dict:
  302. return _dict[key]
  303. else:
  304. return default
  305. # def set_key(authorization, SecretKey, SecretIv):
  306. # # --------------------用户身份--------------
  307. # if authorization:
  308. # authorization = Aes().get_key(authorization, SecretKey, SecretIv)
  309. # with open("key", "w") as f:
  310. # f.write(authorization)
  311. # return authorization
  312. def print_dic(_dict):
  313. for i, v in _dict.items():
  314. print("{}:{}".format(i, v))
  315. print("\n")
  316. def check_path(_path):
  317. if not os.path.exists(_path):
  318. # 创建多级目录
  319. os.makedirs(_path, exist_ok=True)
  320. # os.mkdir(_path)
  321. return True
  322. def move_folders(path_list, target_folder):
  323. if not os.path.exists(target_folder):
  324. os.makedirs(target_folder)
  325. for source_folder in path_list:
  326. shutil.move(source_folder, target_folder)
  327. # 给定一个图片路径,如果 是原始图下的则返回对应已扣图等信息。否则只返回基础信息
  328. def get_cutout_image_info(source_image_path):
  329. if not os.path.exists(source_image_path):
  330. return None
  331. if os.path.isdir(source_image_path):
  332. return None
  333. data = {}
  334. source_root_path, source_file = os.path.split(source_image_path)
  335. source_file_name, source_file_extension = os.path.splitext(source_file)
  336. print("---------------source_root_path--------------------")
  337. print(source_root_path)
  338. print(source_root_path[-3:])
  339. # if source_root_path[-3:] != "原始图":
  340. # return None
  341. data["source_image_path"] = source_image_path
  342. data["source_root_path"] = source_root_path
  343. data["source_file"] = source_file
  344. data["source_file_name"] = source_file_name
  345. data["source_file_extension"] = source_file_extension
  346. cutout_image_path = "{}/原始图_已抠图/{}.png".format(source_root_path[:-3], source_file_name)
  347. if not os.path.exists(cutout_image_path):
  348. data["cutout_image_path"] = None
  349. return data
  350. cutout_image_root_path, cutout_image_file = os.path.split(cutout_image_path)
  351. cutout_image_file_name, cutout_image_file_extension = os.path.splitext(cutout_image_file)
  352. data["cutout_image_path"] = cutout_image_path
  353. data["cutout_image_root_path"] = cutout_image_root_path
  354. data["cutout_image_file"] = cutout_image_file
  355. data["cutout_image_file_name"] = cutout_image_file_name
  356. data["cutout_image_file_extension"] = cutout_image_file_extension
  357. return data
  358. # 指定一个列表,指定一个需要移动的数据列表,移动到指定列表中的位置
  359. def move_elements_inplace(lst, indices, target_index, is_real_same_folder=False):
  360. """
  361. 在原地将列表中指定位置的两个元素移动到目标位置。
  362. :param lst: 要操作的列表
  363. :param indices: 一个包含两个整数的元组或列表,表示要移动的元素的索引
  364. :param target_index: 目标插入位置的索引
  365. """
  366. if not isinstance(indices, (list, tuple)) or len(indices) == 0:
  367. raise ValueError("Indices must be a non-empty list or tuple of integers.")
  368. # 添加到末尾
  369. if target_index == len(lst):
  370. if is_real_same_folder:
  371. # 倒序取出所有内容,并添加到末尾
  372. indices.sort(reverse=True)
  373. temp = []
  374. for i in indices:
  375. temp.append(lst.pop(i))
  376. while temp:
  377. lst.append(temp.pop(-1))
  378. # 重新写入索引
  379. for index, image_label in enumerate(lst):
  380. image_label.image_index = index
  381. return
  382. # 检查索引是否有效,并排序以确保按照正确的顺序处理
  383. valid_indices = sorted(set(indices)) # 去重并排序
  384. if not all(0 <= idx < len(lst) for idx in valid_indices):
  385. raise IndexError("One or more indices are out of range.")
  386. if not (0 <= target_index <= len(lst)):
  387. raise IndexError("Target index is out of range.")
  388. elements_to_move = [lst[idx] for idx in valid_indices]
  389. # 如果目标位置在所有待移动元素的最大索引之后,则不需要调整目标位置
  390. max_idx = max(valid_indices)
  391. if max_idx < target_index:
  392. pass
  393. else:
  394. # 计算需要减少的位置数(因为删除元素后,后续元素会向前移动)
  395. shift_count = sum(1 for idx in valid_indices if idx < target_index)
  396. target_index -= shift_count
  397. # 移除元素(从大索引开始以避免影响小索引处的元素)
  398. for idx in reversed(valid_indices):
  399. del lst[idx]
  400. # 插入元素到目标位置,保持它们原来的相对顺序
  401. for i, element in enumerate(elements_to_move):
  402. lst.insert(target_index + i, element)
  403. # 重新写入索引
  404. for index, image_label in enumerate(lst):
  405. image_label.image_index = index
  406. # 比较两个相同格式的时间字符串
  407. def compare_two_times(time_str1, time_str2):
  408. # 定义时间格式
  409. time_format = "%Y-%m-%d %H:%M:%S"
  410. # 将时间字符串解析为datetime对象
  411. time1 = datetime.strptime(time_str1, time_format)
  412. time2 = datetime.strptime(time_str2, time_format)
  413. # 比较两个时间
  414. if time1 > time2:
  415. # print(f"{time_str1} 比 {time_str2} 新。")
  416. return "left_new"
  417. elif time1 < time2:
  418. # print(f"{time_str2} 比 {time_str1} 新。")
  419. return "right_new"
  420. else:
  421. # print("两个时间相等。")
  422. return "is_same"