databases.py 8.6 KB


  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from typing import Optional
  5. from sqlalchemy import and_, desc, asc
  6. from utils.utils_func import check_path
  7. from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
  8. check_path("C:/Zhihuiyin")
  9. # 创建SQLite数据库引擎
  10. sqlite_file_name = "C:/Zhihuiyin/database.db"
  11. sqlite_url = f"sqlite:///{sqlite_file_name}"
  12. engine = create_engine(
  13. sqlite_url,
  14. echo=False,
  15. connect_args={"check_same_thread": False}, # 允许多线程访问
  16. pool_size=10,
  17. max_overflow=20,
  18. pool_timeout=30,
  19. pool_recycle=1800,
  20. )
  21. # 创建表
  22. def create_all_database():
  23. SQLModel.metadata.create_all(engine)
  24. # 创建会话
  25. def __get_session():
  26. with Session(engine) as session:
  27. try:
  28. yield session
  29. finally:
  30. session.close()
  31. # 创建一个通用的 CRUD 类
  32. class CRUD:
  33. def __init__(self, model):
  34. self.model = model
  35. def create(self, session: Session, obj_in):
  36. obj_in_data = dict(obj_in)
  37. db_obj = self.model(**obj_in_data)
  38. session.add(db_obj)
  39. session.commit()
  40. session.refresh(db_obj)
  41. return db_obj
  42. def read(
  43. self,
  44. session: Session,
  45. conditions: Optional[Dict] = None,
  46. order_by: Optional[str] = None,
  47. ascending: bool = True,
  48. ):
  49. query = select(self.model)
  50. if conditions:
  51. query = query.where(
  52. and_(
  53. *(
  54. getattr(self.model, key) == value
  55. for key, value in conditions.items()
  56. )
  57. )
  58. )
  59. if order_by:
  60. if ascending:
  61. query = query.order_by(asc(getattr(self.model, order_by)))
  62. else:
  63. query = query.order_by(desc(getattr(self.model, order_by)))
  64. data = session.exec(query).first()
  65. return data
  66. def read_all(
  67. self,
  68. session: Session,
  69. conditions: Optional[Dict] = None,
  70. order_by: Optional[str] = None,
  71. ascending: bool = True,
  72. join_conditions: Optional[list] = None, # 新增:支持多个JOIN
  73. ):
  74. query = select(self.model)
  75. # 处理JOIN逻辑
  76. if join_conditions:
  77. for join_info in join_conditions:
  78. joined_model = join_info.get("model")
  79. on_clause = join_info.get("on")
  80. is_outer = join_info.get("is_outer", False)
  81. if not joined_model or not on_clause:
  82. continue
  83. if is_outer:
  84. query = query.outerjoin(joined_model, on_clause)
  85. else:
  86. query = query.join(joined_model, on_clause)
  87. if conditions:
  88. for key, value in conditions.items():
  89. column = getattr(self.model, key)
  90. if isinstance(value, list):
  91. # 如果值是列表,使用 IN 查询
  92. query = query.where(column.in_(value))
  93. else:
  94. # 否则使用等于条件
  95. query = query.where(column == value)
  96. if order_by:
  97. if ascending:
  98. query = query.order_by(asc(getattr(self.model, order_by)))
  99. else:
  100. query = query.order_by(desc(getattr(self.model, order_by)))
  101. data = session.exec(query).all()
  102. return data
  103. def update(self, session: Session, obj_id: int, **kwargs):
  104. db_obj = session.get(self.model, obj_id)
  105. for key, value in kwargs.items():
  106. setattr(db_obj, key, value)
  107. session.commit()
  108. session.refresh(db_obj)
  109. return db_obj
  110. def deleteConditions(
  111. self,
  112. session: Session,
  113. conditions: Optional[Dict] = None,
  114. ):
  115. query = select(self.model)
  116. if conditions == None:
  117. return False
  118. query = query.where(
  119. and_(
  120. *(
  121. getattr(self.model, key) == value
  122. for key, value in conditions.items()
  123. )
  124. )
  125. )
  126. objects_to_delete = session.exec(query).all()
  127. for obj in objects_to_delete:
  128. session.delete(obj)
  129. session.commit()
  130. # session.refresh()
  131. return True
  132. def delete(self, session: Session, obj_id: int):
  133. db_obj = session.get(self.model, obj_id)
  134. session.delete(db_obj)
  135. session.commit()
  136. # session.refresh()
  137. # 恢复 updateConditions 方法
  138. def updateConditions(self, session: Session, conditions: Dict, **kwargs):
  139. """
  140. 根据条件更新记录
  141. :param session: 数据库会话
  142. :param conditions: 更新条件字典
  143. :param kwargs: 需要更新的字段和值
  144. :return: 更新后的对象
  145. """
  146. query = select(self.model).where(
  147. and_(
  148. *(
  149. getattr(self.model, key) == value
  150. for key, value in conditions.items()
  151. )
  152. )
  153. )
  154. result = session.exec(query).first()
  155. if result:
  156. for key, value in kwargs.items():
  157. setattr(result, key, value)
  158. session.commit() # 提交事务以保存更改
  159. return result
  160. return None
  161. # 批量插入数据到设备配置表
  162. def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
  163. """批量插入数据到设备配置表"""
  164. for idx, tab in enumerate(action_tabs):
  165. crud = CRUD(DeviceConfigTabs)
  166. device_tab = DeviceConfigTabs(
  167. mode_type=tab.get("mode_type"),
  168. mode_name=tab.get("mode_name"),
  169. )
  170. create_obj = crud.create(session, obj_in=device_tab)
  171. for data in data_list:
  172. data["tab_id"] = create_obj.id
  173. data["is_system"] = False
  174. if idx in [0, 6]:
  175. data["is_system"] = True
  176. device_config = DeviceConfig(**data)
  177. session.add(device_config)
  178. session.commit() # 合并事务提交
  179. # 批量插入系统配置
  180. def batch_insert_sys_configs(session: Session, data_list: list):
  181. """批量插入数据到设备配置表"""
  182. for data in data_list:
  183. config = SysConfigs(**data)
  184. session.add(config)
  185. session.commit() # 合并事务提交
  186. # 插入照片记录
  187. async def insert_photo_records(
  188. image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  189. ):
  190. with SqlQuery() as session: # 使用上下文管理器复用会话
  191. """批量插入数据到照片记录"""
  192. data = {
  193. "image_deal_mode": image_deal_mode,
  194. "goods_art_no": goods_art_no,
  195. "image_index": image_index,
  196. "action_id": action_id,
  197. }
  198. device_config = PhotoRecord(**data)
  199. session.add(device_config)
  200. session.commit()
  201. session.refresh(device_config)
  202. record_id = device_config.id
  203. return True, record_id
  204. def SqlQuery():
  205. return next(__get_session())
  206. # 使用示例
  207. if __name__ == "__main__":
  208. pass
  209. # 使用 next 函数从生成器中获取 Session 对象
  210. # session = SqlQuery()
  211. # 创建 CRUD 实例
  212. # device_config_crud = CRUD(DeviceConfig)
  213. # 创建新记录
  214. # new_device_config = DeviceConfig(
  215. # mode_type="example_mode",
  216. # execution_type="example_execution",
  217. # action_name="example_action",
  218. # action_index=1,
  219. # picture_index=1,
  220. # camera_height=100,
  221. # camera_angle=45.5,
  222. # number_focus=2,
  223. # take_picture=True,
  224. # turntable_position=10.0,
  225. # turntable_angle=30.5,
  226. # shoe_upturn=False,
  227. # pre_delay=1.5,
  228. # after_delay=2.5,
  229. # led_switch=True,
  230. # is_wait=False,
  231. # )
  232. # created_device_config = device_config_crud.create(session, new_device_config)
  233. # print(f"Created Device Config: {created_device_config}")
  234. # 读取记录
  235. # read_device = device_config_crud.read(session, 1)
  236. # print(f"Read Device Config: {read_device.model_dump()}")
  237. # 读取所有记录
  238. # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
  239. # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
  240. # # 更新记录
  241. # updated_device = device_config_crud.update(
  242. # session, created_device_config.id, mode_type="updated_mode"
  243. # )
  244. # print(f"Updated Device Config: {updated_device}")
  245. # # 删除记录
  246. # device_config_crud.delete(session, created_device_config.id)
  247. # print("Device Config deleted.")