Add nemo2 interface for pipeline_model_parallel_comm_backend
Signed-off-by: Guyue Huang <[email protected]>
guyueh1 committed Jan 4, 2025
1 parent 7be1d12 commit af35a8c
Showing 4 changed files with 11 additions and 0 deletions.
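This commit threads a new pipeline_model_parallel_comm_backend option through the NeMo 2.0 (nemo2) strategy interfaces, so the communication backend used for pipeline-parallel point-to-point exchanges can be chosen when the strategy is constructed. A minimal usage sketch, assuming a standard NeMo 2.0 setup; the parallel sizes and trainer arguments below are illustrative, not part of this commit:

from nemo import lightning as nl

# Choose the communication backend for pipeline-parallel P2P exchanges.
# 'nccl' is the default added by this commit; the string is forwarded
# unchanged through the Megatron initialization path.
strategy = nl.MegatronStrategy(
    tensor_model_parallel_size=2,
    pipeline_model_parallel_size=4,
    pipeline_model_parallel_comm_backend='nccl',
)

trainer = nl.Trainer(
    devices=8,
    accelerator='gpu',
    strategy=strategy,
)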
2 changes: 2 additions & 0 deletions nemo/lightning/_strategy_lib.py
@@ -82,6 +82,7 @@ def init_parallel_ranks(
 tensor_model_parallel_size=parallel_config.tensor_model_parallel_size,
 expert_model_parallel_size=parallel_config.expert_model_parallel_size,
 pipeline_model_parallel_size=parallel_config.pipeline_model_parallel_size,
+pipeline_model_parallel_comm_backend=getattr(parallel_config, 'pipeline_model_parallel_comm_backend', 'nccl'),
 virtual_pipeline_model_parallel_size=parallel_config.virtual_pipeline_model_parallel_size,
 context_parallel_size=parallel_config.context_parallel_size,
 encoder_tensor_model_parallel_size=getattr(parallel_config, "encoder_tensor_model_parallel_size", 0),
@@ -117,6 +118,7 @@ def init_model_parallel(model: Optional[nn.Module] = None) -> None:
 pipeline_model_parallel_size=app_state.pipeline_model_parallel_size,
 virtual_pipeline_model_parallel_size=app_state.virtual_pipeline_model_parallel_size,
 pipeline_model_parallel_split_rank=app_state.pipeline_model_parallel_split_rank,
+pipeline_model_parallel_comm_backend=app_state.pipeline_model_parallel_comm_backend,
 encoder_pipeline_model_parallel_size=app_state.encoder_pipeline_model_parallel_size,
 encoder_tensor_model_parallel_size=app_state.encoder_tensor_model_parallel_size,
 context_parallel_size=app_state.context_parallel_size,
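In init_parallel_ranks the new field is looked up with a fallback default (mirroring the neighboring encoder_tensor_model_parallel_size lookup), so parallel configs created before this commit still initialize cleanly. A small standalone sketch of that pattern; _OldParallelConfig is a hypothetical stand-in, not a NeMo class:

from dataclasses import dataclass

@dataclass
class _OldParallelConfig:
    # Hypothetical config object that predates the new field.
    tensor_model_parallel_size: int = 1
    pipeline_model_parallel_size: int = 1

cfg = _OldParallelConfig()
# getattr with a default keeps the lookup backward compatible.
backend = getattr(cfg, 'pipeline_model_parallel_comm_backend', 'nccl')
print(backend)  # 'nccl'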
4 changes: 4 additions & 0 deletions nemo/lightning/fabric/strategies.py
@@ -72,6 +72,7 @@ def __init__(
 tensor_model_parallel_size: int = 1,
 pipeline_model_parallel_size: int = 1,
 virtual_pipeline_model_parallel_size: Optional[int] = None,
+pipeline_model_parallel_comm_backend: str = 'nccl',
 microbatch_group_size_per_vp_stage: Optional[int] = None,
 context_parallel_size: int = 1,
 sequence_parallel: bool = False,
@@ -109,6 +110,7 @@ def __init__(
 self.data_sampler: Optional['DataSampler'] = data_sampler
 self.tensor_model_parallel_size = tensor_model_parallel_size
 self.pipeline_model_parallel_size = pipeline_model_parallel_size
+self.pipeline_model_parallel_comm_backend = pipeline_model_parallel_comm_backend
 self.microbatch_group_size_per_vp_stage = (
 microbatch_group_size_per_vp_stage
 if microbatch_group_size_per_vp_stage is not None
@@ -411,6 +413,7 @@ def parallelism(self):
 return ParallelismConfig(
 tensor_model_parallel_size=self.tensor_model_parallel_size,
 pipeline_model_parallel_size=self.pipeline_model_parallel_size,
+pipeline_model_parallel_comm_backend=self.pipeline_model_parallel_comm_backend,
 virtual_pipeline_model_parallel_size=self.virtual_pipeline_model_parallel_size,
 microbatch_group_size_per_vp_stage=self.microbatch_group_size_per_vp_stage,
 context_parallel_size=self.context_parallel_size,
@@ -493,6 +496,7 @@ def convert_megatron_strategy(strategy: MegatronStrategy) -> FabricMegatronStrategy:
 return FabricMegatronStrategy(
 tensor_model_parallel_size=strategy.tensor_model_parallel_size,
 pipeline_model_parallel_size=strategy.pipeline_model_parallel_size,
+pipeline_model_parallel_comm_backend=strategy.pipeline_model_parallel_comm_backend,
 virtual_pipeline_model_parallel_size=strategy.virtual_pipeline_model_parallel_size,
 context_parallel_size=strategy.context_parallel_size,
 sequence_parallel=strategy.sequence_parallel,
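The Fabric strategy exposes the same keyword, and convert_megatron_strategy now copies it when translating a Lightning MegatronStrategy. A short sketch, assuming FabricMegatronStrategy is importable from nemo.lightning.fabric.strategies as in this diff; the parallel sizes are illustrative:

from nemo.lightning.fabric.strategies import FabricMegatronStrategy

# Same keyword and same default on the Fabric side.
fabric_strategy = FabricMegatronStrategy(
    tensor_model_parallel_size=2,
    pipeline_model_parallel_size=4,
    pipeline_model_parallel_comm_backend='nccl',
)

# convert_megatron_strategy(strategy) also carries the value across, so a
# strategy configured through the Lightning API keeps its backend when
# converted to Fabric.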
2 changes: 2 additions & 0 deletions nemo/lightning/megatron_init.py
@@ -94,6 +94,7 @@ def initialize_model_parallel_for_nemo(
 pipeline_model_parallel_size=1,
 virtual_pipeline_model_parallel_size=None,
 pipeline_model_parallel_split_rank=None,
+pipeline_model_parallel_comm_backend='nccl',
 context_parallel_size=1,
 encoder_tensor_model_parallel_size=0,
 encoder_pipeline_model_parallel_size=0,
@@ -124,6 +125,7 @@ def initialize_model_parallel_for_nemo(
 app_state.context_parallel_size = context_parallel_size
 app_state.encoder_tensor_model_parallel_size = encoder_tensor_model_parallel_size
 app_state.encoder_pipeline_model_parallel_size = encoder_pipeline_model_parallel_size
+app_state.pipeline_model_parallel_comm_backend = pipeline_model_parallel_comm_backend
 app_state.use_fp8 = use_fp8
 app_state.init_mpi_proc_group = init_mpi_proc_group
 (
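initialize_model_parallel_for_nemo records the chosen backend on the global AppState, which is where init_model_parallel (first file above) reads it back. A minimal read-back sketch; the getattr fallback is only there to cover a process in which model-parallel initialization has not run yet:

from nemo.utils import AppState

app_state = AppState()  # process-wide singleton
backend = getattr(app_state, 'pipeline_model_parallel_comm_backend', 'nccl')
print(backend)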
3 changes: 3 additions & 0 deletions nemo/lightning/pytorch/strategies/megatron_strategy.py
@@ -102,6 +102,7 @@ class ParallelismConfig:
 standalone_embedding_stage: bool = False
 standalone_loss_stage: bool = False
 use_te_rng_tracker: bool = False
+pipeline_model_parallel_comm_backend: str = 'nccl'


 class MegatronStrategy(DDPStrategy, io.IOMixin):
@@ -186,6 +187,7 @@ def __init__(
 self,
 tensor_model_parallel_size: int = 1,
 pipeline_model_parallel_size: int = 1,
+pipeline_model_parallel_comm_backend: str = 'nccl',
 virtual_pipeline_model_parallel_size: Optional[int] = None,
 microbatch_group_size_per_vp_stage: Optional[int] = None,
 context_parallel_size: int = 1,
@@ -236,6 +238,7 @@ def __init__(
 self.data_sampler: Optional["DataSampler"] = data_sampler
 self.tensor_model_parallel_size = tensor_model_parallel_size
 self.pipeline_model_parallel_size = pipeline_model_parallel_size
+self.pipeline_model_parallel_comm_backend = pipeline_model_parallel_comm_backend
 self.microbatch_group_size_per_vp_stage = (
 microbatch_group_size_per_vp_stage
 if microbatch_group_size_per_vp_stage is not None
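Because the new ParallelismConfig field ships with a default, call sites that build the config without naming it are unaffected. A quick check of that default, assuming the module path shown in this diff:

import dataclasses

from nemo.lightning.pytorch.strategies.megatron_strategy import ParallelismConfig

defaults = {f.name: f.default for f in dataclasses.fields(ParallelismConfig)}
print(defaults['pipeline_model_parallel_comm_backend'])  # 'nccl'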
