File size: 1,248 Bytes
94a6c20 cabb2b8 94a6c20 cabb2b8 94a6c20 cabb2b8 94a6c20 cabb2b8 94a6c20 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
from satellites.base_satellite import VegapunkSatellite
from abc import ABC, abstractmethod
from typing import Dict, Any,Optional
import logging
class Stellar:
def __init__(self):
self.satellites = {} # dictionnaire pour stocke les references satellites
def register_satellites(self,satellite:VegapunkSatellite)->None:
self.satellites[satellite.name] = satellite
def route_communication(self, sender_name: str, target_name: str, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if sender_name not in self.satellites or target_name not in self.satellites:
logging.error(f"Invalid sender or target: {sender_name} -> {target_name}")
return None
sender = self.satellites[sender_name]
target = self.satellites[target_name]
return sender.communicate_with_other_satellite(target, message)
def broadcast_message(self, sender_name: str, message: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
results = {}
for satellite_name, satellite in self.satellites.items():
if satellite_name != sender_name:
results[satellite_name] = self.route_communication(sender_name, satellite_name, message)
return results
|