Spaces:
Sleeping
Sleeping
from enum import IntEnum | |
import numpy as np | |
# Number of timesteps in one hour; 12 steps per hour means one step covers 5 minutes.
STEPS_PER_HOUR = 12
class Decision(IntEnum):
    """Operating mode a decider can select for the battery (BESS).

    PASSIVE
        Solar is used to satisfy consumption and, if that is not enough,
        the network covers the rest. The BESS is not discharged in this
        mode, but it might be charged if solar has a surplus.

    DISCHARGE
        The battery is used if possible and necessary. "Possible" means
        that there is charge in it; "necessary" means that consumption is
        not already covered by solar.

    NETWORK_CHARGE
        The network is used to charge the battery. This is similar to
        PASSIVE, but forces the BESS to be charged even if solar does not
        cover the whole of consumption plus BESS.
    """

    PASSIVE = 0
    DISCHARGE = 1
    NETWORK_CHARGE = 2
class RandomDecider:
    """Decider that picks a Decision at random with fixed probabilities.

    The two parameters are the probability of choosing PASSIVE and the
    probability of choosing NETWORK_CHARGE; DISCHARGE gets the remaining
    probability mass.
    """

    def __init__(self, params, precalculated_supplier):
        """Store the two probabilities.

        Args:
            params: array of shape (2,):
                params[0] is the probability of choosing PASSIVE,
                params[1] is the probability of choosing NETWORK_CHARGE.
            precalculated_supplier: stored on the instance; not otherwise
                used by this class.

        Raises:
            AssertionError: if ``params.shape`` is not ``(2,)``.
        """
        self.input_window_size = STEPS_PER_HOUR * 24  # day long window.
        self.precalculated_supplier = precalculated_supplier
        assert params.shape == (2, )
        self.passive_prob, self.network_prob = params

    def decide(self, prod_pred, cons_pred, fees, battery_model):
        """Draw a random Decision; all arguments are ignored.

        A single uniform sample r from [0, 1) is drawn. PASSIVE is returned
        when r < passive_prob, NETWORK_CHARGE when r falls in the next
        network_prob-wide band, and DISCHARGE otherwise.
        """
        r = np.random.rand()
        if r < self.passive_prob:
            return Decision.PASSIVE
        elif r < self.passive_prob + self.network_prob:
            return Decision.NETWORK_CHARGE
        else:
            return Decision.DISCHARGE

    @staticmethod
    def clip_params(params):
        """Project a parameter vector onto valid probabilities.

        Values that are not strictly positive (including NaN) become 0.0.
        If the two values then sum to more than 1, both are divided by the
        sum so that they add up to 1.

        Args:
            params: array of shape (2,).

        Returns:
            A new float array of shape (2,). The dtype is always float,
            even when both entries are clipped to zero.

        Raises:
            AssertionError: if ``params.shape`` is not ``(2,)``.
        """
        assert params.shape == (2, )
        # `p > 0` is False for NaN as well, so NaN is mapped to 0.0 too.
        p1, p2 = (p if p > 0 else 0.0 for p in params)
        total = p1 + p2
        if total > 1:
            p1 /= total
            p2 /= total
        return np.array([p1, p2], dtype=float)

    @staticmethod
    def initial_params():
        """Return the (mean, scale) pair used to initialise the parameters.

        Returns:
            Tuple ``(init_mean, init_scale)`` of two arrays of shape (2,):
            mean 1/3 for both probabilities, scale 1.0 for both.
        """
        init_mean = np.array([1/3, 1/3])
        init_scale = np.array([1.0, 1.0])
        return init_mean, init_scale
# Mock class, as usual.
# output_window_size is not yet used; the decider always decides one timestep.
class Decider:
    """Rule-based decider: charge from the network when power is cheap and a
    shortage is coming, otherwise discharge for peak shaving or stay passive.

    The two tunable parameters are the lookahead horizon (in minutes) and
    the mean-surdemand threshold (in kW) above which an upcoming shortage
    is assumed.
    """

    # TODO: the 15 minute demand charge window is still a fixed constant.
    DEMAND_CHARGE_WINDOW_MIN = 15  # [minutes]

    # TODO: this should not be a hard threshold, and more importantly,
    # it should not be hardwired.
    HARDWIRED_THRESHOLD_FOR_CHEAP_POWER_HUF_PER_KWH = 20  # [HUF/kWh]

    def __init__(self, params, precalculated_supplier):
        """Store the two decision parameters.

        Args:
            params: array of shape (2,):
                params[0] is how many minutes we look ahead to decide
                whether there is an upcoming shortage;
                params[1] is the threshold for shortage, in kW not kWh,
                that is, parametrized as an average over the window.
            precalculated_supplier: object whose ``time_index.freq.n`` and
                ``peak_demand`` attributes are read by ``decide``.

        Raises:
            AssertionError: if ``params.shape`` is not ``(2,)``.
        """
        self.input_window_size = STEPS_PER_HOUR * 24  # day long window.
        self.precalculated_supplier = precalculated_supplier
        assert params.shape == (2, )
        self.surdemand_lookahead_window_min, self.lookahead_surdemand_kw = params

    def decide(self, prod_pred, cons_pred, fees, battery_model):
        """Choose the Decision for the current timestep.

        Args:
            prod_pred: predicted production; sliced from index 0 and
                subtracted from ``cons_pred`` (presumably NumPy arrays in
                kW per timestep -- confirm with the caller).
            cons_pred: predicted consumption, same layout as ``prod_pred``.
            fees: sequence of fees; only ``fees[0]`` (the current fee) is
                used and compared against a HUF/kWh threshold.
            battery_model: unused.

        Returns:
            Decision.NETWORK_CHARGE if the mean predicted surdemand over the
            lookahead window exceeds the threshold and the current fee is
            cheap; otherwise Decision.DISCHARGE if the predicted deficit in
            the demand charge window exceeds the supplier's ``peak_demand``;
            otherwise Decision.PASSIVE.

        Raises:
            AssertionError: if the demand charge window is not a whole
                multiple of the timestep length.
        """
        # NOTE(review): assumes time_index.freq is minute-based, so that
        # `.n` is the length of one timestep in minutes -- confirm.
        time_interval_min = self.precalculated_supplier.time_index.freq.n
        assert self.DEMAND_CHARGE_WINDOW_MIN % time_interval_min == 0
        peak_shaving_window = self.DEMAND_CHARGE_WINDOW_MIN // time_interval_min
        step_in_hour = time_interval_min / 60  # [hour], the length of a time step.

        # Only shortfalls count for peak shaving; surplus steps are clipped to 0.
        deficit_kw = (cons_pred[:peak_shaving_window] - prod_pred[:peak_shaving_window]).clip(min=0)
        deficit_kwh = (step_in_hour * deficit_kw).sum()

        # Use at least one step: a lookahead shorter than the timestep would
        # otherwise produce an empty slice whose mean is NaN, which silently
        # disables network charging.
        surdemand_window = max(1, int(self.surdemand_lookahead_window_min // time_interval_min))
        mean_surdemand_kw = (cons_pred[:surdemand_window] - prod_pred[:surdemand_window]).mean()

        current_fee = fees[0]
        shortage_ahead = mean_surdemand_kw > self.lookahead_surdemand_kw
        power_is_cheap = current_fee <= self.HARDWIRED_THRESHOLD_FOR_CHEAP_POWER_HUF_PER_KWH
        if shortage_ahead and power_is_cheap:
            return Decision.NETWORK_CHARGE

        # peak shaving
        # NOTE(review): this compares an energy [kWh] with `peak_demand`;
        # confirm the unit of `peak_demand`.
        if deficit_kwh > self.precalculated_supplier.peak_demand:
            return Decision.DISCHARGE
        else:
            return Decision.PASSIVE

    # this is called by the optimizer so that meaningless parameter settings are not attempted
    # we could vectorize this easily, but it's not a bottleneck, the simulation is.
    @staticmethod
    def clip_params(params):
        """Clip a parameter vector to its meaningful range.

        The lookahead horizon is limited to between 5 minutes and 3 days
        (60 * 24 * 3 minutes); the surdemand threshold is unbounded.

        Args:
            params: array of shape (2,):
                (surdemand_lookahead_window_min, lookahead_surdemand_kw).

        Returns:
            A new array of shape (2,) with the clipped values.

        Raises:
            AssertionError: if ``params.shape`` is not ``(2,)``.
        """
        assert params.shape == (2, )
        surdemand_lookahead_window_min, lookahead_surdemand_kw = params
        surdemand_lookahead_window_min = np.clip(surdemand_lookahead_window_min, 5, 60 * 24 * 3)
        # no-op right now:
        lookahead_surdemand_kw = np.clip(lookahead_surdemand_kw, -np.inf, np.inf)
        return np.array([surdemand_lookahead_window_min, lookahead_surdemand_kw])

    @staticmethod
    def initial_params():
        """Return the (mean, scale) pair used to initialise the parameters.

        Returns:
            Tuple ``(init_mean, init_scale)`` of two arrays of shape (2,),
            ordered as (surdemand_lookahead_window_min, lookahead_surdemand_kw).
        """
        # param1 [minutes]. one day mean, half day scale for lookahead horizon.
        # param2 [kW], averaged over the horizon. negative values are meaningful.
        init_mean = np.array([60 * 24.0, 0.0])
        init_scale = np.array([60 * 12.0, 100.0])
        return init_mean, init_scale