from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer import settings import time import os from utils.utils_func import get_folder, check_path import datetime from utils.SingletonType import SingletonType class FileEventHandler(FileSystemEventHandler, metaclass=SingletonType): instance = None init_flag = None def __init__(self): # if self.init_flag: # return # else: # self.init_flag = True super().__init__() # self.window = window FileSystemEventHandler.__init__(self) self.receive_photo_data = [] # 以下为测试用途 self._get_image_index = -1 self.observer = None self.last_create_time = datetime.datetime.now() def start_observer(self,path): print("图片保存目录:", path) if path == None or path == "": return self.observer = Observer() watch_path = self.check_and_get_real_dir(path) if watch_path: self.observer.schedule(self, watch_path, recursive=True) # recursive 遍历目录 self.observer.start() print("开启开门狗目录监听") else: print("路径错误不存在") def check_and_get_real_dir(self, watch_path): """ 逻辑: 1、检查当前路径如包含Originals 则定位到对应Originals的父级目录 2、如没有Originals,则认定为一个普通目录(且子文件夹没有Originals),否则定位到父级目录 3、根据父级目录,获取当天应该创建的拍摄子目录,如没有则自动创建并定位 """ if not os.path.exists(watch_path): return None if "Originals" in watch_path: root_path = watch_path.split("Originals", 1)[0] else: if "Originals" in [x["folder_name"] for x in get_folder(watch_path)]: root_path = watch_path else: return watch_path # 检查并创建日期 now = datetime.datetime.now() year = now.year month = now.month day = now.day path = r"{root_path}\Originals\{year}\{month}\{day}".format(root_path=root_path, year=year, month=month, day=day) check_path(path) print("watch_path:",path) return path def on_moved(self, event): # if event.is_directory: # print("directory moved from {0} to {1}".format(event.src_path, event.dest_path)) # else: # print("file moved from {0} to {1}".format(event.src_path, event.dest_path)) # if os.path.split(event.dest_path)[0] == settings.PhotoOutputDir: # print("1111111") pass def on_created(self, event): if not event.is_directory: file_path = event.src_path print("file created:{0}".format(file_path)) self.receive_photo_data.append(file_path) # self.get_photo_info(file_path) # self.window.data_sign.emit({"_type": "photo_number_music_play"}) try: take_time = time.time() self.send_log("获取文件file_path:{}".format(file_path)) create_time = datetime.datetime.fromtimestamp(os.path.getctime(file_path)) # print("获取文件create_time:{}".format(create_time)) self.get_photo_info(raw_path=file_path, create_time=create_time, take_time=take_time) self.send_log("获取文件create_time:{}".format(create_time)) except BaseException as e: print("获取文件create_time失败", e) self.send_log("获取文件处理失败{}".format(e)) def send_log(self, text): print(text) def get_photo_info(self, raw_path, create_time, take_time): # 看门狗监听到系统有图片生成后,自动进行图片对应 f_path = os.path.split(raw_path)[0] print("raw_path:", raw_path) # take_time = time.time() # 创建图片时间,默认为拍照时间 time.sleep(0.2) # 等待原始图片文件写入 # 查找真实的图片路径名称 f = False if f: if not os.path.exists(raw_path): print("不存在", raw_path) for file in os.listdir(f_path): file_path = os.path.join(f_path, file) # 判断是否为文件 if os.path.isfile(file_path): # 获取文件创建时间 create_time = datetime.datetime.fromtimestamp(os.path.getctime(file_path)) # 更新最早时间 if create_time > self.last_create_time: self.last_create_time = create_time raw_path = file_path # last_file_size = os.path.getsize(raw_path) # k = 30 # # 检查文件是否有写入完成 # flag = False # while k: # k -= 1 # if k == 0: # break # _file_size = os.path.getsize(raw_path) # if last_file_size == _file_size: # flag = True # break # else: # last_file_size = _file_size # time.sleep(0.1) # if not flag: # return # 调用父程序,执行报错图片命令 # self.window.show_img_on_photo_todo_list_sign(raw_path=raw_path, take_time=take_time) def stop(self): # 结束监听 if self.observer is not None: self.observer.stop() print("结束监听") del self.observer def __new__(cls, *args, **kwargs): """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。""" if not cls.instance: cls.instance = super().__new__(cls) return cls.instance def __del__(self): self.stop() print("结束监听,进程关闭")