From 6e9f8a05894a7a59ea5ff9d0fdb8b2db775e49df Mon Sep 17 00:00:00 2001 From: Florian Uhlig Date: Sun, 5 Oct 2025 02:28:47 +0200 Subject: [PATCH] Run: black and isort --- config/database.py | 52 +++++---- database/__init__.py | 13 +-- database/flask_integration.py | 10 +- database/interface.py | 2 +- database/sqlite_db.py | 32 +++--- frontend/app.py | 191 ++++++++++++++++++---------------- main.py | 14 +-- services/auth_service.py | 26 +++-- services/user_service.py | 11 +- utils/auth_decorators.py | 34 +++--- utils/password_utils.py | 7 +- utils/validation.py | 19 ++-- 12 files changed, 222 insertions(+), 189 deletions(-) diff --git a/config/database.py b/config/database.py index 1a311fb..2f39957 100644 --- a/config/database.py +++ b/config/database.py @@ -1,46 +1,44 @@ import os -from typing import Dict, Any +from typing import Any, Dict class DatabaseConfig: def __init__(self): # Environment-basierte Konfiguration - self.db_type = os.getenv('DB_TYPE', 'sqlite') + self.db_type = os.getenv("DB_TYPE", "sqlite") self.config = self._load_config() def _load_config(self) -> Dict[str, Any]: - if self.db_type.lower() == 'sqlite': + if self.db_type.lower() == "sqlite": + return {"path": os.getenv("SQLITE_PATH", "databases/chatbot.db")} + elif self.db_type.lower() == "mysql": return { - 'path': os.getenv('SQLITE_PATH', 'databases/chatbot.db') + "host": os.getenv("MYSQL_HOST", "localhost"), + "port": int(os.getenv("MYSQL_PORT", "3306")), + "database": os.getenv("MYSQL_DATABASE", "chatbot"), + "user": os.getenv("MYSQL_USER", "root"), + "password": os.getenv("MYSQL_PASSWORD", ""), + "charset": "utf8mb4", } - elif self.db_type.lower() == 'mysql': + elif self.db_type.lower() == "mariadb": return { - 'host': os.getenv('MYSQL_HOST', 'localhost'), - 'port': int(os.getenv('MYSQL_PORT', '3306')), - 'database': os.getenv('MYSQL_DATABASE', 'chatbot'), - 'user': os.getenv('MYSQL_USER', 'root'), - 'password': os.getenv('MYSQL_PASSWORD', ''), - 'charset': 'utf8mb4' + "host": os.getenv("MARIADB_HOST", "localhost"), + "port": int(os.getenv("MARIADB_PORT", "3306")), + "database": os.getenv("MARIADB_DATABASE", "chatbot"), + "user": os.getenv("MARIADB_USER", "root"), + "password": os.getenv("MARIADB_PASSWORD", ""), + "charset": "utf8", } - elif self.db_type.lower() == 'mariadb': + elif self.db_type.lower() == "postgresql": return { - 'host': os.getenv('MARIADB_HOST', 'localhost'), - 'port': int(os.getenv('MARIADB_PORT', '3306')), - 'database': os.getenv('MARIADB_DATABASE', 'chatbot'), - 'user': os.getenv('MARIADB_USER', 'root'), - 'password': os.getenv('MARIADB_PASSWORD', ''), - 'charset': 'utf8' - } - elif self.db_type.lower() == 'postgresql': - return { - 'host': os.getenv('POSTGRES_HOST', 'localhost'), - 'port': int(os.getenv('POSTGRES_PORT', '5432')), - 'database': os.getenv('POSTGRES_DATABASE', 'chatbot'), - 'user': os.getenv('POSTGRES_USER', 'postgres'), - 'password': os.getenv('POSTGRES_PASSWORD', ''), + "host": os.getenv("POSTGRES_HOST", "localhost"), + "port": int(os.getenv("POSTGRES_PORT", "5432")), + "database": os.getenv("POSTGRES_DATABASE", "chatbot"), + "user": os.getenv("POSTGRES_USER", "postgres"), + "password": os.getenv("POSTGRES_PASSWORD", ""), } else: raise ValueError(f"Unsupported database type: {self.db_type}") def get_database_config(self) -> tuple[str, Dict[str, Any]]: - return self.db_type, self.config \ No newline at end of file + return self.db_type, self.config diff --git a/database/__init__.py b/database/__init__.py index 8bcd0e4..ba7337a 100644 --- a/database/__init__.py +++ b/database/__init__.py @@ -1,5 +1,6 @@ import logging -from typing import Dict, Any +from typing import Any, Dict + from .interface import DatabaseInterface from .sqlite_db import SQLiteDatabase @@ -11,18 +12,18 @@ class DatabaseFactory: @staticmethod def create_database(db_type: str, config: Dict[str, Any]) -> DatabaseInterface: - if db_type.lower() == 'sqlite': - if 'path' not in config: + if db_type.lower() == "sqlite": + if "path" not in config: raise ValueError("SQLite configuration requires 'path'") - return SQLiteDatabase(config['path']) + return SQLiteDatabase(config["path"]) - elif db_type.lower() == 'mysql': + elif db_type.lower() == "mysql": # Für später: MySQL-Implementierung # from .mysql_db import MySQLDatabase # return MySQLDatabase(config) raise NotImplementedError("MySQL support not yet implemented") - elif db_type.lower() == 'postgresql': + elif db_type.lower() == "postgresql": # Für später: PostgreSQL-Implementierung # from .postgresql_db import PostgreSQLDatabase # return PostgreSQLDatabase(config) diff --git a/database/flask_integration.py b/database/flask_integration.py index 5ecdf25..1300051 100644 --- a/database/flask_integration.py +++ b/database/flask_integration.py @@ -1,5 +1,7 @@ import logging -from flask import g, current_app + +from flask import current_app, g + from .interface import DatabaseInterface logger = logging.getLogger(__name__) @@ -16,7 +18,7 @@ class FlaskDatabaseManager: Holt die Datenbank-Instanz für den aktuellen Request Verwendet Flask's 'g' object für request-lokale Speicherung """ - if 'database' not in g: + if "database" not in g: g.database = self.database_factory_func() g.database.connect() g.database.create_user_table() @@ -28,11 +30,11 @@ class FlaskDatabaseManager: """ Schließt die Datenbank-Verbindung am Ende des Requests """ - database = g.pop('database', None) + database = g.pop("database", None) if database is not None: database.disconnect() logger.debug("Database connection closed for request") def init_app(self, app): """Registriert die Database-Manager-Funktionen bei Flask""" - app.teardown_appcontext(self.close_db) \ No newline at end of file + app.teardown_appcontext(self.close_db) diff --git a/database/interface.py b/database/interface.py index 18889b1..0c05695 100644 --- a/database/interface.py +++ b/database/interface.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional, List, Dict, Any +from typing import Any, Dict, List, Optional class DatabaseInterface(ABC): diff --git a/database/sqlite_db.py b/database/sqlite_db.py index a07831d..1e5074a 100644 --- a/database/sqlite_db.py +++ b/database/sqlite_db.py @@ -1,7 +1,8 @@ -import sqlite3 import logging +import sqlite3 from threading import local -from typing import Optional, Dict, Any +from typing import Any, Dict, Optional + from .interface import DatabaseInterface logger = logging.getLogger(__name__) @@ -17,12 +18,12 @@ class SQLiteDatabase(DatabaseInterface): def _get_connection(self) -> sqlite3.Connection: """Holt oder erstellt eine thread-lokale Verbindung""" - if not hasattr(self._local, 'connection') or self._local.connection is None: + if not hasattr(self._local, "connection") or self._local.connection is None: self._local.connection = sqlite3.connect( self.db_path, check_same_thread=False, # Erlaubt thread-übergreifende Nutzung timeout=30.0, - detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, ) self._local.connection.row_factory = sqlite3.Row # Optimierungen für bessere Performance @@ -45,7 +46,7 @@ class SQLiteDatabase(DatabaseInterface): def disconnect(self) -> None: """Schließt die thread-lokale Verbindung""" - if hasattr(self._local, 'connection') and self._local.connection: + if hasattr(self._local, "connection") and self._local.connection: try: self._local.connection.close() self._local.connection = None @@ -58,7 +59,8 @@ class SQLiteDatabase(DatabaseInterface): conn = self._get_connection() cursor = conn.cursor() try: - cursor.execute(""" + cursor.execute( + """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -66,7 +68,8 @@ class SQLiteDatabase(DatabaseInterface): email TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) - """) + """ + ) conn.commit() logger.info("User table created/verified") except Exception as e: @@ -82,7 +85,7 @@ class SQLiteDatabase(DatabaseInterface): try: cursor.execute( "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)", - (username, email, password_hash) + (username, email, password_hash), ) conn.commit() logger.info(f"User created successfully: {email}") @@ -105,7 +108,7 @@ class SQLiteDatabase(DatabaseInterface): try: cursor.execute( "SELECT id, username, email, password_hash, created_at FROM users WHERE email = ?", - (email,) + (email,), ) row = cursor.fetchone() return dict(row) if row else None @@ -122,7 +125,7 @@ class SQLiteDatabase(DatabaseInterface): try: cursor.execute( "SELECT id, username, email, password_hash, created_at FROM users WHERE username = ?", - (username,) + (username,), ) row = cursor.fetchone() return dict(row) if row else None @@ -137,10 +140,7 @@ class SQLiteDatabase(DatabaseInterface): conn = self._get_connection() cursor = conn.cursor() try: - cursor.execute( - "SELECT password_hash FROM users WHERE email = ?", - (email,) - ) + cursor.execute("SELECT password_hash FROM users WHERE email = ?", (email,)) row = cursor.fetchone() return row if row else None except Exception as e: @@ -156,7 +156,7 @@ class SQLiteDatabase(DatabaseInterface): try: cursor.execute( "UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE email = ?", - (new_password_hash, email) + (new_password_hash, email), ) conn.commit() success = cursor.rowcount > 0 @@ -186,4 +186,4 @@ class SQLiteDatabase(DatabaseInterface): conn.rollback() raise finally: - cursor.close() \ No newline at end of file + cursor.close() diff --git a/frontend/app.py b/frontend/app.py index dbf98f9..78c3ca3 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -1,25 +1,32 @@ -from datetime import datetime, timedelta # Add this line at the top -from flask import Flask, render_template, request, redirect, url_for, flash, session import logging +from datetime import datetime, timedelta # Add this line at the top + +from flask import (Flask, flash, redirect, render_template, request, session, + url_for) + from config.database import DatabaseConfig from database import DatabaseFactory from database.flask_integration import FlaskDatabaseManager -from services.user_service import UserService from services.auth_service import AuthService -from utils.auth_decorators import login_required, logout_required, get_current_user - +from services.user_service import UserService +from utils.auth_decorators import (get_current_user, login_required, + logout_required) # Logging konfigurieren logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) -app.secret_key = 'your-secret-key-change-this-in-production' +app.secret_key = "your-secret-key-change-this-in-production" # Session-Konfiguration -app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=24) # Session läuft nach 24h ab -app.config['SESSION_COOKIE_SECURE'] = False # Für Development - in Production auf True setzen -app.config['SESSION_COOKIE_HTTPONLY'] = True # Verhindert XSS-Angriffe +app.config["PERMANENT_SESSION_LIFETIME"] = timedelta( + hours=24 +) # Session läuft nach 24h ab +app.config["SESSION_COOKIE_SECURE"] = ( + False # Für Development - in Production auf True setzen +) +app.config["SESSION_COOKIE_HTTPONLY"] = True # Verhindert XSS-Angriffe # Database Manager initialisieren @@ -40,35 +47,35 @@ db_manager.init_app(app) @app.context_processor def inject_user(): """Macht User-Daten in allen Templates verfügbar""" - return {'current_user': get_current_user()} + return {"current_user": get_current_user()} -@app.route('/') +@app.route("/") def home(): """Startseite - Weiterleitung je nach Login-Status""" - if 'user_id' in session: - return redirect(url_for('dashboard')) - return redirect(url_for('login')) + if "user_id" in session: + return redirect(url_for("dashboard")) + return redirect(url_for("login")) -@app.route('/register', methods=['GET', 'POST']) +@app.route("/register", methods=["GET", "POST"]) @logout_required def register(): """Registrierung - nur für nicht eingeloggte User""" - if request.method == 'POST': - username = request.form.get('username', '').strip() - email = request.form.get('email', '').strip() - password = request.form.get('password', '') - confirm_password = request.form.get('confirm_password', '') + if request.method == "POST": + username = request.form.get("username", "").strip() + email = request.form.get("email", "").strip() + password = request.form.get("password", "") + confirm_password = request.form.get("confirm_password", "") # Basis-Validierung if not username or not email or not password or not confirm_password: - flash('Please fill out all fields', 'error') - return redirect(url_for('register')) + flash("Please fill out all fields", "error") + return redirect(url_for("register")) if password != confirm_password: - flash('Passwords do not match', 'error') - return redirect(url_for('register')) + flash("Passwords do not match", "error") + return redirect(url_for("register")) # Services mit request-lokaler DB-Instanz database = db_manager.get_db() @@ -78,29 +85,29 @@ def register(): success, errors = user_service.create_user(username, email, password) if success: - flash('Registration successful! Please log in.', 'success') + flash("Registration successful! Please log in.", "success") logger.info(f"New user registered: {email}") - return redirect(url_for('login')) + return redirect(url_for("login")) else: for error in errors: - flash(error, 'error') - return redirect(url_for('register')) + flash(error, "error") + return redirect(url_for("register")) - return render_template('register.html') + return render_template("register.html") -@app.route('/login', methods=['GET', 'POST']) +@app.route("/login", methods=["GET", "POST"]) @logout_required def login(): """Login - nur für nicht eingeloggte User""" - if request.method == 'POST': - email = request.form.get('email', '').strip() - password = request.form.get('password', '') - remember_me = request.form.get('remember_me') # Checkbox für "Remember Me" + if request.method == "POST": + email = request.form.get("email", "").strip() + password = request.form.get("password", "") + remember_me = request.form.get("remember_me") # Checkbox für "Remember Me" if not email or not password: - flash('Please enter email and password', 'error') - return redirect(url_for('login')) + flash("Please enter email and password", "error") + return redirect(url_for("login")) # Services mit request-lokaler DB-Instanz database = db_manager.get_db() @@ -111,50 +118,50 @@ def login(): if success: # Session setzen - session['user_id'] = user_data['id'] - session['username'] = user_data['username'] - session['email'] = user_data['email'] - session['login_time'] = datetime.utcnow().isoformat() + session["user_id"] = user_data["id"] + session["username"] = user_data["username"] + session["email"] = user_data["email"] + session["login_time"] = datetime.utcnow().isoformat() # Permanent session wenn "Remember Me" aktiviert if remember_me: session.permanent = True - flash(f'Welcome back, {user_data["username"]}!', 'success') + flash(f'Welcome back, {user_data["username"]}!', "success") logger.info(f"User logged in: {email}") # Weiterleitung zu ursprünglich angeforderte Seite (falls vorhanden) - next_page = request.args.get('next') + next_page = request.args.get("next") if next_page: return redirect(next_page) - return redirect(url_for('dashboard')) + return redirect(url_for("dashboard")) else: - flash(message, 'error') - return redirect(url_for('login')) + flash(message, "error") + return redirect(url_for("login")) - return render_template('login.html') + return render_template("login.html") -@app.route('/logout') +@app.route("/logout") @login_required def logout(): """Logout - nur für eingeloggte User""" - user_email = session.get('email') - username = session.get('username') + user_email = session.get("email") + username = session.get("username") # Session komplett löschen session.clear() - flash(f'Goodbye, {username}! You have been logged out successfully.', 'info') + flash(f"Goodbye, {username}! You have been logged out successfully.", "info") if user_email: logger.info(f"User logged out: {user_email}") - return redirect(url_for('login')) + return redirect(url_for("login")) -@app.route('/dashboard') +@app.route("/dashboard") @login_required def dashboard(): """Dashboard - nur für eingeloggte User""" @@ -162,19 +169,17 @@ def dashboard(): # Zusätzliche Dashboard-Daten (optional) dashboard_data = { - 'total_users': 'N/A', # Könnte aus DB geholt werden - 'last_login': session.get('login_time', 'Unknown'), - 'session_expires': 'Never' if session.permanent else '24 hours' + "total_users": "N/A", # Könnte aus DB geholt werden + "last_login": session.get("login_time", "Unknown"), + "session_expires": "Never" if session.permanent else "24 hours", } logger.debug(f"Dashboard accessed by user: {user['email']}") - return render_template('dashboard.html', - user=user, - dashboard_data=dashboard_data) + return render_template("dashboard.html", user=user, dashboard_data=dashboard_data) -@app.route('/profile') +@app.route("/profile") @login_required def profile(): """User Profile - nur für eingeloggte User""" @@ -182,65 +187,66 @@ def profile(): # Hier könnten zusätzliche User-Daten aus der DB geholt werden database = db_manager.get_db() - full_user_data = database.get_user_by_email(user['email']) + full_user_data = database.get_user_by_email(user["email"]) - return render_template('profile.html', user=full_user_data) + return render_template("profile.html", user=full_user_data) -@app.route('/change-password', methods=['GET', 'POST']) +@app.route("/change-password", methods=["GET", "POST"]) @login_required def change_password(): """Passwort ändern - nur für eingeloggte User""" - if request.method == 'POST': - current_password = request.form.get('current_password', '') - new_password = request.form.get('new_password', '') - confirm_password = request.form.get('confirm_password', '') + if request.method == "POST": + current_password = request.form.get("current_password", "") + new_password = request.form.get("new_password", "") + confirm_password = request.form.get("confirm_password", "") if not current_password or not new_password or not confirm_password: - flash('Please fill out all fields', 'error') - return redirect(url_for('change_password')) + flash("Please fill out all fields", "error") + return redirect(url_for("change_password")) if new_password != confirm_password: - flash('New passwords do not match', 'error') - return redirect(url_for('change_password')) + flash("New passwords do not match", "error") + return redirect(url_for("change_password")) # Aktuelles Passwort verifizieren user = get_current_user() database = db_manager.get_db() auth_service = AuthService(database) - success, _, message = auth_service.authenticate(user['email'], current_password) + success, _, message = auth_service.authenticate(user["email"], current_password) if not success: - flash('Current password is incorrect', 'error') - return redirect(url_for('change_password')) + flash("Current password is incorrect", "error") + return redirect(url_for("change_password")) # Neues Passwort setzen from utils.password_utils import PasswordUtils + new_password_hash = PasswordUtils.hash_password_simple(new_password) - if database.update_user_password(user['email'], new_password_hash): - flash('Password changed successfully', 'success') + if database.update_user_password(user["email"], new_password_hash): + flash("Password changed successfully", "success") logger.info(f"Password changed for user: {user['email']}") - return redirect(url_for('dashboard')) + return redirect(url_for("dashboard")) else: - flash('Failed to change password', 'error') - return redirect(url_for('change_password')) + flash("Failed to change password", "error") + return redirect(url_for("change_password")) - return render_template('change_password.html') + return render_template("change_password.html") # Error Handlers @app.errorhandler(404) def not_found(error): - return render_template('404.html'), 404 + return render_template("404.html"), 404 @app.errorhandler(500) def internal_error(error): logger.error(f"Internal server error: {error}") - flash('An internal error occurred. Please try again.', 'error') - return redirect(url_for('home')) + flash("An internal error occurred. Please try again.", "error") + return redirect(url_for("home")) # Session Timeout Check @@ -249,25 +255,28 @@ def check_session_timeout(): """Prüft ob Session abgelaufen ist""" from datetime import datetime - if 'user_id' in session: + if "user_id" in session: # Prüfe ob Session zu alt ist (optional) - login_time = session.get('login_time') + login_time = session.get("login_time") if login_time: try: login_datetime = datetime.fromisoformat(login_time) now = datetime.utcnow() # Session nach 24h abgelaufen (falls nicht permanent) - if not session.permanent and (now - login_datetime).total_seconds() > 86400: + if ( + not session.permanent + and (now - login_datetime).total_seconds() > 86400 + ): session.clear() - flash('Your session has expired. Please log in again.', 'warning') - return redirect(url_for('login')) + flash("Your session has expired. Please log in again.", "warning") + return redirect(url_for("login")) except (ValueError, TypeError): # Ungültiger Zeitstempel - Session löschen session.clear() - flash('Invalid session. Please log in again.', 'warning') - return redirect(url_for('login')) + flash("Invalid session. Please log in again.", "warning") + return redirect(url_for("login")) -if __name__ == '__main__': - app.run(debug=True, host='0.0.0.0', port=8080, threaded=True) \ No newline at end of file +if __name__ == "__main__": + app.run(debug=True, host="0.0.0.0", port=8080, threaded=True) diff --git a/main.py b/main.py index c047fa1..49e1f9f 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,20 @@ import logging import os + from frontend.app import app # Logging konfigurieren logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) -if __name__ == '__main__': +if __name__ == "__main__": # Environment-Konfiguration - debug = os.getenv('FLASK_DEBUG', 'False').lower() == 'true' - host = os.getenv('FLASK_HOST', '0.0.0.0') - port = int(os.getenv('FLASK_PORT', '8080')) + debug = os.getenv("FLASK_DEBUG", "False").lower() == "true" + host = os.getenv("FLASK_HOST", "0.0.0.0") + port = int(os.getenv("FLASK_PORT", "8080")) logger.info(f"Starting ChatBot application on {host}:{port}") logger.info(f"Debug mode: {debug}") @@ -25,4 +25,4 @@ if __name__ == '__main__': logger.info("Application stopped by user") except Exception as e: logger.error(f"Application failed to start: {e}") - raise \ No newline at end of file + raise diff --git a/services/auth_service.py b/services/auth_service.py index ec8141e..f71f68a 100644 --- a/services/auth_service.py +++ b/services/auth_service.py @@ -1,17 +1,21 @@ import logging -from typing import Optional, Dict, Any +from typing import Any, Dict, Optional + from database.interface import DatabaseInterface from utils.password_utils import PasswordUtils from utils.validation import ValidationUtils logger = logging.getLogger(__name__) + class AuthService: def __init__(self, database: DatabaseInterface): self.db = database - def authenticate(self, email: str, password: str) -> tuple[bool, Optional[Dict[str, Any]], str]: + def authenticate( + self, email: str, password: str + ) -> tuple[bool, Optional[Dict[str, Any]], str]: if not ValidationUtils.validate_email(email): return False, None, "Invalid email format" @@ -21,10 +25,12 @@ class AuthService: # User holen user = self.db.get_user_by_email(email.lower()) if not user: - logger.warning(f"Authentication failed: user not found for email {email}") + logger.warning( + f"Authentication failed: user not found for email {email}" + ) return False, None, "Invalid email or password" # Passwort prüfen - stored_hash = user.get('password_hash') + stored_hash = user.get("password_hash") if not stored_hash: logger.error(f"No password hash found for user {email}") return False, None, "Authentication error" @@ -35,14 +41,16 @@ class AuthService: logger.info(f"Authentication successful for user: {email}") # Sensible Daten nicht zurückgeben safe_user_data = { - 'id': user['id'], - 'username': user['username'], - 'email': user['email'], - 'created_at': user.get('created_at') + "id": user["id"], + "username": user["username"], + "email": user["email"], + "created_at": user.get("created_at"), } return True, safe_user_data, "Authentication successful" else: - logger.warning(f"Authentication failed: wrong password for email {email}") + logger.warning( + f"Authentication failed: wrong password for email {email}" + ) return False, None, "Invalid email or password" except Exception as e: diff --git a/services/user_service.py b/services/user_service.py index e12dd09..83fa5b7 100644 --- a/services/user_service.py +++ b/services/user_service.py @@ -1,16 +1,20 @@ import logging -from typing import Optional, Dict, Any +from typing import Any, Dict, Optional + from database.interface import DatabaseInterface from utils.password_utils import PasswordUtils from utils.validation import ValidationUtils logger = logging.getLogger(__name__) + class UserService: def __init__(self, database: DatabaseInterface): self.db = database - def create_user(self, username: str, email: str, password: str) -> tuple[bool, list[str]]: + def create_user( + self, username: str, email: str, password: str + ) -> tuple[bool, list[str]]: errors = [] # Validierung @@ -53,7 +57,6 @@ class UserService: logger.error(f"Error creating user: {e}") return False, [f"Database error: {str(e)}"] - def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]: if not ValidationUtils.validate_email(email): return None @@ -70,4 +73,4 @@ class UserService: return self.db.get_user_by_username(username) except Exception as e: logger.error(f"Error getting user by username: {e}") - return None \ No newline at end of file + return None diff --git a/utils/auth_decorators.py b/utils/auth_decorators.py index bc6001b..86a5f01 100644 --- a/utils/auth_decorators.py +++ b/utils/auth_decorators.py @@ -1,43 +1,51 @@ -from functools import wraps -from flask import session, redirect, url_for, flash, request import logging +from functools import wraps + +from flask import flash, redirect, request, session, url_for logger = logging.getLogger(__name__) + def login_required(f): """ Decorator to protect routes that require authentication """ + @wraps(f) def decorated_function(*args, **kwargs): - if 'user_id' not in session or 'email' not in session: - flash('Please log in to access this page', 'warning') + if "user_id" not in session or "email" not in session: + flash("Please log in to access this page", "warning") logger.info(f"Unauthorized access attempt to {request.endpoint}") - return redirect(url_for('login')) + return redirect(url_for("login")) return f(*args, **kwargs) + return decorated_function + def logout_required(f): """ Decorator for routes that should only be accessible when NOT logged in (e.g., login, register pages) """ + @wraps(f) def decorated_function(*args, **kwargs): - if 'user_id' in session: - flash('You are already logged in', 'info') - return redirect(url_for('dashboard')) + if "user_id" in session: + flash("You are already logged in", "info") + return redirect(url_for("dashboard")) return f(*args, **kwargs) + return decorated_function + def get_current_user(): """ Helper function to get current user info from session """ - if 'user_id' in session: + if "user_id" in session: return { - 'id': session['user_id'], - 'username': session['username'], - 'email': session['email'] + "id": session["user_id"], + "username": session["username"], + "email": session["email"], } - return None \ No newline at end of file + return None diff --git a/utils/password_utils.py b/utils/password_utils.py index b08be81..62ccabb 100644 --- a/utils/password_utils.py +++ b/utils/password_utils.py @@ -1,9 +1,10 @@ import hashlib -import secrets import logging +import secrets logger = logging.getLogger(__name__) + class PasswordUtils: @staticmethod def hash_password(password: str, salt: str = None) -> tuple[str, str]: @@ -11,7 +12,7 @@ class PasswordUtils: salt = secrets.token_hex(32) password = password.strip() salted_password = password + salt - hash_object = hashlib.sha512(salted_password.encode('utf-8')) + hash_object = hashlib.sha512(salted_password.encode("utf-8")) password_hash = hash_object.hexdigest() logger.debug("Password hashed successfully") return password_hash, salt @@ -24,4 +25,4 @@ class PasswordUtils: @staticmethod def hash_password_simple(password: str) -> str: password = password.strip() - return hashlib.sha512(password.encode('utf-8')).hexdigest() \ No newline at end of file + return hashlib.sha512(password.encode("utf-8")).hexdigest() diff --git a/utils/validation.py b/utils/validation.py index eee1154..01ed057 100644 --- a/utils/validation.py +++ b/utils/validation.py @@ -1,8 +1,9 @@ -import re import logging +import re logger = logging.getLogger(__name__) + class ValidationUtils: @staticmethod def validate_email(email: str) -> bool: @@ -10,7 +11,7 @@ class ValidationUtils: return False email = email.strip().lower() - pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" is_valid = bool(re.match(pattern, email)) if not is_valid: @@ -27,7 +28,7 @@ class ValidationUtils: if len(username) < 3 or len(username) > 25: logger.warning(f"Username length invalid: {len(username)}") return False - pattern = r'^[a-zA-Z0-9_]+$' + pattern = r"^[a-zA-Z0-9_]+$" is_valid = bool(re.match(pattern, username)) if not is_valid: logger.warning(f"Invalid username format: {username}") @@ -40,12 +41,14 @@ class ValidationUtils: errors.append("Password is required") return False, errors if len(password) < 4 or len(password) > 50: - errors.append("Password must be at least 4 characters long and must not exceed 128 characters") - if not re.search(r'[A-Z]', password): + errors.append( + "Password must be at least 4 characters long and must not exceed 128 characters" + ) + if not re.search(r"[A-Z]", password): errors.append("Password must contain at least one uppercase letter") - if not re.search(r'[a-z]', password): + if not re.search(r"[a-z]", password): errors.append("Password must contain at least one lowercase letter") - if not re.search(r'\d', password): + if not re.search(r"\d", password): errors.append("Password must contain at least one digit") if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): errors.append("Password must contain at least one special character") @@ -53,4 +56,4 @@ class ValidationUtils: if not is_valid: logger.warning(f"Password validation failed: {errors}") - return is_valid, errors \ No newline at end of file + return is_valid, errors