"""SQLite persistence helpers: engine setup, a generic CRUD wrapper and bulk inserts."""

from networkx.algorithms.components import connected
from sqlmodel import Field, Session, SQLModel, create_engine, select
from typing import Dict
from datetime import datetime
from typing import Optional
import json
from sqlalchemy import and_, desc, asc
from utils.utils_func import check_path
from sqlalchemy.dialects import sqlite
from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs

# NOTE(review): presumably ensures the data directory exists before the
# database file is opened -- confirm against utils.utils_func.check_path.
check_path("C:/Zhihuiyin")

# Create the SQLite database engine.
sqlite_file_name = "C:/Zhihuiyin/database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(
    sqlite_url,
    echo=False,
    connect_args={"check_same_thread": False},  # allow use from several threads
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
)


def create_all_database():
    """Create the tables registered on ``SQLModel.metadata`` using the module engine."""
    SQLModel.metadata.create_all(engine)


def __get_session():
    """Yield a session bound to the module engine.

    The ``with`` block closes the session once the generator is resumed,
    closed or finalized, so no explicit ``close()`` call is needed here.
    """
    with Session(engine) as session:
        yield session


class CRUD:
    """Generic create/read/update/delete helper for a single SQLModel table class."""

    def __init__(self, model):
        """Remember the table class that every operation of this instance targets."""
        self.model = model

    def _equality_filters(self, conditions: Dict) -> list:
        """Return one ``column == value`` expression per item of *conditions*."""
        return [getattr(self.model, key) == value for key, value in conditions.items()]

    def _apply_order(self, query, order_by: Optional[str], ascending: bool):
        """Append an ORDER BY on the model attribute *order_by*, if one was given."""
        if not order_by:
            return query
        direction = asc if ascending else desc
        return query.order_by(direction(getattr(self.model, order_by)))

    def create(self, session: Session, obj_in):
        """Insert a new row built from the fields of *obj_in*.

        :param session: database session
        :param obj_in: object convertible with ``dict(obj_in)`` to the model's keyword arguments
        :return: the newly created, refreshed model instance
        """
        db_obj = self.model(**dict(obj_in))
        session.add(db_obj)
        session.commit()
        session.refresh(db_obj)
        return db_obj

    def read(
        self,
        session: Session,
        conditions: Optional[Dict] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ):
        """Return the first row matching *conditions*, or ``None`` when nothing matches.

        :param session: database session
        :param conditions: ``{attribute: value}`` pairs combined with AND (equality only)
        :param order_by: name of the model attribute to sort by
        :param ascending: sort direction used together with *order_by*
        """
        query = select(self.model)
        if conditions:
            query = query.where(and_(*self._equality_filters(conditions)))
        query = self._apply_order(query, order_by, ascending)
        return session.exec(query).first()

    def read_all(
        self,
        session: Session,
        conditions: Optional[Dict] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
        join_conditions: Optional[list] = None,  # supports several JOINs
    ):
        """Return every row matching *conditions*.

        :param session: database session
        :param conditions: ``{attribute: value}`` pairs; a ``list`` value becomes
            an ``IN (...)`` test, anything else an equality test
        :param order_by: name of the model attribute to sort by
        :param ascending: sort direction used together with *order_by*
        :param join_conditions: list of dicts with the keys ``"model"`` (table to
            join), ``"on"`` (join clause) and optional ``"is_outer"``; entries
            lacking a model or a clause are skipped
        """
        query = select(self.model)

        # JOIN handling.
        for join_info in join_conditions or ():
            joined_model = join_info.get("model")
            on_clause = join_info.get("on")
            # Compare against None explicitly: the truth value of a SQLAlchemy
            # expression is either an identity check (False for ``a == b`` on two
            # different columns) or a TypeError, so ``not on_clause`` would skip
            # or crash on perfectly valid join clauses.
            if joined_model is None or on_clause is None:
                continue
            if join_info.get("is_outer", False):
                query = query.outerjoin(joined_model, on_clause)
            else:
                query = query.join(joined_model, on_clause)

        for key, value in (conditions or {}).items():
            column = getattr(self.model, key)
            if isinstance(value, list):
                # A list value means "column IN (...)".
                query = query.where(column.in_(value))
            else:
                query = query.where(column == value)

        query = self._apply_order(query, order_by, ascending)
        return session.exec(query).all()

    def update(self, session: Session, obj_id: int, **kwargs):
        """Set the given attributes on the row with primary key *obj_id*.

        :return: the refreshed object, or ``None`` when no such row exists
        """
        db_obj = session.get(self.model, obj_id)
        if db_obj is None:
            return None
        for key, value in kwargs.items():
            setattr(db_obj, key, value)
        session.commit()
        session.refresh(db_obj)
        return db_obj

    def deleteConditions(
        self,
        session: Session,
        conditions: Optional[Dict] = None,
    ):
        """Delete every row matching all of *conditions* (equality, combined with AND).

        :return: ``False`` when no conditions were supplied (nothing is deleted),
            otherwise ``True``
        """
        # An empty dict is refused just like None: without any filter the
        # statement would match -- and delete -- every row of the table.
        if not conditions:
            return False
        query = select(self.model).where(and_(*self._equality_filters(conditions)))
        for obj in session.exec(query).all():
            session.delete(obj)
        session.commit()
        return True

    def delete(self, session: Session, obj_id: int):
        """Delete the row with primary key *obj_id*; a missing row is a no-op."""
        db_obj = session.get(self.model, obj_id)
        if db_obj is None:
            return
        session.delete(db_obj)
        session.commit()

    def updateConditions(self, session: Session, conditions: Dict, **kwargs):
        """
        Update the first record matching *conditions*.

        :param session: database session
        :param conditions: ``{attribute: value}`` pairs combined with AND
        :param kwargs: attributes to update and their new values
        :return: the updated object, or ``None`` when nothing matched
        """
        query = select(self.model)
        if conditions:
            # Only filter when there is something to filter on; calling and_()
            # without arguments is deprecated in SQLAlchemy.
            query = query.where(and_(*self._equality_filters(conditions)))
        result = session.exec(query).first()
        if result is None:
            return None
        for key, value in kwargs.items():
            setattr(result, key, value)
        session.commit()  # persist the changes
        session.refresh(result)  # reload, matching create() and update()
        return result


def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
    """Create one ``DeviceConfigTabs`` row per tab and a ``DeviceConfig`` row per
    entry of *data_list* under each tab.

    The dicts in *data_list* are not modified; ``tab_id`` and ``is_system`` are
    applied to a copy of each entry.

    :param session: database session
    :param action_tabs: dicts providing ``mode_type`` and ``mode_name``
    :param data_list: dicts of ``DeviceConfig`` keyword arguments
    """
    tab_crud = CRUD(DeviceConfigTabs)
    for idx, tab in enumerate(action_tabs):
        device_tab = DeviceConfigTabs(
            mode_type=tab.get("mode_type"),
            mode_name=tab.get("mode_name"),
        )
        # create() commits, which also commits the configs queued for the
        # previous tab.
        created_tab = tab_crud.create(session, obj_in=device_tab)
        # Tabs at positions 0 and 6 are flagged as system tabs.
        is_system = idx in (0, 6)
        for data in data_list:
            row = {**data, "tab_id": created_tab.id, "is_system": is_system}
            session.add(DeviceConfig(**row))
    session.commit()  # single commit for the remaining queued rows


def batch_insert_sys_configs(session: Session, data_list: list):
    """Insert one ``SysConfigs`` row per dict in *data_list* with a single commit."""
    for data in data_list:
        session.add(SysConfigs(**data))
    session.commit()  # single commit for the whole batch


async def insert_photo_records(
    image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
):
    """Insert one ``PhotoRecord`` row.

    NOTE(review): the database calls below are synchronous, so they run inline
    in the calling coroutine.

    :return: ``(True, record_id)`` where *record_id* is the new row's ``id``
    """
    with SqlQuery() as session:  # the session is closed when the block exits
        photo_record = PhotoRecord(
            image_deal_mode=image_deal_mode,
            goods_art_no=goods_art_no,
            image_index=image_index,
            action_id=action_id,
        )
        session.add(photo_record)
        session.commit()
        session.refresh(photo_record)
        record_id = photo_record.id
        return True, record_id


def SqlQuery():
    """Return a new session bound to the module engine.

    The caller owns the session and should close it, for example with
    ``with SqlQuery() as session:``. The session is created directly instead
    of being pulled out of the ``__get_session`` generator, because an
    abandoned generator closes its session whenever it happens to be finalized.
    """
    return Session(engine)


# Usage example
if __name__ == "__main__":
    pass
    # Obtain a Session object
    # session = SqlQuery()
    # Create a CRUD instance
    # device_config_crud = CRUD(DeviceConfig)
    # Create a new record
    # new_device_config = DeviceConfig(
    #     mode_type="example_mode",
    #     execution_type="example_execution",
    #     action_name="example_action",
    #     action_index=1,
    #     picture_index=1,
    #     camera_height=100,
    #     camera_angle=45.5,
    #     number_focus=2,
    #     take_picture=True,
    #     turntable_position=10.0,
    #     turntable_angle=30.5,
    #     shoe_upturn=False,
    #     pre_delay=1.5,
    #     after_delay=2.5,
    #     led_switch=True,
    #     is_wait=False,
    # )
    # created_device_config = device_config_crud.create(session, new_device_config)
    # print(f"Created Device Config: {created_device_config}")
    # Read a single record
    # read_device = device_config_crud.read(session, conditions={"id": 1})
    # print(f"Read Device Config: {read_device.model_dump()}")
    # Read all records
    # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
    # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
    # # Update a record
    # updated_device = device_config_crud.update(
    #     session, created_device_config.id, mode_type="updated_mode"
    # )
    # print(f"Updated Device Config: {updated_device}")
    # # Delete a record
    # device_config_crud.delete(session, created_device_config.id)
    # print("Device Config deleted.")