import settings import threading from concurrent.futures import ThreadPoolExecutor, Future from typing import List, Dict, Any import time from PIL import Image import pillow_avif class ImageSaver: instance = None init_flag = None def __init__(self): """ 初始化ImageSaver对象。 :param max_workers: 线程池中最大工作线程数,默认为4。 """ """此处设计为,如果已经存在实例时,不再执行初始化""" if self.init_flag: return else: self.init_flag = True self.executor = ThreadPoolExecutor(max_workers=settings.IMAGE_SAVE_MAX_WORKERS) self.tasks_dict = {} self.lock = threading.Lock() def save_image(self, image: Image, file_path: str, **kwargs) -> Future[Any]: """ 将图片数据插入任务队列,并返回一个Future对象。 :param image_data: 图片的二进制数据。 :param filename: 图片保存的文件名。 :return: 返回一个Future对象,用于查询任务状态。 """ future = self.executor.submit( self._save_image_worker, image, file_path, **kwargs ) with self.lock: self.tasks_dict[file_path] = { "is_completed": False, "create_time": time.time(), "is_error": False, "error_info": "", } return future def _save_image_worker(self, image: Image, file_path: str, **kwargs) -> None: """ 实际执行保存图片的任务。 :param image_data: 图片的二进制数据。 :param filename: 图片保存的文件名。 """ try: self.save_image_by_thread_run(image=image, out_path=file_path, **kwargs) with self.lock: self.tasks_dict[file_path]["is_completed"] = True except Exception as e: print(f"Error saving {file_path}: {e}") with self.lock: self.tasks_dict[file_path]["is_completed"] = True self.tasks_dict[file_path]["is_error"] = True self.tasks_dict[file_path]["error_info"] = "{}".format(e) def save_image_by_thread_run( self, image: Image, out_path, save_mode="png", quality=None, dpi=None, _format="JPEG", **kwargs, ): if save_mode == "png": image.save(out_path) else: if quality: if dpi: image.save(out_path, quality=quality, dpi=dpi, format=_format) else: image.save(out_path, quality=quality, format=_format) else: image.save(out_path, format=_format) def get_completed_images(self, file_path): """ 获取已完成保存的图片列表。 :return: 已完成保存的图片文件名列表。 """ with self.lock: if file_path in self.tasks_dict: return self.tasks_dict[file_path] return None def get_pending_images(self) -> List[str]: """ 获取尚未完成保存的图片列表。 :return: 尚未完成保存的图片文件名列表。 """ with self.lock: return [ file_path for file_path, _value in self.tasks_dict.items() if not _value["is_completed"] ] def __new__(cls, *args, **kwargs): """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。""" if not cls.instance: cls.instance = super().__new__(cls) return cls.instance