Update docs and add cupy as requirement #171

Merged: 4 commits, Dec 20, 2017
72 changes: 48 additions & 24 deletions chainermn/extensions/checkpoint.py
@@ -17,9 +17,10 @@ def create_multi_node_checkpointer(name, comm, cp_interval=5,
It keeps several old snapshots to rollback synchronized
snapshot at each MPI process. Snapshot files are identified
as '<name>.<rank>.<iteration>'.
<name> ... identifier of the run where snapshot is kept for
<rank> ... which process owned the model
<iteration> ... number of iteration.

- <name> ... identifier of the run for which the snapshot is kept
- <rank> ... which process owned the model
- <iteration> ... iteration number.

This extension keeps several files for each execution and allows
users to resume the whole job at the latest snapshots of each MPI
@@ -28,12 +29,12 @@ def create_multi_node_checkpointer(name, comm, cp_interval=5,
As this object is a usual Chainer extension, users can just
create this object and pass it to the trainer as an extension::

cpr = create_multi_node_checkpointer(name=run_id, comm=comm)
trainer.extend(cpr, trigger=(25, 'iteration'))
checkpointer = create_multi_node_checkpointer(name=run_id, comm=comm)
trainer.extend(checkpointer, trigger=(25, 'iteration'))

To run recovery at startup, before first iteration, run::
To run recovery at startup, before first iteration, run

cpr.maybe_load(trainer, optimizer)
checkpointer.maybe_load(trainer, optimizer)

before ``trainer.run()`` . If nothing is recovered (i.e. no
snapshot found), ``trainer.updater.iteration`` will remain ``0``
@@ -42,34 +43,26 @@ def create_multi_node_checkpointer(name, comm, cp_interval=5,
this will let the multi node optimizer avoid the initial broadcast
when snapshot data among all nodes are in sync.

After training finished without errors all those temporary
checkpoints will be cleaned up at all nodes.

Another example to use checkpointer *without* trainer would be::

cpr = create_multi_node_checkpointer(name=run_id, comm=comm)
cpr.maybe_load(obj_you_want_to_cpr, optimizer)
checkpointer = create_multi_node_checkpointer(name=run_id, comm=comm)
checkpointer.maybe_load(obj_you_want_to_snap, optimizer)

while True: ## Training loop
...
updater.update()
...
cpr.save(obj_you_want_to_cpr) # Update checkpoints

c.f. Taking snapshots in single node execution would be much simpler::

trainer.extend(extensions.snapshot())

TODO(kuenishi): do we need checksum? ... snapshot_object is smart
that uses temporary files and then moving the file, which is
usually an atomic operation. If we assume external (malicious or
innocent) hands such as bit rot or file truncate we need this. In
current implementation manual removal of latest snapshot files will
let recovery happen against next-latest snapshot.
TODO(kuenishi): make non-distributed version and contribute to Chainer?
checkpointer.save(obj_you_want_to_snap) # Make a checkpoint

Args:
name (str): unique id of the run
comm: communicator in ChainerMN
cp_interval (int): number of checkpoints to guarantee preserved
cp_interval (int): minimum number of checkpoints to preserve
gc_interval (int): interval to collect non-preserved checkpoints

'''
experimental('chainermn.extensions.create_multi_node_checkpointer')
return _MultiNodeCheckpointer(name, comm, cp_interval, gc_interval, path)
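
As an aside, the '<name>.<rank>.<iteration>' naming scheme documented above is easy to illustrate; the helper below is a hypothetical reconstruction for illustration only, not the checkpointer's internal ``_filename`` method::

    # Hypothetical reconstruction of the documented naming scheme
    # '<name>.<rank>.<iteration>'; not the internal _filename helper.
    def snapshot_filename(name, rank, iteration):
        return '{}.{}.{}'.format(name, rank, iteration)

    assert snapshot_filename('run42', 3, 2500) == 'run42.3.2500'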
@@ -137,6 +130,21 @@ def __call__(self, trainer):
self.save(trainer, trainer.updater.iteration)

def save(self, target, iteration):
'''Take snapshots of a target (mostly a trainer) at each node

This must be called synchronously at all nodes, at the same
timing of the same iteration.

'''
# TODO(kuenishi): Taking a checksum of the snapshot file may make
# model loading more reliable ... snapshot_object is smart in that
# it writes to a temporary file and then moves it, which prevents
# partial writes via an atomic operation. If we assume external
# hands such as bit rot or file truncation, we need the checksum.
# In the current implementation, manual removal of the latest
# snapshot files lets recovery happen against the next-latest
# snapshot.

filename = self._filename(iteration)

self.stats.start()
@@ -150,6 +158,11 @@ def save(self, target, iteration):
self._sync_file_list(remove_remainder=True)

def finalize(self):
'''Finalize checkpointer

Clean up all intermediate snapshots.

'''
assert self.path is not None

files2remove = self.files
@@ -163,6 +176,15 @@ def save(self, target, iteration):
self.files = []

def get_stats(self):
'''Get statistics of taking snapshots

After or during training, the checkpointer holds statistics on
saving checkpoints, such as the average, minimum, and maximum
time. With these stats, users may identify slow nodes or disks,
or estimate the average time penalty of taking a snapshot and
optimize the snapshot interval accordingly.

'''
return self.stats.report()

def _sync_file_list(self, remove_remainder=False):
Expand Down Expand Up @@ -229,7 +251,9 @@ def _parse_filename(self, filename):
return name, int(rank), int(iter)

def maybe_load(self, trainer, optimizer=None, path=None):
# If there's existing model, load, sync, and resume.
'''If there's an existing model, load, sync, and resume.

'''
if self.path is None:
if path is not None:
self.path = path
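Putting the docstring's pieces together, a minimal end-to-end sketch of the checkpointer flow could look as follows; ``run_id``, ``MyModel``, and ``make_iterator`` are hypothetical placeholders, and only ``create_multi_node_checkpointer``, ``maybe_load``, ``get_stats``, and the extension protocol shown in this diff are assumed::

    # Minimal sketch of the checkpointer flow from the docstring above.
    # MyModel, make_iterator, and run_id are hypothetical placeholders.
    import chainer
    import chainermn

    comm = chainermn.create_communicator()
    model = chainer.links.Classifier(MyModel())
    optimizer = chainermn.create_multi_node_optimizer(
        chainer.optimizers.Adam(), comm)
    optimizer.setup(model)

    updater = chainer.training.StandardUpdater(make_iterator(), optimizer)
    trainer = chainer.training.Trainer(updater, (100, 'epoch'))

    # Take synchronized snapshots every 25 iterations.
    checkpointer = chainermn.create_multi_node_checkpointer(
        name=run_id, comm=comm)
    trainer.extend(checkpointer, trigger=(25, 'iteration'))

    # Resume from the latest consistent snapshots, if any, then train.
    checkpointer.maybe_load(trainer, optimizer)
    trainer.run()

    # Inspect snapshot timing statistics, e.g. to tune the trigger.
    print(checkpointer.get_stats())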
8 changes: 5 additions & 3 deletions chainermn/extensions/multi_node_evaluator.py
@@ -4,10 +4,9 @@
def create_multi_node_evaluator(actual_evaluator, communicator):
"""Create a multi node evaluator from a normal evaluator.

Actually patches the evaluator to work in multinode
Actually this method patches the evaluator to work in a multi-node
environment. This method adds several hidden attributes starting
with _mn_ prefix. After the patch, original evaluator does not
work correctly in non-MPI environment.
with `_mn_` prefix.

Args:
actual_evaluator: evaluator to be patched
@@ -17,6 +16,9 @@ def create_multi_node_evaluator(actual_evaluator, communicator):
Returns:
The multi-node patched ``actual_evaluator``.

.. note:: After being patched, the original evaluator does not
   work correctly in a non-MPI environment.

"""

actual_evaluator._mn_original_evaluate = actual_evaluator.evaluate
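For completeness, a short sketch of how the patched evaluator is typically wired in; ``test_iter``, ``model``, ``comm``, and ``trainer`` are assumed to be set up as in the checkpointer sketch above::

    # Patch a standard Evaluator to evaluate across all MPI processes.
    # test_iter, model, comm, and trainer are assumed from context.
    from chainer.training import extensions
    import chainermn

    evaluator = extensions.Evaluator(test_iter, model)
    evaluator = chainermn.create_multi_node_evaluator(evaluator, comm)
    trainer.extend(evaluator)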
2 changes: 1 addition & 1 deletion docs/source/installation/guide.rst
@@ -4,7 +4,7 @@ Installation Guide
Requirements
------------
In addition to Chainer, ChainerMN depends on the following software libraries:
CUDA-Aware MPI, NVIDIA NCCL, and a few Python packages including MPI4py.
CUDA-Aware MPI, NVIDIA NCCL, and a few Python packages including MPI4py and cupy.


Chainer
5 changes: 5 additions & 0 deletions docs/source/reference/index.rst
@@ -39,3 +39,8 @@ Functions
.. autofunction:: chainermn.functions.recv
.. autofunction:: chainermn.functions.pseudo_connect
.. autofunction:: chainermn.functions.all_to_all

Trainer extensions
~~~~~~~~~~~~~~~~~~

.. autofunction:: chainermn.create_multi_node_checkpointer
6 changes: 2 additions & 4 deletions docs/source/requirements.txt
@@ -1,5 +1,3 @@
cython

# Fix broken search link issue
# https://github.com/rtfd/readthedocs-sphinx-ext/issues/25
sphinx ==1.4.*
sphinx_rtd_theme
sphinx
2 changes: 2 additions & 0 deletions setup.py
@@ -27,6 +27,8 @@
elif os.environ.get('READTHEDOCS', None) == 'True':
ext_modules = []
install_requires.remove('mpi4py') # mpi4py cannot be installed without MPI
else:
install_requires.append('cupy')

setup(
name='chainermn',
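To see the added branch in context, here is a condensed sketch of the conditional-dependency pattern in setup.py; the real file has more entries and branches, so the surrounding names here only mirror the hunk above::

    # Condensed sketch; the real setup.py has additional branches.
    import os
    from setuptools import setup

    install_requires = ['chainer', 'mpi4py']
    ext_modules = []

    if os.environ.get('READTHEDOCS', None) == 'True':
        install_requires.remove('mpi4py')  # cannot install without MPI
    else:
        install_requires.append('cupy')  # GPU support outside RTD builds

    setup(name='chainermn',
          install_requires=install_requires,
          ext_modules=ext_modules)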