||
- from networkx.algorithms.components import connected
- from sqlmodel import Field, Session, SQLModel, create_engine, select
- from typing import Dict
- from datetime import datetime
- from typing import Optional
- import json
- from sqlalchemy import and_, desc, asc, delete
- from utils.utils_func import check_path
- from sqlalchemy.dialects import sqlite
- from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
- check_path("C:/Zhihuiyin")
- # 创建SQLite数据库引擎
- sqlite_file_name = "C:/Zhihuiyin/database.db"
- sqlite_url = f"sqlite:///{sqlite_file_name}"
- engine = create_engine(
- sqlite_url,
- echo=False,
- connect_args={"check_same_thread": False}, # 允许多线程访问
- pool_size=10,
- max_overflow=20,
- pool_timeout=30,
- pool_recycle=1800,
- )
- # 创建表
- def create_all_database():
- SQLModel.metadata.create_all(engine)
- # 创建会话
- def __get_session():
- with Session(engine) as session:
- try:
- yield session
- finally:
- session.close()
- # 创建一个通用的 CRUD 类
- class CRUD:
- def __init__(self, model):
- self.model = model
- def create(self, session: Session, obj_in):
- obj_in_data = dict(obj_in)
- db_obj = self.model(**obj_in_data)
- session.add(db_obj)
- session.commit()
- session.refresh(db_obj)
- return db_obj
- def truncate(self, session: Session):
- """
- 使用SQL删除语句清空所有记录(更高效)
- :param session: 数据库会话
- """
- stmt = delete(self.model)
- result = session.exec(stmt)
- session.commit()
- return result.rowcount
- def read(
- self,
- session: Session,
- conditions: Optional[Dict] = None,
- order_by: Optional[str] = None,
- ascending: bool = True,
- ):
- query = select(self.model)
- if conditions:
- query = query.where(
- and_(
- *(
- getattr(self.model, key) == value
- for key, value in conditions.items()
- )
- )
- )
- if order_by:
- if ascending:
- query = query.order_by(asc(getattr(self.model, order_by)))
- else:
- query = query.order_by(desc(getattr(self.model, order_by)))
- data = session.exec(query).first()
- return data
- def read_all(
- self,
- session: Session,
- conditions: Optional[Dict] = None,
- order_by: Optional[str] = None,
- ascending: bool = True,
- join_conditions: Optional[list] = None, # 新增:支持多个JOIN
- ):
- query = select(self.model)
- # 处理JOIN逻辑
- if join_conditions:
- for join_info in join_conditions:
- joined_model = join_info.get("model")
- on_clause = join_info.get("on")
- is_outer = join_info.get("is_outer", False)
- if not joined_model or not on_clause:
- continue
- if is_outer:
- query = query.outerjoin(joined_model, on_clause)
- else:
- query = query.join(joined_model, on_clause)
- if conditions:
- for key, value in conditions.items():
- column = getattr(self.model, key)
- if isinstance(value, list):
- # 如果值是列表,使用 IN 查询
- query = query.where(column.in_(value))
- else:
- # 否则使用等于条件
- query = query.where(column == value)
- if order_by:
- if ascending:
- query = query.order_by(asc(getattr(self.model, order_by)))
- else:
- query = query.order_by(desc(getattr(self.model, order_by)))
- data = session.exec(query).all()
- return data
- def update(self, session: Session, obj_id: int, **kwargs):
- db_obj = session.get(self.model, obj_id)
- for key, value in kwargs.items():
- setattr(db_obj, key, value)
- session.commit()
- session.refresh(db_obj)
- return db_obj
- def deleteConditions(
- self,
- session: Session,
- conditions: Optional[Dict] = None,
- ):
- query = select(self.model)
- if conditions == None:
- return False
- query = query.where(
- and_(
- *(
- getattr(self.model, key) == value
- for key, value in conditions.items()
- )
- )
- )
- objects_to_delete = session.exec(query).all()
- for obj in objects_to_delete:
- session.delete(obj)
- session.commit()
- # session.refresh()
- return True
- def delete(self, session: Session, obj_id: int):
- db_obj = session.get(self.model, obj_id)
- session.delete(db_obj)
- session.commit()
- # session.refresh()
- # 恢复 updateConditions 方法
- def updateConditions(self, session: Session, conditions: Dict, **kwargs):
- """
- 根据条件更新记录
- :param session: 数据库会话
- :param conditions: 更新条件字典
- :param kwargs: 需要更新的字段和值
- :return: 更新后的对象
- """
- query = select(self.model).where(
- and_(
- *(
- getattr(self.model, key) == value
- for key, value in conditions.items()
- )
- )
- )
- result = session.exec(query).first()
- if result:
- for key, value in kwargs.items():
- setattr(result, key, value)
- session.commit() # 提交事务以保存更改
- return result
- return None
- # 批量插入数据到设备配置表
- def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
- """批量插入数据到设备配置表"""
- for idx, tab in enumerate(action_tabs):
- crud = CRUD(DeviceConfigTabs)
- device_tab = DeviceConfigTabs(
- mode_type=tab.get("mode_type"),
- mode_name=tab.get("mode_name"),
- )
- create_obj = crud.create(session, obj_in=device_tab)
- for data in data_list:
- data["tab_id"] = create_obj.id
- data["is_system"] = False
- if idx in [0, 6]:
- data["is_system"] = True
- device_config = DeviceConfig(**data)
- session.add(device_config)
- session.commit() # 合并事务提交
- def batch_insert_device_configsNew(
- session: Session, action_tabs: list, data_list: list
- ):
- """批量插入数据到设备配置表"""
- for idx, tab in enumerate(action_tabs):
- crud = CRUD(DeviceConfigTabs)
- device_tab = DeviceConfigTabs(
- id=tab.get("id"),
- mode_type=tab.get("mode_type"),
- mode_name=tab.get("mode_name"),
- )
- create_obj = crud.create(session, obj_in=device_tab)
- for data in data_list:
- # data["tab_id"] = create_obj.id
- # data["is_system"] = False
- # if idx in [0, 6]:
- # data["is_system"] = True
- device_config = DeviceConfig(**data)
- session.add(device_config)
- session.commit() # 合并事务提交
- # 批量插入系统配置
- def batch_insert_sys_configs(session: Session, data_list: list):
- """批量插入数据到设备配置表"""
- for data in data_list:
- print("data", data, type(data))
- config = SysConfigs(**data)
- session.add(config)
- session.commit() # 合并事务提交
- # 插入照片记录
- async def insert_photo_records(
- image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
- ):
- with SqlQuery() as session: # 使用上下文管理器复用会话
- """批量插入数据到照片记录"""
- data = {
- "image_deal_mode": image_deal_mode,
- "goods_art_no": goods_art_no,
- "image_index": image_index,
- "action_id": action_id,
- }
- device_config = PhotoRecord(**data)
- session.add(device_config)
- session.commit()
- session.refresh(device_config)
- record_id = device_config.id
- return True, record_id
- def SqlQuery():
- return next(__get_session())
- # 使用示例
- if __name__ == "__main__":
- pass
- # 使用 next 函数从生成器中获取 Session 对象
- # session = SqlQuery()
- # 创建 CRUD 实例
- # device_config_crud = CRUD(DeviceConfig)
- # 创建新记录
- # new_device_config = DeviceConfig(
- # mode_type="example_mode",
- # execution_type="example_execution",
- # action_name="example_action",
- # action_index=1,
- # picture_index=1,
- # camera_height=100,
- # camera_angle=45.5,
- # number_focus=2,
- # take_picture=True,
- # turntable_position=10.0,
- # turntable_angle=30.5,
- # shoe_upturn=False,
- # pre_delay=1.5,
- # after_delay=2.5,
- # led_switch=True,
- # is_wait=False,
- # )
- # created_device_config = device_config_crud.create(session, new_device_config)
- # print(f"Created Device Config: {created_device_config}")
- # 读取记录
- # read_device = device_config_crud.read(session, 1)
- # print(f"Read Device Config: {read_device.model_dump()}")
- # 读取所有记录
- # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
- # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
- # # 更新记录
- # updated_device = device_config_crud.update(
- # session, created_device_config.id, mode_type="updated_mode"
- # )
- # print(f"Updated Device Config: {updated_device}")
- # # 删除记录
- # device_config_crud.delete(session, created_device_config.id)
- # print("Device Config deleted.")
|