Ray launcher WIP #518

Merged on Oct 28, 2020 (85 commits)

Commits
4211bc8
hydra ray launcher
jieru-hu Jul 15, 2020
e20484b
Add resolver for runtime python version
jieru-hu Jul 15, 2020
05a318f
fix lints
jieru-hu Jul 21, 2020
04995c5
fix circleCI file
jieru-hu Jul 23, 2020
d1f6ddd
separate ray launcher into its own session
jieru-hu Jul 23, 2020
a9a7a81
commit
jieru-hu Jul 23, 2020
2dfe72d
fix nox file
jieru-hu Jul 23, 2020
8627dd8
fix noxfile
jieru-hu Jul 23, 2020
2b490d9
clean up nox file
jieru-hu Jul 27, 2020
f1c862a
fix nox file
jieru-hu Jul 28, 2020
c57a670
fix merge issue
jieru-hu Jul 30, 2020
ba4df3d
Address comments
jieru-hu Jul 31, 2020
063a6c5
Address comments
jieru-hu Aug 5, 2020
6478ca9
fix doc
jieru-hu Aug 5, 2020
fe91ed6
fix the setup commands
jieru-hu Aug 5, 2020
04d6cc2
Address comments
jieru-hu Aug 11, 2020
c161a7c
fix lint
jieru-hu Aug 13, 2020
90482cc
address comments
jieru-hu Aug 31, 2020
5a08397
iam policy stuff
jieru-hu Aug 31, 2020
631176f
commit
jieru-hu Sep 21, 2020
446cb24
commit
jieru-hu Sep 21, 2020
4d730bd
commit
jieru-hu Sep 22, 2020
adcce56
Add ray AMI for integration tests
jieru-hu Sep 26, 2020
bd05d3c
fix merge conflicts
jieru-hu Sep 28, 2020
4246de6
rayapp working
jieru-hu Sep 28, 2020
af64b85
fix all the lints
jieru-hu Sep 28, 2020
1caf26a
fix noxfile changes
jieru-hu Sep 28, 2020
ddd906b
fix merge issue
jieru-hu Sep 28, 2020
62fbbe7
fix comments
jieru-hu Sep 28, 2020
accf610
fix lints & tests
jieru-hu Sep 29, 2020
71b5791
remove download ray-wheel, since it's installed on the AMI already.
jieru-hu Sep 29, 2020
cfd7644
fix lints
jieru-hu Sep 29, 2020
995df58
skip warning for ray-launcher
jieru-hu Sep 29, 2020
195a785
fix nox format
jieru-hu Sep 29, 2020
8f99afa
run ray test with silent=0 for debugging
jieru-hu Sep 29, 2020
fcbca4e
remove subprocess.check_output
jieru-hu Sep 29, 2020
56cf317
fix mypy errors
jieru-hu Sep 29, 2020
f52b3ce
fix mypy and linux conda
jieru-hu Sep 29, 2020
27880fe
fix linux setup
jieru-hu Sep 29, 2020
771016e
remove conda install
jieru-hu Sep 29, 2020
08bd7a6
source .bashrc
jieru-hu Sep 29, 2020
2a5a5a3
replace conda with /home/circleci/miniconda3/bin/conda
jieru-hu Sep 29, 2020
95c197f
commit
jieru-hu Sep 29, 2020
ee45ee5
commit
jieru-hu Sep 29, 2020
b33fd97
fix conda commands
jieru-hu Sep 29, 2020
d3b7804
commit
jieru-hu Sep 29, 2020
92f48eb
commit
jieru-hu Sep 29, 2020
ac25c72
fix path
jieru-hu Sep 29, 2020
bc124c7
print python version
jieru-hu Sep 29, 2020
55da531
fix python PATH
jieru-hu Sep 29, 2020
008b31b
clean up circleci yaml
jieru-hu Sep 29, 2020
c8686c6
print python version
jieru-hu Sep 29, 2020
9163532
pin circleci docker image & fix create ami script
jieru-hu Sep 29, 2020
4f26dce
fix pickle method
jieru-hu Sep 30, 2020
d97ad38
locally working rayapp & nox lint & integ tests
jieru-hu Sep 30, 2020
af38ec7
pin cloudpickle version
jieru-hu Sep 30, 2020
ea0a8f9
use cloudpickle out of the box
jieru-hu Sep 30, 2020
68d9905
rayapp working, lint working, and test working.
jieru-hu Oct 1, 2020
9ae5a92
lint and tests pass
jieru-hu Oct 1, 2020
1df7a27
fix bandit
jieru-hu Oct 1, 2020
2522f3e
use cloudpickle directly, pass rayapp and lin
jieru-hu Oct 1, 2020
25951e0
remove unintended change in pytest.ini & remove py3.6 for now
jieru-hu Oct 1, 2020
4737d9a
rebase and add install rsync
jieru-hu Oct 1, 2020
8ab1b77
remove noxfile change, add pytest.ini for ray plugin
jieru-hu Oct 2, 2020
44ac7f6
set logging level
jieru-hu Oct 2, 2020
6ff5a11
rebase & resolve conflicts
jieru-hu Oct 5, 2020
b686b1d
add logging for temp path
jieru-hu Oct 5, 2020
946a955
update sha & tmp path filter
jieru-hu Oct 5, 2020
2f2c437
fix lints
jieru-hu Oct 5, 2020
9ecf4af
update ray init struct config
jieru-hu Oct 5, 2020
8934ca7
fix merge conflicts in circleCI yaml
jieru-hu Oct 14, 2020
2c4c46b
address comments
jieru-hu Oct 14, 2020
7c88c4d
address comments
jieru-hu Oct 14, 2020
d520323
address comments
jieru-hu Oct 14, 2020
f9f46db
fix lints:
jieru-hu Oct 14, 2020
08a323e
fix scripts
jieru-hu Oct 15, 2020
097966c
fix imports warning
jieru-hu Oct 15, 2020
b439a48
fix pickle issue
jieru-hu Oct 15, 2020
908d167
Create mandatory install for Ray launcher
jieru-hu Oct 19, 2020
7959a30
update default stop_cluster=True
jieru-hu Oct 19, 2020
466b454
address comments
jieru-hu Oct 26, 2020
e56073f
update script comment
jieru-hu Oct 27, 2020
5bf6a64
fix script comment
jieru-hu Oct 27, 2020
5073018
fix lgtm warning
jieru-hu Oct 27, 2020
4d020e8
fix merge conflicts
jieru-hu Oct 28, 2020
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -66,7 +66,7 @@ commands:
name: Preparing environment - Other dependency
command: |
sudo apt-get update
sudo apt-get install -y expect fish openjdk-11-jre
sudo apt-get install -y expect fish openjdk-11-jre rsync
- run:
name: Preparing environment - Hydra
command: |
@@ -273,4 +273,4 @@ workflows:


orbs:
win: circleci/[email protected]
win: circleci/[email protected]
1 change: 0 additions & 1 deletion .isort.cfg
@@ -23,4 +23,3 @@ skip=
,hydra/grammar/gen
,tools/configen/example/gen
,tools/configen/tests/test_modules/expected

3 changes: 3 additions & 0 deletions plugins/hydra_ray_launcher/MANIFEST.in
@@ -0,0 +1,3 @@
global-exclude *.pyc
global-exclude __pycache__
recursive-include hydra_plugins/* *.yaml
4 changes: 4 additions & 0 deletions plugins/hydra_ray_launcher/README.md
@@ -0,0 +1,4 @@
# Hydra Ray Launcher
Provides a [`Ray`](https://docs.ray.io/en/latest/)-based Hydra Launcher supporting execution on AWS.

See the [website](https://hydra.cc/docs/next/plugins/ray_launcher) for more information.
1 change: 1 addition & 0 deletions plugins/hydra_ray_launcher/__init__.py
@@ -0,0 +1 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
6 changes: 6 additions & 0 deletions plugins/hydra_ray_launcher/example/conf/config.yaml
@@ -0,0 +1,6 @@
defaults:
- hydra/launcher: ray_local


random_seed: 0
checkpoint_path: checkpoint
24 changes: 24 additions & 0 deletions plugins/hydra_ray_launcher/example/conf/extra_configs/aws.yaml
@@ -0,0 +1,24 @@
# @package _global_

hydra:
launcher:
sync_up:
# source dir is relative in this case, assuming you are running from
# <project_root>/hydra/plugins/hydra_ray_launcher/example
# an absolute path is also supported.
source_dir: "."
# we leave target_dir null; as a result the files will be synced to a temp dir on the remote cluster.
# the temp dir will be cleaned up after the jobs are done.
# we recommend leaving target_dir null when syncing code/artifacts to the remote cluster, so you don't
# need to configure $PYTHONPATH on the remote cluster
include: ["model", "*.py"]
# No need to sync up config files.
exclude: ["*"]
sync_down:
include: ["*.pt", "*/"]
# No need to sync down config files.
exclude: ["*"]
ray_cluster_cfg:
provider:
cache_stopped_nodes: true
19 changes: 19 additions & 0 deletions plugins/hydra_ray_launcher/example/model/my_model.py
@@ -0,0 +1,19 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
from datetime import datetime
from pathlib import Path

log = logging.getLogger(__name__)


class MyModel:
def __init__(self, random_seed: int):
self.random_seed = random_seed
log.info("Init my model")

def save(self, checkpoint_path: str) -> None:
checkpoint_dir = Path(checkpoint_path)
checkpoint_dir.mkdir(parents=True, exist_ok=True)
log.info(f"Created dir for checkpoints. dir={checkpoint_dir}")
with open(checkpoint_dir / f"checkpoint_{self.random_seed}.pt", "w") as f:
f.write(f"checkpoint@{datetime.now()}")
20 changes: 20 additions & 0 deletions plugins/hydra_ray_launcher/example/train.py
@@ -0,0 +1,20 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging

import hydra
from model.my_model import MyModel
from omegaconf import DictConfig

log = logging.getLogger(__name__)


@hydra.main(config_path="conf", config_name="config")
def main(cfg: DictConfig) -> None:
log.info("Start training...")
model = MyModel(cfg.random_seed)
# save checkpoint to current working dir.
model.save(cfg.checkpoint_path)


if __name__ == "__main__":
main()
@@ -0,0 +1,11 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
from hydra.core.config_search_path import ConfigSearchPath
from hydra.plugins.search_path_plugin import SearchPathPlugin


class RayLauncherSearchPathPlugin(SearchPathPlugin):
def manipulate_search_path(self, search_path: ConfigSearchPath) -> None:
# Appends the search path for this plugin to the end of the search path
search_path.append(
"hydra-ray-launcher", "pkg://hydra_plugins.hydra_ray_launcher.conf"
)
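
The `conf` package that this search path points at (`pkg://hydra_plugins.hydra_ray_launcher.conf`) is not included in the diff shown here. As a rough, hypothetical sketch of how such a package typically registers its launcher configs with Hydra's `ConfigStore` (the dataclass name, fields, and `_target_` path below are illustrative assumptions, not the plugin's actual schema):

```python
# Hypothetical sketch only: the real structured configs live in
# hydra_plugins.hydra_ray_launcher.conf, which is not part of this excerpt.
from dataclasses import dataclass, field
from typing import Any, Dict

from hydra.core.config_store import ConfigStore


@dataclass
class RayLocalLauncherConf:
    # _target_ tells Hydra which Launcher class to instantiate (assumed path).
    _target_: str = "hydra_plugins.hydra_ray_launcher.ray_local_launcher.RayLocalLauncher"
    # Illustrative fields; the actual plugin defines richer structured configs.
    ray_init_cfg: Dict[str, Any] = field(default_factory=dict)
    ray_remote_cfg: Dict[str, Any] = field(default_factory=dict)


ConfigStore.instance().store(
    group="hydra/launcher",
    name="ray_local",
    node=RayLocalLauncherConf,
    provider="hydra-ray-launcher",
)
```

With a registration along these lines, the `hydra/launcher: ray_local` entry in the example `config.yaml` above resolves to this plugin's launcher.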
@@ -0,0 +1,199 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
import os
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Sequence

import cloudpickle # type: ignore
import pickle5 as pickle # type: ignore
from hydra.core.hydra_config import HydraConfig
from hydra.core.singleton import Singleton
from hydra.core.utils import JobReturn, configure_log, filter_overrides, setup_globals
from omegaconf import OmegaConf, open_dict, read_write

from hydra_plugins.hydra_ray_launcher._launcher_util import ( # type: ignore
JOB_RETURN_PICKLE,
JOB_SPEC_PICKLE,
ray_down,
ray_exec,
ray_rsync_down,
ray_rsync_up,
ray_tmp_dir,
ray_up,
rsync,
)
from hydra_plugins.hydra_ray_launcher.ray_aws_launcher import ( # type: ignore
RayAWSLauncher,
)

log = logging.getLogger(__name__)


def _get_abs_code_dir(code_dir: str) -> str:
if code_dir:
if os.path.isabs(code_dir):
return code_dir
else:
return os.path.join(os.getcwd(), code_dir)
else:
return ""


def _pickle_jobs(tmp_dir: str, **jobspec: Dict[Any, Any]) -> None:
path = os.path.join(tmp_dir, JOB_SPEC_PICKLE)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as f:
cloudpickle.dump(jobspec, f)
log.info(f"Pickle for jobs: {f.name}")


def launch(
launcher: RayAWSLauncher,
job_overrides: Sequence[Sequence[str]],
initial_job_idx: int,
) -> Sequence[JobReturn]:
setup_globals()
assert launcher.config is not None
assert launcher.config_loader is not None
assert launcher.task_function is not None

setup_commands = launcher.mandatory_install.install_commands
setup_commands.extend(launcher.ray_cluster_cfg.setup_commands)

with read_write(launcher.ray_cluster_cfg):
launcher.ray_cluster_cfg.setup_commands = setup_commands

configure_log(launcher.config.hydra.hydra_logging, launcher.config.hydra.verbose)

log.info(f"Ray Launcher is launching {len(job_overrides)} jobs, ")

with tempfile.TemporaryDirectory() as local_tmp_dir:
sweep_configs = []
for idx, overrides in enumerate(job_overrides):
idx = initial_job_idx + idx
ostr = " ".join(filter_overrides(overrides))
log.info(f"\t#{idx} : {ostr}")
sweep_config = launcher.config_loader.load_sweep_config(
launcher.config, list(overrides)
)
with open_dict(sweep_config):
# job.id will be set on the EC2 instance before running the job.
sweep_config.hydra.job.num = idx

sweep_configs.append(sweep_config)

_pickle_jobs(
tmp_dir=local_tmp_dir,
sweep_configs=sweep_configs, # type: ignore
task_function=launcher.task_function,
singleton_state=Singleton.get_state(),
)

with tempfile.NamedTemporaryFile(suffix=".yaml", delete=False) as f:
with open(f.name, "w") as file:
OmegaConf.save(
config=launcher.ray_cluster_cfg, f=file.name, resolve=True
)
launcher.ray_yaml_path = f.name
log.info(
f"Saving RayClusterConf in a temp yaml file: {launcher.ray_yaml_path}."
)

return launch_jobs(
launcher, local_tmp_dir, Path(HydraConfig.get().sweep.dir)
)


def launch_jobs(
launcher: RayAWSLauncher, local_tmp_dir: str, sweep_dir: Path
) -> Sequence[JobReturn]:
ray_up(launcher.ray_yaml_path)
with tempfile.TemporaryDirectory() as local_tmp_download_dir:

with ray_tmp_dir(
launcher.ray_yaml_path, launcher.docker_enabled
) as remote_tmp_dir:

ray_rsync_up(
launcher.ray_yaml_path, os.path.join(local_tmp_dir, ""), remote_tmp_dir
)

script_path = os.path.join(os.path.dirname(__file__), "_remote_invoke.py")
ray_rsync_up(launcher.ray_yaml_path, script_path, remote_tmp_dir)

if launcher.sync_up.source_dir:
source_dir = _get_abs_code_dir(launcher.sync_up.source_dir)
target_dir = (
launcher.sync_up.target_dir
if launcher.sync_up.target_dir
else remote_tmp_dir
)
rsync(
launcher.ray_yaml_path,
launcher.sync_up.include,
launcher.sync_up.exclude,
os.path.join(source_dir, ""),
target_dir,
)

ray_exec(
launcher.ray_yaml_path,
launcher.docker_enabled,
os.path.join(remote_tmp_dir, "_remote_invoke.py"),
remote_tmp_dir,
)

ray_rsync_down(
launcher.ray_yaml_path,
os.path.join(remote_tmp_dir, JOB_RETURN_PICKLE),
local_tmp_download_dir,
)

sync_down_cfg = launcher.sync_down

if (
sync_down_cfg.target_dir
or sync_down_cfg.source_dir
or sync_down_cfg.include
or sync_down_cfg.exclude
):
source_dir = (
sync_down_cfg.source_dir if sync_down_cfg.source_dir else sweep_dir
)
target_dir = (
sync_down_cfg.target_dir if sync_down_cfg.target_dir else sweep_dir
)
target_dir = Path(_get_abs_code_dir(target_dir))
target_dir.mkdir(parents=True, exist_ok=True)

rsync(
launcher.ray_yaml_path,
launcher.sync_down.include,
launcher.sync_down.exclude,
os.path.join(source_dir),
str(target_dir),
up=False,
)
log.info(
f"Syncing outputs from remote dir: {source_dir} to local dir: {target_dir.absolute()} "
)

if launcher.stop_cluster:
log.info("Stopping cluster now. (stop_cluster=true)")
if launcher.ray_cluster_cfg.provider.cache_stopped_nodes:
log.info("NOT deleting the cluster (provider.cache_stopped_nodes=true)")
else:
log.info("Deleted the cluster (provider.cache_stopped_nodes=false)")
ray_down(launcher.ray_yaml_path)
else:
log.warning(
"NOT stopping cluster, this may incur extra cost for you. (stop_cluster=false)"
)

with open(os.path.join(local_tmp_download_dir, JOB_RETURN_PICKLE), "rb") as f:
job_returns = pickle.load(f) # nosec
assert isinstance(job_returns, List)
for run in job_returns:
assert isinstance(run, JobReturn)
return job_returns
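
`_remote_invoke.py`, which the code above rsyncs to the cluster and executes, is not part of this excerpt. Below is a minimal sketch of what the remote side plausibly does, under the assumption that it simply unpickles the job spec written by `_pickle_jobs`, runs each sweep config, and pickles the resulting `JobReturn` objects back for `ray_rsync_down`. The pickle file names, argument handling, job id format, and the use of `run_job` are assumptions, not the PR's actual implementation.

```python
# Hypothetical sketch of the remote entry point; the real _remote_invoke.py
# is not shown in this diff, so names and details here are assumptions.
import os
import sys

import cloudpickle  # type: ignore
import pickle5 as pickle  # type: ignore
from hydra.core.singleton import Singleton
from hydra.core.utils import JobReturn, run_job, setup_globals
from omegaconf import open_dict


def main() -> None:
    # The launcher passes the remote temp dir to the script.
    tmp_dir = sys.argv[1]

    # JOB_SPEC_PICKLE from _launcher_util; "job_spec.pkl" is an assumed value.
    with open(os.path.join(tmp_dir, "job_spec.pkl"), "rb") as f:
        jobspec = pickle.load(f)  # nosec

    setup_globals()
    # Restore Hydra's singleton state captured in the main process.
    Singleton.set_state(jobspec["singleton_state"])
    task_function = jobspec["task_function"]

    runs = []
    for sweep_config in jobspec["sweep_configs"]:
        with open_dict(sweep_config):
            # hydra.job.id is populated here, on the instance, as the
            # comment in launch() above indicates (format assumed).
            sweep_config.hydra.job.id = f"ray_{sweep_config.hydra.job.num}"
        ret: JobReturn = run_job(
            config=sweep_config,
            task_function=task_function,
            job_dir_key="hydra.sweep.dir",
            job_subdir_key="hydra.sweep.subdir",
        )
        runs.append(ret)

    # JOB_RETURN_PICKLE from _launcher_util; "returns.pkl" is an assumed value.
    with open(os.path.join(tmp_dir, "returns.pkl"), "wb") as f:
        cloudpickle.dump(runs, f)


if __name__ == "__main__":
    main()
```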
@@ -0,0 +1,64 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
from pathlib import Path
from typing import Sequence

import ray
from hydra.core.singleton import Singleton
from hydra.core.utils import JobReturn, configure_log, filter_overrides, setup_globals
from omegaconf import open_dict

from hydra_plugins.hydra_ray_launcher._launcher_util import ( # type: ignore
launch_job_on_ray,
start_ray,
)
from hydra_plugins.hydra_ray_launcher.ray_local_launcher import ( # type: ignore
RayLocalLauncher,
)

log = logging.getLogger(__name__)


def launch(
launcher: RayLocalLauncher,
job_overrides: Sequence[Sequence[str]],
initial_job_idx: int,
) -> Sequence[JobReturn]:
setup_globals()
assert launcher.config is not None
assert launcher.config_loader is not None
assert launcher.task_function is not None

configure_log(launcher.config.hydra.hydra_logging, launcher.config.hydra.verbose)
sweep_dir = Path(str(launcher.config.hydra.sweep.dir))
sweep_dir.mkdir(parents=True, exist_ok=True)
log.info(
f"Ray Launcher is launching {len(job_overrides)} jobs, "
f"sweep output dir: {sweep_dir}"
)

start_ray(launcher.ray_init_cfg)

runs = []
for idx, overrides in enumerate(job_overrides):
idx = initial_job_idx + idx
ostr = " ".join(filter_overrides(overrides))
log.info(f"\t#{idx} : {ostr}")
sweep_config = launcher.config_loader.load_sweep_config(
launcher.config, list(overrides)
)
with open_dict(sweep_config):
# The job id typically comes from the underlying scheduler (SLURM_JOB_ID, for instance).
# In that case it would not be available here, because we are still in the main process;
# instead it should be populated remotely before calling the task_function.
sweep_config.hydra.job.id = f"job_id_for_{idx}"
sweep_config.hydra.job.num = idx
ray_obj = launch_job_on_ray(
launcher.ray_remote_cfg,
sweep_config,
launcher.task_function,
Singleton.get_state(),
)
runs.append(ray_obj)

return [ray.get(run) for run in runs]
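
`start_ray` and `launch_job_on_ray` are imported from `_launcher_util`, which is not included in this excerpt. A minimal sketch of the pattern they likely follow: initialize Ray from the structured init config, then wrap `run_job` in a `ray.remote` task that restores the singleton state inside the worker. The signatures, config handling, and resource forwarding via `.options(...)` are assumptions.

```python
# Hypothetical sketch of the _launcher_util helpers used above; the real
# module is not part of this diff, so details here are assumptions.
from typing import Any, Dict

import ray
from hydra.core.singleton import Singleton
from hydra.core.utils import JobReturn, run_job, setup_globals
from omegaconf import DictConfig, OmegaConf


def start_ray(init_cfg: DictConfig) -> None:
    # Initialize Ray once, forwarding the launcher's ray.init() config.
    if not ray.is_initialized():
        ray.init(**OmegaConf.to_container(init_cfg, resolve=True))


@ray.remote
def _run_job(
    sweep_config: DictConfig, task_function: Any, singleton_state: Dict[Any, Any]
) -> JobReturn:
    # Restore Hydra's global/singleton state inside the Ray worker, then run.
    setup_globals()
    Singleton.set_state(singleton_state)
    return run_job(
        config=sweep_config,
        task_function=task_function,
        job_dir_key="hydra.sweep.dir",
        job_subdir_key="hydra.sweep.subdir",
    )


def launch_job_on_ray(
    remote_cfg: DictConfig,
    sweep_config: DictConfig,
    task_function: Any,
    singleton_state: Dict[Any, Any],
) -> Any:
    # remote_cfg would carry per-task resources such as num_cpus/num_gpus.
    opts = OmegaConf.to_container(remote_cfg, resolve=True) or {}
    ref = _run_job.options(**opts).remote(sweep_config, task_function, singleton_state)
    # Returns an ObjectRef; the caller above collects results with ray.get().
    return ref
```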