databases.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, asc,desc
  8. from model import DeviceConfig, PhotoRecord
  9. # 创建SQLite数据库引擎
  10. sqlite_file_name = "database.db"
  11. sqlite_url = f"sqlite:///{sqlite_file_name}"
  12. engine = create_engine(sqlite_url, echo=True)
  13. # 创建表
  14. def create_all_database():
  15. SQLModel.metadata.create_all(engine)
  16. # 创建会话
  17. def __get_session():
  18. with Session(engine) as session:
  19. yield session
  20. def batch_insert_device_configs(session: Session, data_list: list):
  21. '''批量插入数据到设备配置表'''
  22. for data in data_list:
  23. device_config = DeviceConfig(**data)
  24. session.add(device_config)
  25. session.commit()
  26. def insert_photo_records(image_deal_mode:int,goods_art_no: str):
  27. session = SqlQuery()
  28. """批量插入数据到照片记录"""
  29. data = {"image_deal_mode": image_deal_mode, "goods_art_no": goods_art_no}
  30. device_config = PhotoRecord(**data)
  31. session.add(device_config)
  32. session.commit()
  33. return True
  34. # 创建一个通用的 CRUD 类
  35. class CRUD:
  36. def __init__(self, model):
  37. self.model = model
  38. def create(self, session: Session, obj_in):
  39. obj_in_data = dict(obj_in)
  40. db_obj = self.model(**obj_in_data)
  41. session.add(db_obj)
  42. session.commit()
  43. session.refresh(db_obj)
  44. return db_obj
  45. def read(
  46. self,
  47. session: Session,
  48. conditions: Optional[Dict] = None,
  49. order_by: Optional[str] = None,
  50. ascending: bool = True,
  51. ):
  52. query = select(self.model)
  53. if conditions:
  54. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  55. if order_by:
  56. if ascending:
  57. query = query.order_by(asc(getattr(self.model, order_by)))
  58. else:
  59. query = query.order_by(desc(getattr(self.model, order_by)))
  60. return session.exec(query).first()
  61. def read_all(
  62. self,
  63. session: Session,
  64. conditions: Optional[Dict] = None,
  65. order_by: Optional[str] = None,
  66. ascending: bool = True,
  67. ):
  68. query = select(self.model)
  69. if conditions:
  70. query = query.where(and_(*(getattr(self.model, key) == value for key, value in conditions.items())))
  71. if order_by:
  72. if ascending:
  73. query = query.order_by(asc(getattr(self.model, order_by)))
  74. else:
  75. query = query.order_by(desc(getattr(self.model, order_by)))
  76. return session.exec(query).all()
  77. def update(self, session: Session, obj_id: int, **kwargs):
  78. db_obj = session.get(self.model, obj_id)
  79. for key, value in kwargs.items():
  80. if value == None or value =="":
  81. continue
  82. setattr(db_obj, key, value)
  83. session.commit()
  84. session.refresh(db_obj)
  85. return db_obj
  86. def deleteConditions(
  87. self,
  88. session: Session,
  89. conditions: Optional[Dict] = None,
  90. ):
  91. query = select(self.model)
  92. if conditions == None:
  93. return False
  94. query = query.where(
  95. and_(
  96. *(
  97. getattr(self.model, key) == value
  98. for key, value in conditions.items()
  99. )
  100. )
  101. )
  102. objects_to_delete = session.exec(query).all()
  103. for obj in objects_to_delete:
  104. session.delete(obj)
  105. session.commit()
  106. return True
  107. def delete(self, session: Session, obj_id: int):
  108. db_obj = session.get(self.model, obj_id)
  109. session.delete(db_obj)
  110. session.commit()
  111. def SqlQuery():
  112. return next(__get_session())
  113. # 使用示例
  114. if __name__ == "__main__":
  115. pass
  116. # 使用 next 函数从生成器中获取 Session 对象
  117. # session = SqlQuery()
  118. # 创建 CRUD 实例
  119. # device_config_crud = CRUD(DeviceConfig)
  120. # 创建新记录
  121. # new_device_config = DeviceConfig(
  122. # mode_type="example_mode",
  123. # execution_type="example_execution",
  124. # action_name="example_action",
  125. # action_index=1,
  126. # picture_index=1,
  127. # camera_height=100,
  128. # camera_angle=45.5,
  129. # number_focus=2,
  130. # take_picture=True,
  131. # turntable_position=10.0,
  132. # turntable_angle=30.5,
  133. # shoe_upturn=False,
  134. # pre_delay=1.5,
  135. # after_delay=2.5,
  136. # led_switch=True,
  137. # is_wait=False,
  138. # )
  139. # created_device_config = device_config_crud.create(session, new_device_config)
  140. # print(f"Created Device Config: {created_device_config}")
  141. # 读取记录
  142. # read_device = device_config_crud.read(session, 1)
  143. # print(f"Read Device Config: {read_device.model_dump()}")
  144. # 读取所有记录
  145. # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
  146. # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
  147. # # 更新记录
  148. # updated_device = device_config_crud.update(
  149. # session, created_device_config.id, mode_type="updated_mode"
  150. # )
  151. # print(f"Updated Device Config: {updated_device}")
  152. # # 删除记录
  153. # device_config_crud.delete(session, created_device_config.id)
  154. # print("Device Config deleted.")