api.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. from natsort.natsort import order_by_index
  2. from sqlalchemy import func
  3. from models import *
  4. import requests
  5. import json
  6. from logger import logger
  7. from serial.tools import list_ports
  8. from model import PhotoRecord
  9. import settings, datetime
  10. import pandas as pd
  11. from utils.hlm_http_request import forward_request
  12. from utils.utils_func import check_path
  13. from sockets.socket_client import socket_manager
  14. from mcu.DeviceControl import DeviceControl
  15. import time, shutil, os
  16. from sqlalchemy import and_, asc, desc
  17. from functools import partial
  18. from service.deal_image import DealImage
  19. from databases import DeviceConfig, SysConfigs, SqlQuery, CRUD, select
  20. from service.run_main import RunMain
  21. import importlib
  22. from service.auto_deal_pics.upload_pic import UploadPic
  23. from service.OnePicTest import OnePicTest
  24. import hashlib
  25. # from service.AutoDealPics import AutoDealPics
  26. # for plugin in settings.plugins:
  27. # module_path, class_name = plugin.rsplit(".", 1)
  28. # print("module_path", module_path, class_name)
  29. # module = importlib.import_module(module_path)
  30. # getattr(module, class_name)
  31. def calculate_md5(filepath):
  32. # 打开文件,以二进制只读模式打开
  33. with open(filepath, "rb") as f:
  34. # 创建MD5哈希对象
  35. md5hash = hashlib.md5()
  36. # 循环读取文件的内容并更新哈希对象
  37. for chunk in iter(lambda: f.read(4096), b""):
  38. md5hash.update(chunk)
  39. # 返回MD5哈希的十六进制表示
  40. return md5hash.hexdigest()
  41. @app.get("/")
  42. async def index():
  43. # await socket_manager.send_message(msg="测试")
  44. return {"message": "Hello World"}
  45. @app.get("/send_test")
  46. async def index():
  47. data = {"data1": 1, "data2": 2, "data3": 3, "data4": 4}
  48. await socket_manager.send_message(msg="测试", data=data)
  49. return {"message": "Hello World"}
  50. @app.get("/scan_serials", description="扫描可用的设备端口")
  51. async def scanSerials():
  52. """扫描串口"""
  53. ports = list_ports.comports()
  54. print("Scanning", ports)
  55. return {"message": "Hello World"}
  56. @app.get("/test_conndevice")
  57. def test_conndevice():
  58. device_control = DeviceControl()
  59. p_list = []
  60. temp_ports_dict = {}
  61. # while 1:
  62. time.sleep(1)
  63. ports_dict = device_control.scan_serial_port()
  64. temp_ports_dict = ports_dict
  65. if not ports_dict:
  66. # 全部清空 移除所有串口
  67. if p_list:
  68. _p = p_list.pop()
  69. device_control.remove_port(_p)
  70. # continue
  71. if ports_dict:
  72. # print(plist)
  73. for index, _i in enumerate(p_list):
  74. if _i not in ports_dict:
  75. _p = p_list.pop(index)
  76. device_control.remove_port(_p)
  77. for _port_name, _port_value in ports_dict.items():
  78. if _port_name not in p_list:
  79. try:
  80. p_list.append(_port_name)
  81. device_control.add_port_by_linkage(_port_name)
  82. except BaseException as e:
  83. print(e.__traceback__.tb_frame.f_globals["__file__"]) # 发生异常所在的文件
  84. print(e.__traceback__.tb_lineno) # 发生异常所在的行数
  85. print("串口不存在{} {}".format(_port_name, e))
  86. # threading.Thread(target=self.add_port, args=(_port_name, _port_value)).start()
  87. # self.add_port(_p)
  88. @app.api_route("/forward_request", methods=["GET", "POST"], description="代理转发hlm项目得请求")
  89. async def forwardRequest(request: HlmForwardRequest):
  90. """
  91. 转发HTTP请求到目标URL
  92. :param request: FastAPI Request对象
  93. :return: 目标接口的响应
  94. """
  95. try:
  96. if request.method == "GET":
  97. params = request.query_params
  98. elif request.method == "POST":
  99. params = json.dump(request.query_params)
  100. else:
  101. raise UnicornException("仅支持GET和POST方法")
  102. target_url = request.target_url
  103. method = request.method.upper()
  104. headers = request.headers
  105. if not target_url:
  106. raise UnicornException("目标url地址不能为空")
  107. # 调用 hlm_http_request 中的 forward_request 函数
  108. response = forward_request(target_url, params=params, method=method, headers=headers)
  109. return response
  110. except requests.RequestException as e:
  111. raise UnicornException(e)
  112. except Exception as e:
  113. raise UnicornException(e)
  114. def fromExcelHandler(params: HandlerDetail):
  115. excel_path = params.excel_path
  116. excel_df = pd.read_excel(excel_path, sheet_name=0, header=0)
  117. handler_result = []
  118. handler_result_folder = ""
  119. if "文件夹名称" not in excel_df.columns:
  120. raise UnicornException("缺失 [文件夹名称] 列")
  121. if "商品货号" not in excel_df.columns:
  122. raise UnicornException("缺失 [商品货号] 列")
  123. for index, row in excel_df.iterrows():
  124. goods_art_no_image_dir = str(row["文件夹名称"])
  125. goods_art_no = str(row["商品货号"])
  126. try:
  127. if not goods_art_no:
  128. raise UnicornException("货号不能为空")
  129. token = "Bearer " + params.token
  130. session = SqlQuery()
  131. pr = CRUD(PhotoRecord)
  132. images = pr.read_all(session, conditions={"goods_art_no": goods_art_no})
  133. if not images:
  134. raise UnicornException("没有可用货号数据")
  135. image_dir = "{}/data/".format(os.getcwd()).replace("\\", "/")
  136. check_path(image_dir)
  137. for itemImg in images:
  138. if not os.path.exists(image_dir + "/" + os.path.basename(itemImg.image_path)):
  139. shutil.copy(itemImg.image_path, image_dir)
  140. dealImage = DealImage(image_dir)
  141. resFlag, path = dealImage.dealMoveImage(
  142. image_dir=image_dir,
  143. callback_func=None,
  144. goods_art_no=goods_art_no_image_dir,
  145. )
  146. if not resFlag:
  147. raise UnicornException(path)
  148. temp_class = {}
  149. temp_name_list = []
  150. for tempItem in params.temp_list:
  151. temp_class[tempItem.template_id] = tempItem.template_local_classes
  152. temp_name_list.append(tempItem.template_id)
  153. # if goods_art_no_image_dir not in path:
  154. # path = path + "/" + goods_art_no_image_dir
  155. config_data = {
  156. "image_dir": path,
  157. "image_order": params.template_image_order,
  158. "goods_art_no": goods_art_no,
  159. "is_check_number": False,
  160. "resize_image_view": "后跟",
  161. "cutout_mode": settings.CUTOUT_MODE,
  162. "logo_path": params.logo_path,
  163. "special_goods_art_no_folder_line": "",
  164. "is_use_excel": (False if params.excel_path == "" else True), # 是否使用excel
  165. "excel_path": params.excel_path, # excel路径
  166. "is_check_color_is_all": False,
  167. "cutout_is_pass": True,
  168. "assigned_page_dict": {},
  169. "detail_is_pass": True,
  170. "upload_is_pass": False,
  171. "upload_is_enable": False,
  172. "is_filter": False,
  173. "temp_class": temp_class,
  174. "temp_name": params.temp_name,
  175. "temp_name_list": temp_name_list,
  176. "target_error_folder": f"{path}/软件-生成详情错误",
  177. }
  178. # 动态导入类
  179. temp_class_dict = {}
  180. for key, class_path in config_data["temp_class"].items():
  181. module_path, class_name = class_path.rsplit(".", 1)
  182. module = importlib.import_module(module_path)
  183. cls = getattr(module, class_name)
  184. temp_class_dict[key] = cls
  185. config_data["temp_class"] = temp_class_dict
  186. obj = None
  187. run_main = RunMain(obj, token)
  188. return_data = run_main.check_before_cutout(config_data)
  189. cutout_res = run_main.check_for_cutout_image_first_call_back(return_data)
  190. check_for_detail_first_res = None
  191. if cutout_res == True:
  192. return_data_check_before_detail = run_main.check_before_detail(config_data)
  193. print(
  194. "return_data_check_before_detail======> 测试 ==>",
  195. return_data_check_before_detail,
  196. )
  197. check_for_detail_first_res = run_main.check_for_detail_first_call_back(
  198. return_data_check_before_detail
  199. )
  200. if isinstance(check_for_detail_first_res, partial):
  201. result = check_for_detail_first_res()
  202. try:
  203. config_data = result["config_data"]
  204. except:
  205. config_data = result
  206. if config_data["sign_text"] == "已结束详情处理":
  207. # at_pic = AutoDealPics()
  208. print("config_data", config_data)
  209. if config_data["upload_is_enable"]:
  210. to_deal_dir = "{}/软件-详情图生成".format(config_data["image_dir"])
  211. check_path(to_deal_dir)
  212. print("to_deal_dir", to_deal_dir)
  213. if os.path.exists(to_deal_dir):
  214. upload_pic = UploadPic(
  215. windows=None,
  216. to_deal_dir=to_deal_dir,
  217. config_data=config_data,
  218. token=token,
  219. )
  220. upload_pic.run()
  221. out_put_dir = config_data["out_put_dir"]
  222. out_put_dir_path = "{}/{}".format(os.getcwd(), out_put_dir).replace("\\", "/")
  223. handler_result_folder = os.path.dirname(out_put_dir_path)
  224. handler_result.append({"goods_art_no": goods_art_no, "success": True, "info": "处理成功"})
  225. else:
  226. handler_result.append({"goods_art_no": goods_art_no, "success": False, "info": "处理失败"})
  227. except Exception as e:
  228. handler_result.append({"goods_art_no": goods_art_no, "success": False, "info": str(e)})
  229. handler_result_folder = "/".join(handler_result_folder.split("/")[:-1])
  230. return {
  231. "code": 0,
  232. "msg": "",
  233. "data": {"output_folder": handler_result_folder, "list": handler_result},
  234. }
  235. @app.post("/handle_detail")
  236. async def handle_detail(request: Request, params: HandlerDetail):
  237. goods_art_no_array = params.goods_art_no
  238. handler_result = []
  239. handler_result_folder = ""
  240. if params.excel_path != "" and params.excel_path != None:
  241. return fromExcelHandler(params)
  242. for goods_art_no in goods_art_no_array:
  243. try:
  244. if not goods_art_no:
  245. raise UnicornException("货号不能为空")
  246. token = "Bearer " + params.token
  247. session = SqlQuery()
  248. pr = CRUD(PhotoRecord)
  249. images = pr.read_all(session, conditions={"goods_art_no": goods_art_no})
  250. if not images:
  251. raise UnicornException("没有可用货号数据")
  252. image_dir = "{}/data/".format(os.getcwd()).replace("\\", "/")
  253. check_path(image_dir)
  254. for itemImg in images:
  255. if not os.path.exists(image_dir + "/" + os.path.basename(itemImg.image_path)):
  256. shutil.copy(itemImg.image_path, image_dir)
  257. dealImage = DealImage(image_dir)
  258. resFlag, path = dealImage.dealMoveImage(
  259. image_dir=image_dir, callback_func=None, goods_art_no=goods_art_no
  260. )
  261. if not resFlag:
  262. raise UnicornException(path)
  263. temp_class = {}
  264. temp_name_list = []
  265. for tempItem in params.temp_list:
  266. temp_class[tempItem.template_id] = tempItem.template_local_classes
  267. temp_name_list.append(tempItem.template_id)
  268. config_data = {
  269. "image_dir": path,
  270. "image_order": params.template_image_order,
  271. "goods_art_no": goods_art_no,
  272. "is_check_number": False,
  273. "resize_image_view": "后跟",
  274. "cutout_mode": settings.CUTOUT_MODE,
  275. "logo_path": params.logo_path,
  276. "special_goods_art_no_folder_line": "",
  277. "is_use_excel": (False if params.excel_path == "" else True), # 是否使用excel
  278. "excel_path": params.excel_path, # excel路径
  279. "is_check_color_is_all": False,
  280. "cutout_is_pass": True,
  281. "assigned_page_dict": {},
  282. "detail_is_pass": True,
  283. "upload_is_pass": False,
  284. "upload_is_enable": False,
  285. "is_filter": False,
  286. "temp_class": temp_class,
  287. "temp_name": params.temp_name,
  288. "temp_name_list": temp_name_list,
  289. "target_error_folder": f"{path}/软件-生成详情错误",
  290. }
  291. # 动态导入类
  292. temp_class_dict = {}
  293. for key, class_path in config_data["temp_class"].items():
  294. module_path, class_name = class_path.rsplit(".", 1)
  295. module = importlib.import_module(module_path)
  296. cls = getattr(module, class_name)
  297. temp_class_dict[key] = cls
  298. config_data["temp_class"] = temp_class_dict
  299. obj = None
  300. run_main = RunMain(obj, token)
  301. return_data = run_main.check_before_cutout(config_data)
  302. cutout_res = run_main.check_for_cutout_image_first_call_back(return_data)
  303. check_for_detail_first_res = None
  304. if cutout_res == True:
  305. return_data_check_before_detail = run_main.check_before_detail(config_data)
  306. print(
  307. "return_data_check_before_detail======> 测试 ==>",
  308. return_data_check_before_detail,
  309. )
  310. check_for_detail_first_res = run_main.check_for_detail_first_call_back(
  311. return_data_check_before_detail
  312. )
  313. if isinstance(check_for_detail_first_res, partial):
  314. result = check_for_detail_first_res()
  315. try:
  316. config_data = result["config_data"]
  317. except:
  318. config_data = result
  319. if config_data["sign_text"] == "已结束详情处理":
  320. # at_pic = AutoDealPics()
  321. print("config_data", config_data)
  322. if config_data["upload_is_enable"]:
  323. to_deal_dir = "{}/软件-详情图生成".format(config_data["image_dir"])
  324. check_path(to_deal_dir)
  325. print("to_deal_dir", to_deal_dir)
  326. if os.path.exists(to_deal_dir):
  327. upload_pic = UploadPic(
  328. windows=None,
  329. to_deal_dir=to_deal_dir,
  330. config_data=config_data,
  331. token=token,
  332. )
  333. upload_pic.run()
  334. out_put_dir = config_data["out_put_dir"]
  335. out_put_dir_path = "{}/{}".format(os.getcwd(), out_put_dir).replace("\\", "/")
  336. handler_result_folder = os.path.dirname(out_put_dir_path)
  337. handler_result.append({"goods_art_no": goods_art_no, "success": True, "info": "处理成功"})
  338. else:
  339. handler_result.append({"goods_art_no": goods_art_no, "success": False, "info": "处理失败"})
  340. except Exception as e:
  341. handler_result.append({"goods_art_no": goods_art_no, "success": False, "info": str(e)})
  342. return {
  343. "code": 0,
  344. "msg": "",
  345. "data": {"output_folder": handler_result_folder, "list": handler_result},
  346. }
  347. @app.post("/get_device_configs", description="获取可执行程序命令列表")
  348. def get_device_configs(params: ModelGetDeviceConfig):
  349. mode_type = params.mode_type
  350. session = SqlQuery()
  351. configModel = CRUD(DeviceConfig)
  352. configList = configModel.read_all(
  353. session,
  354. conditions={"mode_type": mode_type},
  355. order_by="action_index",
  356. ascending=True,
  357. )
  358. return {
  359. "code": 0,
  360. "msg": "",
  361. "data": {"list": configList},
  362. }
  363. @app.post("/device_config_detail", description="获取可执行程序详情")
  364. def get_device_configs(params: ModelGetDeviceConfigDetail):
  365. action_id = params.id
  366. session = SqlQuery()
  367. configModel = CRUD(DeviceConfig)
  368. model = configModel.read(session, conditions={"id": action_id})
  369. if model == None:
  370. return {"code": 1, "msg": "数据不存在", "data": None}
  371. return {"code": 0, "msg": "", "data": model}
  372. @app.post("/device_config_detail_query", description="通过条件获取可执行程序详情")
  373. def get_device_configs(params: ModelGetDeviceConfigDetailQuery):
  374. mode_type = params.mode_type
  375. action_name = params.action_name
  376. session = SqlQuery()
  377. configModel = CRUD(DeviceConfig)
  378. model = configModel.read(session, conditions={"mode_type": mode_type, "action_name": action_name})
  379. if model == None:
  380. return {"code": 1, "msg": "数据不存在", "data": None}
  381. return {"code": 0, "msg": "", "data": model}
  382. @app.post("/remove_config", description="删除一条可执行命令")
  383. def get_device_configs(params: ModelGetDeviceConfigDetail):
  384. action_id = params.id
  385. session = SqlQuery()
  386. configModel = CRUD(DeviceConfig)
  387. model = configModel.read(session, conditions={"id": action_id})
  388. if model == None:
  389. return {"code": 1, "msg": "数据不存在", "data": None}
  390. if model.is_system == True:
  391. return {"code": 1, "msg": "系统配置不允许删除", "data": None}
  392. configModel.delete(session, obj_id=action_id)
  393. return {"code": 0, "msg": "删除成功", "data": None}
  394. @app.post("/save_device_config", description="创建或修改一条可执行命令")
  395. def save_device_config(params: SaveDeviceConfig):
  396. action_id = params.id
  397. session = SqlQuery()
  398. deviceConfig = CRUD(DeviceConfig)
  399. if action_id == None or action_id == 0:
  400. # 走新增逻辑
  401. params.id = None
  402. save_device_config = deviceConfig.create(session, obj_in=params)
  403. else:
  404. model = deviceConfig.read(session, conditions={"id": action_id})
  405. if model == None:
  406. return {"code": 1, "msg": "数据不存在", "data": None}
  407. # 走编辑逻辑
  408. kwargs = params.__dict__
  409. save_device_config = deviceConfig.update(session, obj_id=action_id, **kwargs)
  410. return {"code": 0, "msg": "操作成功", "data": save_device_config}
  411. @app.post("/reset_config", description="创建或修改一条可执行命令")
  412. def reset_config(params: ModelGetDeviceConfig):
  413. mode_type = params.mode_type
  414. if mode_type == None or mode_type == "":
  415. return {"code": 1, "msg": "参数错误", "data": None}
  416. session = SqlQuery()
  417. deviceConfig = CRUD(DeviceConfig)
  418. res = deviceConfig.deleteConditions(session, conditions={"mode_type": mode_type})
  419. if res is False:
  420. return {"code": 1, "msg": "操作失败", "data": None}
  421. actions = json.load(open("action.json", encoding="utf-8"))
  422. act = []
  423. for item in actions:
  424. if item.get("mode_type") == mode_type:
  425. act.append(item)
  426. batch_insert_device_configs(session, act)
  427. return {"code": 0, "msg": "操作成功", "data": None}
  428. @app.get("/get_photo_records", description="获取拍照记录")
  429. def get_photo_records(page: int = 1, size: int = 5):
  430. session = SqlQuery()
  431. photos = CRUD(PhotoRecord)
  432. print("准备查询拍摄记录", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  433. statement = (
  434. select(PhotoRecord)
  435. .offset((page - 1) * size)
  436. .limit(size)
  437. .order_by(desc("id"))
  438. .group_by("goods_art_no")
  439. )
  440. list = []
  441. result = session.exec(statement).all()
  442. print("group 完成 ", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  443. for item in result:
  444. list_item = photos.read_all(session, conditions={"goods_art_no": item.goods_art_no})
  445. list.append(
  446. {
  447. "goods_art_no": item.goods_art_no,
  448. "action_time": item.create_time,
  449. "items": list_item,
  450. }
  451. )
  452. session.close()
  453. print("循环查询 完成 ", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  454. return {
  455. "code": 0,
  456. "msg": "",
  457. "data": {"list": list, "page": page, "size": size},
  458. }
  459. @app.get("/get_last_photo_record", description="获取最后一条拍照记录")
  460. def get_last_photo_record():
  461. session = SqlQuery()
  462. statement = select(PhotoRecord).where(PhotoRecord.image_path != None).order_by(desc("photo_create_time"))
  463. result = session.exec(statement).first()
  464. session.close()
  465. return {
  466. "code": 0,
  467. "msg": "",
  468. "data": result,
  469. }
  470. @app.get("/get_photo_record_detail", description="通过货号获取拍照记录详情")
  471. def get_photo_records(goods_art_no: str = None):
  472. if goods_art_no == None:
  473. return {"code": 1, "msg": "参数错误", "data": None}
  474. session = SqlQuery()
  475. photos = CRUD(PhotoRecord)
  476. items = photos.read_all(session, conditions={"goods_art_no": goods_art_no})
  477. session.close()
  478. return {
  479. "code": 0,
  480. "msg": "",
  481. "data": {"list": items},
  482. }
  483. @app.post("/delect_goods_arts", description="通过货号删除记录")
  484. def delect_goods_arts(params: PhotoRecordDelete):
  485. session = SqlQuery()
  486. photos = CRUD(PhotoRecord)
  487. for item in params.goods_art_nos:
  488. photos.deleteConditions(session, conditions={"goods_art_no": item})
  489. session.close()
  490. return {
  491. "code": 0,
  492. "msg": "操作成功",
  493. "data": None,
  494. }
  495. @app.get("/get_sys_config", description="查询系统配置")
  496. def get_sys_config(key: str = None):
  497. if key == None:
  498. return {"code": 1, "msg": "参数错误", "data": None}
  499. session = SqlQuery()
  500. photos = CRUD(SysConfigs)
  501. item = photos.read(session, conditions={"key": key})
  502. session.close()
  503. return {
  504. "code": 0,
  505. "msg": "",
  506. "data": json.loads(item.value),
  507. }
  508. @app.post("/update_sys_configs", description="创建或修改系统配置")
  509. def save_sys_configs(params: SysConfigParams):
  510. session = SqlQuery()
  511. sysConfig = CRUD(SysConfigs)
  512. model = sysConfig.read(session, conditions={"key": params.key})
  513. if model == None:
  514. return {"code": 1, "msg": "配置不存在", "data": None}
  515. # 走编辑逻辑
  516. kwargs = params.__dict__
  517. save_device_config = sysConfig.updateConditions(session, conditions={"key": params.key}, **kwargs)
  518. return {"code": 0, "msg": "操作成功", "data": save_device_config}
  519. @app.post("/create_main_image", description="创建主图测试")
  520. def create_main_image(params: MaineImageTest):
  521. file_path = params.file_path
  522. onePic = OnePicTest(pic_path=file_path)
  523. # session = SqlQuery()
  524. # sysConfig = CRUD(SysConfigs)
  525. # model = sysConfig.read(session, conditions={"key": params.key})
  526. # if model == None:
  527. # return {"code": 1, "msg": "配置不存在", "data": None}
  528. # # 走编辑逻辑
  529. # kwargs = params.__dict__
  530. # save_device_config = sysConfig.updateConditions(
  531. # session, conditions={"key": params.key}, **kwargs
  532. # )
  533. main_out_path = onePic.HandlerMainImage()
  534. return {"code": 0, "msg": "操作成功", "data": {"main_out_path": main_out_path}}
  535. def insertEmptyLogoList(session):
  536. """插入空logo列表"""
  537. data = {"key": "logo_configs", "value": "[]"}
  538. config = SysConfigs(**data)
  539. session.add(config)
  540. session.commit()
  541. session.close()
  542. item = SysConfigs()
  543. item.key = "logo_configs"
  544. item.value = "[]"
  545. return item
  546. @app.get("/logo_list", description="logo列表")
  547. def logo_list():
  548. logo_dir = "{}/data/logo/".format(os.getcwd()).replace("\\", "/")
  549. check_path(logo_dir)
  550. logo_files = os.listdir(logo_dir)
  551. logo_list = []
  552. for logoItem in logo_files:
  553. logo_list.append(f"{logo_dir}{logoItem}")
  554. return {"code": 0, "msg": "操作成功", "data": logo_list}
  555. @app.post("/add_logo", description="添加logo")
  556. def add_logo(params: LogoParams):
  557. logo_path = params.logo_path.replace("\\", "/")
  558. session = SqlQuery()
  559. sysConfig = CRUD(SysConfigs)
  560. item = sysConfig.read(session, conditions={"key": "logo_configs"})
  561. if item == None:
  562. item = insertEmptyLogoList(session)
  563. if os.path.isfile(logo_path) == False:
  564. return {"code": 1, "msg": "logo文件不存在", "data": None}
  565. logo_dir = "{}/data/logo/".format(os.getcwd()).replace("\\", "/")
  566. check_path(logo_dir)
  567. fpath, fname = os.path.split(logo_path)
  568. logo_path_info = logo_dir + fname
  569. shutil.copy(logo_path, logo_path_info) # 复制文件
  570. logo_files = os.listdir(logo_dir)
  571. logo_list = []
  572. for logoItem in logo_files:
  573. logo_list.append(f"{logo_dir}{logoItem}")
  574. return {
  575. "code": 0,
  576. "msg": "",
  577. "data": {"logo": logo_path_info},
  578. }
  579. @app.post("/delete_logo", description="删除logo")
  580. def delete_logo(params: LogoParamsupdate):
  581. logo_path = params.path
  582. if os.path.isfile(logo_path) == False:
  583. return {"code": 1, "msg": "logo文件不存在", "data": None}
  584. os.remove(logo_path)
  585. logo_dir = "{}/data/logo/".format(os.getcwd()).replace("\\", "/")
  586. check_path(logo_dir)
  587. logo_files = os.listdir(logo_dir)
  588. logo_list = []
  589. for logoItem in logo_files:
  590. logo_list.append(f"{logo_dir}{logoItem}")
  591. return {"code": 0, "msg": "操作成功", "data": logo_list}