Просмотр исходного кода

```
feat(api): 添加重命名阴影文件功能并优化文件操作

- 添加了 copy_directory_walk 函数用于安全复制目录
- 添加了 rename_file_safe 函数用于安全重命名文件
- 新增 /rename_shadow_folder API 端点用于批量重命名阴影文件
- 添加 RenameShadow 数据模型用于接收货号数组参数
- 在 socket 消息处理中添加货号特殊字符验证功能
- 修复了同步照片记录接口的返回信息
- 优化了文件路径处理和目录操作逻辑
```

rambo 2 недель назад
Родитель
Сommit
932947d9f2
3 измененных файлов с 138 добавлено и 4 удалено
  1. 111 2
      python/api.py
  2. 6 1
      python/models.py
  3. 21 1
      python/sockets/message_handler.py

+ 111 - 2
python/api.py

@@ -1,4 +1,4 @@
-from re import search
+from re import search,match
 from natsort.natsort import order_by_index
 from sqlalchemy import func
 from models import *
@@ -1717,4 +1717,113 @@ async def syncPhotoRecord(params:SyncPhotoRecord):
     # 最终转换为JSON字符串
     settings.syncPhotoRecord(json_results,action_type=0)
     session.close()
-    return {"code": 0, "message": "同步完成", "data": None}
+    return {"code": 0, "message": "同步完成", "data": None}
+def copy_directory_walk(src, dst):
+    """
+    使用 os.walk() 遍历并复制目录
+    """
+    for root, dirs, files in os.walk(src):
+        # 计算相对路径
+        rel_path = os.path.relpath(root, src)
+        dst_dir = os.path.join(dst, rel_path) if rel_path != '.' else dst
+        # 创建目标目录
+        if not os.path.exists(dst_dir):
+            os.makedirs(dst_dir)
+        # 复制文件
+        for file in files:
+            src_file = os.path.join(root, file)
+            dst_file = os.path.join(dst_dir, file)
+            shutil.copy2(src_file, dst_file)
+def rename_file_safe(src, dst, overwrite=True):
+    """
+    安全地重命名文件
+    
+    Args:
+        src: 源文件路径
+        dst: 新文件路径
+        overwrite: 是否覆盖已存在的文件
+    """
+    try:
+        # 检查源文件是否存在
+        if not os.path.exists(src):
+            return False, f"源文件 {src} 不存在"
+        
+        # 检查目标文件是否存在
+        if os.path.exists(dst) and not overwrite:
+            return False, f"目标文件 {dst} 已存在,设置 overwrite=True 以覆盖"
+        
+        # 执行重命名
+        os.rename(src, dst)
+        return True, f"文件已成功从 {src} 重命名为 {dst}"
+    except OSError as e:
+        return False, f"重命名失败: {str(e)}"
+@app.post("/rename_shadow_folder", description="同步本地拍照记录-和output目录")
+async def rename_shadow_folder(params:RenameShadow):
+    # 货号数组
+    goods_art_nos = params.goods_art_nos
+    # for goods_art_no in goods_art_nos:
+    # 查询这些货号的所有记录
+    goods_art_dict = {}
+    for goods in goods_art_nos:
+        query = (
+            select(PhotoRecord, DeviceConfig.action_name)
+            .outerjoin(DeviceConfig, PhotoRecord.action_id == DeviceConfig.id)
+            .where(PhotoRecord.goods_art_no.in_(goods_art_nos))
+            .where(PhotoRecord.delete_time == None)
+            .order_by(PhotoRecord.goods_art_no, asc("id"))  # 按货号分组并按ID倒序
+        )
+        all_items = session.exec(query).mappings().all()
+        if len(all_items) == 0:  # 如果没有记录则返回
+            # raise UnicornException("暂无可用货号")
+            continue
+        goods_art_rename_list = []
+        for item in all_items:
+            if item["action_name"] == None:
+                continue
+            data = {"goods_art_no": item.PhotoRecord.goods_art_no, "action_name": item["action_name"],"image_index":item.PhotoRecord.image_index}
+            goods_art_rename_list.append(data)
+        goods_art_dict[goods] = goods_art_rename_list
+    outputDir = settings.OUTPUT_DIR
+    if not os.path.exists(outputDir):
+        raise UnicornException(f"生成目录[{outputDir}]暂无文件,请拍摄或抠图后处理")
+    outputList = os.listdir(outputDir)
+    success_result = []
+    for firstDir in outputList:
+        secondPath = f"{outputDir}\\{firstDir}"
+        for goodsArt in goods_art_dict:
+            goods_art_no_obj = goods_art_dict[goodsArt]
+            goodsArtPath = f"{secondPath}\\{goodsArt}"
+            if not os.path.exists(goodsArtPath):
+                continue
+            renameSrcPath = f"{goodsArtPath}\\阴影图处理"
+            renameDstPath = f"{goodsArtPath}\\阴影图处理-重命名"
+            if not os.path.exists(renameSrcPath):
+                print("阴影图目录不存在...","不处理")
+                continue
+            if len(os.listdir(renameSrcPath)) == 0:
+                print("阴影图处理目录无内容...","不处理")
+                continue
+            if not os.path.exists(renameDstPath):
+                try:
+                    copy_directory_walk(renameSrcPath,renameDstPath)
+                except Exception as e:
+                    print("重命名失败",e)
+                    continue
+            dstPath = os.listdir(renameDstPath)
+            for dts_files in dstPath:
+                for goods_obj in goods_art_no_obj:
+                    goods_art_no_name = goods_obj["goods_art_no"]
+                    image_index = goods_obj["image_index"]
+                    action_name = goods_obj["action_name"]
+                    image_index+=1
+                    basic_name = f"{goods_art_no_name}({image_index})"
+                    if basic_name in dts_files:
+                        if "抠图" in dts_files:
+                            is_ok,msg =  rename_file_safe(f"{renameDstPath}\\{dts_files}",f"{renameDstPath}\\{basic_name}_{action_name}_抠图.png")
+                            print(is_ok,msg)
+                        if "阴影" in dts_files:
+                            is_ok,msg = rename_file_safe(f"{renameDstPath}\\{dts_files}",f"{renameDstPath}\\{basic_name}_{action_name}_阴影.png")
+                            print(is_ok,msg)
+                        print("goods_obj",goods_obj)
+            success_result.append({"goods_art_no":goods_art_no_name,"path":renameDstPath})
+    return {"code": 0, "message": "重命名完成", "data": {"result": success_result}}

+ 6 - 1
python/models.py

@@ -166,4 +166,9 @@ class SyncPhotoRecord(BaseModel):
     """同步图片记录"""
 
     token: str = Field(default=None, description="用户token")
-    env: str = Field(default="dev", description="当前环境")
+    env: str = Field(default="dev", description="当前环境")
+    
+    
+class RenameShadow(BaseModel):
+    """重命名阴影文件"""
+    goods_art_nos: list[str] = Field(default=None, description="货号数组")

+ 21 - 1
python/sockets/message_handler.py

@@ -111,7 +111,20 @@ def handlerFolderDelete(limit_path, goods_art_no_arrays, is_write_txt_log):
                         logger.info(f"抠图前目录删除出现问题--Exception:{str(e)};{retry_count}")
                         time.sleep(0.5)  # 等待0.5秒后重试
     
-    return move_folder_array                
+    return move_folder_array
+def validate_goods_art_no(self, goods_art_no):
+        """
+        验证货号输入是否包含特殊字符
+        """
+        import re
+        # 定义不允许的特殊字符,主要是文件系统中可能导致路径问题的字符
+        invalid_chars = r'[<>:"/\\|?*]'
+        if re.search(invalid_chars, goods_art_no):
+            # 找出所有非法字符
+            invalid_found = re.findall(invalid_chars, goods_art_no)
+            invalid_str = ', '.join(set(invalid_found))
+            return False,f"货号包含非法字符: {invalid_str},请修改后再提交"
+        return True,""
 # socket消息发送逻辑处理方法
 async def handlerSend(
     manager: ConnectionManager,
@@ -199,6 +212,13 @@ async def handlerSend(
                 )
                 await manager.send_personal_message(data, websocket)
                 return
+            is_ok, msg = validate_goods_art_no(goods_art_no)
+            if not is_ok:
+                data = manager.jsonMessage(
+                    code=1, msg=msg, msg_type=msg_type
+                )
+                await manager.send_personal_message(data, websocket)
+                return
             session = SqlQuery()
             sys_configs = CRUD(SysConfigs)
             action_configs = sys_configs.read(