283 lines
9.2 KiB
Python
283 lines
9.2 KiB
Python
import logging
|
|
from datetime import datetime, timedelta # Add this line at the top
|
|
|
|
from flask import (Flask, flash, redirect, render_template, request, session,
|
|
url_for)
|
|
|
|
from config.database import DatabaseConfig
|
|
from database import DatabaseFactory
|
|
from database.flask_integration import FlaskDatabaseManager
|
|
from services.auth_service import AuthService
|
|
from services.user_service import UserService
|
|
from utils.auth_decorators import (get_current_user, login_required,
|
|
logout_required)
|
|
|
|
# Logging konfigurieren
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = "your-secret-key-change-this-in-production"
|
|
|
|
# Session-Konfiguration
|
|
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(
|
|
hours=24
|
|
) # Session läuft nach 24h ab
|
|
app.config["SESSION_COOKIE_SECURE"] = (
|
|
False # Für Development - in Production auf True setzen
|
|
)
|
|
app.config["SESSION_COOKIE_HTTPONLY"] = True # Verhindert XSS-Angriffe
|
|
|
|
|
|
# Database Manager initialisieren
|
|
db_config = DatabaseConfig()
|
|
db_type, config = db_config.get_database_config()
|
|
|
|
|
|
def create_database():
|
|
"""Factory-Funktion für Datenbank-Instanzen"""
|
|
return DatabaseFactory.create_database(db_type, config)
|
|
|
|
|
|
db_manager = FlaskDatabaseManager(create_database)
|
|
db_manager.init_app(app)
|
|
|
|
|
|
# Template-Context für alle Templates
|
|
@app.context_processor
|
|
def inject_user():
|
|
"""Macht User-Daten in allen Templates verfügbar"""
|
|
return {"current_user": get_current_user()}
|
|
|
|
|
|
@app.route("/")
|
|
def home():
|
|
"""Startseite - Weiterleitung je nach Login-Status"""
|
|
if "user_id" in session:
|
|
return redirect(url_for("dashboard"))
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/register", methods=["GET", "POST"])
|
|
@logout_required
|
|
def register():
|
|
"""Registrierung - nur für nicht eingeloggte User"""
|
|
if request.method == "POST":
|
|
username = request.form.get("username", "").strip()
|
|
email = request.form.get("email", "").strip()
|
|
password = request.form.get("password", "")
|
|
confirm_password = request.form.get("confirm_password", "")
|
|
|
|
# Basis-Validierung
|
|
if not username or not email or not password or not confirm_password:
|
|
flash("Please fill out all fields", "error")
|
|
return redirect(url_for("register"))
|
|
|
|
if password != confirm_password:
|
|
flash("Passwords do not match", "error")
|
|
return redirect(url_for("register"))
|
|
|
|
# Services mit request-lokaler DB-Instanz
|
|
database = db_manager.get_db()
|
|
user_service = UserService(database)
|
|
|
|
# User erstellen
|
|
success, errors = user_service.create_user(username, email, password)
|
|
|
|
if success:
|
|
flash("Registration successful! Please log in.", "success")
|
|
logger.info(f"New user registered: {email}")
|
|
return redirect(url_for("login"))
|
|
else:
|
|
for error in errors:
|
|
flash(error, "error")
|
|
return redirect(url_for("register"))
|
|
|
|
return render_template("register.html")
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
@logout_required
|
|
def login():
|
|
"""Login - nur für nicht eingeloggte User"""
|
|
if request.method == "POST":
|
|
email = request.form.get("email", "").strip()
|
|
password = request.form.get("password", "")
|
|
remember_me = request.form.get("remember_me") # Checkbox für "Remember Me"
|
|
|
|
if not email or not password:
|
|
flash("Please enter email and password", "error")
|
|
return redirect(url_for("login"))
|
|
|
|
# Services mit request-lokaler DB-Instanz
|
|
database = db_manager.get_db()
|
|
auth_service = AuthService(database)
|
|
|
|
# Authentifizierung
|
|
success, user_data, message = auth_service.authenticate(email, password)
|
|
|
|
if success:
|
|
# Session setzen
|
|
session["user_id"] = user_data["id"]
|
|
session["username"] = user_data["username"]
|
|
session["email"] = user_data["email"]
|
|
session["login_time"] = datetime.utcnow().isoformat()
|
|
|
|
# Permanent session wenn "Remember Me" aktiviert
|
|
if remember_me:
|
|
session.permanent = True
|
|
|
|
flash(f'Welcome back, {user_data["username"]}!', "success")
|
|
logger.info(f"User logged in: {email}")
|
|
|
|
# Weiterleitung zu ursprünglich angeforderte Seite (falls vorhanden)
|
|
next_page = request.args.get("next")
|
|
if next_page:
|
|
return redirect(next_page)
|
|
|
|
return redirect(url_for("dashboard"))
|
|
else:
|
|
flash(message, "error")
|
|
return redirect(url_for("login"))
|
|
|
|
return render_template("login.html")
|
|
|
|
|
|
@app.route("/logout")
|
|
@login_required
|
|
def logout():
|
|
"""Logout - nur für eingeloggte User"""
|
|
user_email = session.get("email")
|
|
username = session.get("username")
|
|
|
|
# Session komplett löschen
|
|
session.clear()
|
|
|
|
flash(f"Goodbye, {username}! You have been logged out successfully.", "info")
|
|
|
|
if user_email:
|
|
logger.info(f"User logged out: {user_email}")
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/dashboard")
|
|
@login_required
|
|
def dashboard():
|
|
"""Dashboard - nur für eingeloggte User"""
|
|
user = get_current_user()
|
|
|
|
# Zusätzliche Dashboard-Daten (optional)
|
|
dashboard_data = {
|
|
"total_users": "N/A", # Könnte aus DB geholt werden
|
|
"last_login": session.get("login_time", "Unknown"),
|
|
"session_expires": "Never" if session.permanent else "24 hours",
|
|
}
|
|
|
|
logger.debug(f"Dashboard accessed by user: {user['email']}")
|
|
|
|
return render_template("dashboard.html", user=user, dashboard_data=dashboard_data)
|
|
|
|
|
|
@app.route("/profile")
|
|
@login_required
|
|
def profile():
|
|
"""User Profile - nur für eingeloggte User"""
|
|
user = get_current_user()
|
|
|
|
# Hier könnten zusätzliche User-Daten aus der DB geholt werden
|
|
database = db_manager.get_db()
|
|
full_user_data = database.get_user_by_email(user["email"])
|
|
|
|
return render_template("profile.html", user=full_user_data)
|
|
|
|
|
|
@app.route("/change-password", methods=["GET", "POST"])
|
|
@login_required
|
|
def change_password():
|
|
"""Passwort ändern - nur für eingeloggte User"""
|
|
if request.method == "POST":
|
|
current_password = request.form.get("current_password", "")
|
|
new_password = request.form.get("new_password", "")
|
|
confirm_password = request.form.get("confirm_password", "")
|
|
|
|
if not current_password or not new_password or not confirm_password:
|
|
flash("Please fill out all fields", "error")
|
|
return redirect(url_for("change_password"))
|
|
|
|
if new_password != confirm_password:
|
|
flash("New passwords do not match", "error")
|
|
return redirect(url_for("change_password"))
|
|
|
|
# Aktuelles Passwort verifizieren
|
|
user = get_current_user()
|
|
database = db_manager.get_db()
|
|
auth_service = AuthService(database)
|
|
|
|
success, _, message = auth_service.authenticate(user["email"], current_password)
|
|
|
|
if not success:
|
|
flash("Current password is incorrect", "error")
|
|
return redirect(url_for("change_password"))
|
|
|
|
# Neues Passwort setzen
|
|
from utils.password_utils import PasswordUtils
|
|
|
|
new_password_hash = PasswordUtils.hash_password_simple(new_password)
|
|
|
|
if database.update_user_password(user["email"], new_password_hash):
|
|
flash("Password changed successfully", "success")
|
|
logger.info(f"Password changed for user: {user['email']}")
|
|
return redirect(url_for("dashboard"))
|
|
else:
|
|
flash("Failed to change password", "error")
|
|
return redirect(url_for("change_password"))
|
|
|
|
return render_template("change_password.html")
|
|
|
|
|
|
# Error Handlers
|
|
@app.errorhandler(404)
|
|
def not_found(error):
|
|
return render_template("404.html"), 404
|
|
|
|
|
|
@app.errorhandler(500)
|
|
def internal_error(error):
|
|
logger.error(f"Internal server error: {error}")
|
|
flash("An internal error occurred. Please try again.", "error")
|
|
return redirect(url_for("home"))
|
|
|
|
|
|
# Session Timeout Check
|
|
@app.before_request
|
|
def check_session_timeout():
|
|
"""Prüft ob Session abgelaufen ist"""
|
|
from datetime import datetime
|
|
|
|
if "user_id" in session:
|
|
# Prüfe ob Session zu alt ist (optional)
|
|
login_time = session.get("login_time")
|
|
if login_time:
|
|
try:
|
|
login_datetime = datetime.fromisoformat(login_time)
|
|
now = datetime.utcnow()
|
|
|
|
# Session nach 24h abgelaufen (falls nicht permanent)
|
|
if (
|
|
not session.permanent
|
|
and (now - login_datetime).total_seconds() > 86400
|
|
):
|
|
session.clear()
|
|
flash("Your session has expired. Please log in again.", "warning")
|
|
return redirect(url_for("login"))
|
|
except (ValueError, TypeError):
|
|
# Ungültiger Zeitstempel - Session löschen
|
|
session.clear()
|
|
flash("Invalid session. Please log in again.", "warning")
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(debug=True, host="0.0.0.0", port=8080, threaded=True)
|