| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import settings
- import threading
- from concurrent.futures import ThreadPoolExecutor, Future
- from typing import List, Dict, Any
- import time
- from PIL import Image
- class ImageSaver:
- instance = None
- init_flag = None
- def __init__(self):
- """
- 初始化ImageSaver对象。
- :param max_workers: 线程池中最大工作线程数,默认为4。
- """
- """此处设计为,如果已经存在实例时,不再执行初始化"""
- if self.init_flag:
- return
- else:
- self.init_flag = True
- self.executor = ThreadPoolExecutor(max_workers=settings.IMAGE_SAVE_MAX_WORKERS)
- self.tasks_dict = {}
- self.lock = threading.Lock()
- def save_image(self, image: Image, file_path: str, **kwargs) -> Future[Any]:
- """
- 将图片数据插入任务队列,并返回一个Future对象。
- :param image_data: 图片的二进制数据。
- :param filename: 图片保存的文件名。
- :return: 返回一个Future对象,用于查询任务状态。
- """
- future = self.executor.submit(self._save_image_worker, image, file_path, **kwargs)
- with self.lock:
- self.tasks_dict[file_path] = {"is_completed": False,
- "create_time": time.time(),
- "is_error": False,
- "error_info": "",
- }
- return future
- def _save_image_worker(self, image: Image, file_path: str, **kwargs) -> None:
- """
- 实际执行保存图片的任务。
- :param image_data: 图片的二进制数据。
- :param filename: 图片保存的文件名。
- """
- try:
- self.save_image_by_thread_run(image=image, out_path=file_path, **kwargs)
- with self.lock:
- self.tasks_dict[file_path]["is_completed"] = True
- except Exception as e:
- print(f"Error saving {file_path}: {e}")
- with self.lock:
- self.tasks_dict[file_path]["is_completed"] = True
- self.tasks_dict[file_path]["is_error"] = True
- self.tasks_dict[file_path]["error_info"] = "{}".format(e)
- def save_image_by_thread_run(self, image: Image, out_path, save_mode="png", quality=None, dpi=None, _format="JPEG",
- **kwargs):
- if save_mode == "png":
- image.save(out_path)
- else:
- if quality:
- if dpi:
- image.save(out_path, quality=quality, dpi=dpi, format=_format)
- else:
- image.save(out_path, quality=quality, format=_format)
- else:
- image.save(out_path, format=_format)
- def get_completed_images(self, file_path):
- """
- 获取已完成保存的图片列表。
- :return: 已完成保存的图片文件名列表。
- """
- with self.lock:
- if file_path in self.tasks_dict:
- return self.tasks_dict[file_path]
- return None
- def get_pending_images(self) -> List[str]:
- """
- 获取尚未完成保存的图片列表。
- :return: 尚未完成保存的图片文件名列表。
- """
- with self.lock:
- return [file_path for file_path, _value in self.tasks_dict.items() if not _value["is_completed"]]
- def __new__(cls, *args, **kwargs):
- """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
- if not cls.instance:
- cls.instance = super().__new__(cls)
- return cls.instance
|