"""Password-protected Flask app that stores uploaded files in MongoDB.

Every route is guarded by HTTP Basic auth (user ``admin``, password taken
from the ``APP_PASSWORD`` environment variable).

Routes:
    /                    landing page
    /upload              upload one or more files (GET form, POST submit)
    /uploads/<filename>  download a stored file
    /files               list stored file names
    /delete/<filename>   delete a stored file (POST only)

Environment variables read at import time: ``SECRET_KEY``,
``MONGO_USERNAME``, ``MONGO_PASSWORD``, ``REST_URI``, ``APP_PASSWORD``.
``FLASK_DEBUG`` is read only when the module is run as a script.
"""

import hmac
import io
import os
import urllib.parse
from functools import wraps

from flask import (
    Flask,
    Response,
    flash,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from flask_cors import CORS  # noqa: F401  (imported by the original file; not applied here)
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from werkzeug.utils import secure_filename

app = Flask(__name__)
# flash() stores messages in the session, which needs a secret key;
# make sure SECRET_KEY is set in the environment.
app.secret_key = os.getenv('SECRET_KEY')

# Credentials are percent-encoded so special characters cannot break the URI.
username = urllib.parse.quote_plus(os.getenv('MONGO_USERNAME'))
password = urllib.parse.quote_plus(os.getenv('MONGO_PASSWORD'))
# REST_URI is appended verbatim after the credentials
# (presumably "@<cluster-host>/<options>" -- confirm against deployment config).
restUri = os.getenv('REST_URI')

uri = f'mongodb+srv://{username}:{password}{restUri}'

client = MongoClient(uri, server_api=ServerApi('1'))
db = client['file_storage']
files_collection = db['files']

# Best-effort connectivity check: a failure is reported but does not stop
# the app from starting.
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

# Get the password from the environment variable
APP_PASSWORD = os.getenv('APP_PASSWORD')


def check_auth(username, password):
    """Check if a username/password combination is valid.

    Args:
        username: User name supplied by the client (may be ``None``).
        password: Password supplied by the client (may be ``None``).

    Returns:
        ``True`` only when the user is ``admin`` and the password equals
        ``APP_PASSWORD``. Always ``False`` when ``APP_PASSWORD`` is unset or
        empty, so a misconfigured deployment fails closed.
    """
    if not APP_PASSWORD or username is None or password is None:
        return False
    # compare_digest runs in constant time, so response timing does not leak
    # how much of the secret matched. Both checks are always evaluated.
    user_ok = hmac.compare_digest(username.encode('utf-8'), b'admin')
    password_ok = hmac.compare_digest(
        password.encode('utf-8'), APP_PASSWORD.encode('utf-8')
    )
    return user_ok and password_ok


def authenticate():
    """Send a 401 response that enables basic auth."""
    return Response(
        'Could not verify your access level for that URL.\n'
        'You have to login with proper credentials',
        401,
        {'WWW-Authenticate': 'Basic realm="Login Required"'},
    )


def requires_auth(f):
    """Decorate a view so it only runs for requests with valid Basic auth.

    Requests without credentials, or with credentials rejected by
    ``check_auth``, receive the 401 challenge from ``authenticate``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated


@app.route('/')
@requires_auth
def index():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/upload', methods=['GET', 'POST'])
@requires_auth
def upload_file():
    """Show the upload form (GET) or store the submitted files (POST).

    Every submitted name is validated before anything is written, so a bad
    entry in a multi-file upload no longer leaves a partial upload behind.
    """
    if request.method != 'POST':
        return render_template('upload.html')

    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)

    uploads = request.files.getlist('file')

    # First pass: validate and sanitise every name without touching the DB.
    sanitized_names = []
    for upload in uploads:
        if not upload.filename:
            flash('No selected file')
            return redirect(request.url)
        filename = secure_filename(upload.filename)
        if not filename:
            # secure_filename() can strip a name down to nothing; a document
            # stored under '' could never be downloaded or deleted by name.
            flash('Invalid file name')
            return redirect(request.url)
        sanitized_names.append(filename)

    # Second pass: read and store one file at a time.
    # NOTE(review): each file is stored inline in a single document, so very
    # large uploads will hit MongoDB's document size limit -- consider GridFS.
    for upload, filename in zip(uploads, sanitized_names):
        files_collection.insert_one({
            'filename': filename,
            'data': upload.read(),
        })

    flash('Files successfully uploaded')
    return redirect(url_for('list_files'))


@app.route('/uploads/<filename>')
@requires_auth
def uploaded_file(filename):
    """Return the stored file called *filename*, or a 404 if none exists."""
    file_data = files_collection.find_one({'filename': filename})
    if file_data is None:
        return 'File not found', 404
    return send_file(
        io.BytesIO(file_data['data']),
        mimetype='application/octet-stream',
    )


@app.route('/files')
@requires_auth
def list_files():
    """Render the list of stored file names."""
    # The projection keeps the (potentially large) 'data' field off the wire.
    cursor = files_collection.find({}, {'_id': 1, 'filename': 1})
    files = [file_data['filename'] for file_data in cursor]
    return render_template('files.html', files=files)


@app.route('/delete/<filename>', methods=['POST'])
@requires_auth
def delete_file(filename):
    """Delete one stored file called *filename* and return to the listing."""
    # A single delete_one call avoids the find-then-delete race and a
    # second round trip; deleted_count tells us whether anything matched.
    result = files_collection.delete_one({'filename': filename})
    if result.deleted_count:
        flash('File successfully deleted')
    else:
        flash('File not found')
    return redirect(url_for('list_files'))


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into explicitly (e.g. FLASK_DEBUG=1) rather than always enabled.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in {
        '1', 'true', 'yes', 'on',
    }
    app.run(debug=debug_enabled)