Feature/memory error sampling #280

Merged: 12 commits, May 9, 2024
24 changes: 23 additions & 1 deletion howto/logs_and_checkpoints.md
@@ -165,6 +165,8 @@ Then, the metrics that will be logged are the `key0` and the `key2`. The `key5`
By default the checkpointing is enabled with the following settings:

```yaml
# sheeprl/configs/checkpoint/default.yaml

every: 100
resume_from: null
save_last: True
@@ -180,4 +182,24 @@ meaning that:

> [!NOTE]
>
> When restarting an experiment from a specific checkpoint (`resume_from=/path/to/checkpoint.ckpt`), it is **mandatory** to pass as arguments the same configurations of the experiment you want to restart. This is due to the way Hydra creates the folder in which it saves configs: if you do not pass the same configurations, you may have an unexpected log directory (i.e., the folder is created in the wrong folder).

### Buffer checkpoint

For off-policy algorithms like SAC or Dreamer, the replay buffer can be saved in the checkpoint by setting `buffer.checkpoint=True` from the CLI or by setting the corresponding parameter in the buffer yaml config:

```yaml
# sheeprl/configs/buffer/default.yaml

size: ???
memmap: True
validate_args: False
from_numpy: False
checkpoint: False # Used only for off-policy algorithms
```
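
To make the `memmap` option above more concrete, here is a rough sketch of what a memory-mapped buffer boils down to on disk: one file per stored key, re-opened on resume. File names, shapes, and paths below are illustrative assumptions, not sheeprl's actual buffer implementation.

```python
import numpy as np
from pathlib import Path

# Hypothetical buffer folder of a (stopped) experiment: it must survive a restart.
buffer_dir = Path("logs/runs/my_experiment/version_0/buffer")
buffer_dir.mkdir(parents=True, exist_ok=True)

buffer_size, num_envs, obs_dim = 10_000, 4, 64

# One memory-mapped file per key stored in the replay buffer.
observations = np.memmap(
    buffer_dir / "observations.memmap",
    dtype=np.float32,
    mode="w+",
    shape=(buffer_size, num_envs, obs_dim),
)
rewards = np.memmap(
    buffer_dir / "rewards.memmap",
    dtype=np.float32,
    mode="w+",
    shape=(buffer_size, num_envs, 1),
)

# Writes go straight to disk; a checkpoint only needs to store metadata
# (e.g. the `pos` pointer and these file paths) to re-open the same files later.
observations[0] = np.zeros((num_envs, obs_dim), dtype=np.float32)
rewards[0] = 0.0
observations.flush()
rewards.flush()
```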

There are a few scenarios to pay attention to:

* If the buffer is memory-mapped (i.e. `buffer.memmap=True`) and one saves the buffer in the checkpoint, then one **must not delete the buffer folder** of the stopped experiment: when the buffer is memory-mapped, a file is created on disk for every key saved in the replay buffer (for example `observations.memmap` and `rewards.memmap`), and when the experiment is resumed those files are read back from exactly the same location.
* If the buffer is memory-mapped (i.e. `buffer.memmap=True`), one saves the buffer in the checkpoint, and the buffer was filled completely during the previous experiment (meaning that the oldest trajectories have been overwritten by newer ones), then the agent may end up being trained on "future" trajectories coming from a "future" policy. To be more precise, the buffer is simply a pre-allocated numpy array with an attribute `pos` that points to the first free slot to be written; if we are using a `sheeprl.data.buffers.SequentialReplayBuffer` we sample sequences in `[0, pos - sequence_length) ∪ [pos, buffer_size)` or simply `[0, pos - sequence_length)`, depending on whether the buffer has been filled or not, respectively (see the sketch after this list). When we save the buffer into the checkpoint we save all the relevant information about it: the `pos` attribute and the paths to the memory-mapped files, which represent the buffer content to be retrieved upon resuming. Suppose we saved a checkpoint at step `N` and the experiment went on for another `K < N` steps before stopping, with a buffer that had already been filled at least once. When we resume, the buffer is loaded from the checkpoint, so `pos` points at the same position it was pointing to at step `N`; because the buffer is memory-mapped, the slots in `[pos, pos + K]` contain trajectories that come from a "future" policy: the one that was being trained in the previous experiment when it stopped! Currently we don't know whether this can cause problems for the agent, nor have we found a nice way to mitigate it. We have thought of a few possible solutions. One is to memory-map the buffer metadata, such as the current `pos`: this way, when the buffer is loaded from the checkpoint, all the unwanted trajectories in `[old_pos, current_pos]` can be removed; however, this could erase a large part of the buffer content if, for example, the checkpoint was taken at step `N` and the experiment stopped at step `2N - 1`. Another solution is to employ an online queue that temporarily holds the trajectories and is flushed to the replay buffer only upon checkpointing; the problem here is that a lot of data has to be kept in memory, and RAM can easily explode when working with images (this can be avoided by also memory-mapping the online queue). Practically, another possible solution is to set `algo.learning_starts=K` from the CLI or in the algorithm section of the experiment config: in this way all the "future" trajectories will be overwritten by new samples collected by the resumed agent.
* In any case, when the checkpoint is resumed the buffer **could potentially be pre-filled for `algo.learning_starts` steps** with actions sampled from the resumed agent. If you don't want to pre-fill the buffer, set `algo.learning_starts=0`.
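
As a concrete illustration of the second scenario above, here is a minimal sketch of the sampling ranges induced by the `pos` pointer and of the window of "future" slots inherited when resuming a full, memory-mapped buffer. Names and numbers are assumptions for illustration, not the actual `sheeprl.data.buffers.SequentialReplayBuffer` code.

```python
import numpy as np


def valid_sequence_starts(pos: int, buffer_size: int, sequence_length: int, full: bool) -> np.ndarray:
    """Indices from which a sequence of `sequence_length` consecutive steps may be sampled."""
    if full:
        # Buffer has wrapped around at least once:
        # [0, pos - sequence_length) ∪ [pos, buffer_size)
        return np.concatenate(
            [np.arange(0, max(pos - sequence_length, 0)), np.arange(pos, buffer_size)]
        )
    # Buffer not yet full: only [0, pos - sequence_length)
    return np.arange(0, max(pos - sequence_length, 0))


# Resume from a checkpoint taken at step N, after the old run went on for K more steps:
# the K slots written after the checkpoint are still on disk and remain sampleable.
pos_at_checkpoint, K, buffer_size = 700, 100, 1_000
future_slots = np.arange(pos_at_checkpoint, pos_at_checkpoint + K)  # written by the "future" policy
starts = valid_sequence_starts(pos_at_checkpoint, buffer_size, sequence_length=64, full=True)
assert np.isin(future_slots, starts).all()  # all "future" slots can be sampled again
```

Setting `algo.learning_starts` to cover those `K` steps makes the resumed agent overwrite the slots starting at `pos` before training begins, which is the practical mitigation mentioned above.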
2 changes: 1 addition & 1 deletion howto/register_external_algorithm.md
@@ -591,7 +591,7 @@ def ext_sota_main(fabric: Fabric, cfg: Dict[str, Any]):

for update in range(start_step, num_updates + 1):
for _ in range(0, cfg.algo.rollout_steps):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

# Measure environment interaction time: this considers both the model forward
# to get the action given the observation and the time taken into the environment
2 changes: 1 addition & 1 deletion howto/register_new_algorithm.md
@@ -590,7 +590,7 @@ def sota_main(fabric: Fabric, cfg: Dict[str, Any]):

for update in range(start_step, num_updates + 1):
for _ in range(0, cfg.algo.rollout_steps):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

# Measure environment interaction time: this considers both the model forward
# to get the action given the observation and the time taken into the environment
2 changes: 1 addition & 1 deletion sheeprl/algos/a2c/a2c.py
@@ -228,7 +228,7 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):
for update in range(1, num_updates + 1):
with torch.inference_mode():
for _ in range(0, cfg.algo.rollout_steps):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

# Measure environment interaction time: this considers both the model forward
# to get the action given the observation and the time taken into the environment
13 changes: 8 additions & 5 deletions sheeprl/algos/dreamer_v1/dreamer_v1.py
@@ -508,12 +508,12 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):
last_log = state["last_log"] if cfg.checkpoint.resume_from else 0
last_checkpoint = state["last_checkpoint"] if cfg.checkpoint.resume_from else 0
policy_steps_per_update = int(cfg.env.num_envs * world_size)
num_updates = int(cfg.algo.total_steps // policy_steps_per_update) if not cfg.dry_run else 1
num_updates = cfg.algo.total_steps // policy_steps_per_update if not cfg.dry_run else 1
learning_starts = (cfg.algo.learning_starts // policy_steps_per_update) if not cfg.dry_run else 0
prefill_steps = learning_starts + start_step
if cfg.checkpoint.resume_from:
cfg.algo.per_rank_batch_size = state["batch_size"] // world_size
if not cfg.buffer.checkpoint:
learning_starts += start_step
learning_starts += start_step

# Create Ratio class
ratio = Ratio(cfg.algo.replay_ratio, pretrain_steps=cfg.algo.per_rank_pretrain_steps)
@@ -552,7 +552,7 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

cumulative_per_rank_gradient_steps = 0
for update in range(start_step, num_updates + 1):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

with torch.inference_mode():
# Measure environment interaction time: this considers both the model forward
@@ -644,7 +644,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

# Train the agent
if update >= learning_starts:
per_rank_gradient_steps = ratio(policy_step / world_size)
ratio_steps = policy_step - prefill_steps
if update == learning_starts:
ratio_steps += policy_steps_per_update
per_rank_gradient_steps = ratio(ratio_steps / world_size)
if per_rank_gradient_steps > 0:
with timer("Time/train_time", SumMetric, sync_on_compute=cfg.metric.sync_on_compute):
sample = rb.sample_tensors(
11 changes: 7 additions & 4 deletions sheeprl/algos/dreamer_v2/dreamer_v2.py
@@ -533,10 +533,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):
policy_steps_per_update = int(cfg.env.num_envs * world_size)
num_updates = cfg.algo.total_steps // policy_steps_per_update if not cfg.dry_run else 1
learning_starts = cfg.algo.learning_starts // policy_steps_per_update if not cfg.dry_run else 0
prefill_steps = learning_starts + start_step
if cfg.checkpoint.resume_from:
cfg.algo.per_rank_batch_size = state["batch_size"] // world_size
if not cfg.buffer.checkpoint:
learning_starts += start_step
learning_starts += start_step

# Create Ratio class
ratio = Ratio(cfg.algo.replay_ratio, pretrain_steps=cfg.algo.per_rank_pretrain_steps)
@@ -577,7 +577,7 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

cumulative_per_rank_gradient_steps = 0
for update in range(start_step, num_updates + 1):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

with torch.inference_mode():
# Measure environment interaction time: this considers both the model forward
@@ -672,7 +672,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

# Train the agent
if update >= learning_starts:
per_rank_gradient_steps = ratio(policy_step / world_size)
ratio_steps = policy_step - prefill_steps
if update == learning_starts:
ratio_steps += policy_steps_per_update
per_rank_gradient_steps = ratio(ratio_steps / world_size)
if per_rank_gradient_steps > 0:
local_data = rb.sample_tensors(
batch_size=cfg.algo.per_rank_batch_size,
13 changes: 8 additions & 5 deletions sheeprl/algos/dreamer_v3/dreamer_v3.py
@@ -503,12 +503,12 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):
last_log = state["last_log"] if cfg.checkpoint.resume_from else 0
last_checkpoint = state["last_checkpoint"] if cfg.checkpoint.resume_from else 0
policy_steps_per_update = int(cfg.env.num_envs * fabric.world_size)
num_updates = int(cfg.algo.total_steps // policy_steps_per_update) if not cfg.dry_run else 1
num_updates = cfg.algo.total_steps // policy_steps_per_update if not cfg.dry_run else 1
learning_starts = cfg.algo.learning_starts // policy_steps_per_update if not cfg.dry_run else 0
prefill_steps = learning_starts + start_step
if cfg.checkpoint.resume_from:
cfg.algo.per_rank_batch_size = state["batch_size"] // fabric.world_size
if not cfg.buffer.checkpoint:
learning_starts += start_step
learning_starts += start_step

# Create Ratio class
ratio = Ratio(cfg.algo.replay_ratio, pretrain_steps=cfg.algo.per_rank_pretrain_steps)
@@ -544,7 +544,7 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

cumulative_per_rank_gradient_steps = 0
for update in range(start_step, num_updates + 1):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

with torch.inference_mode():
# Measure environment interaction time: this considers both the model forward
@@ -654,7 +654,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

# Train the agent
if update >= learning_starts:
per_rank_gradient_steps = ratio(policy_step / world_size)
ratio_steps = policy_step - prefill_steps
if update == learning_starts:
ratio_steps += policy_steps_per_update
per_rank_gradient_steps = ratio(ratio_steps / world_size)
if per_rank_gradient_steps > 0:
local_data = rb.sample_tensors(
cfg.algo.per_rank_batch_size,
11 changes: 7 additions & 4 deletions sheeprl/algos/droq/droq.py
@@ -259,12 +259,12 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):
last_log = state["last_log"] if cfg.checkpoint.resume_from else 0
last_checkpoint = state["last_checkpoint"] if cfg.checkpoint.resume_from else 0
policy_steps_per_update = int(cfg.env.num_envs * fabric.world_size)
num_updates = int(cfg.algo.total_steps // policy_steps_per_update) if not cfg.dry_run else 1
num_updates = cfg.algo.total_steps // policy_steps_per_update if not cfg.dry_run else 1
learning_starts = cfg.algo.learning_starts // policy_steps_per_update if not cfg.dry_run else 0
prefill_steps = learning_starts + start_step
if cfg.checkpoint.resume_from:
cfg.algo.per_rank_batch_size = state["batch_size"] // fabric.world_size
if not cfg.buffer.checkpoint:
learning_starts += start_step
learning_starts += start_step

# Create Ratio class
ratio = Ratio(cfg.algo.replay_ratio, pretrain_steps=cfg.algo.per_rank_pretrain_steps)
@@ -346,7 +346,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

# Train the agent
if update >= learning_starts:
per_rank_gradient_steps = ratio(policy_step / world_size)
ratio_steps = policy_step - prefill_steps
if update == learning_starts:
ratio_steps += policy_steps_per_update
per_rank_gradient_steps = ratio(ratio_steps / world_size)
if per_rank_gradient_steps > 0:
train(
fabric,
13 changes: 8 additions & 5 deletions sheeprl/algos/p2e_dv1/p2e_dv1_exploration.py
@@ -532,12 +532,12 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):
last_log = state["last_log"] if cfg.checkpoint.resume_from else 0
last_checkpoint = state["last_checkpoint"] if cfg.checkpoint.resume_from else 0
policy_steps_per_update = int(cfg.env.num_envs * world_size)
num_updates = int(cfg.algo.total_steps // policy_steps_per_update) if not cfg.dry_run else 1
num_updates = cfg.algo.total_steps // policy_steps_per_update if not cfg.dry_run else 1
learning_starts = (cfg.algo.learning_starts // policy_steps_per_update) if not cfg.dry_run else 0
prefill_steps = learning_starts + start_step
if cfg.checkpoint.resume_from:
cfg.algo.per_rank_batch_size = state["batch_size"] // world_size
if not cfg.buffer.checkpoint:
learning_starts += start_step
learning_starts += start_step

# Create Ratio class
ratio = Ratio(cfg.algo.replay_ratio, pretrain_steps=cfg.algo.per_rank_pretrain_steps)
@@ -576,7 +576,7 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

cumulative_per_rank_gradient_steps = 0
for update in range(start_step, num_updates + 1):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

with torch.inference_mode():
# Measure environment interaction time: this considers both the model forward
@@ -668,7 +668,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

# Train the agent
if update >= learning_starts:
per_rank_gradient_steps = ratio(policy_step / world_size)
ratio_steps = policy_step - prefill_steps
if update == learning_starts:
ratio_steps += policy_steps_per_update
per_rank_gradient_steps = ratio(ratio_steps / world_size)
if per_rank_gradient_steps > 0:
with timer("Time/train_time", SumMetric, sync_on_compute=cfg.metric.sync_on_compute):
sample = rb.sample_tensors(
13 changes: 8 additions & 5 deletions sheeprl/algos/p2e_dv1/p2e_dv1_finetuning.py
@@ -203,12 +203,12 @@ def main(fabric: Fabric, cfg: Dict[str, Any], exploration_cfg: Dict[str, Any]):
last_log = state["last_log"] if resume_from_checkpoint else 0
last_checkpoint = state["last_checkpoint"] if resume_from_checkpoint else 0
policy_steps_per_update = int(cfg.env.num_envs * world_size)
num_updates = int(cfg.algo.total_steps // policy_steps_per_update) if not cfg.dry_run else 1
num_updates = cfg.algo.total_steps // policy_steps_per_update if not cfg.dry_run else 1
learning_starts = (cfg.algo.learning_starts // policy_steps_per_update) if not cfg.dry_run else 0
prefill_steps = learning_starts + start_step
if resume_from_checkpoint:
cfg.algo.per_rank_batch_size = state["batch_size"] // world_size
if resume_from_checkpoint and not cfg.buffer.checkpoint:
learning_starts += start_step
learning_starts += start_step

# Create Ratio class
ratio = Ratio(cfg.algo.replay_ratio, pretrain_steps=cfg.algo.per_rank_pretrain_steps)
@@ -247,7 +247,7 @@ def main(fabric: Fabric, cfg: Dict[str, Any], exploration_cfg: Dict[str, Any]):

cumulative_per_rank_gradient_steps = 0
for update in range(start_step, num_updates + 1):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

with torch.inference_mode():
# Measure environment interaction time: this considers both the model forward
@@ -323,7 +323,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any], exploration_cfg: Dict[str, Any]):

# Train the agent
if update >= learning_starts:
per_rank_gradient_steps = ratio(policy_step / world_size)
ratio_steps = policy_step - prefill_steps
if update == learning_starts:
ratio_steps += policy_steps_per_update
per_rank_gradient_steps = ratio(ratio_steps / world_size)
if per_rank_gradient_steps > 0:
if player.actor_type != "task":
player.actor_type = "task"
11 changes: 7 additions & 4 deletions sheeprl/algos/p2e_dv2/p2e_dv2_exploration.py
@@ -669,10 +669,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):
policy_steps_per_update = int(cfg.env.num_envs * world_size)
num_updates = cfg.algo.total_steps // policy_steps_per_update if not cfg.dry_run else 1
learning_starts = cfg.algo.learning_starts // policy_steps_per_update if not cfg.dry_run else 0
prefill_steps = learning_starts + start_step
if cfg.checkpoint.resume_from:
cfg.algo.per_rank_batch_size = state["batch_size"] // world_size
if not cfg.buffer.checkpoint:
learning_starts += start_step
learning_starts += start_step

# Create Ratio class
ratio = Ratio(cfg.algo.replay_ratio, pretrain_steps=cfg.algo.per_rank_pretrain_steps)
@@ -713,7 +713,7 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

cumulative_per_rank_gradient_steps = 0
for update in range(start_step, num_updates + 1):
policy_step += cfg.env.num_envs * world_size
policy_step += policy_steps_per_update

with torch.inference_mode():
# Measure environment interaction time: this considers both the model forward
@@ -808,7 +808,10 @@ def main(fabric: Fabric, cfg: Dict[str, Any]):

# Train the agent
if update >= learning_starts:
per_rank_gradient_steps = ratio(policy_step / world_size)
ratio_steps = policy_step - prefill_steps
if update == learning_starts:
ratio_steps += policy_steps_per_update
per_rank_gradient_steps = ratio(ratio_steps / world_size)
if per_rank_gradient_steps > 0:
local_data = rb.sample_tensors(
batch_size=cfg.algo.per_rank_batch_size,