multi_threaded_image_saving.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import settings
  2. import threading
  3. from concurrent.futures import ThreadPoolExecutor, Future
  4. from typing import List, Dict, Any
  5. import time
  6. from PIL import Image
  7. import pillow_avif
  8. class ImageSaver:
  9. instance = None
  10. init_flag = None
  11. def __init__(self):
  12. """
  13. 初始化ImageSaver对象。
  14. :param max_workers: 线程池中最大工作线程数,默认为4。
  15. """
  16. """此处设计为,如果已经存在实例时,不再执行初始化"""
  17. if self.init_flag:
  18. return
  19. else:
  20. self.init_flag = True
  21. self.executor = ThreadPoolExecutor(max_workers=settings.IMAGE_SAVE_MAX_WORKERS)
  22. self.tasks_dict = {}
  23. self.lock = threading.Lock()
  24. def save_image(self, image: Image, file_path: str, **kwargs) -> Future[Any]:
  25. """
  26. 将图片数据插入任务队列,并返回一个Future对象。
  27. :param image_data: 图片的二进制数据。
  28. :param filename: 图片保存的文件名。
  29. :return: 返回一个Future对象,用于查询任务状态。
  30. """
  31. future = self.executor.submit(
  32. self._save_image_worker, image, file_path, **kwargs
  33. )
  34. with self.lock:
  35. self.tasks_dict[file_path] = {
  36. "is_completed": False,
  37. "create_time": time.time(),
  38. "is_error": False,
  39. "error_info": "",
  40. }
  41. return future
  42. def _save_image_worker(self, image: Image, file_path: str, **kwargs) -> None:
  43. """
  44. 实际执行保存图片的任务。
  45. :param image_data: 图片的二进制数据。
  46. :param filename: 图片保存的文件名。
  47. """
  48. try:
  49. self.save_image_by_thread_run(image=image, out_path=file_path, **kwargs)
  50. with self.lock:
  51. self.tasks_dict[file_path]["is_completed"] = True
  52. except Exception as e:
  53. print(f"Error saving {file_path}: {e}")
  54. with self.lock:
  55. self.tasks_dict[file_path]["is_completed"] = True
  56. self.tasks_dict[file_path]["is_error"] = True
  57. self.tasks_dict[file_path]["error_info"] = "{}".format(e)
  58. def save_image_by_thread_run(
  59. self,
  60. image: Image,
  61. out_path,
  62. save_mode="png",
  63. quality=None,
  64. dpi=None,
  65. _format="JPEG",
  66. **kwargs,
  67. ):
  68. if save_mode == "png":
  69. image.save(out_path)
  70. else:
  71. if quality:
  72. if dpi:
  73. image.save(out_path, quality=quality, dpi=dpi, format=_format)
  74. else:
  75. image.save(out_path, quality=quality, format=_format)
  76. else:
  77. image.save(out_path, format=_format)
  78. def get_completed_images(self, file_path):
  79. """
  80. 获取已完成保存的图片列表。
  81. :return: 已完成保存的图片文件名列表。
  82. """
  83. with self.lock:
  84. if file_path in self.tasks_dict:
  85. return self.tasks_dict[file_path]
  86. return None
  87. def get_pending_images(self) -> List[str]:
  88. """
  89. 获取尚未完成保存的图片列表。
  90. :return: 尚未完成保存的图片文件名列表。
  91. """
  92. with self.lock:
  93. return [
  94. file_path
  95. for file_path, _value in self.tasks_dict.items()
  96. if not _value["is_completed"]
  97. ]
  98. def __new__(cls, *args, **kwargs):
  99. """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
  100. if not cls.instance:
  101. cls.instance = super().__new__(cls)
  102. return cls.instance