[RLlib] Cleanup examples folder #16: Add missing docstrings to 2 connector example scripts. #45864

Merged
Changes from 6 commits
52 changes: 27 additions & 25 deletions rllib/BUILD
@@ -2086,6 +2086,27 @@ py_test(
# tagged by @OldAPIStack and/or @HybridAPIStack
# ----------------------

# subdirectory: actions/

# Nested action spaces (flattening obs and learning w/ multi-action distribution).
py_test(
name = "examples/actions/nested_action_spaces_ppo",
main = "examples/actions/nested_action_spaces.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "large",
srcs = ["examples/actions/nested_action_spaces.py"],
args = ["--enable-new-api-stack", "--as-test", "--framework=torch", "--stop-reward=-500.0", "--algo=PPO"]
)

py_test(
name = "examples/actions/nested_action_spaces_multi_agent_ppo",
main = "examples/actions/nested_action_spaces.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "large",
srcs = ["examples/actions/nested_action_spaces.py"],
args = ["--enable-new-api-stack", "--as-test", "--num-agents=2", "--framework=torch", "--stop-reward=-1000.0", "--algo=PPO"]
)

# subdirectory: algorithms/

#@OldAPIStack
@@ -2213,41 +2234,22 @@ py_test(
args = ["--enable-new-api-stack", "--num-agents=2", "--stop-iter=2", "--framework=torch", "--algo=PPO", "--num-env-runners=4", "--num-cpus=6"]
)

# Nested action spaces (flattening obs and learning w/ multi-action distribution).
py_test(
name = "examples/connectors/nested_action_spaces_ppo",
main = "examples/connectors/nested_action_spaces.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "large",
srcs = ["examples/connectors/nested_action_spaces.py"],
args = ["--enable-new-api-stack", "--as-test", "--framework=torch", "--stop-reward=-500.0", "--algo=PPO"]
)

py_test(
name = "examples/connectors/nested_action_spaces_multi_agent_ppo",
main = "examples/connectors/nested_action_spaces.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "large",
srcs = ["examples/connectors/nested_action_spaces.py"],
args = ["--enable-new-api-stack", "--as-test", "--num-agents=2", "--framework=torch", "--stop-reward=-1000.0", "--algo=PPO"]
)

# Nested observation spaces (flattening).
py_test(
name = "examples/connectors/nested_observation_spaces_ppo",
main = "examples/connectors/nested_observation_spaces.py",
name = "examples/connectors/flatten_observations_dict_space_ppo",
main = "examples/connectors/flatten_observations_dict_space.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "medium",
srcs = ["examples/connectors/nested_observation_spaces.py"],
srcs = ["examples/connectors/flatten_observations_dict_space.py"],
args = ["--enable-new-api-stack", "--as-test", "--stop-reward=400.0", "--framework=torch", "--algo=PPO"]
)

py_test(
name = "examples/connectors/nested_observation_spaces_multi_agent_ppo",
main = "examples/connectors/nested_observation_spaces.py",
name = "examples/connectors/flatten_observations_dict_space_multi_agent_ppo",
main = "examples/connectors/flatten_observations_dict_space.py",
tags = ["team:rllib", "exclusive", "examples"],
size = "medium",
srcs = ["examples/connectors/nested_observation_spaces.py"],
srcs = ["examples/connectors/flatten_observations_dict_space.py"],
args = ["--enable-new-api-stack", "--num-agents=2", "--as-test", "--stop-reward=800.0", "--framework=torch", "--algo=PPO"]
)

4 changes: 2 additions & 2 deletions rllib/connectors/env_to_module/__init__.py
@@ -14,7 +14,7 @@
FlattenObservations,
)
from ray.rllib.connectors.env_to_module.prev_actions_prev_rewards import (
PrevActionsPrevRewardsConnector,
PrevActionsPrevRewards,
)
from ray.rllib.connectors.env_to_module.write_observations_to_episodes import (
WriteObservationsToEpisodes,
@@ -29,6 +29,6 @@
"EnvToModulePipeline",
"FlattenObservations",
"NumpyToTensor",
"PrevActionsPrevRewardsConnector",
"PrevActionsPrevRewards",
"WriteObservationsToEpisodes",
]
80 changes: 37 additions & 43 deletions rllib/connectors/env_to_module/flatten_observations.py
@@ -6,7 +6,6 @@
import tree # pip install dm_tree

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.core.columns import Columns
from ray.rllib.core.rl_module.rl_module import RLModule
from ray.rllib.utils.annotations import override
from ray.rllib.utils.numpy import flatten_inputs_to_1d_tensor
@@ -19,18 +18,12 @@
class FlattenObservations(ConnectorV2):
"""A connector piece that flattens all observation components into a 1D array.

- Only works on data that has already been added to the batch.
- This connector makes the assumption that under the Columns.OBS key in batch,
there is either a list of individual env observations to be flattened (single-agent
case) or a dict mapping agent- and module IDs to lists of data items to be
flattened (multi-agent case).
- Does NOT work in a Learner pipeline as it operates on individual observation
items (as opposed to batched/time-ranked data).
- Therefore, assumes that the altered (flattened) observations will be written
back into the episode by a later connector piece in the env-to-module pipeline
(which this piece is part of as well).
- Does NOT read any information from the given list of Episode objects.
- Does NOT write any observations (or other data) to the given Episode objects.
- Works directly on the incoming episodes list and changes the last observation
in place (writing the flattened observation back into the episode).
- This connector does NOT alter the incoming batch (`data`) when called.
- This connector does NOT work in a `LearnerConnectorPipeline` because it requires
the incoming episodes to still be ongoing (in progress) as it only alters the
latest observation, not all observations in an episode.

.. testcode::

@@ -169,40 +162,41 @@ def __call__(
shared_data: Optional[dict] = None,
**kwargs,
) -> Any:
observations = data.get(Columns.OBS)

if observations is None:
raise ValueError(
f"`batch` must already have a column named {Columns.OBS} in it "
f"for this connector to work!"
)

# Process each item under the Columns.OBS key individually and flatten
# it. We are using the `ConnectorV2.foreach_batch_item_change_in_place` API,
# allowing us to not worry about multi- or single-agent setups and returning
# the new version of each item we are iterating over.
self.foreach_batch_item_change_in_place(
batch=data,
column=Columns.OBS,
func=(
lambda item, eps_id, agent_id, module_id: (
# Multi-agent AND skip this AgentID.
item
if self._agent_ids and agent_id not in self._agent_ids
# Single-agent or flatten this AgentIDs observation.
for sa_episode in self.single_agent_episode_iterator(
episodes, agents_that_stepped_only=True
):
# Episode is not finalized yet and thus still operates on lists of items.
assert not sa_episode.is_finalized

last_obs = sa_episode.get_observations(-1)

if self._multi_agent:
flattened_obs = {
agent_obs
if agent_id not in self._agent_ids
else flatten_inputs_to_1d_tensor(
item,
inputs=agent_obs,
# In the multi-agent case, we need to use the specific agent's
# space struct, not the multi-agent observation space dict.
(
self._input_obs_base_struct
if not agent_id
else self._input_obs_base_struct[agent_id]
),
# Our items are bare observations (no batch axis present).
spaces_struct=self._input_obs_base_struct[agent_id],
# Our items are individual observations (no batch axis present).
batch_axis=False,
)
for agent_id, agent_obs in last_obs.items()
}
else:
flattened_obs = flatten_inputs_to_1d_tensor(
inputs=last_obs,
spaces_struct=self._input_obs_base_struct,
# Our items are individual observations (no batch axis present).
batch_axis=False,
)
),
)

# Write new observation directly back into the episode.
sa_episode.set_observations(at_indices=-1, new_data=flattened_obs)
# We set the Episode's observation space to ours so that we can safely
# set the last obs to the new value (without causing a space mismatch
# error).
sa_episode.observation_space = self.observation_space

return data
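For orientation, here is a minimal usage sketch of the rewritten `FlattenObservations` piece on the new API stack. The env name `"nested_obs_env"` is a placeholder, and the exact config calls are assumptions based on how the example scripts in this PR wire up their pipelines:

```python
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.connectors.env_to_module import FlattenObservations

config = (
    PPOConfig()
    # Assumption: new API stack enabled (what `--enable-new-api-stack` toggles
    # in the example scripts).
    .api_stack(
        enable_rl_module_and_learner=True,
        enable_env_runner_and_connector_v2=True,
    )
    # Placeholder env; any env with a (nested) Dict/Tuple observation space.
    .environment("nested_obs_env")
    # The piece flattens each episode's latest observation in place; RLlib's
    # default env-to-module pieces then build the RLModule batch from the
    # already-flattened episodes.
    .env_runners(env_to_module_connector=lambda env: FlattenObservations())
)

algo = config.build()
print(algo.train())
```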
15 changes: 7 additions & 8 deletions rllib/connectors/env_to_module/mean_std_filter.py
@@ -1,7 +1,7 @@
from typing import Any, Dict, List, Optional
from gymnasium.spaces import Discrete, MultiDiscrete

import gymnasium as gym
from gymnasium.spaces import Discrete, MultiDiscrete
import numpy as np
import tree

@@ -19,6 +19,8 @@
class MeanStdFilter(ConnectorV2):
"""A connector used to mean-std-filter observations.



Incoming observations are filtered such that the output of this filter is on
average 0.0 and has a standard deviation of 1.0. If the observation space is
a (possibly nested) dict, this filtering is applied separately per element of
@@ -121,13 +123,10 @@ def __call__(
sa_obs, update=self._update_stats
)
sa_episode.set_observations(at_indices=-1, new_data=normalized_sa_obs)

if len(sa_episode) == 0:
# TODO (sven): This is kind of a hack.
# We set the Episode's observation space to ours so that we can safely
# set the last obs to the new value (without causing a space mismatch
# error).
sa_episode.observation_space = self.observation_space
# We set the Episode's observation space to ours so that we can safely
# set the last obs to the new value (without causing a space mismatch
# error).
sa_episode.observation_space = self.observation_space

# Leave `data` as is. RLlib's default connector will automatically
# populate the OBS column therein from the episodes' now transformed
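As background for the `MeanStdFilter` docstring above, here is a generic, self-contained sketch of the running mean/std update it describes (a standard parallel/Welford-style update; an illustration only, not RLlib's actual filter implementation):

```python
import numpy as np


class RunningMeanStd:
    """Illustrative running mean/std tracker for observation normalization."""

    def __init__(self, shape, epsilon: float = 1e-4):
        self.mean = np.zeros(shape, dtype=np.float64)
        self.var = np.ones(shape, dtype=np.float64)
        self.count = epsilon  # Avoids division by zero before the first update.

    def update(self, x: np.ndarray) -> None:
        # `x` is a batch of observations with shape (batch, *shape).
        batch_mean, batch_var, batch_count = x.mean(0), x.var(0), x.shape[0]
        delta = batch_mean - self.mean
        total = self.count + batch_count
        new_mean = self.mean + delta * batch_count / total
        m2 = (
            self.var * self.count
            + batch_var * batch_count
            + delta**2 * self.count * batch_count / total
        )
        self.mean, self.var, self.count = new_mean, m2 / total, total

    def normalize(self, obs: np.ndarray) -> np.ndarray:
        # Output is roughly zero-mean / unit-std w.r.t. everything seen so far.
        return (obs - self.mean) / np.sqrt(self.var + 1e-8)
```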
47 changes: 14 additions & 33 deletions rllib/connectors/env_to_module/prev_actions_prev_rewards.py
@@ -5,14 +5,13 @@
import numpy as np

from ray.rllib.connectors.connector_v2 import ConnectorV2
from ray.rllib.core.columns import Columns
from ray.rllib.core.rl_module.rl_module import RLModule
from ray.rllib.utils.annotations import override
from ray.rllib.utils.spaces.space_utils import batch, flatten_to_single_ndarray
from ray.rllib.utils.typing import EpisodeType


class PrevActionsPrevRewardsConnector(ConnectorV2):
class PrevActionsPrevRewards(ConnectorV2):
"""A connector piece that adds previous rewards and actions to the input obs.

- Requires Columns.OBS to be already a part of the batch.
@@ -36,8 +35,6 @@ class PrevActionsPrevRewardsConnector(ConnectorV2):
"""

ORIG_OBS_KEY = "_orig_obs"
PREV_ACTIONS_KEY = "prev_actions"
PREV_REWARDS_KEY = "prev_rewards"

@override(ConnectorV2)
def recompute_observation_space_from_input_spaces(self):
@@ -64,7 +61,7 @@ def __init__(
n_prev_rewards: int = 1,
**kwargs,
):
"""Initializes a PrevActionsPrevRewardsConnector instance.
"""Initializes a PrevActionsPrevRewards instance.

Args:
multi_agent: Whether this is a connector operating on a multi-agent
@@ -108,23 +105,16 @@ def __call__(
shared_data: Optional[dict] = None,
**kwargs,
) -> Any:
observations = data.get(Columns.OBS)

if observations is None:
raise ValueError(
f"`batch` must already have a column named {Columns.OBS} in it "
f"for this connector to work!"
)

new_obs = []
for sa_episode, orig_obs in self.single_agent_episode_iterator(
episodes, zip_with_batch_column=observations
for sa_episode in self.single_agent_episode_iterator(
episodes, agents_that_stepped_only=True
):
# Episode is not finalized yet and thus still operates on lists of items.
assert not sa_episode.is_finalized

augmented_obs = {self.ORIG_OBS_KEY: sa_episode.get_observations(-1)}

if self.n_prev_actions:
prev_n_actions = flatten_to_single_ndarray(
augmented_obs[self.PREV_ACTIONS_KEY] = flatten_to_single_ndarray(
batch(
sa_episode.get_actions(
indices=slice(-self.n_prev_actions, None),
@@ -135,28 +125,19 @@ def __call__(
)

if self.n_prev_rewards:
prev_n_rewards = np.array(
augmented_obs[self.PREV_REWARDS_KEY] = np.array(
sa_episode.get_rewards(
indices=slice(-self.n_prev_rewards, None),
fill=0.0,
)
)

new_obs.append(
{
self.ORIG_OBS_KEY: orig_obs,
self.PREV_ACTIONS_KEY: prev_n_actions,
self.PREV_REWARDS_KEY: prev_n_rewards,
}
)

# Convert the observations in the batch into a dict with the keys:
# "_obs", "_prev_rewards", and "_prev_actions".
self.foreach_batch_item_change_in_place(
batch=data,
column=Columns.OBS,
func=lambda orig_obs, eps_id, agent_id, module_id: new_obs.pop(0),
)
# Write new observation directly back into the episode.
sa_episode.set_observations(at_indices=-1, new_data=augmented_obs)
# We set the Episode's observation space to ours so that we can safely
# set the last obs to the new value (without causing a space mismatch
# error).
sa_episode.observation_space = self.observation_space

return data

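A hedged sketch of how the renamed `PrevActionsPrevRewards` piece can be combined with `FlattenObservations` in an env-to-module pipeline. The constructor arguments mirror the attributes referenced in the code above (`n_prev_actions`, `n_prev_rewards`); the pipeline-callable form is the same one the example scripts use:

```python
from ray.rllib.connectors.env_to_module import (
    FlattenObservations,
    PrevActionsPrevRewards,
)


def _env_to_module(env):
    # Pieces run in order: first augment each episode's latest obs with the
    # previous action(s)/reward(s) (written straight back into the episode),
    # then flatten the resulting dict observation into a 1D array.
    return [
        PrevActionsPrevRewards(n_prev_actions=1, n_prev_rewards=1),
        FlattenObservations(),
    ]


# Hooked into an algorithm config via, e.g.:
# config.env_runners(env_to_module_connector=_env_to_module)
```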
Empty file.
@@ -1,11 +1,7 @@
from gymnasium.spaces import Dict, Tuple, Box, Discrete, MultiDiscrete

from ray.tune.registry import register_env
from ray.rllib.connectors.env_to_module import (
AddObservationsFromEpisodesToBatch,
FlattenObservations,
WriteObservationsToEpisodes,
)
from ray.rllib.connectors.env_to_module import FlattenObservations
from ray.rllib.examples.envs.classes.multi_agent import (
MultiAgentNestedSpaceRepeatAfterMeEnv,
)
@@ -26,13 +22,13 @@
if __name__ == "__main__":
args = parser.parse_args()

assert (
args.enable_new_api_stack
), "Must set --enable-new-api-stack when running this script!"

# Define env-to-module-connector pipeline for the new stack.
def _env_to_module_pipeline(env):
return [
AddObservationsFromEpisodesToBatch(),
Collaborator
Are the rest of the connectors now default?

Contributor Author
All off-the-shelf connectors now operate directly on the episodes (except for framestacking, for performance reasons):

  • read last obs from episode
  • alter that last obs
  • directly write it back into the episode via episode.set_observations.

This way, RLlib's default pieces (which come after the user-defined ones) only see the already-altered episode and thus properly extract the observations from it and put them into the RLModule batch.

This also makes things more uniform. Reading the last obs from an episode and writing it back is very fast, because we are still operating on plain Python lists in the EnvRunner (nothing has been numpy'ized yet).
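A minimal sketch of that pattern for a hypothetical custom piece (the class name and the scaling transform are made up; the episode calls are the ones used by the connectors in this PR):

```python
from typing import Any, Optional

from ray.rllib.connectors.connector_v2 import ConnectorV2


class ScaleLastObs(ConnectorV2):
    """Hypothetical piece that halves each ongoing episode's latest observation."""

    def __call__(
        self,
        *,
        rl_module,
        data,
        episodes,
        explore: Optional[bool] = None,
        shared_data: Optional[dict] = None,
        **kwargs,
    ) -> Any:
        for sa_episode in self.single_agent_episode_iterator(
            episodes, agents_that_stepped_only=True
        ):
            # Read the latest obs; the episode is still ongoing, so this is a
            # plain (non-batched) item.
            last_obs = sa_episode.get_observations(-1)
            # Alter it (assumes a simple Box obs) and write it straight back.
            sa_episode.set_observations(at_indices=-1, new_data=last_obs * 0.5)
        # Leave the batch (`data`) untouched; the default pieces that run
        # afterwards read the already-altered obs from the episodes.
        return data
```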

FlattenObservations(multi_agent=args.num_agents > 0),
WriteObservationsToEpisodes(),
]
return FlattenObservations(multi_agent=args.num_agents > 0)

# Register our environment with tune.
if args.num_agents > 0:
20 changes: 10 additions & 10 deletions rllib/examples/checkpoints/checkpoint_by_custom_criteria.py
@@ -1,16 +1,16 @@
"""Example extracting a checkpoint from n trials using one or more custom criteria.

This example:
- runs a CartPole experiment with three different learning rates (three tune
"trials"). During the experiment, for each trial, we create a checkpoint at each
iteration.
- at the end of the experiment, we compare the trials and pick the one that performed
best, based on the criterion: Lowest episode count per single iteration (for CartPole,
a low episode count means the episodes are very long and thus the reward is also very
high).
- from that best trial (with the lowest episode count), we then pick those checkpoints
that a) have the lowest policy loss (good) and b) have the highest value function loss
(bad).
- runs a CartPole experiment with three different learning rates (three tune
"trials"). During the experiment, for each trial, we create a checkpoint at each
iteration.
- at the end of the experiment, we compare the trials and pick the one that
performed best, based on the criterion: Lowest episode count per single iteration
(for CartPole, a low episode count means the episodes are very long and thus the
reward is also very high).
- from that best trial (with the lowest episode count), we then pick those
checkpoints that a) have the lowest policy loss (good) and b) have the highest value
function loss (bad).


How to run this script
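A hedged sketch of the selection logic this docstring describes, assuming `results` is the `ResultGrid` returned by the experiment's `Tuner.fit()` call; the metric key names below are placeholders and depend on algorithm and API stack:

```python
from ray import tune


def pick_checkpoint(results: tune.ResultGrid):
    # 1) Best trial: the one reaching the lowest episode count in a single
    #    iteration (on CartPole, long episodes imply high returns).
    best_result = results.get_best_result(
        metric="env_runners/num_episodes",  # Placeholder metric key.
        mode="min",
    )
    # 2) Within that trial, rank the kept checkpoints by a custom criterion,
    #    e.g. lowest policy loss. `best_checkpoints` is a list of
    #    (Checkpoint, metrics-dict) tuples.
    checkpoint, metrics = min(
        best_result.best_checkpoints,
        key=lambda cm: cm[1]["learners/default_policy/policy_loss"],  # Placeholder.
    )
    return checkpoint
```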