databases.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, desc, asc
  8. from sqlalchemy.dialects import sqlite
  9. from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
  10. # 创建SQLite数据库引擎
  11. sqlite_file_name = "database.db"
  12. sqlite_url = f"sqlite:///{sqlite_file_name}"
  13. engine = create_engine(
  14. sqlite_url,
  15. echo=False,
  16. pool_size=10,
  17. max_overflow=20,
  18. pool_timeout=30,
  19. pool_recycle=1800,
  20. )
  21. # 创建表
  22. def create_all_database():
  23. SQLModel.metadata.create_all(engine)
  24. # 创建会话
  25. def __get_session():
  26. with Session(engine) as session:
  27. try:
  28. yield session
  29. finally:
  30. session.close()
  31. def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
  32. """批量插入数据到设备配置表"""
  33. for idx, tab in enumerate(action_tabs):
  34. crud = CRUD(DeviceConfigTabs)
  35. device_tab = DeviceConfigTabs(
  36. mode_type=tab.get("mode_type"),
  37. mode_name=tab.get("mode_name"),
  38. )
  39. create_obj = crud.create(session, obj_in=device_tab)
  40. for data in data_list:
  41. data["tab_id"] = create_obj.id
  42. data["is_system"] = False
  43. if idx in [0, 6]:
  44. data["is_system"] = True
  45. device_config = DeviceConfig(**data)
  46. session.add(device_config)
  47. session.commit()
  48. session.close()
  49. def batch_insert_sys_configs(session: Session, data_list: list):
  50. """批量插入数据到设备配置表"""
  51. for data in data_list:
  52. config = SysConfigs(**data)
  53. session.add(config)
  54. session.commit()
  55. session.close()
  56. def insert_photo_records(
  57. image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  58. ):
  59. session = SqlQuery()
  60. """批量插入数据到照片记录"""
  61. data = {
  62. "image_deal_mode": image_deal_mode,
  63. "goods_art_no": goods_art_no,
  64. "image_index": image_index,
  65. "action_id": action_id,
  66. }
  67. device_config = PhotoRecord(**data)
  68. session.add(device_config)
  69. session.commit()
  70. session.close()
  71. return True
  72. # 创建一个通用的 CRUD 类
  73. class CRUD:
  74. def __init__(self, model):
  75. self.model = model
  76. def create(self, session: Session, obj_in):
  77. obj_in_data = dict(obj_in)
  78. db_obj = self.model(**obj_in_data)
  79. session.add(db_obj)
  80. session.commit()
  81. session.refresh(db_obj)
  82. session.close()
  83. return db_obj
  84. def read(
  85. self,
  86. session: Session,
  87. conditions: Optional[Dict] = None,
  88. order_by: Optional[str] = None,
  89. ascending: bool = True,
  90. ):
  91. query = select(self.model)
  92. if conditions:
  93. query = query.where(
  94. and_(
  95. *(
  96. getattr(self.model, key) == value
  97. for key, value in conditions.items()
  98. )
  99. )
  100. )
  101. if order_by:
  102. if ascending:
  103. query = query.order_by(asc(getattr(self.model, order_by)))
  104. else:
  105. query = query.order_by(desc(getattr(self.model, order_by)))
  106. data = session.exec(query).first()
  107. session.close()
  108. return data
  109. def read_all(
  110. self,
  111. session: Session,
  112. conditions: Optional[Dict] = None,
  113. order_by: Optional[str] = None,
  114. ascending: bool = True,
  115. ):
  116. query = select(self.model)
  117. if conditions:
  118. for key, value in conditions.items():
  119. column = getattr(self.model, key)
  120. if isinstance(value, list):
  121. # 如果值是列表,使用 IN 查询
  122. query = query.where(column.in_(value))
  123. else:
  124. # 否则使用等于条件
  125. query = query.where(column == value)
  126. if order_by:
  127. if ascending:
  128. query = query.order_by(asc(getattr(self.model, order_by)))
  129. else:
  130. query = query.order_by(desc(getattr(self.model, order_by)))
  131. data = session.exec(query).all()
  132. session.close()
  133. return data
  134. def update(self, session: Session, obj_id: int, **kwargs):
  135. db_obj = session.get(self.model, obj_id)
  136. for key, value in kwargs.items():
  137. # if value == None or value == "":
  138. # continue
  139. setattr(db_obj, key, value)
  140. session.commit()
  141. session.refresh(db_obj)
  142. session.close()
  143. return db_obj
  144. def updateConditions(
  145. self, session: Session, conditions: Optional[Dict] = None, **kwargs
  146. ):
  147. query = select(self.model)
  148. if conditions:
  149. query = query.where(
  150. and_(
  151. *(
  152. getattr(self.model, key) == value
  153. for key, value in conditions.items()
  154. )
  155. )
  156. )
  157. data = session.exec(query).first()
  158. for key, value in kwargs.items():
  159. if value == None or value == "":
  160. continue
  161. setattr(data, key, value)
  162. session.commit()
  163. session.refresh(data)
  164. session.close()
  165. return data
  166. def deleteConditions(
  167. self,
  168. session: Session,
  169. conditions: Optional[Dict] = None,
  170. ):
  171. query = select(self.model)
  172. if conditions == None:
  173. return False
  174. query = query.where(
  175. and_(
  176. *(
  177. getattr(self.model, key) == value
  178. for key, value in conditions.items()
  179. )
  180. )
  181. )
  182. objects_to_delete = session.exec(query).all()
  183. for obj in objects_to_delete:
  184. session.delete(obj)
  185. session.commit()
  186. session.close()
  187. return True
  188. def delete(self, session: Session, obj_id: int):
  189. db_obj = session.get(self.model, obj_id)
  190. session.delete(db_obj)
  191. session.commit()
  192. session.close()
  193. def SqlQuery():
  194. return next(__get_session())
# Usage example
if __name__ == "__main__":
    pass
    # Obtain a Session object via SqlQuery()
    # session = SqlQuery()
    # Create a CRUD instance
    # device_config_crud = CRUD(DeviceConfig)
    # Create a new record
    # new_device_config = DeviceConfig(
    # mode_type="example_mode",
    # execution_type="example_execution",
    # action_name="example_action",
    # action_index=1,
    # picture_index=1,
    # camera_height=100,
    # camera_angle=45.5,
    # number_focus=2,
    # take_picture=True,
    # turntable_position=10.0,
    # turntable_angle=30.5,
    # shoe_upturn=False,
    # pre_delay=1.5,
    # after_delay=2.5,
    # led_switch=True,
    # is_wait=False,
    # )
    # created_device_config = device_config_crud.create(session, new_device_config)
    # print(f"Created Device Config: {created_device_config}")
    # Read a record
    # NOTE(review): CRUD.read takes a conditions dict as its second argument,
    # so the call below would need conditions={"id": 1} to run.
    # read_device = device_config_crud.read(session, 1)
    # print(f"Read Device Config: {read_device.model_dump()}")
    # Read all records
    # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
    # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
    # # Update a record
    # updated_device = device_config_crud.update(
    # session, created_device_config.id, mode_type="updated_mode"
    # )
    # print(f"Updated Device Config: {updated_device}")
    # # Delete a record
    # device_config_crud.delete(session, created_device_config.id)
    # print("Device Config deleted.")
  236. # print("Device Config deleted.")