|
from satellites.base_satellite import VegapunkSatellite |
|
from typing import Dict, Any, List |
|
import numpy as np |
|
from scipy import stats |
|
import matplotlib.pyplot as plt |
|
import io |
|
import base64 |
|
import random |
|
from utils.logger import get_logger |
|
import pandas as pd |
|
from bs4 import BeautifulSoup |
|
import json |
|
import os |
|
from dotenv import load_dotenv |
|
from openai import OpenAI |
|
import requests |
|
import logging |
|
|
|
# Load environment variables (e.g. MISTRAL_API_KEY) from a local .env file, if present.
load_dotenv()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Human-readable description of this satellite's domain (French, used as-is at runtime).
role = "mathématiques, statistiques et analyse de données"

# One-line mission statement for the Pythagoras satellite (French, used as-is).
fonction = "Effectuer des calculs complexes et analyser des ensembles de données"
|
|
|
class Pythagoras(VegapunkSatellite):
    """Satellite agent specialised in mathematics, statistics and data analysis.

    Capabilities:
      * descriptive statistics and correlation (``calculate`` tasks),
      * dataset analysis with a base64-encoded histogram (``analyze_data``),
      * hypothesis testing via t-test / one-way ANOVA (``statistical_test``),
      * pandas-based analysis of tabular records (``analyze_structured_data``),
      * research aggregation and LLM-backed summarisation / extraction
        (``conduct_research`` / ``extract_information``).

    Error reporting convention: computational helpers return a dict with an
    ``"error"`` key instead of raising, so callers always get a dict back.
    """

    def __init__(self):
        # Fix: use the module-level ``role`` constant instead of the
        # placeholder literal "Role", so the reported specialty is meaningful.
        super().__init__(name="Pythagoras", specialty=role)
        # Mistral chat-completions endpoint; the key comes from the environment
        # (loaded by load_dotenv() at import time).
        self.llm_api_key = os.getenv("MISTRAL_API_KEY")
        self.llm_api_url = "https://api.mistral.ai/v1/chat/completions"
        # Placeholder research back-ends, keyed by domain.
        self.research_databases = {
            "scientific": "https://api.example-scientific-db.com/search",
            "news": "https://api.example-news-db.com/search",
            "general": "https://api.example-general-db.com/search"
        }
        self.mathematical_constants = {
            "pi": np.pi,
            "e": np.e,
            "golden_ratio": (1 + np.sqrt(5)) / 2
        }
        self.resources = {}
        self.external_apis = {}
        self.logger = get_logger("pythagoras")

    def process_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch a task dict on its ``type`` key.

        ``calculate``, ``analyze_data`` and ``statistical_test`` return the
        helper's response dict directly (it may contain an ``"error"`` key).
        The remaining types are wrapped as ``{"result": ...}`` and logged.

        Note: a duplicate, unreachable ``analyze_data`` branch previously
        shadowed :meth:`analyze_data_2`; that path is now reachable under the
        new task type ``analyze_structured_data`` (backward-compatible).
        """
        task_type = task.get('type')

        # These helpers already build a complete response dict.
        if task_type == "calculate":
            return self._perform_calculation(task.get('operation'), task.get('values'))
        if task_type == "analyze_data":
            return self._analyze_dataset(task.get('data'))
        if task_type == "statistical_test":
            return self._perform_statistical_test(task.get('test_type'), task.get('data'))

        if task_type == "analyze_structured_data":
            result = self.analyze_data_2(task["data"])
        elif task_type == "conduct_research":
            result = self.conduct_research(task["topic"], task.get("depth", "medium"))
        elif task_type == "extract_information":
            result = self.extract_information(task["content"], task.get("keywords", []))
        else:
            result = f"Tâche non reconnue : {task_type}"

        self.log_activity(f"Tâche traitée : {task_type}, Résultat : {result}")
        return {"result": result}

    def _perform_calculation(self, operation: str, values: List[float]) -> Dict[str, Any]:
        """Run a named statistical operation over ``values``.

        Supported operations: ``mean``, ``median``, ``std_dev`` (population
        std, ddof=0) and ``correlation`` (expects ``values`` to be exactly two
        lists). Returns ``{"operation", "result"}`` or ``{"error": ...}``.
        """
        if not operation or not values:
            return {"error": "Opération ou valeurs manquantes"}

        try:
            if operation == "mean":
                result = np.mean(values)
            elif operation == "median":
                result = np.median(values)
            elif operation == "std_dev":
                result = np.std(values)
            elif operation == "correlation":
                if len(values) != 2 or not all(isinstance(v, list) for v in values):
                    return {"error": "La corrélation nécessite deux listes de valeurs"}
                # Off-diagonal entry of the 2x2 correlation matrix.
                result = np.corrcoef(values[0], values[1])[0, 1]
            else:
                return {"error": "Opération non reconnue"}

            return {"operation": operation, "result": result}
        except Exception as e:
            return {"error": f"Erreur lors du calcul: {str(e)}"}

    def _analyze_dataset(self, data: List[float]) -> Dict[str, Any]:
        """Compute summary statistics for a 1-D dataset and render a histogram.

        Returns the stats plus a base64-encoded PNG under ``"histogram"``,
        or ``{"error": ...}`` on empty input / failure.
        """
        if not data:
            return {"error": "Ensemble de données vide"}

        try:
            analysis = {
                "mean": np.mean(data),
                "median": np.median(data),
                "std_dev": np.std(data),
                "min": np.min(data),
                "max": np.max(data),
                "quartiles": np.percentile(data, [25, 50, 75]).tolist()
            }

            fig = plt.figure(figsize=(10, 6))
            try:
                plt.hist(data, bins='auto', alpha=0.7, color='skyblue', edgecolor='black')
                plt.title("Histogramme des données")
                plt.xlabel("Valeurs")
                plt.ylabel("Fréquence")

                buffer = io.BytesIO()
                plt.savefig(buffer, format='png')
                buffer.seek(0)
                image_base64 = base64.b64encode(buffer.getvalue()).decode()
            finally:
                # Always release the figure, even if savefig raises,
                # to avoid leaking matplotlib state across calls.
                plt.close(fig)

            analysis["histogram"] = image_base64

            return analysis
        except Exception as e:
            return {"error": f"Erreur lors de l'analyse: {str(e)}"}

    def _perform_statistical_test(self, test_type: str, data: Dict[str, List[float]]) -> Dict[str, Any]:
        """Run a hypothesis test: independent two-sample ``t_test`` (requires
        keys ``group1``/``group2``) or one-way ``anova`` over all groups.

        Returns the statistic and p-value, or ``{"error": ...}``.
        """
        if not test_type or not data:
            return {"error": "Type de test ou données manquantes"}

        try:
            if test_type == "t_test":
                if 'group1' not in data or 'group2' not in data:
                    return {"error": "Deux groupes de données sont nécessaires pour le t-test"}
                t_stat, p_value = stats.ttest_ind(data['group1'], data['group2'])
                return {"test": "t_test", "t_statistic": t_stat, "p_value": p_value}
            elif test_type == "anova":
                if len(data) < 2:
                    return {"error": "Au moins deux groupes sont nécessaires pour l'ANOVA"}
                f_stat, p_value = stats.f_oneway(*data.values())
                return {"test": "ANOVA", "f_statistic": f_stat, "p_value": p_value}
            else:
                return {"error": "Type de test statistique non reconnu"}
        except Exception as e:
            return {"error": f"Erreur lors du test statistique: {str(e)}"}

    def analyze_data_2(self, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyse a list of records (rows) with pandas.

        Returns summary stats, pairwise correlations of numeric columns,
        per-column linear trends and z-score outliers.
        """
        df = pd.DataFrame(data)
        analysis = {
            "summary": df.describe().to_dict(),
            # numeric_only=True: modern pandas raises on non-numeric columns
            # without it, which would break mixed-type records.
            "correlations": df.corr(numeric_only=True).to_dict(),
            "trends": self._detect_trends(df),
            "outliers": self._detect_outliers(df)
        }
        return analysis

    def conduct_research(self, topic: str, depth: str = "medium") -> Dict[str, Any]:
        """Query every configured research database for ``topic`` and ask the
        LLM for a combined summary. ``depth`` is forwarded to each back-end.
        """
        research_results = {}
        for db_name, db_url in self.research_databases.items():
            research_results[db_name] = self._search_database(db_url, topic, depth)

        summary = self._summarize_research(topic, research_results)
        return {
            "topic": topic,
            "depth": depth,
            "results": research_results,
            "summary": summary
        }

    def extract_information(self, content: str, keywords: List[str] = None) -> Dict[str, Any]:
        """Ask the LLM to extract key information from ``content``, optionally
        focused on ``keywords``.

        Fix: the original used a mutable default (``keywords=[]``), shared
        across calls; ``None`` is now the sentinel and normalised to ``[]``.
        """
        if keywords is None:
            keywords = []
        prompt = f"Extraire les informations clés du texte suivant, en se concentrant sur les mots-clés {keywords} si fournis : {content}"
        extracted_info = self._query_llm(prompt)

        return {
            "original_content_length": len(content),
            "extracted_information": extracted_info,
            "keywords_used": keywords
        }

    def _detect_trends(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Fit a linear regression (index vs. value) per numeric column and
        report slope/intercept/r/p plus an increasing/decreasing label.
        """
        trends = {}
        for column in df.select_dtypes(include=[np.number]).columns:
            # Row index acts as the independent variable (assumes rows are
            # ordered, e.g. a time series — TODO confirm with callers).
            trend = stats.linregress(range(len(df)), df[column])
            trends[column] = {
                "slope": trend.slope,
                "intercept": trend.intercept,
                "r_value": trend.rvalue,
                "p_value": trend.pvalue,
                "trend": "increasing" if trend.slope > 0 else "decreasing"
            }
        return trends

    def _detect_outliers(self, df: pd.DataFrame) -> Dict[str, List[Any]]:
        """Return, per numeric column, the values whose |z-score| exceeds 3."""
        outliers = {}
        for column in df.select_dtypes(include=[np.number]).columns:
            z_scores = np.abs(stats.zscore(df[column]))
            outliers[column] = df[column][z_scores > 3].tolist()
        return outliers

    def _search_database(self, db_url: str, topic: str, depth: str) -> List[Dict[str, Any]]:
        """Stub back-end query: returns canned results for ``topic``.

        ``db_url`` and ``depth`` are currently unused — replace with a real
        HTTP call when the databases exist.
        """
        return [
            {"title": f"Résultat 1 pour {topic}", "summary": f"Résumé du résultat 1 pour {topic}"},
            {"title": f"Résultat 2 pour {topic}", "summary": f"Résumé du résultat 2 pour {topic}"}
        ]

    def _summarize_research(self, topic: str, research_results: Dict[str, List[Dict[str, Any]]]) -> str:
        """Serialise the raw research results and ask the LLM for a summary."""
        research_summary = json.dumps(research_results)
        prompt = f"Résumez les résultats de recherche suivants sur le sujet '{topic}' : {research_summary}"
        return self._query_llm(prompt)

    def _query_llm(self, prompt: str) -> str:
        """POST a single-message chat completion to the Mistral API.

        Returns the assistant's reply, or a French error string on failure
        (network errors never propagate to callers).
        """
        headers = {
            "Authorization": f"Bearer {self.llm_api_key}",
            "Content-Type": "application/json"
        }
        data = {
            # Fix: the endpoint is Mistral's, so an OpenAI model name
            # ("gpt-3.5-turbo") would be rejected by the API.
            "model": "mistral-small-latest",
            "messages": [{"role": "user", "content": prompt}]
        }
        try:
            # Fix: a timeout prevents the satellite from hanging forever
            # on an unresponsive endpoint.
            response = requests.post(self.llm_api_url, headers=headers, json=data, timeout=30)
            response.raise_for_status()
            return response.json()['choices'][0]['message']['content']
        except requests.RequestException as e:
            return f"Erreur lors de la requête LLM : {str(e)}"

    def log_activity(self, activity: str):
        """Record an activity line on this satellite's dedicated logger
        (previously went to the root logger, bypassing ``self.logger``)."""
        self.logger.info(activity)

    def communicate_with_stellar(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Send a message to the Stellar coordinator and acknowledge receipt.

        Currently a stub that prints the exchange.
        """
        print(f"{self.name} envoie un message à Stellar: {message}")
        return {"Statut": "Message reçu", "message": "Stellar a bien reçu le message de Pythagoras"}

    def update_from_punkrecord(self):
        """Refresh the knowledge base from Punkrecord (stub: stores one
        randomly chosen physical-constant name)."""
        print(f"{self.name} met à jour sa base de connaissances depuis Punkrecord")
        new_constant = random.choice(["Constante de Planck", "Nombre d'Avogadro", "Constante de Boltzmann"])
        self.add_to_knowledge_base("Nouvelle_constante", new_constant)

    def report_status(self) -> Dict[str, Any]:
        """Extend the base status report with the available constants and
        calculation operations."""
        status = super().report_status()
        status.update({
            "Constantes_mathématiques": list(self.mathematical_constants.keys()),
            "Opérations_disponibles": ["mean", "median", "std_dev", "correlation"]
        })
        return status

    def process_communication(self, sender_name: str, message: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an inter-satellite message, dispatching on ``message["type"]``.

        Returns a ``{"status", "result"}`` dict; unknown types yield an error
        status. ``sender_name`` is accepted for interface compatibility but
        not used here.
        """
        msg_type = message.get("type")
        if msg_type == "task":
            task_result = self.process_task(message.get("task"))
            return {"status": "Traitement effectué", "result": task_result}
        elif msg_type == "research":
            # Fix: default the depth here too — message.get("depth") alone
            # would pass None and override conduct_research's "medium".
            research_result = self.conduct_research(message.get("topic"), message.get("depth", "medium"))
            return {"status": "Recherche effectuée", "result": research_result}
        elif msg_type == "information_extraction":
            info_result = self.extract_information(message.get("content"), message.get("keywords", []))
            return {"status": "Extraction d'information effectuée", "result": info_result}
        elif msg_type == "update_constants":
            self.mathematical_constants.update(message.get("constants", {}))
            return {"status": "Constantes mises à jour", "result": self.mathematical_constants}
        elif msg_type == "update_resources":
            self.resources.update(message.get("resources", {}))
            return {"status": "Ressources mises à jour", "result": self.resources}
        elif msg_type == "update_external_apis":
            self.external_apis.update(message.get("apis", {}))
            return {"status": "APIs mises à jour", "result": self.external_apis}
        else:
            return {"status": "Erreur", "result": "Type de tâche inconnu"}

    def receive_communication(self, sender_name: str, message: Dict[str, Any]) -> Dict[str, Any]:
        """Entry point for incoming messages: log the sender, then delegate
        to :meth:`process_communication`."""
        self.logger.info("%s received communication from %s", self.name, sender_name)
        return self.process_communication(sender_name, message)
|
|
|
|