from natsort.natsort import order_by_index from sqlalchemy import func from models import * import requests import json from logger import logger from serial.tools import list_ports from model import PhotoRecord import settings import pandas as pd from utils.hlm_http_request import forward_request from utils.utils_func import check_path from sockets.socket_client import socket_manager from mcu.DeviceControl import DeviceControl import time, shutil from sqlalchemy import and_, asc, desc from functools import partial from service.deal_image import DealImage from databases import DeviceConfig, SqlQuery, CRUD, select from service.run_main import RunMain import importlib from service.auto_deal_pics.upload_pic import UploadPic from service.OnePicTest import OnePicTest # from service.AutoDealPics import AutoDealPics # for plugin in settings.plugins: # module_path, class_name = plugin.rsplit(".", 1) # print("module_path", module_path, class_name) # module = importlib.import_module(module_path) # getattr(module, class_name) @app.get("/") async def index(): # await socket_manager.send_message(msg="测试") return {"message": "Hello World"} @app.get("/send_test") async def index(): data = {"data1": 1, "data2": 2, "data3": 3, "data4": 4} await socket_manager.send_message(msg="测试", data=data) return {"message": "Hello World"} @app.get("/scan_serials", description="扫描可用的设备端口") async def scanSerials(): """扫描串口""" ports = list_ports.comports() print("Scanning", ports) return {"message": "Hello World"} @app.get("/test_conndevice") def test_conndevice(): device_control = DeviceControl() p_list = [] temp_ports_dict = {} # while 1: time.sleep(1) ports_dict = device_control.scan_serial_port() temp_ports_dict = ports_dict if not ports_dict: # 全部清空 移除所有串口 if p_list: _p = p_list.pop() device_control.remove_port(_p) # continue if ports_dict: # print(plist) for index, _i in enumerate(p_list): if _i not in ports_dict: _p = p_list.pop(index) device_control.remove_port(_p) for _port_name, _port_value in ports_dict.items(): if _port_name not in p_list: try: p_list.append(_port_name) device_control.add_port_by_linkage(_port_name) except BaseException as e: print( e.__traceback__.tb_frame.f_globals["__file__"] ) # 发生异常所在的文件 print(e.__traceback__.tb_lineno) # 发生异常所在的行数 print("串口不存在{} {}".format(_port_name, e)) # threading.Thread(target=self.add_port, args=(_port_name, _port_value)).start() # self.add_port(_p) @app.api_route( "/forward_request", methods=["GET", "POST"], description="代理转发hlm项目得请求" ) async def forwardRequest(request: HlmForwardRequest): """ 转发HTTP请求到目标URL :param request: FastAPI Request对象 :return: 目标接口的响应 """ try: if request.method == "GET": params = request.query_params elif request.method == "POST": params = json.dump(request.query_params) else: raise UnicornException("仅支持GET和POST方法") target_url = request.target_url method = request.method.upper() headers = request.headers if not target_url: raise UnicornException("目标url地址不能为空") # 调用 hlm_http_request 中的 forward_request 函数 response = forward_request( target_url, params=params, method=method, headers=headers ) return response except requests.RequestException as e: raise UnicornException(e) except Exception as e: raise UnicornException(e) def fromExcelHandler(params: HandlerDetail): excel_path = params.excel_path excel_df = pd.read_excel(excel_path, sheet_name=0, header=0) handler_result = [] handler_result_folder = "" if "文件夹名称" not in excel_df.columns: raise UnicornException("缺失 [文件夹名称] 列") if "商品货号" not in excel_df.columns: raise UnicornException("缺失 [商品货号] 列") for index, row in excel_df.iterrows(): goods_art_no_image_dir = str(row["文件夹名称"]) goods_art_no = str(row["商品货号"]) try: if not goods_art_no: raise UnicornException("货号不能为空") token = "Bearer " + params.token session = SqlQuery() pr = CRUD(PhotoRecord) images = pr.read_all(session, conditions={"goods_art_no": goods_art_no}) if not images: raise UnicornException("没有可用货号数据") image_dir = "{}/data/".format(os.getcwd()).replace("\\", "/") check_path(image_dir) for itemImg in images: if not os.path.exists( image_dir + "/" + os.path.basename(itemImg.image_path) ): shutil.copy(itemImg.image_path, image_dir) dealImage = DealImage(image_dir) resFlag, path = dealImage.dealMoveImage( image_dir=image_dir, callback_func=None, goods_art_no=goods_art_no_image_dir, ) if not resFlag: raise UnicornException(path) temp_class = {} temp_name_list = [] for tempItem in params.temp_list: temp_class[tempItem.template_id] = tempItem.template_local_classes temp_name_list.append(tempItem.template_id) # if goods_art_no_image_dir not in path: # path = path + "/" + goods_art_no_image_dir config_data = { "image_dir": path, "image_order": params.template_image_order, "goods_art_no": goods_art_no, "is_check_number": False, "resize_image_view": "后跟", "cutout_mode": settings.CUTOUT_MODE, "logo_path": params.logo_path, "special_goods_art_no_folder_line": "", "is_use_excel": ( False if params.excel_path == "" else True ), # 是否使用excel "excel_path": params.excel_path, # excel路径 "is_check_color_is_all": False, "cutout_is_pass": True, "assigned_page_dict": {}, "detail_is_pass": True, "upload_is_pass": False, "upload_is_enable": False, "is_filter": False, "temp_class": temp_class, "temp_name": params.temp_name, "temp_name_list": temp_name_list, "target_error_folder": f"{path}/软件-生成详情错误", } # 动态导入类 temp_class_dict = {} for key, class_path in config_data["temp_class"].items(): module_path, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_path) cls = getattr(module, class_name) temp_class_dict[key] = cls config_data["temp_class"] = temp_class_dict obj = None run_main = RunMain(obj, token) return_data = run_main.check_before_cutout(config_data) cutout_res = run_main.check_for_cutout_image_first_call_back(return_data) check_for_detail_first_res = None if cutout_res == True: return_data_check_before_detail = run_main.check_before_detail( config_data ) print( "return_data_check_before_detail======> 测试 ==>", return_data_check_before_detail, ) check_for_detail_first_res = run_main.check_for_detail_first_call_back( return_data_check_before_detail ) if isinstance(check_for_detail_first_res, partial): result = check_for_detail_first_res() try: config_data = result["config_data"] except: config_data = result if config_data["sign_text"] == "已结束详情处理": # at_pic = AutoDealPics() print("config_data", config_data) if config_data["upload_is_enable"]: to_deal_dir = "{}/软件-详情图生成".format(config_data["image_dir"]) check_path(to_deal_dir) print("to_deal_dir", to_deal_dir) if os.path.exists(to_deal_dir): upload_pic = UploadPic( windows=None, to_deal_dir=to_deal_dir, config_data=config_data, token=token, ) upload_pic.run() out_put_dir = config_data["out_put_dir"] out_put_dir_path = "{}/{}".format(os.getcwd(), out_put_dir).replace( "\\", "/" ) handler_result_folder = os.path.dirname(out_put_dir_path) handler_result.append( {"goods_art_no": goods_art_no, "success": True, "info": "处理成功"} ) else: handler_result.append( {"goods_art_no": goods_art_no, "success": False, "info": "处理失败"} ) except Exception as e: handler_result.append( {"goods_art_no": goods_art_no, "success": False, "info": str(e)} ) handler_result_folder = "/".join(handler_result_folder.split("/")[:-1]) return { "code": 0, "msg": "", "data": {"output_folder": handler_result_folder, "list": handler_result}, } @app.post("/handle_detail") async def handle_detail(request: Request, params: HandlerDetail): goods_art_no_array = params.goods_art_no handler_result = [] handler_result_folder = "" if params.excel_path != "" or params.excel_path != None: return fromExcelHandler(params) for goods_art_no in goods_art_no_array: try: if not goods_art_no: raise UnicornException("货号不能为空") token = "Bearer " + params.token session = SqlQuery() pr = CRUD(PhotoRecord) images = pr.read_all(session, conditions={"goods_art_no": goods_art_no}) if not images: raise UnicornException("没有可用货号数据") image_dir = "{}/data/".format(os.getcwd()).replace("\\", "/") check_path(image_dir) for itemImg in images: if not os.path.exists( image_dir + "/" + os.path.basename(itemImg.image_path) ): shutil.copy(itemImg.image_path, image_dir) dealImage = DealImage(image_dir) resFlag, path = dealImage.dealMoveImage( image_dir=image_dir, callback_func=None, goods_art_no=goods_art_no ) if not resFlag: raise UnicornException(path) temp_class = {} temp_name_list = [] for tempItem in params.temp_list: temp_class[tempItem.template_id] = tempItem.template_local_classes temp_name_list.append(tempItem.template_id) config_data = { "image_dir": path, "image_order": params.template_image_order, "goods_art_no": goods_art_no, "is_check_number": False, "resize_image_view": "后跟", "cutout_mode": settings.CUTOUT_MODE, "logo_path": params.logo_path, "special_goods_art_no_folder_line": "", "is_use_excel": False if params.excel_path == "" else True, # 是否使用excel "excel_path": params.excel_path, # excel路径 "is_check_color_is_all": False, "cutout_is_pass": True, "assigned_page_dict": {}, "detail_is_pass": True, "upload_is_pass": False, "upload_is_enable": False, "is_filter": False, "temp_class": temp_class, "temp_name": params.temp_name, "temp_name_list": temp_name_list, "target_error_folder": f"{path}/软件-生成详情错误", } # 动态导入类 temp_class_dict = {} for key, class_path in config_data["temp_class"].items(): module_path, class_name = class_path.rsplit(".", 1) module = importlib.import_module(module_path) cls = getattr(module, class_name) temp_class_dict[key] = cls config_data["temp_class"] = temp_class_dict obj = None run_main = RunMain(obj,token) return_data = run_main.check_before_cutout(config_data) cutout_res = run_main.check_for_cutout_image_first_call_back(return_data) check_for_detail_first_res = None if cutout_res == True: return_data_check_before_detail = run_main.check_before_detail(config_data) print( "return_data_check_before_detail======> 测试 ==>", return_data_check_before_detail ) check_for_detail_first_res = run_main.check_for_detail_first_call_back( return_data_check_before_detail ) if isinstance(check_for_detail_first_res, partial): result = check_for_detail_first_res() try: config_data = result["config_data"] except: config_data = result if config_data["sign_text"] == "已结束详情处理": # at_pic = AutoDealPics() print("config_data", config_data) if config_data["upload_is_enable"]: to_deal_dir = "{}/软件-详情图生成".format(config_data["image_dir"]) check_path(to_deal_dir) print("to_deal_dir", to_deal_dir) if os.path.exists(to_deal_dir): upload_pic = UploadPic( windows=None, to_deal_dir=to_deal_dir, config_data=config_data, token=token, ) upload_pic.run() out_put_dir = config_data["out_put_dir"] out_put_dir_path = "{}/{}".format(os.getcwd(), out_put_dir).replace("\\", "/") handler_result_folder = os.path.dirname(out_put_dir_path) handler_result.append( {"goods_art_no": goods_art_no, "success": True, "info": "处理成功"} ) else: handler_result.append( {"goods_art_no": goods_art_no, "success": False, "info": "处理失败"} ) except Exception as e: handler_result.append( {"goods_art_no": goods_art_no, "success": False, "info": str(e)} ) return { "code": 0, "msg": "", "data": {"output_folder": handler_result_folder, "list": handler_result}, } @app.post("/get_device_configs", description="获取可执行程序命令列表") def get_device_configs(params: ModelGetDeviceConfig): mode_type = params.mode_type session = SqlQuery() configModel = CRUD(DeviceConfig) configList = configModel.read_all( session, conditions={"mode_type": mode_type}, order_by="action_index", ascending=True, ) return { "code": 0, "msg": "", "data": {"list": configList}, } @app.post("/device_config_detail", description="获取可执行程序详情") def get_device_configs(params: ModelGetDeviceConfigDetail): action_id = params.id session = SqlQuery() configModel = CRUD(DeviceConfig) model = configModel.read(session, conditions={"id": action_id}) if model == None: return {"code": 1, "msg": "数据不存在", "data": None} return {"code": 0, "msg": "", "data": model} @app.post("/device_config_detail_query", description="通过条件获取可执行程序详情") def get_device_configs(params: ModelGetDeviceConfigDetailQuery): mode_type = params.mode_type action_name = params.action_name session = SqlQuery() configModel = CRUD(DeviceConfig) model = configModel.read( session, conditions={"mode_type": mode_type, "action_name": action_name} ) if model == None: return {"code": 1, "msg": "数据不存在", "data": None} return {"code": 0, "msg": "", "data": model} @app.post("/remove_config", description="删除一条可执行命令") def get_device_configs(params: ModelGetDeviceConfigDetail): action_id = params.id session = SqlQuery() configModel = CRUD(DeviceConfig) model = configModel.read(session, conditions={"id": action_id}) if model == None: return {"code": 1, "msg": "数据不存在", "data": None} configModel.delete(session, obj_id=action_id) return {"code": 0, "msg": "删除成功", "data": None} @app.post("/save_device_config", description="创建或修改一条可执行命令") def save_device_config(params: SaveDeviceConfig): action_id = params.id session = SqlQuery() deviceConfig = CRUD(DeviceConfig) if action_id == None or action_id == 0: # 走新增逻辑 params.id = None save_device_config = deviceConfig.create(session, obj_in=params) else: model = deviceConfig.read(session, conditions={"id": action_id}) if model == None: return {"code": 1, "msg": "数据不存在", "data": None} # 走编辑逻辑 kwargs = params.__dict__ save_device_config = deviceConfig.update(session, obj_id=action_id, **kwargs) return {"code": 0, "msg": "操作成功", "data": save_device_config} @app.post("/reset_config", description="创建或修改一条可执行命令") def reset_config(params: ModelGetDeviceConfig): mode_type = params.mode_type if mode_type == None or mode_type == "": return {"code": 1, "msg": "参数错误", "data": None} session = SqlQuery() deviceConfig = CRUD(DeviceConfig) res = deviceConfig.deleteConditions(session, conditions={"mode_type": mode_type}) if res is False: return {"code": 1, "msg": "操作失败", "data": None} actions = json.load(open("action.json", encoding="utf-8")) act = [] for item in actions: if item.get("mode_type") == mode_type: act.append(item) batch_insert_device_configs(session, act) return {"code": 0, "msg": "操作成功", "data": None} @app.get("/get_photo_records", description="获取拍照记录") def get_photo_records(page: int = 1, size: int = 5): session = SqlQuery() photos = CRUD(PhotoRecord) statement = ( select(PhotoRecord) .offset((page - 1) * size) .limit(size) .order_by(desc("id")) .group_by("goods_art_no") ) list = [] result = session.exec(statement).all() for item in result: list_item = photos.read_all( session, conditions={"goods_art_no": item.goods_art_no} ) list.append( { "goods_art_no": item.goods_art_no, "action_time": item.create_time, "items": list_item, } ) session.close() return { "code": 0, "msg": "", "data": {"list": list, "page": page, "size": size}, } @app.get("/get_photo_record_detail", description="通过货号获取拍照记录详情") def get_photo_records(goods_art_no: str = None): if goods_art_no == None: return {"code": 1, "msg": "参数错误", "data": None} session = SqlQuery() photos = CRUD(PhotoRecord) items = photos.read_all(session, conditions={"goods_art_no": goods_art_no}) session.close() return { "code": 0, "msg": "", "data": {"list": items}, } @app.post("/delect_goods_arts", description="通过货号删除记录") def delect_goods_arts(params: PhotoRecordDelete): session = SqlQuery() photos = CRUD(PhotoRecord) for item in params.goods_art_nos: photos.deleteConditions(session, conditions={"goods_art_no": item}) session.close() return { "code": 0, "msg": "操作成功", "data": None, } @app.get("/get_sys_config", description="查询系统配置") def get_sys_config(key: str = None): if key == None: return {"code": 1, "msg": "参数错误", "data": None} session = SqlQuery() photos = CRUD(SysConfigs) item = photos.read(session, conditions={"key": key}) session.close() return { "code": 0, "msg": "", "data": json.loads(item.value), } @app.post("/update_sys_configs", description="创建或修改系统配置") def save_sys_configs(params: SysConfigParams): session = SqlQuery() sysConfig = CRUD(SysConfigs) model = sysConfig.read(session, conditions={"key": params.key}) if model == None: return {"code": 1, "msg": "配置不存在", "data": None} # 走编辑逻辑 kwargs = params.__dict__ save_device_config = sysConfig.updateConditions( session, conditions={"key": params.key}, **kwargs ) return {"code": 0, "msg": "操作成功", "data": save_device_config} @app.post("/create_main_image", description="创建主图测试") def create_main_image(params: MaineImageTest): file_path = params.file_path onePic = OnePicTest(pic_path=file_path) # session = SqlQuery() # sysConfig = CRUD(SysConfigs) # model = sysConfig.read(session, conditions={"key": params.key}) # if model == None: # return {"code": 1, "msg": "配置不存在", "data": None} # # 走编辑逻辑 # kwargs = params.__dict__ # save_device_config = sysConfig.updateConditions( # session, conditions={"key": params.key}, **kwargs # ) main_out_path = onePic.HandlerMainImage() return {"code": 0, "msg": "操作成功", "data": {"main_out_path": main_out_path}}