fix horovod
four4fish committed Feb 1, 2022
1 parent 6b0a84d · commit d964949
Showing 1 changed file with 22 additions and 23 deletions.
45 changes: 22 additions & 23 deletions pytorch_lightning/trainer/connectors/accelerator_connector.py
@@ -171,8 +171,6 @@ def __init__(
# Reset strategy even user has specificed one
self._strategy_check_and_fallbacks()
self._init_strategy()
if _HOROVOD_AVAILABLE and isinstance(self.strategy, HorovodStrategy):
self.handle_horovod

# --Precision----------------------------------------------------------------
self.precision_plugin = self._check_capatibility_and_init_precision()
@@ -411,7 +409,7 @@ def _choose_accelerator(self):
# self._existing_accelerator_type, [_TPU_AVAILABLE, _IPU_AVAILABLE, torch.cuda.is_available(), True]
# ):
# # only apply to gpu to keep backward compatibility
# if self._accelerator_flag == accelerator_flag == "gpu":
# if self._accelerator_flag == accelerator_flag:
# if not available:
# raise MisconfigurationException(
# f"You choice {accelerator_flag} accelerator, but {accelerator_flag} is not available"
@@ -491,9 +489,10 @@ def _choose_strategy(self):
if self._parallel_devices and len(self._parallel_devices) > 1:
self._strategy_flag = "tpu_spawn"
else:
self._srategy_flag = SingleTPUStrategy(device=self._parallel_devices[0])
# TODO lazy initialized device, then here could be self._strategy_flag = "single_tpu_device"
self._strategy_flag = SingleTPUStrategy(device=self._parallel_devices[0])
elif _HOROVOD_AVAILABLE and ("OMPI_COMM_WORLD_RANK" in os.environ or "HOROVOD_RANK" in os.environ):
self._strategy_flag = HorovodStrategy()
self._strategy_flag = "horovod"
else:
if self._num_nodes_flag > 1:
self._strategy_flag = "ddp"
@@ -503,14 +502,15 @@ def _choose_strategy(self):
if self._accelerator_flag == "gpu"
else "cpu"
)
# TODO lazy initialized device, then here could be self._strategy_flag = "single_device"
self._strategy_flag = SingleDeviceStrategy(device=device)
elif len(self._parallel_devices) > 1:
self._strategy_flag = "ddp_spawn"
else:
self._strategy_flag = "ddp"

def _strategy_check_and_fallbacks(self):
# fallback apply to user pass in object as well, so get the _strategy_flag first
# current logic, fallback only apply to user pass in str config not object config
_strategy_flag = "" if isinstance(self._strategy_flag, Strategy) else self._strategy_flag

if _strategy_flag == "ddp_cpu":
@@ -534,33 +534,18 @@ def _strategy_check_and_fallbacks(self):
if _strategy_flag in ("dp", "ddp2") and self._accelerator_flag == "cpu":
rank_zero_warn(f"{_strategy_flag!r} is not supported on CPUs, hence setting `strategy='ddp'`.")
_strategy_flag = "ddp"
# Current test check precision first. So move this test to the end for now.
# TODO update tests and uncomment this part
# if isinstance(self.accelerator, TPUAccelerator) and "tpu" not in _strategy_flag:
# raise ValueError(
# "The `TPUAccelerator` can only be used with a `SingleTPUStrategy` or `TPUSpawnStrategy`,"
# f" found {_strategy_flag}."
# )

if _strategy_flag:
self._strategy_flag = _strategy_flag

def _init_strategy(self):
if isinstance(self._strategy_flag, str):
self.strategy = StrategyRegistry.get(self._strategy_flag)
elif isinstance(self._strategy_flag, Strategy):
self.strategy = self._strategy_flag
else:
raise RuntimeError(f"{self.strategy} is not valid type: {self.strategy}")

def handle_horovod(self):
if self._num_nodes_flag > 1:
raise MisconfigurationException(
"Horovod does not support setting num_nodes / num_gpus explicitly. Use "
"horovodrun / mpirun to configure the number of processes."
)

if isinstance(self.strategy, HorovodStrategy) and not _HOROVOD_AVAILABLE:
if not _HOROVOD_AVAILABLE:
raise MisconfigurationException(
'Requested `accelerator="horovod"`, but Horovod is not installed.'
"Install with \n $HOROVOD_WITH_PYTORCH=1 pip install horovod[pytorch]"
@@ -571,7 +556,19 @@ def handle_horovod(self):
# Horovod assigns one local GPU per process
self._parallel_device = list(range(hvd.local_size()))
else:
self._parallel_device = hvd.local_size()
self._parallel_device = [torch.device("cpu")] * hvd.local_size()

def _init_strategy(self):
if isinstance(self._strategy_flag, HorovodStrategy) or self._strategy_flag == "horovod":
# handle horovod has to happen before initialize strategy because HorovodStrategy needs hvd.init() first.
# TODO lazy initialized and setup horovod strategy `global_rank`
self.handle_horovod()
if isinstance(self._strategy_flag, str):
self.strategy = StrategyRegistry.get(self._strategy_flag)
elif isinstance(self._strategy_flag, Strategy):
self.strategy = self._strategy_flag
else:
raise RuntimeError(f"{self.strategy} is not valid type: {self.strategy}")

def _check_capatibility_and_init_precision(self):
self._precision_misconfig_check()
@@ -699,6 +696,8 @@ def _lazy_init_strategy(self):
" creation inside the worker function."
)

# TODO should be moved to _strategy_check_and_fallbacks().
# Current test check precision first, so keep this check here to meet error order
if isinstance(self.accelerator, TPUAccelerator) and not isinstance(
self.strategy, (SingleTPUStrategy, TPUSpawnStrategy)
):
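Note: for context, here is a minimal sketch of the ordering this commit establishes — Horovod validation and `hvd.init()` now run from `_init_strategy()` before the strategy object is resolved, because `HorovodStrategy` needs the Horovod runtime initialized first. This is not the actual Lightning code; `_DemoConnector` and its defaults are illustrative stand-ins, and it assumes `torch` (and optionally `horovod`) are installed.

```python
import torch

try:
    import horovod.torch as hvd
    _HOROVOD_AVAILABLE = True
except ImportError:
    _HOROVOD_AVAILABLE = False


class _DemoConnector:
    """Illustrative stand-in for the accelerator connector, not the real class."""

    def __init__(self, strategy_flag="horovod", accelerator_flag="cpu", num_nodes=1):
        self._strategy_flag = strategy_flag
        self._accelerator_flag = accelerator_flag
        self._num_nodes_flag = num_nodes
        self._parallel_devices = []
        self.strategy = None

    def _handle_horovod(self):
        # Validation and hvd.init() must run before any strategy object is built,
        # because the Horovod strategy reads rank/size info from the Horovod runtime.
        if self._num_nodes_flag > 1:
            raise RuntimeError(
                "Horovod does not support setting num_nodes / num_gpus explicitly. "
                "Use horovodrun / mpirun to configure the number of processes."
            )
        if not _HOROVOD_AVAILABLE:
            raise RuntimeError('Requested `strategy="horovod"`, but Horovod is not installed.')
        hvd.init()
        if self._accelerator_flag == "gpu":
            # Horovod assigns one local GPU per process.
            self._parallel_devices = list(range(hvd.local_size()))
        else:
            self._parallel_devices = [torch.device("cpu")] * hvd.local_size()

    def _init_strategy(self):
        # Horovod setup happens first; only afterwards is the strategy resolved from
        # the flag (in Lightning this goes through StrategyRegistry.get(...) or the
        # user-supplied Strategy instance).
        if self._strategy_flag == "horovod":
            self._handle_horovod()
        self.strategy = self._strategy_flag
```

With this change, an auto-detected Horovod environment (`OMPI_COMM_WORLD_RANK` / `HOROVOD_RANK`) now resolves to the `"horovod"` registry string instead of constructing `HorovodStrategy()` eagerly, so the same `_init_strategy()` path covers both `strategy="horovod"` and a user-supplied `HorovodStrategy` instance.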
