databases.py 6.8 KB


  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, desc,asc
  8. from sqlalchemy.dialects import sqlite
  9. from model import DeviceConfig, PhotoRecord, SysConfigs
  10. # 创建SQLite数据库引擎
  11. sqlite_file_name = "database.db"
  12. sqlite_url = f"sqlite:///{sqlite_file_name}"
  13. engine = create_engine(
  14. sqlite_url,
  15. echo=False,
  16. pool_size=10,
  17. max_overflow=20,
  18. pool_timeout=30,
  19. pool_recycle=1800,
  20. )
  21. # 创建表
  22. def create_all_database():
  23. SQLModel.metadata.create_all(engine)
  24. # 创建会话
  25. def __get_session():
  26. with Session(engine) as session:
  27. try:
  28. yield session
  29. finally:
  30. session.close()
  31. def batch_insert_device_configs(session: Session, data_list: list):
  32. '''批量插入数据到设备配置表'''
  33. for data in data_list:
  34. device_config = DeviceConfig(**data)
  35. session.add(device_config)
  36. session.commit()
  37. session.close()
  38. def batch_insert_sys_configs(session: Session, data_list: list):
  39. """批量插入数据到设备配置表"""
  40. for data in data_list:
  41. config = SysConfigs(**data)
  42. session.add(config)
  43. session.commit()
  44. session.close()
  45. def insert_photo_records(image_deal_mode: int, goods_art_no: str, image_index:int):
  46. session = SqlQuery()
  47. """批量插入数据到照片记录"""
  48. data = {
  49. "image_deal_mode": image_deal_mode,
  50. "goods_art_no": goods_art_no,
  51. "image_index": image_index,
  52. }
  53. device_config = PhotoRecord(**data)
  54. session.add(device_config)
  55. session.commit()
  56. session.close()
  57. return True
  58. # 创建一个通用的 CRUD 类
  59. class CRUD:
  60. def __init__(self, model):
  61. self.model = model
  62. def create(self, session: Session, obj_in):
  63. obj_in_data = dict(obj_in)
  64. db_obj = self.model(**obj_in_data)
  65. session.add(db_obj)
  66. session.commit()
  67. session.refresh(db_obj)
  68. session.close()
  69. return db_obj
  70. def read(
  71. self,
  72. session: Session,
  73. conditions: Optional[Dict] = None,
  74. order_by: Optional[str] = None,
  75. ascending: bool = True,
  76. ):
  77. query = select(self.model)
  78. if conditions:
  79. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  80. if order_by:
  81. if ascending:
  82. query = query.order_by(asc(getattr(self.model, order_by)))
  83. else:
  84. query = query.order_by(desc(getattr(self.model, order_by)))
  85. data = session.exec(query).first()
  86. session.close()
  87. return data
  88. def read_all(
  89. self,
  90. session: Session,
  91. conditions: Optional[Dict] = None,
  92. order_by: Optional[str] = None,
  93. ascending: bool = True,
  94. ):
  95. query = select(self.model)
  96. if conditions:
  97. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  98. if order_by:
  99. if ascending:
  100. query = query.order_by(asc(getattr(self.model, order_by)))
  101. else:
  102. query = query.order_by(desc(getattr(self.model, order_by)))
  103. data = session.exec(query).all()
  104. session.close()
  105. return data
  106. def update(self, session: Session, obj_id: int, **kwargs):
  107. db_obj = session.get(self.model, obj_id)
  108. for key, value in kwargs.items():
  109. if value == None or value =="":
  110. continue
  111. setattr(db_obj, key, value)
  112. session.commit()
  113. session.refresh(db_obj)
  114. session.close()
  115. return db_obj
  116. def updateConditions(
  117. self,
  118. session: Session,
  119. conditions: Optional[Dict] = None,
  120. **kwargs
  121. ):
  122. query = select(self.model)
  123. if conditions:
  124. query = query.where(
  125. and_(
  126. *(
  127. getattr(self.model, key) == value
  128. for key, value in conditions.items()
  129. )
  130. )
  131. )
  132. data = session.exec(query).first()
  133. for key, value in kwargs.items():
  134. if value == None or value == "":
  135. continue
  136. setattr(data, key, value)
  137. session.commit()
  138. session.refresh(data)
  139. session.close()
  140. return data
  141. def deleteConditions(
  142. self,
  143. session: Session,
  144. conditions: Optional[Dict] = None,
  145. ):
  146. query = select(self.model)
  147. if conditions == None:
  148. return False
  149. query = query.where(
  150. and_(
  151. *(
  152. getattr(self.model, key) == value
  153. for key, value in conditions.items()
  154. )
  155. )
  156. )
  157. objects_to_delete = session.exec(query).all()
  158. for obj in objects_to_delete:
  159. session.delete(obj)
  160. session.commit()
  161. session.close()
  162. return True
  163. def delete(self, session: Session, obj_id: int):
  164. db_obj = session.get(self.model, obj_id)
  165. session.delete(db_obj)
  166. session.commit()
  167. session.close()
  168. def SqlQuery():
  169. return next(__get_session())
  170. # 使用示例
  171. if __name__ == "__main__":
  172. pass
  173. # 使用 next 函数从生成器中获取 Session 对象
  174. # session = SqlQuery()
  175. # 创建 CRUD 实例
  176. # device_config_crud = CRUD(DeviceConfig)
  177. # 创建新记录
  178. # new_device_config = DeviceConfig(
  179. # mode_type="example_mode",
  180. # execution_type="example_execution",
  181. # action_name="example_action",
  182. # action_index=1,
  183. # picture_index=1,
  184. # camera_height=100,
  185. # camera_angle=45.5,
  186. # number_focus=2,
  187. # take_picture=True,
  188. # turntable_position=10.0,
  189. # turntable_angle=30.5,
  190. # shoe_upturn=False,
  191. # pre_delay=1.5,
  192. # after_delay=2.5,
  193. # led_switch=True,
  194. # is_wait=False,
  195. # )
  196. # created_device_config = device_config_crud.create(session, new_device_config)
  197. # print(f"Created Device Config: {created_device_config}")
  198. # 读取记录
  199. # read_device = device_config_crud.read(session, 1)
  200. # print(f"Read Device Config: {read_device.model_dump()}")
  201. # 读取所有记录
  202. # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
  203. # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
  204. # # 更新记录
  205. # updated_device = device_config_crud.update(
  206. # session, created_device_config.id, mode_type="updated_mode"
  207. # )
  208. # print(f"Updated Device Config: {updated_device}")
  209. # # 删除记录
  210. # device_config_crud.delete(session, created_device_config.id)
  211. # print("Device Config deleted.")