
[Tune] FileNotFoundError on params.json when restoring Tune experiment from remote storage #40484

Closed
aloysius-lim opened this issue Oct 19, 2023 · 9 comments · Fixed by #40647
Assignees
Labels
bug Something that is supposed to be working; but isn't P1 Issue that should be fixed within a few weeks tune Tune-related issues

Comments

@aloysius-lim

What happened + What you expected to happen

Given a Ray Tune experiment has been run with an fsspec-backed remote storage (specifically using adlfs for Azure Blob Storage)
When the tuner is restored using tune.Tuner.restore()
And tuner.fit() is run
Then a FileNotFoundError is thrown, stating that params.json cannot be found in the root folder of the experiment.

Here is the error log, with private details (paths) redacted:

2023-10-19 12:57:27,567	INFO experiment_state.py:479 -- Trying to find and download experiment checkpoint at cuerank/ray_tune_restore_issue/train_fn_2023-10-19_12-55-19
2023-10-19 12:57:34,447	ERROR experiment_state.py:495 -- Got error when trying to sync down.
Please check this error message for potential access problems - if a directory was not found, that is expected at this stage when you're starting a new experiment.
Traceback (most recent call last):
  File "<PYTHON_ENV>/lib/python3.10/site-packages/ray/tune/execution/experiment_state.py", line 485, in _resume_auto
    self.sync_down_experiment_state()
  File "<PYTHON_ENV>/lib/python3.10/site-packages/ray/tune/execution/experiment_state.py", line 402, in sync_down_experiment_state
    pyarrow.fs.copy_files(
  File "<PYTHON_ENV>/lib/python3.10/site-packages/pyarrow/fs.py", line 273, in copy_files
    _copy_files(source_fs, source_path,
  File "pyarrow/_fs.pyx", line 1603, in pyarrow._fs._copy_files
  File "pyarrow/_fs.pyx", line 1543, in pyarrow._fs._cb_open_input_stream
  File "<PYTHON_ENV>/lib/python3.10/site-packages/pyarrow/fs.py", line 416, in open_input_stream
    raise FileNotFoundError(path)
FileNotFoundError: <CHECKPOINT_FOLDER>/MyModel/params.json
2023-10-19 12:57:34,450	INFO experiment_state.py:502 -- No remote checkpoint was found or an error occurred when trying to download the experiment checkpoint. Please check the previous warning message for more details. Ray Tune will now start a new experiment.
2023-10-19 12:57:36,444	ERROR tune_controller.py:1502 -- Trial task failed for trial train_fn_0032e_00000

A look in the remote storage folder confirms that params.json is not present at the root of the checkpoint folder. However, it is present in the subfolder of each individual trial:

# Contents of <CHECKPOINT_FOLDER>/MyModel/
.validate_storage_marker
basic-variant-state-2023-10-19_12-55-24.json
experiment-state-2023-10-19_12-55-24.json
tuner.pkl
train_fn_b6a64_00023_23_dim=8,lr=0.0003_2023-10-19_12-55-28/events.out.tfevents.1697691340.local
train_fn_b6a64_00023_23_dim=8,lr=0.0003_2023-10-19_12-55-28/params.json
train_fn_b6a64_00023_23_dim=8,lr=0.0003_2023-10-19_12-55-28/params.pkl
train_fn_b6a64_00023_23_dim=8,lr=0.0003_2023-10-19_12-55-28/progress.csv
train_fn_b6a64_00023_23_dim=8,lr=0.0003_2023-10-19_12-55-28/result.json
# And other similar subfolders for individual trials, each containing params.json

This error does not happen when using a local storage path for the checkpoints.

Versions / Dependencies

OS: macOS 14.0
python: 3.10.12
adlfs: 2023.9.0
fsspec: 2023.9.2
pyarrow: 13.0.0
ray: 2.7.0
torch: 2.0.1

Reproduction script

This example uses adlfs to access Azure Blob Storage. I do not have access to other remote storage services to test this elsewhere.

import fsspec
import pyarrow.fs
import ray
from ray import train, tune
from ray.train import RunConfig
import torch
from torch import nn

ray.init()

class MyModel(nn.Module):
    def __init__(self, dim: int = 8):
        super().__init__()
        self.dim = dim
        self.linear = nn.Sequential(
            nn.Linear(1, dim),
            nn.ReLU(),
            nn.Linear(dim, 1),
        )

    def forward(self, x):
        return self.linear(x)

def train_fn(config):
    model = MyModel(dim=config["dim"])
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
    criterion = nn.MSELoss()
    for _ in range(1000):
        optimizer.zero_grad()
        x = torch.rand(1, 1)
        y = model(x)
        loss = criterion(y, x)
        loss.backward()
        optimizer.step()
        train.report({"loss": loss.item()})

checkpoint_dir = "abfs://<CHECKPOINT_DIR>"
checkpoint_fs_kwargs = {
    "account_name": "<REDACTED>",
    "tenant_id": "<REDACTED>",
    "client_id": "<REDACTED>",
    "client_secret": "<REDACTED>",
}

# Create the storage filesystem and run config.
checkpoint_fs, checkpoint_path = fsspec.core.url_to_fs(
    checkpoint_dir, **checkpoint_fs_kwargs
)
checkpoint_fs_pa = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(checkpoint_fs))
run_config = RunConfig(
    name=MyModel.__name__,
    storage_path=checkpoint_path,
    storage_filesystem=checkpoint_fs_pa,
)

# Create tuner.
tuner = tune.Tuner(
    train_fn,
    param_space={
        "dim": tune.choice([1, 2, 4, 8, 16, 32, 64, 128]),
        "lr": tune.loguniform(1e-4, 1e-1),
    },
    tune_config=tune.TuneConfig(num_samples=100),
    run_config=run_config,
)

# Run tuner.
tuner.fit()

# MANUAL STEP: Interrupt the fit after a few successful trials. At this point, some checkpoints have been saved in the remote storage directory.

# Restore tuner.
if tune.Tuner.can_restore(
    "/".join([checkpoint_path, MyModel.__name__]), checkpoint_fs_pa
):
    tuner = tune.Tuner.restore(
        "/".join([checkpoint_path, MyModel.__name__]),
        storage_filesystem=checkpoint_fs_pa,
        trainable=train_fn,
        resume_errored=True,
    )

# Run tuner.
tuner.fit()
# Error happens here.

Issue Severity

High: It blocks me from completing my task.

@aloysius-lim aloysius-lim added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Oct 19, 2023
@matthewdeng matthewdeng added the tune Tune-related issues label Oct 19, 2023
@fardinabbasi

I have the same issue. I trained my model on my PC and copied the results directory into Colab. Now, when I try to restore the trained agent, it seems that it can't read the trials correctly!
Here is the warning log:

2023-10-20 13:39:07,135	WARNING experiment_analysis.py:205 -- Failed to fetch metrics for 1 trial(s):
- TD3_RankingEnv_ce7f8b78: FileNotFoundError('Could not fetch metrics for TD3_RankingEnv_ce7f8b78: both result.json and progress.csv were not found at /mainfs/scratch/sb5e19/RL_LTR/TD3_TRAIN/TD3_TRAIN/TD3_RankingEnv_ce7f8b78_1_AlgorithmConfig__prior_exploration_config=None,disable_action_flattening=False,disable_execution_plan_ap_2023-10-19_18-22-51')

<ray.tune.tuner.Tuner object at 0x7f22d0512200>
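
The warning above says that neither result.json nor progress.csv was found in the trial directory. A quick way to see which trial directories in a copied results folder actually lack those files is sketched below. It is only a diagnostic sketch using the standard library: the helper name `trials_missing_metrics` and the demo directory layout are hypothetical and not part of Ray.

```python
import tempfile
from pathlib import Path

# File names taken from the warning message above.
METRIC_FILES = ("result.json", "progress.csv")


def trials_missing_metrics(experiment_dir: Path) -> list[str]:
    """Return names of trial subdirectories containing neither metrics file."""
    missing = []
    for trial_dir in sorted(p for p in experiment_dir.iterdir() if p.is_dir()):
        if not any((trial_dir / name).is_file() for name in METRIC_FILES):
            missing.append(trial_dir.name)
    return missing


# Demo on a throwaway directory (hypothetical layout, for illustration only).
with tempfile.TemporaryDirectory() as tmp:
    experiment = Path(tmp) / "TD3_TRAIN"
    (experiment / "trial_ok").mkdir(parents=True)
    (experiment / "trial_ok" / "result.json").write_text("{}")
    (experiment / "trial_empty").mkdir()

    missing_trials = trials_missing_metrics(experiment)
    print(missing_trials)
```

If the copied folder does contain the files but the warning still points at a path from the original machine, the problem is more likely the recorded trial path than the files themselves.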

Reproduction script

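# Imports inferred from the names used below (Ray 2.7-style module paths).
from typing import Any

from ray import tune
from ray.rllib.algorithms.algorithm import Algorithm
from ray.train import CheckpointConfig, FailureConfig, RunConfig
from ray.tune import TuneConfig
from ray.tune.registry import register_env
from ray.tune.search import ConcurrencyLimiter


class DRLlibv2: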
    def __init__(
        self,
        trainable: str | Any,
        params: dict,
        train_env=None,
        run_name: str = "tune_run",
        local_dir: str = "tune_results",
        search_alg=None,
        concurrent_trials: int = 0,
        num_samples: int = 0,
        scheduler_=None,
        # num_cpus: float | int = 2,
        dataframe_save: str = "tune.csv",
        metric: str = "episode_reward_mean",
        mode: str | list[str] = "max",
        max_failures: int = 0,
        training_iterations: int = 100,
        checkpoint_num_to_keep: None | int = None,
        checkpoint_freq: int = 0,
        reuse_actors: bool = True
    ):
        self.params = params

        # if train_env is not None:
        #     register_env(self.params['env'], lambda env_config: train_env(env_config))


        self.train_env = train_env
        self.run_name = run_name
        self.local_dir = local_dir
        self.search_alg = search_alg
        if concurrent_trials != 0:
            self.search_alg = ConcurrencyLimiter(
                self.search_alg, max_concurrent=concurrent_trials
            )
        self.scheduler_ = scheduler_
        self.num_samples = num_samples
        self.trainable = trainable
        if isinstance(self.trainable, str):
            self.trainable = self.trainable.upper()
        # self.num_cpus = num_cpus
        self.dataframe_save = dataframe_save
        self.metric = metric
        self.mode = mode
        self.max_failures = max_failures
        self.training_iterations = training_iterations
        self.checkpoint_freq = checkpoint_freq
        self.checkpoint_num_to_keep = checkpoint_num_to_keep
        self.reuse_actors = reuse_actors

    def train_tune_model(self):

        # if ray.is_initialized():
        #   ray.shutdown()

        # ray.init(num_cpus=self.num_cpus, num_gpus=self.params['num_gpus'], ignore_reinit_error=True)

        if self.train_env is not None:
            register_env(self.params['env'], lambda env_config: self.train_env)


        tuner = tune.Tuner(
            self.trainable,
            param_space=self.params,
            tune_config=TuneConfig(
                search_alg=self.search_alg,
                scheduler=self.scheduler_,
                num_samples=self.num_samples,
                # metric=self.metric,
                # mode=self.mode,
                **({'metric': self.metric, 'mode': self.mode} if self.scheduler_ is None else {}),
                reuse_actors=self.reuse_actors,

            ),
            run_config=RunConfig(
                name=self.run_name,
                storage_path=self.local_dir,
                failure_config=FailureConfig(
                    max_failures=self.max_failures, fail_fast=False
                ),
                stop={"training_iteration": self.training_iterations},
                checkpoint_config=CheckpointConfig(
                    num_to_keep=self.checkpoint_num_to_keep,
                    checkpoint_score_attribute=self.metric,
                    checkpoint_score_order=self.mode,
                    checkpoint_frequency=self.checkpoint_freq,
                    checkpoint_at_end=True,
                ),
                verbose=3,  # Verbosity mode: 0 = silent, 1 = default, 2 = verbose, 3 = detailed
            ),
        )

        self.results = tuner.fit()
        if self.search_alg is not None:
            self.search_alg.save_to_dir(self.local_dir)
        # ray.shutdown()
        return self.results

    def infer_results(self, to_dataframe: str | None = None, mode: str = "a"):

        results_df = self.results.get_dataframe()

        if to_dataframe is None:
            to_dataframe = self.dataframe_save

        results_df.to_csv(to_dataframe, mode=mode)

        best_result = self.results.get_best_result()
        # best_result = self.results.get_best_result()
        # best_metric = best_result.metrics
        # best_checkpoint = best_result.checkpoint
        # best_trial_dir = best_result.log_dir
        # results_df = self.results.get_dataframe()

        return results_df, best_result

    def restore_agent(
        self,
        checkpoint_path: str = "",
        restore_search: bool = False,
        resume_unfinished: bool = True,
        resume_errored: bool = False,
        restart_errored: bool = False,
    ):

        # if restore_search:
        # self.search_alg = self.search_alg.restore_from_dir(self.local_dir)
        if checkpoint_path == "":
            checkpoint_path = self.results.get_best_result().checkpoint._local_path

        restored_agent = tune.Tuner.restore(
            checkpoint_path, trainable = self.trainable,
            param_space=self.params,
            restart_errored=restart_errored,
            resume_unfinished=resume_unfinished,
            resume_errored=resume_errored,
        )
        print(restored_agent)
        self.results = restored_agent.get_results()

        if self.search_alg is not None:
            self.search_alg.save_to_dir(self.local_dir)
        return self.results

    def get_test_agent(self, test_env_name: str=None, test_env=None, checkpoint=None):

        # if test_env is not None:
        #     register_env(test_env_name, lambda config: [test_env])

        if checkpoint is None:
            checkpoint = self.results.get_best_result().checkpoint

        testing_agent = Algorithm.from_checkpoint(checkpoint)
        # testing_agent.config['env'] = test_env_name

        return testing_agent

drl_agent = DRLlibv2(
    trainable="TD3",
    # train_env = RankingEnv,
    # num_cpus = num_cpus,
    run_name = "TD3_TRAIN",
    local_dir = local_dir,
    params = train_config.to_dict(),
    num_samples = 1,#Number of samples of hyperparameters config to run
    # training_iterations=5,
    checkpoint_freq=5,
    # scheduler_=scheduler_,
    search_alg=search_alg,
    metric = "episode_reward_mean",
    mode = "max"
    # callbacks=[wandb_callback]
)
results = drl_agent.restore_agent((local_dir/"TD3_TRAIN").as_posix())

@justinvyu justinvyu self-assigned this Oct 23, 2023
@justinvyu justinvyu added P1 Issue that should be fixed within a few weeks and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Oct 23, 2023
@justinvyu
Contributor

justinvyu commented Oct 26, 2023

@aloysius-lim Thanks for filing this issue. This has highlighted some problematic sync-down logic that happens on restoration, but it actually only turned up now because of the custom adlfs fs that you are providing.

pyarrow's wrapper of fsspec filesystems uses fs.find to perform a list operation at a directory.

The fsspec find interface accepts maxdepth, but the adlfs implementation doesn't actually use this parameter -- so it always recursively lists all files in the directory, which results in this error.
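
The effect of an ignored maxdepth can be illustrated with a toy model. This is only a sketch: `toy_find` and the file names are hypothetical, and the code is not taken from pyarrow, fsspec, or adlfs. It shows how a caller that asks for a shallow listing receives nested trial files, such as params.json, when the depth limit is not applied.

```python
from typing import Optional


def toy_find(all_files: list[str], root: str, maxdepth: Optional[int] = None) -> list[str]:
    """List files under `root`, descending at most `maxdepth` levels (None = unlimited)."""
    prefix = root.rstrip("/") + "/"
    matches = []
    for path in all_files:
        if not path.startswith(prefix):
            continue
        # Depth 1 means the file sits directly inside `root`.
        depth = len(path[len(prefix):].split("/"))
        if maxdepth is None or depth <= maxdepth:
            matches.append(path)
    return sorted(matches)


# Hypothetical experiment layout, loosely modelled on the listing in the issue.
files = [
    "exp/tuner.pkl",
    "exp/experiment-state.json",
    "exp/trial_0/params.json",
    "exp/trial_0/result.json",
]

# What a caller expects from a listing limited to one level.
shallow = toy_find(files, "exp", maxdepth=1)
# What the caller gets if the implementation ignores maxdepth.
unbounded = toy_find(files, "exp", maxdepth=None)

print(shallow)
print(unbounded)
```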

See here:

https://github.com/fsspec/adlfs/blob/f15c37a43afd87a04f01b61cd90294dd57181e1d/adlfs/spec.py#L1128

Compare this to the s3fs implementation, which we test in our CI:

https://github.com/fsspec/s3fs/blob/2c074502c2d6a9be0d3f05eb678f4cc5add2e7e5/s3fs/core.py#L787

I can put up a fix on our end to generally make the sync-down logic more robust, but this is actually something that adlfs should correct in the implementation -- perhaps you could open up a PR for them?

Edit: I've posted an issue on their repo with a recommended fix -- maybe you can continue from that? fsspec/adlfs#435
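
One way to picture a sync-down that does not depend on the filesystem honoring a depth limit is to copy only the regular files that sit directly in the experiment directory. The sketch below uses local paths and the standard library purely for illustration. `copy_top_level_files` is a hypothetical helper, and this is not the change made in Ray.

```python
import shutil
import tempfile
from pathlib import Path


def copy_top_level_files(src_dir: Path, dst_dir: Path) -> list[str]:
    """Copy only the regular files directly inside `src_dir`, skipping subdirectories."""
    dst_dir.mkdir(parents=True, exist_ok=True)
    copied = []
    for entry in sorted(src_dir.iterdir()):
        if entry.is_file():
            shutil.copy2(entry, dst_dir / entry.name)
            copied.append(entry.name)
    return copied


# Demo on a throwaway directory standing in for the remote experiment folder.
with tempfile.TemporaryDirectory() as tmp:
    remote = Path(tmp) / "remote" / "MyModel"
    (remote / "trial_0").mkdir(parents=True)
    (remote / "tuner.pkl").write_bytes(b"")
    (remote / "experiment-state.json").write_text("{}")
    (remote / "trial_0" / "params.json").write_text("{}")

    copied = copy_top_level_files(remote, Path(tmp) / "local")
    print(copied)
```

Because each entry is checked explicitly, trial subfolders and their params.json files are never touched during the experiment-state download.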

@justinvyu
Contributor

@fardinabbasi I believe your issue is a different one that will be solved by #40647

@justinvyu
Contributor

@aloysius-lim On ray nightly, this logic has been updated so that it should no longer error for you. Let me know if it works out for you.

@fardinabbasi Your issue should also be solved by the same PR.

@justinvyu
Contributor

justinvyu commented Nov 21, 2023

Hey @aloysius-lim, were you able to get an adlfs custom filesystem to work? Were you encountering any pickling/serialization issues like the one in #41125?

@aloysius-lim
Author

aloysius-lim commented Jan 4, 2024

> Hey @aloysius-lim, were you able to get an adlfs custom filesystem to work? Were you encountering any pickling/serialization issues like the one in #41125?

I'm sorry for the long radio silence. My issue is now resolved, thank you! I did not encounter any pickling / serialization issues.

@grizzlybearg

> > Hey @aloysius-lim, were you able to get an adlfs custom filesystem to work? Were you encountering any pickling/serialization issues like the one in #41125?
>
> I'm sorry for the long radio silence. My issue is now resolved, thank you! I did not encounter any pickling / serialization issues.

Hey @aloysius-lim. I'm still experiencing a pickling/serialization issue with the following dependencies:

OS: Ubuntu 22.04
python: 3.11
adlfs: 2023.12.0
fsspec: 2023.12.0
pyarrow: 14.0.2
ray: 2.9.0
torch: 2.0.1

Are your dependencies the same as mentioned above? #40484 (comment)
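
When comparing environments like this, it can help to print the installed versions programmatically instead of copying them by hand. The following is a small sketch using only the standard library. The helper name `installed_versions` is hypothetical, and the package list simply mirrors the one above.

```python
from importlib import metadata


def installed_versions(packages: list[str]) -> dict[str, str]:
    """Map each distribution name to its installed version, or 'not installed'."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = "not installed"
    return versions


report = installed_versions(["adlfs", "fsspec", "pyarrow", "ray", "torch"])
for name, version in report.items():
    print(f"{name}: {version}")
```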

@aloysius-lim
Author

> Are your dependencies the same as mentioned above? #40484 (comment)

@grizzlybearg they were the same, except I updated Ray to the latest version.

@grizzlybearg

@aloysius-lim Thanks for the update. I'll try downgrading some requirements and test again.


5 participants