databases.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. from networkx.algorithms.components import connected
  2. from sqlmodel import Field, Session, SQLModel, create_engine, select
  3. from typing import Dict
  4. from datetime import datetime
  5. from typing import Optional
  6. import json
  7. from sqlalchemy import and_, desc, asc, delete,inspect
  8. import settings
  9. from utils.utils_func import check_path
  10. from sqlalchemy.dialects import sqlite
  11. from model import DeviceConfig, PhotoRecord, SysConfigs, DeviceConfigTabs
# NOTE(review): check_path is an opaque helper from utils.utils_func; presumably it
# makes sure the data directory exists before the database file is opened - confirm.
check_path("C:/Zhihuiyin")
# Create the SQLite database engine
sqlite_file_name = "C:/Zhihuiyin/database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(
    sqlite_url,
    echo=False,
    # Passed through to sqlite3.connect(): lets a connection be used from a
    # thread other than the one that created it.
    connect_args={"check_same_thread": False},
    pool_size=20,  # larger base connection-pool size
    max_overflow=30,  # larger maximum number of overflow connections
    pool_timeout=60,  # keep a reasonable timeout
    pool_recycle=1800,  # connection recycle time (seconds)
    pool_pre_ping=True,  # check that a connection is alive before using it
)
  26. # 创建表
  27. def create_all_database():
  28. SQLModel.metadata.create_all(engine)
  29. # 执行自动迁移
  30. auto_add_missing_columns()
  31. # 创建会话
  32. def __get_session():
  33. with Session(engine) as session:
  34. try:
  35. yield session
  36. finally:
  37. session.close()
  38. # 创建一个通用的 CRUD 类
  39. class CRUD:
  40. def __init__(self, model):
  41. self.model = model
  42. def create(self, session: Session, obj_in):
  43. obj_in_data = dict(obj_in)
  44. db_obj = self.model(**obj_in_data)
  45. session.add(db_obj)
  46. session.commit()
  47. session.refresh(db_obj)
  48. return db_obj
  49. def truncate(self, session: Session):
  50. """
  51. 使用SQL删除语句清空所有记录(更高效)
  52. :param session: 数据库会话
  53. """
  54. stmt = delete(self.model)
  55. result = session.exec(stmt)
  56. session.commit()
  57. return result.rowcount
  58. def read(
  59. self,
  60. session: Session,
  61. conditions: Optional[Dict] = None,
  62. order_by: Optional[str] = None,
  63. ascending: bool = True,
  64. ):
  65. query = select(self.model)
  66. if conditions:
  67. query = query.where(
  68. and_(
  69. *(
  70. getattr(self.model, key) == value
  71. for key, value in conditions.items()
  72. )
  73. )
  74. )
  75. if order_by:
  76. if ascending:
  77. query = query.order_by(asc(getattr(self.model, order_by)))
  78. else:
  79. query = query.order_by(desc(getattr(self.model, order_by)))
  80. data = session.exec(query).first()
  81. return data
  82. def read_all(
  83. self,
  84. session: Session,
  85. conditions: Optional[Dict] = None,
  86. order_by: Optional[str] = None,
  87. ascending: bool = True,
  88. join_conditions: Optional[list] = None, # 新增:支持多个JOIN
  89. ):
  90. query = select(self.model)
  91. # 处理JOIN逻辑
  92. if join_conditions:
  93. for join_info in join_conditions:
  94. joined_model = join_info.get("model")
  95. on_clause = join_info.get("on")
  96. is_outer = join_info.get("is_outer", False)
  97. if not joined_model or not on_clause:
  98. continue
  99. if is_outer:
  100. query = query.outerjoin(joined_model, on_clause)
  101. else:
  102. query = query.join(joined_model, on_clause)
  103. if conditions:
  104. for key, value in conditions.items():
  105. column = getattr(self.model, key)
  106. if isinstance(value, list):
  107. # 如果值是列表,使用 IN 查询
  108. query = query.where(column.in_(value))
  109. else:
  110. # 否则使用等于条件
  111. query = query.where(column == value)
  112. if order_by:
  113. if ascending:
  114. query = query.order_by(asc(getattr(self.model, order_by)))
  115. else:
  116. query = query.order_by(desc(getattr(self.model, order_by)))
  117. data = session.exec(query).all()
  118. return data
  119. def update(self, session: Session, obj_id: int, **kwargs):
  120. db_obj = session.get(self.model, obj_id)
  121. for key, value in kwargs.items():
  122. setattr(db_obj, key, value)
  123. session.commit()
  124. session.refresh(db_obj)
  125. return db_obj
  126. def deleteConditions(
  127. self,
  128. session: Session,
  129. conditions: Optional[Dict] = None,
  130. is_soft_delete: bool = True,
  131. ):
  132. query = select(self.model)
  133. if conditions is None:
  134. return False
  135. # 构建查询条件
  136. query = query.where(
  137. and_(
  138. *(
  139. getattr(self.model, key) == value
  140. for key, value in conditions.items()
  141. )
  142. )
  143. )
  144. # 获取需要删除的对象
  145. objects_to_delete = session.exec(query).all()
  146. # 检查模型是否包含 delete_time 字段
  147. model_columns = {column.name for column in inspect(self.model).columns}
  148. if 'delete_time' in model_columns and is_soft_delete ==True:
  149. # 软删除:更新 delete_time 字段
  150. for obj in objects_to_delete:
  151. setattr(obj, 'delete_time', datetime.now())
  152. session.commit()
  153. print("软删除完成")
  154. else:
  155. # 硬删除:直接删除对象
  156. for obj in objects_to_delete:
  157. session.delete(obj)
  158. session.commit()
  159. print("硬删除完成")
  160. return True
  161. def delete(self, session: Session, obj_id: int):
  162. db_obj = session.get(self.model, obj_id)
  163. session.delete(db_obj)
  164. session.commit()
  165. # session.refresh()
  166. # 恢复 updateConditions 方法
  167. def updateConditions(self, session: Session, conditions: Dict, **kwargs):
  168. """
  169. 根据条件更新记录
  170. :param session: 数据库会话
  171. :param conditions: 更新条件字典
  172. :param kwargs: 需要更新的字段和值
  173. :return: 更新后的对象
  174. """
  175. query = select(self.model).where(
  176. and_(
  177. *(
  178. getattr(self.model, key) == value
  179. for key, value in conditions.items()
  180. )
  181. )
  182. )
  183. print("SQL 打印==>",str(query))
  184. result = session.exec(query).first()
  185. if result:
  186. for key, value in kwargs.items():
  187. setattr(result, key, value)
  188. session.commit() # 提交事务以保存更改
  189. return result
  190. return None
  191. def updateConditionsAll(self, session: Session, conditions: Dict, **kwargs):
  192. """
  193. 根据条件更新记录
  194. :param session: 数据库会话
  195. :param conditions: 更新条件字典
  196. :param kwargs: 需要更新的字段和值
  197. :return: 更新后的对象
  198. """
  199. query = select(self.model).where(
  200. and_(
  201. *(
  202. getattr(self.model, key) == value
  203. for key, value in conditions.items()
  204. )
  205. )
  206. )
  207. print("SQL 打印==>", str(query))
  208. results = session.exec(query).fetchall()
  209. if results:
  210. for obj in results: # 遍历每个对象
  211. for key, value in kwargs.items():
  212. setattr(obj, key, value) # 对每个对象设置属性
  213. session.commit() # 提交事务以保存更改
  214. return results
  215. return None
  216. # 批量插入数据到设备配置表
  217. def batch_insert_device_configs(session: Session, action_tabs: list, data_list: list):
  218. """批量插入数据到设备配置表"""
  219. for idx, tab in enumerate(action_tabs):
  220. crud = CRUD(DeviceConfigTabs)
  221. device_tab = DeviceConfigTabs(
  222. mode_type=tab.get("mode_type"),
  223. mode_name=tab.get("mode_name"),
  224. )
  225. create_obj = crud.create(session, obj_in=device_tab)
  226. for data in data_list:
  227. data["tab_id"] = create_obj.id
  228. data["is_system"] = False
  229. if idx in [0, 6]:
  230. data["is_system"] = True
  231. device_config = DeviceConfig(**data)
  232. session.add(device_config)
  233. session.commit() # 合并事务提交
def batch_insert_device_configsNew(
    session: Session, action_tabs: list, data_list: list
):
    """Insert device-config tabs with caller-supplied ids, plus the given config rows.

    Unlike ``batch_insert_device_configs``, each tab row is created with the
    ``id`` taken from the tab dict, and the dicts of *data_list* are passed to
    ``DeviceConfig`` unchanged (``tab_id`` / ``is_system`` are not filled in
    here).

    :param session: database session
    :param action_tabs: dicts providing ``id``, ``mode_type`` and ``mode_name``
    :param data_list: dicts of ``DeviceConfig`` constructor kwargs
    """
    # NOTE(review): ``idx`` and ``create_obj`` are unused now that the lines
    # below are commented out.
    for idx, tab in enumerate(action_tabs):
        crud = CRUD(DeviceConfigTabs)
        device_tab = DeviceConfigTabs(
            id=tab.get("id"),
            mode_type=tab.get("mode_type"),
            mode_name=tab.get("mode_name"),
        )
        create_obj = crud.create(session, obj_in=device_tab)
        # NOTE(review): the nesting of this loop is reconstructed; if it really
        # sits inside the tab loop, every dict in data_list is inserted once
        # per tab - confirm that this is intended.
        for data in data_list:
            # data["tab_id"] = create_obj.id
            # data["is_system"] = False
            # if idx in [0, 6]:
            # data["is_system"] = True
            device_config = DeviceConfig(**data)
            session.add(device_config)
    session.commit()  # single commit for the pending rows
  254. # 批量插入系统配置
  255. def batch_insert_sys_configs(session: Session, data_list: list):
  256. """批量插入数据到设备配置表"""
  257. for data in data_list:
  258. print("data", data, type(data))
  259. config = SysConfigs(**data)
  260. session.add(config)
  261. session.commit() # 合并事务提交
  262. # 插入照片记录
  263. async def insert_photo_records(
  264. image_deal_mode: int, goods_art_no: str, image_index: int, action_id: int
  265. ):
  266. with SqlQuery() as session: # 使用上下文管理器复用会话
  267. """批量插入数据到照片记录"""
  268. data = {
  269. "image_deal_mode": image_deal_mode,
  270. "goods_art_no": goods_art_no,
  271. "image_index": image_index,
  272. "action_id": action_id,
  273. }
  274. device_config = PhotoRecord(**data)
  275. session.add(device_config)
  276. session.commit()
  277. session.refresh(device_config)
  278. record_id = device_config.id
  279. syncData = {
  280. "id": record_id,
  281. "image_deal_mode": image_deal_mode,
  282. "goods_art_no": goods_art_no,
  283. "image_index": image_index,
  284. "action_id": action_id,
  285. }
  286. # 异步插入一条数据
  287. settings.syncPhotoRecord(syncData,action_type=1)
  288. return True, record_id
  289. def auto_add_missing_columns():
  290. """
  291. 自动检测并添加缺失的数据库字段(最简化版本)
  292. 只为 device_config 表添加缺失的字段
  293. """
  294. try:
  295. import sqlite3
  296. # 连接数据库
  297. conn = sqlite3.connect(sqlite_file_name)
  298. cursor = conn.cursor()
  299. # 检查表是否存在
  300. cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='device_config'")
  301. if not cursor.fetchone():
  302. conn.close()
  303. return
  304. # 获取现有字段
  305. cursor.execute("PRAGMA table_info(device_config)")
  306. existing_columns = [row[1] for row in cursor.fetchall()]
  307. # 定义需要添加的字段
  308. # 格式: (字段名, SQL类型, 默认值)
  309. # 根据你在 DeviceConfig model 中新增的字段来配置
  310. new_fields = [
  311. # 请在这里添加你实际新增的字段
  312. ("point_name", "VARCHAR", "DEFAULT 'A'"),
  313. ("is_move_device", "BOOLEAN", "DEFAULT 1")
  314. ]
  315. # 添加缺失的字段
  316. for field_name, field_type, default_clause in new_fields:
  317. if field_name not in existing_columns:
  318. try:
  319. sql = f"ALTER TABLE device_config ADD COLUMN {field_name} {field_type} {default_clause}"
  320. cursor.execute(sql)
  321. conn.commit()
  322. except Exception:
  323. pass # 忽略错误,继续下一个
  324. conn.close()
  325. except Exception:
  326. pass # 静默失败,不影响启动
  327. def SqlQuery():
  328. return next(__get_session())
# Usage example
if __name__ == "__main__":
    pass
    # Use the next function to get a Session object from the generator
    # session = SqlQuery()
    # Create a CRUD instance
    # device_config_crud = CRUD(DeviceConfig)
    # Create a new record
    # new_device_config = DeviceConfig(
    # mode_type="example_mode",
    # execution_type="example_execution",
    # action_name="example_action",
    # action_index=1,
    # picture_index=1,
    # camera_height=100,
    # camera_angle=45.5,
    # number_focus=2,
    # take_picture=True,
    # turntable_position=10.0,
    # turntable_angle=30.5,
    # shoe_upturn=False,
    # pre_delay=1.5,
    # after_delay=2.5,
    # led_switch=True,
    # is_wait=False,
    # )
    # created_device_config = device_config_crud.create(session, new_device_config)
    # print(f"Created Device Config: {created_device_config}")
    # Read a record
    # read_device = device_config_crud.read(session, 1)
    # print(f"Read Device Config: {read_device.model_dump()}")
    # Read all records
    # all_devices = device_config_crud.read_all(session, conditions={"id": 2})
    # print(f"All Device Configs: {[device.model_dump() for device in all_devices]}")
    # # Update a record
    # updated_device = device_config_crud.update(
    # session, created_device_config.id, mode_type="updated_mode"
    # )
    # print(f"Updated Device Config: {updated_device}")
    # # Delete a record
    # device_config_crud.delete(session, created_device_config.id)
    # print("Device Config deleted.")
  370. # print("Device Config deleted.")