base.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. import os
  2. import copy
  3. import configparser
  4. # from module.base_mode.module_aes import Aes
  5. import exifread
  6. from natsort import ns, natsorted
  7. import shutil
  8. from hashlib import sha256, md5
  9. import requests
  10. from datetime import datetime
  11. from settings import handle_remove_readonly
  12. # 获取digicam的路径
  13. def check_install_path(other):
  14. path_list = []
  15. path_list.append(other)
  16. path_list.append(r"D:\Program Files (x86)\digiCamControl\CameraControl.exe")
  17. path_list.append(r"C:\Program Files (x86)\digiCamControl\CameraControl.exe")
  18. path_list.append(r"D:\Program Files\digiCamControl\CameraControl.exe")
  19. path_list.append(r"C:\Program Files\digiCamControl\CameraControl.exe")
  20. for i in path_list:
  21. if os.path.exists(i):
  22. return i
  23. return ""
  24. # 输入文件夹,并检查是否是一个正常的图片文件夹。
  25. def check_goods_folder(folder_path):
  26. all_files = os.listdir(folder_path)
  27. for file in all_files:
  28. file_path = "{}/{}".format(folder_path, file)
  29. if not os.path.isdir(file_path):
  30. continue
  31. if "原始图" in os.listdir(file_path):
  32. return folder_path
  33. # 上述检查不通过,可能是选择目录错误
  34. if "原始图" in all_files:
  35. root_path, _ = os.path.split(folder_path)
  36. return root_path
  37. return None
  38. def download_file(url, file_path):
  39. try:
  40. root_path, file_name = os.path.split(file_path)
  41. check_path(root_path)
  42. response = requests.get(url)
  43. _content = response.content
  44. with open(file_path, 'wb') as f:
  45. f.write(_content)
  46. print("下载成功:{}".format(file_path))
  47. except:
  48. print("下载失败:{}".format(file_path))
  49. def calculate_sha256(file_path):
  50. """Calculate the sha256 hash of the given file."""
  51. sha256_hash = sha256()
  52. try:
  53. with open(file_path, "rb") as f:
  54. # Read and update hash string value in blocks of 4K
  55. for byte_block in iter(lambda: f.read(4096), b""):
  56. sha256_hash.update(byte_block)
  57. return sha256_hash.hexdigest()
  58. except FileNotFoundError:
  59. print(f"The file {file_path} does not exist.")
  60. return None
  61. except Exception as e:
  62. print(f"An error occurred: {e}")
  63. return None
  64. def get_modified_time(file_path):
  65. # 获取文件最后修改的时间戳
  66. timestamp = os.path.getmtime(file_path)
  67. # 将时间戳转换为datetime对象,并格式化为指定格式的字符串
  68. modified_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
  69. return modified_time
  70. def download_file_to_io(url):
  71. try:
  72. # 发送GET请求
  73. response = requests.get(url)
  74. # 检查请求是否成功
  75. response.raise_for_status() # 如果响应状态码不是200,会抛出HTTPError异常
  76. # 返回响应内容,以文本形式(对于HTML、JSON等文本内容)
  77. return response.text
  78. # 或者返回原始的二进制内容(对于图片、文件等非文本内容)
  79. # return response.content
  80. except requests.RequestException as e:
  81. print(f"An error occurred: {e}")
  82. return None
  83. def get_md5(file_path):
  84. data_md5 = None
  85. if os.path.isfile(file_path):
  86. f = open(file_path, 'rb')
  87. md5_obj = md5()
  88. md5_obj.update(f.read())
  89. hash_code = md5_obj.hexdigest()
  90. f.close()
  91. data_md5 = str(hash_code).lower()
  92. return data_md5
  93. def list_dir(path):
  94. listdir = os.listdir(path)
  95. return natsorted(listdir, alg=ns.PATH)
  96. # 通用串口数据解析器
  97. def get_data_from_receive_data(receive_data, start, len_data, data_magnification=1):
  98. # data_magnification 数据放大倍数,或缩小倍数,默认为1
  99. try:
  100. if len_data == 1:
  101. data = receive_data[start]
  102. return data * data_magnification
  103. elif len_data == 2:
  104. data = receive_data[start] << 8 | receive_data[start + 1]
  105. return data * data_magnification
  106. elif len_data == 4:
  107. data = receive_data[start] << 24 | receive_data[start + 1] << 16 | receive_data[start + 2] << 8 | \
  108. receive_data[start + 3]
  109. return data * data_magnification
  110. return None
  111. except:
  112. return None
  113. def get_images(path):
  114. _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"]
  115. image_list = [] # 过滤非图片数据
  116. for _file in list_dir(path):
  117. file_name, e = os.path.splitext(_file)
  118. file_path = "{}/{}".format(path, _file)
  119. if os.path.isdir(file_path):
  120. continue
  121. if e in _Type and "mask" not in file_name:
  122. image_list.append(
  123. {
  124. "file_path": file_path,
  125. "file_name": file_name,
  126. "file": _file,
  127. "root_path": path,
  128. "e": e,
  129. }
  130. )
  131. return image_list
  132. def get_image_mask(path):
  133. _Type = [".png", ".PNG", ".jpg", ".JPG", ".gif", ".GIF", ".jpge", ".JPGE", ".CR2"]
  134. image_list = [] # 过滤非图片数据
  135. for _file in list_dir(path):
  136. file_name, e = os.path.splitext(_file)
  137. file_path = "{}/{}".format(path, _file)
  138. if os.path.isdir(file_path):
  139. continue
  140. if e in _Type and file_name == "mask":
  141. image_list.append(
  142. {
  143. "file_path": file_path,
  144. "file_name": file_name,
  145. "file": _file,
  146. "root_path": path,
  147. "e": e,
  148. }
  149. )
  150. return image_list
  151. # 删除文件夹下的所有文件
  152. def remove_all_file(directory):
  153. try:
  154. shutil.rmtree(directory,onerror=handle_remove_readonly)
  155. os.makedirs(directory)
  156. except Exception as e:
  157. print(f'Failed to clear directory {directory}. Reason: {e}')
  158. # 删除文件
  159. def remove_files(file_path):
  160. try:
  161. if os.path.exists(file_path):
  162. os.remove(file_path)
  163. return True
  164. except BaseException as e:
  165. print("base 188", e)
  166. return False
  167. def get_folder(path):
  168. folder_list = []
  169. for _file in list_dir(path):
  170. file_path = "{}/{}".format(path, _file)
  171. if os.path.isdir(file_path):
  172. folder_list.append(
  173. {"folder_path": file_path,
  174. "folder_name": _file,
  175. "root_path": path,
  176. "label": "待处理", # 是否需要继续处理
  177. }
  178. )
  179. return folder_list
  180. # 获取所有货号颜色 文件夹
  181. def get_all_goods_art_no_folder(path, goods_art_nos):
  182. folder_list = []
  183. if not os.path.exists(path):
  184. return folder_list
  185. if os.path.isfile(path):
  186. return folder_list
  187. temp_folder_list = get_folder(path)
  188. # for goods_art_no in goods_art_nos:
  189. for folder_data in temp_folder_list:
  190. folder_path = folder_data["folder_path"]
  191. _p = "{}/原始图".format(folder_path)
  192. if os.path.exists(_p):
  193. folder_data["folder_path"] = f"{folder_path}"
  194. folder_list.append(folder_data)
  195. return folder_list
  196. def check_move_goods_art_no_folder(path, goods_art_nos,limit_folder):
  197. """
  198. 检查目录是否存在,如果存在,直接移动到当前日期下的目录
  199. """
  200. folder_list = {}
  201. if not os.path.exists(path):
  202. print("path不存在", path)
  203. return folder_list
  204. if os.path.isfile(path):
  205. print("path是文件", path)
  206. return folder_list
  207. temp_folder_list = get_folder(path)
  208. for goods_art_no in goods_art_nos:
  209. for folder_data in temp_folder_list:
  210. # folder_path = folder_data["folder_path"]
  211. folder_name = folder_data["folder_name"]
  212. _p = "output/{}/{}/原始图".format(folder_name, goods_art_no)
  213. if os.path.exists(_p):
  214. folder_data["folder_path"] = f"output/{folder_name}/{goods_art_no}"
  215. # 整个目录移动到目标目录
  216. folder_list[goods_art_no] = folder_data
  217. if not os.path.exists(f"{limit_folder}/{goods_art_no}"):
  218. # 目标不存在
  219. try:
  220. shutil.move(folder_data["folder_path"], limit_folder)
  221. folder_list[goods_art_no] = folder_data
  222. print("移动目录", folder_data["folder_path"], limit_folder)
  223. except:
  224. continue
  225. else:
  226. # 如果希望覆盖
  227. print(f"目标目录 {limit_folder}/{goods_art_no} 已存在,跳过移动")
  228. return folder_list
  229. def get_date_time_original(file_path):
  230. with open(file_path, "rb") as file_data:
  231. tags = exifread.process_file(file_data)
  232. if "EXIF DateTimeOriginal" in tags:
  233. return str(tags["EXIF DateTimeOriginal"])
  234. else:
  235. return False
  236. def get_data_from_hqt(goods_number_list, get_online_data_ins):
  237. _goods_number_list = copy.deepcopy(goods_number_list)
  238. _list = []
  239. # 单次请求数少于20个
  240. goods_number_dict = {}
  241. while _goods_number_list:
  242. goods_art_no = _goods_number_list.pop()
  243. if "NUM" in goods_art_no:
  244. goods_art_no = goods_art_no.replace("NUM", "")
  245. _list.append(goods_art_no)
  246. if len(_list) == 20 or len(_goods_number_list) == 0:
  247. online_goods_art_data = get_online_data_ins.get_goods_art_no_info(
  248. numbers_list=_list
  249. )
  250. if online_goods_art_data:
  251. for number in online_goods_art_data:
  252. goods_number_dict["NUM" + number] = online_goods_art_data[number]
  253. _list = []
  254. return goods_number_dict
  255. def get_data_from_hqt_with_goods_art_no(goods_art_no_list, get_online_data_ins):
  256. _goods_art_no_list = copy.deepcopy(goods_art_no_list)
  257. _list = []
  258. # 单次请求数少于20个
  259. goods_art_no_dict = {}
  260. while _goods_art_no_list:
  261. goods_art_no = _goods_art_no_list.pop()
  262. _list.append(goods_art_no)
  263. if len(_list) == 20 or len(_goods_art_no_list) == 0:
  264. online_goods_art_data = get_online_data_ins.get_goods_art_no_info(
  265. goods_art_list=_list
  266. )
  267. if online_goods_art_data:
  268. for _goods_art_no in online_goods_art_data:
  269. goods_art_no_dict[_goods_art_no] = online_goods_art_data[
  270. _goods_art_no
  271. ]
  272. _list = []
  273. return goods_art_no_dict
  274. def load_config_form_ini(config_path):
  275. config = configparser.ConfigParser()
  276. if os.path.exists(config_path):
  277. try:
  278. config.read(config_path, encoding="utf-8")
  279. except:
  280. config.read(config_path)
  281. return config
  282. def set_config_to_int(config_ins, config_path, data_dict, section="basicSetup"):
  283. for i in data_dict:
  284. config_ins.set(section=section, option=i, value=data_dict[i])
  285. try:
  286. config_ins.write(open(config_path, "w", encoding="utf-8"))
  287. except:
  288. config_ins.write(open(config_path, "w"))
  289. # try:
  290. # config_ins.read(config_path, encoding="utf-8")
  291. # except:
  292. # config_ins.read(config_path)
  293. return config_ins
  294. def get_config(config_ins, key, section="basicSetup"):
  295. config_dict1 = config_ins.items(section)
  296. __config_dict = {}
  297. for i, k in config_dict1:
  298. __config_dict[i] = k
  299. if key in __config_dict:
  300. return __config_dict[key]
  301. else:
  302. return None
  303. def get_config_by_items(config_dict):
  304. __config_dict = {}
  305. for i, k in config_dict:
  306. __config_dict[i] = k
  307. return __config_dict
  308. def get_dict_value(_dict, key, default=None):
  309. if key in _dict:
  310. return _dict[key]
  311. else:
  312. return default
  313. # def set_key(authorization, SecretKey, SecretIv):
  314. # # --------------------用户身份--------------
  315. # if authorization:
  316. # authorization = Aes().get_key(authorization, SecretKey, SecretIv)
  317. # with open("key", "w") as f:
  318. # f.write(authorization)
  319. # return authorization
  320. def print_dic(_dict):
  321. for i, v in _dict.items():
  322. print("{}:{}".format(i, v))
  323. print("\n")
  324. def check_path(_path):
  325. if not os.path.exists(_path):
  326. # 创建多级目录
  327. os.makedirs(_path, exist_ok=True)
  328. # os.mkdir(_path)
  329. return True
  330. def move_folders(path_list, target_folder):
  331. if not os.path.exists(target_folder):
  332. os.makedirs(target_folder)
  333. for source_folder in path_list:
  334. shutil.move(source_folder, target_folder)
  335. # 给定一个图片路径,如果 是原始图下的则返回对应已扣图等信息。否则只返回基础信息
  336. def get_cutout_image_info(source_image_path):
  337. if not os.path.exists(source_image_path):
  338. return None
  339. if os.path.isdir(source_image_path):
  340. return None
  341. data = {}
  342. source_root_path, source_file = os.path.split(source_image_path)
  343. source_file_name, source_file_extension = os.path.splitext(source_file)
  344. print("---------------source_root_path--------------------")
  345. print(source_root_path)
  346. print(source_root_path[-3:])
  347. # if source_root_path[-3:] != "原始图":
  348. # return None
  349. data["source_image_path"] = source_image_path
  350. data["source_root_path"] = source_root_path
  351. data["source_file"] = source_file
  352. data["source_file_name"] = source_file_name
  353. data["source_file_extension"] = source_file_extension
  354. cutout_image_path = "{}/原始图_已抠图/{}.png".format(source_root_path[:-3], source_file_name)
  355. if not os.path.exists(cutout_image_path):
  356. data["cutout_image_path"] = None
  357. return data
  358. cutout_image_root_path, cutout_image_file = os.path.split(cutout_image_path)
  359. cutout_image_file_name, cutout_image_file_extension = os.path.splitext(cutout_image_file)
  360. data["cutout_image_path"] = cutout_image_path
  361. data["cutout_image_root_path"] = cutout_image_root_path
  362. data["cutout_image_file"] = cutout_image_file
  363. data["cutout_image_file_name"] = cutout_image_file_name
  364. data["cutout_image_file_extension"] = cutout_image_file_extension
  365. return data
  366. # 指定一个列表,指定一个需要移动的数据列表,移动到指定列表中的位置
  367. def move_elements_inplace(lst, indices, target_index, is_real_same_folder=False):
  368. """
  369. 在原地将列表中指定位置的两个元素移动到目标位置。
  370. :param lst: 要操作的列表
  371. :param indices: 一个包含两个整数的元组或列表,表示要移动的元素的索引
  372. :param target_index: 目标插入位置的索引
  373. """
  374. if not isinstance(indices, (list, tuple)) or len(indices) == 0:
  375. raise ValueError("Indices must be a non-empty list or tuple of integers.")
  376. # 添加到末尾
  377. if target_index == len(lst):
  378. if is_real_same_folder:
  379. # 倒序取出所有内容,并添加到末尾
  380. indices.sort(reverse=True)
  381. temp = []
  382. for i in indices:
  383. temp.append(lst.pop(i))
  384. while temp:
  385. lst.append(temp.pop(-1))
  386. # 重新写入索引
  387. for index, image_label in enumerate(lst):
  388. image_label.image_index = index
  389. return
  390. # 检查索引是否有效,并排序以确保按照正确的顺序处理
  391. valid_indices = sorted(set(indices)) # 去重并排序
  392. if not all(0 <= idx < len(lst) for idx in valid_indices):
  393. raise IndexError("One or more indices are out of range.")
  394. if not (0 <= target_index <= len(lst)):
  395. raise IndexError("Target index is out of range.")
  396. elements_to_move = [lst[idx] for idx in valid_indices]
  397. # 如果目标位置在所有待移动元素的最大索引之后,则不需要调整目标位置
  398. max_idx = max(valid_indices)
  399. if max_idx < target_index:
  400. pass
  401. else:
  402. # 计算需要减少的位置数(因为删除元素后,后续元素会向前移动)
  403. shift_count = sum(1 for idx in valid_indices if idx < target_index)
  404. target_index -= shift_count
  405. # 移除元素(从大索引开始以避免影响小索引处的元素)
  406. for idx in reversed(valid_indices):
  407. del lst[idx]
  408. # 插入元素到目标位置,保持它们原来的相对顺序
  409. for i, element in enumerate(elements_to_move):
  410. lst.insert(target_index + i, element)
  411. # 重新写入索引
  412. for index, image_label in enumerate(lst):
  413. image_label.image_index = index
  414. # 比较两个相同格式的时间字符串
  415. def compare_two_times(time_str1, time_str2):
  416. # 定义时间格式
  417. time_format = "%Y-%m-%d %H:%M:%S"
  418. # 将时间字符串解析为datetime对象
  419. time1 = datetime.strptime(time_str1, time_format)
  420. time2 = datetime.strptime(time_str2, time_format)
  421. # 比较两个时间
  422. if time1 > time2:
  423. # print(f"{time_str1} 比 {time_str2} 新。")
  424. return "left_new"
  425. elif time1 < time2:
  426. # print(f"{time_str2} 比 {time_str1} 新。")
  427. return "right_new"
  428. else:
  429. # print("两个时间相等。")
  430. return "is_same"