databases.py 7.1 KB


  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, desc, asc
  8. from sqlalchemy.dialects import sqlite
  9. from model import DeviceConfig, PhotoRecord, SysConfigs
  # Create the SQLite database engine shared by every session in this module.
  sqlite_file_name = "database.db"
  sqlite_url = f"sqlite:///{sqlite_file_name}"

  # Engine / connection-pool settings passed straight to create_engine().
  _engine_options = {
      "echo": False,  # do not log emitted SQL statements
      "pool_size": 10,  # connections kept open in the pool
      "max_overflow": 20,  # extra connections allowed beyond pool_size
      "pool_timeout": 30,  # seconds to wait for a free connection
      "pool_recycle": 1800,  # recycle connections older than this many seconds
  }
  engine = create_engine(sqlite_url, **_engine_options)
  # Create the tables.
  def create_all_database():
      """Create the tables registered on ``SQLModel.metadata`` via the module engine."""
      metadata = SQLModel.metadata
      metadata.create_all(bind=engine)
  # Create a session.
  def __get_session():
      """Yield a ``Session`` bound to the module-level engine.

      The ``with`` block closes the session when the generator is exhausted,
      closed, or finalized, so no additional explicit ``close()`` is needed.

      Yields:
          Session: a session bound to ``engine``.
      """
      with Session(engine) as session:
          yield session
  def batch_insert_device_configs(session: Session, data_list: list):
      """Insert a batch of rows into the device-configuration table.

      Args:
          session: Session used for the insert. It is closed before this
              function returns, whether or not the insert succeeded.
          data_list: Dicts, each expanded as keyword arguments to
              ``DeviceConfig``.

      Raises:
          Any exception raised while building the models or committing is
          propagated to the caller after the session has been closed.
      """
      try:
          session.add_all([DeviceConfig(**data) for data in data_list])
          session.commit()
      finally:
          # Release the session even when model construction or commit fails.
          session.close()
  def batch_insert_sys_configs(session: Session, data_list: list):
      """Insert a batch of rows into the system-configuration table.

      Args:
          session: Session used for the insert. It is closed before this
              function returns, whether or not the insert succeeded.
          data_list: Dicts, each expanded as keyword arguments to
              ``SysConfigs``.

      Raises:
          Any exception raised while building the models or committing is
          propagated to the caller after the session has been closed.
      """
      try:
          session.add_all([SysConfigs(**data) for data in data_list])
          session.commit()
      finally:
          # Release the session even when model construction or commit fails.
          session.close()
  def insert_photo_records(
      image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  ):
      """Insert one photo record using a session obtained from ``SqlQuery()``.

      Args:
          image_deal_mode: Value stored in ``PhotoRecord.image_deal_mode``.
          goods_art_no: Value stored in ``PhotoRecord.goods_art_no``.
          image_index: Value stored in ``PhotoRecord.image_index``.
          action_id: Value stored in ``PhotoRecord.action_id``.

      Returns:
          True once the record has been committed.

      Raises:
          Any exception raised while building the record or committing is
          propagated; the session is closed first.
      """
      # Build the record before opening a session so a validation failure
      # cannot leave a session behind.
      record = PhotoRecord(
          image_deal_mode=image_deal_mode,
          goods_art_no=goods_art_no,
          image_index=image_index,
          action_id=action_id,
      )
      session = SqlQuery()
      try:
          session.add(record)
          session.commit()
      finally:
          session.close()
      return True
  # Generic CRUD helper class.
  class CRUD:
      """Generic create/read/update/delete helper for a single model class.

      Every public method closes the session it is given before returning
      (including when an exception is raised), so callers should pass a fresh
      session to each call.
      """

      def __init__(self, model):
          """Remember the model class that all operations act on."""
          self.model = model

      def _build_query(
          self,
          conditions: Optional[Dict] = None,
          order_by: Optional[str] = None,
          ascending: bool = True,
      ):
          """Build a ``select`` on the model with equality filters and ordering.

          Args:
              conditions: Mapping of attribute name -> required value; all
                  entries are combined with AND. Ignored when empty or None.
              order_by: Name of the model attribute to sort by, if any.
              ascending: Sort ascending when True, descending otherwise.
          """
          query = select(self.model)
          if conditions:
              query = query.where(
                  and_(
                      *(
                          getattr(self.model, key) == value
                          for key, value in conditions.items()
                      )
                  )
              )
          if order_by:
              column = getattr(self.model, order_by)
              query = query.order_by(asc(column) if ascending else desc(column))
          return query

      def create(self, session: Session, obj_in):
          """Persist a new row built from ``obj_in`` and return it.

          Args:
              session: Session to use; closed before returning.
              obj_in: Anything ``dict()`` accepts (mapping or iterable of
                  key/value pairs); expanded as keyword arguments to the model.

          Returns:
              The refreshed, newly created model instance.
          """
          try:
              db_obj = self.model(**dict(obj_in))
              session.add(db_obj)
              session.commit()
              # Reload so generated values are populated before the close.
              session.refresh(db_obj)
              return db_obj
          finally:
              session.close()

      def read(
          self,
          session: Session,
          conditions: Optional[Dict] = None,
          order_by: Optional[str] = None,
          ascending: bool = True,
      ):
          """Return the first row matching ``conditions``, or None.

          Args:
              session: Session to use; closed before returning.
              conditions: Attribute name -> value equality filters (ANDed).
              order_by: Attribute name to sort by before taking the first row.
              ascending: Sort direction used with ``order_by``.
          """
          try:
              query = self._build_query(conditions, order_by, ascending)
              return session.exec(query).first()
          finally:
              session.close()

      def read_all(
          self,
          session: Session,
          conditions: Optional[Dict] = None,
          order_by: Optional[str] = None,
          ascending: bool = True,
      ):
          """Return every row matching ``conditions``.

          Args:
              session: Session to use; closed before returning.
              conditions: Attribute name -> value equality filters (ANDed).
              order_by: Attribute name to sort by.
              ascending: Sort direction used with ``order_by``.
          """
          try:
              query = self._build_query(conditions, order_by, ascending)
              return session.exec(query).all()
          finally:
              session.close()

      def update(self, session: Session, obj_id: int, **kwargs):
          """Set attributes on the row with primary key ``obj_id``.

          All keyword values are applied as given, including None and "".

          Args:
              session: Session to use; closed before returning.
              obj_id: Primary key of the row to update.
              **kwargs: Attribute name -> new value.

          Returns:
              The refreshed instance, or None when no row has that key.
          """
          try:
              db_obj = session.get(self.model, obj_id)
              if db_obj is None:
                  return None
              for key, value in kwargs.items():
                  setattr(db_obj, key, value)
              session.commit()
              session.refresh(db_obj)
              return db_obj
          finally:
              session.close()

      def updateConditions(
          self, session: Session, conditions: Optional[Dict] = None, **kwargs
      ):
          """Update the first row matching ``conditions``.

          Unlike ``update``, keyword values that are None or "" are skipped so
          they never overwrite existing data. With no conditions, the first
          row returned by an unfiltered select is the one updated.

          Args:
              session: Session to use; closed before returning.
              conditions: Attribute name -> value equality filters (ANDed).
              **kwargs: Attribute name -> new value.

          Returns:
              The refreshed instance, or None when nothing matched.
          """
          try:
              data = session.exec(self._build_query(conditions)).first()
              if data is None:
                  return None
              for key, value in kwargs.items():
                  if value is None or value == "":
                      continue
                  setattr(data, key, value)
              session.commit()
              session.refresh(data)
              return data
          finally:
              session.close()

      def deleteConditions(
          self,
          session: Session,
          conditions: Optional[Dict] = None,
      ):
          """Delete every row matching ``conditions``.

          Args:
              session: Session to use; closed before returning.
              conditions: Attribute name -> value equality filters (ANDed).

          Returns:
              False when ``conditions`` is None or empty (nothing is deleted,
              which guards against wiping the whole table); True otherwise.
          """
          try:
              if not conditions:
                  return False
              for obj in session.exec(self._build_query(conditions)).all():
                  session.delete(obj)
              session.commit()
              return True
          finally:
              session.close()

      def delete(self, session: Session, obj_id: int):
          """Delete the row with primary key ``obj_id``; no-op when it is absent.

          Args:
              session: Session to use; closed before returning.
              obj_id: Primary key of the row to delete.
          """
          try:
              db_obj = session.get(self.model, obj_id)
              if db_obj is not None:
                  session.delete(db_obj)
                  session.commit()
          finally:
              session.close()
  def SqlQuery():
      """Return a new ``Session`` bound to the module-level engine.

      The caller owns the returned session and is responsible for closing it.
      """
      # Built directly rather than pulled from a throwaway generator: a
      # generator that is discarded right after next() is finalized at once,
      # which would close the session before the caller ever receives it.
      return Session(engine)
  # Usage example
  if __name__ == "__main__":
      pass
      # Obtain a Session object from SqlQuery()
      # NOTE(review): every CRUD method closes the session it is given, so the
      # example below would be safer with a fresh SqlQuery() per call.
      # session = SqlQuery()
      # Create a CRUD instance
      # device_config_crud = CRUD(DeviceConfig)
      # Create a new record
      # new_device_config = DeviceConfig(
      # mode_type="example_mode",
      # execution_type="example_execution",
      # action_name="example_action",
      # action_index=1,
      # picture_index=1,
      # camera_height=100,
      # camera_angle=45.5,
      # number_focus=2,
      # take_picture=True,
      # turntable_position=10.0,
      # turntable_angle=30.5,
      # shoe_upturn=False,
      # pre_delay=1.5,
      # after_delay=2.5,
      # led_switch=True,
      # is_wait=False,
      # )
      # created_device_config = device_config_crud.create(session, new_device_config)
      # print(f"Created Device Config: {created_device_config}")
      # Read a record
      # NOTE(review): read() takes a conditions dict as its second argument,
      # so the call below would need conditions={"id": 1} rather than a bare 1.
      # read_device = device_config_crud.read(session, 1)
      # print(f"Read Device Config: {read_device.model_dump()}")
      # Read all records
      # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
      # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
      # # Update a record
      # updated_device = device_config_crud.update(
      # session, created_device_config.id, mode_type="updated_mode"
      # )
      # print(f"Updated Device Config: {updated_device}")
      # # Delete a record
      # device_config_crud.delete(session, created_device_config.id)
      # print("Device Config deleted.")
  225. # print("Device Config deleted.")