| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650 |
- from natsort.natsort import order_by_index
- from sqlalchemy import func
- from models import *
- import requests
- import json
- from logger import logger
- from serial.tools import list_ports
- from model import PhotoRecord
- import settings, datetime
- import pandas as pd
- from utils.hlm_http_request import forward_request
- from utils.utils_func import check_path
- from sockets.socket_client import socket_manager
- from mcu.DeviceControl import DeviceControl
- import time, shutil, os
- from sqlalchemy import and_, asc, desc
- from functools import partial
- from service.deal_image import DealImage
- from databases import DeviceConfig, SysConfigs, SqlQuery, CRUD, select
- from service.run_main import RunMain
- import importlib
- from service.auto_deal_pics.upload_pic import UploadPic
- from service.OnePicTest import OnePicTest
- import hashlib
- # from service.AutoDealPics import AutoDealPics
- # for plugin in settings.plugins:
- # module_path, class_name = plugin.rsplit(".", 1)
- # print("module_path", module_path, class_name)
- # module = importlib.import_module(module_path)
- # getattr(module, class_name)
- def calculate_md5(filepath):
- # 打开文件,以二进制只读模式打开
- with open(filepath, "rb") as f:
- # 创建MD5哈希对象
- md5hash = hashlib.md5()
- # 循环读取文件的内容并更新哈希对象
- for chunk in iter(lambda: f.read(4096), b""):
- md5hash.update(chunk)
- # 返回MD5哈希的十六进制表示
- return md5hash.hexdigest()
@app.get("/")
async def index():
    """Root endpoint: reply with a fixed greeting payload."""
    greeting = {"message": "Hello World"}
    return greeting
@app.get("/send_test")
async def index():
    """Send a sample message through the socket manager, then reply with a greeting."""
    sample = dict(data1=1, data2=2, data3=3, data4=4)
    await socket_manager.send_message(msg="测试", data=sample)
    reply = {"message": "Hello World"}
    return reply
@app.get("/scan_serials", description="扫描可用的设备端口")
async def scanSerials():
    """Scan the serial ports.

    The ports reported by ``list_ports.comports()`` are only printed; the
    response is a fixed greeting and does not contain them.
    """
    detected_ports = list_ports.comports()
    print("Scanning", detected_ports)
    reply = {"message": "Hello World"}
    return reply
@app.get("/test_conndevice")
def test_conndevice():
    """Scan the serial ports once and try to attach every port that is found.

    A fresh ``DeviceControl`` is created, the handler waits one second, and
    each port name returned by ``scan_serial_port()`` is handed to
    ``add_port_by_linkage()``. A failure on one port is printed and does not
    prevent the remaining ports from being tried.

    Returns:
        None.
    """
    device_control = DeviceControl()
    time.sleep(1)
    ports_dict = device_control.scan_serial_port()
    if not ports_dict:
        # Nothing was detected. No port has been attached during this call,
        # so there is nothing to remove either.
        return

    for port_name in ports_dict:
        try:
            device_control.add_port_by_linkage(port_name)
        except Exception as e:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit are not swallowed.
            print(e.__traceback__.tb_frame.f_globals["__file__"])  # file where the exception occurred
            print(e.__traceback__.tb_lineno)  # line where the exception occurred
            print("串口不存在{} {}".format(port_name, e))
@app.api_route("/forward_request", methods=["GET", "POST"], description="代理转发hlm项目得请求")
async def forwardRequest(request: HlmForwardRequest):
    """Forward an HTTP request to the target URL on behalf of the caller.

    Args:
        request: Description of the request to forward. The handler reads its
            ``method``, ``query_params``, ``target_url`` and ``headers``.

    Returns:
        Whatever ``forward_request`` returns for the forwarded call.

    Raises:
        UnicornException: If the method is neither GET nor POST, the target
            URL is empty, or forwarding fails for any other reason.
    """
    try:
        # Normalise once so "get"/"post" are accepted like "GET"/"POST".
        method = (request.method or "").upper()
        if method == "GET":
            params = request.query_params
        elif method == "POST":
            # json.dumps returns the serialised string; json.dump needs a
            # file object and raised TypeError here.
            params = json.dumps(request.query_params)
        else:
            raise UnicornException("仅支持GET和POST方法")

        target_url = request.target_url
        if not target_url:
            raise UnicornException("目标url地址不能为空")

        # Delegate the actual HTTP call to the project's forward_request helper.
        return forward_request(
            target_url, params=params, method=method, headers=request.headers
        )
    except UnicornException:
        # Validation errors raised above already have the right type and
        # message; do not wrap them a second time.
        raise
    except Exception as e:
        raise UnicornException(e) from e
def _excel_cell_text(value):
    """Return a spreadsheet cell as text, mapping empty (NaN/None) cells to ""."""
    if pd.isna(value):
        # str(nan) would be the non-empty string "nan" and slip past the
        # "article number must not be empty" check.
        return ""
    return str(value)


def _run_detail_pipeline(params, goods_art_no, image_folder_name):
    """Run the cutout/detail pipeline for a single article number.

    Args:
        params: The ``HandlerDetail`` request payload.
        goods_art_no: Article number whose photo records are processed.
        image_folder_name: Value passed to ``dealMoveImage`` as
            ``goods_art_no``.

    Returns:
        The parent directory of the generated output directory when the
        pipeline reports "已结束详情处理", otherwise ``None``.

    Raises:
        UnicornException: If the article number is empty, has no photo
            records, or the images cannot be moved.
        Exception: Anything raised by the pipeline itself is propagated.
    """
    if not goods_art_no:
        raise UnicornException("货号不能为空")
    token = "Bearer " + params.token

    session = SqlQuery()
    try:
        images = CRUD(PhotoRecord).read_all(session, conditions={"goods_art_no": goods_art_no})
        if not images:
            raise UnicornException("没有可用货号数据")
        # Copy the paths out before closing so nothing below needs the session.
        image_paths = [record.image_path for record in images]
    finally:
        session.close()

    image_dir = "{}/data/".format(os.getcwd()).replace("\\", "/")
    check_path(image_dir)
    for source_path in image_paths:
        # Only copy images that are not already present in the working folder.
        if not os.path.exists(os.path.join(image_dir, os.path.basename(source_path))):
            shutil.copy(source_path, image_dir)

    moved_ok, path = DealImage(image_dir).dealMoveImage(
        image_dir=image_dir,
        callback_func=None,
        goods_art_no=image_folder_name,
    )
    if not moved_ok:
        # On failure the second return value is used as the error message.
        raise UnicornException(path)

    temp_class = {}
    temp_name_list = []
    for template in params.temp_list:
        # Resolve the dotted "package.module.ClassName" path to the class.
        # NOTE(review): the class path comes from the request payload, so this
        # imports whatever module the caller names — confirm callers are trusted.
        module_path, class_name = template.template_local_classes.rsplit(".", 1)
        module = importlib.import_module(module_path)
        temp_class[template.template_id] = getattr(module, class_name)
        temp_name_list.append(template.template_id)

    config_data = {
        "image_dir": path,
        "image_order": params.template_image_order,
        "goods_art_no": goods_art_no,
        "is_check_number": False,
        "resize_image_view": "后跟",
        "cutout_mode": settings.CUTOUT_MODE,
        "logo_path": params.logo_path,
        "special_goods_art_no_folder_line": "",
        # bool() so that both "" and None mean "no excel file".
        "is_use_excel": bool(params.excel_path),
        "excel_path": params.excel_path,
        "is_check_color_is_all": False,
        "cutout_is_pass": True,
        "assigned_page_dict": {},
        "detail_is_pass": True,
        "upload_is_pass": False,
        "upload_is_enable": False,
        "is_filter": False,
        "temp_class": temp_class,
        "temp_name": params.temp_name,
        "temp_name_list": temp_name_list,
        "target_error_folder": f"{path}/软件-生成详情错误",
    }

    run_main = RunMain(None, token)
    return_data = run_main.check_before_cutout(config_data)
    cutout_res = run_main.check_for_cutout_image_first_call_back(return_data)

    detail_callback = None
    # Compared by equality on purpose: the return type of the cutout callback
    # is not visible here, so plain truthiness might accept more than intended.
    if cutout_res == True:  # noqa: E712
        return_data_check_before_detail = run_main.check_before_detail(config_data)
        print(
            "return_data_check_before_detail======> 测试 ==>",
            return_data_check_before_detail,
        )
        detail_callback = run_main.check_for_detail_first_call_back(
            return_data_check_before_detail
        )

    if isinstance(detail_callback, partial):
        result = detail_callback()
        # The callback result is either a wrapper holding "config_data" or
        # the config itself.
        try:
            config_data = result["config_data"]
        except (KeyError, TypeError, IndexError):
            config_data = result

    # A config without "sign_text" (e.g. the detail step never ran) counts as
    # a run that did not finish.
    if config_data.get("sign_text") != "已结束详情处理":
        return None

    print("config_data", config_data)
    if config_data["upload_is_enable"]:
        to_deal_dir = "{}/软件-详情图生成".format(config_data["image_dir"])
        check_path(to_deal_dir)
        print("to_deal_dir", to_deal_dir)
        if os.path.exists(to_deal_dir):
            upload_pic = UploadPic(
                windows=None,
                to_deal_dir=to_deal_dir,
                config_data=config_data,
                token=token,
            )
            upload_pic.run()

    out_put_dir_path = "{}/{}".format(os.getcwd(), config_data["out_put_dir"]).replace("\\", "/")
    return os.path.dirname(out_put_dir_path)


def _handle_detail_jobs(params, jobs):
    """Run the pipeline for every ``(goods_art_no, image_folder_name)`` pair.

    One failing article never aborts the batch; its error text is recorded in
    the result list instead.

    Returns:
        A ``(output_folder, results)`` tuple: the folder reported by the last
        successful article ("" if none succeeded) and one result dict per job.
    """
    results = []
    output_folder = ""
    for goods_art_no, image_folder_name in jobs:
        try:
            finished_folder = _run_detail_pipeline(params, goods_art_no, image_folder_name)
        except Exception as e:
            results.append({"goods_art_no": goods_art_no, "success": False, "info": str(e)})
            continue
        if finished_folder is None:
            results.append({"goods_art_no": goods_art_no, "success": False, "info": "处理失败"})
        else:
            output_folder = finished_folder
            results.append({"goods_art_no": goods_art_no, "success": True, "info": "处理成功"})
    return output_folder, results


def fromExcelHandler(params: HandlerDetail):
    """Generate detail images for every row of the Excel sheet in ``params.excel_path``.

    The first sheet must contain the columns "文件夹名称" (image folder name)
    and "商品货号" (article number). Each row is processed independently.

    Args:
        params: The ``HandlerDetail`` request payload.

    Returns:
        A response dict with ``code``, ``msg`` and ``data``; ``data`` holds
        ``output_folder`` and the per-article ``list`` of results.

    Raises:
        UnicornException: If a required column is missing from the sheet.
    """
    excel_df = pd.read_excel(params.excel_path, sheet_name=0, header=0)
    for column in ("文件夹名称", "商品货号"):
        if column not in excel_df.columns:
            raise UnicornException("缺失 [{}] 列".format(column))

    jobs = [
        (_excel_cell_text(row["商品货号"]), str(row["文件夹名称"]))
        for _, row in excel_df.iterrows()
    ]
    handler_result_folder, handler_result = _handle_detail_jobs(params, jobs)
    # Report the directory one level above the one recorded for the article.
    handler_result_folder = "/".join(handler_result_folder.split("/")[:-1])
    return {
        "code": 0,
        "msg": "",
        "data": {"output_folder": handler_result_folder, "list": handler_result},
    }


@app.post("/handle_detail")
async def handle_detail(request: Request, params: HandlerDetail):
    """Generate detail images for the requested article numbers.

    When ``params.excel_path`` is set the work is delegated to
    ``fromExcelHandler``; otherwise every entry of ``params.goods_art_no`` is
    processed, using the article number itself as the image folder name.

    Args:
        request: The incoming request (unused).
        params: The ``HandlerDetail`` request payload.

    Returns:
        A response dict with ``code``, ``msg`` and ``data``; ``data`` holds
        ``output_folder`` and the per-article ``list`` of results.
    """
    if params.excel_path:
        return fromExcelHandler(params)

    jobs = [(goods_art_no, goods_art_no) for goods_art_no in params.goods_art_no]
    handler_result_folder, handler_result = _handle_detail_jobs(params, jobs)
    return {
        "code": 0,
        "msg": "",
        "data": {"output_folder": handler_result_folder, "list": handler_result},
    }
@app.post("/get_device_configs", description="获取可执行程序命令列表")
def get_device_configs(params: ModelGetDeviceConfig):
    """List the device command configurations of one mode type.

    Rows are read with ``mode_type`` as the only condition, ordered by
    ``action_index`` ascending.

    Returns:
        A response dict whose ``data["list"]`` holds the rows that were read.
    """
    query_options = {
        "conditions": {"mode_type": params.mode_type},
        "order_by": "action_index",
        "ascending": True,
    }
    db_session = SqlQuery()
    rows = CRUD(DeviceConfig).read_all(db_session, **query_options)
    return {"code": 0, "msg": "", "data": {"list": rows}}
@app.post("/device_config_detail", description="获取可执行程序详情")
def get_device_configs(params: ModelGetDeviceConfigDetail):
    """Return a single device command configuration looked up by id.

    Returns:
        ``code`` 0 with the row in ``data``, or ``code`` 1 with
        "数据不存在" when no row has that id.
    """
    session = SqlQuery()
    model = CRUD(DeviceConfig).read(session, conditions={"id": params.id})
    # Identity check: "== None" would go through the model's own __eq__.
    if model is None:
        return {"code": 1, "msg": "数据不存在", "data": None}
    return {"code": 0, "msg": "", "data": model}
@app.post("/device_config_detail_query", description="通过条件获取可执行程序详情")
def get_device_configs(params: ModelGetDeviceConfigDetailQuery):
    """Return a single device command configuration matched by mode type and action name.

    Returns:
        ``code`` 0 with the row in ``data``, or ``code`` 1 with
        "数据不存在" when nothing matches.
    """
    session = SqlQuery()
    conditions = {"mode_type": params.mode_type, "action_name": params.action_name}
    model = CRUD(DeviceConfig).read(session, conditions=conditions)
    # Identity check: "== None" would go through the model's own __eq__.
    if model is None:
        return {"code": 1, "msg": "数据不存在", "data": None}
    return {"code": 0, "msg": "", "data": model}
@app.post("/remove_config", description="删除一条可执行命令")
def get_device_configs(params: ModelGetDeviceConfigDetail):
    """Delete one device command configuration by id.

    Rows flagged ``is_system`` are protected and are never deleted.

    Returns:
        ``code`` 0 on success; ``code`` 1 when the row does not exist or is a
        system configuration.
    """
    action_id = params.id
    session = SqlQuery()
    configModel = CRUD(DeviceConfig)
    model = configModel.read(session, conditions={"id": action_id})
    if model is None:
        return {"code": 1, "msg": "数据不存在", "data": None}
    if model.is_system:
        return {"code": 1, "msg": "系统配置不允许删除", "data": None}
    configModel.delete(session, obj_id=action_id)
    return {"code": 0, "msg": "删除成功", "data": None}
@app.post("/save_device_config", description="创建或修改一条可执行命令")
def save_device_config(params: SaveDeviceConfig):
    """Create a device command configuration, or update an existing one.

    An ``id`` of ``None`` or ``0`` means "create"; any other value updates the
    row with that id.

    Returns:
        ``code`` 0 with the saved object in ``data``, or ``code`` 1 with
        "数据不存在" when the row to update cannot be found.
    """
    action_id = params.id
    session = SqlQuery()
    deviceConfig = CRUD(DeviceConfig)

    if action_id is None or action_id == 0:
        # Create: clear the id before handing the payload to create().
        params.id = None
        saved = deviceConfig.create(session, obj_in=params)
    else:
        if deviceConfig.read(session, conditions={"id": action_id}) is None:
            return {"code": 1, "msg": "数据不存在", "data": None}
        # Update: every attribute of the payload is forwarded as a keyword.
        saved = deviceConfig.update(session, obj_id=action_id, **params.__dict__)
    return {"code": 0, "msg": "操作成功", "data": saved}
# NOTE(review): the description below is identical to /save_device_config's
# ("create or modify one command") although this route resets a mode — confirm
# the intended wording.
@app.post("/reset_config", description="创建或修改一条可执行命令")
def reset_config(params: ModelGetDeviceConfig):
    """Reset the command configurations of one mode type from ``action.json``.

    All rows of the given ``mode_type`` are deleted, then the entries of
    ``action.json`` with the same ``mode_type`` are inserted again.

    Returns:
        ``code`` 0 on success; ``code`` 1 when ``mode_type`` is missing or the
        delete step reports failure.
    """
    mode_type = params.mode_type
    if mode_type is None or mode_type == "":
        return {"code": 1, "msg": "参数错误", "data": None}

    session = SqlQuery()
    deviceConfig = CRUD(DeviceConfig)
    res = deviceConfig.deleteConditions(session, conditions={"mode_type": mode_type})
    if res is False:
        return {"code": 1, "msg": "操作失败", "data": None}

    # "with" closes the file handle, which json.load(open(...)) left open.
    with open("action.json", encoding="utf-8") as action_file:
        actions = json.load(action_file)
    act = [item for item in actions if item.get("mode_type") == mode_type]
    batch_insert_device_configs(session, act)
    return {"code": 0, "msg": "操作成功", "data": None}
@app.get("/get_photo_records", description="获取拍照记录")
def get_photo_records(page: int = 1, size: int = 5):
    """Return one page of photo records, one entry per article number.

    A paged query grouped by ``goods_art_no`` picks the article numbers of the
    page; all records of each of those article numbers are then loaded with a
    separate query.

    Args:
        page: 1-based page number.
        size: Number of article numbers per page.

    Returns:
        A response dict whose ``data`` holds the ``list`` of
        ``{"goods_art_no", "action_time", "items"}`` entries plus the
        requested ``page`` and ``size``.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    session = SqlQuery()
    photos = CRUD(PhotoRecord)
    print("准备查询拍摄记录", datetime.datetime.now().strftime(time_format))
    statement = (
        select(PhotoRecord)
        .offset((page - 1) * size)
        .limit(size)
        .order_by(desc("id"))
        .group_by("goods_art_no")
    )
    # Named "records" so the builtin list is not shadowed.
    records = []
    try:
        result = session.exec(statement).all()
        print("group 完成 ", datetime.datetime.now().strftime(time_format))
        for item in result:
            list_item = photos.read_all(session, conditions={"goods_art_no": item.goods_art_no})
            records.append(
                {
                    "goods_art_no": item.goods_art_no,
                    "action_time": item.create_time,
                    "items": list_item,
                }
            )
    finally:
        # Close the session even when one of the queries raises.
        session.close()
    print("循环查询 完成 ", datetime.datetime.now().strftime(time_format))
    return {
        "code": 0,
        "msg": "",
        "data": {"list": records, "page": page, "size": size},
    }
@app.get("/get_last_photo_record", description="获取最后一条拍照记录")
def get_last_photo_record():
    """Return the first photo record with an image path, ordered by ``photo_create_time`` descending."""
    # NOTE(review): "!= None" is applied to the mapped column inside where(),
    # so it presumably builds an IS NOT NULL filter rather than a Python
    # boolean — do not rewrite it as "is not None".
    has_image_path = PhotoRecord.image_path != None  # noqa: E711
    query = select(PhotoRecord).where(has_image_path).order_by(desc("photo_create_time"))
    db_session = SqlQuery()
    latest = db_session.exec(query).first()
    db_session.close()
    return {"code": 0, "msg": "", "data": latest}
@app.get("/get_photo_record_detail", description="通过货号获取拍照记录详情")
def get_photo_records(goods_art_no: str = None):
    """Return every photo record stored for one article number.

    Args:
        goods_art_no: Article number to look up; required.

    Returns:
        ``code`` 0 with the records in ``data["list"]``, or ``code`` 1 with
        "参数错误" when ``goods_art_no`` is not supplied.
    """
    if goods_art_no is None:
        return {"code": 1, "msg": "参数错误", "data": None}
    session = SqlQuery()
    items = CRUD(PhotoRecord).read_all(session, conditions={"goods_art_no": goods_art_no})
    session.close()
    return {
        "code": 0,
        "msg": "",
        "data": {"list": items},
    }
@app.post("/delect_goods_arts", description="通过货号删除记录")
def delect_goods_arts(params: PhotoRecordDelete):
    """Delete the photo records of every article number in ``params.goods_art_nos``."""
    db_session = SqlQuery()
    record_crud = CRUD(PhotoRecord)
    for art_no in params.goods_art_nos:
        record_crud.deleteConditions(db_session, conditions={"goods_art_no": art_no})
    db_session.close()
    response = {"code": 0, "msg": "操作成功", "data": None}
    return response
@app.get("/get_sys_config", description="查询系统配置")
def get_sys_config(key: str = None):
    """Return the JSON-decoded value of one system configuration entry.

    Args:
        key: Configuration key to look up; required.

    Returns:
        ``code`` 0 with the decoded value in ``data``; ``code`` 1 with
        "参数错误" when ``key`` is not supplied, or with "配置不存在" when no
        entry has that key.
    """
    if key is None:
        return {"code": 1, "msg": "参数错误", "data": None}
    session = SqlQuery()
    item = CRUD(SysConfigs).read(session, conditions={"key": key})
    session.close()
    if item is None:
        # Without this guard item.value raised AttributeError for unknown keys.
        return {"code": 1, "msg": "配置不存在", "data": None}
    return {
        "code": 0,
        "msg": "",
        "data": json.loads(item.value),
    }
@app.post("/update_sys_configs", description="创建或修改系统配置")
def save_sys_configs(params: SysConfigParams):
    """Update the system configuration entry identified by ``params.key``.

    The entry must already exist; this handler never creates one.

    Returns:
        ``code`` 0 with the result of the update in ``data``, or ``code`` 1
        with "配置不存在" when no entry has that key.
    """
    session = SqlQuery()
    sysConfig = CRUD(SysConfigs)
    if sysConfig.read(session, conditions={"key": params.key}) is None:
        return {"code": 1, "msg": "配置不存在", "data": None}
    # Every attribute of the payload is forwarded as a keyword to the update.
    updated = sysConfig.updateConditions(
        session, conditions={"key": params.key}, **params.__dict__
    )
    return {"code": 0, "msg": "操作成功", "data": updated}
@app.post("/create_main_image", description="创建主图测试")
def create_main_image(params: MaineImageTest):
    """Run the single-picture main-image test on ``params.file_path``.

    Returns:
        A response dict whose ``data["main_out_path"]`` is the value returned
        by ``OnePicTest.HandlerMainImage()``.
    """
    tester = OnePicTest(pic_path=params.file_path)
    generated_path = tester.HandlerMainImage()
    payload = {"main_out_path": generated_path}
    return {"code": 0, "msg": "操作成功", "data": payload}
def insertEmptyLogoList(session):
    """Insert an empty logo list.

    A ``SysConfigs`` row with key "logo_configs" and value "[]" is added and
    committed, and the session is closed. The returned object is a separate,
    freshly built ``SysConfigs`` carrying the same key and value.
    """
    session.add(SysConfigs(key="logo_configs", value="[]"))
    session.commit()
    session.close()

    placeholder = SysConfigs()
    placeholder.key = "logo_configs"
    placeholder.value = "[]"
    return placeholder
@app.get("/logo_list", description="logo列表")
def logo_list():
    """List the files in the ``data/logo/`` directory under the current working directory.

    Returns:
        A response dict whose ``data`` is the list of logo paths, each built
        as the logo directory followed by the file name.
    """
    base_dir = "{}/data/logo/".format(os.getcwd()).replace("\\", "/")
    check_path(base_dir)
    found = [f"{base_dir}{file_name}" for file_name in os.listdir(base_dir)]
    return {"code": 0, "msg": "操作成功", "data": found}
@app.post("/add_logo", description="添加logo")
def add_logo(params: LogoParams):
    """Copy a logo file into the ``data/logo/`` directory.

    The "logo_configs" system entry is created (as an empty list) first if it
    does not exist yet.

    Returns:
        ``code`` 0 with the stored path in ``data["logo"]``, or ``code`` 1
        with "logo文件不存在" when the source path is not a file.
    """
    logo_path = params.logo_path.replace("\\", "/")

    session = SqlQuery()
    if CRUD(SysConfigs).read(session, conditions={"key": "logo_configs"}) is None:
        insertEmptyLogoList(session)

    if not os.path.isfile(logo_path):
        return {"code": 1, "msg": "logo文件不存在", "data": None}

    logo_dir = "{}/data/logo/".format(os.getcwd()).replace("\\", "/")
    check_path(logo_dir)
    logo_path_info = logo_dir + os.path.basename(logo_path)
    try:
        shutil.copy(logo_path, logo_path_info)
    except shutil.SameFileError:
        # The logo already lives in the logo directory; nothing to copy.
        pass
    return {
        "code": 0,
        "msg": "",
        "data": {"logo": logo_path_info},
    }
@app.post("/delete_logo", description="删除logo")
def delete_logo(params: LogoParamsupdate):
    """Delete one logo file and return the remaining logos.

    Only files located inside the ``data/logo/`` directory may be deleted.
    Any other path is answered exactly like a missing file, so the endpoint
    cannot be used to remove arbitrary files.

    Returns:
        ``code`` 0 with the remaining logo paths in ``data``, or ``code`` 1
        with "logo文件不存在" when the path is not a file in the logo directory.
    """
    logo_path = params.path
    logo_dir = "{}/data/logo/".format(os.getcwd()).replace("\\", "/")
    check_path(logo_dir)

    # Resolve symlinks and ".." segments before comparing, so a crafted path
    # cannot escape the logo directory.
    logo_root = os.path.normcase(os.path.realpath(logo_dir))
    target = os.path.normcase(os.path.realpath(logo_path))
    try:
        inside_logo_dir = os.path.commonpath([logo_root, target]) == logo_root
    except ValueError:
        # Raised e.g. for paths on different drives.
        inside_logo_dir = False

    if not inside_logo_dir or not os.path.isfile(logo_path):
        return {"code": 1, "msg": "logo文件不存在", "data": None}

    os.remove(logo_path)
    remaining = [f"{logo_dir}{logoItem}" for logoItem in os.listdir(logo_dir)]
    return {"code": 0, "msg": "操作成功", "data": remaining}
|