"""Flask service that enrolls and authenticates users.

Usernames are checked against a swear-word list (including simple
character substitutions), passwords are checked against length,
repetition, username-similarity and weak/breached-password rules, and
password hashes are stored in a SQLite database through Flask-SQLAlchemy.
"""
import logging
import os
import re
from datetime import datetime, timedelta

from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError

# load environment vars from .env
load_dotenv(".env")

logger = logging.getLogger(__name__)

app = Flask(__name__, template_folder="assets", static_folder="assets")
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///users.db"
db = SQLAlchemy(app)
hasher = PasswordHasher()

HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", 5000))
# NOTE(review): debug mode defaults to on while HOST defaults to all
# interfaces; set DEBUG=false in the environment for any real deployment.
DEBUG = str(os.environ.get("DEBUG", "true")).lower() == "true"

MAX_LOGIN_ATTEMPTS = 10
LOCKOUT_DURATION = timedelta(seconds=60)

COMMON_SWEAR_WORDS = [
    "fuck", "shit", "bitch", "asshole", "bastard", "cunt", "dick", "cock",
    "pussy", "motherfucker", "wanker", "twat", "bollocks", "arsehole",
    "crap", "damn", "bugger", "bloody", "sod", "git", "idiot", "moron",
    "prick", "slut", "whore", "nigger", "retard",
]

CHARACTER_SUBSTITUTIONS = {
    "0": "o", "1": "i", "3": "e", "4": "a", "5": "s", "6": "g",
    "7": "t", "8": "b", "9": "g", "@": "a", "$": "s", "!": "i",
}

# Every key and value above is a single character and no value is itself a
# key, so one translate() pass gives the same result as chained replace().
_SUBSTITUTION_TABLE = str.maketrans(CHARACTER_SUBSTITUTIONS)

_USERNAME_PATTERN = re.compile(r"[a-zA-Z0-9_]+")
_REPEATED_CHARS_PATTERN = re.compile(r"(.)\1\1+")


def load_passwords_from_file(file_path: str) -> set[str]:
    """Load one password per line from ``file_path``.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The set of lines in the file, or an empty set when the file
        cannot be read (the failure is logged, not raised).
    """
    try:
        # errors="replace" keeps one undecodable byte from discarding the
        # whole list, which would silently disable the password checks.
        with open(file_path, "r", encoding="utf-8", errors="replace") as file:
            return set(file.read().splitlines())
    except Exception as e:
        logger.exception("Error occurred while retrieving passwords. %s", e)
        return set()


WEAK_PASSWORDS = load_passwords_from_file('assets/weakpasswords.txt')
BREACHED_PASSWORDS = load_passwords_from_file('assets/breachedpasswords.txt')
# A set (not a list) so the per-enrollment membership test is O(1).
BREACHED_PASSWORDS_LOWER = {p.lower() for p in BREACHED_PASSWORDS}


# create a sqlite model for
# storing user data
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    hashed_password = db.Column(db.String(256), nullable=False)
    # hash_password() stores the hex form of 16 random bytes: 32 characters.
    salt = db.Column(db.String(32), nullable=False)
    login_attempts = db.Column(db.Integer, default=0)
    last_login_time = db.Column(db.DateTime, default=datetime.utcnow)


# create database tables
with app.app_context():
    db.create_all()


# custom exception definitions
class CredentialValidationError(Exception):
    """Raised when a username or password fails validation."""

    def __init__(self, message):
        self.message = message
        super().__init__(message)


def apply_character_substitutions(word):
    """Return ``word`` with each CHARACTER_SUBSTITUTIONS key replaced by its letter."""
    return word.translate(_SUBSTITUTION_TABLE)


def validate_username(username: str) -> None:
    """Validate the username against a pre-defined ruleset.

    Raises:
        CredentialValidationError: If the username is not a non-empty
            string of letters, digits and underscores, or if it contains
            a swear word (directly or via character substitutions).
    """
    # 1. raise a validation exception if username
    # contains non-alphanumeric chars other than
    # the underscore. fullmatch() is used because
    # '$' would also accept a trailing newline.
    if not isinstance(username, str) or not _USERNAME_PATTERN.fullmatch(username):
        raise CredentialValidationError("Invalid username")

    lowered = username.lower()

    # 2. raise a validation exception when the
    # username contains a commonly used swear word
    if any(word in lowered for word in COMMON_SWEAR_WORDS):
        raise CredentialValidationError("Username cannot contain swear words")

    # 3. raises when users attempt to bypass the
    # above rule using symbol and number substitutions
    substituted = apply_character_substitutions(lowered)
    if any(word in substituted for word in COMMON_SWEAR_WORDS):
        raise CredentialValidationError(
            "Username contains a symbol or number substituted swear word"
        )


def validate_password(username: str, password: str) -> None:
    """Validate a user password according to the NIST password guidelines.

    Args:
        username: The username the password belongs to.
        password: The candidate password.

    Raises:
        CredentialValidationError: If the password breaks any rule below.
    """
    # 1. Length: At least 8 characters
    if not isinstance(password, str) or len(password) < 8:
        raise CredentialValidationError(
            "The password must be at least 8 characters long"
        )

    # 2. Complexity: Overly complex rules
    # will not be enforced

    # 3. Composition: Disallowing runs of 3 or
    # more identical consecutive characters
    if _REPEATED_CHARS_PATTERN.search(password):
        raise CredentialValidationError(
            "The password cannot contain 3 or more "
            "consequent repeated characters"
        )

    # 4. Expiration: Password expiration is
    # not checked since it's not recommended
    # to frequently expire passwords

    # 5. Similarity to username: If exactly or
    # partially similar to the username, an
    # exception will be raised
    lowered_username = username.lower()
    lowered_password = password.lower()
    if lowered_username in lowered_password:
        raise CredentialValidationError(
            "The password cannot be similar to the username"
        )

    # Containment (not equality) so that e.g. "4dm1n2024" is rejected for
    # the username "admin", matching the plain check above.
    if (apply_character_substitutions(lowered_username)
            in apply_character_substitutions(lowered_password)):
        raise CredentialValidationError(
            "The password cannot be similar to the username, "
            "even with character and number substitutions"
        )

    # 6. Data Breaches and Weak Passwords: any weak
    # password or breached passwords are disallowed
    if password in WEAK_PASSWORDS:
        raise CredentialValidationError(
            "Password is too weak. Please try again with a strong password"
        )

    if password in BREACHED_PASSWORDS:
        raise CredentialValidationError(
            "Found the password in an already breached password dictionary, "
            "thus not secure. Please try again with a strong password"
        )

    if lowered_password in BREACHED_PASSWORDS_LOWER:
        raise CredentialValidationError(
            "Password is very similar to a password in an already breached "
            "password dictionary. Please try again with a strong password"
        )


def hash_password(password) -> tuple[str, str]:
    """Hash ``password`` together with a fresh random salt.

    Returns:
        A ``(hashed_password, salt)`` pair, where ``salt`` is the hex
        string that was appended to the password before hashing.

    Raises:
        Exception: Whatever the hasher raised, after logging it.
    """
    try:
        salt = os.urandom(16).hex()
        return hasher.hash(password + salt), salt
    except Exception:
        logger.exception("Couldn't hash the password")
        raise


def verify_password(hashed_password, password, salt) -> bool:
    """Return True when ``password`` plus ``salt`` matches ``hashed_password``.

    Any failure (mismatch, malformed hash, bad argument types) yields
    False instead of raising.
    """
    try:
        hasher.verify(hashed_password, password + salt)
        return True
    except VerifyMismatchError:
        # An ordinary wrong password; not worth a traceback in the log.
        return False
    except Exception as e:
        logger.exception("Couldn't verify the password. %s", e)
        return False


def _read_credentials() -> tuple[str, str]:
    """Extract ``username`` and ``password`` from the JSON request body.

    Raises:
        CredentialValidationError: If the body is not a JSON object or
            either field is missing or not a string.
    """
    # silent=True returns None for a missing/invalid JSON body instead of
    # raising, so the caller can answer 400 rather than 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        raise CredentialValidationError("Request body must be a JSON object")

    username = data.get("username")
    password = data.get("password")
    if not isinstance(username, str) or not isinstance(password, str):
        raise CredentialValidationError("Username and password are required")
    return username, password


def _format_duration(total_seconds: int) -> str:
    """Render a number of seconds as the text used in lockout messages."""
    minutes, seconds = divmod(total_seconds, 60)
    if minutes > 0:
        return f"{minutes} minute(s) and {seconds} second(s)"
    return f"{seconds} second(s)"


def _lockout_response(remaining_seconds: int):
    """Build the 401 response for a locked-out account."""
    remaining_time_str = _format_duration(remaining_seconds)
    return jsonify(
        {"error": f"Account locked out. Try again in {remaining_time_str}"}
    ), 401


def _internal_error_response(error: Exception):
    """Roll back the session, log ``error`` and build the 500 response."""
    db.session.rollback()
    logger.exception(str(error))
    # NOTE(review): "details" exposes the raw exception text to clients;
    # kept for API compatibility, consider dropping it.
    return jsonify({"error": "Internal Server Error", "details": str(error)}), 500


@app.route("/")
def homepage():
    """Serve the landing page."""
    return render_template("index.html")


@app.route("/enroll")
def enroll_page():
    """Serve the enrollment page."""
    return render_template("enroll.html")


@app.route("/api/enroll", methods=["POST"])
def enroll():
    """Enroll a new user from a JSON body with ``username`` and ``password``.

    Responds 200 on success, 400 on invalid input, 409 when the username
    already exists and 500 on any other failure.
    """
    try:
        username, password = _read_credentials()
        validate_username(username=username)
        validate_password(username=username, password=password)

        # securely hashing the password
        # and storing it in the sqlite db
        hashed_password, salt = hash_password(password)
        user = User(
            username=username.lower(),
            hashed_password=hashed_password,
            salt=salt,
        )
        db.session.add(user)
        db.session.commit()
        return jsonify({"message": "User enrolled successfully"}), 200
    except CredentialValidationError as e:
        logger.warning("Credential validation failed: %s", e)
        return jsonify({"error": str(e)}), 400
    except IntegrityError as e:
        # The failed commit leaves the session unusable until rolled back.
        db.session.rollback()
        logger.exception(str(e))
        return jsonify({"error": "Username is already taken"}), 409
    except Exception as e:
        return _internal_error_response(e)


@app.route("/api/authenticate", methods=["POST"])
def authenticate():
    """Authenticate a user from a JSON body with ``username`` and ``password``.

    Responds 200 on success, 400 on a malformed request, 404 for an
    unknown user, 401 for a wrong password or a locked-out account and
    500 on any other failure.
    """
    try:
        username, password = _read_credentials()

        # 2FA/MFA: NIST entertains 2FA or MFA, but here it
        # is not imposed due to its implementation complexity
        # NOTE(review): the distinct 404 below reveals which usernames exist.
        user = User.query.filter_by(username=username.lower()).first()
        if user is None:
            return jsonify({"error": "User not found"}), 404

        # Retry attempts: Users are given 10 consecutive
        # login attempts until they're locked out
        if user.login_attempts >= MAX_LOGIN_ATTEMPTS:
            lockout_time = user.last_login_time + LOCKOUT_DURATION
            remaining_duration = lockout_time - datetime.utcnow()
            if remaining_duration.total_seconds() > 0:
                return _lockout_response(int(remaining_duration.total_seconds()))

            # Lockout window has passed: start counting afresh.
            user.login_attempts = 0
            user.last_login_time = datetime.utcnow()
            db.session.commit()

        # Verify the password
        if not verify_password(user.hashed_password, password, user.salt):
            user.login_attempts += 1
            user.last_login_time = datetime.utcnow()
            db.session.commit()

            if user.login_attempts >= MAX_LOGIN_ATTEMPTS:
                return _lockout_response(int(LOCKOUT_DURATION.total_seconds()))
            return jsonify({"error": "Invalid password"}), 401

        # Reset if a valid login attempt
        user.login_attempts = 0
        user.last_login_time = datetime.utcnow()
        db.session.commit()

        return jsonify(
            {"message": f"Authentication successful. Welcome @{username.lower()}!"}
        ), 200
    except CredentialValidationError as e:
        logger.warning("Malformed authentication request: %s", e)
        return jsonify({"error": str(e)}), 400
    except Exception as e:
        return _internal_error_response(e)


@app.route("/api/users", methods=["GET"])
def get_users():
    """Return the id and username of every user as a JSON list."""
    try:
        users = User.query.all()
        user_list = [{"id": user.id, "username": user.username} for user in users]
        return jsonify(user_list), 200
    except Exception as e:
        return _internal_error_response(e)


# The view needs the username from the URL; without the variable segment
# Flask would call delete_user() with no arguments and fail.
@app.route("/api/users/<username>", methods=["DELETE"])
def delete_user(username):
    """Delete the user named ``username`` (case-insensitive).

    Responds 200 on success, 404 when no such user exists and 500 on
    any other failure.
    """
    try:
        user = User.query.filter_by(username=username.lower()).first()
        if user is None:
            return jsonify({"error": "User not found"}), 404

        db.session.delete(user)
        db.session.commit()
        return jsonify({"message": "User deleted successfully"}), 200
    except Exception as e:
        return _internal_error_response(e)


if __name__ == "__main__":
    app.run(host=HOST, port=PORT, debug=DEBUG)