diff --git a/grid2op/Environment/Environment.py b/grid2op/Environment/Environment.py index 565566e74..172e52207 100644 --- a/grid2op/Environment/Environment.py +++ b/grid2op/Environment/Environment.py @@ -916,7 +916,9 @@ def get_kwargs(self, with_backend=True): res["opponent_attack_cooldown"] = self._opponent_attack_cooldown res["kwargs_opponent"] = self._kwargs_opponent - # TODO alarm attention budget + res["attention_budget_cls"] = self._attention_budget_cls + res["kwargs_attention_budget"] = copy.deepcopy(self._kwargs_attention_budget) + res["has_attention_budget"] = self._has_attention_budget return res def _chronics_folder_name(self): @@ -1240,5 +1242,8 @@ def get_params_for_runner(self): res["opponent_attack_duration"] = self._opponent_attack_duration res["opponent_attack_cooldown"] = self._opponent_attack_cooldown res["opponent_kwargs"] = self._kwargs_opponent - # TODO alarm attention budget + + res["attention_budget_cls"] = self._attention_budget_cls + res["kwargs_attention_budget"] = copy.deepcopy(self._kwargs_attention_budget) + res["has_attention_budget"] = self._has_attention_budget return res diff --git a/grid2op/Runner/__init__.py b/grid2op/Runner/__init__.py index 80f9bd8b5..ff3d28efb 100644 --- a/grid2op/Runner/__init__.py +++ b/grid2op/Runner/__init__.py @@ -2,5 +2,5 @@ "Runner" ] -from grid2op.Runner.Runner import Runner +from grid2op.Runner.runner import Runner diff --git a/grid2op/Runner/aux_fun.py b/grid2op/Runner/aux_fun.py new file mode 100644 index 000000000..6fa1d4c08 --- /dev/null +++ b/grid2op/Runner/aux_fun.py @@ -0,0 +1,228 @@ +# Copyright (c) 2019-2020, RTE (https://www.rte-france.com) +# See AUTHORS.txt +# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0. +# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file, +# you can obtain one at http://mozilla.org/MPL/2.0/. +# SPDX-License-Identifier: MPL-2.0 +# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems. 
+ +import copy +import time + +import numpy as np + +from grid2op.Episode import EpisodeData +from grid2op.Runner.FakePBar import _FakePbar +from grid2op.dtypes import dt_int, dt_float, dt_bool +from grid2op.Chronics import ChronicsHandler + + +def _aux_one_process_parrallel(runner, + episode_this_process, + process_id, + path_save=None, + env_seeds=None, + max_iter=None, + add_detailed_output=False, + agent_seeds=None): + """this is out of the runner, otherwise it does not work on windows / macos """ + chronics_handler = ChronicsHandler(chronicsClass=runner.gridStateclass, + path=runner.path_chron, + **runner.gridStateclass_kwargs) + parameters = copy.deepcopy(runner.parameters) + nb_episode_this_process = len(episode_this_process) + res = [(None, None, None) for _ in range(nb_episode_this_process)] + for i, p_id in enumerate(episode_this_process): + env, agent = runner._new_env(chronics_handler=chronics_handler, + parameters=parameters) + try: + env_seed = None + if env_seeds is not None: + env_seed = env_seeds[i] + agt_seed = None + if agent_seeds is not None: + agt_seed = agent_seeds[i] + name_chron, cum_reward, nb_time_step, episode_data = _aux_run_one_episode( + env, agent, runner.logger, p_id, path_save, env_seed=env_seed, max_iter=max_iter, agent_seed=agt_seed, + detailed_output=add_detailed_output) + id_chron = chronics_handler.get_id() + max_ts = chronics_handler.max_timestep() + if add_detailed_output: + res[i] = (id_chron, name_chron, float(cum_reward), nb_time_step, max_ts, episode_data) + else: + res[i] = (id_chron, name_chron, float(cum_reward), nb_time_step, max_ts) + finally: + env.close() + return res + + +def _aux_run_one_episode(env, agent, logger, indx, path_save=None, + pbar=False, env_seed=None, agent_seed=None, max_iter=None, detailed_output=False): + done = False + time_step = int(0) + time_act = 0. + cum_reward = dt_float(0.0) + + # reset the environment + env.chronics_handler.tell_id(indx-1) + # the "-1" above is because the environment will be reset. So it will increase id of 1. 
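+    # (tell_id only selects which chronics will be used next; the actual loading
+    # happens in env.reset() below)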
+ + # set the seed + if env_seed is not None: + env.seed(env_seed) + + # handle max_iter + if max_iter is not None: + env.chronics_handler.set_max_iter(max_iter) + + # reset it + obs = env.reset() + + # seed and reset the agent + if agent_seed is not None: + agent.seed(agent_seed) + agent.reset(obs) + + # compute the size and everything if it needs to be stored + nb_timestep_max = env.chronics_handler.max_timestep() + efficient_storing = nb_timestep_max > 0 + nb_timestep_max = max(nb_timestep_max, 0) + + if path_save is None and not detailed_output: + # i don't store anything on drive, so i don't need to store anything on memory + nb_timestep_max = 0 + + disc_lines_templ = np.full( + (1, env.backend.n_line), fill_value=False, dtype=dt_bool) + + attack_templ = np.full( + (1, env._oppSpace.action_space.size()), fill_value=0., dtype=dt_float) + if efficient_storing: + times = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) + rewards = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) + actions = np.full((nb_timestep_max, env.action_space.n), + fill_value=np.NaN, dtype=dt_float) + env_actions = np.full( + (nb_timestep_max, env._helper_action_env.n), fill_value=np.NaN, dtype=dt_float) + observations = np.full( + (nb_timestep_max+1, env.observation_space.n), fill_value=np.NaN, dtype=dt_float) + disc_lines = np.full( + (nb_timestep_max, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool) + attack = np.full((nb_timestep_max, env._opponent_action_space.n), fill_value=0., dtype=dt_float) + else: + times = np.full(0, fill_value=np.NaN, dtype=dt_float) + rewards = np.full(0, fill_value=np.NaN, dtype=dt_float) + actions = np.full((0, env.action_space.n), fill_value=np.NaN, dtype=dt_float) + env_actions = np.full((0, env._helper_action_env.n), fill_value=np.NaN, dtype=dt_float) + observations = np.full((0, env.observation_space.n), fill_value=np.NaN, dtype=dt_float) + disc_lines = np.full((0, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool) + attack = np.full((0, env._opponent_action_space.n), fill_value=0., dtype=dt_float) + + if path_save is not None: + # store observation at timestep 0 + if efficient_storing: + observations[time_step, :] = obs.to_vect() + else: + observations = np.concatenate((observations, obs.to_vect().reshape(1, -1))) + episode = EpisodeData(actions=actions, + env_actions=env_actions, + observations=observations, + rewards=rewards, + disc_lines=disc_lines, + times=times, + observation_space=env.observation_space, + action_space=env.action_space, + helper_action_env=env._helper_action_env, + path_save=path_save, + disc_lines_templ=disc_lines_templ, + attack_templ=attack_templ, + attack=attack, + attack_space=env._opponent_action_space, + logger=logger, + name=env.chronics_handler.get_name(), + force_detail=detailed_output, + other_rewards=[]) + episode.set_parameters(env) + + beg_ = time.time() + + reward = float(env.reward_range[0]) + done = False + + next_pbar = [False] + with _aux_make_progress_bar(pbar, nb_timestep_max, next_pbar) as pbar_: + while not done: + beg__ = time.time() + act = agent.act(obs, reward, done) + end__ = time.time() + time_act += end__ - beg__ + + obs, reward, done, info = env.step(act) # should load the first time stamp + cum_reward += reward + time_step += 1 + pbar_.update(1) + opp_attack = env._oppSpace.last_attack + episode.incr_store(efficient_storing, + time_step, + end__ - beg__, + float(reward), + env._env_modification, + act, obs, opp_attack, + info) + + end_ = time.time() + episode.set_meta(env, time_step, 
float(cum_reward), env_seed, agent_seed)
+
+        li_text = ["Env: {:.2f}s", "\t - apply act {:.2f}s", "\t - run pf: {:.2f}s",
+                   "\t - env update + observation: {:.2f}s", "Agent: {:.2f}s", "Total time: {:.2f}s",
+                   "Cumulative reward: {:.1f}"]
+        msg_ = "\n".join(li_text)
+        logger.info(msg_.format(
+            env._time_apply_act + env._time_powerflow + env._time_extract_obs,
+            env._time_apply_act, env._time_powerflow, env._time_extract_obs,
+            time_act, end_ - beg_, cum_reward))
+
+        episode.set_episode_times(env, time_act, beg_, end_)
+
+    episode.to_disk()
+    name_chron = env.chronics_handler.get_name()
+    return name_chron, cum_reward, int(time_step), episode
+
+
+def _aux_make_progress_bar(pbar, total, next_pbar):
+    """
+    INTERNAL
+
+    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
+
+    Parameters
+    ----------
+    pbar: ``bool`` or ``type`` or ``object``
+        How to display the progress bar, understood as follows:
+
+        - if pbar is ``None`` nothing is done.
+        - if pbar is a boolean and ``True``, a tqdm progress bar is used, provided the tqdm package
+          is installed on the system. If it's ``False`` it's equivalent to pbar being ``None``
+        - if pbar is a ``type`` (a class), it is used to build a progress bar at the highest level
+          (episode) and at the lower levels (steps during the episode). If it's a type it must accept
+          the arguments "total" and "desc" when being built, and the closing is ensured by this method.
+        - if pbar is an object (an instance of a class) it is used to make a progress bar at the highest
+          level (episode) but not at lower levels (steps during the episode)
+    """
+    pbar_ = _FakePbar()
+    next_pbar[0] = False
+
+    if isinstance(pbar, bool):
+        if pbar:
+            try:
+                from tqdm import tqdm
+                pbar_ = tqdm(total=total, desc="episode")
+                next_pbar[0] = True
+            except (ImportError, ModuleNotFoundError):
+                pass
+    elif isinstance(pbar, type):
+        pbar_ = pbar(total=total, desc="episode")
+        next_pbar[0] = pbar
+    elif isinstance(pbar, object):
+        pbar_ = pbar
+    return pbar_
diff --git a/grid2op/Runner/basic_logger.py b/grid2op/Runner/basic_logger.py
new file mode 100644
index 000000000..90cfb88f1
--- /dev/null
+++ b/grid2op/Runner/basic_logger.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2019-2021, RTE (https://www.rte-france.com)
+# See AUTHORS.txt
+# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
+# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
+# you can obtain one at http://mozilla.org/MPL/2.0/.
+# SPDX-License-Identifier: MPL-2.0
+# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
+
+
+class DoNothingLog:
+    """
+    INTERNAL
+
+    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
+
+    A class to emulate the behaviour of a logger, but that does absolutely nothing.
+    """
+    INFO = 2
+    WARNING = 1
+    ERROR = 0
+
+    def __init__(self, max_level=2):
+        self.max_level = max_level
+
+    def warn(self, *args, **kwargs):
+        pass
+
+    def info(self, *args, **kwargs):
+        pass
+
+    def error(self, *args, **kwargs):
+        pass
+
+    def warning(self, *args, **kwargs):
+        pass
+
+
+class ConsoleLog(DoNothingLog):
+    """
+    INTERNAL
+
+    ..
warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ + + A class to emulate the behaviour of a logger, but that prints on the console + """ + + def __init__(self, max_level=2): + DoNothingLog.__init__(self, max_level) + + def warn(self, *args, **kwargs): + if self.max_level >= self.WARNING: + if args: + print("WARNING: \"{}\"".format(", ".join(args))) + if kwargs: + print("WARNING: {}".format(kwargs)) + + def info(self, *args, **kwargs): + if self.max_level >= self.INFO: + if args: + print("INFO: \"{}\"".format(", ".join(args))) + if kwargs: + print("INFO: {}".format(kwargs)) + + def error(self, *args, **kwargs): + if self.max_level >= self.ERROR: + if args: + print("ERROR: \"{}\"".format(", ".join(args))) + if kwargs: + print("ERROR: {}".format(kwargs)) + + def warning(self, *args, **kwargs): + if self.max_level >= self.WARNING: + if args: + print("WARNING: \"{}\"".format(", ".join(args))) + if kwargs: + print("WARNING: {}".format(kwargs)) diff --git a/grid2op/Runner/Runner.py b/grid2op/Runner/runner.py similarity index 80% rename from grid2op/Runner/Runner.py rename to grid2op/Runner/runner.py index 96ac58ff1..bc60b07c7 100644 --- a/grid2op/Runner/Runner.py +++ b/grid2op/Runner/runner.py @@ -5,15 +5,12 @@ # you can obtain one at http://mozilla.org/MPL/2.0/. # SPDX-License-Identifier: MPL-2.0 # This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems. -import time + import os import warnings -import sys -import numpy as np import copy from multiprocessing import Pool -from grid2op.dtypes import dt_int, dt_float, dt_bool from grid2op.Action import BaseAction, TopologyAction, DontAct from grid2op.Exceptions import UsedRunnerError, Grid2OpException, EnvError from grid2op.Observation import CompleteObservation, BaseObservation @@ -24,11 +21,12 @@ from grid2op.Backend import Backend, PandaPowerBackend from grid2op.Parameters import Parameters from grid2op.Agent import DoNothingAgent, BaseAgent -from grid2op.Episode import EpisodeData -from grid2op.Runner.FakePBar import _FakePbar from grid2op.VoltageControler import ControlVoltageFromFile from grid2op.dtypes import dt_float from grid2op.Opponent import BaseOpponent, NeverAttackBudget +from grid2op.operator_attention import LinearAttentionBudget +from grid2op.Runner.aux_fun import _aux_run_one_episode, _aux_make_progress_bar, _aux_one_process_parrallel +from grid2op.Runner.basic_logger import DoNothingLog, ConsoleLog # on windows if i start using sequential, i need to continue using sequential # if i start using parallel i need to continue using parallel @@ -45,289 +43,9 @@ # TODO: if chronics are "loop through" multiple times, only last results are saved. :-/ -class DoNothingLog: - """ - INTERNAL - - .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ - - A class to emulate the behaviour of a logger, but that does absolutely nothing. - """ - INFO = 2 - WARNING = 1 - ERROR = 0 - - def __init__(self, max_level=2): - self.max_level = max_level - - def warn(self, *args, **kwargs): - pass - - def info(self, *args, **kwargs): - pass - - def error(self, *args, **kwargs): - pass - - def warning(self, *args, **kwargs): - pass - - -class ConsoleLog(DoNothingLog): - """ - INTERNAL - - .. 
warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ - - A class to emulate the behaviour of a logger, but that prints on the console - """ - - def __init__(self, max_level=2): - DoNothingLog.__init__(self, max_level) - - def warn(self, *args, **kwargs): - if self.max_level >= self.WARNING: - if args: - print("WARNING: \"{}\"".format(", ".join(args))) - if kwargs: - print("WARNING: {}".format(kwargs)) - - def info(self, *args, **kwargs): - if self.max_level >= self.INFO: - if args: - print("INFO: \"{}\"".format(", ".join(args))) - if kwargs: - print("INFO: {}".format(kwargs)) - - def error(self, *args, **kwargs): - if self.max_level >= self.ERROR: - if args: - print("ERROR: \"{}\"".format(", ".join(args))) - if kwargs: - print("ERROR: {}".format(kwargs)) - - def warning(self, *args, **kwargs): - if self.max_level >= self.WARNING: - if args: - print("WARNING: \"{}\"".format(", ".join(args))) - if kwargs: - print("WARNING: {}".format(kwargs)) - - -def _aux_one_process_parrallel(runner, - episode_this_process, - process_id, - path_save=None, - env_seeds=None, - max_iter=None, - add_detailed_output=False, - agent_seeds=None): - """this is out of the runner, otherwise it does not work on windows / macos """ - chronics_handler = ChronicsHandler(chronicsClass=runner.gridStateclass, - path=runner.path_chron, - **runner.gridStateclass_kwargs) - parameters = copy.deepcopy(runner.parameters) - nb_episode_this_process = len(episode_this_process) - res = [(None, None, None) for _ in range(nb_episode_this_process)] - for i, p_id in enumerate(episode_this_process): - env, agent = runner._new_env(chronics_handler=chronics_handler, - parameters=parameters) - try: - env_seed = None - if env_seeds is not None: - env_seed = env_seeds[i] - agt_seed = None - if agent_seeds is not None: - agt_seed = agent_seeds[i] - name_chron, cum_reward, nb_time_step, episode_data = _aux_run_one_episode( - env, agent, runner.logger, p_id, path_save, env_seed=env_seed, max_iter=max_iter, agent_seed=agt_seed, - detailed_output=add_detailed_output) - id_chron = chronics_handler.get_id() - max_ts = chronics_handler.max_timestep() - if add_detailed_output: - res[i] = (id_chron, name_chron, float(cum_reward), nb_time_step, max_ts, episode_data) - else: - res[i] = (id_chron, name_chron, float(cum_reward), nb_time_step, max_ts) - finally: - env.close() - return res - - -def _aux_run_one_episode(env, agent, logger, indx, path_save=None, - pbar=False, env_seed=None, agent_seed=None, max_iter=None, detailed_output=False): - done = False - time_step = int(0) - time_act = 0. - cum_reward = dt_float(0.0) - - # reset the environment - env.chronics_handler.tell_id(indx-1) - # the "-1" above is because the environment will be reset. So it will increase id of 1. 
- - # set the seed - if env_seed is not None: - env.seed(env_seed) - - # handle max_iter - if max_iter is not None: - env.chronics_handler.set_max_iter(max_iter) - - # reset it - obs = env.reset() - - # seed and reset the agent - if agent_seed is not None: - agent.seed(agent_seed) - agent.reset(obs) - - # compute the size and everything if it needs to be stored - nb_timestep_max = env.chronics_handler.max_timestep() - efficient_storing = nb_timestep_max > 0 - nb_timestep_max = max(nb_timestep_max, 0) - - if path_save is None and not detailed_output: - # i don't store anything on drive, so i don't need to store anything on memory - nb_timestep_max = 0 - - disc_lines_templ = np.full( - (1, env.backend.n_line), fill_value=False, dtype=dt_bool) - - attack_templ = np.full( - (1, env._oppSpace.action_space.size()), fill_value=0., dtype=dt_float) - if efficient_storing: - times = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) - rewards = np.full(nb_timestep_max, fill_value=np.NaN, dtype=dt_float) - actions = np.full((nb_timestep_max, env.action_space.n), - fill_value=np.NaN, dtype=dt_float) - env_actions = np.full( - (nb_timestep_max, env._helper_action_env.n), fill_value=np.NaN, dtype=dt_float) - observations = np.full( - (nb_timestep_max+1, env.observation_space.n), fill_value=np.NaN, dtype=dt_float) - disc_lines = np.full( - (nb_timestep_max, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool) - attack = np.full((nb_timestep_max, env._opponent_action_space.n), fill_value=0., dtype=dt_float) - else: - times = np.full(0, fill_value=np.NaN, dtype=dt_float) - rewards = np.full(0, fill_value=np.NaN, dtype=dt_float) - actions = np.full((0, env.action_space.n), fill_value=np.NaN, dtype=dt_float) - env_actions = np.full((0, env._helper_action_env.n), fill_value=np.NaN, dtype=dt_float) - observations = np.full((0, env.observation_space.n), fill_value=np.NaN, dtype=dt_float) - disc_lines = np.full((0, env.backend.n_line), fill_value=np.NaN, dtype=dt_bool) - attack = np.full((0, env._opponent_action_space.n), fill_value=0., dtype=dt_float) - - if path_save is not None: - # store observation at timestep 0 - if efficient_storing: - observations[time_step, :] = obs.to_vect() - else: - observations = np.concatenate((observations, obs.to_vect().reshape(1, -1))) - episode = EpisodeData(actions=actions, - env_actions=env_actions, - observations=observations, - rewards=rewards, - disc_lines=disc_lines, - times=times, - observation_space=env.observation_space, - action_space=env.action_space, - helper_action_env=env._helper_action_env, - path_save=path_save, - disc_lines_templ=disc_lines_templ, - attack_templ=attack_templ, - attack=attack, - attack_space=env._opponent_action_space, - logger=logger, - name=env.chronics_handler.get_name(), - force_detail=detailed_output, - other_rewards=[]) - episode.set_parameters(env) - - beg_ = time.time() - - reward = float(env.reward_range[0]) - done = False - - next_pbar = [False] - with _aux_make_progress_bar(pbar, nb_timestep_max, next_pbar) as pbar_: - while not done: - beg__ = time.time() - act = agent.act(obs, reward, done) - end__ = time.time() - time_act += end__ - beg__ - - obs, reward, done, info = env.step(act) # should load the first time stamp - cum_reward += reward - time_step += 1 - pbar_.update(1) - opp_attack = env._oppSpace.last_attack - episode.incr_store(efficient_storing, - time_step, - end__ - beg__, - float(reward), - env._env_modification, - act, obs, opp_attack, - info) - - end_ = time.time() - episode.set_meta(env, time_step, 
float(cum_reward), env_seed, agent_seed) - - li_text = ["Env: {:.2f}s", "\t - apply act {:.2f}s", "\t - run pf: {:.2f}s", - "\t - env update + observation: {:.2f}s", "Agent: {:.2f}s", "Total time: {:.2f}s", - "Cumulative reward: {:1f}"] - msg_ = "\n".join(li_text) - logger.info(msg_.format( - env._time_apply_act + env._time_powerflow + env._time_extract_obs, - env._time_apply_act, env._time_powerflow, env._time_extract_obs, - time_act, end_ - beg_, cum_reward)) - - episode.set_episode_times(env, time_act, beg_, end_) - - episode.to_disk() - name_chron = env.chronics_handler.get_name() - return name_chron, cum_reward, int(time_step), episode - - -def _aux_make_progress_bar(pbar, total, next_pbar): - """ - INTERNAL - - .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ - - Parameters - ---------- - pbar: ``bool`` or ``type`` or ``object`` - How to display the progress bar, understood as follow: - - - if pbar is ``None`` nothing is done. - - if pbar is a boolean, tqdm pbar are used, if tqdm package is available and installed on the system - [if ``true``]. If it's false it's equivalent to pbar being ``None`` - - if pbar is a ``type`` ( a class), it is used to build a progress bar at the highest level (episode) and - and the lower levels (step during the episode). If it's a type it muyst accept the argument "total" - and "desc" when being built, and the closing is ensured by this method. - - if pbar is an object (an instance of a class) it is used to make a progress bar at this highest level - (episode) but not at lower levels (step during the episode) - """ - pbar_ = _FakePbar() - next_pbar[0] = False - - if isinstance(pbar, bool): - if pbar: - try: - from tqdm import tqdm - pbar_ = tqdm(total=total, desc="episode") - next_pbar[0] = True - except (ImportError, ModuleNotFoundError): - pass - elif isinstance(pbar, type): - pbar_ = pbar(total=total, desc="episode") - next_pbar[0] = pbar - elif isinstance(pbar, object): - pbar_ = pbar - return pbar_ - - class Runner(object): """ - A runner is a utilitary tool that allows to run simulations more easily. + A runner is a utility tool that allows to run simulations more easily. It is a more convenient way to execute the following loops: @@ -480,6 +198,11 @@ class Runner(object): grid_layout: ``dict``, optional The layout of the grid (position of each substation) usefull if you need to plot some things for example. + TODO + _attention_budget_cls=LinearAttentionBudget, + _kwargs_attention_budget=None, + _has_attention_budget=False + Examples -------- Different examples are showed in the description of the main method :func:`Runner.run` @@ -526,7 +249,10 @@ def __init__(self, opponent_attack_cooldown=99999, opponent_kwargs={}, grid_layout=None, - with_forecast=True): + with_forecast=True, + attention_budget_cls=LinearAttentionBudget, + kwargs_attention_budget=None, + has_attention_budget=False): """ Initialize the Runner. @@ -584,6 +310,7 @@ def __init__(self, The controler that will change the voltage setpoints of the generators. 
# TODO documentation on the opponent
+        # TODO doc for the attention budget
         """
         self.with_forecast = with_forecast
         self.name_env = name_env
@@ -756,6 +483,11 @@ def __init__(self,
         self.opponent_kwargs = opponent_kwargs
         self.grid_layout = grid_layout
 
+        # attention budget
+        self._attention_budget_cls = attention_budget_cls
+        self._kwargs_attention_budget = copy.deepcopy(kwargs_attention_budget)
+        self._has_attention_budget = has_attention_budget
+
         # otherwise on windows / macos it sometimes fail in the runner in multi process
         # on linux like OS i prefer to generate all the proper classes accordingly
         if _IS_LINUX:
@@ -792,6 +524,9 @@ def _new_env(self, chronics_handler, parameters):
                               opponent_attack_cooldown=self.opponent_attack_cooldown,
                               kwargs_opponent=self.opponent_kwargs,
                               with_forecast=self.with_forecast,
+                              attention_budget_cls=self._attention_budget_cls,
+                              kwargs_attention_budget=self._kwargs_attention_budget,
+                              has_attention_budget=self._has_attention_budget,
                               _raw_backend_class=self.backendClass
                               )
diff --git a/grid2op/tests/test_AlarmFeature.py b/grid2op/tests/test_AlarmFeature.py
new file mode 100644
index 000000000..036998659
--- /dev/null
+++ b/grid2op/tests/test_AlarmFeature.py
@@ -0,0 +1,440 @@
+# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
+# See AUTHORS.txt
+# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
+# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
+# you can obtain one at http://mozilla.org/MPL/2.0/.
+# SPDX-License-Identifier: MPL-2.0
+# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
+
+import warnings
+import numpy as np
+import unittest
+import os
+import tempfile
+from grid2op.tests.helper_path_test import *
+
+from grid2op.operator_attention import LinearAttentionBudget
+from grid2op import make
+from grid2op.Reward import RedispReward
+from grid2op.Exceptions import Grid2OpException
+from grid2op.Runner import Runner
+from grid2op.Environment import Environment
+from grid2op.Episode import EpisodeData
+
+
+class TestAlarmFeature(unittest.TestCase):
+    """test the basic behaviour of the alarm feature"""
+    def setUp(self) -> None:
+        self.env_nm = os.path.join(PATH_DATA_TEST, "l2rpn_neurips_2020_track1_with_alert")
+        with warnings.catch_warnings():
+            warnings.filterwarnings("ignore")
+            self.env = make(self.env_nm, test=True)
+        self.env.seed(0)
+        self.env.reset()
+        self.env.reset()
+        self.max_iter = 10
+        self.default_kwargs_att_budget = {"max_budget": 5.,
+                                          "budget_per_ts": 1. / (12.*8),
+                                          "alarm_cost": 1.,
+                                          "init_budget": 3.}
+
+    def tearDown(self) -> None:
+        self.env.close()
+
+    def test_create_ok(self):
+        """test that the attention budget is created with the right parameters"""
+        assert self.env._has_attention_budget
+        assert self.env._attention_budget is not None
+        assert isinstance(self.env._attention_budget, LinearAttentionBudget)
+        assert abs(self.env._attention_budget._budget_per_ts - 1. / (12.*8)) <= 1e-6
+        assert abs(self.env._attention_budget._max_budget - 5) <= 1e-6
+        assert abs(self.env._attention_budget._alarm_cost - 1) <= 1e-6
+        assert abs(self.env._attention_budget._current_budget - 3.) <= 1e-6
+
+        with self.assertRaises(Grid2OpException):
+            # it raises because the default reward: AlarmReward can only be used
+            # if there is an alarm budget
+            with make(self.env_nm, has_attention_budget=False) as env:
+                assert env._has_attention_budget is False
+                assert env._attention_budget is None
+
+        with make(self.env_nm, has_attention_budget=False, reward_class=RedispReward) as env:
+            assert env._has_attention_budget is False
+            assert env._attention_budget is None
+
+        with make(self.env_nm, kwargs_attention_budget={"max_budget": 15,
+                                                        "budget_per_ts": 1,
+                                                        "alarm_cost": 12,
+                                                        "init_budget": 0}) as env:
+            assert env._has_attention_budget
+            assert env._attention_budget is not None
+            assert isinstance(env._attention_budget, LinearAttentionBudget)
+            assert abs(env._attention_budget._budget_per_ts - 1.) <= 1e-6
+            assert abs(env._attention_budget._max_budget - 15) <= 1e-6
+            assert abs(env._attention_budget._alarm_cost - 12) <= 1e-6
+            assert abs(env._attention_budget._current_budget - 0.) <= 1e-6
+
+    def test_budget_increases_ok(self):
+        """test the attention budget properly increases when no alarms are raised
+        and that it does not exceed the maximum value"""
+        # check the budget increases normally
+        self.env.step(self.env.action_space())
+        assert abs(self.env._attention_budget._current_budget - (3 + 1. / (12. * 8.))) <= 1e-6
+        self.env.step(self.env.action_space())
+        assert abs(self.env._attention_budget._current_budget - (3 + 2. / (12. * 8.))) <= 1e-6
+
+        # check that it does not "overflow"
+        with make(self.env_nm, kwargs_attention_budget={"max_budget": 5,
+                                                        "budget_per_ts": 1,
+                                                        "alarm_cost": 12,
+                                                        "init_budget": 0}) as env:
+            env.step(self.env.action_space())
+            assert abs(env._attention_budget._current_budget - 1) <= 1e-6
+            env.step(self.env.action_space())
+            assert abs(env._attention_budget._current_budget - 2) <= 1e-6
+            env.step(self.env.action_space())
+            assert abs(env._attention_budget._current_budget - 3) <= 1e-6
+            env.step(self.env.action_space())
+            assert abs(env._attention_budget._current_budget - 4) <= 1e-6
+            env.step(self.env.action_space())
+            assert abs(env._attention_budget._current_budget - 5) <= 1e-6
+            env.step(self.env.action_space())
+            assert abs(env._attention_budget._current_budget - 5) <= 1e-6
+
+    def test_alarm_in_legal_action_ok(self):
+        """I test the budget is properly updated when the action is legal and non ambiguous"""
+        act = self.env.action_space()
+        act.raise_alarm = [0]
+        self.env.step(act)
+        assert abs(self.env._attention_budget._current_budget - 2) <= 1e-6
+
+    def test_reset_ok(self):
+        self.env.step(self.env.action_space())
+        assert abs(self.env._attention_budget._current_budget - (3 + 1. / (12. * 8.))) <= 1e-6
+        self.env.reset()
+        assert abs(self.env._attention_budget._current_budget - 3) <= 1e-6
+
+    def test_illegal_action(self):
+        """illegal action should not modify the alarm budget"""
+        th_budget = 3.
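+        # the action below reconfigures the buses of the first 12 elements at once,
+        # which the environment rules reject as illegal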
+ act = self.env.action_space() + arr = 1 * act.set_bus + arr[:12] = 1 + act.set_bus = arr + obs, reward, done, info = self.env.step(act) + assert info["is_illegal"] + assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 + assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 + + act = self.env.action_space() + arr = 1 * act.set_bus + arr[:12] = 1 + act.set_bus = arr + act.raise_alarm = [0] + obs, reward, done, info = self.env.step(act) + assert info["is_illegal"] + assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 + assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 + + def test_ambiguous_action(self): + """ambiguous action should not modify the alarm budget""" + th_budget = 3. + act = self.env.action_space() + act.set_bus = [(0, 1)] + act.change_bus = [0] + obs, reward, done, info = self.env.step(act) + assert info["is_ambiguous"] + assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 + + act = self.env.action_space() + act.set_bus = [(0, 1)] + act.change_bus = [0] + act.raise_alarm = [0] + obs, reward, done, info = self.env.step(act) + assert info["is_ambiguous"] + assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 + + def test_alarm_obs_noalarm(self): + """test the observation is behaving correctly concerning the alarm part, when i don't send alarms""" + obs = self.env.reset() + assert abs(self.env._attention_budget._current_budget - 3.) <= 1e-6 + assert abs(obs.attention_budget - 3.) <= 1e-6 + obs, reward, done, info = self.env.step(self.env.action_space()) + nb_th = 3 + 1. / (12. * 8.) + assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 + assert abs(obs.attention_budget - nb_th) <= 1e-6 + assert obs.time_since_last_alarm == -1 + + def test_alarm_obs_whenalarm(self): + """test the observation is behaving correctly concerning the alarm part, when i send alarms""" + act = self.env.action_space() + act.raise_alarm = [0] + obs, reward, done, info = self.env.step(act) + nb_th = 2 + assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 + assert abs(obs.attention_budget - nb_th) <= 1e-6 + assert obs.time_since_last_alarm == 0 + assert np.all(obs.last_alarm == [1, -1, -1]) + + obs, reward, done, info = self.env.step(self.env.action_space()) + nb_th += 1. / (12. * 8.) + assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 + assert abs(obs.attention_budget - nb_th) <= 1e-6 + assert obs.time_since_last_alarm == 1 + assert np.all(obs.last_alarm == [1, -1, -1]) + + obs = self.env.reset() + nb_th = 3 + assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 + assert abs(obs.attention_budget - nb_th) <= 1e-6 + assert obs.time_since_last_alarm == -1 + assert np.all(obs.last_alarm == [-1, -1, -1]) + + def test_simulate_act_ok(self): + """test the attention budget when simulating an ok action""" + obs = self.env.reset() + act = self.env.action_space() + act.raise_alarm = [0] + act2 = self.env.action_space() + act2.raise_alarm = [1] + + # i simulate no action + sim_obs, *_ = obs.simulate(self.env.action_space()) + nb_th = 3 + 1. / (12. * 8.) 
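+        # simulating must not spend the real budget: the simulated observation only
+        # previews one step of the linear regeneration (budget_per_ts = 1. / (12. * 8.))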
+        assert abs(sim_obs.attention_budget - nb_th) <= 1e-6
+        assert sim_obs.time_since_last_alarm == -1
+        assert np.all(sim_obs.last_alarm == [-1, -1, -1])
+
+        # i simulate an action, this should work as for step, if i do no actions
+        sim_obs, *_ = obs.simulate(act)
+        nb_th = 2
+        assert abs(sim_obs.attention_budget - nb_th) <= 1e-6
+        assert sim_obs.time_since_last_alarm == 0
+        assert np.all(sim_obs.last_alarm == [1, -1, -1])
+
+        # i simulate no action, this should remove the previous stuff and work
+        sim_obs, *_ = obs.simulate(self.env.action_space())
+        nb_th = 3 + 1. / (12. * 8.)
+        assert abs(sim_obs.attention_budget - nb_th) <= 1e-6
+        assert sim_obs.time_since_last_alarm == -1
+        assert np.all(sim_obs.last_alarm == [-1, -1, -1])
+
+        # i do a step and check now
+        obs, *_ = self.env.step(act)
+
+        sim_obs, *_ = obs.simulate(self.env.action_space())
+        nb_th = 2 + 1. / (12. * 8.)
+        assert abs(sim_obs.attention_budget - nb_th) <= 1e-6
+        assert sim_obs.time_since_last_alarm == 1
+        assert np.all(sim_obs.last_alarm == [1, -1, -1])
+
+        # i simulate an action, this should work as for step, if i do no actions
+        sim_obs, *_ = obs.simulate(act2)
+        nb_th = 1
+        assert abs(sim_obs.attention_budget - nb_th) <= 1e-6
+        assert sim_obs.time_since_last_alarm == 0
+        assert np.all(sim_obs.last_alarm == [1, 2, -1])
+
+    def _aux_trigger_cascading_failure(self):
+        act_ko1 = self.env.action_space()
+        act_ko1.line_set_status = [(56, -1)]
+        obs, reward, done, info = self.env.step(act_ko1)
+        assert not done
+        assert reward == 0
+        act_ko2 = self.env.action_space()
+        act_ko2.line_set_status = [(41, -1)]
+        obs, reward, done, info = self.env.step(act_ko2)
+        assert not done
+        assert reward == 0
+        act_ko3 = self.env.action_space()
+        act_ko3.line_set_status = [(40, -1)]
+        obs, reward, done, info = self.env.step(act_ko3)
+        assert not done
+        assert reward == 0
+
+        act_ko4 = self.env.action_space()
+        act_ko4.line_set_status = [(39, -1)]
+        obs, reward, done, info = self.env.step(act_ko4)
+        assert not done
+        assert reward == 0
+
+        act_ko5 = self.env.action_space()
+        act_ko5.line_set_status = [(13, -1)]
+        obs, reward, done, info = self.env.step(act_ko5)
+        assert not done
+        assert reward == 0
+
+    def test_alarm_reward_simple(self):
+        """very basic test for the alarm reward"""
+        # normal step, no game over => 0
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert reward == 0
+        self.env.fast_forward_chronics(861)
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert not done
+        assert reward == 0
+
+        # end of an episode, no game over: +1
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == +1
+        assert not obs.was_alarm_used_after_game_over
+
+    def test_reward_game_over_connex(self):
+        """test i don't get any points if there is a game over for a non connex grid"""
+        # game over not due to line disconnection, no points
+        obs = self.env.reset()
+        act_ko = self.env.action_space()
+        act_ko.gen_set_bus = [(0, -1)]
+        obs, reward, done, info = self.env.step(act_ko)
+        assert done
+        assert reward == -1
+        assert not obs.was_alarm_used_after_game_over
+
+    def test_reward_no_alarm(self):
+        """test that i don't get any points if i don't send any alarm"""
+        # FYI parallel lines:
+        # 48, 49  ||  18, 19  ||  27, 28  ||  37, 38
+
+        # game over due to line disconnection, but no alarm => no points
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == -1
+        assert not obs.was_alarm_used_after_game_over
+
+    def test_reward_wrong_area_wrong_time(self):
+        """test that i get a few points for the wrong area, but at the wrong time"""
+        # now i raise an alarm, and after i do a cascading failure (but i send a wrong alarm)
+        act = self.env.action_space()
+        act.raise_alarm = [0]
+        obs, reward, done, info = self.env.step(act)
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == 0.375
+        assert obs.was_alarm_used_after_game_over
+
+    def test_reward_right_area_not_best_time(self):
+        """test that i get some points for the right area, but at the wrong time"""
+        # now i raise an alarm, and after i do a cascading failure (and i send a right alarm)
+        act = self.env.action_space()
+        act.raise_alarm = [1]
+        obs, reward, done, info = self.env.step(act)
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == 0.75
+        assert obs.was_alarm_used_after_game_over
+
+    def test_reward_right_time_wrong_area(self):
+        """test that the alarm has half "value" if taken exactly at the right time but for the wrong area"""
+        # now i raise an alarm just at the right time, and after i do a cascading failure (wrong zone)
+        act = self.env.action_space()
+        act.raise_alarm = [0]
+        obs, reward, done, info = self.env.step(act)
+        for _ in range(6):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == 0.5
+        assert obs.was_alarm_used_after_game_over
+
+    def test_reward_right_time_right_area(self):
+        """test that the alarm has perfect "value" if taken exactly at the right time and for the right area"""
+        # now i raise an alarm just at the right time, and after i do a cascading failure (right zone)
+        act = self.env.action_space()
+        act.raise_alarm = [1]
+        obs, reward, done, info = self.env.step(act)
+        for _ in range(6):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == 1
+        assert obs.was_alarm_used_after_game_over
+
+    def test_reward_right_area_too_early(self):
+        """test that the alarm is not taken into account if sent too early"""
+        # now i raise an alarm but too early, i don't get any points (even if right zone)
+        act = self.env.action_space()
+        act.raise_alarm = [1]
+        obs, reward, done, info = self.env.step(act)
+        for _ in range(6):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        for _ in range(12):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == -1
+        assert not obs.was_alarm_used_after_game_over
+
+    def test_reward_correct_alarmused_right_early(self):
+        """test that the maximum is taken, when an alarm is sent at the right time, and another one too early"""
+        # now i raise two alarms: one at just the right time, another one a bit earlier, and i check the correct
+        # one is used
+        act = self.env.action_space()
+        act.raise_alarm = [1]
+        obs, reward, done, info = self.env.step(act)  # a bit too early
+        for _ in range(3):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        obs, reward, done, info = self.env.step(act)  # just at the right time
+        for _ in range(6):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == 1.  # it should count this one
+        assert obs.was_alarm_used_after_game_over
+
+    def test_reward_correct_alarmused_right_toolate(self):
+        """test that the maximum is taken, when an alarm is sent at the right time, and another one too late"""
+        # now i raise two alarms: one at just the right time, another one a bit later, and i check the correct
+        # one is used
+        act = self.env.action_space()
+        act.raise_alarm = [1]
+        obs, reward, done, info = self.env.step(act)  # just at the right time
+        for _ in range(3):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        obs, reward, done, info = self.env.step(act)  # a bit too late
+        for _ in range(2):
+            obs, reward, done, info = self.env.step(self.env.action_space())
+        self._aux_trigger_cascading_failure()
+        obs, reward, done, info = self.env.step(self.env.action_space())
+        assert done
+        assert reward == 1.  # it should count this one
+        assert obs.was_alarm_used_after_game_over
+
+    def test_runner(self):
+        """test i can properly create a runner"""
+        runner = Runner(**self.env.get_params_for_runner())
+
+        # normal run
+        res = runner.run(nb_episode=1, nb_process=1, max_iter=self.max_iter)
+        assert res[0][-1] == 10
+        assert res[0][-2] == 10
+        assert res[0][-3] == 1.0
+
+        # run + episode data
+        with tempfile.TemporaryDirectory() as f:
+            res = runner.run(nb_episode=1, nb_process=1, max_iter=self.max_iter, path_save=f)
+            ep_dat = EpisodeData.from_disk(agent_path=f, name=res[0][1])
+            assert len(ep_dat) == 10
+            assert ep_dat.observations[0].attention_budget == 3
+
+    def test_kwargs(self):
+        """test the get_kwargs function properly forwards the attention budget"""
+        env2 = Environment(**self.env.get_kwargs())
+        assert env2._has_attention_budget
+        assert env2._kwargs_attention_budget == self.default_kwargs_att_budget
+        assert env2._attention_budget_cls == LinearAttentionBudget
+        obs = env2.reset()
+        assert obs.attention_budget == 3
+        obs, reward, done, info = env2.step(env2.action_space())
+        assert obs.attention_budget == 3 + 1. / (12. * 8.)
+ + +if __name__ == "__main__": + unittest.main() diff --git a/grid2op/tests/test_Environment.py b/grid2op/tests/test_Environment.py index 60db879d0..12105bf49 100644 --- a/grid2op/tests/test_Environment.py +++ b/grid2op/tests/test_Environment.py @@ -21,9 +21,7 @@ from grid2op.Reward import L2RPNReward from grid2op.MakeEnv import make from grid2op.Rules import RulesChecker, DefaultRules -from grid2op.operator_attention import LinearAttentionBudget from grid2op.dtypes import dt_float -from grid2op.Reward import RedispReward DEBUG = False @@ -808,385 +806,5 @@ def test_can_change_max_iter(self): self.env.set_max_iter(0) -class TestAlarmFeature(unittest.TestCase): - def setUp(self) -> None: - self.env_nm = os.path.join(PATH_DATA_TEST, "l2rpn_neurips_2020_track1_with_alert") - with warnings.catch_warnings(): - warnings.filterwarnings("ignore") - self.env = make(self.env_nm, test=True) - self.env.seed(0) - self.env.reset() - self.env.reset() - - def tearDown(self) -> None: - self.env.close() - - def test_create_ok(self): - """test that the stuff is created with the right parameters""" - assert self.env._has_attention_budget - assert self.env._attention_budget is not None - assert isinstance(self.env._attention_budget, LinearAttentionBudget) - assert abs(self.env._attention_budget._budget_per_ts - 1. / (12.*8)) <= 1e-6 - assert abs(self.env._attention_budget._max_budget - 5) <= 1e-6 - assert abs(self.env._attention_budget._alarm_cost - 1) <= 1e-6 - assert abs(self.env._attention_budget._current_budget - 3.) <= 1e-6 - - with self.assertRaises(Grid2OpException): - # it raises because the default reward: AlarmReward can only be used - # if there is an alarm budget - with make(self.env_nm, has_attention_budget=False) as env: - assert env._has_attention_budget is False - assert env._attention_budget is None - - with make(self.env_nm, has_attention_budget=False, reward_class=RedispReward) as env: - assert env._has_attention_budget is False - assert env._attention_budget is None - - with make(self.env_nm, kwargs_attention_budget={"max_budget": 15, - "budget_per_ts": 1, - "alarm_cost": 12, - "init_budget": 0}) as env: - assert env._has_attention_budget - assert env._attention_budget is not None - assert isinstance(env._attention_budget, LinearAttentionBudget) - assert abs(env._attention_budget._budget_per_ts - 1.) <= 1e-6 - assert abs(env._attention_budget._max_budget - 15) <= 1e-6 - assert abs(env._attention_budget._alarm_cost - 12) <= 1e-6 - assert abs(env._attention_budget._current_budget - 0.) <= 1e-6 - - def test_budget_increases_ok(self): - """test the attention budget properly increases when no alarm are raised - and that it does not exceed the maximum value""" - # check increaes ok normally - self.env.step(self.env.action_space()) - assert abs(self.env._attention_budget._current_budget - (3 + 1. / (12. * 8.))) <= 1e-6 - self.env.step(self.env.action_space()) - assert abs(self.env._attention_budget._current_budget - (3 + 2. / (12. 
* 8.))) <= 1e-6 - - # check that it does not "overflow" - with make(self.env_nm, kwargs_attention_budget={"max_budget": 5, - "budget_per_ts": 1, - "alarm_cost": 12, - "init_budget": 0}) as env: - env.step(self.env.action_space()) - assert abs(env._attention_budget._current_budget - 1) <= 1e-6 - env.step(self.env.action_space()) - assert abs(env._attention_budget._current_budget - 2) <= 1e-6 - env.step(self.env.action_space()) - assert abs(env._attention_budget._current_budget - 3) <= 1e-6 - env.step(self.env.action_space()) - assert abs(env._attention_budget._current_budget - 4) <= 1e-6 - env.step(self.env.action_space()) - assert abs(env._attention_budget._current_budget - 5) <= 1e-6 - env.step(self.env.action_space()) - assert abs(env._attention_budget._current_budget - 5) <= 1e-6 - - def test_alarm_in_legal_action_ok(self): - """I test the budget is properly updated when the action is legal and non ambiguous""" - act = self.env.action_space() - act.raise_alarm = [0] - self.env.step(act) - assert abs(self.env._attention_budget._current_budget - 2) <= 1e-6 - - def test_reset_ok(self): - self.env.step(self.env.action_space()) - assert abs(self.env._attention_budget._current_budget - (3 + 1. / (12. * 8.))) <= 1e-6 - self.env.reset() - assert abs(self.env._attention_budget._current_budget - 3) <= 1e-6 - - def test_illegal_action(self): - """illegal action should not modify the alarm budget""" - th_budget = 3. - act = self.env.action_space() - arr = 1 * act.set_bus - arr[:12] = 1 - act.set_bus = arr - obs, reward, done, info = self.env.step(act) - assert info["is_illegal"] - assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 - assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 - - act = self.env.action_space() - arr = 1 * act.set_bus - arr[:12] = 1 - act.set_bus = arr - act.raise_alarm = [0] - obs, reward, done, info = self.env.step(act) - assert info["is_illegal"] - assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 - assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 - - def test_ambiguous_action(self): - """ambiguous action should not modify the alarm budget""" - th_budget = 3. - act = self.env.action_space() - act.set_bus = [(0, 1)] - act.change_bus = [0] - obs, reward, done, info = self.env.step(act) - assert info["is_ambiguous"] - assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 - - act = self.env.action_space() - act.set_bus = [(0, 1)] - act.change_bus = [0] - act.raise_alarm = [0] - obs, reward, done, info = self.env.step(act) - assert info["is_ambiguous"] - assert abs(self.env._attention_budget._current_budget - th_budget) <= 1e-6 - - def test_alarm_obs_noalarm(self): - """test the observation is behaving correctly concerning the alarm part, when i don't send alarms""" - obs = self.env.reset() - assert abs(self.env._attention_budget._current_budget - 3.) <= 1e-6 - assert abs(obs.attention_budget - 3.) <= 1e-6 - obs, reward, done, info = self.env.step(self.env.action_space()) - nb_th = 3 + 1. / (12. * 8.) 
- assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 - assert abs(obs.attention_budget - nb_th) <= 1e-6 - assert obs.time_since_last_alarm == -1 - - def test_alarm_obs_whenalarm(self): - """test the observation is behaving correctly concerning the alarm part, when i send alarms""" - act = self.env.action_space() - act.raise_alarm = [0] - obs, reward, done, info = self.env.step(act) - nb_th = 2 - assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 - assert abs(obs.attention_budget - nb_th) <= 1e-6 - assert obs.time_since_last_alarm == 0 - assert np.all(obs.last_alarm == [1, -1, -1]) - - obs, reward, done, info = self.env.step(self.env.action_space()) - nb_th += 1. / (12. * 8.) - assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 - assert abs(obs.attention_budget - nb_th) <= 1e-6 - assert obs.time_since_last_alarm == 1 - assert np.all(obs.last_alarm == [1, -1, -1]) - - obs = self.env.reset() - nb_th = 3 - assert abs(self.env._attention_budget._current_budget - nb_th) <= 1e-6 - assert abs(obs.attention_budget - nb_th) <= 1e-6 - assert obs.time_since_last_alarm == -1 - assert np.all(obs.last_alarm == [-1, -1, -1]) - - def test_simulate_act_ok(self): - """test the attention budget when simulating an ok action""" - obs = self.env.reset() - act = self.env.action_space() - act.raise_alarm = [0] - act2 = self.env.action_space() - act2.raise_alarm = [1] - - # i simulate no action - sim_obs, *_ = obs.simulate(self.env.action_space()) - nb_th = 3 + 1. / (12. * 8.) - assert abs(sim_obs.attention_budget - nb_th) <= 1e-6 - assert sim_obs.time_since_last_alarm == -1 - assert np.all(sim_obs.last_alarm == [-1, -1, -1]) - - # i simulate an action, this should work as for step, if i do no actions - sim_obs, *_ = obs.simulate(act) - nb_th = 2 - assert abs(sim_obs.attention_budget - nb_th) <= 1e-6 - assert sim_obs.time_since_last_alarm == 0 - assert np.all(sim_obs.last_alarm == [1, -1, -1]) - - # i simulate no action, this should remove the previous stuff and work - sim_obs, *_ = obs.simulate(self.env.action_space()) - nb_th = 3 + 1. / (12. * 8.) - assert abs(sim_obs.attention_budget - nb_th) <= 1e-6 - assert sim_obs.time_since_last_alarm == -1 - assert np.all(sim_obs.last_alarm == [-1, -1, -1]) - - # i do a step and check now - obs, *_ = self.env.step(act) - - sim_obs, *_ = obs.simulate(self.env.action_space()) - nb_th = 2 + 1. / (12. * 8.) 
- assert abs(sim_obs.attention_budget - nb_th) <= 1e-6 - assert sim_obs.time_since_last_alarm == 1 - assert np.all(sim_obs.last_alarm == [1, -1, -1]) - - # i simulate an action, this should work as for step, if i do no actions - sim_obs, *_ = obs.simulate(act2) - nb_th = 1 - assert abs(sim_obs.attention_budget - nb_th) <= 1e-6 - assert sim_obs.time_since_last_alarm == 0 - assert np.all(sim_obs.last_alarm == [1, 2, -1]) - - def _aux_trigger_cascading_failure(self): - act_ko1 = self.env.action_space() - act_ko1.line_set_status = [(56, -1)] - obs, reward, done, info = self.env.step(act_ko1) - assert not done - assert reward == 0 - act_ko2 = self.env.action_space() - act_ko2.line_set_status = [(41, -1)] - obs, reward, done, info = self.env.step(act_ko2) - assert not done - assert reward == 0 - act_ko3 = self.env.action_space() - act_ko3.line_set_status = [(40, -1)] - obs, reward, done, info = self.env.step(act_ko3) - assert not done - assert reward == 0 - - act_ko4 = self.env.action_space() - act_ko4.line_set_status = [(39, -1)] - obs, reward, done, info = self.env.step(act_ko4) - assert not done - assert reward == 0 - - act_ko5 = self.env.action_space() - act_ko5.line_set_status = [(13, -1)] - obs, reward, done, info = self.env.step(act_ko5) - assert not done - assert reward == 0 - - def test_alarm_reward_simple(self): - """very basic test for the reward and """ - # normal step, no game over => 0 - obs, reward, done, info = self.env.step(self.env.action_space()) - assert reward == 0 - self.env.fast_forward_chronics(861) - obs, reward, done, info = self.env.step(self.env.action_space()) - assert not done - assert reward == 0 - - # end of an episode, no game over: +1 - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == +1 - assert not obs.was_alarm_used_after_game_over - - def test_reward_game_over_connex(self): - """test i don't get any points if there is a game over for non connex grid""" - # game over not due to line disconnection, no points - obs = self.env.reset() - act_ko = self.env.action_space() - act_ko.gen_set_bus = [(0, -1)] - obs, reward, done, info = self.env.step(act_ko) - assert done - assert reward == -1 - assert not obs.was_alarm_used_after_game_over - - def test_reward_no_alarm(self): - """test that i don't get any points if i don't send any alarm""" - # FYI parrallel lines: - # 48, 49 || 18, 19 || 27, 28 || 37, 38 - - # game not due to line disconnection, but no alarm => no points - self._aux_trigger_cascading_failure() - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == -1 - assert not obs.was_alarm_used_after_game_over - - def test_reward_wrong_area_wrong_time(self): - """test that i got a few point for the wrong area, but at the wrong time""" - # now i raise an alarm, and after i do a cascading failure (but i send a wrong alarm) - act = self.env.action_space() - act.raise_alarm = [0] - obs, reward, done, info = self.env.step(act) - self._aux_trigger_cascading_failure() - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == 0.375 - assert obs.was_alarm_used_after_game_over - - def test_reward_right_area_not_best_time(self): - """test that i got some point for the right area, but at the wrong time""" - # now i raise an alarm, and after i do a cascading failure (and i send a right alarm) - act = self.env.action_space() - act.raise_alarm = [1] - obs, reward, done, info = self.env.step(act) - self._aux_trigger_cascading_failure() - obs, 
reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == 0.75 - assert obs.was_alarm_used_after_game_over - - def test_reward_right_time_wrong_area(self): - """test that the alarm has half "value" if taken exactly at the right time but for the wrong area""" - # now i raise an alarm just at the right time, and after i do a cascading failure (wrong zone) - act = self.env.action_space() - act.raise_alarm = [0] - obs, reward, done, info = self.env.step(act) - for _ in range(6): - obs, reward, done, info = self.env.step(self.env.action_space()) - self._aux_trigger_cascading_failure() - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == 0.5 - assert obs.was_alarm_used_after_game_over - - def test_reward_right_time_right_area(self): - """test that the alarm has perfect "value" if taken exactly at the right time and for the right area""" - # now i raise an alarm just at the right time, and after i do a cascading failure (right zone) - act = self.env.action_space() - act.raise_alarm = [1] - obs, reward, done, info = self.env.step(act) - for _ in range(6): - obs, reward, done, info = self.env.step(self.env.action_space()) - self._aux_trigger_cascading_failure() - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == 1 - assert obs.was_alarm_used_after_game_over - - def test_reward_right_area_too_early(self): - """test that the alarm is not taken into account if send too early""" - # now i raise an alarm but too early, i don't get any points (even if right zone) - act = self.env.action_space() - act.raise_alarm = [1] - obs, reward, done, info = self.env.step(act) - for _ in range(6): - obs, reward, done, info = self.env.step(self.env.action_space()) - for _ in range(12): - obs, reward, done, info = self.env.step(self.env.action_space()) - self._aux_trigger_cascading_failure() - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == -1 - assert not obs.was_alarm_used_after_game_over - - def test_reward_correct_alarmused_right_early(self): - """test that the maximum is taken, when an alarm is send at the right time, and another one too early""" - # now i raise two alarms: one at just the right time, another one a bit earlier, and i check the correct - # one is used - act = self.env.action_space() - act.raise_alarm = [1] - obs, reward, done, info = self.env.step(act) # a bit too early - for _ in range(3): - obs, reward, done, info = self.env.step(self.env.action_space()) - obs, reward, done, info = self.env.step(act) - for _ in range(6): - obs, reward, done, info = self.env.step(self.env.action_space()) # just at the right time - self._aux_trigger_cascading_failure() - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == 1. 
# it should count this one - assert obs.was_alarm_used_after_game_over - - def test_reward_correct_alarmused_right_toolate(self): - """test that the maximum is taken, when an alarm is send at the right time, and another one too late""" - # now i raise two alarms: one at just the right time, another one a bit later, and i check the correct - # one is used - act = self.env.action_space() - act.raise_alarm = [1] - obs, reward, done, info = self.env.step(act) # just at the right time - for _ in range(3): - obs, reward, done, info = self.env.step(self.env.action_space()) - obs, reward, done, info = self.env.step(act) # a bit too early - for _ in range(2): - obs, reward, done, info = self.env.step(self.env.action_space()) - self._aux_trigger_cascading_failure() - obs, reward, done, info = self.env.step(self.env.action_space()) - assert done - assert reward == 1. # it should count this one - assert obs.was_alarm_used_after_game_over - - if __name__ == "__main__": unittest.main()
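
For context, a minimal usage sketch of what this patch enables, mirroring test_runner and test_kwargs above. It assumes an alarm-enabled environment: the name used here is the test dataset from test_AlarmFeature.py, and any environment shipping the alarm feature should behave the same.

    import grid2op
    from grid2op.Environment import Environment
    from grid2op.Runner import Runner

    # an alarm-enabled environment (the test dataset exercised above)
    env = grid2op.make("l2rpn_neurips_2020_track1_with_alert", test=True)

    # get_kwargs() now forwards the attention budget, so a copy of the
    # environment keeps the same alarm behaviour as the original
    env2 = Environment(**env.get_kwargs())
    assert env2._has_attention_budget

    # get_params_for_runner() forwards it as well, so episodes run through
    # the Runner account for alarms exactly like direct env.step() calls
    runner = Runner(**env.get_params_for_runner())
    res = runner.run(nb_episode=1, nb_process=1, max_iter=10)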