import numpy as np import pandas as pd import copy from enum import IntEnum # it's really just a network pricing model from supplier import Supplier from data_processing import read_datasets, add_production_field, interpolate_and_join, SolarParameters STEPS_PER_HOUR = 12 # SOC is normalized so that minimal_depth_of_discharge = 0 and maximal_depth_of_discharge = 1. # please set capacity_Ah = nominal_capacity_Ah * (max_dod - min_dod) # # TODO efficiency multiplier is not currently used, where best to put it? class BatteryModel: def __init__(self, capacity_Ah, time_interval_h): self.capacity_Ah = capacity_Ah self.efficiency = 0.9 # [dimensionless] self.voltage_V = 600 self.charge_kW = 50 self.discharge_kW = 60 self.time_interval_h = time_interval_h # the only non-constant member variable! # ratio of self.current_capacity_kWh and self.maximal_capacity_kWh self.soc = 0.0 @property def maximal_capacity_kWh(self): return self.capacity_Ah * self.voltage_V / 1000 @property def current_capacity_kWh(self): return self.soc * self.maximal_capacity_kWh def satisfy_demand(self, demand_kW): assert 0 <= self.soc <= 1 assert demand_kW >= 0 # rate limited: possible_discharge_in_timestep_kWh = self.discharge_kW * self.time_interval_h # limited by current capacity: possible_discharge_in_timestep_kWh = min((possible_discharge_in_timestep_kWh, self.current_capacity_kWh)) # limited by need: discharge_in_timestep_kWh = min((possible_discharge_in_timestep_kWh, demand_kW * self.time_interval_h)) consumption_from_bess_kW = discharge_in_timestep_kWh / self.time_interval_h unsatisfied_demand_kW = demand_kW - consumption_from_bess_kW cap_of_battery_kWh = self.current_capacity_kWh - discharge_in_timestep_kWh soc = cap_of_battery_kWh / self.maximal_capacity_kWh assert 0 <= soc <= self.soc <= 1 self.soc = soc return unsatisfied_demand_kW def charge(self, charge_kW): assert 0 <= self.soc <= 1 assert charge_kW >= 0 # rate limited: possible_charge_in_timestep_kWh = self.charge_kW * self.time_interval_h # limited by current capacity: possible_charge_in_timestep_kWh = min((possible_charge_in_timestep_kWh, self.maximal_capacity_kWh - self.current_capacity_kWh)) # limited by supply: charge_in_timestep_kWh = min((possible_charge_in_timestep_kWh, charge_kW * self.time_interval_h)) actual_charge_kW = charge_in_timestep_kWh / self.time_interval_h unused_charge_kW = charge_kW - actual_charge_kW cap_of_battery_kWh = self.current_capacity_kWh + charge_in_timestep_kWh soc = cap_of_battery_kWh / self.maximal_capacity_kWh assert 0 <= self.soc <= soc <= 1 self.soc = soc return unused_charge_kW class Decision(IntEnum): # use solar to satisfy consumption, # and if it is not enough, use network. # BESS is not discharged in this mode, # but might be charged if solar has surplus. PASSIVE = 0 # use the battery if possible and necessary. # the possible part means that there's charge in it, # and the necessary part means that the consumption # is not already covered by solar. DISCHARGE = 1 # use the network to charge the battery # this is similar to PASSIVE, but forces the # BESS to be charged, even if solar does not cover # the whole of consumption plus BESS. NETWORK_CHARGE = 2 # mock class as usual # output_window_size is not yet used, always decides one timestep. class Decider: def __init__(self): self.input_window_size = STEPS_PER_HOUR * 24 # day long window. self.random_seed = 0 # prod_cons_pred is a dataframe starting at now, containing # fields Production and Consumption. # this function does not mutate its inputs. # battery_model is just queried for capacity and current soc. # the method returns a pd.Series of Decisions as integers. def decide(self, prod_pred, cons_pred, battery_model): # assert len(prod_pred) == len(cons_pred) == self.input_window_size self.random_seed += 1 self.random_seed %= 3 # dummy rotates between Decisions self.random_seed = Decision.PASSIVE return self.random_seed # dummy decider always says DISCHARGE: # return pd.Series([Decision.DISCHARGE] * self.output_window_size, dtype=int) # even mock-er class than usual. # knows the future in advance, so it predicts it very well. # it's also unrealistic in that it takes row index instead of date. class DummyPredictor: def __init__(self, series): self.series = series def predict(self, indx, window_size): prediction = self.series.iloc[indx: indx + window_size] return prediction # this function does not mutate its inputs. # it makes a clone of battery_model and modifies that. def simulator(battery_model, supplier, prod_cons, prod_predictor, cons_predictor, decider): battery_model = copy.copy(battery_model) demand_np = prod_cons['Consumption'].to_numpy() production_np = prod_cons['Production'].to_numpy() assert len(demand_np) == len(production_np) step_in_minutes = prod_cons.index.freq.n assert step_in_minutes == 5 print("Simulating for", len(demand_np), "time steps. Each step is", step_in_minutes, "minutes.") soc_series = [] # by convention, we only call end user demand, demand, # and we only call end user consumption, consumption. # in our simple model, demand is always satisfied, hence demand=consumption. # BESS demand is called charge. consumption_from_solar_series = [] # demand satisfied by solar production consumption_from_network_series = [] # full demand satisfied by network, also includes consumption_from_network_to_bess. consumption_from_bess_series = [] # demand satisfied by BESS consumption_from_network_to_bess_series = [] # network used to charge BESS, also included in consumption_from_network. # the previous three must sum to demand_series. discarded_production_series = [] # solar power thrown away # 1 is not nominal but targeted (healthy) maximum charge. # we start with an empty battery, but not emptier than what's healthy for the batteries. # For the sake of simplicity 0 <= soc <= 1 # soc=0 means battery is emptied till it's 20% and soc=1 means battery is charged till 80% of its capacity # soc = 1 - maximal_depth_of_discharge # and will use only maximal_depth_of_discharge percent of the real battery capacity time_interval = step_in_minutes / 60 # amount of time step in hours for i, (demand, production) in enumerate(zip(demand_np, production_np)): # these five are modified on the appropriate codepaths: consumption_from_solar = 0 consumption_from_bess = 0 consumption_from_network = 0 discarded_production = 0 network_used_to_charge = 0 unsatisfied_demand = demand remaining_production = production prod_prediction = prod_predictor.predict(i, decider.input_window_size) cons_prediction = cons_predictor.predict(i, decider.input_window_size) decision = decider.decide(prod_prediction, cons_prediction, battery_model) production_used_to_charge = 0 if unsatisfied_demand >= remaining_production: # all goes to demand, no rate limit between solar and consumer consumption_from_solar = remaining_production unsatisfied_demand -= consumption_from_solar remaining_production = 0 if decision == Decision.DISCHARGE: # we try to cover the rest from BESS unsatisfied_demand = battery_model.satisfy_demand(unsatisfied_demand) # we cover the rest from network consumption_from_network = unsatisfied_demand unsatisfied_demand = 0 else: # demand fully satisfied by production, no rate limit between solar and consumer consumption_from_solar = unsatisfied_demand remaining_production -= unsatisfied_demand unsatisfied_demand = 0 if remaining_production > 0: # exploitable production still remains: discarded_production = battery_model.charge(remaining_production) # remaining_production [kW] if decision == Decision.NETWORK_CHARGE: # that is some random big number, the actual charge will be # determined by the combination of the BESS rate limit (battery_model.charge_kW) # and the BESS capacity fictional_network_charge_kW = 1000 discarded_network_charge_kW = battery_model.charge(fictional_network_charge_kW) actual_network_charge_kW = fictional_network_charge_kW - discarded_network_charge_kW consumption_from_network_to_bess = actual_network_charge_kW # just a renaming. consumption_from_network += actual_network_charge_kW else: consumption_from_network_to_bess = 0 soc_series.append(battery_model.soc) consumption_from_solar_series.append(consumption_from_solar) consumption_from_network_series.append(consumption_from_network) consumption_from_network_to_bess_series.append(consumption_from_network_to_bess) consumption_from_bess_series.append(consumption_from_bess) discarded_production_series.append(discarded_production) soc_series = np.array(soc_series) consumption_from_solar_series = np.array(consumption_from_solar_series) consumption_from_network_series = np.array(consumption_from_network_series) consumption_from_bess_series = np.array(consumption_from_bess_series) discarded_production_series = np.array(discarded_production_series) total_charge, consumption_charge_series, demand_charges = supplier.fee( pd.Series(consumption_from_network_series, index=prod_cons.index), provide_detail=True) print(f"All in all we have paid {total_charge} to network.") results = pd.DataFrame({'soc_series': soc_series, 'consumption_from_solar': consumption_from_solar_series, 'consumption_from_network': consumption_from_network_series, 'consumption_from_bess': consumption_from_bess_series, 'consumption_from_network_to_bess_series': consumption_from_network_to_bess_series, 'discarded_production': discarded_production_series, 'Consumption': prod_cons['Consumption'], 'Production': prod_cons['Production'] }) results = results.set_index(prod_cons.index) return results def main(): supplier = Supplier(price=100) # Ft/kWh supplier.set_price_for_interval(9, 17, 150) # nine-to-five increased price. parameters = SolarParameters() met_2021_data, cons_2021_data = read_datasets() add_production_field(met_2021_data, parameters) all_2021_data = interpolate_and_join(met_2021_data, cons_2021_data) time_interval_min = all_2021_data.index.freq.n time_interval_h = time_interval_min / 60 battery_model = BatteryModel(capacity_Ah=600, time_interval_h=time_interval_h) prod_predictor = DummyPredictor(pd.Series(all_2021_data['Production'])) cons_predictor = DummyPredictor(pd.Series(all_2021_data['Consumption'])) decider = Decider() results = simulator(battery_model, supplier, all_2021_data, prod_predictor, cons_predictor, decider) import matplotlib.pyplot as plt results['soc_series'].plot() plt.show() if __name__ == '__main__': main()