multi_threaded_image_saving.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import settings
  2. import threading
  3. from concurrent.futures import ThreadPoolExecutor, Future
  4. from typing import List, Dict, Any
  5. import time
  6. from PIL import Image
  7. class ImageSaver:
  8. instance = None
  9. init_flag = None
  10. def __init__(self):
  11. """
  12. 初始化ImageSaver对象。
  13. :param max_workers: 线程池中最大工作线程数,默认为4。
  14. """
  15. """此处设计为,如果已经存在实例时,不再执行初始化"""
  16. if self.init_flag:
  17. return
  18. else:
  19. self.init_flag = True
  20. self.executor = ThreadPoolExecutor(max_workers=settings.IMAGE_SAVE_MAX_WORKERS)
  21. self.tasks_dict = {}
  22. self.lock = threading.Lock()
  23. def save_image(self, image: Image, file_path: str, **kwargs) -> Future[Any]:
  24. """
  25. 将图片数据插入任务队列,并返回一个Future对象。
  26. :param image_data: 图片的二进制数据。
  27. :param filename: 图片保存的文件名。
  28. :return: 返回一个Future对象,用于查询任务状态。
  29. """
  30. future = self.executor.submit(self._save_image_worker, image, file_path, **kwargs)
  31. with self.lock:
  32. self.tasks_dict[file_path] = {"is_completed": False,
  33. "create_time": time.time(),
  34. "is_error": False,
  35. "error_info": "",
  36. }
  37. return future
  38. def _save_image_worker(self, image: Image, file_path: str, **kwargs) -> None:
  39. """
  40. 实际执行保存图片的任务。
  41. :param image_data: 图片的二进制数据。
  42. :param filename: 图片保存的文件名。
  43. """
  44. try:
  45. self.save_image_by_thread_run(image=image, out_path=file_path, **kwargs)
  46. with self.lock:
  47. self.tasks_dict[file_path]["is_completed"] = True
  48. except Exception as e:
  49. print(f"Error saving {file_path}: {e}")
  50. with self.lock:
  51. self.tasks_dict[file_path]["is_completed"] = True
  52. self.tasks_dict[file_path]["is_error"] = True
  53. self.tasks_dict[file_path]["error_info"] = "{}".format(e)
  54. def save_image_by_thread_run(self, image: Image, out_path, save_mode="png", quality=None, dpi=None, _format="JPEG",
  55. **kwargs):
  56. if save_mode == "png":
  57. image.save(out_path)
  58. else:
  59. if quality:
  60. if dpi:
  61. image.save(out_path, quality=quality, dpi=dpi, format=_format)
  62. else:
  63. image.save(out_path, quality=quality, format=_format)
  64. else:
  65. image.save(out_path, format=_format)
  66. def get_completed_images(self, file_path):
  67. """
  68. 获取已完成保存的图片列表。
  69. :return: 已完成保存的图片文件名列表。
  70. """
  71. with self.lock:
  72. if file_path in self.tasks_dict:
  73. return self.tasks_dict[file_path]
  74. return None
  75. def get_pending_images(self) -> List[str]:
  76. """
  77. 获取尚未完成保存的图片列表。
  78. :return: 尚未完成保存的图片文件名列表。
  79. """
  80. with self.lock:
  81. return [file_path for file_path, _value in self.tasks_dict.items() if not _value["is_completed"]]
  82. def __new__(cls, *args, **kwargs):
  83. """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
  84. if not cls.instance:
  85. cls.instance = super().__new__(cls)
  86. return cls.instance