pq / v2 /architecture.py
Daniel Varga
big code drop with different main_* entry points, tuned evolution, and less logging.
history blame contribute delete
No virus
17.5 kB
import numpy as np
import pandas as pd
import copy
import time
import matplotlib.pyplot as plt
# it's really just a network pricing model
from decider import Decision, Decider, RandomDecider
from supplier import Supplier, precalculate_supplier
from data_processing import read_datasets, add_production_field, interpolate_and_join, SolarParameters
from bess import BatteryModel
from evolution_strategies import evolution_strategies_optimizer
from visualization import plotly_visualize_simulation, plotly_visualize_monthly
DO_VIS = False
# we predict last week same time for consumption, and yesterday same time for production.
# adds fields in-place
def add_dummy_predictions(all_data_with_predictions):
time_interval_min = all_data_with_predictions.index.freq.n
MESSING_IT_UP_SHIFT = 0 * 60 // time_interval_min
print("To test dependence on prediction quality, we artificially mess up t-168 prediction with a", MESSING_IT_UP_SHIFT * time_interval_min / 60, "hour shift.")
cons_shift = 60 * 168 // time_interval_min + MESSING_IT_UP_SHIFT
prod_shift = 60 * 24 // time_interval_min + MESSING_IT_UP_SHIFT
all_data_with_predictions['Consumption_prediction'] = all_data_with_predictions['Consumption'].shift(periods=cons_shift)
all_data_with_predictions['Production_prediction'] = all_data_with_predictions['Production'].shift(periods=prod_shift)
# we predict zero before we have data, no big deal:
all_data_with_predictions.loc[all_data_with_predictions.index[:cons_shift], 'Consumption_prediction'] = 0
all_data_with_predictions.loc[all_data_with_predictions.index[:prod_shift], 'Production_prediction'] = 0
# even mock-er class than usual.
# knows the future in advance, so it predicts it very well.
# it's also unrealistic in that it takes row index instead of date.
class DummyPredictor:
def __init__(self, series):
self.series = series
def predict(self, indx, window_size):
prediction = self.series.iloc[indx: indx + window_size]
return prediction
# this function does not mutate its inputs.
# it makes a clone of battery_model and modifies that.
def simulator(battery_model, prod_cons, decider):
battery_model = copy.copy(battery_model)
demand_np = prod_cons['Consumption'].to_numpy()
production_np = prod_cons['Production'].to_numpy()
demand_prediction_np = prod_cons['Consumption_prediction'].to_numpy()
production_prediction_np = prod_cons['Production_prediction'].to_numpy()
assert len(demand_np) == len(production_np)
step_in_minutes = prod_cons.index.freq.n
assert step_in_minutes == 5
surdemand_np = demand_np - production_np
plt.plot(demand_np, c="r")
plt.plot(production_np, c="g")
plt.plot(surdemand_np, c="b")
print("max prod", production_np.max())
surdemand_window = 24 * 60 // 5 # one day
mean_surdemands_kw = []
for i in range(len(demand_prediction_np) - surdemand_window):
mean_surdemand_kw = (demand_prediction_np[i: i+surdemand_window] - production_prediction_np[i: i+surdemand_window]).mean()
mean_surdemands_kw = pd.Series(mean_surdemands_kw, prod_cons.index[:len(mean_surdemands_kw)])
consumption_fees_np = prod_cons['Consumption_fees'].to_numpy()
# print("Simulating for", len(demand_np), "time steps. Each step is", step_in_minutes, "minutes.")
soc_series = []
# by convention, we only call end user demand, demand,
# and we only call end user consumption, consumption.
# in our simple model, demand is always satisfied, hence demand=consumption.
# BESS demand is called charge.
consumption_from_solar_series = [] # demand satisfied by solar production
consumption_from_network_series = [] # full demand satisfied by network, also includes consumption_from_network_to_bess.
consumption_from_bess_series = [] # demand satisfied by BESS
consumption_from_network_to_bess_series = [] # network used to charge BESS, also included in consumption_from_network.
# the previous three must sum to demand_series.
discarded_production_series = [] # solar power thrown away
decisions = []
# 1 is not nominal but targeted (healthy) maximum charge.
# we start with an empty battery, but not emptier than what's healthy for the batteries.
# For the sake of simplicity 0 <= soc <= 1
# soc=0 means battery is emptied till it's 20% and soc=1 means battery is charged till 80% of its capacity
# soc = 1 - maximal_depth_of_discharge
# and will use only maximal_depth_of_discharge percent of the real battery capacity
time_interval = step_in_minutes / 60 # amount of time step in hours
for i, (demand, production) in enumerate(zip(demand_np, production_np)):
# these five are modified on the appropriate codepaths:
consumption_from_solar = 0
consumption_from_bess = 0
consumption_from_network = 0
discarded_production = 0
network_used_to_charge = 0
bess_from_solar = 0
unsatisfied_demand = demand
remaining_production = production
# TODO what to call it, demand or consumption?
# 1. sometimes demand is inappropriate, like consumption_from_solar vs demand_from_solar.
# 2. sometimes consumption is inappropriate, like unsatisfied_demand vs unsatisfied_consumption.
# 3. there should not be two of them.
prod_prediction = production_prediction_np[i: i + decider.input_window_size]
cons_prediction = demand_prediction_np[i: i + decider.input_window_size]
consumption_fees = consumption_fees_np[i: i + decider.input_window_size]
decision = decider.decide(prod_prediction, cons_prediction, consumption_fees, battery_model)
production_used_to_charge = 0
if unsatisfied_demand >= remaining_production:
# all goes to demand, no rate limit between solar and consumer
consumption_from_solar = remaining_production
unsatisfied_demand -= consumption_from_solar
remaining_production = 0
if decision == Decision.DISCHARGE:
# we try to cover the rest from BESS
still_unsatisfied_demand = battery_model.satisfy_demand(unsatisfied_demand)
consumption_from_bess = unsatisfied_demand - still_unsatisfied_demand
unsatisfied_demand = still_unsatisfied_demand
# we cover the rest from network
consumption_from_network = unsatisfied_demand
unsatisfied_demand = 0
# demand fully satisfied by production, no rate limit between solar and consumer
consumption_from_solar = unsatisfied_demand
remaining_production -= unsatisfied_demand
unsatisfied_demand = 0
if remaining_production > 0:
# exploitable production still remains:
discarded_production = battery_model.charge(remaining_production) # remaining_production [kW]
bess_from_solar = remaining_production - discarded_production
if decision == Decision.NETWORK_CHARGE:
# that is some random big number, the actual charge will be
# determined by the combination of the BESS rate limit (battery_model.charge_kW)
# and the BESS capacity
fictional_network_charge_kW = 1000
discarded_network_charge_kW = battery_model.charge(fictional_network_charge_kW)
actual_network_charge_kW = fictional_network_charge_kW - discarded_network_charge_kW
consumption_from_network_to_bess = actual_network_charge_kW # just a renaming.
consumption_from_network += actual_network_charge_kW
consumption_from_network_to_bess = 0
assert np.isclose(consumption_from_solar + consumption_from_bess + consumption_from_network, demand + consumption_from_network_to_bess)
assert np.isclose(consumption_from_solar + discarded_production + bess_from_solar, production)
soc_series = np.array(soc_series)
consumption_from_solar_series = np.array(consumption_from_solar_series)
consumption_from_network_series = np.array(consumption_from_network_series)
consumption_from_bess_series = np.array(consumption_from_bess_series)
discarded_production_series = np.array(discarded_production_series)
decisions = np.array(decisions)
consumption_from_network_pandas_series = pd.Series(consumption_from_network_series, index=prod_cons.index)
# TODO badly duplicating functionality
total_charge, consumption_charge_series, demand_charges = supplier.fee(
consumption_charge_series = consumption_fees_np * consumption_from_network_pandas_series.to_numpy()
step_in_hour = consumption_from_network_pandas_series.index.freq.n / 60 # [hour], the length of a time step.
fifteen_minute_demands_in_kwh = consumption_from_network_pandas_series.resample('15min').sum() * step_in_hour
fifteen_minute_surdemands_in_kwh = (fifteen_minute_demands_in_kwh - decider.precalculated_supplier.peak_demand).clip(lower=0)
demand_charges = fifteen_minute_surdemands_in_kwh * decider.precalculated_supplier.surcharge_per_kwh
total_network_fee = consumption_charge_series.sum() + demand_charges.sum()
# print(f"Total network fee {total_network_fee / 10 ** 6} MHUF.")
if DO_VIS:
results = pd.DataFrame({'soc_series': soc_series, 'consumption_from_solar': consumption_from_solar_series,
'consumption_from_network': consumption_from_network_series,
'consumption_from_bess': consumption_from_bess_series,
'consumption_from_network_to_bess_series': consumption_from_network_to_bess_series,
'discarded_production': discarded_production_series,
'Consumption': prod_cons['Consumption'],
'Production': prod_cons['Production'],
'decisions': decisions
results = results.set_index(prod_cons.index)
return results, total_network_fee
def optimizer(decider_class, precalculated_supplier, battery_model, all_data_with_predictions):
number_of_generations = 10
population_size = 20
collected_loss_values = []
def objective_function(params):
# there was an evil numpy view bug that shuffled all results randomly.
params = params.copy()
# print("Simulating with parameters", params)
decider = decider_class(params, precalculated_supplier)
t = time.perf_counter()
results, total_network_fee = simulator(battery_model, all_data_with_predictions, decider)
collected_loss_values.append((params, total_network_fee))
print(params, total_network_fee)
return total_network_fee
def clipper_function(params):
return decider_class.clip_params(params)
init_mean, init_scale = decider_class.initial_params()
best_params = evolution_strategies_optimizer(objective_function, clipper_function,
init_mean=init_mean, init_scale=init_scale,
return best_params, collected_loss_values
def visualize_collected_loss_values(collected_loss_values):
all_params = []
losses = []
for row in collected_loss_values:
params, loss = row
all_params = np.array(all_params)
losses = np.array(losses)
# losses -= losses.min() ; losses /= losses.max()
# plt.scatter(all_params[:, 0], all_params[:, 1], c=range(len(all_params))) ; plt.show()
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure()
# Add a 3D subplot
ax = fig.add_subplot(111, projection='3d')
# Scatter plot
ax.scatter(all_params[:, 0], all_params[:, 1], losses, c=range(len(all_params)))
def main_param_grid(precalculated_supplier, battery_model, all_data_with_predictions):
N = 11
losses = np.zeros((N, N))
losses[:, :] = np.nan
xlim = [5, 60*24*3 - 1]
ylim = [-50, 50]
for i, x in enumerate(np.linspace(*xlim, N)):
for j, y in enumerate(np.linspace(*ylim, N)):
decider = Decider(np.array([x, y]), precalculated_supplier)
results, total_network_fee = simulator(battery_model, all_data_with_predictions, decider)
# print(x, y, total_network_fee)
losses[j, i] = total_network_fee
# losses[0, -1] = np.nan
# + is list extend.
# the aspect means square pixels
plt.imshow(losses, extent=xlim + ylim, aspect=(xlim[1]-xlim[0])/(ylim[1]-ylim[0]))
plt.xlabel('Surdemand lookahead window size [minutes]')
plt.ylabel('Lookahead surdemand [kW]')
def main_inspect_params(precalculated_supplier, battery_model, all_data_with_predictions):
def inspect_params(params):
decider = Decider(params, precalculated_supplier)
results, total_network_fee = simulator(battery_model, all_data_with_predictions, decider)
print(params, total_network_fee)
date_range = ("2021-01-15", "2021-02-01")
date_range = ("2021-08-15", "2021-09-01")
plotly_fig = plotly_visualize_simulation(results, date_range=date_range)
plotly_fig.update_layout(title=dict(text=f"{total_network_fee/1e6:0.2f} MFt"))
plotly_fig_2 = plotly_visualize_monthly(results)
plotly_fig_2.update_layout(title=dict(text=f"{total_network_fee/1e6:0.2f} MFt"))
inspect_params(np.array([5, 39]))
inspect_params(np.array([1400, 50]))
def main():
supplier = Supplier(price=100) # Ft/kWh
# nine-to-five increased price.
supplier.set_price_for_daily_interval(9, 17, 150)
# midnight-to-three decreased price, to test network charge.
supplier.set_price_for_daily_interval(0, 3, 20)
# peak_demand dimension is kWh, but it's interpreted as the full consumption
# during a 15 minute timestep.
supplier.set_demand_charge(peak_demand=2.5, surcharge_per_kwh=500) # kWh in a 15 minutes interval, Ft/kWh
solar_parameters = SolarParameters()
met_2021_data, cons_2021_data = read_datasets()
add_production_field(met_2021_data, solar_parameters)
all_data = interpolate_and_join(met_2021_data, cons_2021_data)
time_interval_min = all_data.index.freq.n
time_interval_h = time_interval_min / 60
# for faster testing:
print("Truncating dataset to", DATASET_TRUNCATED_SIZE, "datapoints, that is", DATASET_TRUNCATED_SIZE * time_interval_h / 24, "days")
all_data = all_data.iloc[:DATASET_TRUNCATED_SIZE]
print("Working with", solar_parameters.solar_cell_num, "solar cells, that's a maximum production of", all_data['Production'].max(), "kW.")
all_data_with_predictions = all_data.copy()
precalculated_supplier = precalculate_supplier(supplier, all_data.index)
# we delete the supplier to avoid accidentally calling it instead of precalculated_supplier
supplier = None
all_data_with_predictions['Consumption_fees'] = precalculated_supplier.consumption_fees # [HUF / kWh]
battery_model = BatteryModel(capacity_Ah=600, time_interval_h=time_interval_h)
# now that we've finally set everything up, we can do various things, hence several main_*().
# main_param_grid(precalculated_supplier, battery_model, all_data_with_predictions) ; exit()
# main_inspect_params(precalculated_supplier, battery_model, all_data_with_predictions) ; exit()
decider_class = Decider
# TODO this is super unfortunate:
# Consumption_fees travels via all_data_with_predictions,
# peak_demand and surcharge_per_kwh travels via precalculated_supplier of decider.
best_params, collected_loss_values = optimizer(decider_class, precalculated_supplier, battery_model, all_data_with_predictions)
if __name__ == '__main__':