databases.py 12 KB


  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, desc, asc, delete,inspect
  8. import settings
  9. from utils.utils_func import check_path
  10. from sqlalchemy.dialects import sqlite
  11. from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
  12. check_path("C:/Zhihuiyin")
  13. # 创建SQLite数据库引擎
  14. sqlite_file_name = "C:/Zhihuiyin/database.db"
  15. sqlite_url = f"sqlite:///{sqlite_file_name}"
  16. engine = create_engine(
  17. sqlite_url,
  18. echo=False,
  19. connect_args={"check_same_thread": False},
  20. pool_size=20, # 增加基础连接池大小
  21. max_overflow=30, # 增加最大溢出连接数
  22. pool_timeout=60, # 保持合理的超时时间
  23. pool_recycle=1800, # 连接回收时间(秒)
  24. pool_pre_ping=True, # 检查连接有效性
  25. )
  26. # 创建表
  27. def create_all_database():
  28. SQLModel.metadata.create_all(engine)
  29. # 创建会话
  30. def __get_session():
  31. with Session(engine) as session:
  32. try:
  33. yield session
  34. finally:
  35. session.close()
  36. # 创建一个通用的 CRUD 类
  37. class CRUD:
  38. def __init__(self, model):
  39. self.model = model
  40. def create(self, session: Session, obj_in):
  41. obj_in_data = dict(obj_in)
  42. db_obj = self.model(**obj_in_data)
  43. session.add(db_obj)
  44. session.commit()
  45. session.refresh(db_obj)
  46. return db_obj
  47. def truncate(self, session: Session):
  48. """
  49. 使用SQL删除语句清空所有记录(更高效)
  50. :param session: 数据库会话
  51. """
  52. stmt = delete(self.model)
  53. result = session.exec(stmt)
  54. session.commit()
  55. return result.rowcount
  56. def read(
  57. self,
  58. session: Session,
  59. conditions: Optional[Dict] = None,
  60. order_by: Optional[str] = None,
  61. ascending: bool = True,
  62. ):
  63. query = select(self.model)
  64. if conditions:
  65. query = query.where(
  66. and_(
  67. *(
  68. getattr(self.model, key) == value
  69. for key, value in conditions.items()
  70. )
  71. )
  72. )
  73. if order_by:
  74. if ascending:
  75. query = query.order_by(asc(getattr(self.model, order_by)))
  76. else:
  77. query = query.order_by(desc(getattr(self.model, order_by)))
  78. data = session.exec(query).first()
  79. return data
  80. def read_all(
  81. self,
  82. session: Session,
  83. conditions: Optional[Dict] = None,
  84. order_by: Optional[str] = None,
  85. ascending: bool = True,
  86. join_conditions: Optional[list] = None, # 新增:支持多个JOIN
  87. ):
  88. query = select(self.model)
  89. # 处理JOIN逻辑
  90. if join_conditions:
  91. for join_info in join_conditions:
  92. joined_model = join_info.get("model")
  93. on_clause = join_info.get("on")
  94. is_outer = join_info.get("is_outer", False)
  95. if not joined_model or not on_clause:
  96. continue
  97. if is_outer:
  98. query = query.outerjoin(joined_model, on_clause)
  99. else:
  100. query = query.join(joined_model, on_clause)
  101. if conditions:
  102. for key, value in conditions.items():
  103. column = getattr(self.model, key)
  104. if isinstance(value, list):
  105. # 如果值是列表,使用 IN 查询
  106. query = query.where(column.in_(value))
  107. else:
  108. # 否则使用等于条件
  109. query = query.where(column == value)
  110. if order_by:
  111. if ascending:
  112. query = query.order_by(asc(getattr(self.model, order_by)))
  113. else:
  114. query = query.order_by(desc(getattr(self.model, order_by)))
  115. data = session.exec(query).all()
  116. return data
  117. def update(self, session: Session, obj_id: int, **kwargs):
  118. db_obj = session.get(self.model, obj_id)
  119. for key, value in kwargs.items():
  120. setattr(db_obj, key, value)
  121. session.commit()
  122. session.refresh(db_obj)
  123. return db_obj
  124. def deleteConditions(
  125. self,
  126. session: Session,
  127. conditions: Optional[Dict] = None,
  128. is_soft_delete: bool = True,
  129. ):
  130. query = select(self.model)
  131. if conditions is None:
  132. return False
  133. # 构建查询条件
  134. query = query.where(
  135. and_(
  136. *(
  137. getattr(self.model, key) == value
  138. for key, value in conditions.items()
  139. )
  140. )
  141. )
  142. # 获取需要删除的对象
  143. objects_to_delete = session.exec(query).all()
  144. # 检查模型是否包含 delete_time 字段
  145. model_columns = {column.name for column in inspect(self.model).columns}
  146. if 'delete_time' in model_columns and is_soft_delete ==True:
  147. # 软删除:更新 delete_time 字段
  148. for obj in objects_to_delete:
  149. setattr(obj, 'delete_time', datetime.now())
  150. session.commit()
  151. print("软删除完成")
  152. else:
  153. # 硬删除:直接删除对象
  154. for obj in objects_to_delete:
  155. session.delete(obj)
  156. session.commit()
  157. print("硬删除完成")
  158. return True
  159. def delete(self, session: Session, obj_id: int):
  160. db_obj = session.get(self.model, obj_id)
  161. session.delete(db_obj)
  162. session.commit()
  163. # session.refresh()
  164. # 恢复 updateConditions 方法
  165. def updateConditions(self, session: Session, conditions: Dict, **kwargs):
  166. """
  167. 根据条件更新记录
  168. :param session: 数据库会话
  169. :param conditions: 更新条件字典
  170. :param kwargs: 需要更新的字段和值
  171. :return: 更新后的对象
  172. """
  173. query = select(self.model).where(
  174. and_(
  175. *(
  176. getattr(self.model, key) == value
  177. for key, value in conditions.items()
  178. )
  179. )
  180. )
  181. print("SQL 打印==>",str(query))
  182. result = session.exec(query).first()
  183. if result:
  184. for key, value in kwargs.items():
  185. setattr(result, key, value)
  186. session.commit() # 提交事务以保存更改
  187. return result
  188. return None
  189. def updateConditionsAll(self, session: Session, conditions: Dict, **kwargs):
  190. """
  191. 根据条件更新记录
  192. :param session: 数据库会话
  193. :param conditions: 更新条件字典
  194. :param kwargs: 需要更新的字段和值
  195. :return: 更新后的对象
  196. """
  197. query = select(self.model).where(
  198. and_(
  199. *(
  200. getattr(self.model, key) == value
  201. for key, value in conditions.items()
  202. )
  203. )
  204. )
  205. print("SQL 打印==>", str(query))
  206. results = session.exec(query).fetchall()
  207. if results:
  208. for obj in results: # 遍历每个对象
  209. for key, value in kwargs.items():
  210. setattr(obj, key, value) # 对每个对象设置属性
  211. session.commit() # 提交事务以保存更改
  212. return results
  213. return None
  214. # 批量插入数据到设备配置表
  215. def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
  216. """批量插入数据到设备配置表"""
  217. for idx, tab in enumerate(action_tabs):
  218. crud = CRUD(DeviceConfigTabs)
  219. device_tab = DeviceConfigTabs(
  220. mode_type=tab.get("mode_type"),
  221. mode_name=tab.get("mode_name"),
  222. )
  223. create_obj = crud.create(session, obj_in=device_tab)
  224. for data in data_list:
  225. data["tab_id"] = create_obj.id
  226. data["is_system"] = False
  227. if idx in [0, 6]:
  228. data["is_system"] = True
  229. device_config = DeviceConfig(**data)
  230. session.add(device_config)
  231. session.commit() # 合并事务提交
  232. def batch_insert_device_configsNew(
  233. session: Session, action_tabs: list, data_list: list
  234. ):
  235. """批量插入数据到设备配置表"""
  236. for idx, tab in enumerate(action_tabs):
  237. crud = CRUD(DeviceConfigTabs)
  238. device_tab = DeviceConfigTabs(
  239. id=tab.get("id"),
  240. mode_type=tab.get("mode_type"),
  241. mode_name=tab.get("mode_name"),
  242. )
  243. create_obj = crud.create(session, obj_in=device_tab)
  244. for data in data_list:
  245. # data["tab_id"] = create_obj.id
  246. # data["is_system"] = False
  247. # if idx in [0, 6]:
  248. # data["is_system"] = True
  249. device_config = DeviceConfig(**data)
  250. session.add(device_config)
  251. session.commit() # 合并事务提交
  252. # 批量插入系统配置
  253. def batch_insert_sys_configs(session: Session, data_list: list):
  254. """批量插入数据到设备配置表"""
  255. for data in data_list:
  256. print("data", data, type(data))
  257. config = SysConfigs(**data)
  258. session.add(config)
  259. session.commit() # 合并事务提交
  260. # 插入照片记录
  261. async def insert_photo_records(
  262. image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  263. ):
  264. with SqlQuery() as session: # 使用上下文管理器复用会话
  265. """批量插入数据到照片记录"""
  266. data = {
  267. "image_deal_mode": image_deal_mode,
  268. "goods_art_no": goods_art_no,
  269. "image_index": image_index,
  270. "action_id": action_id,
  271. }
  272. device_config = PhotoRecord(**data)
  273. session.add(device_config)
  274. session.commit()
  275. session.refresh(device_config)
  276. record_id = device_config.id
  277. syncData = {
  278. "id": record_id,
  279. "image_deal_mode": image_deal_mode,
  280. "goods_art_no": goods_art_no,
  281. "image_index": image_index,
  282. "action_id": action_id,
  283. }
  284. # 异步插入一条数据
  285. settings.syncPhotoRecord(syncData,action_type=1)
  286. return True, record_id
  287. def SqlQuery():
  288. return next(__get_session())
# Usage example
if __name__ == "__main__":
    pass
    # Obtain a Session object through SqlQuery()
    # session = SqlQuery()
    # Create a CRUD instance
    # device_config_crud = CRUD(DeviceConfig)
    # Create a new record
    # new_device_config = DeviceConfig(
    #     mode_type="example_mode",
    #     execution_type="example_execution",
    #     action_name="example_action",
    #     action_index=1,
    #     picture_index=1,
    #     camera_height=100,
    #     camera_angle=45.5,
    #     number_focus=2,
    #     take_picture=True,
    #     turntable_position=10.0,
    #     turntable_angle=30.5,
    #     shoe_upturn=False,
    #     pre_delay=1.5,
    #     after_delay=2.5,
    #     led_switch=True,
    #     is_wait=False,
    # )
    # created_device_config = device_config_crud.create(session, new_device_config)
    # print(f"Created Device Config: {created_device_config}")
    # Read one record (conditions must be a dict of equality filters)
    # read_device = device_config_crud.read(session, conditions={"id": 1})
    # print(f"Read Device Config: {read_device.model_dump()}")
    # Read all matching records
    # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
    # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
    # # Update a record
    # updated_device = device_config_crud.update(
    #     session, created_device_config.id, mode_type="updated_mode"
    # )
    # print(f"Updated Device Config: {updated_device}")
    # # Delete a record
    # device_config_crud.delete(session, created_device_config.id)
    # print("Device Config deleted.")