test2 / modules /database.py
AIdeaText's picture
Update modules/database.py
7807ad5 verified
raw
history blame
No virus
5.89 kB
# database.py
# database.py
import logging
import os
from azure.cosmos import CosmosClient
from azure.cosmos.exceptions import CosmosHttpResponseError
from pymongo import MongoClient
import certifi
from datetime import datetime
import io
import base64
# Configure logging at DEBUG level and create this module's logger.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Cosmos DB SQL API handles; rebound by initialize_cosmos_sql_connection().
cosmos_client = user_database = user_container = None

# Cosmos DB MongoDB API handles; rebound by initialize_mongodb_connection().
mongo_client = mongo_db = analysis_collection = None
def initialize_cosmos_sql_connection():
    """Create the Cosmos DB SQL API client and bind the module-level handles.

    Reads the account endpoint and key from the ``COSMOS_ENDPOINT`` and
    ``COSMOS_KEY`` environment variables, then obtains client objects for
    the ``user_database`` database and its ``users`` container.

    The globals ``cosmos_client``, ``user_database`` and ``user_container``
    are rebound only after every step has succeeded, so a failed attempt
    never leaves them half-initialised.

    Returns:
        bool: True when the handles were created; False when the
        environment variables are missing or a client call raised (the
        failure is logged).
    """
    global cosmos_client, user_database, user_container

    cosmos_endpoint = os.environ.get("COSMOS_ENDPOINT")
    cosmos_key = os.environ.get("COSMOS_KEY")
    if not cosmos_endpoint or not cosmos_key:
        # Configuration problem: report it without a traceback.
        logger.error(
            "Error al conectar con Cosmos DB SQL API: "
            "Las variables de entorno COSMOS_ENDPOINT y COSMOS_KEY deben estar configuradas"
        )
        return False

    try:
        # Build everything in locals first; publish to the globals only
        # once all three objects exist.
        client = CosmosClient(cosmos_endpoint, cosmos_key)
        database = client.get_database_client("user_database")
        container = database.get_container_client("users")
    except Exception as e:
        logger.error(f"Error al conectar con Cosmos DB SQL API: {str(e)}", exc_info=True)
        return False

    cosmos_client, user_database, user_container = client, database, container
    logger.info("Conexión a Cosmos DB SQL API exitosa")
    return True
def initialize_mongodb_connection():
    """Connect to the Cosmos DB MongoDB API and bind the module-level handles.

    Reads the connection string from ``MONGODB_CONNECTION_STRING``, opens a
    TLS ``MongoClient`` (CA bundle from ``certifi``, retryable writes
    disabled), verifies the server is reachable with a ``ping`` command and
    then selects the ``aideatext_db`` database and its ``text_analysis``
    collection.

    The globals ``mongo_client``, ``mongo_db`` and ``analysis_collection``
    are rebound only after the ping succeeded. If anything fails, the
    partially created client is closed instead of being left in
    ``mongo_client``.

    Returns:
        bool: True when the connection was verified; False when the
        environment variable is missing or connecting failed (the failure
        is logged).
    """
    global mongo_client, mongo_db, analysis_collection

    cosmos_mongodb_connection_string = os.getenv("MONGODB_CONNECTION_STRING")
    if not cosmos_mongodb_connection_string:
        logger.error("La variable de entorno MONGODB_CONNECTION_STRING no está configurada")
        return False

    client = None
    try:
        client = MongoClient(
            cosmos_mongodb_connection_string,
            tls=True,
            tlsCAFile=certifi.where(),
            retryWrites=False,
            serverSelectionTimeoutMS=5000,
            connectTimeoutMS=10000,
            socketTimeoutMS=10000,
        )
        # Issue a ping so connection problems surface here, not on first use.
        client.admin.command('ping')
        database = client['aideatext_db']
        collection = database['text_analysis']
    except Exception as e:
        logger.error(f"Error al conectar con Cosmos DB MongoDB API: {str(e)}", exc_info=True)
        if client is not None:
            # Release the sockets/threads of the client that failed to connect.
            client.close()
        return False

    mongo_client, mongo_db, analysis_collection = client, database, collection
    logger.info("Conexión a Cosmos DB MongoDB API exitosa")
    return True
# Cosmos DB SQL API functions (user management)
def get_user(username):
    """Fetch the user item whose ``id`` equals *username*.

    Runs a cross-partition query against the module-level
    ``user_container``.

    Args:
        username: Identifier to look up; it is compared against ``c.id``.

    Returns:
        The first matching item, or None when nothing matches, the
        container has not been initialised, or the query raised (the
        failure is logged).
    """
    if user_container is None:
        logger.error("La conexión a Cosmos DB SQL API no está inicializada")
        return None
    try:
        # Bind the value as a query parameter instead of interpolating it
        # into the SQL text: a username containing a quote can then neither
        # break nor rewrite the query.
        query = "SELECT * FROM c WHERE c.id = @username"
        # str() keeps the previous semantics, where the value was always
        # compared as a quoted string literal.
        parameters = [{"name": "@username", "value": str(username)}]
        items = list(user_container.query_items(
            query=query,
            parameters=parameters,
            enable_cross_partition_query=True,
        ))
        return items[0] if items else None
    except Exception as e:
        logger.error(f"Error al obtener usuario {username}: {str(e)}")
        return None
def create_user(user_data):
    """Insert *user_data* as a new item through ``user_container``.

    Args:
        user_data: Item body passed unchanged to ``create_item``.

    Returns:
        bool: True if the insert call completed, False if it raised (the
        error is logged).
    """
    try:
        user_container.create_item(body=user_data)
    except Exception as err:
        logger.error(f"Error al crear usuario: {str(err)}")
        return False
    else:
        return True
# Cosmos DB MongoDB API functions (text analysis)
def get_student_data(username):
    """Aggregate every stored analysis of *username* into one summary dict.

    Entries are read from ``analysis_collection`` sorted by ``timestamp``
    in descending order.

    Args:
        username: Value matched against the ``username`` field.

    Returns:
        dict | None: None when the collection is not initialised, no entry
        exists, or reading failed (the failure is logged). Otherwise a
        dict with the keys:

        - ``username``: the value passed in.
        - ``entries``: list of ``{"timestamp": <ISO string>, "text": ...}``.
        - ``entries_count``: number of entries.
        - ``word_count``: per-category counts summed over all entries.
        - ``arc_diagrams``: all entries' ``arc_diagrams`` concatenated.
        - ``network_diagrams``: one item per entry; ``""`` when an entry
          has no ``network_diagram``.
    """
    if analysis_collection is None:
        logger.error("La conexión a MongoDB no está inicializada")
        return None
    try:
        cursor = analysis_collection.find({"username": username}).sort("timestamp", -1)

        entries = []
        word_count = {}
        arc_diagrams = []
        network_diagrams = []
        for entry in cursor:
            entries.append({
                "timestamp": entry["timestamp"].isoformat(),
                "text": entry["text"],
            })
            # Sum the per-category counts across all entries.
            for category, count in entry.get("word_count", {}).items():
                word_count[category] = word_count.get(category, 0) + count
            arc_diagrams.extend(entry.get("arc_diagrams", []))
            # Always append, so this list stays aligned with ``entries``.
            network_diagrams.append(entry.get("network_diagram", ""))

        if not entries:
            return None
        return {
            "username": username,
            "entries": entries,
            "entries_count": len(entries),
            "word_count": word_count,
            "arc_diagrams": arc_diagrams,
            "network_diagrams": network_diagrams,
        }
    except Exception as e:
        logger.error(f"Error al obtener datos del estudiante {username}: {str(e)}")
        return None
def store_analysis_result(username, text, repeated_words, arc_diagrams, network_diagram):
    """Persist one text-analysis result in ``analysis_collection``.

    Args:
        username: Owner of the analysis; stored in the ``username`` field.
        text: The analysed text, stored as is.
        repeated_words: Mapping whose values are tallied into ``word_count``
            (one count per occurrence of each value). The original author's
            note assumes each value, called "color", is the grammatical
            category of the word; verify against the caller.
        arc_diagrams: Stored unchanged under ``arc_diagrams``.
        network_diagram: Object exposing ``savefig(file, format=...)``
            (presumably a matplotlib figure; confirm with the caller). It
            is rendered to PNG and stored as a base64 string.

    Returns:
        bool: True when the document was inserted; False when the
        collection is not initialised or any step raised (the failure is
        logged).
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timezone

    if analysis_collection is None:
        logger.error("La conexión a MongoDB no está inicializada")
        return False
    try:
        # Render the diagram in memory and release the buffer right after.
        with io.BytesIO() as buffer:
            network_diagram.savefig(buffer, format='png')
            network_diagram_base64 = base64.b64encode(buffer.getvalue()).decode()

        # Count how many words fall into each category.
        word_count = {}
        for category in repeated_words.values():
            word_count[category] = word_count.get(category, 0) + 1

        analysis_document = {
            'username': username,
            # Timezone-aware UTC timestamp; replaces the deprecated, naive
            # datetime.utcnow() and denotes the same instant.
            'timestamp': datetime.now(timezone.utc),
            'text': text,
            'word_count': word_count,
            'arc_diagrams': arc_diagrams,
            'network_diagram': network_diagram_base64,
        }
        result = analysis_collection.insert_one(analysis_document)
        logger.info(f"Análisis guardado con ID: {result.inserted_id} para el usuario: {username}")
        return True
    except Exception as e:
        logger.error(f"Error al guardar el análisis para el usuario {username}: {str(e)}")
        return False