import asyncio import json import time from settings import HLM_HOST import requests import os from utils.utils_func import get_md5,get_modified_time,compare_two_times,download_file class init_load_source: def __init__(self): pass async def load_source(self): # asyncio.sleep() # await self.down_resouce() for i in range(100): print(1) await asyncio.sleep(1) async def down_resouce(self): response_data = self.get_update_file() if response_data: for relative_file_path, value in response_data.items(): file_path = "{}\{}".format(os.getcwd(), relative_file_path) if os.path.exists(file_path): file_md5 = get_md5(file_path) if file_md5 != value["file_md5"]: # print(file_path, "md5 不同") file_modified_time = get_modified_time(file_path) text = "md5 不同 开始下载:{}".format(file_path) print(text) download_file(url=value["url"], file_path=file_path) else: # print(file_path, "md5 相同") pass else: text = "文件不存在 开始下载:{}".format(file_path) print(text) time.sleep(1) download_file(url=value["url"], file_path=file_path) else: print("获取更新文件内容失败") def get_update_file(self, type="client_camera", plugins_name="plugins_A"): """ 根据类型和插件名称获取更新文件内容。 调用外部API查询客户端插件信息,并尝试下载指定名称的插件文件内容。 参数: - type: 插件类型,默认为"client_camera"。 - plugins_name: 插件名称,默认为"plugins_A"。 返回: - 插件文件内容字符串,如果获取成功。 - None,如果获取失败或出现错误。 """ url = HLM_HOST + "/api/openai/query_client_addons" params = {"type": type} try: response = requests.get(url, params=params, timeout=10) response.raise_for_status() raw_data = response.json() # 检查 raw_data 和 raw_data["data"] 是否存在 if not raw_data or not raw_data.get("data"): return None # 使用 next 函数简化查找逻辑 item = next((item for item in raw_data["data"]["list"] if item["name"] == plugins_name), None) if item: url = item["url"] response = requests.get(url, timeout=10) response.raise_for_status() return json.loads(response.text) return None except requests.RequestException as e: print(f"An error occurred: {e}") return None