db.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import os
  2. from typing import Dict, List, Tuple
  3. import sqlite3
  4. from pathlib import Path
  5. from config.config import DATABASE_FILE
  6. db_folder = Path.cwd() / "db"
  7. if not Path.exists(db_folder):
  8. os.makedirs(db_folder)
  9. database_path = db_folder / DATABASE_FILE
  10. if not database_path.is_file():
  11. with open(database_path, "w"):
  12. pass
  13. conn = sqlite3.connect(database_path)
  14. cursor = conn.cursor()
  15. def update(table: str, data: Tuple, row_id: int):
  16. cursor.execute(
  17. f"update {table} set {data[0]}='{data[1]}' where id={row_id}")
  18. conn.commit()
  19. def insert(table: str, column_values: Dict):
  20. columns = ', '.join(column_values.keys())
  21. values = [tuple(column_values.values())]
  22. placeholders = ", ".join("?" * len(column_values.keys()))
  23. cursor.executemany(
  24. f"INSERT INTO {table} "
  25. f"({columns}) "
  26. f"VALUES ({placeholders})",
  27. values)
  28. conn.commit()
  29. id = cursor.execute("SELECT last_insert_rowid()")
  30. return id.lastrowid
  31. def _to_objects_list(rows, columns):
  32. """
  33. Получает данные из БД в виде списков строк и название колонок. Возвращает
  34. обычный объект, где ключами являются названия колонок, занчениями - данные
  35. из колонок
  36. """
  37. result = []
  38. for row in rows:
  39. dict_row = {}
  40. for index, column in enumerate(columns):
  41. dict_row[column] = row[index]
  42. result.append(dict_row)
  43. if len(result) == 1:
  44. return result[0]
  45. elif not len(result):
  46. return []
  47. else:
  48. return result
  49. def fetchall(table: str, columns: List[str]):
  50. columns_joined = ", ".join(columns)
  51. cursor.execute(f"SELECT {columns_joined} FROM {table}")
  52. rows = cursor.fetchall()
  53. return _to_objects_list(rows, columns)
  54. def select_by_keys(table: str, columns: List[str], args: Dict):
  55. columns_joined = ", ".join(columns)
  56. search_strings = []
  57. for key in args.keys():
  58. search_strings.append(f"{key}='{args[key]}'")
  59. query = f"select {columns_joined} from {table} where {' and '.join(search_strings)}"
  60. cursor.execute(query)
  61. rows = cursor.fetchall()
  62. return _to_objects_list(rows, columns)
  63. def delete(table: str, row_id: int) -> None:
  64. cursor.execute(f"delete from {table} where id={row_id}")
  65. conn.commit()
  66. def delete_many(table: str, args: Dict):
  67. search_strings = []
  68. for key in args.keys():
  69. search_strings.append(f"{key}='{args[key]}'")
  70. query = f"delete from {table} where {' and '.join(search_strings)}"
  71. cursor.execute(query)
  72. conn.commit()
  73. def get_cursor():
  74. return cursor
  75. def _init_db():
  76. """Инициализирует БД"""
  77. with open("createdb.sql", "r") as f:
  78. sql = f.read()
  79. cursor.executescript(sql)
  80. conn.commit()
  81. def check_db_exists():
  82. """Проверяет, инициализирована ли БД, если нет — инициализирует"""
  83. cursor.execute("SELECT name FROM sqlite_master "
  84. "WHERE type='table' AND name='users'")
  85. table_exists = cursor.fetchall()
  86. if table_exists:
  87. return
  88. _init_db()
  89. check_db_exists()