databases.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. from sqlmodel import Field, Session, SQLModel, create_engine, select
  2. from typing import Dict
  3. from typing import Optional
  4. from sqlalchemy import and_, desc, asc
  5. from utils.utils_func import check_path
  6. from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
  7. check_path("C:/Zhihuiyin")
  8. # 创建SQLite数据库引擎
  9. sqlite_file_name = "C:/Zhihuiyin/database.db"
  10. sqlite_url = f"sqlite:///{sqlite_file_name}"
  11. engine = create_engine(
  12. sqlite_url,
  13. echo=False,
  14. connect_args={"check_same_thread": False}, # 允许多线程访问
  15. pool_size=10,
  16. max_overflow=20,
  17. pool_timeout=30,
  18. pool_recycle=1800,
  19. )
  20. # 创建表
  21. def create_all_database():
  22. SQLModel.metadata.create_all(engine)
  23. # 创建会话
  24. def __get_session():
  25. with Session(engine) as session:
  26. try:
  27. yield session
  28. finally:
  29. session.close()
  30. # 创建一个通用的 CRUD 类
  31. class CRUD:
  32. def __init__(self, model):
  33. self.model = model
  34. def create(self, session: Session, obj_in):
  35. obj_in_data = dict(obj_in)
  36. db_obj = self.model(**obj_in_data)
  37. session.add(db_obj)
  38. session.commit()
  39. session.refresh(db_obj)
  40. return db_obj
  41. def read(
  42. self,
  43. session: Session,
  44. conditions: Optional[Dict] = None,
  45. order_by: Optional[str] = None,
  46. ascending: bool = True,
  47. ):
  48. query = select(self.model)
  49. if conditions:
  50. query = query.where(
  51. and_(
  52. *(
  53. getattr(self.model, key) == value
  54. for key, value in conditions.items()
  55. )
  56. )
  57. )
  58. if order_by:
  59. if ascending:
  60. query = query.order_by(asc(getattr(self.model, order_by)))
  61. else:
  62. query = query.order_by(desc(getattr(self.model, order_by)))
  63. data = session.exec(query).first()
  64. return data
  65. def read_all(
  66. self,
  67. session: Session,
  68. conditions: Optional[Dict] = None,
  69. order_by: Optional[str] = None,
  70. ascending: bool = True,
  71. join_conditions: Optional[list] = None, # 新增:支持多个JOIN
  72. ):
  73. query = select(self.model)
  74. # 处理JOIN逻辑
  75. if join_conditions:
  76. for join_info in join_conditions:
  77. joined_model = join_info.get("model")
  78. on_clause = join_info.get("on")
  79. is_outer = join_info.get("is_outer", False)
  80. if not joined_model or not on_clause:
  81. continue
  82. if is_outer:
  83. query = query.outerjoin(joined_model, on_clause)
  84. else:
  85. query = query.join(joined_model, on_clause)
  86. if conditions:
  87. for key, value in conditions.items():
  88. column = getattr(self.model, key)
  89. if isinstance(value, list):
  90. # 如果值是列表,使用 IN 查询
  91. query = query.where(column.in_(value))
  92. else:
  93. # 否则使用等于条件
  94. query = query.where(column == value)
  95. if order_by:
  96. if ascending:
  97. query = query.order_by(asc(getattr(self.model, order_by)))
  98. else:
  99. query = query.order_by(desc(getattr(self.model, order_by)))
  100. data = session.exec(query).all()
  101. return data
  102. def update(self, session: Session, obj_id: int, **kwargs):
  103. db_obj = session.get(self.model, obj_id)
  104. for key, value in kwargs.items():
  105. setattr(db_obj, key, value)
  106. session.commit()
  107. session.refresh(db_obj)
  108. return db_obj
  109. def deleteConditions(
  110. self,
  111. session: Session,
  112. conditions: Optional[Dict] = None,
  113. ):
  114. query = select(self.model)
  115. if conditions == None:
  116. return False
  117. query = query.where(
  118. and_(
  119. *(
  120. getattr(self.model, key) == value
  121. for key, value in conditions.items()
  122. )
  123. )
  124. )
  125. objects_to_delete = session.exec(query).all()
  126. for obj in objects_to_delete:
  127. session.delete(obj)
  128. session.commit()
  129. # session.refresh()
  130. return True
  131. def delete(self, session: Session, obj_id: int):
  132. db_obj = session.get(self.model, obj_id)
  133. session.delete(db_obj)
  134. session.commit()
  135. # session.refresh()
  136. # 恢复 updateConditions 方法
  137. def updateConditions(self, session: Session, conditions: Dict, **kwargs):
  138. """
  139. 根据条件更新记录
  140. :param session: 数据库会话
  141. :param conditions: 更新条件字典
  142. :param kwargs: 需要更新的字段和值
  143. :return: 更新后的对象
  144. """
  145. query = select(self.model).where(
  146. and_(
  147. *(
  148. getattr(self.model, key) == value
  149. for key, value in conditions.items()
  150. )
  151. )
  152. )
  153. result = session.exec(query).first()
  154. if result:
  155. for key, value in kwargs.items():
  156. setattr(result, key, value)
  157. session.commit() # 提交事务以保存更改
  158. return result
  159. return None
  160. # 批量插入数据到设备配置表
  161. def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
  162. """批量插入数据到设备配置表"""
  163. for idx, tab in enumerate(action_tabs):
  164. crud = CRUD(DeviceConfigTabs)
  165. device_tab = DeviceConfigTabs(
  166. mode_type=tab.get("mode_type"),
  167. mode_name=tab.get("mode_name"),
  168. )
  169. create_obj = crud.create(session, obj_in=device_tab)
  170. for data in data_list:
  171. data["tab_id"] = create_obj.id
  172. data["is_system"] = False
  173. if idx in [0, 6]:
  174. data["is_system"] = True
  175. device_config = DeviceConfig(**data)
  176. session.add(device_config)
  177. session.commit() # 合并事务提交
  178. # 批量插入系统配置
  179. def batch_insert_sys_configs(session: Session, data_list: list):
  180. """批量插入数据到设备配置表"""
  181. for data in data_list:
  182. config = SysConfigs(**data)
  183. session.add(config)
  184. session.commit() # 合并事务提交
  185. # 插入照片记录
  186. async def insert_photo_records(
  187. image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  188. ):
  189. with SqlQuery() as session: # 使用上下文管理器复用会话
  190. """批量插入数据到照片记录"""
  191. data = {
  192. "image_deal_mode": image_deal_mode,
  193. "goods_art_no": goods_art_no,
  194. "image_index": image_index,
  195. "action_id": action_id,
  196. }
  197. device_config = PhotoRecord(**data)
  198. session.add(device_config)
  199. session.commit()
  200. session.refresh(device_config)
  201. record_id = device_config.id
  202. return True, record_id
  203. def SqlQuery():
  204. return next(__get_session())
  205. # 使用示例
  206. if __name__ == "__main__":
  207. pass
  208. # 使用 next 函数从生成器中获取 Session 对象
  209. # session = SqlQuery()
  210. # 创建 CRUD 实例
  211. # device_config_crud = CRUD(DeviceConfig)
  212. # 创建新记录
  213. # new_device_config = DeviceConfig(
  214. # mode_type="example_mode",
  215. # execution_type="example_execution",
  216. # action_name="example_action",
  217. # action_index=1,
  218. # picture_index=1,
  219. # camera_height=100,
  220. # camera_angle=45.5,
  221. # number_focus=2,
  222. # take_picture=True,
  223. # turntable_position=10.0,
  224. # turntable_angle=30.5,
  225. # shoe_upturn=False,
  226. # pre_delay=1.5,
  227. # after_delay=2.5,
  228. # led_switch=True,
  229. # is_wait=False,
  230. # )
  231. # created_device_config = device_config_crud.create(session, new_device_config)
  232. # print(f"Created Device Config: {created_device_config}")
  233. # 读取记录
  234. # read_device = device_config_crud.read(session, 1)
  235. # print(f"Read Device Config: {read_device.model_dump()}")
  236. # 读取所有记录
  237. # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
  238. # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
  239. # # 更新记录
  240. # updated_device = device_config_crud.update(
  241. # session, created_device_config.id, mode_type="updated_mode"
  242. # )
  243. # print(f"Updated Device Config: {updated_device}")
  244. # # 删除记录
  245. # device_config_crud.delete(session, created_device_config.id)
  246. # print("Device Config deleted.")