"""SQLite persistence helpers: engine setup, session factory and a generic CRUD class."""

import json
from datetime import datetime
from typing import Dict, Optional

# NOTE(review): `connected` is not referenced anywhere in this module's visible
# code; it looks like an accidental IDE auto-import. Kept so nothing that may
# rely on it breaks -- confirm and remove if unused.
from networkx.algorithms.components import connected
from sqlalchemy import and_, asc, desc
from sqlmodel import Field, Session, SQLModel, create_engine, select

from model import DeviceConfig, PhotoRecord

# SQLite database engine. `echo=True` asks SQLAlchemy to log the SQL it emits.
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url, echo=True)


def create_all_database():
    """Create all tables registered on ``SQLModel.metadata`` using the module engine."""
    SQLModel.metadata.create_all(engine)


def __get_session():
    """Yield a ``Session`` bound to the module engine.

    The session lives inside a ``with`` block, so it is closed as soon as the
    generator is exhausted, closed, or garbage collected.
    """
    with Session(engine) as session:
        yield session


def _commit_or_rollback(session: Session) -> None:
    """Commit ``session``; on any failure roll it back and re-raise.

    Sessions handed out by ``SqlQuery`` are reused across calls, and a session
    whose commit failed must be rolled back before it can be used again.
    """
    try:
        session.commit()
    except Exception:
        session.rollback()
        raise


def batch_insert_device_configs(session: Session, data_list: list):
    """Insert a batch of rows into the device configuration table.

    Args:
        session: Session used for the insert; it is committed by this call.
        data_list: Iterable of mappings, each expanded as keyword arguments
            into ``DeviceConfig``.

    Raises:
        Exception: Whatever model construction or the commit raises. If the
            commit fails the session is rolled back first.
    """
    # Build every object before touching the session so that a bad entry does
    # not leave earlier entries pending in the session.
    device_configs = [DeviceConfig(**data) for data in data_list]
    session.add_all(device_configs)
    _commit_or_rollback(session)


class CRUD:
    """Generic create / read / update / delete helper for one model class."""

    def __init__(self, model):
        """Remember the model class that every operation targets."""
        self.model = model

    def _build_query(
        self,
        conditions: Optional[Dict] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ):
        """Build the ``select`` statement shared by ``read`` and ``read_all``.

        Args:
            conditions: Mapping of attribute name -> required value; all
                entries are combined with AND. Ignored when empty or ``None``.
            order_by: Name of the model attribute to sort by, if any.
            ascending: Sort direction used when ``order_by`` is given.

        Raises:
            AttributeError: If a condition key or ``order_by`` does not name
                an attribute of the model.
        """
        query = select(self.model)
        if conditions:
            query = query.where(
                and_(
                    *(
                        getattr(self.model, key) == value
                        for key, value in conditions.items()
                    )
                )
            )
        if order_by:
            direction = asc if ascending else desc
            query = query.order_by(direction(getattr(self.model, order_by)))
        return query

    def create(self, session: Session, obj_in):
        """Insert a new row built from ``obj_in`` and return the stored object.

        Args:
            session: Session used for the insert; it is committed by this call.
            obj_in: Anything ``dict()`` accepts (a mapping or an iterable of
                key/value pairs); the result is expanded into the model
                constructor.

        Returns:
            The newly created model instance, refreshed after the commit.
        """
        obj_in_data = dict(obj_in)
        db_obj = self.model(**obj_in_data)
        session.add(db_obj)
        _commit_or_rollback(session)
        session.refresh(db_obj)
        return db_obj

    def read(
        self,
        session: Session,
        conditions: Optional[Dict] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ):
        """Return the first row matching ``conditions``, or ``None`` if none match.

        See ``_build_query`` for the meaning of the filter and sort arguments.
        """
        query = self._build_query(conditions, order_by, ascending)
        return session.exec(query).first()

    def read_all(
        self,
        session: Session,
        conditions: Optional[Dict] = None,
        order_by: Optional[str] = None,
        ascending: bool = True,
    ):
        """Return every row matching ``conditions``.

        See ``_build_query`` for the meaning of the filter and sort arguments.
        """
        query = self._build_query(conditions, order_by, ascending)
        return session.exec(query).all()

    def update(self, session: Session, obj_id: int, **kwargs):
        """Update attributes of the row with primary key ``obj_id``.

        Args:
            session: Session used for the update; it is committed by this call.
            obj_id: Primary key of the row to update.
            **kwargs: Attribute name -> new value. Entries whose value is
                ``None`` or the empty string are skipped, so this method
                cannot be used to clear a field.

        Returns:
            The refreshed model instance, or ``None`` when no row has that
            primary key (previously this case crashed inside ``setattr`` or
            ``session.refresh``).
        """
        db_obj = session.get(self.model, obj_id)
        if db_obj is None:
            return None
        for key, value in kwargs.items():
            if value is None or value == "":
                continue
            setattr(db_obj, key, value)
        _commit_or_rollback(session)
        session.refresh(db_obj)
        return db_obj

    def delete(self, session: Session, obj_id: int):
        """Delete the row with primary key ``obj_id``.

        Does nothing when no such row exists (previously ``session.delete``
        was called with ``None`` and raised).
        """
        db_obj = session.get(self.model, obj_id)
        if db_obj is None:
            return
        session.delete(db_obj)
        _commit_or_rollback(session)


def SqlQuery():
    """Return a new ``Session`` bound to the module engine.

    The caller owns the session and should close it when done.
    """
    # Created directly rather than via next(__get_session()): the temporary
    # generator would be finalized right away, which exits its `with` block
    # and closes the session before the caller ever receives it.
    return Session(engine)


# Usage example
if __name__ == "__main__":
    pass
    # Obtain a Session object
    # session = SqlQuery()

    # Create a CRUD instance
    # device_config_crud = CRUD(DeviceConfig)

    # Create a new record
    # new_device_config = DeviceConfig(
    #     mode_type="example_mode",
    #     execution_type="example_execution",
    #     action_name="example_action",
    #     action_index=1,
    #     picture_index=1,
    #     camera_height=100,
    #     camera_angle=45.5,
    #     number_focus=2,
    #     take_picture=True,
    #     turntable_position=10.0,
    #     turntable_angle=30.5,
    #     shoe_upturn=False,
    #     pre_delay=1.5,
    #     after_delay=2.5,
    #     led_switch=True,
    #     is_wait=False,
    # )
    # created_device_config = device_config_crud.create(session, new_device_config)
    # print(f"Created Device Config: {created_device_config}")

    # Read one record
    # read_device = device_config_crud.read(session, conditions={"id": 1})
    # print(f"Read Device Config: {read_device.model_dump()}")

    # Read all matching records
    # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
    # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")

    # Update a record
    # updated_device = device_config_crud.update(
    #     session, created_device_config.id, mode_type="updated_mode"
    # )
    # print(f"Updated Device Config: {updated_device}")

    # Delete a record
    # device_config_crud.delete(session, created_device_config.id)
    # print("Device Config deleted.")