123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import os
- from typing import Dict, List, Tuple
- import sqlite3
- from pathlib import Path
- from config.config import DATABASE_FILE
- db_folder = Path.cwd() / "db"
- if not Path.exists(db_folder):
- os.makedirs(db_folder)
- database_path = db_folder / DATABASE_FILE
- if not database_path.is_file():
- with open(database_path, "w"):
- pass
- conn = sqlite3.connect(database_path)
- cursor = conn.cursor()
- def update(table: str, data: Tuple, row_id: int):
- cursor.execute(
- f"update {table} set {data[0]}='{data[1]}' where id={row_id}")
- conn.commit()
- def insert(table: str, column_values: Dict):
- columns = ', '.join(column_values.keys())
- values = [tuple(column_values.values())]
- placeholders = ", ".join("?" * len(column_values.keys()))
- cursor.executemany(
- f"INSERT INTO {table} "
- f"({columns}) "
- f"VALUES ({placeholders})",
- values)
- conn.commit()
- id = cursor.execute("SELECT last_insert_rowid()")
- return id.lastrowid
- def _to_objects_list(rows, columns):
- """
- Получает данные из БД в виде списков строк и название колонок. Возвращает
- обычный объект, где ключами являются названия колонок, занчениями - данные
- из колонок
- """
- result = []
- for row in rows:
- dict_row = {}
- for index, column in enumerate(columns):
- dict_row[column] = row[index]
- result.append(dict_row)
- if len(result) == 1:
- return result[0]
- elif not len(result):
- return []
- else:
- return result
- def fetchall(table: str, columns: List[str]):
- columns_joined = ", ".join(columns)
- cursor.execute(f"SELECT {columns_joined} FROM {table}")
- rows = cursor.fetchall()
- return _to_objects_list(rows, columns)
- def select_by_keys(table: str, columns: List[str], args: Dict):
- columns_joined = ", ".join(columns)
- search_strings = []
- for key in args.keys():
- search_strings.append(f"{key}='{args[key]}'")
- query = f"select {columns_joined} from {table} where {' and '.join(search_strings)}"
- cursor.execute(query)
- rows = cursor.fetchall()
- return _to_objects_list(rows, columns)
- def delete(table: str, row_id: int) -> None:
- cursor.execute(f"delete from {table} where id={row_id}")
- conn.commit()
- def delete_many(table: str, args: Dict):
- search_strings = []
- for key in args.keys():
- search_strings.append(f"{key}='{args[key]}'")
- query = f"delete from {table} where {' and '.join(search_strings)}"
- cursor.execute(query)
- conn.commit()
- def get_cursor():
- return cursor
- def _init_db():
- """Инициализирует БД"""
- with open("createdb.sql", "r") as f:
- sql = f.read()
- cursor.executescript(sql)
- conn.commit()
- def check_db_exists():
- """Проверяет, инициализирована ли БД, если нет — инициализирует"""
- cursor.execute("SELECT name FROM sqlite_master "
- "WHERE type='table' AND name='users'")
- table_exists = cursor.fetchall()
- if table_exists:
- return
- _init_db()
- check_db_exists()
|