databases.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, asc,desc
  8. from model import DeviceConfig, PhotoRecord
  9. # 创建SQLite数据库引擎
  10. sqlite_file_name = "database.db"
  11. sqlite_url = f"sqlite:///{sqlite_file_name}"
  12. engine = create_engine(sqlite_url, echo=True)
  13. # 创建表
  14. def create_all_database():
  15. SQLModel.metadata.create_all(engine)
  16. # 创建会话
  17. def __get_session():
  18. with Session(engine) as session:
  19. yield session
  20. def batch_insert_device_configs(session: Session, data_list: list):
  21. '''批量插入数据到设备配置表'''
  22. for data in data_list:
  23. device_config = DeviceConfig(**data)
  24. session.add(device_config)
  25. session.commit()
  26. def insert_photo_records(image_deal_mode: int, goods_art_no: str, image_index:int):
  27. session = SqlQuery()
  28. """批量插入数据到照片记录"""
  29. data = {
  30. "image_deal_mode": image_deal_mode,
  31. "goods_art_no": goods_art_no,
  32. "image_index": image_index,
  33. }
  34. device_config = PhotoRecord(**data)
  35. session.add(device_config)
  36. session.commit()
  37. return True
  38. # 创建一个通用的 CRUD 类
  39. class CRUD:
  40. def __init__(self, model):
  41. self.model = model
  42. def create(self, session: Session, obj_in):
  43. obj_in_data = dict(obj_in)
  44. db_obj = self.model(**obj_in_data)
  45. session.add(db_obj)
  46. session.commit()
  47. session.refresh(db_obj)
  48. return db_obj
  49. def read(
  50. self,
  51. session: Session,
  52. conditions: Optional[Dict] = None,
  53. order_by: Optional[str] = None,
  54. ascending: bool = True,
  55. ):
  56. query = select(self.model)
  57. if conditions:
  58. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  59. if order_by:
  60. if ascending:
  61. query = query.order_by(asc(getattr(self.model, order_by)))
  62. else:
  63. query = query.order_by(desc(getattr(self.model, order_by)))
  64. return session.exec(query).first()
  65. def read_all(
  66. self,
  67. session: Session,
  68. conditions: Optional[Dict] = None,
  69. order_by: Optional[str] = None,
  70. ascending: bool = True,
  71. ):
  72. query = select(self.model)
  73. if conditions:
  74. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  75. if order_by:
  76. if ascending:
  77. query = query.order_by(asc(getattr(self.model, order_by)))
  78. else:
  79. query = query.order_by(desc(getattr(self.model, order_by)))
  80. return session.exec(query).all()
  81. def update(self, session: Session, obj_id: int, **kwargs):
  82. db_obj = session.get(self.model, obj_id)
  83. for key, value in kwargs.items():
  84. if value == None or value =="":
  85. continue
  86. setattr(db_obj, key, value)
  87. session.commit()
  88. session.refresh(db_obj)
  89. return db_obj
  90. def deleteConditions(
  91. self,
  92. session: Session,
  93. conditions: Optional[Dict] = None,
  94. ):
  95. query = select(self.model)
  96. if conditions == None:
  97. return False
  98. query = query.where(
  99. and_(
  100. *(
  101. getattr(self.model, key) == value
  102. for key, value in conditions.items()
  103. )
  104. )
  105. )
  106. objects_to_delete = session.exec(query).all()
  107. for obj in objects_to_delete:
  108. session.delete(obj)
  109. session.commit()
  110. return True
  111. def delete(self, session: Session, obj_id: int):
  112. db_obj = session.get(self.model, obj_id)
  113. session.delete(db_obj)
  114. session.commit()
  115. def SqlQuery():
  116. return next(__get_session())
  117. # 使用示例
  118. if __name__ == "__main__":
  119. pass
  120. # 使用 next 函数从生成器中获取 Session 对象
  121. # session = SqlQuery()
  122. # 创建 CRUD 实例
  123. # device_config_crud = CRUD(DeviceConfig)
  124. # 创建新记录
  125. # new_device_config = DeviceConfig(
  126. # mode_type="example_mode",
  127. # execution_type="example_execution",
  128. # action_name="example_action",
  129. # action_index=1,
  130. # picture_index=1,
  131. # camera_height=100,
  132. # camera_angle=45.5,
  133. # number_focus=2,
  134. # take_picture=True,
  135. # turntable_position=10.0,
  136. # turntable_angle=30.5,
  137. # shoe_upturn=False,
  138. # pre_delay=1.5,
  139. # after_delay=2.5,
  140. # led_switch=True,
  141. # is_wait=False,
  142. # )
  143. # created_device_config = device_config_crud.create(session, new_device_config)
  144. # print(f"Created Device Config: {created_device_config}")
  145. # 读取记录
  146. # read_device = device_config_crud.read(session, 1)
  147. # print(f"Read Device Config: {read_device.model_dump()}")
  148. # 读取所有记录
  149. # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
  150. # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
  151. # # 更新记录
  152. # updated_device = device_config_crud.update(
  153. # session, created_device_config.id, mode_type="updated_mode"
  154. # )
  155. # print(f"Updated Device Config: {updated_device}")
  156. # # 删除记录
  157. # device_config_crud.delete(session, created_device_config.id)
  158. # print("Device Config deleted.")