To_DO: ERROR AFTER LOGGING IN

This commit is contained in:
florianuhlig
2025-10-03 15:03:18 +02:00
parent 70c85cb8be
commit 1554723ed4
27 changed files with 1484 additions and 273 deletions

40
database/__init__.py Normal file
View File

@@ -0,0 +1,40 @@
import logging
from typing import Dict, Any
from .interface import DatabaseInterface
from .sqlite_db import SQLiteDatabase
logger = logging.getLogger(__name__)
class DatabaseFactory:
@staticmethod
def create_database(db_type: str, config: Dict[str, Any]) -> DatabaseInterface:
if db_type.lower() == 'sqlite':
if 'path' not in config:
raise ValueError("SQLite configuration requires 'path'")
return SQLiteDatabase(config['path'])
elif db_type.lower() == 'mysql':
# Für später: MySQL-Implementierung
# from .mysql_db import MySQLDatabase
# return MySQLDatabase(config)
raise NotImplementedError("MySQL support not yet implemented")
elif db_type.lower() == 'postgresql':
# Für später: PostgreSQL-Implementierung
# from .postgresql_db import PostgreSQLDatabase
# return PostgreSQLDatabase(config)
raise NotImplementedError("PostgreSQL support not yet implemented")
else:
raise ValueError(f"Unsupported database type: {db_type}")
# Convenience-Funktion für einfachen Zugriff
def get_database(db_type: str, config: Dict[str, Any]) -> DatabaseInterface:
db = DatabaseFactory.create_database(db_type, config)
db.connect()
db.create_user_table()
return db

View File

@@ -0,0 +1,38 @@
import logging
from flask import g, current_app
from .interface import DatabaseInterface
logger = logging.getLogger(__name__)
class FlaskDatabaseManager:
"""Flask-Integration für Thread-Safe Database Management"""
def __init__(self, database_factory_func):
self.database_factory_func = database_factory_func
def get_db(self) -> DatabaseInterface:
"""
Holt die Datenbank-Instanz für den aktuellen Request
Verwendet Flask's 'g' object für request-lokale Speicherung
"""
if 'database' not in g:
g.database = self.database_factory_func()
g.database.connect()
g.database.create_user_table()
logger.debug("Database instance created for request")
return g.database
def close_db(self, error=None):
"""
Schließt die Datenbank-Verbindung am Ende des Requests
"""
database = g.pop('database', None)
if database is not None:
database.disconnect()
logger.debug("Database connection closed for request")
def init_app(self, app):
"""Registriert die Database-Manager-Funktionen bei Flask"""
app.teardown_appcontext(self.close_db)

40
database/interface.py Normal file
View File

@@ -0,0 +1,40 @@
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
class DatabaseInterface(ABC):
@abstractmethod
def connect(self) -> None:
pass
@abstractmethod
def disconnect(self) -> None:
pass
@abstractmethod
def create_user_table(self) -> None:
pass
@abstractmethod
def create_user(self, username: str, email: str, password_hash: str) -> bool:
pass
@abstractmethod
def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
pass
@abstractmethod
def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
pass
@abstractmethod
def get_password_hash_by_email(self, email: str) -> Optional[str]:
pass
@abstractmethod
def update_user_password(self, email: str, new_password_hash: str) -> bool:
pass
@abstractmethod
def delete_user(self, email: str) -> bool:
pass

0
database/mysql_db.py Normal file
View File

View File

189
database/sqlite_db.py Normal file
View File

@@ -0,0 +1,189 @@
import sqlite3
import logging
from threading import local
from typing import Optional, Dict, Any
from .interface import DatabaseInterface
logger = logging.getLogger(__name__)
class SQLiteDatabase(DatabaseInterface):
"""Thread-Safe SQLite-Implementierung der Database-Interface"""
def __init__(self, db_path: str):
self.db_path = db_path
# Thread-local storage für Verbindungen
self._local = local()
def _get_connection(self) -> sqlite3.Connection:
"""Holt oder erstellt eine thread-lokale Verbindung"""
if not hasattr(self._local, 'connection') or self._local.connection is None:
self._local.connection = sqlite3.connect(
self.db_path,
check_same_thread=False, # Erlaubt thread-übergreifende Nutzung
timeout=30.0,
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
)
self._local.connection.row_factory = sqlite3.Row
# Optimierungen für bessere Performance
self._local.connection.execute("PRAGMA journal_mode=WAL")
self._local.connection.execute("PRAGMA synchronous=NORMAL")
self._local.connection.execute("PRAGMA cache_size=1000")
self._local.connection.execute("PRAGMA temp_store=MEMORY")
logger.debug(f"New SQLite connection created for thread")
return self._local.connection
def connect(self) -> None:
"""Initialisiert die thread-lokale Verbindung"""
try:
conn = self._get_connection()
logger.info(f"Connected to SQLite database: {self.db_path}")
except Exception as e:
logger.error(f"Failed to connect to SQLite database: {e}")
raise
def disconnect(self) -> None:
"""Schließt die thread-lokale Verbindung"""
if hasattr(self._local, 'connection') and self._local.connection:
try:
self._local.connection.close()
self._local.connection = None
logger.debug("SQLite connection closed for thread")
except Exception as e:
logger.warning(f"Error closing SQLite connection: {e}")
def create_user_table(self) -> None:
"""User-Tabelle erstellen"""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute("""
CREATE TABLE IF NOT EXISTS users
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )
""")
conn.commit()
logger.info("User table created/verified")
except Exception as e:
logger.error(f"Failed to create user table: {e}")
raise
finally:
cursor.close()
def create_user(self, username: str, email: str, password_hash: str) -> bool:
"""Neuen User erstellen"""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, password_hash)
)
conn.commit()
logger.info(f"User created successfully: {email}")
return True
except sqlite3.IntegrityError as e:
logger.warning(f"User creation failed (duplicate): {e}")
conn.rollback()
return False
except Exception as e:
logger.error(f"Failed to create user: {e}")
conn.rollback()
raise
finally:
cursor.close()
def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
"""User anhand Email suchen"""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute(
"SELECT id, username, email, password_hash, created_at FROM users WHERE email = ?",
(email,)
)
row = cursor.fetchone()
return dict(row) if row else None
except Exception as e:
logger.error(f"Failed to get user by email: {e}")
raise
finally:
cursor.close()
def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
"""User anhand Username suchen"""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute(
"SELECT id, username, email, password_hash, created_at FROM users WHERE username = ?",
(username,)
)
row = cursor.fetchone()
return dict(row) if row else None
except Exception as e:
logger.error(f"Failed to get user by username: {e}")
raise
finally:
cursor.close()
def get_password_hash_by_email(self, email: str) -> Optional[str]:
"""Password-Hash für Email abrufen"""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute(
"SELECT password_hash FROM users WHERE email = ?",
(email,)
)
row = cursor.fetchone()
return row if row else None
except Exception as e:
logger.error(f"Failed to get password hash by email: {e}")
raise
finally:
cursor.close()
def update_user_password(self, email: str, new_password_hash: str) -> bool:
"""Passwort aktualisieren"""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute(
"UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE email = ?",
(new_password_hash, email)
)
conn.commit()
success = cursor.rowcount > 0
if success:
logger.info(f"Password updated for user: {email}")
return success
except Exception as e:
logger.error(f"Failed to update password: {e}")
conn.rollback()
raise
finally:
cursor.close()
def delete_user(self, email: str) -> bool:
"""User löschen"""
conn = self._get_connection()
cursor = conn.cursor()
try:
cursor.execute("DELETE FROM users WHERE email = ?", (email,))
conn.commit()
success = cursor.rowcount > 0
if success:
logger.info(f"User deleted: {email}")
return success
except Exception as e:
logger.error(f"Failed to delete user: {e}")
conn.rollback()
raise
finally:
cursor.close()