databases.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, asc,desc
  8. from model import DeviceConfig, PhotoRecord, SysConfigs
  9. # 创建SQLite数据库引擎
  10. sqlite_file_name = "database.db"
  11. sqlite_url = f"sqlite:///{sqlite_file_name}"
  12. engine = create_engine(
  13. sqlite_url,
  14. echo=False,
  15. pool_size=10,
  16. max_overflow=20,
  17. pool_timeout=30,
  18. pool_recycle=1800,
  19. )
  20. # 创建表
  21. def create_all_database():
  22. SQLModel.metadata.create_all(engine)
  23. # 创建会话
  24. def __get_session():
  25. with Session(engine) as session:
  26. try:
  27. yield session
  28. finally:
  29. session.close()
  30. def batch_insert_device_configs(session: Session, data_list: list):
  31. '''批量插入数据到设备配置表'''
  32. for data in data_list:
  33. device_config = DeviceConfig(**data)
  34. session.add(device_config)
  35. session.commit()
  36. session.close()
  37. def batch_insert_sys_configs(session: Session, data_list: list):
  38. """批量插入数据到设备配置表"""
  39. for data in data_list:
  40. config = SysConfigs(**data)
  41. session.add(config)
  42. session.commit()
  43. session.close()
  44. def insert_photo_records(image_deal_mode: int, goods_art_no: str, image_index:int):
  45. session = SqlQuery()
  46. """批量插入数据到照片记录"""
  47. data = {
  48. "image_deal_mode": image_deal_mode,
  49. "goods_art_no": goods_art_no,
  50. "image_index": image_index,
  51. }
  52. device_config = PhotoRecord(**data)
  53. session.add(device_config)
  54. session.commit()
  55. session.close()
  56. return True
  57. # 创建一个通用的 CRUD 类
  58. class CRUD:
  59. def __init__(self, model):
  60. self.model = model
  61. def create(self, session: Session, obj_in):
  62. obj_in_data = dict(obj_in)
  63. db_obj = self.model(**obj_in_data)
  64. session.add(db_obj)
  65. session.commit()
  66. session.refresh(db_obj)
  67. session.close()
  68. return db_obj
  69. def read(
  70. self,
  71. session: Session,
  72. conditions: Optional[Dict] = None,
  73. order_by: Optional[str] = None,
  74. ascending: bool = True,
  75. ):
  76. query = select(self.model)
  77. if conditions:
  78. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  79. if order_by:
  80. if ascending:
  81. query = query.order_by(asc(getattr(self.model, order_by)))
  82. else:
  83. query = query.order_by(desc(getattr(self.model, order_by)))
  84. data = session.exec(query).first()
  85. session.close()
  86. return data
  87. def read_all(
  88. self,
  89. session: Session,
  90. conditions: Optional[Dict] = None,
  91. order_by: Optional[str] = None,
  92. ascending: bool = True,
  93. ):
  94. query = select(self.model)
  95. if conditions:
  96. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  97. if order_by:
  98. if ascending:
  99. query = query.order_by(asc(getattr(self.model, order_by)))
  100. else:
  101. query = query.order_by(desc(getattr(self.model, order_by)))
  102. data = session.exec(query).all()
  103. session.close()
  104. return data
  105. def update(self, session: Session, obj_id: int, **kwargs):
  106. db_obj = session.get(self.model, obj_id)
  107. for key, value in kwargs.items():
  108. if value == None or value =="":
  109. continue
  110. setattr(db_obj, key, value)
  111. session.commit()
  112. session.refresh(db_obj)
  113. session.close()
  114. return db_obj
  115. def updateConditions(
  116. self,
  117. session: Session,
  118. conditions: Optional[Dict] = None,
  119. **kwargs
  120. ):
  121. query = select(self.model)
  122. if conditions:
  123. query = query.where(
  124. and_(
  125. *(
  126. getattr(self.model, key) == value
  127. for key, value in conditions.items()
  128. )
  129. )
  130. )
  131. data = session.exec(query).first()
  132. for key, value in kwargs.items():
  133. if value == None or value == "":
  134. continue
  135. setattr(data, key, value)
  136. session.commit()
  137. session.refresh(data)
  138. session.close()
  139. return data
  140. def deleteConditions(
  141. self,
  142. session: Session,
  143. conditions: Optional[Dict] = None,
  144. ):
  145. query = select(self.model)
  146. if conditions == None:
  147. return False
  148. query = query.where(
  149. and_(
  150. *(
  151. getattr(self.model, key) == value
  152. for key, value in conditions.items()
  153. )
  154. )
  155. )
  156. objects_to_delete = session.exec(query).all()
  157. for obj in objects_to_delete:
  158. session.delete(obj)
  159. session.commit()
  160. session.close()
  161. return True
  162. def delete(self, session: Session, obj_id: int):
  163. db_obj = session.get(self.model, obj_id)
  164. session.delete(db_obj)
  165. session.commit()
  166. session.close()
  167. def SqlQuery():
  168. return next(__get_session())
  169. # 使用示例
  170. if __name__ == "__main__":
  171. pass
  172. # 使用 next 函数从生成器中获取 Session 对象
  173. # session = SqlQuery()
  174. # 创建 CRUD 实例
  175. # device_config_crud = CRUD(DeviceConfig)
  176. # 创建新记录
  177. # new_device_config = DeviceConfig(
  178. # mode_type="example_mode",
  179. # execution_type="example_execution",
  180. # action_name="example_action",
  181. # action_index=1,
  182. # picture_index=1,
  183. # camera_height=100,
  184. # camera_angle=45.5,
  185. # number_focus=2,
  186. # take_picture=True,
  187. # turntable_position=10.0,
  188. # turntable_angle=30.5,
  189. # shoe_upturn=False,
  190. # pre_delay=1.5,
  191. # after_delay=2.5,
  192. # led_switch=True,
  193. # is_wait=False,
  194. # )
  195. # created_device_config = device_config_crud.create(session, new_device_config)
  196. # print(f"Created Device Config: {created_device_config}")
  197. # 读取记录
  198. # read_device = device_config_crud.read(session, 1)
  199. # print(f"Read Device Config: {read_device.model_dump()}")
  200. # 读取所有记录
  201. # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
  202. # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
  203. # # 更新记录
  204. # updated_device = device_config_crud.update(
  205. # session, created_device_config.id, mode_type="updated_mode"
  206. # )
  207. # print(f"Updated Device Config: {updated_device}")
  208. # # 删除记录
  209. # device_config_crud.delete(session, created_device_config.id)
  210. # print("Device Config deleted.")