Run: black and isort

This commit is contained in:
2025-10-05 02:28:47 +02:00
parent bd3e23da07
commit 6e9f8a0589
12 changed files with 222 additions and 189 deletions

View File

@@ -1,46 +1,44 @@
import os import os
from typing import Dict, Any from typing import Any, Dict
class DatabaseConfig: class DatabaseConfig:
def __init__(self): def __init__(self):
# Environment-basierte Konfiguration # Environment-basierte Konfiguration
self.db_type = os.getenv('DB_TYPE', 'sqlite') self.db_type = os.getenv("DB_TYPE", "sqlite")
self.config = self._load_config() self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]: def _load_config(self) -> Dict[str, Any]:
if self.db_type.lower() == 'sqlite': if self.db_type.lower() == "sqlite":
return {"path": os.getenv("SQLITE_PATH", "databases/chatbot.db")}
elif self.db_type.lower() == "mysql":
return { return {
'path': os.getenv('SQLITE_PATH', 'databases/chatbot.db') "host": os.getenv("MYSQL_HOST", "localhost"),
"port": int(os.getenv("MYSQL_PORT", "3306")),
"database": os.getenv("MYSQL_DATABASE", "chatbot"),
"user": os.getenv("MYSQL_USER", "root"),
"password": os.getenv("MYSQL_PASSWORD", ""),
"charset": "utf8mb4",
} }
elif self.db_type.lower() == 'mysql': elif self.db_type.lower() == "mariadb":
return { return {
'host': os.getenv('MYSQL_HOST', 'localhost'), "host": os.getenv("MARIADB_HOST", "localhost"),
'port': int(os.getenv('MYSQL_PORT', '3306')), "port": int(os.getenv("MARIADB_PORT", "3306")),
'database': os.getenv('MYSQL_DATABASE', 'chatbot'), "database": os.getenv("MARIADB_DATABASE", "chatbot"),
'user': os.getenv('MYSQL_USER', 'root'), "user": os.getenv("MARIADB_USER", "root"),
'password': os.getenv('MYSQL_PASSWORD', ''), "password": os.getenv("MARIADB_PASSWORD", ""),
'charset': 'utf8mb4' "charset": "utf8",
} }
elif self.db_type.lower() == 'mariadb': elif self.db_type.lower() == "postgresql":
return { return {
'host': os.getenv('MARIADB_HOST', 'localhost'), "host": os.getenv("POSTGRES_HOST", "localhost"),
'port': int(os.getenv('MARIADB_PORT', '3306')), "port": int(os.getenv("POSTGRES_PORT", "5432")),
'database': os.getenv('MARIADB_DATABASE', 'chatbot'), "database": os.getenv("POSTGRES_DATABASE", "chatbot"),
'user': os.getenv('MARIADB_USER', 'root'), "user": os.getenv("POSTGRES_USER", "postgres"),
'password': os.getenv('MARIADB_PASSWORD', ''), "password": os.getenv("POSTGRES_PASSWORD", ""),
'charset': 'utf8'
}
elif self.db_type.lower() == 'postgresql':
return {
'host': os.getenv('POSTGRES_HOST', 'localhost'),
'port': int(os.getenv('POSTGRES_PORT', '5432')),
'database': os.getenv('POSTGRES_DATABASE', 'chatbot'),
'user': os.getenv('POSTGRES_USER', 'postgres'),
'password': os.getenv('POSTGRES_PASSWORD', ''),
} }
else: else:
raise ValueError(f"Unsupported database type: {self.db_type}") raise ValueError(f"Unsupported database type: {self.db_type}")
def get_database_config(self) -> tuple[str, Dict[str, Any]]: def get_database_config(self) -> tuple[str, Dict[str, Any]]:
return self.db_type, self.config return self.db_type, self.config

View File

@@ -1,5 +1,6 @@
import logging import logging
from typing import Dict, Any from typing import Any, Dict
from .interface import DatabaseInterface from .interface import DatabaseInterface
from .sqlite_db import SQLiteDatabase from .sqlite_db import SQLiteDatabase
@@ -11,18 +12,18 @@ class DatabaseFactory:
@staticmethod @staticmethod
def create_database(db_type: str, config: Dict[str, Any]) -> DatabaseInterface: def create_database(db_type: str, config: Dict[str, Any]) -> DatabaseInterface:
if db_type.lower() == 'sqlite': if db_type.lower() == "sqlite":
if 'path' not in config: if "path" not in config:
raise ValueError("SQLite configuration requires 'path'") raise ValueError("SQLite configuration requires 'path'")
return SQLiteDatabase(config['path']) return SQLiteDatabase(config["path"])
elif db_type.lower() == 'mysql': elif db_type.lower() == "mysql":
# Für später: MySQL-Implementierung # Für später: MySQL-Implementierung
# from .mysql_db import MySQLDatabase # from .mysql_db import MySQLDatabase
# return MySQLDatabase(config) # return MySQLDatabase(config)
raise NotImplementedError("MySQL support not yet implemented") raise NotImplementedError("MySQL support not yet implemented")
elif db_type.lower() == 'postgresql': elif db_type.lower() == "postgresql":
# Für später: PostgreSQL-Implementierung # Für später: PostgreSQL-Implementierung
# from .postgresql_db import PostgreSQLDatabase # from .postgresql_db import PostgreSQLDatabase
# return PostgreSQLDatabase(config) # return PostgreSQLDatabase(config)

View File

@@ -1,5 +1,7 @@
import logging import logging
from flask import g, current_app
from flask import current_app, g
from .interface import DatabaseInterface from .interface import DatabaseInterface
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -16,7 +18,7 @@ class FlaskDatabaseManager:
Holt die Datenbank-Instanz für den aktuellen Request Holt die Datenbank-Instanz für den aktuellen Request
Verwendet Flask's 'g' object für request-lokale Speicherung Verwendet Flask's 'g' object für request-lokale Speicherung
""" """
if 'database' not in g: if "database" not in g:
g.database = self.database_factory_func() g.database = self.database_factory_func()
g.database.connect() g.database.connect()
g.database.create_user_table() g.database.create_user_table()
@@ -28,11 +30,11 @@ class FlaskDatabaseManager:
""" """
Schließt die Datenbank-Verbindung am Ende des Requests Schließt die Datenbank-Verbindung am Ende des Requests
""" """
database = g.pop('database', None) database = g.pop("database", None)
if database is not None: if database is not None:
database.disconnect() database.disconnect()
logger.debug("Database connection closed for request") logger.debug("Database connection closed for request")
def init_app(self, app): def init_app(self, app):
"""Registriert die Database-Manager-Funktionen bei Flask""" """Registriert die Database-Manager-Funktionen bei Flask"""
app.teardown_appcontext(self.close_db) app.teardown_appcontext(self.close_db)

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any from typing import Any, Dict, List, Optional
class DatabaseInterface(ABC): class DatabaseInterface(ABC):

View File

@@ -1,7 +1,8 @@
import sqlite3
import logging import logging
import sqlite3
from threading import local from threading import local
from typing import Optional, Dict, Any from typing import Any, Dict, Optional
from .interface import DatabaseInterface from .interface import DatabaseInterface
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -17,12 +18,12 @@ class SQLiteDatabase(DatabaseInterface):
def _get_connection(self) -> sqlite3.Connection: def _get_connection(self) -> sqlite3.Connection:
"""Holt oder erstellt eine thread-lokale Verbindung""" """Holt oder erstellt eine thread-lokale Verbindung"""
if not hasattr(self._local, 'connection') or self._local.connection is None: if not hasattr(self._local, "connection") or self._local.connection is None:
self._local.connection = sqlite3.connect( self._local.connection = sqlite3.connect(
self.db_path, self.db_path,
check_same_thread=False, # Erlaubt thread-übergreifende Nutzung check_same_thread=False, # Erlaubt thread-übergreifende Nutzung
timeout=30.0, timeout=30.0,
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
) )
self._local.connection.row_factory = sqlite3.Row self._local.connection.row_factory = sqlite3.Row
# Optimierungen für bessere Performance # Optimierungen für bessere Performance
@@ -45,7 +46,7 @@ class SQLiteDatabase(DatabaseInterface):
def disconnect(self) -> None: def disconnect(self) -> None:
"""Schließt die thread-lokale Verbindung""" """Schließt die thread-lokale Verbindung"""
if hasattr(self._local, 'connection') and self._local.connection: if hasattr(self._local, "connection") and self._local.connection:
try: try:
self._local.connection.close() self._local.connection.close()
self._local.connection = None self._local.connection = None
@@ -58,7 +59,8 @@ class SQLiteDatabase(DatabaseInterface):
conn = self._get_connection() conn = self._get_connection()
cursor = conn.cursor() cursor = conn.cursor()
try: try:
cursor.execute(""" cursor.execute(
"""
CREATE TABLE IF NOT EXISTS users CREATE TABLE IF NOT EXISTS users
( (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -66,7 +68,8 @@ class SQLiteDatabase(DatabaseInterface):
email TEXT NOT NULL UNIQUE, email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL, password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )
""") """
)
conn.commit() conn.commit()
logger.info("User table created/verified") logger.info("User table created/verified")
except Exception as e: except Exception as e:
@@ -82,7 +85,7 @@ class SQLiteDatabase(DatabaseInterface):
try: try:
cursor.execute( cursor.execute(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)", "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
(username, email, password_hash) (username, email, password_hash),
) )
conn.commit() conn.commit()
logger.info(f"User created successfully: {email}") logger.info(f"User created successfully: {email}")
@@ -105,7 +108,7 @@ class SQLiteDatabase(DatabaseInterface):
try: try:
cursor.execute( cursor.execute(
"SELECT id, username, email, password_hash, created_at FROM users WHERE email = ?", "SELECT id, username, email, password_hash, created_at FROM users WHERE email = ?",
(email,) (email,),
) )
row = cursor.fetchone() row = cursor.fetchone()
return dict(row) if row else None return dict(row) if row else None
@@ -122,7 +125,7 @@ class SQLiteDatabase(DatabaseInterface):
try: try:
cursor.execute( cursor.execute(
"SELECT id, username, email, password_hash, created_at FROM users WHERE username = ?", "SELECT id, username, email, password_hash, created_at FROM users WHERE username = ?",
(username,) (username,),
) )
row = cursor.fetchone() row = cursor.fetchone()
return dict(row) if row else None return dict(row) if row else None
@@ -137,10 +140,7 @@ class SQLiteDatabase(DatabaseInterface):
conn = self._get_connection() conn = self._get_connection()
cursor = conn.cursor() cursor = conn.cursor()
try: try:
cursor.execute( cursor.execute("SELECT password_hash FROM users WHERE email = ?", (email,))
"SELECT password_hash FROM users WHERE email = ?",
(email,)
)
row = cursor.fetchone() row = cursor.fetchone()
return row if row else None return row if row else None
except Exception as e: except Exception as e:
@@ -156,7 +156,7 @@ class SQLiteDatabase(DatabaseInterface):
try: try:
cursor.execute( cursor.execute(
"UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE email = ?", "UPDATE users SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE email = ?",
(new_password_hash, email) (new_password_hash, email),
) )
conn.commit() conn.commit()
success = cursor.rowcount > 0 success = cursor.rowcount > 0
@@ -186,4 +186,4 @@ class SQLiteDatabase(DatabaseInterface):
conn.rollback() conn.rollback()
raise raise
finally: finally:
cursor.close() cursor.close()

View File

@@ -1,25 +1,32 @@
from datetime import datetime, timedelta # Add this line at the top
from flask import Flask, render_template, request, redirect, url_for, flash, session
import logging import logging
from datetime import datetime, timedelta # Add this line at the top
from flask import (Flask, flash, redirect, render_template, request, session,
url_for)
from config.database import DatabaseConfig from config.database import DatabaseConfig
from database import DatabaseFactory from database import DatabaseFactory
from database.flask_integration import FlaskDatabaseManager from database.flask_integration import FlaskDatabaseManager
from services.user_service import UserService
from services.auth_service import AuthService from services.auth_service import AuthService
from utils.auth_decorators import login_required, logout_required, get_current_user from services.user_service import UserService
from utils.auth_decorators import (get_current_user, login_required,
logout_required)
# Logging konfigurieren # Logging konfigurieren
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
app = Flask(__name__) app = Flask(__name__)
app.secret_key = 'your-secret-key-change-this-in-production' app.secret_key = "your-secret-key-change-this-in-production"
# Session-Konfiguration # Session-Konfiguration
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=24) # Session läuft nach 24h ab app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(
app.config['SESSION_COOKIE_SECURE'] = False # Für Development - in Production auf True setzen hours=24
app.config['SESSION_COOKIE_HTTPONLY'] = True # Verhindert XSS-Angriffe ) # Session läuft nach 24h ab
app.config["SESSION_COOKIE_SECURE"] = (
False # Für Development - in Production auf True setzen
)
app.config["SESSION_COOKIE_HTTPONLY"] = True # Verhindert XSS-Angriffe
# Database Manager initialisieren # Database Manager initialisieren
@@ -40,35 +47,35 @@ db_manager.init_app(app)
@app.context_processor @app.context_processor
def inject_user(): def inject_user():
"""Macht User-Daten in allen Templates verfügbar""" """Macht User-Daten in allen Templates verfügbar"""
return {'current_user': get_current_user()} return {"current_user": get_current_user()}
@app.route('/') @app.route("/")
def home(): def home():
"""Startseite - Weiterleitung je nach Login-Status""" """Startseite - Weiterleitung je nach Login-Status"""
if 'user_id' in session: if "user_id" in session:
return redirect(url_for('dashboard')) return redirect(url_for("dashboard"))
return redirect(url_for('login')) return redirect(url_for("login"))
@app.route('/register', methods=['GET', 'POST']) @app.route("/register", methods=["GET", "POST"])
@logout_required @logout_required
def register(): def register():
"""Registrierung - nur für nicht eingeloggte User""" """Registrierung - nur für nicht eingeloggte User"""
if request.method == 'POST': if request.method == "POST":
username = request.form.get('username', '').strip() username = request.form.get("username", "").strip()
email = request.form.get('email', '').strip() email = request.form.get("email", "").strip()
password = request.form.get('password', '') password = request.form.get("password", "")
confirm_password = request.form.get('confirm_password', '') confirm_password = request.form.get("confirm_password", "")
# Basis-Validierung # Basis-Validierung
if not username or not email or not password or not confirm_password: if not username or not email or not password or not confirm_password:
flash('Please fill out all fields', 'error') flash("Please fill out all fields", "error")
return redirect(url_for('register')) return redirect(url_for("register"))
if password != confirm_password: if password != confirm_password:
flash('Passwords do not match', 'error') flash("Passwords do not match", "error")
return redirect(url_for('register')) return redirect(url_for("register"))
# Services mit request-lokaler DB-Instanz # Services mit request-lokaler DB-Instanz
database = db_manager.get_db() database = db_manager.get_db()
@@ -78,29 +85,29 @@ def register():
success, errors = user_service.create_user(username, email, password) success, errors = user_service.create_user(username, email, password)
if success: if success:
flash('Registration successful! Please log in.', 'success') flash("Registration successful! Please log in.", "success")
logger.info(f"New user registered: {email}") logger.info(f"New user registered: {email}")
return redirect(url_for('login')) return redirect(url_for("login"))
else: else:
for error in errors: for error in errors:
flash(error, 'error') flash(error, "error")
return redirect(url_for('register')) return redirect(url_for("register"))
return render_template('register.html') return render_template("register.html")
@app.route('/login', methods=['GET', 'POST']) @app.route("/login", methods=["GET", "POST"])
@logout_required @logout_required
def login(): def login():
"""Login - nur für nicht eingeloggte User""" """Login - nur für nicht eingeloggte User"""
if request.method == 'POST': if request.method == "POST":
email = request.form.get('email', '').strip() email = request.form.get("email", "").strip()
password = request.form.get('password', '') password = request.form.get("password", "")
remember_me = request.form.get('remember_me') # Checkbox für "Remember Me" remember_me = request.form.get("remember_me") # Checkbox für "Remember Me"
if not email or not password: if not email or not password:
flash('Please enter email and password', 'error') flash("Please enter email and password", "error")
return redirect(url_for('login')) return redirect(url_for("login"))
# Services mit request-lokaler DB-Instanz # Services mit request-lokaler DB-Instanz
database = db_manager.get_db() database = db_manager.get_db()
@@ -111,50 +118,50 @@ def login():
if success: if success:
# Session setzen # Session setzen
session['user_id'] = user_data['id'] session["user_id"] = user_data["id"]
session['username'] = user_data['username'] session["username"] = user_data["username"]
session['email'] = user_data['email'] session["email"] = user_data["email"]
session['login_time'] = datetime.utcnow().isoformat() session["login_time"] = datetime.utcnow().isoformat()
# Permanent session wenn "Remember Me" aktiviert # Permanent session wenn "Remember Me" aktiviert
if remember_me: if remember_me:
session.permanent = True session.permanent = True
flash(f'Welcome back, {user_data["username"]}!', 'success') flash(f'Welcome back, {user_data["username"]}!', "success")
logger.info(f"User logged in: {email}") logger.info(f"User logged in: {email}")
# Weiterleitung zu ursprünglich angeforderte Seite (falls vorhanden) # Weiterleitung zu ursprünglich angeforderte Seite (falls vorhanden)
next_page = request.args.get('next') next_page = request.args.get("next")
if next_page: if next_page:
return redirect(next_page) return redirect(next_page)
return redirect(url_for('dashboard')) return redirect(url_for("dashboard"))
else: else:
flash(message, 'error') flash(message, "error")
return redirect(url_for('login')) return redirect(url_for("login"))
return render_template('login.html') return render_template("login.html")
@app.route('/logout') @app.route("/logout")
@login_required @login_required
def logout(): def logout():
"""Logout - nur für eingeloggte User""" """Logout - nur für eingeloggte User"""
user_email = session.get('email') user_email = session.get("email")
username = session.get('username') username = session.get("username")
# Session komplett löschen # Session komplett löschen
session.clear() session.clear()
flash(f'Goodbye, {username}! You have been logged out successfully.', 'info') flash(f"Goodbye, {username}! You have been logged out successfully.", "info")
if user_email: if user_email:
logger.info(f"User logged out: {user_email}") logger.info(f"User logged out: {user_email}")
return redirect(url_for('login')) return redirect(url_for("login"))
@app.route('/dashboard') @app.route("/dashboard")
@login_required @login_required
def dashboard(): def dashboard():
"""Dashboard - nur für eingeloggte User""" """Dashboard - nur für eingeloggte User"""
@@ -162,19 +169,17 @@ def dashboard():
# Zusätzliche Dashboard-Daten (optional) # Zusätzliche Dashboard-Daten (optional)
dashboard_data = { dashboard_data = {
'total_users': 'N/A', # Könnte aus DB geholt werden "total_users": "N/A", # Könnte aus DB geholt werden
'last_login': session.get('login_time', 'Unknown'), "last_login": session.get("login_time", "Unknown"),
'session_expires': 'Never' if session.permanent else '24 hours' "session_expires": "Never" if session.permanent else "24 hours",
} }
logger.debug(f"Dashboard accessed by user: {user['email']}") logger.debug(f"Dashboard accessed by user: {user['email']}")
return render_template('dashboard.html', return render_template("dashboard.html", user=user, dashboard_data=dashboard_data)
user=user,
dashboard_data=dashboard_data)
@app.route('/profile') @app.route("/profile")
@login_required @login_required
def profile(): def profile():
"""User Profile - nur für eingeloggte User""" """User Profile - nur für eingeloggte User"""
@@ -182,65 +187,66 @@ def profile():
# Hier könnten zusätzliche User-Daten aus der DB geholt werden # Hier könnten zusätzliche User-Daten aus der DB geholt werden
database = db_manager.get_db() database = db_manager.get_db()
full_user_data = database.get_user_by_email(user['email']) full_user_data = database.get_user_by_email(user["email"])
return render_template('profile.html', user=full_user_data) return render_template("profile.html", user=full_user_data)
@app.route('/change-password', methods=['GET', 'POST']) @app.route("/change-password", methods=["GET", "POST"])
@login_required @login_required
def change_password(): def change_password():
"""Passwort ändern - nur für eingeloggte User""" """Passwort ändern - nur für eingeloggte User"""
if request.method == 'POST': if request.method == "POST":
current_password = request.form.get('current_password', '') current_password = request.form.get("current_password", "")
new_password = request.form.get('new_password', '') new_password = request.form.get("new_password", "")
confirm_password = request.form.get('confirm_password', '') confirm_password = request.form.get("confirm_password", "")
if not current_password or not new_password or not confirm_password: if not current_password or not new_password or not confirm_password:
flash('Please fill out all fields', 'error') flash("Please fill out all fields", "error")
return redirect(url_for('change_password')) return redirect(url_for("change_password"))
if new_password != confirm_password: if new_password != confirm_password:
flash('New passwords do not match', 'error') flash("New passwords do not match", "error")
return redirect(url_for('change_password')) return redirect(url_for("change_password"))
# Aktuelles Passwort verifizieren # Aktuelles Passwort verifizieren
user = get_current_user() user = get_current_user()
database = db_manager.get_db() database = db_manager.get_db()
auth_service = AuthService(database) auth_service = AuthService(database)
success, _, message = auth_service.authenticate(user['email'], current_password) success, _, message = auth_service.authenticate(user["email"], current_password)
if not success: if not success:
flash('Current password is incorrect', 'error') flash("Current password is incorrect", "error")
return redirect(url_for('change_password')) return redirect(url_for("change_password"))
# Neues Passwort setzen # Neues Passwort setzen
from utils.password_utils import PasswordUtils from utils.password_utils import PasswordUtils
new_password_hash = PasswordUtils.hash_password_simple(new_password) new_password_hash = PasswordUtils.hash_password_simple(new_password)
if database.update_user_password(user['email'], new_password_hash): if database.update_user_password(user["email"], new_password_hash):
flash('Password changed successfully', 'success') flash("Password changed successfully", "success")
logger.info(f"Password changed for user: {user['email']}") logger.info(f"Password changed for user: {user['email']}")
return redirect(url_for('dashboard')) return redirect(url_for("dashboard"))
else: else:
flash('Failed to change password', 'error') flash("Failed to change password", "error")
return redirect(url_for('change_password')) return redirect(url_for("change_password"))
return render_template('change_password.html') return render_template("change_password.html")
# Error Handlers # Error Handlers
@app.errorhandler(404) @app.errorhandler(404)
def not_found(error): def not_found(error):
return render_template('404.html'), 404 return render_template("404.html"), 404
@app.errorhandler(500) @app.errorhandler(500)
def internal_error(error): def internal_error(error):
logger.error(f"Internal server error: {error}") logger.error(f"Internal server error: {error}")
flash('An internal error occurred. Please try again.', 'error') flash("An internal error occurred. Please try again.", "error")
return redirect(url_for('home')) return redirect(url_for("home"))
# Session Timeout Check # Session Timeout Check
@@ -249,25 +255,28 @@ def check_session_timeout():
"""Prüft ob Session abgelaufen ist""" """Prüft ob Session abgelaufen ist"""
from datetime import datetime from datetime import datetime
if 'user_id' in session: if "user_id" in session:
# Prüfe ob Session zu alt ist (optional) # Prüfe ob Session zu alt ist (optional)
login_time = session.get('login_time') login_time = session.get("login_time")
if login_time: if login_time:
try: try:
login_datetime = datetime.fromisoformat(login_time) login_datetime = datetime.fromisoformat(login_time)
now = datetime.utcnow() now = datetime.utcnow()
# Session nach 24h abgelaufen (falls nicht permanent) # Session nach 24h abgelaufen (falls nicht permanent)
if not session.permanent and (now - login_datetime).total_seconds() > 86400: if (
not session.permanent
and (now - login_datetime).total_seconds() > 86400
):
session.clear() session.clear()
flash('Your session has expired. Please log in again.', 'warning') flash("Your session has expired. Please log in again.", "warning")
return redirect(url_for('login')) return redirect(url_for("login"))
except (ValueError, TypeError): except (ValueError, TypeError):
# Ungültiger Zeitstempel - Session löschen # Ungültiger Zeitstempel - Session löschen
session.clear() session.clear()
flash('Invalid session. Please log in again.', 'warning') flash("Invalid session. Please log in again.", "warning")
return redirect(url_for('login')) return redirect(url_for("login"))
if __name__ == '__main__': if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=8080, threaded=True) app.run(debug=True, host="0.0.0.0", port=8080, threaded=True)

14
main.py
View File

@@ -1,20 +1,20 @@
import logging import logging
import os import os
from frontend.app import app from frontend.app import app
# Logging konfigurieren # Logging konfigurieren
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if __name__ == '__main__': if __name__ == "__main__":
# Environment-Konfiguration # Environment-Konfiguration
debug = os.getenv('FLASK_DEBUG', 'False').lower() == 'true' debug = os.getenv("FLASK_DEBUG", "False").lower() == "true"
host = os.getenv('FLASK_HOST', '0.0.0.0') host = os.getenv("FLASK_HOST", "0.0.0.0")
port = int(os.getenv('FLASK_PORT', '8080')) port = int(os.getenv("FLASK_PORT", "8080"))
logger.info(f"Starting ChatBot application on {host}:{port}") logger.info(f"Starting ChatBot application on {host}:{port}")
logger.info(f"Debug mode: {debug}") logger.info(f"Debug mode: {debug}")
@@ -25,4 +25,4 @@ if __name__ == '__main__':
logger.info("Application stopped by user") logger.info("Application stopped by user")
except Exception as e: except Exception as e:
logger.error(f"Application failed to start: {e}") logger.error(f"Application failed to start: {e}")
raise raise

View File

@@ -1,17 +1,21 @@
import logging import logging
from typing import Optional, Dict, Any from typing import Any, Dict, Optional
from database.interface import DatabaseInterface from database.interface import DatabaseInterface
from utils.password_utils import PasswordUtils from utils.password_utils import PasswordUtils
from utils.validation import ValidationUtils from utils.validation import ValidationUtils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AuthService: class AuthService:
def __init__(self, database: DatabaseInterface): def __init__(self, database: DatabaseInterface):
self.db = database self.db = database
def authenticate(self, email: str, password: str) -> tuple[bool, Optional[Dict[str, Any]], str]: def authenticate(
self, email: str, password: str
) -> tuple[bool, Optional[Dict[str, Any]], str]:
if not ValidationUtils.validate_email(email): if not ValidationUtils.validate_email(email):
return False, None, "Invalid email format" return False, None, "Invalid email format"
@@ -21,10 +25,12 @@ class AuthService:
# User holen # User holen
user = self.db.get_user_by_email(email.lower()) user = self.db.get_user_by_email(email.lower())
if not user: if not user:
logger.warning(f"Authentication failed: user not found for email {email}") logger.warning(
f"Authentication failed: user not found for email {email}"
)
return False, None, "Invalid email or password" return False, None, "Invalid email or password"
# Passwort prüfen # Passwort prüfen
stored_hash = user.get('password_hash') stored_hash = user.get("password_hash")
if not stored_hash: if not stored_hash:
logger.error(f"No password hash found for user {email}") logger.error(f"No password hash found for user {email}")
return False, None, "Authentication error" return False, None, "Authentication error"
@@ -35,14 +41,16 @@ class AuthService:
logger.info(f"Authentication successful for user: {email}") logger.info(f"Authentication successful for user: {email}")
# Sensible Daten nicht zurückgeben # Sensible Daten nicht zurückgeben
safe_user_data = { safe_user_data = {
'id': user['id'], "id": user["id"],
'username': user['username'], "username": user["username"],
'email': user['email'], "email": user["email"],
'created_at': user.get('created_at') "created_at": user.get("created_at"),
} }
return True, safe_user_data, "Authentication successful" return True, safe_user_data, "Authentication successful"
else: else:
logger.warning(f"Authentication failed: wrong password for email {email}") logger.warning(
f"Authentication failed: wrong password for email {email}"
)
return False, None, "Invalid email or password" return False, None, "Invalid email or password"
except Exception as e: except Exception as e:

View File

@@ -1,16 +1,20 @@
import logging import logging
from typing import Optional, Dict, Any from typing import Any, Dict, Optional
from database.interface import DatabaseInterface from database.interface import DatabaseInterface
from utils.password_utils import PasswordUtils from utils.password_utils import PasswordUtils
from utils.validation import ValidationUtils from utils.validation import ValidationUtils
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class UserService: class UserService:
def __init__(self, database: DatabaseInterface): def __init__(self, database: DatabaseInterface):
self.db = database self.db = database
def create_user(self, username: str, email: str, password: str) -> tuple[bool, list[str]]: def create_user(
self, username: str, email: str, password: str
) -> tuple[bool, list[str]]:
errors = [] errors = []
# Validierung # Validierung
@@ -53,7 +57,6 @@ class UserService:
logger.error(f"Error creating user: {e}") logger.error(f"Error creating user: {e}")
return False, [f"Database error: {str(e)}"] return False, [f"Database error: {str(e)}"]
def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]: def get_user_by_email(self, email: str) -> Optional[Dict[str, Any]]:
if not ValidationUtils.validate_email(email): if not ValidationUtils.validate_email(email):
return None return None
@@ -70,4 +73,4 @@ class UserService:
return self.db.get_user_by_username(username) return self.db.get_user_by_username(username)
except Exception as e: except Exception as e:
logger.error(f"Error getting user by username: {e}") logger.error(f"Error getting user by username: {e}")
return None return None

View File

@@ -1,43 +1,51 @@
from functools import wraps
from flask import session, redirect, url_for, flash, request
import logging import logging
from functools import wraps
from flask import flash, redirect, request, session, url_for
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def login_required(f): def login_required(f):
""" """
Decorator to protect routes that require authentication Decorator to protect routes that require authentication
""" """
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if 'user_id' not in session or 'email' not in session: if "user_id" not in session or "email" not in session:
flash('Please log in to access this page', 'warning') flash("Please log in to access this page", "warning")
logger.info(f"Unauthorized access attempt to {request.endpoint}") logger.info(f"Unauthorized access attempt to {request.endpoint}")
return redirect(url_for('login')) return redirect(url_for("login"))
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated_function return decorated_function
def logout_required(f): def logout_required(f):
""" """
Decorator for routes that should only be accessible when NOT logged in Decorator for routes that should only be accessible when NOT logged in
(e.g., login, register pages) (e.g., login, register pages)
""" """
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if 'user_id' in session: if "user_id" in session:
flash('You are already logged in', 'info') flash("You are already logged in", "info")
return redirect(url_for('dashboard')) return redirect(url_for("dashboard"))
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated_function return decorated_function
def get_current_user(): def get_current_user():
""" """
Helper function to get current user info from session Helper function to get current user info from session
""" """
if 'user_id' in session: if "user_id" in session:
return { return {
'id': session['user_id'], "id": session["user_id"],
'username': session['username'], "username": session["username"],
'email': session['email'] "email": session["email"],
} }
return None return None

View File

@@ -1,9 +1,10 @@
import hashlib import hashlib
import secrets
import logging import logging
import secrets
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PasswordUtils: class PasswordUtils:
@staticmethod @staticmethod
def hash_password(password: str, salt: str = None) -> tuple[str, str]: def hash_password(password: str, salt: str = None) -> tuple[str, str]:
@@ -11,7 +12,7 @@ class PasswordUtils:
salt = secrets.token_hex(32) salt = secrets.token_hex(32)
password = password.strip() password = password.strip()
salted_password = password + salt salted_password = password + salt
hash_object = hashlib.sha512(salted_password.encode('utf-8')) hash_object = hashlib.sha512(salted_password.encode("utf-8"))
password_hash = hash_object.hexdigest() password_hash = hash_object.hexdigest()
logger.debug("Password hashed successfully") logger.debug("Password hashed successfully")
return password_hash, salt return password_hash, salt
@@ -24,4 +25,4 @@ class PasswordUtils:
@staticmethod @staticmethod
def hash_password_simple(password: str) -> str: def hash_password_simple(password: str) -> str:
password = password.strip() password = password.strip()
return hashlib.sha512(password.encode('utf-8')).hexdigest() return hashlib.sha512(password.encode("utf-8")).hexdigest()

View File

@@ -1,8 +1,9 @@
import re
import logging import logging
import re
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ValidationUtils: class ValidationUtils:
@staticmethod @staticmethod
def validate_email(email: str) -> bool: def validate_email(email: str) -> bool:
@@ -10,7 +11,7 @@ class ValidationUtils:
return False return False
email = email.strip().lower() email = email.strip().lower()
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
is_valid = bool(re.match(pattern, email)) is_valid = bool(re.match(pattern, email))
if not is_valid: if not is_valid:
@@ -27,7 +28,7 @@ class ValidationUtils:
if len(username) < 3 or len(username) > 25: if len(username) < 3 or len(username) > 25:
logger.warning(f"Username length invalid: {len(username)}") logger.warning(f"Username length invalid: {len(username)}")
return False return False
pattern = r'^[a-zA-Z0-9_]+$' pattern = r"^[a-zA-Z0-9_]+$"
is_valid = bool(re.match(pattern, username)) is_valid = bool(re.match(pattern, username))
if not is_valid: if not is_valid:
logger.warning(f"Invalid username format: {username}") logger.warning(f"Invalid username format: {username}")
@@ -40,12 +41,14 @@ class ValidationUtils:
errors.append("Password is required") errors.append("Password is required")
return False, errors return False, errors
if len(password) < 4 or len(password) > 50: if len(password) < 4 or len(password) > 50:
errors.append("Password must be at least 4 characters long and must not exceed 128 characters") errors.append(
if not re.search(r'[A-Z]', password): "Password must be at least 4 characters long and must not exceed 128 characters"
)
if not re.search(r"[A-Z]", password):
errors.append("Password must contain at least one uppercase letter") errors.append("Password must contain at least one uppercase letter")
if not re.search(r'[a-z]', password): if not re.search(r"[a-z]", password):
errors.append("Password must contain at least one lowercase letter") errors.append("Password must contain at least one lowercase letter")
if not re.search(r'\d', password): if not re.search(r"\d", password):
errors.append("Password must contain at least one digit") errors.append("Password must contain at least one digit")
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
errors.append("Password must contain at least one special character") errors.append("Password must contain at least one special character")
@@ -53,4 +56,4 @@ class ValidationUtils:
if not is_valid: if not is_valid:
logger.warning(f"Password validation failed: {errors}") logger.warning(f"Password validation failed: {errors}")
return is_valid, errors return is_valid, errors