# databases.py - SQLite engine setup, a generic CRUD helper and batch-insert utilities.
  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, desc, asc, delete
  8. from utils.utils_func import check_path
  9. from sqlalchemy.dialects import sqlite
  10. from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
  11. check_path("C:/Zhihuiyin")
  12. # 创建SQLite数据库引擎
  13. sqlite_file_name = "C:/Zhihuiyin/database.db"
  14. sqlite_url = f"sqlite:///{sqlite_file_name}"
  15. engine = create_engine(
  16. sqlite_url,
  17. echo=False,
  18. connect_args={"check_same_thread": False}, # 允许多线程访问
  19. pool_size=10,
  20. max_overflow=20,
  21. pool_timeout=30,
  22. pool_recycle=1800,
  23. )
  24. # 创建表
  25. def create_all_database():
  26. SQLModel.metadata.create_all(engine)
  27. # 创建会话
  28. def __get_session():
  29. with Session(engine) as session:
  30. try:
  31. yield session
  32. finally:
  33. session.close()
  34. # 创建一个通用的 CRUD 类
  35. class CRUD:
  36. def __init__(self, model):
  37. self.model = model
  38. def create(self, session: Session, obj_in):
  39. obj_in_data = dict(obj_in)
  40. db_obj = self.model(**obj_in_data)
  41. session.add(db_obj)
  42. session.commit()
  43. session.refresh(db_obj)
  44. return db_obj
  45. def truncate(self, session: Session):
  46. """
  47. 使用SQL删除语句清空所有记录(更高效)
  48. :param session: 数据库会话
  49. """
  50. stmt = delete(self.model)
  51. result = session.exec(stmt)
  52. session.commit()
  53. return result.rowcount
  54. def read(
  55. self,
  56. session: Session,
  57. conditions: Optional[Dict] = None,
  58. order_by: Optional[str] = None,
  59. ascending: bool = True,
  60. ):
  61. query = select(self.model)
  62. if conditions:
  63. query = query.where(
  64. and_(
  65. *(
  66. getattr(self.model, key) == value
  67. for key, value in conditions.items()
  68. )
  69. )
  70. )
  71. if order_by:
  72. if ascending:
  73. query = query.order_by(asc(getattr(self.model, order_by)))
  74. else:
  75. query = query.order_by(desc(getattr(self.model, order_by)))
  76. data = session.exec(query).first()
  77. return data
  78. def read_all(
  79. self,
  80. session: Session,
  81. conditions: Optional[Dict] = None,
  82. order_by: Optional[str] = None,
  83. ascending: bool = True,
  84. join_conditions: Optional[list] = None, # 新增:支持多个JOIN
  85. ):
  86. query = select(self.model)
  87. # 处理JOIN逻辑
  88. if join_conditions:
  89. for join_info in join_conditions:
  90. joined_model = join_info.get("model")
  91. on_clause = join_info.get("on")
  92. is_outer = join_info.get("is_outer", False)
  93. if not joined_model or not on_clause:
  94. continue
  95. if is_outer:
  96. query = query.outerjoin(joined_model, on_clause)
  97. else:
  98. query = query.join(joined_model, on_clause)
  99. if conditions:
  100. for key, value in conditions.items():
  101. column = getattr(self.model, key)
  102. if isinstance(value, list):
  103. # 如果值是列表,使用 IN 查询
  104. query = query.where(column.in_(value))
  105. else:
  106. # 否则使用等于条件
  107. query = query.where(column == value)
  108. if order_by:
  109. if ascending:
  110. query = query.order_by(asc(getattr(self.model, order_by)))
  111. else:
  112. query = query.order_by(desc(getattr(self.model, order_by)))
  113. data = session.exec(query).all()
  114. return data
  115. def update(self, session: Session, obj_id: int, **kwargs):
  116. db_obj = session.get(self.model, obj_id)
  117. for key, value in kwargs.items():
  118. setattr(db_obj, key, value)
  119. session.commit()
  120. session.refresh(db_obj)
  121. return db_obj
  122. def deleteConditions(
  123. self,
  124. session: Session,
  125. conditions: Optional[Dict] = None,
  126. ):
  127. query = select(self.model)
  128. if conditions == None:
  129. return False
  130. query = query.where(
  131. and_(
  132. *(
  133. getattr(self.model, key) == value
  134. for key, value in conditions.items()
  135. )
  136. )
  137. )
  138. objects_to_delete = session.exec(query).all()
  139. for obj in objects_to_delete:
  140. session.delete(obj)
  141. session.commit()
  142. # session.refresh()
  143. return True
  144. def delete(self, session: Session, obj_id: int):
  145. db_obj = session.get(self.model, obj_id)
  146. session.delete(db_obj)
  147. session.commit()
  148. # session.refresh()
  149. # 恢复 updateConditions 方法
  150. def updateConditions(self, session: Session, conditions: Dict, **kwargs):
  151. """
  152. 根据条件更新记录
  153. :param session: 数据库会话
  154. :param conditions: 更新条件字典
  155. :param kwargs: 需要更新的字段和值
  156. :return: 更新后的对象
  157. """
  158. query = select(self.model).where(
  159. and_(
  160. *(
  161. getattr(self.model, key) == value
  162. for key, value in conditions.items()
  163. )
  164. )
  165. )
  166. result = session.exec(query).first()
  167. if result:
  168. for key, value in kwargs.items():
  169. setattr(result, key, value)
  170. session.commit() # 提交事务以保存更改
  171. return result
  172. return None
  173. # 批量插入数据到设备配置表
  174. def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
  175. """批量插入数据到设备配置表"""
  176. for idx, tab in enumerate(action_tabs):
  177. crud = CRUD(DeviceConfigTabs)
  178. device_tab = DeviceConfigTabs(
  179. mode_type=tab.get("mode_type"),
  180. mode_name=tab.get("mode_name"),
  181. )
  182. create_obj = crud.create(session, obj_in=device_tab)
  183. for data in data_list:
  184. data["tab_id"] = create_obj.id
  185. data["is_system"] = False
  186. if idx in [0, 6]:
  187. data["is_system"] = True
  188. device_config = DeviceConfig(**data)
  189. session.add(device_config)
  190. session.commit() # 合并事务提交
  191. def batch_insert_device_configsNew(
  192. session: Session, action_tabs: list, data_list: list
  193. ):
  194. """批量插入数据到设备配置表"""
  195. for idx, tab in enumerate(action_tabs):
  196. crud = CRUD(DeviceConfigTabs)
  197. device_tab = DeviceConfigTabs(
  198. id=tab.get("id"),
  199. mode_type=tab.get("mode_type"),
  200. mode_name=tab.get("mode_name"),
  201. )
  202. create_obj = crud.create(session, obj_in=device_tab)
  203. for data in data_list:
  204. # data["tab_id"] = create_obj.id
  205. # data["is_system"] = False
  206. # if idx in [0, 6]:
  207. # data["is_system"] = True
  208. device_config = DeviceConfig(**data)
  209. session.add(device_config)
  210. session.commit() # 合并事务提交
  211. # 批量插入系统配置
  212. def batch_insert_sys_configs(session: Session, data_list: list):
  213. """批量插入数据到设备配置表"""
  214. for data in data_list:
  215. print("data", data, type(data))
  216. config = SysConfigs(**data)
  217. session.add(config)
  218. session.commit() # 合并事务提交
  219. # 插入照片记录
  220. async def insert_photo_records(
  221. image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  222. ):
  223. with SqlQuery() as session: # 使用上下文管理器复用会话
  224. """批量插入数据到照片记录"""
  225. data = {
  226. "image_deal_mode": image_deal_mode,
  227. "goods_art_no": goods_art_no,
  228. "image_index": image_index,
  229. "action_id": action_id,
  230. }
  231. device_config = PhotoRecord(**data)
  232. session.add(device_config)
  233. session.commit()
  234. session.refresh(device_config)
  235. record_id = device_config.id
  236. return True, record_id
  237. def SqlQuery():
  238. return next(__get_session())
  239. # 使用示例
  240. if __name__ == "__main__":
  241. pass
  242. # 使用 next 函数从生成器中获取 Session 对象
  243. # session = SqlQuery()
  244. # 创建 CRUD 实例
  245. # device_config_crud = CRUD(DeviceConfig)
  246. # 创建新记录
  247. # new_device_config = DeviceConfig(
  248. # mode_type="example_mode",
  249. # execution_type="example_execution",
  250. # action_name="example_action",
  251. # action_index=1,
  252. # picture_index=1,
  253. # camera_height=100,
  254. # camera_angle=45.5,
  255. # number_focus=2,
  256. # take_picture=True,
  257. # turntable_position=10.0,
  258. # turntable_angle=30.5,
  259. # shoe_upturn=False,
  260. # pre_delay=1.5,
  261. # after_delay=2.5,
  262. # led_switch=True,
  263. # is_wait=False,
  264. # )
  265. # created_device_config = device_config_crud.create(session, new_device_config)
  266. # print(f"Created Device Config: {created_device_config}")
  267. # 读取记录
  268. # read_device = device_config_crud.read(session, 1)
  269. # print(f"Read Device Config: {read_device.model_dump()}")
  270. # 读取所有记录
  271. # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
  272. # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
  273. # # 更新记录
  274. # updated_device = device_config_crud.update(
  275. # session, created_device_config.id, mode_type="updated_mode"
  276. # )
  277. # print(f"Updated Device Config: {updated_device}")
  278. # # 删除记录
  279. # device_config_crud.delete(session, created_device_config.id)
  280. # print("Device Config deleted.")