ampy_plugin.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. import json
  2. import time
  3. if __name__ == '__main__':
  4. raise "不允许在当前目录下运行"
  5. import os
  6. import settings
  7. from ampy.pyboard import Pyboard
  8. import ampy.files
  9. from module.view_control.main_window.data_main import DataMode
  10. from module.base_mode.base import calculate_sha256, download_file, compare_two_times, get_modified_time
  11. import concurrent.futures
  12. plugins_name = "AMPY插件"
  13. class AmpyScribe(object):
  14. def __init__(self, port_name="COM37", baud_rate=115200, windows=None):
  15. self.port_name = port_name
  16. self.baud_rate = baud_rate
  17. self.windows = windows
  18. self.pyb = Pyboard(self.port_name, baudrate=self.baud_rate)
  19. def send_log(self, text):
  20. if self.windows:
  21. self.windows.send_log(text)
  22. # 运行本地脚本
  23. def run_remote_script(self, script_path):
  24. """Run a script on the remote device."""
  25. try:
  26. self.pyb.enter_raw_repl()
  27. result = self.pyb.execfile(script_path)
  28. print(result.decode())
  29. return result.decode()
  30. except Exception as e:
  31. print(f"An error occurred while running the script: {e}")
  32. return None
  33. def close(self):
  34. if self.pyb is not None:
  35. self.pyb.close()
  36. self.pyb = None
  37. # 上传文件
  38. def upload_file(self, local_path, remote_path, local_path_is_file=True):
  39. """
  40. local_path:电脑本地路径
  41. remote_path:单片机路径,/开头表示根目录
  42. """
  43. try:
  44. if local_path_is_file:
  45. # 打开文件并读取内容
  46. with open(local_path, 'rb') as file:
  47. content = file.read()
  48. else:
  49. content = local_path
  50. # 创建一个 Files 对象用于文件操作
  51. files = ampy.files.Files(self.pyb)
  52. # 将文件内容写入 MicroPython 设备
  53. files.put(remote_path, content)
  54. except Exception as e:
  55. print(f"An error occurred while upload_file: {e}")
  56. # 获取远程文件
  57. def get_file_from_device(self, remote_filename, local_filename):
  58. try:
  59. # 创建一个 Files 对象用于文件操作
  60. files = ampy.files.Files(self.pyb)
  61. # 获取远程文件内容
  62. content = files.get(remote_filename)
  63. # 将内容写入本地文件
  64. with open(local_filename, 'wb') as file:
  65. file.write(content)
  66. print(f"文件 {remote_filename} 下载成功并保存为 {local_filename}")
  67. except Exception as e:
  68. print(f"get_file_from_device 发生错误: {e}")
  69. # 远程更新脚本
  70. class RemoteUpdate(object):
  71. # instance = None
  72. # init_flag = None
  73. def __init__(self, port_name=None, top_windows=None, windows=None, is_test=False):
  74. # if self.init_flag:
  75. # return
  76. # else:
  77. # self.init_flag = True
  78. self.is_test = is_test
  79. self.port_name = port_name
  80. self.top_windows = top_windows
  81. self.windows = windows
  82. self.ampy_scribe = None
  83. self.root = os.getcwd()
  84. self.value = 0
  85. # 基础数据mode
  86. self.data_mode = DataMode()
  87. self.mcu_code_path = "{}\micropython_online_code\mcu_code".format(os.getcwd())
  88. def send_log(self, text):
  89. print(text)
  90. pass
  91. def show_text_info(self, text):
  92. print(text)
  93. if self.windows:
  94. self.windows.text_info_sign.emit({"text": text})
  95. def show_progress_bar(self, add_value=None, value=None):
  96. if self.windows:
  97. if value:
  98. self.windows.progress_bar_sign.emit(value)
  99. else:
  100. self.value += add_value
  101. self.windows.progress_bar_sign.emit(self.value)
  102. def check_windows_state(self):
  103. if self.windows:
  104. if self.windows.state != 1:
  105. self.show_text_info("已取消")
  106. raise "主动停止"
  107. def run(self):
  108. """
  109. 步骤:
  110. 1、进行设备终止,并连接
  111. 2、获取远程文件的md5合集
  112. 3、比对线上合集
  113. 4、整理差异内容,以及对应文件的has校验码
  114. 5、下发文件到临时目录
  115. 6、运行远程更新脚本
  116. 7、自动重启
  117. """
  118. self.show_progress_bar(add_value=5)
  119. self.show_text_info("开始检查处理,停止MCU")
  120. self.stop_mcu()
  121. time.sleep(0.5)
  122. self.connect_pyboard()
  123. if self.ampy_scribe is None:
  124. self.show_text_info("串口链接失败,已退出")
  125. return
  126. # 删除远程的临时文件
  127. self.show_text_info("开始 删除远程的临时文件")
  128. self.remove_remote_files()
  129. self.show_progress_bar(add_value=5)
  130. self.show_text_info("开始 获取MCU文件的md5合集")
  131. # 获取远程文件的md5合集
  132. micropython_file_dict = self.get_remote_has()
  133. if micropython_file_dict is None:
  134. self.show_text_info("获取MCU文件的md5失败")
  135. return
  136. print("==============micropython_file_dict")
  137. print(micropython_file_dict)
  138. # 获取线上的数据
  139. self.show_text_info("开始 获取线上的数据")
  140. online_files_dict = self.check_resources_download()
  141. self.show_progress_bar(add_value=5)
  142. if not online_files_dict:
  143. self.show_text_info("开始 获取线上的数据 失败")
  144. return
  145. # 线上数据与单片机文件进行比对
  146. print("==============online_files_dict")
  147. print(online_files_dict)
  148. # 生成差异文件
  149. self.show_text_info("比对差异文件数据")
  150. differential_files_dict = self.get_differential_files(micropython_file_dict=micropython_file_dict,
  151. online_files_dict=online_files_dict)
  152. print("==============differential_files_dict")
  153. print(differential_files_dict)
  154. ## 动态创建文件夹路径
  155. # self.create_all_folder()
  156. if not differential_files_dict:
  157. self.show_progress_bar(value=95)
  158. self.show_text_info("MCU文件为最新无需更新,开始重启")
  159. self.restart()
  160. self.show_text_info("开始断开连接")
  161. self.ampy_scribe.close()
  162. # 尝试连接MCU
  163. self.mcu_connect()
  164. self.show_text_info("完成")
  165. self.show_progress_bar(value=100)
  166. return
  167. # 移动差异文件
  168. self.show_text_info("开始下发差异文件,总数:{}".format(len(differential_files_dict)))
  169. to_move_files = self.upload_all_file(differential_files_dict)
  170. if not to_move_files:
  171. self.show_text_info("下发差异文件失败")
  172. return
  173. # 制作更新脚本
  174. self.show_text_info("开始MCU主程序更新")
  175. result = self.update_from_temp()
  176. if result:
  177. if "更新成功" in result:
  178. self.show_text_info("更新成功,即将重启")
  179. print("单片机程序为最新,5秒后重启")
  180. # 重启
  181. self.restart()
  182. self.show_text_info("开始断开连接")
  183. self.ampy_scribe.close()
  184. # 尝试连接MCU
  185. self.mcu_connect()
  186. self.show_progress_bar(value=100)
  187. return
  188. # 制作更新脚本
  189. def update_from_temp(self):
  190. print("制作更新脚本")
  191. path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\update_from_temp.py".format(
  192. os.getcwd())
  193. return self.ampy_scribe.run_remote_script(path)
  194. # 创建所有文件夹路径
  195. def create_all_folder(self):
  196. path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\create_all_folders.py".format(
  197. os.getcwd())
  198. self.ampy_scribe.run_remote_script(path)
  199. print("创建文件夹路径已完成")
  200. # 批量移动文件到单片机
  201. def upload_all_file(self, files_dict):
  202. # k 示例:sensor/other_sensor/nrf24.py settings.py
  203. to_move_files = {}
  204. f = True
  205. total_s = 100 - self.value - 5
  206. total_len = len(files_dict)
  207. step = int(total_s / total_len)
  208. for relative_file_path, value in files_dict.items():
  209. local_path = "{}\{}".format(self.mcu_code_path, relative_file_path)
  210. # 注意路径格式 / 开头
  211. remote_path = r"/temp/{}".format(relative_file_path)
  212. print("移动文件", local_path, remote_path)
  213. try:
  214. self.ampy_scribe.upload_file(local_path, remote_path)
  215. # to_move_files[remote_path] = value["file_sha256"]
  216. # 使用本地has
  217. to_move_files[remote_path] = calculate_sha256(local_path)
  218. self.show_text_info("文件:{}下发成功".format(relative_file_path))
  219. except BaseException as e:
  220. self.show_text_info("文件:{}下发失败:{}".format(relative_file_path, e))
  221. f = False
  222. return None
  223. self.show_progress_bar(add_value=step)
  224. if f:
  225. # 创建配置文件
  226. print("创建配置文件")
  227. self.show_text_info("创建配置md5文件 并开始下发")
  228. _to_move_files_config = json.dumps(to_move_files)
  229. self.ampy_scribe.upload_file(local_path=_to_move_files_config,
  230. remote_path=r"/temp/_to_move_files_config.json", local_path_is_file=False)
  231. self.show_text_info("创建配置md5文件 并下发成功")
  232. if f:
  233. return to_move_files
  234. else:
  235. return None
  236. # 生成差异文件
  237. def get_differential_files(self, micropython_file_dict: dict, online_files_dict: dict):
  238. # 生成差异文件
  239. ignore_path = ["machine.py", "boot.py"] # "main.py"
  240. differential_files_dict = {}
  241. for online_file_path, value in online_files_dict.items():
  242. if online_file_path in ignore_path:
  243. continue
  244. if online_file_path not in micropython_file_dict:
  245. differential_files_dict[online_file_path] = {"file_sha256": value["file_sha256"]}
  246. print("文件不存在:{}".format(online_file_path))
  247. else:
  248. if value["file_sha256"] != micropython_file_dict[online_file_path]["file_sha256"]:
  249. differential_files_dict[online_file_path] = {"file_sha256": value["file_sha256"]}
  250. else:
  251. print("文件相同:{}".format(online_file_path))
  252. return differential_files_dict
  253. # 删除远程的临时文件
  254. def remove_remote_files(self):
  255. script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\delete_all_temp.py".format(
  256. self.root)
  257. result = self.ampy_scribe.run_remote_script(script_path)
  258. print(result)
  259. def run_run_remote_script_restart(self):
  260. script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\restart.py".format(
  261. self.root)
  262. self.ampy_scribe.run_remote_script(script_path)
  263. # 重启单片机
  264. def restart(self):
  265. try:
  266. # 使用上下文管理器确保线程池会正确关闭
  267. with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
  268. future = executor.submit(self.run_run_remote_script_restart)
  269. try:
  270. result = future.result(timeout=1) # 等待最多5秒
  271. except BaseException as e:
  272. print("自动超时")
  273. finally:
  274. # 如果需要立即释放资源,可以显式地调用 shutdown。
  275. # 在 with 语句块结束时也会自动调用 shutdown。
  276. # executor.shutdown(wait=False)
  277. print("shutdown")
  278. print("-----------end--------------")
  279. except BaseException as e:
  280. print(e)
  281. print("程序遇到了一个致命错误")
  282. def mcu_connect(self):
  283. self.show_text_info("5秒后 尝试连接MCU进行恢复")
  284. time.sleep(5)
  285. self.top_windows.mcu.to_connect_com(port_name=self.port_name)
  286. self.show_progress_bar(value=100)
  287. # 检查file_sha256 并进行下载更新文件
  288. def check_resources_download(self):
  289. if self.is_test is False:
  290. if not settings.IsLogin:
  291. print("当前未登录")
  292. return
  293. # 请求获取配置文件
  294. online_data = self.data_mode.get_all_resource_config(plugins_name="plugins_micropython")
  295. # print("check_resources_download")
  296. # print(data)
  297. if not online_data:
  298. text = "check_resources_download data为空"
  299. self.send_log(text)
  300. elif "data" not in online_data:
  301. text = "check_resources_download data不存在"
  302. self.send_log(text)
  303. else:
  304. # 步骤:
  305. # 1、获取线上数据
  306. # 2、如本地MD5不同,但是修改时间比线上的最新,则也不更新下载。
  307. for relative_file_path, value in online_data["data"].items():
  308. file_path = "{}\{}".format(self.mcu_code_path, relative_file_path)
  309. if os.path.exists(file_path):
  310. file_has256 = calculate_sha256(file_path)
  311. if file_has256 != value["file_sha256"]:
  312. file_modified_time = get_modified_time(file_path)
  313. if compare_two_times(file_modified_time, online_data["update_time"]) == "left_new":
  314. continue
  315. else:
  316. text = "has256 不同 开始下载:{}".format(file_path)
  317. self.send_log(text)
  318. download_file(url=value["url"], file_path=file_path)
  319. else:
  320. pass
  321. else:
  322. text = "文件不存在 开始下载:{}".format(file_path)
  323. self.send_log(text)
  324. download_file(url=value["url"], file_path=file_path)
  325. if online_data:
  326. return online_data["data"]
  327. return None
  328. def get_remote_has(self):
  329. self.ampy_scribe: AmpyScribe
  330. script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\get_remote_has256.py".format(
  331. self.root)
  332. result = self.ampy_scribe.run_remote_script(script_path)
  333. if result is None:
  334. return None
  335. has_dict = {}
  336. try:
  337. result_list = result.split("\n")
  338. for i in result_list:
  339. if "@" not in i:
  340. continue
  341. path, _has = i.split("@")
  342. has_dict[path[1:]] = {"file_sha256": _has.replace("\r", "")}
  343. return has_dict
  344. except BaseException as e:
  345. return None
  346. def stop_mcu(self):
  347. if self.is_test:
  348. return
  349. if self.ampy_scribe is None:
  350. self.top_windows.mcu.stop_mcu()
  351. def connect_pyboard(self):
  352. if self.is_test:
  353. self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None)
  354. print("ampy_scribe 接连成功")
  355. return
  356. if self.ampy_scribe is None:
  357. if self.port_name:
  358. pass
  359. else:
  360. self.port_name = self.top_windows.mcu.port_name
  361. print("port_name:", self.port_name)
  362. if self.port_name:
  363. self.top_windows.mcu.close_connect()
  364. time.sleep(3)
  365. try:
  366. self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None)
  367. print("ampy_scribe 接连成功")
  368. except BaseException as e:
  369. print("ampy_scribe 连接失败", e)
  370. self.ampy_scribe = None
  371. def __new1__(cls, *args, **kwargs):
  372. """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
  373. if not cls.instance:
  374. cls.instance = super().__new__(cls)
  375. return cls.instance