ampy_plugin.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. import json
  2. import time
  3. if __name__ == '__main__':
  4. raise "不允许在当前目录下运行"
  5. import os
  6. import settings
  7. from ampy.pyboard import Pyboard
  8. import ampy.files
  9. from module.view_control.main_window.data_main import DataMode
  10. from module.base_mode.base import calculate_sha256, download_file, compare_two_times, get_modified_time, get_md5
  11. import concurrent.futures
# Name of this plugin. It is not referenced by the code visible in this file;
# presumably the plugin host reads it - TODO confirm.
plugins_name = "AMPY插件"
  13. class AmpyScribe(object):
  14. def __init__(self, port_name="COM37", baud_rate=115200, windows=None):
  15. self.port_name = port_name
  16. self.baud_rate = baud_rate
  17. self.windows = windows
  18. self.pyb = Pyboard(self.port_name, baudrate=self.baud_rate)
  19. def send_log(self, text):
  20. if self.windows:
  21. self.windows.send_log(text)
  22. # 运行本地脚本
  23. def run_remote_script(self, script_path):
  24. """Run a script on the remote device."""
  25. try:
  26. self.pyb.enter_raw_repl()
  27. result = self.pyb.execfile(script_path)
  28. print(result.decode())
  29. return result.decode()
  30. except Exception as e:
  31. print(f"An error occurred while running the script: {e}")
  32. return None
  33. def close(self):
  34. if self.pyb is not None:
  35. self.pyb.close()
  36. self.pyb = None
  37. # 上传文件
  38. def upload_file(self, local_path, remote_path, local_path_is_file=True):
  39. """
  40. local_path:电脑本地路径
  41. remote_path:单片机路径,/开头表示根目录
  42. """
  43. try:
  44. if local_path_is_file:
  45. # 打开文件并读取内容
  46. with open(local_path, 'rb') as file:
  47. content = file.read()
  48. else:
  49. content = local_path
  50. # 创建一个 Files 对象用于文件操作
  51. files = ampy.files.Files(self.pyb)
  52. # 将文件内容写入 MicroPython 设备
  53. files.put(remote_path, content)
  54. except Exception as e:
  55. print(f"An error occurred while upload_file: {e}")
  56. # 获取远程文件
  57. def get_file_from_device(self, remote_filename, local_filename):
  58. try:
  59. # 创建一个 Files 对象用于文件操作
  60. files = ampy.files.Files(self.pyb)
  61. # 获取远程文件内容
  62. content = files.get(remote_filename)
  63. # 将内容写入本地文件
  64. with open(local_filename, 'wb') as file:
  65. file.write(content)
  66. print(f"文件 {remote_filename} 下载成功并保存为 {local_filename}")
  67. except Exception as e:
  68. print(f"get_file_from_device 发生错误: {e}")
  69. # 远程更新脚本
  70. class RemoteUpdate(object):
  71. # instance = None
  72. # init_flag = None
  73. def __init__(self, port_name=None, top_windows=None, windows=None, is_test=False):
  74. # if self.init_flag:
  75. # return
  76. # else:
  77. # self.init_flag = True
  78. self.is_test = is_test
  79. self.port_name = port_name
  80. self.top_windows = top_windows
  81. self.windows = windows
  82. self.ampy_scribe = None
  83. self.root = os.getcwd()
  84. self.value = 0
  85. # 基础数据mode
  86. self.data_mode = DataMode()
  87. self.mcu_code_path = "{}\micropython_online_code\mcu_code".format(os.getcwd())
  88. def send_log(self, text):
  89. print(text)
  90. pass
  91. def show_text_info(self, text):
  92. print(text)
  93. if self.windows:
  94. self.windows.text_info_sign.emit({"text": text})
  95. def show_progress_bar(self, add_value=None, value=None):
  96. if self.windows:
  97. if value:
  98. self.windows.progress_bar_sign.emit(value)
  99. else:
  100. self.value += add_value
  101. self.windows.progress_bar_sign.emit(self.value)
  102. def check_windows_state(self):
  103. if self.windows:
  104. if self.windows.state != 1:
  105. self.show_text_info("已取消")
  106. raise "主动停止"
  107. def run(self):
  108. """
  109. 步骤:
  110. 1、进行设备终止,并连接
  111. 2、获取远程文件的md5合集
  112. 3、比对线上合集
  113. 4、整理差异内容,以及对应文件的has校验码
  114. 5、下发文件到临时目录
  115. 6、运行远程更新脚本
  116. 7、自动重启
  117. """
  118. self.show_progress_bar(add_value=5)
  119. self.show_text_info("开始检查处理,停止MCU")
  120. if self.top_windows.mcu.connect_state:
  121. self.stop_mcu()
  122. time.sleep(2)
  123. self.connect_pyboard()
  124. if self.ampy_scribe is None:
  125. self.show_text_info("串口链接失败,已退出")
  126. return
  127. # 删除远程的临时文件
  128. self.show_text_info("开始 删除远程的临时文件")
  129. self.remove_remote_files()
  130. self.show_progress_bar(add_value=5)
  131. self.show_text_info("开始 获取MCU文件的md5合集")
  132. # 获取远程文件的md5合集
  133. micropython_file_dict = self.get_remote_has()
  134. if micropython_file_dict is None:
  135. self.show_text_info("获取MCU文件的md5失败")
  136. return
  137. print("==============micropython_file_dict")
  138. print(micropython_file_dict)
  139. # 获取线上的数据
  140. self.show_text_info("开始 获取线上的数据")
  141. online_files_dict = self.check_resources_download()
  142. self.show_progress_bar(add_value=5)
  143. if not online_files_dict:
  144. self.show_text_info("开始 获取线上的数据 失败")
  145. return
  146. # 线上数据与单片机文件进行比对
  147. print("==============online_files_dict")
  148. print(online_files_dict)
  149. # 生成差异文件
  150. self.show_text_info("比对差异文件数据")
  151. differential_files_dict = self.get_differential_files(micropython_file_dict=micropython_file_dict,
  152. online_files_dict=online_files_dict)
  153. print("==============differential_files_dict")
  154. print(differential_files_dict)
  155. ## 动态创建文件夹路径
  156. # self.create_all_folder()
  157. if not differential_files_dict:
  158. self.show_progress_bar(value=95)
  159. self.show_text_info("MCU文件为最新无需更新,开始重启")
  160. self.restart()
  161. self.show_text_info("开始断开连接")
  162. self.ampy_scribe.close()
  163. # 尝试连接MCU
  164. self.mcu_connect()
  165. self.show_text_info("完成")
  166. self.show_progress_bar(value=100)
  167. return
  168. # 移动差异文件
  169. self.show_text_info("开始下发差异文件,总数:{}".format(len(differential_files_dict)))
  170. to_move_files = self.upload_all_file(differential_files_dict)
  171. if not to_move_files:
  172. self.show_text_info("下发差异文件失败")
  173. return
  174. # 制作更新脚本
  175. self.show_text_info("开始MCU主程序更新")
  176. result = self.update_from_temp()
  177. if result:
  178. if "更新成功" in result:
  179. self.show_text_info("更新成功,即将重启")
  180. print("单片机程序为最新,5秒后重启")
  181. # 重启
  182. self.restart()
  183. self.show_text_info("开始断开连接")
  184. self.ampy_scribe.close()
  185. # 尝试连接MCU
  186. self.mcu_connect()
  187. self.show_progress_bar(value=100)
  188. return
  189. # 制作更新脚本
  190. def update_from_temp(self):
  191. print("制作更新脚本")
  192. path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\update_from_temp.py".format(
  193. os.getcwd())
  194. return self.ampy_scribe.run_remote_script(path)
  195. # 创建所有文件夹路径
  196. def create_all_folder(self):
  197. path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\create_all_folders.py".format(
  198. os.getcwd())
  199. self.ampy_scribe.run_remote_script(path)
  200. print("创建文件夹路径已完成")
  201. # 批量移动文件到单片机
  202. def upload_all_file(self, files_dict):
  203. # k 示例:sensor/other_sensor/nrf24.py settings.py
  204. to_move_files = {}
  205. f = True
  206. total_s = 100 - self.value - 5
  207. total_len = len(files_dict)
  208. step = int(total_s / total_len)
  209. for relative_file_path, value in files_dict.items():
  210. local_path = "{}\{}".format(self.mcu_code_path, relative_file_path)
  211. # 注意路径格式 / 开头
  212. remote_path = r"/temp/{}".format(relative_file_path)
  213. print("移动文件", local_path, remote_path)
  214. try:
  215. self.ampy_scribe.upload_file(local_path, remote_path)
  216. # to_move_files[remote_path] = value["file_sha256"]
  217. # 使用本地has
  218. to_move_files[remote_path] = calculate_sha256(local_path)
  219. self.show_text_info("文件:{}下发成功".format(relative_file_path))
  220. except BaseException as e:
  221. self.show_text_info("文件:{}下发失败:{}".format(relative_file_path, e))
  222. f = False
  223. return None
  224. self.show_progress_bar(add_value=step)
  225. if f:
  226. # 创建配置文件
  227. print("创建配置文件")
  228. self.show_text_info("创建配置md5文件 并开始下发")
  229. _to_move_files_config = json.dumps(to_move_files)
  230. self.ampy_scribe.upload_file(local_path=_to_move_files_config,
  231. remote_path=r"/temp/_to_move_files_config.json", local_path_is_file=False)
  232. self.show_text_info("创建配置md5文件 并下发成功")
  233. if f:
  234. return to_move_files
  235. else:
  236. return None
  237. # 生成差异文件
  238. def get_differential_files(self, micropython_file_dict: dict, online_files_dict: dict):
  239. # 生成差异文件
  240. ignore_path = ["machine.py", "boot.py"] # "main.py"
  241. differential_files_dict = {}
  242. for online_file_path, value in online_files_dict.items():
  243. if online_file_path in ignore_path:
  244. continue
  245. online_file_sha256 = calculate_sha256(
  246. "{}\micropython_online_code\mcu_code\{}".format(os.getcwd(), online_file_path))
  247. if online_file_path not in micropython_file_dict:
  248. differential_files_dict[online_file_path] = {"file_sha256": online_file_sha256}
  249. print("文件不存在:{}".format(online_file_path))
  250. else:
  251. if online_file_sha256 != micropython_file_dict[online_file_path]["file_sha256"]:
  252. differential_files_dict[online_file_path] = {"file_sha256": online_file_sha256}
  253. else:
  254. print("文件相同:{}".format(online_file_path))
  255. return differential_files_dict
  256. # 删除远程的临时文件
  257. def remove_remote_files(self):
  258. script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\delete_all_temp.py".format(
  259. self.root)
  260. result = self.ampy_scribe.run_remote_script(script_path)
  261. print(result)
  262. def run_run_remote_script_restart(self):
  263. script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\restart.py".format(
  264. self.root)
  265. self.ampy_scribe.run_remote_script(script_path)
  266. # 重启单片机
  267. def restart(self):
  268. try:
  269. # 使用上下文管理器确保线程池会正确关闭
  270. with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
  271. future = executor.submit(self.run_run_remote_script_restart)
  272. try:
  273. result = future.result(timeout=1) # 等待最多5秒
  274. except BaseException as e:
  275. print("自动超时")
  276. finally:
  277. # 如果需要立即释放资源,可以显式地调用 shutdown。
  278. # 在 with 语句块结束时也会自动调用 shutdown。
  279. # executor.shutdown(wait=False)
  280. print("shutdown")
  281. print("-----------end--------------")
  282. except BaseException as e:
  283. print(e)
  284. print("程序遇到了一个致命错误")
  285. def mcu_connect(self):
  286. self.show_text_info("5秒后 尝试连接MCU进行恢复")
  287. time.sleep(5)
  288. self.top_windows.mcu.to_connect_com(port_name=self.port_name)
  289. self.show_progress_bar(value=100)
  290. # 检查file_sha256 并进行下载更新文件
  291. def check_resources_download(self):
  292. if self.is_test is False:
  293. if not settings.IsLogin:
  294. print("当前未登录")
  295. return
  296. # 请求获取配置文件
  297. online_data = self.data_mode.get_all_resource_config(plugins_name="plugins_micropython")
  298. # print("check_resources_download")
  299. # print(data)
  300. if not online_data:
  301. text = "check_resources_download data为空"
  302. self.send_log(text)
  303. elif "data" not in online_data:
  304. text = "check_resources_download data不存在"
  305. self.send_log(text)
  306. else:
  307. # 步骤:
  308. # 1、获取线上数据
  309. # 2、如本地MD5不同,但是修改时间比线上的最新,则也不更新下载。
  310. for relative_file_path, value in online_data["data"].items():
  311. file_path = "{}\{}".format(self.mcu_code_path, relative_file_path)
  312. if os.path.exists(file_path):
  313. file_md5 = get_md5(file_path)
  314. if file_md5 != value["file_md5"]:
  315. file_modified_time = get_modified_time(file_path)
  316. if compare_two_times(file_modified_time, online_data["update_time"]) == "left_new":
  317. continue
  318. else:
  319. text = "has256 不同 开始下载:{}".format(file_path)
  320. self.send_log(text)
  321. download_file(url=value["url"], file_path=file_path)
  322. else:
  323. pass
  324. else:
  325. text = "文件不存在 开始下载:{}".format(file_path)
  326. self.send_log(text)
  327. download_file(url=value["url"], file_path=file_path)
  328. if online_data:
  329. return online_data["data"]
  330. return None
  331. def get_remote_has(self):
  332. self.ampy_scribe: AmpyScribe
  333. script_path = r"{}\custom_plugins\plugins\micropython_update\other_micropython_code\get_remote_has256.py".format(
  334. self.root)
  335. result = self.ampy_scribe.run_remote_script(script_path)
  336. if result is None:
  337. return None
  338. has_dict = {}
  339. try:
  340. result_list = result.split("\n")
  341. for i in result_list:
  342. if "@" not in i:
  343. continue
  344. path, _has = i.split("@")
  345. has_dict[path[1:]] = {"file_sha256": _has.replace("\r", "")}
  346. return has_dict
  347. except BaseException as e:
  348. return None
  349. def stop_mcu(self):
  350. if self.is_test:
  351. return
  352. if self.ampy_scribe is None:
  353. self.top_windows.mcu.stop_mcu()
  354. def connect_pyboard(self):
  355. if self.is_test:
  356. self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None)
  357. print("ampy_scribe 接连成功")
  358. return
  359. if self.ampy_scribe is None:
  360. if self.port_name:
  361. pass
  362. else:
  363. if self.top_windows.mcu.connect_state:
  364. self.port_name = self.top_windows.mcu.port_name
  365. print("port_name:", self.port_name)
  366. if self.port_name:
  367. if self.top_windows.mcu.connect_state:
  368. self.top_windows.mcu.close_connect()
  369. print("close_connect")
  370. time.sleep(3)
  371. try:
  372. self.ampy_scribe = AmpyScribe(port_name=self.port_name, windows=None)
  373. print("ampy_scribe 接连成功")
  374. except BaseException as e:
  375. print("ampy_scribe 连接失败", e)
  376. self.ampy_scribe = None
  377. def __new1__(cls, *args, **kwargs):
  378. """如果当前没有实例时,调用父类__new__方法,生成示例,有则返回保存的内存地址。"""
  379. if not cls.instance:
  380. cls.instance = super().__new__(cls)
  381. return cls.instance