[SGD] Retry sgd.local_rank() (#18824)
* finish

* fix

* wip

* address comment

* update

* fix test

* fix failing test

* address comments

* fix test

* fix
amogkam authored and jjyao committed Sep 23, 2021
1 parent 5e9cb23 commit f71cfca
Showing 14 changed files with 224 additions and 56 deletions.
7 changes: 7 additions & 0 deletions doc/source/raysgd/v2/raysgd.rst
@@ -153,6 +153,10 @@ system. Let's take following simple examples:
results = trainer.run(train_func_distributed)
trainer.shutdown()
See :ref:`sgd-porting-code` for a more comprehensive example.


.. group-tab:: TensorFlow

This example shows how you can use RaySGD to set up `Multi-worker training
@@ -250,4 +254,7 @@ system. Let's take following simple examples:
trainer.shutdown()
See :ref:`sgd-porting-code` for a more comprehensive example.


**Next steps:** Check out the :ref:`User Guide <sgd-user-guide>`!
24 changes: 24 additions & 0 deletions doc/source/raysgd/v2/user_guide.rst
@@ -83,6 +83,30 @@ training.
sampler=DistributedSampler(dataset))
**Step 3:** Set the proper CUDA device if you are using GPUs.

If you are using GPUs, you need to make sure the CUDA devices are properly set up inside your training function.

This involves 3 steps:
1. Use the local rank to set the default CUDA device for the worker.
2. Move the model to the default CUDA device (or a specific CUDA device).
3. Specify ``device_ids`` when wrapping in ``DistributedDataParallel``.

.. code-block:: python

    def train_func():
        device = torch.device(f"cuda:{sgd.local_rank()}" if
                              torch.cuda.is_available() else "cpu")
        torch.cuda.set_device(device)

        # Create model.
        model = NeuralNetwork()
        model = model.to(device)
        model = DistributedDataParallel(
            model,
            device_ids=[sgd.local_rank()] if torch.cuda.is_available() else None)
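For context, a hedged sketch of how such a training function could be launched (assuming two GPU workers; ``Trainer`` usage follows the pattern shown elsewhere in this guide):

.. code-block:: python

    from ray.util.sgd.v2 import Trainer

    # Launch the function above on 2 GPU workers; each worker picks its
    # own CUDA device via sgd.local_rank().
    trainer = Trainer(backend="torch", num_workers=2, use_gpu=True)
    trainer.start()
    trainer.run(train_func)
    trainer.shutdown()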
.. group-tab:: TensorFlow

.. note::
2 changes: 1 addition & 1 deletion python/ray/util/sgd/v2/BUILD
@@ -73,7 +73,7 @@ py_test(

py_test(
name = "test_worker_group",
size = "small",
size = "medium",
srcs = ["tests/test_worker_group.py"],
tags = ["team:ml", "exclusive"],
deps = [":sgd_v2_lib"]
4 changes: 2 additions & 2 deletions python/ray/util/sgd/v2/__init__.py
@@ -3,11 +3,11 @@
from ray.util.sgd.v2.callbacks import SGDCallback
from ray.util.sgd.v2.checkpoint import CheckpointStrategy
from ray.util.sgd.v2.session import (load_checkpoint, save_checkpoint, report,
world_rank)
world_rank, local_rank)
from ray.util.sgd.v2.trainer import Trainer, SGDIterator

__all__ = [
"BackendConfig", "CheckpointStrategy", "HorovodConfig", "load_checkpoint",
"report", "save_checkpoint", "SGDCallback", "SGDIterator",
"local_rank", "report", "save_checkpoint", "SGDCallback", "SGDIterator",
"TensorflowConfig", "TorchConfig", "Trainer", "world_rank"
]
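With ``local_rank`` added to the package exports, training code can pin each worker to its per-node GPU. A minimal sketch (assuming the v2 package is imported as ``sgd`` and the function is run through a ``Trainer``):

.. code-block:: python

    import torch
    from ray.util.sgd import v2 as sgd

    def train_func():
        # Pin this worker to the GPU matching its local rank, so workers
        # sharing a node do not collide on the same device.
        if torch.cuda.is_available():
            torch.cuda.set_device(sgd.local_rank())
        ...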
48 changes: 39 additions & 9 deletions python/ray/util/sgd/v2/backends/backend.py
@@ -15,8 +15,7 @@
TUNE_CHECKPOINT_ID
from ray.util.sgd.v2.session import TrainingResultType, TrainingResult
from ray.util.sgd.v2.session import init_session, get_session, shutdown_session
from ray.util.sgd.v2.utils import construct_path, get_node_id, get_gpu_ids, \
check_for_failure
from ray.util.sgd.v2.utils import construct_path, check_for_failure
from ray.util.sgd.v2.worker_group import WorkerGroup

if TUNE_INSTALLED:
@@ -309,12 +308,8 @@ def _setup_gpus(self):
"""

def get_node_id_and_gpu():
node_id = get_node_id()
gpu_ids = get_gpu_ids()
return node_id, gpu_ids

node_ids_and_gpu_ids = self.worker_group.execute(get_node_id_and_gpu)
node_ids_and_gpu_ids = [(w.metadata.node_id, w.metadata.gpu_ids)
for w in self.worker_group.workers]

node_id_to_worker_id = defaultdict(set)
node_id_to_gpu_ids = defaultdict(set)
@@ -336,6 +331,37 @@ def set_gpu_ids():
worker_id, set_gpu_ids))
ray.get(futures)

def _create_local_rank_map(self) -> Dict:
    """Create mapping from worker world_rank to local_rank.

    Example:
        Worker 0: 0.0.0.0
        Worker 1: 0.0.0.0
        Worker 2: 0.0.0.1
        Worker 3: 0.0.0.0
        Worker 4: 0.0.0.1

        Workers 0, 1, 3 are on 0.0.0.0.
        Workers 2, 4 are on 0.0.0.1.

        Expected Output:
        {
            0 -> 0,
            1 -> 1,
            2 -> 0,
            3 -> 2,
            4 -> 1
        }
    """
    rank_mapping = {}
    ip_dict = defaultdict(int)
    for world_rank in range(len(self.worker_group)):
        worker = self.worker_group.workers[world_rank]
        node_ip = worker.metadata.node_ip
        rank_mapping[world_rank] = ip_dict[node_ip]
        ip_dict[node_ip] += 1
    return rank_mapping

def start_training(
self,
train_func: Callable[[], T],
@@ -371,11 +397,12 @@ def start_training(
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV, 0)

# First initialize the session.
def initialize_session(world_rank, train_func, checkpoint):
def initialize_session(world_rank, local_rank, train_func, checkpoint):
try:
init_session(
training_func=train_func,
world_rank=world_rank,
local_rank=local_rank,
checkpoint=checkpoint,
detailed_autofilled_metrics=use_detailed_autofilled_metrics
)
@@ -388,13 +415,16 @@ def initialize_session(world_rank, train_func, checkpoint):

checkpoint_dict = self.checkpoint_manager._load_checkpoint(checkpoint)

local_rank_map = self._create_local_rank_map()

futures = []
for world_rank in range(len(self.worker_group)):
futures.append(
self.worker_group.execute_single_async(
world_rank,
initialize_session,
world_rank=world_rank,
local_rank=local_rank_map[world_rank],
train_func=train_func,
checkpoint=checkpoint_dict))

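The new ``_create_local_rank_map`` helper derives local ranks by counting workers per node IP, as its docstring example illustrates. A minimal standalone sketch of that grouping logic, using the hypothetical node IPs from the docstring:

.. code-block:: python

    from collections import defaultdict

    # Node IPs from the docstring example, indexed by world rank.
    node_ips = ["0.0.0.0", "0.0.0.0", "0.0.0.1", "0.0.0.0", "0.0.0.1"]

    rank_mapping = {}
    ip_dict = defaultdict(int)
    for world_rank, node_ip in enumerate(node_ips):
        # The local rank is the number of workers already seen on this node.
        rank_mapping[world_rank] = ip_dict[node_ip]
        ip_dict[node_ip] += 1

    assert rank_mapping == {0: 0, 1: 1, 2: 0, 3: 2, 4: 1}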
14 changes: 8 additions & 6 deletions python/ray/util/sgd/v2/backends/horovod.py
@@ -5,7 +5,7 @@

import ray
from ray.util.sgd.v2.backends.backend import BackendConfig, Backend
from ray.util.sgd.v2.utils import get_node_id, get_hostname, update_env_vars
from ray.util.sgd.v2.utils import update_env_vars
from ray.util.sgd.v2.worker_group import WorkerGroup

try:
@@ -44,9 +44,9 @@ def backend_cls(self):
return HorovodBackend


def init_env_vars(world_rank: int, world_size: int):
def init_env_vars(world_rank: int, world_size: int, node_id: str):
"""Initialize Horovod environment variables."""
os.environ["HOROVOD_HOSTNAME"] = get_node_id()
os.environ["HOROVOD_HOSTNAME"] = node_id
os.environ["HOROVOD_RANK"] = str(world_rank)
os.environ["HOROVOD_SIZE"] = str(world_size)

@@ -60,18 +60,20 @@ def on_start(self, worker_group: WorkerGroup,
# Initialize workers with Horovod environment variables
setup_futures = []
for rank in range(len(worker_group)):
worker_node_id = worker_group.workers[rank].metadata.node_id
setup_futures.append(
worker_group.execute_single_async(rank, init_env_vars, rank,
len(worker_group)))
len(worker_group),
worker_node_id))
ray.get(setup_futures)

# Use Horovod Ray Coordinator
# backend_config as settings
self.coordinator = Coordinator(backend_config)

# Get all the hostnames of all workers
node_ids = worker_group.execute(get_node_id)
hostnames = worker_group.execute(get_hostname)
node_ids = [w.metadata.node_id for w in worker_group.workers]
hostnames = [w.metadata.hostname for w in worker_group.workers]
# Register each hostname to the coordinator. assumes the hostname
# ordering is the same.
for rank, (hostname, node_id) in enumerate(zip(hostnames, node_ids)):
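For reference, a small self-contained sketch of what ``init_env_vars`` does for one worker (the node id ``"node-1"`` and the two-worker sizing here are hypothetical):

.. code-block:: python

    import os

    def init_env_vars(world_rank: int, world_size: int, node_id: str):
        """Initialize Horovod environment variables (mirrors the backend)."""
        os.environ["HOROVOD_HOSTNAME"] = node_id
        os.environ["HOROVOD_RANK"] = str(world_rank)
        os.environ["HOROVOD_SIZE"] = str(world_size)

    # Worker 1 of 2, running on a node whose id is "node-1".
    init_env_vars(world_rank=1, world_size=2, node_id="node-1")
    assert os.environ["HOROVOD_HOSTNAME"] == "node-1"
    assert os.environ["HOROVOD_RANK"] == "1"
    assert os.environ["HOROVOD_SIZE"] == "2"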
@@ -86,6 +86,9 @@ def train_func(config: Dict):
lr = config["lr"]
epochs = config["epochs"]

device = torch.device(f"cuda:{sgd.local_rank()}"
if torch.cuda.is_available() else "cpu")

# Create data loaders.
train_dataloader = DataLoader(
training_data,
@@ -97,10 +100,11 @@
sampler=DistributedSampler(test_data))

# Create model.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = NeuralNetwork()
model = model.to(device)
model = DistributedDataParallel(model)
model = DistributedDataParallel(
model,
device_ids=[device.index] if torch.cuda.is_available() else None)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
25 changes: 25 additions & 0 deletions python/ray/util/sgd/v2/session.py
@@ -32,12 +32,14 @@ class Session:
def __init__(self,
training_func: Callable,
world_rank: int,
local_rank: int,
checkpoint: Optional[Dict] = None,
detailed_autofilled_metrics: bool = False):
# The Thread object that is running the training function.
self.training_thread = PropagatingThread(
target=training_func, daemon=True)
self.world_rank = world_rank
self.local_rank = local_rank
self.loaded_checkpoint = checkpoint

# This lock is used to control the execution of the training thread.
@@ -263,6 +265,29 @@ def train_func():
return session.world_rank


def local_rank() -> int:
    """Get the local rank of this worker (rank of the worker on its node).

    .. code-block:: python

        import time
        from ray.util import sgd

        def train_func():
            if torch.cuda.is_available():
                torch.cuda.set_device(sgd.local_rank())
            ...

        trainer = Trainer(backend="torch", use_gpu=True)
        trainer.start()
        trainer.run(train_func)
        trainer.shutdown()
    """
    session = get_session()
    return session.local_rank


def load_checkpoint() -> Optional[Dict]:
"""Loads checkpoint data onto the worker.
17 changes: 15 additions & 2 deletions python/ray/util/sgd/v2/tests/test_backend.py
@@ -54,9 +54,10 @@ def ray_2_node_4_gpu():
def gen_execute_special(special_f):
def execute_async_special(self, f):
"""Runs f on worker 0, special_f on other workers."""
futures = [self.workers[0]._BaseWorkerMixin__execute.remote(f)]
futures = [self.workers[0].actor._BaseWorkerMixin__execute.remote(f)]
for worker in self.workers[1:]:
futures.append(worker._BaseWorkerMixin__execute.remote(special_f))
futures.append(
worker.actor._BaseWorkerMixin__execute.remote(special_f))
return futures

return execute_async_special
@@ -123,6 +124,18 @@ def test_train(ray_start_2_cpus, tmp_path):
assert e.finish_training() == [1, 1]


def test_local_ranks(ray_start_2_cpus, tmp_path):
config = TestConfig()
e = BackendExecutor(config, num_workers=2)
e.start()

def train():
return sgd.local_rank()

e.start_training(train, run_dir=tmp_path)
assert set(e.finish_training()) == {0, 1}


def test_train_failure(ray_start_2_cpus, tmp_path):
config = TestConfig()
e = BackendExecutor(config, num_workers=2)