| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- import json
- import time
- if __name__ == '__main__':
- raise "不允许在当前目录下运行"
- import os
- import settings
- from ampy.pyboard import Pyboard
- import ampy.files
- from module.view_control.main_window.data_main import DataMode
- from module.base_mode.base import calculate_sha256, download_file, compare_two_times, get_modified_time, get_md5
- import concurrent.futures
- plugins_name = "AMPY插件"
- class AmpyScribe(object):
- def __init__(self, port_name="COM37", baud_rate=115200, windows=None):
- self.port_name = port_name
- self.baud_rate = baud_rate
- self.windows = windows
- self.pyb = Pyboard(self.port_name, baudrate=self.baud_rate)
- def send_log(self, text):
- if self.windows:
- self.windows.send_log(text)
- # 运行本地脚本
- def run_remote_script(self, script_path):
- """Run a script on the remote device."""
- try:
- self.pyb.enter_raw_repl()
- result = self.pyb.execfile(script_path)
- print(result.decode())
- return result.decode()
- except Exception as e:
- print(f"An error occurred while running the script: {e}")
- return None
- def close(self):
- if self.pyb is not None:
- self.pyb.close()
- self.pyb = None
- # 上传文件
- def upload_file(self, local_path, remote_path, local_path_is_file=True):
- """
- local_path:电脑本地路径
- remote_path:单片机路径,/开头表示根目录
- """
- try:
- if local_path_is_file:
- # 打开文件并读取内容
- with open(local_path, 'rb') as file:
- content = file.read()
- else:
- content = local_path
- # 创建一个 Files 对象用于文件操作
- files = ampy.files.Files(self.pyb)
- # 将文件内容写入 MicroPython 设备
- files.put(remote_path, content)
- except Exception as e:
- print(f"An error occurred while upload_file: {e}")
- # 获取远程文件
- def get_file_from_device(self, remote_filename, local_filename):
- try:
- # 创建一个 Files 对象用于文件操作
- files = ampy.files.Files(self.pyb)
- # 获取远程文件内容
- content = files.get(remote_filename)
- # 将内容写入本地文件
- with open(local_filename, 'wb') as file:
- file.write(content)
- print(f"文件 {remote_filename} 下载成功并保存为 {local_filename}")
- except Exception as e:
- print(f"get_file_from_device 发生错误: {e}")
- # 远程更新脚本
- class RemoteUpdate(object):
- # instance = None
- # init_flag = None
- def __init__(self, port_name=None, top_windows=None, windows=None, is_test=False):
- # if self.init_flag:
- # return
- # else:
- # self.init_flag = True
- self.is_test = is_test
- self.port_name = port_name
- self.top_windows = top_windows
- self.windows = windows
- self.ampy_scribe = None
- self.root = os.getcwd()
- self.value = 0
- # 基础数据mode
- self.data_mode = DataMode()
- self.mcu_code_path = "{}\micropython_online_code\mcu_code".format(os.getcwd())
- def send_log(self, text):
- print(text)
- pass
- def show_text_info(self, text):
- print(text)
- if self.windows:
- self.windows.text_info_sign.emit({"text": text})
- def show_progress_bar(self, add_value=None, value=None):
- if self.windows:
- if value:
- self.windows.progress_bar_sign.emit(value)
- else:
- self.value += add_value
- self.windows.progress_bar_sign.emit(self.value)
- def check_windows_state(self):
- if self.windows:
- if self.windows.state != 1:
- self.show_text_info("已取消")
- raise "主动停止"
- def run(self):
- """
- 步骤:
- 1、进行设备终止,并连接
- 2、获取远程文件的md5合集
- 3、比对线上合集
- 4、整理差异内容,以及对应文件的has校验码
- 5、下发文件到临时目录
- 6、运行远程更新脚本
- 7、自动重启
- """
- self.show_progress_bar(add_value=5)
- self.show_text_info("开始检查处理,停止MCU")
- if self.top_windows.mcu.connect_state:
- self.stop_mcu()
- time.sleep(2)
- self.connect_pyboard()
- if self.ampy_scribe is None:
- self.show_text_info("串口链接失败,已退出")
- return
- # 删除远程的临时文件
- self.show_text_info("开始 删除远程的临时文件")
- self.remove_remote_files()
- self.show_progress_bar(add_value=5)
- self.show_text_info("开始 获取MCU文件的md5合集")
- # 获取远程文件的md5合集
- micropython_file_dict = self.get_remote_has()
- if micropython_file_dict is None:
- self.show_text_info("获取MCU文件的md5失败")
- return
- print("==============micropython_file_dict")
- print(micropython_file_dict)
- # 获取线上的数据
- self.show_text_info("开始 获取线上的数据")
- online_files_dict = self.check_resources_download()
- self.show_progress_bar(add_value=5)
- if not online_files_dict:
- self.show_text_info("开始 获取线上的数据 失败")
- return
- # 线上数据与单片机文件进行比对
- print("==============online_files_dict")
- print(online_files_dict)
- # 生成差异文件
- self.show_text_info("比对差异文件数据")
- differential_files_dict = self.get_differential_files(micropython_file_dict=micropython_file_dict,
- online_files_dict=online_files_dict)
- print("==============differential_files_dict")
- print(differential_files_dict)
- ## 动态创建文件夹路径
- # self.create_all_folder()
- if not differential_files_dict:
- self.show_progress_bar(value=95)
- self.show_text_info("MCU文件为最新无需更新,开始重启")
- self.restart()
- self.show_text_info("开始断开连接")
- self.ampy_scribe.close()
- # 尝试连接MCU
- self.mcu_connect()
- self.show_text_info("完成")
- self.show_progress_bar(value=100)
- return
- # 移动差异文件
- self.show_text_info("开始下发差异文件,总数:{}".format(len(differential_files_dict)))
- to_move_files = self.upload_all_file(differential_files_dict)
- if not to_move_files:
- self.show_text_info("下发差异文件失败")
- return
- # 制作更新脚本
- self.show_text_info("开始MCU主程序更新")
- result = self.update_from_temp()
- if result:
- if "更新成功" in result:
- self.show_text_info("更新成功,即将重启")
- print("单片机程序为最新,5秒后重启")
- # 重启
- self.restart()
- self.show_text_info("开始断开连接")
- self.ampy_scribe.close()
- # 尝试连接MCU
- self.mcu_connect()
- self.show_progress_bar(value=100)
- return
- # 制作更新脚本
- def update_from_temp(self):
- print("制作更新脚本")
- path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\update_from_temp.py".format(
- os.getcwd())
- return self.ampy_scribe.run_remote_script(path)
- # 创建所有文件夹路径
- def create_all_folder(self):
- path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\create_all_folders.py".format(
- os.getcwd())
- self.ampy_scribe.run_remote_script(path)
- print("创建文件夹路径已完成")
- # 批量移动文件到单片机
- def upload_all_file(self, files_dict):
- # k 示例:sensor/other_sensor/nrf24.py settings.py
- to_move_files = {}
- f = True
- total_s = 100 - self.value - 5
- total_len = len(files_dict)
- step = int(total_s / total_len)
- for relative_file_path, value in files_dict.items():
- local_path = "{}\{}".format(self.mcu_code_path, relative_file_path)
- # 注意路径格式 / 开头
- remote_path = r"/temp/{}".format(relative_file_path)
- print("移动文件", local_path, remote_path)
- try:
- self.ampy_scribe.upload_file(local_path, remote_path)
- # to_move_files[remote_path] = value["file_sha256"]
- # 使用本地has
- to_move_files[remote_path] = calculate_sha256(local_path)
- self.show_text_info("文件:{}下发成功".format(relative_file_path))
- except BaseException as e:
- self.show_text_info("文件:{}下发失败:{}".format(relative_file_path, e))
- f = False
- return None
- self.show_progress_bar(add_value=step)
- if f:
- # 创建配置文件
- print("创建配置文件")
- self.show_text_info("创建配置md5文件 并开始下发")
- _to_move_files_config = json.dumps(to_move_files)
- self.ampy_scribe.upload_file(local_path=_to_move_files_config,
- remote_path=r"/temp/_to_move_files_config.json", local_path_is_file=False)
- self.show_text_info("创建配置md5文件 并下发成功")
- if f:
- return to_move_files
- else:
- return None
- # 生成差异文件
- def get_differential_files(self, micropython_file_dict: dict, online_files_dict: dict):
- # 生成差异文件
- ignore_path = ["machine.py", "boot.py"] # "main.py"
- differential_files_dict = {}
- for online_file_path, value in online_files_dict.items():
- if online_file_path in ignore_path:
- continue
- online_file_sha256 = calculate_sha256(
- "{}\micropython_online_code\mcu_code\{}".format(os.getcwd(), online_file_path))
- if online_file_path not in micropython_file_dict:
- differential_files_dict[online_file_path] = {"file_sha256": online_file_sha256}
- print("文件不存在:{}".format(online_file_path))
- else:
- if online_file_sha256 != micropython_file_dict[online_file_path]["file_sha256"]:
- differential_files_dict[online_file_path] = {"file_sha256": online_file_sha256}
- else:
- print("文件相同:{}".format(online_file_path))
- return differential_files_dict
- # 删除远程的临时文件
- def remove_remote_files(self):
- script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\delete_all_temp.py".format(
- self.root)
- result = self.ampy_scribe.run_remote_script(script_path)
- print(result)
- def run_run_remote_script_restart(self):
- script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\restart.py".format(
- self.root)
- self.ampy_scribe.run_remote_script(script_path)
- # 重启单片机
- def restart(self):
- try:
- # 使用上下文管理器确保线程池会正确关闭
- with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
- future = executor.submit(self.run_run_remote_script_restart)
- try:
- result = future.result(timeout=1) # 等待最多5秒
- except BaseException as e:
- print("自动超时")
- finally:
- # 如果需要立即释放资源,可以显式地调用 shutdown。
- # 在 with 语句块结束时也会自动调用 shutdown。
- # executor.shutdown(wait=False)
- print("shutdown")
- print("-----------end--------------")
- except BaseException as e:
- print(e)
- print("程序遇到了一个致命错误")
- def mcu_connect(self):
- self.show_text_info("5秒后 尝试连接MCU进行恢复")
- time.sleep(5)
- self.top_windows.mcu.to_connect_com(port_name=self.port_name)
- self.show_progress_bar(value=100)
- # 检查file_sha256 并进行下载更新文件
- def check_resources_download(self):
- if self.is_test is False:
- if not settings.IsLogin:
- print("当前未登录")
- return
- # 请求获取配置文件
- online_data = self.data_mode.get_all_resource_config(plugins_name="plugins_micropython")
- # print("check_resources_download")
- # print(data)
- if not online_data:
- text = "check_resources_download data为空"
- self.send_log(text)
- elif "data" not in online_data:
- text = "check_resources_download data不存在"
- self.send_log(text)
- else:
- # 步骤:
- # 1、获取线上数据
- # 2、如本地MD5不同,但是修改时间比线上的最新,则也不更新下载。
- for relative_file_path, value in online_data["data"].items():
- file_path = "{}\{}".format(self.mcu_code_path, relative_file_path)
- if os.path.exists(file_path):
- file_md5 = get_md5(file_path)
- if file_md5 != value["file_md5"]:
- file_modified_time = get_modified_time(file_path)
- if compare_two_times(file_modified_time, online_data["update_time"]) == "left_new":
- continue
- else:
- text = "has256 不同 开始下载:{}".format(file_path)
- self.send_log(text)
- download_file(url=value["url"], file_path=file_path)
- else:
- pass
- else:
- text = "文件不存在 开始下载:{}".format(file_path)
- self.send_log(text)
- download_file(url=value["url"], file_path=file_path)
- if online_data:
- return online_data["data"]
- return None
- def get_remote_has(self):
- self.ampy_scribe: AmpyScribe
- script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\get_remote_has256.py".format(
- self.root)
- result = self.ampy_scribe.run_remote_script(script_path)
- if result is None:
- return None
- has_dict = {}
- try:
- result_list = result.split("\n")
- for i in result_list:
- if "@" not in i:
- continue
- path, _has = i.split("@")
- has_dict[path[1:]] = {"file_sha256": _has.replace("\r", "")}
- return has_dict
- except BaseException as e:
- return None
- def stop_mcu(self):
- if self.is_test:
- return
- if self.ampy_scribe is None:
- self.top_windows.mcu.stop_mcu()
- def connect_pyboard(self):
- if self.is_test:
- self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None)
- print("ampy_scribe 接连成功")
- return
- if self.ampy_scribe is None:
- if self.port_name:
- pass
- else:
- if self.top_windows.mcu.connect_state:
- self.port_name = self.top_windows.mcu.port_name
- print("port_name:", self.port_name)
- if self.port_name:
- if self.top_windows.mcu.connect_state:
- self.top_windows.mcu.close_connect()
- print("close_connect")
- time.sleep(3)
- try:
- self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None)
- print("ampy_scribe 接连成功")
- except BaseException as e:
- print("ampy_scribe 连接失败", e)
- self.ampy_scribe = None
- def __new1__(cls, *args, **kwargs):
- """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
- if not cls.instance:
- cls.instance = super().__new__(cls)
- return cls.instance
|