This repository was archived by the owner on Mar 21, 2024. It is now read-only.

Adding Distributed Data Parallel #261

Closed · wants to merge 52 commits
Changes from 1 commit
Commits (52)
cb0ee83
added distributeddataparallel
mebristo Sep 30, 2020
90488d6
fix bug introduced in code cleanup
mebristo Sep 30, 2020
c1f5191
merge changes from master
mebristo Sep 30, 2020
1e24971
Get DDP working with model parallel on single machine
mebristo Oct 4, 2020
411d4b9
run validation and testing on single device
mebristo Oct 5, 2020
2e30ba4
add pytorch to azure_runner yaml
mebristo Oct 12, 2020
1755e69
update azure runner
mebristo Oct 12, 2020
cb47c4d
Merge recent changes from master
mebristo Oct 13, 2020
464f701
fix bugs to run on AML
mebristo Oct 13, 2020
e6f7744
remove config
mebristo Oct 13, 2020
983afce
Merge branch 'master' into mebristo/ddp
javier-alvarez Oct 16, 2020
94c4fde
switch global rank for local
mebristo Oct 16, 2020
50403dc
Merge branch 'mebristo/ddp' of https://github.com/microsoft/InnerEye-…
mebristo Oct 16, 2020
55b1297
undo changes to rank
mebristo Oct 16, 2020
fdc67cd
debug error with rank
mebristo Oct 19, 2020
0d1b8a5
checkpoint only saves for 1 rank and distributed timing is different
mebristo Oct 19, 2020
c2cebf6
fix sync bug
mebristo Oct 19, 2020
9ac02b0
fix bug in output_size
mebristo Oct 20, 2020
32c8597
Refactor
mebristo Oct 20, 2020
4f8efbd
bug fix
mebristo Oct 20, 2020
6417c42
debugging mem loss in inference on val set
mebristo Oct 21, 2020
82ca448
debug mem error in inference for val set
mebristo Oct 21, 2020
b326ee3
debug cuda memory error in inference
mebristo Oct 21, 2020
f7c58a3
temporarily make val set smaller for debugging
mebristo Oct 21, 2020
00134a3
debug memory error in inference on val set
mebristo Oct 21, 2020
a42d9a2
debug cuda mem error in inference
mebristo Oct 21, 2020
c0c86a7
debug slow inference
mebristo Oct 22, 2020
dd5d904
save epoch only one device
mebristo Oct 22, 2020
2dad14d
compare time doing inference on gpu 0
mebristo Oct 22, 2020
ceed0ef
tidy up
mebristo Oct 22, 2020
32b4d0e
tidy up
mebristo Oct 22, 2020
ba218e6
tidy up and fix tests
mebristo Oct 22, 2020
9c69809
restore model config after debugging finished
mebristo Oct 22, 2020
78f51c1
tidy up
mebristo Oct 23, 2020
a3027b2
merge recent changes from master
mebristo Oct 23, 2020
f4c4e65
tidy up
mebristo Oct 23, 2020
2f6904f
work on 1 device
mebristo Oct 27, 2020
a5980e4
Address PR comments
mebristo Oct 28, 2020
1bc6653
address PR comments
mebristo Oct 28, 2020
179f1bf
bug fix in inference
mebristo Oct 29, 2020
972c794
debug inference mem error: try clearing cache
mebristo Oct 29, 2020
7aef7d2
Destroy process group after training complete
mebristo Oct 29, 2020
726ad5f
address PR comments
mebristo Oct 29, 2020
7c4bcd8
attempt to fix bug in import
mebristo Oct 29, 2020
42d6cba
fix problem with importing torch
mebristo Oct 29, 2020
393547a
override global and local size with command line args
mebristo Oct 30, 2020
7acdc98
address PR comments
mebristo Oct 30, 2020
d7bf3ba
fix tests
mebristo Nov 12, 2020
1130c9b
merge recent changes from master
mebristo Nov 12, 2020
f7508de
fix test
mebristo Nov 12, 2020
ea195fd
Merge remote-tracking branch 'origin/master' into mebristo/ddp
ant0nsc Nov 16, 2020
0435b6c
Merge branch 'master' into mebristo/ddp
javier-alvarez Nov 17, 2020
debug slow inference
mebristo committed Oct 22, 2020
commit c0c86a7750d722621b8ca9ba7ba840d2ed380596
11 changes: 6 additions & 5 deletions InnerEye/ML/model_training.py
@@ -25,7 +25,8 @@
 from InnerEye.ML.scalar_config import ScalarModelBase
 from InnerEye.ML.sequence_config import SequenceModelBase
 from InnerEye.ML.utils import ml_util, model_util
-from InnerEye.ML.utils.aml_distributed_utils import get_global_rank, get_global_size, get_local_size, get_local_rank
+from InnerEye.ML.utils.aml_distributed_utils import get_global_rank, get_global_size, get_local_size, get_local_rank, \
+    get_max_rank

 from InnerEye.ML.utils.config_util import ModelConfigLoader
 from InnerEye.ML.utils.lr_scheduler import SchedulerWithWarmUp
@@ -238,15 +239,15 @@ def train(rank: Optional[int], config: ModelConfigBase, run_recovery: Optional[R
         train_val_params.save_metrics = not (save_epoch and config.temperature_scaling_config)

         training_steps = create_model_training_steps(config, train_val_params)

         val_epoch_results = train_or_validate_epoch(training_steps, local_rank, device)
         val_results_per_epoch.append(val_epoch_results.metrics)

         if config.is_segmentation_model:
             metrics.store_epoch_stats_for_segmentation(config.outputs_folder, epoch, epoch_lrs,
                                                        train_epoch_results.metrics,
                                                        val_epoch_results.metrics)

-        if save_epoch and global_rank == 0:
+        if save_epoch:
             # perform temperature scaling if required
             if isinstance(config, SequenceModelBase) and config.temperature_scaling_config:
                 optimal_temperature, scaled_val_results = \
@@ -315,7 +316,7 @@ def temperature_scaling_steps(config: SequenceModelBase,
     return temperature_value, val_epoch_results


-def train_or_validate_epoch(training_steps: ModelTrainingStepsBase, rank, device) -> ModelOutputsAndMetricsForEpoch:
+def train_or_validate_epoch(training_steps: ModelTrainingStepsBase, rank: int, device: torch.device) -> ModelOutputsAndMetricsForEpoch:
     """
     Trains or validates the model for one epoch.
     :param training_steps: Training pipeline to use.
@@ -324,7 +325,7 @@ def train_or_validate_epoch(training_steps: ModelTrainingStepsBase, rank, device
     training_random_state = None
     train_val_params = training_steps.train_val_params
     config = training_steps.model_config
-    cuda_available = torch.cuda.is_available() & rank == 0
+    cuda_available = torch.cuda.is_available() & rank == get_max_rank()

     if cuda_available:
         item_start_time = torch.cuda.Event(enable_timing=True)
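Note on the `cuda_available` line above: Python binds `&` tighter than `==`, so both the old and new versions parse as `(torch.cuda.is_available() & rank) == ...` rather than as a logical conjunction. A minimal standalone sketch of the pitfall (plain values stand in for the torch calls):

```python
cuda_ok, rank, max_rank = True, 3, 3

# Bitwise & binds tighter than ==, so this is (True & 3) == 3, i.e. (1 == 3)
print(cuda_ok & rank == max_rank)    # False, even though rank equals max_rank

# The likely intended test uses the logical operator instead
print(cuda_ok and rank == max_rank)  # True
```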
31 changes: 14 additions & 17 deletions InnerEye/ML/model_training_steps.py
@@ -41,6 +41,7 @@
 from InnerEye.ML.scalar_config import ScalarLoss, ScalarModelBase
 from InnerEye.ML.sequence_config import SequenceModelBase
 from InnerEye.ML.utils import dataset_util, metrics_util
+from InnerEye.ML.utils.aml_distributed_utils import get_max_rank
 from InnerEye.ML.utils.dataset_util import DatasetExample
 from InnerEye.ML.utils.image_util import NumpyOrTorch
 from InnerEye.ML.utils.metrics_util import SummaryWriters
@@ -231,9 +232,9 @@ def get_scalar_model_inputs_and_labels(model_config: ScalarModelBase,
     if isinstance(model, DataParallelModel):
         model = model.get_module()

-    for key, value in sample.items():
-        if isinstance(value, torch.Tensor):
-            sample[key] = value.to(device)
+    # for key, value in sample.items():
+    #     if isinstance(value, torch.Tensor):
+    #         sample[key] = value.to(device)

     if isinstance(model_config, SequenceModelBase):
         sequence_model: DeviceAwareModule[List[ClassificationItemSequence], torch.Tensor] = model  # type: ignore
@@ -339,8 +340,8 @@ def get_logits_and_posteriors(self, *model_inputs: torch.Tensor, use_mean_teache
         posteriors = self.model_config.get_post_loss_logits_normalization_function()(gather_tensor(logits))
         return logits, posteriors

-    def _compute_model_output_and_loss(self, model_inputs_and_labels: ScalarModelInputsAndLabels, rank, device: torch.device
-                                       ) -> \
+    def _compute_model_output_and_loss(self, model_inputs_and_labels: ScalarModelInputsAndLabels, rank: int,
+                                       device: torch.device) -> \
             Tuple[Tensor, Tensor, Tensor]:
         """
         Computes the output of the model for a given set of inputs and labels.
@@ -357,24 +358,19 @@ def compute() -> Tuple[Tensor, Tensor, Tensor]:
                 model.train()
                 logits, posteriors = self.get_logits_and_posteriors(*model_inputs_and_labels.model_inputs)
             else:
-
-                # if rank == 0 or self.model_config.use_ddp==False:
-                model.eval()
-                # # move model to CUDA:0 if available, else cpu
-                # device2 = torch.device('cuda', rank) if torch.cuda.is_available() else torch.device('cpu')
-                # model.to(device2)
-
-                with torch.no_grad():
-                    logits, posteriors = self.get_logits_and_posteriors(*model_inputs_and_labels.model_inputs)
-                model.train()
+                if rank == get_max_rank():
+                    model.eval()
+                    with torch.no_grad():
+                        logits, posteriors = self.get_logits_and_posteriors(*model_inputs_and_labels.model_inputs)
+                    model.train()
             loss = self.compute_loss(logits, label_gpu, device)
             return logits, posteriors, loss

         return execute_within_autocast_if_needed(func=compute, use_autocast=self.model_config.use_mixed_precision)

     def forward_and_backward_minibatch(self, sample: Dict[str, Any],
-                                       batch_index: int, epoch: int, rank: Optional[int],
-                                       device: Optional[torch.device]
+                                       batch_index: int, epoch: int, rank: int,
+                                       device: torch.device
                                        ) -> ModelForwardAndBackwardsOutputs:
         """
         Runs training for a single minibatch of training data, and computes all metrics.
@@ -679,6 +675,7 @@ def forward_and_backward_minibatch(self, sample: Dict[str, Any],
         forward_pass_result = self.pipeline.forward_pass_patches(patches=cropped_sample.image,
                                                                  labels=labels,
                                                                  mask=mask,
+                                                                 rank=rank,
                                                                  device=device)
         # Clear the GPU cache between forward and backward passes to avoid possible out-of-memory
         torch.cuda.empty_cache()
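Note: in the new `compute()` branch above, only the rank returned by `get_max_rank()` runs the no-grad forward pass; on every other rank `logits` and `posteriors` are left unbound when `compute_loss` is reached. A minimal sketch of the single-rank evaluation pattern with an explicit early exit (the helper name is illustrative, not part of this PR):

```python
import torch

def evaluate_on_designated_rank(model, inputs, rank, designated_rank):
    """Run a no-grad eval forward pass on one rank only; other ranks skip it."""
    if rank != designated_rank:
        return None  # callers must handle the ranks that skipped evaluation
    model.eval()
    with torch.no_grad():
        outputs = model(inputs)
    model.train()
    return outputs
```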
19 changes: 9 additions & 10 deletions InnerEye/ML/pipelines/forward_pass.py
@@ -19,6 +19,7 @@
 from InnerEye.ML.models.architectures.base_model import DeviceAwareModule
 from InnerEye.ML.models.parallel.data_parallel import execute_within_autocast_if_needed
 from InnerEye.ML.utils import image_util, ml_util
+from InnerEye.ML.utils.aml_distributed_utils import get_max_rank


 class SegmentationForwardPass:
@@ -64,7 +65,7 @@ def forward_pass_patches(self, patches: torch.Tensor,
                             labels: Optional[torch.Tensor] = None,
                             mask: Optional[torch.Tensor] = None,
                             device: Optional[torch.device] = None,
-                            rank: Optional[int] = 0) -> \
+                            rank: Optional[int] = None) -> \
             SegmentationForwardPass.Result:
         """
         Wrapper function to handle model forward pass, including updating of the optimizer_type with loss gradients
@@ -108,15 +109,13 @@ def forward_pass_patches(self, patches: torch.Tensor,
             result = self._forward_pass_with_anomaly_detection(patches=patches, mask=mask,
                                                                labels=labels, device=device)
         else:
-            if rank == 0:
-                self.model.eval()
-                # turn off autograd for memory optimizations
-                with torch.no_grad():
-                    result = self._forward_pass_with_anomaly_detection(patches=patches, mask=mask,
-                                                                       labels=labels, device=device)
-                self.model.train()
-            else:
-                result = None
+            print("attempting forward pass on rank ", rank)
+            self.model.eval()
+            # turn off autograd for memory optimizations
+            with torch.no_grad():
+                result = self._forward_pass_with_anomaly_detection(patches=patches, mask=mask,
+                                                                   labels=labels, device=device)
+            self.model.train()
         return result

     def _forward_pass_with_anomaly_detection(self,
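Note: this hunk removes the `rank == 0` gate, so every process now runs the eval-mode forward pass, and the `print` call looks like temporary instrumentation for this "debug slow inference" commit (an assumption; the commit message suggests it). If it were to stay, a lighter-weight alternative is the standard logging module:

```python
import logging

# Same message as the debug print above, but controllable via log levels
logging.getLogger(__name__).debug("attempting forward pass on rank %s", rank)
```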
19 changes: 14 additions & 5 deletions InnerEye/ML/pipelines/inference.py
@@ -22,6 +22,7 @@
 from InnerEye.ML.models.architectures.base_model import CropSizeConstraints
 from InnerEye.ML.pipelines.forward_pass import SegmentationForwardPass
 from InnerEye.ML.utils import image_util, ml_util, model_util
+from InnerEye.ML.utils.aml_distributed_utils import get_global_rank
 from InnerEye.ML.utils.image_util import compute_uncertainty_map_from_posteriors, gaussian_smooth_posteriors, \
     posteriors_to_segmentation
 from InnerEye.ML.utils.device_aware_module import DeviceAwareModule
@@ -398,6 +399,14 @@ def predict(self) -> InferenceBatch:
         """
         model_config = self.get_configs()

+        if model_config.use_ddp:
+            # If possible use another GPU than master node
+            rank = get_global_rank()
+            device = torch.device('cuda', rank)
+        else:
+            rank = 0
+            device = torch.device('cuda', 0) if torch.cuda.is_available() else torch.device('cpu')
+
         # extract patches for each image channel: Num patches x Channels x Z x Y x X
         patches = self._extract_patches_for_image_channels()
@@ -409,7 +418,7 @@ def predict(self) -> InferenceBatch:
             # slice over the batches to prepare batch
             batch = patches[batch_idx: batch_idx + batch_size, ...]
             # perform the forward pass
-            batch_predictions = self._model_fn(batch)
+            batch_predictions = self._model_fn(batch, rank, device)
             image_util.check_array_range(batch_predictions,
                                          expected_range=InferencePipeline.MODEL_OUTPUT_POSTERIOR_RANGE,  # type: ignore
                                          error_prefix="Model predictions for current batch")
@@ -506,20 +515,20 @@ def _extract_patches_for_image_channels(self) -> np.ndarray:

         return np.stack(patches, axis=1)

-    def _model_fn(self, patches: np.ndarray) -> np.ndarray:
+    def _model_fn(self, patches: np.ndarray, rank: int, device: torch.device) -> np.ndarray:
         """
         Wrapper function to handle the model forward pass
         :param patches: Image patches to be passed to the model in format Patches x Channels x Z x Y x X
         :return posteriors: Confidence maps [0,1] for each patch per class
                 in format: Patches x Channels x Class x Z x Y x X
         """
-        # perform inference only on one GPU, if available, else GPU
-        device = torch.device('cuda', 0) if torch.cuda.is_available() else torch.device('cpu')
         model_config = self.get_configs()

         # get the model from the pipeline environment
         model = self.pipeline.get_variable(InferencePipeline.Variables.Model)

+        model.to(device)
+
         # convert patches to Torch tensor
         patches = torch.from_numpy(patches).float()
@@ -529,4 +538,4 @@ def _model_fn(self, patches: np.ndarray) -> np.ndarray:
             batch_size=model_config.inference_batch_size,
             optimizer=None,
             in_training_mode=False
-        ).forward_pass_patches(patches=patches, rank=0, device=device).posteriors
+        ).forward_pass_patches(patches=patches, rank=rank, device=device).posteriors
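Note: `predict()` now picks one CUDA device per process under DDP, but the DDP branch calls `torch.device('cuda', rank)` without checking `torch.cuda.is_available()`. A standalone sketch of the same selection logic with that guard added (the helper is illustrative, not the PR's code):

```python
import torch

def select_inference_device(use_ddp: bool, global_rank: int) -> torch.device:
    """Pick the inference device: one GPU per rank under DDP, else GPU 0 or CPU."""
    if torch.cuda.is_available():
        return torch.device('cuda', global_rank if use_ddp else 0)
    return torch.device('cpu')
```

With `use_ddp=True` and `global_rank=1` this selects `cuda:1`, matching the diff's intent of keeping inference off the master GPU where possible.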
13 changes: 13 additions & 0 deletions InnerEye/ML/utils/aml_distributed_utils.py
@@ -36,3 +36,16 @@ def get_local_size(is_offline_run: Optional[bool] = True) -> int:
     if is_offline_run:
         return torch.cuda.device_count()
     return int(os.environ['OMPI_COMM_WORLD_LOCAL_SIZE'])
+
+
+def get_max_rank():
+    """
+    Return the highest CUDA-enabled device ID, or 0 if none available
+    :return:
+    """
+    if torch.cuda.is_available():
+        max_rank = max([ii for ii in list(range(torch.cuda.device_count()))])
+    else:
+        max_rank = 0
+    print('Max cuda rank found: ', max_rank)
+    return max_rank
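Note: `max([ii for ii in list(range(n))])` is simply `n - 1` for any `n >= 1`, so `get_max_rank` returns the number of visible CUDA devices minus one, or 0 on CPU-only machines. An equivalent, more direct sketch (a possible simplification, not part of the PR):

```python
import torch

def get_max_rank_simplified() -> int:
    """Highest visible CUDA device ID, or 0 when no GPU is available."""
    return max(torch.cuda.device_count() - 1, 0)
```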