From 980a67f51e99518658f94491bc292fde676487b8 Mon Sep 17 00:00:00 2001 From: ssuzuki Date: Mon, 11 Dec 2017 16:07:58 +0900 Subject: [PATCH 1/5] add a test for dynamic network --- .../test_multi_node_optimizer.py | 228 ++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 tests/optimizer_tests/test_multi_node_optimizer.py diff --git a/tests/optimizer_tests/test_multi_node_optimizer.py b/tests/optimizer_tests/test_multi_node_optimizer.py new file mode 100644 index 00000000..224e5576 --- /dev/null +++ b/tests/optimizer_tests/test_multi_node_optimizer.py @@ -0,0 +1,228 @@ +import chainer +import chainer.testing +import chainer.testing.attr +import chainermn +from chainermn import nccl +import mock +import nose +import numpy as np +import unittest + + +class ExampleModel(chainer.Chain): + + def __init__(self): + super(ExampleModel, self).__init__( + a=chainer.links.Linear(2, 3), + b=chainer.links.Linear(3, 4), + c=chainer.links.Linear(4, 5), + ) + + +class TestMultiNodeOptimizer(unittest.TestCase): + + def setup_cpu(self): + self.comm = chainermn.create_communicator('naive') + self.target = ExampleModel() + self.target.a.W.data[:] = self.comm.rank + self.target.b.W.data[:] = self.comm.rank + 1 + self.target.c.W.data[:] = self.comm.rank + 2 + self.target.a.W.grad[:] = 0 + self.target.b.W.grad[:] = 0 + self.target.c.W.grad[:] = 0 + self.actual_optimizer = chainer.GradientMethod() + self.actual_optimizer.create_update_rule = mock.MagicMock + self.actual_optimizer.setup(self.target) + + def setup_gpu(self, device=None): + self.comm = chainermn.create_communicator('hierarchical') + device = self.comm.intra_rank + chainer.cuda.get_device(device).use() + self.target = ExampleModel() + self.target.to_gpu() + self.target.a.W.data[:] = self.comm.rank + self.target.b.W.data[:] = self.comm.rank + 1 + self.target.c.W.data[:] = self.comm.rank + 2 + self.target.a.W.grad[:] = 0 + self.target.b.W.grad[:] = 0 + self.target.c.W.grad[:] = 0 + self.actual_optimizer = 
chainer.GradientMethod() + self.actual_optimizer.create_update_rule = mock.MagicMock + self.actual_optimizer.setup(self.target) + + def test_update_with_cpu(self): + self.setup_cpu() + self.optimizer = chainermn.create_multi_node_optimizer( + self.actual_optimizer, self.comm) + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 0) + self.optimizer.target.a.W.grad[:] = self.comm.rank + self.optimizer.target.b.W.grad[:] = self.comm.rank + 1 + self.optimizer.target.c.W.grad[:] = self.comm.rank + 2 + + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 1) + self.optimizer.target.a.W.update_rule.update.assert_called_once_with( + self.optimizer.target.a.W) + self.optimizer.target.b.W.update_rule.update.assert_called_once_with( + self.optimizer.target.b.W) + self.optimizer.target.c.W.update_rule.update.assert_called_once_with( + self.optimizer.target.c.W) + + base = (self.comm.size - 1.0) / 2 + chainer.testing.assert_allclose(self.optimizer.target.a.W.grad, + (base + 0) * np.ones((3, 2))) + chainer.testing.assert_allclose(self.optimizer.target.b.W.grad, + (base + 1) * np.ones((4, 3))) + chainer.testing.assert_allclose(self.optimizer.target.c.W.grad, + (base + 2) * np.ones((5, 4))) + + @chainer.testing.attr.gpu + def test_update_with_gpu(self): + self.setup_gpu() + self.optimizer = chainermn.create_multi_node_optimizer( + self.actual_optimizer, self.comm) + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 0) + self.optimizer.target.a.W.grad[:] = self.comm.rank + self.optimizer.target.b.W.grad[:] = self.comm.rank + 1 + self.optimizer.target.c.W.grad[:] = self.comm.rank + 2 + + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 1) + self.optimizer.target.a.W.update_rule.update.assert_called_once_with( + self.optimizer.target.a.W) + self.optimizer.target.b.W.update_rule.update.assert_called_once_with( + self.optimizer.target.b.W) + self.optimizer.target.c.W.update_rule.update.assert_called_once_with( + 
self.optimizer.target.c.W) + + base = (self.comm.size - 1.0) / 2 + chainer.testing.assert_allclose(self.optimizer.target.a.W.grad, + (base + 0) * np.ones((3, 2))) + chainer.testing.assert_allclose(self.optimizer.target.b.W.grad, + (base + 1) * np.ones((4, 3))) + chainer.testing.assert_allclose(self.optimizer.target.c.W.grad, + (base + 2) * np.ones((5, 4))) + + + +class DynamicExampleModel(chainer.Chain): + + def __init__(self): + super(DynamicExampleModel, self).__init__() + with self.init_scope(): + self.a=chainer.links.Linear(2, 3) + self.b=chainer.links.Linear(3, 4) + + +class TestMultiNodeOptimizerWithDynamicModel(unittest.TestCase): + + def setup_cpu(self): + self.comm = chainermn.create_communicator('naive') + self.target = DynamicExampleModel() + self.target.a.W.data[:] = self.comm.rank + self.target.b.W.data[:] = self.comm.rank + 1 + self.target.a.W.grad[:] = 0 + self.target.b.W.grad[:] = 0 + self.actual_optimizer = chainer.GradientMethod() + self.actual_optimizer.create_update_rule = mock.MagicMock + self.actual_optimizer.setup(self.target) + + def setup_gpu(self, device=None): + self.comm = chainermn.create_communicator('hierarchical') + device = self.comm.intra_rank + chainer.cuda.get_device(device).use() + self.target = DynamicExampleModel() + self.target.to_gpu() + self.target.a.W.data[:] = self.comm.rank + self.target.b.W.data[:] = self.comm.rank + 1 + self.target.a.W.grad[:] = 0 + self.target.b.W.grad[:] = 0 + self.actual_optimizer = chainer.GradientMethod() + self.actual_optimizer.create_update_rule = mock.MagicMock + self.actual_optimizer.setup(self.target) + + def test_update_with_cpu(self): + self.setup_cpu() + self.optimizer = chainermn.create_multi_node_optimizer( + self.actual_optimizer, self.comm) + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 0) + + with self.target.init_scope(): + self.target.c = chainer.links.Linear(4, 4) + if self.comm.rank == 0: + self.target.c.W.data[:] = self.comm.rank + 2 + 
self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 0) + + send_buf = chainer.cuda.to_cpu(self.optimizer.target.c.W.data) + recv_buf = self.comm.mpi_comm.allgather(send_buf) + for i in range(1, self.comm.size): + print(recv_buf[i], flush=True) + chainer.testing.assert_allclose(recv_buf[0], recv_buf[i]) + + self.optimizer.target.a.W.grad[:] = self.comm.rank + self.optimizer.target.b.W.grad[:] = self.comm.rank + 1 + self.optimizer.target.c.W.grad[:] = self.comm.rank + 2 + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 1) + self.optimizer.target.a.W.update_rule.update.assert_called_once_with( + self.optimizer.target.a.W) + self.optimizer.target.b.W.update_rule.update.assert_called_once_with( + self.optimizer.target.b.W) + self.optimizer.target.c.W.update_rule.update.assert_called_once_with( + self.optimizer.target.c.W) + + base = (self.comm.size - 1.0) / 2 + chainer.testing.assert_allclose(self.optimizer.target.a.W.grad, + (base + 0) * np.ones((3, 2))) + chainer.testing.assert_allclose(self.optimizer.target.b.W.grad, + (base + 1) * np.ones((4, 3))) + chainer.testing.assert_allclose(self.optimizer.target.c.W.grad, + (base + 2) * np.ones((5, 4))) + + @chainer.testing.attr.gpu + def test_update_with_gpu(self): + self.setup_gpu() + self.optimizer = chainermn.create_multi_node_optimizer( + self.actual_optimizer, self.comm) + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 0) + + with self.target.init_scope(): + c = chainer.links.Linear(4, 4) + c.to_gpu() + self.target.c = c + if self.comm.rank == 0: + self.target.c.W.data[:] = self.comm.rank + 2 + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 0) + + send_buf = chainer.cuda.to_cpu(self.optimizer.target.c.W.data) + recv_buf = self.comm.mpi_comm.allgather(send_buf) + for i in range(1, self.comm.size): + print(recv_buf[i], flush=True) + chainer.testing.assert_allclose(recv_buf[0], recv_buf[i]) + + self.optimizer.target.a.W.grad[:] = self.comm.rank + 
self.optimizer.target.b.W.grad[:] = self.comm.rank + 1 + self.optimizer.target.c.W.grad[:] = self.comm.rank + 2 + self.optimizer.update() + self.assertEqual(self.actual_optimizer.t, 1) + self.optimizer.target.a.W.update_rule.update.assert_called_once_with( + self.optimizer.target.a.W) + self.optimizer.target.b.W.update_rule.update.assert_called_once_with( + self.optimizer.target.b.W) + self.optimizer.target.c.W.update_rule.update.assert_called_once_with( + self.optimizer.target.c.W) + + base = (self.comm.size - 1.0) / 2 + chainer.testing.assert_allclose(self.optimizer.target.a.W.grad, + (base + 0) * np.ones((3, 2))) + chainer.testing.assert_allclose(self.optimizer.target.b.W.grad, + (base + 1) * np.ones((4, 3))) + chainer.testing.assert_allclose(self.optimizer.target.c.W.grad, + (base + 2) * np.ones((5, 4))) + From ef19f3e57575afce7bf9c353135989221f57f984 Mon Sep 17 00:00:00 2001 From: ssuzuki Date: Wed, 13 Dec 2017 13:29:17 +0900 Subject: [PATCH 2/5] fix MultiNodeOptimizer --- chainermn/optimizers.py | 18 ++++++++++++++- .../test_multi_node_optimizer.py | 23 +++++++++++-------- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/chainermn/optimizers.py b/chainermn/optimizers.py index 6a2a7659..1af8af45 100644 --- a/chainermn/optimizers.py +++ b/chainermn/optimizers.py @@ -7,6 +7,8 @@ def __init__(self, actual_optimizer, communicator): 'actual_optimizer', actual_optimizer) super(_MultiNodeOptimizer, self).__setattr__( 'needs_broadcast', True) + super(_MultiNodeOptimizer, self).__setattr__( + 'target_params', []) def update(self, lossfun=None, *args, **kwds): target = self.target @@ -20,7 +22,7 @@ def update(self, lossfun=None, *args, **kwds): loss.backward() del loss - if self.needs_broadcast: + if self.is_changed(target): self.communicator.broadcast_data(target) super(_MultiNodeOptimizer, self).__setattr__( 'needs_broadcast', False) @@ -28,6 +30,20 @@ def update(self, lossfun=None, *args, **kwds): self.communicator.allreduce_grad(target) 
self.actual_optimizer.update(None, *args, **kwds) + def is_changed(self, target): + previous_params = self.target_params + super(_MultiNodeOptimizer, self).__setattr__( + 'target_params', [(name, param.data is not None) for name, param in sorted(target.namedparams())]) + if len(previous_params) == len(self.target_params): + for param1, param2 in zip(self.target_params, previous_params): + if param1[0] != param2[0]: + return True + if param1[1] != param2[1]: + return True + return False + else: + return True + def __getattr__(self, attr_name): return getattr(self.actual_optimizer, attr_name) diff --git a/tests/optimizer_tests/test_multi_node_optimizer.py b/tests/optimizer_tests/test_multi_node_optimizer.py index 224e5576..cd194085 100644 --- a/tests/optimizer_tests/test_multi_node_optimizer.py +++ b/tests/optimizer_tests/test_multi_node_optimizer.py @@ -2,13 +2,16 @@ import chainer.testing import chainer.testing.attr import chainermn -from chainermn import nccl import mock +import mpi4py.MPI import nose import numpy as np import unittest +from chainermn.communicators import _communication_utility + + class ExampleModel(chainer.Chain): def __init__(self): @@ -32,7 +35,6 @@ def setup_cpu(self): self.target.c.W.grad[:] = 0 self.actual_optimizer = chainer.GradientMethod() self.actual_optimizer.create_update_rule = mock.MagicMock - self.actual_optimizer.setup(self.target) def setup_gpu(self, device=None): self.comm = chainermn.create_communicator('hierarchical') @@ -48,12 +50,12 @@ def setup_gpu(self, device=None): self.target.c.W.grad[:] = 0 self.actual_optimizer = chainer.GradientMethod() self.actual_optimizer.create_update_rule = mock.MagicMock - self.actual_optimizer.setup(self.target) def test_update_with_cpu(self): self.setup_cpu() self.optimizer = chainermn.create_multi_node_optimizer( self.actual_optimizer, self.comm) + self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) self.optimizer.target.a.W.grad[:] = 
self.comm.rank @@ -82,6 +84,7 @@ def test_update_with_gpu(self): self.setup_gpu() self.optimizer = chainermn.create_multi_node_optimizer( self.actual_optimizer, self.comm) + self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) self.optimizer.target.a.W.grad[:] = self.comm.rank @@ -117,7 +120,7 @@ def __init__(self): class TestMultiNodeOptimizerWithDynamicModel(unittest.TestCase): - + def setup_cpu(self): self.comm = chainermn.create_communicator('naive') self.target = DynamicExampleModel() @@ -127,7 +130,6 @@ def setup_cpu(self): self.target.b.W.grad[:] = 0 self.actual_optimizer = chainer.GradientMethod() self.actual_optimizer.create_update_rule = mock.MagicMock - self.actual_optimizer.setup(self.target) def setup_gpu(self, device=None): self.comm = chainermn.create_communicator('hierarchical') @@ -141,12 +143,12 @@ def setup_gpu(self, device=None): self.target.b.W.grad[:] = 0 self.actual_optimizer = chainer.GradientMethod() self.actual_optimizer.create_update_rule = mock.MagicMock - self.actual_optimizer.setup(self.target) def test_update_with_cpu(self): self.setup_cpu() self.optimizer = chainermn.create_multi_node_optimizer( self.actual_optimizer, self.comm) + self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) @@ -154,13 +156,13 @@ def test_update_with_cpu(self): self.target.c = chainer.links.Linear(4, 4) if self.comm.rank == 0: self.target.c.W.data[:] = self.comm.rank + 2 + self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) send_buf = chainer.cuda.to_cpu(self.optimizer.target.c.W.data) recv_buf = self.comm.mpi_comm.allgather(send_buf) for i in range(1, self.comm.size): - print(recv_buf[i], flush=True) chainer.testing.assert_allclose(recv_buf[0], recv_buf[i]) self.optimizer.target.a.W.grad[:] = self.comm.rank @@ -181,13 +183,14 @@ def test_update_with_cpu(self): 
chainer.testing.assert_allclose(self.optimizer.target.b.W.grad, (base + 1) * np.ones((4, 3))) chainer.testing.assert_allclose(self.optimizer.target.c.W.grad, - (base + 2) * np.ones((5, 4))) + (base + 2) * np.ones((4, 4))) @chainer.testing.attr.gpu def test_update_with_gpu(self): self.setup_gpu() self.optimizer = chainermn.create_multi_node_optimizer( self.actual_optimizer, self.comm) + self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) @@ -197,13 +200,13 @@ def test_update_with_gpu(self): self.target.c = c if self.comm.rank == 0: self.target.c.W.data[:] = self.comm.rank + 2 + self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) send_buf = chainer.cuda.to_cpu(self.optimizer.target.c.W.data) recv_buf = self.comm.mpi_comm.allgather(send_buf) for i in range(1, self.comm.size): - print(recv_buf[i], flush=True) chainer.testing.assert_allclose(recv_buf[0], recv_buf[i]) self.optimizer.target.a.W.grad[:] = self.comm.rank @@ -224,5 +227,5 @@ def test_update_with_gpu(self): chainer.testing.assert_allclose(self.optimizer.target.b.W.grad, (base + 1) * np.ones((4, 3))) chainer.testing.assert_allclose(self.optimizer.target.c.W.grad, - (base + 2) * np.ones((5, 4))) + (base + 2) * np.ones((4, 4))) From 62bebe197e15fbd6cfbe5cc048b7280ef1751484 Mon Sep 17 00:00:00 2001 From: ssuzuki Date: Wed, 13 Dec 2017 13:34:44 +0900 Subject: [PATCH 3/5] delete needs_broadcast --- chainermn/optimizers.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/chainermn/optimizers.py b/chainermn/optimizers.py index 1af8af45..66d033b2 100644 --- a/chainermn/optimizers.py +++ b/chainermn/optimizers.py @@ -5,8 +5,6 @@ def __init__(self, actual_optimizer, communicator): 'communicator', communicator) super(_MultiNodeOptimizer, self).__setattr__( 'actual_optimizer', actual_optimizer) - super(_MultiNodeOptimizer, self).__setattr__( - 'needs_broadcast', True) super(_MultiNodeOptimizer, self).__setattr__( 
'target_params', []) @@ -24,8 +22,6 @@ def update(self, lossfun=None, *args, **kwds): if self.is_changed(target): self.communicator.broadcast_data(target) - super(_MultiNodeOptimizer, self).__setattr__( - 'needs_broadcast', False) else: self.communicator.allreduce_grad(target) self.actual_optimizer.update(None, *args, **kwds) From 1ac8f9e04862411b24c1f0511722325a984793f0 Mon Sep 17 00:00:00 2001 From: ssuzuki Date: Wed, 13 Dec 2017 16:20:05 +0900 Subject: [PATCH 4/5] fix flake8 errors --- chainermn/optimizers.py | 3 ++- .../test_multi_node_optimizer.py | 21 +++++++------------ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/chainermn/optimizers.py b/chainermn/optimizers.py index 66d033b2..d5a0746c 100644 --- a/chainermn/optimizers.py +++ b/chainermn/optimizers.py @@ -29,7 +29,8 @@ def update(self, lossfun=None, *args, **kwds): def is_changed(self, target): previous_params = self.target_params super(_MultiNodeOptimizer, self).__setattr__( - 'target_params', [(name, param.data is not None) for name, param in sorted(target.namedparams())]) + 'target_params', [(name, param.data is not None) + for name, param in sorted(target.namedparams())]) if len(previous_params) == len(self.target_params): for param1, param2 in zip(self.target_params, previous_params): if param1[0] != param2[0]: diff --git a/tests/optimizer_tests/test_multi_node_optimizer.py b/tests/optimizer_tests/test_multi_node_optimizer.py index cd194085..fca64fa9 100644 --- a/tests/optimizer_tests/test_multi_node_optimizer.py +++ b/tests/optimizer_tests/test_multi_node_optimizer.py @@ -3,15 +3,10 @@ import chainer.testing.attr import chainermn import mock -import mpi4py.MPI -import nose import numpy as np import unittest -from chainermn.communicators import _communication_utility - - class ExampleModel(chainer.Chain): def __init__(self): @@ -109,14 +104,13 @@ def test_update_with_gpu(self): (base + 2) * np.ones((5, 4))) - class DynamicExampleModel(chainer.Chain): def __init__(self): 
super(DynamicExampleModel, self).__init__() with self.init_scope(): - self.a=chainer.links.Linear(2, 3) - self.b=chainer.links.Linear(3, 4) + self.a = chainer.links.Linear(2, 3) + self.b = chainer.links.Linear(3, 4) class TestMultiNodeOptimizerWithDynamicModel(unittest.TestCase): @@ -151,7 +145,7 @@ def test_update_with_cpu(self): self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) - + with self.target.init_scope(): self.target.c = chainer.links.Linear(4, 4) if self.comm.rank == 0: @@ -159,7 +153,7 @@ def test_update_with_cpu(self): self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) - + send_buf = chainer.cuda.to_cpu(self.optimizer.target.c.W.data) recv_buf = self.comm.mpi_comm.allgather(send_buf) for i in range(1, self.comm.size): @@ -193,17 +187,17 @@ def test_update_with_gpu(self): self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) - + with self.target.init_scope(): c = chainer.links.Linear(4, 4) c.to_gpu() - self.target.c = c + self.target.c = c if self.comm.rank == 0: self.target.c.W.data[:] = self.comm.rank + 2 self.optimizer.setup(self.target) self.optimizer.update() self.assertEqual(self.actual_optimizer.t, 0) - + send_buf = chainer.cuda.to_cpu(self.optimizer.target.c.W.data) recv_buf = self.comm.mpi_comm.allgather(send_buf) for i in range(1, self.comm.size): @@ -228,4 +222,3 @@ def test_update_with_gpu(self): (base + 1) * np.ones((4, 3))) chainer.testing.assert_allclose(self.optimizer.target.c.W.grad, (base + 2) * np.ones((4, 4))) - From fa61449eecd162bf32f191c057cfbed580cd9c3e Mon Sep 17 00:00:00 2001 From: ssuzuki Date: Fri, 15 Dec 2017 10:48:49 +0900 Subject: [PATCH 5/5] refactoring --- chainermn/optimizers.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/chainermn/optimizers.py b/chainermn/optimizers.py index d5a0746c..9879f34b 100644 --- a/chainermn/optimizers.py 
+++ b/chainermn/optimizers.py @@ -31,16 +31,14 @@ def is_changed(self, target): super(_MultiNodeOptimizer, self).__setattr__( 'target_params', [(name, param.data is not None) for name, param in sorted(target.namedparams())]) - if len(previous_params) == len(self.target_params): - for param1, param2 in zip(self.target_params, previous_params): - if param1[0] != param2[0]: - return True - if param1[1] != param2[1]: - return True - return False - else: + if len(previous_params) != len(self.target_params): return True + for param1, param2 in zip(self.target_params, previous_params): + if (param1[0] != param2[0]) or param1[1] != param2[1]: + return True + return False + def __getattr__(self, attr_name): return getattr(self.actual_optimizer, attr_name)