import json import time if __name__ == '__main__': raise "不允许在当前目录下运行" import os import settings from ampy.pyboard import Pyboard import ampy.files from module.view_control.main_window.data_main import DataMode from module.base_mode.base import calculate_sha256, download_file, compare_two_times, get_modified_time, get_md5 import concurrent.futures plugins_name = "AMPY插件" class AmpyScribe(object): def __init__(self, port_name="COM37", baud_rate=115200, windows=None): self.port_name = port_name self.baud_rate = baud_rate self.windows = windows self.pyb = Pyboard(self.port_name, baudrate=self.baud_rate) def send_log(self, text): if self.windows: self.windows.send_log(text) # 运行本地脚本 def run_remote_script(self, script_path): """Run a script on the remote device.""" try: self.pyb.enter_raw_repl() result = self.pyb.execfile(script_path) print(result.decode()) return result.decode() except Exception as e: print(f"An error occurred while running the script: {e}") return None def close(self): if self.pyb is not None: self.pyb.close() self.pyb = None # 上传文件 def upload_file(self, local_path, remote_path, local_path_is_file=True): """ local_path:电脑本地路径 remote_path:单片机路径,/开头表示根目录 """ try: if local_path_is_file: # 打开文件并读取内容 with open(local_path, 'rb') as file: content = file.read() else: content = local_path # 创建一个 Files 对象用于文件操作 files = ampy.files.Files(self.pyb) # 将文件内容写入 MicroPython 设备 files.put(remote_path, content) except Exception as e: print(f"An error occurred while upload_file: {e}") # 获取远程文件 def get_file_from_device(self, remote_filename, local_filename): try: # 创建一个 Files 对象用于文件操作 files = ampy.files.Files(self.pyb) # 获取远程文件内容 content = files.get(remote_filename) # 将内容写入本地文件 with open(local_filename, 'wb') as file: file.write(content) print(f"文件 {remote_filename} 下载成功并保存为 {local_filename}") except Exception as e: print(f"get_file_from_device 发生错误: {e}") # 远程更新脚本 class RemoteUpdate(object): # instance = None # init_flag = None def __init__(self, port_name=None, top_windows=None, windows=None, is_test=False): # if self.init_flag: # return # else: # self.init_flag = True self.is_test = is_test self.port_name = port_name self.top_windows = top_windows self.windows = windows self.ampy_scribe = None self.root = os.getcwd() self.value = 0 # 基础数据mode self.data_mode = DataMode() self.mcu_code_path = "{}\micropython_online_code\mcu_code".format(os.getcwd()) def send_log(self, text): print(text) pass def show_text_info(self, text): print(text) if self.windows: self.windows.text_info_sign.emit({"text": text}) def show_progress_bar(self, add_value=None, value=None): if self.windows: if value: self.windows.progress_bar_sign.emit(value) else: self.value += add_value self.windows.progress_bar_sign.emit(self.value) def check_windows_state(self): if self.windows: if self.windows.state != 1: self.show_text_info("已取消") raise "主动停止" def run(self): """ 步骤: 1、进行设备终止,并连接 2、获取远程文件的md5合集 3、比对线上合集 4、整理差异内容,以及对应文件的has校验码 5、下发文件到临时目录 6、运行远程更新脚本 7、自动重启 """ self.show_progress_bar(add_value=5) self.show_text_info("开始检查处理,停止MCU") if self.top_windows.mcu.connect_state: self.stop_mcu() time.sleep(2) self.connect_pyboard() if self.ampy_scribe is None: self.show_text_info("串口链接失败,已退出") return # 删除远程的临时文件 self.show_text_info("开始 删除远程的临时文件") self.remove_remote_files() self.show_progress_bar(add_value=5) self.show_text_info("开始 获取MCU文件的md5合集") # 获取远程文件的md5合集 micropython_file_dict = self.get_remote_has() if micropython_file_dict is None: self.show_text_info("获取MCU文件的md5失败") return print("==============micropython_file_dict") print(micropython_file_dict) # 获取线上的数据 self.show_text_info("开始 获取线上的数据") online_files_dict = self.check_resources_download() self.show_progress_bar(add_value=5) if not online_files_dict: self.show_text_info("开始 获取线上的数据 失败") return # 线上数据与单片机文件进行比对 print("==============online_files_dict") print(online_files_dict) # 生成差异文件 self.show_text_info("比对差异文件数据") differential_files_dict = self.get_differential_files(micropython_file_dict=micropython_file_dict, online_files_dict=online_files_dict) print("==============differential_files_dict") print(differential_files_dict) ## 动态创建文件夹路径 # self.create_all_folder() if not differential_files_dict: self.show_progress_bar(value=95) self.show_text_info("MCU文件为最新无需更新,开始重启") self.restart() self.show_text_info("开始断开连接") self.ampy_scribe.close() # 尝试连接MCU self.mcu_connect() self.show_text_info("完成") self.show_progress_bar(value=100) return # 移动差异文件 self.show_text_info("开始下发差异文件,总数:{}".format(len(differential_files_dict))) to_move_files = self.upload_all_file(differential_files_dict) if not to_move_files: self.show_text_info("下发差异文件失败") return # 制作更新脚本 self.show_text_info("开始MCU主程序更新") result = self.update_from_temp() if result: if "更新成功" in result: self.show_text_info("更新成功,即将重启") print("单片机程序为最新,5秒后重启") # 重启 self.restart() self.show_text_info("开始断开连接") self.ampy_scribe.close() # 尝试连接MCU self.mcu_connect() self.show_progress_bar(value=100) return # 制作更新脚本 def update_from_temp(self): print("制作更新脚本") path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\update_from_temp.py".format( os.getcwd()) return self.ampy_scribe.run_remote_script(path) # 创建所有文件夹路径 def create_all_folder(self): path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\create_all_folders.py".format( os.getcwd()) self.ampy_scribe.run_remote_script(path) print("创建文件夹路径已完成") # 批量移动文件到单片机 def upload_all_file(self, files_dict): # k 示例:sensor/other_sensor/nrf24.py settings.py to_move_files = {} f = True total_s = 100 - self.value - 5 total_len = len(files_dict) step = int(total_s / total_len) for relative_file_path, value in files_dict.items(): local_path = "{}\{}".format(self.mcu_code_path, relative_file_path) # 注意路径格式 / 开头 remote_path = r"/temp/{}".format(relative_file_path) print("移动文件", local_path, remote_path) try: self.ampy_scribe.upload_file(local_path, remote_path) # to_move_files[remote_path] = value["file_sha256"] # 使用本地has to_move_files[remote_path] = calculate_sha256(local_path) self.show_text_info("文件:{}下发成功".format(relative_file_path)) except BaseException as e: self.show_text_info("文件:{}下发失败:{}".format(relative_file_path, e)) f = False return None self.show_progress_bar(add_value=step) if f: # 创建配置文件 print("创建配置文件") self.show_text_info("创建配置md5文件 并开始下发") _to_move_files_config = json.dumps(to_move_files) self.ampy_scribe.upload_file(local_path=_to_move_files_config, remote_path=r"/temp/_to_move_files_config.json", local_path_is_file=False) self.show_text_info("创建配置md5文件 并下发成功") if f: return to_move_files else: return None # 生成差异文件 def get_differential_files(self, micropython_file_dict: dict, online_files_dict: dict): # 生成差异文件 ignore_path = ["machine.py", "boot.py"] # "main.py" differential_files_dict = {} for online_file_path, value in online_files_dict.items(): if online_file_path in ignore_path: continue online_file_sha256 = calculate_sha256( "{}\micropython_online_code\mcu_code\{}".format(os.getcwd(), online_file_path)) if online_file_path not in micropython_file_dict: differential_files_dict[online_file_path] = {"file_sha256": online_file_sha256} print("文件不存在:{}".format(online_file_path)) else: if online_file_sha256 != micropython_file_dict[online_file_path]["file_sha256"]: differential_files_dict[online_file_path] = {"file_sha256": online_file_sha256} else: print("文件相同:{}".format(online_file_path)) return differential_files_dict # 删除远程的临时文件 def remove_remote_files(self): script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\delete_all_temp.py".format( self.root) result = self.ampy_scribe.run_remote_script(script_path) print(result) def run_run_remote_script_restart(self): script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\restart.py".format( self.root) self.ampy_scribe.run_remote_script(script_path) # 重启单片机 def restart(self): try: # 使用上下文管理器确保线程池会正确关闭 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(self.run_run_remote_script_restart) try: result = future.result(timeout=1) # 等待最多5秒 except BaseException as e: print("自动超时") finally: # 如果需要立即释放资源,可以显式地调用 shutdown。 # 在 with 语句块结束时也会自动调用 shutdown。 # executor.shutdown(wait=False) print("shutdown") print("-----------end--------------") except BaseException as e: print(e) print("程序遇到了一个致命错误") def mcu_connect(self): self.show_text_info("5秒后 尝试连接MCU进行恢复") time.sleep(5) self.top_windows.mcu.to_connect_com(port_name=self.port_name) self.show_progress_bar(value=100) # 检查file_sha256 并进行下载更新文件 def check_resources_download(self): if self.is_test is False: if not settings.IsLogin: print("当前未登录") return # 请求获取配置文件 online_data = self.data_mode.get_all_resource_config(plugins_name="plugins_micropython") # print("check_resources_download") # print(data) if not online_data: text = "check_resources_download data为空" self.send_log(text) elif "data" not in online_data: text = "check_resources_download data不存在" self.send_log(text) else: # 步骤: # 1、获取线上数据 # 2、如本地MD5不同,但是修改时间比线上的最新,则也不更新下载。 for relative_file_path, value in online_data["data"].items(): file_path = "{}\{}".format(self.mcu_code_path, relative_file_path) if os.path.exists(file_path): file_md5 = get_md5(file_path) if file_md5 != value["file_md5"]: file_modified_time = get_modified_time(file_path) if compare_two_times(file_modified_time, online_data["update_time"]) == "left_new": continue else: text = "has256 不同 开始下载:{}".format(file_path) self.send_log(text) download_file(url=value["url"], file_path=file_path) else: pass else: text = "文件不存在 开始下载:{}".format(file_path) self.send_log(text) download_file(url=value["url"], file_path=file_path) if online_data: return online_data["data"] return None def get_remote_has(self): self.ampy_scribe: AmpyScribe script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\get_remote_has256.py".format( self.root) result = self.ampy_scribe.run_remote_script(script_path) if result is None: return None has_dict = {} try: result_list = result.split("\n") for i in result_list: if "@" not in i: continue path, _has = i.split("@") has_dict[path[1:]] = {"file_sha256": _has.replace("\r", "")} return has_dict except BaseException as e: return None def stop_mcu(self): if self.is_test: return if self.ampy_scribe is None: self.top_windows.mcu.stop_mcu() def connect_pyboard(self): if self.is_test: self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None) print("ampy_scribe 接连成功") return if self.ampy_scribe is None: if self.port_name: pass else: if self.top_windows.mcu.connect_state: self.port_name = self.top_windows.mcu.port_name print("port_name:", self.port_name) if self.port_name: if self.top_windows.mcu.connect_state: self.top_windows.mcu.close_connect() print("close_connect") time.sleep(3) try: self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None) print("ampy_scribe 接连成功") except BaseException as e: print("ampy_scribe 连接失败", e) self.ampy_scribe = None def __new1__(cls, *args, **kwargs): """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。""" if not cls.instance: cls.instance = super().__new__(cls) return cls.instance