Davidm/cherrypick r1.16.0 (#6082)
* gpt fix

Signed-off-by: David Mosallanezhad <[email protected]>

* per-micro-batch input loader (#5635)

* per-micro-batch input loader

* per-micro-batch input loader

set arg default val

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* minor fix

* apply per-microbatch-loader to only GPT

* update docstring on micro-batch input loader

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fixed the default arg val

* fix batch size to 1 at log stat registration

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update container for CI

Signed-off-by: ericharper <[email protected]>

* update container in jenkinsfile

Signed-off-by: ericharper <[email protected]>

* update container for CI

Signed-off-by: ericharper <[email protected]>

fix merge conflict

* revert Jenkinsfile

* Revert "revert Jenkinsfile"

This reverts commit d23b775.

* Update nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py

Signed-off-by: Tim Moon <[email protected]>

* add GradScaler

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: ericharper <[email protected]>
Signed-off-by: Tim Moon <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: ericharper <[email protected]>
Co-authored-by: Tim Moon <[email protected]>

* added PR#5995

Signed-off-by: David Mosallanezhad <[email protected]>

* Distributed Adam optimizer overlaps param all-gather with forward compute (#5684)

* Add distopt support for overlapping param all-gather with forward compute

Signed-off-by: Tim Moon <[email protected]>

* Update Apex commit

Signed-off-by: Tim Moon <[email protected]>

---------

Signed-off-by: Tim Moon <[email protected]>
Co-authored-by: Eric Harper <[email protected]>

* adding early stop callback to ptuning (#6028)

* patch to allow using tokenizers without additional_special_tokens_ids attribute

Signed-off-by: arendu <[email protected]>

* early stop callback for prompt/p tuning

Signed-off-by: arendu <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update

Signed-off-by: arendu <[email protected]>

* added exp manager config for early stop

Signed-off-by: arendu <[email protected]>

* pushed logic for creating early stopping inside exp manager

Signed-off-by: arendu <[email protected]>

* pushed logic for creating early stopping inside exp manager

Signed-off-by: arendu <[email protected]>

* minor updates and added dataclass check

Signed-off-by: arendu <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* more args

Signed-off-by: arendu <[email protected]>

* more args

Signed-off-by: arendu <[email protected]>

---------

Signed-off-by: arendu <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: David Mosallanezhad <[email protected]>
Signed-off-by: ericharper <[email protected]>
Signed-off-by: Tim Moon <[email protected]>
Signed-off-by: Tim Moon <[email protected]>
Signed-off-by: arendu <[email protected]>
Co-authored-by: David Mosallanezhad <[email protected]>
Co-authored-by: Sangkug Lym <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: ericharper <[email protected]>
Co-authored-by: Tim Moon <[email protected]>
Co-authored-by: Adi Renduchintala <[email protected]>
7 people authored Mar 7, 2023
1 parent 2791852 commit 28b5f29
Showing 13 changed files with 318 additions and 130 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -46,7 +46,7 @@ WORKDIR /tmp/
# container
RUN git clone https://github.com/NVIDIA/apex.git && \
cd apex && \
git checkout 75f401e088ef88e7c85a57ecf70fb232235f0334 && \
git checkout c0a0b0f69d2d5a98bd141be12ee8e5eebd3ec7ca && \
pip3 install -v --disable-pip-version-check --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" --global-option="--fast_layer_norm" --global-option="--distributed_adam" --global-option="--deprecated_fused_adam" ./

# uninstall stuff from base container
4 changes: 2 additions & 2 deletions Jenkinsfile
@@ -1,7 +1,7 @@
pipeline {
agent {
docker {
image 'nvcr.io/nvidia/pytorch:23.01-py3'
image 'nemo_containers:23.01_apex_c3d575f2478cd379b3c2d81f41edde39791b5d92'
args '--device=/dev/nvidia0 --gpus all --user 0:128 -v /home/TestData:/home/TestData -v $HOME/.cache:/root/.cache --shm-size=8g'
}
}
@@ -4510,4 +4510,4 @@ assert_frame_equal(training_curve, gt_curve, rtol=1e-3, atol=1e-3)"'''
cleanWs()
}
}
}
}
@@ -15,6 +15,7 @@ trainer:
gradient_clip_val: 1.0
resume_from_checkpoint: null # The path to a checkpoint file to continue the training, restores the whole state including the epoch, step, LR schedulers, apex, etc.
benchmark: False



exp_manager:
@@ -36,6 +37,14 @@ exp_manager:
filename: 'megatron_gpt_prompt_tune--{val_loss:.3f}-{step}'
model_parallel_size: ${model.tensor_model_parallel_size}
save_best_model: True
create_early_stopping_callback: True
early_stopping_callback_params:
monitor: "val_loss"
mode: "min"
min_delta: 0.001
patience: 10
verbose: True


model:
seed: 1234
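The new create_early_stopping_callback / early_stopping_callback_params keys map naturally onto PyTorch Lightning's EarlyStopping callback, which the exp manager now builds (per the "pushed logic for creating early stopping inside exp manager" sub-commit). A minimal sketch of that mapping, for illustration only; the actual wiring lives in NeMo's exp_manager, not in this snippet:

# Illustrative sketch: turning the YAML keys above into a Lightning EarlyStopping
# callback. This is not the NeMo code path itself.
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping

early_stopping_callback_params = {
    "monitor": "val_loss",
    "mode": "min",
    "min_delta": 0.001,
    "patience": 10,
    "verbose": True,
}

early_stopping = EarlyStopping(**early_stopping_callback_params)
trainer = Trainer(callbacks=[early_stopping])  # a real p-tuning run adds its own strategy and plugins
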
@@ -34,6 +34,13 @@ exp_manager:
filename: "megatron_t5_prompt_tune--{${exp_manager.checkpoint_callback_params.monitor}:.3f}-{step}"
model_parallel_size: ${model.tensor_model_parallel_size}
save_best_model: True
create_early_stopping_callback: True
early_stopping_callback_params:
monitor: "val_loss"
mode: "min"
min_delta: 0.001
patience: 10
verbose: True

model:
seed: 1234
3 changes: 3 additions & 0 deletions examples/nlp/language_modeling/megatron_gpt_pretraining.py
@@ -13,6 +13,7 @@
# limitations under the License.


import torch.multiprocessing as mp
from omegaconf.omegaconf import OmegaConf, open_dict
from pytorch_lightning import Trainer
from pytorch_lightning.plugins.environments import TorchElasticEnvironment
@@ -29,6 +30,8 @@
from nemo.utils import logging
from nemo.utils.exp_manager import exp_manager

mp.set_start_method("spawn", force=True)


@hydra_runner(config_path="conf", config_name="megatron_gpt_config")
def main(cfg) -> None:
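Forcing the "spawn" start method here pairs with the per-micro-batch input loader: DataLoader worker processes that are forked after CUDA is initialized can hang or crash, while spawned workers re-import the module cleanly. A small stand-alone sketch of the pattern (the dataset and loader below are placeholders, not the NeMo classes touched by this commit):

# Stand-alone sketch of the start-method pattern; TensorDataset/DataLoader are
# placeholders, not the NeMo data pipeline.
import torch
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, TensorDataset

mp.set_start_method("spawn", force=True)  # must happen before any worker processes exist

if __name__ == "__main__":
    dataset = TensorDataset(torch.arange(16, dtype=torch.float32))
    loader = DataLoader(dataset, batch_size=4, num_workers=2)  # workers start via "spawn"
    for (batch,) in loader:
        print(batch.shape)
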
136 changes: 97 additions & 39 deletions nemo/collections/nlp/data/language_modeling/megatron/data_samplers.py
@@ -14,43 +14,84 @@

"""Dataloaders."""

import abc
from typing import Optional

import torch

from nemo.utils import logging


class MegatronPretrainingSampler:
class BaseMegatronSampler:
def __init__(
self, total_samples, consumed_samples, micro_batch_size, data_parallel_rank, data_parallel_size, drop_last=True
):
self,
total_samples: int,
consumed_samples: int,
micro_batch_size: int,
data_parallel_rank: int,
data_parallel_size: int,
drop_last: bool = True,
global_batch_size: Optional[int] = None,
pad_samples_to_global_batch_size: Optional[bool] = False,
) -> None:
# Sanity checks.
if total_samples <= 0:
raise RuntimeError("no sample to consume: {}".format(total_samples))
if consumed_samples >= total_samples:
raise RuntimeError("no samples left to consume: {}, {}".format(consumed_samples, total_samples))
if micro_batch_size <= 0:
raise RuntimeError(f"micro_batch_size size must be greater than 0, but {micro_batch_size}")
if data_parallel_size <= 0:
raise RuntimeError(f"data parallel size must be greater than 0, but {data_parallel_size}")
if data_parallel_rank >= data_parallel_size:
raise RuntimeError(
"data_parallel_rank should be smaller than data size, but {} >= {}".format(
data_parallel_rank, data_parallel_size
)
)
if global_batch_size is not None:
if global_batch_size % (micro_batch_size * data_parallel_size) != 0:
raise RuntimeError(
f"`global_batch_size` ({global_batch_size}) is not divisible by "
f"`micro_batch_size ({micro_batch_size}) x data_parallel_size "
f"({data_parallel_size})`"
)
if pad_samples_to_global_batch_size and global_batch_size is None:
raise RuntimeError(
f"`pad_samples_to_global_batch_size` can be `True` only when "
f"`global_batch_size` is set to an integer value"
)

# Keep a copy of input params for later use.
self.total_samples = total_samples
self.consumed_samples = consumed_samples
self.micro_batch_size = micro_batch_size
self.data_parallel_rank = data_parallel_rank
self.micro_batch_times_data_parallel_size = self.micro_batch_size * data_parallel_size
self.drop_last = drop_last
self.global_batch_size = global_batch_size
self.pad_samples_to_global_batch_size = pad_samples_to_global_batch_size

logging.info(
f'Instantiating MegatronPretrainingSampler with total_samples: {total_samples} and consumed_samples: {consumed_samples}'
)

# Sanity checks.
assert self.total_samples > 0, 'no sample to consume: {}'.format(self.total_samples)
assert self.consumed_samples < self.total_samples, 'no samples left to consume: {}, {}'.format(
self.consumed_samples, self.total_samples
)
assert self.micro_batch_size > 0
assert data_parallel_size > 0
assert self.data_parallel_rank < data_parallel_size, (
'data_parallel_rank should be smaller than data size: {}, '
'{}'.format(self.data_parallel_rank, data_parallel_size)
)

def __len__(self):
return (self.total_samples - self.consumed_samples - 1) // self.micro_batch_times_data_parallel_size + 1
num_available_samples: int = self.total_samples - self.consumed_samples
if self.global_batch_size is not None:
if self.drop_last:
return num_available_samples // self.global_batch_size
else:
return (num_available_samples + self.global_batch_size - 1) // self.global_batch_size
else:
return (num_available_samples - 1) // self.micro_batch_times_data_parallel_size + 1

@abc.abstractmethod
def __iter__(self):
...


class MegatronPretrainingSampler(BaseMegatronSampler):
def get_start_end_idx(self):
start_idx = self.data_parallel_rank * self.micro_batch_size
end_idx = start_idx + self.micro_batch_size
@@ -68,32 +109,45 @@ def __iter__(self):

# Check the last partial batch and see if drop_last is set
if len(batch) > 0 and not self.drop_last:
start_idx, end_idx = self.get_start_end_idx()
yield batch[start_idx:end_idx]

if self.pad_samples_to_global_batch_size:
for i in range(
self.data_parallel_rank, self.global_batch_size, self.micro_batch_times_data_parallel_size
):
indices = [batch[j] for j in range(i, max(len(batch), i + self.micro_batch_size))]
num_pad = self.micro_batch_size - len(indices)
indices = indices + [-1] * num_pad
yield indices
else:
start_idx, end_idx = self.get_start_end_idx()
yield batch[start_idx:end_idx]

class MegatronPretrainingRandomSampler:
def __init__(self, total_samples, consumed_samples, micro_batch_size, data_parallel_rank, data_parallel_size):
# Keep a copy of input params for later use.
self.total_samples = total_samples
self.consumed_samples = consumed_samples
self.micro_batch_size = micro_batch_size
self.data_parallel_rank = data_parallel_rank
self.data_parallel_size = data_parallel_size
self.micro_batch_times_data_parallel_size = self.micro_batch_size * data_parallel_size
self.last_batch_size = self.total_samples % self.micro_batch_times_data_parallel_size

# Sanity checks.
assert self.total_samples > 0, 'no sample to consume: {}'.format(self.total_samples)
assert self.micro_batch_size > 0
assert data_parallel_size > 0
assert self.data_parallel_rank < data_parallel_size, (
'data_parallel_rank should be smaller than data size: {}, '
'{}'.format(self.data_parallel_rank, data_parallel_size)
class MegatronPretrainingRandomSampler(BaseMegatronSampler):
def __init__(
self,
total_samples: int,
consumed_samples: int,
micro_batch_size: int,
data_parallel_rank: int,
data_parallel_size: int,
drop_last: bool = True,
global_batch_size: Optional[int] = None,
pad_samples_to_global_batch_size: Optional[bool] = False,
) -> None:
super().__init__(
total_samples=total_samples,
consumed_samples=consumed_samples,
micro_batch_size=micro_batch_size,
data_parallel_rank=data_parallel_rank,
data_parallel_size=data_parallel_size,
drop_last=drop_last,
global_batch_size=global_batch_size,
pad_samples_to_global_batch_size=pad_samples_to_global_batch_size,
)

def __len__(self):
return self.total_samples
assert (
pad_samples_to_global_batch_size == False
), "`MegatronPretrainingRandomSampler` does not support sample padding"
self.last_batch_size = self.total_samples % self.micro_batch_times_data_parallel_size

def __iter__(self):
active_total_samples = self.total_samples - self.last_batch_size
Expand All @@ -119,3 +173,7 @@ def __iter__(self):
self.consumed_samples += self.micro_batch_times_data_parallel_size
yield batch
batch = []

# Check the last partial batch and see if drop_last is set
if len(batch) > 0 and not self.drop_last:
yield batch
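A quick worked example of the reworked __len__ above, with illustrative numbers (not taken from the commit): once global_batch_size is provided, length is counted in global batches rather than micro-batch times data-parallel chunks.

# Illustrative numbers only; mirrors the __len__ arithmetic in BaseMegatronSampler above.
total_samples, consumed_samples = 1000, 0
micro_batch_size, data_parallel_size = 4, 2
global_batch_size = 64  # divisible by micro_batch_size * data_parallel_size, as the sanity check requires

num_available_samples = total_samples - consumed_samples                                       # 1000
len_with_drop_last = num_available_samples // global_batch_size                                # 15 full global batches
len_without_drop_last = (num_available_samples + global_batch_size - 1) // global_batch_size   # 16, last one partial
len_when_gbs_is_none = (num_available_samples - 1) // (micro_batch_size * data_parallel_size) + 1  # 125 (legacy behaviour)
print(len_with_drop_last, len_without_drop_last, len_when_gbs_is_none)
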
@@ -500,10 +500,10 @@ def __init__(self, path, skip_warmup=False):
def __getstate__(self):
return self._path

# def __setstate__(self, state):
# self._do_init(state)
def __setstate__(self, state):
self._do_init(state)

def _do_init(self, path, skip_warmup):
def _do_init(self, path, skip_warmup=True):
self._path = path
self._index = self.Index(index_file_path(self._path), skip_warmup)

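Re-enabling __setstate__ makes the dataset picklable again, which the new "spawn" start method depends on: each DataLoader worker receives a pickled copy and rebuilds its index from the stored path. A runnable stand-in that mirrors the same pattern (the class and attribute values below are illustrative, not the NeMo dataset):

# Stand-in class mirroring the __getstate__/__setstate__ pattern in the hunk above.
import pickle


class PathBackedDataset:
    def __init__(self, path, skip_warmup=False):
        self._do_init(path, skip_warmup)

    def __getstate__(self):
        return self._path                      # pickle only the path, not the loaded index

    def __setstate__(self, state):
        self._do_init(state)                   # rebuild everything from the path

    def _do_init(self, path, skip_warmup=True):
        self._path = path
        self._index = f"index loaded from {path} (skip_warmup={skip_warmup})"


ds = PathBackedDataset("/tmp/example.idx")
clone = pickle.loads(pickle.dumps(ds))         # works because __setstate__ re-runs _do_init
print(clone._index)
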
@@ -242,7 +242,7 @@ def configure_gradient_clipping(self, *args, **kwargs):
parameters = self._get_parameters()
grad_norm = clip_grad_norm_fp32(parameters=parameters, max_norm=clip_val)

self.log('grad_norm', grad_norm, rank_zero_only=True)
self.log('grad_norm', grad_norm, rank_zero_only=True, batch_size=1)

def allreduce_gradients(self):
"""Reduce gradients across data parallel ranks.
@@ -326,8 +326,9 @@ def setup_optimization(
optim_kwargs = {} if optim_kwargs is None else optim_kwargs.copy()
if self.with_distributed_adam:

# Allocate grads since we are storing between microbatches
# Allocate contiguous buffers to avoid extra copies
optim_kwargs['contiguous_grad_buffer'] = True
optim_kwargs['contiguous_param_buffer'] = True

if self.megatron_amp_o2:
# Match param allgather with model dtype
@@ -417,6 +418,9 @@ def configure_optimizers(self):
self._optimizer.init_params(reversed(overlap_params))
self._optimizer.init_params(reversed(no_overlap_params))

# Initialize contiguous parameter buffer
self._optimizer.init_param_buffer()

if self._scheduler is None:
return self._optimizer
else:
@@ -767,6 +767,8 @@ def configure_optimizers(self):
param._disable_overlap_grad_sync = True

# Initialize parameter buckets for overlapped grad and param syncs
# Note: Params with disabled overlapping are put in the
# last param bucket
buckets = []
if self.cfg.get('virtual_pipeline_model_parallel_size', None) is not None:
# Initialize a bucket for each virtual pipeline stage
@@ -793,7 +795,7 @@ def configure_optimizers(self):
used_params = set()
for bucket in buckets:
used_params.update(bucket)
buckets.append([p for p in self.parameters() if p not in used_params])
buckets[-1].extend(p for p in self.parameters() if p not in used_params)
self.distributed_adam_buckets = buckets

return super().configure_optimizers()
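The one-line fix above (buckets.append(...) changed to buckets[-1].extend(...)) makes leftover parameters join the existing last bucket instead of creating an extra one, matching the new comment that params with disabled overlapping land in the last bucket. A tiny contrast with placeholder names:

# Placeholder parameter names; contrasts the old append with the new extend.
buckets = [["layer0.weight", "layer0.bias"], ["layer1.weight"]]
leftover = ["embedding.weight"]

old_buckets = [list(b) for b in buckets]
old_buckets.append(leftover)     # old behaviour: a third, separate bucket
buckets[-1].extend(leftover)     # new behaviour: merged into the last bucket

print(old_buckets)  # [['layer0.weight', 'layer0.bias'], ['layer1.weight'], ['embedding.weight']]
print(buckets)      # [['layer0.weight', 'layer0.bias'], ['layer1.weight', 'embedding.weight']]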