AIdeaText commited on
Commit
ad0fb1d
1 Parent(s): 7d2b8ec

Create auth.py

Browse files
Files changed (1) hide show
  1. modules/auth.py +95 -0
modules/auth.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from azure.cosmos import CosmosClient, exceptions
3
+ import bcrypt
4
+ import base64
5
+
6
+ def clean_and_validate_key(key):
7
+ key = key.strip()
8
+ while len(key) % 4 != 0:
9
+ key += '='
10
+ try:
11
+ base64.b64decode(key)
12
+ return key
13
+ except:
14
+ raise ValueError("La clave proporcionada no es válida")
15
+
16
+ # Azure Cosmos DB configuration
17
+ endpoint = os.environ.get("COSMOS_ENDPOINT")
18
+ key = os.environ.get("COSMOS_KEY")
19
+
20
+ if not endpoint or not key:
21
+ raise ValueError("Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas")
22
+
23
+ key = clean_and_validate_key(key)
24
+
25
+ try:
26
+ client = CosmosClient(endpoint, key)
27
+ database = client.get_database_client("user_database")
28
+ container = database.get_container_client("users")
29
+ # Prueba de conexión
30
+ database_list = list(client.list_databases())
31
+ print(f"Conexión exitosa. Bases de datos encontradas: {len(database_list)}")
32
+ except Exception as e:
33
+ print(f"Error al conectar con Cosmos DB: {str(e)}")
34
+ raise
35
+
36
+ def hash_password(password):
37
+ """Hash a password for storing."""
38
+ return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
39
+
40
+ def verify_password(stored_password, provided_password):
41
+ """Verify a stored password against one provided by user"""
42
+ return bcrypt.checkpw(provided_password.encode('utf-8'), stored_password.encode('utf-8'))
43
+
44
+ def register_user(username, password, additional_info=None):
45
+ try:
46
+ query = f"SELECT * FROM c WHERE c.id = '{username}'"
47
+ existing_user = list(container.query_items(query=query, enable_cross_partition_query=True))
48
+
49
+ if existing_user:
50
+ return False # User already exists
51
+
52
+ new_user = {
53
+ 'id': username,
54
+ 'password': hash_password(password),
55
+ 'role': 'Estudiante',
56
+ 'additional_info': additional_info or {}
57
+ }
58
+
59
+ new_user['partitionKey'] = username
60
+
61
+ container.create_item(body=new_user)
62
+ return True
63
+ except exceptions.CosmosHttpResponseError as e:
64
+ print(f"Error al registrar usuario: {str(e)}")
65
+ return False
66
+
67
+ def authenticate_user(username, password):
68
+ """Authenticate a user."""
69
+ try:
70
+ query = f"SELECT * FROM c WHERE c.id = '{username}'"
71
+ results = list(container.query_items(query=query, partition_key=username))
72
+
73
+ if results:
74
+ stored_user = results[0]
75
+ if verify_password(stored_user['password'], password):
76
+ return True
77
+ except exceptions.CosmosHttpResponseError:
78
+ pass
79
+
80
+ return False
81
+
82
+ def get_user_role(username):
83
+ """Get the role of a user."""
84
+ try:
85
+ query = f"SELECT c.role FROM c WHERE c.id = '{username}'"
86
+ results = list(container.query_items(query=query, partition_key=username))
87
+
88
+ if results:
89
+ return results[0]['role']
90
+ except exceptions.CosmosHttpResponseError:
91
+ pass
92
+
93
+ return None
94
+
95
+ def update_user_info(username, new_info):