
readthedocs upgrade #402

Merged 6 commits on Sep 10, 2020
Changes from all commits
1 change: 1 addition & 0 deletions .gitignore
@@ -14,6 +14,7 @@ deepspeed.egg-info/
# Website
docs/_site/
docs/build
docs/code-docs/source/_build
docs/code-docs/_build
docs/code-docs/build
.sass-cache/
7 changes: 7 additions & 0 deletions deepspeed/runtime/engine.py
@@ -841,6 +841,13 @@ def backward(self, loss, allreduce_gradients=True, release_loss=False):
return loss

def is_gradient_accumulation_boundary(self):
"""Query whether the current micro-batch is at the boundary of
gradient accumulation, and thus will trigger gradient reductions and
an optimizer step.

Returns:
bool: whether the current step is a gradient accumulation boundary.
"""
return (self.micro_steps + 1) % \
self.gradient_accumulation_steps() == 0
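For context, a minimal sketch of how this query can be used from a training loop; the ``engine``, ``loader``, and ``criterion`` names below are assumed placeholders, not code from this PR:

.. code-block:: python

    # Hedged sketch: act only on micro-batches that end an accumulation window.
    for inputs, labels in loader:
        outputs = engine(inputs)            # forward pass through the wrapped module
        loss = criterion(outputs, labels)   # ``criterion`` is an assumed loss function
        engine.backward(loss)
        if engine.is_gradient_accumulation_boundary():
            # Gradients are reduced and the optimizer steps on this micro-batch,
            # so this is the right place for per-update bookkeeping.
            print(f"optimizer update, loss={loss.item():.4f}")
        engine.step()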

67 changes: 51 additions & 16 deletions deepspeed/runtime/pipe/engine.py
@@ -43,10 +43,10 @@ def _tensor_bytes(tensor):


class PipelineEngine(DeepSpeedEngine):
""" A model wrapper for pipeline-parallel execution.
""" A training engine hybrid pipeline, data, and model parallel training.

Parallelism is achieved by executing micro-batches in a pipelined fashion with
gradient accumulation.
This engine is created by ``deepspeed.initialize()`` when a :class:`PipelineModule`
is provided.
"""
def __init__(self, *super_args, **super_kwargs):
super().__init__(*super_args, **super_kwargs)
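The updated class docstring says this engine is produced by ``deepspeed.initialize()`` when a :class:`PipelineModule` is passed in. A hedged sketch of that entry point follows; the layer sizes, ``cmd_args``, and ``trainset`` are illustrative assumptions rather than code from this diff:

.. code-block:: python

    import torch.nn as nn
    import deepspeed
    from deepspeed.pipe import PipelineModule

    # Illustrative two-layer model split across two pipeline stages.
    net = PipelineModule(layers=[nn.Linear(1024, 1024), nn.Linear(1024, 10)],
                         loss_fn=nn.CrossEntropyLoss(),
                         num_stages=2)

    # ``cmd_args`` is assumed to carry DeepSpeed's argparse arguments
    # (e.g. --deepspeed_config); ``trainset`` is an assumed dataset.
    engine, _, _, _ = deepspeed.initialize(
        args=cmd_args,
        model=net,
        model_parameters=[p for p in net.parameters() if p.requires_grad],
        training_data=trainset)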
@@ -227,10 +227,28 @@ def _reserve_pipe_buffers(self, num_buffers):
self.num_pipe_buffers = num_buffers

def train_batch(self, data_iter=None):
"""Progress the pipeline to train the next batch of data.
"""Progress the pipeline to train the next batch of data. The engine will ingest
``self.train_batch_size()`` total samples collectively across all workers.


An iterator over training data should be provided as an argument
unless ``deepspeed.initialize()`` was provided a training set. In that event,
the training data will automatically be read.


.. warning::
A total of ``self.gradient_accumulation_steps()`` entries will be pulled
from ``data_iter`` by each pipeline. There must be sufficient
data left in ``data_iter`` or else a ``StopIteration`` will halt training.

DeepSpeed provides a convenience class :class:`deepspeed.utils.RepeatingLoader`
that wraps data loaders to automatically restart upon a ``StopIteration``.

Args:
data_iter (Iterator, optional): Iterator of training data.

Returns:
The arithmetic mean of the losses over all micro-batches.
The arithmetic mean of the losses computed this batch.
"""
if not torch._C.is_grad_enabled():
raise RuntimeError(
@@ -286,7 +304,9 @@ def train_batch(self, data_iter=None):
return self.agg_train_loss
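As the warning above notes, each call pulls ``gradient_accumulation_steps()`` items per pipeline, so a finite loader can raise ``StopIteration`` mid-epoch. A minimal sketch of the recommended wrapping pattern, assuming ``engine``, ``train_loader``, and ``num_steps`` already exist:

.. code-block:: python

    from deepspeed.utils import RepeatingLoader

    # Wrap the finite DataLoader so exhausting it silently restarts iteration
    # instead of raising StopIteration inside train_batch().
    data_iter = RepeatingLoader(train_loader)

    for step in range(num_steps):
        # Consumes gradient_accumulation_steps() micro-batches and returns the
        # arithmetic mean of their losses.
        loss = engine.train_batch(data_iter=data_iter)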

def eval_batch(self, data_iter):
"""Evaluate the pipeline on a batch of data from ``data_iter``.
"""Evaluate the pipeline on a batch of data from ``data_iter``. The
engine will evaluate ``self.train_batch_size()`` total samples
collectively across all workers.

This method is equivalent to:

@@ -296,9 +316,21 @@ def eval_batch(self, data_iter):
with torch.no_grad():
output = module(batch)

.. warning::
A total of ``self.gradient_accumulation_steps()`` entries will be pulled
from ``data_iter`` by each pipeline. There must be sufficient
data left in ``data_iter`` or else a ``StopIteration`` will halt evaluation.

DeepSpeed provides a convenience class :class:`deepspeed.utils.RepeatingLoader`
that wraps data loaders to automatically restart upon a ``StopIteration``.

Args:
data_iter (Iterator): Iterator of data to evaluate.

Returns:
The arithmetic mean of the losses over all micro-batches.
The arithmetic mean of the losses computed this batch.
"""

self.module.eval()
self.total_loss = None

@@ -331,6 +363,14 @@ def eval_batch(self, data_iter):

return self.agg_eval_loss
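A companion sketch for evaluation, under the same assumptions (``engine`` and ``val_loader`` are placeholders):

.. code-block:: python

    from deepspeed.utils import RepeatingLoader

    val_iter = RepeatingLoader(val_loader)

    # Runs the pipeline schedule under torch.no_grad() and returns the mean
    # loss over the micro-batches of this batch.
    val_loss = engine.eval_batch(data_iter=val_iter)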

def is_first_stage(self):
"""True if this process is in the first stage in the pipeline."""
return self.stage_id == 0

def is_last_stage(self):
"""True if this process is in the last stage in the pipeline."""
return self.stage_id == self.num_stages - 1
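These helpers are small, but they are the intended way to gate per-stage work; a hedged sketch, reusing the assumed ``engine`` and ``data_iter`` names from the earlier sketches:

.. code-block:: python

    loss = engine.train_batch(data_iter=data_iter)

    # Print from a single pipeline stage to avoid duplicated console output
    # across ranks.
    if engine.is_last_stage():
        print(f"stage {engine.stage_id}/{engine.num_stages}: loss = {loss}")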

def _aggregate_total_loss(self):
# Scale loss, average among DP ranks, and bcast loss to the rest of my DP group
if self.is_last_stage():
@@ -364,7 +404,7 @@ def _aggregate_total_loss(self):
return agg_loss

def set_dataloader(self, loader):
""" Store a DataLoader to sample for training data. """
""""""
if self.is_first_stage() or self.is_last_stage():
self.training_dataloader = loader
self.data_iterator = iter(self.training_dataloader)
@@ -993,12 +1033,15 @@ def _allocate_buffers(self, shapes, requires_grad=False, num_buffers=-1):
return buffers

def forward(self, *args, **kwargs):
"""Disabled for pipeline parallel training. See ``train_batch()``. """
raise PipelineError("Only train_batch() is accessible in pipeline mode.")

def backward(self, *args, **kwargs):
"""Disabled for pipeline parallel training. See ``train_batch()``. """
raise PipelineError("Only train_batch() is accessible in pipeline mode.")

def step(self, *args, **kwargs):
"""Disabled for pipeline parallel training. See ``train_batch()``. """
raise PipelineError("Only train_batch() is accessible in pipeline mode.")

def mem_status(self, msg, print_rank=-1, reset_max=False):
@@ -1084,14 +1127,6 @@ def load_module_state_dict(self, state_dict, strict=True):

self.module.load_state_dir(state_dict, strict=strict)

def is_first_stage(self):
"""True if this process is in the first stage in the pipeline."""
return self.stage_id == 0

def is_last_stage(self):
"""True if this process is in the last stage in the pipeline."""
return self.stage_id == self.num_stages - 1

# A map of PipeInstruction types to methods. Each method will be executed with the
# kwargs provided to the PipeInstruction from the scheduler.
_INSTRUCTION_MAP = {
63 changes: 34 additions & 29 deletions deepspeed/runtime/pipe/module.py
@@ -26,13 +26,17 @@ class LayerSpec:
LayerSpec stores the type information and parameters for each stage in a
PipelineModule. For example:

.. code-block:: python

nn.Sequential(
torch.nn.Linear(self.in_dim, self.hidden_dim, bias=False),
torch.nn.Linear(self.hidden_dim, self.out_dim)
)

becomes

.. code-block:: python

layer_specs = [
LayerSpec(torch.nn.Linear, self.in_dim, self.hidden_dim, bias=False),
LayerSpec(torch.nn.Linear, self.hidden_dim, self.out_dim)]
@@ -79,44 +83,46 @@ def __init__(self,


class PipelineModule(nn.Module):
"""Base class for modules to be parallelized with pipeline parallelism.

Users should subclass PipelineModule and provide layer_specs(), which returns a list
of LayerSpec objects. This sequence of layers represents the pipeline-parallel model.
After initialization, a PipelineModule can be used as a traditional torch.nn.Module.

The forward pass is already provided by this base class. The key assumption is that
the output of each layer can be directly fed as input to the next, like a
torch.nn.Sequence.

The key constraint that enables pipeline parallelism is the representation of the
forward pass as a sequence of layers (i.e., stages) and the enforcement of a
simple interface between them.

Example:

class LinearPipeline(PipelineModule):
def __init__(self, in_dim, hidden_dim, out_dim):
self.in_dim = in_dim
self.hidden_dim = hidden_dim
self.out_dim = out_dim
super().__init__()

def layer_specs(self):
return [LayerSpec(torch.nn.Linear, self.in_dim, self.hidden_dim, bias=False),
LayerSpec(torch.nn.Linear, self.hidden_hidden, self.out_dim)]
"""
def __init__(self,
layers,
num_stages=None,
loss_fn=None,
topology=None,
loss_fn=None,
seed_layers=False,
seed_fn=None,
base_seed=1234,
partition_method='parameters',
activation_checkpoint_interval=0,
activation_checkpoint_func=checkpointing.checkpoint):
"""Modules to be parallelized with pipeline parallelism.

The key constraint that enables pipeline parallelism is the
representation of the forward pass as a sequence of layers
and the enforcement of a simple interface between them. The
forward pass is implicitly defined by the module ``layers``. The key
assumption is that the output of each layer can be directly fed as
input to the next, like a ``torch.nn.Sequential``. The forward pass is
implicitly:

.. code-block:: python

def forward(self, inputs):
x = inputs
for layer in self.layers:
x = layer(x)
return x

Args:
layers (Iterable): A sequence of layers defining pipeline structure. Can be a ``torch.nn.Sequential`` module.
num_stages (int, optional): The degree of pipeline parallelism. If not specified, ``topology`` must be provided.
topology (``deepspeed.pipe.ProcessTopology``, optional): Defines the axes of parallelism for training. Must be provided if ``num_stages`` is ``None``.
loss_fn (callable, optional): Loss is computed as ``loss = loss_fn(outputs, label)``.
base_seed (int, optional): The starting seed for layer-wise seeding. Defaults to 1234.
partition_method (str, optional): The method used to partition the layers among pipeline stages. Defaults to 'parameters'.
activation_checkpoint_interval (int, optional): The granularity of activation checkpointing in terms of number of layers. 0 disables activation checkpointing.
activation_checkpoint_func (callable, optional): The function to use for activation checkpointing. Defaults to ``deepspeed.checkpointing.checkpoint``.
"""

super().__init__()

if num_stages is None and topology is None:
@@ -488,7 +494,6 @@ def _set_bounds(self, start=None, stop=None):
self._local_stop = stop

def set_checkpoint_interval(self, interval):
""" Checkpoint activations after each ``interval`` layers. Use 0 to disable. """
assert interval >= 0
self.checkpoint_interval = interval

2 changes: 1 addition & 1 deletion docs/code-docs/source/conf.py
@@ -20,7 +20,7 @@
author = 'Microsoft'

# The full version, including alpha/beta/rc tags
release = '0.1.0'
release = '0.3.0'

master_doc = 'index'
