databases.py 8.4 KB


  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, desc, asc
  8. from utils.utils_func import check_path
  9. from sqlalchemy.dialects import sqlite
  10. from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
  11. check_path("C:/Zhihuiyin")
  12. # 创建SQLite数据库引擎
  13. sqlite_file_name = "C:/Zhihuiyin/database.db"
  14. sqlite_url = f"sqlite:///{sqlite_file_name}"
  15. engine = create_engine(
  16. sqlite_url,
  17. echo=False,
  18. pool_size=10,
  19. max_overflow=20,
  20. pool_timeout=30,
  21. pool_recycle=1800,
  22. )
  23. # 创建表
  24. def create_all_database():
  25. SQLModel.metadata.create_all(engine)
  26. # 创建会话
  27. def __get_session():
  28. with Session(engine) as session:
  29. try:
  30. yield session
  31. finally:
  32. session.close()
  33. def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
  34. """批量插入数据到设备配置表"""
  35. for idx, tab in enumerate(action_tabs):
  36. crud = CRUD(DeviceConfigTabs)
  37. device_tab = DeviceConfigTabs(
  38. mode_type=tab.get("mode_type"),
  39. mode_name=tab.get("mode_name"),
  40. )
  41. create_obj = crud.create(session, obj_in=device_tab)
  42. for data in data_list:
  43. data["tab_id"] = create_obj.id
  44. data["is_system"] = False
  45. if idx in [0, 6]:
  46. data["is_system"] = True
  47. device_config = DeviceConfig(**data)
  48. session.add(device_config)
  49. session.commit()
  50. session.close()
  51. def batch_insert_sys_configs(session: Session, data_list: list):
  52. """批量插入数据到设备配置表"""
  53. for data in data_list:
  54. config = SysConfigs(**data)
  55. session.add(config)
  56. session.commit()
  57. session.close()
  58. def insert_photo_records(
  59. image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  60. ):
  61. session = SqlQuery()
  62. """批量插入数据到照片记录"""
  63. data = {
  64. "image_deal_mode": image_deal_mode,
  65. "goods_art_no": goods_art_no,
  66. "image_index": image_index,
  67. "action_id": action_id,
  68. }
  69. device_config = PhotoRecord(**data)
  70. session.add(device_config)
  71. session.commit()
  72. session.close()
  73. return True
  74. # 创建一个通用的 CRUD 类
  75. class CRUD:
  76. def __init__(self, model):
  77. self.model = model
  78. def create(self, session: Session, obj_in):
  79. obj_in_data = dict(obj_in)
  80. db_obj = self.model(**obj_in_data)
  81. session.add(db_obj)
  82. session.commit()
  83. session.refresh(db_obj)
  84. session.close()
  85. return db_obj
  86. def read(
  87. self,
  88. session: Session,
  89. conditions: Optional[Dict] = None,
  90. order_by: Optional[str] = None,
  91. ascending: bool = True,
  92. ):
  93. query = select(self.model)
  94. if conditions:
  95. query = query.where(
  96. and_(
  97. *(
  98. getattr(self.model, key) == value
  99. for key, value in conditions.items()
  100. )
  101. )
  102. )
  103. if order_by:
  104. if ascending:
  105. query = query.order_by(asc(getattr(self.model, order_by)))
  106. else:
  107. query = query.order_by(desc(getattr(self.model, order_by)))
  108. data = session.exec(query).first()
  109. session.close()
  110. return data
  111. def read_all(
  112. self,
  113. session: Session,
  114. conditions: Optional[Dict] = None,
  115. order_by: Optional[str] = None,
  116. ascending: bool = True,
  117. join_conditions: Optional[list] = None, # 新增:支持多个JOIN
  118. ):
  119. query = select(self.model)
  120. # 处理JOIN逻辑
  121. if join_conditions:
  122. for join_info in join_conditions:
  123. joined_model = join_info.get("model")
  124. on_clause = join_info.get("on")
  125. is_outer = join_info.get("is_outer", False)
  126. if not joined_model or not on_clause:
  127. continue
  128. if is_outer:
  129. query = query.outerjoin(joined_model, on_clause)
  130. else:
  131. query = query.join(joined_model, on_clause)
  132. if conditions:
  133. for key, value in conditions.items():
  134. column = getattr(self.model, key)
  135. if isinstance(value, list):
  136. # 如果值是列表,使用 IN 查询
  137. query = query.where(column.in_(value))
  138. else:
  139. # 否则使用等于条件
  140. query = query.where(column == value)
  141. if order_by:
  142. if ascending:
  143. query = query.order_by(asc(getattr(self.model, order_by)))
  144. else:
  145. query = query.order_by(desc(getattr(self.model, order_by)))
  146. data = session.exec(query).all()
  147. session.close()
  148. return data
  149. def update(self, session: Session, obj_id: int, **kwargs):
  150. db_obj = session.get(self.model, obj_id)
  151. for key, value in kwargs.items():
  152. # if value == None or value == "":
  153. # continue
  154. setattr(db_obj, key, value)
  155. session.commit()
  156. session.refresh(db_obj)
  157. session.close()
  158. return db_obj
  159. def updateConditions(
  160. self, session: Session, conditions: Optional[Dict] = None, **kwargs
  161. ):
  162. query = select(self.model)
  163. if conditions:
  164. query = query.where(
  165. and_(
  166. *(
  167. getattr(self.model, key) == value
  168. for key, value in conditions.items()
  169. )
  170. )
  171. )
  172. data = session.exec(query).first()
  173. for key, value in kwargs.items():
  174. if value == None or value == "":
  175. continue
  176. setattr(data, key, value)
  177. session.commit()
  178. session.refresh(data)
  179. session.close()
  180. return data
  181. def deleteConditions(
  182. self,
  183. session: Session,
  184. conditions: Optional[Dict] = None,
  185. ):
  186. query = select(self.model)
  187. if conditions == None:
  188. return False
  189. query = query.where(
  190. and_(
  191. *(
  192. getattr(self.model, key) == value
  193. for key, value in conditions.items()
  194. )
  195. )
  196. )
  197. objects_to_delete = session.exec(query).all()
  198. for obj in objects_to_delete:
  199. session.delete(obj)
  200. session.commit()
  201. session.close()
  202. return True
  203. def delete(self, session: Session, obj_id: int):
  204. db_obj = session.get(self.model, obj_id)
  205. session.delete(db_obj)
  206. session.commit()
  207. session.close()
  208. def SqlQuery():
  209. return next(__get_session())
# Usage example
if __name__ == "__main__":
    pass
    # Obtain a Session object
    # session = SqlQuery()
    # Create a CRUD instance
    # device_config_crud = CRUD(DeviceConfig)
    # Create a new record
    # new_device_config = DeviceConfig(
    #     mode_type="example_mode",
    #     execution_type="example_execution",
    #     action_name="example_action",
    #     action_index=1,
    #     picture_index=1,
    #     camera_height=100,
    #     camera_angle=45.5,
    #     number_focus=2,
    #     take_picture=True,
    #     turntable_position=10.0,
    #     turntable_angle=30.5,
    #     shoe_upturn=False,
    #     pre_delay=1.5,
    #     after_delay=2.5,
    #     led_switch=True,
    #     is_wait=False,
    # )
    # created_device_config = device_config_crud.create(session, new_device_config)
    # print(f"Created Device Config: {created_device_config}")
    # Read a single record (conditions are passed as a dict)
    # read_device = device_config_crud.read(session, conditions={"id": 1})
    # print(f"Read Device Config: {read_device.model_dump()}")
    # Read all matching records
    # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
    # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
    # # Update a record
    # updated_device = device_config_crud.update(
    #     session, created_device_config.id, mode_type="updated_mode"
    # )
    # print(f"Updated Device Config: {updated_device}")
    # # Delete a record
    # device_config_crud.delete(session, created_device_config.id)
    # print("Device Config deleted.")